PostgreSQL Source Code  git master
walsender.h File Reference
#include <signal.h>
Include dependency graph for walsender.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Macros

#define WalSndWakeupRequest()   do { wake_wal_senders = true; } while (0)
 
#define WalSndWakeupProcessRequests()
 

Enumerations

enum  CRSSnapshotAction { CRS_EXPORT_SNAPSHOT, CRS_NOEXPORT_SNAPSHOT, CRS_USE_SNAPSHOT }
 

Functions

void InitWalSender (void)
 
bool exec_replication_command (const char *query_string)
 
void WalSndErrorCleanup (void)
 
void WalSndResourceCleanup (bool isCommit)
 
void WalSndSignals (void)
 
Size WalSndShmemSize (void)
 
void WalSndShmemInit (void)
 
void WalSndWakeup (void)
 
void WalSndInitStopping (void)
 
void WalSndWaitStopping (void)
 
void HandleWalSndInitStopping (void)
 
void WalSndRqstFileReload (void)
 

Variables

bool am_walsender
 
bool am_cascading_walsender
 
bool am_db_walsender
 
bool wake_wal_senders
 
int max_wal_senders
 
int wal_sender_timeout
 
bool log_replication_commands
 

Macro Definition Documentation

◆ WalSndWakeupProcessRequests

#define WalSndWakeupProcessRequests ( )
Value:
do \
{ \
if (wake_wal_senders) \
{ \
wake_wal_senders = false; \
if (max_wal_senders > 0) \
WalSndWakeup(); \
} \
} while (0)
int max_wal_senders
Definition: walsender.c:121
bool wake_wal_senders
Definition: walsender.c:130

Definition at line 63 of file walsender.h.

Referenced by XLogBackgroundFlush(), and XLogFlush().

◆ WalSndWakeupRequest

#define WalSndWakeupRequest ( )    do { wake_wal_senders = true; } while (0)

Definition at line 57 of file walsender.h.

Referenced by XLogWrite().

Enumeration Type Documentation

◆ CRSSnapshotAction

Enumerator
CRS_EXPORT_SNAPSHOT 
CRS_NOEXPORT_SNAPSHOT 
CRS_USE_SNAPSHOT 

Definition at line 20 of file walsender.h.

Function Documentation

◆ exec_replication_command()

bool exec_replication_command ( const char *  query_string)

Definition at line 1530 of file walsender.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, CHECK_FOR_INTERRUPTS, CommitTransactionCommand(), CreateDestReceiver(), CreateReplicationSlot(), CurrentMemoryContext, DEBUG1, generate_unaccent_rules::dest, DestRemote, DestRemoteSimple, DropReplicationSlot(), elog, EndCommand(), ereport, errcode(), errmsg(), errmsg_internal(), ERROR, GetPGVariable(), got_STOPPING, IdentifySystem(), initStringInfo(), InvalidOid, IsA, IsAbortedTransactionBlockState(), IsTransactionBlock(), StartReplicationCmd::kind, LOG, log_replication_commands, MemoryContextDelete(), MemoryContextSwitchTo(), MyDatabaseId, VariableShowStmt::name, pgstat_report_activity(), PreventInTransactionBlock(), REPLICATION_KIND_PHYSICAL, replication_parse_result, replication_scanner_init(), replication_yyparse(), SendBaseBackup(), SendTimeLineHistory(), SetQueryCompletion(), SnapBuildClearExportedSnapshot(), StartLogicalReplication(), StartReplication(), StartTransactionCommand(), WalSnd::state, STATE_IDLE, STATE_RUNNING, T_BaseBackupCmd, T_CreateReplicationSlotCmd, T_DropReplicationSlotCmd, T_IdentifySystemCmd, T_SQLCmd, T_StartReplicationCmd, T_TimeLineHistoryCmd, T_VariableShowStmt, TimeLineHistoryCmd::type, Node::type, WalSndSetState(), and WALSNDSTATE_STOPPING.

Referenced by PostgresMain().

1531 {
1532  int parse_rc;
1533  Node *cmd_node;
1534  MemoryContext cmd_context;
1535  MemoryContext old_context;
1536  QueryCompletion qc;
1537 
1538  /*
1539  * If WAL sender has been told that shutdown is getting close, switch its
1540  * status accordingly to handle the next replication commands correctly.
1541  */
1542  if (got_STOPPING)
1543  WalSndSetState(WALSNDSTATE_STOPPING);
1544 
1545  /*
1546  * Throw error if in stopping mode. We need to prevent commands that could
1547  * generate WAL while the shutdown checkpoint is being written. To be
1548  * safe, we just prohibit all new commands.
1549  */
1550  if (MyWalSnd->state == WALSNDSTATE_STOPPING)
1551  ereport(ERROR,
1552  (errmsg("cannot execute new commands while WAL sender is in stopping mode")));
1553 
1554  /*
1555  * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
1556  * command arrives. Clean up the old stuff if there's anything.
1557  */
1558  SnapBuildClearExportedSnapshot();
1559 
1560  CHECK_FOR_INTERRUPTS();
1561 
1562  cmd_context = AllocSetContextCreate(CurrentMemoryContext,
1563  "Replication command context",
1564  ALLOCSET_DEFAULT_SIZES);
1565  old_context = MemoryContextSwitchTo(cmd_context);
1566 
1567  replication_scanner_init(cmd_string);
1568  parse_rc = replication_yyparse();
1569  if (parse_rc != 0)
1570  ereport(ERROR,
1571  (errcode(ERRCODE_SYNTAX_ERROR),
1572  errmsg_internal("replication command parser returned %d",
1573  parse_rc)));
1574 
1575  cmd_node = replication_parse_result;
1576 
1577  /*
1578  * Log replication command if log_replication_commands is enabled. Even
1579  * when it's disabled, log the command with DEBUG1 level for backward
1580  * compatibility. Note that SQL commands are not logged here, and will be
1581  * logged later if log_statement is enabled.
1582  */
1583  if (cmd_node->type != T_SQLCmd)
1584  ereport(log_replication_commands ? LOG : DEBUG1,
1585  (errmsg("received replication command: %s", cmd_string)));
1586 
1587  /*
1588  * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot. If it was
1589  * called outside of transaction the snapshot should be cleared here.
1590  */
1591  if (!IsTransactionBlock())
1592  SnapBuildClearExportedSnapshot();
1593 
1594  /*
1595  * For aborted transactions, don't allow anything except pure SQL, the
1596  * exec_simple_query() will handle it correctly.
1597  */
1598  if (IsAbortedTransactionBlockState() && !IsA(cmd_node, SQLCmd))
1599  ereport(ERROR,
1600  (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
1601  errmsg("current transaction is aborted, "
1602  "commands ignored until end of transaction block")));
1603 
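1604  CHECK_FOR_INTERRUPTS();
1605 
1606  /*
1607  * Allocate buffers that will be used for each outgoing and incoming
1608  * message. We do this just once per command to reduce palloc overhead.
1609  */
1610  initStringInfo(&output_message);
1611  initStringInfo(&reply_message);
1612  initStringInfo(&tmpbuf);
1613 
1614  /* Report to pgstat that this process is running */
1615  pgstat_report_activity(STATE_RUNNING, NULL);
1616 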
1617  switch (cmd_node->type)
1618  {
1619  case T_IdentifySystemCmd:
1620  IdentifySystem();
1621  break;
1622 
1623  case T_BaseBackupCmd:
1624  PreventInTransactionBlock(true, "BASE_BACKUP");
1625  SendBaseBackup((BaseBackupCmd *) cmd_node);
1626  break;
1627 
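1628  case T_CreateReplicationSlotCmd:
1629  CreateReplicationSlot((CreateReplicationSlotCmd *) cmd_node);
1630  break;
1631 
1632  case T_DropReplicationSlotCmd:
1633  DropReplicationSlot((DropReplicationSlotCmd *) cmd_node);
1634  break;
1635 
1636  case T_StartReplicationCmd:
1637  {
1638  StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
1639 
1640  PreventInTransactionBlock(true, "START_REPLICATION");
1641 
1642  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
1643  StartReplication(cmd);
1644  else
1645  StartLogicalReplication(cmd);
1646  break;
1647  }
1648 
1649  case T_TimeLineHistoryCmd:
1650  PreventInTransactionBlock(true, "TIMELINE_HISTORY");
1651  SendTimeLineHistory((TimeLineHistoryCmd *) cmd_node);
1652  break;
1653 
1654  case T_VariableShowStmt:
1655  {
1656  DestReceiver *dest = CreateDestReceiver(DestRemoteSimple);
1657  VariableShowStmt *n = (VariableShowStmt *) cmd_node;
1658 
1659  /* syscache access needs a transaction environment */
1660  StartTransactionCommand();
1661  GetPGVariable(n->name, dest);
1662  CommitTransactionCommand();
1663  }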
1664  break;
1665 
1666  case T_SQLCmd:
1667  if (MyDatabaseId == InvalidOid)
1668  ereport(ERROR,
1669  (errmsg("cannot execute SQL commands in WAL sender for physical replication")));
1670 
1671  /* Report to pgstat that this process is now idle */
1672  pgstat_report_activity(STATE_IDLE, NULL);
1673 
1674  /* Tell the caller that this wasn't a WalSender command. */
1675  return false;
1676 
1677  default:
1678  elog(ERROR, "unrecognized replication command node tag: %u",
1679  cmd_node->type);
1680  }
1681 
1682  /* done */
1683  MemoryContextSwitchTo(old_context);
1684  MemoryContextDelete(cmd_context);
1685 
1686  /* Send CommandComplete message */
1687  SetQueryCompletion(&qc, CMDTAG_SELECT, 0);
1688  EndCommand(&qc, DestRemote, true);
1689 
1690  /* Report to pgstat that this process is now idle */
1691  pgstat_report_activity(STATE_IDLE, NULL);
1692 
1693  return true;
1694 }
#define IsA(nodeptr, _type_)
Definition: nodes.h:580
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:211
#define AllocSetContextCreate
Definition: memutils.h:170
#define DEBUG1
Definition: elog.h:25
void pgstat_report_activity(BackendState state, const char *cmd_str)
Definition: pgstat.c:3136
static void SendTimeLineHistory(TimeLineHistoryCmd *cmd)
Definition: walsender.c:481
static void DropReplicationSlot(DropReplicationSlotCmd *cmd)
Definition: walsender.c:1128
void CommitTransactionCommand(void)
Definition: xact.c:2917
bool IsAbortedTransactionBlockState(void)
Definition: xact.c:376
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
Definition: nodes.h:529
static StringInfoData output_message
Definition: walsender.c:159
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:189
int errcode(int sqlerrcode)
Definition: elog.c:610
#define LOG
Definition: elog.h:26
void GetPGVariable(const char *name, DestReceiver *dest)
Definition: guc.c:8957
Node * replication_parse_result
bool IsTransactionBlock(void)
Definition: xact.c:4654
ReplicationKind kind
Definition: replnodes.h:82
#define ERROR
Definition: elog.h:43
void EndCommand(const QueryCompletion *qc, CommandDest dest, bool force_undecorated_output)
Definition: dest.c:166
void SendBaseBackup(BaseBackupCmd *cmd)
Definition: basebackup.c:924
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:192
WalSndState state
NodeTag type
Definition: nodes.h:531
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
static void StartReplication(StartReplicationCmd *cmd)
Definition: walsender.c:587
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
Definition: xact.c:3350
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
int replication_yyparse(void)
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
static StringInfoData reply_message
Definition: walsender.c:160
static void StartLogicalReplication(StartReplicationCmd *cmd)
Definition: walsender.c:1142
Oid MyDatabaseId
Definition: globals.c:85
WalSnd * MyWalSnd
Definition: walsender.c:112
#define InvalidOid
Definition: postgres_ext.h:36
#define ereport(elevel,...)
Definition: elog.h:144
int errmsg_internal(const char *fmt,...)
Definition: elog.c:911
void WalSndSetState(WalSndState state)
Definition: walsender.c:3205
static void SetQueryCompletion(QueryCompletion *qc, CommandTag commandTag, uint64 nprocessed)
Definition: cmdtag.h:36
void StartTransactionCommand(void)
Definition: xact.c:2816
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Definition: walsender.c:926
static StringInfoData tmpbuf
Definition: walsender.c:161
bool log_replication_commands
Definition: walsender.c:125
int errmsg(const char *fmt,...)
Definition: elog.c:824
#define elog(elevel,...)
Definition: elog.h:214
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:99
static void IdentifySystem(void)
Definition: walsender.c:392
void SnapBuildClearExportedSnapshot(void)
Definition: snapbuild.c:692
void replication_scanner_init(const char *query_string)

◆ HandleWalSndInitStopping()

void HandleWalSndInitStopping ( void  )

Definition at line 3015 of file walsender.c.

References am_walsender, Assert, got_STOPPING, kill, MyProcPid, and replication_active.

Referenced by procsignal_sigusr1_handler().

3016 {
3017  Assert(am_walsender);
3018 
3019  /*
3020  * If replication has not yet started, die like with SIGTERM. If
3021  * replication is active, only set a flag and wake up the main loop. It
3022  * will send any outstanding WAL, wait for it to be replicated to the
3023  * standby, and then exit gracefully.
3024  */
3025  if (!replication_active)
3026  kill(MyProcPid, SIGTERM);
3027  else
3028  got_STOPPING = true;
3029 }
int MyProcPid
Definition: globals.c:40
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:189
#define kill(pid, sig)
Definition: win32_port.h:426
bool am_walsender
Definition: walsender.c:115
static volatile sig_atomic_t replication_active
Definition: walsender.c:197
#define Assert(condition)
Definition: c.h:738

◆ InitWalSender()

void InitWalSender ( void  )

Referenced by PostgresMain().

◆ WalSndErrorCleanup()

void WalSndErrorCleanup ( void  )

Definition at line 312 of file walsender.c.

References ConditionVariableCancelSleep(), got_SIGUSR2, got_STOPPING, IsTransactionOrTransactionBlock(), LWLockReleaseAll(), MyReplicationSlot, pgstat_report_wait_end(), proc_exit(), replication_active, ReplicationSlotCleanup(), ReplicationSlotRelease(), XLogReaderState::seg, wal_segment_close(), WalSndResourceCleanup(), WalSndSetState(), WALSNDSTATE_STARTUP, and WALOpenSegment::ws_file.

Referenced by PostgresMain().

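313 {
314  LWLockReleaseAll();
315  ConditionVariableCancelSleep();
316  pgstat_report_wait_end();
317 
318  if (xlogreader != NULL && xlogreader->seg.ws_file >= 0)
319  wal_segment_close(xlogreader);
320 
321  if (MyReplicationSlot != NULL)
322  ReplicationSlotRelease();
323 
324  ReplicationSlotCleanup();
325 
326  replication_active = false;
327 
328  /*
329  * If there is a transaction in progress, it will clean up our
330  * ResourceOwner, but if a replication command set up a resource owner
331  * without a transaction, we've got to clean that up now.
332  */
333  if (!IsTransactionOrTransactionBlock())
334  WalSndResourceCleanup(false);
335 
336  if (got_STOPPING || got_SIGUSR2)
337  proc_exit(0);
338 
339  /* Revert back to startup state */
340  WalSndSetState(WALSNDSTATE_STARTUP);
341 }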
void wal_segment_close(XLogReaderState *state)
Definition: xlogutils.c:813
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:189
void proc_exit(int code)
Definition: ipc.c:104
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4672
WALOpenSegment seg
Definition: xlogreader.h:213
void ConditionVariableCancelSleep(void)
void ReplicationSlotRelease(void)
Definition: slot.c:433
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1380
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:188
static XLogReaderState * xlogreader
Definition: walsender.c:139
static volatile sig_atomic_t replication_active
Definition: walsender.c:197
void WalSndResourceCleanup(bool isCommit)
Definition: walsender.c:347
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
void WalSndSetState(WalSndState state)
Definition: walsender.c:3205
void ReplicationSlotCleanup(void)
Definition: slot.c:488
void LWLockReleaseAll(void)
Definition: lwlock.c:1911

◆ WalSndInitStopping()

void WalSndInitStopping ( void  )

Definition at line 3141 of file walsender.c.

References i, InvalidBackendId, max_wal_senders, WalSnd::mutex, WalSnd::pid, PROCSIG_WALSND_INIT_STOPPING, SendProcSignal(), SpinLockAcquire, SpinLockRelease, and WalSndCtlData::walsnds.

Referenced by ShutdownXLOG().

3142 {
3143  int i;
3144 
3145  for (i = 0; i < max_wal_senders; i++)
3146  {
3147  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3148  pid_t pid;
3149 
3150  SpinLockAcquire(&walsnd->mutex);
3151  pid = walsnd->pid;
3152  SpinLockRelease(&walsnd->mutex);
3153 
3154  if (pid == 0)
3155  continue;
3156 
3157  SendProcSignal(pid, PROCSIG_WALSND_INIT_STOPPING, InvalidBackendId);
3158  }
3159 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
int SendProcSignal(pid_t pid, ProcSignalReason reason, BackendId backendId)
Definition: procsignal.c:250
int max_wal_senders
Definition: walsender.c:121
#define SpinLockRelease(lock)
Definition: spin.h:64
#define InvalidBackendId
Definition: backendid.h:23
int i

◆ WalSndResourceCleanup()

void WalSndResourceCleanup ( bool  isCommit)

Definition at line 347 of file walsender.c.

References CurrentResourceOwner, DestNone, DestRemote, proc_exit(), RESOURCE_RELEASE_AFTER_LOCKS, RESOURCE_RELEASE_BEFORE_LOCKS, RESOURCE_RELEASE_LOCKS, ResourceOwnerDelete(), ResourceOwnerRelease(), WalSndShutdown(), and whereToSendOutput.

Referenced by perform_base_backup(), and WalSndErrorCleanup().

348 {
349  ResourceOwner resowner;
350 
351  if (CurrentResourceOwner == NULL)
352  return;
353 
354  /*
355  * Deleting CurrentResourceOwner is not allowed, so we must save a pointer
356  * in a local variable and clear it first.
357  */
358  resowner = CurrentResourceOwner;
359  CurrentResourceOwner = NULL;
360 
361  /* Now we can release resources and delete it. */
362  ResourceOwnerRelease(resowner,
363  RESOURCE_RELEASE_BEFORE_LOCKS, isCommit, true);
364  ResourceOwnerRelease(resowner,
365  RESOURCE_RELEASE_LOCKS, isCommit, true);
366  ResourceOwnerRelease(resowner,
367  RESOURCE_RELEASE_AFTER_LOCKS, isCommit, true);
368  ResourceOwnerDelete(resowner);
369 }
ResourceOwner CurrentResourceOwner
Definition: resowner.c:142
void ResourceOwnerDelete(ResourceOwner owner)
Definition: resowner.c:712
void ResourceOwnerRelease(ResourceOwner owner, ResourceReleasePhase phase, bool isCommit, bool isTopLevel)
Definition: resowner.c:478

◆ WalSndRqstFileReload()

void WalSndRqstFileReload ( void  )

Definition at line 2992 of file walsender.c.

References i, max_wal_senders, WalSnd::mutex, WalSnd::needreload, WalSnd::pid, SpinLockAcquire, SpinLockRelease, and WalSndCtlData::walsnds.

Referenced by KeepFileRestoredFromArchive().

2993 {
2994  int i;
2995 
2996  for (i = 0; i < max_wal_senders; i++)
2997  {
2998  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2999 
3000  SpinLockAcquire(&walsnd->mutex);
3001  if (walsnd->pid == 0)
3002  {
3003  SpinLockRelease(&walsnd->mutex);
3004  continue;
3005  }
3006  walsnd->needreload = true;
3007  SpinLockRelease(&walsnd->mutex);
3008  }
3009 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
int max_wal_senders
Definition: walsender.c:121
#define SpinLockRelease(lock)
Definition: spin.h:64
bool needreload
int i

◆ WalSndShmemInit()

void WalSndShmemInit ( void  )

Definition at line 3080 of file walsender.c.

References i, max_wal_senders, MemSet, WalSnd::mutex, NUM_SYNC_REP_WAIT_MODE, ShmemInitStruct(), SHMQueueInit(), SpinLockInit, WalSndCtlData::SyncRepQueue, WalSndCtlData::walsnds, and WalSndShmemSize().

Referenced by CreateSharedMemoryAndSemaphores().

3081 {
3082  bool found;
3083  int i;
3084 
3085  WalSndCtl = (WalSndCtlData *)
3086  ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
3087 
3088  if (!found)
3089  {
3090  /* First time through, so initialize */
3091  MemSet(WalSndCtl, 0, WalSndShmemSize());
3092 
3093  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
3094  SHMQueueInit(&(WalSndCtl->SyncRepQueue[i]));
3095 
3096  for (i = 0; i < max_wal_senders; i++)
3097  {
3098  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3099 
3100  SpinLockInit(&walsnd->mutex);
3101  }
3102  }
3103 }
Size WalSndShmemSize(void)
Definition: walsender.c:3068
#define SpinLockInit(lock)
Definition: spin.h:60
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
#define MemSet(start, val, len)
Definition: c.h:971
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:392
SHM_QUEUE SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]
int max_wal_senders
Definition: walsender.c:121
void SHMQueueInit(SHM_QUEUE *queue)
Definition: shmqueue.c:36
int i
#define NUM_SYNC_REP_WAIT_MODE
Definition: syncrep.h:28

◆ WalSndShmemSize()

Size WalSndShmemSize ( void  )

Definition at line 3068 of file walsender.c.

References add_size(), max_wal_senders, mul_size(), and offsetof.

Referenced by CreateSharedMemoryAndSemaphores(), and WalSndShmemInit().

3069 {
3070  Size size = 0;
3071 
3072  size = offsetof(WalSndCtlData, walsnds);
3073  size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
3074 
3075  return size;
3076 }
int max_wal_senders
Definition: walsender.c:121
Size mul_size(Size s1, Size s2)
Definition: shmem.c:515
Size add_size(Size s1, Size s2)
Definition: shmem.c:498
size_t Size
Definition: c.h:466
#define offsetof(type, field)
Definition: c.h:661

◆ WalSndSignals()

void WalSndSignals ( void  )

Definition at line 3049 of file walsender.c.

References die, InitializeTimeouts(), pqsignal(), procsignal_sigusr1_handler(), quickdie(), SIG_DFL, SIG_IGN, SIGCHLD, SIGHUP, SignalHandlerForConfigReload(), SIGPIPE, SIGQUIT, SIGUSR1, SIGUSR2, StatementCancelHandler(), and WalSndLastCycleHandler().

Referenced by PostgresMain().

3050 {
3051  /* Set up signal handlers */
3052  pqsignal(SIGHUP, SignalHandlerForConfigReload);
3053  pqsignal(SIGINT, StatementCancelHandler); /* query cancel */
3054  pqsignal(SIGTERM, die); /* request shutdown */
3055  pqsignal(SIGQUIT, quickdie); /* hard crash time */
3056  InitializeTimeouts(); /* establishes SIGALRM handler */
3057  pqsignal(SIGPIPE, SIG_IGN);
3058  pqsignal(SIGUSR1, procsignal_sigusr1_handler);
3059  pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
3060  * shutdown */
3061 
3062  /* Reset some signals that are accepted by postmaster but not here */
3063  pqsignal(SIGCHLD, SIG_DFL);
3064 }
void InitializeTimeouts(void)
Definition: timeout.c:346
#define SIGQUIT
Definition: win32_port.h:154
#define SIGUSR1
Definition: win32_port.h:165
static void WalSndLastCycleHandler(SIGNAL_ARGS)
Definition: walsender.c:3037
#define SIGCHLD
Definition: win32_port.h:163
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:56
#define SIGPIPE
Definition: win32_port.h:158
#define SIGUSR2
Definition: win32_port.h:166
void StatementCancelHandler(SIGNAL_ARGS)
Definition: postgres.c:2857
#define SIGHUP
Definition: win32_port.h:153
#define SIG_IGN
Definition: win32_port.h:150
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:170
#define SIG_DFL
Definition: win32_port.h:148
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition: procsignal.c:533
void quickdie(SIGNAL_ARGS)
Definition: postgres.c:2759
#define die(msg)
Definition: pg_test_fsync.c:96

◆ WalSndWaitStopping()

void WalSndWaitStopping ( void  )

Definition at line 3167 of file walsender.c.

References i, max_wal_senders, WalSnd::mutex, pg_usleep(), WalSnd::pid, SpinLockAcquire, SpinLockRelease, WalSnd::state, WalSndCtlData::walsnds, and WALSNDSTATE_STOPPING.

Referenced by ShutdownXLOG().

3168 {
3169  for (;;)
3170  {
3171  int i;
3172  bool all_stopped = true;
3173 
3174  for (i = 0; i < max_wal_senders; i++)
3175  {
3176  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3177 
3178  SpinLockAcquire(&walsnd->mutex);
3179 
3180  if (walsnd->pid == 0)
3181  {
3182  SpinLockRelease(&walsnd->mutex);
3183  continue;
3184  }
3185 
3186  if (walsnd->state != WALSNDSTATE_STOPPING)
3187  {
3188  all_stopped = false;
3189  SpinLockRelease(&walsnd->mutex);
3190  break;
3191  }
3192  SpinLockRelease(&walsnd->mutex);
3193  }
3194 
3195  /* safe to leave if confirmation is done for all WAL senders */
3196  if (all_stopped)
3197  return;
3198 
3199  pg_usleep(10000L); /* wait for 10 msec */
3200  }
3201 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
void pg_usleep(long microsec)
Definition: signal.c:53
WalSndState state
int max_wal_senders
Definition: walsender.c:121
#define SpinLockRelease(lock)
Definition: spin.h:64
int i

◆ WalSndWakeup()

void WalSndWakeup ( void  )

Definition at line 3112 of file walsender.c.

References i, WalSnd::latch, max_wal_senders, WalSnd::mutex, SetLatch(), SpinLockAcquire, SpinLockRelease, and WalSndCtlData::walsnds.

Referenced by KeepFileRestoredFromArchive(), StartupXLOG(), and XLogWalRcvFlush().

3113 {
3114  int i;
3115 
3116  for (i = 0; i < max_wal_senders; i++)
3117  {
3118  Latch *latch;
3119  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3120 
3121  /*
3122  * Get latch pointer with spinlock held, for the unlikely case that
3123  * pointer reads aren't atomic (as they're 8 bytes).
3124  */
3125  SpinLockAcquire(&walsnd->mutex);
3126  latch = walsnd->latch;
3127  SpinLockRelease(&walsnd->mutex);
3128 
3129  if (latch != NULL)
3130  SetLatch(latch);
3131  }
3132 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
void SetLatch(Latch *latch)
Definition: latch.c:457
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
Latch * latch
Definition: latch.h:110
int max_wal_senders
Definition: walsender.c:121
#define SpinLockRelease(lock)
Definition: spin.h:64
int i

Variable Documentation

◆ am_cascading_walsender

◆ am_db_walsender

bool am_db_walsender

Definition at line 118 of file walsender.c.

Referenced by check_db(), InitPostgres(), and ProcessStartupPacket().

◆ am_walsender

◆ log_replication_commands

bool log_replication_commands

Definition at line 125 of file walsender.c.

Referenced by exec_replication_command().

◆ max_wal_senders

◆ wake_wal_senders

bool wake_wal_senders

Definition at line 130 of file walsender.c.

◆ wal_sender_timeout

int wal_sender_timeout