PostgreSQL Source Code  git master
walsender.h File Reference
#include <signal.h>
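Include dependency graph for walsender.h: (graph image not reproduced in this text version)
This graph shows which files directly or indirectly include this file: (graph image not reproduced in this text version)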

Go to the source code of this file.

Macros

#define WalSndWakeupRequest()   do { wake_wal_senders = true; } while (0)
 
#define WalSndWakeupProcessRequests()
 

Enumerations

enum  CRSSnapshotAction { CRS_EXPORT_SNAPSHOT, CRS_NOEXPORT_SNAPSHOT, CRS_USE_SNAPSHOT }
 

Functions

void InitWalSender (void)
 
bool exec_replication_command (const char *query_string)
 
void WalSndErrorCleanup (void)
 
void WalSndResourceCleanup (bool isCommit)
 
void WalSndSignals (void)
 
Size WalSndShmemSize (void)
 
void WalSndShmemInit (void)
 
void WalSndWakeup (void)
 
void WalSndInitStopping (void)
 
void WalSndWaitStopping (void)
 
void HandleWalSndInitStopping (void)
 
void WalSndRqstFileReload (void)
 

Variables

bool am_walsender
 
bool am_cascading_walsender
 
bool am_db_walsender
 
bool wake_wal_senders
 
int max_wal_senders
 
int wal_sender_timeout
 
bool log_replication_commands
 

Macro Definition Documentation

◆ WalSndWakeupProcessRequests

#define WalSndWakeupProcessRequests ( )
Value:
do \
{ \
if (wake_wal_senders) \
{ \
wake_wal_senders = false; \
if (max_wal_senders > 0) \
WalSndWakeup(); \
} \
} while (0)
int max_wal_senders
Definition: walsender.c:121
bool wake_wal_senders
Definition: walsender.c:130

Definition at line 63 of file walsender.h.

Referenced by XLogBackgroundFlush(), and XLogFlush().

◆ WalSndWakeupRequest

#define WalSndWakeupRequest ( )    do { wake_wal_senders = true; } while (0)

Definition at line 57 of file walsender.h.

Referenced by XLogWrite().

Enumeration Type Documentation

◆ CRSSnapshotAction

Enumerator
CRS_EXPORT_SNAPSHOT 
CRS_NOEXPORT_SNAPSHOT 
CRS_USE_SNAPSHOT 

Definition at line 20 of file walsender.h.

Function Documentation

◆ exec_replication_command()

bool exec_replication_command ( const char *  query_string)

Definition at line 1509 of file walsender.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, Assert, CHECK_FOR_INTERRUPTS, CommitTransactionCommand(), CreateDestReceiver(), CreateReplicationSlot(), CurrentMemoryContext, DEBUG1, debug_query_string, generate_unaccent_rules::dest, DestRemoteSimple, DropReplicationSlot(), elog, EndReplicationCommand(), ereport, errcode(), errmsg(), errmsg_internal(), ERROR, GetPGVariable(), got_STOPPING, IdentifySystem(), initStringInfo(), InvalidOid, IsA, IsAbortedTransactionBlockState(), StartReplicationCmd::kind, LOG, log_replication_commands, MemoryContextDelete(), MemoryContextSwitchTo(), MyDatabaseId, VariableShowStmt::name, pgstat_report_activity(), PreventInTransactionBlock(), REPLICATION_KIND_PHYSICAL, replication_parse_result, replication_scanner_finish(), replication_scanner_init(), replication_yyparse(), SendBaseBackup(), SendTimeLineHistory(), set_ps_display(), SnapBuildClearExportedSnapshot(), StartLogicalReplication(), StartReplication(), StartTransactionCommand(), WalSnd::state, STATE_RUNNING, T_BaseBackupCmd, T_CreateReplicationSlotCmd, T_DropReplicationSlotCmd, T_IdentifySystemCmd, T_StartReplicationCmd, T_TimeLineHistoryCmd, T_VariableShowStmt, TimeLineHistoryCmd::type, Node::type, WalSndSetState(), and WALSNDSTATE_STOPPING.

Referenced by PostgresMain().

1510 {
1511  int parse_rc;
1512  Node *cmd_node;
1513  const char *cmdtag;
1514  MemoryContext cmd_context;
1515  MemoryContext old_context;
1516 
1517  /*
1518  * If WAL sender has been told that shutdown is getting close, switch its
1519  * status accordingly to handle the next replication commands correctly.
1520  */
1521  if (got_STOPPING)
1522  WalSndSetState(WALSNDSTATE_STOPPING);
1523 
1524  /*
1525  * Throw error if in stopping mode. We need to prevent commands that could
1526  * generate WAL while the shutdown checkpoint is being written. To be
1527  * safe, we just prohibit all new commands.
1528  */
1529  if (MyWalSnd->state == WALSNDSTATE_STOPPING)
1530  ereport(ERROR,
1531  (errmsg("cannot execute new commands while WAL sender is in stopping mode")));
1532 
1533  /*
1534  * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
1535  * command arrives. Clean up the old stuff if there's anything.
1536  */
1538 
1540 
1541  /*
1542  * Parse the command.
1543  */
1544  cmd_context = AllocSetContextCreate(CurrentMemoryContext,
1545  "Replication command context",
1546  ALLOCSET_DEFAULT_SIZES);
1547  old_context = MemoryContextSwitchTo(cmd_context);
1548 
1549  replication_scanner_init(cmd_string);
1550  parse_rc = replication_yyparse();
1551  if (parse_rc != 0)
1552  ereport(ERROR,
1553  (errcode(ERRCODE_SYNTAX_ERROR),
1554  errmsg_internal("replication command parser returned %d",
1555  parse_rc)));
1557 
1558  cmd_node = replication_parse_result;
1559 
1560  /*
1561  * If it's a SQL command, just clean up our mess and return false; the
1562  * caller will take care of executing it.
1563  */
1564  if (IsA(cmd_node, SQLCmd))
1565  {
1566  if (MyDatabaseId == InvalidOid)
1567  ereport(ERROR,
1568  (errmsg("cannot execute SQL commands in WAL sender for physical replication")));
1569 
1570  MemoryContextSwitchTo(old_context);
1571  MemoryContextDelete(cmd_context);
1572 
1573  /* Tell the caller that this wasn't a WalSender command. */
1574  return false;
1575  }
1576 
1577  /*
1578  * Report query to various monitoring facilities. For this purpose, we
1579  * report replication commands just like SQL commands.
1580  */
1581  debug_query_string = cmd_string;
1582 
1584 
1585  /*
1586  * Log replication command if log_replication_commands is enabled. Even
1587  * when it's disabled, log the command with DEBUG1 level for backward
1588  * compatibility.
1589  */
1590  ereport(log_replication_commands ? LOG : DEBUG1,
1591  (errmsg("received replication command: %s", cmd_string)));
1592 
1593  /*
1594  * Disallow replication commands in aborted transaction blocks.
1595  */
1596  if (IsAbortedTransactionBlockState())
1597  ereport(ERROR,
1598  (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
1599  errmsg("current transaction is aborted, "
1600  "commands ignored until end of transaction block")));
1601 
1603 
1604  /*
1605  * Allocate buffers that will be used for each outgoing and incoming
1606  * message. We do this just once per command to reduce palloc overhead.
1607  */
1611 
1612  switch (cmd_node->type)
1613  {
1614  case T_IdentifySystemCmd:
1615  cmdtag = "IDENTIFY_SYSTEM";
1616  set_ps_display(cmdtag);
1617  IdentifySystem();
1618  EndReplicationCommand(cmdtag);
1619  break;
1620 
1621  case T_BaseBackupCmd:
1622  cmdtag = "BASE_BACKUP";
1623  set_ps_display(cmdtag);
1624  PreventInTransactionBlock(true, cmdtag);
1625  SendBaseBackup((BaseBackupCmd *) cmd_node);
1626  EndReplicationCommand(cmdtag);
1627  break;
1628 
1630  cmdtag = "CREATE_REPLICATION_SLOT";
1631  set_ps_display(cmdtag);
1633  EndReplicationCommand(cmdtag);
1634  break;
1635 
1637  cmdtag = "DROP_REPLICATION_SLOT";
1638  set_ps_display(cmdtag);
1640  EndReplicationCommand(cmdtag);
1641  break;
1642 
1643  case T_StartReplicationCmd:
1644  {
1645  StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
1646 
1647  cmdtag = "START_REPLICATION";
1648  set_ps_display(cmdtag);
1649  PreventInTransactionBlock(true, cmdtag);
1650 
1651  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
1652  StartReplication(cmd);
1653  else
1654  StartLogicalReplication(cmd);
1655 
1656  /* dupe, but necessary per libpqrcv_endstreaming */
1657  EndReplicationCommand(cmdtag);
1658 
1659  Assert(xlogreader != NULL);
1660  break;
1661  }
1662 
1663  case T_TimeLineHistoryCmd:
1664  cmdtag = "TIMELINE_HISTORY";
1665  set_ps_display(cmdtag);
1666  PreventInTransactionBlock(true, cmdtag);
1668  EndReplicationCommand(cmdtag);
1669  break;
1670 
1671  case T_VariableShowStmt:
1672  {
1674  VariableShowStmt *n = (VariableShowStmt *) cmd_node;
1675 
1676  cmdtag = "SHOW";
1677  set_ps_display(cmdtag);
1678 
1679  /* syscache access needs a transaction environment */
1681  GetPGVariable(n->name, dest);
1683  EndReplicationCommand(cmdtag);
1684  }
1685  break;
1686 
1687  default:
1688  elog(ERROR, "unrecognized replication command node tag: %u",
1689  cmd_node->type);
1690  }
1691 
1692  /* done */
1693  MemoryContextSwitchTo(old_context);
1694  MemoryContextDelete(cmd_context);
1695 
1696  /*
1697  * We need not update ps display or pg_stat_activity, because PostgresMain
1698  * will reset those to "idle". But we must reset debug_query_string to
1699  * ensure it doesn't become a dangling pointer.
1700  */
1701  debug_query_string = NULL;
1702 
1703  return true;
1704 }
#define IsA(nodeptr, _type_)
Definition: nodes.h:587
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:218
#define AllocSetContextCreate
Definition: memutils.h:173
#define DEBUG1
Definition: elog.h:25
static void SendTimeLineHistory(TimeLineHistoryCmd *cmd)
Definition: walsender.c:465
static void DropReplicationSlot(DropReplicationSlotCmd *cmd)
Definition: walsender.c:1128
void CommitTransactionCommand(void)
Definition: xact.c:2949
bool IsAbortedTransactionBlockState(void)
Definition: xact.c:391
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
Definition: nodes.h:536
static StringInfoData output_message
Definition: walsender.c:157
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:187
int errcode(int sqlerrcode)
Definition: elog.c:698
#define LOG
Definition: elog.h:26
void GetPGVariable(const char *name, DestReceiver *dest)
Definition: guc.c:9327
Node * replication_parse_result
void set_ps_display(const char *activity)
Definition: ps_status.c:349
ReplicationKind kind
Definition: replnodes.h:82
#define ERROR
Definition: elog.h:46
void SendBaseBackup(BaseBackupCmd *cmd)
Definition: basebackup.c:932
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:195
WalSndState state
NodeTag type
Definition: nodes.h:538
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
static void StartReplication(StartReplicationCmd *cmd)
Definition: walsender.c:571
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
Definition: xact.c:3389
MemoryContext CurrentMemoryContext
Definition: mcxt.c:42
static XLogReaderState * xlogreader
Definition: walsender.c:137
void EndReplicationCommand(const char *commandTag)
Definition: dest.c:221
int replication_yyparse(void)
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
const char * debug_query_string
Definition: postgres.c:89
static StringInfoData reply_message
Definition: walsender.c:158
static void StartLogicalReplication(StartReplicationCmd *cmd)
Definition: walsender.c:1138
Oid MyDatabaseId
Definition: globals.c:88
WalSnd * MyWalSnd
Definition: walsender.c:112
#define InvalidOid
Definition: postgres_ext.h:36
#define ereport(elevel,...)
Definition: elog.h:157
int errmsg_internal(const char *fmt,...)
Definition: elog.c:996
#define Assert(condition)
Definition: c.h:804
void WalSndSetState(WalSndState state)
Definition: walsender.c:3229
void StartTransactionCommand(void)
Definition: xact.c:2848
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Definition: walsender.c:924
static StringInfoData tmpbuf
Definition: walsender.c:159
bool log_replication_commands
Definition: walsender.c:125
int errmsg(const char *fmt,...)
Definition: elog.c:909
void pgstat_report_activity(BackendState state, const char *cmd_str)
#define elog(elevel,...)
Definition: elog.h:232
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:120
static void IdentifySystem(void)
Definition: walsender.c:376
void replication_scanner_finish(void)
void SnapBuildClearExportedSnapshot(void)
Definition: snapbuild.c:683
void replication_scanner_init(const char *query_string)

◆ HandleWalSndInitStopping()

void HandleWalSndInitStopping ( void  )

Definition at line 3023 of file walsender.c.

References am_walsender, Assert, got_STOPPING, kill, MyProcPid, and replication_active.

Referenced by procsignal_sigusr1_handler().

3024 {
3026 
3027  /*
3028  * If replication has not yet started, die like with SIGTERM. If
3029  * replication is active, only set a flag and wake up the main loop. It
3030  * will send any outstanding WAL, wait for it to be replicated to the
3031  * standby, and then exit gracefully.
3032  */
3033  if (!replication_active)
3034  kill(MyProcPid, SIGTERM);
3035  else
3036  got_STOPPING = true;
3037 }
int MyProcPid
Definition: globals.c:43
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:187
#define kill(pid, sig)
Definition: win32_port.h:454
bool am_walsender
Definition: walsender.c:115
static volatile sig_atomic_t replication_active
Definition: walsender.c:195
#define Assert(condition)
Definition: c.h:804

◆ InitWalSender()

void InitWalSender ( void  )

Referenced by PostgresMain().

◆ WalSndErrorCleanup()

void WalSndErrorCleanup ( void  )

Definition at line 296 of file walsender.c.

References ConditionVariableCancelSleep(), got_SIGUSR2, got_STOPPING, IsTransactionOrTransactionBlock(), LWLockReleaseAll(), MyReplicationSlot, pgstat_report_wait_end(), proc_exit(), replication_active, ReplicationSlotCleanup(), ReplicationSlotRelease(), XLogReaderState::seg, wal_segment_close(), WalSndResourceCleanup(), WalSndSetState(), WALSNDSTATE_STARTUP, and WALOpenSegment::ws_file.

Referenced by PostgresMain().

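297 {
298  LWLockReleaseAll();
299  ConditionVariableCancelSleep();
300  pgstat_report_wait_end();
301 
302  if (xlogreader != NULL && xlogreader->seg.ws_file >= 0)
303  wal_segment_close(xlogreader);
304 
305  if (MyReplicationSlot != NULL)
306  ReplicationSlotRelease();
307 
308  ReplicationSlotCleanup();
309 
310  replication_active = false;
311 
312  /*
313  * If there is a transaction in progress, it will clean up our
314  * ResourceOwner, but if a replication command set up a resource owner
315  * without a transaction, we've got to clean that up now.
316  */
317  if (!IsTransactionOrTransactionBlock())
318  WalSndResourceCleanup(false);
319 
320  if (got_STOPPING || got_SIGUSR2)
321  proc_exit(0);
322 
323  /* Revert back to startup state */
324  WalSndSetState(WALSNDSTATE_STARTUP);
325 }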
void wal_segment_close(XLogReaderState *state)
Definition: xlogutils.c:829
static void pgstat_report_wait_end(void)
Definition: wait_event.h:278
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:187
void proc_exit(int code)
Definition: ipc.c:104
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4711
WALOpenSegment seg
Definition: xlogreader.h:215
void ConditionVariableCancelSleep(void)
void ReplicationSlotRelease(void)
Definition: slot.c:469
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:186
static XLogReaderState * xlogreader
Definition: walsender.c:137
static volatile sig_atomic_t replication_active
Definition: walsender.c:195
void WalSndResourceCleanup(bool isCommit)
Definition: walsender.c:331
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
void WalSndSetState(WalSndState state)
Definition: walsender.c:3229
void ReplicationSlotCleanup(void)
Definition: slot.c:525
void LWLockReleaseAll(void)
Definition: lwlock.c:1902

◆ WalSndInitStopping()

void WalSndInitStopping ( void  )

Definition at line 3165 of file walsender.c.

References i, InvalidBackendId, max_wal_senders, WalSnd::mutex, WalSnd::pid, PROCSIG_WALSND_INIT_STOPPING, SendProcSignal(), SpinLockAcquire, SpinLockRelease, and WalSndCtlData::walsnds.

Referenced by ShutdownXLOG().

3166 {
3167  int i;
3168 
3169  for (i = 0; i < max_wal_senders; i++)
3170  {
3171  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3172  pid_t pid;
3173 
3174  SpinLockAcquire(&walsnd->mutex);
3175  pid = walsnd->pid;
3176  SpinLockRelease(&walsnd->mutex);
3177 
3178  if (pid == 0)
3179  continue;
3180 
3181  SendProcSignal(pid, PROCSIG_WALSND_INIT_STOPPING, InvalidBackendId);
3182  }
3183 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
int SendProcSignal(pid_t pid, ProcSignalReason reason, BackendId backendId)
Definition: procsignal.c:261
int max_wal_senders
Definition: walsender.c:121
#define SpinLockRelease(lock)
Definition: spin.h:64
#define InvalidBackendId
Definition: backendid.h:23
int i

◆ WalSndResourceCleanup()

void WalSndResourceCleanup ( bool  isCommit)

Definition at line 331 of file walsender.c.

References CurrentResourceOwner, DestNone, DestRemote, proc_exit(), RESOURCE_RELEASE_AFTER_LOCKS, RESOURCE_RELEASE_BEFORE_LOCKS, RESOURCE_RELEASE_LOCKS, ResourceOwnerDelete(), ResourceOwnerRelease(), WalSndShutdown(), and whereToSendOutput.

Referenced by perform_base_backup(), and WalSndErrorCleanup().

332 {
333  ResourceOwner resowner;
334 
335  if (CurrentResourceOwner == NULL)
336  return;
337 
338  /*
339  * Deleting CurrentResourceOwner is not allowed, so we must save a pointer
340  * in a local variable and clear it first.
341  */
342  resowner = CurrentResourceOwner;
343  CurrentResourceOwner = NULL;
344 
345  /* Now we can release resources and delete it. */
346  ResourceOwnerRelease(resowner,
347  RESOURCE_RELEASE_BEFORE_LOCKS, isCommit, true);
348  ResourceOwnerRelease(resowner,
349  RESOURCE_RELEASE_LOCKS, isCommit, true);
350  ResourceOwnerRelease(resowner,
351  RESOURCE_RELEASE_AFTER_LOCKS, isCommit, true);
352  ResourceOwnerDelete(resowner);
353 }
ResourceOwner CurrentResourceOwner
Definition: resowner.c:146
void ResourceOwnerDelete(ResourceOwner owner)
Definition: resowner.c:737
void ResourceOwnerRelease(ResourceOwner owner, ResourceReleasePhase phase, bool isCommit, bool isTopLevel)
Definition: resowner.c:486

◆ WalSndRqstFileReload()

void WalSndRqstFileReload ( void  )

Definition at line 3000 of file walsender.c.

References i, max_wal_senders, WalSnd::mutex, WalSnd::needreload, WalSnd::pid, SpinLockAcquire, SpinLockRelease, and WalSndCtlData::walsnds.

Referenced by KeepFileRestoredFromArchive().

3001 {
3002  int i;
3003 
3004  for (i = 0; i < max_wal_senders; i++)
3005  {
3006  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3007 
3008  SpinLockAcquire(&walsnd->mutex);
3009  if (walsnd->pid == 0)
3010  {
3011  SpinLockRelease(&walsnd->mutex);
3012  continue;
3013  }
3014  walsnd->needreload = true;
3015  SpinLockRelease(&walsnd->mutex);
3016  }
3017 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
int max_wal_senders
Definition: walsender.c:121
#define SpinLockRelease(lock)
Definition: spin.h:64
bool needreload
int i

◆ WalSndShmemInit()

void WalSndShmemInit ( void  )

Definition at line 3088 of file walsender.c.

References i, max_wal_senders, MemSet, WalSnd::mutex, NUM_SYNC_REP_WAIT_MODE, ShmemInitStruct(), SHMQueueInit(), SpinLockInit, WalSndCtlData::SyncRepQueue, WalSndCtlData::walsnds, and WalSndShmemSize().

Referenced by CreateSharedMemoryAndSemaphores().

3089 {
3090  bool found;
3091  int i;
3092 
3093  WalSndCtl = (WalSndCtlData *)
3094  ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
3095 
3096  if (!found)
3097  {
3098  /* First time through, so initialize */
3099  MemSet(WalSndCtl, 0, WalSndShmemSize());
3100 
3101  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
3102  SHMQueueInit(&(WalSndCtl->SyncRepQueue[i]));
3103 
3104  for (i = 0; i < max_wal_senders; i++)
3105  {
3106  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3107 
3108  SpinLockInit(&walsnd->mutex);
3109  }
3110  }
3111 }
Size WalSndShmemSize(void)
Definition: walsender.c:3076
#define SpinLockInit(lock)
Definition: spin.h:60
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
#define MemSet(start, val, len)
Definition: c.h:1008
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:396
SHM_QUEUE SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]
int max_wal_senders
Definition: walsender.c:121
void SHMQueueInit(SHM_QUEUE *queue)
Definition: shmqueue.c:36
int i
#define NUM_SYNC_REP_WAIT_MODE
Definition: syncrep.h:28

◆ WalSndShmemSize()

Size WalSndShmemSize ( void  )

Definition at line 3076 of file walsender.c.

References add_size(), max_wal_senders, mul_size(), and offsetof.

Referenced by CalculateShmemSize(), and WalSndShmemInit().

3077 {
3078  Size size = 0;
3079 
3080  size = offsetof(WalSndCtlData, walsnds);
3081  size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
3082 
3083  return size;
3084 }
int max_wal_senders
Definition: walsender.c:121
Size mul_size(Size s1, Size s2)
Definition: shmem.c:519
Size add_size(Size s1, Size s2)
Definition: shmem.c:502
size_t Size
Definition: c.h:540
#define offsetof(type, field)
Definition: c.h:727

◆ WalSndSignals()

void WalSndSignals ( void  )

Definition at line 3057 of file walsender.c.

References die, InitializeTimeouts(), pqsignal(), procsignal_sigusr1_handler(), SIG_DFL, SIG_IGN, SIGCHLD, SIGHUP, SignalHandlerForConfigReload(), SIGPIPE, SIGUSR1, SIGUSR2, StatementCancelHandler(), and WalSndLastCycleHandler().

Referenced by PostgresMain().

3058 {
3059  /* Set up signal handlers */
3061  pqsignal(SIGINT, StatementCancelHandler); /* query cancel */
3062  pqsignal(SIGTERM, die); /* request shutdown */
3063  /* SIGQUIT handler was already set up by InitPostmasterChild */
3064  InitializeTimeouts(); /* establishes SIGALRM handler */
3067  pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
3068  * shutdown */
3069 
3070  /* Reset some signals that are accepted by postmaster but not here */
3072 }
void InitializeTimeouts(void)
Definition: timeout.c:435
#define SIGUSR1
Definition: win32_port.h:171
static void WalSndLastCycleHandler(SIGNAL_ARGS)
Definition: walsender.c:3045
#define SIGCHLD
Definition: win32_port.h:169
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:56
#define SIGPIPE
Definition: win32_port.h:164
#define SIGUSR2
Definition: win32_port.h:172
void StatementCancelHandler(SIGNAL_ARGS)
Definition: postgres.c:2949
#define SIGHUP
Definition: win32_port.h:159
#define SIG_IGN
Definition: win32_port.h:156
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:170
#define SIG_DFL
Definition: win32_port.h:154
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition: procsignal.c:642
#define die(msg)
Definition: pg_test_fsync.c:97

◆ WalSndWaitStopping()

void WalSndWaitStopping ( void  )

Definition at line 3191 of file walsender.c.

References i, max_wal_senders, WalSnd::mutex, pg_usleep(), WalSnd::pid, SpinLockAcquire, SpinLockRelease, WalSnd::state, WalSndCtlData::walsnds, and WALSNDSTATE_STOPPING.

Referenced by ShutdownXLOG().

3192 {
3193  for (;;)
3194  {
3195  int i;
3196  bool all_stopped = true;
3197 
3198  for (i = 0; i < max_wal_senders; i++)
3199  {
3200  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3201 
3202  SpinLockAcquire(&walsnd->mutex);
3203 
3204  if (walsnd->pid == 0)
3205  {
3206  SpinLockRelease(&walsnd->mutex);
3207  continue;
3208  }
3209 
3210  if (walsnd->state != WALSNDSTATE_STOPPING)
3211  {
3212  all_stopped = false;
3213  SpinLockRelease(&walsnd->mutex);
3214  break;
3215  }
3216  SpinLockRelease(&walsnd->mutex);
3217  }
3218 
3219  /* safe to leave if confirmation is done for all WAL senders */
3220  if (all_stopped)
3221  return;
3222 
3223  pg_usleep(10000L); /* wait for 10 msec */
3224  }
3225 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
void pg_usleep(long microsec)
Definition: signal.c:53
WalSndState state
int max_wal_senders
Definition: walsender.c:121
#define SpinLockRelease(lock)
Definition: spin.h:64
int i

◆ WalSndWakeup()

void WalSndWakeup ( void  )

Definition at line 3120 of file walsender.c.

References i, WalSnd::latch, max_wal_senders, WalSnd::mutex, SetLatch(), SpinLockAcquire, SpinLockRelease, and WalSndCtlData::walsnds.

Referenced by KeepFileRestoredFromArchive(), StartupXLOG(), and XLogWalRcvFlush().

3121 {
3122  int i;
3123 
3124  for (i = 0; i < max_wal_senders; i++)
3125  {
3126  Latch *latch;
3127  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3128 
3129  /*
3130  * Get latch pointer with spinlock held, for the unlikely case that
3131  * pointer reads aren't atomic (as they're 8 bytes).
3132  */
3133  SpinLockAcquire(&walsnd->mutex);
3134  latch = walsnd->latch;
3135  SpinLockRelease(&walsnd->mutex);
3136 
3137  if (latch != NULL)
3138  SetLatch(latch);
3139  }
3140 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
void SetLatch(Latch *latch)
Definition: latch.c:567
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
Latch * latch
Definition: latch.h:110
int max_wal_senders
Definition: walsender.c:121
#define SpinLockRelease(lock)
Definition: spin.h:64
int i

Variable Documentation

◆ am_cascading_walsender

◆ am_db_walsender

bool am_db_walsender

Definition at line 118 of file walsender.c.

Referenced by check_db(), ClientAuthentication(), InitPostgres(), and ProcessStartupPacket().

◆ am_walsender

◆ log_replication_commands

bool log_replication_commands

Definition at line 125 of file walsender.c.

Referenced by exec_replication_command().

◆ max_wal_senders

◆ wake_wal_senders

bool wake_wal_senders

Definition at line 130 of file walsender.c.

◆ wal_sender_timeout

int wal_sender_timeout