PostgreSQL Source Code git master
walsender.h File Reference
#include "access/xlogdefs.h"
Include dependency graph for walsender.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Macros

#define WalSndWakeupRequest()    do { wake_wal_senders = true; } while (0)
 

Enumerations

enum  CRSSnapshotAction { CRS_EXPORT_SNAPSHOT , CRS_NOEXPORT_SNAPSHOT , CRS_USE_SNAPSHOT }
 

Functions

void InitWalSender (void)
 
bool exec_replication_command (const char *cmd_string)
 
void WalSndErrorCleanup (void)
 
void PhysicalWakeupLogicalWalSnd (void)
 
XLogRecPtr GetStandbyFlushRecPtr (TimeLineID *tli)
 
void WalSndSignals (void)
 
Size WalSndShmemSize (void)
 
void WalSndShmemInit (void)
 
void WalSndWakeup (bool physical, bool logical)
 
void WalSndInitStopping (void)
 
void WalSndWaitStopping (void)
 
void HandleWalSndInitStopping (void)
 
void WalSndRqstFileReload (void)
 
static void WalSndWakeupProcessRequests (bool physical, bool logical)
 

Variables

PGDLLIMPORT bool am_walsender
 
PGDLLIMPORT bool am_cascading_walsender
 
PGDLLIMPORT bool am_db_walsender
 
PGDLLIMPORT bool wake_wal_senders
 
PGDLLIMPORT int max_wal_senders
 
PGDLLIMPORT int wal_sender_timeout
 
PGDLLIMPORT bool log_replication_commands
 

Macro Definition Documentation

◆ WalSndWakeupRequest

#define WalSndWakeupRequest ( )     do { wake_wal_senders = true; } while (0)

Definition at line 58 of file walsender.h.

Enumeration Type Documentation

◆ CRSSnapshotAction

Enumerator
CRS_EXPORT_SNAPSHOT 
CRS_NOEXPORT_SNAPSHOT 
CRS_USE_SNAPSHOT 

Definition at line 20 of file walsender.h.

21{
CRSSnapshotAction
Definition: walsender.h:21
@ CRS_USE_SNAPSHOT
Definition: walsender.h:24
@ CRS_NOEXPORT_SNAPSHOT
Definition: walsender.h:23
@ CRS_EXPORT_SNAPSHOT
Definition: walsender.h:22

Function Documentation

◆ exec_replication_command()

bool exec_replication_command ( const char *  cmd_string)

Definition at line 1988 of file walsender.c.

1989{
1990 yyscan_t scanner;
1991 int parse_rc;
1992 Node *cmd_node;
1993 const char *cmdtag;
1994 MemoryContext old_context = CurrentMemoryContext;
1995
1996 /* We save and re-use the cmd_context across calls */
1997 static MemoryContext cmd_context = NULL;
1998
1999 /*
2000 * If WAL sender has been told that shutdown is getting close, switch its
2001 * status accordingly to handle the next replication commands correctly.
2002 */
2003 if (got_STOPPING)
2005
2006 /*
2007 * Throw error if in stopping mode. We need prevent commands that could
2008 * generate WAL while the shutdown checkpoint is being written. To be
2009 * safe, we just prohibit all new commands.
2010 */
2012 ereport(ERROR,
2013 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2014 errmsg("cannot execute new commands while WAL sender is in stopping mode")));
2015
2016 /*
2017 * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
2018 * command arrives. Clean up the old stuff if there's anything.
2019 */
2021
2023
2024 /*
2025 * Prepare to parse and execute the command.
2026 *
2027 * Because replication command execution can involve beginning or ending
2028 * transactions, we need a working context that will survive that, so we
2029 * make it a child of TopMemoryContext. That in turn creates a hazard of
2030 * long-lived memory leaks if we lose track of the working context. We
2031 * deal with that by creating it only once per walsender, and resetting it
2032 * for each new command. (Normally this reset is a no-op, but if the
2033 * prior exec_replication_command call failed with an error, it won't be.)
2034 *
2035 * This is subtler than it looks. The transactions we manage can extend
2036 * across replication commands, indeed SnapBuildClearExportedSnapshot
2037 * might have just ended one. Because transaction exit will revert to the
2038 * memory context that was current at transaction start, we need to be
2039 * sure that that context is still valid. That motivates re-using the
2040 * same cmd_context rather than making a new one each time.
2041 */
2042 if (cmd_context == NULL)
2044 "Replication command context",
2046 else
2047 MemoryContextReset(cmd_context);
2048
2049 MemoryContextSwitchTo(cmd_context);
2050
2051 replication_scanner_init(cmd_string, &scanner);
2052
2053 /*
2054 * Is it a WalSender command?
2055 */
2057 {
2058 /* Nope; clean up and get out. */
2060
2061 MemoryContextSwitchTo(old_context);
2062 MemoryContextReset(cmd_context);
2063
2064 /* XXX this is a pretty random place to make this check */
2065 if (MyDatabaseId == InvalidOid)
2066 ereport(ERROR,
2067 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2068 errmsg("cannot execute SQL commands in WAL sender for physical replication")));
2069
2070 /* Tell the caller that this wasn't a WalSender command. */
2071 return false;
2072 }
2073
2074 /*
2075 * Looks like a WalSender command, so parse it.
2076 */
2077 parse_rc = replication_yyparse(&cmd_node, scanner);
2078 if (parse_rc != 0)
2079 ereport(ERROR,
2080 (errcode(ERRCODE_SYNTAX_ERROR),
2081 errmsg_internal("replication command parser returned %d",
2082 parse_rc)));
2084
2085 /*
2086 * Report query to various monitoring facilities. For this purpose, we
2087 * report replication commands just like SQL commands.
2088 */
2089 debug_query_string = cmd_string;
2090
2092
2093 /*
2094 * Log replication command if log_replication_commands is enabled. Even
2095 * when it's disabled, log the command with DEBUG1 level for backward
2096 * compatibility.
2097 */
2099 (errmsg("received replication command: %s", cmd_string)));
2100
2101 /*
2102 * Disallow replication commands in aborted transaction blocks.
2103 */
2105 ereport(ERROR,
2106 (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
2107 errmsg("current transaction is aborted, "
2108 "commands ignored until end of transaction block")));
2109
2111
2112 /*
2113 * Allocate buffers that will be used for each outgoing and incoming
2114 * message. We do this just once per command to reduce palloc overhead.
2115 */
2119
2120 switch (cmd_node->type)
2121 {
2122 case T_IdentifySystemCmd:
2123 cmdtag = "IDENTIFY_SYSTEM";
2124 set_ps_display(cmdtag);
2126 EndReplicationCommand(cmdtag);
2127 break;
2128
2129 case T_ReadReplicationSlotCmd:
2130 cmdtag = "READ_REPLICATION_SLOT";
2131 set_ps_display(cmdtag);
2133 EndReplicationCommand(cmdtag);
2134 break;
2135
2136 case T_BaseBackupCmd:
2137 cmdtag = "BASE_BACKUP";
2138 set_ps_display(cmdtag);
2139 PreventInTransactionBlock(true, cmdtag);
2141 EndReplicationCommand(cmdtag);
2142 break;
2143
2144 case T_CreateReplicationSlotCmd:
2145 cmdtag = "CREATE_REPLICATION_SLOT";
2146 set_ps_display(cmdtag);
2148 EndReplicationCommand(cmdtag);
2149 break;
2150
2151 case T_DropReplicationSlotCmd:
2152 cmdtag = "DROP_REPLICATION_SLOT";
2153 set_ps_display(cmdtag);
2155 EndReplicationCommand(cmdtag);
2156 break;
2157
2158 case T_AlterReplicationSlotCmd:
2159 cmdtag = "ALTER_REPLICATION_SLOT";
2160 set_ps_display(cmdtag);
2162 EndReplicationCommand(cmdtag);
2163 break;
2164
2165 case T_StartReplicationCmd:
2166 {
2167 StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
2168
2169 cmdtag = "START_REPLICATION";
2170 set_ps_display(cmdtag);
2171 PreventInTransactionBlock(true, cmdtag);
2172
2173 if (cmd->kind == REPLICATION_KIND_PHYSICAL)
2174 StartReplication(cmd);
2175 else
2177
2178 /* dupe, but necessary per libpqrcv_endstreaming */
2179 EndReplicationCommand(cmdtag);
2180
2181 Assert(xlogreader != NULL);
2182 break;
2183 }
2184
2185 case T_TimeLineHistoryCmd:
2186 cmdtag = "TIMELINE_HISTORY";
2187 set_ps_display(cmdtag);
2188 PreventInTransactionBlock(true, cmdtag);
2190 EndReplicationCommand(cmdtag);
2191 break;
2192
2193 case T_VariableShowStmt:
2194 {
2196 VariableShowStmt *n = (VariableShowStmt *) cmd_node;
2197
2198 cmdtag = "SHOW";
2199 set_ps_display(cmdtag);
2200
2201 /* syscache access needs a transaction environment */
2203 GetPGVariable(n->name, dest);
2205 EndReplicationCommand(cmdtag);
2206 }
2207 break;
2208
2209 case T_UploadManifestCmd:
2210 cmdtag = "UPLOAD_MANIFEST";
2211 set_ps_display(cmdtag);
2212 PreventInTransactionBlock(true, cmdtag);
2214 EndReplicationCommand(cmdtag);
2215 break;
2216
2217 default:
2218 elog(ERROR, "unrecognized replication command node tag: %u",
2219 cmd_node->type);
2220 }
2221
2222 /*
2223 * Done. Revert to caller's memory context, and clean out the cmd_context
2224 * to recover memory right away.
2225 */
2226 MemoryContextSwitchTo(old_context);
2227 MemoryContextReset(cmd_context);
2228
2229 /*
2230 * We need not update ps display or pg_stat_activity, because PostgresMain
2231 * will reset those to "idle". But we must reset debug_query_string to
2232 * ensure it doesn't become a dangling pointer.
2233 */
2234 debug_query_string = NULL;
2235
2236 return true;
2237}
void pgstat_report_activity(BackendState state, const char *cmd_str)
@ STATE_RUNNING
void SendBaseBackup(BaseBackupCmd *cmd, IncrementalBackupInfo *ib)
Definition: basebackup.c:990
void * yyscan_t
Definition: cubedata.h:65
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
void EndReplicationCommand(const char *commandTag)
Definition: dest.c:205
@ DestRemoteSimple
Definition: dest.h:91
int errmsg_internal(const char *fmt,...)
Definition: elog.c:1170
int errcode(int sqlerrcode)
Definition: elog.c:863
int errmsg(const char *fmt,...)
Definition: elog.c:1080
#define LOG
Definition: elog.h:31
#define DEBUG1
Definition: elog.h:30
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:226
#define ereport(elevel,...)
Definition: elog.h:150
Oid MyDatabaseId
Definition: globals.c:94
void GetPGVariable(const char *name, DestReceiver *dest)
Definition: guc_funcs.c:408
Assert(PointerIsAligned(start, uint64))
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:400
MemoryContext TopMemoryContext
Definition: mcxt.c:166
MemoryContext CurrentMemoryContext
Definition: mcxt.c:160
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:160
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:123
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:124
const char * debug_query_string
Definition: postgres.c:89
#define InvalidOid
Definition: postgres_ext.h:37
static void set_ps_display(const char *activity)
Definition: ps_status.h:40
bool replication_scanner_is_replication_command(yyscan_t yyscanner)
Definition: repl_scanner.l:299
void replication_scanner_finish(yyscan_t yyscanner)
Definition: repl_scanner.l:284
void replication_scanner_init(const char *str, yyscan_t *yyscannerp)
Definition: repl_scanner.l:268
@ REPLICATION_KIND_PHYSICAL
Definition: replnodes.h:22
void SnapBuildClearExportedSnapshot(void)
Definition: snapbuild.c:600
void initStringInfo(StringInfo str)
Definition: stringinfo.c:97
Definition: nodes.h:135
NodeTag type
Definition: nodes.h:136
ReplicationKind kind
Definition: replnodes.h:94
WalSndState state
static void AlterReplicationSlot(AlterReplicationSlotCmd *cmd)
Definition: walsender.c:1409
static void SendTimeLineHistory(TimeLineHistoryCmd *cmd)
Definition: walsender.c:581
WalSnd * MyWalSnd
Definition: walsender.c:120
static void ReadReplicationSlot(ReadReplicationSlotCmd *cmd)
Definition: walsender.c:482
static StringInfoData tmpbuf
Definition: walsender.c:178
static void IdentifySystem(void)
Definition: walsender.c:401
static StringInfoData reply_message
Definition: walsender.c:177
void WalSndSetState(WalSndState state)
Definition: walsender.c:3936
static StringInfoData output_message
Definition: walsender.c:176
static void UploadManifest(void)
Definition: walsender.c:671
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:206
bool log_replication_commands
Definition: walsender.c:133
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Definition: walsender.c:1195
static void StartLogicalReplication(StartReplicationCmd *cmd)
Definition: walsender.c:1451
static IncrementalBackupInfo * uploaded_manifest
Definition: walsender.c:155
static void DropReplicationSlot(DropReplicationSlotCmd *cmd)
Definition: walsender.c:1400
static void StartReplication(StartReplicationCmd *cmd)
Definition: walsender.c:813
static XLogReaderState * xlogreader
Definition: walsender.c:145
@ WALSNDSTATE_STOPPING
int replication_yyparse(Node **replication_parse_result_p, yyscan_t yyscanner)
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
Definition: xact.c:3666
void StartTransactionCommand(void)
Definition: xact.c:3077
bool IsAbortedTransactionBlockState(void)
Definition: xact.c:408
void CommitTransactionCommand(void)
Definition: xact.c:3175

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, AlterReplicationSlot(), Assert(), CHECK_FOR_INTERRUPTS, CommitTransactionCommand(), CreateDestReceiver(), CreateReplicationSlot(), CurrentMemoryContext, DEBUG1, debug_query_string, generate_unaccent_rules::dest, DestRemoteSimple, DropReplicationSlot(), elog, EndReplicationCommand(), ereport, errcode(), errmsg(), errmsg_internal(), ERROR, GetPGVariable(), got_STOPPING, IdentifySystem(), initStringInfo(), InvalidOid, IsAbortedTransactionBlockState(), StartReplicationCmd::kind, LOG, log_replication_commands, MemoryContextReset(), MemoryContextSwitchTo(), MyDatabaseId, MyWalSnd, VariableShowStmt::name, output_message, pgstat_report_activity(), PreventInTransactionBlock(), ReadReplicationSlot(), REPLICATION_KIND_PHYSICAL, replication_scanner_finish(), replication_scanner_init(), replication_scanner_is_replication_command(), replication_yyparse(), reply_message, SendBaseBackup(), SendTimeLineHistory(), set_ps_display(), SnapBuildClearExportedSnapshot(), StartLogicalReplication(), StartReplication(), StartTransactionCommand(), WalSnd::state, STATE_RUNNING, tmpbuf, TopMemoryContext, Node::type, ReadReplicationSlotCmd::type, uploaded_manifest, UploadManifest(), WalSndSetState(), WALSNDSTATE_STOPPING, and xlogreader.

Referenced by PostgresMain().

◆ GetStandbyFlushRecPtr()

XLogRecPtr GetStandbyFlushRecPtr ( TimeLineID tli)

Definition at line 3631 of file walsender.c.

3632{
3633 XLogRecPtr replayPtr;
3634 TimeLineID replayTLI;
3635 XLogRecPtr receivePtr;
3637 XLogRecPtr result;
3638
3640
3641 /*
3642 * We can safely send what's already been replayed. Also, if walreceiver
3643 * is streaming WAL from the same timeline, we can send anything that it
3644 * has streamed, but hasn't been replayed yet.
3645 */
3646
3647 receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI);
3648 replayPtr = GetXLogReplayRecPtr(&replayTLI);
3649
3650 if (tli)
3651 *tli = replayTLI;
3652
3653 result = replayPtr;
3654 if (receiveTLI == replayTLI && receivePtr > replayPtr)
3655 result = receivePtr;
3656
3657 return result;
3658}
bool IsSyncingReplicationSlots(void)
Definition: slotsync.c:1754
XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
bool am_cascading_walsender
Definition: walsender.c:124
uint64 XLogRecPtr
Definition: xlogdefs.h:21
uint32 TimeLineID
Definition: xlogdefs.h:63
static TimeLineID receiveTLI
Definition: xlogrecovery.c:266
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)

References am_cascading_walsender, Assert(), GetWalRcvFlushRecPtr(), GetXLogReplayRecPtr(), IsSyncingReplicationSlots(), and receiveTLI.

Referenced by IdentifySystem(), StartReplication(), synchronize_one_slot(), and XLogSendPhysical().

◆ HandleWalSndInitStopping()

void HandleWalSndInitStopping ( void  )

Definition at line 3687 of file walsender.c.

3688{
3690
3691 /*
3692 * If replication has not yet started, die like with SIGTERM. If
3693 * replication is active, only set a flag and wake up the main loop. It
3694 * will send any outstanding WAL, wait for it to be replicated to the
3695 * standby, and then exit gracefully.
3696 */
3697 if (!replication_active)
3698 kill(MyProcPid, SIGTERM);
3699 else
3700 got_STOPPING = true;
3701}
int MyProcPid
Definition: globals.c:47
bool am_walsender
Definition: walsender.c:123
static volatile sig_atomic_t replication_active
Definition: walsender.c:214
#define kill(pid, sig)
Definition: win32_port.h:493

References am_walsender, Assert(), got_STOPPING, kill, MyProcPid, and replication_active.

Referenced by procsignal_sigusr1_handler().

◆ InitWalSender()

void InitWalSender ( void  )

Definition at line 301 of file walsender.c.

302{
304
305 /* Create a per-walsender data structure in shared memory */
307
308 /* need resource owner for e.g. basebackups */
310
311 /*
312 * Let postmaster know that we're a WAL sender. Once we've declared us as
313 * a WAL sender process, postmaster will let us outlive the bgwriter and
314 * kill us last in the shutdown sequence, so we get a chance to stream all
315 * remaining WAL at shutdown, including the shutdown checkpoint. Note that
316 * there's no going back, and we mustn't write any WAL records after this.
317 */
320
321 /*
322 * If the client didn't specify a database to connect to, show in PGPROC
323 * that our advertised xmin should affect vacuum horizons in all
324 * databases. This allows physical replication clients to send hot
325 * standby feedback that will delay vacuum cleanup in all databases.
326 */
328 {
330 LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
333 LWLockRelease(ProcArrayLock);
334 }
335
336 /* Initialize empty timestamp buffer for lag tracking. */
338}
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1174
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1894
@ LW_EXCLUSIVE
Definition: lwlock.h:112
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:1263
void SendPostmasterSignal(PMSignalReason reason)
Definition: pmsignal.c:165
void MarkPostmasterChildWalSender(void)
Definition: pmsignal.c:309
@ PMSIGNAL_ADVANCE_STATE_MACHINE
Definition: pmsignal.h:43
#define PROC_AFFECTS_ALL_HORIZONS
Definition: proc.h:62
void CreateAuxProcessResourceOwner(void)
Definition: resowner.c:996
PGPROC * MyProc
Definition: proc.c:67
PROC_HDR * ProcGlobal
Definition: proc.c:79
TransactionId xmin
Definition: proc.h:194
uint8 statusFlags
Definition: proc.h:259
int pgxactoff
Definition: proc.h:201
uint8 * statusFlags
Definition: proc.h:403
#define InvalidTransactionId
Definition: transam.h:31
static void InitWalSenderSlot(void)
Definition: walsender.c:3015
static LagTracker * lag_tracker
Definition: walsender.c:252
bool RecoveryInProgress(void)
Definition: xlog.c:6406

References am_cascading_walsender, Assert(), CreateAuxProcessResourceOwner(), InitWalSenderSlot(), InvalidOid, InvalidTransactionId, lag_tracker, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MarkPostmasterChildWalSender(), MemoryContextAllocZero(), MyDatabaseId, MyProc, PGPROC::pgxactoff, PMSIGNAL_ADVANCE_STATE_MACHINE, PROC_AFFECTS_ALL_HORIZONS, ProcGlobal, RecoveryInProgress(), SendPostmasterSignal(), PGPROC::statusFlags, PROC_HDR::statusFlags, TopMemoryContext, and PGPROC::xmin.

Referenced by PostgresMain().

◆ PhysicalWakeupLogicalWalSnd()

void PhysicalWakeupLogicalWalSnd ( void  )

Definition at line 1732 of file walsender.c.

1733{
1735
1736 /*
1737 * If we are running in a standby, there is no need to wake up walsenders.
1738 * This is because we do not support syncing slots to cascading standbys,
1739 * so, there are no walsenders waiting for standbys to catch up.
1740 */
1741 if (RecoveryInProgress())
1742 return;
1743
1746}
#define NameStr(name)
Definition: c.h:754
void ConditionVariableBroadcast(ConditionVariable *cv)
ReplicationSlot * MyReplicationSlot
Definition: slot.c:148
bool SlotExistsInSyncStandbySlots(const char *slot_name)
Definition: slot.c:2869
#define SlotIsPhysical(slot)
Definition: slot.h:284
ReplicationSlotPersistentData data
Definition: slot.h:210
ConditionVariable wal_confirm_rcv_cv
WalSndCtlData * WalSndCtl
Definition: walsender.c:117

References Assert(), ConditionVariableBroadcast(), ReplicationSlot::data, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, RecoveryInProgress(), SlotExistsInSyncStandbySlots(), SlotIsPhysical, WalSndCtlData::wal_confirm_rcv_cv, and WalSndCtl.

Referenced by pg_physical_replication_slot_advance(), and PhysicalConfirmReceivedLocation().

◆ WalSndErrorCleanup()

void WalSndErrorCleanup ( void  )

Definition at line 348 of file walsender.c.

349{
354
355 if (xlogreader != NULL && xlogreader->seg.ws_file >= 0)
357
358 if (MyReplicationSlot != NULL)
360
362
363 replication_active = false;
364
365 /*
366 * If there is a transaction in progress, it will clean up our
367 * ResourceOwner, but if a replication command set up a resource owner
368 * without a transaction, we've got to clean that up now.
369 */
372
374 proc_exit(0);
375
376 /* Revert back to startup state */
378}
void pgaio_error_cleanup(void)
Definition: aio.c:1165
bool ConditionVariableCancelSleep(void)
void proc_exit(int code)
Definition: ipc.c:104
void LWLockReleaseAll(void)
Definition: lwlock.c:1945
void ReleaseAuxProcessResources(bool isCommit)
Definition: resowner.c:1016
void ReplicationSlotRelease(void)
Definition: slot.c:764
void ReplicationSlotCleanup(bool synced_only)
Definition: slot.c:853
WALOpenSegment seg
Definition: xlogreader.h:271
static void pgstat_report_wait_end(void)
Definition: wait_event.h:85
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:205
@ WALSNDSTATE_STARTUP
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:5007
void wal_segment_close(XLogReaderState *state)
Definition: xlogutils.c:831

References ConditionVariableCancelSleep(), got_SIGUSR2, got_STOPPING, IsTransactionOrTransactionBlock(), LWLockReleaseAll(), MyReplicationSlot, pgaio_error_cleanup(), pgstat_report_wait_end(), proc_exit(), ReleaseAuxProcessResources(), replication_active, ReplicationSlotCleanup(), ReplicationSlotRelease(), XLogReaderState::seg, wal_segment_close(), WalSndSetState(), WALSNDSTATE_STARTUP, WALOpenSegment::ws_file, and xlogreader.

Referenced by PostgresMain().

◆ WalSndInitStopping()

void WalSndInitStopping ( void  )

Definition at line 3872 of file walsender.c.

3873{
3874 int i;
3875
3876 for (i = 0; i < max_wal_senders; i++)
3877 {
3878 WalSnd *walsnd = &WalSndCtl->walsnds[i];
3879 pid_t pid;
3880
3881 SpinLockAcquire(&walsnd->mutex);
3882 pid = walsnd->pid;
3883 SpinLockRelease(&walsnd->mutex);
3884
3885 if (pid == 0)
3886 continue;
3887
3889 }
3890}
int i
Definition: isn.c:77
#define INVALID_PROC_NUMBER
Definition: procnumber.h:26
int SendProcSignal(pid_t pid, ProcSignalReason reason, ProcNumber procNumber)
Definition: procsignal.c:284
@ PROCSIG_WALSND_INIT_STOPPING
Definition: procsignal.h:35
#define SpinLockRelease(lock)
Definition: spin.h:61
#define SpinLockAcquire(lock)
Definition: spin.h:59
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
int max_wal_senders
Definition: walsender.c:129

References i, INVALID_PROC_NUMBER, max_wal_senders, WalSnd::mutex, WalSnd::pid, PROCSIG_WALSND_INIT_STOPPING, SendProcSignal(), SpinLockAcquire, SpinLockRelease, WalSndCtl, and WalSndCtlData::walsnds.

Referenced by ShutdownXLOG().

◆ WalSndRqstFileReload()

void WalSndRqstFileReload ( void  )

Definition at line 3664 of file walsender.c.

3665{
3666 int i;
3667
3668 for (i = 0; i < max_wal_senders; i++)
3669 {
3670 WalSnd *walsnd = &WalSndCtl->walsnds[i];
3671
3672 SpinLockAcquire(&walsnd->mutex);
3673 if (walsnd->pid == 0)
3674 {
3675 SpinLockRelease(&walsnd->mutex);
3676 continue;
3677 }
3678 walsnd->needreload = true;
3679 SpinLockRelease(&walsnd->mutex);
3680 }
3681}
bool needreload

References i, max_wal_senders, WalSnd::mutex, WalSnd::needreload, WalSnd::pid, SpinLockAcquire, SpinLockRelease, WalSndCtl, and WalSndCtlData::walsnds.

Referenced by KeepFileRestoredFromArchive().

◆ WalSndShmemInit()

void WalSndShmemInit ( void  )

Definition at line 3748 of file walsender.c.

3749{
3750 bool found;
3751 int i;
3752
3754 ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
3755
3756 if (!found)
3757 {
3758 /* First time through, so initialize */
3760
3761 for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
3763
3764 for (i = 0; i < max_wal_senders; i++)
3765 {
3766 WalSnd *walsnd = &WalSndCtl->walsnds[i];
3767
3768 SpinLockInit(&walsnd->mutex);
3769 }
3770
3774 }
3775}
#define MemSet(start, val, len)
Definition: c.h:1022
void ConditionVariableInit(ConditionVariable *cv)
static void dlist_init(dlist_head *head)
Definition: ilist.h:314
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:389
#define SpinLockInit(lock)
Definition: spin.h:57
ConditionVariable wal_replay_cv
dlist_head SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]
ConditionVariable wal_flush_cv
#define NUM_SYNC_REP_WAIT_MODE
Definition: syncrep.h:27
Size WalSndShmemSize(void)
Definition: walsender.c:3736

References ConditionVariableInit(), dlist_init(), i, max_wal_senders, MemSet, WalSnd::mutex, NUM_SYNC_REP_WAIT_MODE, ShmemInitStruct(), SpinLockInit, WalSndCtlData::SyncRepQueue, WalSndCtlData::wal_confirm_rcv_cv, WalSndCtlData::wal_flush_cv, WalSndCtlData::wal_replay_cv, WalSndCtl, WalSndCtlData::walsnds, and WalSndShmemSize().

Referenced by CreateOrAttachShmemStructs().

◆ WalSndShmemSize()

Size WalSndShmemSize ( void  )

Definition at line 3736 of file walsender.c.

3737{
3738 Size size = 0;
3739
3740 size = offsetof(WalSndCtlData, walsnds);
3741 size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
3742
3743 return size;
3744}
size_t Size
Definition: c.h:613
Size add_size(Size s1, Size s2)
Definition: shmem.c:495
Size mul_size(Size s1, Size s2)
Definition: shmem.c:510

References add_size(), max_wal_senders, and mul_size().

Referenced by CalculateShmemSize(), and WalSndShmemInit().

◆ WalSndSignals()

void WalSndSignals ( void  )

Definition at line 3717 of file walsender.c.

3718{
3719 /* Set up signal handlers */
3721 pqsignal(SIGINT, StatementCancelHandler); /* query cancel */
3722 pqsignal(SIGTERM, die); /* request shutdown */
3723 /* SIGQUIT handler was already set up by InitPostmasterChild */
3724 InitializeTimeouts(); /* establishes SIGALRM handler */
3725 pqsignal(SIGPIPE, SIG_IGN);
3727 pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
3728 * shutdown */
3729
3730 /* Reset some signals that are accepted by postmaster but not here */
3731 pqsignal(SIGCHLD, SIG_DFL);
3732}
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:61
#define die(msg)
#define pqsignal
Definition: port.h:552
void StatementCancelHandler(SIGNAL_ARGS)
Definition: postgres.c:3062
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition: procsignal.c:674
void InitializeTimeouts(void)
Definition: timeout.c:470
static void WalSndLastCycleHandler(SIGNAL_ARGS)
Definition: walsender.c:3709
#define SIGCHLD
Definition: win32_port.h:168
#define SIGHUP
Definition: win32_port.h:158
#define SIGPIPE
Definition: win32_port.h:163
#define SIGUSR1
Definition: win32_port.h:170
#define SIGUSR2
Definition: win32_port.h:171

References die, InitializeTimeouts(), pqsignal, procsignal_sigusr1_handler(), SIGCHLD, SIGHUP, SignalHandlerForConfigReload(), SIGPIPE, SIGUSR1, SIGUSR2, StatementCancelHandler(), and WalSndLastCycleHandler().

Referenced by PostgresMain().

◆ WalSndWaitStopping()

void WalSndWaitStopping ( void  )

Definition at line 3898 of file walsender.c.

3899{
3900 for (;;)
3901 {
3902 int i;
3903 bool all_stopped = true;
3904
3905 for (i = 0; i < max_wal_senders; i++)
3906 {
3907 WalSnd *walsnd = &WalSndCtl->walsnds[i];
3908
3909 SpinLockAcquire(&walsnd->mutex);
3910
3911 if (walsnd->pid == 0)
3912 {
3913 SpinLockRelease(&walsnd->mutex);
3914 continue;
3915 }
3916
3917 if (walsnd->state != WALSNDSTATE_STOPPING)
3918 {
3919 all_stopped = false;
3920 SpinLockRelease(&walsnd->mutex);
3921 break;
3922 }
3923 SpinLockRelease(&walsnd->mutex);
3924 }
3925
3926 /* safe to leave if confirmation is done for all WAL senders */
3927 if (all_stopped)
3928 return;
3929
3930 pg_usleep(10000L); /* wait for 10 msec */
3931 }
3932}
void pg_usleep(long microsec)
Definition: signal.c:53

References i, max_wal_senders, WalSnd::mutex, pg_usleep(), WalSnd::pid, SpinLockAcquire, SpinLockRelease, WalSnd::state, WalSndCtl, WalSndCtlData::walsnds, and WALSNDSTATE_STOPPING.

Referenced by ShutdownXLOG().

◆ WalSndWakeup()

void WalSndWakeup ( bool  physical,
bool  logical 
)

Definition at line 3793 of file walsender.c.

3794{
3795 /*
3796 * Wake up all the walsenders waiting on WAL being flushed or replayed
3797 * respectively. Note that waiting walsender would have prepared to sleep
3798 * on the CV (i.e., added itself to the CV's waitlist) in WalSndWait()
3799 * before actually waiting.
3800 */
3801 if (physical)
3803
3804 if (logical)
3806}

References ConditionVariableBroadcast(), WalSndCtlData::wal_flush_cv, WalSndCtlData::wal_replay_cv, and WalSndCtl.

Referenced by ApplyWalRecord(), KeepFileRestoredFromArchive(), StartupXLOG(), WalSndWakeupProcessRequests(), and XLogWalRcvFlush().

◆ WalSndWakeupProcessRequests()

static void WalSndWakeupProcessRequests ( bool  physical,
bool  logical 
)
inlinestatic

Definition at line 65 of file walsender.h.

66{
68 {
69 wake_wal_senders = false;
70 if (max_wal_senders > 0)
71 WalSndWakeup(physical, logical);
72 }
73}
PGDLLIMPORT bool wake_wal_senders
Definition: walsender.c:138
void WalSndWakeup(bool physical, bool logical)
Definition: walsender.c:3793
PGDLLIMPORT int max_wal_senders
Definition: walsender.c:129

References max_wal_senders, wake_wal_senders, and WalSndWakeup().

Referenced by XLogBackgroundFlush(), and XLogFlush().

Variable Documentation

◆ am_cascading_walsender

◆ am_db_walsender

PGDLLIMPORT bool am_db_walsender
extern

Definition at line 126 of file walsender.c.

Referenced by check_db(), ClientAuthentication(), InitPostgres(), and ProcessStartupPacket().

◆ am_walsender

◆ log_replication_commands

PGDLLIMPORT bool log_replication_commands
extern

◆ max_wal_senders

◆ wake_wal_senders

PGDLLIMPORT bool wake_wal_senders
extern

Definition at line 138 of file walsender.c.

Referenced by WalSndWakeupProcessRequests().

◆ wal_sender_timeout