PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
walsender.h File Reference
#include <signal.h>
#include "fmgr.h"
Include dependency graph for walsender.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Macros

#define WalSndWakeupRequest()   do { wake_wal_senders = true; } while (0)
 
#define WalSndWakeupProcessRequests()
 

Enumerations

enum  CRSSnapshotAction { CRS_EXPORT_SNAPSHOT, CRS_NOEXPORT_SNAPSHOT, CRS_USE_SNAPSHOT }
 

Functions

void InitWalSender (void)
 
bool exec_replication_command (const char *query_string)
 
void WalSndErrorCleanup (void)
 
void WalSndSignals (void)
 
Size WalSndShmemSize (void)
 
void WalSndShmemInit (void)
 
void WalSndWakeup (void)
 
void WalSndRqstFileReload (void)
 

Variables

bool am_walsender
 
bool am_cascading_walsender
 
bool am_db_walsender
 
bool wake_wal_senders
 
int max_wal_senders
 
int wal_sender_timeout
 
bool log_replication_commands
 

Macro Definition Documentation

#define WalSndWakeupProcessRequests ( )
Value:
do \
{ \
{ \
if (max_wal_senders > 0) \
} \
} while (0)
int max_wal_senders
Definition: walsender.c:114
bool wake_wal_senders
Definition: walsender.c:122
void WalSndWakeup(void)
Definition: walsender.c:2930

Definition at line 61 of file walsender.h.

Referenced by XLogBackgroundFlush(), and XLogFlush().

#define WalSndWakeupRequest ( )    do { wake_wal_senders = true; } while (0)

Definition at line 55 of file walsender.h.

Referenced by XLogWrite().

Enumeration Type Documentation

Enumerator
CRS_EXPORT_SNAPSHOT 
CRS_NOEXPORT_SNAPSHOT 
CRS_USE_SNAPSHOT 

Definition at line 22 of file walsender.h.

Function Documentation

bool exec_replication_command ( const char *  query_string)

Definition at line 1364 of file walsender.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate(), CHECK_FOR_INTERRUPTS, CreateDestReceiver(), CreateReplicationSlot(), CurrentMemoryContext, DEBUG1, DestRemote, DestRemoteSimple, DropReplicationSlot(), elog, EndCommand(), ereport, errcode(), errmsg(), errmsg_internal(), ERROR, GetPGVariable(), IdentifySystem(), initStringInfo(), InvalidOid, IsA, IsAbortedTransactionBlockState(), IsTransactionBlock(), StartReplicationCmd::kind, LOG, log_replication_commands, MemoryContextDelete(), MemoryContextSwitchTo(), MyDatabaseId, VariableShowStmt::name, PreventTransactionChain(), REPLICATION_KIND_PHYSICAL, replication_parse_result, replication_scanner_init(), replication_yyparse(), SendBaseBackup(), SendTimeLineHistory(), SnapBuildClearExportedSnapshot(), StartLogicalReplication(), StartReplication(), T_BaseBackupCmd, T_CreateReplicationSlotCmd, T_DropReplicationSlotCmd, T_IdentifySystemCmd, T_SQLCmd, T_StartReplicationCmd, T_TimeLineHistoryCmd, T_VariableShowStmt, and Node::type.

Referenced by PostgresMain().

1365 {
1366  int parse_rc;
1367  Node *cmd_node;
1368  MemoryContext cmd_context;
1369  MemoryContext old_context;
1370 
1371  /*
1372  * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
1373  * command arrives. Clean up the old stuff if there's anything.
1374  */
1376 
1378 
1380  "Replication command context",
1382  old_context = MemoryContextSwitchTo(cmd_context);
1383 
1384  replication_scanner_init(cmd_string);
1385  parse_rc = replication_yyparse();
1386  if (parse_rc != 0)
1387  ereport(ERROR,
1388  (errcode(ERRCODE_SYNTAX_ERROR),
1389  (errmsg_internal("replication command parser returned %d",
1390  parse_rc))));
1391 
1392  cmd_node = replication_parse_result;
1393 
1394  /*
1395  * Log replication command if log_replication_commands is enabled. Even
1396  * when it's disabled, log the command with DEBUG1 level for backward
1397  * compatibility. Note that SQL commands are not logged here, and will be
1398  * logged later if log_statement is enabled.
1399  */
1400  if (cmd_node->type != T_SQLCmd)
1402  (errmsg("received replication command: %s", cmd_string)));
1403 
1404  /*
1405  * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot. If it was
1406  * called outside of transaction the snapshot should be cleared here.
1407  */
1408  if (!IsTransactionBlock())
1410 
1411  /*
1412  * For aborted transactions, don't allow anything except pure SQL,
1413  * the exec_simple_query() will handle it correctly.
1414  */
1415  if (IsAbortedTransactionBlockState() && !IsA(cmd_node, SQLCmd))
1416  ereport(ERROR,
1417  (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
1418  errmsg("current transaction is aborted, "
1419  "commands ignored until end of transaction block")));
1420 
1422 
1423  /*
1424  * Allocate buffers that will be used for each outgoing and incoming
1425  * message. We do this just once per command to reduce palloc overhead.
1426  */
1430 
1431  switch (cmd_node->type)
1432  {
1433  case T_IdentifySystemCmd:
1434  IdentifySystem();
1435  break;
1436 
1437  case T_BaseBackupCmd:
1438  PreventTransactionChain(true, "BASE_BACKUP");
1439  SendBaseBackup((BaseBackupCmd *) cmd_node);
1440  break;
1441 
1444  break;
1445 
1448  break;
1449 
1450  case T_StartReplicationCmd:
1451  {
1452  StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
1453 
1454  PreventTransactionChain(true, "START_REPLICATION");
1455 
1456  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
1457  StartReplication(cmd);
1458  else
1460  break;
1461  }
1462 
1463  case T_TimeLineHistoryCmd:
1464  PreventTransactionChain(true, "TIMELINE_HISTORY");
1466  break;
1467 
1468  case T_VariableShowStmt:
1469  {
1471  VariableShowStmt *n = (VariableShowStmt *) cmd_node;
1472 
1473  GetPGVariable(n->name, dest);
1474  }
1475  break;
1476 
1477  case T_SQLCmd:
1478  if (MyDatabaseId == InvalidOid)
1479  ereport(ERROR,
1480  (errmsg("not connected to database")));
1481 
1482  /* Tell the caller that this wasn't a WalSender command. */
1483  return false;
1484 
1485  default:
1486  elog(ERROR, "unrecognized replication command node tag: %u",
1487  cmd_node->type);
1488  }
1489 
1490  /* done */
1491  MemoryContextSwitchTo(old_context);
1492  MemoryContextDelete(cmd_context);
1493 
1494  /* Send CommandComplete message */
1495  EndCommand("SELECT", DestRemote);
1496 
1497  return true;
1498 }
#define IsA(nodeptr, _type_)
Definition: nodes.h:560
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:200
#define DEBUG1
Definition: elog.h:25
static void SendTimeLineHistory(TimeLineHistoryCmd *cmd)
Definition: walsender.c:419
static void DropReplicationSlot(DropReplicationSlotCmd *cmd)
Definition: walsender.c:1020
bool IsAbortedTransactionBlockState(void)
Definition: xact.c:370
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
Definition: nodes.h:509
static StringInfoData output_message
Definition: walsender.c:153
int errcode(int sqlerrcode)
Definition: elog.c:575
#define LOG
Definition: elog.h:26
void GetPGVariable(const char *name, DestReceiver *dest)
Definition: guc.c:7898
Node * replication_parse_result
bool IsTransactionBlock(void)
Definition: xact.c:4304
ReplicationKind kind
Definition: replnodes.h:81
void EndCommand(const char *commandTag, CommandDest dest)
Definition: dest.c:157
#define ERROR
Definition: elog.h:43
void SendBaseBackup(BaseBackupCmd *cmd)
Definition: basebackup.c:686
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:165
NodeTag type
Definition: nodes.h:511
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:109
static void StartReplication(StartReplicationCmd *cmd)
Definition: walsender.c:515
MemoryContext CurrentMemoryContext
Definition: mcxt.c:37
#define ereport(elevel, rest)
Definition: elog.h:122
int replication_yyparse(void)
void initStringInfo(StringInfo str)
Definition: stringinfo.c:65
static StringInfoData reply_message
Definition: walsender.c:154
MemoryContext AllocSetContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Definition: aset.c:322
static void StartLogicalReplication(StartReplicationCmd *cmd)
Definition: walsender.c:1031
Oid MyDatabaseId
Definition: globals.c:76
#define InvalidOid
Definition: postgres_ext.h:36
int errmsg_internal(const char *fmt,...)
Definition: elog.c:827
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Definition: walsender.c:832
static StringInfoData tmpbuf
Definition: walsender.c:155
bool log_replication_commands
Definition: walsender.c:117
int errmsg(const char *fmt,...)
Definition: elog.c:797
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:97
#define elog
Definition: elog.h:219
static void IdentifySystem(void)
Definition: walsender.c:330
void SnapBuildClearExportedSnapshot(void)
Definition: snapbuild.c:648
void PreventTransactionChain(bool isTopLevel, const char *stmtType)
Definition: xact.c:3154
void replication_scanner_init(const char *query_string)
void InitWalSender ( void  )

Referenced by PostgresMain().

void WalSndErrorCleanup ( void  )

Definition at line 284 of file walsender.c.

References close, ConditionVariableCancelSleep(), LWLockReleaseAll(), MyReplicationSlot, NULL, pgstat_report_wait_end(), proc_exit(), replication_active, ReplicationSlotCleanup(), ReplicationSlotRelease(), sendFile, walsender_ready_to_stop, WalSndSetState(), and WALSNDSTATE_STARTUP.

Referenced by PostgresMain().

285 {
289 
290  if (sendFile >= 0)
291  {
292  close(sendFile);
293  sendFile = -1;
294  }
295 
296  if (MyReplicationSlot != NULL)
298 
300 
301  replication_active = false;
303  proc_exit(0);
304 
305  /* Revert back to startup state */
307 }
static int sendFile
Definition: walsender.c:128
void proc_exit(int code)
Definition: ipc.c:99
void ConditionVariableCancelSleep(void)
static volatile sig_atomic_t walsender_ready_to_stop
Definition: walsender.c:180
void ReplicationSlotRelease(void)
Definition: slot.c:375
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1232
static volatile sig_atomic_t replication_active
Definition: walsender.c:188
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define NULL
Definition: c.h:229
void WalSndSetState(WalSndState state)
Definition: walsender.c:2954
void ReplicationSlotCleanup(void)
Definition: slot.c:413
void LWLockReleaseAll(void)
Definition: lwlock.c:1814
#define close(a)
Definition: win32.h:12
void WalSndRqstFileReload ( void  )

Definition at line 2798 of file walsender.c.

References i, max_wal_senders, WalSnd::mutex, WalSnd::needreload, WalSnd::pid, SpinLockAcquire, SpinLockRelease, and WalSndCtlData::walsnds.

Referenced by KeepFileRestoredFromArchive().

2799 {
2800  int i;
2801 
2802  for (i = 0; i < max_wal_senders; i++)
2803  {
2804  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2805 
2806  if (walsnd->pid == 0)
2807  continue;
2808 
2809  SpinLockAcquire(&walsnd->mutex);
2810  walsnd->needreload = true;
2811  SpinLockRelease(&walsnd->mutex);
2812  }
2813 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:102
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
int max_wal_senders
Definition: walsender.c:114
#define SpinLockRelease(lock)
Definition: spin.h:64
bool needreload
int i
void WalSndShmemInit ( void  )

Definition at line 2898 of file walsender.c.

References i, max_wal_senders, MemSet, WalSnd::mutex, NUM_SYNC_REP_WAIT_MODE, ShmemInitStruct(), SHMQueueInit(), SpinLockInit, WalSndCtlData::SyncRepQueue, WalSndCtlData::walsnds, and WalSndShmemSize().

Referenced by CreateSharedMemoryAndSemaphores().

2899 {
2900  bool found;
2901  int i;
2902 
2903  WalSndCtl = (WalSndCtlData *)
2904  ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
2905 
2906  if (!found)
2907  {
2908  /* First time through, so initialize */
2910 
2911  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
2913 
2914  for (i = 0; i < max_wal_senders; i++)
2915  {
2916  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2917 
2918  SpinLockInit(&walsnd->mutex);
2919  }
2920  }
2921 }
Size WalSndShmemSize(void)
Definition: walsender.c:2886
#define SpinLockInit(lock)
Definition: spin.h:60
WalSndCtlData * WalSndCtl
Definition: walsender.c:102
#define MemSet(start, val, len)
Definition: c.h:857
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:372
SHM_QUEUE SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]
int max_wal_senders
Definition: walsender.c:114
void SHMQueueInit(SHM_QUEUE *queue)
Definition: shmqueue.c:36
int i
#define NUM_SYNC_REP_WAIT_MODE
Definition: syncrep.h:28
Size WalSndShmemSize ( void  )

Definition at line 2886 of file walsender.c.

References add_size(), max_wal_senders, mul_size(), and offsetof.

Referenced by CreateSharedMemoryAndSemaphores(), and WalSndShmemInit().

2887 {
2888  Size size = 0;
2889 
2890  size = offsetof(WalSndCtlData, walsnds);
2891  size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
2892 
2893  return size;
2894 }
int max_wal_senders
Definition: walsender.c:114
Size mul_size(Size s1, Size s2)
Definition: shmem.c:492
Size add_size(Size s1, Size s2)
Definition: shmem.c:475
size_t Size
Definition: c.h:356
#define offsetof(type, field)
Definition: c.h:555
void WalSndSignals ( void  )

Definition at line 2862 of file walsender.c.

References die(), InitializeTimeouts(), pqsignal(), quickdie(), SIG_DFL, SIG_IGN, SIGCHLD, SIGCONT, SIGHUP, SIGPIPE, SIGQUIT, SIGTTIN, SIGTTOU, SIGUSR1, SIGUSR2, SIGWINCH, WalSndLastCycleHandler(), WalSndSigHupHandler(), and WalSndXLogSendHandler().

Referenced by PostgresMain().

2863 {
2864  /* Set up signal handlers */
2865  pqsignal(SIGHUP, WalSndSigHupHandler); /* set flag to read config
2866  * file */
2867  pqsignal(SIGINT, SIG_IGN); /* not used */
2868  pqsignal(SIGTERM, die); /* request shutdown */
2869  pqsignal(SIGQUIT, quickdie); /* hard crash time */
2870  InitializeTimeouts(); /* establishes SIGALRM handler */
2872  pqsignal(SIGUSR1, WalSndXLogSendHandler); /* request WAL sending */
2873  pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
2874  * shutdown */
2875 
2876  /* Reset some signals that are accepted by postmaster but not here */
2882 }
static void WalSndXLogSendHandler(SIGNAL_ARGS)
Definition: walsender.c:2830
void InitializeTimeouts(void)
Definition: timeout.c:340
#define SIGUSR1
Definition: win32.h:202
#define SIGCONT
Definition: win32.h:197
static void WalSndLastCycleHandler(SIGNAL_ARGS)
Definition: walsender.c:2841
#define SIGWINCH
Definition: win32.h:201
#define SIGTTIN
Definition: win32.h:199
#define SIGQUIT
Definition: win32.h:189
static void WalSndSigHupHandler(SIGNAL_ARGS)
Definition: walsender.c:2817
#define SIG_IGN
Definition: win32.h:185
#define SIGPIPE
Definition: win32.h:193
#define SIGHUP
Definition: win32.h:188
#define SIG_DFL
Definition: win32.h:183
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:168
#define SIGTTOU
Definition: win32.h:200
void die(SIGNAL_ARGS)
Definition: postgres.c:2623
#define SIGCHLD
Definition: win32.h:198
void quickdie(SIGNAL_ARGS)
Definition: postgres.c:2562
#define SIGUSR2
Definition: win32.h:203
void WalSndWakeup ( void  )

Definition at line 2930 of file walsender.c.

References i, WalSnd::latch, max_wal_senders, WalSnd::mutex, NULL, SetLatch(), SpinLockAcquire, SpinLockRelease, and WalSndCtlData::walsnds.

Referenced by KeepFileRestoredFromArchive(), StartupXLOG(), and XLogWalRcvFlush().

2931 {
2932  int i;
2933 
2934  for (i = 0; i < max_wal_senders; i++)
2935  {
2936  Latch *latch;
2937  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2938 
2939  /*
2940  * Get latch pointer with spinlock held, for the unlikely case that
2941  * pointer reads aren't atomic (as they're 8 bytes).
2942  */
2943  SpinLockAcquire(&walsnd->mutex);
2944  latch = walsnd->latch;
2945  SpinLockRelease(&walsnd->mutex);
2946 
2947  if (latch != NULL)
2948  SetLatch(latch);
2949  }
2950 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:102
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
Latch * latch
Definition: latch.h:110
int max_wal_senders
Definition: walsender.c:114
#define SpinLockRelease(lock)
Definition: spin.h:64
void SetLatch(volatile Latch *latch)
Definition: latch.c:367
#define NULL
Definition: c.h:229
int i

Variable Documentation

bool am_db_walsender

Definition at line 111 of file walsender.c.

Referenced by check_db(), InitPostgres(), and ProcessStartupPacket().

bool log_replication_commands

Definition at line 117 of file walsender.c.

Referenced by exec_replication_command().

bool wake_wal_senders

Definition at line 122 of file walsender.c.

int wal_sender_timeout