PostgreSQL Source Code git master
walsender.h File Reference
#include "access/xlogdefs.h"
Include dependency graph for walsender.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Macros

#define WalSndWakeupRequest()    do { wake_wal_senders = true; } while (0)
 

Enumerations

enum  CRSSnapshotAction { CRS_EXPORT_SNAPSHOT , CRS_NOEXPORT_SNAPSHOT , CRS_USE_SNAPSHOT }
 

Functions

void InitWalSender (void)
 
bool exec_replication_command (const char *cmd_string)
 
void WalSndErrorCleanup (void)
 
void PhysicalWakeupLogicalWalSnd (void)
 
XLogRecPtr GetStandbyFlushRecPtr (TimeLineID *tli)
 
void WalSndSignals (void)
 
Size WalSndShmemSize (void)
 
void WalSndShmemInit (void)
 
void WalSndWakeup (bool physical, bool logical)
 
void WalSndInitStopping (void)
 
void WalSndWaitStopping (void)
 
void HandleWalSndInitStopping (void)
 
void WalSndRqstFileReload (void)
 
static void WalSndWakeupProcessRequests (bool physical, bool logical)
 

Variables

PGDLLIMPORT bool am_walsender
 
PGDLLIMPORT bool am_cascading_walsender
 
PGDLLIMPORT bool am_db_walsender
 
PGDLLIMPORT bool wake_wal_senders
 
PGDLLIMPORT int max_wal_senders
 
PGDLLIMPORT int wal_sender_timeout
 
PGDLLIMPORT bool log_replication_commands
 

Macro Definition Documentation

◆ WalSndWakeupRequest

#define WalSndWakeupRequest ( )     do { wake_wal_senders = true; } while (0)

Definition at line 58 of file walsender.h.

Enumeration Type Documentation

◆ CRSSnapshotAction

Enumerator
CRS_EXPORT_SNAPSHOT 
CRS_NOEXPORT_SNAPSHOT 
CRS_USE_SNAPSHOT 

Definition at line 20 of file walsender.h.

21{
CRSSnapshotAction
Definition: walsender.h:21
@ CRS_USE_SNAPSHOT
Definition: walsender.h:24
@ CRS_NOEXPORT_SNAPSHOT
Definition: walsender.h:23
@ CRS_EXPORT_SNAPSHOT
Definition: walsender.h:22

Function Documentation

◆ exec_replication_command()

bool exec_replication_command ( const char *  cmd_string)

Definition at line 1995 of file walsender.c.

1996{
1997 yyscan_t scanner;
1998 int parse_rc;
1999 Node *cmd_node;
2000 const char *cmdtag;
2001 MemoryContext old_context = CurrentMemoryContext;
2002
2003 /* We save and re-use the cmd_context across calls */
2004 static MemoryContext cmd_context = NULL;
2005
2006 /*
2007 * If WAL sender has been told that shutdown is getting close, switch its
2008 * status accordingly to handle the next replication commands correctly.
2009 */
2010 if (got_STOPPING)
2012
2013 /*
2014 * Throw error if in stopping mode. We need to prevent commands that could
2015 * generate WAL while the shutdown checkpoint is being written. To be
2016 * safe, we just prohibit all new commands.
2017 */
2019 ereport(ERROR,
2020 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2021 errmsg("cannot execute new commands while WAL sender is in stopping mode")));
2022
2023 /*
2024 * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
2025 * command arrives. Clean up the old stuff if there's anything.
2026 */
2028
2030
2031 /*
2032 * Prepare to parse and execute the command.
2033 *
2034 * Because replication command execution can involve beginning or ending
2035 * transactions, we need a working context that will survive that, so we
2036 * make it a child of TopMemoryContext. That in turn creates a hazard of
2037 * long-lived memory leaks if we lose track of the working context. We
2038 * deal with that by creating it only once per walsender, and resetting it
2039 * for each new command. (Normally this reset is a no-op, but if the
2040 * prior exec_replication_command call failed with an error, it won't be.)
2041 *
2042 * This is subtler than it looks. The transactions we manage can extend
2043 * across replication commands, indeed SnapBuildClearExportedSnapshot
2044 * might have just ended one. Because transaction exit will revert to the
2045 * memory context that was current at transaction start, we need to be
2046 * sure that that context is still valid. That motivates re-using the
2047 * same cmd_context rather than making a new one each time.
2048 */
2049 if (cmd_context == NULL)
2051 "Replication command context",
2053 else
2054 MemoryContextReset(cmd_context);
2055
2056 MemoryContextSwitchTo(cmd_context);
2057
2058 replication_scanner_init(cmd_string, &scanner);
2059
2060 /*
2061 * Is it a WalSender command?
2062 */
2064 {
2065 /* Nope; clean up and get out. */
2067
2068 MemoryContextSwitchTo(old_context);
2069 MemoryContextReset(cmd_context);
2070
2071 /* XXX this is a pretty random place to make this check */
2072 if (MyDatabaseId == InvalidOid)
2073 ereport(ERROR,
2074 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2075 errmsg("cannot execute SQL commands in WAL sender for physical replication")));
2076
2077 /* Tell the caller that this wasn't a WalSender command. */
2078 return false;
2079 }
2080
2081 /*
2082 * Looks like a WalSender command, so parse it.
2083 */
2084 parse_rc = replication_yyparse(&cmd_node, scanner);
2085 if (parse_rc != 0)
2086 ereport(ERROR,
2087 (errcode(ERRCODE_SYNTAX_ERROR),
2088 errmsg_internal("replication command parser returned %d",
2089 parse_rc)));
2091
2092 /*
2093 * Report query to various monitoring facilities. For this purpose, we
2094 * report replication commands just like SQL commands.
2095 */
2096 debug_query_string = cmd_string;
2097
2099
2100 /*
2101 * Log replication command if log_replication_commands is enabled. Even
2102 * when it's disabled, log the command with DEBUG1 level for backward
2103 * compatibility.
2104 */
2106 (errmsg("received replication command: %s", cmd_string)));
2107
2108 /*
2109 * Disallow replication commands in aborted transaction blocks.
2110 */
2112 ereport(ERROR,
2113 (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
2114 errmsg("current transaction is aborted, "
2115 "commands ignored until end of transaction block")));
2116
2118
2119 /*
2120 * Allocate buffers that will be used for each outgoing and incoming
2121 * message. We do this just once per command to reduce palloc overhead.
2122 */
2126
2127 switch (cmd_node->type)
2128 {
2129 case T_IdentifySystemCmd:
2130 cmdtag = "IDENTIFY_SYSTEM";
2131 set_ps_display(cmdtag);
2133 EndReplicationCommand(cmdtag);
2134 break;
2135
2136 case T_ReadReplicationSlotCmd:
2137 cmdtag = "READ_REPLICATION_SLOT";
2138 set_ps_display(cmdtag);
2140 EndReplicationCommand(cmdtag);
2141 break;
2142
2143 case T_BaseBackupCmd:
2144 cmdtag = "BASE_BACKUP";
2145 set_ps_display(cmdtag);
2146 PreventInTransactionBlock(true, cmdtag);
2148 EndReplicationCommand(cmdtag);
2149 break;
2150
2151 case T_CreateReplicationSlotCmd:
2152 cmdtag = "CREATE_REPLICATION_SLOT";
2153 set_ps_display(cmdtag);
2155 EndReplicationCommand(cmdtag);
2156 break;
2157
2158 case T_DropReplicationSlotCmd:
2159 cmdtag = "DROP_REPLICATION_SLOT";
2160 set_ps_display(cmdtag);
2162 EndReplicationCommand(cmdtag);
2163 break;
2164
2165 case T_AlterReplicationSlotCmd:
2166 cmdtag = "ALTER_REPLICATION_SLOT";
2167 set_ps_display(cmdtag);
2169 EndReplicationCommand(cmdtag);
2170 break;
2171
2172 case T_StartReplicationCmd:
2173 {
2174 StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
2175
2176 cmdtag = "START_REPLICATION";
2177 set_ps_display(cmdtag);
2178 PreventInTransactionBlock(true, cmdtag);
2179
2180 if (cmd->kind == REPLICATION_KIND_PHYSICAL)
2181 StartReplication(cmd);
2182 else
2184
2185 /* dupe, but necessary per libpqrcv_endstreaming */
2186 EndReplicationCommand(cmdtag);
2187
2188 Assert(xlogreader != NULL);
2189 break;
2190 }
2191
2192 case T_TimeLineHistoryCmd:
2193 cmdtag = "TIMELINE_HISTORY";
2194 set_ps_display(cmdtag);
2195 PreventInTransactionBlock(true, cmdtag);
2197 EndReplicationCommand(cmdtag);
2198 break;
2199
2200 case T_VariableShowStmt:
2201 {
2203 VariableShowStmt *n = (VariableShowStmt *) cmd_node;
2204
2205 cmdtag = "SHOW";
2206 set_ps_display(cmdtag);
2207
2208 /* syscache access needs a transaction environment */
2210 GetPGVariable(n->name, dest);
2212 EndReplicationCommand(cmdtag);
2213 }
2214 break;
2215
2216 case T_UploadManifestCmd:
2217 cmdtag = "UPLOAD_MANIFEST";
2218 set_ps_display(cmdtag);
2219 PreventInTransactionBlock(true, cmdtag);
2221 EndReplicationCommand(cmdtag);
2222 break;
2223
2224 default:
2225 elog(ERROR, "unrecognized replication command node tag: %u",
2226 cmd_node->type);
2227 }
2228
2229 /*
2230 * Done. Revert to caller's memory context, and clean out the cmd_context
2231 * to recover memory right away.
2232 */
2233 MemoryContextSwitchTo(old_context);
2234 MemoryContextReset(cmd_context);
2235
2236 /*
2237 * We need not update ps display or pg_stat_activity, because PostgresMain
2238 * will reset those to "idle". But we must reset debug_query_string to
2239 * ensure it doesn't become a dangling pointer.
2240 */
2241 debug_query_string = NULL;
2242
2243 return true;
2244}
void pgstat_report_activity(BackendState state, const char *cmd_str)
@ STATE_RUNNING
void SendBaseBackup(BaseBackupCmd *cmd, IncrementalBackupInfo *ib)
Definition: basebackup.c:990
void * yyscan_t
Definition: cubedata.h:65
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
void EndReplicationCommand(const char *commandTag)
Definition: dest.c:205
@ DestRemoteSimple
Definition: dest.h:91
int errmsg_internal(const char *fmt,...)
Definition: elog.c:1170
int errcode(int sqlerrcode)
Definition: elog.c:863
int errmsg(const char *fmt,...)
Definition: elog.c:1080
#define LOG
Definition: elog.h:31
#define DEBUG1
Definition: elog.h:30
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:226
#define ereport(elevel,...)
Definition: elog.h:150
Oid MyDatabaseId
Definition: globals.c:94
void GetPGVariable(const char *name, DestReceiver *dest)
Definition: guc_funcs.c:408
Assert(PointerIsAligned(start, uint64))
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:403
MemoryContext TopMemoryContext
Definition: mcxt.c:166
MemoryContext CurrentMemoryContext
Definition: mcxt.c:160
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:160
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:123
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:124
const char * debug_query_string
Definition: postgres.c:89
#define InvalidOid
Definition: postgres_ext.h:37
static void set_ps_display(const char *activity)
Definition: ps_status.h:40
bool replication_scanner_is_replication_command(yyscan_t yyscanner)
Definition: repl_scanner.l:299
void replication_scanner_finish(yyscan_t yyscanner)
Definition: repl_scanner.l:284
void replication_scanner_init(const char *str, yyscan_t *yyscannerp)
Definition: repl_scanner.l:268
@ REPLICATION_KIND_PHYSICAL
Definition: replnodes.h:22
void SnapBuildClearExportedSnapshot(void)
Definition: snapbuild.c:599
void initStringInfo(StringInfo str)
Definition: stringinfo.c:97
Node
Definition: nodes.h:135
NodeTag type
Definition: nodes.h:136
ReplicationKind kind
Definition: replnodes.h:94
WalSndState state
static void AlterReplicationSlot(AlterReplicationSlotCmd *cmd)
Definition: walsender.c:1416
static void SendTimeLineHistory(TimeLineHistoryCmd *cmd)
Definition: walsender.c:581
WalSnd * MyWalSnd
Definition: walsender.c:120
static void ReadReplicationSlot(ReadReplicationSlotCmd *cmd)
Definition: walsender.c:482
static StringInfoData tmpbuf
Definition: walsender.c:178
static void IdentifySystem(void)
Definition: walsender.c:401
static StringInfoData reply_message
Definition: walsender.c:177
void WalSndSetState(WalSndState state)
Definition: walsender.c:3943
static StringInfoData output_message
Definition: walsender.c:176
static void UploadManifest(void)
Definition: walsender.c:671
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:206
bool log_replication_commands
Definition: walsender.c:133
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Definition: walsender.c:1195
static void StartLogicalReplication(StartReplicationCmd *cmd)
Definition: walsender.c:1458
static IncrementalBackupInfo * uploaded_manifest
Definition: walsender.c:155
static void DropReplicationSlot(DropReplicationSlotCmd *cmd)
Definition: walsender.c:1407
static void StartReplication(StartReplicationCmd *cmd)
Definition: walsender.c:813
static XLogReaderState * xlogreader
Definition: walsender.c:145
@ WALSNDSTATE_STOPPING
int replication_yyparse(Node **replication_parse_result_p, yyscan_t yyscanner)
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
Definition: xact.c:3669
void StartTransactionCommand(void)
Definition: xact.c:3080
bool IsAbortedTransactionBlockState(void)
Definition: xact.c:408
void CommitTransactionCommand(void)
Definition: xact.c:3178

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, AlterReplicationSlot(), Assert(), CHECK_FOR_INTERRUPTS, CommitTransactionCommand(), CreateDestReceiver(), CreateReplicationSlot(), CurrentMemoryContext, DEBUG1, debug_query_string, generate_unaccent_rules::dest, DestRemoteSimple, DropReplicationSlot(), elog, EndReplicationCommand(), ereport, errcode(), errmsg(), errmsg_internal(), ERROR, GetPGVariable(), got_STOPPING, IdentifySystem(), initStringInfo(), InvalidOid, IsAbortedTransactionBlockState(), StartReplicationCmd::kind, LOG, log_replication_commands, MemoryContextReset(), MemoryContextSwitchTo(), MyDatabaseId, MyWalSnd, VariableShowStmt::name, output_message, pgstat_report_activity(), PreventInTransactionBlock(), ReadReplicationSlot(), REPLICATION_KIND_PHYSICAL, replication_scanner_finish(), replication_scanner_init(), replication_scanner_is_replication_command(), replication_yyparse(), reply_message, SendBaseBackup(), SendTimeLineHistory(), set_ps_display(), SnapBuildClearExportedSnapshot(), StartLogicalReplication(), StartReplication(), StartTransactionCommand(), WalSnd::state, STATE_RUNNING, tmpbuf, TopMemoryContext, Node::type, ReadReplicationSlotCmd::type, uploaded_manifest, UploadManifest(), WalSndSetState(), WALSNDSTATE_STOPPING, and xlogreader.

Referenced by PostgresMain().

◆ GetStandbyFlushRecPtr()

XLogRecPtr GetStandbyFlushRecPtr ( TimeLineID *  tli)

Definition at line 3638 of file walsender.c.

3639{
3640 XLogRecPtr replayPtr;
3641 TimeLineID replayTLI;
3642 XLogRecPtr receivePtr;
3644 XLogRecPtr result;
3645
3647
3648 /*
3649 * We can safely send what's already been replayed. Also, if walreceiver
3650 * is streaming WAL from the same timeline, we can send anything that it
3651 * has streamed, but hasn't been replayed yet.
3652 */
3653
3654 receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI);
3655 replayPtr = GetXLogReplayRecPtr(&replayTLI);
3656
3657 if (tli)
3658 *tli = replayTLI;
3659
3660 result = replayPtr;
3661 if (receiveTLI == replayTLI && receivePtr > replayPtr)
3662 result = receivePtr;
3663
3664 return result;
3665}
bool IsSyncingReplicationSlots(void)
Definition: slotsync.c:1882
XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
bool am_cascading_walsender
Definition: walsender.c:124
uint64 XLogRecPtr
Definition: xlogdefs.h:21
uint32 TimeLineID
Definition: xlogdefs.h:63
static TimeLineID receiveTLI
Definition: xlogrecovery.c:266
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)

References am_cascading_walsender, Assert(), GetWalRcvFlushRecPtr(), GetXLogReplayRecPtr(), IsSyncingReplicationSlots(), and receiveTLI.

Referenced by IdentifySystem(), StartReplication(), synchronize_one_slot(), and XLogSendPhysical().

◆ HandleWalSndInitStopping()

void HandleWalSndInitStopping ( void  )

Definition at line 3694 of file walsender.c.

3695{
3696 Assert(am_walsender);
3697
3698 /*
3699 * If replication has not yet started, die like with SIGTERM. If
3700 * replication is active, only set a flag and wake up the main loop. It
3701 * will send any outstanding WAL, wait for it to be replicated to the
3702 * standby, and then exit gracefully.
3703 */
3704 if (!replication_active)
3705 kill(MyProcPid, SIGTERM);
3706 else
3707 got_STOPPING = true;
3708}
int MyProcPid
Definition: globals.c:47
bool am_walsender
Definition: walsender.c:123
static volatile sig_atomic_t replication_active
Definition: walsender.c:214
#define kill(pid, sig)
Definition: win32_port.h:490

References am_walsender, Assert(), got_STOPPING, kill, MyProcPid, and replication_active.

Referenced by procsignal_sigusr1_handler().

◆ InitWalSender()

void InitWalSender ( void  )

Definition at line 301 of file walsender.c.

302{
304
305 /* Create a per-walsender data structure in shared memory */
307
308 /* need resource owner for e.g. basebackups */
310
311 /*
312 * Let postmaster know that we're a WAL sender. Once we've declared us as
313 * a WAL sender process, postmaster will let us outlive the bgwriter and
314 * kill us last in the shutdown sequence, so we get a chance to stream all
315 * remaining WAL at shutdown, including the shutdown checkpoint. Note that
316 * there's no going back, and we mustn't write any WAL records after this.
317 */
320
321 /*
322 * If the client didn't specify a database to connect to, show in PGPROC
323 * that our advertised xmin should affect vacuum horizons in all
324 * databases. This allows physical replication clients to send hot
325 * standby feedback that will delay vacuum cleanup in all databases.
326 */
328 {
330 LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
333 LWLockRelease(ProcArrayLock);
334 }
335
336 /* Initialize empty timestamp buffer for lag tracking. */
338}
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1178
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1898
@ LW_EXCLUSIVE
Definition: lwlock.h:112
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:1266
void SendPostmasterSignal(PMSignalReason reason)
Definition: pmsignal.c:165
void MarkPostmasterChildWalSender(void)
Definition: pmsignal.c:309
@ PMSIGNAL_ADVANCE_STATE_MACHINE
Definition: pmsignal.h:43
#define PROC_AFFECTS_ALL_HORIZONS
Definition: proc.h:62
void CreateAuxProcessResourceOwner(void)
Definition: resowner.c:996
PGPROC * MyProc
Definition: proc.c:67
PROC_HDR * ProcGlobal
Definition: proc.c:79
TransactionId xmin
Definition: proc.h:194
uint8 statusFlags
Definition: proc.h:259
int pgxactoff
Definition: proc.h:201
uint8 * statusFlags
Definition: proc.h:403
#define InvalidTransactionId
Definition: transam.h:31
static void InitWalSenderSlot(void)
Definition: walsender.c:3022
static LagTracker * lag_tracker
Definition: walsender.c:252
bool RecoveryInProgress(void)
Definition: xlog.c:6461

References am_cascading_walsender, Assert(), CreateAuxProcessResourceOwner(), InitWalSenderSlot(), InvalidOid, InvalidTransactionId, lag_tracker, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MarkPostmasterChildWalSender(), MemoryContextAllocZero(), MyDatabaseId, MyProc, PGPROC::pgxactoff, PMSIGNAL_ADVANCE_STATE_MACHINE, PROC_AFFECTS_ALL_HORIZONS, ProcGlobal, RecoveryInProgress(), SendPostmasterSignal(), PGPROC::statusFlags, PROC_HDR::statusFlags, TopMemoryContext, and PGPROC::xmin.

Referenced by PostgresMain().

◆ PhysicalWakeupLogicalWalSnd()

void PhysicalWakeupLogicalWalSnd ( void  )

Definition at line 1739 of file walsender.c.

1740{
1742
1743 /*
1744 * If we are running in a standby, there is no need to wake up walsenders.
1745 * This is because we do not support syncing slots to cascading standbys,
1746 * so, there are no walsenders waiting for standbys to catch up.
1747 */
1748 if (RecoveryInProgress())
1749 return;
1750
1753}
#define NameStr(name)
Definition: c.h:771
void ConditionVariableBroadcast(ConditionVariable *cv)
ReplicationSlot * MyReplicationSlot
Definition: slot.c:148
bool SlotExistsInSyncStandbySlots(const char *slot_name)
Definition: slot.c:3050
#define SlotIsPhysical(slot)
Definition: slot.h:284
ReplicationSlotPersistentData data
Definition: slot.h:210
ConditionVariable wal_confirm_rcv_cv
WalSndCtlData * WalSndCtl
Definition: walsender.c:117

References Assert(), ConditionVariableBroadcast(), ReplicationSlot::data, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, RecoveryInProgress(), SlotExistsInSyncStandbySlots(), SlotIsPhysical, WalSndCtlData::wal_confirm_rcv_cv, and WalSndCtl.

Referenced by pg_physical_replication_slot_advance(), and PhysicalConfirmReceivedLocation().

◆ WalSndErrorCleanup()

void WalSndErrorCleanup ( void  )

Definition at line 348 of file walsender.c.

349{
354
355 if (xlogreader != NULL && xlogreader->seg.ws_file >= 0)
357
358 if (MyReplicationSlot != NULL)
360
362
363 replication_active = false;
364
365 /*
366 * If there is a transaction in progress, it will clean up our
367 * ResourceOwner, but if a replication command set up a resource owner
368 * without a transaction, we've got to clean that up now.
369 */
372
374 proc_exit(0);
375
376 /* Revert back to startup state */
378}
void pgaio_error_cleanup(void)
Definition: aio.c:1165
bool ConditionVariableCancelSleep(void)
void proc_exit(int code)
Definition: ipc.c:104
void LWLockReleaseAll(void)
Definition: lwlock.c:1949
void ReleaseAuxProcessResources(bool isCommit)
Definition: resowner.c:1016
void ReplicationSlotRelease(void)
Definition: slot.c:758
void ReplicationSlotCleanup(bool synced_only)
Definition: slot.c:857
WALOpenSegment seg
Definition: xlogreader.h:271
static void pgstat_report_wait_end(void)
Definition: wait_event.h:85
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:205
@ WALSNDSTATE_STARTUP
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:5011
void wal_segment_close(XLogReaderState *state)
Definition: xlogutils.c:831

References ConditionVariableCancelSleep(), got_SIGUSR2, got_STOPPING, IsTransactionOrTransactionBlock(), LWLockReleaseAll(), MyReplicationSlot, pgaio_error_cleanup(), pgstat_report_wait_end(), proc_exit(), ReleaseAuxProcessResources(), replication_active, ReplicationSlotCleanup(), ReplicationSlotRelease(), XLogReaderState::seg, wal_segment_close(), WalSndSetState(), WALSNDSTATE_STARTUP, WALOpenSegment::ws_file, and xlogreader.

Referenced by PostgresMain().

◆ WalSndInitStopping()

void WalSndInitStopping ( void  )

Definition at line 3879 of file walsender.c.

3880{
3881 int i;
3882
3883 for (i = 0; i < max_wal_senders; i++)
3884 {
3885 WalSnd *walsnd = &WalSndCtl->walsnds[i];
3886 pid_t pid;
3887
3888 SpinLockAcquire(&walsnd->mutex);
3889 pid = walsnd->pid;
3890 SpinLockRelease(&walsnd->mutex);
3891
3892 if (pid == 0)
3893 continue;
3894
3895 SendProcSignal(pid, PROCSIG_WALSND_INIT_STOPPING, INVALID_PROC_NUMBER);
3896 }
3897}
int i
Definition: isn.c:77
#define INVALID_PROC_NUMBER
Definition: procnumber.h:26
int SendProcSignal(pid_t pid, ProcSignalReason reason, ProcNumber procNumber)
Definition: procsignal.c:284
@ PROCSIG_WALSND_INIT_STOPPING
Definition: procsignal.h:35
#define SpinLockRelease(lock)
Definition: spin.h:61
#define SpinLockAcquire(lock)
Definition: spin.h:59
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
int max_wal_senders
Definition: walsender.c:129

References i, INVALID_PROC_NUMBER, max_wal_senders, WalSnd::mutex, WalSnd::pid, PROCSIG_WALSND_INIT_STOPPING, SendProcSignal(), SpinLockAcquire, SpinLockRelease, WalSndCtl, and WalSndCtlData::walsnds.

Referenced by ShutdownXLOG().

◆ WalSndRqstFileReload()

void WalSndRqstFileReload ( void  )

Definition at line 3671 of file walsender.c.

3672{
3673 int i;
3674
3675 for (i = 0; i < max_wal_senders; i++)
3676 {
3677 WalSnd *walsnd = &WalSndCtl->walsnds[i];
3678
3679 SpinLockAcquire(&walsnd->mutex);
3680 if (walsnd->pid == 0)
3681 {
3682 SpinLockRelease(&walsnd->mutex);
3683 continue;
3684 }
3685 walsnd->needreload = true;
3686 SpinLockRelease(&walsnd->mutex);
3687 }
3688}
bool needreload

References i, max_wal_senders, WalSnd::mutex, WalSnd::needreload, WalSnd::pid, SpinLockAcquire, SpinLockRelease, WalSndCtl, and WalSndCtlData::walsnds.

Referenced by KeepFileRestoredFromArchive().

◆ WalSndShmemInit()

void WalSndShmemInit ( void  )

Definition at line 3755 of file walsender.c.

3756{
3757 bool found;
3758 int i;
3759
3761 ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
3762
3763 if (!found)
3764 {
3765 /* First time through, so initialize */
3767
3768 for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
3770
3771 for (i = 0; i < max_wal_senders; i++)
3772 {
3773 WalSnd *walsnd = &WalSndCtl->walsnds[i];
3774
3775 SpinLockInit(&walsnd->mutex);
3776 }
3777
3781 }
3782}
#define MemSet(start, val, len)
Definition: c.h:1019
void ConditionVariableInit(ConditionVariable *cv)
static void dlist_init(dlist_head *head)
Definition: ilist.h:314
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:389
#define SpinLockInit(lock)
Definition: spin.h:57
ConditionVariable wal_replay_cv
dlist_head SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]
ConditionVariable wal_flush_cv
#define NUM_SYNC_REP_WAIT_MODE
Definition: syncrep.h:27
Size WalSndShmemSize(void)
Definition: walsender.c:3743

References ConditionVariableInit(), dlist_init(), i, max_wal_senders, MemSet, WalSnd::mutex, NUM_SYNC_REP_WAIT_MODE, ShmemInitStruct(), SpinLockInit, WalSndCtlData::SyncRepQueue, WalSndCtlData::wal_confirm_rcv_cv, WalSndCtlData::wal_flush_cv, WalSndCtlData::wal_replay_cv, WalSndCtl, WalSndCtlData::walsnds, and WalSndShmemSize().

Referenced by CreateOrAttachShmemStructs().

◆ WalSndShmemSize()

Size WalSndShmemSize ( void  )

Definition at line 3743 of file walsender.c.

3744{
3745 Size size = 0;
3746
3747 size = offsetof(WalSndCtlData, walsnds);
3748 size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
3749
3750 return size;
3751}
size_t Size
Definition: c.h:625
Size add_size(Size s1, Size s2)
Definition: shmem.c:495
Size mul_size(Size s1, Size s2)
Definition: shmem.c:510

References add_size(), max_wal_senders, and mul_size().

Referenced by CalculateShmemSize(), and WalSndShmemInit().

◆ WalSndSignals()

void WalSndSignals ( void  )

Definition at line 3724 of file walsender.c.

3725{
3726 /* Set up signal handlers */
3728 pqsignal(SIGINT, StatementCancelHandler); /* query cancel */
3729 pqsignal(SIGTERM, die); /* request shutdown */
3730 /* SIGQUIT handler was already set up by InitPostmasterChild */
3731 InitializeTimeouts(); /* establishes SIGALRM handler */
3732 pqsignal(SIGPIPE, SIG_IGN);
3734 pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
3735 * shutdown */
3736
3737 /* Reset some signals that are accepted by postmaster but not here */
3738 pqsignal(SIGCHLD, SIG_DFL);
3739}
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:61
#define die(msg)
Definition: pg_test_fsync.c:99
#define pqsignal
Definition: port.h:551
void StatementCancelHandler(SIGNAL_ARGS)
Definition: postgres.c:3062
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition: procsignal.c:677
void InitializeTimeouts(void)
Definition: timeout.c:470
static void WalSndLastCycleHandler(SIGNAL_ARGS)
Definition: walsender.c:3716
#define SIGCHLD
Definition: win32_port.h:168
#define SIGHUP
Definition: win32_port.h:158
#define SIGPIPE
Definition: win32_port.h:163
#define SIGUSR1
Definition: win32_port.h:170
#define SIGUSR2
Definition: win32_port.h:171

References die, InitializeTimeouts(), pqsignal, procsignal_sigusr1_handler(), SIGCHLD, SIGHUP, SignalHandlerForConfigReload(), SIGPIPE, SIGUSR1, SIGUSR2, StatementCancelHandler(), and WalSndLastCycleHandler().

Referenced by PostgresMain().

◆ WalSndWaitStopping()

void WalSndWaitStopping ( void  )

Definition at line 3905 of file walsender.c.

3906{
3907 for (;;)
3908 {
3909 int i;
3910 bool all_stopped = true;
3911
3912 for (i = 0; i < max_wal_senders; i++)
3913 {
3914 WalSnd *walsnd = &WalSndCtl->walsnds[i];
3915
3916 SpinLockAcquire(&walsnd->mutex);
3917
3918 if (walsnd->pid == 0)
3919 {
3920 SpinLockRelease(&walsnd->mutex);
3921 continue;
3922 }
3923
3924 if (walsnd->state != WALSNDSTATE_STOPPING)
3925 {
3926 all_stopped = false;
3927 SpinLockRelease(&walsnd->mutex);
3928 break;
3929 }
3930 SpinLockRelease(&walsnd->mutex);
3931 }
3932
3933 /* safe to leave if confirmation is done for all WAL senders */
3934 if (all_stopped)
3935 return;
3936
3937 pg_usleep(10000L); /* wait for 10 msec */
3938 }
3939}
void pg_usleep(long microsec)
Definition: signal.c:53

References i, max_wal_senders, WalSnd::mutex, pg_usleep(), WalSnd::pid, SpinLockAcquire, SpinLockRelease, WalSnd::state, WalSndCtl, WalSndCtlData::walsnds, and WALSNDSTATE_STOPPING.

Referenced by ShutdownXLOG().

◆ WalSndWakeup()

void WalSndWakeup ( bool  physical,
bool  logical 
)

Definition at line 3800 of file walsender.c.

3801{
3802 /*
3803 * Wake up all the walsenders waiting on WAL being flushed or replayed
3804 * respectively. Note that waiting walsender would have prepared to sleep
3805 * on the CV (i.e., added itself to the CV's waitlist) in WalSndWait()
3806 * before actually waiting.
3807 */
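3808 if (physical)
3809 ConditionVariableBroadcast(&WalSndCtl->wal_flush_cv);
3810
3811 if (logical)
3812 ConditionVariableBroadcast(&WalSndCtl->wal_replay_cv);
3813}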

References ConditionVariableBroadcast(), WalSndCtlData::wal_flush_cv, WalSndCtlData::wal_replay_cv, and WalSndCtl.

Referenced by ApplyWalRecord(), KeepFileRestoredFromArchive(), StartupXLOG(), WalSndWakeupProcessRequests(), and XLogWalRcvFlush().

◆ WalSndWakeupProcessRequests()

static void WalSndWakeupProcessRequests ( bool  physical,
bool  logical 
)
inline static

Definition at line 65 of file walsender.h.

66{
67 if (wake_wal_senders)
68 {
69 wake_wal_senders = false;
70 if (max_wal_senders > 0)
71 WalSndWakeup(physical, logical);
72 }
73}
PGDLLIMPORT bool wake_wal_senders
Definition: walsender.c:138
void WalSndWakeup(bool physical, bool logical)
Definition: walsender.c:3800
PGDLLIMPORT int max_wal_senders
Definition: walsender.c:129

References max_wal_senders, wake_wal_senders, and WalSndWakeup().

Referenced by XLogBackgroundFlush(), and XLogFlush().

Variable Documentation

◆ am_cascading_walsender

◆ am_db_walsender

PGDLLIMPORT bool am_db_walsender
extern

Definition at line 126 of file walsender.c.

Referenced by check_db(), ClientAuthentication(), InitPostgres(), and ProcessStartupPacket().

◆ am_walsender

◆ log_replication_commands

PGDLLIMPORT bool log_replication_commands
extern

◆ max_wal_senders

◆ wake_wal_senders

PGDLLIMPORT bool wake_wal_senders
extern

Definition at line 138 of file walsender.c.

Referenced by WalSndWakeupProcessRequests().

◆ wal_sender_timeout