PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
walsender.h File Reference
#include <signal.h>
#include "fmgr.h"
Include dependency graph for walsender.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Macros

#define WalSndWakeupRequest()   do { wake_wal_senders = true; } while (0)
 
#define WalSndWakeupProcessRequests()
 

Functions

void InitWalSender (void)
 
void exec_replication_command (const char *query_string)
 
void WalSndErrorCleanup (void)
 
void WalSndSignals (void)
 
Size WalSndShmemSize (void)
 
void WalSndShmemInit (void)
 
void WalSndWakeup (void)
 
void WalSndRqstFileReload (void)
 

Variables

bool am_walsender
 
bool am_cascading_walsender
 
bool am_db_walsender
 
bool wake_wal_senders
 
int max_wal_senders
 
int wal_sender_timeout
 
bool log_replication_commands
 

Macro Definition Documentation

#define WalSndWakeupProcessRequests ( )
Value:
do \
{ \
    if (wake_wal_senders) \
    { \
        wake_wal_senders = false; \
        if (max_wal_senders > 0) \
            WalSndWakeup(); \
    } \
} while (0)
int max_wal_senders
Definition: walsender.c:112
bool wake_wal_senders
Definition: walsender.c:120
void WalSndWakeup(void)
Definition: walsender.c:2631

Definition at line 51 of file walsender.h.

Referenced by XLogBackgroundFlush(), and XLogFlush().

#define WalSndWakeupRequest ( )    do { wake_wal_senders = true; } while (0)

Definition at line 45 of file walsender.h.

Referenced by XLogWrite().

Function Documentation

void exec_replication_command ( const char *  query_string)

Definition at line 1222 of file walsender.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate(), CHECK_FOR_INTERRUPTS, CreateDestReceiver(), CreateReplicationSlot(), CurrentMemoryContext, DEBUG1, DestRemote, DestRemoteSimple, DropReplicationSlot(), elog, EndCommand(), ereport, errcode(), errmsg(), errmsg_internal(), ERROR, GetPGVariable(), IdentifySystem(), initStringInfo(), StartReplicationCmd::kind, LOG, log_replication_commands, MemoryContextDelete(), MemoryContextSwitchTo(), VariableShowStmt::name, REPLICATION_KIND_PHYSICAL, replication_parse_result, replication_scanner_init(), replication_yyparse(), SendBaseBackup(), SendTimeLineHistory(), SnapBuildClearExportedSnapshot(), StartLogicalReplication(), StartReplication(), T_BaseBackupCmd, T_CreateReplicationSlotCmd, T_DropReplicationSlotCmd, T_IdentifySystemCmd, T_StartReplicationCmd, T_TimeLineHistoryCmd, T_VariableShowStmt, and Node::type.

Referenced by PostgresMain().

1223 {
1224  int parse_rc;
1225  Node *cmd_node;
1226  MemoryContext cmd_context;
1227  MemoryContext old_context;
1228 
1229  /*
1230  * Log replication command if log_replication_commands is enabled. Even
1231  * when it's disabled, log the command with DEBUG1 level for backward
1232  * compatibility.
1233  */
1234  ereport(log_replication_commands ? LOG : DEBUG1,
1235  (errmsg("received replication command: %s", cmd_string)));
1236 
1237  /*
1238  * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
1239  * command arrives. Clean up the old stuff if there's anything.
1240  */
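1241  SnapBuildClearExportedSnapshot();
1242 
1243  CHECK_FOR_INTERRUPTS();
1244 
1245  cmd_context = AllocSetContextCreate(CurrentMemoryContext,
1246  "Replication command context",
1247  ALLOCSET_DEFAULT_SIZES);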
1248  old_context = MemoryContextSwitchTo(cmd_context);
1249 
1250  replication_scanner_init(cmd_string);
1251  parse_rc = replication_yyparse();
1252  if (parse_rc != 0)
1253  ereport(ERROR,
1254  (errcode(ERRCODE_SYNTAX_ERROR),
1255  (errmsg_internal("replication command parser returned %d",
1256  parse_rc))));
1257 
1258  cmd_node = replication_parse_result;
1259 
1260  /*
1261  * Allocate buffers that will be used for each outgoing and incoming
1262  * message. We do this just once per command to reduce palloc overhead.
1263  */
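1264  initStringInfo(&output_message);
1265  initStringInfo(&reply_message);
1266  initStringInfo(&tmpbuf);
1267 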
1268  switch (cmd_node->type)
1269  {
1270  case T_IdentifySystemCmd:
1271  IdentifySystem();
1272  break;
1273 
1274  case T_BaseBackupCmd:
1275  SendBaseBackup((BaseBackupCmd *) cmd_node);
1276  break;
1277 
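1278  case T_CreateReplicationSlotCmd:
1279  CreateReplicationSlot((CreateReplicationSlotCmd *) cmd_node);
1280  break;
1281 
1282  case T_DropReplicationSlotCmd:
1283  DropReplicationSlot((DropReplicationSlotCmd *) cmd_node);
1284  break;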
1285 
1286  case T_StartReplicationCmd:
1287  {
1288  StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
1289 
1290  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
1291  StartReplication(cmd);
1292  else
1293  StartLogicalReplication(cmd);
1294  break;
1295  }
1296 
1297  case T_TimeLineHistoryCmd:
1298  SendTimeLineHistory((TimeLineHistoryCmd *) cmd_node);
1299  break;
1300 
1301  case T_VariableShowStmt:
1302  {
1303  DestReceiver *dest = CreateDestReceiver(DestRemoteSimple);
1304  VariableShowStmt *n = (VariableShowStmt *) cmd_node;
1305 
1306  GetPGVariable(n->name, dest);
1307  }
1308  break;
1309 
1310  default:
1311  elog(ERROR, "unrecognized replication command node tag: %u",
1312  cmd_node->type);
1313  }
1314 
1315  /* done */
1316  MemoryContextSwitchTo(old_context);
1317  MemoryContextDelete(cmd_context);
1318 
1319  /* Send CommandComplete message */
1320  EndCommand("SELECT", DestRemote);
1321 }
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:200
#define DEBUG1
Definition: elog.h:25
static void SendTimeLineHistory(TimeLineHistoryCmd *cmd)
Definition: walsender.c:392
static void DropReplicationSlot(DropReplicationSlotCmd *cmd)
Definition: walsender.c:877
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
Definition: nodes.h:509
static StringInfoData output_message
Definition: walsender.c:151
int errcode(int sqlerrcode)
Definition: elog.c:575
#define LOG
Definition: elog.h:26
void GetPGVariable(const char *name, DestReceiver *dest)
Definition: guc.c:7848
Node * replication_parse_result
ReplicationKind kind
Definition: replnodes.h:81
void EndCommand(const char *commandTag, CommandDest dest)
Definition: dest.c:157
#define ERROR
Definition: elog.h:43
void SendBaseBackup(BaseBackupCmd *cmd)
Definition: basebackup.c:682
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:165
NodeTag type
Definition: nodes.h:511
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:109
static void StartReplication(StartReplicationCmd *cmd)
Definition: walsender.c:486
MemoryContext CurrentMemoryContext
Definition: mcxt.c:37
#define ereport(elevel, rest)
Definition: elog.h:122
int replication_yyparse(void)
void initStringInfo(StringInfo str)
Definition: stringinfo.c:65
static StringInfoData reply_message
Definition: walsender.c:152
MemoryContext AllocSetContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Definition: aset.c:321
static void StartLogicalReplication(StartReplicationCmd *cmd)
Definition: walsender.c:888
int errmsg_internal(const char *fmt,...)
Definition: elog.c:827
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Definition: walsender.c:739
static StringInfoData tmpbuf
Definition: walsender.c:153
bool log_replication_commands
Definition: walsender.c:115
int errmsg(const char *fmt,...)
Definition: elog.c:797
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:97
#define elog
Definition: elog.h:219
static void IdentifySystem(void)
Definition: walsender.c:303
void SnapBuildClearExportedSnapshot(void)
Definition: snapbuild.c:629
void replication_scanner_init(const char *query_string)
void InitWalSender ( void  )

Referenced by PostgresMain().

void WalSndErrorCleanup ( void  )

Definition at line 257 of file walsender.c.

References close, ConditionVariableCancelSleep(), LWLockReleaseAll(), MyReplicationSlot, NULL, pgstat_report_wait_end(), proc_exit(), replication_active, ReplicationSlotCleanup(), ReplicationSlotRelease(), sendFile, walsender_ready_to_stop, WalSndSetState(), and WALSNDSTATE_STARTUP.

Referenced by PostgresMain().

258 {
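259  LWLockReleaseAll();
260  ConditionVariableCancelSleep();
261  pgstat_report_wait_end();
262 
263  if (sendFile >= 0)
264  {
265  close(sendFile);
266  sendFile = -1;
267  }
268 
269  if (MyReplicationSlot != NULL)
270  ReplicationSlotRelease();
271 
272  ReplicationSlotCleanup();
273 
274  replication_active = false;
275  if (walsender_ready_to_stop)
276  proc_exit(0);
277 
278  /* Revert back to startup state */
279  WalSndSetState(WALSNDSTATE_STARTUP);
280 }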
static int sendFile
Definition: walsender.c:126
void proc_exit(int code)
Definition: ipc.c:99
void ReplicationSlotCleanup()
Definition: slot.c:412
void ConditionVariableCancelSleep(void)
static volatile sig_atomic_t walsender_ready_to_stop
Definition: walsender.c:178
void ReplicationSlotRelease(void)
Definition: slot.c:374
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1124
static volatile sig_atomic_t replication_active
Definition: walsender.c:186
ReplicationSlot * MyReplicationSlot
Definition: slot.c:95
#define NULL
Definition: c.h:226
void WalSndSetState(WalSndState state)
Definition: walsender.c:2655
void LWLockReleaseAll(void)
Definition: lwlock.c:1813
#define close(a)
Definition: win32.h:17
void WalSndRqstFileReload ( void  )

Definition at line 2499 of file walsender.c.

References i, max_wal_senders, WalSnd::mutex, WalSnd::needreload, WalSnd::pid, SpinLockAcquire, SpinLockRelease, and WalSndCtlData::walsnds.

Referenced by KeepFileRestoredFromArchive().

2500 {
2501  int i;
2502 
2503  for (i = 0; i < max_wal_senders; i++)
2504  {
2505  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2506 
2507  if (walsnd->pid == 0)
2508  continue;
2509 
2510  SpinLockAcquire(&walsnd->mutex);
2511  walsnd->needreload = true;
2512  SpinLockRelease(&walsnd->mutex);
2513  }
2514 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:100
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
int max_wal_senders
Definition: walsender.c:112
#define SpinLockRelease(lock)
Definition: spin.h:64
bool needreload
int i
void WalSndShmemInit ( void  )

Definition at line 2599 of file walsender.c.

References i, max_wal_senders, MemSet, WalSnd::mutex, NUM_SYNC_REP_WAIT_MODE, ShmemInitStruct(), SHMQueueInit(), SpinLockInit, WalSndCtlData::SyncRepQueue, WalSndCtlData::walsnds, and WalSndShmemSize().

Referenced by CreateSharedMemoryAndSemaphores().

2600 {
2601  bool found;
2602  int i;
2603 
2604  WalSndCtl = (WalSndCtlData *)
2605  ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
2606 
2607  if (!found)
2608  {
2609  /* First time through, so initialize */
2610  MemSet(WalSndCtl, 0, WalSndShmemSize());
2611 
2612  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
2613  SHMQueueInit(&(WalSndCtl->SyncRepQueue[i]));
2614 
2615  for (i = 0; i < max_wal_senders; i++)
2616  {
2617  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2618 
2619  SpinLockInit(&walsnd->mutex);
2620  }
2621  }
2622 }
Size WalSndShmemSize(void)
Definition: walsender.c:2587
#define SpinLockInit(lock)
Definition: spin.h:60
WalSndCtlData * WalSndCtl
Definition: walsender.c:100
#define MemSet(start, val, len)
Definition: c.h:853
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:372
SHM_QUEUE SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]
int max_wal_senders
Definition: walsender.c:112
void SHMQueueInit(SHM_QUEUE *queue)
Definition: shmqueue.c:36
int i
#define NUM_SYNC_REP_WAIT_MODE
Definition: syncrep.h:28
Size WalSndShmemSize ( void  )

Definition at line 2587 of file walsender.c.

References add_size(), max_wal_senders, mul_size(), and offsetof.

Referenced by CreateSharedMemoryAndSemaphores(), and WalSndShmemInit().

2588 {
2589  Size size = 0;
2590 
2591  size = offsetof(WalSndCtlData, walsnds);
2592  size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
2593 
2594  return size;
2595 }
int max_wal_senders
Definition: walsender.c:112
Size mul_size(Size s1, Size s2)
Definition: shmem.c:492
Size add_size(Size s1, Size s2)
Definition: shmem.c:475
size_t Size
Definition: c.h:353
#define offsetof(type, field)
Definition: c.h:551
void WalSndSignals ( void  )

Definition at line 2563 of file walsender.c.

References die(), InitializeTimeouts(), pqsignal(), quickdie(), SIG_DFL, SIG_IGN, SIGCHLD, SIGCONT, SIGHUP, SIGPIPE, SIGQUIT, SIGTTIN, SIGTTOU, SIGUSR1, SIGUSR2, SIGWINCH, WalSndLastCycleHandler(), WalSndSigHupHandler(), and WalSndXLogSendHandler().

Referenced by PostgresMain().

2564 {
2565  /* Set up signal handlers */
2566  pqsignal(SIGHUP, WalSndSigHupHandler); /* set flag to read config
2567  * file */
2568  pqsignal(SIGINT, SIG_IGN); /* not used */
2569  pqsignal(SIGTERM, die); /* request shutdown */
2570  pqsignal(SIGQUIT, quickdie); /* hard crash time */
2571  InitializeTimeouts(); /* establishes SIGALRM handler */
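2572  pqsignal(SIGPIPE, SIG_IGN);
2573  pqsignal(SIGUSR1, WalSndXLogSendHandler); /* request WAL sending */
2574  pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
2575  * shutdown */
2576 
2577  /* Reset some signals that are accepted by postmaster but not here */
2578  pqsignal(SIGCHLD, SIG_DFL);
2579  pqsignal(SIGTTIN, SIG_DFL);
2580  pqsignal(SIGTTOU, SIG_DFL);
2581  pqsignal(SIGCONT, SIG_DFL);
2582  pqsignal(SIGWINCH, SIG_DFL);
2583 }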
static void WalSndXLogSendHandler(SIGNAL_ARGS)
Definition: walsender.c:2531
void InitializeTimeouts(void)
Definition: timeout.c:340
#define SIGUSR1
Definition: win32.h:211
#define SIGCONT
Definition: win32.h:205
static void WalSndLastCycleHandler(SIGNAL_ARGS)
Definition: walsender.c:2542
#define SIGWINCH
Definition: win32.h:209
#define SIGTTIN
Definition: win32.h:207
#define SIGQUIT
Definition: win32.h:197
static void WalSndSigHupHandler(SIGNAL_ARGS)
Definition: walsender.c:2518
#define SIG_IGN
Definition: win32.h:193
#define SIGPIPE
Definition: win32.h:201
#define SIGHUP
Definition: win32.h:196
#define SIG_DFL
Definition: win32.h:191
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:168
#define SIGTTOU
Definition: win32.h:208
void die(SIGNAL_ARGS)
Definition: postgres.c:2617
#define SIGCHLD
Definition: win32.h:206
void quickdie(SIGNAL_ARGS)
Definition: postgres.c:2556
#define SIGUSR2
Definition: win32.h:212
void WalSndWakeup ( void  )

Definition at line 2631 of file walsender.c.

References i, WalSnd::latch, max_wal_senders, WalSnd::mutex, NULL, SetLatch(), SpinLockAcquire, SpinLockRelease, and WalSndCtlData::walsnds.

Referenced by KeepFileRestoredFromArchive(), StartupXLOG(), and XLogWalRcvFlush().

2632 {
2633  int i;
2634 
2635  for (i = 0; i < max_wal_senders; i++)
2636  {
2637  Latch *latch;
2638  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2639 
2640  /*
2641  * Get latch pointer with spinlock held, for the unlikely case that
2642  * pointer reads aren't atomic (as they're 8 bytes).
2643  */
2644  SpinLockAcquire(&walsnd->mutex);
2645  latch = walsnd->latch;
2646  SpinLockRelease(&walsnd->mutex);
2647 
2648  if (latch != NULL)
2649  SetLatch(latch);
2650  }
2651 }
WalSndCtlData * WalSndCtl
Definition: walsender.c:100
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
slock_t mutex
#define SpinLockAcquire(lock)
Definition: spin.h:62
Latch * latch
Definition: latch.h:110
int max_wal_senders
Definition: walsender.c:112
#define SpinLockRelease(lock)
Definition: spin.h:64
void SetLatch(volatile Latch *latch)
Definition: latch.c:379
#define NULL
Definition: c.h:226
int i

Variable Documentation

bool am_db_walsender

Definition at line 109 of file walsender.c.

Referenced by InitPostgres(), and ProcessStartupPacket().

bool log_replication_commands

Definition at line 115 of file walsender.c.

Referenced by exec_replication_command().

bool wake_wal_senders

Definition at line 120 of file walsender.c.

int wal_sender_timeout