PostgreSQL Source Code  git master
walsender.h File Reference
#include <signal.h>
Include dependency graph for walsender.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Macros

#define WalSndWakeupRequest()    do { wake_wal_senders = true; } while (0)
 
#define WalSndWakeupProcessRequests()
 

Enumerations

enum  CRSSnapshotAction { CRS_EXPORT_SNAPSHOT , CRS_NOEXPORT_SNAPSHOT , CRS_USE_SNAPSHOT }
 

Functions

void InitWalSender (void)
 
bool exec_replication_command (const char *query_string)
 
void WalSndErrorCleanup (void)
 
void WalSndResourceCleanup (bool isCommit)
 
void WalSndSignals (void)
 
Size WalSndShmemSize (void)
 
void WalSndShmemInit (void)
 
void WalSndWakeup (void)
 
void WalSndInitStopping (void)
 
void WalSndWaitStopping (void)
 
void HandleWalSndInitStopping (void)
 
void WalSndRqstFileReload (void)
 

Variables

PGDLLIMPORT bool am_walsender
 
PGDLLIMPORT bool am_cascading_walsender
 
PGDLLIMPORT bool am_db_walsender
 
PGDLLIMPORT bool wake_wal_senders
 
PGDLLIMPORT int max_wal_senders
 
PGDLLIMPORT int wal_sender_timeout
 
PGDLLIMPORT bool log_replication_commands
 

Macro Definition Documentation

◆ WalSndWakeupProcessRequests

#define WalSndWakeupProcessRequests ( )
Value:
do \
{ \
if (wake_wal_senders) \
{ \
wake_wal_senders = false; \
if (max_wal_senders > 0) \
WalSndWakeup(); \
} \
} while (0)
PGDLLIMPORT bool wake_wal_senders
Definition: walsender.c:131
PGDLLIMPORT int max_wal_senders
Definition: walsender.c:122

Definition at line 63 of file walsender.h.

◆ WalSndWakeupRequest

#define WalSndWakeupRequest ( )     do { wake_wal_senders = true; } while (0)

Definition at line 57 of file walsender.h.

Enumeration Type Documentation

◆ CRSSnapshotAction

Enumerator
CRS_EXPORT_SNAPSHOT 
CRS_NOEXPORT_SNAPSHOT 
CRS_USE_SNAPSHOT 

Definition at line 20 of file walsender.h.

21 {
CRSSnapshotAction
Definition: walsender.h:21
@ CRS_USE_SNAPSHOT
Definition: walsender.h:24
@ CRS_NOEXPORT_SNAPSHOT
Definition: walsender.h:23
@ CRS_EXPORT_SNAPSHOT
Definition: walsender.h:22

Function Documentation

◆ exec_replication_command()

bool exec_replication_command ( const char *  query_string)

Definition at line 1682 of file walsender.c.

1683 {
1684  int parse_rc;
1685  Node *cmd_node;
1686  const char *cmdtag;
1687  MemoryContext cmd_context;
1688  MemoryContext old_context;
1689 
1690  /*
1691  * If WAL sender has been told that shutdown is getting close, switch its
1692  * status accordingly to handle the next replication commands correctly.
1693  */
1694  if (got_STOPPING)
1695  WalSndSetState(WALSNDSTATE_STOPPING);
1696 
1697  /*
1698  * Throw error if in stopping mode. We need to prevent commands that could
1699  * generate WAL while the shutdown checkpoint is being written. To be
1700  * safe, we just prohibit all new commands.
1701  */
1702  if (MyWalSnd->state == WALSNDSTATE_STOPPING)
1703  ereport(ERROR,
1704  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1705  errmsg("cannot execute new commands while WAL sender is in stopping mode")));
1706 
1707  /*
1708  * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
1709  * command arrives. Clean up the old stuff if there's anything.
1710  */
1712 
1714 
1715  /*
1716  * Prepare to parse and execute the command.
1717  */
1719  "Replication command context",
1721  old_context = MemoryContextSwitchTo(cmd_context);
1722 
1723  replication_scanner_init(cmd_string);
1724 
1725  /*
1726  * Is it a WalSender command?
1727  */
1729  {
1730  /* Nope; clean up and get out. */
1732 
1733  MemoryContextSwitchTo(old_context);
1734  MemoryContextDelete(cmd_context);
1735 
1736  /* XXX this is a pretty random place to make this check */
1737  if (MyDatabaseId == InvalidOid)
1738  ereport(ERROR,
1739  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1740  errmsg("cannot execute SQL commands in WAL sender for physical replication")));
1741 
1742  /* Tell the caller that this wasn't a WalSender command. */
1743  return false;
1744  }
1745 
1746  /*
1747  * Looks like a WalSender command, so parse it.
1748  */
1749  parse_rc = replication_yyparse();
1750  if (parse_rc != 0)
1751  ereport(ERROR,
1752  (errcode(ERRCODE_SYNTAX_ERROR),
1753  errmsg_internal("replication command parser returned %d",
1754  parse_rc)));
1756 
1757  cmd_node = replication_parse_result;
1758 
1759  /*
1760  * Report query to various monitoring facilities. For this purpose, we
1761  * report replication commands just like SQL commands.
1762  */
1763  debug_query_string = cmd_string;
1764 
1766 
1767  /*
1768  * Log replication command if log_replication_commands is enabled. Even
1769  * when it's disabled, log the command with DEBUG1 level for backward
1770  * compatibility.
1771  */
1773  (errmsg("received replication command: %s", cmd_string)));
1774 
1775  /*
1776  * Disallow replication commands in aborted transaction blocks.
1777  */
1779  ereport(ERROR,
1780  (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
1781  errmsg("current transaction is aborted, "
1782  "commands ignored until end of transaction block")));
1783 
1785 
1786  /*
1787  * Allocate buffers that will be used for each outgoing and incoming
1788  * message. We do this just once per command to reduce palloc overhead.
1789  */
1793 
1794  switch (cmd_node->type)
1795  {
1796  case T_IdentifySystemCmd:
1797  cmdtag = "IDENTIFY_SYSTEM";
1798  set_ps_display(cmdtag);
1799  IdentifySystem();
1800  EndReplicationCommand(cmdtag);
1801  break;
1802 
1804  cmdtag = "READ_REPLICATION_SLOT";
1805  set_ps_display(cmdtag);
1807  EndReplicationCommand(cmdtag);
1808  break;
1809 
1810  case T_BaseBackupCmd:
1811  cmdtag = "BASE_BACKUP";
1812  set_ps_display(cmdtag);
1813  PreventInTransactionBlock(true, cmdtag);
1814  SendBaseBackup((BaseBackupCmd *) cmd_node);
1815  EndReplicationCommand(cmdtag);
1816  break;
1817 
1819  cmdtag = "CREATE_REPLICATION_SLOT";
1820  set_ps_display(cmdtag);
1822  EndReplicationCommand(cmdtag);
1823  break;
1824 
1826  cmdtag = "DROP_REPLICATION_SLOT";
1827  set_ps_display(cmdtag);
1829  EndReplicationCommand(cmdtag);
1830  break;
1831 
1832  case T_StartReplicationCmd:
1833  {
1834  StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
1835 
1836  cmdtag = "START_REPLICATION";
1837  set_ps_display(cmdtag);
1838  PreventInTransactionBlock(true, cmdtag);
1839 
1840  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
1841  StartReplication(cmd);
1842  else
1844 
1845  /* dupe, but necessary per libpqrcv_endstreaming */
1846  EndReplicationCommand(cmdtag);
1847 
1848  Assert(xlogreader != NULL);
1849  break;
1850  }
1851 
1852  case T_TimeLineHistoryCmd:
1853  cmdtag = "TIMELINE_HISTORY";
1854  set_ps_display(cmdtag);
1855  PreventInTransactionBlock(true, cmdtag);
1857  EndReplicationCommand(cmdtag);
1858  break;
1859 
1860  case T_VariableShowStmt:
1861  {
1863  VariableShowStmt *n = (VariableShowStmt *) cmd_node;
1864 
1865  cmdtag = "SHOW";
1866  set_ps_display(cmdtag);
1867 
1868  /* syscache access needs a transaction environment */
1870  GetPGVariable(n->name, dest);
1872  EndReplicationCommand(cmdtag);
1873  }
1874  break;
1875 
1876  default:
1877  elog(ERROR, "unrecognized replication command node tag: %u",
1878  cmd_node->type);
1879  }
1880 
1881  /* done */
1882  MemoryContextSwitchTo(old_context);
1883  MemoryContextDelete(cmd_context);
1884 
1885  /*
1886  * We need not update ps display or pg_stat_activity, because PostgresMain
1887  * will reset those to "idle". But we must reset debug_query_string to
1888  * ensure it doesn't become a dangling pointer.
1889  */
1890  debug_query_string = NULL;
1891 
1892  return true;
1893 }
void pgstat_report_activity(BackendState state, const char *cmd_str)
@ STATE_RUNNING
void SendBaseBackup(BaseBackupCmd *cmd)
Definition: basebackup.c:948
void EndReplicationCommand(const char *commandTag)
Definition: dest.c:221
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
@ DestRemoteSimple
Definition: dest.h:93
int errmsg_internal(const char *fmt,...)
Definition: elog.c:991
int errcode(int sqlerrcode)
Definition: elog.c:693
int errmsg(const char *fmt,...)
Definition: elog.c:904
#define LOG
Definition: elog.h:25
#define DEBUG1
Definition: elog.h:24
#define ERROR
Definition: elog.h:33
#define elog(elevel,...)
Definition: elog.h:218
#define ereport(elevel,...)
Definition: elog.h:143
Oid MyDatabaseId
Definition: globals.c:89
void GetPGVariable(const char *name, DestReceiver *dest)
Definition: guc.c:9666
Assert(fmt[strlen(fmt) - 1] !='\n')
MemoryContext CurrentMemoryContext
Definition: mcxt.c:42
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:218
#define AllocSetContextCreate
Definition: memutils.h:173
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:197
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:121
@ T_IdentifySystemCmd
Definition: nodes.h:531
@ T_BaseBackupCmd
Definition: nodes.h:532
@ T_VariableShowStmt
Definition: nodes.h:376
@ T_ReadReplicationSlotCmd
Definition: nodes.h:535
@ T_DropReplicationSlotCmd
Definition: nodes.h:534
@ T_TimeLineHistoryCmd
Definition: nodes.h:537
@ T_StartReplicationCmd
Definition: nodes.h:536
@ T_CreateReplicationSlotCmd
Definition: nodes.h:533
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
const char * debug_query_string
Definition: postgres.c:89
#define InvalidOid
Definition: postgres_ext.h:36
void set_ps_display(const char *activity)
Definition: ps_status.c:349
@ REPLICATION_KIND_PHYSICAL
Definition: replnodes.h:22
void SnapBuildClearExportedSnapshot(void)
Definition: snapbuild.c:683
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
Definition: nodes.h:574
NodeTag type
Definition: nodes.h:575
ReplicationKind kind
Definition: replnodes.h:82
WalSndState state
static void SendTimeLineHistory(TimeLineHistoryCmd *cmd)
Definition: walsender.c:580
WalSnd * MyWalSnd
Definition: walsender.c:113
static void ReadReplicationSlot(ReadReplicationSlotCmd *cmd)
Definition: walsender.c:479
static StringInfoData tmpbuf
Definition: walsender.c:160
static void IdentifySystem(void)
Definition: walsender.c:395
static StringInfoData reply_message
Definition: walsender.c:159
void WalSndSetState(WalSndState state)
Definition: walsender.c:3418
static StringInfoData output_message
Definition: walsender.c:158
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:188
bool log_replication_commands
Definition: walsender.c:126
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Definition: walsender.c:1047
static void StartLogicalReplication(StartReplicationCmd *cmd)
Definition: walsender.c:1257
static void DropReplicationSlot(DropReplicationSlotCmd *cmd)
Definition: walsender.c:1247
static void StartReplication(StartReplicationCmd *cmd)
Definition: walsender.c:686
static XLogReaderState * xlogreader
Definition: walsender.c:138
PGDLLIMPORT Node * replication_parse_result
void replication_scanner_init(const char *query_string)
@ WALSNDSTATE_STOPPING
void replication_scanner_finish(void)
int replication_yyparse(void)
bool replication_scanner_is_replication_command(void)
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
Definition: xact.c:3462
void StartTransactionCommand(void)
Definition: xact.c:2925
bool IsAbortedTransactionBlockState(void)
Definition: xact.c:394
void CommitTransactionCommand(void)
Definition: xact.c:3022

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, Assert(), CHECK_FOR_INTERRUPTS, CommitTransactionCommand(), CreateDestReceiver(), CreateReplicationSlot(), CurrentMemoryContext, DEBUG1, debug_query_string, generate_unaccent_rules::dest, DestRemoteSimple, DropReplicationSlot(), elog, EndReplicationCommand(), ereport, errcode(), errmsg(), errmsg_internal(), ERROR, GetPGVariable(), got_STOPPING, IdentifySystem(), initStringInfo(), InvalidOid, IsAbortedTransactionBlockState(), StartReplicationCmd::kind, LOG, log_replication_commands, MemoryContextDelete(), MemoryContextSwitchTo(), MyDatabaseId, MyWalSnd, VariableShowStmt::name, output_message, pgstat_report_activity(), PreventInTransactionBlock(), ReadReplicationSlot(), REPLICATION_KIND_PHYSICAL, replication_parse_result, replication_scanner_finish(), replication_scanner_init(), replication_scanner_is_replication_command(), replication_yyparse(), reply_message, SendBaseBackup(), SendTimeLineHistory(), set_ps_display(), SnapBuildClearExportedSnapshot(), StartLogicalReplication(), StartReplication(), StartTransactionCommand(), WalSnd::state, STATE_RUNNING, T_BaseBackupCmd, T_CreateReplicationSlotCmd, T_DropReplicationSlotCmd, T_IdentifySystemCmd, T_ReadReplicationSlotCmd, T_StartReplicationCmd, T_TimeLineHistoryCmd, T_VariableShowStmt, tmpbuf, Node::type, ReadReplicationSlotCmd::type, WalSndSetState(), WALSNDSTATE_STOPPING, and xlogreader.

Referenced by PostgresMain().

◆ HandleWalSndInitStopping()

void HandleWalSndInitStopping ( void  )

Definition at line 3212 of file walsender.c.

3213 {
3215 
3216  /*
3217  * If replication has not yet started, die like with SIGTERM. If
3218  * replication is active, only set a flag and wake up the main loop. It
3219  * will send any outstanding WAL, wait for it to be replicated to the
3220  * standby, and then exit gracefully.
3221  */
3222  if (!replication_active)
3223  kill(MyProcPid, SIGTERM);
3224  else
3225  got_STOPPING = true;
3226 }
int MyProcPid
Definition: globals.c:44
bool am_walsender
Definition: walsender.c:116
static volatile sig_atomic_t replication_active
Definition: walsender.c:196
#define kill(pid, sig)
Definition: win32_port.h:464

References am_walsender, Assert(), got_STOPPING, kill, MyProcPid, and replication_active.

Referenced by procsignal_sigusr1_handler().

◆ InitWalSender()

void InitWalSender ( void  )

Referenced by PostgresMain().

◆ WalSndErrorCleanup()

void WalSndErrorCleanup ( void  )

Definition at line 315 of file walsender.c.

316 {
320 
321  if (xlogreader != NULL && xlogreader->seg.ws_file >= 0)
323 
324  if (MyReplicationSlot != NULL)
326 
328 
329  replication_active = false;
330 
331  /*
332  * If there is a transaction in progress, it will clean up our
333  * ResourceOwner, but if a replication command set up a resource owner
334  * without a transaction, we've got to clean that up now.
335  */
337  WalSndResourceCleanup(false);
338 
339  if (got_STOPPING || got_SIGUSR2)
340  proc_exit(0);
341 
342  /* Revert back to startup state */
344 }
void ConditionVariableCancelSleep(void)
void proc_exit(int code)
Definition: ipc.c:104
void LWLockReleaseAll(void)
Definition: lwlock.c:1899
void ReplicationSlotCleanup(void)
Definition: slot.c:578
ReplicationSlot * MyReplicationSlot
Definition: slot.c:97
void ReplicationSlotRelease(void)
Definition: slot.c:522
WALOpenSegment seg
Definition: xlogreader.h:272
static void pgstat_report_wait_end(void)
Definition: wait_event.h:282
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:187
void WalSndResourceCleanup(bool isCommit)
Definition: walsender.c:350
@ WALSNDSTATE_STARTUP
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4784
void wal_segment_close(XLogReaderState *state)
Definition: xlogutils.c:856

References ConditionVariableCancelSleep(), got_SIGUSR2, got_STOPPING, IsTransactionOrTransactionBlock(), LWLockReleaseAll(), MyReplicationSlot, pgstat_report_wait_end(), proc_exit(), replication_active, ReplicationSlotCleanup(), ReplicationSlotRelease(), XLogReaderState::seg, wal_segment_close(), WalSndResourceCleanup(), WalSndSetState(), WALSNDSTATE_STARTUP, WALOpenSegment::ws_file, and xlogreader.

Referenced by PostgresMain().

◆ WalSndInitStopping()

void WalSndInitStopping ( void  )

Definition at line 3354 of file walsender.c.

3355 {
3356  int i;
3357 
3358  for (i = 0; i < max_wal_senders; i++)
3359  {
3360  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3361  pid_t pid;
3362 
3363  SpinLockAcquire(&walsnd->mutex);
3364  pid = walsnd->pid;
3365  SpinLockRelease(&walsnd->mutex);
3366 
3367  if (pid == 0)
3368  continue;
3369 
3371  }
3372 }
#define InvalidBackendId
Definition: backendid.h:23
int i
Definition: isn.c:73
int SendProcSignal(pid_t pid, ProcSignalReason reason, BackendId backendId)
Definition: procsignal.c:261
@ PROCSIG_WALSND_INIT_STOPPING
Definition: procsignal.h:35
#define SpinLockRelease(lock)
Definition: spin.h:64
#define SpinLockAcquire(lock)
Definition: spin.h:62
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
int max_wal_senders
Definition: walsender.c:122
WalSndCtlData * WalSndCtl
Definition: walsender.c:110

References i, InvalidBackendId, max_wal_senders, WalSnd::mutex, WalSnd::pid, PROCSIG_WALSND_INIT_STOPPING, SendProcSignal(), SpinLockAcquire, SpinLockRelease, WalSndCtl, and WalSndCtlData::walsnds.

Referenced by ShutdownXLOG().

◆ WalSndResourceCleanup()

void WalSndResourceCleanup ( bool  isCommit)

Definition at line 350 of file walsender.c.

351 {
352  ResourceOwner resowner;
353 
354  if (CurrentResourceOwner == NULL)
355  return;
356 
357  /*
358  * Deleting CurrentResourceOwner is not allowed, so we must save a pointer
359  * in a local variable and clear it first.
360  */
361  resowner = CurrentResourceOwner;
362  CurrentResourceOwner = NULL;
363 
364  /* Now we can release resources and delete it. */
365  ResourceOwnerRelease(resowner,
366  RESOURCE_RELEASE_BEFORE_LOCKS, isCommit, true);
367  ResourceOwnerRelease(resowner,
368  RESOURCE_RELEASE_LOCKS, isCommit, true);
369  ResourceOwnerRelease(resowner,
370  RESOURCE_RELEASE_AFTER_LOCKS, isCommit, true);
371  ResourceOwnerDelete(resowner);
372 }
ResourceOwner CurrentResourceOwner
Definition: resowner.c:146
void ResourceOwnerRelease(ResourceOwner owner, ResourceReleasePhase phase, bool isCommit, bool isTopLevel)
Definition: resowner.c:486
void ResourceOwnerDelete(ResourceOwner owner)
Definition: resowner.c:737
@ RESOURCE_RELEASE_LOCKS
Definition: resowner.h:49
@ RESOURCE_RELEASE_BEFORE_LOCKS
Definition: resowner.h:48
@ RESOURCE_RELEASE_AFTER_LOCKS
Definition: resowner.h:50

References CurrentResourceOwner, RESOURCE_RELEASE_AFTER_LOCKS, RESOURCE_RELEASE_BEFORE_LOCKS, RESOURCE_RELEASE_LOCKS, ResourceOwnerDelete(), and ResourceOwnerRelease().

Referenced by perform_base_backup(), and WalSndErrorCleanup().

◆ WalSndRqstFileReload()

void WalSndRqstFileReload ( void  )

Definition at line 3189 of file walsender.c.

3190 {
3191  int i;
3192 
3193  for (i = 0; i < max_wal_senders; i++)
3194  {
3195  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3196 
3197  SpinLockAcquire(&walsnd->mutex);
3198  if (walsnd->pid == 0)
3199  {
3200  SpinLockRelease(&walsnd->mutex);
3201  continue;
3202  }
3203  walsnd->needreload = true;
3204  SpinLockRelease(&walsnd->mutex);
3205  }
3206 }
bool needreload

References i, max_wal_senders, WalSnd::mutex, WalSnd::needreload, WalSnd::pid, SpinLockAcquire, SpinLockRelease, WalSndCtl, and WalSndCtlData::walsnds.

Referenced by KeepFileRestoredFromArchive().

◆ WalSndShmemInit()

void WalSndShmemInit ( void  )

Definition at line 3277 of file walsender.c.

3278 {
3279  bool found;
3280  int i;
3281 
3282  WalSndCtl = (WalSndCtlData *)
3283  ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
3284 
3285  if (!found)
3286  {
3287  /* First time through, so initialize */
3289 
3290  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
3292 
3293  for (i = 0; i < max_wal_senders; i++)
3294  {
3295  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3296 
3297  SpinLockInit(&walsnd->mutex);
3298  }
3299  }
3300 }
#define MemSet(start, val, len)
Definition: c.h:1008
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:396
void SHMQueueInit(SHM_QUEUE *queue)
Definition: shmqueue.c:36
#define SpinLockInit(lock)
Definition: spin.h:60
SHM_QUEUE SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]
#define NUM_SYNC_REP_WAIT_MODE
Definition: syncrep.h:28
Size WalSndShmemSize(void)
Definition: walsender.c:3265

References i, max_wal_senders, MemSet, WalSnd::mutex, NUM_SYNC_REP_WAIT_MODE, ShmemInitStruct(), SHMQueueInit(), SpinLockInit, WalSndCtlData::SyncRepQueue, WalSndCtl, WalSndCtlData::walsnds, and WalSndShmemSize().

Referenced by CreateSharedMemoryAndSemaphores().

◆ WalSndShmemSize()

Size WalSndShmemSize ( void  )

Definition at line 3265 of file walsender.c.

3266 {
3267  Size size = 0;
3268 
3269  size = offsetof(WalSndCtlData, walsnds);
3270  size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
3271 
3272  return size;
3273 }
#define offsetof(type, field)
Definition: c.h:727
size_t Size
Definition: c.h:540
Size add_size(Size s1, Size s2)
Definition: shmem.c:502
Size mul_size(Size s1, Size s2)
Definition: shmem.c:519

References add_size(), max_wal_senders, mul_size(), and offsetof.

Referenced by CalculateShmemSize(), and WalSndShmemInit().

◆ WalSndSignals()

void WalSndSignals ( void  )

Definition at line 3246 of file walsender.c.

3247 {
3248  /* Set up signal handlers */
3250  pqsignal(SIGINT, StatementCancelHandler); /* query cancel */
3251  pqsignal(SIGTERM, die); /* request shutdown */
3252  /* SIGQUIT handler was already set up by InitPostmasterChild */
3253  InitializeTimeouts(); /* establishes SIGALRM handler */
3256  pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
3257  * shutdown */
3258 
3259  /* Reset some signals that are accepted by postmaster but not here */
3261 }
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:61
#define die(msg)
Definition: pg_test_fsync.c:95
void StatementCancelHandler(SIGNAL_ARGS)
Definition: postgres.c:2962
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition: procsignal.c:638
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:180
void InitializeTimeouts(void)
Definition: timeout.c:474
static void WalSndLastCycleHandler(SIGNAL_ARGS)
Definition: walsender.c:3234
#define SIGCHLD
Definition: win32_port.h:177
#define SIGHUP
Definition: win32_port.h:167
#define SIG_DFL
Definition: win32_port.h:162
#define SIGPIPE
Definition: win32_port.h:172
#define SIGUSR1
Definition: win32_port.h:179
#define SIGUSR2
Definition: win32_port.h:180
#define SIG_IGN
Definition: win32_port.h:164

References die, InitializeTimeouts(), pqsignal(), procsignal_sigusr1_handler(), SIG_DFL, SIG_IGN, SIGCHLD, SIGHUP, SignalHandlerForConfigReload(), SIGPIPE, SIGUSR1, SIGUSR2, StatementCancelHandler(), and WalSndLastCycleHandler().

Referenced by PostgresMain().

◆ WalSndWaitStopping()

void WalSndWaitStopping ( void  )

Definition at line 3380 of file walsender.c.

3381 {
3382  for (;;)
3383  {
3384  int i;
3385  bool all_stopped = true;
3386 
3387  for (i = 0; i < max_wal_senders; i++)
3388  {
3389  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3390 
3391  SpinLockAcquire(&walsnd->mutex);
3392 
3393  if (walsnd->pid == 0)
3394  {
3395  SpinLockRelease(&walsnd->mutex);
3396  continue;
3397  }
3398 
3399  if (walsnd->state != WALSNDSTATE_STOPPING)
3400  {
3401  all_stopped = false;
3402  SpinLockRelease(&walsnd->mutex);
3403  break;
3404  }
3405  SpinLockRelease(&walsnd->mutex);
3406  }
3407 
3408  /* safe to leave if confirmation is done for all WAL senders */
3409  if (all_stopped)
3410  return;
3411 
3412  pg_usleep(10000L); /* wait for 10 msec */
3413  }
3414 }
void pg_usleep(long microsec)
Definition: signal.c:53

References i, max_wal_senders, WalSnd::mutex, pg_usleep(), WalSnd::pid, SpinLockAcquire, SpinLockRelease, WalSnd::state, WalSndCtl, WalSndCtlData::walsnds, and WALSNDSTATE_STOPPING.

Referenced by ShutdownXLOG().

◆ WalSndWakeup()

void WalSndWakeup ( void  )

Definition at line 3309 of file walsender.c.

3310 {
3311  int i;
3312 
3313  for (i = 0; i < max_wal_senders; i++)
3314  {
3315  Latch *latch;
3316  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3317 
3318  /*
3319  * Get latch pointer with spinlock held, for the unlikely case that
3320  * pointer reads aren't atomic (as they're 8 bytes).
3321  */
3322  SpinLockAcquire(&walsnd->mutex);
3323  latch = walsnd->latch;
3324  SpinLockRelease(&walsnd->mutex);
3325 
3326  if (latch != NULL)
3327  SetLatch(latch);
3328  }
3329 }
void SetLatch(Latch *latch)
Definition: latch.c:566
Definition: latch.h:111
Latch * latch

References i, WalSnd::latch, max_wal_senders, WalSnd::mutex, SetLatch(), SpinLockAcquire, SpinLockRelease, WalSndCtl, and WalSndCtlData::walsnds.

Referenced by ApplyWalRecord(), KeepFileRestoredFromArchive(), StartupXLOG(), and XLogWalRcvFlush().

Variable Documentation

◆ am_cascading_walsender

◆ am_db_walsender

PGDLLIMPORT bool am_db_walsender
extern

Definition at line 119 of file walsender.c.

Referenced by check_db(), ClientAuthentication(), InitPostgres(), and ProcessStartupPacket().

◆ am_walsender

◆ log_replication_commands

PGDLLIMPORT bool log_replication_commands
extern

Definition at line 126 of file walsender.c.

Referenced by exec_replication_command().

◆ max_wal_senders

◆ wake_wal_senders

PGDLLIMPORT bool wake_wal_senders
extern

Definition at line 131 of file walsender.c.

◆ wal_sender_timeout