PostgreSQL Source Code  git master
walsender.h File Reference
#include <signal.h>
Include dependency graph for walsender.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Macros

#define WalSndWakeupRequest()    do { wake_wal_senders = true; } while (0)
 
#define WalSndWakeupProcessRequests()
 

Enumerations

enum  CRSSnapshotAction { CRS_EXPORT_SNAPSHOT , CRS_NOEXPORT_SNAPSHOT , CRS_USE_SNAPSHOT }
 

Functions

void InitWalSender (void)
 
bool exec_replication_command (const char *cmd_string)
 
void WalSndErrorCleanup (void)
 
void WalSndResourceCleanup (bool isCommit)
 
void WalSndSignals (void)
 
Size WalSndShmemSize (void)
 
void WalSndShmemInit (void)
 
void WalSndWakeup (void)
 
void WalSndInitStopping (void)
 
void WalSndWaitStopping (void)
 
void HandleWalSndInitStopping (void)
 
void WalSndRqstFileReload (void)
 

Variables

PGDLLIMPORT bool am_walsender
 
PGDLLIMPORT bool am_cascading_walsender
 
PGDLLIMPORT bool am_db_walsender
 
PGDLLIMPORT bool wake_wal_senders
 
PGDLLIMPORT int max_wal_senders
 
PGDLLIMPORT int wal_sender_timeout
 
PGDLLIMPORT bool log_replication_commands
 

Macro Definition Documentation

◆ WalSndWakeupProcessRequests

#define WalSndWakeupProcessRequests ( )
Value:
do \
{ \
if (wake_wal_senders) \
{ \
wake_wal_senders = false; \
if (max_wal_senders > 0) \
WalSndWakeup(); \
} \
} while (0)
PGDLLIMPORT bool wake_wal_senders
Definition: walsender.c:131
PGDLLIMPORT int max_wal_senders
Definition: walsender.c:122

Definition at line 63 of file walsender.h.

◆ WalSndWakeupRequest

#define WalSndWakeupRequest ( )     do { wake_wal_senders = true; } while (0)

Definition at line 57 of file walsender.h.

Enumeration Type Documentation

◆ CRSSnapshotAction

Enumerator
CRS_EXPORT_SNAPSHOT 
CRS_NOEXPORT_SNAPSHOT 
CRS_USE_SNAPSHOT 

Definition at line 20 of file walsender.h.

21 {
CRSSnapshotAction
Definition: walsender.h:21
@ CRS_USE_SNAPSHOT
Definition: walsender.h:24
@ CRS_NOEXPORT_SNAPSHOT
Definition: walsender.h:23
@ CRS_EXPORT_SNAPSHOT
Definition: walsender.h:22

Function Documentation

◆ exec_replication_command()

bool exec_replication_command ( const char *  cmd_string)

Definition at line 1669 of file walsender.c.

1670 {
1671  int parse_rc;
1672  Node *cmd_node;
1673  const char *cmdtag;
1674  MemoryContext cmd_context;
1675  MemoryContext old_context;
1676 
1677  /*
1678  * If WAL sender has been told that shutdown is getting close, switch its
1679  * status accordingly to handle the next replication commands correctly.
1680  */
1681  if (got_STOPPING)
1683 
1684  /*
1685  * Throw error if in stopping mode. We need to prevent commands that could
1686  * generate WAL while the shutdown checkpoint is being written. To be
1687  * safe, we just prohibit all new commands.
1688  */
1690  ereport(ERROR,
1691  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1692  errmsg("cannot execute new commands while WAL sender is in stopping mode")));
1693 
1694  /*
1695  * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
1696  * command arrives. Clean up the old stuff if there's anything.
1697  */
1699 
1701 
1702  /*
1703  * Prepare to parse and execute the command.
1704  */
1706  "Replication command context",
1708  old_context = MemoryContextSwitchTo(cmd_context);
1709 
1710  replication_scanner_init(cmd_string);
1711 
1712  /*
1713  * Is it a WalSender command?
1714  */
1716  {
1717  /* Nope; clean up and get out. */
1719 
1720  MemoryContextSwitchTo(old_context);
1721  MemoryContextDelete(cmd_context);
1722 
1723  /* XXX this is a pretty random place to make this check */
1724  if (MyDatabaseId == InvalidOid)
1725  ereport(ERROR,
1726  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1727  errmsg("cannot execute SQL commands in WAL sender for physical replication")));
1728 
1729  /* Tell the caller that this wasn't a WalSender command. */
1730  return false;
1731  }
1732 
1733  /*
1734  * Looks like a WalSender command, so parse it.
1735  */
1736  parse_rc = replication_yyparse();
1737  if (parse_rc != 0)
1738  ereport(ERROR,
1739  (errcode(ERRCODE_SYNTAX_ERROR),
1740  errmsg_internal("replication command parser returned %d",
1741  parse_rc)));
1743 
1744  cmd_node = replication_parse_result;
1745 
1746  /*
1747  * Report query to various monitoring facilities. For this purpose, we
1748  * report replication commands just like SQL commands.
1749  */
1750  debug_query_string = cmd_string;
1751 
1753 
1754  /*
1755  * Log replication command if log_replication_commands is enabled. Even
1756  * when it's disabled, log the command with DEBUG1 level for backward
1757  * compatibility.
1758  */
1760  (errmsg("received replication command: %s", cmd_string)));
1761 
1762  /*
1763  * Disallow replication commands in aborted transaction blocks.
1764  */
1766  ereport(ERROR,
1767  (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
1768  errmsg("current transaction is aborted, "
1769  "commands ignored until end of transaction block")));
1770 
1772 
1773  /*
1774  * Allocate buffers that will be used for each outgoing and incoming
1775  * message. We do this just once per command to reduce palloc overhead.
1776  */
1780 
1781  switch (cmd_node->type)
1782  {
1783  case T_IdentifySystemCmd:
1784  cmdtag = "IDENTIFY_SYSTEM";
1785  set_ps_display(cmdtag);
1786  IdentifySystem();
1787  EndReplicationCommand(cmdtag);
1788  break;
1789 
1790  case T_ReadReplicationSlotCmd:
1791  cmdtag = "READ_REPLICATION_SLOT";
1792  set_ps_display(cmdtag);
1794  EndReplicationCommand(cmdtag);
1795  break;
1796 
1797  case T_BaseBackupCmd:
1798  cmdtag = "BASE_BACKUP";
1799  set_ps_display(cmdtag);
1800  PreventInTransactionBlock(true, cmdtag);
1801  SendBaseBackup((BaseBackupCmd *) cmd_node);
1802  EndReplicationCommand(cmdtag);
1803  break;
1804 
1805  case T_CreateReplicationSlotCmd:
1806  cmdtag = "CREATE_REPLICATION_SLOT";
1807  set_ps_display(cmdtag);
1809  EndReplicationCommand(cmdtag);
1810  break;
1811 
1812  case T_DropReplicationSlotCmd:
1813  cmdtag = "DROP_REPLICATION_SLOT";
1814  set_ps_display(cmdtag);
1816  EndReplicationCommand(cmdtag);
1817  break;
1818 
1819  case T_StartReplicationCmd:
1820  {
1821  StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
1822 
1823  cmdtag = "START_REPLICATION";
1824  set_ps_display(cmdtag);
1825  PreventInTransactionBlock(true, cmdtag);
1826 
1827  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
1828  StartReplication(cmd);
1829  else
1831 
1832  /* dupe, but necessary per libpqrcv_endstreaming */
1833  EndReplicationCommand(cmdtag);
1834 
1835  Assert(xlogreader != NULL);
1836  break;
1837  }
1838 
1839  case T_TimeLineHistoryCmd:
1840  cmdtag = "TIMELINE_HISTORY";
1841  set_ps_display(cmdtag);
1842  PreventInTransactionBlock(true, cmdtag);
1844  EndReplicationCommand(cmdtag);
1845  break;
1846 
1847  case T_VariableShowStmt:
1848  {
1850  VariableShowStmt *n = (VariableShowStmt *) cmd_node;
1851 
1852  cmdtag = "SHOW";
1853  set_ps_display(cmdtag);
1854 
1855  /* syscache access needs a transaction environment */
1857  GetPGVariable(n->name, dest);
1859  EndReplicationCommand(cmdtag);
1860  }
1861  break;
1862 
1863  default:
1864  elog(ERROR, "unrecognized replication command node tag: %u",
1865  cmd_node->type);
1866  }
1867 
1868  /* done */
1869  MemoryContextSwitchTo(old_context);
1870  MemoryContextDelete(cmd_context);
1871 
1872  /*
1873  * We need not update ps display or pg_stat_activity, because PostgresMain
1874  * will reset those to "idle". But we must reset debug_query_string to
1875  * ensure it doesn't become a dangling pointer.
1876  */
1877  debug_query_string = NULL;
1878 
1879  return true;
1880 }
void pgstat_report_activity(BackendState state, const char *cmd_str)
@ STATE_RUNNING
void SendBaseBackup(BaseBackupCmd *cmd)
Definition: basebackup.c:964
void EndReplicationCommand(const char *commandTag)
Definition: dest.c:222
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
@ DestRemoteSimple
Definition: dest.h:93
int errmsg_internal(const char *fmt,...)
Definition: elog.c:993
int errcode(int sqlerrcode)
Definition: elog.c:695
int errmsg(const char *fmt,...)
Definition: elog.c:906
#define LOG
Definition: elog.h:27
#define DEBUG1
Definition: elog.h:26
#define ERROR
Definition: elog.h:35
#define ereport(elevel,...)
Definition: elog.h:145
Oid MyDatabaseId
Definition: globals.c:89
void GetPGVariable(const char *name, DestReceiver *dest)
Definition: guc_funcs.c:382
Assert(fmt[strlen(fmt) - 1] !='\n')
MemoryContext CurrentMemoryContext
Definition: mcxt.c:124
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:376
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:153
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:121
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:135
const char * debug_query_string
Definition: postgres.c:82
#define InvalidOid
Definition: postgres_ext.h:36
void set_ps_display(const char *activity)
Definition: ps_status.c:342
@ REPLICATION_KIND_PHYSICAL
Definition: replnodes.h:22
void SnapBuildClearExportedSnapshot(void)
Definition: snapbuild.c:725
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
Definition: nodes.h:118
NodeTag type
Definition: nodes.h:119
ReplicationKind kind
Definition: replnodes.h:82
WalSndState state
static void SendTimeLineHistory(TimeLineHistoryCmd *cmd)
Definition: walsender.c:577
WalSnd * MyWalSnd
Definition: walsender.c:113
static void ReadReplicationSlot(ReadReplicationSlotCmd *cmd)
Definition: walsender.c:478
static StringInfoData tmpbuf
Definition: walsender.c:160
static void IdentifySystem(void)
Definition: walsender.c:395
static StringInfoData reply_message
Definition: walsender.c:159
void WalSndSetState(WalSndState state)
Definition: walsender.c:3405
static StringInfoData output_message
Definition: walsender.c:158
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:188
bool log_replication_commands
Definition: walsender.c:126
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Definition: walsender.c:1030
static void StartLogicalReplication(StartReplicationCmd *cmd)
Definition: walsender.c:1244
static void DropReplicationSlot(DropReplicationSlotCmd *cmd)
Definition: walsender.c:1234
static void StartReplication(StartReplicationCmd *cmd)
Definition: walsender.c:670
static XLogReaderState * xlogreader
Definition: walsender.c:138
PGDLLIMPORT Node * replication_parse_result
@ WALSNDSTATE_STOPPING
void replication_scanner_finish(void)
int replication_yyparse(void)
void replication_scanner_init(const char *str)
bool replication_scanner_is_replication_command(void)
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
Definition: xact.c:3469
void StartTransactionCommand(void)
Definition: xact.c:2925
bool IsAbortedTransactionBlockState(void)
Definition: xact.c:397
void CommitTransactionCommand(void)
Definition: xact.c:3022

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, Assert(), CHECK_FOR_INTERRUPTS, CommitTransactionCommand(), CreateDestReceiver(), CreateReplicationSlot(), CurrentMemoryContext, DEBUG1, debug_query_string, generate_unaccent_rules::dest, DestRemoteSimple, DropReplicationSlot(), elog(), EndReplicationCommand(), ereport, errcode(), errmsg(), errmsg_internal(), ERROR, GetPGVariable(), got_STOPPING, IdentifySystem(), initStringInfo(), InvalidOid, IsAbortedTransactionBlockState(), StartReplicationCmd::kind, LOG, log_replication_commands, MemoryContextDelete(), MemoryContextSwitchTo(), MyDatabaseId, MyWalSnd, VariableShowStmt::name, output_message, pgstat_report_activity(), PreventInTransactionBlock(), ReadReplicationSlot(), REPLICATION_KIND_PHYSICAL, replication_parse_result, replication_scanner_finish(), replication_scanner_init(), replication_scanner_is_replication_command(), replication_yyparse(), reply_message, SendBaseBackup(), SendTimeLineHistory(), set_ps_display(), SnapBuildClearExportedSnapshot(), StartLogicalReplication(), StartReplication(), StartTransactionCommand(), WalSnd::state, STATE_RUNNING, tmpbuf, Node::type, ReadReplicationSlotCmd::type, WalSndSetState(), WALSNDSTATE_STOPPING, and xlogreader.

Referenced by PostgresMain().

◆ HandleWalSndInitStopping()

void HandleWalSndInitStopping ( void  )

Definition at line 3199 of file walsender.c.

3200 {
3202 
3203  /*
3204  * If replication has not yet started, die like with SIGTERM. If
3205  * replication is active, only set a flag and wake up the main loop. It
3206  * will send any outstanding WAL, wait for it to be replicated to the
3207  * standby, and then exit gracefully.
3208  */
3209  if (!replication_active)
3210  kill(MyProcPid, SIGTERM);
3211  else
3212  got_STOPPING = true;
3213 }
int MyProcPid
Definition: globals.c:44
bool am_walsender
Definition: walsender.c:116
static volatile sig_atomic_t replication_active
Definition: walsender.c:196
#define kill(pid, sig)
Definition: win32_port.h:482

References am_walsender, Assert(), got_STOPPING, kill, MyProcPid, and replication_active.

Referenced by procsignal_sigusr1_handler().

◆ InitWalSender()

void InitWalSender ( void  )

Referenced by PostgresMain().

◆ WalSndErrorCleanup()

void WalSndErrorCleanup ( void  )

Definition at line 315 of file walsender.c.

316 {
320 
321  if (xlogreader != NULL && xlogreader->seg.ws_file >= 0)
323 
324  if (MyReplicationSlot != NULL)
326 
328 
329  replication_active = false;
330 
331  /*
332  * If there is a transaction in progress, it will clean up our
333  * ResourceOwner, but if a replication command set up a resource owner
334  * without a transaction, we've got to clean that up now.
335  */
337  WalSndResourceCleanup(false);
338 
339  if (got_STOPPING || got_SIGUSR2)
340  proc_exit(0);
341 
342  /* Revert back to startup state */
344 }
void ConditionVariableCancelSleep(void)
void proc_exit(int code)
Definition: ipc.c:104
void LWLockReleaseAll(void)
Definition: lwlock.c:1901
void ReplicationSlotCleanup(void)
Definition: slot.c:603
ReplicationSlot * MyReplicationSlot
Definition: slot.c:98
void ReplicationSlotRelease(void)
Definition: slot.c:547
WALOpenSegment seg
Definition: xlogreader.h:272
static void pgstat_report_wait_end(void)
Definition: wait_event.h:284
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:187
void WalSndResourceCleanup(bool isCommit)
Definition: walsender.c:350
@ WALSNDSTATE_STARTUP
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4814
void wal_segment_close(XLogReaderState *state)
Definition: xlogutils.c:859

References ConditionVariableCancelSleep(), got_SIGUSR2, got_STOPPING, IsTransactionOrTransactionBlock(), LWLockReleaseAll(), MyReplicationSlot, pgstat_report_wait_end(), proc_exit(), replication_active, ReplicationSlotCleanup(), ReplicationSlotRelease(), XLogReaderState::seg, wal_segment_close(), WalSndResourceCleanup(), WalSndSetState(), WALSNDSTATE_STARTUP, WALOpenSegment::ws_file, and xlogreader.

Referenced by PostgresMain().

◆ WalSndInitStopping()

void WalSndInitStopping ( void  )

Definition at line 3341 of file walsender.c.

3342 {
3343  int i;
3344 
3345  for (i = 0; i < max_wal_senders; i++)
3346  {
3347  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3348  pid_t pid;
3349 
3350  SpinLockAcquire(&walsnd->mutex);
3351  pid = walsnd->pid;
3352  SpinLockRelease(&walsnd->mutex);
3353 
3354  if (pid == 0)
3355  continue;
3356 
3358  }
3359 }
#define InvalidBackendId
Definition: backendid.h:23
int i
Definition: isn.c:73
int SendProcSignal(pid_t pid, ProcSignalReason reason, BackendId backendId)
Definition: procsignal.c:261
@ PROCSIG_WALSND_INIT_STOPPING
Definition: procsignal.h:35
#define SpinLockRelease(lock)
Definition: spin.h:64
#define SpinLockAcquire(lock)
Definition: spin.h:62
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
int max_wal_senders
Definition: walsender.c:122
WalSndCtlData * WalSndCtl
Definition: walsender.c:110

References i, InvalidBackendId, max_wal_senders, WalSnd::mutex, WalSnd::pid, PROCSIG_WALSND_INIT_STOPPING, SendProcSignal(), SpinLockAcquire, SpinLockRelease, WalSndCtl, and WalSndCtlData::walsnds.

Referenced by ShutdownXLOG().

◆ WalSndResourceCleanup()

void WalSndResourceCleanup ( bool  isCommit)

Definition at line 350 of file walsender.c.

351 {
352  ResourceOwner resowner;
353 
354  if (CurrentResourceOwner == NULL)
355  return;
356 
357  /*
358  * Deleting CurrentResourceOwner is not allowed, so we must save a pointer
359  * in a local variable and clear it first.
360  */
361  resowner = CurrentResourceOwner;
362  CurrentResourceOwner = NULL;
363 
364  /* Now we can release resources and delete it. */
365  ResourceOwnerRelease(resowner,
366  RESOURCE_RELEASE_BEFORE_LOCKS, isCommit, true);
367  ResourceOwnerRelease(resowner,
368  RESOURCE_RELEASE_LOCKS, isCommit, true);
369  ResourceOwnerRelease(resowner,
370  RESOURCE_RELEASE_AFTER_LOCKS, isCommit, true);
371  ResourceOwnerDelete(resowner);
372 }
ResourceOwner CurrentResourceOwner
Definition: resowner.c:146
void ResourceOwnerRelease(ResourceOwner owner, ResourceReleasePhase phase, bool isCommit, bool isTopLevel)
Definition: resowner.c:486
void ResourceOwnerDelete(ResourceOwner owner)
Definition: resowner.c:742
@ RESOURCE_RELEASE_LOCKS
Definition: resowner.h:49
@ RESOURCE_RELEASE_BEFORE_LOCKS
Definition: resowner.h:48
@ RESOURCE_RELEASE_AFTER_LOCKS
Definition: resowner.h:50

References CurrentResourceOwner, RESOURCE_RELEASE_AFTER_LOCKS, RESOURCE_RELEASE_BEFORE_LOCKS, RESOURCE_RELEASE_LOCKS, ResourceOwnerDelete(), and ResourceOwnerRelease().

Referenced by perform_base_backup(), and WalSndErrorCleanup().

◆ WalSndRqstFileReload()

void WalSndRqstFileReload ( void  )

Definition at line 3176 of file walsender.c.

3177 {
3178  int i;
3179 
3180  for (i = 0; i < max_wal_senders; i++)
3181  {
3182  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3183 
3184  SpinLockAcquire(&walsnd->mutex);
3185  if (walsnd->pid == 0)
3186  {
3187  SpinLockRelease(&walsnd->mutex);
3188  continue;
3189  }
3190  walsnd->needreload = true;
3191  SpinLockRelease(&walsnd->mutex);
3192  }
3193 }
bool needreload

References i, max_wal_senders, WalSnd::mutex, WalSnd::needreload, WalSnd::pid, SpinLockAcquire, SpinLockRelease, WalSndCtl, and WalSndCtlData::walsnds.

Referenced by KeepFileRestoredFromArchive().

◆ WalSndShmemInit()

void WalSndShmemInit ( void  )

Definition at line 3264 of file walsender.c.

3265 {
3266  bool found;
3267  int i;
3268 
3269  WalSndCtl = (WalSndCtlData *)
3270  ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
3271 
3272  if (!found)
3273  {
3274  /* First time through, so initialize */
3276 
3277  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
3279 
3280  for (i = 0; i < max_wal_senders; i++)
3281  {
3282  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3283 
3284  SpinLockInit(&walsnd->mutex);
3285  }
3286  }
3287 }
#define MemSet(start, val, len)
Definition: c.h:953
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:396
void SHMQueueInit(SHM_QUEUE *queue)
Definition: shmqueue.c:36
#define SpinLockInit(lock)
Definition: spin.h:60
SHM_QUEUE SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]
#define NUM_SYNC_REP_WAIT_MODE
Definition: syncrep.h:27
Size WalSndShmemSize(void)
Definition: walsender.c:3252

References i, max_wal_senders, MemSet, WalSnd::mutex, NUM_SYNC_REP_WAIT_MODE, ShmemInitStruct(), SHMQueueInit(), SpinLockInit, WalSndCtlData::SyncRepQueue, WalSndCtl, WalSndCtlData::walsnds, and WalSndShmemSize().

Referenced by CreateSharedMemoryAndSemaphores().

◆ WalSndShmemSize()

Size WalSndShmemSize ( void  )

Definition at line 3252 of file walsender.c.

3253 {
3254  Size size = 0;
3255 
3256  size = offsetof(WalSndCtlData, walsnds);
3257  size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
3258 
3259  return size;
3260 }
size_t Size
Definition: c.h:541
Size add_size(Size s1, Size s2)
Definition: shmem.c:502
Size mul_size(Size s1, Size s2)
Definition: shmem.c:519

References add_size(), max_wal_senders, and mul_size().

Referenced by CalculateShmemSize(), and WalSndShmemInit().

◆ WalSndSignals()

void WalSndSignals ( void  )

Definition at line 3233 of file walsender.c.

3234 {
3235  /* Set up signal handlers */
3237  pqsignal(SIGINT, StatementCancelHandler); /* query cancel */
3238  pqsignal(SIGTERM, die); /* request shutdown */
3239  /* SIGQUIT handler was already set up by InitPostmasterChild */
3240  InitializeTimeouts(); /* establishes SIGALRM handler */
3243  pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
3244  * shutdown */
3245 
3246  /* Reset some signals that are accepted by postmaster but not here */
3248 }
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:61
#define die(msg)
Definition: pg_test_fsync.c:95
pqsigfunc pqsignal(int signo, pqsigfunc func)
void StatementCancelHandler(SIGNAL_ARGS)
Definition: postgres.c:2951
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition: procsignal.c:638
void InitializeTimeouts(void)
Definition: timeout.c:474
static void WalSndLastCycleHandler(SIGNAL_ARGS)
Definition: walsender.c:3221
#define SIGCHLD
Definition: win32_port.h:186
#define SIGHUP
Definition: win32_port.h:176
#define SIG_DFL
Definition: win32_port.h:171
#define SIGPIPE
Definition: win32_port.h:181
#define SIGUSR1
Definition: win32_port.h:188
#define SIGUSR2
Definition: win32_port.h:189
#define SIG_IGN
Definition: win32_port.h:173

References die, InitializeTimeouts(), pqsignal(), procsignal_sigusr1_handler(), SIG_DFL, SIG_IGN, SIGCHLD, SIGHUP, SignalHandlerForConfigReload(), SIGPIPE, SIGUSR1, SIGUSR2, StatementCancelHandler(), and WalSndLastCycleHandler().

Referenced by PostgresMain().

◆ WalSndWaitStopping()

void WalSndWaitStopping ( void  )

Definition at line 3367 of file walsender.c.

3368 {
3369  for (;;)
3370  {
3371  int i;
3372  bool all_stopped = true;
3373 
3374  for (i = 0; i < max_wal_senders; i++)
3375  {
3376  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3377 
3378  SpinLockAcquire(&walsnd->mutex);
3379 
3380  if (walsnd->pid == 0)
3381  {
3382  SpinLockRelease(&walsnd->mutex);
3383  continue;
3384  }
3385 
3386  if (walsnd->state != WALSNDSTATE_STOPPING)
3387  {
3388  all_stopped = false;
3389  SpinLockRelease(&walsnd->mutex);
3390  break;
3391  }
3392  SpinLockRelease(&walsnd->mutex);
3393  }
3394 
3395  /* safe to leave if confirmation is done for all WAL senders */
3396  if (all_stopped)
3397  return;
3398 
3399  pg_usleep(10000L); /* wait for 10 msec */
3400  }
3401 }
void pg_usleep(long microsec)
Definition: signal.c:53

References i, max_wal_senders, WalSnd::mutex, pg_usleep(), WalSnd::pid, SpinLockAcquire, SpinLockRelease, WalSnd::state, WalSndCtl, WalSndCtlData::walsnds, and WALSNDSTATE_STOPPING.

Referenced by ShutdownXLOG().

◆ WalSndWakeup()

void WalSndWakeup ( void  )

Definition at line 3296 of file walsender.c.

3297 {
3298  int i;
3299 
3300  for (i = 0; i < max_wal_senders; i++)
3301  {
3302  Latch *latch;
3303  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3304 
3305  /*
3306  * Get latch pointer with spinlock held, for the unlikely case that
3307  * pointer reads aren't atomic (as they're 8 bytes).
3308  */
3309  SpinLockAcquire(&walsnd->mutex);
3310  latch = walsnd->latch;
3311  SpinLockRelease(&walsnd->mutex);
3312 
3313  if (latch != NULL)
3314  SetLatch(latch);
3315  }
3316 }
void SetLatch(Latch *latch)
Definition: latch.c:591
Definition: latch.h:111
Latch * latch

References i, WalSnd::latch, max_wal_senders, WalSnd::mutex, SetLatch(), SpinLockAcquire, SpinLockRelease, WalSndCtl, and WalSndCtlData::walsnds.

Referenced by ApplyWalRecord(), KeepFileRestoredFromArchive(), StartupXLOG(), and XLogWalRcvFlush().

Variable Documentation

◆ am_cascading_walsender

◆ am_db_walsender

PGDLLIMPORT bool am_db_walsender
extern

Definition at line 119 of file walsender.c.

Referenced by check_db(), ClientAuthentication(), InitPostgres(), and ProcessStartupPacket().

◆ am_walsender

◆ log_replication_commands

PGDLLIMPORT bool log_replication_commands
extern

Definition at line 126 of file walsender.c.

Referenced by exec_replication_command().

◆ max_wal_senders

◆ wake_wal_senders

PGDLLIMPORT bool wake_wal_senders
extern

Definition at line 131 of file walsender.c.

◆ wal_sender_timeout