PostgreSQL Source Code  git master
walsender.h File Reference
#include "access/xlogdefs.h"
Include dependency graph for walsender.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Macros

#define WalSndWakeupRequest()    do { wake_wal_senders = true; } while (0)
 

Enumerations

enum  CRSSnapshotAction { CRS_EXPORT_SNAPSHOT , CRS_NOEXPORT_SNAPSHOT , CRS_USE_SNAPSHOT }
 

Functions

void InitWalSender (void)
 
bool exec_replication_command (const char *cmd_string)
 
void WalSndErrorCleanup (void)
 
void PhysicalWakeupLogicalWalSnd (void)
 
XLogRecPtr GetStandbyFlushRecPtr (TimeLineID *tli)
 
void WalSndSignals (void)
 
Size WalSndShmemSize (void)
 
void WalSndShmemInit (void)
 
void WalSndWakeup (bool physical, bool logical)
 
void WalSndInitStopping (void)
 
void WalSndWaitStopping (void)
 
void HandleWalSndInitStopping (void)
 
void WalSndRqstFileReload (void)
 
static void WalSndWakeupProcessRequests (bool physical, bool logical)
 

Variables

PGDLLIMPORT bool am_walsender
 
PGDLLIMPORT bool am_cascading_walsender
 
PGDLLIMPORT bool am_db_walsender
 
PGDLLIMPORT bool wake_wal_senders
 
PGDLLIMPORT int max_wal_senders
 
PGDLLIMPORT int wal_sender_timeout
 
PGDLLIMPORT bool log_replication_commands
 

Macro Definition Documentation

◆ WalSndWakeupRequest

#define WalSndWakeupRequest ( )     do { wake_wal_senders = true; } while (0)

Definition at line 58 of file walsender.h.

Enumeration Type Documentation

◆ CRSSnapshotAction

Enumerator
CRS_EXPORT_SNAPSHOT 
CRS_NOEXPORT_SNAPSHOT 
CRS_USE_SNAPSHOT 

Definition at line 20 of file walsender.h.

21 {
CRSSnapshotAction
Definition: walsender.h:21
@ CRS_USE_SNAPSHOT
Definition: walsender.h:24
@ CRS_NOEXPORT_SNAPSHOT
Definition: walsender.h:23
@ CRS_EXPORT_SNAPSHOT
Definition: walsender.h:22

Function Documentation

◆ exec_replication_command()

bool exec_replication_command ( const char *  cmd_string)

Definition at line 1952 of file walsender.c.

1953 {
1954  int parse_rc;
1955  Node *cmd_node;
1956  const char *cmdtag;
1957  MemoryContext cmd_context;
1958  MemoryContext old_context;
1959 
1960  /*
1961  * If WAL sender has been told that shutdown is getting close, switch its
1962  * status accordingly to handle the next replication commands correctly.
1963  */
1964  if (got_STOPPING)
1966 
1967  /*
1968  * Throw error if in stopping mode. We need prevent commands that could
1969  * generate WAL while the shutdown checkpoint is being written. To be
1970  * safe, we just prohibit all new commands.
1971  */
1973  ereport(ERROR,
1974  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1975  errmsg("cannot execute new commands while WAL sender is in stopping mode")));
1976 
1977  /*
1978  * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
1979  * command arrives. Clean up the old stuff if there's anything.
1980  */
1982 
1984 
1985  /*
1986  * Prepare to parse and execute the command.
1987  */
1989  "Replication command context",
1991  old_context = MemoryContextSwitchTo(cmd_context);
1992 
1993  replication_scanner_init(cmd_string);
1994 
1995  /*
1996  * Is it a WalSender command?
1997  */
1999  {
2000  /* Nope; clean up and get out. */
2002 
2003  MemoryContextSwitchTo(old_context);
2004  MemoryContextDelete(cmd_context);
2005 
2006  /* XXX this is a pretty random place to make this check */
2007  if (MyDatabaseId == InvalidOid)
2008  ereport(ERROR,
2009  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2010  errmsg("cannot execute SQL commands in WAL sender for physical replication")));
2011 
2012  /* Tell the caller that this wasn't a WalSender command. */
2013  return false;
2014  }
2015 
2016  /*
2017  * Looks like a WalSender command, so parse it.
2018  */
2019  parse_rc = replication_yyparse();
2020  if (parse_rc != 0)
2021  ereport(ERROR,
2022  (errcode(ERRCODE_SYNTAX_ERROR),
2023  errmsg_internal("replication command parser returned %d",
2024  parse_rc)));
2026 
2027  cmd_node = replication_parse_result;
2028 
2029  /*
2030  * Report query to various monitoring facilities. For this purpose, we
2031  * report replication commands just like SQL commands.
2032  */
2033  debug_query_string = cmd_string;
2034 
2036 
2037  /*
2038  * Log replication command if log_replication_commands is enabled. Even
2039  * when it's disabled, log the command with DEBUG1 level for backward
2040  * compatibility.
2041  */
2043  (errmsg("received replication command: %s", cmd_string)));
2044 
2045  /*
2046  * Disallow replication commands in aborted transaction blocks.
2047  */
2049  ereport(ERROR,
2050  (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
2051  errmsg("current transaction is aborted, "
2052  "commands ignored until end of transaction block")));
2053 
2055 
2056  /*
2057  * Allocate buffers that will be used for each outgoing and incoming
2058  * message. We do this just once per command to reduce palloc overhead.
2059  */
2063 
2064  switch (cmd_node->type)
2065  {
2066  case T_IdentifySystemCmd:
2067  cmdtag = "IDENTIFY_SYSTEM";
2068  set_ps_display(cmdtag);
2069  IdentifySystem();
2070  EndReplicationCommand(cmdtag);
2071  break;
2072 
2073  case T_ReadReplicationSlotCmd:
2074  cmdtag = "READ_REPLICATION_SLOT";
2075  set_ps_display(cmdtag);
2077  EndReplicationCommand(cmdtag);
2078  break;
2079 
2080  case T_BaseBackupCmd:
2081  cmdtag = "BASE_BACKUP";
2082  set_ps_display(cmdtag);
2083  PreventInTransactionBlock(true, cmdtag);
2085  EndReplicationCommand(cmdtag);
2086  break;
2087 
2088  case T_CreateReplicationSlotCmd:
2089  cmdtag = "CREATE_REPLICATION_SLOT";
2090  set_ps_display(cmdtag);
2092  EndReplicationCommand(cmdtag);
2093  break;
2094 
2095  case T_DropReplicationSlotCmd:
2096  cmdtag = "DROP_REPLICATION_SLOT";
2097  set_ps_display(cmdtag);
2099  EndReplicationCommand(cmdtag);
2100  break;
2101 
2102  case T_AlterReplicationSlotCmd:
2103  cmdtag = "ALTER_REPLICATION_SLOT";
2104  set_ps_display(cmdtag);
2106  EndReplicationCommand(cmdtag);
2107  break;
2108 
2109  case T_StartReplicationCmd:
2110  {
2111  StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
2112 
2113  cmdtag = "START_REPLICATION";
2114  set_ps_display(cmdtag);
2115  PreventInTransactionBlock(true, cmdtag);
2116 
2117  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
2118  StartReplication(cmd);
2119  else
2121 
2122  /* dupe, but necessary per libpqrcv_endstreaming */
2123  EndReplicationCommand(cmdtag);
2124 
2125  Assert(xlogreader != NULL);
2126  break;
2127  }
2128 
2129  case T_TimeLineHistoryCmd:
2130  cmdtag = "TIMELINE_HISTORY";
2131  set_ps_display(cmdtag);
2132  PreventInTransactionBlock(true, cmdtag);
2134  EndReplicationCommand(cmdtag);
2135  break;
2136 
2137  case T_VariableShowStmt:
2138  {
2140  VariableShowStmt *n = (VariableShowStmt *) cmd_node;
2141 
2142  cmdtag = "SHOW";
2143  set_ps_display(cmdtag);
2144 
2145  /* syscache access needs a transaction environment */
2147  GetPGVariable(n->name, dest);
2149  EndReplicationCommand(cmdtag);
2150  }
2151  break;
2152 
2153  case T_UploadManifestCmd:
2154  cmdtag = "UPLOAD_MANIFEST";
2155  set_ps_display(cmdtag);
2156  PreventInTransactionBlock(true, cmdtag);
2157  UploadManifest();
2158  EndReplicationCommand(cmdtag);
2159  break;
2160 
2161  default:
2162  elog(ERROR, "unrecognized replication command node tag: %u",
2163  cmd_node->type);
2164  }
2165 
2166  /* done */
2167  MemoryContextSwitchTo(old_context);
2168  MemoryContextDelete(cmd_context);
2169 
2170  /*
2171  * We need not update ps display or pg_stat_activity, because PostgresMain
2172  * will reset those to "idle". But we must reset debug_query_string to
2173  * ensure it doesn't become a dangling pointer.
2174  */
2175  debug_query_string = NULL;
2176 
2177  return true;
2178 }
void pgstat_report_activity(BackendState state, const char *cmd_str)
@ STATE_RUNNING
void SendBaseBackup(BaseBackupCmd *cmd, IncrementalBackupInfo *ib)
Definition: basebackup.c:991
#define Assert(condition)
Definition: c.h:849
void EndReplicationCommand(const char *commandTag)
Definition: dest.c:205
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
@ DestRemoteSimple
Definition: dest.h:91
int errmsg_internal(const char *fmt,...)
Definition: elog.c:1157
int errcode(int sqlerrcode)
Definition: elog.c:853
int errmsg(const char *fmt,...)
Definition: elog.c:1070
#define LOG
Definition: elog.h:31
#define DEBUG1
Definition: elog.h:30
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:225
#define ereport(elevel,...)
Definition: elog.h:149
Oid MyDatabaseId
Definition: globals.c:93
void GetPGVariable(const char *name, DestReceiver *dest)
Definition: guc_funcs.c:382
MemoryContext CurrentMemoryContext
Definition: mcxt.c:143
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:454
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:160
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:122
const char * debug_query_string
Definition: postgres.c:88
#define InvalidOid
Definition: postgres_ext.h:36
static void set_ps_display(const char *activity)
Definition: ps_status.h:40
MemoryContextSwitchTo(old_ctx)
@ REPLICATION_KIND_PHYSICAL
Definition: replnodes.h:22
void SnapBuildClearExportedSnapshot(void)
Definition: snapbuild.c:600
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
Definition: nodes.h:129
NodeTag type
Definition: nodes.h:130
ReplicationKind kind
Definition: replnodes.h:94
WalSndState state
static void AlterReplicationSlot(AlterReplicationSlotCmd *cmd)
Definition: walsender.c:1385
static void SendTimeLineHistory(TimeLineHistoryCmd *cmd)
Definition: walsender.c:557
WalSnd * MyWalSnd
Definition: walsender.c:112
static void ReadReplicationSlot(ReadReplicationSlotCmd *cmd)
Definition: walsender.c:458
static StringInfoData tmpbuf
Definition: walsender.c:170
static void IdentifySystem(void)
Definition: walsender.c:377
static StringInfoData reply_message
Definition: walsender.c:169
void WalSndSetState(WalSndState state)
Definition: walsender.c:3781
static StringInfoData output_message
Definition: walsender.c:168
static void UploadManifest(void)
Definition: walsender.c:647
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:198
bool log_replication_commands
Definition: walsender.c:125
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Definition: walsender.c:1171
static void StartLogicalReplication(StartReplicationCmd *cmd)
Definition: walsender.c:1427
static IncrementalBackupInfo * uploaded_manifest
Definition: walsender.c:147
static void DropReplicationSlot(DropReplicationSlotCmd *cmd)
Definition: walsender.c:1376
static void StartReplication(StartReplicationCmd *cmd)
Definition: walsender.c:789
static XLogReaderState * xlogreader
Definition: walsender.c:137
PGDLLIMPORT Node * replication_parse_result
@ WALSNDSTATE_STOPPING
void replication_scanner_finish(void)
int replication_yyparse(void)
void replication_scanner_init(const char *str)
bool replication_scanner_is_replication_command(void)
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
Definition: xact.c:3628
void StartTransactionCommand(void)
Definition: xact.c:3039
bool IsAbortedTransactionBlockState(void)
Definition: xact.c:406
void CommitTransactionCommand(void)
Definition: xact.c:3137

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, AlterReplicationSlot(), Assert, CHECK_FOR_INTERRUPTS, CommitTransactionCommand(), CreateDestReceiver(), CreateReplicationSlot(), CurrentMemoryContext, DEBUG1, debug_query_string, generate_unaccent_rules::dest, DestRemoteSimple, DropReplicationSlot(), elog, EndReplicationCommand(), ereport, errcode(), errmsg(), errmsg_internal(), ERROR, GetPGVariable(), got_STOPPING, IdentifySystem(), initStringInfo(), InvalidOid, IsAbortedTransactionBlockState(), StartReplicationCmd::kind, LOG, log_replication_commands, MemoryContextDelete(), MemoryContextSwitchTo(), MyDatabaseId, MyWalSnd, VariableShowStmt::name, output_message, pgstat_report_activity(), PreventInTransactionBlock(), ReadReplicationSlot(), REPLICATION_KIND_PHYSICAL, replication_parse_result, replication_scanner_finish(), replication_scanner_init(), replication_scanner_is_replication_command(), replication_yyparse(), reply_message, SendBaseBackup(), SendTimeLineHistory(), set_ps_display(), SnapBuildClearExportedSnapshot(), StartLogicalReplication(), StartReplication(), StartTransactionCommand(), WalSnd::state, STATE_RUNNING, tmpbuf, Node::type, ReadReplicationSlotCmd::type, uploaded_manifest, UploadManifest(), WalSndSetState(), WALSNDSTATE_STOPPING, and xlogreader.

Referenced by PostgresMain().

◆ GetStandbyFlushRecPtr()

XLogRecPtr GetStandbyFlushRecPtr ( TimeLineID tli)

Definition at line 3476 of file walsender.c.

3477 {
3478  XLogRecPtr replayPtr;
3479  TimeLineID replayTLI;
3480  XLogRecPtr receivePtr;
3482  XLogRecPtr result;
3483 
3485 
3486  /*
3487  * We can safely send what's already been replayed. Also, if walreceiver
3488  * is streaming WAL from the same timeline, we can send anything that it
3489  * has streamed, but hasn't been replayed yet.
3490  */
3491 
3492  receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI);
3493  replayPtr = GetXLogReplayRecPtr(&replayTLI);
3494 
3495  if (tli)
3496  *tli = replayTLI;
3497 
3498  result = replayPtr;
3499  if (receiveTLI == replayTLI && receivePtr > replayPtr)
3500  result = receivePtr;
3501 
3502  return result;
3503 }
bool IsSyncingReplicationSlots(void)
Definition: slotsync.c:1651
XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
bool am_cascading_walsender
Definition: walsender.c:116
uint64 XLogRecPtr
Definition: xlogdefs.h:21
uint32 TimeLineID
Definition: xlogdefs.h:59
static TimeLineID receiveTLI
Definition: xlogrecovery.c:264
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)

References am_cascading_walsender, Assert, GetWalRcvFlushRecPtr(), GetXLogReplayRecPtr(), IsSyncingReplicationSlots(), and receiveTLI.

Referenced by IdentifySystem(), StartReplication(), synchronize_one_slot(), XLogSendLogical(), and XLogSendPhysical().

◆ HandleWalSndInitStopping()

void HandleWalSndInitStopping ( void  )

Definition at line 3532 of file walsender.c.

3533 {
3535 
3536  /*
3537  * If replication has not yet started, die like with SIGTERM. If
3538  * replication is active, only set a flag and wake up the main loop. It
3539  * will send any outstanding WAL, wait for it to be replicated to the
3540  * standby, and then exit gracefully.
3541  */
3542  if (!replication_active)
3543  kill(MyProcPid, SIGTERM);
3544  else
3545  got_STOPPING = true;
3546 }
int MyProcPid
Definition: globals.c:46
bool am_walsender
Definition: walsender.c:115
static volatile sig_atomic_t replication_active
Definition: walsender.c:206
#define kill(pid, sig)
Definition: win32_port.h:503

References am_walsender, Assert, got_STOPPING, kill, MyProcPid, and replication_active.

Referenced by procsignal_sigusr1_handler().

◆ InitWalSender()

void InitWalSender ( void  )

Referenced by PostgresMain().

◆ PhysicalWakeupLogicalWalSnd()

void PhysicalWakeupLogicalWalSnd ( void  )

Definition at line 1708 of file walsender.c.

1709 {
1711 
1712  /*
1713  * If we are running in a standby, there is no need to wake up walsenders.
1714  * This is because we do not support syncing slots to cascading standbys,
1715  * so, there are no walsenders waiting for standbys to catch up.
1716  */
1717  if (RecoveryInProgress())
1718  return;
1719 
1722 }
#define NameStr(name)
Definition: c.h:737
void ConditionVariableBroadcast(ConditionVariable *cv)
ReplicationSlot * MyReplicationSlot
Definition: slot.c:138
bool SlotExistsInSyncStandbySlots(const char *slot_name)
Definition: slot.c:2575
#define SlotIsPhysical(slot)
Definition: slot.h:212
ReplicationSlotPersistentData data
Definition: slot.h:181
ConditionVariable wal_confirm_rcv_cv
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
bool RecoveryInProgress(void)
Definition: xlog.c:6333

References Assert, ConditionVariableBroadcast(), ReplicationSlot::data, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, RecoveryInProgress(), SlotExistsInSyncStandbySlots(), SlotIsPhysical, WalSndCtlData::wal_confirm_rcv_cv, and WalSndCtl.

Referenced by pg_physical_replication_slot_advance(), and PhysicalConfirmReceivedLocation().

◆ WalSndErrorCleanup()

void WalSndErrorCleanup ( void  )

Definition at line 325 of file walsender.c.

326 {
330 
331  if (xlogreader != NULL && xlogreader->seg.ws_file >= 0)
333 
334  if (MyReplicationSlot != NULL)
336 
337  ReplicationSlotCleanup(false);
338 
339  replication_active = false;
340 
341  /*
342  * If there is a transaction in progress, it will clean up our
343  * ResourceOwner, but if a replication command set up a resource owner
344  * without a transaction, we've got to clean that up now.
345  */
348 
349  if (got_STOPPING || got_SIGUSR2)
350  proc_exit(0);
351 
352  /* Revert back to startup state */
354 }
bool ConditionVariableCancelSleep(void)
void proc_exit(int code)
Definition: ipc.c:104
void LWLockReleaseAll(void)
Definition: lwlock.c:1876
void ReleaseAuxProcessResources(bool isCommit)
Definition: resowner.c:1002
void ReplicationSlotRelease(void)
Definition: slot.c:652
void ReplicationSlotCleanup(bool synced_only)
Definition: slot.c:745
WALOpenSegment seg
Definition: xlogreader.h:272
static void pgstat_report_wait_end(void)
Definition: wait_event.h:101
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:197
@ WALSNDSTATE_STARTUP
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4982
void wal_segment_close(XLogReaderState *state)
Definition: xlogutils.c:842

References ConditionVariableCancelSleep(), got_SIGUSR2, got_STOPPING, IsTransactionOrTransactionBlock(), LWLockReleaseAll(), MyReplicationSlot, pgstat_report_wait_end(), proc_exit(), ReleaseAuxProcessResources(), replication_active, ReplicationSlotCleanup(), ReplicationSlotRelease(), XLogReaderState::seg, wal_segment_close(), WalSndSetState(), WALSNDSTATE_STARTUP, WALOpenSegment::ws_file, and xlogreader.

Referenced by PostgresMain().

◆ WalSndInitStopping()

void WalSndInitStopping ( void  )

Definition at line 3717 of file walsender.c.

3718 {
3719  int i;
3720 
3721  for (i = 0; i < max_wal_senders; i++)
3722  {
3723  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3724  pid_t pid;
3725 
3726  SpinLockAcquire(&walsnd->mutex);
3727  pid = walsnd->pid;
3728  SpinLockRelease(&walsnd->mutex);
3729 
3730  if (pid == 0)
3731  continue;
3732 
3734  }
3735 }
int i
Definition: isn.c:73
#define INVALID_PROC_NUMBER
Definition: procnumber.h:26
int SendProcSignal(pid_t pid, ProcSignalReason reason, ProcNumber procNumber)
Definition: procsignal.c:281
@ PROCSIG_WALSND_INIT_STOPPING
Definition: procsignal.h:35
#define SpinLockRelease(lock)
Definition: spin.h:61
#define SpinLockAcquire(lock)
Definition: spin.h:59
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
int max_wal_senders
Definition: walsender.c:121

References i, INVALID_PROC_NUMBER, max_wal_senders, WalSnd::mutex, WalSnd::pid, PROCSIG_WALSND_INIT_STOPPING, SendProcSignal(), SpinLockAcquire, SpinLockRelease, WalSndCtl, and WalSndCtlData::walsnds.

Referenced by ShutdownXLOG().

◆ WalSndRqstFileReload()

void WalSndRqstFileReload ( void  )

Definition at line 3509 of file walsender.c.

3510 {
3511  int i;
3512 
3513  for (i = 0; i < max_wal_senders; i++)
3514  {
3515  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3516 
3517  SpinLockAcquire(&walsnd->mutex);
3518  if (walsnd->pid == 0)
3519  {
3520  SpinLockRelease(&walsnd->mutex);
3521  continue;
3522  }
3523  walsnd->needreload = true;
3524  SpinLockRelease(&walsnd->mutex);
3525  }
3526 }
bool needreload

References i, max_wal_senders, WalSnd::mutex, WalSnd::needreload, WalSnd::pid, SpinLockAcquire, SpinLockRelease, WalSndCtl, and WalSndCtlData::walsnds.

Referenced by KeepFileRestoredFromArchive().

◆ WalSndShmemInit()

void WalSndShmemInit ( void  )

Definition at line 3593 of file walsender.c.

3594 {
3595  bool found;
3596  int i;
3597 
3598  WalSndCtl = (WalSndCtlData *)
3599  ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
3600 
3601  if (!found)
3602  {
3603  /* First time through, so initialize */
3605 
3606  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
3608 
3609  for (i = 0; i < max_wal_senders; i++)
3610  {
3611  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3612 
3613  SpinLockInit(&walsnd->mutex);
3614  }
3615 
3619  }
3620 }
#define MemSet(start, val, len)
Definition: c.h:1011
void ConditionVariableInit(ConditionVariable *cv)
static void dlist_init(dlist_head *head)
Definition: ilist.h:314
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:387
#define SpinLockInit(lock)
Definition: spin.h:57
ConditionVariable wal_replay_cv
dlist_head SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]
ConditionVariable wal_flush_cv
#define NUM_SYNC_REP_WAIT_MODE
Definition: syncrep.h:27
Size WalSndShmemSize(void)
Definition: walsender.c:3581

References ConditionVariableInit(), dlist_init(), i, max_wal_senders, MemSet, WalSnd::mutex, NUM_SYNC_REP_WAIT_MODE, ShmemInitStruct(), SpinLockInit, WalSndCtlData::SyncRepQueue, WalSndCtlData::wal_confirm_rcv_cv, WalSndCtlData::wal_flush_cv, WalSndCtlData::wal_replay_cv, WalSndCtl, WalSndCtlData::walsnds, and WalSndShmemSize().

Referenced by CreateOrAttachShmemStructs().

◆ WalSndShmemSize()

Size WalSndShmemSize ( void  )

Definition at line 3581 of file walsender.c.

3582 {
3583  Size size = 0;
3584 
3585  size = offsetof(WalSndCtlData, walsnds);
3587 
3588  return size;
3589 }
size_t Size
Definition: c.h:596
Size add_size(Size s1, Size s2)
Definition: shmem.c:493
Size mul_size(Size s1, Size s2)
Definition: shmem.c:510
static pg_noinline void Size size
Definition: slab.c:607

References add_size(), max_wal_senders, mul_size(), and size.

Referenced by CalculateShmemSize(), and WalSndShmemInit().

◆ WalSndSignals()

void WalSndSignals ( void  )

Definition at line 3562 of file walsender.c.

3563 {
3564  /* Set up signal handlers */
3566  pqsignal(SIGINT, StatementCancelHandler); /* query cancel */
3567  pqsignal(SIGTERM, die); /* request shutdown */
3568  /* SIGQUIT handler was already set up by InitPostmasterChild */
3569  InitializeTimeouts(); /* establishes SIGALRM handler */
3572  pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
3573  * shutdown */
3574 
3575  /* Reset some signals that are accepted by postmaster but not here */
3577 }
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:61
#define die(msg)
pqsigfunc pqsignal(int signo, pqsigfunc func)
void StatementCancelHandler(SIGNAL_ARGS)
Definition: postgres.c:3035
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition: procsignal.c:671
void InitializeTimeouts(void)
Definition: timeout.c:470
static void WalSndLastCycleHandler(SIGNAL_ARGS)
Definition: walsender.c:3554
#define SIGCHLD
Definition: win32_port.h:178
#define SIGHUP
Definition: win32_port.h:168
#define SIG_DFL
Definition: win32_port.h:163
#define SIGPIPE
Definition: win32_port.h:173
#define SIGUSR1
Definition: win32_port.h:180
#define SIGUSR2
Definition: win32_port.h:181
#define SIG_IGN
Definition: win32_port.h:165

References die, InitializeTimeouts(), pqsignal(), procsignal_sigusr1_handler(), SIG_DFL, SIG_IGN, SIGCHLD, SIGHUP, SignalHandlerForConfigReload(), SIGPIPE, SIGUSR1, SIGUSR2, StatementCancelHandler(), and WalSndLastCycleHandler().

Referenced by PostgresMain().

◆ WalSndWaitStopping()

void WalSndWaitStopping ( void  )

Definition at line 3743 of file walsender.c.

3744 {
3745  for (;;)
3746  {
3747  int i;
3748  bool all_stopped = true;
3749 
3750  for (i = 0; i < max_wal_senders; i++)
3751  {
3752  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3753 
3754  SpinLockAcquire(&walsnd->mutex);
3755 
3756  if (walsnd->pid == 0)
3757  {
3758  SpinLockRelease(&walsnd->mutex);
3759  continue;
3760  }
3761 
3762  if (walsnd->state != WALSNDSTATE_STOPPING)
3763  {
3764  all_stopped = false;
3765  SpinLockRelease(&walsnd->mutex);
3766  break;
3767  }
3768  SpinLockRelease(&walsnd->mutex);
3769  }
3770 
3771  /* safe to leave if confirmation is done for all WAL senders */
3772  if (all_stopped)
3773  return;
3774 
3775  pg_usleep(10000L); /* wait for 10 msec */
3776  }
3777 }
void pg_usleep(long microsec)
Definition: signal.c:53

References i, max_wal_senders, WalSnd::mutex, pg_usleep(), WalSnd::pid, SpinLockAcquire, SpinLockRelease, WalSnd::state, WalSndCtl, WalSndCtlData::walsnds, and WALSNDSTATE_STOPPING.

Referenced by ShutdownXLOG().

◆ WalSndWakeup()

void WalSndWakeup ( bool  physical,
bool  logical 
)

Definition at line 3638 of file walsender.c.

3639 {
3640  /*
3641  * Wake up all the walsenders waiting on WAL being flushed or replayed
3642  * respectively. Note that waiting walsender would have prepared to sleep
3643  * on the CV (i.e., added itself to the CV's waitlist) in WalSndWait()
3644  * before actually waiting.
3645  */
3646  if (physical)
3648 
3649  if (logical)
3651 }

References ConditionVariableBroadcast(), WalSndCtlData::wal_flush_cv, WalSndCtlData::wal_replay_cv, and WalSndCtl.

Referenced by ApplyWalRecord(), KeepFileRestoredFromArchive(), StartupXLOG(), WalSndWakeupProcessRequests(), and XLogWalRcvFlush().

◆ WalSndWakeupProcessRequests()

static void WalSndWakeupProcessRequests ( bool  physical,
bool  logical 
)
inlinestatic

Definition at line 65 of file walsender.h.

66 {
67  if (wake_wal_senders)
68  {
69  wake_wal_senders = false;
70  if (max_wal_senders > 0)
71  WalSndWakeup(physical, logical);
72  }
73 }
PGDLLIMPORT bool wake_wal_senders
Definition: walsender.c:130
void WalSndWakeup(bool physical, bool logical)
Definition: walsender.c:3638
PGDLLIMPORT int max_wal_senders
Definition: walsender.c:121

References max_wal_senders, wake_wal_senders, and WalSndWakeup().

Referenced by XLogBackgroundFlush(), and XLogFlush().

Variable Documentation

◆ am_cascading_walsender

◆ am_db_walsender

PGDLLIMPORT bool am_db_walsender
extern

Definition at line 118 of file walsender.c.

Referenced by check_db(), ClientAuthentication(), InitPostgres(), and ProcessStartupPacket().

◆ am_walsender

◆ log_replication_commands

PGDLLIMPORT bool log_replication_commands
extern

◆ max_wal_senders

◆ wake_wal_senders

PGDLLIMPORT bool wake_wal_senders
extern

Definition at line 130 of file walsender.c.

Referenced by WalSndWakeupProcessRequests().

◆ wal_sender_timeout