PostgreSQL Source Code git master
walsender.h File Reference
#include "access/xlogdefs.h"
Include dependency graph for walsender.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Macros

#define WalSndWakeupRequest()    do { wake_wal_senders = true; } while (0)
 

Enumerations

enum  CRSSnapshotAction { CRS_EXPORT_SNAPSHOT , CRS_NOEXPORT_SNAPSHOT , CRS_USE_SNAPSHOT }
 

Functions

void InitWalSender (void)
 
bool exec_replication_command (const char *cmd_string)
 
void WalSndErrorCleanup (void)
 
void PhysicalWakeupLogicalWalSnd (void)
 
XLogRecPtr GetStandbyFlushRecPtr (TimeLineID *tli)
 
void WalSndSignals (void)
 
Size WalSndShmemSize (void)
 
void WalSndShmemInit (void)
 
void WalSndWakeup (bool physical, bool logical)
 
void WalSndInitStopping (void)
 
void WalSndWaitStopping (void)
 
void HandleWalSndInitStopping (void)
 
void WalSndRqstFileReload (void)
 
static void WalSndWakeupProcessRequests (bool physical, bool logical)
 

Variables

PGDLLIMPORT bool am_walsender
 
PGDLLIMPORT bool am_cascading_walsender
 
PGDLLIMPORT bool am_db_walsender
 
PGDLLIMPORT bool wake_wal_senders
 
PGDLLIMPORT int max_wal_senders
 
PGDLLIMPORT int wal_sender_timeout
 
PGDLLIMPORT bool log_replication_commands
 

Macro Definition Documentation

◆ WalSndWakeupRequest

#define WalSndWakeupRequest ( )     do { wake_wal_senders = true; } while (0)

Definition at line 58 of file walsender.h.

Enumeration Type Documentation

◆ CRSSnapshotAction

Enumerator
CRS_EXPORT_SNAPSHOT 
CRS_NOEXPORT_SNAPSHOT 
CRS_USE_SNAPSHOT 

Definition at line 20 of file walsender.h.

21{
CRSSnapshotAction
Definition: walsender.h:21
@ CRS_USE_SNAPSHOT
Definition: walsender.h:24
@ CRS_NOEXPORT_SNAPSHOT
Definition: walsender.h:23
@ CRS_EXPORT_SNAPSHOT
Definition: walsender.h:22

Function Documentation

◆ exec_replication_command()

bool exec_replication_command ( const char *  cmd_string)

Definition at line 1952 of file walsender.c.

1953{
1954 yyscan_t scanner;
1955 int parse_rc;
1956 Node *cmd_node;
1957 const char *cmdtag;
1958 MemoryContext cmd_context;
1959 MemoryContext old_context;
1960
1961 /*
1962 * If WAL sender has been told that shutdown is getting close, switch its
1963 * status accordingly to handle the next replication commands correctly.
1964 */
1965 if (got_STOPPING)
1967
1968 /*
1969 * Throw error if in stopping mode. We need prevent commands that could
1970 * generate WAL while the shutdown checkpoint is being written. To be
1971 * safe, we just prohibit all new commands.
1972 */
1974 ereport(ERROR,
1975 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1976 errmsg("cannot execute new commands while WAL sender is in stopping mode")));
1977
1978 /*
1979 * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
1980 * command arrives. Clean up the old stuff if there's anything.
1981 */
1983
1985
1986 /*
1987 * Prepare to parse and execute the command.
1988 */
1990 "Replication command context",
1992 old_context = MemoryContextSwitchTo(cmd_context);
1993
1994 replication_scanner_init(cmd_string, &scanner);
1995
1996 /*
1997 * Is it a WalSender command?
1998 */
2000 {
2001 /* Nope; clean up and get out. */
2003
2004 MemoryContextSwitchTo(old_context);
2005 MemoryContextDelete(cmd_context);
2006
2007 /* XXX this is a pretty random place to make this check */
2008 if (MyDatabaseId == InvalidOid)
2009 ereport(ERROR,
2010 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2011 errmsg("cannot execute SQL commands in WAL sender for physical replication")));
2012
2013 /* Tell the caller that this wasn't a WalSender command. */
2014 return false;
2015 }
2016
2017 /*
2018 * Looks like a WalSender command, so parse it.
2019 */
2020 parse_rc = replication_yyparse(&cmd_node, scanner);
2021 if (parse_rc != 0)
2022 ereport(ERROR,
2023 (errcode(ERRCODE_SYNTAX_ERROR),
2024 errmsg_internal("replication command parser returned %d",
2025 parse_rc)));
2027
2028 /*
2029 * Report query to various monitoring facilities. For this purpose, we
2030 * report replication commands just like SQL commands.
2031 */
2032 debug_query_string = cmd_string;
2033
2035
2036 /*
2037 * Log replication command if log_replication_commands is enabled. Even
2038 * when it's disabled, log the command with DEBUG1 level for backward
2039 * compatibility.
2040 */
2042 (errmsg("received replication command: %s", cmd_string)));
2043
2044 /*
2045 * Disallow replication commands in aborted transaction blocks.
2046 */
2048 ereport(ERROR,
2049 (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
2050 errmsg("current transaction is aborted, "
2051 "commands ignored until end of transaction block")));
2052
2054
2055 /*
2056 * Allocate buffers that will be used for each outgoing and incoming
2057 * message. We do this just once per command to reduce palloc overhead.
2058 */
2062
2063 switch (cmd_node->type)
2064 {
2065 case T_IdentifySystemCmd:
2066 cmdtag = "IDENTIFY_SYSTEM";
2067 set_ps_display(cmdtag);
2069 EndReplicationCommand(cmdtag);
2070 break;
2071
2072 case T_ReadReplicationSlotCmd:
2073 cmdtag = "READ_REPLICATION_SLOT";
2074 set_ps_display(cmdtag);
2076 EndReplicationCommand(cmdtag);
2077 break;
2078
2079 case T_BaseBackupCmd:
2080 cmdtag = "BASE_BACKUP";
2081 set_ps_display(cmdtag);
2082 PreventInTransactionBlock(true, cmdtag);
2084 EndReplicationCommand(cmdtag);
2085 break;
2086
2087 case T_CreateReplicationSlotCmd:
2088 cmdtag = "CREATE_REPLICATION_SLOT";
2089 set_ps_display(cmdtag);
2091 EndReplicationCommand(cmdtag);
2092 break;
2093
2094 case T_DropReplicationSlotCmd:
2095 cmdtag = "DROP_REPLICATION_SLOT";
2096 set_ps_display(cmdtag);
2098 EndReplicationCommand(cmdtag);
2099 break;
2100
2101 case T_AlterReplicationSlotCmd:
2102 cmdtag = "ALTER_REPLICATION_SLOT";
2103 set_ps_display(cmdtag);
2105 EndReplicationCommand(cmdtag);
2106 break;
2107
2108 case T_StartReplicationCmd:
2109 {
2110 StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
2111
2112 cmdtag = "START_REPLICATION";
2113 set_ps_display(cmdtag);
2114 PreventInTransactionBlock(true, cmdtag);
2115
2116 if (cmd->kind == REPLICATION_KIND_PHYSICAL)
2117 StartReplication(cmd);
2118 else
2120
2121 /* dupe, but necessary per libpqrcv_endstreaming */
2122 EndReplicationCommand(cmdtag);
2123
2124 Assert(xlogreader != NULL);
2125 break;
2126 }
2127
2128 case T_TimeLineHistoryCmd:
2129 cmdtag = "TIMELINE_HISTORY";
2130 set_ps_display(cmdtag);
2131 PreventInTransactionBlock(true, cmdtag);
2133 EndReplicationCommand(cmdtag);
2134 break;
2135
2136 case T_VariableShowStmt:
2137 {
2139 VariableShowStmt *n = (VariableShowStmt *) cmd_node;
2140
2141 cmdtag = "SHOW";
2142 set_ps_display(cmdtag);
2143
2144 /* syscache access needs a transaction environment */
2146 GetPGVariable(n->name, dest);
2148 EndReplicationCommand(cmdtag);
2149 }
2150 break;
2151
2152 case T_UploadManifestCmd:
2153 cmdtag = "UPLOAD_MANIFEST";
2154 set_ps_display(cmdtag);
2155 PreventInTransactionBlock(true, cmdtag);
2157 EndReplicationCommand(cmdtag);
2158 break;
2159
2160 default:
2161 elog(ERROR, "unrecognized replication command node tag: %u",
2162 cmd_node->type);
2163 }
2164
2165 /* done */
2166 MemoryContextSwitchTo(old_context);
2167 MemoryContextDelete(cmd_context);
2168
2169 /*
2170 * We need not update ps display or pg_stat_activity, because PostgresMain
2171 * will reset those to "idle". But we must reset debug_query_string to
2172 * ensure it doesn't become a dangling pointer.
2173 */
2174 debug_query_string = NULL;
2175
2176 return true;
2177}
void pgstat_report_activity(BackendState state, const char *cmd_str)
@ STATE_RUNNING
void SendBaseBackup(BaseBackupCmd *cmd, IncrementalBackupInfo *ib)
Definition: basebackup.c:990
#define Assert(condition)
Definition: c.h:815
void * yyscan_t
Definition: cubedata.h:67
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
void EndReplicationCommand(const char *commandTag)
Definition: dest.c:205
@ DestRemoteSimple
Definition: dest.h:91
int errmsg_internal(const char *fmt,...)
Definition: elog.c:1157
int errcode(int sqlerrcode)
Definition: elog.c:853
int errmsg(const char *fmt,...)
Definition: elog.c:1070
#define LOG
Definition: elog.h:31
#define DEBUG1
Definition: elog.h:30
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:225
#define ereport(elevel,...)
Definition: elog.h:149
Oid MyDatabaseId
Definition: globals.c:93
void GetPGVariable(const char *name, DestReceiver *dest)
Definition: guc_funcs.c:382
MemoryContext CurrentMemoryContext
Definition: mcxt.c:143
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:454
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:160
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:122
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:124
const char * debug_query_string
Definition: postgres.c:87
#define InvalidOid
Definition: postgres_ext.h:37
static void set_ps_display(const char *activity)
Definition: ps_status.h:40
bool replication_scanner_is_replication_command(yyscan_t yyscanner)
Definition: repl_scanner.l:299
void replication_scanner_finish(yyscan_t yyscanner)
Definition: repl_scanner.l:284
void replication_scanner_init(const char *str, yyscan_t *yyscannerp)
Definition: repl_scanner.l:268
@ REPLICATION_KIND_PHYSICAL
Definition: replnodes.h:22
void SnapBuildClearExportedSnapshot(void)
Definition: snapbuild.c:600
void initStringInfo(StringInfo str)
Definition: stringinfo.c:97
Definition: nodes.h:129
NodeTag type
Definition: nodes.h:130
ReplicationKind kind
Definition: replnodes.h:94
WalSndState state
static void AlterReplicationSlot(AlterReplicationSlotCmd *cmd)
Definition: walsender.c:1385
static void SendTimeLineHistory(TimeLineHistoryCmd *cmd)
Definition: walsender.c:557
WalSnd * MyWalSnd
Definition: walsender.c:112
static void ReadReplicationSlot(ReadReplicationSlotCmd *cmd)
Definition: walsender.c:458
static StringInfoData tmpbuf
Definition: walsender.c:170
static void IdentifySystem(void)
Definition: walsender.c:377
static StringInfoData reply_message
Definition: walsender.c:169
void WalSndSetState(WalSndState state)
Definition: walsender.c:3780
static StringInfoData output_message
Definition: walsender.c:168
static void UploadManifest(void)
Definition: walsender.c:647
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:198
bool log_replication_commands
Definition: walsender.c:125
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Definition: walsender.c:1171
static void StartLogicalReplication(StartReplicationCmd *cmd)
Definition: walsender.c:1427
static IncrementalBackupInfo * uploaded_manifest
Definition: walsender.c:147
static void DropReplicationSlot(DropReplicationSlotCmd *cmd)
Definition: walsender.c:1376
static void StartReplication(StartReplicationCmd *cmd)
Definition: walsender.c:789
static XLogReaderState * xlogreader
Definition: walsender.c:137
@ WALSNDSTATE_STOPPING
int replication_yyparse(Node **replication_parse_result_p, yyscan_t yyscanner)
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
Definition: xact.c:3640
void StartTransactionCommand(void)
Definition: xact.c:3051
bool IsAbortedTransactionBlockState(void)
Definition: xact.c:406
void CommitTransactionCommand(void)
Definition: xact.c:3149

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, AlterReplicationSlot(), Assert, CHECK_FOR_INTERRUPTS, CommitTransactionCommand(), CreateDestReceiver(), CreateReplicationSlot(), CurrentMemoryContext, DEBUG1, debug_query_string, generate_unaccent_rules::dest, DestRemoteSimple, DropReplicationSlot(), elog, EndReplicationCommand(), ereport, errcode(), errmsg(), errmsg_internal(), ERROR, GetPGVariable(), got_STOPPING, IdentifySystem(), initStringInfo(), InvalidOid, IsAbortedTransactionBlockState(), StartReplicationCmd::kind, LOG, log_replication_commands, MemoryContextDelete(), MemoryContextSwitchTo(), MyDatabaseId, MyWalSnd, VariableShowStmt::name, output_message, pgstat_report_activity(), PreventInTransactionBlock(), ReadReplicationSlot(), REPLICATION_KIND_PHYSICAL, replication_scanner_finish(), replication_scanner_init(), replication_scanner_is_replication_command(), replication_yyparse(), reply_message, SendBaseBackup(), SendTimeLineHistory(), set_ps_display(), SnapBuildClearExportedSnapshot(), StartLogicalReplication(), StartReplication(), StartTransactionCommand(), WalSnd::state, STATE_RUNNING, tmpbuf, Node::type, ReadReplicationSlotCmd::type, uploaded_manifest, UploadManifest(), WalSndSetState(), WALSNDSTATE_STOPPING, and xlogreader.

Referenced by PostgresMain().

◆ GetStandbyFlushRecPtr()

XLogRecPtr GetStandbyFlushRecPtr ( TimeLineID tli)

Definition at line 3475 of file walsender.c.

3476{
3477 XLogRecPtr replayPtr;
3478 TimeLineID replayTLI;
3479 XLogRecPtr receivePtr;
3481 XLogRecPtr result;
3482
3484
3485 /*
3486 * We can safely send what's already been replayed. Also, if walreceiver
3487 * is streaming WAL from the same timeline, we can send anything that it
3488 * has streamed, but hasn't been replayed yet.
3489 */
3490
3491 receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI);
3492 replayPtr = GetXLogReplayRecPtr(&replayTLI);
3493
3494 if (tli)
3495 *tli = replayTLI;
3496
3497 result = replayPtr;
3498 if (receiveTLI == replayTLI && receivePtr > replayPtr)
3499 result = receivePtr;
3500
3501 return result;
3502}
bool IsSyncingReplicationSlots(void)
Definition: slotsync.c:1649
XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
bool am_cascading_walsender
Definition: walsender.c:116
uint64 XLogRecPtr
Definition: xlogdefs.h:21
uint32 TimeLineID
Definition: xlogdefs.h:59
static TimeLineID receiveTLI
Definition: xlogrecovery.c:263
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)

References am_cascading_walsender, Assert, GetWalRcvFlushRecPtr(), GetXLogReplayRecPtr(), IsSyncingReplicationSlots(), and receiveTLI.

Referenced by IdentifySystem(), StartReplication(), synchronize_one_slot(), XLogSendLogical(), and XLogSendPhysical().

◆ HandleWalSndInitStopping()

void HandleWalSndInitStopping ( void  )

Definition at line 3531 of file walsender.c.

3532{
3534
3535 /*
3536 * If replication has not yet started, die like with SIGTERM. If
3537 * replication is active, only set a flag and wake up the main loop. It
3538 * will send any outstanding WAL, wait for it to be replicated to the
3539 * standby, and then exit gracefully.
3540 */
3541 if (!replication_active)
3542 kill(MyProcPid, SIGTERM);
3543 else
3544 got_STOPPING = true;
3545}
int MyProcPid
Definition: globals.c:46
bool am_walsender
Definition: walsender.c:115
static volatile sig_atomic_t replication_active
Definition: walsender.c:206
#define kill(pid, sig)
Definition: win32_port.h:493

References am_walsender, Assert, got_STOPPING, kill, MyProcPid, and replication_active.

Referenced by procsignal_sigusr1_handler().

◆ InitWalSender()

void InitWalSender ( void  )

Referenced by PostgresMain().

◆ PhysicalWakeupLogicalWalSnd()

void PhysicalWakeupLogicalWalSnd ( void  )

Definition at line 1708 of file walsender.c.

1709{
1711
1712 /*
1713 * If we are running in a standby, there is no need to wake up walsenders.
1714 * This is because we do not support syncing slots to cascading standbys,
1715 * so, there are no walsenders waiting for standbys to catch up.
1716 */
1717 if (RecoveryInProgress())
1718 return;
1719
1722}
#define NameStr(name)
Definition: c.h:703
void ConditionVariableBroadcast(ConditionVariable *cv)
ReplicationSlot * MyReplicationSlot
Definition: slot.c:138
bool SlotExistsInSyncStandbySlots(const char *slot_name)
Definition: slot.c:2572
#define SlotIsPhysical(slot)
Definition: slot.h:216
ReplicationSlotPersistentData data
Definition: slot.h:181
ConditionVariable wal_confirm_rcv_cv
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
bool RecoveryInProgress(void)
Definition: xlog.c:6334

References Assert, ConditionVariableBroadcast(), ReplicationSlot::data, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, RecoveryInProgress(), SlotExistsInSyncStandbySlots(), SlotIsPhysical, WalSndCtlData::wal_confirm_rcv_cv, and WalSndCtl.

Referenced by pg_physical_replication_slot_advance(), and PhysicalConfirmReceivedLocation().

◆ WalSndErrorCleanup()

void WalSndErrorCleanup ( void  )

Definition at line 325 of file walsender.c.

326{
330
331 if (xlogreader != NULL && xlogreader->seg.ws_file >= 0)
333
334 if (MyReplicationSlot != NULL)
336
338
339 replication_active = false;
340
341 /*
342 * If there is a transaction in progress, it will clean up our
343 * ResourceOwner, but if a replication command set up a resource owner
344 * without a transaction, we've got to clean that up now.
345 */
348
350 proc_exit(0);
351
352 /* Revert back to startup state */
354}
bool ConditionVariableCancelSleep(void)
void proc_exit(int code)
Definition: ipc.c:104
void LWLockReleaseAll(void)
Definition: lwlock.c:1876
void ReleaseAuxProcessResources(bool isCommit)
Definition: resowner.c:1002
void ReplicationSlotRelease(void)
Definition: slot.c:652
void ReplicationSlotCleanup(bool synced_only)
Definition: slot.c:745
WALOpenSegment seg
Definition: xlogreader.h:272
static void pgstat_report_wait_end(void)
Definition: wait_event.h:101
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:197
@ WALSNDSTATE_STARTUP
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4981
void wal_segment_close(XLogReaderState *state)
Definition: xlogutils.c:842

References ConditionVariableCancelSleep(), got_SIGUSR2, got_STOPPING, IsTransactionOrTransactionBlock(), LWLockReleaseAll(), MyReplicationSlot, pgstat_report_wait_end(), proc_exit(), ReleaseAuxProcessResources(), replication_active, ReplicationSlotCleanup(), ReplicationSlotRelease(), XLogReaderState::seg, wal_segment_close(), WalSndSetState(), WALSNDSTATE_STARTUP, WALOpenSegment::ws_file, and xlogreader.

Referenced by PostgresMain().

◆ WalSndInitStopping()

void WalSndInitStopping ( void  )

Definition at line 3716 of file walsender.c.

3717{
3718 int i;
3719
3720 for (i = 0; i < max_wal_senders; i++)
3721 {
3722 WalSnd *walsnd = &WalSndCtl->walsnds[i];
3723 pid_t pid;
3724
3725 SpinLockAcquire(&walsnd->mutex);
3726 pid = walsnd->pid;
3727 SpinLockRelease(&walsnd->mutex);
3728
3729 if (pid == 0)
3730 continue;
3731
3733 }
3734}
int i
Definition: isn.c:72
#define INVALID_PROC_NUMBER
Definition: procnumber.h:26
int SendProcSignal(pid_t pid, ProcSignalReason reason, ProcNumber procNumber)
Definition: procsignal.c:281
@ PROCSIG_WALSND_INIT_STOPPING
Definition: procsignal.h:35
#define SpinLockRelease(lock)
Definition: spin.h:61
#define SpinLockAcquire(lock)
Definition: spin.h:59
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
int max_wal_senders
Definition: walsender.c:121

References i, INVALID_PROC_NUMBER, max_wal_senders, WalSnd::mutex, WalSnd::pid, PROCSIG_WALSND_INIT_STOPPING, SendProcSignal(), SpinLockAcquire, SpinLockRelease, WalSndCtl, and WalSndCtlData::walsnds.

Referenced by ShutdownXLOG().

◆ WalSndRqstFileReload()

void WalSndRqstFileReload ( void  )

Definition at line 3508 of file walsender.c.

3509{
3510 int i;
3511
3512 for (i = 0; i < max_wal_senders; i++)
3513 {
3514 WalSnd *walsnd = &WalSndCtl->walsnds[i];
3515
3516 SpinLockAcquire(&walsnd->mutex);
3517 if (walsnd->pid == 0)
3518 {
3519 SpinLockRelease(&walsnd->mutex);
3520 continue;
3521 }
3522 walsnd->needreload = true;
3523 SpinLockRelease(&walsnd->mutex);
3524 }
3525}
bool needreload

References i, max_wal_senders, WalSnd::mutex, WalSnd::needreload, WalSnd::pid, SpinLockAcquire, SpinLockRelease, WalSndCtl, and WalSndCtlData::walsnds.

Referenced by KeepFileRestoredFromArchive().

◆ WalSndShmemInit()

void WalSndShmemInit ( void  )

Definition at line 3592 of file walsender.c.

3593{
3594 bool found;
3595 int i;
3596
3598 ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
3599
3600 if (!found)
3601 {
3602 /* First time through, so initialize */
3604
3605 for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
3607
3608 for (i = 0; i < max_wal_senders; i++)
3609 {
3610 WalSnd *walsnd = &WalSndCtl->walsnds[i];
3611
3612 SpinLockInit(&walsnd->mutex);
3613 }
3614
3618 }
3619}
#define MemSet(start, val, len)
Definition: c.h:977
void ConditionVariableInit(ConditionVariable *cv)
static void dlist_init(dlist_head *head)
Definition: ilist.h:314
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:382
#define SpinLockInit(lock)
Definition: spin.h:57
ConditionVariable wal_replay_cv
dlist_head SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]
ConditionVariable wal_flush_cv
#define NUM_SYNC_REP_WAIT_MODE
Definition: syncrep.h:27
Size WalSndShmemSize(void)
Definition: walsender.c:3580

References ConditionVariableInit(), dlist_init(), i, max_wal_senders, MemSet, WalSnd::mutex, NUM_SYNC_REP_WAIT_MODE, ShmemInitStruct(), SpinLockInit, WalSndCtlData::SyncRepQueue, WalSndCtlData::wal_confirm_rcv_cv, WalSndCtlData::wal_flush_cv, WalSndCtlData::wal_replay_cv, WalSndCtl, WalSndCtlData::walsnds, and WalSndShmemSize().

Referenced by CreateOrAttachShmemStructs().

◆ WalSndShmemSize()

Size WalSndShmemSize ( void  )

Definition at line 3580 of file walsender.c.

3581{
3582 Size size = 0;
3583
3584 size = offsetof(WalSndCtlData, walsnds);
3586
3587 return size;
3588}
size_t Size
Definition: c.h:562
Size add_size(Size s1, Size s2)
Definition: shmem.c:488
Size mul_size(Size s1, Size s2)
Definition: shmem.c:505
static pg_noinline void Size size
Definition: slab.c:607

References add_size(), max_wal_senders, mul_size(), and size.

Referenced by CalculateShmemSize(), and WalSndShmemInit().

◆ WalSndSignals()

void WalSndSignals ( void  )

Definition at line 3561 of file walsender.c.

3562{
3563 /* Set up signal handlers */
3565 pqsignal(SIGINT, StatementCancelHandler); /* query cancel */
3566 pqsignal(SIGTERM, die); /* request shutdown */
3567 /* SIGQUIT handler was already set up by InitPostmasterChild */
3568 InitializeTimeouts(); /* establishes SIGALRM handler */
3569 pqsignal(SIGPIPE, SIG_IGN);
3571 pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
3572 * shutdown */
3573
3574 /* Reset some signals that are accepted by postmaster but not here */
3575 pqsignal(SIGCHLD, SIG_DFL);
3576}
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:61
#define die(msg)
#define pqsignal
Definition: port.h:520
void StatementCancelHandler(SIGNAL_ARGS)
Definition: postgres.c:3031
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition: procsignal.c:671
void InitializeTimeouts(void)
Definition: timeout.c:470
static void WalSndLastCycleHandler(SIGNAL_ARGS)
Definition: walsender.c:3553
#define SIGCHLD
Definition: win32_port.h:168
#define SIGHUP
Definition: win32_port.h:158
#define SIGPIPE
Definition: win32_port.h:163
#define SIGUSR1
Definition: win32_port.h:170
#define SIGUSR2
Definition: win32_port.h:171

References die, InitializeTimeouts(), pqsignal, procsignal_sigusr1_handler(), SIGCHLD, SIGHUP, SignalHandlerForConfigReload(), SIGPIPE, SIGUSR1, SIGUSR2, StatementCancelHandler(), and WalSndLastCycleHandler().

Referenced by PostgresMain().

◆ WalSndWaitStopping()

void WalSndWaitStopping ( void  )

Definition at line 3742 of file walsender.c.

3743{
3744 for (;;)
3745 {
3746 int i;
3747 bool all_stopped = true;
3748
3749 for (i = 0; i < max_wal_senders; i++)
3750 {
3751 WalSnd *walsnd = &WalSndCtl->walsnds[i];
3752
3753 SpinLockAcquire(&walsnd->mutex);
3754
3755 if (walsnd->pid == 0)
3756 {
3757 SpinLockRelease(&walsnd->mutex);
3758 continue;
3759 }
3760
3761 if (walsnd->state != WALSNDSTATE_STOPPING)
3762 {
3763 all_stopped = false;
3764 SpinLockRelease(&walsnd->mutex);
3765 break;
3766 }
3767 SpinLockRelease(&walsnd->mutex);
3768 }
3769
3770 /* safe to leave if confirmation is done for all WAL senders */
3771 if (all_stopped)
3772 return;
3773
3774 pg_usleep(10000L); /* wait for 10 msec */
3775 }
3776}
void pg_usleep(long microsec)
Definition: signal.c:53

References i, max_wal_senders, WalSnd::mutex, pg_usleep(), WalSnd::pid, SpinLockAcquire, SpinLockRelease, WalSnd::state, WalSndCtl, WalSndCtlData::walsnds, and WALSNDSTATE_STOPPING.

Referenced by ShutdownXLOG().

◆ WalSndWakeup()

void WalSndWakeup ( bool  physical,
bool  logical 
)

Definition at line 3637 of file walsender.c.

3638{
3639 /*
3640 * Wake up all the walsenders waiting on WAL being flushed or replayed
3641 * respectively. Note that waiting walsender would have prepared to sleep
3642 * on the CV (i.e., added itself to the CV's waitlist) in WalSndWait()
3643 * before actually waiting.
3644 */
3645 if (physical)
3647
3648 if (logical)
3650}

References ConditionVariableBroadcast(), WalSndCtlData::wal_flush_cv, WalSndCtlData::wal_replay_cv, and WalSndCtl.

Referenced by ApplyWalRecord(), KeepFileRestoredFromArchive(), StartupXLOG(), WalSndWakeupProcessRequests(), and XLogWalRcvFlush().

◆ WalSndWakeupProcessRequests()

static void WalSndWakeupProcessRequests ( bool  physical,
bool  logical 
)
inlinestatic

Definition at line 65 of file walsender.h.

66{
68 {
69 wake_wal_senders = false;
70 if (max_wal_senders > 0)
71 WalSndWakeup(physical, logical);
72 }
73}
PGDLLIMPORT bool wake_wal_senders
Definition: walsender.c:130
void WalSndWakeup(bool physical, bool logical)
Definition: walsender.c:3637
PGDLLIMPORT int max_wal_senders
Definition: walsender.c:121

References max_wal_senders, wake_wal_senders, and WalSndWakeup().

Referenced by XLogBackgroundFlush(), and XLogFlush().

Variable Documentation

◆ am_cascading_walsender

◆ am_db_walsender

PGDLLIMPORT bool am_db_walsender
extern

Definition at line 118 of file walsender.c.

Referenced by check_db(), ClientAuthentication(), InitPostgres(), and ProcessStartupPacket().

◆ am_walsender

◆ log_replication_commands

PGDLLIMPORT bool log_replication_commands
extern

◆ max_wal_senders

◆ wake_wal_senders

PGDLLIMPORT bool wake_wal_senders
extern

Definition at line 130 of file walsender.c.

Referenced by WalSndWakeupProcessRequests().

◆ wal_sender_timeout