PostgreSQL Source Code git master
Loading...
Searching...
No Matches
walsender.h File Reference
#include "access/xlogdefs.h"
Include dependency graph for walsender.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Macros

#define WalSndWakeupRequest()    do { wake_wal_senders = true; } while (0)
 

Enumerations

enum  CRSSnapshotAction { CRS_EXPORT_SNAPSHOT , CRS_NOEXPORT_SNAPSHOT , CRS_USE_SNAPSHOT }
 

Functions

void InitWalSender (void)
 
bool exec_replication_command (const char *cmd_string)
 
void WalSndErrorCleanup (void)
 
void PhysicalWakeupLogicalWalSnd (void)
 
XLogRecPtr GetStandbyFlushRecPtr (TimeLineID *tli)
 
void WalSndSignals (void)
 
Size WalSndShmemSize (void)
 
void WalSndShmemInit (void)
 
void WalSndWakeup (bool physical, bool logical)
 
void WalSndInitStopping (void)
 
void WalSndWaitStopping (void)
 
void HandleWalSndInitStopping (void)
 
void WalSndRqstFileReload (void)
 
static void WalSndWakeupProcessRequests (bool physical, bool logical)
 

Variables

PGDLLIMPORT bool am_walsender
 
PGDLLIMPORT bool am_cascading_walsender
 
PGDLLIMPORT bool am_db_walsender
 
PGDLLIMPORT bool wake_wal_senders
 
PGDLLIMPORT int max_wal_senders
 
PGDLLIMPORT int wal_sender_timeout
 
PGDLLIMPORT bool log_replication_commands
 

Macro Definition Documentation

◆ WalSndWakeupRequest

#define WalSndWakeupRequest ( )     do { wake_wal_senders = true; } while (0)

Definition at line 58 of file walsender.h.

59 do { wake_wal_senders = true; } while (0)
PGDLLIMPORT bool wake_wal_senders
Definition walsender.c:139

Enumeration Type Documentation

◆ CRSSnapshotAction

Enumerator
CRS_EXPORT_SNAPSHOT 
CRS_NOEXPORT_SNAPSHOT 
CRS_USE_SNAPSHOT 

Definition at line 20 of file walsender.h.

21{
CRSSnapshotAction
Definition walsender.h:21
@ CRS_USE_SNAPSHOT
Definition walsender.h:24
@ CRS_NOEXPORT_SNAPSHOT
Definition walsender.h:23
@ CRS_EXPORT_SNAPSHOT
Definition walsender.h:22

Function Documentation

◆ exec_replication_command()

bool exec_replication_command ( const char * cmd_string)
extern

Definition at line 2022 of file walsender.c.

2023{
2024 yyscan_t scanner;
2025 int parse_rc;
2026 Node *cmd_node;
2027 const char *cmdtag;
2029
2030 /* We save and re-use the cmd_context across calls */
2032
2033 /*
2034 * If WAL sender has been told that shutdown is getting close, switch its
2035 * status accordingly to handle the next replication commands correctly.
2036 */
2037 if (got_STOPPING)
2039
2040 /*
2041 * Throw error if in stopping mode. We need to prevent commands that could
2042 * generate WAL while the shutdown checkpoint is being written. To be
2043 * safe, we just prohibit all new commands.
2044 */
2046 ereport(ERROR,
2048 errmsg("cannot execute new commands while WAL sender is in stopping mode")));
2049
2050 /*
2051 * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
2052 * command arrives. Clean up the old stuff if there's anything.
2053 */
2055
2057
2058 /*
2059 * Prepare to parse and execute the command.
2060 *
2061 * Because replication command execution can involve beginning or ending
2062 * transactions, we need a working context that will survive that, so we
2063 * make it a child of TopMemoryContext. That in turn creates a hazard of
2064 * long-lived memory leaks if we lose track of the working context. We
2065 * deal with that by creating it only once per walsender, and resetting it
2066 * for each new command. (Normally this reset is a no-op, but if the
2067 * prior exec_replication_command call failed with an error, it won't be.)
2068 *
2069 * This is subtler than it looks. The transactions we manage can extend
2070 * across replication commands, indeed SnapBuildClearExportedSnapshot
2071 * might have just ended one. Because transaction exit will revert to the
2072 * memory context that was current at transaction start, we need to be
2073 * sure that that context is still valid. That motivates re-using the
2074 * same cmd_context rather than making a new one each time.
2075 */
2076 if (cmd_context == NULL)
2078 "Replication command context",
2080 else
2082
2084
2086
2087 /*
2088 * Is it a WalSender command?
2089 */
2091 {
2092 /* Nope; clean up and get out. */
2094
2097
2098 /* XXX this is a pretty random place to make this check */
2099 if (MyDatabaseId == InvalidOid)
2100 ereport(ERROR,
2102 errmsg("cannot execute SQL commands in WAL sender for physical replication")));
2103
2104 /* Tell the caller that this wasn't a WalSender command. */
2105 return false;
2106 }
2107
2108 /*
2109 * Looks like a WalSender command, so parse it.
2110 */
2112 if (parse_rc != 0)
2113 ereport(ERROR,
2115 errmsg_internal("replication command parser returned %d",
2116 parse_rc)));
2118
2119 /*
2120 * Report query to various monitoring facilities. For this purpose, we
2121 * report replication commands just like SQL commands.
2122 */
2124
2126
2127 /*
2128 * Log replication command if log_replication_commands is enabled. Even
2129 * when it's disabled, log the command with DEBUG1 level for backward
2130 * compatibility.
2131 */
2133 (errmsg("received replication command: %s", cmd_string)));
2134
2135 /*
2136 * Disallow replication commands in aborted transaction blocks.
2137 */
2139 ereport(ERROR,
2141 errmsg("current transaction is aborted, "
2142 "commands ignored until end of transaction block")));
2143
2145
2146 /*
2147 * Allocate buffers that will be used for each outgoing and incoming
2148 * message. We do this just once per command to reduce palloc overhead.
2149 */
2153
2154 switch (cmd_node->type)
2155 {
2157 cmdtag = "IDENTIFY_SYSTEM";
2161 break;
2162
2164 cmdtag = "READ_REPLICATION_SLOT";
2168 break;
2169
2170 case T_BaseBackupCmd:
2171 cmdtag = "BASE_BACKUP";
2176 break;
2177
2179 cmdtag = "CREATE_REPLICATION_SLOT";
2183 break;
2184
2186 cmdtag = "DROP_REPLICATION_SLOT";
2190 break;
2191
2193 cmdtag = "ALTER_REPLICATION_SLOT";
2197 break;
2198
2200 {
2202
2203 cmdtag = "START_REPLICATION";
2206
2207 if (cmd->kind == REPLICATION_KIND_PHYSICAL)
2208 StartReplication(cmd);
2209 else
2211
2212 /* dupe, but necessary per libpqrcv_endstreaming */
2214
2216 break;
2217 }
2218
2220 cmdtag = "TIMELINE_HISTORY";
2225 break;
2226
2227 case T_VariableShowStmt:
2228 {
2231
2232 cmdtag = "SHOW";
2234
2235 /* syscache access needs a transaction environment */
2237 GetPGVariable(n->name, dest);
2240 }
2241 break;
2242
2244 cmdtag = "UPLOAD_MANIFEST";
2249 break;
2250
2251 default:
2252 elog(ERROR, "unrecognized replication command node tag: %u",
2253 cmd_node->type);
2254 }
2255
2256 /*
2257 * Done. Revert to caller's memory context, and clean out the cmd_context
2258 * to recover memory right away.
2259 */
2262
2263 /*
2264 * We need not update ps display or pg_stat_activity, because PostgresMain
2265 * will reset those to "idle". But we must reset debug_query_string to
2266 * ensure it doesn't become a dangling pointer.
2267 */
2269
2270 return true;
2271}
void pgstat_report_activity(BackendState state, const char *cmd_str)
@ STATE_RUNNING
void SendBaseBackup(BaseBackupCmd *cmd, IncrementalBackupInfo *ib)
Definition basebackup.c:992
#define Assert(condition)
Definition c.h:945
void * yyscan_t
Definition cubedata.h:65
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition dest.c:113
void EndReplicationCommand(const char *commandTag)
Definition dest.c:206
@ DestRemoteSimple
Definition dest.h:91
int errcode(int sqlerrcode)
Definition elog.c:874
#define LOG
Definition elog.h:31
int errmsg_internal(const char *fmt,...) pg_attribute_printf(1, 2)
#define DEBUG1
Definition elog.h:30
#define ERROR
Definition elog.h:39
#define elog(elevel,...)
Definition elog.h:226
#define ereport(elevel,...)
Definition elog.h:150
Oid MyDatabaseId
Definition globals.c:94
void GetPGVariable(const char *name, DestReceiver *dest)
Definition guc_funcs.c:410
void MemoryContextReset(MemoryContext context)
Definition mcxt.c:403
MemoryContext TopMemoryContext
Definition mcxt.c:166
MemoryContext CurrentMemoryContext
Definition mcxt.c:160
#define AllocSetContextCreate
Definition memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition memutils.h:160
#define CHECK_FOR_INTERRUPTS()
Definition miscadmin.h:123
static char * errmsg
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition palloc.h:124
const char * debug_query_string
Definition postgres.c:91
#define InvalidOid
static int fb(int x)
static void set_ps_display(const char *activity)
Definition ps_status.h:40
bool replication_scanner_is_replication_command(yyscan_t yyscanner)
void replication_scanner_finish(yyscan_t yyscanner)
void replication_scanner_init(const char *str, yyscan_t *yyscannerp)
@ REPLICATION_KIND_PHYSICAL
Definition replnodes.h:22
void SnapBuildClearExportedSnapshot(void)
Definition snapbuild.c:602
void initStringInfo(StringInfo str)
Definition stringinfo.c:97
Definition nodes.h:135
ReplicationKind kind
Definition replnodes.h:94
WalSndState state
static void AlterReplicationSlot(AlterReplicationSlotCmd *cmd)
Definition walsender.c:1421
static void SendTimeLineHistory(TimeLineHistoryCmd *cmd)
Definition walsender.c:583
WalSnd * MyWalSnd
Definition walsender.c:121
static void ReadReplicationSlot(ReadReplicationSlotCmd *cmd)
Definition walsender.c:483
static StringInfoData tmpbuf
Definition walsender.c:179
static void IdentifySystem(void)
Definition walsender.c:401
static StringInfoData reply_message
Definition walsender.c:178
void WalSndSetState(WalSndState state)
Definition walsender.c:3965
static StringInfoData output_message
Definition walsender.c:177
static void UploadManifest(void)
Definition walsender.c:674
static volatile sig_atomic_t got_STOPPING
Definition walsender.c:207
bool log_replication_commands
Definition walsender.c:134
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Definition walsender.c:1199
static void StartLogicalReplication(StartReplicationCmd *cmd)
Definition walsender.c:1463
static IncrementalBackupInfo * uploaded_manifest
Definition walsender.c:156
static void DropReplicationSlot(DropReplicationSlotCmd *cmd)
Definition walsender.c:1412
static void StartReplication(StartReplicationCmd *cmd)
Definition walsender.c:816
static XLogReaderState * xlogreader
Definition walsender.c:146
@ WALSNDSTATE_STOPPING
int replication_yyparse(Node **replication_parse_result_p, yyscan_t yyscanner)
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
Definition xact.c:3670
void StartTransactionCommand(void)
Definition xact.c:3081
bool IsAbortedTransactionBlockState(void)
Definition xact.c:409
void CommitTransactionCommand(void)
Definition xact.c:3179

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, AlterReplicationSlot(), Assert, CHECK_FOR_INTERRUPTS, CommitTransactionCommand(), CreateDestReceiver(), CreateReplicationSlot(), CurrentMemoryContext, DEBUG1, debug_query_string, DestRemoteSimple, DropReplicationSlot(), elog, EndReplicationCommand(), ereport, errcode(), errmsg, errmsg_internal(), ERROR, fb(), GetPGVariable(), got_STOPPING, IdentifySystem(), initStringInfo(), InvalidOid, IsAbortedTransactionBlockState(), StartReplicationCmd::kind, LOG, log_replication_commands, MemoryContextReset(), MemoryContextSwitchTo(), MyDatabaseId, MyWalSnd, VariableShowStmt::name, output_message, pgstat_report_activity(), PreventInTransactionBlock(), ReadReplicationSlot(), REPLICATION_KIND_PHYSICAL, replication_scanner_finish(), replication_scanner_init(), replication_scanner_is_replication_command(), replication_yyparse(), reply_message, SendBaseBackup(), SendTimeLineHistory(), set_ps_display(), SnapBuildClearExportedSnapshot(), StartLogicalReplication(), StartReplication(), StartTransactionCommand(), WalSnd::state, STATE_RUNNING, tmpbuf, TopMemoryContext, uploaded_manifest, UploadManifest(), WalSndSetState(), WALSNDSTATE_STOPPING, and xlogreader.

Referenced by PostgresMain().

◆ GetStandbyFlushRecPtr()

XLogRecPtr GetStandbyFlushRecPtr ( TimeLineID * tli)
extern

Definition at line 3660 of file walsender.c.

3661{
3663 TimeLineID replayTLI;
3666 XLogRecPtr result;
3667
3669
3670 /*
3671 * We can safely send what's already been replayed. Also, if walreceiver
3672 * is streaming WAL from the same timeline, we can send anything that it
3673 * has streamed, but hasn't been replayed yet.
3674 */
3675
3677 replayPtr = GetXLogReplayRecPtr(&replayTLI);
3678
3679 if (tli)
3680 *tli = replayTLI;
3681
3682 result = replayPtr;
3683 if (receiveTLI == replayTLI && receivePtr > replayPtr)
3684 result = receivePtr;
3685
3686 return result;
3687}
bool IsSyncingReplicationSlots(void)
Definition slotsync.c:1825
XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
bool am_cascading_walsender
Definition walsender.c:125
uint64 XLogRecPtr
Definition xlogdefs.h:21
uint32 TimeLineID
Definition xlogdefs.h:63
static TimeLineID receiveTLI
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)

References am_cascading_walsender, Assert, fb(), GetWalRcvFlushRecPtr(), GetXLogReplayRecPtr(), IsSyncingReplicationSlots(), and receiveTLI.

Referenced by IdentifySystem(), StartReplication(), update_local_synced_slot(), and XLogSendPhysical().

◆ HandleWalSndInitStopping()

void HandleWalSndInitStopping ( void  )
extern

Definition at line 3716 of file walsender.c.

3717{
3719
3720 /*
3721 * If replication has not yet started, die like with SIGTERM. If
3722 * replication is active, only set a flag and wake up the main loop. It
3723 * will send any outstanding WAL, wait for it to be replicated to the
3724 * standby, and then exit gracefully.
3725 */
3726 if (!replication_active)
3728 else
3729 got_STOPPING = true;
3730}
int MyProcPid
Definition globals.c:47
bool am_walsender
Definition walsender.c:124
static volatile sig_atomic_t replication_active
Definition walsender.c:215
#define kill(pid, sig)
Definition win32_port.h:490

References am_walsender, Assert, fb(), got_STOPPING, kill, MyProcPid, and replication_active.

Referenced by procsignal_sigusr1_handler().

◆ InitWalSender()

void InitWalSender ( void  )
extern

Definition at line 302 of file walsender.c.

303{
305
306 /* Create a per-walsender data structure in shared memory */
308
309 /* need resource owner for e.g. basebackups */
311
312 /*
313 * Let postmaster know that we're a WAL sender. Once we've declared us as
314 * a WAL sender process, postmaster will let us outlive the bgwriter and
315 * kill us last in the shutdown sequence, so we get a chance to stream all
316 * remaining WAL at shutdown, including the shutdown checkpoint. Note that
317 * there's no going back, and we mustn't write any WAL records after this.
318 */
321
322 /*
323 * If the client didn't specify a database to connect to, show in PGPROC
324 * that our advertised xmin should affect vacuum horizons in all
325 * databases. This allows physical replication clients to send hot
326 * standby feedback that will delay vacuum cleanup in all databases.
327 */
329 {
335 }
336
337 /* Initialize empty timestamp buffer for lag tracking. */
339}
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1177
void LWLockRelease(LWLock *lock)
Definition lwlock.c:1794
@ LW_EXCLUSIVE
Definition lwlock.h:112
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition mcxt.c:1266
void SendPostmasterSignal(PMSignalReason reason)
Definition pmsignal.c:165
void MarkPostmasterChildWalSender(void)
Definition pmsignal.c:309
@ PMSIGNAL_ADVANCE_STATE_MACHINE
Definition pmsignal.h:43
#define PROC_AFFECTS_ALL_HORIZONS
Definition proc.h:63
void CreateAuxProcessResourceOwner(void)
Definition resowner.c:996
PGPROC * MyProc
Definition proc.c:68
PROC_HDR * ProcGlobal
Definition proc.c:71
TransactionId xmin
Definition proc.h:239
uint8 statusFlags
Definition proc.h:207
int pgxactoff
Definition proc.h:204
uint8 * statusFlags
Definition proc.h:453
#define InvalidTransactionId
Definition transam.h:31
static void InitWalSenderSlot(void)
Definition walsender.c:3044
static LagTracker * lag_tracker
Definition walsender.c:253
bool RecoveryInProgress(void)
Definition xlog.c:6444

References am_cascading_walsender, Assert, CreateAuxProcessResourceOwner(), fb(), InitWalSenderSlot(), InvalidOid, InvalidTransactionId, lag_tracker, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MarkPostmasterChildWalSender(), MemoryContextAllocZero(), MyDatabaseId, MyProc, PGPROC::pgxactoff, PMSIGNAL_ADVANCE_STATE_MACHINE, PROC_AFFECTS_ALL_HORIZONS, ProcGlobal, RecoveryInProgress(), SendPostmasterSignal(), PGPROC::statusFlags, PROC_HDR::statusFlags, TopMemoryContext, and PGPROC::xmin.

Referenced by PostgresMain().

◆ PhysicalWakeupLogicalWalSnd()

void PhysicalWakeupLogicalWalSnd ( void  )
extern

Definition at line 1765 of file walsender.c.

1766{
1768
1769 /*
1770 * If we are running in a standby, there is no need to wake up walsenders.
1771 * This is because we do not support syncing slots to cascading standbys,
1772 * so, there are no walsenders waiting for standbys to catch up.
1773 */
1774 if (RecoveryInProgress())
1775 return;
1776
1779}
#define NameStr(name)
Definition c.h:837
void ConditionVariableBroadcast(ConditionVariable *cv)
ReplicationSlot * MyReplicationSlot
Definition slot.c:149
bool SlotExistsInSyncStandbySlots(const char *slot_name)
Definition slot.c:3060
#define SlotIsPhysical(slot)
Definition slot.h:287
ReplicationSlotPersistentData data
Definition slot.h:213
ConditionVariable wal_confirm_rcv_cv
WalSndCtlData * WalSndCtl
Definition walsender.c:118

References Assert, ConditionVariableBroadcast(), ReplicationSlot::data, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, RecoveryInProgress(), SlotExistsInSyncStandbySlots(), SlotIsPhysical, WalSndCtlData::wal_confirm_rcv_cv, and WalSndCtl.

Referenced by pg_physical_replication_slot_advance(), and PhysicalConfirmReceivedLocation().

◆ WalSndErrorCleanup()

void WalSndErrorCleanup ( void  )
extern

Definition at line 349 of file walsender.c.

350{
355
356 if (xlogreader != NULL && xlogreader->seg.ws_file >= 0)
358
359 if (MyReplicationSlot != NULL)
361
363
364 replication_active = false;
365
366 /*
367 * If there is a transaction in progress, it will clean up our
368 * ResourceOwner, but if a replication command set up a resource owner
369 * without a transaction, we've got to clean that up now.
370 */
373
375 proc_exit(0);
376
377 /* Revert back to startup state */
379}
void pgaio_error_cleanup(void)
Definition aio.c:1165
bool ConditionVariableCancelSleep(void)
void proc_exit(int code)
Definition ipc.c:105
void LWLockReleaseAll(void)
Definition lwlock.c:1893
void ReleaseAuxProcessResources(bool isCommit)
Definition resowner.c:1016
void ReplicationSlotRelease(void)
Definition slot.c:762
void ReplicationSlotCleanup(bool synced_only)
Definition slot.c:861
WALOpenSegment seg
Definition xlogreader.h:271
static void pgstat_report_wait_end(void)
Definition wait_event.h:85
static volatile sig_atomic_t got_SIGUSR2
Definition walsender.c:206
@ WALSNDSTATE_STARTUP
bool IsTransactionOrTransactionBlock(void)
Definition xact.c:5012
void wal_segment_close(XLogReaderState *state)
Definition xlogutils.c:831

References ConditionVariableCancelSleep(), fb(), got_SIGUSR2, got_STOPPING, IsTransactionOrTransactionBlock(), LWLockReleaseAll(), MyReplicationSlot, pgaio_error_cleanup(), pgstat_report_wait_end(), proc_exit(), ReleaseAuxProcessResources(), replication_active, ReplicationSlotCleanup(), ReplicationSlotRelease(), XLogReaderState::seg, wal_segment_close(), WalSndSetState(), WALSNDSTATE_STARTUP, WALOpenSegment::ws_file, and xlogreader.

Referenced by PostgresMain().

◆ WalSndInitStopping()

void WalSndInitStopping ( void  )
extern

Definition at line 3901 of file walsender.c.

3902{
3903 int i;
3904
3905 for (i = 0; i < max_wal_senders; i++)
3906 {
3908 pid_t pid;
3909
3910 SpinLockAcquire(&walsnd->mutex);
3911 pid = walsnd->pid;
3912 SpinLockRelease(&walsnd->mutex);
3913
3914 if (pid == 0)
3915 continue;
3916
3918 }
3919}
int i
Definition isn.c:77
#define INVALID_PROC_NUMBER
Definition procnumber.h:26
int SendProcSignal(pid_t pid, ProcSignalReason reason, ProcNumber procNumber)
Definition procsignal.c:287
@ PROCSIG_WALSND_INIT_STOPPING
Definition procsignal.h:35
static void SpinLockRelease(volatile slock_t *lock)
Definition spin.h:62
static void SpinLockAcquire(volatile slock_t *lock)
Definition spin.h:56
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
int max_wal_senders
Definition walsender.c:130

References fb(), i, INVALID_PROC_NUMBER, max_wal_senders, PROCSIG_WALSND_INIT_STOPPING, SendProcSignal(), SpinLockAcquire(), SpinLockRelease(), WalSndCtl, and WalSndCtlData::walsnds.

Referenced by ShutdownXLOG().

◆ WalSndRqstFileReload()

void WalSndRqstFileReload ( void  )
extern

Definition at line 3693 of file walsender.c.

3694{
3695 int i;
3696
3697 for (i = 0; i < max_wal_senders; i++)
3698 {
3700
3701 SpinLockAcquire(&walsnd->mutex);
3702 if (walsnd->pid == 0)
3703 {
3704 SpinLockRelease(&walsnd->mutex);
3705 continue;
3706 }
3707 walsnd->needreload = true;
3708 SpinLockRelease(&walsnd->mutex);
3709 }
3710}

References fb(), i, max_wal_senders, SpinLockAcquire(), SpinLockRelease(), WalSndCtl, and WalSndCtlData::walsnds.

Referenced by KeepFileRestoredFromArchive().

◆ WalSndShmemInit()

void WalSndShmemInit ( void  )
extern

Definition at line 3777 of file walsender.c.

3778{
3779 bool found;
3780 int i;
3781
3783 ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
3784
3785 if (!found)
3786 {
3787 /* First time through, so initialize */
3789
3790 for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
3792
3793 for (i = 0; i < max_wal_senders; i++)
3794 {
3796
3797 SpinLockInit(&walsnd->mutex);
3798 }
3799
3803 }
3804}
#define MemSet(start, val, len)
Definition c.h:1109
void ConditionVariableInit(ConditionVariable *cv)
static void dlist_init(dlist_head *head)
Definition ilist.h:314
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition shmem.c:381
static void SpinLockInit(volatile slock_t *lock)
Definition spin.h:50
ConditionVariable wal_replay_cv
dlist_head SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]
ConditionVariable wal_flush_cv
#define NUM_SYNC_REP_WAIT_MODE
Definition syncrep.h:27
Size WalSndShmemSize(void)
Definition walsender.c:3765

References ConditionVariableInit(), dlist_init(), fb(), i, max_wal_senders, MemSet, NUM_SYNC_REP_WAIT_MODE, ShmemInitStruct(), SpinLockInit(), WalSndCtlData::SyncRepQueue, WalSndCtlData::wal_confirm_rcv_cv, WalSndCtlData::wal_flush_cv, WalSndCtlData::wal_replay_cv, WalSndCtl, WalSndCtlData::walsnds, and WalSndShmemSize().

Referenced by CreateOrAttachShmemStructs().

◆ WalSndShmemSize()

Size WalSndShmemSize ( void  )
extern

Definition at line 3765 of file walsender.c.

3766{
3767 Size size = 0;
3768
3769 size = offsetof(WalSndCtlData, walsnds);
3770 size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
3771
3772 return size;
3773}
size_t Size
Definition c.h:691
Size add_size(Size s1, Size s2)
Definition shmem.c:485
Size mul_size(Size s1, Size s2)
Definition shmem.c:500

References add_size(), fb(), max_wal_senders, and mul_size().

Referenced by CalculateShmemSize(), and WalSndShmemInit().

◆ WalSndSignals()

void WalSndSignals ( void  )
extern

Definition at line 3746 of file walsender.c.

3747{
3748 /* Set up signal handlers */
3750 pqsignal(SIGINT, StatementCancelHandler); /* query cancel */
3751 pqsignal(SIGTERM, die); /* request shutdown */
3752 /* SIGQUIT handler was already set up by InitPostmasterChild */
3753 InitializeTimeouts(); /* establishes SIGALRM handler */
3756 pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
3757 * shutdown */
3758
3759 /* Reset some signals that are accepted by postmaster but not here */
3761}
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition interrupt.c:61
#define die(msg)
#define pqsignal
Definition port.h:547
void StatementCancelHandler(SIGNAL_ARGS)
Definition postgres.c:3042
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition procsignal.c:680
void InitializeTimeouts(void)
Definition timeout.c:470
static void WalSndLastCycleHandler(SIGNAL_ARGS)
Definition walsender.c:3738
#define SIGCHLD
Definition win32_port.h:168
#define SIGHUP
Definition win32_port.h:158
#define SIGPIPE
Definition win32_port.h:163
#define SIGUSR1
Definition win32_port.h:170
#define SIGUSR2
Definition win32_port.h:171

References die, fb(), InitializeTimeouts(), pqsignal, procsignal_sigusr1_handler(), SIGCHLD, SIGHUP, SignalHandlerForConfigReload(), SIGPIPE, SIGUSR1, SIGUSR2, StatementCancelHandler(), and WalSndLastCycleHandler().

Referenced by PostgresMain().

◆ WalSndWaitStopping()

void WalSndWaitStopping ( void  )
extern

Definition at line 3927 of file walsender.c.

3928{
3929 for (;;)
3930 {
3931 int i;
3932 bool all_stopped = true;
3933
3934 for (i = 0; i < max_wal_senders; i++)
3935 {
3937
3938 SpinLockAcquire(&walsnd->mutex);
3939
3940 if (walsnd->pid == 0)
3941 {
3942 SpinLockRelease(&walsnd->mutex);
3943 continue;
3944 }
3945
3946 if (walsnd->state != WALSNDSTATE_STOPPING)
3947 {
3948 all_stopped = false;
3949 SpinLockRelease(&walsnd->mutex);
3950 break;
3951 }
3952 SpinLockRelease(&walsnd->mutex);
3953 }
3954
3955 /* safe to leave if confirmation is done for all WAL senders */
3956 if (all_stopped)
3957 return;
3958
3959 pg_usleep(10000L); /* wait for 10 msec */
3960 }
3961}
void pg_usleep(long microsec)
Definition signal.c:53

References fb(), i, max_wal_senders, pg_usleep(), SpinLockAcquire(), SpinLockRelease(), WalSndCtl, WalSndCtlData::walsnds, and WALSNDSTATE_STOPPING.

Referenced by ShutdownXLOG().

◆ WalSndWakeup()

void WalSndWakeup ( bool  physical,
bool  logical 
)
extern

Definition at line 3822 of file walsender.c.

3823{
3824 /*
3825 * Wake up all the walsenders waiting on WAL being flushed or replayed
3826 * respectively. Note that waiting walsender would have prepared to sleep
3827 * on the CV (i.e., added itself to the CV's waitlist) in WalSndWait()
3828 * before actually waiting.
3829 */
3830 if (physical)
3832
3833 if (logical)
3835}

References ConditionVariableBroadcast(), WalSndCtlData::wal_flush_cv, WalSndCtlData::wal_replay_cv, and WalSndCtl.

Referenced by ApplyWalRecord(), KeepFileRestoredFromArchive(), StartupXLOG(), WalSndWakeupProcessRequests(), and XLogWalRcvFlush().

◆ WalSndWakeupProcessRequests()

static void WalSndWakeupProcessRequests ( bool  physical,
bool  logical 
)
inline static

Definition at line 65 of file walsender.h.

66{
68 {
69 wake_wal_senders = false;
70 if (max_wal_senders > 0)
71 WalSndWakeup(physical, logical);
72 }
73}
void WalSndWakeup(bool physical, bool logical)
Definition walsender.c:3822
PGDLLIMPORT int max_wal_senders
Definition walsender.c:130

References max_wal_senders, wake_wal_senders, and WalSndWakeup().

Referenced by XLogBackgroundFlush(), and XLogFlush().

Variable Documentation

◆ am_cascading_walsender

◆ am_db_walsender

PGDLLIMPORT bool am_db_walsender
extern

Definition at line 127 of file walsender.c.

Referenced by check_db(), ClientAuthentication(), InitPostgres(), and ProcessStartupPacket().

◆ am_walsender

◆ log_replication_commands

PGDLLIMPORT bool log_replication_commands
extern

◆ max_wal_senders

◆ wake_wal_senders

PGDLLIMPORT bool wake_wal_senders
extern

Definition at line 139 of file walsender.c.

Referenced by WalSndWakeupProcessRequests().

◆ wal_sender_timeout