PostgreSQL Source Code  git master
walsender.h File Reference
#include <signal.h>
Include dependency graph for walsender.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Macros

#define WalSndWakeupRequest()   do { wake_wal_senders = true; } while (0)
 
#define WalSndWakeupProcessRequests()
 

Enumerations

enum  CRSSnapshotAction { CRS_EXPORT_SNAPSHOT, CRS_NOEXPORT_SNAPSHOT, CRS_USE_SNAPSHOT }
 

Functions

void InitWalSender (void)
 
bool exec_replication_command (const char *query_string)
 
void WalSndErrorCleanup (void)
 
void WalSndResourceCleanup (bool isCommit)
 
void WalSndSignals (void)
 
Size WalSndShmemSize (void)
 
void WalSndShmemInit (void)
 
void WalSndWakeup (void)
 
void WalSndInitStopping (void)
 
void WalSndWaitStopping (void)
 
void HandleWalSndInitStopping (void)
 
void WalSndRqstFileReload (void)
 

Variables

bool am_walsender
 
bool am_cascading_walsender
 
bool am_db_walsender
 
bool wake_wal_senders
 
int max_wal_senders
 
int wal_sender_timeout
 
bool log_replication_commands
 

Macro Definition Documentation

◆ WalSndWakeupProcessRequests

#define WalSndWakeupProcessRequests ( )
Value:
do \
{ \
if (wake_wal_senders) \
{ \
wake_wal_senders = false; \
if (max_wal_senders > 0) \
WalSndWakeup(); \
} \
} while (0)
int max_wal_senders
Definition: walsender.c:121
bool wake_wal_senders
Definition: walsender.c:130

Definition at line 63 of file walsender.h.

Referenced by XLogBackgroundFlush(), and XLogFlush().

◆ WalSndWakeupRequest

#define WalSndWakeupRequest ( )    do { wake_wal_senders = true; } while (0)

Definition at line 57 of file walsender.h.

Referenced by XLogWrite().

Enumeration Type Documentation

◆ CRSSnapshotAction

Enumerator
CRS_EXPORT_SNAPSHOT 
CRS_NOEXPORT_SNAPSHOT 
CRS_USE_SNAPSHOT 

Definition at line 20 of file walsender.h.

Function Documentation

◆ exec_replication_command()

bool exec_replication_command ( const char *  query_string)

Definition at line 1512 of file walsender.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, Assert, CHECK_FOR_INTERRUPTS, CommitTransactionCommand(), CreateDestReceiver(), CreateReplicationSlot(), CurrentMemoryContext, DEBUG1, debug_query_string, generate_unaccent_rules::dest, DestRemoteSimple, DropReplicationSlot(), elog, EndReplicationCommand(), ereport, errcode(), errmsg(), errmsg_internal(), ERROR, GetPGVariable(), got_STOPPING, IdentifySystem(), initStringInfo(), InvalidOid, IsA, IsAbortedTransactionBlockState(), StartReplicationCmd::kind, LOG, log_replication_commands, MemoryContextDelete(), MemoryContextSwitchTo(), MyDatabaseId, VariableShowStmt::name, pgstat_report_activity(), PreventInTransactionBlock(), REPLICATION_KIND_PHYSICAL, replication_parse_result, replication_scanner_finish(), replication_scanner_init(), replication_yyparse(), SendBaseBackup(), SendTimeLineHistory(), set_ps_display(), SnapBuildClearExportedSnapshot(), StartLogicalReplication(), StartReplication(), StartTransactionCommand(), WalSnd::state, STATE_RUNNING, T_BaseBackupCmd, T_CreateReplicationSlotCmd, T_DropReplicationSlotCmd, T_IdentifySystemCmd, T_StartReplicationCmd, T_TimeLineHistoryCmd, T_VariableShowStmt, TimeLineHistoryCmd::type, Node::type, WalSndSetState(), and WALSNDSTATE_STOPPING.

Referenced by PostgresMain().

1513 {
1514  int parse_rc;
1515  Node *cmd_node;
1516  const char *cmdtag;
1517  MemoryContext cmd_context;
1518  MemoryContext old_context;
1519 
1520  /*
1521  * If WAL sender has been told that shutdown is getting close, switch its
1522  * status accordingly to handle the next replication commands correctly.
1523  */
1524  if (got_STOPPING)
1525  WalSndSetState(WALSNDSTATE_STOPPING);
1526 
1527  /*
1528  * Throw error if in stopping mode. We need to prevent commands that could
1529  * generate WAL while the shutdown checkpoint is being written. To be
1530  * safe, we just prohibit all new commands.
1531  */
1532  if (MyWalSnd->state == WALSNDSTATE_STOPPING)
1533  ereport(ERROR,
1534  (errmsg("cannot execute new commands while WAL sender is in stopping mode")));
1535 
1536  /*
1537  * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
1538  * command arrives. Clean up the old stuff if there's anything.
1539  */
1541 
1543 
1544  /*
1545  * Parse the command.
1546  */
1548  "Replication command context",
1550  old_context = MemoryContextSwitchTo(cmd_context);
1551 
1552  replication_scanner_init(cmd_string);
1553  parse_rc = replication_yyparse();
1554  if (parse_rc != 0)
1555  ereport(ERROR,
1556  (errcode(ERRCODE_SYNTAX_ERROR),
1557  errmsg_internal("replication command parser returned %d",
1558  parse_rc)));
1560 
1561  cmd_node = replication_parse_result;
1562 
1563  /*
1564  * If it's a SQL command, just clean up our mess and return false; the
1565  * caller will take care of executing it.
1566  */
1567  if (IsA(cmd_node, SQLCmd))
1568  {
1569  if (MyDatabaseId == InvalidOid)
1570  ereport(ERROR,
1571  (errmsg("cannot execute SQL commands in WAL sender for physical replication")));
1572 
1573  MemoryContextSwitchTo(old_context);
1574  MemoryContextDelete(cmd_context);
1575 
1576  /* Tell the caller that this wasn't a WalSender command. */
1577  return false;
1578  }
1579 
1580  /*
1581  * Report query to various monitoring facilities. For this purpose, we
1582  * report replication commands just like SQL commands.
1583  */
1584  debug_query_string = cmd_string;
1585 
1587 
1588  /*
1589  * Log replication command if log_replication_commands is enabled. Even
1590  * when it's disabled, log the command with DEBUG1 level for backward
1591  * compatibility.
1592  */
1594  (errmsg("received replication command: %s", cmd_string)));
1595 
1596  /*
1597  * Disallow replication commands in aborted transaction blocks.
1598  */
1600  ereport(ERROR,
1601  (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
1602  errmsg("current transaction is aborted, "
1603  "commands ignored until end of transaction block")));
1604 
1606 
1607  /*
1608  * Allocate buffers that will be used for each outgoing and incoming
1609  * message. We do this just once per command to reduce palloc overhead.
1610  */
1614 
1615  switch (cmd_node->type)
1616  {
1617  case T_IdentifySystemCmd:
1618  cmdtag = "IDENTIFY_SYSTEM";
1619  set_ps_display(cmdtag);
1620  IdentifySystem();
1621  EndReplicationCommand(cmdtag);
1622  break;
1623 
1624  case T_BaseBackupCmd:
1625  cmdtag = "BASE_BACKUP";
1626  set_ps_display(cmdtag);
1627  PreventInTransactionBlock(true, cmdtag);
1628  SendBaseBackup((BaseBackupCmd *) cmd_node);
1629  EndReplicationCommand(cmdtag);
1630  break;
1631 
1633  cmdtag = "CREATE_REPLICATION_SLOT";
1634  set_ps_display(cmdtag);
1636  EndReplicationCommand(cmdtag);
1637  break;
1638 
1640  cmdtag = "DROP_REPLICATION_SLOT";
1641  set_ps_display(cmdtag);
1643  EndReplicationCommand(cmdtag);
1644  break;
1645 
1646  case T_StartReplicationCmd:
1647  {
1648  StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
1649 
1650  cmdtag = "START_REPLICATION";
1651  set_ps_display(cmdtag);
1652  PreventInTransactionBlock(true, cmdtag);
1653 
1654  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
1655  StartReplication(cmd);
1656  else
1657  StartLogicalReplication(cmd);
1658 
1659  /* dupe, but necessary per libpqrcv_endstreaming */
1660  EndReplicationCommand(cmdtag);
1661 
1662  Assert(xlogreader != NULL);
1663  break;
1664  }
1665 
1666  case T_TimeLineHistoryCmd:
1667  cmdtag = "TIMELINE_HISTORY";
1668  set_ps_display(cmdtag);
1669  PreventInTransactionBlock(true, cmdtag);
1671  EndReplicationCommand(cmdtag);
1672  break;
1673 
1674  case T_VariableShowStmt:
1675  {
1677  VariableShowStmt *n = (VariableShowStmt *) cmd_node;
1678 
1679  cmdtag = "SHOW";
1680  set_ps_display(cmdtag);
1681 
1682  /* syscache access needs a transaction environment */
1684  GetPGVariable(n->name, dest);
1686  EndReplicationCommand(cmdtag);
1687  }
1688  break;
1689 
1690  default:
1691  elog(ERROR, "unrecognized replication command node tag: %u",
1692  cmd_node->type);
1693  }
1694 
1695  /* done */
1696  MemoryContextSwitchTo(old_context);
1697  MemoryContextDelete(cmd_context);
1698 
1699  /*
1700  * We need not update ps display or pg_stat_activity, because PostgresMain
1701  * will reset those to "idle". But we must reset debug_query_string to
1702  * ensure it doesn't become a dangling pointer.
1703  */
1704  debug_query_string = NULL;
1705 
1706  return true;
1707 }
#define IsA(nodeptr, _type_)
Definition: nodes.h:579
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:212
#define AllocSetContextCreate
Definition: memutils.h:170
#define DEBUG1
Definition: elog.h:25
void pgstat_report_activity(BackendState state, const char *cmd_str)
Definition: pgstat.c:3275
static void SendTimeLineHistory(TimeLineHistoryCmd *cmd)
Definition: walsender.c:464
static void DropReplicationSlot(DropReplicationSlotCmd *cmd)
Definition: walsender.c:1123
void CommitTransactionCommand(void)
Definition: xact.c:2947
bool IsAbortedTransactionBlockState(void)
Definition: xact.c:391
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
Definition: nodes.h:528
static StringInfoData output_message
Definition: walsender.c:157
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:187
int errcode(int sqlerrcode)
Definition: elog.c:610
#define LOG
Definition: elog.h:26
void GetPGVariable(const char *name, DestReceiver *dest)
Definition: guc.c:8993
Node * replication_parse_result
void set_ps_display(const char *activity)
Definition: ps_status.c:349
ReplicationKind kind
Definition: replnodes.h:82
#define ERROR
Definition: elog.h:43
void SendBaseBackup(BaseBackupCmd *cmd)
Definition: basebackup.c:925
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:192
WalSndState state
NodeTag type
Definition: nodes.h:530
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
static void StartReplication(StartReplicationCmd *cmd)
Definition: walsender.c:570
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
Definition: xact.c:3380
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
static XLogReaderState * xlogreader
Definition: walsender.c:137
void EndReplicationCommand(const char *commandTag)
Definition: dest.c:221
int replication_yyparse(void)
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
const char * debug_query_string
Definition: postgres.c:88
static StringInfoData reply_message
Definition: walsender.c:158
static void StartLogicalReplication(StartReplicationCmd *cmd)
Definition: walsender.c:1133
Oid MyDatabaseId
Definition: globals.c:85
WalSnd * MyWalSnd
Definition: walsender.c:112
#define InvalidOid
Definition: postgres_ext.h:36
#define ereport(elevel,...)
Definition: elog.h:144
int errmsg_internal(const char *fmt,...)
Definition: elog.c:908
#define Assert(condition)
Definition: c.h:746
void WalSndSetState(WalSndState state)
Definition: walsender.c:3213
void StartTransactionCommand(void)
Definition: xact.c:2846
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Definition: walsender.c:921
static StringInfoData tmpbuf
Definition: walsender.c:159
bool log_replication_commands
Definition: walsender.c:125
int errmsg(const char *fmt,...)
Definition: elog.c:821
#define elog(elevel,...)
Definition: elog.h:214
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:99
static void IdentifySystem(void)
Definition: walsender.c:375
void replication_scanner_finish(void)
void SnapBuildClearExportedSnapshot(void)
Definition: snapbuild.c:693
void replication_scanner_init(const char *query_string)

◆ HandleWalSndInitStopping()

void HandleWalSndInitStopping ( void  )

Definition at line 3023 of file walsender.c.

References am_walsender, Assert, got_STOPPING, kill, MyProcPid, and replication_active.

Referenced by procsignal_sigusr1_handler().

3024 {
3026 
3027  /*
3028  * If replication has not yet started, die like with SIGTERM. If
3029  * replication is active, only set a flag and wake up the main loop. It
3030  * will send any outstanding WAL, wait for it to be replicated to the
3031  * standby, and then exit gracefully.
3032  */
3033  if (!replication_active)
3034  kill(MyProcPid, SIGTERM);
3035  else
3036  got_STOPPING = true;
3037 }
int MyProcPid
Definition: globals.c:40
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:187
#define kill(pid, sig)
Definition: win32_port.h:454
bool am_walsender
Definition: walsender.c:115
static volatile sig_atomic_t replication_active
Definition: walsender.c:195
#define Assert(condition)
Definition: c.h:746

◆ InitWalSender()

void InitWalSender ( void  )

Referenced by PostgresMain().

◆ WalSndErrorCleanup()

void WalSndErrorCleanup ( void  )

Definition at line 295 of file walsender.c.

References ConditionVariableCancelSleep(), got_SIGUSR2, got_STOPPING, IsTransactionOrTransactionBlock(), LWLockReleaseAll(), MyReplicationSlot, pgstat_report_wait_end(), proc_exit(), replication_active, ReplicationSlotCleanup(), ReplicationSlotRelease(), XLogReaderState::seg, wal_segment_close(), WalSndResourceCleanup(), WalSndSetState(), WALSNDSTATE_STARTUP, and WALOpenSegment::ws_file.

Referenced by PostgresMain().

296 {
300 
301  if (xlogreader != NULL && xlogreader->seg.ws_file >= 0)
303 
304  if (MyReplicationSlot != NULL)
306 
308 
309  replication_active = false;
310 
311  /*
312  * If there is a transaction in progress, it will clean up our
313  * ResourceOwner, but if a replication command set up a resource owner
314  * without a transaction, we've got to clean that up now.
315  */
317  WalSndResourceCleanup(false);
318 
319  if (got_STOPPING || got_SIGUSR2)
320  proc_exit(0);
321 
322  /* Revert back to startup state */
324 }
void wal_segment_close(XLogReaderState *state)
Definition: xlogutils.c:812
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:187
void proc_exit(int code)
Definition: ipc.c:104
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4702
WALOpenSegment seg
Definition: xlogreader.h:215
void ConditionVariableCancelSleep(void)
void ReplicationSlotRelease(void)
Definition: slot.c:484
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1460
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:186
static XLogReaderState * xlogreader
Definition: walsender.c:137
static volatile sig_atomic_t replication_active
Definition: walsender.c:195
void WalSndResourceCleanup(bool isCommit)
Definition: walsender.c:330
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
void WalSndSetState(WalSndState state)
Definition: walsender.c:3213
void ReplicationSlotCleanup(void)
Definition: slot.c:540
void LWLockReleaseAll(void)
Definition: lwlock.c:1911

◆ WalSndInitStopping()

void WalSndInitStopping ( void  )

Definition at line 3149 of file walsender.c.

References i, InvalidBackendId, max_wal_senders, WalSnd::mutex, WalSnd::pid, PROCSIG_WALSND_INIT_STOPPING, SendProcSignal(), SpinLockAcquire, SpinLockRelease, and WalSndCtlData::walsnds.

Referenced by ShutdownXLOG().

3150 {
3151  int i;
3152 
3153  for (i = 0; i < max_wal_senders; i++)
3154  {
3155  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3156  pid_t pid;
3157 
3158  SpinLockAcquire(&walsnd->mutex);
3159  pid = walsnd->pid;
3160  SpinLockRelease(&walsnd->mutex);
3161 
3162  if (pid == 0)
3163  continue;
3164 
3166  }
3167 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
int SendProcSignal(pid_t pid, ProcSignalReason reason, BackendId backendId)
Definition: procsignal.c:250
int max_wal_senders
Definition: walsender.c:121
#define SpinLockRelease(lock)
Definition: spin.h:64
#define InvalidBackendId
Definition: backendid.h:23
int i

◆ WalSndResourceCleanup()

void WalSndResourceCleanup ( bool  isCommit)

Definition at line 330 of file walsender.c.

References CurrentResourceOwner, DestNone, DestRemote, proc_exit(), RESOURCE_RELEASE_AFTER_LOCKS, RESOURCE_RELEASE_BEFORE_LOCKS, RESOURCE_RELEASE_LOCKS, ResourceOwnerDelete(), ResourceOwnerRelease(), WalSndShutdown(), and whereToSendOutput.

Referenced by perform_base_backup(), and WalSndErrorCleanup().

331 {
332  ResourceOwner resowner;
333 
334  if (CurrentResourceOwner == NULL)
335  return;
336 
337  /*
338  * Deleting CurrentResourceOwner is not allowed, so we must save a pointer
339  * in a local variable and clear it first.
340  */
341  resowner = CurrentResourceOwner;
342  CurrentResourceOwner = NULL;
343 
344  /* Now we can release resources and delete it. */
345  ResourceOwnerRelease(resowner,
346  RESOURCE_RELEASE_BEFORE_LOCKS, isCommit, true);
347  ResourceOwnerRelease(resowner,
348  RESOURCE_RELEASE_LOCKS, isCommit, true);
349  ResourceOwnerRelease(resowner,
350  RESOURCE_RELEASE_AFTER_LOCKS, isCommit, true);
351  ResourceOwnerDelete(resowner);
352 }
ResourceOwner CurrentResourceOwner
Definition: resowner.c:142
void ResourceOwnerDelete(ResourceOwner owner)
Definition: resowner.c:712
void ResourceOwnerRelease(ResourceOwner owner, ResourceReleasePhase phase, bool isCommit, bool isTopLevel)
Definition: resowner.c:478

◆ WalSndRqstFileReload()

void WalSndRqstFileReload ( void  )

Definition at line 3000 of file walsender.c.

References i, max_wal_senders, WalSnd::mutex, WalSnd::needreload, WalSnd::pid, SpinLockAcquire, SpinLockRelease, and WalSndCtlData::walsnds.

Referenced by KeepFileRestoredFromArchive().

3001 {
3002  int i;
3003 
3004  for (i = 0; i < max_wal_senders; i++)
3005  {
3006  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3007 
3008  SpinLockAcquire(&walsnd->mutex);
3009  if (walsnd->pid == 0)
3010  {
3011  SpinLockRelease(&walsnd->mutex);
3012  continue;
3013  }
3014  walsnd->needreload = true;
3015  SpinLockRelease(&walsnd->mutex);
3016  }
3017 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
int max_wal_senders
Definition: walsender.c:121
#define SpinLockRelease(lock)
Definition: spin.h:64
bool needreload
int i

◆ WalSndShmemInit()

void WalSndShmemInit ( void  )

Definition at line 3088 of file walsender.c.

References i, max_wal_senders, MemSet, WalSnd::mutex, NUM_SYNC_REP_WAIT_MODE, ShmemInitStruct(), SHMQueueInit(), SpinLockInit, WalSndCtlData::SyncRepQueue, WalSndCtlData::walsnds, and WalSndShmemSize().

Referenced by CreateSharedMemoryAndSemaphores().

3089 {
3090  bool found;
3091  int i;
3092 
3093  WalSndCtl = (WalSndCtlData *)
3094  ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
3095 
3096  if (!found)
3097  {
3098  /* First time through, so initialize */
3100 
3101  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
3103 
3104  for (i = 0; i < max_wal_senders; i++)
3105  {
3106  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3107 
3108  SpinLockInit(&walsnd->mutex);
3109  }
3110  }
3111 }
Size WalSndShmemSize(void)
Definition: walsender.c:3076
#define SpinLockInit(lock)
Definition: spin.h:60
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
#define MemSet(start, val, len)
Definition: c.h:950
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:392
SHM_QUEUE SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]
int max_wal_senders
Definition: walsender.c:121
void SHMQueueInit(SHM_QUEUE *queue)
Definition: shmqueue.c:36
int i
#define NUM_SYNC_REP_WAIT_MODE
Definition: syncrep.h:28

◆ WalSndShmemSize()

Size WalSndShmemSize ( void  )

Definition at line 3076 of file walsender.c.

References add_size(), max_wal_senders, mul_size(), and offsetof.

Referenced by CreateSharedMemoryAndSemaphores(), and WalSndShmemInit().

3077 {
3078  Size size = 0;
3079 
3080  size = offsetof(WalSndCtlData, walsnds);
3081  size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
3082 
3083  return size;
3084 }
int max_wal_senders
Definition: walsender.c:121
Size mul_size(Size s1, Size s2)
Definition: shmem.c:515
Size add_size(Size s1, Size s2)
Definition: shmem.c:498
size_t Size
Definition: c.h:474
#define offsetof(type, field)
Definition: c.h:669

◆ WalSndSignals()

void WalSndSignals ( void  )

Definition at line 3057 of file walsender.c.

References die, InitializeTimeouts(), pqsignal(), procsignal_sigusr1_handler(), SIG_DFL, SIG_IGN, SIGCHLD, SIGHUP, SignalHandlerForConfigReload(), SIGPIPE, SIGUSR1, SIGUSR2, StatementCancelHandler(), and WalSndLastCycleHandler().

Referenced by PostgresMain().

3058 {
3059  /* Set up signal handlers */
3061  pqsignal(SIGINT, StatementCancelHandler); /* query cancel */
3062  pqsignal(SIGTERM, die); /* request shutdown */
3063  /* SIGQUIT handler was already set up by InitPostmasterChild */
3064  InitializeTimeouts(); /* establishes SIGALRM handler */
3067  pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
3068  * shutdown */
3069 
3070  /* Reset some signals that are accepted by postmaster but not here */
3072 }
void InitializeTimeouts(void)
Definition: timeout.c:346
#define SIGUSR1
Definition: win32_port.h:171
static void WalSndLastCycleHandler(SIGNAL_ARGS)
Definition: walsender.c:3045
#define SIGCHLD
Definition: win32_port.h:169
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:56
#define SIGPIPE
Definition: win32_port.h:164
#define SIGUSR2
Definition: win32_port.h:172
void StatementCancelHandler(SIGNAL_ARGS)
Definition: postgres.c:2857
#define SIGHUP
Definition: win32_port.h:159
#define SIG_IGN
Definition: win32_port.h:156
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:170
#define SIG_DFL
Definition: win32_port.h:154
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition: procsignal.c:551
#define die(msg)
Definition: pg_test_fsync.c:97

◆ WalSndWaitStopping()

void WalSndWaitStopping ( void  )

Definition at line 3175 of file walsender.c.

References i, max_wal_senders, WalSnd::mutex, pg_usleep(), WalSnd::pid, SpinLockAcquire, SpinLockRelease, WalSnd::state, WalSndCtlData::walsnds, and WALSNDSTATE_STOPPING.

Referenced by ShutdownXLOG().

3176 {
3177  for (;;)
3178  {
3179  int i;
3180  bool all_stopped = true;
3181 
3182  for (i = 0; i < max_wal_senders; i++)
3183  {
3184  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3185 
3186  SpinLockAcquire(&walsnd->mutex);
3187 
3188  if (walsnd->pid == 0)
3189  {
3190  SpinLockRelease(&walsnd->mutex);
3191  continue;
3192  }
3193 
3194  if (walsnd->state != WALSNDSTATE_STOPPING)
3195  {
3196  all_stopped = false;
3197  SpinLockRelease(&walsnd->mutex);
3198  break;
3199  }
3200  SpinLockRelease(&walsnd->mutex);
3201  }
3202 
3203  /* safe to leave if confirmation is done for all WAL senders */
3204  if (all_stopped)
3205  return;
3206 
3207  pg_usleep(10000L); /* wait for 10 msec */
3208  }
3209 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
void pg_usleep(long microsec)
Definition: signal.c:53
WalSndState state
int max_wal_senders
Definition: walsender.c:121
#define SpinLockRelease(lock)
Definition: spin.h:64
int i

◆ WalSndWakeup()

void WalSndWakeup ( void  )

Definition at line 3120 of file walsender.c.

References i, WalSnd::latch, max_wal_senders, WalSnd::mutex, SetLatch(), SpinLockAcquire, SpinLockRelease, and WalSndCtlData::walsnds.

Referenced by KeepFileRestoredFromArchive(), StartupXLOG(), and XLogWalRcvFlush().

3121 {
3122  int i;
3123 
3124  for (i = 0; i < max_wal_senders; i++)
3125  {
3126  Latch *latch;
3127  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3128 
3129  /*
3130  * Get latch pointer with spinlock held, for the unlikely case that
3131  * pointer reads aren't atomic (as they're 8 bytes).
3132  */
3133  SpinLockAcquire(&walsnd->mutex);
3134  latch = walsnd->latch;
3135  SpinLockRelease(&walsnd->mutex);
3136 
3137  if (latch != NULL)
3138  SetLatch(latch);
3139  }
3140 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
void SetLatch(Latch *latch)
Definition: latch.c:505
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
Latch * latch
Definition: latch.h:110
int max_wal_senders
Definition: walsender.c:121
#define SpinLockRelease(lock)
Definition: spin.h:64
int i

Variable Documentation

◆ am_cascading_walsender

◆ am_db_walsender

bool am_db_walsender

Definition at line 118 of file walsender.c.

Referenced by check_db(), InitPostgres(), and ProcessStartupPacket().

◆ am_walsender

◆ log_replication_commands

bool log_replication_commands

Definition at line 125 of file walsender.c.

Referenced by exec_replication_command().

◆ max_wal_senders

◆ wake_wal_senders

bool wake_wal_senders

Definition at line 130 of file walsender.c.

◆ wal_sender_timeout

int wal_sender_timeout