PostgreSQL Source Code  git master
walsender.h File Reference
#include <signal.h>
Include dependency graph for walsender.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Macros

#define WalSndWakeupRequest()   do { wake_wal_senders = true; } while (0)
 
#define WalSndWakeupProcessRequests()
 

Enumerations

enum  CRSSnapshotAction { CRS_EXPORT_SNAPSHOT, CRS_NOEXPORT_SNAPSHOT, CRS_USE_SNAPSHOT }
 

Functions

void InitWalSender (void)
 
bool exec_replication_command (const char *query_string)
 
void WalSndErrorCleanup (void)
 
void WalSndResourceCleanup (bool isCommit)
 
void WalSndSignals (void)
 
Size WalSndShmemSize (void)
 
void WalSndShmemInit (void)
 
void WalSndWakeup (void)
 
void WalSndInitStopping (void)
 
void WalSndWaitStopping (void)
 
void HandleWalSndInitStopping (void)
 
void WalSndRqstFileReload (void)
 

Variables

bool am_walsender
 
bool am_cascading_walsender
 
bool am_db_walsender
 
bool wake_wal_senders
 
int max_wal_senders
 
int wal_sender_timeout
 
bool log_replication_commands
 

Macro Definition Documentation

◆ WalSndWakeupProcessRequests

#define WalSndWakeupProcessRequests ( )
Value:
do \
{ \
{ \
wake_wal_senders = false; \
if (max_wal_senders > 0) \
WalSndWakeup(); \
} \
} while (0)
int max_wal_senders
Definition: walsender.c:121
bool wake_wal_senders
Definition: walsender.c:130

Definition at line 63 of file walsender.h.

Referenced by XLogBackgroundFlush(), and XLogFlush().

◆ WalSndWakeupRequest

#define WalSndWakeupRequest ( )    do { wake_wal_senders = true; } while (0)

Definition at line 57 of file walsender.h.

Referenced by XLogWrite().

Enumeration Type Documentation

◆ CRSSnapshotAction

Enumerator
CRS_EXPORT_SNAPSHOT 
CRS_NOEXPORT_SNAPSHOT 
CRS_USE_SNAPSHOT 

Definition at line 20 of file walsender.h.

Function Documentation

◆ exec_replication_command()

bool exec_replication_command ( const char *  query_string)

Definition at line 1512 of file walsender.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, Assert, CHECK_FOR_INTERRUPTS, CommitTransactionCommand(), CreateDestReceiver(), CreateReplicationSlot(), CurrentMemoryContext, DEBUG1, debug_query_string, generate_unaccent_rules::dest, DestRemoteSimple, DropReplicationSlot(), elog, EndReplicationCommand(), ereport, errcode(), errmsg(), errmsg_internal(), ERROR, GetPGVariable(), got_STOPPING, IdentifySystem(), initStringInfo(), InvalidOid, IsA, IsAbortedTransactionBlockState(), StartReplicationCmd::kind, LOG, log_replication_commands, MemoryContextDelete(), MemoryContextSwitchTo(), MyDatabaseId, VariableShowStmt::name, pgstat_report_activity(), PreventInTransactionBlock(), REPLICATION_KIND_PHYSICAL, replication_parse_result, replication_scanner_finish(), replication_scanner_init(), replication_yyparse(), SendBaseBackup(), SendTimeLineHistory(), set_ps_display(), SnapBuildClearExportedSnapshot(), StartLogicalReplication(), StartReplication(), StartTransactionCommand(), WalSnd::state, STATE_RUNNING, T_BaseBackupCmd, T_CreateReplicationSlotCmd, T_DropReplicationSlotCmd, T_IdentifySystemCmd, T_StartReplicationCmd, T_TimeLineHistoryCmd, T_VariableShowStmt, TimeLineHistoryCmd::type, Node::type, WalSndSetState(), and WALSNDSTATE_STOPPING.

Referenced by PostgresMain().

1513 {
1514  int parse_rc;
1515  Node *cmd_node;
1516  const char *cmdtag;
1517  MemoryContext cmd_context;
1518  MemoryContext old_context;
1519 
1520  /*
1521  * If WAL sender has been told that shutdown is getting close, switch its
1522  * status accordingly to handle the next replication commands correctly.
1523  */
1524  if (got_STOPPING)
1526 
1527  /*
1528  * Throw error if in stopping mode. We need prevent commands that could
1529  * generate WAL while the shutdown checkpoint is being written. To be
1530  * safe, we just prohibit all new commands.
1531  */
1533  ereport(ERROR,
1534  (errmsg("cannot execute new commands while WAL sender is in stopping mode")));
1535 
1536  /*
1537  * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
1538  * command arrives. Clean up the old stuff if there's anything.
1539  */
1541 
1543 
1544  /*
1545  * Parse the command.
1546  */
1548  "Replication command context",
1550  old_context = MemoryContextSwitchTo(cmd_context);
1551 
1552  replication_scanner_init(cmd_string);
1553  parse_rc = replication_yyparse();
1554  if (parse_rc != 0)
1555  ereport(ERROR,
1556  (errcode(ERRCODE_SYNTAX_ERROR),
1557  errmsg_internal("replication command parser returned %d",
1558  parse_rc)));
1560 
1561  cmd_node = replication_parse_result;
1562 
1563  /*
1564  * If it's a SQL command, just clean up our mess and return false; the
1565  * caller will take care of executing it.
1566  */
1567  if (IsA(cmd_node, SQLCmd))
1568  {
1569  if (MyDatabaseId == InvalidOid)
1570  ereport(ERROR,
1571  (errmsg("cannot execute SQL commands in WAL sender for physical replication")));
1572 
1573  MemoryContextSwitchTo(old_context);
1574  MemoryContextDelete(cmd_context);
1575 
1576  /* Tell the caller that this wasn't a WalSender command. */
1577  return false;
1578  }
1579 
1580  /*
1581  * Report query to various monitoring facilities. For this purpose, we
1582  * report replication commands just like SQL commands.
1583  */
1584  debug_query_string = cmd_string;
1585 
1587 
1588  /*
1589  * Log replication command if log_replication_commands is enabled. Even
1590  * when it's disabled, log the command with DEBUG1 level for backward
1591  * compatibility.
1592  */
1594  (errmsg("received replication command: %s", cmd_string)));
1595 
1596  /*
1597  * Disallow replication commands in aborted transaction blocks.
1598  */
1600  ereport(ERROR,
1601  (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
1602  errmsg("current transaction is aborted, "
1603  "commands ignored until end of transaction block")));
1604 
1606 
1607  /*
1608  * Allocate buffers that will be used for each outgoing and incoming
1609  * message. We do this just once per command to reduce palloc overhead.
1610  */
1614 
1615  switch (cmd_node->type)
1616  {
1617  case T_IdentifySystemCmd:
1618  cmdtag = "IDENTIFY_SYSTEM";
1619  set_ps_display(cmdtag);
1620  IdentifySystem();
1621  EndReplicationCommand(cmdtag);
1622  break;
1623 
1624  case T_BaseBackupCmd:
1625  cmdtag = "BASE_BACKUP";
1626  set_ps_display(cmdtag);
1627  PreventInTransactionBlock(true, cmdtag);
1628  SendBaseBackup((BaseBackupCmd *) cmd_node);
1629  EndReplicationCommand(cmdtag);
1630  break;
1631 
1633  cmdtag = "CREATE_REPLICATION_SLOT";
1634  set_ps_display(cmdtag);
1636  EndReplicationCommand(cmdtag);
1637  break;
1638 
1640  cmdtag = "DROP_REPLICATION_SLOT";
1641  set_ps_display(cmdtag);
1643  EndReplicationCommand(cmdtag);
1644  break;
1645 
1646  case T_StartReplicationCmd:
1647  {
1648  StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
1649 
1650  cmdtag = "START_REPLICATION";
1651  set_ps_display(cmdtag);
1652  PreventInTransactionBlock(true, cmdtag);
1653 
1654  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
1655  StartReplication(cmd);
1656  else
1658 
1659  /* dupe, but necessary per libpqrcv_endstreaming */
1660  EndReplicationCommand(cmdtag);
1661 
1662  Assert(xlogreader != NULL);
1663  break;
1664  }
1665 
1666  case T_TimeLineHistoryCmd:
1667  cmdtag = "TIMELINE_HISTORY";
1668  set_ps_display(cmdtag);
1669  PreventInTransactionBlock(true, cmdtag);
1671  EndReplicationCommand(cmdtag);
1672  break;
1673 
1674  case T_VariableShowStmt:
1675  {
1677  VariableShowStmt *n = (VariableShowStmt *) cmd_node;
1678 
1679  cmdtag = "SHOW";
1680  set_ps_display(cmdtag);
1681 
1682  /* syscache access needs a transaction environment */
1684  GetPGVariable(n->name, dest);
1686  EndReplicationCommand(cmdtag);
1687  }
1688  break;
1689 
1690  default:
1691  elog(ERROR, "unrecognized replication command node tag: %u",
1692  cmd_node->type);
1693  }
1694 
1695  /* done */
1696  MemoryContextSwitchTo(old_context);
1697  MemoryContextDelete(cmd_context);
1698 
1699  /*
1700  * We need not update ps display or pg_stat_activity, because PostgresMain
1701  * will reset those to "idle". But we must reset debug_query_string to
1702  * ensure it doesn't become a dangling pointer.
1703  */
1704  debug_query_string = NULL;
1705 
1706  return true;
1707 }
#define IsA(nodeptr, _type_)
Definition: nodes.h:579
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:212
#define AllocSetContextCreate
Definition: memutils.h:170
#define DEBUG1
Definition: elog.h:25
void pgstat_report_activity(BackendState state, const char *cmd_str)
Definition: pgstat.c:3355
static void SendTimeLineHistory(TimeLineHistoryCmd *cmd)
Definition: walsender.c:464
static void DropReplicationSlot(DropReplicationSlotCmd *cmd)
Definition: walsender.c:1123
void CommitTransactionCommand(void)
Definition: xact.c:2948
bool IsAbortedTransactionBlockState(void)
Definition: xact.c:391
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
Definition: nodes.h:528
static StringInfoData output_message
Definition: walsender.c:157
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:187
int errcode(int sqlerrcode)
Definition: elog.c:704
#define LOG
Definition: elog.h:26
void GetPGVariable(const char *name, DestReceiver *dest)
Definition: guc.c:9130
Node * replication_parse_result
void set_ps_display(const char *activity)
Definition: ps_status.c:349
ReplicationKind kind
Definition: replnodes.h:82
#define ERROR
Definition: elog.h:45
void SendBaseBackup(BaseBackupCmd *cmd)
Definition: basebackup.c:932
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:192
WalSndState state
NodeTag type
Definition: nodes.h:530
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
static void StartReplication(StartReplicationCmd *cmd)
Definition: walsender.c:570
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
Definition: xact.c:3381
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
static XLogReaderState * xlogreader
Definition: walsender.c:137
void EndReplicationCommand(const char *commandTag)
Definition: dest.c:221
int replication_yyparse(void)
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
const char * debug_query_string
Definition: postgres.c:89
static StringInfoData reply_message
Definition: walsender.c:158
static void StartLogicalReplication(StartReplicationCmd *cmd)
Definition: walsender.c:1133
Oid MyDatabaseId
Definition: globals.c:86
WalSnd * MyWalSnd
Definition: walsender.c:112
#define InvalidOid
Definition: postgres_ext.h:36
#define ereport(elevel,...)
Definition: elog.h:155
int errmsg_internal(const char *fmt,...)
Definition: elog.c:1002
#define Assert(condition)
Definition: c.h:792
void WalSndSetState(WalSndState state)
Definition: walsender.c:3201
void StartTransactionCommand(void)
Definition: xact.c:2847
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Definition: walsender.c:921
static StringInfoData tmpbuf
Definition: walsender.c:159
bool log_replication_commands
Definition: walsender.c:125
int errmsg(const char *fmt,...)
Definition: elog.c:915
#define elog(elevel,...)
Definition: elog.h:228
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:100
static void IdentifySystem(void)
Definition: walsender.c:375
void replication_scanner_finish(void)
void SnapBuildClearExportedSnapshot(void)
Definition: snapbuild.c:693
void replication_scanner_init(const char *query_string)

◆ HandleWalSndInitStopping()

void HandleWalSndInitStopping ( void  )

Definition at line 3011 of file walsender.c.

References am_walsender, Assert, got_STOPPING, kill, MyProcPid, and replication_active.

Referenced by procsignal_sigusr1_handler().

3012 {
3014 
3015  /*
3016  * If replication has not yet started, die like with SIGTERM. If
3017  * replication is active, only set a flag and wake up the main loop. It
3018  * will send any outstanding WAL, wait for it to be replicated to the
3019  * standby, and then exit gracefully.
3020  */
3021  if (!replication_active)
3022  kill(MyProcPid, SIGTERM);
3023  else
3024  got_STOPPING = true;
3025 }
int MyProcPid
Definition: globals.c:41
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:187
#define kill(pid, sig)
Definition: win32_port.h:454
bool am_walsender
Definition: walsender.c:115
static volatile sig_atomic_t replication_active
Definition: walsender.c:195
#define Assert(condition)
Definition: c.h:792

◆ InitWalSender()

void InitWalSender ( void  )

Referenced by PostgresMain().

◆ WalSndErrorCleanup()

void WalSndErrorCleanup ( void  )

Definition at line 295 of file walsender.c.

References ConditionVariableCancelSleep(), got_SIGUSR2, got_STOPPING, IsTransactionOrTransactionBlock(), LWLockReleaseAll(), MyReplicationSlot, pgstat_report_wait_end(), proc_exit(), replication_active, ReplicationSlotCleanup(), ReplicationSlotRelease(), XLogReaderState::seg, wal_segment_close(), WalSndResourceCleanup(), WalSndSetState(), WALSNDSTATE_STARTUP, and WALOpenSegment::ws_file.

Referenced by PostgresMain().

296 {
300 
301  if (xlogreader != NULL && xlogreader->seg.ws_file >= 0)
303 
304  if (MyReplicationSlot != NULL)
306 
308 
309  replication_active = false;
310 
311  /*
312  * If there is a transaction in progress, it will clean up our
313  * ResourceOwner, but if a replication command set up a resource owner
314  * without a transaction, we've got to clean that up now.
315  */
317  WalSndResourceCleanup(false);
318 
319  if (got_STOPPING || got_SIGUSR2)
320  proc_exit(0);
321 
322  /* Revert back to startup state */
324 }
void wal_segment_close(XLogReaderState *state)
Definition: xlogutils.c:811
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:187
void proc_exit(int code)
Definition: ipc.c:104
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4703
WALOpenSegment seg
Definition: xlogreader.h:215
void ConditionVariableCancelSleep(void)
void ReplicationSlotRelease(void)
Definition: slot.c:484
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1512
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:186
static XLogReaderState * xlogreader
Definition: walsender.c:137
static volatile sig_atomic_t replication_active
Definition: walsender.c:195
void WalSndResourceCleanup(bool isCommit)
Definition: walsender.c:330
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
void WalSndSetState(WalSndState state)
Definition: walsender.c:3201
void ReplicationSlotCleanup(void)
Definition: slot.c:540
void LWLockReleaseAll(void)
Definition: lwlock.c:1907

◆ WalSndInitStopping()

void WalSndInitStopping ( void  )

Definition at line 3137 of file walsender.c.

References i, InvalidBackendId, max_wal_senders, WalSnd::mutex, WalSnd::pid, PROCSIG_WALSND_INIT_STOPPING, SendProcSignal(), SpinLockAcquire, SpinLockRelease, and WalSndCtlData::walsnds.

Referenced by ShutdownXLOG().

3138 {
3139  int i;
3140 
3141  for (i = 0; i < max_wal_senders; i++)
3142  {
3143  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3144  pid_t pid;
3145 
3146  SpinLockAcquire(&walsnd->mutex);
3147  pid = walsnd->pid;
3148  SpinLockRelease(&walsnd->mutex);
3149 
3150  if (pid == 0)
3151  continue;
3152 
3154  }
3155 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
int SendProcSignal(pid_t pid, ProcSignalReason reason, BackendId backendId)
Definition: procsignal.c:256
int max_wal_senders
Definition: walsender.c:121
#define SpinLockRelease(lock)
Definition: spin.h:64
#define InvalidBackendId
Definition: backendid.h:23
int i

◆ WalSndResourceCleanup()

void WalSndResourceCleanup ( bool  isCommit)

Definition at line 330 of file walsender.c.

References CurrentResourceOwner, DestNone, DestRemote, proc_exit(), RESOURCE_RELEASE_AFTER_LOCKS, RESOURCE_RELEASE_BEFORE_LOCKS, RESOURCE_RELEASE_LOCKS, ResourceOwnerDelete(), ResourceOwnerRelease(), WalSndShutdown(), and whereToSendOutput.

Referenced by perform_base_backup(), and WalSndErrorCleanup().

331 {
332  ResourceOwner resowner;
333 
334  if (CurrentResourceOwner == NULL)
335  return;
336 
337  /*
338  * Deleting CurrentResourceOwner is not allowed, so we must save a pointer
339  * in a local variable and clear it first.
340  */
341  resowner = CurrentResourceOwner;
342  CurrentResourceOwner = NULL;
343 
344  /* Now we can release resources and delete it. */
345  ResourceOwnerRelease(resowner,
346  RESOURCE_RELEASE_BEFORE_LOCKS, isCommit, true);
347  ResourceOwnerRelease(resowner,
348  RESOURCE_RELEASE_LOCKS, isCommit, true);
349  ResourceOwnerRelease(resowner,
350  RESOURCE_RELEASE_AFTER_LOCKS, isCommit, true);
351  ResourceOwnerDelete(resowner);
352 }
ResourceOwner CurrentResourceOwner
Definition: resowner.c:144
void ResourceOwnerDelete(ResourceOwner owner)
Definition: resowner.c:727
void ResourceOwnerRelease(ResourceOwner owner, ResourceReleasePhase phase, bool isCommit, bool isTopLevel)
Definition: resowner.c:482

◆ WalSndRqstFileReload()

void WalSndRqstFileReload ( void  )

Definition at line 2988 of file walsender.c.

References i, max_wal_senders, WalSnd::mutex, WalSnd::needreload, WalSnd::pid, SpinLockAcquire, SpinLockRelease, and WalSndCtlData::walsnds.

Referenced by KeepFileRestoredFromArchive().

2989 {
2990  int i;
2991 
2992  for (i = 0; i < max_wal_senders; i++)
2993  {
2994  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2995 
2996  SpinLockAcquire(&walsnd->mutex);
2997  if (walsnd->pid == 0)
2998  {
2999  SpinLockRelease(&walsnd->mutex);
3000  continue;
3001  }
3002  walsnd->needreload = true;
3003  SpinLockRelease(&walsnd->mutex);
3004  }
3005 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
int max_wal_senders
Definition: walsender.c:121
#define SpinLockRelease(lock)
Definition: spin.h:64
bool needreload
int i

◆ WalSndShmemInit()

void WalSndShmemInit ( void  )

Definition at line 3076 of file walsender.c.

References i, max_wal_senders, MemSet, WalSnd::mutex, NUM_SYNC_REP_WAIT_MODE, ShmemInitStruct(), SHMQueueInit(), SpinLockInit, WalSndCtlData::SyncRepQueue, WalSndCtlData::walsnds, and WalSndShmemSize().

Referenced by CreateSharedMemoryAndSemaphores().

3077 {
3078  bool found;
3079  int i;
3080 
3081  WalSndCtl = (WalSndCtlData *)
3082  ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
3083 
3084  if (!found)
3085  {
3086  /* First time through, so initialize */
3088 
3089  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
3091 
3092  for (i = 0; i < max_wal_senders; i++)
3093  {
3094  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3095 
3096  SpinLockInit(&walsnd->mutex);
3097  }
3098  }
3099 }
Size WalSndShmemSize(void)
Definition: walsender.c:3064
#define SpinLockInit(lock)
Definition: spin.h:60
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
#define MemSet(start, val, len)
Definition: c.h:996
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:396
SHM_QUEUE SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]
int max_wal_senders
Definition: walsender.c:121
void SHMQueueInit(SHM_QUEUE *queue)
Definition: shmqueue.c:36
int i
#define NUM_SYNC_REP_WAIT_MODE
Definition: syncrep.h:28

◆ WalSndShmemSize()

Size WalSndShmemSize ( void  )

Definition at line 3064 of file walsender.c.

References add_size(), max_wal_senders, mul_size(), and offsetof.

Referenced by CreateSharedMemoryAndSemaphores(), and WalSndShmemInit().

3065 {
3066  Size size = 0;
3067 
3068  size = offsetof(WalSndCtlData, walsnds);
3069  size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
3070 
3071  return size;
3072 }
int max_wal_senders
Definition: walsender.c:121
Size mul_size(Size s1, Size s2)
Definition: shmem.c:519
Size add_size(Size s1, Size s2)
Definition: shmem.c:502
size_t Size
Definition: c.h:528
#define offsetof(type, field)
Definition: c.h:715

◆ WalSndSignals()

void WalSndSignals ( void  )

Definition at line 3045 of file walsender.c.

References die, InitializeTimeouts(), pqsignal(), procsignal_sigusr1_handler(), SIG_DFL, SIG_IGN, SIGCHLD, SIGHUP, SignalHandlerForConfigReload(), SIGPIPE, SIGUSR1, SIGUSR2, StatementCancelHandler(), and WalSndLastCycleHandler().

Referenced by PostgresMain().

3046 {
3047  /* Set up signal handlers */
3049  pqsignal(SIGINT, StatementCancelHandler); /* query cancel */
3050  pqsignal(SIGTERM, die); /* request shutdown */
3051  /* SIGQUIT handler was already set up by InitPostmasterChild */
3052  InitializeTimeouts(); /* establishes SIGALRM handler */
3055  pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
3056  * shutdown */
3057 
3058  /* Reset some signals that are accepted by postmaster but not here */
3060 }
void InitializeTimeouts(void)
Definition: timeout.c:435
#define SIGUSR1
Definition: win32_port.h:171
static void WalSndLastCycleHandler(SIGNAL_ARGS)
Definition: walsender.c:3033
#define SIGCHLD
Definition: win32_port.h:169
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:56
#define SIGPIPE
Definition: win32_port.h:164
#define SIGUSR2
Definition: win32_port.h:172
void StatementCancelHandler(SIGNAL_ARGS)
Definition: postgres.c:2891
#define SIGHUP
Definition: win32_port.h:159
#define SIG_IGN
Definition: win32_port.h:156
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:170
#define SIG_DFL
Definition: win32_port.h:154
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition: procsignal.c:652
#define die(msg)
Definition: pg_test_fsync.c:97

◆ WalSndWaitStopping()

void WalSndWaitStopping ( void  )

Definition at line 3163 of file walsender.c.

References i, max_wal_senders, WalSnd::mutex, pg_usleep(), WalSnd::pid, SpinLockAcquire, SpinLockRelease, WalSnd::state, WalSndCtlData::walsnds, and WALSNDSTATE_STOPPING.

Referenced by ShutdownXLOG().

3164 {
3165  for (;;)
3166  {
3167  int i;
3168  bool all_stopped = true;
3169 
3170  for (i = 0; i < max_wal_senders; i++)
3171  {
3172  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3173 
3174  SpinLockAcquire(&walsnd->mutex);
3175 
3176  if (walsnd->pid == 0)
3177  {
3178  SpinLockRelease(&walsnd->mutex);
3179  continue;
3180  }
3181 
3182  if (walsnd->state != WALSNDSTATE_STOPPING)
3183  {
3184  all_stopped = false;
3185  SpinLockRelease(&walsnd->mutex);
3186  break;
3187  }
3188  SpinLockRelease(&walsnd->mutex);
3189  }
3190 
3191  /* safe to leave if confirmation is done for all WAL senders */
3192  if (all_stopped)
3193  return;
3194 
3195  pg_usleep(10000L); /* wait for 10 msec */
3196  }
3197 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
void pg_usleep(long microsec)
Definition: signal.c:53
WalSndState state
int max_wal_senders
Definition: walsender.c:121
#define SpinLockRelease(lock)
Definition: spin.h:64
int i

◆ WalSndWakeup()

void WalSndWakeup ( void  )

Definition at line 3108 of file walsender.c.

References i, WalSnd::latch, max_wal_senders, WalSnd::mutex, SetLatch(), SpinLockAcquire, SpinLockRelease, and WalSndCtlData::walsnds.

Referenced by KeepFileRestoredFromArchive(), StartupXLOG(), and XLogWalRcvFlush().

3109 {
3110  int i;
3111 
3112  for (i = 0; i < max_wal_senders; i++)
3113  {
3114  Latch *latch;
3115  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3116 
3117  /*
3118  * Get latch pointer with spinlock held, for the unlikely case that
3119  * pointer reads aren't atomic (as they're 8 bytes).
3120  */
3121  SpinLockAcquire(&walsnd->mutex);
3122  latch = walsnd->latch;
3123  SpinLockRelease(&walsnd->mutex);
3124 
3125  if (latch != NULL)
3126  SetLatch(latch);
3127  }
3128 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
void SetLatch(Latch *latch)
Definition: latch.c:505
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
Latch * latch
Definition: latch.h:110
int max_wal_senders
Definition: walsender.c:121
#define SpinLockRelease(lock)
Definition: spin.h:64
int i

Variable Documentation

◆ am_cascading_walsender

◆ am_db_walsender

bool am_db_walsender

Definition at line 118 of file walsender.c.

Referenced by check_db(), InitPostgres(), and ProcessStartupPacket().

◆ am_walsender

◆ log_replication_commands

bool log_replication_commands

Definition at line 125 of file walsender.c.

Referenced by exec_replication_command().

◆ max_wal_senders

◆ wake_wal_senders

bool wake_wal_senders

Definition at line 130 of file walsender.c.

◆ wal_sender_timeout

int wal_sender_timeout