PostgreSQL Source Code git master
All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
walsender.h File Reference
#include "access/xlogdefs.h"
Include dependency graph for walsender.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Macros

#define WalSndWakeupRequest()    do { wake_wal_senders = true; } while (0)
 

Enumerations

enum  CRSSnapshotAction { CRS_EXPORT_SNAPSHOT , CRS_NOEXPORT_SNAPSHOT , CRS_USE_SNAPSHOT }
 

Functions

void InitWalSender (void)
 
bool exec_replication_command (const char *cmd_string)
 
void WalSndErrorCleanup (void)
 
void PhysicalWakeupLogicalWalSnd (void)
 
XLogRecPtr GetStandbyFlushRecPtr (TimeLineID *tli)
 
void WalSndSignals (void)
 
Size WalSndShmemSize (void)
 
void WalSndShmemInit (void)
 
void WalSndWakeup (bool physical, bool logical)
 
void WalSndInitStopping (void)
 
void WalSndWaitStopping (void)
 
void HandleWalSndInitStopping (void)
 
void WalSndRqstFileReload (void)
 
static void WalSndWakeupProcessRequests (bool physical, bool logical)
 

Variables

PGDLLIMPORT bool am_walsender
 
PGDLLIMPORT bool am_cascading_walsender
 
PGDLLIMPORT bool am_db_walsender
 
PGDLLIMPORT bool wake_wal_senders
 
PGDLLIMPORT int max_wal_senders
 
PGDLLIMPORT int wal_sender_timeout
 
PGDLLIMPORT bool log_replication_commands
 

Macro Definition Documentation

◆ WalSndWakeupRequest

#define WalSndWakeupRequest ( )     do { wake_wal_senders = true; } while (0)

Definition at line 58 of file walsender.h.

Enumeration Type Documentation

◆ CRSSnapshotAction

Enumerator
CRS_EXPORT_SNAPSHOT 
CRS_NOEXPORT_SNAPSHOT 
CRS_USE_SNAPSHOT 

Definition at line 20 of file walsender.h.

21{
CRSSnapshotAction
Definition: walsender.h:21
@ CRS_USE_SNAPSHOT
Definition: walsender.h:24
@ CRS_NOEXPORT_SNAPSHOT
Definition: walsender.h:23
@ CRS_EXPORT_SNAPSHOT
Definition: walsender.h:22

Function Documentation

◆ exec_replication_command()

bool exec_replication_command ( const char *  cmd_string)

Definition at line 1970 of file walsender.c.

1971{
1972 yyscan_t scanner;
1973 int parse_rc;
1974 Node *cmd_node;
1975 const char *cmdtag;
1976 MemoryContext cmd_context;
1977 MemoryContext old_context;
1978
1979 /*
1980 * If WAL sender has been told that shutdown is getting close, switch its
1981 * status accordingly to handle the next replication commands correctly.
1982 */
1983 if (got_STOPPING)
1985
1986 /*
1987 * Throw error if in stopping mode. We need prevent commands that could
1988 * generate WAL while the shutdown checkpoint is being written. To be
1989 * safe, we just prohibit all new commands.
1990 */
1992 ereport(ERROR,
1993 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1994 errmsg("cannot execute new commands while WAL sender is in stopping mode")));
1995
1996 /*
1997 * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
1998 * command arrives. Clean up the old stuff if there's anything.
1999 */
2001
2003
2004 /*
2005 * Prepare to parse and execute the command.
2006 */
2008 "Replication command context",
2010 old_context = MemoryContextSwitchTo(cmd_context);
2011
2012 replication_scanner_init(cmd_string, &scanner);
2013
2014 /*
2015 * Is it a WalSender command?
2016 */
2018 {
2019 /* Nope; clean up and get out. */
2021
2022 MemoryContextSwitchTo(old_context);
2023 MemoryContextDelete(cmd_context);
2024
2025 /* XXX this is a pretty random place to make this check */
2026 if (MyDatabaseId == InvalidOid)
2027 ereport(ERROR,
2028 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2029 errmsg("cannot execute SQL commands in WAL sender for physical replication")));
2030
2031 /* Tell the caller that this wasn't a WalSender command. */
2032 return false;
2033 }
2034
2035 /*
2036 * Looks like a WalSender command, so parse it.
2037 */
2038 parse_rc = replication_yyparse(&cmd_node, scanner);
2039 if (parse_rc != 0)
2040 ereport(ERROR,
2041 (errcode(ERRCODE_SYNTAX_ERROR),
2042 errmsg_internal("replication command parser returned %d",
2043 parse_rc)));
2045
2046 /*
2047 * Report query to various monitoring facilities. For this purpose, we
2048 * report replication commands just like SQL commands.
2049 */
2050 debug_query_string = cmd_string;
2051
2053
2054 /*
2055 * Log replication command if log_replication_commands is enabled. Even
2056 * when it's disabled, log the command with DEBUG1 level for backward
2057 * compatibility.
2058 */
2060 (errmsg("received replication command: %s", cmd_string)));
2061
2062 /*
2063 * Disallow replication commands in aborted transaction blocks.
2064 */
2066 ereport(ERROR,
2067 (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
2068 errmsg("current transaction is aborted, "
2069 "commands ignored until end of transaction block")));
2070
2072
2073 /*
2074 * Allocate buffers that will be used for each outgoing and incoming
2075 * message. We do this just once per command to reduce palloc overhead.
2076 */
2080
2081 switch (cmd_node->type)
2082 {
2083 case T_IdentifySystemCmd:
2084 cmdtag = "IDENTIFY_SYSTEM";
2085 set_ps_display(cmdtag);
2087 EndReplicationCommand(cmdtag);
2088 break;
2089
2090 case T_ReadReplicationSlotCmd:
2091 cmdtag = "READ_REPLICATION_SLOT";
2092 set_ps_display(cmdtag);
2094 EndReplicationCommand(cmdtag);
2095 break;
2096
2097 case T_BaseBackupCmd:
2098 cmdtag = "BASE_BACKUP";
2099 set_ps_display(cmdtag);
2100 PreventInTransactionBlock(true, cmdtag);
2102 EndReplicationCommand(cmdtag);
2103 break;
2104
2105 case T_CreateReplicationSlotCmd:
2106 cmdtag = "CREATE_REPLICATION_SLOT";
2107 set_ps_display(cmdtag);
2109 EndReplicationCommand(cmdtag);
2110 break;
2111
2112 case T_DropReplicationSlotCmd:
2113 cmdtag = "DROP_REPLICATION_SLOT";
2114 set_ps_display(cmdtag);
2116 EndReplicationCommand(cmdtag);
2117 break;
2118
2119 case T_AlterReplicationSlotCmd:
2120 cmdtag = "ALTER_REPLICATION_SLOT";
2121 set_ps_display(cmdtag);
2123 EndReplicationCommand(cmdtag);
2124 break;
2125
2126 case T_StartReplicationCmd:
2127 {
2128 StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
2129
2130 cmdtag = "START_REPLICATION";
2131 set_ps_display(cmdtag);
2132 PreventInTransactionBlock(true, cmdtag);
2133
2134 if (cmd->kind == REPLICATION_KIND_PHYSICAL)
2135 StartReplication(cmd);
2136 else
2138
2139 /* dupe, but necessary per libpqrcv_endstreaming */
2140 EndReplicationCommand(cmdtag);
2141
2142 Assert(xlogreader != NULL);
2143 break;
2144 }
2145
2146 case T_TimeLineHistoryCmd:
2147 cmdtag = "TIMELINE_HISTORY";
2148 set_ps_display(cmdtag);
2149 PreventInTransactionBlock(true, cmdtag);
2151 EndReplicationCommand(cmdtag);
2152 break;
2153
2154 case T_VariableShowStmt:
2155 {
2157 VariableShowStmt *n = (VariableShowStmt *) cmd_node;
2158
2159 cmdtag = "SHOW";
2160 set_ps_display(cmdtag);
2161
2162 /* syscache access needs a transaction environment */
2164 GetPGVariable(n->name, dest);
2166 EndReplicationCommand(cmdtag);
2167 }
2168 break;
2169
2170 case T_UploadManifestCmd:
2171 cmdtag = "UPLOAD_MANIFEST";
2172 set_ps_display(cmdtag);
2173 PreventInTransactionBlock(true, cmdtag);
2175 EndReplicationCommand(cmdtag);
2176 break;
2177
2178 default:
2179 elog(ERROR, "unrecognized replication command node tag: %u",
2180 cmd_node->type);
2181 }
2182
2183 /* done */
2184 MemoryContextSwitchTo(old_context);
2185 MemoryContextDelete(cmd_context);
2186
2187 /*
2188 * We need not update ps display or pg_stat_activity, because PostgresMain
2189 * will reset those to "idle". But we must reset debug_query_string to
2190 * ensure it doesn't become a dangling pointer.
2191 */
2192 debug_query_string = NULL;
2193
2194 return true;
2195}
void pgstat_report_activity(BackendState state, const char *cmd_str)
@ STATE_RUNNING
void SendBaseBackup(BaseBackupCmd *cmd, IncrementalBackupInfo *ib)
Definition: basebackup.c:990
void * yyscan_t
Definition: cubedata.h:67
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
void EndReplicationCommand(const char *commandTag)
Definition: dest.c:205
@ DestRemoteSimple
Definition: dest.h:91
int errmsg_internal(const char *fmt,...)
Definition: elog.c:1158
int errcode(int sqlerrcode)
Definition: elog.c:854
int errmsg(const char *fmt,...)
Definition: elog.c:1071
#define LOG
Definition: elog.h:31
#define DEBUG1
Definition: elog.h:30
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:226
#define ereport(elevel,...)
Definition: elog.h:149
Oid MyDatabaseId
Definition: globals.c:95
void GetPGVariable(const char *name, DestReceiver *dest)
Definition: guc_funcs.c:382
Assert(PointerIsAligned(start, uint64))
MemoryContext CurrentMemoryContext
Definition: mcxt.c:159
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:485
#define AllocSetContextCreate
Definition: memutils.h:149
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:180
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:123
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:124
const char * debug_query_string
Definition: postgres.c:88
#define InvalidOid
Definition: postgres_ext.h:35
static void set_ps_display(const char *activity)
Definition: ps_status.h:40
bool replication_scanner_is_replication_command(yyscan_t yyscanner)
Definition: repl_scanner.l:299
void replication_scanner_finish(yyscan_t yyscanner)
Definition: repl_scanner.l:284
void replication_scanner_init(const char *str, yyscan_t *yyscannerp)
Definition: repl_scanner.l:268
@ REPLICATION_KIND_PHYSICAL
Definition: replnodes.h:22
void SnapBuildClearExportedSnapshot(void)
Definition: snapbuild.c:600
void initStringInfo(StringInfo str)
Definition: stringinfo.c:97
Definition: nodes.h:135
NodeTag type
Definition: nodes.h:136
ReplicationKind kind
Definition: replnodes.h:94
WalSndState state
static void AlterReplicationSlot(AlterReplicationSlotCmd *cmd)
Definition: walsender.c:1391
static void SendTimeLineHistory(TimeLineHistoryCmd *cmd)
Definition: walsender.c:563
WalSnd * MyWalSnd
Definition: walsender.c:117
static void ReadReplicationSlot(ReadReplicationSlotCmd *cmd)
Definition: walsender.c:464
static StringInfoData tmpbuf
Definition: walsender.c:175
static void IdentifySystem(void)
Definition: walsender.c:383
static StringInfoData reply_message
Definition: walsender.c:174
void WalSndSetState(WalSndState state)
Definition: walsender.c:3814
static StringInfoData output_message
Definition: walsender.c:173
static void UploadManifest(void)
Definition: walsender.c:653
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:203
bool log_replication_commands
Definition: walsender.c:130
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Definition: walsender.c:1177
static void StartLogicalReplication(StartReplicationCmd *cmd)
Definition: walsender.c:1433
static IncrementalBackupInfo * uploaded_manifest
Definition: walsender.c:152
static void DropReplicationSlot(DropReplicationSlotCmd *cmd)
Definition: walsender.c:1382
static void StartReplication(StartReplicationCmd *cmd)
Definition: walsender.c:795
static XLogReaderState * xlogreader
Definition: walsender.c:142
@ WALSNDSTATE_STOPPING
int replication_yyparse(Node **replication_parse_result_p, yyscan_t yyscanner)
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
Definition: xact.c:3648
void StartTransactionCommand(void)
Definition: xact.c:3059
bool IsAbortedTransactionBlockState(void)
Definition: xact.c:407
void CommitTransactionCommand(void)
Definition: xact.c:3157

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, AlterReplicationSlot(), Assert(), CHECK_FOR_INTERRUPTS, CommitTransactionCommand(), CreateDestReceiver(), CreateReplicationSlot(), CurrentMemoryContext, DEBUG1, debug_query_string, generate_unaccent_rules::dest, DestRemoteSimple, DropReplicationSlot(), elog, EndReplicationCommand(), ereport, errcode(), errmsg(), errmsg_internal(), ERROR, GetPGVariable(), got_STOPPING, IdentifySystem(), initStringInfo(), InvalidOid, IsAbortedTransactionBlockState(), StartReplicationCmd::kind, LOG, log_replication_commands, MemoryContextDelete(), MemoryContextSwitchTo(), MyDatabaseId, MyWalSnd, VariableShowStmt::name, output_message, pgstat_report_activity(), PreventInTransactionBlock(), ReadReplicationSlot(), REPLICATION_KIND_PHYSICAL, replication_scanner_finish(), replication_scanner_init(), replication_scanner_is_replication_command(), replication_yyparse(), reply_message, SendBaseBackup(), SendTimeLineHistory(), set_ps_display(), SnapBuildClearExportedSnapshot(), StartLogicalReplication(), StartReplication(), StartTransactionCommand(), WalSnd::state, STATE_RUNNING, tmpbuf, Node::type, ReadReplicationSlotCmd::type, uploaded_manifest, UploadManifest(), WalSndSetState(), WALSNDSTATE_STOPPING, and xlogreader.

Referenced by PostgresMain().

◆ GetStandbyFlushRecPtr()

XLogRecPtr GetStandbyFlushRecPtr ( TimeLineID tli)

Definition at line 3509 of file walsender.c.

3510{
3511 XLogRecPtr replayPtr;
3512 TimeLineID replayTLI;
3513 XLogRecPtr receivePtr;
3515 XLogRecPtr result;
3516
3518
3519 /*
3520 * We can safely send what's already been replayed. Also, if walreceiver
3521 * is streaming WAL from the same timeline, we can send anything that it
3522 * has streamed, but hasn't been replayed yet.
3523 */
3524
3525 receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI);
3526 replayPtr = GetXLogReplayRecPtr(&replayTLI);
3527
3528 if (tli)
3529 *tli = replayTLI;
3530
3531 result = replayPtr;
3532 if (receiveTLI == replayTLI && receivePtr > replayPtr)
3533 result = receivePtr;
3534
3535 return result;
3536}
bool IsSyncingReplicationSlots(void)
Definition: slotsync.c:1653
XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
bool am_cascading_walsender
Definition: walsender.c:121
uint64 XLogRecPtr
Definition: xlogdefs.h:21
uint32 TimeLineID
Definition: xlogdefs.h:59
static TimeLineID receiveTLI
Definition: xlogrecovery.c:264
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)

References am_cascading_walsender, Assert(), GetWalRcvFlushRecPtr(), GetXLogReplayRecPtr(), IsSyncingReplicationSlots(), and receiveTLI.

Referenced by IdentifySystem(), StartReplication(), synchronize_one_slot(), XLogSendLogical(), and XLogSendPhysical().

◆ HandleWalSndInitStopping()

void HandleWalSndInitStopping ( void  )

Definition at line 3565 of file walsender.c.

3566{
3568
3569 /*
3570 * If replication has not yet started, die like with SIGTERM. If
3571 * replication is active, only set a flag and wake up the main loop. It
3572 * will send any outstanding WAL, wait for it to be replicated to the
3573 * standby, and then exit gracefully.
3574 */
3575 if (!replication_active)
3576 kill(MyProcPid, SIGTERM);
3577 else
3578 got_STOPPING = true;
3579}
int MyProcPid
Definition: globals.c:48
bool am_walsender
Definition: walsender.c:120
static volatile sig_atomic_t replication_active
Definition: walsender.c:211
#define kill(pid, sig)
Definition: win32_port.h:493

References am_walsender, Assert(), got_STOPPING, kill, MyProcPid, and replication_active.

Referenced by procsignal_sigusr1_handler().

◆ InitWalSender()

void InitWalSender ( void  )

Definition at line 283 of file walsender.c.

284{
286
287 /* Create a per-walsender data structure in shared memory */
289
290 /* need resource owner for e.g. basebackups */
292
293 /*
294 * Let postmaster know that we're a WAL sender. Once we've declared us as
295 * a WAL sender process, postmaster will let us outlive the bgwriter and
296 * kill us last in the shutdown sequence, so we get a chance to stream all
297 * remaining WAL at shutdown, including the shutdown checkpoint. Note that
298 * there's no going back, and we mustn't write any WAL records after this.
299 */
302
303 /*
304 * If the client didn't specify a database to connect to, show in PGPROC
305 * that our advertised xmin should affect vacuum horizons in all
306 * databases. This allows physical replication clients to send hot
307 * standby feedback that will delay vacuum cleanup in all databases.
308 */
310 {
312 LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
315 LWLockRelease(ProcArrayLock);
316 }
317
318 /* Initialize empty timestamp buffer for lag tracking. */
320}
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1182
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1902
@ LW_EXCLUSIVE
Definition: lwlock.h:114
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:1290
MemoryContext TopMemoryContext
Definition: mcxt.c:165
void SendPostmasterSignal(PMSignalReason reason)
Definition: pmsignal.c:165
void MarkPostmasterChildWalSender(void)
Definition: pmsignal.c:309
@ PMSIGNAL_ADVANCE_STATE_MACHINE
Definition: pmsignal.h:43
#define PROC_AFFECTS_ALL_HORIZONS
Definition: proc.h:62
void CreateAuxProcessResourceOwner(void)
Definition: resowner.c:999
PGPROC * MyProc
Definition: proc.c:67
PROC_HDR * ProcGlobal
Definition: proc.c:79
TransactionId xmin
Definition: proc.h:178
uint8 statusFlags
Definition: proc.h:243
int pgxactoff
Definition: proc.h:185
uint8 * statusFlags
Definition: proc.h:387
#define InvalidTransactionId
Definition: transam.h:31
static void InitWalSenderSlot(void)
Definition: walsender.c:2901
static LagTracker * lag_tracker
Definition: walsender.c:235
bool RecoveryInProgress(void)
Definition: xlog.c:6522

References am_cascading_walsender, Assert(), CreateAuxProcessResourceOwner(), InitWalSenderSlot(), InvalidOid, InvalidTransactionId, lag_tracker, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MarkPostmasterChildWalSender(), MemoryContextAllocZero(), MyDatabaseId, MyProc, PGPROC::pgxactoff, PMSIGNAL_ADVANCE_STATE_MACHINE, PROC_AFFECTS_ALL_HORIZONS, ProcGlobal, RecoveryInProgress(), SendPostmasterSignal(), PGPROC::statusFlags, PROC_HDR::statusFlags, TopMemoryContext, and PGPROC::xmin.

Referenced by PostgresMain().

◆ PhysicalWakeupLogicalWalSnd()

void PhysicalWakeupLogicalWalSnd ( void  )

Definition at line 1714 of file walsender.c.

1715{
1717
1718 /*
1719 * If we are running in a standby, there is no need to wake up walsenders.
1720 * This is because we do not support syncing slots to cascading standbys,
1721 * so, there are no walsenders waiting for standbys to catch up.
1722 */
1723 if (RecoveryInProgress())
1724 return;
1725
1728}
#define NameStr(name)
Definition: c.h:717
void ConditionVariableBroadcast(ConditionVariable *cv)
ReplicationSlot * MyReplicationSlot
Definition: slot.c:147
bool SlotExistsInSyncStandbySlots(const char *slot_name)
Definition: slot.c:2772
#define SlotIsPhysical(slot)
Definition: slot.h:220
ReplicationSlotPersistentData data
Definition: slot.h:185
ConditionVariable wal_confirm_rcv_cv
WalSndCtlData * WalSndCtl
Definition: walsender.c:114

References Assert(), ConditionVariableBroadcast(), ReplicationSlot::data, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, RecoveryInProgress(), SlotExistsInSyncStandbySlots(), SlotIsPhysical, WalSndCtlData::wal_confirm_rcv_cv, and WalSndCtl.

Referenced by pg_physical_replication_slot_advance(), and PhysicalConfirmReceivedLocation().

◆ WalSndErrorCleanup()

void WalSndErrorCleanup ( void  )

Definition at line 330 of file walsender.c.

331{
336
337 if (xlogreader != NULL && xlogreader->seg.ws_file >= 0)
339
340 if (MyReplicationSlot != NULL)
342
344
345 replication_active = false;
346
347 /*
348 * If there is a transaction in progress, it will clean up our
349 * ResourceOwner, but if a replication command set up a resource owner
350 * without a transaction, we've got to clean that up now.
351 */
354
356 proc_exit(0);
357
358 /* Revert back to startup state */
360}
void pgaio_error_cleanup(void)
Definition: aio.c:1062
bool ConditionVariableCancelSleep(void)
void proc_exit(int code)
Definition: ipc.c:104
void LWLockReleaseAll(void)
Definition: lwlock.c:1953
void ReleaseAuxProcessResources(bool isCommit)
Definition: resowner.c:1019
void ReplicationSlotRelease(void)
Definition: slot.c:686
void ReplicationSlotCleanup(bool synced_only)
Definition: slot.c:775
WALOpenSegment seg
Definition: xlogreader.h:272
static void pgstat_report_wait_end(void)
Definition: wait_event.h:101
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:202
@ WALSNDSTATE_STARTUP
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4989
void wal_segment_close(XLogReaderState *state)
Definition: xlogutils.c:831

References ConditionVariableCancelSleep(), got_SIGUSR2, got_STOPPING, IsTransactionOrTransactionBlock(), LWLockReleaseAll(), MyReplicationSlot, pgaio_error_cleanup(), pgstat_report_wait_end(), proc_exit(), ReleaseAuxProcessResources(), replication_active, ReplicationSlotCleanup(), ReplicationSlotRelease(), XLogReaderState::seg, wal_segment_close(), WalSndSetState(), WALSNDSTATE_STARTUP, WALOpenSegment::ws_file, and xlogreader.

Referenced by PostgresMain().

◆ WalSndInitStopping()

void WalSndInitStopping ( void  )

Definition at line 3750 of file walsender.c.

3751{
3752 int i;
3753
3754 for (i = 0; i < max_wal_senders; i++)
3755 {
3756 WalSnd *walsnd = &WalSndCtl->walsnds[i];
3757 pid_t pid;
3758
3759 SpinLockAcquire(&walsnd->mutex);
3760 pid = walsnd->pid;
3761 SpinLockRelease(&walsnd->mutex);
3762
3763 if (pid == 0)
3764 continue;
3765
3767 }
3768}
int i
Definition: isn.c:77
#define INVALID_PROC_NUMBER
Definition: procnumber.h:26
int SendProcSignal(pid_t pid, ProcSignalReason reason, ProcNumber procNumber)
Definition: procsignal.c:283
@ PROCSIG_WALSND_INIT_STOPPING
Definition: procsignal.h:35
#define SpinLockRelease(lock)
Definition: spin.h:61
#define SpinLockAcquire(lock)
Definition: spin.h:59
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
int max_wal_senders
Definition: walsender.c:126

References i, INVALID_PROC_NUMBER, max_wal_senders, WalSnd::mutex, WalSnd::pid, PROCSIG_WALSND_INIT_STOPPING, SendProcSignal(), SpinLockAcquire, SpinLockRelease, WalSndCtl, and WalSndCtlData::walsnds.

Referenced by ShutdownXLOG().

◆ WalSndRqstFileReload()

void WalSndRqstFileReload ( void  )

Definition at line 3542 of file walsender.c.

3543{
3544 int i;
3545
3546 for (i = 0; i < max_wal_senders; i++)
3547 {
3548 WalSnd *walsnd = &WalSndCtl->walsnds[i];
3549
3550 SpinLockAcquire(&walsnd->mutex);
3551 if (walsnd->pid == 0)
3552 {
3553 SpinLockRelease(&walsnd->mutex);
3554 continue;
3555 }
3556 walsnd->needreload = true;
3557 SpinLockRelease(&walsnd->mutex);
3558 }
3559}
bool needreload

References i, max_wal_senders, WalSnd::mutex, WalSnd::needreload, WalSnd::pid, SpinLockAcquire, SpinLockRelease, WalSndCtl, and WalSndCtlData::walsnds.

Referenced by KeepFileRestoredFromArchive().

◆ WalSndShmemInit()

void WalSndShmemInit ( void  )

Definition at line 3626 of file walsender.c.

3627{
3628 bool found;
3629 int i;
3630
3632 ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
3633
3634 if (!found)
3635 {
3636 /* First time through, so initialize */
3638
3639 for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
3641
3642 for (i = 0; i < max_wal_senders; i++)
3643 {
3644 WalSnd *walsnd = &WalSndCtl->walsnds[i];
3645
3646 SpinLockInit(&walsnd->mutex);
3647 }
3648
3652 }
3653}
#define MemSet(start, val, len)
Definition: c.h:991
void ConditionVariableInit(ConditionVariable *cv)
static void dlist_init(dlist_head *head)
Definition: ilist.h:314
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:387
#define SpinLockInit(lock)
Definition: spin.h:57
ConditionVariable wal_replay_cv
dlist_head SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]
ConditionVariable wal_flush_cv
#define NUM_SYNC_REP_WAIT_MODE
Definition: syncrep.h:27
Size WalSndShmemSize(void)
Definition: walsender.c:3614

References ConditionVariableInit(), dlist_init(), i, max_wal_senders, MemSet, WalSnd::mutex, NUM_SYNC_REP_WAIT_MODE, ShmemInitStruct(), SpinLockInit, WalSndCtlData::SyncRepQueue, WalSndCtlData::wal_confirm_rcv_cv, WalSndCtlData::wal_flush_cv, WalSndCtlData::wal_replay_cv, WalSndCtl, WalSndCtlData::walsnds, and WalSndShmemSize().

Referenced by CreateOrAttachShmemStructs().

◆ WalSndShmemSize()

Size WalSndShmemSize ( void  )

Definition at line 3614 of file walsender.c.

3615{
3616 Size size = 0;
3617
3618 size = offsetof(WalSndCtlData, walsnds);
3619 size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
3620
3621 return size;
3622}
size_t Size
Definition: c.h:576
Size add_size(Size s1, Size s2)
Definition: shmem.c:493
Size mul_size(Size s1, Size s2)
Definition: shmem.c:510

References add_size(), max_wal_senders, and mul_size().

Referenced by CalculateShmemSize(), and WalSndShmemInit().

◆ WalSndSignals()

void WalSndSignals ( void  )

Definition at line 3595 of file walsender.c.

3596{
3597 /* Set up signal handlers */
3599 pqsignal(SIGINT, StatementCancelHandler); /* query cancel */
3600 pqsignal(SIGTERM, die); /* request shutdown */
3601 /* SIGQUIT handler was already set up by InitPostmasterChild */
3602 InitializeTimeouts(); /* establishes SIGALRM handler */
3603 pqsignal(SIGPIPE, SIG_IGN);
3605 pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
3606 * shutdown */
3607
3608 /* Reset some signals that are accepted by postmaster but not here */
3609 pqsignal(SIGCHLD, SIG_DFL);
3610}
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:65
#define die(msg)
#define pqsignal
Definition: port.h:531
void StatementCancelHandler(SIGNAL_ARGS)
Definition: postgres.c:3058
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition: procsignal.c:673
void InitializeTimeouts(void)
Definition: timeout.c:470
static void WalSndLastCycleHandler(SIGNAL_ARGS)
Definition: walsender.c:3587
#define SIGCHLD
Definition: win32_port.h:168
#define SIGHUP
Definition: win32_port.h:158
#define SIGPIPE
Definition: win32_port.h:163
#define SIGUSR1
Definition: win32_port.h:170
#define SIGUSR2
Definition: win32_port.h:171

References die, InitializeTimeouts(), pqsignal, procsignal_sigusr1_handler(), SIGCHLD, SIGHUP, SignalHandlerForConfigReload(), SIGPIPE, SIGUSR1, SIGUSR2, StatementCancelHandler(), and WalSndLastCycleHandler().

Referenced by PostgresMain().

◆ WalSndWaitStopping()

void WalSndWaitStopping ( void  )

Definition at line 3776 of file walsender.c.

3777{
3778 for (;;)
3779 {
3780 int i;
3781 bool all_stopped = true;
3782
3783 for (i = 0; i < max_wal_senders; i++)
3784 {
3785 WalSnd *walsnd = &WalSndCtl->walsnds[i];
3786
3787 SpinLockAcquire(&walsnd->mutex);
3788
3789 if (walsnd->pid == 0)
3790 {
3791 SpinLockRelease(&walsnd->mutex);
3792 continue;
3793 }
3794
3795 if (walsnd->state != WALSNDSTATE_STOPPING)
3796 {
3797 all_stopped = false;
3798 SpinLockRelease(&walsnd->mutex);
3799 break;
3800 }
3801 SpinLockRelease(&walsnd->mutex);
3802 }
3803
3804 /* safe to leave if confirmation is done for all WAL senders */
3805 if (all_stopped)
3806 return;
3807
3808 pg_usleep(10000L); /* wait for 10 msec */
3809 }
3810}
void pg_usleep(long microsec)
Definition: signal.c:53

References i, max_wal_senders, WalSnd::mutex, pg_usleep(), WalSnd::pid, SpinLockAcquire, SpinLockRelease, WalSnd::state, WalSndCtl, WalSndCtlData::walsnds, and WALSNDSTATE_STOPPING.

Referenced by ShutdownXLOG().

◆ WalSndWakeup()

void WalSndWakeup ( bool  physical,
bool  logical 
)

Definition at line 3671 of file walsender.c.

3672{
3673 /*
3674 * Wake up all the walsenders waiting on WAL being flushed or replayed
3675 * respectively. Note that waiting walsender would have prepared to sleep
3676 * on the CV (i.e., added itself to the CV's waitlist) in WalSndWait()
3677 * before actually waiting.
3678 */
3679 if (physical)
3681
3682 if (logical)
3684}

References ConditionVariableBroadcast(), WalSndCtlData::wal_flush_cv, WalSndCtlData::wal_replay_cv, and WalSndCtl.

Referenced by ApplyWalRecord(), KeepFileRestoredFromArchive(), StartupXLOG(), WalSndWakeupProcessRequests(), and XLogWalRcvFlush().

◆ WalSndWakeupProcessRequests()

static void WalSndWakeupProcessRequests ( bool  physical,
bool  logical 
)
inlinestatic

Definition at line 65 of file walsender.h.

66{
68 {
69 wake_wal_senders = false;
70 if (max_wal_senders > 0)
71 WalSndWakeup(physical, logical);
72 }
73}
PGDLLIMPORT bool wake_wal_senders
Definition: walsender.c:135
void WalSndWakeup(bool physical, bool logical)
Definition: walsender.c:3671
PGDLLIMPORT int max_wal_senders
Definition: walsender.c:126

References max_wal_senders, wake_wal_senders, and WalSndWakeup().

Referenced by XLogBackgroundFlush(), and XLogFlush().

Variable Documentation

◆ am_cascading_walsender

◆ am_db_walsender

PGDLLIMPORT bool am_db_walsender
extern

Definition at line 123 of file walsender.c.

Referenced by check_db(), ClientAuthentication(), InitPostgres(), and ProcessStartupPacket().

◆ am_walsender

◆ log_replication_commands

PGDLLIMPORT bool log_replication_commands
extern

◆ max_wal_senders

◆ wake_wal_senders

PGDLLIMPORT bool wake_wal_senders
extern

Definition at line 135 of file walsender.c.

Referenced by WalSndWakeupProcessRequests().

◆ wal_sender_timeout