PostgreSQL Source Code git master
All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
walsender.h File Reference
#include "access/xlogdefs.h"
Include dependency graph for walsender.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Macros

#define WalSndWakeupRequest()    do { wake_wal_senders = true; } while (0)
 

Enumerations

enum  CRSSnapshotAction { CRS_EXPORT_SNAPSHOT, CRS_NOEXPORT_SNAPSHOT, CRS_USE_SNAPSHOT }
 

Functions

void InitWalSender (void)
 
bool exec_replication_command (const char *cmd_string)
 
void WalSndErrorCleanup (void)
 
void PhysicalWakeupLogicalWalSnd (void)
 
XLogRecPtr GetStandbyFlushRecPtr (TimeLineID *tli)
 
void WalSndSignals (void)
 
Size WalSndShmemSize (void)
 
void WalSndShmemInit (void)
 
void WalSndWakeup (bool physical, bool logical)
 
void WalSndInitStopping (void)
 
void WalSndWaitStopping (void)
 
void HandleWalSndInitStopping (void)
 
void WalSndRqstFileReload (void)
 
static void WalSndWakeupProcessRequests (bool physical, bool logical)
 

Variables

PGDLLIMPORT bool am_walsender
 
PGDLLIMPORT bool am_cascading_walsender
 
PGDLLIMPORT bool am_db_walsender
 
PGDLLIMPORT bool wake_wal_senders
 
PGDLLIMPORT int max_wal_senders
 
PGDLLIMPORT int wal_sender_timeout
 
PGDLLIMPORT bool log_replication_commands
 

Macro Definition Documentation

◆ WalSndWakeupRequest

#define WalSndWakeupRequest ( )     do { wake_wal_senders = true; } while (0)

Definition at line 58 of file walsender.h.

Enumeration Type Documentation

◆ CRSSnapshotAction

Enumerator
CRS_EXPORT_SNAPSHOT 
CRS_NOEXPORT_SNAPSHOT 
CRS_USE_SNAPSHOT 

Definition at line 20 of file walsender.h.

21{
CRSSnapshotAction
Definition: walsender.h:21
@ CRS_USE_SNAPSHOT
Definition: walsender.h:24
@ CRS_NOEXPORT_SNAPSHOT
Definition: walsender.h:23
@ CRS_EXPORT_SNAPSHOT
Definition: walsender.h:22

Function Documentation

◆ exec_replication_command()

bool exec_replication_command ( const char *  cmd_string)

Definition at line 1952 of file walsender.c.

1953{
1954 yyscan_t scanner;
1955 int parse_rc;
1956 Node *cmd_node;
1957 const char *cmdtag;
1958 MemoryContext cmd_context;
1959 MemoryContext old_context;
1960
1961 /*
1962 * If WAL sender has been told that shutdown is getting close, switch its
1963 * status accordingly to handle the next replication commands correctly.
1964 */
1965 if (got_STOPPING)
1967
1968 /*
1969 * Throw error if in stopping mode. We need to prevent commands that could
1970 * generate WAL while the shutdown checkpoint is being written. To be
1971 * safe, we just prohibit all new commands.
1972 */
1974 ereport(ERROR,
1975 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1976 errmsg("cannot execute new commands while WAL sender is in stopping mode")));
1977
1978 /*
1979 * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
1980 * command arrives. Clean up the old stuff if there's anything.
1981 */
1983
1985
1986 /*
1987 * Prepare to parse and execute the command.
1988 */
1990 "Replication command context",
1992 old_context = MemoryContextSwitchTo(cmd_context);
1993
1994 replication_scanner_init(cmd_string, &scanner);
1995
1996 /*
1997 * Is it a WalSender command?
1998 */
2000 {
2001 /* Nope; clean up and get out. */
2003
2004 MemoryContextSwitchTo(old_context);
2005 MemoryContextDelete(cmd_context);
2006
2007 /* XXX this is a pretty random place to make this check */
2008 if (MyDatabaseId == InvalidOid)
2009 ereport(ERROR,
2010 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2011 errmsg("cannot execute SQL commands in WAL sender for physical replication")));
2012
2013 /* Tell the caller that this wasn't a WalSender command. */
2014 return false;
2015 }
2016
2017 /*
2018 * Looks like a WalSender command, so parse it.
2019 */
2020 parse_rc = replication_yyparse(scanner);
2021 if (parse_rc != 0)
2022 ereport(ERROR,
2023 (errcode(ERRCODE_SYNTAX_ERROR),
2024 errmsg_internal("replication command parser returned %d",
2025 parse_rc)));
2027
2028 cmd_node = replication_parse_result;
2029
2030 /*
2031 * Report query to various monitoring facilities. For this purpose, we
2032 * report replication commands just like SQL commands.
2033 */
2034 debug_query_string = cmd_string;
2035
2037
2038 /*
2039 * Log replication command if log_replication_commands is enabled. Even
2040 * when it's disabled, log the command with DEBUG1 level for backward
2041 * compatibility.
2042 */
2044 (errmsg("received replication command: %s", cmd_string)));
2045
2046 /*
2047 * Disallow replication commands in aborted transaction blocks.
2048 */
2050 ereport(ERROR,
2051 (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
2052 errmsg("current transaction is aborted, "
2053 "commands ignored until end of transaction block")));
2054
2056
2057 /*
2058 * Allocate buffers that will be used for each outgoing and incoming
2059 * message. We do this just once per command to reduce palloc overhead.
2060 */
2064
2065 switch (cmd_node->type)
2066 {
2067 case T_IdentifySystemCmd:
2068 cmdtag = "IDENTIFY_SYSTEM";
2069 set_ps_display(cmdtag);
2071 EndReplicationCommand(cmdtag);
2072 break;
2073
2074 case T_ReadReplicationSlotCmd:
2075 cmdtag = "READ_REPLICATION_SLOT";
2076 set_ps_display(cmdtag);
2078 EndReplicationCommand(cmdtag);
2079 break;
2080
2081 case T_BaseBackupCmd:
2082 cmdtag = "BASE_BACKUP";
2083 set_ps_display(cmdtag);
2084 PreventInTransactionBlock(true, cmdtag);
2086 EndReplicationCommand(cmdtag);
2087 break;
2088
2089 case T_CreateReplicationSlotCmd:
2090 cmdtag = "CREATE_REPLICATION_SLOT";
2091 set_ps_display(cmdtag);
2093 EndReplicationCommand(cmdtag);
2094 break;
2095
2096 case T_DropReplicationSlotCmd:
2097 cmdtag = "DROP_REPLICATION_SLOT";
2098 set_ps_display(cmdtag);
2100 EndReplicationCommand(cmdtag);
2101 break;
2102
2103 case T_AlterReplicationSlotCmd:
2104 cmdtag = "ALTER_REPLICATION_SLOT";
2105 set_ps_display(cmdtag);
2107 EndReplicationCommand(cmdtag);
2108 break;
2109
2110 case T_StartReplicationCmd:
2111 {
2112 StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
2113
2114 cmdtag = "START_REPLICATION";
2115 set_ps_display(cmdtag);
2116 PreventInTransactionBlock(true, cmdtag);
2117
2118 if (cmd->kind == REPLICATION_KIND_PHYSICAL)
2119 StartReplication(cmd);
2120 else
2122
2123 /* dupe, but necessary per libpqrcv_endstreaming */
2124 EndReplicationCommand(cmdtag);
2125
2126 Assert(xlogreader != NULL);
2127 break;
2128 }
2129
2130 case T_TimeLineHistoryCmd:
2131 cmdtag = "TIMELINE_HISTORY";
2132 set_ps_display(cmdtag);
2133 PreventInTransactionBlock(true, cmdtag);
2135 EndReplicationCommand(cmdtag);
2136 break;
2137
2138 case T_VariableShowStmt:
2139 {
2141 VariableShowStmt *n = (VariableShowStmt *) cmd_node;
2142
2143 cmdtag = "SHOW";
2144 set_ps_display(cmdtag);
2145
2146 /* syscache access needs a transaction environment */
2148 GetPGVariable(n->name, dest);
2150 EndReplicationCommand(cmdtag);
2151 }
2152 break;
2153
2154 case T_UploadManifestCmd:
2155 cmdtag = "UPLOAD_MANIFEST";
2156 set_ps_display(cmdtag);
2157 PreventInTransactionBlock(true, cmdtag);
2159 EndReplicationCommand(cmdtag);
2160 break;
2161
2162 default:
2163 elog(ERROR, "unrecognized replication command node tag: %u",
2164 cmd_node->type);
2165 }
2166
2167 /* done */
2168 MemoryContextSwitchTo(old_context);
2169 MemoryContextDelete(cmd_context);
2170
2171 /*
2172 * We need not update ps display or pg_stat_activity, because PostgresMain
2173 * will reset those to "idle". But we must reset debug_query_string to
2174 * ensure it doesn't become a dangling pointer.
2175 */
2176 debug_query_string = NULL;
2177
2178 return true;
2179}
void pgstat_report_activity(BackendState state, const char *cmd_str)
@ STATE_RUNNING
void SendBaseBackup(BaseBackupCmd *cmd, IncrementalBackupInfo *ib)
Definition: basebackup.c:990
#define Assert(condition)
Definition: c.h:812
void * yyscan_t
Definition: cubedata.h:67
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
void EndReplicationCommand(const char *commandTag)
Definition: dest.c:205
@ DestRemoteSimple
Definition: dest.h:91
int errmsg_internal(const char *fmt,...)
Definition: elog.c:1157
int errcode(int sqlerrcode)
Definition: elog.c:853
int errmsg(const char *fmt,...)
Definition: elog.c:1070
#define LOG
Definition: elog.h:31
#define DEBUG1
Definition: elog.h:30
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:225
#define ereport(elevel,...)
Definition: elog.h:149
Oid MyDatabaseId
Definition: globals.c:93
void GetPGVariable(const char *name, DestReceiver *dest)
Definition: guc_funcs.c:382
MemoryContext CurrentMemoryContext
Definition: mcxt.c:143
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:454
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:160
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:122
const char * debug_query_string
Definition: postgres.c:87
#define InvalidOid
Definition: postgres_ext.h:36
static void set_ps_display(const char *activity)
Definition: ps_status.h:40
MemoryContextSwitchTo(old_ctx)
bool replication_scanner_is_replication_command(yyscan_t yyscanner)
Definition: repl_scanner.l:295
void replication_scanner_finish(yyscan_t yyscanner)
Definition: repl_scanner.l:280
void replication_scanner_init(const char *str, yyscan_t *yyscannerp)
Definition: repl_scanner.l:264
@ REPLICATION_KIND_PHYSICAL
Definition: replnodes.h:22
void SnapBuildClearExportedSnapshot(void)
Definition: snapbuild.c:600
void initStringInfo(StringInfo str)
Definition: stringinfo.c:56
Definition: nodes.h:129
NodeTag type
Definition: nodes.h:130
ReplicationKind kind
Definition: replnodes.h:94
WalSndState state
static void AlterReplicationSlot(AlterReplicationSlotCmd *cmd)
Definition: walsender.c:1385
static void SendTimeLineHistory(TimeLineHistoryCmd *cmd)
Definition: walsender.c:557
WalSnd * MyWalSnd
Definition: walsender.c:112
static void ReadReplicationSlot(ReadReplicationSlotCmd *cmd)
Definition: walsender.c:458
static StringInfoData tmpbuf
Definition: walsender.c:170
static void IdentifySystem(void)
Definition: walsender.c:377
static StringInfoData reply_message
Definition: walsender.c:169
void WalSndSetState(WalSndState state)
Definition: walsender.c:3782
static StringInfoData output_message
Definition: walsender.c:168
static void UploadManifest(void)
Definition: walsender.c:647
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:198
bool log_replication_commands
Definition: walsender.c:125
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Definition: walsender.c:1171
static void StartLogicalReplication(StartReplicationCmd *cmd)
Definition: walsender.c:1427
static IncrementalBackupInfo * uploaded_manifest
Definition: walsender.c:147
static void DropReplicationSlot(DropReplicationSlotCmd *cmd)
Definition: walsender.c:1376
static void StartReplication(StartReplicationCmd *cmd)
Definition: walsender.c:789
static XLogReaderState * xlogreader
Definition: walsender.c:137
PGDLLIMPORT Node * replication_parse_result
@ WALSNDSTATE_STOPPING
int replication_yyparse(yyscan_t yyscanner)
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
Definition: xact.c:3640
void StartTransactionCommand(void)
Definition: xact.c:3051
bool IsAbortedTransactionBlockState(void)
Definition: xact.c:406
void CommitTransactionCommand(void)
Definition: xact.c:3149

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, AlterReplicationSlot(), Assert, CHECK_FOR_INTERRUPTS, CommitTransactionCommand(), CreateDestReceiver(), CreateReplicationSlot(), CurrentMemoryContext, DEBUG1, debug_query_string, generate_unaccent_rules::dest, DestRemoteSimple, DropReplicationSlot(), elog, EndReplicationCommand(), ereport, errcode(), errmsg(), errmsg_internal(), ERROR, GetPGVariable(), got_STOPPING, IdentifySystem(), initStringInfo(), InvalidOid, IsAbortedTransactionBlockState(), StartReplicationCmd::kind, LOG, log_replication_commands, MemoryContextDelete(), MemoryContextSwitchTo(), MyDatabaseId, MyWalSnd, VariableShowStmt::name, output_message, pgstat_report_activity(), PreventInTransactionBlock(), ReadReplicationSlot(), REPLICATION_KIND_PHYSICAL, replication_parse_result, replication_scanner_finish(), replication_scanner_init(), replication_scanner_is_replication_command(), replication_yyparse(), reply_message, SendBaseBackup(), SendTimeLineHistory(), set_ps_display(), SnapBuildClearExportedSnapshot(), StartLogicalReplication(), StartReplication(), StartTransactionCommand(), WalSnd::state, STATE_RUNNING, tmpbuf, Node::type, ReadReplicationSlotCmd::type, uploaded_manifest, UploadManifest(), WalSndSetState(), WALSNDSTATE_STOPPING, and xlogreader.

Referenced by PostgresMain().

◆ GetStandbyFlushRecPtr()

XLogRecPtr GetStandbyFlushRecPtr ( TimeLineID *  tli)

Definition at line 3477 of file walsender.c.

3478{
3479 XLogRecPtr replayPtr;
3480 TimeLineID replayTLI;
3481 XLogRecPtr receivePtr;
3483 XLogRecPtr result;
3484
3486
3487 /*
3488 * We can safely send what's already been replayed. Also, if walreceiver
3489 * is streaming WAL from the same timeline, we can send anything that it
3490 * has streamed, but hasn't been replayed yet.
3491 */
3492
3493 receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI);
3494 replayPtr = GetXLogReplayRecPtr(&replayTLI);
3495
3496 if (tli)
3497 *tli = replayTLI;
3498
3499 result = replayPtr;
3500 if (receiveTLI == replayTLI && receivePtr > replayPtr)
3501 result = receivePtr;
3502
3503 return result;
3504}
bool IsSyncingReplicationSlots(void)
Definition: slotsync.c:1649
XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
bool am_cascading_walsender
Definition: walsender.c:116
uint64 XLogRecPtr
Definition: xlogdefs.h:21
uint32 TimeLineID
Definition: xlogdefs.h:59
static TimeLineID receiveTLI
Definition: xlogrecovery.c:263
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)

References am_cascading_walsender, Assert, GetWalRcvFlushRecPtr(), GetXLogReplayRecPtr(), IsSyncingReplicationSlots(), and receiveTLI.

Referenced by IdentifySystem(), StartReplication(), synchronize_one_slot(), XLogSendLogical(), and XLogSendPhysical().

◆ HandleWalSndInitStopping()

void HandleWalSndInitStopping ( void  )

Definition at line 3533 of file walsender.c.

3534{
3536
3537 /*
3538 * If replication has not yet started, die like with SIGTERM. If
3539 * replication is active, only set a flag and wake up the main loop. It
3540 * will send any outstanding WAL, wait for it to be replicated to the
3541 * standby, and then exit gracefully.
3542 */
3543 if (!replication_active)
3544 kill(MyProcPid, SIGTERM);
3545 else
3546 got_STOPPING = true;
3547}
int MyProcPid
Definition: globals.c:46
bool am_walsender
Definition: walsender.c:115
static volatile sig_atomic_t replication_active
Definition: walsender.c:206
#define kill(pid, sig)
Definition: win32_port.h:503

References am_walsender, Assert, got_STOPPING, kill, MyProcPid, and replication_active.

Referenced by procsignal_sigusr1_handler().

◆ InitWalSender()

void InitWalSender ( void  )

Referenced by PostgresMain().

◆ PhysicalWakeupLogicalWalSnd()

void PhysicalWakeupLogicalWalSnd ( void  )

Definition at line 1708 of file walsender.c.

1709{
1711
1712 /*
1713 * If we are running in a standby, there is no need to wake up walsenders.
1714 * This is because we do not support syncing slots to cascading standbys,
1715 * so, there are no walsenders waiting for standbys to catch up.
1716 */
1717 if (RecoveryInProgress())
1718 return;
1719
1722}
#define NameStr(name)
Definition: c.h:700
void ConditionVariableBroadcast(ConditionVariable *cv)
ReplicationSlot * MyReplicationSlot
Definition: slot.c:138
bool SlotExistsInSyncStandbySlots(const char *slot_name)
Definition: slot.c:2572
#define SlotIsPhysical(slot)
Definition: slot.h:216
ReplicationSlotPersistentData data
Definition: slot.h:181
ConditionVariable wal_confirm_rcv_cv
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
bool RecoveryInProgress(void)
Definition: xlog.c:6334

References Assert, ConditionVariableBroadcast(), ReplicationSlot::data, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, RecoveryInProgress(), SlotExistsInSyncStandbySlots(), SlotIsPhysical, WalSndCtlData::wal_confirm_rcv_cv, and WalSndCtl.

Referenced by pg_physical_replication_slot_advance(), and PhysicalConfirmReceivedLocation().

◆ WalSndErrorCleanup()

void WalSndErrorCleanup ( void  )

Definition at line 325 of file walsender.c.

326{
330
331 if (xlogreader != NULL && xlogreader->seg.ws_file >= 0)
333
334 if (MyReplicationSlot != NULL)
336
338
339 replication_active = false;
340
341 /*
342 * If there is a transaction in progress, it will clean up our
343 * ResourceOwner, but if a replication command set up a resource owner
344 * without a transaction, we've got to clean that up now.
345 */
348
350 proc_exit(0);
351
352 /* Revert back to startup state */
354}
bool ConditionVariableCancelSleep(void)
void proc_exit(int code)
Definition: ipc.c:104
void LWLockReleaseAll(void)
Definition: lwlock.c:1876
void ReleaseAuxProcessResources(bool isCommit)
Definition: resowner.c:1002
void ReplicationSlotRelease(void)
Definition: slot.c:652
void ReplicationSlotCleanup(bool synced_only)
Definition: slot.c:745
WALOpenSegment seg
Definition: xlogreader.h:272
static void pgstat_report_wait_end(void)
Definition: wait_event.h:101
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:197
@ WALSNDSTATE_STARTUP
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4981
void wal_segment_close(XLogReaderState *state)
Definition: xlogutils.c:842

References ConditionVariableCancelSleep(), got_SIGUSR2, got_STOPPING, IsTransactionOrTransactionBlock(), LWLockReleaseAll(), MyReplicationSlot, pgstat_report_wait_end(), proc_exit(), ReleaseAuxProcessResources(), replication_active, ReplicationSlotCleanup(), ReplicationSlotRelease(), XLogReaderState::seg, wal_segment_close(), WalSndSetState(), WALSNDSTATE_STARTUP, WALOpenSegment::ws_file, and xlogreader.

Referenced by PostgresMain().

◆ WalSndInitStopping()

void WalSndInitStopping ( void  )

Definition at line 3718 of file walsender.c.

3719{
3720 int i;
3721
3722 for (i = 0; i < max_wal_senders; i++)
3723 {
3724 WalSnd *walsnd = &WalSndCtl->walsnds[i];
3725 pid_t pid;
3726
3727 SpinLockAcquire(&walsnd->mutex);
3728 pid = walsnd->pid;
3729 SpinLockRelease(&walsnd->mutex);
3730
3731 if (pid == 0)
3732 continue;
3733
3735 }
3736}
int i
Definition: isn.c:72
#define INVALID_PROC_NUMBER
Definition: procnumber.h:26
int SendProcSignal(pid_t pid, ProcSignalReason reason, ProcNumber procNumber)
Definition: procsignal.c:281
@ PROCSIG_WALSND_INIT_STOPPING
Definition: procsignal.h:35
#define SpinLockRelease(lock)
Definition: spin.h:61
#define SpinLockAcquire(lock)
Definition: spin.h:59
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
int max_wal_senders
Definition: walsender.c:121

References i, INVALID_PROC_NUMBER, max_wal_senders, WalSnd::mutex, WalSnd::pid, PROCSIG_WALSND_INIT_STOPPING, SendProcSignal(), SpinLockAcquire, SpinLockRelease, WalSndCtl, and WalSndCtlData::walsnds.

Referenced by ShutdownXLOG().

◆ WalSndRqstFileReload()

void WalSndRqstFileReload ( void  )

Definition at line 3510 of file walsender.c.

3511{
3512 int i;
3513
3514 for (i = 0; i < max_wal_senders; i++)
3515 {
3516 WalSnd *walsnd = &WalSndCtl->walsnds[i];
3517
3518 SpinLockAcquire(&walsnd->mutex);
3519 if (walsnd->pid == 0)
3520 {
3521 SpinLockRelease(&walsnd->mutex);
3522 continue;
3523 }
3524 walsnd->needreload = true;
3525 SpinLockRelease(&walsnd->mutex);
3526 }
3527}
bool needreload

References i, max_wal_senders, WalSnd::mutex, WalSnd::needreload, WalSnd::pid, SpinLockAcquire, SpinLockRelease, WalSndCtl, and WalSndCtlData::walsnds.

Referenced by KeepFileRestoredFromArchive().

◆ WalSndShmemInit()

void WalSndShmemInit ( void  )

Definition at line 3594 of file walsender.c.

3595{
3596 bool found;
3597 int i;
3598
3600 ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
3601
3602 if (!found)
3603 {
3604 /* First time through, so initialize */
3606
3607 for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
3609
3610 for (i = 0; i < max_wal_senders; i++)
3611 {
3612 WalSnd *walsnd = &WalSndCtl->walsnds[i];
3613
3614 SpinLockInit(&walsnd->mutex);
3615 }
3616
3620 }
3621}
#define MemSet(start, val, len)
Definition: c.h:974
void ConditionVariableInit(ConditionVariable *cv)
static void dlist_init(dlist_head *head)
Definition: ilist.h:314
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:382
#define SpinLockInit(lock)
Definition: spin.h:57
ConditionVariable wal_replay_cv
dlist_head SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]
ConditionVariable wal_flush_cv
#define NUM_SYNC_REP_WAIT_MODE
Definition: syncrep.h:27
Size WalSndShmemSize(void)
Definition: walsender.c:3582

References ConditionVariableInit(), dlist_init(), i, max_wal_senders, MemSet, WalSnd::mutex, NUM_SYNC_REP_WAIT_MODE, ShmemInitStruct(), SpinLockInit, WalSndCtlData::SyncRepQueue, WalSndCtlData::wal_confirm_rcv_cv, WalSndCtlData::wal_flush_cv, WalSndCtlData::wal_replay_cv, WalSndCtl, WalSndCtlData::walsnds, and WalSndShmemSize().

Referenced by CreateOrAttachShmemStructs().

◆ WalSndShmemSize()

Size WalSndShmemSize ( void  )

Definition at line 3582 of file walsender.c.

3583{
3584 Size size = 0;
3585
3586 size = offsetof(WalSndCtlData, walsnds);
3588
3589 return size;
3590}
size_t Size
Definition: c.h:559
Size add_size(Size s1, Size s2)
Definition: shmem.c:488
Size mul_size(Size s1, Size s2)
Definition: shmem.c:505
static pg_noinline void Size size
Definition: slab.c:607

References add_size(), max_wal_senders, mul_size(), and size.

Referenced by CalculateShmemSize(), and WalSndShmemInit().

◆ WalSndSignals()

void WalSndSignals ( void  )

Definition at line 3563 of file walsender.c.

3564{
3565 /* Set up signal handlers */
3567 pqsignal(SIGINT, StatementCancelHandler); /* query cancel */
3568 pqsignal(SIGTERM, die); /* request shutdown */
3569 /* SIGQUIT handler was already set up by InitPostmasterChild */
3570 InitializeTimeouts(); /* establishes SIGALRM handler */
3573 pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
3574 * shutdown */
3575
3576 /* Reset some signals that are accepted by postmaster but not here */
3578}
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:61
#define die(msg)
pqsigfunc pqsignal(int signo, pqsigfunc func)
void StatementCancelHandler(SIGNAL_ARGS)
Definition: postgres.c:3031
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition: procsignal.c:671
void InitializeTimeouts(void)
Definition: timeout.c:470
static void WalSndLastCycleHandler(SIGNAL_ARGS)
Definition: walsender.c:3555
#define SIGCHLD
Definition: win32_port.h:178
#define SIGHUP
Definition: win32_port.h:168
#define SIG_DFL
Definition: win32_port.h:163
#define SIGPIPE
Definition: win32_port.h:173
#define SIGUSR1
Definition: win32_port.h:180
#define SIGUSR2
Definition: win32_port.h:181
#define SIG_IGN
Definition: win32_port.h:165

References die, InitializeTimeouts(), pqsignal(), procsignal_sigusr1_handler(), SIG_DFL, SIG_IGN, SIGCHLD, SIGHUP, SignalHandlerForConfigReload(), SIGPIPE, SIGUSR1, SIGUSR2, StatementCancelHandler(), and WalSndLastCycleHandler().

Referenced by PostgresMain().

◆ WalSndWaitStopping()

void WalSndWaitStopping ( void  )

Definition at line 3744 of file walsender.c.

3745{
3746 for (;;)
3747 {
3748 int i;
3749 bool all_stopped = true;
3750
3751 for (i = 0; i < max_wal_senders; i++)
3752 {
3753 WalSnd *walsnd = &WalSndCtl->walsnds[i];
3754
3755 SpinLockAcquire(&walsnd->mutex);
3756
3757 if (walsnd->pid == 0)
3758 {
3759 SpinLockRelease(&walsnd->mutex);
3760 continue;
3761 }
3762
3763 if (walsnd->state != WALSNDSTATE_STOPPING)
3764 {
3765 all_stopped = false;
3766 SpinLockRelease(&walsnd->mutex);
3767 break;
3768 }
3769 SpinLockRelease(&walsnd->mutex);
3770 }
3771
3772 /* safe to leave if confirmation is done for all WAL senders */
3773 if (all_stopped)
3774 return;
3775
3776 pg_usleep(10000L); /* wait for 10 msec */
3777 }
3778}
void pg_usleep(long microsec)
Definition: signal.c:53

References i, max_wal_senders, WalSnd::mutex, pg_usleep(), WalSnd::pid, SpinLockAcquire, SpinLockRelease, WalSnd::state, WalSndCtl, WalSndCtlData::walsnds, and WALSNDSTATE_STOPPING.

Referenced by ShutdownXLOG().

◆ WalSndWakeup()

void WalSndWakeup ( bool  physical,
bool  logical 
)

Definition at line 3639 of file walsender.c.

3640{
3641 /*
3642 * Wake up all the walsenders waiting on WAL being flushed or replayed
3643 * respectively. Note that waiting walsender would have prepared to sleep
3644 * on the CV (i.e., added itself to the CV's waitlist) in WalSndWait()
3645 * before actually waiting.
3646 */
3647 if (physical)
3649
3650 if (logical)
3652}

References ConditionVariableBroadcast(), WalSndCtlData::wal_flush_cv, WalSndCtlData::wal_replay_cv, and WalSndCtl.

Referenced by ApplyWalRecord(), KeepFileRestoredFromArchive(), StartupXLOG(), WalSndWakeupProcessRequests(), and XLogWalRcvFlush().

◆ WalSndWakeupProcessRequests()

static void WalSndWakeupProcessRequests ( bool  physical,
bool  logical 
)
inline static

Definition at line 65 of file walsender.h.

66{
68 {
69 wake_wal_senders = false;
70 if (max_wal_senders > 0)
71 WalSndWakeup(physical, logical);
72 }
73}
PGDLLIMPORT bool wake_wal_senders
Definition: walsender.c:130
void WalSndWakeup(bool physical, bool logical)
Definition: walsender.c:3639
PGDLLIMPORT int max_wal_senders
Definition: walsender.c:121

References max_wal_senders, wake_wal_senders, and WalSndWakeup().

Referenced by XLogBackgroundFlush(), and XLogFlush().

Variable Documentation

◆ am_cascading_walsender

◆ am_db_walsender

PGDLLIMPORT bool am_db_walsender
extern

Definition at line 118 of file walsender.c.

Referenced by check_db(), ClientAuthentication(), InitPostgres(), and ProcessStartupPacket().

◆ am_walsender

◆ log_replication_commands

PGDLLIMPORT bool log_replication_commands
extern

◆ max_wal_senders

◆ wake_wal_senders

PGDLLIMPORT bool wake_wal_senders
extern

Definition at line 130 of file walsender.c.

Referenced by WalSndWakeupProcessRequests().

◆ wal_sender_timeout