PostgreSQL Source Code  git master
walsender.h File Reference
#include <signal.h>
Include dependency graph for walsender.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Macros

#define WalSndWakeupRequest()   do { wake_wal_senders = true; } while (0)
 
#define WalSndWakeupProcessRequests()
 

Enumerations

enum  CRSSnapshotAction { CRS_EXPORT_SNAPSHOT, CRS_NOEXPORT_SNAPSHOT, CRS_USE_SNAPSHOT }
 

Functions

void InitWalSender (void)
 
bool exec_replication_command (const char *query_string)
 
void WalSndErrorCleanup (void)
 
void WalSndResourceCleanup (bool isCommit)
 
void WalSndSignals (void)
 
Size WalSndShmemSize (void)
 
void WalSndShmemInit (void)
 
void WalSndWakeup (void)
 
void WalSndInitStopping (void)
 
void WalSndWaitStopping (void)
 
void HandleWalSndInitStopping (void)
 
void WalSndRqstFileReload (void)
 

Variables

bool am_walsender
 
bool am_cascading_walsender
 
bool am_db_walsender
 
bool wake_wal_senders
 
int max_wal_senders
 
int wal_sender_timeout
 
bool log_replication_commands
 

Macro Definition Documentation

◆ WalSndWakeupProcessRequests

#define WalSndWakeupProcessRequests ( )
Value:
do \
{ \
if (wake_wal_senders) \
{ \
wake_wal_senders = false; \
if (max_wal_senders > 0) \
WalSndWakeup(); \
} \
} while (0)
int max_wal_senders
Definition: walsender.c:121
bool wake_wal_senders
Definition: walsender.c:130

Definition at line 63 of file walsender.h.

Referenced by XLogBackgroundFlush(), and XLogFlush().

◆ WalSndWakeupRequest

#define WalSndWakeupRequest ( )    do { wake_wal_senders = true; } while (0)

Definition at line 57 of file walsender.h.

Referenced by XLogWrite().

Enumeration Type Documentation

◆ CRSSnapshotAction

Enumerator
CRS_EXPORT_SNAPSHOT 
CRS_NOEXPORT_SNAPSHOT 
CRS_USE_SNAPSHOT 

Definition at line 20 of file walsender.h.

Function Documentation

◆ exec_replication_command()

bool exec_replication_command ( const char *  query_string)

Definition at line 1501 of file walsender.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, Assert, CHECK_FOR_INTERRUPTS, CommitTransactionCommand(), CreateDestReceiver(), CreateReplicationSlot(), CurrentMemoryContext, DEBUG1, debug_query_string, generate_unaccent_rules::dest, DestRemoteSimple, DropReplicationSlot(), elog, EndReplicationCommand(), ereport, errcode(), errmsg(), errmsg_internal(), ERROR, GetPGVariable(), got_STOPPING, IdentifySystem(), initStringInfo(), InvalidOid, IsA, IsAbortedTransactionBlockState(), StartReplicationCmd::kind, LOG, log_replication_commands, MemoryContextDelete(), MemoryContextSwitchTo(), MyDatabaseId, VariableShowStmt::name, pgstat_report_activity(), PreventInTransactionBlock(), REPLICATION_KIND_PHYSICAL, replication_parse_result, replication_scanner_finish(), replication_scanner_init(), replication_yyparse(), SendBaseBackup(), SendTimeLineHistory(), set_ps_display(), SnapBuildClearExportedSnapshot(), StartLogicalReplication(), StartReplication(), StartTransactionCommand(), WalSnd::state, STATE_RUNNING, T_BaseBackupCmd, T_CreateReplicationSlotCmd, T_DropReplicationSlotCmd, T_IdentifySystemCmd, T_StartReplicationCmd, T_TimeLineHistoryCmd, T_VariableShowStmt, TimeLineHistoryCmd::type, Node::type, WalSndSetState(), and WALSNDSTATE_STOPPING.

Referenced by PostgresMain().

1502 {
1503  int parse_rc;
1504  Node *cmd_node;
1505  const char *cmdtag;
1506  MemoryContext cmd_context;
1507  MemoryContext old_context;
1508 
1509  /*
1510  * If WAL sender has been told that shutdown is getting close, switch its
1511  * status accordingly to handle the next replication commands correctly.
1512  */
1513  if (got_STOPPING)
1515 
1516  /*
1517  * Throw error if in stopping mode. We need to prevent commands that could
1518  * generate WAL while the shutdown checkpoint is being written. To be
1519  * safe, we just prohibit all new commands.
1520  */
1522  ereport(ERROR,
1523  (errmsg("cannot execute new commands while WAL sender is in stopping mode")));
1524 
1525  /*
1526  * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
1527  * command arrives. Clean up the old stuff if there's anything.
1528  */
1530 
1532 
1533  /*
1534  * Parse the command.
1535  */
1537  "Replication command context",
1539  old_context = MemoryContextSwitchTo(cmd_context);
1540 
1541  replication_scanner_init(cmd_string);
1542  parse_rc = replication_yyparse();
1543  if (parse_rc != 0)
1544  ereport(ERROR,
1545  (errcode(ERRCODE_SYNTAX_ERROR),
1546  errmsg_internal("replication command parser returned %d",
1547  parse_rc)));
1549 
1550  cmd_node = replication_parse_result;
1551 
1552  /*
1553  * If it's a SQL command, just clean up our mess and return false; the
1554  * caller will take care of executing it.
1555  */
1556  if (IsA(cmd_node, SQLCmd))
1557  {
1558  if (MyDatabaseId == InvalidOid)
1559  ereport(ERROR,
1560  (errmsg("cannot execute SQL commands in WAL sender for physical replication")));
1561 
1562  MemoryContextSwitchTo(old_context);
1563  MemoryContextDelete(cmd_context);
1564 
1565  /* Tell the caller that this wasn't a WalSender command. */
1566  return false;
1567  }
1568 
1569  /*
1570  * Report query to various monitoring facilities. For this purpose, we
1571  * report replication commands just like SQL commands.
1572  */
1573  debug_query_string = cmd_string;
1574 
1576 
1577  /*
1578  * Log replication command if log_replication_commands is enabled. Even
1579  * when it's disabled, log the command with DEBUG1 level for backward
1580  * compatibility.
1581  */
1583  (errmsg("received replication command: %s", cmd_string)));
1584 
1585  /*
1586  * Disallow replication commands in aborted transaction blocks.
1587  */
1589  ereport(ERROR,
1590  (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
1591  errmsg("current transaction is aborted, "
1592  "commands ignored until end of transaction block")));
1593 
1595 
1596  /*
1597  * Allocate buffers that will be used for each outgoing and incoming
1598  * message. We do this just once per command to reduce palloc overhead.
1599  */
1603 
1604  switch (cmd_node->type)
1605  {
1606  case T_IdentifySystemCmd:
1607  cmdtag = "IDENTIFY_SYSTEM";
1608  set_ps_display(cmdtag);
1609  IdentifySystem();
1610  EndReplicationCommand(cmdtag);
1611  break;
1612 
1613  case T_BaseBackupCmd:
1614  cmdtag = "BASE_BACKUP";
1615  set_ps_display(cmdtag);
1616  PreventInTransactionBlock(true, cmdtag);
1617  SendBaseBackup((BaseBackupCmd *) cmd_node);
1618  EndReplicationCommand(cmdtag);
1619  break;
1620 
1622  cmdtag = "CREATE_REPLICATION_SLOT";
1623  set_ps_display(cmdtag);
1625  EndReplicationCommand(cmdtag);
1626  break;
1627 
1629  cmdtag = "DROP_REPLICATION_SLOT";
1630  set_ps_display(cmdtag);
1632  EndReplicationCommand(cmdtag);
1633  break;
1634 
1635  case T_StartReplicationCmd:
1636  {
1637  StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
1638 
1639  cmdtag = "START_REPLICATION";
1640  set_ps_display(cmdtag);
1641  PreventInTransactionBlock(true, cmdtag);
1642 
1643  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
1644  StartReplication(cmd);
1645  else
1647 
1648  /* dupe, but necessary per libpqrcv_endstreaming */
1649  EndReplicationCommand(cmdtag);
1650 
1651  Assert(xlogreader != NULL);
1652  break;
1653  }
1654 
1655  case T_TimeLineHistoryCmd:
1656  cmdtag = "TIMELINE_HISTORY";
1657  set_ps_display(cmdtag);
1658  PreventInTransactionBlock(true, cmdtag);
1660  EndReplicationCommand(cmdtag);
1661  break;
1662 
1663  case T_VariableShowStmt:
1664  {
1666  VariableShowStmt *n = (VariableShowStmt *) cmd_node;
1667 
1668  cmdtag = "SHOW";
1669  set_ps_display(cmdtag);
1670 
1671  /* syscache access needs a transaction environment */
1673  GetPGVariable(n->name, dest);
1675  EndReplicationCommand(cmdtag);
1676  }
1677  break;
1678 
1679  default:
1680  elog(ERROR, "unrecognized replication command node tag: %u",
1681  cmd_node->type);
1682  }
1683 
1684  /* done */
1685  MemoryContextSwitchTo(old_context);
1686  MemoryContextDelete(cmd_context);
1687 
1688  /*
1689  * We need not update ps display or pg_stat_activity, because PostgresMain
1690  * will reset those to "idle". But we must reset debug_query_string to
1691  * ensure it doesn't become a dangling pointer.
1692  */
1693  debug_query_string = NULL;
1694 
1695  return true;
1696 }
#define IsA(nodeptr, _type_)
Definition: nodes.h:590
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:218
#define AllocSetContextCreate
Definition: memutils.h:173
#define DEBUG1
Definition: elog.h:25
static void SendTimeLineHistory(TimeLineHistoryCmd *cmd)
Definition: walsender.c:465
static void DropReplicationSlot(DropReplicationSlotCmd *cmd)
Definition: walsender.c:1120
void CommitTransactionCommand(void)
Definition: xact.c:2939
bool IsAbortedTransactionBlockState(void)
Definition: xact.c:391
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
Definition: nodes.h:539
static StringInfoData output_message
Definition: walsender.c:157
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:187
int errcode(int sqlerrcode)
Definition: elog.c:698
#define LOG
Definition: elog.h:26
void GetPGVariable(const char *name, DestReceiver *dest)
Definition: guc.c:9298
Node * replication_parse_result
void set_ps_display(const char *activity)
Definition: ps_status.c:349
ReplicationKind kind
Definition: replnodes.h:83
#define ERROR
Definition: elog.h:46
void SendBaseBackup(BaseBackupCmd *cmd)
Definition: basebackup.c:932
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:195
WalSndState state
NodeTag type
Definition: nodes.h:541
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
static void StartReplication(StartReplicationCmd *cmd)
Definition: walsender.c:571
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
Definition: xact.c:3379
MemoryContext CurrentMemoryContext
Definition: mcxt.c:42
static XLogReaderState * xlogreader
Definition: walsender.c:137
void EndReplicationCommand(const char *commandTag)
Definition: dest.c:221
int replication_yyparse(void)
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
const char * debug_query_string
Definition: postgres.c:89
static StringInfoData reply_message
Definition: walsender.c:158
static void StartLogicalReplication(StartReplicationCmd *cmd)
Definition: walsender.c:1130
Oid MyDatabaseId
Definition: globals.c:88
WalSnd * MyWalSnd
Definition: walsender.c:112
#define InvalidOid
Definition: postgres_ext.h:36
#define ereport(elevel,...)
Definition: elog.h:157
int errmsg_internal(const char *fmt,...)
Definition: elog.c:996
#define Assert(condition)
Definition: c.h:804
void WalSndSetState(WalSndState state)
Definition: walsender.c:3221
void StartTransactionCommand(void)
Definition: xact.c:2838
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Definition: walsender.c:917
static StringInfoData tmpbuf
Definition: walsender.c:159
bool log_replication_commands
Definition: walsender.c:125
int errmsg(const char *fmt,...)
Definition: elog.c:909
void pgstat_report_activity(BackendState state, const char *cmd_str)
#define elog(elevel,...)
Definition: elog.h:232
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:120
static void IdentifySystem(void)
Definition: walsender.c:376
void replication_scanner_finish(void)
void SnapBuildClearExportedSnapshot(void)
Definition: snapbuild.c:674
void replication_scanner_init(const char *query_string)

◆ HandleWalSndInitStopping()

void HandleWalSndInitStopping ( void  )

Definition at line 3015 of file walsender.c.

References am_walsender, Assert, got_STOPPING, kill, MyProcPid, and replication_active.

Referenced by procsignal_sigusr1_handler().

3016 {
3018 
3019  /*
3020  * If replication has not yet started, die like with SIGTERM. If
3021  * replication is active, only set a flag and wake up the main loop. It
3022  * will send any outstanding WAL, wait for it to be replicated to the
3023  * standby, and then exit gracefully.
3024  */
3025  if (!replication_active)
3026  kill(MyProcPid, SIGTERM);
3027  else
3028  got_STOPPING = true;
3029 }
int MyProcPid
Definition: globals.c:43
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:187
#define kill(pid, sig)
Definition: win32_port.h:454
bool am_walsender
Definition: walsender.c:115
static volatile sig_atomic_t replication_active
Definition: walsender.c:195
#define Assert(condition)
Definition: c.h:804

◆ InitWalSender()

void InitWalSender ( void  )

Referenced by PostgresMain().

◆ WalSndErrorCleanup()

void WalSndErrorCleanup ( void  )

Definition at line 296 of file walsender.c.

References ConditionVariableCancelSleep(), got_SIGUSR2, got_STOPPING, IsTransactionOrTransactionBlock(), LWLockReleaseAll(), MyReplicationSlot, pgstat_report_wait_end(), proc_exit(), replication_active, ReplicationSlotCleanup(), ReplicationSlotRelease(), XLogReaderState::seg, wal_segment_close(), WalSndResourceCleanup(), WalSndSetState(), WALSNDSTATE_STARTUP, and WALOpenSegment::ws_file.

Referenced by PostgresMain().

297 {
301 
302  if (xlogreader != NULL && xlogreader->seg.ws_file >= 0)
304 
305  if (MyReplicationSlot != NULL)
307 
309 
310  replication_active = false;
311 
312  /*
313  * If there is a transaction in progress, it will clean up our
314  * ResourceOwner, but if a replication command set up a resource owner
315  * without a transaction, we've got to clean that up now.
316  */
318  WalSndResourceCleanup(false);
319 
320  if (got_STOPPING || got_SIGUSR2)
321  proc_exit(0);
322 
323  /* Revert back to startup state */
325 }
void wal_segment_close(XLogReaderState *state)
Definition: xlogutils.c:809
static void pgstat_report_wait_end(void)
Definition: wait_event.h:277
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:187
void proc_exit(int code)
Definition: ipc.c:104
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4701
WALOpenSegment seg
Definition: xlogreader.h:215
void ConditionVariableCancelSleep(void)
void ReplicationSlotRelease(void)
Definition: slot.c:493
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:186
static XLogReaderState * xlogreader
Definition: walsender.c:137
static volatile sig_atomic_t replication_active
Definition: walsender.c:195
void WalSndResourceCleanup(bool isCommit)
Definition: walsender.c:331
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
void WalSndSetState(WalSndState state)
Definition: walsender.c:3221
void ReplicationSlotCleanup(void)
Definition: slot.c:549
void LWLockReleaseAll(void)
Definition: lwlock.c:1915

◆ WalSndInitStopping()

void WalSndInitStopping ( void  )

Definition at line 3157 of file walsender.c.

References i, InvalidBackendId, max_wal_senders, WalSnd::mutex, WalSnd::pid, PROCSIG_WALSND_INIT_STOPPING, SendProcSignal(), SpinLockAcquire, SpinLockRelease, and WalSndCtlData::walsnds.

Referenced by ShutdownXLOG().

3158 {
3159  int i;
3160 
3161  for (i = 0; i < max_wal_senders; i++)
3162  {
3163  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3164  pid_t pid;
3165 
3166  SpinLockAcquire(&walsnd->mutex);
3167  pid = walsnd->pid;
3168  SpinLockRelease(&walsnd->mutex);
3169 
3170  if (pid == 0)
3171  continue;
3172 
3174  }
3175 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
int SendProcSignal(pid_t pid, ProcSignalReason reason, BackendId backendId)
Definition: procsignal.c:261
int max_wal_senders
Definition: walsender.c:121
#define SpinLockRelease(lock)
Definition: spin.h:64
#define InvalidBackendId
Definition: backendid.h:23
int i

◆ WalSndResourceCleanup()

void WalSndResourceCleanup ( bool  isCommit)

Definition at line 331 of file walsender.c.

References CurrentResourceOwner, DestNone, DestRemote, proc_exit(), RESOURCE_RELEASE_AFTER_LOCKS, RESOURCE_RELEASE_BEFORE_LOCKS, RESOURCE_RELEASE_LOCKS, ResourceOwnerDelete(), ResourceOwnerRelease(), WalSndShutdown(), and whereToSendOutput.

Referenced by perform_base_backup(), and WalSndErrorCleanup().

332 {
333  ResourceOwner resowner;
334 
335  if (CurrentResourceOwner == NULL)
336  return;
337 
338  /*
339  * Deleting CurrentResourceOwner is not allowed, so we must save a pointer
340  * in a local variable and clear it first.
341  */
342  resowner = CurrentResourceOwner;
343  CurrentResourceOwner = NULL;
344 
345  /* Now we can release resources and delete it. */
346  ResourceOwnerRelease(resowner,
347  RESOURCE_RELEASE_BEFORE_LOCKS, isCommit, true);
348  ResourceOwnerRelease(resowner,
349  RESOURCE_RELEASE_LOCKS, isCommit, true);
350  ResourceOwnerRelease(resowner,
351  RESOURCE_RELEASE_AFTER_LOCKS, isCommit, true);
352  ResourceOwnerDelete(resowner);
353 }
ResourceOwner CurrentResourceOwner
Definition: resowner.c:146
void ResourceOwnerDelete(ResourceOwner owner)
Definition: resowner.c:737
void ResourceOwnerRelease(ResourceOwner owner, ResourceReleasePhase phase, bool isCommit, bool isTopLevel)
Definition: resowner.c:486

◆ WalSndRqstFileReload()

void WalSndRqstFileReload ( void  )

Definition at line 2992 of file walsender.c.

References i, max_wal_senders, WalSnd::mutex, WalSnd::needreload, WalSnd::pid, SpinLockAcquire, SpinLockRelease, and WalSndCtlData::walsnds.

Referenced by KeepFileRestoredFromArchive().

2993 {
2994  int i;
2995 
2996  for (i = 0; i < max_wal_senders; i++)
2997  {
2998  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2999 
3000  SpinLockAcquire(&walsnd->mutex);
3001  if (walsnd->pid == 0)
3002  {
3003  SpinLockRelease(&walsnd->mutex);
3004  continue;
3005  }
3006  walsnd->needreload = true;
3007  SpinLockRelease(&walsnd->mutex);
3008  }
3009 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
int max_wal_senders
Definition: walsender.c:121
#define SpinLockRelease(lock)
Definition: spin.h:64
bool needreload
int i

◆ WalSndShmemInit()

void WalSndShmemInit ( void  )

Definition at line 3080 of file walsender.c.

References i, max_wal_senders, MemSet, WalSnd::mutex, NUM_SYNC_REP_WAIT_MODE, ShmemInitStruct(), SHMQueueInit(), SpinLockInit, WalSndCtlData::SyncRepQueue, WalSndCtlData::walsnds, and WalSndShmemSize().

Referenced by CreateSharedMemoryAndSemaphores().

3081 {
3082  bool found;
3083  int i;
3084 
3085  WalSndCtl = (WalSndCtlData *)
3086  ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
3087 
3088  if (!found)
3089  {
3090  /* First time through, so initialize */
3092 
3093  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
3095 
3096  for (i = 0; i < max_wal_senders; i++)
3097  {
3098  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3099 
3100  SpinLockInit(&walsnd->mutex);
3101  }
3102  }
3103 }
Size WalSndShmemSize(void)
Definition: walsender.c:3068
#define SpinLockInit(lock)
Definition: spin.h:60
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
#define MemSet(start, val, len)
Definition: c.h:1008
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:396
SHM_QUEUE SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]
int max_wal_senders
Definition: walsender.c:121
void SHMQueueInit(SHM_QUEUE *queue)
Definition: shmqueue.c:36
int i
#define NUM_SYNC_REP_WAIT_MODE
Definition: syncrep.h:28

◆ WalSndShmemSize()

Size WalSndShmemSize ( void  )

Definition at line 3068 of file walsender.c.

References add_size(), max_wal_senders, mul_size(), and offsetof.

Referenced by CreateSharedMemoryAndSemaphores(), and WalSndShmemInit().

3069 {
3070  Size size = 0;
3071 
3072  size = offsetof(WalSndCtlData, walsnds);
3073  size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
3074 
3075  return size;
3076 }
int max_wal_senders
Definition: walsender.c:121
Size mul_size(Size s1, Size s2)
Definition: shmem.c:519
Size add_size(Size s1, Size s2)
Definition: shmem.c:502
size_t Size
Definition: c.h:540
#define offsetof(type, field)
Definition: c.h:727

◆ WalSndSignals()

void WalSndSignals ( void  )

Definition at line 3049 of file walsender.c.

References die, InitializeTimeouts(), pqsignal(), procsignal_sigusr1_handler(), SIG_DFL, SIG_IGN, SIGCHLD, SIGHUP, SignalHandlerForConfigReload(), SIGPIPE, SIGUSR1, SIGUSR2, StatementCancelHandler(), and WalSndLastCycleHandler().

Referenced by PostgresMain().

3050 {
3051  /* Set up signal handlers */
3053  pqsignal(SIGINT, StatementCancelHandler); /* query cancel */
3054  pqsignal(SIGTERM, die); /* request shutdown */
3055  /* SIGQUIT handler was already set up by InitPostmasterChild */
3056  InitializeTimeouts(); /* establishes SIGALRM handler */
3059  pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
3060  * shutdown */
3061 
3062  /* Reset some signals that are accepted by postmaster but not here */
3064 }
void InitializeTimeouts(void)
Definition: timeout.c:435
#define SIGUSR1
Definition: win32_port.h:171
static void WalSndLastCycleHandler(SIGNAL_ARGS)
Definition: walsender.c:3037
#define SIGCHLD
Definition: win32_port.h:169
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:56
#define SIGPIPE
Definition: win32_port.h:164
#define SIGUSR2
Definition: win32_port.h:172
void StatementCancelHandler(SIGNAL_ARGS)
Definition: postgres.c:2949
#define SIGHUP
Definition: win32_port.h:159
#define SIG_IGN
Definition: win32_port.h:156
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:170
#define SIG_DFL
Definition: win32_port.h:154
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition: procsignal.c:642
#define die(msg)
Definition: pg_test_fsync.c:97

◆ WalSndWaitStopping()

void WalSndWaitStopping ( void  )

Definition at line 3183 of file walsender.c.

References i, max_wal_senders, WalSnd::mutex, pg_usleep(), WalSnd::pid, SpinLockAcquire, SpinLockRelease, WalSnd::state, WalSndCtlData::walsnds, and WALSNDSTATE_STOPPING.

Referenced by ShutdownXLOG().

3184 {
3185  for (;;)
3186  {
3187  int i;
3188  bool all_stopped = true;
3189 
3190  for (i = 0; i < max_wal_senders; i++)
3191  {
3192  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3193 
3194  SpinLockAcquire(&walsnd->mutex);
3195 
3196  if (walsnd->pid == 0)
3197  {
3198  SpinLockRelease(&walsnd->mutex);
3199  continue;
3200  }
3201 
3202  if (walsnd->state != WALSNDSTATE_STOPPING)
3203  {
3204  all_stopped = false;
3205  SpinLockRelease(&walsnd->mutex);
3206  break;
3207  }
3208  SpinLockRelease(&walsnd->mutex);
3209  }
3210 
3211  /* safe to leave if confirmation is done for all WAL senders */
3212  if (all_stopped)
3213  return;
3214 
3215  pg_usleep(10000L); /* wait for 10 msec */
3216  }
3217 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
void pg_usleep(long microsec)
Definition: signal.c:53
WalSndState state
int max_wal_senders
Definition: walsender.c:121
#define SpinLockRelease(lock)
Definition: spin.h:64
int i

◆ WalSndWakeup()

void WalSndWakeup ( void  )

Definition at line 3112 of file walsender.c.

References i, WalSnd::latch, max_wal_senders, WalSnd::mutex, SetLatch(), SpinLockAcquire, SpinLockRelease, and WalSndCtlData::walsnds.

Referenced by KeepFileRestoredFromArchive(), StartupXLOG(), and XLogWalRcvFlush().

3113 {
3114  int i;
3115 
3116  for (i = 0; i < max_wal_senders; i++)
3117  {
3118  Latch *latch;
3119  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3120 
3121  /*
3122  * Get latch pointer with spinlock held, for the unlikely case that
3123  * pointer reads aren't atomic (as they're 8 bytes).
3124  */
3125  SpinLockAcquire(&walsnd->mutex);
3126  latch = walsnd->latch;
3127  SpinLockRelease(&walsnd->mutex);
3128 
3129  if (latch != NULL)
3130  SetLatch(latch);
3131  }
3132 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
void SetLatch(Latch *latch)
Definition: latch.c:567
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
Latch * latch
Definition: latch.h:110
int max_wal_senders
Definition: walsender.c:121
#define SpinLockRelease(lock)
Definition: spin.h:64
int i

Variable Documentation

◆ am_cascading_walsender

◆ am_db_walsender

bool am_db_walsender

Definition at line 118 of file walsender.c.

Referenced by check_db(), ClientAuthentication(), InitPostgres(), and ProcessStartupPacket().

◆ am_walsender

◆ log_replication_commands

bool log_replication_commands

Definition at line 125 of file walsender.c.

Referenced by exec_replication_command().

◆ max_wal_senders

◆ wake_wal_senders

bool wake_wal_senders

Definition at line 130 of file walsender.c.

◆ wal_sender_timeout

int wal_sender_timeout