PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
walsender.h File Reference
#include <signal.h>
#include "fmgr.h"
Include dependency graph for walsender.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Macros

#define WalSndWakeupRequest()   do { wake_wal_senders = true; } while (0)
 
#define WalSndWakeupProcessRequests()
 

Enumerations

enum  CRSSnapshotAction { CRS_EXPORT_SNAPSHOT, CRS_NOEXPORT_SNAPSHOT, CRS_USE_SNAPSHOT }
 

Functions

void InitWalSender (void)
 
bool exec_replication_command (const char *query_string)
 
void WalSndErrorCleanup (void)
 
void WalSndSignals (void)
 
Size WalSndShmemSize (void)
 
void WalSndShmemInit (void)
 
void WalSndWakeup (void)
 
void WalSndWaitStopping (void)
 
void WalSndRqstFileReload (void)
 

Variables

bool am_walsender
 
bool am_cascading_walsender
 
bool am_db_walsender
 
bool wake_wal_senders
 
int max_wal_senders
 
int wal_sender_timeout
 
bool log_replication_commands
 

Macro Definition Documentation

#define WalSndWakeupProcessRequests ( )
Value:
do \
{ \
{ \
if (max_wal_senders > 0) \
} \
} while (0)
int max_wal_senders
Definition: walsender.c:117
bool wake_wal_senders
Definition: walsender.c:125
void WalSndWakeup(void)
Definition: walsender.c:3017

Definition at line 62 of file walsender.h.

Referenced by XLogBackgroundFlush(), and XLogFlush().

#define WalSndWakeupRequest ( )    do { wake_wal_senders = true; } while (0)

Definition at line 56 of file walsender.h.

Referenced by XLogWrite().

Enumeration Type Documentation

Enumerator
CRS_EXPORT_SNAPSHOT 
CRS_NOEXPORT_SNAPSHOT 
CRS_USE_SNAPSHOT 

Definition at line 22 of file walsender.h.

Function Documentation

bool exec_replication_command ( const char *  query_string)

Definition at line 1413 of file walsender.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate(), CHECK_FOR_INTERRUPTS, CreateDestReceiver(), CreateReplicationSlot(), CurrentMemoryContext, DEBUG1, DestRemote, DestRemoteSimple, DropReplicationSlot(), elog, EndCommand(), ereport, errcode(), errmsg(), errmsg_internal(), ERROR, GetPGVariable(), got_SIGUSR2, IdentifySystem(), initStringInfo(), InvalidOid, IsA, IsAbortedTransactionBlockState(), IsTransactionBlock(), StartReplicationCmd::kind, LOG, log_replication_commands, MemoryContextDelete(), MemoryContextSwitchTo(), MyDatabaseId, VariableShowStmt::name, PreventTransactionChain(), REPLICATION_KIND_PHYSICAL, replication_parse_result, replication_scanner_init(), replication_yyparse(), SendBaseBackup(), SendTimeLineHistory(), SnapBuildClearExportedSnapshot(), StartLogicalReplication(), StartReplication(), WalSnd::state, T_BaseBackupCmd, T_CreateReplicationSlotCmd, T_DropReplicationSlotCmd, T_IdentifySystemCmd, T_SQLCmd, T_StartReplicationCmd, T_TimeLineHistoryCmd, T_VariableShowStmt, Node::type, WalSndSetState(), and WALSNDSTATE_STOPPING.

Referenced by PostgresMain().

1414 {
1415  int parse_rc;
1416  Node *cmd_node;
1417  MemoryContext cmd_context;
1418  MemoryContext old_context;
1419 
1420  /*
1421  * If WAL sender has been told that shutdown is getting close, switch its
1422  * status accordingly to handle the next replication commands correctly.
1423  */
1424  if (got_SIGUSR2)
1426 
1427  /*
1428  * Throw error if in stopping mode. We need prevent commands that could
1429  * generate WAL while the shutdown checkpoint is being written. To be
1430  * safe, we just prohibit all new commands.
1431  */
1433  ereport(ERROR,
1434  (errmsg("cannot execute new commands while WAL sender is in stopping mode")));
1435 
1436  /*
1437  * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
1438  * command arrives. Clean up the old stuff if there's anything.
1439  */
1441 
1443 
1445  "Replication command context",
1447  old_context = MemoryContextSwitchTo(cmd_context);
1448 
1449  replication_scanner_init(cmd_string);
1450  parse_rc = replication_yyparse();
1451  if (parse_rc != 0)
1452  ereport(ERROR,
1453  (errcode(ERRCODE_SYNTAX_ERROR),
1454  (errmsg_internal("replication command parser returned %d",
1455  parse_rc))));
1456 
1457  cmd_node = replication_parse_result;
1458 
1459  /*
1460  * Log replication command if log_replication_commands is enabled. Even
1461  * when it's disabled, log the command with DEBUG1 level for backward
1462  * compatibility. Note that SQL commands are not logged here, and will be
1463  * logged later if log_statement is enabled.
1464  */
1465  if (cmd_node->type != T_SQLCmd)
1467  (errmsg("received replication command: %s", cmd_string)));
1468 
1469  /*
1470  * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot. If it was
1471  * called outside of transaction the snapshot should be cleared here.
1472  */
1473  if (!IsTransactionBlock())
1475 
1476  /*
1477  * For aborted transactions, don't allow anything except pure SQL, the
1478  * exec_simple_query() will handle it correctly.
1479  */
1480  if (IsAbortedTransactionBlockState() && !IsA(cmd_node, SQLCmd))
1481  ereport(ERROR,
1482  (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
1483  errmsg("current transaction is aborted, "
1484  "commands ignored until end of transaction block")));
1485 
1487 
1488  /*
1489  * Allocate buffers that will be used for each outgoing and incoming
1490  * message. We do this just once per command to reduce palloc overhead.
1491  */
1495 
1496  switch (cmd_node->type)
1497  {
1498  case T_IdentifySystemCmd:
1499  IdentifySystem();
1500  break;
1501 
1502  case T_BaseBackupCmd:
1503  PreventTransactionChain(true, "BASE_BACKUP");
1504  SendBaseBackup((BaseBackupCmd *) cmd_node);
1505  break;
1506 
1509  break;
1510 
1513  break;
1514 
1515  case T_StartReplicationCmd:
1516  {
1517  StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
1518 
1519  PreventTransactionChain(true, "START_REPLICATION");
1520 
1521  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
1522  StartReplication(cmd);
1523  else
1525  break;
1526  }
1527 
1528  case T_TimeLineHistoryCmd:
1529  PreventTransactionChain(true, "TIMELINE_HISTORY");
1531  break;
1532 
1533  case T_VariableShowStmt:
1534  {
1536  VariableShowStmt *n = (VariableShowStmt *) cmd_node;
1537 
1538  GetPGVariable(n->name, dest);
1539  }
1540  break;
1541 
1542  case T_SQLCmd:
1543  if (MyDatabaseId == InvalidOid)
1544  ereport(ERROR,
1545  (errmsg("not connected to database")));
1546 
1547  /* Tell the caller that this wasn't a WalSender command. */
1548  return false;
1549 
1550  default:
1551  elog(ERROR, "unrecognized replication command node tag: %u",
1552  cmd_node->type);
1553  }
1554 
1555  /* done */
1556  MemoryContextSwitchTo(old_context);
1557  MemoryContextDelete(cmd_context);
1558 
1559  /* Send CommandComplete message */
1560  EndCommand("SELECT", DestRemote);
1561 
1562  return true;
1563 }
#define IsA(nodeptr, _type_)
Definition: nodes.h:560
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:200
#define DEBUG1
Definition: elog.h:25
static void SendTimeLineHistory(TimeLineHistoryCmd *cmd)
Definition: walsender.c:429
static void DropReplicationSlot(DropReplicationSlotCmd *cmd)
Definition: walsender.c:1036
bool IsAbortedTransactionBlockState(void)
Definition: xact.c:370
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
Definition: nodes.h:509
static StringInfoData output_message
Definition: walsender.c:156
int errcode(int sqlerrcode)
Definition: elog.c:575
#define LOG
Definition: elog.h:26
void GetPGVariable(const char *name, DestReceiver *dest)
Definition: guc.c:7894
Node * replication_parse_result
bool IsTransactionBlock(void)
Definition: xact.c:4306
ReplicationKind kind
Definition: replnodes.h:81
void EndCommand(const char *commandTag, CommandDest dest)
Definition: dest.c:157
#define ERROR
Definition: elog.h:43
void SendBaseBackup(BaseBackupCmd *cmd)
Definition: basebackup.c:686
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:165
WalSndState state
NodeTag type
Definition: nodes.h:511
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:109
static void StartReplication(StartReplicationCmd *cmd)
Definition: walsender.c:525
MemoryContext CurrentMemoryContext
Definition: mcxt.c:37
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:184
#define ereport(elevel, rest)
Definition: elog.h:122
int replication_yyparse(void)
void initStringInfo(StringInfo str)
Definition: stringinfo.c:46
static StringInfoData reply_message
Definition: walsender.c:157
MemoryContext AllocSetContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Definition: aset.c:322
static void StartLogicalReplication(StartReplicationCmd *cmd)
Definition: walsender.c:1047
Oid MyDatabaseId
Definition: globals.c:76
WalSnd * MyWalSnd
Definition: walsender.c:108
#define InvalidOid
Definition: postgres_ext.h:36
int errmsg_internal(const char *fmt,...)
Definition: elog.c:827
void WalSndSetState(WalSndState state)
Definition: walsender.c:3085
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Definition: walsender.c:842
static StringInfoData tmpbuf
Definition: walsender.c:158
bool log_replication_commands
Definition: walsender.c:120
int errmsg(const char *fmt,...)
Definition: elog.c:797
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:97
#define elog
Definition: elog.h:219
static void IdentifySystem(void)
Definition: walsender.c:340
void SnapBuildClearExportedSnapshot(void)
Definition: snapbuild.c:696
void PreventTransactionChain(bool isTopLevel, const char *stmtType)
Definition: xact.c:3156
void replication_scanner_init(const char *query_string)
void InitWalSender ( void  )

Referenced by PostgresMain().

void WalSndErrorCleanup ( void  )

Definition at line 291 of file walsender.c.

References close, ConditionVariableCancelSleep(), got_SIGINT, got_SIGUSR2, LWLockReleaseAll(), MyReplicationSlot, NULL, pgstat_report_wait_end(), proc_exit(), replication_active, ReplicationSlotCleanup(), ReplicationSlotRelease(), sendFile, WalSndSetState(), WALSNDSTATE_STARTUP, and WALSNDSTATE_STOPPING.

Referenced by PostgresMain().

292 {
296 
297  if (sendFile >= 0)
298  {
299  close(sendFile);
300  sendFile = -1;
301  }
302 
303  if (MyReplicationSlot != NULL)
305 
307 
308  replication_active = false;
309  if (got_SIGINT)
310  proc_exit(0);
311 
312  /* Revert back to startup state */
314 
315  if (got_SIGUSR2)
317 }
static int sendFile
Definition: walsender.c:131
void proc_exit(int code)
Definition: ipc.c:99
void ConditionVariableCancelSleep(void)
void ReplicationSlotRelease(void)
Definition: slot.c:375
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1232
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:184
static volatile sig_atomic_t replication_active
Definition: walsender.c:192
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define NULL
Definition: c.h:229
void WalSndSetState(WalSndState state)
Definition: walsender.c:3085
static volatile sig_atomic_t got_SIGINT
Definition: walsender.c:183
void ReplicationSlotCleanup(void)
Definition: slot.c:429
void LWLockReleaseAll(void)
Definition: lwlock.c:1814
#define close(a)
Definition: win32.h:12
void WalSndRqstFileReload ( void  )

Definition at line 2869 of file walsender.c.

References i, max_wal_senders, WalSnd::mutex, WalSnd::needreload, WalSnd::pid, SpinLockAcquire, SpinLockRelease, and WalSndCtlData::walsnds.

Referenced by KeepFileRestoredFromArchive().

2870 {
2871  int i;
2872 
2873  for (i = 0; i < max_wal_senders; i++)
2874  {
2875  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2876 
2877  if (walsnd->pid == 0)
2878  continue;
2879 
2880  SpinLockAcquire(&walsnd->mutex);
2881  walsnd->needreload = true;
2882  SpinLockRelease(&walsnd->mutex);
2883  }
2884 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:105
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
int max_wal_senders
Definition: walsender.c:117
#define SpinLockRelease(lock)
Definition: spin.h:64
bool needreload
int i
void WalSndShmemInit ( void  )

Definition at line 2985 of file walsender.c.

References i, max_wal_senders, MemSet, WalSnd::mutex, NUM_SYNC_REP_WAIT_MODE, ShmemInitStruct(), SHMQueueInit(), SpinLockInit, WalSndCtlData::SyncRepQueue, WalSndCtlData::walsnds, and WalSndShmemSize().

Referenced by CreateSharedMemoryAndSemaphores().

2986 {
2987  bool found;
2988  int i;
2989 
2990  WalSndCtl = (WalSndCtlData *)
2991  ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
2992 
2993  if (!found)
2994  {
2995  /* First time through, so initialize */
2997 
2998  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
3000 
3001  for (i = 0; i < max_wal_senders; i++)
3002  {
3003  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3004 
3005  SpinLockInit(&walsnd->mutex);
3006  }
3007  }
3008 }
Size WalSndShmemSize(void)
Definition: walsender.c:2973
#define SpinLockInit(lock)
Definition: spin.h:60
WalSndCtlData * WalSndCtl
Definition: walsender.c:105
#define MemSet(start, val, len)
Definition: c.h:857
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:372
SHM_QUEUE SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]
int max_wal_senders
Definition: walsender.c:117
void SHMQueueInit(SHM_QUEUE *queue)
Definition: shmqueue.c:36
int i
#define NUM_SYNC_REP_WAIT_MODE
Definition: syncrep.h:28
Size WalSndShmemSize ( void  )

Definition at line 2973 of file walsender.c.

References add_size(), max_wal_senders, mul_size(), and offsetof.

Referenced by CreateSharedMemoryAndSemaphores(), and WalSndShmemInit().

2974 {
2975  Size size = 0;
2976 
2977  size = offsetof(WalSndCtlData, walsnds);
2978  size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
2979 
2980  return size;
2981 }
int max_wal_senders
Definition: walsender.c:117
Size mul_size(Size s1, Size s2)
Definition: shmem.c:492
Size add_size(Size s1, Size s2)
Definition: shmem.c:475
size_t Size
Definition: c.h:356
#define offsetof(type, field)
Definition: c.h:555
void WalSndSignals ( void  )

Definition at line 2949 of file walsender.c.

References die(), InitializeTimeouts(), pqsignal(), quickdie(), SIG_DFL, SIG_IGN, SIGCHLD, SIGCONT, SIGHUP, SIGPIPE, SIGQUIT, SIGTTIN, SIGTTOU, SIGUSR1, SIGUSR2, SIGWINCH, WalSndLastCycleHandler(), WalSndSigHupHandler(), WalSndSwitchStopping(), and WalSndXLogSendHandler().

Referenced by PostgresMain().

2950 {
2951  /* Set up signal handlers */
2952  pqsignal(SIGHUP, WalSndSigHupHandler); /* set flag to read config
2953  * file */
2954  pqsignal(SIGINT, WalSndLastCycleHandler); /* request a last cycle and
2955  * shutdown */
2956  pqsignal(SIGTERM, die); /* request shutdown */
2957  pqsignal(SIGQUIT, quickdie); /* hard crash time */
2958  InitializeTimeouts(); /* establishes SIGALRM handler */
2960  pqsignal(SIGUSR1, WalSndXLogSendHandler); /* request WAL sending */
2961  pqsignal(SIGUSR2, WalSndSwitchStopping); /* switch to stopping state */
2962 
2963  /* Reset some signals that are accepted by postmaster but not here */
2969 }
static void WalSndXLogSendHandler(SIGNAL_ARGS)
Definition: walsender.c:2901
void InitializeTimeouts(void)
Definition: timeout.c:340
#define SIGUSR1
Definition: win32.h:202
#define SIGCONT
Definition: win32.h:197
static void WalSndLastCycleHandler(SIGNAL_ARGS)
Definition: walsender.c:2928
#define SIGWINCH
Definition: win32.h:201
#define SIGTTIN
Definition: win32.h:199
#define SIGQUIT
Definition: win32.h:189
static void WalSndSigHupHandler(SIGNAL_ARGS)
Definition: walsender.c:2888
#define SIG_IGN
Definition: win32.h:185
#define SIGPIPE
Definition: win32.h:193
#define SIGHUP
Definition: win32.h:188
#define SIG_DFL
Definition: win32.h:183
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:168
#define SIGTTOU
Definition: win32.h:200
static void WalSndSwitchStopping(SIGNAL_ARGS)
Definition: walsender.c:2912
void die(SIGNAL_ARGS)
Definition: postgres.c:2623
#define SIGCHLD
Definition: win32.h:198
void quickdie(SIGNAL_ARGS)
Definition: postgres.c:2562
#define SIGUSR2
Definition: win32.h:203
void WalSndWaitStopping ( void  )

Definition at line 3045 of file walsender.c.

References i, max_wal_senders, WalSnd::mutex, pg_usleep(), WalSnd::pid, SpinLockAcquire, SpinLockRelease, WalSnd::state, WalSndCtlData::walsnds, and WALSNDSTATE_STOPPING.

Referenced by ShutdownXLOG().

3046 {
3047  for (;;)
3048  {
3049  int i;
3050  bool all_stopped = true;
3051 
3052  for (i = 0; i < max_wal_senders; i++)
3053  {
3055  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3056 
3057  SpinLockAcquire(&walsnd->mutex);
3058 
3059  if (walsnd->pid == 0)
3060  {
3061  SpinLockRelease(&walsnd->mutex);
3062  continue;
3063  }
3064 
3065  state = walsnd->state;
3066  SpinLockRelease(&walsnd->mutex);
3067 
3068  if (state != WALSNDSTATE_STOPPING)
3069  {
3070  all_stopped = false;
3071  break;
3072  }
3073  }
3074 
3075  /* safe to leave if confirmation is done for all WAL senders */
3076  if (all_stopped)
3077  return;
3078 
3079  pg_usleep(10000L); /* wait for 10 msec */
3080  }
3081 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:105
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
void pg_usleep(long microsec)
Definition: signal.c:53
WalSndState state
int max_wal_senders
Definition: walsender.c:117
#define SpinLockRelease(lock)
Definition: spin.h:64
Definition: regguts.h:298
WalSndState
int i
void WalSndWakeup ( void  )

Definition at line 3017 of file walsender.c.

References i, WalSnd::latch, max_wal_senders, WalSnd::mutex, NULL, SetLatch(), SpinLockAcquire, SpinLockRelease, and WalSndCtlData::walsnds.

Referenced by KeepFileRestoredFromArchive(), StartupXLOG(), and XLogWalRcvFlush().

3018 {
3019  int i;
3020 
3021  for (i = 0; i < max_wal_senders; i++)
3022  {
3023  Latch *latch;
3024  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3025 
3026  /*
3027  * Get latch pointer with spinlock held, for the unlikely case that
3028  * pointer reads aren't atomic (as they're 8 bytes).
3029  */
3030  SpinLockAcquire(&walsnd->mutex);
3031  latch = walsnd->latch;
3032  SpinLockRelease(&walsnd->mutex);
3033 
3034  if (latch != NULL)
3035  SetLatch(latch);
3036  }
3037 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:105
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
Latch * latch
Definition: latch.h:110
int max_wal_senders
Definition: walsender.c:117
#define SpinLockRelease(lock)
Definition: spin.h:64
void SetLatch(volatile Latch *latch)
Definition: latch.c:415
#define NULL
Definition: c.h:229
int i

Variable Documentation

bool am_db_walsender

Definition at line 114 of file walsender.c.

Referenced by check_db(), InitPostgres(), and ProcessStartupPacket().

bool log_replication_commands

Definition at line 120 of file walsender.c.

Referenced by exec_replication_command().

bool wake_wal_senders

Definition at line 125 of file walsender.c.

int wal_sender_timeout