PostgreSQL Source Code  git master
walsender.h File Reference
#include "access/xlogdefs.h"
Include dependency graph for walsender.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Macros

#define WalSndWakeupRequest()    do { wake_wal_senders = true; } while (0)
 

Enumerations

enum  CRSSnapshotAction { CRS_EXPORT_SNAPSHOT , CRS_NOEXPORT_SNAPSHOT , CRS_USE_SNAPSHOT }
 

Functions

void InitWalSender (void)
 
bool exec_replication_command (const char *cmd_string)
 
void WalSndErrorCleanup (void)
 
void WalSndResourceCleanup (bool isCommit)
 
void PhysicalWakeupLogicalWalSnd (void)
 
XLogRecPtr GetStandbyFlushRecPtr (TimeLineID *tli)
 
void WalSndSignals (void)
 
Size WalSndShmemSize (void)
 
void WalSndShmemInit (void)
 
void WalSndWakeup (bool physical, bool logical)
 
void WalSndInitStopping (void)
 
void WalSndWaitStopping (void)
 
void HandleWalSndInitStopping (void)
 
void WalSndRqstFileReload (void)
 
static void WalSndWakeupProcessRequests (bool physical, bool logical)
 

Variables

PGDLLIMPORT bool am_walsender
 
PGDLLIMPORT bool am_cascading_walsender
 
PGDLLIMPORT bool am_db_walsender
 
PGDLLIMPORT bool wake_wal_senders
 
PGDLLIMPORT int max_wal_senders
 
PGDLLIMPORT int wal_sender_timeout
 
PGDLLIMPORT bool log_replication_commands
 

Macro Definition Documentation

◆ WalSndWakeupRequest

#define WalSndWakeupRequest ( )     do { wake_wal_senders = true; } while (0)

Definition at line 59 of file walsender.h.

Enumeration Type Documentation

◆ CRSSnapshotAction

Enumerator
CRS_EXPORT_SNAPSHOT 
CRS_NOEXPORT_SNAPSHOT 
CRS_USE_SNAPSHOT 

Definition at line 20 of file walsender.h.

21 {
CRSSnapshotAction
Definition: walsender.h:21
@ CRS_USE_SNAPSHOT
Definition: walsender.h:24
@ CRS_NOEXPORT_SNAPSHOT
Definition: walsender.h:23
@ CRS_EXPORT_SNAPSHOT
Definition: walsender.h:22

Function Documentation

◆ exec_replication_command()

bool exec_replication_command ( const char *  cmd_string)

Definition at line 1976 of file walsender.c.

1977 {
1978  int parse_rc;
1979  Node *cmd_node;
1980  const char *cmdtag;
1981  MemoryContext cmd_context;
1982  MemoryContext old_context;
1983 
1984  /*
1985  * If WAL sender has been told that shutdown is getting close, switch its
1986  * status accordingly to handle the next replication commands correctly.
1987  */
1988  if (got_STOPPING)
1989  WalSndSetState(WALSNDSTATE_STOPPING);
1990 
1991  /*
1992  * Throw error if in stopping mode. We need to prevent commands that could
1993  * generate WAL while the shutdown checkpoint is being written. To be
1994  * safe, we just prohibit all new commands.
1995  */
1996  if (MyWalSnd->state == WALSNDSTATE_STOPPING)
1997  ereport(ERROR,
1998  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1999  errmsg("cannot execute new commands while WAL sender is in stopping mode")));
2000 
2001  /*
2002  * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
2003  * command arrives. Clean up the old stuff if there's anything.
2004  */
2006 
2008 
2009  /*
2010  * Prepare to parse and execute the command.
2011  */
2013  "Replication command context",
2015  old_context = MemoryContextSwitchTo(cmd_context);
2016 
2017  replication_scanner_init(cmd_string);
2018 
2019  /*
2020  * Is it a WalSender command?
2021  */
2022  if (!replication_scanner_is_replication_command())
2023  {
2024  /* Nope; clean up and get out. */
2026 
2027  MemoryContextSwitchTo(old_context);
2028  MemoryContextDelete(cmd_context);
2029 
2030  /* XXX this is a pretty random place to make this check */
2031  if (MyDatabaseId == InvalidOid)
2032  ereport(ERROR,
2033  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2034  errmsg("cannot execute SQL commands in WAL sender for physical replication")));
2035 
2036  /* Tell the caller that this wasn't a WalSender command. */
2037  return false;
2038  }
2039 
2040  /*
2041  * Looks like a WalSender command, so parse it.
2042  */
2043  parse_rc = replication_yyparse();
2044  if (parse_rc != 0)
2045  ereport(ERROR,
2046  (errcode(ERRCODE_SYNTAX_ERROR),
2047  errmsg_internal("replication command parser returned %d",
2048  parse_rc)));
2050 
2051  cmd_node = replication_parse_result;
2052 
2053  /*
2054  * Report query to various monitoring facilities. For this purpose, we
2055  * report replication commands just like SQL commands.
2056  */
2057  debug_query_string = cmd_string;
2058 
2060 
2061  /*
2062  * Log replication command if log_replication_commands is enabled. Even
2063  * when it's disabled, log the command with DEBUG1 level for backward
2064  * compatibility.
2065  */
2067  (errmsg("received replication command: %s", cmd_string)));
2068 
2069  /*
2070  * Disallow replication commands in aborted transaction blocks.
2071  */
2072  if (IsAbortedTransactionBlockState())
2073  ereport(ERROR,
2074  (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
2075  errmsg("current transaction is aborted, "
2076  "commands ignored until end of transaction block")));
2077 
2079 
2080  /*
2081  * Allocate buffers that will be used for each outgoing and incoming
2082  * message. We do this just once per command to reduce palloc overhead.
2083  */
2087 
2088  switch (cmd_node->type)
2089  {
2090  case T_IdentifySystemCmd:
2091  cmdtag = "IDENTIFY_SYSTEM";
2092  set_ps_display(cmdtag);
2093  IdentifySystem();
2094  EndReplicationCommand(cmdtag);
2095  break;
2096 
2097  case T_ReadReplicationSlotCmd:
2098  cmdtag = "READ_REPLICATION_SLOT";
2099  set_ps_display(cmdtag);
2101  EndReplicationCommand(cmdtag);
2102  break;
2103 
2104  case T_BaseBackupCmd:
2105  cmdtag = "BASE_BACKUP";
2106  set_ps_display(cmdtag);
2107  PreventInTransactionBlock(true, cmdtag);
2109  EndReplicationCommand(cmdtag);
2110  break;
2111 
2112  case T_CreateReplicationSlotCmd:
2113  cmdtag = "CREATE_REPLICATION_SLOT";
2114  set_ps_display(cmdtag);
2116  EndReplicationCommand(cmdtag);
2117  break;
2118 
2119  case T_DropReplicationSlotCmd:
2120  cmdtag = "DROP_REPLICATION_SLOT";
2121  set_ps_display(cmdtag);
2123  EndReplicationCommand(cmdtag);
2124  break;
2125 
2126  case T_AlterReplicationSlotCmd:
2127  cmdtag = "ALTER_REPLICATION_SLOT";
2128  set_ps_display(cmdtag);
2130  EndReplicationCommand(cmdtag);
2131  break;
2132 
2133  case T_StartReplicationCmd:
2134  {
2135  StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
2136 
2137  cmdtag = "START_REPLICATION";
2138  set_ps_display(cmdtag);
2139  PreventInTransactionBlock(true, cmdtag);
2140 
2141  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
2142  StartReplication(cmd);
2143  else
2144  StartLogicalReplication(cmd);
2145 
2146  /* dupe, but necessary per libpqrcv_endstreaming */
2147  EndReplicationCommand(cmdtag);
2148 
2149  Assert(xlogreader != NULL);
2150  break;
2151  }
2152 
2153  case T_TimeLineHistoryCmd:
2154  cmdtag = "TIMELINE_HISTORY";
2155  set_ps_display(cmdtag);
2156  PreventInTransactionBlock(true, cmdtag);
2158  EndReplicationCommand(cmdtag);
2159  break;
2160 
2161  case T_VariableShowStmt:
2162  {
2164  VariableShowStmt *n = (VariableShowStmt *) cmd_node;
2165 
2166  cmdtag = "SHOW";
2167  set_ps_display(cmdtag);
2168 
2169  /* syscache access needs a transaction environment */
2171  GetPGVariable(n->name, dest);
2173  EndReplicationCommand(cmdtag);
2174  }
2175  break;
2176 
2177  case T_UploadManifestCmd:
2178  cmdtag = "UPLOAD_MANIFEST";
2179  set_ps_display(cmdtag);
2180  PreventInTransactionBlock(true, cmdtag);
2181  UploadManifest();
2182  EndReplicationCommand(cmdtag);
2183  break;
2184 
2185  default:
2186  elog(ERROR, "unrecognized replication command node tag: %u",
2187  cmd_node->type);
2188  }
2189 
2190  /* done */
2191  MemoryContextSwitchTo(old_context);
2192  MemoryContextDelete(cmd_context);
2193 
2194  /*
2195  * We need not update ps display or pg_stat_activity, because PostgresMain
2196  * will reset those to "idle". But we must reset debug_query_string to
2197  * ensure it doesn't become a dangling pointer.
2198  */
2199  debug_query_string = NULL;
2200 
2201  return true;
2202 }
void pgstat_report_activity(BackendState state, const char *cmd_str)
@ STATE_RUNNING
void SendBaseBackup(BaseBackupCmd *cmd, IncrementalBackupInfo *ib)
Definition: basebackup.c:988
#define Assert(condition)
Definition: c.h:858
void EndReplicationCommand(const char *commandTag)
Definition: dest.c:205
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
@ DestRemoteSimple
Definition: dest.h:91
int errmsg_internal(const char *fmt,...)
Definition: elog.c:1155
int errcode(int sqlerrcode)
Definition: elog.c:855
int errmsg(const char *fmt,...)
Definition: elog.c:1068
#define LOG
Definition: elog.h:31
#define DEBUG1
Definition: elog.h:30
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:224
#define ereport(elevel,...)
Definition: elog.h:149
Oid MyDatabaseId
Definition: globals.c:92
void GetPGVariable(const char *name, DestReceiver *dest)
Definition: guc_funcs.c:382
MemoryContext CurrentMemoryContext
Definition: mcxt.c:143
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:454
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:160
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:122
const char * debug_query_string
Definition: postgres.c:87
#define InvalidOid
Definition: postgres_ext.h:36
static void set_ps_display(const char *activity)
Definition: ps_status.h:40
MemoryContextSwitchTo(old_ctx)
@ REPLICATION_KIND_PHYSICAL
Definition: replnodes.h:22
void SnapBuildClearExportedSnapshot(void)
Definition: snapbuild.c:739
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
Definition: nodes.h:129
NodeTag type
Definition: nodes.h:130
ReplicationKind kind
Definition: replnodes.h:94
WalSndState state
static void AlterReplicationSlot(AlterReplicationSlotCmd *cmd)
Definition: walsender.c:1438
static void SendTimeLineHistory(TimeLineHistoryCmd *cmd)
Definition: walsender.c:587
WalSnd * MyWalSnd
Definition: walsender.c:112
static void ReadReplicationSlot(ReadReplicationSlotCmd *cmd)
Definition: walsender.c:488
static StringInfoData tmpbuf
Definition: walsender.c:170
static void IdentifySystem(void)
Definition: walsender.c:407
static StringInfoData reply_message
Definition: walsender.c:169
void WalSndSetState(WalSndState state)
Definition: walsender.c:3808
static StringInfoData output_message
Definition: walsender.c:168
static void UploadManifest(void)
Definition: walsender.c:677
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:198
bool log_replication_commands
Definition: walsender.c:125
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Definition: walsender.c:1199
static void StartLogicalReplication(StartReplicationCmd *cmd)
Definition: walsender.c:1451
static IncrementalBackupInfo * uploaded_manifest
Definition: walsender.c:147
static void DropReplicationSlot(DropReplicationSlotCmd *cmd)
Definition: walsender.c:1404
static void StartReplication(StartReplicationCmd *cmd)
Definition: walsender.c:817
static XLogReaderState * xlogreader
Definition: walsender.c:137
PGDLLIMPORT Node * replication_parse_result
@ WALSNDSTATE_STOPPING
void replication_scanner_finish(void)
int replication_yyparse(void)
void replication_scanner_init(const char *str)
bool replication_scanner_is_replication_command(void)
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
Definition: xact.c:3622
void StartTransactionCommand(void)
Definition: xact.c:3033
bool IsAbortedTransactionBlockState(void)
Definition: xact.c:405
void CommitTransactionCommand(void)
Definition: xact.c:3131

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, AlterReplicationSlot(), Assert, CHECK_FOR_INTERRUPTS, CommitTransactionCommand(), CreateDestReceiver(), CreateReplicationSlot(), CurrentMemoryContext, DEBUG1, debug_query_string, generate_unaccent_rules::dest, DestRemoteSimple, DropReplicationSlot(), elog, EndReplicationCommand(), ereport, errcode(), errmsg(), errmsg_internal(), ERROR, GetPGVariable(), got_STOPPING, IdentifySystem(), initStringInfo(), InvalidOid, IsAbortedTransactionBlockState(), StartReplicationCmd::kind, LOG, log_replication_commands, MemoryContextDelete(), MemoryContextSwitchTo(), MyDatabaseId, MyWalSnd, VariableShowStmt::name, output_message, pgstat_report_activity(), PreventInTransactionBlock(), ReadReplicationSlot(), REPLICATION_KIND_PHYSICAL, replication_parse_result, replication_scanner_finish(), replication_scanner_init(), replication_scanner_is_replication_command(), replication_yyparse(), reply_message, SendBaseBackup(), SendTimeLineHistory(), set_ps_display(), SnapBuildClearExportedSnapshot(), StartLogicalReplication(), StartReplication(), StartTransactionCommand(), WalSnd::state, STATE_RUNNING, tmpbuf, Node::type, ReadReplicationSlotCmd::type, uploaded_manifest, UploadManifest(), WalSndSetState(), WALSNDSTATE_STOPPING, and xlogreader.

Referenced by PostgresMain().

◆ GetStandbyFlushRecPtr()

XLogRecPtr GetStandbyFlushRecPtr ( TimeLineID *  tli)

Definition at line 3503 of file walsender.c.

3504 {
3505  XLogRecPtr replayPtr;
3506  TimeLineID replayTLI;
3507  XLogRecPtr receivePtr;
3509  XLogRecPtr result;
3510 
3512 
3513  /*
3514  * We can safely send what's already been replayed. Also, if walreceiver
3515  * is streaming WAL from the same timeline, we can send anything that it
3516  * has streamed, but hasn't been replayed yet.
3517  */
3518 
3519  receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI);
3520  replayPtr = GetXLogReplayRecPtr(&replayTLI);
3521 
3522  if (tli)
3523  *tli = replayTLI;
3524 
3525  result = replayPtr;
3526  if (receiveTLI == replayTLI && receivePtr > replayPtr)
3527  result = receivePtr;
3528 
3529  return result;
3530 }
bool IsSyncingReplicationSlots(void)
Definition: slotsync.c:1651
XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
bool am_cascading_walsender
Definition: walsender.c:116
uint64 XLogRecPtr
Definition: xlogdefs.h:21
uint32 TimeLineID
Definition: xlogdefs.h:59
static TimeLineID receiveTLI
Definition: xlogrecovery.c:263
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)

References am_cascading_walsender, Assert, GetWalRcvFlushRecPtr(), GetXLogReplayRecPtr(), IsSyncingReplicationSlots(), and receiveTLI.

Referenced by IdentifySystem(), StartReplication(), synchronize_one_slot(), XLogSendLogical(), and XLogSendPhysical().

◆ HandleWalSndInitStopping()

void HandleWalSndInitStopping ( void  )

Definition at line 3559 of file walsender.c.

3560 {
3562 
3563  /*
3564  * If replication has not yet started, die like with SIGTERM. If
3565  * replication is active, only set a flag and wake up the main loop. It
3566  * will send any outstanding WAL, wait for it to be replicated to the
3567  * standby, and then exit gracefully.
3568  */
3569  if (!replication_active)
3570  kill(MyProcPid, SIGTERM);
3571  else
3572  got_STOPPING = true;
3573 }
int MyProcPid
Definition: globals.c:46
bool am_walsender
Definition: walsender.c:115
static volatile sig_atomic_t replication_active
Definition: walsender.c:206
#define kill(pid, sig)
Definition: win32_port.h:485

References am_walsender, Assert, got_STOPPING, kill, MyProcPid, and replication_active.

Referenced by procsignal_sigusr1_handler().

◆ InitWalSender()

void InitWalSender ( void  )

Referenced by PostgresMain().

◆ PhysicalWakeupLogicalWalSnd()

void PhysicalWakeupLogicalWalSnd ( void  )

Definition at line 1732 of file walsender.c.

1733 {
1735 
1736  /*
1737  * If we are running in a standby, there is no need to wake up walsenders.
1738  * This is because we do not support syncing slots to cascading standbys,
1739  * so, there are no walsenders waiting for standbys to catch up.
1740  */
1741  if (RecoveryInProgress())
1742  return;
1743 
1746 }
#define NameStr(name)
Definition: c.h:746
void ConditionVariableBroadcast(ConditionVariable *cv)
ReplicationSlot * MyReplicationSlot
Definition: slot.c:138
bool SlotExistsInSyncStandbySlots(const char *slot_name)
Definition: slot.c:2547
#define SlotIsPhysical(slot)
Definition: slot.h:209
ReplicationSlotPersistentData data
Definition: slot.h:178
ConditionVariable wal_confirm_rcv_cv
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
bool RecoveryInProgress(void)
Definition: xlog.c:6291

References Assert, ConditionVariableBroadcast(), ReplicationSlot::data, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, RecoveryInProgress(), SlotExistsInSyncStandbySlots(), SlotIsPhysical, WalSndCtlData::wal_confirm_rcv_cv, and WalSndCtl.

Referenced by pg_physical_replication_slot_advance(), and PhysicalConfirmReceivedLocation().

◆ WalSndErrorCleanup()

void WalSndErrorCleanup ( void  )

Definition at line 327 of file walsender.c.

328 {
332 
333  if (xlogreader != NULL && xlogreader->seg.ws_file >= 0)
334  wal_segment_close(xlogreader);
335 
336  if (MyReplicationSlot != NULL)
337  ReplicationSlotRelease();
338 
339  ReplicationSlotCleanup(false);
340 
341  replication_active = false;
342 
343  /*
344  * If there is a transaction in progress, it will clean up our
345  * ResourceOwner, but if a replication command set up a resource owner
346  * without a transaction, we've got to clean that up now.
347  */
348  if (!IsTransactionOrTransactionBlock())
349  WalSndResourceCleanup(false);
350 
351  if (got_STOPPING || got_SIGUSR2)
352  proc_exit(0);
353 
354  /* Revert back to startup state */
356 }
bool ConditionVariableCancelSleep(void)
void proc_exit(int code)
Definition: ipc.c:104
void LWLockReleaseAll(void)
Definition: lwlock.c:1876
void ReplicationSlotRelease(void)
Definition: slot.c:652
void ReplicationSlotCleanup(bool synced_only)
Definition: slot.c:745
WALOpenSegment seg
Definition: xlogreader.h:272
static void pgstat_report_wait_end(void)
Definition: wait_event.h:101
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:197
void WalSndResourceCleanup(bool isCommit)
Definition: walsender.c:362
@ WALSNDSTATE_STARTUP
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4976
void wal_segment_close(XLogReaderState *state)
Definition: xlogutils.c:842

References ConditionVariableCancelSleep(), got_SIGUSR2, got_STOPPING, IsTransactionOrTransactionBlock(), LWLockReleaseAll(), MyReplicationSlot, pgstat_report_wait_end(), proc_exit(), replication_active, ReplicationSlotCleanup(), ReplicationSlotRelease(), XLogReaderState::seg, wal_segment_close(), WalSndResourceCleanup(), WalSndSetState(), WALSNDSTATE_STARTUP, WALOpenSegment::ws_file, and xlogreader.

Referenced by PostgresMain().

◆ WalSndInitStopping()

void WalSndInitStopping ( void  )

Definition at line 3744 of file walsender.c.

3745 {
3746  int i;
3747 
3748  for (i = 0; i < max_wal_senders; i++)
3749  {
3750  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3751  pid_t pid;
3752 
3753  SpinLockAcquire(&walsnd->mutex);
3754  pid = walsnd->pid;
3755  SpinLockRelease(&walsnd->mutex);
3756 
3757  if (pid == 0)
3758  continue;
3759 
3760  SendProcSignal(pid, PROCSIG_WALSND_INIT_STOPPING, INVALID_PROC_NUMBER);
3761  }
3762 }
int i
Definition: isn.c:73
#define INVALID_PROC_NUMBER
Definition: procnumber.h:26
int SendProcSignal(pid_t pid, ProcSignalReason reason, ProcNumber procNumber)
Definition: procsignal.c:257
@ PROCSIG_WALSND_INIT_STOPPING
Definition: procsignal.h:35
#define SpinLockRelease(lock)
Definition: spin.h:64
#define SpinLockAcquire(lock)
Definition: spin.h:62
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
int max_wal_senders
Definition: walsender.c:121

References i, INVALID_PROC_NUMBER, max_wal_senders, WalSnd::mutex, WalSnd::pid, PROCSIG_WALSND_INIT_STOPPING, SendProcSignal(), SpinLockAcquire, SpinLockRelease, WalSndCtl, and WalSndCtlData::walsnds.

Referenced by ShutdownXLOG().

◆ WalSndResourceCleanup()

void WalSndResourceCleanup ( bool  isCommit)

Definition at line 362 of file walsender.c.

363 {
364  ResourceOwner resowner;
365 
366  if (CurrentResourceOwner == NULL)
367  return;
368 
369  /*
370  * Deleting CurrentResourceOwner is not allowed, so we must save a pointer
371  * in a local variable and clear it first.
372  */
373  resowner = CurrentResourceOwner;
374  CurrentResourceOwner = NULL;
375 
376  /* Now we can release resources and delete it. */
377  ResourceOwnerRelease(resowner,
378  RESOURCE_RELEASE_BEFORE_LOCKS, isCommit, true);
379  ResourceOwnerRelease(resowner,
380  RESOURCE_RELEASE_LOCKS, isCommit, true);
381  ResourceOwnerRelease(resowner,
382  RESOURCE_RELEASE_AFTER_LOCKS, isCommit, true);
383  ResourceOwnerDelete(resowner);
384 }
ResourceOwner CurrentResourceOwner
Definition: resowner.c:165
void ResourceOwnerRelease(ResourceOwner owner, ResourceReleasePhase phase, bool isCommit, bool isTopLevel)
Definition: resowner.c:648
void ResourceOwnerDelete(ResourceOwner owner)
Definition: resowner.c:854
@ RESOURCE_RELEASE_LOCKS
Definition: resowner.h:55
@ RESOURCE_RELEASE_BEFORE_LOCKS
Definition: resowner.h:54
@ RESOURCE_RELEASE_AFTER_LOCKS
Definition: resowner.h:56

References CurrentResourceOwner, RESOURCE_RELEASE_AFTER_LOCKS, RESOURCE_RELEASE_BEFORE_LOCKS, RESOURCE_RELEASE_LOCKS, ResourceOwnerDelete(), and ResourceOwnerRelease().

Referenced by perform_base_backup(), UploadManifest(), and WalSndErrorCleanup().

◆ WalSndRqstFileReload()

void WalSndRqstFileReload ( void  )

Definition at line 3536 of file walsender.c.

3537 {
3538  int i;
3539 
3540  for (i = 0; i < max_wal_senders; i++)
3541  {
3542  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3543 
3544  SpinLockAcquire(&walsnd->mutex);
3545  if (walsnd->pid == 0)
3546  {
3547  SpinLockRelease(&walsnd->mutex);
3548  continue;
3549  }
3550  walsnd->needreload = true;
3551  SpinLockRelease(&walsnd->mutex);
3552  }
3553 }
bool needreload

References i, max_wal_senders, WalSnd::mutex, WalSnd::needreload, WalSnd::pid, SpinLockAcquire, SpinLockRelease, WalSndCtl, and WalSndCtlData::walsnds.

Referenced by KeepFileRestoredFromArchive().

◆ WalSndShmemInit()

void WalSndShmemInit ( void  )

Definition at line 3620 of file walsender.c.

3621 {
3622  bool found;
3623  int i;
3624 
3625  WalSndCtl = (WalSndCtlData *)
3626  ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
3627 
3628  if (!found)
3629  {
3630  /* First time through, so initialize */
3632 
3633  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
3635 
3636  for (i = 0; i < max_wal_senders; i++)
3637  {
3638  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3639 
3640  SpinLockInit(&walsnd->mutex);
3641  }
3642 
3646  }
3647 }
#define MemSet(start, val, len)
Definition: c.h:1020
void ConditionVariableInit(ConditionVariable *cv)
static void dlist_init(dlist_head *head)
Definition: ilist.h:314
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:387
#define SpinLockInit(lock)
Definition: spin.h:60
ConditionVariable wal_replay_cv
dlist_head SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]
ConditionVariable wal_flush_cv
#define NUM_SYNC_REP_WAIT_MODE
Definition: syncrep.h:27
Size WalSndShmemSize(void)
Definition: walsender.c:3608

References ConditionVariableInit(), dlist_init(), i, max_wal_senders, MemSet, WalSnd::mutex, NUM_SYNC_REP_WAIT_MODE, ShmemInitStruct(), SpinLockInit, WalSndCtlData::SyncRepQueue, WalSndCtlData::wal_confirm_rcv_cv, WalSndCtlData::wal_flush_cv, WalSndCtlData::wal_replay_cv, WalSndCtl, WalSndCtlData::walsnds, and WalSndShmemSize().

Referenced by CreateOrAttachShmemStructs().

◆ WalSndShmemSize()

Size WalSndShmemSize ( void  )

Definition at line 3608 of file walsender.c.

3609 {
3610  Size size = 0;
3611 
3612  size = offsetof(WalSndCtlData, walsnds);
3614 
3615  return size;
3616 }
size_t Size
Definition: c.h:605
Size add_size(Size s1, Size s2)
Definition: shmem.c:493
Size mul_size(Size s1, Size s2)
Definition: shmem.c:510
static pg_noinline void Size size
Definition: slab.c:607

References add_size(), max_wal_senders, mul_size(), and size.

Referenced by CalculateShmemSize(), and WalSndShmemInit().

◆ WalSndSignals()

void WalSndSignals ( void  )

Definition at line 3589 of file walsender.c.

3590 {
3591  /* Set up signal handlers */
3593  pqsignal(SIGINT, StatementCancelHandler); /* query cancel */
3594  pqsignal(SIGTERM, die); /* request shutdown */
3595  /* SIGQUIT handler was already set up by InitPostmasterChild */
3596  InitializeTimeouts(); /* establishes SIGALRM handler */
3599  pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
3600  * shutdown */
3601 
3602  /* Reset some signals that are accepted by postmaster but not here */
3604 }
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:61
#define die(msg)
pqsigfunc pqsignal(int signo, pqsigfunc func)
void StatementCancelHandler(SIGNAL_ARGS)
Definition: postgres.c:3002
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition: procsignal.c:635
void InitializeTimeouts(void)
Definition: timeout.c:470
static void WalSndLastCycleHandler(SIGNAL_ARGS)
Definition: walsender.c:3581
#define SIGCHLD
Definition: win32_port.h:178
#define SIGHUP
Definition: win32_port.h:168
#define SIG_DFL
Definition: win32_port.h:163
#define SIGPIPE
Definition: win32_port.h:173
#define SIGUSR1
Definition: win32_port.h:180
#define SIGUSR2
Definition: win32_port.h:181
#define SIG_IGN
Definition: win32_port.h:165

References die, InitializeTimeouts(), pqsignal(), procsignal_sigusr1_handler(), SIG_DFL, SIG_IGN, SIGCHLD, SIGHUP, SignalHandlerForConfigReload(), SIGPIPE, SIGUSR1, SIGUSR2, StatementCancelHandler(), and WalSndLastCycleHandler().

Referenced by PostgresMain().

◆ WalSndWaitStopping()

void WalSndWaitStopping ( void  )

Definition at line 3770 of file walsender.c.

3771 {
3772  for (;;)
3773  {
3774  int i;
3775  bool all_stopped = true;
3776 
3777  for (i = 0; i < max_wal_senders; i++)
3778  {
3779  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3780 
3781  SpinLockAcquire(&walsnd->mutex);
3782 
3783  if (walsnd->pid == 0)
3784  {
3785  SpinLockRelease(&walsnd->mutex);
3786  continue;
3787  }
3788 
3789  if (walsnd->state != WALSNDSTATE_STOPPING)
3790  {
3791  all_stopped = false;
3792  SpinLockRelease(&walsnd->mutex);
3793  break;
3794  }
3795  SpinLockRelease(&walsnd->mutex);
3796  }
3797 
3798  /* safe to leave if confirmation is done for all WAL senders */
3799  if (all_stopped)
3800  return;
3801 
3802  pg_usleep(10000L); /* wait for 10 msec */
3803  }
3804 }
void pg_usleep(long microsec)
Definition: signal.c:53

References i, max_wal_senders, WalSnd::mutex, pg_usleep(), WalSnd::pid, SpinLockAcquire, SpinLockRelease, WalSnd::state, WalSndCtl, WalSndCtlData::walsnds, and WALSNDSTATE_STOPPING.

Referenced by ShutdownXLOG().

◆ WalSndWakeup()

void WalSndWakeup ( bool  physical,
bool  logical 
)

Definition at line 3665 of file walsender.c.

3666 {
3667  /*
3668  * Wake up all the walsenders waiting on WAL being flushed or replayed
3669  * respectively. Note that waiting walsender would have prepared to sleep
3670  * on the CV (i.e., added itself to the CV's waitlist) in WalSndWait()
3671  * before actually waiting.
3672  */
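3673  if (physical)
3674  ConditionVariableBroadcast(&WalSndCtl->wal_flush_cv);
3675 
3676  if (logical)
3677  ConditionVariableBroadcast(&WalSndCtl->wal_replay_cv);
3678 }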

References ConditionVariableBroadcast(), WalSndCtlData::wal_flush_cv, WalSndCtlData::wal_replay_cv, and WalSndCtl.

Referenced by ApplyWalRecord(), KeepFileRestoredFromArchive(), StartupXLOG(), WalSndWakeupProcessRequests(), and XLogWalRcvFlush().

◆ WalSndWakeupProcessRequests()

static void WalSndWakeupProcessRequests ( bool  physical,
bool  logical 
)
inline static

Definition at line 66 of file walsender.h.

67 {
68  if (wake_wal_senders)
69  {
70  wake_wal_senders = false;
71  if (max_wal_senders > 0)
72  WalSndWakeup(physical, logical);
73  }
74 }
PGDLLIMPORT bool wake_wal_senders
Definition: walsender.c:130
void WalSndWakeup(bool physical, bool logical)
Definition: walsender.c:3665
PGDLLIMPORT int max_wal_senders
Definition: walsender.c:121

References max_wal_senders, wake_wal_senders, and WalSndWakeup().

Referenced by XLogBackgroundFlush(), and XLogFlush().

Variable Documentation

◆ am_cascading_walsender

PGDLLIMPORT bool am_cascading_walsender
extern

Definition at line 116 of file walsender.c.

◆ am_db_walsender

PGDLLIMPORT bool am_db_walsender
extern

Definition at line 118 of file walsender.c.

Referenced by check_db(), ClientAuthentication(), InitPostgres(), and ProcessStartupPacket().

◆ am_walsender

PGDLLIMPORT bool am_walsender
extern

Definition at line 115 of file walsender.c.

◆ log_replication_commands

PGDLLIMPORT bool log_replication_commands
extern

Definition at line 125 of file walsender.c.

◆ max_wal_senders

PGDLLIMPORT int max_wal_senders
extern

Definition at line 121 of file walsender.c.

◆ wake_wal_senders

PGDLLIMPORT bool wake_wal_senders
extern

Definition at line 130 of file walsender.c.

Referenced by WalSndWakeupProcessRequests().

◆ wal_sender_timeout