PostgreSQL Source Code git master
All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
walsender.h File Reference
#include "access/xlogdefs.h"
Include dependency graph for walsender.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Macros

#define WalSndWakeupRequest()    do { wake_wal_senders = true; } while (0)
 

Enumerations

enum  CRSSnapshotAction { CRS_EXPORT_SNAPSHOT , CRS_NOEXPORT_SNAPSHOT , CRS_USE_SNAPSHOT }
 

Functions

void InitWalSender (void)
 
bool exec_replication_command (const char *cmd_string)
 
void WalSndErrorCleanup (void)
 
void PhysicalWakeupLogicalWalSnd (void)
 
XLogRecPtr GetStandbyFlushRecPtr (TimeLineID *tli)
 
void WalSndSignals (void)
 
Size WalSndShmemSize (void)
 
void WalSndShmemInit (void)
 
void WalSndWakeup (bool physical, bool logical)
 
void WalSndInitStopping (void)
 
void WalSndWaitStopping (void)
 
void HandleWalSndInitStopping (void)
 
void WalSndRqstFileReload (void)
 
static void WalSndWakeupProcessRequests (bool physical, bool logical)
 

Variables

PGDLLIMPORT bool am_walsender
 
PGDLLIMPORT bool am_cascading_walsender
 
PGDLLIMPORT bool am_db_walsender
 
PGDLLIMPORT bool wake_wal_senders
 
PGDLLIMPORT int max_wal_senders
 
PGDLLIMPORT int wal_sender_timeout
 
PGDLLIMPORT bool log_replication_commands
 

Macro Definition Documentation

◆ WalSndWakeupRequest

#define WalSndWakeupRequest ( )     do { wake_wal_senders = true; } while (0)

Definition at line 58 of file walsender.h.

Enumeration Type Documentation

◆ CRSSnapshotAction

Enumerator
CRS_EXPORT_SNAPSHOT 
CRS_NOEXPORT_SNAPSHOT 
CRS_USE_SNAPSHOT 

Definition at line 20 of file walsender.h.

21{
CRSSnapshotAction
Definition: walsender.h:21
@ CRS_USE_SNAPSHOT
Definition: walsender.h:24
@ CRS_NOEXPORT_SNAPSHOT
Definition: walsender.h:23
@ CRS_EXPORT_SNAPSHOT
Definition: walsender.h:22

Function Documentation

◆ exec_replication_command()

bool exec_replication_command ( const char *  cmd_string)

Definition at line 1970 of file walsender.c.

1971{
1972 yyscan_t scanner;
1973 int parse_rc;
1974 Node *cmd_node;
1975 const char *cmdtag;
1976 MemoryContext old_context = CurrentMemoryContext;
1977
1978 /* We save and re-use the cmd_context across calls */
1979 static MemoryContext cmd_context = NULL;
1980
1981 /*
1982 * If WAL sender has been told that shutdown is getting close, switch its
1983 * status accordingly to handle the next replication commands correctly.
1984 */
1985 if (got_STOPPING)
1987
1988 /*
1989 * Throw error if in stopping mode. We need to prevent commands that could
1990 * generate WAL while the shutdown checkpoint is being written. To be
1991 * safe, we just prohibit all new commands.
1992 */
1994 ereport(ERROR,
1995 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1996 errmsg("cannot execute new commands while WAL sender is in stopping mode")));
1997
1998 /*
1999 * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
2000 * command arrives. Clean up the old stuff if there's anything.
2001 */
2003
2005
2006 /*
2007 * Prepare to parse and execute the command.
2008 *
2009 * Because replication command execution can involve beginning or ending
2010 * transactions, we need a working context that will survive that, so we
2011 * make it a child of TopMemoryContext. That in turn creates a hazard of
2012 * long-lived memory leaks if we lose track of the working context. We
2013 * deal with that by creating it only once per walsender, and resetting it
2014 * for each new command. (Normally this reset is a no-op, but if the
2015 * prior exec_replication_command call failed with an error, it won't be.)
2016 *
2017 * This is subtler than it looks. The transactions we manage can extend
2018 * across replication commands, indeed SnapBuildClearExportedSnapshot
2019 * might have just ended one. Because transaction exit will revert to the
2020 * memory context that was current at transaction start, we need to be
2021 * sure that that context is still valid. That motivates re-using the
2022 * same cmd_context rather than making a new one each time.
2023 */
2024 if (cmd_context == NULL)
2026 "Replication command context",
2028 else
2029 MemoryContextReset(cmd_context);
2030
2031 MemoryContextSwitchTo(cmd_context);
2032
2033 replication_scanner_init(cmd_string, &scanner);
2034
2035 /*
2036 * Is it a WalSender command?
2037 */
2039 {
2040 /* Nope; clean up and get out. */
2042
2043 MemoryContextSwitchTo(old_context);
2044 MemoryContextReset(cmd_context);
2045
2046 /* XXX this is a pretty random place to make this check */
2047 if (MyDatabaseId == InvalidOid)
2048 ereport(ERROR,
2049 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2050 errmsg("cannot execute SQL commands in WAL sender for physical replication")));
2051
2052 /* Tell the caller that this wasn't a WalSender command. */
2053 return false;
2054 }
2055
2056 /*
2057 * Looks like a WalSender command, so parse it.
2058 */
2059 parse_rc = replication_yyparse(&cmd_node, scanner);
2060 if (parse_rc != 0)
2061 ereport(ERROR,
2062 (errcode(ERRCODE_SYNTAX_ERROR),
2063 errmsg_internal("replication command parser returned %d",
2064 parse_rc)));
2066
2067 /*
2068 * Report query to various monitoring facilities. For this purpose, we
2069 * report replication commands just like SQL commands.
2070 */
2071 debug_query_string = cmd_string;
2072
2074
2075 /*
2076 * Log replication command if log_replication_commands is enabled. Even
2077 * when it's disabled, log the command with DEBUG1 level for backward
2078 * compatibility.
2079 */
2081 (errmsg("received replication command: %s", cmd_string)));
2082
2083 /*
2084 * Disallow replication commands in aborted transaction blocks.
2085 */
2087 ereport(ERROR,
2088 (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
2089 errmsg("current transaction is aborted, "
2090 "commands ignored until end of transaction block")));
2091
2093
2094 /*
2095 * Allocate buffers that will be used for each outgoing and incoming
2096 * message. We do this just once per command to reduce palloc overhead.
2097 */
2101
2102 switch (cmd_node->type)
2103 {
2104 case T_IdentifySystemCmd:
2105 cmdtag = "IDENTIFY_SYSTEM";
2106 set_ps_display(cmdtag);
2108 EndReplicationCommand(cmdtag);
2109 break;
2110
2111 case T_ReadReplicationSlotCmd:
2112 cmdtag = "READ_REPLICATION_SLOT";
2113 set_ps_display(cmdtag);
2115 EndReplicationCommand(cmdtag);
2116 break;
2117
2118 case T_BaseBackupCmd:
2119 cmdtag = "BASE_BACKUP";
2120 set_ps_display(cmdtag);
2121 PreventInTransactionBlock(true, cmdtag);
2123 EndReplicationCommand(cmdtag);
2124 break;
2125
2126 case T_CreateReplicationSlotCmd:
2127 cmdtag = "CREATE_REPLICATION_SLOT";
2128 set_ps_display(cmdtag);
2130 EndReplicationCommand(cmdtag);
2131 break;
2132
2133 case T_DropReplicationSlotCmd:
2134 cmdtag = "DROP_REPLICATION_SLOT";
2135 set_ps_display(cmdtag);
2137 EndReplicationCommand(cmdtag);
2138 break;
2139
2140 case T_AlterReplicationSlotCmd:
2141 cmdtag = "ALTER_REPLICATION_SLOT";
2142 set_ps_display(cmdtag);
2144 EndReplicationCommand(cmdtag);
2145 break;
2146
2147 case T_StartReplicationCmd:
2148 {
2149 StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
2150
2151 cmdtag = "START_REPLICATION";
2152 set_ps_display(cmdtag);
2153 PreventInTransactionBlock(true, cmdtag);
2154
2155 if (cmd->kind == REPLICATION_KIND_PHYSICAL)
2156 StartReplication(cmd);
2157 else
2159
2160 /* dupe, but necessary per libpqrcv_endstreaming */
2161 EndReplicationCommand(cmdtag);
2162
2163 Assert(xlogreader != NULL);
2164 break;
2165 }
2166
2167 case T_TimeLineHistoryCmd:
2168 cmdtag = "TIMELINE_HISTORY";
2169 set_ps_display(cmdtag);
2170 PreventInTransactionBlock(true, cmdtag);
2172 EndReplicationCommand(cmdtag);
2173 break;
2174
2175 case T_VariableShowStmt:
2176 {
2178 VariableShowStmt *n = (VariableShowStmt *) cmd_node;
2179
2180 cmdtag = "SHOW";
2181 set_ps_display(cmdtag);
2182
2183 /* syscache access needs a transaction environment */
2185 GetPGVariable(n->name, dest);
2187 EndReplicationCommand(cmdtag);
2188 }
2189 break;
2190
2191 case T_UploadManifestCmd:
2192 cmdtag = "UPLOAD_MANIFEST";
2193 set_ps_display(cmdtag);
2194 PreventInTransactionBlock(true, cmdtag);
2196 EndReplicationCommand(cmdtag);
2197 break;
2198
2199 default:
2200 elog(ERROR, "unrecognized replication command node tag: %u",
2201 cmd_node->type);
2202 }
2203
2204 /*
2205 * Done. Revert to caller's memory context, and clean out the cmd_context
2206 * to recover memory right away.
2207 */
2208 MemoryContextSwitchTo(old_context);
2209 MemoryContextReset(cmd_context);
2210
2211 /*
2212 * We need not update ps display or pg_stat_activity, because PostgresMain
2213 * will reset those to "idle". But we must reset debug_query_string to
2214 * ensure it doesn't become a dangling pointer.
2215 */
2216 debug_query_string = NULL;
2217
2218 return true;
2219}
void pgstat_report_activity(BackendState state, const char *cmd_str)
@ STATE_RUNNING
void SendBaseBackup(BaseBackupCmd *cmd, IncrementalBackupInfo *ib)
Definition: basebackup.c:990
void * yyscan_t
Definition: cubedata.h:67
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
void EndReplicationCommand(const char *commandTag)
Definition: dest.c:205
@ DestRemoteSimple
Definition: dest.h:91
int errmsg_internal(const char *fmt,...)
Definition: elog.c:1158
int errcode(int sqlerrcode)
Definition: elog.c:854
int errmsg(const char *fmt,...)
Definition: elog.c:1071
#define LOG
Definition: elog.h:31
#define DEBUG1
Definition: elog.h:30
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:225
#define ereport(elevel,...)
Definition: elog.h:149
Oid MyDatabaseId
Definition: globals.c:94
void GetPGVariable(const char *name, DestReceiver *dest)
Definition: guc_funcs.c:382
Assert(PointerIsAligned(start, uint64))
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:383
MemoryContext TopMemoryContext
Definition: mcxt.c:149
MemoryContext CurrentMemoryContext
Definition: mcxt.c:143
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:160
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:122
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:124
const char * debug_query_string
Definition: postgres.c:88
#define InvalidOid
Definition: postgres_ext.h:35
static void set_ps_display(const char *activity)
Definition: ps_status.h:40
bool replication_scanner_is_replication_command(yyscan_t yyscanner)
Definition: repl_scanner.l:299
void replication_scanner_finish(yyscan_t yyscanner)
Definition: repl_scanner.l:284
void replication_scanner_init(const char *str, yyscan_t *yyscannerp)
Definition: repl_scanner.l:268
@ REPLICATION_KIND_PHYSICAL
Definition: replnodes.h:22
void SnapBuildClearExportedSnapshot(void)
Definition: snapbuild.c:600
void initStringInfo(StringInfo str)
Definition: stringinfo.c:97
Definition: nodes.h:135
NodeTag type
Definition: nodes.h:136
ReplicationKind kind
Definition: replnodes.h:94
WalSndState state
static void AlterReplicationSlot(AlterReplicationSlotCmd *cmd)
Definition: walsender.c:1391
static void SendTimeLineHistory(TimeLineHistoryCmd *cmd)
Definition: walsender.c:563
WalSnd * MyWalSnd
Definition: walsender.c:117
static void ReadReplicationSlot(ReadReplicationSlotCmd *cmd)
Definition: walsender.c:464
static StringInfoData tmpbuf
Definition: walsender.c:175
static void IdentifySystem(void)
Definition: walsender.c:383
static StringInfoData reply_message
Definition: walsender.c:174
void WalSndSetState(WalSndState state)
Definition: walsender.c:3838
static StringInfoData output_message
Definition: walsender.c:173
static void UploadManifest(void)
Definition: walsender.c:653
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:203
bool log_replication_commands
Definition: walsender.c:130
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Definition: walsender.c:1177
static void StartLogicalReplication(StartReplicationCmd *cmd)
Definition: walsender.c:1433
static IncrementalBackupInfo * uploaded_manifest
Definition: walsender.c:152
static void DropReplicationSlot(DropReplicationSlotCmd *cmd)
Definition: walsender.c:1382
static void StartReplication(StartReplicationCmd *cmd)
Definition: walsender.c:795
static XLogReaderState * xlogreader
Definition: walsender.c:142
@ WALSNDSTATE_STOPPING
int replication_yyparse(Node **replication_parse_result_p, yyscan_t yyscanner)
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
Definition: xact.c:3648
void StartTransactionCommand(void)
Definition: xact.c:3059
bool IsAbortedTransactionBlockState(void)
Definition: xact.c:407
void CommitTransactionCommand(void)
Definition: xact.c:3157

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, AlterReplicationSlot(), Assert(), CHECK_FOR_INTERRUPTS, CommitTransactionCommand(), CreateDestReceiver(), CreateReplicationSlot(), CurrentMemoryContext, DEBUG1, debug_query_string, generate_unaccent_rules::dest, DestRemoteSimple, DropReplicationSlot(), elog, EndReplicationCommand(), ereport, errcode(), errmsg(), errmsg_internal(), ERROR, GetPGVariable(), got_STOPPING, IdentifySystem(), initStringInfo(), InvalidOid, IsAbortedTransactionBlockState(), StartReplicationCmd::kind, LOG, log_replication_commands, MemoryContextReset(), MemoryContextSwitchTo(), MyDatabaseId, MyWalSnd, VariableShowStmt::name, output_message, pgstat_report_activity(), PreventInTransactionBlock(), ReadReplicationSlot(), REPLICATION_KIND_PHYSICAL, replication_scanner_finish(), replication_scanner_init(), replication_scanner_is_replication_command(), replication_yyparse(), reply_message, SendBaseBackup(), SendTimeLineHistory(), set_ps_display(), SnapBuildClearExportedSnapshot(), StartLogicalReplication(), StartReplication(), StartTransactionCommand(), WalSnd::state, STATE_RUNNING, tmpbuf, TopMemoryContext, Node::type, ReadReplicationSlotCmd::type, uploaded_manifest, UploadManifest(), WalSndSetState(), WALSNDSTATE_STOPPING, and xlogreader.

Referenced by PostgresMain().

◆ GetStandbyFlushRecPtr()

XLogRecPtr GetStandbyFlushRecPtr ( TimeLineID *  tli)

Definition at line 3533 of file walsender.c.

3534{
3535 XLogRecPtr replayPtr;
3536 TimeLineID replayTLI;
3537 XLogRecPtr receivePtr;
3539 XLogRecPtr result;
3540
3542
3543 /*
3544 * We can safely send what's already been replayed. Also, if walreceiver
3545 * is streaming WAL from the same timeline, we can send anything that it
3546 * has streamed, but hasn't been replayed yet.
3547 */
3548
3549 receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI);
3550 replayPtr = GetXLogReplayRecPtr(&replayTLI);
3551
3552 if (tli)
3553 *tli = replayTLI;
3554
3555 result = replayPtr;
3556 if (receiveTLI == replayTLI && receivePtr > replayPtr)
3557 result = receivePtr;
3558
3559 return result;
3560}
bool IsSyncingReplicationSlots(void)
Definition: slotsync.c:1668
XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
bool am_cascading_walsender
Definition: walsender.c:121
uint64 XLogRecPtr
Definition: xlogdefs.h:21
uint32 TimeLineID
Definition: xlogdefs.h:59
static TimeLineID receiveTLI
Definition: xlogrecovery.c:264
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)

References am_cascading_walsender, Assert(), GetWalRcvFlushRecPtr(), GetXLogReplayRecPtr(), IsSyncingReplicationSlots(), and receiveTLI.

Referenced by IdentifySystem(), StartReplication(), synchronize_one_slot(), XLogSendLogical(), and XLogSendPhysical().

◆ HandleWalSndInitStopping()

void HandleWalSndInitStopping ( void  )

Definition at line 3589 of file walsender.c.

3590{
3592
3593 /*
3594 * If replication has not yet started, die like with SIGTERM. If
3595 * replication is active, only set a flag and wake up the main loop. It
3596 * will send any outstanding WAL, wait for it to be replicated to the
3597 * standby, and then exit gracefully.
3598 */
3599 if (!replication_active)
3600 kill(MyProcPid, SIGTERM);
3601 else
3602 got_STOPPING = true;
3603}
int MyProcPid
Definition: globals.c:47
bool am_walsender
Definition: walsender.c:120
static volatile sig_atomic_t replication_active
Definition: walsender.c:211
#define kill(pid, sig)
Definition: win32_port.h:493

References am_walsender, Assert(), got_STOPPING, kill, MyProcPid, and replication_active.

Referenced by procsignal_sigusr1_handler().

◆ InitWalSender()

void InitWalSender ( void  )

Definition at line 283 of file walsender.c.

284{
286
287 /* Create a per-walsender data structure in shared memory */
289
290 /* need resource owner for e.g. basebackups */
292
293 /*
294 * Let postmaster know that we're a WAL sender. Once we've declared us as
295 * a WAL sender process, postmaster will let us outlive the bgwriter and
296 * kill us last in the shutdown sequence, so we get a chance to stream all
297 * remaining WAL at shutdown, including the shutdown checkpoint. Note that
298 * there's no going back, and we mustn't write any WAL records after this.
299 */
302
303 /*
304 * If the client didn't specify a database to connect to, show in PGPROC
305 * that our advertised xmin should affect vacuum horizons in all
306 * databases. This allows physical replication clients to send hot
307 * standby feedback that will delay vacuum cleanup in all databases.
308 */
310 {
312 LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
315 LWLockRelease(ProcArrayLock);
316 }
317
318 /* Initialize empty timestamp buffer for lag tracking. */
320}
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1180
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1900
@ LW_EXCLUSIVE
Definition: lwlock.h:114
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:1219
void SendPostmasterSignal(PMSignalReason reason)
Definition: pmsignal.c:165
void MarkPostmasterChildWalSender(void)
Definition: pmsignal.c:309
@ PMSIGNAL_ADVANCE_STATE_MACHINE
Definition: pmsignal.h:43
#define PROC_AFFECTS_ALL_HORIZONS
Definition: proc.h:62
void CreateAuxProcessResourceOwner(void)
Definition: resowner.c:999
PGPROC * MyProc
Definition: proc.c:66
PROC_HDR * ProcGlobal
Definition: proc.c:78
TransactionId xmin
Definition: proc.h:186
uint8 statusFlags
Definition: proc.h:251
int pgxactoff
Definition: proc.h:193
uint8 * statusFlags
Definition: proc.h:395
#define InvalidTransactionId
Definition: transam.h:31
static void InitWalSenderSlot(void)
Definition: walsender.c:2925
static LagTracker * lag_tracker
Definition: walsender.c:235
bool RecoveryInProgress(void)
Definition: xlog.c:6522

References am_cascading_walsender, Assert(), CreateAuxProcessResourceOwner(), InitWalSenderSlot(), InvalidOid, InvalidTransactionId, lag_tracker, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MarkPostmasterChildWalSender(), MemoryContextAllocZero(), MyDatabaseId, MyProc, PGPROC::pgxactoff, PMSIGNAL_ADVANCE_STATE_MACHINE, PROC_AFFECTS_ALL_HORIZONS, ProcGlobal, RecoveryInProgress(), SendPostmasterSignal(), PGPROC::statusFlags, PROC_HDR::statusFlags, TopMemoryContext, and PGPROC::xmin.

Referenced by PostgresMain().

◆ PhysicalWakeupLogicalWalSnd()

void PhysicalWakeupLogicalWalSnd ( void  )

Definition at line 1714 of file walsender.c.

1715{
1717
1718 /*
1719 * If we are running in a standby, there is no need to wake up walsenders.
1720 * This is because we do not support syncing slots to cascading standbys,
1721 * so, there are no walsenders waiting for standbys to catch up.
1722 */
1723 if (RecoveryInProgress())
1724 return;
1725
1728}
#define NameStr(name)
Definition: c.h:717
void ConditionVariableBroadcast(ConditionVariable *cv)
ReplicationSlot * MyReplicationSlot
Definition: slot.c:147
bool SlotExistsInSyncStandbySlots(const char *slot_name)
Definition: slot.c:2772
#define SlotIsPhysical(slot)
Definition: slot.h:220
ReplicationSlotPersistentData data
Definition: slot.h:185
ConditionVariable wal_confirm_rcv_cv
WalSndCtlData * WalSndCtl
Definition: walsender.c:114

References Assert(), ConditionVariableBroadcast(), ReplicationSlot::data, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, RecoveryInProgress(), SlotExistsInSyncStandbySlots(), SlotIsPhysical, WalSndCtlData::wal_confirm_rcv_cv, and WalSndCtl.

Referenced by pg_physical_replication_slot_advance(), and PhysicalConfirmReceivedLocation().

◆ WalSndErrorCleanup()

void WalSndErrorCleanup ( void  )

Definition at line 330 of file walsender.c.

331{
336
337 if (xlogreader != NULL && xlogreader->seg.ws_file >= 0)
339
340 if (MyReplicationSlot != NULL)
342
344
345 replication_active = false;
346
347 /*
348 * If there is a transaction in progress, it will clean up our
349 * ResourceOwner, but if a replication command set up a resource owner
350 * without a transaction, we've got to clean that up now.
351 */
354
356 proc_exit(0);
357
358 /* Revert back to startup state */
360}
void pgaio_error_cleanup(void)
Definition: aio.c:1145
bool ConditionVariableCancelSleep(void)
void proc_exit(int code)
Definition: ipc.c:104
void LWLockReleaseAll(void)
Definition: lwlock.c:1951
void ReleaseAuxProcessResources(bool isCommit)
Definition: resowner.c:1019
void ReplicationSlotRelease(void)
Definition: slot.c:686
void ReplicationSlotCleanup(bool synced_only)
Definition: slot.c:775
WALOpenSegment seg
Definition: xlogreader.h:272
static void pgstat_report_wait_end(void)
Definition: wait_event.h:85
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:202
@ WALSNDSTATE_STARTUP
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4989
void wal_segment_close(XLogReaderState *state)
Definition: xlogutils.c:831

References ConditionVariableCancelSleep(), got_SIGUSR2, got_STOPPING, IsTransactionOrTransactionBlock(), LWLockReleaseAll(), MyReplicationSlot, pgaio_error_cleanup(), pgstat_report_wait_end(), proc_exit(), ReleaseAuxProcessResources(), replication_active, ReplicationSlotCleanup(), ReplicationSlotRelease(), XLogReaderState::seg, wal_segment_close(), WalSndSetState(), WALSNDSTATE_STARTUP, WALOpenSegment::ws_file, and xlogreader.

Referenced by PostgresMain().

◆ WalSndInitStopping()

void WalSndInitStopping ( void  )

Definition at line 3774 of file walsender.c.

3775{
3776 int i;
3777
3778 for (i = 0; i < max_wal_senders; i++)
3779 {
3780 WalSnd *walsnd = &WalSndCtl->walsnds[i];
3781 pid_t pid;
3782
3783 SpinLockAcquire(&walsnd->mutex);
3784 pid = walsnd->pid;
3785 SpinLockRelease(&walsnd->mutex);
3786
3787 if (pid == 0)
3788 continue;
3789
3791 }
3792}
int i
Definition: isn.c:77
#define INVALID_PROC_NUMBER
Definition: procnumber.h:26
int SendProcSignal(pid_t pid, ProcSignalReason reason, ProcNumber procNumber)
Definition: procsignal.c:284
@ PROCSIG_WALSND_INIT_STOPPING
Definition: procsignal.h:35
#define SpinLockRelease(lock)
Definition: spin.h:61
#define SpinLockAcquire(lock)
Definition: spin.h:59
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
int max_wal_senders
Definition: walsender.c:126

References i, INVALID_PROC_NUMBER, max_wal_senders, WalSnd::mutex, WalSnd::pid, PROCSIG_WALSND_INIT_STOPPING, SendProcSignal(), SpinLockAcquire, SpinLockRelease, WalSndCtl, and WalSndCtlData::walsnds.

Referenced by ShutdownXLOG().

◆ WalSndRqstFileReload()

void WalSndRqstFileReload ( void  )

Definition at line 3566 of file walsender.c.

3567{
3568 int i;
3569
3570 for (i = 0; i < max_wal_senders; i++)
3571 {
3572 WalSnd *walsnd = &WalSndCtl->walsnds[i];
3573
3574 SpinLockAcquire(&walsnd->mutex);
3575 if (walsnd->pid == 0)
3576 {
3577 SpinLockRelease(&walsnd->mutex);
3578 continue;
3579 }
3580 walsnd->needreload = true;
3581 SpinLockRelease(&walsnd->mutex);
3582 }
3583}
bool needreload

References i, max_wal_senders, WalSnd::mutex, WalSnd::needreload, WalSnd::pid, SpinLockAcquire, SpinLockRelease, WalSndCtl, and WalSndCtlData::walsnds.

Referenced by KeepFileRestoredFromArchive().

◆ WalSndShmemInit()

void WalSndShmemInit ( void  )

Definition at line 3650 of file walsender.c.

3651{
3652 bool found;
3653 int i;
3654
3656 ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
3657
3658 if (!found)
3659 {
3660 /* First time through, so initialize */
3662
3663 for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
3665
3666 for (i = 0; i < max_wal_senders; i++)
3667 {
3668 WalSnd *walsnd = &WalSndCtl->walsnds[i];
3669
3670 SpinLockInit(&walsnd->mutex);
3671 }
3672
3676 }
3677}
#define MemSet(start, val, len)
Definition: c.h:991
void ConditionVariableInit(ConditionVariable *cv)
static void dlist_init(dlist_head *head)
Definition: ilist.h:314
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:387
#define SpinLockInit(lock)
Definition: spin.h:57
ConditionVariable wal_replay_cv
dlist_head SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]
ConditionVariable wal_flush_cv
#define NUM_SYNC_REP_WAIT_MODE
Definition: syncrep.h:27
Size WalSndShmemSize(void)
Definition: walsender.c:3638

References ConditionVariableInit(), dlist_init(), i, max_wal_senders, MemSet, WalSnd::mutex, NUM_SYNC_REP_WAIT_MODE, ShmemInitStruct(), SpinLockInit, WalSndCtlData::SyncRepQueue, WalSndCtlData::wal_confirm_rcv_cv, WalSndCtlData::wal_flush_cv, WalSndCtlData::wal_replay_cv, WalSndCtl, WalSndCtlData::walsnds, and WalSndShmemSize().

Referenced by CreateOrAttachShmemStructs().

◆ WalSndShmemSize()

Size WalSndShmemSize ( void  )

Definition at line 3638 of file walsender.c.

3639{
3640 Size size = 0;
3641
3642 size = offsetof(WalSndCtlData, walsnds);
3643 size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
3644
3645 return size;
3646}
size_t Size
Definition: c.h:576
Size add_size(Size s1, Size s2)
Definition: shmem.c:493
Size mul_size(Size s1, Size s2)
Definition: shmem.c:510

References add_size(), max_wal_senders, and mul_size().

Referenced by CalculateShmemSize(), and WalSndShmemInit().

◆ WalSndSignals()

void WalSndSignals ( void  )

Definition at line 3619 of file walsender.c.

3620{
3621 /* Set up signal handlers */
3623 pqsignal(SIGINT, StatementCancelHandler); /* query cancel */
3624 pqsignal(SIGTERM, die); /* request shutdown */
3625 /* SIGQUIT handler was already set up by InitPostmasterChild */
3626 InitializeTimeouts(); /* establishes SIGALRM handler */
3627 pqsignal(SIGPIPE, SIG_IGN);
3629 pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
3630 * shutdown */
3631
3632 /* Reset some signals that are accepted by postmaster but not here */
3633 pqsignal(SIGCHLD, SIG_DFL);
3634}
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:61
#define die(msg)
#define pqsignal
Definition: port.h:531
void StatementCancelHandler(SIGNAL_ARGS)
Definition: postgres.c:3056
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition: procsignal.c:674
void InitializeTimeouts(void)
Definition: timeout.c:470
static void WalSndLastCycleHandler(SIGNAL_ARGS)
Definition: walsender.c:3611
#define SIGCHLD
Definition: win32_port.h:168
#define SIGHUP
Definition: win32_port.h:158
#define SIGPIPE
Definition: win32_port.h:163
#define SIGUSR1
Definition: win32_port.h:170
#define SIGUSR2
Definition: win32_port.h:171

References die, InitializeTimeouts(), pqsignal, procsignal_sigusr1_handler(), SIGCHLD, SIGHUP, SignalHandlerForConfigReload(), SIGPIPE, SIGUSR1, SIGUSR2, StatementCancelHandler(), and WalSndLastCycleHandler().

Referenced by PostgresMain().

◆ WalSndWaitStopping()

void WalSndWaitStopping ( void  )

Definition at line 3800 of file walsender.c.

3801{
3802 for (;;)
3803 {
3804 int i;
3805 bool all_stopped = true;
3806
3807 for (i = 0; i < max_wal_senders; i++)
3808 {
3809 WalSnd *walsnd = &WalSndCtl->walsnds[i];
3810
3811 SpinLockAcquire(&walsnd->mutex);
3812
3813 if (walsnd->pid == 0)
3814 {
3815 SpinLockRelease(&walsnd->mutex);
3816 continue;
3817 }
3818
3819 if (walsnd->state != WALSNDSTATE_STOPPING)
3820 {
3821 all_stopped = false;
3822 SpinLockRelease(&walsnd->mutex);
3823 break;
3824 }
3825 SpinLockRelease(&walsnd->mutex);
3826 }
3827
3828 /* safe to leave if confirmation is done for all WAL senders */
3829 if (all_stopped)
3830 return;
3831
3832 pg_usleep(10000L); /* wait for 10 msec */
3833 }
3834}
void pg_usleep(long microsec)
Definition: signal.c:53

References i, max_wal_senders, WalSnd::mutex, pg_usleep(), WalSnd::pid, SpinLockAcquire, SpinLockRelease, WalSnd::state, WalSndCtl, WalSndCtlData::walsnds, and WALSNDSTATE_STOPPING.

Referenced by ShutdownXLOG().

◆ WalSndWakeup()

void WalSndWakeup ( bool  physical,
bool  logical 
)

Definition at line 3695 of file walsender.c.

3696{
3697 /*
3698 * Wake up all the walsenders waiting on WAL being flushed or replayed
3699 * respectively. Note that waiting walsender would have prepared to sleep
3700 * on the CV (i.e., added itself to the CV's waitlist) in WalSndWait()
3701 * before actually waiting.
3702 */
3703 if (physical)
3705
3706 if (logical)
3708}

References ConditionVariableBroadcast(), WalSndCtlData::wal_flush_cv, WalSndCtlData::wal_replay_cv, and WalSndCtl.

Referenced by ApplyWalRecord(), KeepFileRestoredFromArchive(), StartupXLOG(), WalSndWakeupProcessRequests(), and XLogWalRcvFlush().

◆ WalSndWakeupProcessRequests()

static void WalSndWakeupProcessRequests ( bool  physical,
bool  logical 
)
inline static

Definition at line 65 of file walsender.h.

66{
68 {
69 wake_wal_senders = false;
70 if (max_wal_senders > 0)
71 WalSndWakeup(physical, logical);
72 }
73}
PGDLLIMPORT bool wake_wal_senders
Definition: walsender.c:135
void WalSndWakeup(bool physical, bool logical)
Definition: walsender.c:3695
PGDLLIMPORT int max_wal_senders
Definition: walsender.c:126

References max_wal_senders, wake_wal_senders, and WalSndWakeup().

Referenced by XLogBackgroundFlush(), and XLogFlush().

Variable Documentation

◆ am_cascading_walsender

◆ am_db_walsender

PGDLLIMPORT bool am_db_walsender
extern

Definition at line 123 of file walsender.c.

Referenced by check_db(), ClientAuthentication(), InitPostgres(), and ProcessStartupPacket().

◆ am_walsender

◆ log_replication_commands

PGDLLIMPORT bool log_replication_commands
extern

◆ max_wal_senders

◆ wake_wal_senders

PGDLLIMPORT bool wake_wal_senders
extern

Definition at line 135 of file walsender.c.

Referenced by WalSndWakeupProcessRequests().

◆ wal_sender_timeout