PostgreSQL Source Code  git master
walsender.h File Reference
#include <signal.h>
Include dependency graph for walsender.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Macros

#define WalSndWakeupRequest()   do { wake_wal_senders = true; } while (0)
 
#define WalSndWakeupProcessRequests()
 

Enumerations

enum  CRSSnapshotAction { CRS_EXPORT_SNAPSHOT, CRS_NOEXPORT_SNAPSHOT, CRS_USE_SNAPSHOT }
 

Functions

void InitWalSender (void)
 
bool exec_replication_command (const char *query_string)
 
void WalSndErrorCleanup (void)
 
void WalSndResourceCleanup (bool isCommit)
 
void WalSndSignals (void)
 
Size WalSndShmemSize (void)
 
void WalSndShmemInit (void)
 
void WalSndWakeup (void)
 
void WalSndInitStopping (void)
 
void WalSndWaitStopping (void)
 
void HandleWalSndInitStopping (void)
 
void WalSndRqstFileReload (void)
 

Variables

bool am_walsender
 
bool am_cascading_walsender
 
bool am_db_walsender
 
bool wake_wal_senders
 
int max_wal_senders
 
int wal_sender_timeout
 
bool log_replication_commands
 

Macro Definition Documentation

◆ WalSndWakeupProcessRequests

#define WalSndWakeupProcessRequests ( )
Value:
do \
{ \
if (wake_wal_senders) \
{ \
wake_wal_senders = false; \
if (max_wal_senders > 0) \
WalSndWakeup(); \
} \
} while (0)
int max_wal_senders
Definition: walsender.c:121
bool wake_wal_senders
Definition: walsender.c:130

Definition at line 63 of file walsender.h.

Referenced by XLogBackgroundFlush(), and XLogFlush().

◆ WalSndWakeupRequest

#define WalSndWakeupRequest ( )    do { wake_wal_senders = true; } while (0)

Definition at line 57 of file walsender.h.

Referenced by XLogWrite().

Enumeration Type Documentation

◆ CRSSnapshotAction

Enumerator
CRS_EXPORT_SNAPSHOT 
CRS_NOEXPORT_SNAPSHOT 
CRS_USE_SNAPSHOT 

Definition at line 20 of file walsender.h.

Function Documentation

◆ exec_replication_command()

bool exec_replication_command ( const char *  query_string)

Definition at line 1516 of file walsender.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, Assert, CHECK_FOR_INTERRUPTS, CommitTransactionCommand(), CreateDestReceiver(), CreateReplicationSlot(), CurrentMemoryContext, DEBUG1, generate_unaccent_rules::dest, DestRemote, DestRemoteSimple, DropReplicationSlot(), elog, EndCommand(), ereport, errcode(), errmsg(), errmsg_internal(), ERROR, GetPGVariable(), got_STOPPING, IdentifySystem(), initStringInfo(), InvalidOid, IsA, IsAbortedTransactionBlockState(), IsTransactionBlock(), StartReplicationCmd::kind, LOG, log_replication_commands, MemoryContextDelete(), MemoryContextSwitchTo(), MyDatabaseId, VariableShowStmt::name, pgstat_report_activity(), PreventInTransactionBlock(), REPLICATION_KIND_PHYSICAL, replication_parse_result, replication_scanner_init(), replication_yyparse(), SendBaseBackup(), SendTimeLineHistory(), SetQueryCompletion(), SnapBuildClearExportedSnapshot(), StartLogicalReplication(), StartReplication(), StartTransactionCommand(), WalSnd::state, STATE_IDLE, STATE_RUNNING, T_BaseBackupCmd, T_CreateReplicationSlotCmd, T_DropReplicationSlotCmd, T_IdentifySystemCmd, T_SQLCmd, T_StartReplicationCmd, T_TimeLineHistoryCmd, T_VariableShowStmt, TimeLineHistoryCmd::type, Node::type, WalSndSetState(), and WALSNDSTATE_STOPPING.

Referenced by PostgresMain().

1517 {
1518  int parse_rc;
1519  Node *cmd_node;
1520  MemoryContext cmd_context;
1521  MemoryContext old_context;
1522  QueryCompletion qc;
1523 
1524  /*
1525  * If WAL sender has been told that shutdown is getting close, switch its
1526  * status accordingly to handle the next replication commands correctly.
1527  */
1528  if (got_STOPPING)
1530 
1531  /*
1532  * Throw error if in stopping mode. We need to prevent commands that could
1533  * generate WAL while the shutdown checkpoint is being written. To be
1534  * safe, we just prohibit all new commands.
1535  */
1537  ereport(ERROR,
1538  (errmsg("cannot execute new commands while WAL sender is in stopping mode")));
1539 
1540  /*
1541  * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
1542  * command arrives. Clean up the old stuff if there's anything.
1543  */
1545 
1547 
1549  "Replication command context",
1551  old_context = MemoryContextSwitchTo(cmd_context);
1552 
1553  replication_scanner_init(cmd_string);
1554  parse_rc = replication_yyparse();
1555  if (parse_rc != 0)
1556  ereport(ERROR,
1557  (errcode(ERRCODE_SYNTAX_ERROR),
1558  errmsg_internal("replication command parser returned %d",
1559  parse_rc)));
1560 
1561  cmd_node = replication_parse_result;
1562 
1563  /*
1564  * Log replication command if log_replication_commands is enabled. Even
1565  * when it's disabled, log the command with DEBUG1 level for backward
1566  * compatibility. Note that SQL commands are not logged here, and will be
1567  * logged later if log_statement is enabled.
1568  */
1569  if (cmd_node->type != T_SQLCmd)
1571  (errmsg("received replication command: %s", cmd_string)));
1572 
1573  /*
1574  * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot. If it was
1575  * called outside of transaction the snapshot should be cleared here.
1576  */
1577  if (!IsTransactionBlock())
1579 
1580  /*
1581  * For aborted transactions, don't allow anything except pure SQL;
1582  * exec_simple_query() will handle it correctly.
1583  */
1584  if (IsAbortedTransactionBlockState() && !IsA(cmd_node, SQLCmd))
1585  ereport(ERROR,
1586  (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
1587  errmsg("current transaction is aborted, "
1588  "commands ignored until end of transaction block")));
1589 
1591 
1592  /*
1593  * Allocate buffers that will be used for each outgoing and incoming
1594  * message. We do this just once per command to reduce palloc overhead.
1595  */
1599 
1600  /* Report to pgstat that this process is running */
1602 
1603  switch (cmd_node->type)
1604  {
1605  case T_IdentifySystemCmd:
1606  IdentifySystem();
1607  break;
1608 
1609  case T_BaseBackupCmd:
1610  PreventInTransactionBlock(true, "BASE_BACKUP");
1611  SendBaseBackup((BaseBackupCmd *) cmd_node);
1612  break;
1613 
1616  break;
1617 
1620  break;
1621 
1622  case T_StartReplicationCmd:
1623  {
1624  StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
1625 
1626  PreventInTransactionBlock(true, "START_REPLICATION");
1627 
1628  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
1629  StartReplication(cmd);
1630  else
1632 
1633  Assert(xlogreader != NULL);
1634  break;
1635  }
1636 
1637  case T_TimeLineHistoryCmd:
1638  PreventInTransactionBlock(true, "TIMELINE_HISTORY");
1640  break;
1641 
1642  case T_VariableShowStmt:
1643  {
1645  VariableShowStmt *n = (VariableShowStmt *) cmd_node;
1646 
1647  /* syscache access needs a transaction environment */
1649  GetPGVariable(n->name, dest);
1651  }
1652  break;
1653 
1654  case T_SQLCmd:
1655  if (MyDatabaseId == InvalidOid)
1656  ereport(ERROR,
1657  (errmsg("cannot execute SQL commands in WAL sender for physical replication")));
1658 
1659  /* Report to pgstat that this process is now idle */
1661 
1662  /* Tell the caller that this wasn't a WalSender command. */
1663  return false;
1664 
1665  default:
1666  elog(ERROR, "unrecognized replication command node tag: %u",
1667  cmd_node->type);
1668  }
1669 
1670  /* done */
1671  MemoryContextSwitchTo(old_context);
1672  MemoryContextDelete(cmd_context);
1673 
1674  /* Send CommandComplete message */
1675  SetQueryCompletion(&qc, CMDTAG_SELECT, 0);
1676  EndCommand(&qc, DestRemote, true);
1677 
1678  /* Report to pgstat that this process is now idle */
1680 
1681  return true;
1682 }
#define IsA(nodeptr, _type_)
Definition: nodes.h:580
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:211
#define AllocSetContextCreate
Definition: memutils.h:170
#define DEBUG1
Definition: elog.h:25
void pgstat_report_activity(BackendState state, const char *cmd_str)
Definition: pgstat.c:3132
static void SendTimeLineHistory(TimeLineHistoryCmd *cmd)
Definition: walsender.c:464
static void DropReplicationSlot(DropReplicationSlotCmd *cmd)
Definition: walsender.c:1123
void CommitTransactionCommand(void)
Definition: xact.c:2947
bool IsAbortedTransactionBlockState(void)
Definition: xact.c:391
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
Definition: nodes.h:529
static StringInfoData output_message
Definition: walsender.c:157
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:187
int errcode(int sqlerrcode)
Definition: elog.c:610
#define LOG
Definition: elog.h:26
void GetPGVariable(const char *name, DestReceiver *dest)
Definition: guc.c:8965
Node * replication_parse_result
bool IsTransactionBlock(void)
Definition: xact.c:4684
ReplicationKind kind
Definition: replnodes.h:82
#define ERROR
Definition: elog.h:43
void EndCommand(const QueryCompletion *qc, CommandDest dest, bool force_undecorated_output)
Definition: dest.c:166
void SendBaseBackup(BaseBackupCmd *cmd)
Definition: basebackup.c:922
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:192
WalSndState state
NodeTag type
Definition: nodes.h:531
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
static void StartReplication(StartReplicationCmd *cmd)
Definition: walsender.c:570
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
Definition: xact.c:3380
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
static XLogReaderState * xlogreader
Definition: walsender.c:137
int replication_yyparse(void)
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
static StringInfoData reply_message
Definition: walsender.c:158
static void StartLogicalReplication(StartReplicationCmd *cmd)
Definition: walsender.c:1137
Oid MyDatabaseId
Definition: globals.c:85
WalSnd * MyWalSnd
Definition: walsender.c:112
#define InvalidOid
Definition: postgres_ext.h:36
#define ereport(elevel,...)
Definition: elog.h:144
int errmsg_internal(const char *fmt,...)
Definition: elog.c:911
#define Assert(condition)
Definition: c.h:745
void WalSndSetState(WalSndState state)
Definition: walsender.c:3188
static void SetQueryCompletion(QueryCompletion *qc, CommandTag commandTag, uint64 nprocessed)
Definition: cmdtag.h:36
void StartTransactionCommand(void)
Definition: xact.c:2846
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Definition: walsender.c:921
static StringInfoData tmpbuf
Definition: walsender.c:159
bool log_replication_commands
Definition: walsender.c:125
int errmsg(const char *fmt,...)
Definition: elog.c:824
#define elog(elevel,...)
Definition: elog.h:214
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:99
static void IdentifySystem(void)
Definition: walsender.c:375
void SnapBuildClearExportedSnapshot(void)
Definition: snapbuild.c:692
void replication_scanner_init(const char *query_string)

◆ HandleWalSndInitStopping()

void HandleWalSndInitStopping ( void  )

Definition at line 2998 of file walsender.c.

References am_walsender, Assert, got_STOPPING, kill, MyProcPid, and replication_active.

Referenced by procsignal_sigusr1_handler().

2999 {
3001 
3002  /*
3003  * If replication has not yet started, die like with SIGTERM. If
3004  * replication is active, only set a flag and wake up the main loop. It
3005  * will send any outstanding WAL, wait for it to be replicated to the
3006  * standby, and then exit gracefully.
3007  */
3008  if (!replication_active)
3009  kill(MyProcPid, SIGTERM);
3010  else
3011  got_STOPPING = true;
3012 }
int MyProcPid
Definition: globals.c:40
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:187
#define kill(pid, sig)
Definition: win32_port.h:426
bool am_walsender
Definition: walsender.c:115
static volatile sig_atomic_t replication_active
Definition: walsender.c:195
#define Assert(condition)
Definition: c.h:745

◆ InitWalSender()

void InitWalSender ( void  )

Referenced by PostgresMain().

◆ WalSndErrorCleanup()

void WalSndErrorCleanup ( void  )

Definition at line 295 of file walsender.c.

References ConditionVariableCancelSleep(), got_SIGUSR2, got_STOPPING, IsTransactionOrTransactionBlock(), LWLockReleaseAll(), MyReplicationSlot, pgstat_report_wait_end(), proc_exit(), replication_active, ReplicationSlotCleanup(), ReplicationSlotRelease(), XLogReaderState::seg, wal_segment_close(), WalSndResourceCleanup(), WalSndSetState(), WALSNDSTATE_STARTUP, and WALOpenSegment::ws_file.

Referenced by PostgresMain().

296 {
300 
301  if (xlogreader != NULL && xlogreader->seg.ws_file >= 0)
303 
304  if (MyReplicationSlot != NULL)
306 
308 
309  replication_active = false;
310 
311  /*
312  * If there is a transaction in progress, it will clean up our
313  * ResourceOwner, but if a replication command set up a resource owner
314  * without a transaction, we've got to clean that up now.
315  */
317  WalSndResourceCleanup(false);
318 
319  if (got_STOPPING || got_SIGUSR2)
320  proc_exit(0);
321 
322  /* Revert back to startup state */
324 }
void wal_segment_close(XLogReaderState *state)
Definition: xlogutils.c:812
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:187
void proc_exit(int code)
Definition: ipc.c:104
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4702
WALOpenSegment seg
Definition: xlogreader.h:215
void ConditionVariableCancelSleep(void)
void ReplicationSlotRelease(void)
Definition: slot.c:476
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1381
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:186
static XLogReaderState * xlogreader
Definition: walsender.c:137
static volatile sig_atomic_t replication_active
Definition: walsender.c:195
void WalSndResourceCleanup(bool isCommit)
Definition: walsender.c:330
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
void WalSndSetState(WalSndState state)
Definition: walsender.c:3188
void ReplicationSlotCleanup(void)
Definition: slot.c:532
void LWLockReleaseAll(void)
Definition: lwlock.c:1911

◆ WalSndInitStopping()

void WalSndInitStopping ( void  )

Definition at line 3124 of file walsender.c.

References i, InvalidBackendId, max_wal_senders, WalSnd::mutex, WalSnd::pid, PROCSIG_WALSND_INIT_STOPPING, SendProcSignal(), SpinLockAcquire, SpinLockRelease, and WalSndCtlData::walsnds.

Referenced by ShutdownXLOG().

3125 {
3126  int i;
3127 
3128  for (i = 0; i < max_wal_senders; i++)
3129  {
3130  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3131  pid_t pid;
3132 
3133  SpinLockAcquire(&walsnd->mutex);
3134  pid = walsnd->pid;
3135  SpinLockRelease(&walsnd->mutex);
3136 
3137  if (pid == 0)
3138  continue;
3139 
3141  }
3142 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
int SendProcSignal(pid_t pid, ProcSignalReason reason, BackendId backendId)
Definition: procsignal.c:250
int max_wal_senders
Definition: walsender.c:121
#define SpinLockRelease(lock)
Definition: spin.h:64
#define InvalidBackendId
Definition: backendid.h:23
int i

◆ WalSndResourceCleanup()

void WalSndResourceCleanup ( bool  isCommit)

Definition at line 330 of file walsender.c.

References CurrentResourceOwner, DestNone, DestRemote, proc_exit(), RESOURCE_RELEASE_AFTER_LOCKS, RESOURCE_RELEASE_BEFORE_LOCKS, RESOURCE_RELEASE_LOCKS, ResourceOwnerDelete(), ResourceOwnerRelease(), WalSndShutdown(), and whereToSendOutput.

Referenced by perform_base_backup(), and WalSndErrorCleanup().

331 {
332  ResourceOwner resowner;
333 
334  if (CurrentResourceOwner == NULL)
335  return;
336 
337  /*
338  * Deleting CurrentResourceOwner is not allowed, so we must save a pointer
339  * in a local variable and clear it first.
340  */
341  resowner = CurrentResourceOwner;
342  CurrentResourceOwner = NULL;
343 
344  /* Now we can release resources and delete it. */
345  ResourceOwnerRelease(resowner,
346  RESOURCE_RELEASE_BEFORE_LOCKS, isCommit, true);
347  ResourceOwnerRelease(resowner,
348  RESOURCE_RELEASE_LOCKS, isCommit, true);
349  ResourceOwnerRelease(resowner,
350  RESOURCE_RELEASE_AFTER_LOCKS, isCommit, true);
351  ResourceOwnerDelete(resowner);
352 }
ResourceOwner CurrentResourceOwner
Definition: resowner.c:142
void ResourceOwnerDelete(ResourceOwner owner)
Definition: resowner.c:712
void ResourceOwnerRelease(ResourceOwner owner, ResourceReleasePhase phase, bool isCommit, bool isTopLevel)
Definition: resowner.c:478

◆ WalSndRqstFileReload()

void WalSndRqstFileReload ( void  )

Definition at line 2975 of file walsender.c.

References i, max_wal_senders, WalSnd::mutex, WalSnd::needreload, WalSnd::pid, SpinLockAcquire, SpinLockRelease, and WalSndCtlData::walsnds.

Referenced by KeepFileRestoredFromArchive().

2976 {
2977  int i;
2978 
2979  for (i = 0; i < max_wal_senders; i++)
2980  {
2981  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2982 
2983  SpinLockAcquire(&walsnd->mutex);
2984  if (walsnd->pid == 0)
2985  {
2986  SpinLockRelease(&walsnd->mutex);
2987  continue;
2988  }
2989  walsnd->needreload = true;
2990  SpinLockRelease(&walsnd->mutex);
2991  }
2992 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
int max_wal_senders
Definition: walsender.c:121
#define SpinLockRelease(lock)
Definition: spin.h:64
bool needreload
int i

◆ WalSndShmemInit()

void WalSndShmemInit ( void  )

Definition at line 3063 of file walsender.c.

References i, max_wal_senders, MemSet, WalSnd::mutex, NUM_SYNC_REP_WAIT_MODE, ShmemInitStruct(), SHMQueueInit(), SpinLockInit, WalSndCtlData::SyncRepQueue, WalSndCtlData::walsnds, and WalSndShmemSize().

Referenced by CreateSharedMemoryAndSemaphores().

3064 {
3065  bool found;
3066  int i;
3067 
3068  WalSndCtl = (WalSndCtlData *)
3069  ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
3070 
3071  if (!found)
3072  {
3073  /* First time through, so initialize */
3075 
3076  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
3078 
3079  for (i = 0; i < max_wal_senders; i++)
3080  {
3081  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3082 
3083  SpinLockInit(&walsnd->mutex);
3084  }
3085  }
3086 }
Size WalSndShmemSize(void)
Definition: walsender.c:3051
#define SpinLockInit(lock)
Definition: spin.h:60
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
#define MemSet(start, val, len)
Definition: c.h:949
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:392
SHM_QUEUE SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]
int max_wal_senders
Definition: walsender.c:121
void SHMQueueInit(SHM_QUEUE *queue)
Definition: shmqueue.c:36
int i
#define NUM_SYNC_REP_WAIT_MODE
Definition: syncrep.h:28

◆ WalSndShmemSize()

Size WalSndShmemSize ( void  )

Definition at line 3051 of file walsender.c.

References add_size(), max_wal_senders, mul_size(), and offsetof.

Referenced by CreateSharedMemoryAndSemaphores(), and WalSndShmemInit().

3052 {
3053  Size size = 0;
3054 
3055  size = offsetof(WalSndCtlData, walsnds);
3056  size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
3057 
3058  return size;
3059 }
int max_wal_senders
Definition: walsender.c:121
Size mul_size(Size s1, Size s2)
Definition: shmem.c:515
Size add_size(Size s1, Size s2)
Definition: shmem.c:498
size_t Size
Definition: c.h:473
#define offsetof(type, field)
Definition: c.h:668

◆ WalSndSignals()

void WalSndSignals ( void  )

Definition at line 3032 of file walsender.c.

References die, InitializeTimeouts(), pqsignal(), procsignal_sigusr1_handler(), quickdie(), SIG_DFL, SIG_IGN, SIGCHLD, SIGHUP, SignalHandlerForConfigReload(), SIGPIPE, SIGQUIT, SIGUSR1, SIGUSR2, StatementCancelHandler(), and WalSndLastCycleHandler().

Referenced by PostgresMain().

3033 {
3034  /* Set up signal handlers */
3036  pqsignal(SIGINT, StatementCancelHandler); /* query cancel */
3037  pqsignal(SIGTERM, die); /* request shutdown */
3038  pqsignal(SIGQUIT, quickdie); /* hard crash time */
3039  InitializeTimeouts(); /* establishes SIGALRM handler */
3042  pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
3043  * shutdown */
3044 
3045  /* Reset some signals that are accepted by postmaster but not here */
3047 }
void InitializeTimeouts(void)
Definition: timeout.c:346
#define SIGQUIT
Definition: win32_port.h:154
#define SIGUSR1
Definition: win32_port.h:165
static void WalSndLastCycleHandler(SIGNAL_ARGS)
Definition: walsender.c:3020
#define SIGCHLD
Definition: win32_port.h:163
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:56
#define SIGPIPE
Definition: win32_port.h:158
#define SIGUSR2
Definition: win32_port.h:166
void StatementCancelHandler(SIGNAL_ARGS)
Definition: postgres.c:2857
#define SIGHUP
Definition: win32_port.h:153
#define SIG_IGN
Definition: win32_port.h:150
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:170
#define SIG_DFL
Definition: win32_port.h:148
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition: procsignal.c:551
void quickdie(SIGNAL_ARGS)
Definition: postgres.c:2759
#define die(msg)
Definition: pg_test_fsync.c:96

◆ WalSndWaitStopping()

void WalSndWaitStopping ( void  )

Definition at line 3150 of file walsender.c.

References i, max_wal_senders, WalSnd::mutex, pg_usleep(), WalSnd::pid, SpinLockAcquire, SpinLockRelease, WalSnd::state, WalSndCtlData::walsnds, and WALSNDSTATE_STOPPING.

Referenced by ShutdownXLOG().

3151 {
3152  for (;;)
3153  {
3154  int i;
3155  bool all_stopped = true;
3156 
3157  for (i = 0; i < max_wal_senders; i++)
3158  {
3159  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3160 
3161  SpinLockAcquire(&walsnd->mutex);
3162 
3163  if (walsnd->pid == 0)
3164  {
3165  SpinLockRelease(&walsnd->mutex);
3166  continue;
3167  }
3168 
3169  if (walsnd->state != WALSNDSTATE_STOPPING)
3170  {
3171  all_stopped = false;
3172  SpinLockRelease(&walsnd->mutex);
3173  break;
3174  }
3175  SpinLockRelease(&walsnd->mutex);
3176  }
3177 
3178  /* safe to leave if confirmation is done for all WAL senders */
3179  if (all_stopped)
3180  return;
3181 
3182  pg_usleep(10000L); /* wait for 10 msec */
3183  }
3184 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
void pg_usleep(long microsec)
Definition: signal.c:53
WalSndState state
int max_wal_senders
Definition: walsender.c:121
#define SpinLockRelease(lock)
Definition: spin.h:64
int i

◆ WalSndWakeup()

void WalSndWakeup ( void  )

Definition at line 3095 of file walsender.c.

References i, WalSnd::latch, max_wal_senders, WalSnd::mutex, SetLatch(), SpinLockAcquire, SpinLockRelease, and WalSndCtlData::walsnds.

Referenced by KeepFileRestoredFromArchive(), StartupXLOG(), and XLogWalRcvFlush().

3096 {
3097  int i;
3098 
3099  for (i = 0; i < max_wal_senders; i++)
3100  {
3101  Latch *latch;
3102  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3103 
3104  /*
3105  * Get latch pointer with spinlock held, for the unlikely case that
3106  * pointer reads aren't atomic (as they're 8 bytes).
3107  */
3108  SpinLockAcquire(&walsnd->mutex);
3109  latch = walsnd->latch;
3110  SpinLockRelease(&walsnd->mutex);
3111 
3112  if (latch != NULL)
3113  SetLatch(latch);
3114  }
3115 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
void SetLatch(Latch *latch)
Definition: latch.c:505
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
Latch * latch
Definition: latch.h:110
int max_wal_senders
Definition: walsender.c:121
#define SpinLockRelease(lock)
Definition: spin.h:64
int i

Variable Documentation

◆ am_cascading_walsender

◆ am_db_walsender

bool am_db_walsender

Definition at line 118 of file walsender.c.

Referenced by check_db(), InitPostgres(), and ProcessStartupPacket().

◆ am_walsender

◆ log_replication_commands

bool log_replication_commands

Definition at line 125 of file walsender.c.

Referenced by exec_replication_command().

◆ max_wal_senders

◆ wake_wal_senders

bool wake_wal_senders

Definition at line 130 of file walsender.c.

◆ wal_sender_timeout

int wal_sender_timeout