PostgreSQL Source Code  git master
walsender.h File Reference
#include "access/xlogdefs.h"
Include dependency graph for walsender.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Macros

#define WalSndWakeupRequest()    do { wake_wal_senders = true; } while (0)
 

Enumerations

enum  CRSSnapshotAction { CRS_EXPORT_SNAPSHOT , CRS_NOEXPORT_SNAPSHOT , CRS_USE_SNAPSHOT }
 

Functions

void InitWalSender (void)
 
bool exec_replication_command (const char *cmd_string)
 
void WalSndErrorCleanup (void)
 
void WalSndResourceCleanup (bool isCommit)
 
void PhysicalWakeupLogicalWalSnd (void)
 
XLogRecPtr GetStandbyFlushRecPtr (TimeLineID *tli)
 
void WalSndSignals (void)
 
Size WalSndShmemSize (void)
 
void WalSndShmemInit (void)
 
void WalSndWakeup (bool physical, bool logical)
 
void WalSndInitStopping (void)
 
void WalSndWaitStopping (void)
 
void HandleWalSndInitStopping (void)
 
void WalSndRqstFileReload (void)
 
static void WalSndWakeupProcessRequests (bool physical, bool logical)
 

Variables

PGDLLIMPORT bool am_walsender
 
PGDLLIMPORT bool am_cascading_walsender
 
PGDLLIMPORT bool am_db_walsender
 
PGDLLIMPORT bool wake_wal_senders
 
PGDLLIMPORT int max_wal_senders
 
PGDLLIMPORT int wal_sender_timeout
 
PGDLLIMPORT bool log_replication_commands
 

Macro Definition Documentation

◆ WalSndWakeupRequest

#define WalSndWakeupRequest ( )     do { wake_wal_senders = true; } while (0)

Definition at line 59 of file walsender.h.

Enumeration Type Documentation

◆ CRSSnapshotAction

Enumerator
CRS_EXPORT_SNAPSHOT 
CRS_NOEXPORT_SNAPSHOT 
CRS_USE_SNAPSHOT 

Definition at line 20 of file walsender.h.

21 {
CRSSnapshotAction
Definition: walsender.h:21
@ CRS_USE_SNAPSHOT
Definition: walsender.h:24
@ CRS_NOEXPORT_SNAPSHOT
Definition: walsender.h:23
@ CRS_EXPORT_SNAPSHOT
Definition: walsender.h:22

Function Documentation

◆ exec_replication_command()

bool exec_replication_command ( const char *  cmd_string)

Definition at line 1977 of file walsender.c.

1978 {
1979  int parse_rc;
1980  Node *cmd_node;
1981  const char *cmdtag;
1982  MemoryContext cmd_context;
1983  MemoryContext old_context;
1984 
1985  /*
1986  * If WAL sender has been told that shutdown is getting close, switch its
1987  * status accordingly to handle the next replication commands correctly.
1988  */
1989  if (got_STOPPING)
1991 
1992  /*
1993  * Throw error if in stopping mode. We need prevent commands that could
1994  * generate WAL while the shutdown checkpoint is being written. To be
1995  * safe, we just prohibit all new commands.
1996  */
1998  ereport(ERROR,
1999  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2000  errmsg("cannot execute new commands while WAL sender is in stopping mode")));
2001 
2002  /*
2003  * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
2004  * command arrives. Clean up the old stuff if there's anything.
2005  */
2007 
2009 
2010  /*
2011  * Prepare to parse and execute the command.
2012  */
2014  "Replication command context",
2016  old_context = MemoryContextSwitchTo(cmd_context);
2017 
2018  replication_scanner_init(cmd_string);
2019 
2020  /*
2021  * Is it a WalSender command?
2022  */
2024  {
2025  /* Nope; clean up and get out. */
2027 
2028  MemoryContextSwitchTo(old_context);
2029  MemoryContextDelete(cmd_context);
2030 
2031  /* XXX this is a pretty random place to make this check */
2032  if (MyDatabaseId == InvalidOid)
2033  ereport(ERROR,
2034  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2035  errmsg("cannot execute SQL commands in WAL sender for physical replication")));
2036 
2037  /* Tell the caller that this wasn't a WalSender command. */
2038  return false;
2039  }
2040 
2041  /*
2042  * Looks like a WalSender command, so parse it.
2043  */
2044  parse_rc = replication_yyparse();
2045  if (parse_rc != 0)
2046  ereport(ERROR,
2047  (errcode(ERRCODE_SYNTAX_ERROR),
2048  errmsg_internal("replication command parser returned %d",
2049  parse_rc)));
2051 
2052  cmd_node = replication_parse_result;
2053 
2054  /*
2055  * Report query to various monitoring facilities. For this purpose, we
2056  * report replication commands just like SQL commands.
2057  */
2058  debug_query_string = cmd_string;
2059 
2061 
2062  /*
2063  * Log replication command if log_replication_commands is enabled. Even
2064  * when it's disabled, log the command with DEBUG1 level for backward
2065  * compatibility.
2066  */
2068  (errmsg("received replication command: %s", cmd_string)));
2069 
2070  /*
2071  * Disallow replication commands in aborted transaction blocks.
2072  */
2074  ereport(ERROR,
2075  (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
2076  errmsg("current transaction is aborted, "
2077  "commands ignored until end of transaction block")));
2078 
2080 
2081  /*
2082  * Allocate buffers that will be used for each outgoing and incoming
2083  * message. We do this just once per command to reduce palloc overhead.
2084  */
2088 
2089  switch (cmd_node->type)
2090  {
2091  case T_IdentifySystemCmd:
2092  cmdtag = "IDENTIFY_SYSTEM";
2093  set_ps_display(cmdtag);
2094  IdentifySystem();
2095  EndReplicationCommand(cmdtag);
2096  break;
2097 
2098  case T_ReadReplicationSlotCmd:
2099  cmdtag = "READ_REPLICATION_SLOT";
2100  set_ps_display(cmdtag);
2102  EndReplicationCommand(cmdtag);
2103  break;
2104 
2105  case T_BaseBackupCmd:
2106  cmdtag = "BASE_BACKUP";
2107  set_ps_display(cmdtag);
2108  PreventInTransactionBlock(true, cmdtag);
2110  EndReplicationCommand(cmdtag);
2111  break;
2112 
2113  case T_CreateReplicationSlotCmd:
2114  cmdtag = "CREATE_REPLICATION_SLOT";
2115  set_ps_display(cmdtag);
2117  EndReplicationCommand(cmdtag);
2118  break;
2119 
2120  case T_DropReplicationSlotCmd:
2121  cmdtag = "DROP_REPLICATION_SLOT";
2122  set_ps_display(cmdtag);
2124  EndReplicationCommand(cmdtag);
2125  break;
2126 
2127  case T_AlterReplicationSlotCmd:
2128  cmdtag = "ALTER_REPLICATION_SLOT";
2129  set_ps_display(cmdtag);
2131  EndReplicationCommand(cmdtag);
2132  break;
2133 
2134  case T_StartReplicationCmd:
2135  {
2136  StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
2137 
2138  cmdtag = "START_REPLICATION";
2139  set_ps_display(cmdtag);
2140  PreventInTransactionBlock(true, cmdtag);
2141 
2142  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
2143  StartReplication(cmd);
2144  else
2146 
2147  /* dupe, but necessary per libpqrcv_endstreaming */
2148  EndReplicationCommand(cmdtag);
2149 
2150  Assert(xlogreader != NULL);
2151  break;
2152  }
2153 
2154  case T_TimeLineHistoryCmd:
2155  cmdtag = "TIMELINE_HISTORY";
2156  set_ps_display(cmdtag);
2157  PreventInTransactionBlock(true, cmdtag);
2159  EndReplicationCommand(cmdtag);
2160  break;
2161 
2162  case T_VariableShowStmt:
2163  {
2165  VariableShowStmt *n = (VariableShowStmt *) cmd_node;
2166 
2167  cmdtag = "SHOW";
2168  set_ps_display(cmdtag);
2169 
2170  /* syscache access needs a transaction environment */
2172  GetPGVariable(n->name, dest);
2174  EndReplicationCommand(cmdtag);
2175  }
2176  break;
2177 
2178  case T_UploadManifestCmd:
2179  cmdtag = "UPLOAD_MANIFEST";
2180  set_ps_display(cmdtag);
2181  PreventInTransactionBlock(true, cmdtag);
2182  UploadManifest();
2183  EndReplicationCommand(cmdtag);
2184  break;
2185 
2186  default:
2187  elog(ERROR, "unrecognized replication command node tag: %u",
2188  cmd_node->type);
2189  }
2190 
2191  /* done */
2192  MemoryContextSwitchTo(old_context);
2193  MemoryContextDelete(cmd_context);
2194 
2195  /*
2196  * We need not update ps display or pg_stat_activity, because PostgresMain
2197  * will reset those to "idle". But we must reset debug_query_string to
2198  * ensure it doesn't become a dangling pointer.
2199  */
2200  debug_query_string = NULL;
2201 
2202  return true;
2203 }
void pgstat_report_activity(BackendState state, const char *cmd_str)
@ STATE_RUNNING
void SendBaseBackup(BaseBackupCmd *cmd, IncrementalBackupInfo *ib)
Definition: basebackup.c:988
#define Assert(condition)
Definition: c.h:858
void EndReplicationCommand(const char *commandTag)
Definition: dest.c:205
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
@ DestRemoteSimple
Definition: dest.h:91
int errmsg_internal(const char *fmt,...)
Definition: elog.c:1159
int errcode(int sqlerrcode)
Definition: elog.c:859
int errmsg(const char *fmt,...)
Definition: elog.c:1072
#define LOG
Definition: elog.h:31
#define DEBUG1
Definition: elog.h:30
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:224
#define ereport(elevel,...)
Definition: elog.h:149
Oid MyDatabaseId
Definition: globals.c:91
void GetPGVariable(const char *name, DestReceiver *dest)
Definition: guc_funcs.c:382
MemoryContext CurrentMemoryContext
Definition: mcxt.c:143
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:454
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:160
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:122
const char * debug_query_string
Definition: postgres.c:87
#define InvalidOid
Definition: postgres_ext.h:36
static void set_ps_display(const char *activity)
Definition: ps_status.h:40
MemoryContextSwitchTo(old_ctx)
@ REPLICATION_KIND_PHYSICAL
Definition: replnodes.h:22
void SnapBuildClearExportedSnapshot(void)
Definition: snapbuild.c:729
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
Definition: nodes.h:129
NodeTag type
Definition: nodes.h:130
ReplicationKind kind
Definition: replnodes.h:94
WalSndState state
static void AlterReplicationSlot(AlterReplicationSlotCmd *cmd)
Definition: walsender.c:1439
static void SendTimeLineHistory(TimeLineHistoryCmd *cmd)
Definition: walsender.c:589
WalSnd * MyWalSnd
Definition: walsender.c:112
static void ReadReplicationSlot(ReadReplicationSlotCmd *cmd)
Definition: walsender.c:490
static StringInfoData tmpbuf
Definition: walsender.c:170
static void IdentifySystem(void)
Definition: walsender.c:407
static StringInfoData reply_message
Definition: walsender.c:169
void WalSndSetState(WalSndState state)
Definition: walsender.c:3809
static StringInfoData output_message
Definition: walsender.c:168
static void UploadManifest(void)
Definition: walsender.c:679
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:198
bool log_replication_commands
Definition: walsender.c:125
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Definition: walsender.c:1200
static void StartLogicalReplication(StartReplicationCmd *cmd)
Definition: walsender.c:1452
static IncrementalBackupInfo * uploaded_manifest
Definition: walsender.c:147
static void DropReplicationSlot(DropReplicationSlotCmd *cmd)
Definition: walsender.c:1405
static void StartReplication(StartReplicationCmd *cmd)
Definition: walsender.c:819
static XLogReaderState * xlogreader
Definition: walsender.c:137
PGDLLIMPORT Node * replication_parse_result
@ WALSNDSTATE_STOPPING
void replication_scanner_finish(void)
int replication_yyparse(void)
void replication_scanner_init(const char *str)
bool replication_scanner_is_replication_command(void)
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
Definition: xact.c:3584
void StartTransactionCommand(void)
Definition: xact.c:2995
bool IsAbortedTransactionBlockState(void)
Definition: xact.c:404
void CommitTransactionCommand(void)
Definition: xact.c:3093

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, AlterReplicationSlot(), Assert, CHECK_FOR_INTERRUPTS, CommitTransactionCommand(), CreateDestReceiver(), CreateReplicationSlot(), CurrentMemoryContext, DEBUG1, debug_query_string, generate_unaccent_rules::dest, DestRemoteSimple, DropReplicationSlot(), elog, EndReplicationCommand(), ereport, errcode(), errmsg(), errmsg_internal(), ERROR, GetPGVariable(), got_STOPPING, IdentifySystem(), initStringInfo(), InvalidOid, IsAbortedTransactionBlockState(), StartReplicationCmd::kind, LOG, log_replication_commands, MemoryContextDelete(), MemoryContextSwitchTo(), MyDatabaseId, MyWalSnd, VariableShowStmt::name, output_message, pgstat_report_activity(), PreventInTransactionBlock(), ReadReplicationSlot(), REPLICATION_KIND_PHYSICAL, replication_parse_result, replication_scanner_finish(), replication_scanner_init(), replication_scanner_is_replication_command(), replication_yyparse(), reply_message, SendBaseBackup(), SendTimeLineHistory(), set_ps_display(), SnapBuildClearExportedSnapshot(), StartLogicalReplication(), StartReplication(), StartTransactionCommand(), WalSnd::state, STATE_RUNNING, tmpbuf, Node::type, ReadReplicationSlotCmd::type, uploaded_manifest, UploadManifest(), WalSndSetState(), WALSNDSTATE_STOPPING, and xlogreader.

Referenced by PostgresMain().

◆ GetStandbyFlushRecPtr()

XLogRecPtr GetStandbyFlushRecPtr ( TimeLineID tli)

Definition at line 3504 of file walsender.c.

3505 {
3506  XLogRecPtr replayPtr;
3507  TimeLineID replayTLI;
3508  XLogRecPtr receivePtr;
3510  XLogRecPtr result;
3511 
3513 
3514  /*
3515  * We can safely send what's already been replayed. Also, if walreceiver
3516  * is streaming WAL from the same timeline, we can send anything that it
3517  * has streamed, but hasn't been replayed yet.
3518  */
3519 
3520  receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI);
3521  replayPtr = GetXLogReplayRecPtr(&replayTLI);
3522 
3523  if (tli)
3524  *tli = replayTLI;
3525 
3526  result = replayPtr;
3527  if (receiveTLI == replayTLI && receivePtr > replayPtr)
3528  result = receivePtr;
3529 
3530  return result;
3531 }
bool IsSyncingReplicationSlots(void)
Definition: slotsync.c:1650
XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
bool am_cascading_walsender
Definition: walsender.c:116
uint64 XLogRecPtr
Definition: xlogdefs.h:21
uint32 TimeLineID
Definition: xlogdefs.h:59
static TimeLineID receiveTLI
Definition: xlogrecovery.c:263
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)

References am_cascading_walsender, Assert, GetWalRcvFlushRecPtr(), GetXLogReplayRecPtr(), IsSyncingReplicationSlots(), and receiveTLI.

Referenced by IdentifySystem(), StartReplication(), synchronize_one_slot(), XLogSendLogical(), and XLogSendPhysical().

◆ HandleWalSndInitStopping()

void HandleWalSndInitStopping ( void  )

Definition at line 3560 of file walsender.c.

3561 {
3563 
3564  /*
3565  * If replication has not yet started, die like with SIGTERM. If
3566  * replication is active, only set a flag and wake up the main loop. It
3567  * will send any outstanding WAL, wait for it to be replicated to the
3568  * standby, and then exit gracefully.
3569  */
3570  if (!replication_active)
3571  kill(MyProcPid, SIGTERM);
3572  else
3573  got_STOPPING = true;
3574 }
int MyProcPid
Definition: globals.c:45
bool am_walsender
Definition: walsender.c:115
static volatile sig_atomic_t replication_active
Definition: walsender.c:206
#define kill(pid, sig)
Definition: win32_port.h:485

References am_walsender, Assert, got_STOPPING, kill, MyProcPid, and replication_active.

Referenced by procsignal_sigusr1_handler().

◆ InitWalSender()

void InitWalSender ( void  )

Referenced by PostgresMain().

◆ PhysicalWakeupLogicalWalSnd()

void PhysicalWakeupLogicalWalSnd ( void  )

Definition at line 1733 of file walsender.c.

1734 {
1736 
1737  /*
1738  * If we are running in a standby, there is no need to wake up walsenders.
1739  * This is because we do not support syncing slots to cascading standbys,
1740  * so, there are no walsenders waiting for standbys to catch up.
1741  */
1742  if (RecoveryInProgress())
1743  return;
1744 
1747 }
#define NameStr(name)
Definition: c.h:746
void ConditionVariableBroadcast(ConditionVariable *cv)
bool SlotExistsInStandbySlotNames(const char *slot_name)
Definition: slot.c:2549
ReplicationSlot * MyReplicationSlot
Definition: slot.c:138
#define SlotIsPhysical(slot)
Definition: slot.h:209
ReplicationSlotPersistentData data
Definition: slot.h:178
ConditionVariable wal_confirm_rcv_cv
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
bool RecoveryInProgress(void)
Definition: xlog.c:6290

References Assert, ConditionVariableBroadcast(), ReplicationSlot::data, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, RecoveryInProgress(), SlotExistsInStandbySlotNames(), SlotIsPhysical, WalSndCtlData::wal_confirm_rcv_cv, and WalSndCtl.

Referenced by pg_physical_replication_slot_advance(), and PhysicalConfirmReceivedLocation().

◆ WalSndErrorCleanup()

void WalSndErrorCleanup ( void  )

Definition at line 327 of file walsender.c.

328 {
332 
333  if (xlogreader != NULL && xlogreader->seg.ws_file >= 0)
335 
336  if (MyReplicationSlot != NULL)
338 
339  ReplicationSlotCleanup(false);
340 
341  replication_active = false;
342 
343  /*
344  * If there is a transaction in progress, it will clean up our
345  * ResourceOwner, but if a replication command set up a resource owner
346  * without a transaction, we've got to clean that up now.
347  */
349  WalSndResourceCleanup(false);
350 
351  if (got_STOPPING || got_SIGUSR2)
352  proc_exit(0);
353 
354  /* Revert back to startup state */
356 }
bool ConditionVariableCancelSleep(void)
void proc_exit(int code)
Definition: ipc.c:104
void LWLockReleaseAll(void)
Definition: lwlock.c:1878
void ReplicationSlotRelease(void)
Definition: slot.c:652
void ReplicationSlotCleanup(bool synced_only)
Definition: slot.c:745
WALOpenSegment seg
Definition: xlogreader.h:272
static void pgstat_report_wait_end(void)
Definition: wait_event.h:104
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:197
void WalSndResourceCleanup(bool isCommit)
Definition: walsender.c:362
@ WALSNDSTATE_STARTUP
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4933
void wal_segment_close(XLogReaderState *state)
Definition: xlogutils.c:842

References ConditionVariableCancelSleep(), got_SIGUSR2, got_STOPPING, IsTransactionOrTransactionBlock(), LWLockReleaseAll(), MyReplicationSlot, pgstat_report_wait_end(), proc_exit(), replication_active, ReplicationSlotCleanup(), ReplicationSlotRelease(), XLogReaderState::seg, wal_segment_close(), WalSndResourceCleanup(), WalSndSetState(), WALSNDSTATE_STARTUP, WALOpenSegment::ws_file, and xlogreader.

Referenced by PostgresMain().

◆ WalSndInitStopping()

void WalSndInitStopping ( void  )

Definition at line 3745 of file walsender.c.

3746 {
3747  int i;
3748 
3749  for (i = 0; i < max_wal_senders; i++)
3750  {
3751  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3752  pid_t pid;
3753 
3754  SpinLockAcquire(&walsnd->mutex);
3755  pid = walsnd->pid;
3756  SpinLockRelease(&walsnd->mutex);
3757 
3758  if (pid == 0)
3759  continue;
3760 
3762  }
3763 }
int i
Definition: isn.c:73
#define INVALID_PROC_NUMBER
Definition: procnumber.h:26
int SendProcSignal(pid_t pid, ProcSignalReason reason, ProcNumber procNumber)
Definition: procsignal.c:257
@ PROCSIG_WALSND_INIT_STOPPING
Definition: procsignal.h:35
#define SpinLockRelease(lock)
Definition: spin.h:64
#define SpinLockAcquire(lock)
Definition: spin.h:62
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
int max_wal_senders
Definition: walsender.c:121

References i, INVALID_PROC_NUMBER, max_wal_senders, WalSnd::mutex, WalSnd::pid, PROCSIG_WALSND_INIT_STOPPING, SendProcSignal(), SpinLockAcquire, SpinLockRelease, WalSndCtl, and WalSndCtlData::walsnds.

Referenced by ShutdownXLOG().

◆ WalSndResourceCleanup()

void WalSndResourceCleanup ( bool  isCommit)

Definition at line 362 of file walsender.c.

363 {
364  ResourceOwner resowner;
365 
366  if (CurrentResourceOwner == NULL)
367  return;
368 
369  /*
370  * Deleting CurrentResourceOwner is not allowed, so we must save a pointer
371  * in a local variable and clear it first.
372  */
373  resowner = CurrentResourceOwner;
374  CurrentResourceOwner = NULL;
375 
376  /* Now we can release resources and delete it. */
377  ResourceOwnerRelease(resowner,
378  RESOURCE_RELEASE_BEFORE_LOCKS, isCommit, true);
379  ResourceOwnerRelease(resowner,
380  RESOURCE_RELEASE_LOCKS, isCommit, true);
381  ResourceOwnerRelease(resowner,
382  RESOURCE_RELEASE_AFTER_LOCKS, isCommit, true);
383  ResourceOwnerDelete(resowner);
384 }
ResourceOwner CurrentResourceOwner
Definition: resowner.c:165
void ResourceOwnerRelease(ResourceOwner owner, ResourceReleasePhase phase, bool isCommit, bool isTopLevel)
Definition: resowner.c:648
void ResourceOwnerDelete(ResourceOwner owner)
Definition: resowner.c:854
@ RESOURCE_RELEASE_LOCKS
Definition: resowner.h:55
@ RESOURCE_RELEASE_BEFORE_LOCKS
Definition: resowner.h:54
@ RESOURCE_RELEASE_AFTER_LOCKS
Definition: resowner.h:56

References CurrentResourceOwner, RESOURCE_RELEASE_AFTER_LOCKS, RESOURCE_RELEASE_BEFORE_LOCKS, RESOURCE_RELEASE_LOCKS, ResourceOwnerDelete(), and ResourceOwnerRelease().

Referenced by perform_base_backup(), UploadManifest(), and WalSndErrorCleanup().

◆ WalSndRqstFileReload()

void WalSndRqstFileReload ( void  )

Definition at line 3537 of file walsender.c.

3538 {
3539  int i;
3540 
3541  for (i = 0; i < max_wal_senders; i++)
3542  {
3543  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3544 
3545  SpinLockAcquire(&walsnd->mutex);
3546  if (walsnd->pid == 0)
3547  {
3548  SpinLockRelease(&walsnd->mutex);
3549  continue;
3550  }
3551  walsnd->needreload = true;
3552  SpinLockRelease(&walsnd->mutex);
3553  }
3554 }
bool needreload

References i, max_wal_senders, WalSnd::mutex, WalSnd::needreload, WalSnd::pid, SpinLockAcquire, SpinLockRelease, WalSndCtl, and WalSndCtlData::walsnds.

Referenced by KeepFileRestoredFromArchive().

◆ WalSndShmemInit()

void WalSndShmemInit ( void  )

Definition at line 3621 of file walsender.c.

3622 {
3623  bool found;
3624  int i;
3625 
3626  WalSndCtl = (WalSndCtlData *)
3627  ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
3628 
3629  if (!found)
3630  {
3631  /* First time through, so initialize */
3633 
3634  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
3636 
3637  for (i = 0; i < max_wal_senders; i++)
3638  {
3639  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3640 
3641  SpinLockInit(&walsnd->mutex);
3642  }
3643 
3647  }
3648 }
#define MemSet(start, val, len)
Definition: c.h:1020
void ConditionVariableInit(ConditionVariable *cv)
static void dlist_init(dlist_head *head)
Definition: ilist.h:314
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:387
#define SpinLockInit(lock)
Definition: spin.h:60
ConditionVariable wal_replay_cv
dlist_head SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]
ConditionVariable wal_flush_cv
#define NUM_SYNC_REP_WAIT_MODE
Definition: syncrep.h:27
Size WalSndShmemSize(void)
Definition: walsender.c:3609

References ConditionVariableInit(), dlist_init(), i, max_wal_senders, MemSet, WalSnd::mutex, NUM_SYNC_REP_WAIT_MODE, ShmemInitStruct(), SpinLockInit, WalSndCtlData::SyncRepQueue, WalSndCtlData::wal_confirm_rcv_cv, WalSndCtlData::wal_flush_cv, WalSndCtlData::wal_replay_cv, WalSndCtl, WalSndCtlData::walsnds, and WalSndShmemSize().

Referenced by CreateOrAttachShmemStructs().

◆ WalSndShmemSize()

Size WalSndShmemSize ( void  )

Definition at line 3609 of file walsender.c.

3610 {
3611  Size size = 0;
3612 
3613  size = offsetof(WalSndCtlData, walsnds);
3615 
3616  return size;
3617 }
size_t Size
Definition: c.h:605
Size add_size(Size s1, Size s2)
Definition: shmem.c:493
Size mul_size(Size s1, Size s2)
Definition: shmem.c:510
static pg_noinline void Size size
Definition: slab.c:607

References add_size(), max_wal_senders, mul_size(), and size.

Referenced by CalculateShmemSize(), and WalSndShmemInit().

◆ WalSndSignals()

void WalSndSignals ( void  )

Definition at line 3590 of file walsender.c.

3591 {
3592  /* Set up signal handlers */
3594  pqsignal(SIGINT, StatementCancelHandler); /* query cancel */
3595  pqsignal(SIGTERM, die); /* request shutdown */
3596  /* SIGQUIT handler was already set up by InitPostmasterChild */
3597  InitializeTimeouts(); /* establishes SIGALRM handler */
3600  pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
3601  * shutdown */
3602 
3603  /* Reset some signals that are accepted by postmaster but not here */
3605 }
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:61
#define die(msg)
pqsigfunc pqsignal(int signo, pqsigfunc func)
void StatementCancelHandler(SIGNAL_ARGS)
Definition: postgres.c:3002
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition: procsignal.c:635
void InitializeTimeouts(void)
Definition: timeout.c:470
static void WalSndLastCycleHandler(SIGNAL_ARGS)
Definition: walsender.c:3582
#define SIGCHLD
Definition: win32_port.h:178
#define SIGHUP
Definition: win32_port.h:168
#define SIG_DFL
Definition: win32_port.h:163
#define SIGPIPE
Definition: win32_port.h:173
#define SIGUSR1
Definition: win32_port.h:180
#define SIGUSR2
Definition: win32_port.h:181
#define SIG_IGN
Definition: win32_port.h:165

References die, InitializeTimeouts(), pqsignal(), procsignal_sigusr1_handler(), SIG_DFL, SIG_IGN, SIGCHLD, SIGHUP, SignalHandlerForConfigReload(), SIGPIPE, SIGUSR1, SIGUSR2, StatementCancelHandler(), and WalSndLastCycleHandler().

Referenced by PostgresMain().

◆ WalSndWaitStopping()

void WalSndWaitStopping ( void  )

Definition at line 3771 of file walsender.c.

3772 {
3773  for (;;)
3774  {
3775  int i;
3776  bool all_stopped = true;
3777 
3778  for (i = 0; i < max_wal_senders; i++)
3779  {
3780  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3781 
3782  SpinLockAcquire(&walsnd->mutex);
3783 
3784  if (walsnd->pid == 0)
3785  {
3786  SpinLockRelease(&walsnd->mutex);
3787  continue;
3788  }
3789 
3790  if (walsnd->state != WALSNDSTATE_STOPPING)
3791  {
3792  all_stopped = false;
3793  SpinLockRelease(&walsnd->mutex);
3794  break;
3795  }
3796  SpinLockRelease(&walsnd->mutex);
3797  }
3798 
3799  /* safe to leave if confirmation is done for all WAL senders */
3800  if (all_stopped)
3801  return;
3802 
3803  pg_usleep(10000L); /* wait for 10 msec */
3804  }
3805 }
void pg_usleep(long microsec)
Definition: signal.c:53

References i, max_wal_senders, WalSnd::mutex, pg_usleep(), WalSnd::pid, SpinLockAcquire, SpinLockRelease, WalSnd::state, WalSndCtl, WalSndCtlData::walsnds, and WALSNDSTATE_STOPPING.

Referenced by ShutdownXLOG().

◆ WalSndWakeup()

void WalSndWakeup ( bool  physical,
bool  logical 
)

Definition at line 3666 of file walsender.c.

3667 {
3668  /*
3669  * Wake up all the walsenders waiting on WAL being flushed or replayed
3670  * respectively. Note that waiting walsender would have prepared to sleep
3671  * on the CV (i.e., added itself to the CV's waitlist) in WalSndWait()
3672  * before actually waiting.
3673  */
3674  if (physical)
3676 
3677  if (logical)
3679 }

References ConditionVariableBroadcast(), WalSndCtlData::wal_flush_cv, WalSndCtlData::wal_replay_cv, and WalSndCtl.

Referenced by ApplyWalRecord(), KeepFileRestoredFromArchive(), StartupXLOG(), WalSndWakeupProcessRequests(), and XLogWalRcvFlush().

◆ WalSndWakeupProcessRequests()

static void WalSndWakeupProcessRequests ( bool  physical,
bool  logical 
)
inlinestatic

Definition at line 66 of file walsender.h.

67 {
68  if (wake_wal_senders)
69  {
70  wake_wal_senders = false;
71  if (max_wal_senders > 0)
72  WalSndWakeup(physical, logical);
73  }
74 }
PGDLLIMPORT bool wake_wal_senders
Definition: walsender.c:130
void WalSndWakeup(bool physical, bool logical)
Definition: walsender.c:3666
PGDLLIMPORT int max_wal_senders
Definition: walsender.c:121

References max_wal_senders, wake_wal_senders, and WalSndWakeup().

Referenced by XLogBackgroundFlush(), and XLogFlush().

Variable Documentation

◆ am_cascading_walsender

◆ am_db_walsender

PGDLLIMPORT bool am_db_walsender
extern

Definition at line 118 of file walsender.c.

Referenced by check_db(), ClientAuthentication(), InitPostgres(), and ProcessStartupPacket().

◆ am_walsender

◆ log_replication_commands

PGDLLIMPORT bool log_replication_commands
extern

◆ max_wal_senders

◆ wake_wal_senders

PGDLLIMPORT bool wake_wal_senders
extern

Definition at line 130 of file walsender.c.

Referenced by WalSndWakeupProcessRequests().

◆ wal_sender_timeout