PostgreSQL Source Code  git master
walsender.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * walsender.c
4  *
5  * The WAL sender process (walsender) is new as of Postgres 9.0. It takes
6  * care of sending XLOG from the primary server to a single recipient.
7  * (Note that there can be more than one walsender process concurrently.)
8  * It is started by the postmaster when the walreceiver of a standby server
9  * connects to the primary server and requests XLOG streaming replication.
10  *
11  * A walsender is similar to a regular backend, ie. there is a one-to-one
12  * relationship between a connection and a walsender process, but instead
13  * of processing SQL queries, it understands a small set of special
14  * replication-mode commands. The START_REPLICATION command begins streaming
15  * WAL to the client. While streaming, the walsender keeps reading XLOG
16  * records from the disk and sends them to the standby server over the
17  * COPY protocol, until either side ends the replication by exiting COPY
18  * mode (or until the connection is closed).
19  *
20  * Normal termination is by SIGTERM, which instructs the walsender to
21  * close the connection and exit(0) at the next convenient moment. Emergency
22  * termination is by SIGQUIT; like any backend, the walsender will simply
23  * abort and exit on SIGQUIT. A close of the connection and a FATAL error
24  * are treated as not a crash but approximately normal termination;
25  * the walsender will exit quickly without sending any more XLOG records.
26  *
27  * If the server is shut down, checkpointer sends us
28  * PROCSIG_WALSND_INIT_STOPPING after all regular backends have exited. If
29  * the backend is idle or runs an SQL query, this causes the backend to
30  * shut down; if logical replication is in progress, all existing WAL records
31  * are processed, followed by a shutdown. Otherwise, this causes the walsender
32  * to switch to the "stopping" state. In this state, the walsender will reject
33  * any further replication commands. The checkpointer begins the shutdown
34  * checkpoint once all walsenders are confirmed as stopping. When the shutdown
35  * checkpoint finishes, the postmaster sends us SIGUSR2. This instructs
36  * walsender to send any outstanding WAL, including the shutdown checkpoint
37  * record, wait for it to be replicated to the standby, and then exit.
38  *
39  *
40  * Portions Copyright (c) 2010-2023, PostgreSQL Global Development Group
41  *
42  * IDENTIFICATION
43  * src/backend/replication/walsender.c
44  *
45  *-------------------------------------------------------------------------
46  */
47 #include "postgres.h"
48 
49 #include <signal.h>
50 #include <unistd.h>
51 
52 #include "access/printtup.h"
53 #include "access/timeline.h"
54 #include "access/transam.h"
55 #include "access/xact.h"
56 #include "access/xlog_internal.h"
57 #include "access/xlogreader.h"
58 #include "access/xlogrecovery.h"
59 #include "access/xlogutils.h"
60 #include "backup/basebackup.h"
61 #include "catalog/pg_authid.h"
62 #include "catalog/pg_type.h"
63 #include "commands/dbcommands.h"
64 #include "commands/defrem.h"
65 #include "funcapi.h"
66 #include "libpq/libpq.h"
67 #include "libpq/pqformat.h"
68 #include "miscadmin.h"
69 #include "nodes/replnodes.h"
70 #include "pgstat.h"
71 #include "postmaster/interrupt.h"
72 #include "replication/decode.h"
73 #include "replication/logical.h"
74 #include "replication/slot.h"
75 #include "replication/snapbuild.h"
76 #include "replication/syncrep.h"
78 #include "replication/walsender.h"
81 #include "storage/fd.h"
82 #include "storage/ipc.h"
83 #include "storage/pmsignal.h"
84 #include "storage/proc.h"
85 #include "storage/procarray.h"
86 #include "tcop/dest.h"
87 #include "tcop/tcopprot.h"
88 #include "utils/acl.h"
89 #include "utils/builtins.h"
90 #include "utils/guc.h"
91 #include "utils/memutils.h"
92 #include "utils/pg_lsn.h"
93 #include "utils/portal.h"
94 #include "utils/ps_status.h"
95 #include "utils/timeout.h"
96 #include "utils/timestamp.h"
97 
98 /*
99  * Maximum data payload in a WAL data message. Must be >= XLOG_BLCKSZ.
100  *
101  * We don't have a good idea of what a good value would be; there's some
102  * overhead per message in both walsender and walreceiver, but on the other
103  * hand sending large batches makes walsender less responsive to signals
104  * because signals are checked only between messages. 128kB (with
105  * default 8k blocks) seems like a reasonable guess for now.
106  */
107 #define MAX_SEND_SIZE (XLOG_BLCKSZ * 16)
108 
109 /* Array of WalSnds in shared memory */
111 
112 /* My slot in the shared memory array */
113 WalSnd *MyWalSnd = NULL;
114 
115 /* Global state */
116 bool am_walsender = false; /* Am I a walsender process? */
117 bool am_cascading_walsender = false; /* Am I cascading WAL to another
118  * standby? */
119 bool am_db_walsender = false; /* Connected to a database? */
120 
121 /* GUC variables */
122 int max_wal_senders = 10; /* the maximum number of concurrent
123  * walsenders */
124 int wal_sender_timeout = 60 * 1000; /* maximum time to send one WAL
125  * data message */
127 
128 /*
129  * State for WalSndWakeupRequest
130  */
131 bool wake_wal_senders = false;
132 
133 /*
134  * xlogreader used for replication. Note that a WAL sender doing physical
135  * replication does not need xlogreader to read WAL, but it needs one to
136  * keep a state of its work.
137  */
139 
140 /*
141  * These variables keep track of the state of the timeline we're currently
142  * sending. sendTimeLine identifies the timeline. If sendTimeLineIsHistoric,
143  * the timeline is not the latest timeline on this server, and the server's
144  * history forked off from that timeline at sendTimeLineValidUpto.
145  */
148 static bool sendTimeLineIsHistoric = false;
150 
151 /*
152  * How far have we sent WAL already? This is also advertised in
153  * MyWalSnd->sentPtr. (Actually, this is the next WAL location to send.)
154  */
156 
157 /* Buffers for constructing outgoing messages and processing reply messages. */
161 
162 /* Timestamp of last ProcessRepliesIfAny(). */
164 
165 /*
166  * Timestamp of last ProcessRepliesIfAny() that saw a reply from the
167  * standby. Set to 0 if wal_sender_timeout doesn't need to be active.
168  */
170 
171 /* Have we sent a heartbeat message asking for reply, since last reply? */
172 static bool waiting_for_ping_response = false;
173 
174 /*
175  * While streaming WAL in Copy mode, streamingDoneSending is set to true
176  * after we have sent CopyDone. We should not send any more CopyData messages
177  * after that. streamingDoneReceiving is set to true when we receive CopyDone
178  * from the other end. When both become true, it's time to exit Copy mode.
179  */
182 
183 /* Are we there yet? */
184 static bool WalSndCaughtUp = false;
185 
186 /* Flags set by signal handlers for later service in main loop */
187 static volatile sig_atomic_t got_SIGUSR2 = false;
188 static volatile sig_atomic_t got_STOPPING = false;
189 
190 /*
191  * This is set while we are streaming. When not set
192  * PROCSIG_WALSND_INIT_STOPPING signal will be handled like SIGTERM. When set,
193  * the main loop is responsible for checking got_STOPPING and terminating when
194  * it's set (after streaming any remaining WAL).
195  */
196 static volatile sig_atomic_t replication_active = false;
197 
199 
200 /* A sample associating a WAL location with the time it was written. */
201 typedef struct
202 {
205 } WalTimeSample;
206 
207 /* The size of our buffer of time samples. */
208 #define LAG_TRACKER_BUFFER_SIZE 8192
209 
210 /* A mechanism for tracking replication lag. */
211 typedef struct
212 {
216  int read_heads[NUM_SYNC_REP_WAIT_MODE];
218 } LagTracker;
219 
221 
222 /* Signal handlers */
224 
225 /* Prototypes for private functions */
226 typedef void (*WalSndSendDataCallback) (void);
227 static void WalSndLoop(WalSndSendDataCallback send_data);
228 static void InitWalSenderSlot(void);
229 static void WalSndKill(int code, Datum arg);
231 static void XLogSendPhysical(void);
232 static void XLogSendLogical(void);
233 static void WalSndDone(WalSndSendDataCallback send_data);
235 static void IdentifySystem(void);
239 static void StartReplication(StartReplicationCmd *cmd);
241 static void ProcessStandbyMessage(void);
242 static void ProcessStandbyReplyMessage(void);
243 static void ProcessStandbyHSFeedbackMessage(void);
244 static void ProcessRepliesIfAny(void);
245 static void ProcessPendingWrites(void);
246 static void WalSndKeepalive(bool requestReply, XLogRecPtr writePtr);
247 static void WalSndKeepaliveIfNecessary(void);
248 static void WalSndCheckTimeOut(void);
250 static void WalSndWait(uint32 socket_events, long timeout, uint32 wait_event);
251 static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
252 static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
254  bool skipped_xact);
256 static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
257 static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
259 
260 static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo,
261  TimeLineID *tli_p);
262 
263 
264 /* Initialize walsender process before entering the main command loop */
265 void
266 InitWalSender(void)
267 {
269 
270  /* Create a per-walsender data structure in shared memory */
272 
273  /*
274  * We don't currently need any ResourceOwner in a walsender process, but
275  * if we did, we could call CreateAuxProcessResourceOwner here.
276  */
277 
278  /*
279  * Let postmaster know that we're a WAL sender. Once we've declared ourselves
280  * as a WAL sender process, postmaster will let us outlive the bgwriter and
281  * kill us last in the shutdown sequence, so we get a chance to stream all
282  * remaining WAL at shutdown, including the shutdown checkpoint. Note that
283  * there's no going back, and we mustn't write any WAL records after this.
284  */
287 
288  /*
289  * If the client didn't specify a database to connect to, show in PGPROC
290  * that our advertised xmin should affect vacuum horizons in all
291  * databases. This allows physical replication clients to send hot
292  * standby feedback that will delay vacuum cleanup in all databases.
293  */
294  if (MyDatabaseId == InvalidOid)
295  {
297  LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
300  LWLockRelease(ProcArrayLock);
301  }
302 
303  /* Initialize empty timestamp buffer for lag tracking. */
305 }
306 
307 /*
308  * Clean up after an error.
309  *
310  * WAL sender processes don't use transactions like regular backends do.
311  * This function does any cleanup required after an error in a WAL sender
312  * process, similar to what transaction abort does in a regular backend.
313  */
314 void
316 {
320 
321  if (xlogreader != NULL && xlogreader->seg.ws_file >= 0)
323 
324  if (MyReplicationSlot != NULL)
326 
328 
329  replication_active = false;
330 
331  /*
332  * If there is a transaction in progress, it will clean up our
333  * ResourceOwner, but if a replication command set up a resource owner
334  * without a transaction, we've got to clean that up now.
335  */
337  WalSndResourceCleanup(false);
338 
339  if (got_STOPPING || got_SIGUSR2)
340  proc_exit(0);
341 
342  /* Revert back to startup state */
344 }
345 
346 /*
347  * Clean up any ResourceOwner we created.
 *
 * Does nothing when CurrentResourceOwner is NULL. Otherwise the owner is
 * detached from CurrentResourceOwner, released in three phases (before
 * locks, locks, after locks) and finally deleted.
 *
 * isCommit is passed through unchanged to every ResourceOwnerRelease() call.
348  */
349 void
350 WalSndResourceCleanup(bool isCommit)
351 {
352  ResourceOwner resowner;
353 
 /* Nothing to clean up if no resource owner is currently installed. */
354  if (CurrentResourceOwner == NULL)
355  return;
356 
357  /*
358  * Deleting CurrentResourceOwner is not allowed, so we must save a pointer
359  * in a local variable and clear it first.
360  */
361  resowner = CurrentResourceOwner;
362  CurrentResourceOwner = NULL;
363 
364  /* Now we can release resources and delete it. */
 /* The phases run in order: BEFORE_LOCKS, then LOCKS, then AFTER_LOCKS. */
365  ResourceOwnerRelease(resowner,
366  RESOURCE_RELEASE_BEFORE_LOCKS, isCommit, true);
367  ResourceOwnerRelease(resowner,
368  RESOURCE_RELEASE_LOCKS, isCommit, true);
369  ResourceOwnerRelease(resowner,
370  RESOURCE_RELEASE_AFTER_LOCKS, isCommit, true);
371  ResourceOwnerDelete(resowner);
372 }
373 
374 /*
375  * Handle a client's connection abort in an orderly manner.
376  */
377 static void
378 WalSndShutdown(void)
379 {
380  /*
381  * Reset whereToSendOutput to prevent ereport from attempting to send any
382  * more messages to the standby.
383  */
386 
387  proc_exit(0);
388  abort(); /* keep the compiler quiet */
389 }
390 
391 /*
392  * Handle the IDENTIFY_SYSTEM command.
393  */
394 static void
396 {
397  char sysid[32];
398  char xloc[MAXFNAMELEN];
399  XLogRecPtr logptr;
400  char *dbname = NULL;
402  TupOutputState *tstate;
403  TupleDesc tupdesc;
404  Datum values[4];
405  bool nulls[4] = {0};
406  TimeLineID currTLI;
407 
408  /*
409  * Reply with a result set with one row, four columns. First col is system
410  * ID, second is timeline ID, third is current xlog location and the
411  * fourth contains the database name if we are connected to one.
412  */
413 
414  snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
416 
419  logptr = GetStandbyFlushRecPtr(&currTLI);
420  else
421  logptr = GetFlushRecPtr(&currTLI);
422 
423  snprintf(xloc, sizeof(xloc), "%X/%X", LSN_FORMAT_ARGS(logptr));
424 
425  if (MyDatabaseId != InvalidOid)
426  {
428 
429  /* syscache access needs a transaction env. */
431  /* make dbname live outside TX context */
435  /* CommitTransactionCommand switches to TopMemoryContext */
437  }
438 
440 
441  /* need a tuple descriptor representing four columns */
442  tupdesc = CreateTemplateTupleDesc(4);
443  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "systemid",
444  TEXTOID, -1, 0);
445  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "timeline",
446  INT8OID, -1, 0);
447  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "xlogpos",
448  TEXTOID, -1, 0);
449  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "dbname",
450  TEXTOID, -1, 0);
451 
452  /* prepare for projection of tuples */
453  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
454 
455  /* column 1: system identifier */
456  values[0] = CStringGetTextDatum(sysid);
457 
458  /* column 2: timeline */
459  values[1] = Int64GetDatum(currTLI);
460 
461  /* column 3: wal location */
462  values[2] = CStringGetTextDatum(xloc);
463 
464  /* column 4: database name, or NULL if none */
465  if (dbname)
467  else
468  nulls[3] = true;
469 
470  /* send it to dest */
471  do_tup_output(tstate, values, nulls);
472 
473  end_tup_output(tstate);
474 }
475 
476 /* Handle READ_REPLICATION_SLOT command */
477 static void
479 {
480 #define READ_REPLICATION_SLOT_COLS 3
481  ReplicationSlot *slot;
483  TupOutputState *tstate;
484  TupleDesc tupdesc;
486  bool nulls[READ_REPLICATION_SLOT_COLS];
487 
489  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_type",
490  TEXTOID, -1, 0);
491  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "restart_lsn",
492  TEXTOID, -1, 0);
493  /* TimeLineID is unsigned, so int4 is not wide enough. */
494  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "restart_tli",
495  INT8OID, -1, 0);
496 
497  memset(nulls, true, READ_REPLICATION_SLOT_COLS * sizeof(bool));
498 
499  LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
500  slot = SearchNamedReplicationSlot(cmd->slotname, false);
501  if (slot == NULL || !slot->in_use)
502  {
503  LWLockRelease(ReplicationSlotControlLock);
504  }
505  else
506  {
507  ReplicationSlot slot_contents;
508  int i = 0;
509 
510  /* Copy slot contents while holding spinlock */
511  SpinLockAcquire(&slot->mutex);
512  slot_contents = *slot;
513  SpinLockRelease(&slot->mutex);
514  LWLockRelease(ReplicationSlotControlLock);
515 
516  if (OidIsValid(slot_contents.data.database))
517  ereport(ERROR,
518  errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
519  errmsg("cannot use %s with a logical replication slot",
520  "READ_REPLICATION_SLOT"));
521 
522  /* slot type */
523  values[i] = CStringGetTextDatum("physical");
524  nulls[i] = false;
525  i++;
526 
527  /* start LSN */
528  if (!XLogRecPtrIsInvalid(slot_contents.data.restart_lsn))
529  {
530  char xloc[64];
531 
532  snprintf(xloc, sizeof(xloc), "%X/%X",
533  LSN_FORMAT_ARGS(slot_contents.data.restart_lsn));
534  values[i] = CStringGetTextDatum(xloc);
535  nulls[i] = false;
536  }
537  i++;
538 
539  /* timeline this WAL was produced on */
540  if (!XLogRecPtrIsInvalid(slot_contents.data.restart_lsn))
541  {
542  TimeLineID slots_position_timeline;
543  TimeLineID current_timeline;
544  List *timeline_history = NIL;
545 
546  /*
547  * While in recovery, use as timeline the currently-replaying one
548  * to get the LSN position's history.
549  */
550  if (RecoveryInProgress())
551  (void) GetXLogReplayRecPtr(&current_timeline);
552  else
553  current_timeline = GetWALInsertionTimeLine();
554 
555  timeline_history = readTimeLineHistory(current_timeline);
556  slots_position_timeline = tliOfPointInHistory(slot_contents.data.restart_lsn,
557  timeline_history);
558  values[i] = Int64GetDatum((int64) slots_position_timeline);
559  nulls[i] = false;
560  }
561  i++;
562 
564  }
565 
567  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
568  do_tup_output(tstate, values, nulls);
569  end_tup_output(tstate);
570 }
571 
572 
573 /*
574  * Handle TIMELINE_HISTORY command.
575  */
576 static void
578 {
580  TupleDesc tupdesc;
582  char histfname[MAXFNAMELEN];
583  char path[MAXPGPATH];
584  int fd;
585  off_t histfilelen;
586  off_t bytesleft;
587  Size len;
588 
590 
591  /*
592  * Reply with a result set with one row, and two columns. The first col is
593  * the name of the history file, 2nd is the contents.
594  */
595  tupdesc = CreateTemplateTupleDesc(2);
596  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "filename", TEXTOID, -1, 0);
597  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "content", TEXTOID, -1, 0);
598 
599  TLHistoryFileName(histfname, cmd->timeline);
600  TLHistoryFilePath(path, cmd->timeline);
601 
602  /* Send a RowDescription message */
603  dest->rStartup(dest, CMD_SELECT, tupdesc);
604 
605  /* Send a DataRow message */
607  pq_sendint16(&buf, 2); /* # of columns */
608  len = strlen(histfname);
609  pq_sendint32(&buf, len); /* col1 len */
610  pq_sendbytes(&buf, histfname, len);
611 
612  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
613  if (fd < 0)
614  ereport(ERROR,
616  errmsg("could not open file \"%s\": %m", path)));
617 
618  /* Determine file length and send it to client */
619  histfilelen = lseek(fd, 0, SEEK_END);
620  if (histfilelen < 0)
621  ereport(ERROR,
623  errmsg("could not seek to end of file \"%s\": %m", path)));
624  if (lseek(fd, 0, SEEK_SET) != 0)
625  ereport(ERROR,
627  errmsg("could not seek to beginning of file \"%s\": %m", path)));
628 
629  pq_sendint32(&buf, histfilelen); /* col2 len */
630 
631  bytesleft = histfilelen;
632  while (bytesleft > 0)
633  {
634  PGAlignedBlock rbuf;
635  int nread;
636 
637  pgstat_report_wait_start(WAIT_EVENT_WALSENDER_TIMELINE_HISTORY_READ);
638  nread = read(fd, rbuf.data, sizeof(rbuf));
640  if (nread < 0)
641  ereport(ERROR,
643  errmsg("could not read file \"%s\": %m",
644  path)));
645  else if (nread == 0)
646  ereport(ERROR,
648  errmsg("could not read file \"%s\": read %d of %zu",
649  path, nread, (Size) bytesleft)));
650 
651  pq_sendbytes(&buf, rbuf.data, nread);
652  bytesleft -= nread;
653  }
654 
655  if (CloseTransientFile(fd) != 0)
656  ereport(ERROR,
658  errmsg("could not close file \"%s\": %m", path)));
659 
660  pq_endmessage(&buf);
661 }
662 
663 /*
664  * Handle START_REPLICATION command.
665  *
666  * At the moment, this never returns, but an ereport(ERROR) will take us back
667  * to the main loop.
668  */
669 static void
671 {
673  XLogRecPtr FlushPtr;
674  TimeLineID FlushTLI;
675 
676  /* create xlogreader for physical replication */
677  xlogreader =
679  XL_ROUTINE(.segment_open = WalSndSegmentOpen,
680  .segment_close = wal_segment_close),
681  NULL);
682 
683  if (!xlogreader)
684  ereport(ERROR,
685  (errcode(ERRCODE_OUT_OF_MEMORY),
686  errmsg("out of memory"),
687  errdetail("Failed while allocating a WAL reading processor.")));
688 
689  /*
690  * We assume here that we're logging enough information in the WAL for
691  * log-shipping, since this is checked in PostmasterMain().
692  *
693  * NOTE: wal_level can only change at shutdown, so in most cases it is
694  * difficult for there to be WAL data that we can still see that was
695  * written at wal_level='minimal'.
696  */
697 
698  if (cmd->slotname)
699  {
700  ReplicationSlotAcquire(cmd->slotname, true);
702  ereport(ERROR,
703  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
704  errmsg("cannot use a logical replication slot for physical replication")));
705 
706  /*
707  * We don't need to verify the slot's restart_lsn here; instead we
708  * rely on the caller requesting the starting point to use. If the
709  * WAL segment doesn't exist, we'll fail later.
710  */
711  }
712 
713  /*
714  * Select the timeline. If it was given explicitly by the client, use
715  * that. Otherwise use the timeline of the last replayed record.
716  */
719  FlushPtr = GetStandbyFlushRecPtr(&FlushTLI);
720  else
721  FlushPtr = GetFlushRecPtr(&FlushTLI);
722 
723  if (cmd->timeline != 0)
724  {
725  XLogRecPtr switchpoint;
726 
727  sendTimeLine = cmd->timeline;
728  if (sendTimeLine == FlushTLI)
729  {
730  sendTimeLineIsHistoric = false;
732  }
733  else
734  {
735  List *timeLineHistory;
736 
737  sendTimeLineIsHistoric = true;
738 
739  /*
740  * Check that the timeline the client requested exists, and the
741  * requested start location is on that timeline.
742  */
743  timeLineHistory = readTimeLineHistory(FlushTLI);
744  switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
746  list_free_deep(timeLineHistory);
747 
748  /*
749  * Found the requested timeline in the history. Check that
750  * requested startpoint is on that timeline in our history.
751  *
752  * This is quite loose on purpose. We only check that we didn't
753  * fork off the requested timeline before the switchpoint. We
754  * don't check that we switched *to* it before the requested
755  * starting point. This is because the client can legitimately
756  * request to start replication from the beginning of the WAL
757  * segment that contains switchpoint, but on the new timeline, so
758  * that it doesn't end up with a partial segment. If you ask for
759  * too old a starting point, you'll get an error later when we
760  * fail to find the requested WAL segment in pg_wal.
761  *
762  * XXX: we could be more strict here and only allow a startpoint
763  * that's older than the switchpoint, if it's still in the same
764  * WAL segment.
765  */
766  if (!XLogRecPtrIsInvalid(switchpoint) &&
767  switchpoint < cmd->startpoint)
768  {
769  ereport(ERROR,
770  (errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
772  cmd->timeline),
773  errdetail("This server's history forked from timeline %u at %X/%X.",
774  cmd->timeline,
775  LSN_FORMAT_ARGS(switchpoint))));
776  }
777  sendTimeLineValidUpto = switchpoint;
778  }
779  }
780  else
781  {
782  sendTimeLine = FlushTLI;
784  sendTimeLineIsHistoric = false;
785  }
786 
788 
789  /* If there is nothing to stream, don't even enter COPY mode */
791  {
792  /*
793  * When we first start replication the standby will be behind the
794  * primary. For some applications, for example synchronous
795  * replication, it is important to have a clear state for this initial
796  * catchup mode, so we can trigger actions when we change streaming
797  * state later. We may stay in this state for a long time, which is
798  * exactly why we want to be able to monitor whether or not we are
799  * still here.
800  */
802 
803  /* Send a CopyBothResponse message, and start streaming */
805  pq_sendbyte(&buf, 0);
806  pq_sendint16(&buf, 0);
807  pq_endmessage(&buf);
808  pq_flush();
809 
810  /*
811  * Don't allow a request to stream from a future point in WAL that
812  * hasn't been flushed to disk in this server yet.
813  */
814  if (FlushPtr < cmd->startpoint)
815  {
816  ereport(ERROR,
817  (errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
819  LSN_FORMAT_ARGS(FlushPtr))));
820  }
821 
822  /* Start streaming from the requested point */
823  sentPtr = cmd->startpoint;
824 
825  /* Initialize shared memory status, too */
829 
831 
832  /* Main loop of walsender */
833  replication_active = true;
834 
836 
837  replication_active = false;
838  if (got_STOPPING)
839  proc_exit(0);
841 
843  }
844 
845  if (cmd->slotname)
847 
848  /*
849  * Copy is finished now. Send a single-row result set indicating the next
850  * timeline.
851  */
853  {
854  char startpos_str[8 + 1 + 8 + 1];
856  TupOutputState *tstate;
857  TupleDesc tupdesc;
858  Datum values[2];
859  bool nulls[2] = {0};
860 
861  snprintf(startpos_str, sizeof(startpos_str), "%X/%X",
863 
865 
866  /*
867  * Need a tuple descriptor representing two columns. int8 may seem
868  * like a surprising data type for this, but in theory int4 would not
869  * be wide enough for this, as TimeLineID is unsigned.
870  */
871  tupdesc = CreateTemplateTupleDesc(2);
872  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli",
873  INT8OID, -1, 0);
874  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos",
875  TEXTOID, -1, 0);
876 
877  /* prepare for projection of tuple */
878  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
879 
881  values[1] = CStringGetTextDatum(startpos_str);
882 
883  /* send it to dest */
884  do_tup_output(tstate, values, nulls);
885 
886  end_tup_output(tstate);
887  }
888 
889  /* Send CommandComplete message */
890  EndReplicationCommand("START_STREAMING");
891 }
892 
893 /*
894  * XLogReaderRoutine->page_read callback for logical decoding contexts, as a
895  * walsender process.
896  *
897  * Inside the walsender we can do better than read_local_xlog_page,
898  * which has to do a plain sleep/busy loop, because the walsender's latch gets
899  * set every time WAL is flushed.
900  */
901 static int
903  XLogRecPtr targetRecPtr, char *cur_page)
904 {
905  XLogRecPtr flushptr;
906  int count;
907  WALReadError errinfo;
908  XLogSegNo segno;
909  TimeLineID currTLI;
910 
911  /*
912  * Make sure we have enough WAL available before retrieving the current
913  * timeline. This is needed to determine am_cascading_walsender accurately
914  * which is needed to determine the current timeline.
915  */
916  flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
917 
918  /*
919  * Since logical decoding is also permitted on a standby server, we need
920  * to check if the server is in recovery to decide how to get the current
921  * timeline ID (so that it also covers the promotion or timeline change
922  * cases).
923  */
925 
927  GetXLogReplayRecPtr(&currTLI);
928  else
929  currTLI = GetWALInsertionTimeLine();
930 
931  XLogReadDetermineTimeline(state, targetPagePtr, reqLen, currTLI);
932  sendTimeLineIsHistoric = (state->currTLI != currTLI);
933  sendTimeLine = state->currTLI;
934  sendTimeLineValidUpto = state->currTLIValidUntil;
935  sendTimeLineNextTLI = state->nextTLI;
936 
937  /* fail if not enough WAL is available (implies we are going to shut down) */
938  if (flushptr < targetPagePtr + reqLen)
939  return -1;
940 
941  if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
942  count = XLOG_BLCKSZ; /* more than one block available */
943  else
944  count = flushptr - targetPagePtr; /* part of the page available */
945 
946  /* now actually read the data, we know it's there */
947  if (!WALRead(state,
948  cur_page,
949  targetPagePtr,
950  XLOG_BLCKSZ,
951  currTLI, /* Pass the current TLI because only
952  * WalSndSegmentOpen controls whether new TLI
953  * is needed. */
954  &errinfo))
955  WALReadRaiseError(&errinfo);
956 
957  /*
958  * After reading into the buffer, check that what we read was valid. We do
959  * this after reading, because even though the segment was present when we
960  * opened it, it might get recycled or removed while we read it. The
961  * read() succeeds in that case, but the data we tried to read might
962  * already have been overwritten with new WAL records.
963  */
964  XLByteToSeg(targetPagePtr, segno, state->segcxt.ws_segsize);
965  CheckXLogRemoved(segno, state->seg.ws_tli);
966 
967  return count;
968 }
969 
970 /*
971  * Process extra options given to CREATE_REPLICATION_SLOT.
 *
 * Walks cmd->options and stores each recognized setting through the matching
 * output pointer:
 *   "snapshot"    - logical slots only; value "export", "nothing" or "use",
 *                   stored in *snapshot_action
 *   "reserve_wal" - physical slots only; boolean stored in *reserve_wal
 *   "two_phase"   - logical slots only; boolean stored in *two_phase
 *
 * An output is left untouched when its option is absent from the list. An
 * option that appears twice, or that is given for the wrong slot kind, is
 * rejected with "conflicting or redundant options". An unrecognized snapshot
 * value or an unrecognized option name raises an error as well.
972  */
973 static void
975  bool *reserve_wal,
976  CRSSnapshotAction *snapshot_action,
977  bool *two_phase)
978 {
979  ListCell *lc;
 /* Remember which options were already seen, so duplicates can be rejected. */
980  bool snapshot_action_given = false;
981  bool reserve_wal_given = false;
982  bool two_phase_given = false;
983 
984  /* Parse options */
985  foreach(lc, cmd->options)
986  {
987  DefElem *defel = (DefElem *) lfirst(lc);
988 
989  if (strcmp(defel->defname, "snapshot") == 0)
990  {
991  char *action;
992 
 /* "snapshot" may appear once, and only for a logical slot */
993  if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
994  ereport(ERROR,
995  (errcode(ERRCODE_SYNTAX_ERROR),
996  errmsg("conflicting or redundant options")));
997 
998  action = defGetString(defel);
999  snapshot_action_given = true;
1000 
 /* Map the textual action onto the CRSSnapshotAction value */
1001  if (strcmp(action, "export") == 0)
1002  *snapshot_action = CRS_EXPORT_SNAPSHOT;
1003  else if (strcmp(action, "nothing") == 0)
1004  *snapshot_action = CRS_NOEXPORT_SNAPSHOT;
1005  else if (strcmp(action, "use") == 0)
1006  *snapshot_action = CRS_USE_SNAPSHOT;
1007  else
1008  ereport(ERROR,
1009  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1010  errmsg("unrecognized value for CREATE_REPLICATION_SLOT option \"%s\": \"%s\"",
1011  defel->defname, action)));
1012  }
1013  else if (strcmp(defel->defname, "reserve_wal") == 0)
1014  {
 /* "reserve_wal" may appear once, and only for a physical slot */
1015  if (reserve_wal_given || cmd->kind != REPLICATION_KIND_PHYSICAL)
1016  ereport(ERROR,
1017  (errcode(ERRCODE_SYNTAX_ERROR),
1018  errmsg("conflicting or redundant options")));
1019 
1020  reserve_wal_given = true;
1021  *reserve_wal = defGetBoolean(defel);
1022  }
1023  else if (strcmp(defel->defname, "two_phase") == 0)
1024  {
 /* "two_phase" may appear once, and only for a logical slot */
1025  if (two_phase_given || cmd->kind != REPLICATION_KIND_LOGICAL)
1026  ereport(ERROR,
1027  (errcode(ERRCODE_SYNTAX_ERROR),
1028  errmsg("conflicting or redundant options")));
1029  two_phase_given = true;
1030  *two_phase = defGetBoolean(defel);
1031  }
1032  else
1033  elog(ERROR, "unrecognized option: %s", defel->defname);
1034  }
1035 }
1036 
1037 /*
1038  * Create a new replication slot.
1039  */
1040 static void
1042 {
1043  const char *snapshot_name = NULL;
1044  char xloc[MAXFNAMELEN];
1045  char *slot_name;
1046  bool reserve_wal = false;
1047  bool two_phase = false;
1048  CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT;
1049  DestReceiver *dest;
1050  TupOutputState *tstate;
1051  TupleDesc tupdesc;
1052  Datum values[4];
1053  bool nulls[4] = {0};
1054 
1056 
1057  parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action, &two_phase);
1058 
1059  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
1060  {
1061  ReplicationSlotCreate(cmd->slotname, false,
1063  false);
1064  }
1065  else
1066  {
1068 
1069  /*
1070  * Initially create persistent slot as ephemeral - that allows us to
1071  * nicely handle errors during initialization because it'll get
1072  * dropped if this transaction fails. We'll make it persistent at the
1073  * end. Temporary slots can be created as temporary from beginning as
1074  * they get dropped on error as well.
1075  */
1076  ReplicationSlotCreate(cmd->slotname, true,
1078  two_phase);
1079  }
1080 
1081  if (cmd->kind == REPLICATION_KIND_LOGICAL)
1082  {
1084  bool need_full_snapshot = false;
1085 
1086  /*
1087  * Do options check early so that we can bail before calling the
1088  * DecodingContextFindStartpoint which can take a long time.
1089  */
1090  if (snapshot_action == CRS_EXPORT_SNAPSHOT)
1091  {
1092  if (IsTransactionBlock())
1093  ereport(ERROR,
1094  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1095  (errmsg("%s must not be called inside a transaction",
1096  "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'export')")));
1097 
1098  need_full_snapshot = true;
1099  }
1100  else if (snapshot_action == CRS_USE_SNAPSHOT)
1101  {
1102  if (!IsTransactionBlock())
1103  ereport(ERROR,
1104  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1105  (errmsg("%s must be called inside a transaction",
1106  "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1107 
1109  ereport(ERROR,
1110  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1111  (errmsg("%s must be called in REPEATABLE READ isolation mode transaction",
1112  "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1113  if (!XactReadOnly)
1114  ereport(ERROR,
1115  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1116  (errmsg("%s must be called in a read-only transaction",
1117  "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1118 
1119  if (FirstSnapshotSet)
1120  ereport(ERROR,
1121  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1122  (errmsg("%s must be called before any query",
1123  "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1124 
1125  if (IsSubTransaction())
1126  ereport(ERROR,
1127  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1128  (errmsg("%s must not be called in a subtransaction",
1129  "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1130 
1131  need_full_snapshot = true;
1132  }
1133 
1134  ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
1136  XL_ROUTINE(.page_read = logical_read_xlog_page,
1137  .segment_open = WalSndSegmentOpen,
1138  .segment_close = wal_segment_close),
1141 
1142  /*
1143  * Signal that we don't need the timeout mechanism. We're just
1144  * creating the replication slot and don't yet accept feedback
1145  * messages or send keepalives. Since we may need to wait for
1146  * further WAL, the walsender could otherwise be killed too
1147  * soon.
1148  */
1150 
1151  /* build initial snapshot, might take a while */
1153 
1154  /*
1155  * Export or use the snapshot if we've been asked to do so.
1156  *
1157  * NB. We will convert the snapbuild.c kind of snapshot to normal
1158  * snapshot when doing this.
1159  */
1160  if (snapshot_action == CRS_EXPORT_SNAPSHOT)
1161  {
1162  snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder);
1163  }
1164  else if (snapshot_action == CRS_USE_SNAPSHOT)
1165  {
1166  Snapshot snap;
1167 
1170  }
1171 
1172  /* don't need the decoding context anymore */
1173  FreeDecodingContext(ctx);
1174 
1175  if (!cmd->temporary)
1177  }
1178  else if (cmd->kind == REPLICATION_KIND_PHYSICAL && reserve_wal)
1179  {
1181 
1183 
1184  /* Write this slot to disk if it's a permanent one. */
1185  if (!cmd->temporary)
1187  }
1188 
1189  snprintf(xloc, sizeof(xloc), "%X/%X",
1191 
1193 
1194  /*----------
1195  * Need a tuple descriptor representing four columns:
1196  * - first field: the slot name
1197  * - second field: LSN at which we became consistent
1198  * - third field: exported snapshot's name
1199  * - fourth field: output plugin
1200  */
1201  tupdesc = CreateTemplateTupleDesc(4);
1202  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_name",
1203  TEXTOID, -1, 0);
1204  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "consistent_point",
1205  TEXTOID, -1, 0);
1206  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "snapshot_name",
1207  TEXTOID, -1, 0);
1208  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "output_plugin",
1209  TEXTOID, -1, 0);
1210 
1211  /* prepare for projection of tuples */
1212  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
1213 
1214  /* slot_name */
1215  slot_name = NameStr(MyReplicationSlot->data.name);
1216  values[0] = CStringGetTextDatum(slot_name);
1217 
1218  /* consistent wal location */
1219  values[1] = CStringGetTextDatum(xloc);
1220 
1221  /* snapshot name, or NULL if none */
1222  if (snapshot_name != NULL)
1223  values[2] = CStringGetTextDatum(snapshot_name);
1224  else
1225  nulls[2] = true;
1226 
1227  /* plugin, or NULL if none */
1228  if (cmd->plugin != NULL)
1229  values[3] = CStringGetTextDatum(cmd->plugin);
1230  else
1231  nulls[3] = true;
1232 
1233  /* send it to dest */
1234  do_tup_output(tstate, values, nulls);
1235  end_tup_output(tstate);
1236 
1238 }
1239 
1240 /*
1241  * Get rid of a replication slot that is no longer wanted.
1242  */
1243 static void
1245 {
1246  ReplicationSlotDrop(cmd->slotname, !cmd->wait);
1247 }
1248 
1249 /*
1250  * Load previously initiated logical slot and prepare for sending data (via
1251  * WalSndLoop).
1252  */
1253 static void
1255 {
1257  QueryCompletion qc;
1258 
1259  /* make sure that our requirements are still fulfilled */
1261 
1263 
1264  ReplicationSlotAcquire(cmd->slotname, true);
1265 
1266  /*
1267  * Force a disconnect, so that the decoding code doesn't need to care
1268  * about a possible switch from running in recovery to running in a
1269  * normal environment. Client code is expected to handle reconnects.
1270  */
1272  {
1273  ereport(LOG,
1274  (errmsg("terminating walsender process after promotion")));
1275  got_STOPPING = true;
1276  }
1277 
1278  /*
1279  * Create our decoding context, making it start at the previously ack'ed
1280  * position.
1281  *
1282  * Do this before sending a CopyBothResponse message, so that any errors
1283  * are reported early.
1284  */
1286  CreateDecodingContext(cmd->startpoint, cmd->options, false,
1287  XL_ROUTINE(.page_read = logical_read_xlog_page,
1288  .segment_open = WalSndSegmentOpen,
1289  .segment_close = wal_segment_close),
1293 
1295 
1296  /* Send a CopyBothResponse message, and start streaming */
1298  pq_sendbyte(&buf, 0);
1299  pq_sendint16(&buf, 0);
1300  pq_endmessage(&buf);
1301  pq_flush();
1302 
1303  /* Start reading WAL from the oldest required WAL. */
1306 
1307  /*
1308  * Report the location after which we'll send out further commits as the
1309  * current sentPtr.
1310  */
1312 
1313  /* Also update the sent position status in shared memory */
1317 
1318  replication_active = true;
1319 
1321 
1322  /* Main loop of walsender */
1324 
1327 
1328  replication_active = false;
1329  if (got_STOPPING)
1330  proc_exit(0);
1332 
1333  /* Get out of COPY mode (CommandComplete). */
1334  SetQueryCompletion(&qc, CMDTAG_COPY, 0);
1335  EndCommand(&qc, DestRemote, false);
1336 }
1337 
1338 /*
1339  * LogicalDecodingContext 'prepare_write' callback.
1340  *
1341  * Prepare a write into a StringInfo.
1342  *
1343  * Don't do anything lasting in here; it's quite possible that nothing will be done
1344  * with the data.
1345  */
1346 static void
1348 {
1349  /* can't have sync rep confused by sending the same LSN several times */
1350  if (!last_write)
1351  lsn = InvalidXLogRecPtr;
1352 
1353  resetStringInfo(ctx->out);
1354 
1355  pq_sendbyte(ctx->out, 'w');
1356  pq_sendint64(ctx->out, lsn); /* dataStart */
1357  pq_sendint64(ctx->out, lsn); /* walEnd */
1358 
1359  /*
1360  * Fill out the sendtime later, just as it's done in XLogSendPhysical, but
1361  * reserve space here.
1362  */
1363  pq_sendint64(ctx->out, 0); /* sendtime */
1364 }
1365 
1366 /*
1367  * LogicalDecodingContext 'write' callback.
1368  *
1369  * Actually write the data previously prepared by WalSndPrepareWrite out to
1370  * the network. Take as long as needed, but process replies from the other
1371  * side and check timeouts while doing so.
1372  */
1373 static void
1375  bool last_write)
1376 {
1377  TimestampTz now;
1378 
1379  /*
1380  * Fill the send timestamp last, so that it is taken as late as possible.
1381  * This is somewhat ugly, but the protocol is fixed, as it has already been
1382  * used for several releases by streaming physical replication.
1383  */
1386  pq_sendint64(&tmpbuf, now);
1387  memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
1388  tmpbuf.data, sizeof(int64));
1389 
1390  /* output previously gathered data in a CopyData packet */
1391  pq_putmessage_noblock('d', ctx->out->data, ctx->out->len);
1392 
1394 
1395  /* Try to flush pending output to the client */
1396  if (pq_flush_if_writable() != 0)
1397  WalSndShutdown();
1398 
1399  /* Try taking fast path unless we get too close to walsender timeout. */
1401  wal_sender_timeout / 2) &&
1402  !pq_is_send_pending())
1403  {
1404  return;
1405  }
1406 
1407  /* If we have pending write here, go to slow path */
1409 }
1410 
1411 /*
1412  * Wait until there is no pending write. Also process replies from the other
1413  * side and check timeouts while waiting.
1414  */
1415 static void
1417 {
1418  for (;;)
1419  {
1420  long sleeptime;
1421 
1422  /* Check for input from the client */
1424 
1425  /* die if timeout was reached */
1427 
1428  /* Send keepalive if the time has come */
1430 
1431  if (!pq_is_send_pending())
1432  break;
1433 
1435 
1436  /* Sleep until something happens or we time out */
1438  WAIT_EVENT_WAL_SENDER_WRITE_DATA);
1439 
1440  /* Clear any already-pending wakeups */
1442 
1444 
1445  /* Process any requests or signals received recently */
1446  if (ConfigReloadPending)
1447  {
1448  ConfigReloadPending = false;
1451  }
1452 
1453  /* Try to flush pending output to the client */
1454  if (pq_flush_if_writable() != 0)
1455  WalSndShutdown();
1456  }
1457 
1458  /* reactivate latch so WalSndLoop knows to continue */
1459  SetLatch(MyLatch);
1460 }
1461 
1462 /*
1463  * LogicalDecodingContext 'update_progress' callback.
1464  *
1465  * Write the current position to the lag tracker (see XLogSendPhysical).
1466  *
1467  * When skipping empty transactions, send a keepalive message if necessary.
1468  */
1469 static void
1471  bool skipped_xact)
1472 {
1473  static TimestampTz sendTime = 0;
1475  bool pending_writes = false;
1476  bool end_xact = ctx->end_xact;
1477 
1478  /*
1479  * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to
1480  * avoid flooding the lag tracker when we commit frequently.
1481  *
1482  * We don't have a mechanism to get the ack for any LSN other than end
1483  * xact LSN from the downstream. So, we track lag only for end of
1484  * transaction LSN.
1485  */
1486 #define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000
1487  if (end_xact && TimestampDifferenceExceeds(sendTime, now,
1489  {
1490  LagTrackerWrite(lsn, now);
1491  sendTime = now;
1492  }
1493 
1494  /*
1495  * When skipping empty transactions in synchronous replication, we send a
1496  * keepalive message to avoid delaying such transactions.
1497  *
1498  * It is okay to check sync_standbys_defined flag without lock here as in
1499  * the worst case we will just send an extra keepalive message when it is
1500  * really not required.
1501  */
1502  if (skipped_xact &&
1503  SyncRepRequested() &&
1504  ((volatile WalSndCtlData *) WalSndCtl)->sync_standbys_defined)
1505  {
1506  WalSndKeepalive(false, lsn);
1507 
1508  /* Try to flush pending output to the client */
1509  if (pq_flush_if_writable() != 0)
1510  WalSndShutdown();
1511 
1512  /* If we have pending write here, make sure it's actually flushed */
1513  if (pq_is_send_pending())
1514  pending_writes = true;
1515  }
1516 
1517  /*
1518  * Process pending writes if any or try to send a keepalive if required.
1519  * We don't need to try sending keepalive messages at the transaction end
1520  * as that will be done at a later point in time. This is required only
1521  * for large transactions where we don't send any changes to the
1522  * downstream and the receiver can time out due to that.
1523  */
1524  if (pending_writes || (!end_xact &&
1526  wal_sender_timeout / 2)))
1528 }
1529 
1530 /*
1531  * Wait till WAL < loc is flushed to disk so it can be safely sent to client.
1532  *
1533  * Returns end LSN of flushed WAL. Normally this will be >= loc, but
1534  * if we detect a shutdown request (either from postmaster or client)
1535  * we will return early, so caller must always check.
1536  */
1537 static XLogRecPtr
1539 {
1540  int wakeEvents;
1541  static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
1542 
1543  /*
1544  * Fast path to avoid acquiring the spinlock in case we already know we
1545  * have enough WAL available. This is particularly interesting if we're
1546  * far behind.
1547  */
1548  if (RecentFlushPtr != InvalidXLogRecPtr &&
1549  loc <= RecentFlushPtr)
1550  return RecentFlushPtr;
1551 
1552  /* Get a more recent flush pointer. */
1553  if (!RecoveryInProgress())
1554  RecentFlushPtr = GetFlushRecPtr(NULL);
1555  else
1556  RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1557 
1558  for (;;)
1559  {
1560  long sleeptime;
1561 
1562  /* Clear any already-pending wakeups */
1564 
1566 
1567  /* Process any requests or signals received recently */
1568  if (ConfigReloadPending)
1569  {
1570  ConfigReloadPending = false;
1573  }
1574 
1575  /* Check for input from the client */
1577 
1578  /*
1579  * If we're shutting down, trigger pending WAL to be written out,
1580  * otherwise we'd possibly end up waiting for WAL that never gets
1581  * written, because walwriter has shut down already.
1582  */
1583  if (got_STOPPING)
1585 
1586  /* Update our idea of the currently flushed position. */
1587  if (!RecoveryInProgress())
1588  RecentFlushPtr = GetFlushRecPtr(NULL);
1589  else
1590  RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1591 
1592  /*
1593  * If postmaster asked us to stop, don't wait anymore.
1594  *
1595  * It's important to do this check after the recomputation of
1596  * RecentFlushPtr, so we can send all remaining data before shutting
1597  * down.
1598  */
1599  if (got_STOPPING)
1600  break;
1601 
1602  /*
1603  * We only send regular messages to the client for fully decoded
1604  * transactions, but synchronous replication and walsender shutdown
1605  * may be waiting for a later location. So, before sleeping, we
1606  * send a ping containing the flush location. If the receiver is
1607  * otherwise idle, this keepalive will trigger a reply. Processing the
1608  * reply will update these MyWalSnd locations.
1609  */
1610  if (MyWalSnd->flush < sentPtr &&
1611  MyWalSnd->write < sentPtr &&
1614 
1615  /* check whether we're done */
1616  if (loc <= RecentFlushPtr)
1617  break;
1618 
1619  /* Waiting for new WAL. Since we need to wait, we're now caught up. */
1620  WalSndCaughtUp = true;
1621 
1622  /*
1623  * Try to flush any pending output to the client.
1624  */
1625  if (pq_flush_if_writable() != 0)
1626  WalSndShutdown();
1627 
1628  /*
1629  * If we have received CopyDone from the client, sent CopyDone
1630  * ourselves, and the output buffer is empty, it's time to exit
1631  * streaming, so fail the current WAL fetch request.
1632  */
1634  !pq_is_send_pending())
1635  break;
1636 
1637  /* die if timeout was reached */
1639 
1640  /* Send keepalive if the time has come */
1642 
1643  /*
1644  * Sleep until something happens or we time out. Also wait for the
1645  * socket becoming writable, if there's still pending output.
1646  * Otherwise we might sit on sendable output data while waiting for
1647  * new WAL to be generated. (But if we have nothing to send, we don't
1648  * want to wake on socket-writable.)
1649  */
1651 
1652  wakeEvents = WL_SOCKET_READABLE;
1653 
1654  if (pq_is_send_pending())
1655  wakeEvents |= WL_SOCKET_WRITEABLE;
1656 
1657  WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_WAIT_FOR_WAL);
1658  }
1659 
1660  /* reactivate latch so WalSndLoop knows to continue */
1661  SetLatch(MyLatch);
1662  return RecentFlushPtr;
1663 }
1664 
1665 /*
1666  * Execute an incoming replication command.
1667  *
1668  * Returns true if the cmd_string was recognized as a WalSender command, false
1669  * if not.
1670  */
1671 bool
1672 exec_replication_command(const char *cmd_string)
1673 {
1674  int parse_rc;
1675  Node *cmd_node;
1676  const char *cmdtag;
1677  MemoryContext cmd_context;
1678  MemoryContext old_context;
1679 
1680  /*
1681  * If WAL sender has been told that shutdown is getting close, switch its
1682  * status accordingly to handle the next replication commands correctly.
1683  */
1684  if (got_STOPPING)
1686 
1687  /*
1688  * Throw an error if in stopping mode. We need to prevent commands that could
1689  * generate WAL while the shutdown checkpoint is being written. To be
1690  * safe, we just prohibit all new commands.
1691  */
1693  ereport(ERROR,
1694  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1695  errmsg("cannot execute new commands while WAL sender is in stopping mode")));
1696 
1697  /*
1698  * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
1699  * command arrives. Clean up the old stuff if there's anything.
1700  */
1702 
1704 
1705  /*
1706  * Prepare to parse and execute the command.
1707  */
1709  "Replication command context",
1711  old_context = MemoryContextSwitchTo(cmd_context);
1712 
1713  replication_scanner_init(cmd_string);
1714 
1715  /*
1716  * Is it a WalSender command?
1717  */
1719  {
1720  /* Nope; clean up and get out. */
1722 
1723  MemoryContextSwitchTo(old_context);
1724  MemoryContextDelete(cmd_context);
1725 
1726  /* XXX this is a pretty random place to make this check */
1727  if (MyDatabaseId == InvalidOid)
1728  ereport(ERROR,
1729  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1730  errmsg("cannot execute SQL commands in WAL sender for physical replication")));
1731 
1732  /* Tell the caller that this wasn't a WalSender command. */
1733  return false;
1734  }
1735 
1736  /*
1737  * Looks like a WalSender command, so parse it.
1738  */
1739  parse_rc = replication_yyparse();
1740  if (parse_rc != 0)
1741  ereport(ERROR,
1742  (errcode(ERRCODE_SYNTAX_ERROR),
1743  errmsg_internal("replication command parser returned %d",
1744  parse_rc)));
1746 
1747  cmd_node = replication_parse_result;
1748 
1749  /*
1750  * Report query to various monitoring facilities. For this purpose, we
1751  * report replication commands just like SQL commands.
1752  */
1753  debug_query_string = cmd_string;
1754 
1756 
1757  /*
1758  * Log replication command if log_replication_commands is enabled. Even
1759  * when it's disabled, log the command with DEBUG1 level for backward
1760  * compatibility.
1761  */
1763  (errmsg("received replication command: %s", cmd_string)));
1764 
1765  /*
1766  * Disallow replication commands in aborted transaction blocks.
1767  */
1769  ereport(ERROR,
1770  (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
1771  errmsg("current transaction is aborted, "
1772  "commands ignored until end of transaction block")));
1773 
1775 
1776  /*
1777  * Allocate buffers that will be used for each outgoing and incoming
1778  * message. We do this just once per command to reduce palloc overhead.
1779  */
1783 
1784  switch (cmd_node->type)
1785  {
1786  case T_IdentifySystemCmd:
1787  cmdtag = "IDENTIFY_SYSTEM";
1788  set_ps_display(cmdtag);
1789  IdentifySystem();
1790  EndReplicationCommand(cmdtag);
1791  break;
1792 
1793  case T_ReadReplicationSlotCmd:
1794  cmdtag = "READ_REPLICATION_SLOT";
1795  set_ps_display(cmdtag);
1797  EndReplicationCommand(cmdtag);
1798  break;
1799 
1800  case T_BaseBackupCmd:
1801  cmdtag = "BASE_BACKUP";
1802  set_ps_display(cmdtag);
1803  PreventInTransactionBlock(true, cmdtag);
1804  SendBaseBackup((BaseBackupCmd *) cmd_node);
1805  EndReplicationCommand(cmdtag);
1806  break;
1807 
1808  case T_CreateReplicationSlotCmd:
1809  cmdtag = "CREATE_REPLICATION_SLOT";
1810  set_ps_display(cmdtag);
1812  EndReplicationCommand(cmdtag);
1813  break;
1814 
1815  case T_DropReplicationSlotCmd:
1816  cmdtag = "DROP_REPLICATION_SLOT";
1817  set_ps_display(cmdtag);
1819  EndReplicationCommand(cmdtag);
1820  break;
1821 
1822  case T_StartReplicationCmd:
1823  {
1824  StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
1825 
1826  cmdtag = "START_REPLICATION";
1827  set_ps_display(cmdtag);
1828  PreventInTransactionBlock(true, cmdtag);
1829 
1830  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
1831  StartReplication(cmd);
1832  else
1834 
1835  /* dupe, but necessary per libpqrcv_endstreaming */
1836  EndReplicationCommand(cmdtag);
1837 
1838  Assert(xlogreader != NULL);
1839  break;
1840  }
1841 
1842  case T_TimeLineHistoryCmd:
1843  cmdtag = "TIMELINE_HISTORY";
1844  set_ps_display(cmdtag);
1845  PreventInTransactionBlock(true, cmdtag);
1847  EndReplicationCommand(cmdtag);
1848  break;
1849 
1850  case T_VariableShowStmt:
1851  {
1853  VariableShowStmt *n = (VariableShowStmt *) cmd_node;
1854 
1855  cmdtag = "SHOW";
1856  set_ps_display(cmdtag);
1857 
1858  /* syscache access needs a transaction environment */
1860  GetPGVariable(n->name, dest);
1862  EndReplicationCommand(cmdtag);
1863  }
1864  break;
1865 
1866  default:
1867  elog(ERROR, "unrecognized replication command node tag: %u",
1868  cmd_node->type);
1869  }
1870 
1871  /* done */
1872  MemoryContextSwitchTo(old_context);
1873  MemoryContextDelete(cmd_context);
1874 
1875  /*
1876  * We need not update ps display or pg_stat_activity, because PostgresMain
1877  * will reset those to "idle". But we must reset debug_query_string to
1878  * ensure it doesn't become a dangling pointer.
1879  */
1880  debug_query_string = NULL;
1881 
1882  return true;
1883 }
1884 
1885 /*
1886  * Process any incoming messages while streaming. Also checks if the remote
1887  * end has closed the connection.
1888  */
1889 static void
1891 {
1892  unsigned char firstchar;
1893  int maxmsglen;
1894  int r;
1895  bool received = false;
1896 
1898 
1899  /*
1900  * If we already received a CopyDone from the frontend, any subsequent
1901  * message is the beginning of a new command, and should be processed in
1902  * the main processing loop.
1903  */
1904  while (!streamingDoneReceiving)
1905  {
1906  pq_startmsgread();
1907  r = pq_getbyte_if_available(&firstchar);
1908  if (r < 0)
1909  {
1910  /* unexpected error or EOF */
1912  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1913  errmsg("unexpected EOF on standby connection")));
1914  proc_exit(0);
1915  }
1916  if (r == 0)
1917  {
1918  /* no data available without blocking */
1919  pq_endmsgread();
1920  break;
1921  }
1922 
1923  /* Validate message type and set packet size limit */
1924  switch (firstchar)
1925  {
1926  case PqMsg_CopyData:
1927  maxmsglen = PQ_LARGE_MESSAGE_LIMIT;
1928  break;
1929  case PqMsg_CopyDone:
1930  case PqMsg_Terminate:
1931  maxmsglen = PQ_SMALL_MESSAGE_LIMIT;
1932  break;
1933  default:
1934  ereport(FATAL,
1935  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1936  errmsg("invalid standby message type \"%c\"",
1937  firstchar)));
1938  maxmsglen = 0; /* keep compiler quiet */
1939  break;
1940  }
1941 
1942  /* Read the message contents */
1944  if (pq_getmessage(&reply_message, maxmsglen))
1945  {
1947  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1948  errmsg("unexpected EOF on standby connection")));
1949  proc_exit(0);
1950  }
1951 
1952  /* ... and process it */
1953  switch (firstchar)
1954  {
1955  /*
1956  * 'd' means a standby reply wrapped in a CopyData packet.
1957  */
1958  case PqMsg_CopyData:
1960  received = true;
1961  break;
1962 
1963  /*
1964  * CopyDone means the standby requested to finish streaming.
1965  * Reply with CopyDone, if we had not sent that already.
1966  */
1967  case PqMsg_CopyDone:
1968  if (!streamingDoneSending)
1969  {
1970  pq_putmessage_noblock('c', NULL, 0);
1971  streamingDoneSending = true;
1972  }
1973 
1974  streamingDoneReceiving = true;
1975  received = true;
1976  break;
1977 
1978  /*
1979  * 'X' means that the standby is closing down the socket.
1980  */
1981  case PqMsg_Terminate:
1982  proc_exit(0);
1983 
1984  default:
1985  Assert(false); /* NOT REACHED */
1986  }
1987  }
1988 
1989  /*
1990  * Save the last reply timestamp if we've received at least one reply.
1991  */
1992  if (received)
1993  {
1995  waiting_for_ping_response = false;
1996  }
1997 }
1998 
1999 /*
2000  * Process a status update message received from standby.
2001  */
2002 static void
2004 {
2005  char msgtype;
2006 
2007  /*
2008  * Check message type from the first byte.
2009  */
2010  msgtype = pq_getmsgbyte(&reply_message);
2011 
2012  switch (msgtype)
2013  {
2014  case 'r':
2016  break;
2017 
2018  case 'h':
2020  break;
2021 
2022  default:
2024  (errcode(ERRCODE_PROTOCOL_VIOLATION),
2025  errmsg("unexpected message type \"%c\"", msgtype)));
2026  proc_exit(0);
2027  }
2028 }
2029 
2030 /*
2031  * Remember that a walreceiver just confirmed receipt of lsn `lsn`.
2032  */
2033 static void
2035 {
2036  bool changed = false;
2038 
2039  Assert(lsn != InvalidXLogRecPtr);
2040  SpinLockAcquire(&slot->mutex);
2041  if (slot->data.restart_lsn != lsn)
2042  {
2043  changed = true;
2044  slot->data.restart_lsn = lsn;
2045  }
2046  SpinLockRelease(&slot->mutex);
2047 
2048  if (changed)
2049  {
2052  }
2053 
2054  /*
2055  * One could argue that the slot should be saved to disk now, but that'd
2056  * be energy wasted - the worst thing lost information could cause here is
2057  * to give wrong information in a statistics view - we'll just potentially
2058  * be more conservative in removing files.
2059  */
2060 }
2061 
2062 /*
2063  * Regular reply from standby advising of WAL locations on standby server.
2064  */
2065 static void
2067 {
2068  XLogRecPtr writePtr,
2069  flushPtr,
2070  applyPtr;
2071  bool replyRequested;
2072  TimeOffset writeLag,
2073  flushLag,
2074  applyLag;
2075  bool clearLagTimes;
2076  TimestampTz now;
2077  TimestampTz replyTime;
2078 
2079  static bool fullyAppliedLastTime = false;
2080 
2081  /* the caller already consumed the msgtype byte */
2082  writePtr = pq_getmsgint64(&reply_message);
2083  flushPtr = pq_getmsgint64(&reply_message);
2084  applyPtr = pq_getmsgint64(&reply_message);
2085  replyTime = pq_getmsgint64(&reply_message);
2086  replyRequested = pq_getmsgbyte(&reply_message);
2087 
2089  {
2090  char *replyTimeStr;
2091 
2092  /* Copy because timestamptz_to_str returns a static buffer */
2093  replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
2094 
2095  elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s reply_time %s",
2096  LSN_FORMAT_ARGS(writePtr),
2097  LSN_FORMAT_ARGS(flushPtr),
2098  LSN_FORMAT_ARGS(applyPtr),
2099  replyRequested ? " (reply requested)" : "",
2100  replyTimeStr);
2101 
2102  pfree(replyTimeStr);
2103  }
2104 
2105  /* See if we can compute the round-trip lag for these positions. */
2107  writeLag = LagTrackerRead(SYNC_REP_WAIT_WRITE, writePtr, now);
2108  flushLag = LagTrackerRead(SYNC_REP_WAIT_FLUSH, flushPtr, now);
2109  applyLag = LagTrackerRead(SYNC_REP_WAIT_APPLY, applyPtr, now);
2110 
2111  /*
2112  * If the standby reports that it has fully replayed the WAL in two
2113  * consecutive reply messages, then the second such message must result
2114  * from wal_receiver_status_interval expiring on the standby. This is a
2115  * convenient time to forget the lag times measured when it last
2116  * wrote/flushed/applied a WAL record, to avoid displaying stale lag data
2117  * until more WAL traffic arrives.
2118  */
2119  clearLagTimes = false;
2120  if (applyPtr == sentPtr)
2121  {
2122  if (fullyAppliedLastTime)
2123  clearLagTimes = true;
2124  fullyAppliedLastTime = true;
2125  }
2126  else
2127  fullyAppliedLastTime = false;
2128 
2129  /* Send a reply if the standby requested one. */
2130  if (replyRequested)
2132 
2133  /*
2134  * Update shared state for this WalSender process based on reply data from
2135  * standby.
2136  */
2137  {
2138  WalSnd *walsnd = MyWalSnd;
2139 
2140  SpinLockAcquire(&walsnd->mutex);
2141  walsnd->write = writePtr;
2142  walsnd->flush = flushPtr;
2143  walsnd->apply = applyPtr;
2144  if (writeLag != -1 || clearLagTimes)
2145  walsnd->writeLag = writeLag;
2146  if (flushLag != -1 || clearLagTimes)
2147  walsnd->flushLag = flushLag;
2148  if (applyLag != -1 || clearLagTimes)
2149  walsnd->applyLag = applyLag;
2150  walsnd->replyTime = replyTime;
2151  SpinLockRelease(&walsnd->mutex);
2152  }
2153 
2156 
2157  /*
2158  * Advance our local xmin horizon when the client confirmed a flush.
2159  */
2160  if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr)
2161  {
2164  else
2166  }
2167 }
2168 
2169 /* compute new replication slot xmin horizon if needed */
2170 static void
2172 {
2173  bool changed = false;
2175 
2176  SpinLockAcquire(&slot->mutex);
2178 
2179  /*
2180  * For physical replication we don't need the interlock provided by xmin
2181  * and effective_xmin since the consequences of a missed increase are
2182  * limited to query cancellations, so set both at once.
2183  */
2184  if (!TransactionIdIsNormal(slot->data.xmin) ||
2185  !TransactionIdIsNormal(feedbackXmin) ||
2186  TransactionIdPrecedes(slot->data.xmin, feedbackXmin))
2187  {
2188  changed = true;
2189  slot->data.xmin = feedbackXmin;
2190  slot->effective_xmin = feedbackXmin;
2191  }
2192  if (!TransactionIdIsNormal(slot->data.catalog_xmin) ||
2193  !TransactionIdIsNormal(feedbackCatalogXmin) ||
2194  TransactionIdPrecedes(slot->data.catalog_xmin, feedbackCatalogXmin))
2195  {
2196  changed = true;
2197  slot->data.catalog_xmin = feedbackCatalogXmin;
2198  slot->effective_catalog_xmin = feedbackCatalogXmin;
2199  }
2200  SpinLockRelease(&slot->mutex);
2201 
2202  if (changed)
2203  {
2206  }
2207 }
2208 
2209 /*
2210  * Check that the provided xmin/epoch are sane, that is, not in the future
2211  * and not so far back as to be already wrapped around.
2212  *
2213  * The epoch of nextXid should be the same as the standby's or, if the
2214  * counter has wrapped, one greater than the standby's.
2215  *
2216  * This check doesn't care about whether clog exists for these xids
2217  * at all.
2218  */
2219 static bool
2221 {
2222  FullTransactionId nextFullXid;
2223  TransactionId nextXid;
2224  uint32 nextEpoch;
2225 
2226  nextFullXid = ReadNextFullTransactionId();
2227  nextXid = XidFromFullTransactionId(nextFullXid);
2228  nextEpoch = EpochFromFullTransactionId(nextFullXid);
2229 
2230  if (xid <= nextXid)
2231  {
2232  if (epoch != nextEpoch)
2233  return false;
2234  }
2235  else
2236  {
2237  if (epoch + 1 != nextEpoch)
2238  return false;
2239  }
2240 
2241  if (!TransactionIdPrecedesOrEquals(xid, nextXid))
2242  return false; /* epoch OK, but it's wrapped around */
2243 
2244  return true;
2245 }
2246 
2247 /*
2248  * Hot Standby feedback
 *
 * Parses a feedback message from reply_message (reply time, xmin and epoch,
 * catalog xmin and epoch), stores the reply time in MyWalSnd under its
 * spinlock, and then applies the reported xmins through the replication
 * slot when MyReplicationSlot is set, or through MyProc->xmin otherwise.
 * Feedback that fails the TransactionIdInRecentPast() sanity check is
 * ignored without any message.
 *
 * NOTE(review): the embedded listing numbers skip 2251, 2270 and 2306, so
 * the function's name/parameter line, the line that presumably guards the
 * DEBUG2 logging block, and one line at the start of the "feedback turned
 * off" branch are absent from this extract. Restore them from the original
 * file; the block does not compile as shown.
2249  */
2250 static void
2252 {
2253  TransactionId feedbackXmin;
2254  uint32 feedbackEpoch;
2255  TransactionId feedbackCatalogXmin;
2256  uint32 feedbackCatalogEpoch;
2257  TimestampTz replyTime;
2258 
2259  /*
2260  * Decipher the reply message. The caller already consumed the msgtype
2261  * byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation
2262  * of this message.
2263  */
2264  replyTime = pq_getmsgint64(&reply_message);
2265  feedbackXmin = pq_getmsgint(&reply_message, 4);
2266  feedbackEpoch = pq_getmsgint(&reply_message, 4);
2267  feedbackCatalogXmin = pq_getmsgint(&reply_message, 4);
2268  feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4);
2269 
2271  {
2272  char *replyTimeStr;
2273 
2274  /* Copy because timestamptz_to_str returns a static buffer */
2275  replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
2276 
2277  elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u reply_time %s",
2278  feedbackXmin,
2279  feedbackEpoch,
2280  feedbackCatalogXmin,
2281  feedbackCatalogEpoch,
2282  replyTimeStr);
2283 
2284  pfree(replyTimeStr);
2285  }
2286 
2287  /*
2288  * Update shared state for this WalSender process based on reply data from
2289  * standby.
2290  */
2291  {
2292  WalSnd *walsnd = MyWalSnd;
2293 
2294  SpinLockAcquire(&walsnd->mutex);
2295  walsnd->replyTime = replyTime;
2296  SpinLockRelease(&walsnd->mutex);
2297  }
2298 
2299  /*
2300  * Unset WalSender's xmins if the feedback message values are invalid.
2301  * This happens when the downstream turned hot_standby_feedback off.
2302  */
2303  if (!TransactionIdIsNormal(feedbackXmin)
2304  && !TransactionIdIsNormal(feedbackCatalogXmin))
2305  {
2307  if (MyReplicationSlot != NULL)
2308  PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
2309  return;
2310  }
2311 
2312  /*
2313  * Check that the provided xmin/epoch are sane, that is, not in the future
2314  * and not so far back as to be already wrapped around. Ignore if not.
2315  */
2316  if (TransactionIdIsNormal(feedbackXmin) &&
2317  !TransactionIdInRecentPast(feedbackXmin, feedbackEpoch))
2318  return;
2319 
2320  if (TransactionIdIsNormal(feedbackCatalogXmin) &&
2321  !TransactionIdInRecentPast(feedbackCatalogXmin, feedbackCatalogEpoch))
2322  return;
2323 
2324  /*
2325  * Set the WalSender's xmin equal to the standby's requested xmin, so that
2326  * the xmin will be taken into account by GetSnapshotData() /
2327  * ComputeXidHorizons(). This will hold back the removal of dead rows and
2328  * thereby prevent the generation of cleanup conflicts on the standby
2329  * server.
2330  *
2331  * There is a small window for a race condition here: although we just
2332  * checked that feedbackXmin precedes nextXid, the nextXid could have
2333  * gotten advanced between our fetching it and applying the xmin below,
2334  * perhaps far enough to make feedbackXmin wrap around. In that case the
2335  * xmin we set here would be "in the future" and have no effect. No point
2336  * in worrying about this since it's too late to save the desired data
2337  * anyway. Assuming that the standby sends us an increasing sequence of
2338  * xmins, this could only happen during the first reply cycle, else our
2339  * own xmin would prevent nextXid from advancing so far.
2340  *
2341  * We don't bother taking the ProcArrayLock here. Setting the xmin field
2342  * is assumed atomic, and there's no real need to prevent concurrent
2343  * horizon determinations. (If we're moving our xmin forward, this is
2344  * obviously safe, and if we're moving it backwards, well, the data is at
2345  * risk already since a VACUUM could already have determined the horizon.)
2346  *
2347  * If we're using a replication slot we reserve the xmin via that,
2348  * otherwise via the walsender's PGPROC entry. We can only track the
2349  * catalog xmin separately when using a slot, so we store the least of the
2350  * two provided when not using a slot.
2351  *
2352  * XXX: It might make sense to generalize the ephemeral slot concept and
2353  * always use the slot mechanism to handle the feedback xmin.
2354  */
2355  if (MyReplicationSlot != NULL) /* XXX: persistency configurable? */
2356  PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
2357  else
2358  {
2359  if (TransactionIdIsNormal(feedbackCatalogXmin)
2360  && TransactionIdPrecedes(feedbackCatalogXmin, feedbackXmin))
2361  MyProc->xmin = feedbackCatalogXmin;
2362  else
2363  MyProc->xmin = feedbackXmin;
2364  }
2365 }
2366 
2367 /*
2368  * Compute how long send/receive loops should sleep.
2369  *
2370  * If wal_sender_timeout is enabled we want to wake up in time to send
2371  * keepalives and to abort the connection if wal_sender_timeout has been
2372  * reached.
 *
 * Returns the sleep time as a long. The fallback value is 10000 (annotated
 * "10 s"), and the computed value comes from
 * TimestampDifferenceMilliseconds(), so the unit is presumably
 * milliseconds.
 *
 * NOTE(review): the embedded listing numbers skip 2375, 2379, 2387-2388 and
 * 2395-2396. The function's name/parameter line, the condition opening the
 * inner block, and the statements that assign wakeup_time are therefore
 * absent: "now" is used below without a visible declaration, and
 * wakeup_time is read without a visible assignment. Restore the missing
 * lines from the original file.
2373  */
2374 static long
2376 {
2377  long sleeptime = 10000; /* 10 s */
2378 
2380  {
2381  TimestampTz wakeup_time;
2382 
2383  /*
2384  * At the latest stop sleeping once wal_sender_timeout has been
2385  * reached.
2386  */
2389 
2390  /*
2391  * If no ping has been sent yet, wakeup when it's time to do so.
2392  * WalSndKeepaliveIfNecessary() wants to send a keepalive once half of
2393  * the timeout passed without a response.
2394  */
2397  wal_sender_timeout / 2);
2398 
2399  /* Compute relative time until wakeup. */
2400  sleeptime = TimestampDifferenceMilliseconds(now, wakeup_time);
2401  }
2402 
2403  return sleeptime;
2404 }
2405 
2406 /*
2407  * Check whether there have been responses by the client within
2408  * wal_sender_timeout and shut down if not. Using last_processing as the
2409  * reference point avoids counting server-side stalls against the client.
2410  * However, a long server-side stall can make WalSndKeepaliveIfNecessary()
2411  * postdate last_processing by more than wal_sender_timeout. If that happens,
2412  * the client must reply almost immediately to avoid a timeout. This rarely
2413  * affects the default configuration, under which clients spontaneously send a
2414  * message every standby_message_timeout = wal_sender_timeout/6 = 10s. We
2415  * could eliminate that problem by recognizing timeout expiration at
2416  * wal_sender_timeout/2 after the keepalive.
 *
 * NOTE(review): the embedded listing numbers skip 2419, 2427-2428 and 2437.
 * The function's name/parameter line, the statement that computes
 * "timeout", and the opening of the report call whose errmsg() argument
 * appears below are absent from this extract. Restore them from the
 * original file.
2417  */
2418 static void
2420 {
2421  TimestampTz timeout;
2422 
2423  /* don't bail out if we're doing something that doesn't require timeouts */
2424  if (last_reply_timestamp <= 0)
2425  return;
2426 
2429 
2430  if (wal_sender_timeout > 0 && last_processing >= timeout)
2431  {
2432  /*
2433  * Since typically expiration of replication timeout means
2434  * communication problem, we don't send the error message to the
2435  * standby.
2436  */
2438  (errmsg("terminating walsender process due to replication timeout")));
2439 
2440  WalSndShutdown();
2441  }
2442 }
2443 
2444 /* Main loop of walsender process that streams the WAL over Copy messages. */
/*
 * Each iteration calls send_data() when no output is pending, tries to flush
 * the output buffer, runs the shutdown path via WalSndDone() when
 * got_SIGUSR2 is set and nothing remains to send, and finally sleeps in
 * WalSndWait() until there is something to do.
 *
 * NOTE(review): the embedded listing numbers skip 2446, 2452, 2462, 2464,
 * 2470-2471, 2475, 2482, 2502, 2512, 2517, 2532, 2535, 2544-2545, 2550 and
 * 2559. Among other things the function's name/parameter line (which
 * presumably declares send_data), several conditions, and the calls
 * announced by the "Clear any already-pending wakeups", "Check for input",
 * "Check for replication timeout" and "Send keepalive" comments are absent.
 * Restore them from the original file; the block does not compile as shown.
 */
2445 static void
2447 {
2448  /*
2449  * Initialize the last reply timestamp. That enables timeout processing
2450  * from here on.
2451  */
2453  waiting_for_ping_response = false;
2454 
2455  /*
2456  * Loop until we reach the end of this timeline or the client requests to
2457  * stop streaming.
2458  */
2459  for (;;)
2460  {
2461  /* Clear any already-pending wakeups */
2463 
2465 
2466  /* Process any requests or signals received recently */
2467  if (ConfigReloadPending)
2468  {
2469  ConfigReloadPending = false;
2472  }
2473 
2474  /* Check for input from the client */
2476 
2477  /*
2478  * If we have received CopyDone from the client, sent CopyDone
2479  * ourselves, and the output buffer is empty, it's time to exit
2480  * streaming.
2481  */
2483  !pq_is_send_pending())
2484  break;
2485 
2486  /*
2487  * If we don't have any pending data in the output buffer, try to send
2488  * some more. If there is some, we don't bother to call send_data
2489  * again until we've flushed it ... but we'd better assume we are not
2490  * caught up.
2491  */
2492  if (!pq_is_send_pending())
2493  send_data();
2494  else
2495  WalSndCaughtUp = false;
2496 
2497  /* Try to flush pending output to the client */
2498  if (pq_flush_if_writable() != 0)
2499  WalSndShutdown();
2500 
2501  /* If nothing remains to be sent right now ... */
2503  {
2504  /*
2505  * If we're in catchup state, move to streaming. This is an
2506  * important state change for users to know about, since before
2507  * this point data loss might occur if the primary dies and we
2508  * need to failover to the standby. The state change is also
2509  * important for synchronous replication, since commits that
2510  * started to wait at that point might wait for some time.
2511  */
2513  {
2514  ereport(DEBUG1,
2515  (errmsg_internal("\"%s\" has now caught up with upstream server",
2516  application_name)));
2518  }
2519 
2520  /*
2521  * When SIGUSR2 arrives, we send any outstanding logs up to the
2522  * shutdown checkpoint record (i.e., the latest record), wait for
2523  * them to be replicated to the standby, and exit. This may be a
2524  * normal termination at shutdown, or a promotion, the walsender
2525  * is not sure which.
2526  */
2527  if (got_SIGUSR2)
2528  WalSndDone(send_data);
2529  }
2530 
2531  /* Check for replication timeout. */
2533 
2534  /* Send keepalive if the time has come */
2536 
2537  /*
2538  * Block if we have unsent data. XXX For logical replication, let
2539  * WalSndWaitForWal() handle any other blocking; idle receivers need
2540  * its additional actions. For physical replication, also block if
2541  * caught up; its send_data does not block.
2542  */
2543  if ((WalSndCaughtUp && send_data != XLogSendLogical &&
2546  {
2547  long sleeptime;
2548  int wakeEvents;
2549 
2551  wakeEvents = WL_SOCKET_READABLE;
2552  else
2553  wakeEvents = 0;
2554 
2555  /*
2556  * Use fresh timestamp, not last_processing, to reduce the chance
2557  * of reaching wal_sender_timeout before sending a keepalive.
2558  */
2560 
2561  if (pq_is_send_pending())
2562  wakeEvents |= WL_SOCKET_WRITEABLE;
2563 
2564  /* Sleep until something happens or we time out */
2565  WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_MAIN);
2566  }
2567  }
2568 }
2569 
2570 /* Initialize a per-walsender data structure for this walsender process */
/*
 * Scans WalSndCtl->walsnds[] for the first entry whose pid is 0, claims it
 * under the entry's spinlock by storing MyProcPid and resetting the
 * per-connection fields, and publishes it as MyWalSnd.
 *
 * NOTE(review): the embedded listing numbers skip 2572 and 2644, so the
 * function's name/parameter line and the statement announced by the
 * "Arrange to clean up at walsender exit" comment are absent from this
 * extract. Restore them from the original file.
 */
2571 static void
2573 {
2574  int i;
2575 
2576  /*
2577  * WalSndCtl should be set up already (we inherit this by fork() or
2578  * EXEC_BACKEND mechanism from the postmaster).
2579  */
2580  Assert(WalSndCtl != NULL);
2581  Assert(MyWalSnd == NULL);
2582 
2583  /*
2584  * Find a free walsender slot and reserve it. This must not fail due to
2585  * the prior check for free WAL senders in InitProcess().
2586  */
2587  for (i = 0; i < max_wal_senders; i++)
2588  {
2589  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2590 
2591  SpinLockAcquire(&walsnd->mutex);
2592 
2593  if (walsnd->pid != 0)
2594  {
2595  SpinLockRelease(&walsnd->mutex);
2596  continue;
2597  }
2598  else
2599  {
2600  /*
2601  * Found a free slot. Reserve it for us.
2602  */
2603  walsnd->pid = MyProcPid;
2604  walsnd->state = WALSNDSTATE_STARTUP;
2605  walsnd->sentPtr = InvalidXLogRecPtr;
2606  walsnd->needreload = false;
2607  walsnd->write = InvalidXLogRecPtr;
2608  walsnd->flush = InvalidXLogRecPtr;
2609  walsnd->apply = InvalidXLogRecPtr;
2610  walsnd->writeLag = -1;
2611  walsnd->flushLag = -1;
2612  walsnd->applyLag = -1;
2613  walsnd->sync_standby_priority = 0;
2614  walsnd->latch = &MyProc->procLatch;
2615  walsnd->replyTime = 0;
2616 
2617  /*
2618  * The kind assignment is done here and not in StartReplication()
2619  * and StartLogicalReplication(). Indeed, the logical walsender
2620  * needs to read WAL records (like snapshot of running
2621  * transactions) during the slot creation. So it needs to be woken
2622  * up based on its kind.
2623  *
2624  * The kind assignment could also be done in StartReplication(),
2625  * StartLogicalReplication() and CREATE_REPLICATION_SLOT but it
2626  * seems better to set it in one place.
2627  */
2628  if (MyDatabaseId == InvalidOid)
2629  walsnd->kind = REPLICATION_KIND_PHYSICAL;
2630  else
2631  walsnd->kind = REPLICATION_KIND_LOGICAL;
2632 
2633  SpinLockRelease(&walsnd->mutex);
2634  /* don't need the lock anymore */
2635  MyWalSnd = (WalSnd *) walsnd;
2636 
2637  break;
2638  }
2639  }
2640 
2641  Assert(MyWalSnd != NULL);
2642 
2643  /* Arrange to clean up at walsender exit */
2645 }
2646 
2647 /* Destroy the per-walsender data structure for this walsender process */
/*
 * Clears MyWalSnd first, then resets the entry's latch and pid under the
 * entry's spinlock so the slot reads as unused (pid == 0).
 *
 * NOTE(review): the embedded listing numbers skip 2649, so the function's
 * name/parameter line is absent from this extract. Restore it from the
 * original file.
 */
2648 static void
2650 {
2651  WalSnd *walsnd = MyWalSnd;
2652 
2653  Assert(walsnd != NULL);
2654 
2655  MyWalSnd = NULL;
2656 
2657  SpinLockAcquire(&walsnd->mutex);
2658  /* clear latch while holding the spinlock, so it can safely be read */
2659  walsnd->latch = NULL;
2660  /* Mark WalSnd struct as no longer being in use. */
2661  walsnd->pid = 0;
2662  SpinLockRelease(&walsnd->mutex);
2663 }
2664 
2665 /* XLogReaderRoutine->segment_open callback */
/*
 * Chooses the timeline to read segment nextSegNo from, stores it in *tli_p,
 * and opens the file read-only into state->seg.ws_file. On failure it
 * raises an ERROR: a "has already been removed" message for ENOENT, and a
 * "could not open file" message otherwise.
 *
 * NOTE(review): the embedded listing numbers skip 2667, 2697, 2723 and
 * 2729. The function's name and leading parameters (presumably declaring
 * state and nextSegNo), the condition opening the timeline-switch block,
 * and the first argument line of each ereport() call are absent from this
 * extract. Restore them from the original file.
 */
2666 static void
2668  TimeLineID *tli_p)
2669 {
2670  char path[MAXPGPATH];
2671 
2672  /*-------
2673  * When reading from a historic timeline, and there is a timeline switch
2674  * within this segment, read from the WAL segment belonging to the new
2675  * timeline.
2676  *
2677  * For example, imagine that this server is currently on timeline 5, and
2678  * we're streaming timeline 4. The switch from timeline 4 to 5 happened at
2679  * 0/13002088. In pg_wal, we have these files:
2680  *
2681  * ...
2682  * 000000040000000000000012
2683  * 000000040000000000000013
2684  * 000000050000000000000013
2685  * 000000050000000000000014
2686  * ...
2687  *
2688  * In this situation, when requested to send the WAL from segment 0x13, on
2689  * timeline 4, we read the WAL from file 000000050000000000000013. Archive
2690  * recovery prefers files from newer timelines, so if the segment was
2691  * restored from the archive on this server, the file belonging to the old
2692  * timeline, 000000040000000000000013, might not exist. Their contents are
2693  * equal up to the switchpoint, because at a timeline switch, the used
2694  * portion of the old segment is copied to the new file.
2695  */
2696  *tli_p = sendTimeLine;
2698  {
2699  XLogSegNo endSegNo;
2700 
2701  XLByteToSeg(sendTimeLineValidUpto, endSegNo, state->segcxt.ws_segsize);
2702  if (nextSegNo == endSegNo)
2703  *tli_p = sendTimeLineNextTLI;
2704  }
2705 
2706  XLogFilePath(path, *tli_p, nextSegNo, state->segcxt.ws_segsize);
2707  state->seg.ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY);
2708  if (state->seg.ws_file >= 0)
2709  return;
2710 
2711  /*
2712  * If the file is not found, assume it's because the standby asked for a
2713  * too old WAL segment that has already been removed or recycled.
2714  */
2715  if (errno == ENOENT)
2716  {
2717  char xlogfname[MAXFNAMELEN];
2718  int save_errno = errno;
2719 
 /* Preserve errno across XLogFileName() so the report sees the open failure */
2720  XLogFileName(xlogfname, *tli_p, nextSegNo, wal_segment_size);
2721  errno = save_errno;
2722  ereport(ERROR,
2724  errmsg("requested WAL segment %s has already been removed",
2725  xlogfname)));
2726  }
2727  else
2728  ereport(ERROR,
2730  errmsg("could not open file \"%s\": %m",
2731  path)));
2732 }
2733 
2734 /*
2735  * Send out the WAL in its normal physical/stored form.
2736  *
2737  * Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk,
2738  * but not yet sent to the client, and buffer it in the libpq output
2739  * buffer.
2740  *
2741  * If there is no unsent WAL remaining, WalSndCaughtUp is set to true,
2742  * otherwise WalSndCaughtUp is set to false.
 *
 * NOTE(review): the embedded listing numbers skip 2745, 2756, 2758, 2765,
 * 2826, 2828, 2891, 2895, 2904-2905, 2936, 2954, 2965, 2969, 2980, 2988,
 * 3000, 3007, 3012-3013, 3017, 3031 and 3036. The function's name/parameter
 * line, several conditions, and a number of statements and call arguments
 * are therefore absent from this extract (for example, "tmpbuf" is read
 * below without any visible write). Restore the missing lines from the
 * original file; the block does not compile as shown.
2743  */
2744 static void
2746 {
2747  XLogRecPtr SendRqstPtr;
2748  XLogRecPtr startptr;
2749  XLogRecPtr endptr;
2750  Size nbytes;
2751  XLogSegNo segno;
2752  WALReadError errinfo;
2753 
2754  /* If requested switch the WAL sender to the stopping state. */
2755  if (got_STOPPING)
2757 
2759  {
2760  WalSndCaughtUp = true;
2761  return;
2762  }
2763 
2764  /* Figure out how far we can safely send the WAL. */
2766  {
2767  /*
2768  * Streaming an old timeline that's in this server's history, but is
2769  * not the one we're currently inserting or replaying. It can be
2770  * streamed up to the point where we switched off that timeline.
2771  */
2772  SendRqstPtr = sendTimeLineValidUpto;
2773  }
2774  else if (am_cascading_walsender)
2775  {
2776  TimeLineID SendRqstTLI;
2777 
2778  /*
2779  * Streaming the latest timeline on a standby.
2780  *
2781  * Attempt to send all WAL that has already been replayed, so that we
2782  * know it's valid. If we're receiving WAL through streaming
2783  * replication, it's also OK to send any WAL that has been received
2784  * but not replayed.
2785  *
2786  * The timeline we're recovering from can change, or we can be
2787  * promoted. In either case, the current timeline becomes historic. We
2788  * need to detect that so that we don't try to stream past the point
2789  * where we switched to another timeline. We check for promotion or
2790  * timeline switch after calculating FlushPtr, to avoid a race
2791  * condition: if the timeline becomes historic just after we checked
2792  * that it was still current, it's still OK to stream it up to the
2793  * FlushPtr that was calculated before it became historic.
2794  */
2795  bool becameHistoric = false;
2796 
2797  SendRqstPtr = GetStandbyFlushRecPtr(&SendRqstTLI);
2798 
2799  if (!RecoveryInProgress())
2800  {
2801  /* We have been promoted. */
2802  SendRqstTLI = GetWALInsertionTimeLine();
2803  am_cascading_walsender = false;
2804  becameHistoric = true;
2805  }
2806  else
2807  {
2808  /*
2809  * Still a cascading standby. But is the timeline we're sending
2810  * still the one recovery is recovering from?
2811  */
2812  if (sendTimeLine != SendRqstTLI)
2813  becameHistoric = true;
2814  }
2815 
2816  if (becameHistoric)
2817  {
2818  /*
2819  * The timeline we were sending has become historic. Read the
2820  * timeline history file of the new timeline to see where exactly
2821  * we forked off from the timeline we were sending.
2822  */
2823  List *history;
2824 
2825  history = readTimeLineHistory(SendRqstTLI);
2827 
2829  list_free_deep(history);
2830 
2831  sendTimeLineIsHistoric = true;
2832 
2833  SendRqstPtr = sendTimeLineValidUpto;
2834  }
2835  }
2836  else
2837  {
2838  /*
2839  * Streaming the current timeline on a primary.
2840  *
2841  * Attempt to send all data that's already been written out and
2842  * fsync'd to disk. We cannot go further than what's been written out
2843  * given the current implementation of WALRead(). And in any case
2844  * it's unsafe to send WAL that is not securely down to disk on the
2845  * primary: if the primary subsequently crashes and restarts, standbys
2846  * must not have applied any WAL that got lost on the primary.
2847  */
2848  SendRqstPtr = GetFlushRecPtr(NULL);
2849  }
2850 
2851  /*
2852  * Record the current system time as an approximation of the time at which
2853  * this WAL location was written for the purposes of lag tracking.
2854  *
2855  * In theory we could make XLogFlush() record a time in shmem whenever WAL
2856  * is flushed and we could get that time as well as the LSN when we call
2857  * GetFlushRecPtr() above (and likewise for the cascading standby
2858  * equivalent), but rather than putting any new code into the hot WAL path
2859  * it seems good enough to capture the time here. We should reach this
2860  * after XLogFlush() runs WalSndWakeupProcessRequests(), and although that
2861  * may take some time, we read the WAL flush pointer and take the time
2862  * very close together here so that we'll get a later position if it is
2863  * still moving.
2864  *
2865  * Because LagTrackerWrite ignores samples when the LSN hasn't advanced,
2866  * this gives us a cheap approximation for the WAL flush time for this
2867  * LSN.
2868  *
2869  * Note that the LSN is not necessarily the LSN for the data contained in
2870  * the present message; it's the end of the WAL, which might be further
2871  * ahead. All the lag tracking machinery cares about is finding out when
2872  * that arbitrary LSN is eventually reported as written, flushed and
2873  * applied, so that it can measure the elapsed time.
2874  */
2875  LagTrackerWrite(SendRqstPtr, GetCurrentTimestamp());
2876 
2877  /*
2878  * If this is a historic timeline and we've reached the point where we
2879  * forked to the next timeline, stop streaming.
2880  *
2881  * Note: We might already have sent WAL > sendTimeLineValidUpto. The
2882  * startup process will normally replay all WAL that has been received
2883  * from the primary, before promoting, but if the WAL streaming is
2884  * terminated at a WAL page boundary, the valid portion of the timeline
2885  * might end in the middle of a WAL record. We might've already sent the
2886  * first half of that partial WAL record to the cascading standby, so that
2887  * sentPtr > sendTimeLineValidUpto. That's OK; the cascading standby can't
2888  * replay the partial WAL record either, so it can still follow our
2889  * timeline switch.
2890  */
2892  {
2893  /* close the current file. */
2894  if (xlogreader->seg.ws_file >= 0)
2896 
2897  /* Send CopyDone */
2898  pq_putmessage_noblock('c', NULL, 0);
2899  streamingDoneSending = true;
2900 
2901  WalSndCaughtUp = true;
2902 
2903  elog(DEBUG1, "walsender reached end of timeline at %X/%X (sent up to %X/%X)",
2906  return;
2907  }
2908 
2909  /* Do we have any work to do? */
2910  Assert(sentPtr <= SendRqstPtr);
2911  if (SendRqstPtr <= sentPtr)
2912  {
2913  WalSndCaughtUp = true;
2914  return;
2915  }
2916 
2917  /*
2918  * Figure out how much to send in one message. If there's no more than
2919  * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
2920  * MAX_SEND_SIZE bytes, but round back to logfile or page boundary.
2921  *
2922  * The rounding is not only for performance reasons. Walreceiver relies on
2923  * the fact that we never split a WAL record across two messages. Since a
2924  * long WAL record is split at page boundary into continuation records,
2925  * page boundary is always a safe cut-off point. We also assume that
2926  * SendRqstPtr never points to the middle of a WAL record.
2927  */
2928  startptr = sentPtr;
2929  endptr = startptr;
2930  endptr += MAX_SEND_SIZE;
2931 
2932  /* if we went beyond SendRqstPtr, back off */
2933  if (SendRqstPtr <= endptr)
2934  {
2935  endptr = SendRqstPtr;
2937  WalSndCaughtUp = false;
2938  else
2939  WalSndCaughtUp = true;
2940  }
2941  else
2942  {
2943  /* round down to page boundary. */
2944  endptr -= (endptr % XLOG_BLCKSZ);
2945  WalSndCaughtUp = false;
2946  }
2947 
2948  nbytes = endptr - startptr;
2949  Assert(nbytes <= MAX_SEND_SIZE);
2950 
2951  /*
2952  * OK to read and send the slice.
2953  */
2955  pq_sendbyte(&output_message, 'w');
2956 
2957  pq_sendint64(&output_message, startptr); /* dataStart */
2958  pq_sendint64(&output_message, SendRqstPtr); /* walEnd */
2959  pq_sendint64(&output_message, 0); /* sendtime, filled in last */
2960 
2961  /*
2962  * Read the log directly into the output buffer to avoid extra memcpy
2963  * calls.
2964  */
2966 
2967 retry:
2968  if (!WALRead(xlogreader,
2970  startptr,
2971  nbytes,
2972  xlogreader->seg.ws_tli, /* Pass the current TLI because
2973  * only WalSndSegmentOpen controls
2974  * whether new TLI is needed. */
2975  &errinfo))
2976  WALReadRaiseError(&errinfo);
2977 
2978  /* See logical_read_xlog_page(). */
2979  XLByteToSeg(startptr, segno, xlogreader->segcxt.ws_segsize);
2981 
2982  /*
2983  * During recovery, the currently-open WAL file might be replaced with the
2984  * file of the same name retrieved from archive. So we always need to
2985  * check what we read was valid after reading into the buffer. If it's
2986  * invalid, we try to open and read the file again.
2987  */
2989  {
2990  WalSnd *walsnd = MyWalSnd;
2991  bool reload;
2992 
 /* Fetch and clear the reload request atomically under the spinlock */
2993  SpinLockAcquire(&walsnd->mutex);
2994  reload = walsnd->needreload;
2995  walsnd->needreload = false;
2996  SpinLockRelease(&walsnd->mutex);
2997 
2998  if (reload && xlogreader->seg.ws_file >= 0)
2999  {
3001 
3002  goto retry;
3003  }
3004  }
3005 
3006  output_message.len += nbytes;
3008 
3009  /*
3010  * Fill the send timestamp last, so that it is taken as late as possible.
3011  */
 /* Offset skips the 1-byte message type and the two preceding int64 fields */
3014  memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64)],
3015  tmpbuf.data, sizeof(int64));
3016 
3018 
3019  sentPtr = endptr;
3020 
3021  /* Update shared memory status */
3022  {
3023  WalSnd *walsnd = MyWalSnd;
3024 
3025  SpinLockAcquire(&walsnd->mutex);
3026  walsnd->sentPtr = sentPtr;
3027  SpinLockRelease(&walsnd->mutex);
3028  }
3029 
3030  /* Report progress of XLOG streaming in PS display */
3032  {
3033  char activitymsg[50];
3034 
3035  snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
3037  set_ps_display(activitymsg);
3038  }
3039 }
3040 
3041 /*
3042  * Stream out logically decoded data.
 *
 * Reads one record through logical_decoding_ctx->reader, raises an ERROR if
 * the reader reports an error message, refreshes the cached flush pointer
 * when the reader has reached it, sets WalSndCaughtUp accordingly, and
 * publishes sentPtr in MyWalSnd under the spinlock.
 *
 * NOTE(review): the embedded listing numbers skip 3045, 3080, 3082, 3092
 * and 3107. The function's name/parameter line, the statements that process
 * a successfully read record, the condition choosing between
 * GetStandbyFlushRecPtr() and GetFlushRecPtr(), and the condition guarding
 * "got_SIGUSR2 = true" are absent from this extract. Restore them from the
 * original file.
3043  */
3044 static void
3046 {
3047  XLogRecord *record;
3048  char *errm;
3049 
3050  /*
3051  * We'll use the current flush point to determine whether we've caught up.
3052  * This variable is static in order to cache it across calls. Caching is
3053  * helpful because GetFlushRecPtr() needs to acquire a heavily-contended
3054  * spinlock.
3055  */
3056  static XLogRecPtr flushPtr = InvalidXLogRecPtr;
3057 
3058  /*
3059  * Don't know whether we've caught up yet. We'll set WalSndCaughtUp to
3060  * true in WalSndWaitForWal, if we're actually waiting. We also set it to
3061  * true if XLogReadRecord() had to stop reading but WalSndWaitForWal
3062  * didn't wait - i.e. when we're shutting down.
3063  */
3064  WalSndCaughtUp = false;
3065 
3066  record = XLogReadRecord(logical_decoding_ctx->reader, &errm);
3067 
3068  /* xlog record was invalid */
3069  if (errm != NULL)
3070  elog(ERROR, "could not find record while sending logically-decoded data: %s",
3071  errm);
3072 
3073  if (record != NULL)
3074  {
3075  /*
3076  * Note the lack of any call to LagTrackerWrite() which is handled by
3077  * WalSndUpdateProgress which is called by output plugin through
3078  * logical decoding write api.
3079  */
3081 
3083  }
3084 
3085  /*
3086  * If first time through in this session, initialize flushPtr. Otherwise,
3087  * we only need to update flushPtr if EndRecPtr is past it.
3088  */
3089  if (flushPtr == InvalidXLogRecPtr ||
3090  logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
3091  {
3093  flushPtr = GetStandbyFlushRecPtr(NULL);
3094  else
3095  flushPtr = GetFlushRecPtr(NULL);
3096  }
3097 
3098  /* If EndRecPtr is still past our flushPtr, it means we caught up. */
3099  if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
3100  WalSndCaughtUp = true;
3101 
3102  /*
3103  * If we're caught up and have been requested to stop, have WalSndLoop()
3104  * terminate the connection in an orderly manner, after writing out all
3105  * the pending data.
3106  */
3108  got_SIGUSR2 = true;
3109 
3110  /* Update shared memory status */
3111  {
3112  WalSnd *walsnd = MyWalSnd;
3113 
3114  SpinLockAcquire(&walsnd->mutex);
3115  walsnd->sentPtr = sentPtr;
3116  SpinLockRelease(&walsnd->mutex);
3117  }
3118 }
3119 
3120 /*
3121  * Shutdown if the sender is caught up.
3122  *
3123  * NB: This should only be called when the shutdown signal has been received
3124  * from postmaster.
3125  *
3126  * Note that if we determine that there's still more data to send, this
3127  * function will return control to the caller.
 *
 * When the sender is caught up, sentPtr equals the replicated position and
 * no output is pending, this sends a COPY command completion, flushes the
 * connection and calls proc_exit(0).
 *
 * NOTE(review): the embedded listing numbers skip 3130, 3143 and 3157-3158.
 * The function's name/parameter line (which presumably declares send_data),
 * the remainder of the conditional expression assigned to replicatedPtr,
 * and two lines after the exit block are absent from this extract. Restore
 * them from the original file.
3128  */
3129 static void
3131 {
3132  XLogRecPtr replicatedPtr;
3133 
3134  /* ... let's just be real sure we're caught up ... */
3135  send_data();
3136 
3137  /*
3138  * To figure out whether all WAL has successfully been replicated, check
3139  * flush location if valid, write otherwise. Tools like pg_receivewal will
3140  * usually (unless in synchronous mode) return an invalid flush location.
3141  */
3142  replicatedPtr = XLogRecPtrIsInvalid(MyWalSnd->flush) ?
3144 
3145  if (WalSndCaughtUp && sentPtr == replicatedPtr &&
3146  !pq_is_send_pending())
3147  {
3148  QueryCompletion qc;
3149 
3150  /* Inform the standby that XLOG streaming is done */
3151  SetQueryCompletion(&qc, CMDTAG_COPY, 0);
3152  EndCommand(&qc, DestRemote, false);
3153  pq_flush();
3154 
3155  proc_exit(0);
3156  }
3159 }
3160 
3161 /*
3162  * Returns the latest point in WAL that has been safely flushed to disk, and
3163  * can be sent to the standby. This should only be called when in recovery,
3164  * ie. we're streaming to a cascaded standby.
3165  *
3166  * As a side-effect, *tli is updated to the TLI of the last
3167  * replayed WAL record.
 *
 * tli may be NULL, in which case the timeline is not reported.
 *
 * NOTE(review): the embedded listing numbers skip 3170 and 3175, so the
 * function's name/parameter line and the declaration of receiveTLI (used
 * below) are absent from this extract. Restore them from the original
 * file.
3168  */
3169 static XLogRecPtr
3171 {
3172  XLogRecPtr replayPtr;
3173  TimeLineID replayTLI;
3174  XLogRecPtr receivePtr;
3176  XLogRecPtr result;
3177 
3178  /*
3179  * We can safely send what's already been replayed. Also, if walreceiver
3180  * is streaming WAL from the same timeline, we can send anything that it
3181  * has streamed, but hasn't been replayed yet.
3182  */
3183 
3184  receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI);
3185  replayPtr = GetXLogReplayRecPtr(&replayTLI);
3186 
3187  if (tli)
3188  *tli = replayTLI;
3189 
 /* Use the receive position only if it is on the replay timeline and ahead */
3190  result = replayPtr;
3191  if (receiveTLI == replayTLI && receivePtr > replayPtr)
3192  result = receivePtr;
3193 
3194  return result;
3195 }
3196 
3197 /*
3198  * Request walsenders to reload the currently-open WAL file
 *
 * Sets needreload, under each entry's spinlock, on every WalSndCtl->walsnds[]
 * entry that is in use (pid != 0); unused entries are skipped.
 *
 * NOTE(review): the embedded listing numbers skip 3201, so the function's
 * name/parameter line is absent from this extract. Restore it from the
 * original file.
3199  */
3200 void
3202 {
3203  int i;
3204 
3205  for (i = 0; i < max_wal_senders; i++)
3206  {
3207  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3208 
3209  SpinLockAcquire(&walsnd->mutex);
3210  if (walsnd->pid == 0)
3211  {
3212  SpinLockRelease(&walsnd->mutex);
3213  continue;
3214  }
3215  walsnd->needreload = true;
3216  SpinLockRelease(&walsnd->mutex);
3217  }
3218 }
3219 
3220 /*
3221  * Handle PROCSIG_WALSND_INIT_STOPPING signal.
 *
 * NOTE(review): the embedded listing numbers skip 3224 and 3226, so the
 * function's name/parameter line and the first line of the body are absent
 * from this extract. Restore them from the original file.
3222  */
3223 void
3225 {
3227 
3228  /*
3229  * If replication has not yet started, die like with SIGTERM. If
3230  * replication is active, only set a flag and wake up the main loop. It
3231  * will send any outstanding WAL, wait for it to be replicated to the
3232  * standby, and then exit gracefully.
3233  */
3234  if (!replication_active)
3235  kill(MyProcPid, SIGTERM);
3236  else
3237  got_STOPPING = true;
3238 }
3239 
3240 /*
3241  * SIGUSR2: set flag to do a last cycle and shut down afterwards. The WAL
3242  * sender should already have been switched to WALSNDSTATE_STOPPING at
3243  * this point.
 *
 * NOTE(review): the embedded listing numbers skip 3246, so the function's
 * name/parameter line is absent from this extract. Restore it from the
 * original file.
3244  */
3245 static void
3247 {
 /* Preserve errno across the handler so interrupted code is not disturbed */
3248  int save_errno = errno;
3249 
3250  got_SIGUSR2 = true;
3251  SetLatch(MyLatch);
3252 
3253  errno = save_errno;
3254 }
3255 
3256 /* Set up signal handlers */
/*
 * NOTE(review): the embedded listing numbers skip 3258, 3261, 3266-3267 and
 * 3272. The function's name/parameter line, three handler registrations,
 * and the statement announced by the "Reset some signals" comment are
 * absent from this extract. Restore them from the original file.
 */
3257 void
3259 {
3260  /* Set up signal handlers */
3262  pqsignal(SIGINT, StatementCancelHandler); /* query cancel */
3263  pqsignal(SIGTERM, die); /* request shutdown */
3264  /* SIGQUIT handler was already set up by InitPostmasterChild */
3265  InitializeTimeouts(); /* establishes SIGALRM handler */
3268  pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
3269  * shutdown */
3270 
3271  /* Reset some signals that are accepted by postmaster but not here */
3273 }
3274 
3275 /* Report shared-memory space needed by WalSndShmemInit */
/*
 * The size is the WalSndCtlData header up to its walsnds member plus one
 * WalSnd per max_wal_senders, combined with add_size()/mul_size().
 *
 * NOTE(review): the embedded listing numbers skip 3277, so the function's
 * name/parameter line is absent from this extract. Restore it from the
 * original file.
 */
3276 Size
3278 {
3279  Size size = 0;
3280 
3281  size = offsetof(WalSndCtlData, walsnds);
3282  size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
3283 
3284  return size;
3285 }
3286 
3287 /* Allocate and initialize walsender-related shared memory */
/*
 * Obtains the "Wal Sender Ctl" structure via ShmemInitStruct() and, only
 * when it was not already present, initializes it, including one spinlock
 * per walsender entry.
 *
 * NOTE(review): the embedded listing numbers skip 3289, 3300, 3303 and
 * 3312-3313. The function's name/parameter line, the first initialization
 * statement, the body of the NUM_SYNC_REP_WAIT_MODE loop, and two
 * statements at the end of the initialization branch are absent. As shown,
 * the first "for" would syntactically take the second "for" as its body.
 * Restore the missing lines from the original file.
 */
3288 void
3290 {
3291  bool found;
3292  int i;
3293 
3294  WalSndCtl = (WalSndCtlData *)
3295  ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
3296 
3297  if (!found)
3298  {
3299  /* First time through, so initialize */
3301 
3302  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
3304 
3305  for (i = 0; i < max_wal_senders; i++)
3306  {
3307  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3308 
3309  SpinLockInit(&walsnd->mutex);
3310  }
3311 
3314  }
3315 }
3316 
3317 /*
3318  * Wake up physical, logical or both kinds of walsenders
3319  *
3320  * The distinction between physical and logical walsenders is made because:
3321  * - physical walsenders can't send data until it's been flushed
3322  * - logical walsenders on standby can't decode and send data until it's been
3323  * applied
3324  *
3325  * For cascading replication we need to wake up physical walsenders separately
3326  * from logical walsenders (see the comment before calling WalSndWakeup() in
3327  * ApplyWalRecord() for more details).
3328  *
3329  * This will be called inside critical sections, so throwing an error is not
3330  * advisable.
 *
 * NOTE(review): the embedded listing numbers skip 3342 and 3345, so the
 * statement controlled by each "if" below is absent from this extract. As
 * shown, "if (logical)" would syntactically become the body of
 * "if (physical)". Restore the missing lines from the original file.
3331  */
3332 void
3333 WalSndWakeup(bool physical, bool logical)
3334 {
3335  /*
3336  * Wake up all the walsenders waiting on WAL being flushed or replayed
3337  * respectively. Note that waiting walsender would have prepared to sleep
3338  * on the CV (i.e., added itself to the CV's waitlist) in WalSndWait()
3339  * before actually waiting.
3340  */
3341  if (physical)
3343 
3344  if (logical)
3346 }
3347 
3348 /*
3349  * Wait for readiness on the FeBe socket, or a timeout. The mask should be
3350  * composed of optional WL_SOCKET_WRITEABLE and WL_SOCKET_READABLE flags. Exit
3351  * on postmaster death.
 *
 * socket_events is installed on the FeBe socket entry of FeBeWaitSet,
 * timeout is passed through to WaitEventSetWait(), and wait_event is the
 * wait-event identifier reported while sleeping.
 *
 * NOTE(review): the embedded listing numbers skip 3384-3385, 3387, 3392 and
 * 3396. The "if" branch that precedes the "else if" below, the statement
 * controlled by that "else if", one line before proc_exit(1), and the last
 * statement of the function are absent from this extract. Restore them from
 * the original file.
3352  */
3353 static void
3354 WalSndWait(uint32 socket_events, long timeout, uint32 wait_event)
3355 {
3356  WaitEvent event;
3357 
3358  ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, socket_events, NULL);
3359 
3360  /*
3361  * We use a condition variable to efficiently wake up walsenders in
3362  * WalSndWakeup().
3363  *
3364  * Every walsender prepares to sleep on a shared memory CV. Note that it
3365  * just prepares to sleep on the CV (i.e., adds itself to the CV's
3366  * waitlist), but does not actually wait on the CV (IOW, it never calls
3367  * ConditionVariableSleep()). It still uses WaitEventSetWait() for
3368  * waiting, because we also need to wait for socket events. The processes
3369  * (startup process, walreceiver etc.) wanting to wake up walsenders use
3370  * ConditionVariableBroadcast(), which in turn calls SetLatch(), helping
3371  * walsenders come out of WaitEventSetWait().
3372  *
3373  * This approach is simple and efficient because one doesn't have to loop
3374  * through all the walsender slots, with a spinlock acquisition and
3375  * release for every iteration, just to wake up only the waiting
3376  * walsenders. It makes WalSndWakeup() callers' life easy.
3377  *
3378  * XXX: A desirable future improvement would be to add support for CVs
3379  * into WaitEventSetWait().
3380  *
3381  * And, we use separate shared memory CVs for physical and logical
3382  * walsenders for selective wake ups, see WalSndWakeup() for more details.
3383  */
3386  else if (MyWalSnd->kind == REPLICATION_KIND_LOGICAL)
3388 
3389  if (WaitEventSetWait(FeBeWaitSet, timeout, &event, 1, wait_event) == 1 &&
3390  (event.events & WL_POSTMASTER_DEATH))
3391  {
3393  proc_exit(1);
3394  }
3395 
3397 }
3398 
3399 /*
3400  * Signal all walsenders to move to stopping state.
3401  *
3402  * This will trigger walsenders to move to a state where no further WAL can be
3403  * generated. See this file's header for details.
 *
 * NOTE(review): the line carrying the function name and parameter list
 * (listing line 3406) is absent from this extract.
3404  */
3405 void
3407 {
3408  int i;
3409 
 /* Visit every walsender slot in shared memory. */
3410  for (i = 0; i < max_wal_senders; i++)
3411  {
3412  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3413  pid_t pid;
3414 
  /* Copy the pid while holding the slot's spinlock, then release it. */
3415  SpinLockAcquire(&walsnd->mutex);
3416  pid = walsnd->pid;
3417  SpinLockRelease(&walsnd->mutex);
3418 
  /* A zero pid is skipped (presumably an unused slot -- confirm). */
3419  if (pid == 0)
3420  continue;
3421 
  /*
   * NOTE(review): the statement that acts on 'pid' (listing line 3422) is
   * absent from this extract; per the file header it presumably sends
   * PROCSIG_WALSND_INIT_STOPPING to that process -- confirm.
   */
3423  }
3424 }
3425 
3426 /*
3427  * Wait until all the WAL senders have quit or reached the stopping state. This
3428  * is used by the checkpointer to control when the shutdown checkpoint can
3429  * safely be performed.
 *
 * The slots are polled every 10 msec with no upper bound on the total wait;
 * the function returns only once no slot with a non-zero pid is in a state
 * other than WALSNDSTATE_STOPPING.
 *
 * NOTE(review): the line carrying the function name and parameter list
 * (listing line 3432) is absent from this extract.
3430  */
3431 void
3433 {
3434  for (;;)
3435  {
3436  int i;
3437  bool all_stopped = true;
3438 
3439  for (i = 0; i < max_wal_senders; i++)
3440  {
3441  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3442 
3443  SpinLockAcquire(&walsnd->mutex);
3444 
   /* Slots with a zero pid do not hold up the wait. */
3445  if (walsnd->pid == 0)
3446  {
3447  SpinLockRelease(&walsnd->mutex);
3448  continue;
3449  }
3450 
   /* One slot not yet stopping is enough to end this scan early. */
3451  if (walsnd->state != WALSNDSTATE_STOPPING)
3452  {
3453  all_stopped = false;
3454  SpinLockRelease(&walsnd->mutex);
3455  break;
3456  }
3457  SpinLockRelease(&walsnd->mutex);
3458  }
3459 
3460  /* safe to leave if confirmation is done for all WAL senders */
3461  if (all_stopped)
3462  return;
3463 
3464  pg_usleep(10000L); /* wait for 10 msec */
3465  }
3466 }
3467 
3468 /*
 * Set state for current walsender (only called in walsender)
 *
 * Stores 'state' into MyWalSnd->state under the slot's spinlock. Nothing is
 * written when the slot already holds that value.
 *
 * NOTE(review): the line carrying the function name and parameter list
 * (listing line 3470) is absent from this extract.
 */
3469 void
3471 {
3472  WalSnd *walsnd = MyWalSnd;
3473 
 /* NOTE(review): listing line 3474 is absent from this extract. */
3475 
 /*
  * This comparison reads the field without taking the spinlock; presumably
  * that is safe because only the owning walsender writes its own state --
  * confirm.
  */
3476  if (walsnd->state == state)
3477  return;
3478 
3479  SpinLockAcquire(&walsnd->mutex);
3480  walsnd->state = state;
3481  SpinLockRelease(&walsnd->mutex);
3482 }
3483 
3484 /*
3485  * Return a string constant representing the state. This is used
3486  * in system views, and should *not* be translated.
 *
 * A value matching none of the cases below yields "UNKNOWN".
 *
 * NOTE(review): the line carrying the function name and parameter list
 * (listing line 3489) is absent from this extract.
3487  */
3488 static const char *
3490 {
3491  switch (state)
3492  {
3493  case WALSNDSTATE_STARTUP:
3494  return "startup";
3495  case WALSNDSTATE_BACKUP:
3496  return "backup";
3497  case WALSNDSTATE_CATCHUP:
3498  return "catchup";
3499  case WALSNDSTATE_STREAMING:
3500  return "streaming";
3501  case WALSNDSTATE_STOPPING:
3502  return "stopping";
3503  }
 /* Reached only when no case above matched. */
3504  return "UNKNOWN";
3505 }
3506 
/*
 * Build an Interval, allocated with palloc(), whose time field is 'offset'
 * and whose month and day fields are zero.
 *
 * NOTE(review): the line carrying the function name and parameter list
 * (listing line 3508) is absent from this extract; the cross-reference index
 * gives it as offset_to_interval(TimeOffset offset).
 */
3507 static Interval *
3509 {
3510  Interval *result = palloc(sizeof(Interval));
3511 
3512  result->month = 0;
3513  result->day = 0;
3514  result->time = offset;
3515 
3516  return result;
3517 }
3518 
3519 /*
3520  * Returns activity of walsenders, including pids and xlog locations sent to
3521  * standby servers.
 *
 * One row is stored in the materialized result set for every slot with a
 * non-zero pid. Columns, as far as the visible assignments show:
 * 0 pid, 2 sent LSN, 3 write LSN, 4 flush LSN, 5 apply LSN,
 * 6-8 write/flush/apply lag, 9 sync priority, 10 sync state string,
 * 11 reply time. Callers lacking the privileges of pg_read_all_stats get
 * only column 0; columns 1-11 are NULL for them.
 *
 * NOTE(review): several lines are absent from this extract (listing lines
 * 3524, 3552, 3555, 3608, 3614, 3636, 3641, 3646, 3664), including the
 * function name line, the declarations of 'state' and 'values', and the
 * assignments to columns 1 and 6-8.
3522  */
3523 Datum
3525 {
3526 #define PG_STAT_GET_WAL_SENDERS_COLS 12
3527  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
3528  SyncRepStandbyData *sync_standbys;
3529  int num_standbys;
3530  int i;
3531 
3532  InitMaterializedSRF(fcinfo, 0);
3533 
3534  /*
3535  * Get the currently active synchronous standbys. This could be out of
3536  * date before we're done, but we'll use the data anyway.
3537  */
3538  num_standbys = SyncRepGetCandidateStandbys(&sync_standbys);
3539 
3540  for (i = 0; i < max_wal_senders; i++)
3541  {
3542  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3543  XLogRecPtr sent_ptr;
3544  XLogRecPtr write;
3545  XLogRecPtr flush;
3546  XLogRecPtr apply;
3547  TimeOffset writeLag;
3548  TimeOffset flushLag;
3549  TimeOffset applyLag;
3550  int priority;
3551  int pid;
3553  TimestampTz replyTime;
3554  bool is_sync_standby;
3556  bool nulls[PG_STAT_GET_WAL_SENDERS_COLS] = {0};
3557  int j;
3558 
3559  /* Collect data from shared memory */
  /* All fields are copied to locals so the spinlock is held only briefly. */
3560  SpinLockAcquire(&walsnd->mutex);
3561  if (walsnd->pid == 0)
3562  {
3563  SpinLockRelease(&walsnd->mutex);
3564  continue;
3565  }
3566  pid = walsnd->pid;
3567  sent_ptr = walsnd->sentPtr;
3568  state = walsnd->state;
3569  write = walsnd->write;
3570  flush = walsnd->flush;
3571  apply = walsnd->apply;
3572  writeLag = walsnd->writeLag;
3573  flushLag = walsnd->flushLag;
3574  applyLag = walsnd->applyLag;
3575  priority = walsnd->sync_standby_priority;
3576  replyTime = walsnd->replyTime;
3577  SpinLockRelease(&walsnd->mutex);
3578 
3579  /*
3580  * Detect whether walsender is/was considered synchronous. We can
3581  * provide some protection against stale data by checking the PID
3582  * along with walsnd_index.
3583  */
3584  is_sync_standby = false;
3585  for (j = 0; j < num_standbys; j++)
3586  {
3587  if (sync_standbys[j].walsnd_index == i &&
3588  sync_standbys[j].pid == pid)
3589  {
3590  is_sync_standby = true;
3591  break;
3592  }
3593  }
3594 
3595  values[0] = Int32GetDatum(pid);
3596 
3597  if (!has_privs_of_role(GetUserId(), ROLE_PG_READ_ALL_STATS))
3598  {
3599  /*
3600  * Only superusers and roles with privileges of pg_read_all_stats
3601  * can see details. Other users only get the pid value to know
3602  * it's a walsender, but no details.
3603  */
3604  MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1);
3605  }
3606  else
3607  {
   /*
    * NOTE(review): listing line 3608 is absent from this extract;
    * presumably it fills column 1 from 'state' -- confirm.
    */
3609 
   /*
    * For each LSN column the Datum is stored unconditionally; an invalid
    * pointer is reported by setting the matching nulls[] flag.
    */
3610  if (XLogRecPtrIsInvalid(sent_ptr))
3611  nulls[2] = true;
3612  values[2] = LSNGetDatum(sent_ptr);
3613 
   /*
    * NOTE(review): the test guarding the next statement (listing line
    * 3614) is absent; presumably XLogRecPtrIsInvalid(write) -- confirm.
    */
3615  nulls[3] = true;
3616  values[3] = LSNGetDatum(write);
3617 
3618  if (XLogRecPtrIsInvalid(flush))
3619  nulls[4] = true;
3620  values[4] = LSNGetDatum(flush);
3621 
3622  if (XLogRecPtrIsInvalid(apply))
3623  nulls[5] = true;
3624  values[5] = LSNGetDatum(apply);
3625 
3626  /*
3627  * Treat a standby such as a pg_basebackup background process
3628  * which always returns an invalid flush location, as an
3629  * asynchronous standby.
3630  */
3631  priority = XLogRecPtrIsInvalid(flush) ? 0 : priority;
3632 
   /*
    * A negative lag is reported as NULL. NOTE(review): the "else" bodies
    * for columns 6-8 (listing lines 3636, 3641, 3646) are absent from
    * this extract.
    */
3633  if (writeLag < 0)
3634  nulls[6] = true;
3635  else
3637 
3638  if (flushLag < 0)
3639  nulls[7] = true;
3640  else
3642 
3643  if (applyLag < 0)
3644  nulls[8] = true;
3645  else
3647 
3648  values[9] = Int32GetDatum(priority);
3649 
3650  /*
3651  * More easily understood version of standby state. This is purely
3652  * informational.
3653  *
3654  * In quorum-based sync replication, the role of each standby
3655  * listed in synchronous_standby_names can be changing very
3656  * frequently. Any standbys considered as "sync" at one moment can
3657  * be switched to "potential" ones at the next moment. So, it's
3658  * basically useless to report "sync" or "potential" as their sync
3659  * states. We report just "quorum" for them.
3660  */
3661  if (priority == 0)
3662  values[10] = CStringGetTextDatum("async");
3663  else if (is_sync_standby)
   /*
    * NOTE(review): the first half of this assignment (listing line 3664)
    * is absent; only the "sync"/"quorum" alternatives are visible.
    */
3665  CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
3666  else
3667  values[10] = CStringGetTextDatum("potential");
3668 
   /* A zero reply time is reported as NULL. */
3669  if (replyTime == 0)
3670  nulls[11] = true;
3671  else
3672  values[11] = TimestampTzGetDatum(replyTime);
3673  }
3674 
3675  tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
3676  values, nulls);
3677  }
3678 
 /* The rows were stored in rsinfo->setResult; the return value is a dummy. */
3679  return (Datum) 0;
3680 }
3681 
3682 /*
3683  * Send a keepalive message to standby.
3684  *
3685  * If requestReply is set, the message requests the other party to send
3686  * a message back to us, for heartbeat purposes. We also set a flag to
3687  * let nearby code know that we're waiting for that response, to avoid
3688  * repeated requests.
3689  *
3690  * writePtr is the location up to which the WAL is sent. It is essentially
3691  * the same as sentPtr but in some cases, we need to send keep alive before
3692  * sentPtr is updated like when skipping empty transactions.
 * If writePtr is invalid, sentPtr is sent in its place.
3693  */
3694 static void
3695 WalSndKeepalive(bool requestReply, XLogRecPtr writePtr)
3696 {
3697  elog(DEBUG2, "sending replication keepalive");
3698 
3699  /* construct the message... */
 /*
  * Visible fields, in order: the type byte 'k', a 64-bit WAL position, and
  * a final byte that is 1 when a reply is requested and 0 otherwise.
  * NOTE(review): listing lines 3700 and 3703 are absent from this extract;
  * presumably the buffer reset and one more field between the position and
  * the reply flag -- confirm against the original file.
  */
3701  pq_sendbyte(&output_message, 'k');
3702  pq_sendint64(&output_message, XLogRecPtrIsInvalid(writePtr) ? sentPtr : writePtr);
3704  pq_sendbyte(&output_message, requestReply ? 1 : 0);
3705 
3706  /* ... and send it wrapped in CopyData */
 /* NOTE(review): the send call (listing line 3707) is absent from this extract. */
3708 
3709  /* Set local flag */
3710  if (requestReply)
 /* NOTE(review): the flag assignment (listing line 3711) is absent from this extract. */
3712 }
3713 
3714 /*
3715  * Send keepalive message if too much time has elapsed.
 *
 * NOTE(review): the line carrying the function name and parameter list
 * (listing line 3718) is absent from this extract.
3716  */
3717 static void
3719 {
3720  TimestampTz ping_time;
3721 
3722  /*
3723  * Don't send keepalive messages if timeouts are globally disabled or
3724  * we're doing something not partaking in timeouts.
3725  */
3726  if (wal_sender_timeout <= 0 || last_reply_timestamp <= 0)
3727  return;
3728 
 /*
  * NOTE(review): the condition guarding this early return (listing line
  * 3729) is absent from this extract; presumably it checks whether a reply
  * to an earlier request is still outstanding -- confirm.
  */
3730  return;
3731 
3732  /*
3733  * If half of wal_sender_timeout has lapsed without receiving any reply
3734  * from the standby, send a keep-alive message to the standby requesting
3735  * an immediate reply.
3736  */
 /*
  * NOTE(review): the start of the ping_time assignment (listing line 3737)
  * is absent; only its last argument, wal_sender_timeout / 2, is visible.
  */
3738  wal_sender_timeout / 2);
3739  if (last_processing >= ping_time)
3740  {
  /* NOTE(review): the keepalive call (listing line 3741) is absent from this extract. */
3742 
3743  /* Try to flush pending output to the client */
  /* A non-zero result from the flush leads to WalSndShutdown(). */
3744  if (pq_flush_if_writable() != 0)
3745  WalSndShutdown();
3746  }
3747 }
3748 
3749 /*
3750  * Record the end of the WAL and the time it was flushed locally, so that
3751  * LagTrackerRead can compute the elapsed time (lag) when this WAL location is
3752  * eventually reported to have been written, flushed and applied by the
3753  * standby in a reply message.
 *
 * The body reads 'lsn' and 'local_flush_time'. Nothing is recorded when
 * am_walsender is false or when 'lsn' equals the previously recorded LSN.
 *
 * NOTE(review): the line carrying the function name and parameter list
 * (listing line 3756) is absent from this extract.
3754  */
3755 static void
3757 {
3758  bool buffer_full;
3759  int new_write_head;
3760  int i;
3761 
3762  if (!am_walsender)
3763  return;
3764 
3765  /*
3766  * If the lsn hasn't advanced since last time, then do nothing. This way
3767  * we only record a new sample when new WAL has been written.
3768  */
3769  if (lag_tracker->last_lsn == lsn)
3770  return;
3771  lag_tracker->last_lsn = lsn;
3772 
3773  /*
3774  * If advancing the write head of the circular buffer would crash into any
3775  * of the read heads, then the buffer is full. In other words, the
3776  * slowest reader (presumably apply) is the one that controls the release
3777  * of space.
3778  */
3779  new_write_head = (lag_tracker->write_head + 1) % LAG_TRACKER_BUFFER_SIZE;
3780  buffer_full = false;
3781  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; ++i)
3782  {
3783  if (new_write_head == lag_tracker->read_heads[i])
3784  buffer_full = true;
3785  }
3786 
3787  /*
3788  * If the buffer is full, for now we just rewind by one slot and overwrite
3789  * the last sample, as a simple (if somewhat uneven) way to lower the
3790  * sampling rate. There may be better adaptive compaction algorithms.
3791  */
3792  if (buffer_full)
3793  {
  /* The head stored back at the end is the current one, i.e. no advance. */
3794  new_write_head = lag_tracker->write_head;
  /*
   * NOTE(review): both branch bodies (listing lines 3796 and 3798) are
   * absent from this extract; presumably they step write_head back one
   * slot, wrapping at zero -- confirm against the original file.
   */
3795  if (lag_tracker->write_head > 0)
3797  else
3799  }
3800 
3801  /* Store a sample at the current write head position. */
 /*
  * NOTE(review): listing line 3802 is absent from this extract; presumably
  * it stores 'lsn' into the same slot -- confirm.
  */
3803  lag_tracker->buffer[lag_tracker->write_head].time = local_flush_time;
3804  lag_tracker->write_head = new_write_head;
3805 }
3806 
3807 /*
3808  * Find out how much time has elapsed between the moment WAL location 'lsn'
3809  * (or the highest known earlier LSN) was flushed locally and the time 'now'.
3810  * We have a separate read head for each of the reported LSN locations we
3811  * receive in replies from standby; 'head' controls which read head is
3812  * used. Whenever a read head crosses an LSN which was written into the
3813  * lag buffer with LagTrackerWrite, we can use the associated timestamp to
3814  * find out the time this LSN (or an earlier one) was flushed locally, and
3815  * therefore compute the lag.
3816  *
3817  * Return -1 if no new sample data is available, and otherwise the elapsed
3818  * time in microseconds.
 *
 * NOTE(review): the line carrying the function name and parameter list
 * (listing line 3821) is absent from this extract.
3819  */
3820 static TimeOffset
3822 {
 /* Zero means "no sample crossed yet" throughout this function. */
3823  TimestampTz time = 0;
3824 
3825  /* Read all unread samples up to this LSN or end of buffer. */
3826  while (lag_tracker->read_heads[head] != lag_tracker->write_head &&
3827  lag_tracker->buffer[lag_tracker->read_heads[head]].lsn <= lsn)
3828  {
3829  time = lag_tracker->buffer[lag_tracker->read_heads[head]].time;
  /*
   * NOTE(review): the right-hand sides of the next two assignments
   * (listing lines 3831 and 3833) are absent from this extract;
   * presumably they save the consumed sample and advance the read head
   * with wraparound -- confirm against the original file.
   */
3830  lag_tracker->last_read[head] =
3832  lag_tracker->read_heads[head] =
3834  }
3835 
3836  /*
3837  * If the lag tracker is empty, that means the standby has processed
3838  * everything we've ever sent so we should now clear 'last_read'. If we
3839  * didn't do that, we'd risk using a stale and irrelevant sample for
3840  * interpolation at the beginning of the next burst of WAL after a period
3841  * of idleness.
3842  */
 /*
  * NOTE(review): the test guarding the next statement (listing line 3843)
  * is absent from this extract.
  */
3844  lag_tracker->last_read[head].time = 0;
3845 
3846  if (time > now)
3847  {
3848  /* If the clock somehow went backwards, treat as not found. */
3849  return -1;
3850  }
3851  else if (time == 0)
3852  {
3853  /*
3854  * We didn't cross a time. If there is a future sample that we
3855  * haven't reached yet, and we've already reached at least one sample,
3856  * let's interpolate the local flushed time. This is mainly useful
3857  * for reporting a completely stuck apply position as having
3858  * increasing lag, since otherwise we'd have to wait for it to
3859  * eventually start moving again and cross one of our samples before
3860  * we can show the lag increasing.
3861  */
  /*
   * NOTE(review): the condition opening this if/else chain (listing line
   * 3862) is absent from this extract.
   */
3863  {
3864  /* There are no future samples, so we can't interpolate. */
3865  return -1;
3866  }
3867  else if (lag_tracker->last_read[head].time != 0)
3868  {
3869  /* We can interpolate between last_read and the next sample. */
3870  double fraction;
3871  WalTimeSample prev = lag_tracker->last_read[head];
   /*
    * NOTE(review): the declaration of 'next' (listing line 3872) is
    * absent from this extract.
    */
3873 
3874  if (lsn < prev.lsn)
3875  {
3876  /*
3877  * Reported LSNs shouldn't normally go backwards, but it's
3878  * possible when there is a timeline change. Treat as not
3879  * found.
3880  */
3881  return -1;
3882  }
3883 
3884  Assert(prev.lsn < next.lsn);
3885 
3886  if (prev.time > next.time)
3887  {
3888  /* If the clock somehow went backwards, treat as not found. */
3889  return -1;
3890  }
3891 
3892  /* See how far we are between the previous and next samples. */
3893  fraction =
3894  (double) (lsn - prev.lsn) / (double) (next.lsn - prev.lsn);
3895 
3896  /* Scale the local flush time proportionally. */
3897  time = (TimestampTz)
3898  ((double) prev.time + (next.time - prev.time) * fraction);
3899  }
3900  else
3901  {
3902  /*
3903  * We have only a future sample, implying that we were entirely
3904  * caught up, and now there is a new burst of WAL and the
3905  * standby hasn't processed the first sample yet. Until the
3906  * standby reaches the future sample the best we can do is report
3907  * the hypothetical lag if that sample were to be replayed now.
3908  */
3909  time = lag_tracker->buffer[lag_tracker->read_heads[head]].time;
3910  }
3911  }
3912 
3913  /* Return the elapsed time since local flush time in microseconds. */
3914  Assert(time != 0);
3915  return now - time;
3916 }
bool has_privs_of_role(Oid member, Oid role)
Definition: acl.c:4961
int16 AttrNumber
Definition: attnum.h:21
TimeLineID tliOfPointInHistory(XLogRecPtr ptr, List *history)
Definition: timeline.c:544
XLogRecPtr tliSwitchPoint(TimeLineID tli, List *history, TimeLineID *nextTLI)
Definition: timeline.c:572
List * readTimeLineHistory(TimeLineID targetTLI)
Definition: timeline.c:76
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition: timestamp.c:1695
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1719
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1583
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1547
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1782
void pgstat_report_activity(BackendState state, const char *cmd_str)
@ STATE_RUNNING
#define InvalidBackendId
Definition: backendid.h:23
void SendBaseBackup(BaseBackupCmd *cmd)
Definition: basebackup.c:964
static int32 next
Definition: blutils.c:219
static Datum values[MAXATTR]
Definition: bootstrap.c:156
#define CStringGetTextDatum(s)
Definition: builtins.h:94
#define NameStr(name)
Definition: c.h:735
unsigned int uint32
Definition: c.h:495
#define SIGNAL_ARGS
Definition: c.h:1355
#define PG_BINARY
Definition: c.h:1283
#define pg_attribute_noreturn()
Definition: c.h:206
#define UINT64_FORMAT
Definition: c.h:538
#define MemSet(start, val, len)
Definition: c.h:1009
uint32 TransactionId
Definition: c.h:641
#define OidIsValid(objectId)
Definition: c.h:764
size_t Size
Definition: c.h:594
static void SetQueryCompletion(QueryCompletion *qc, CommandTag commandTag, uint64 nprocessed)
Definition: cmdtag.h:38
bool ConditionVariableCancelSleep(void)
void ConditionVariableBroadcast(ConditionVariable *cv)
void ConditionVariablePrepareToSleep(ConditionVariable *cv)
void ConditionVariableInit(ConditionVariable *cv)
int64 TimestampTz
Definition: timestamp.h:39
int64 TimeOffset
Definition: timestamp.h:40
char * get_database_name(Oid dbid)
Definition: dbcommands.c:3084
void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
Definition: decode.c:91
bool defGetBoolean(DefElem *def)
Definition: define.c:108
char * defGetString(DefElem *def)
Definition: define.c:49
void EndCommand(const QueryCompletion *qc, CommandDest dest, bool force_undecorated_output)
Definition: dest.c:166
void EndReplicationCommand(const char *commandTag)
Definition: dest.c:201
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
@ DestRemote
Definition: dest.h:89
@ DestRemoteSimple
Definition: dest.h:91
@ DestNone
Definition: dest.h:87
struct cursor * cur
Definition: ecpg.c:28
int errmsg_internal(const char *fmt,...)
Definition: elog.c:1156
int errcode_for_file_access(void)
Definition: elog.c:881
int errdetail(const char *fmt,...)
Definition: elog.c:1202
bool message_level_is_interesting(int elevel)
Definition: elog.c:277
int errcode(int sqlerrcode)
Definition: elog.c:858
int errmsg(const char *fmt,...)
Definition: elog.c:1069
#define LOG
Definition: elog.h:31
#define COMMERROR
Definition: elog.h:33
#define FATAL
Definition: elog.h:41
#define DEBUG2
Definition: elog.h:29
#define DEBUG1
Definition: elog.h:30
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:149
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:83
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:2334
void do_tup_output(TupOutputState *tstate, Datum *values, bool *isnull)
Definition: execTuples.c:2276
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:2256
int CloseTransientFile(int fd)
Definition: fd.c:2754
int BasicOpenFile(const char *fileName, int fileFlags)
Definition: fd.c:1039
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2578
Datum Int64GetDatum(int64 X)
Definition: fmgr.c:1790
#define PG_FUNCTION_ARGS
Definition: fmgr.h:193
void InitMaterializedSRF(FunctionCallInfo fcinfo, bits32 flags)
Definition: funcapi.c:76
int MyProcPid
Definition: globals.c:44
struct Latch * MyLatch
Definition: globals.c:58
Oid MyDatabaseId
Definition: globals.c:89
@ PGC_SIGHUP
Definition: guc.h:71
void ProcessConfigFile(GucContext context)
void GetPGVariable(const char *name, DestReceiver *dest)
Definition: guc_funcs.c:382
char * application_name
Definition: guc_tables.c:541
static void dlist_init(dlist_head *head)
Definition: ilist.h:314
#define write(a, b, c)
Definition: win32.h:14
#define read(a, b, c)
Definition: win32.h:13
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:27
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:61
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:361
void proc_exit(int code)
Definition: ipc.c:104
int j
Definition: isn.c:74
int i
Definition: isn.c:73
void ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch)
Definition: latch.c:1006
void SetLatch(Latch *latch)
Definition: latch.c:605
int WaitEventSetWait(WaitEventSet *set, long timeout, WaitEvent *occurred_events, int nevents, uint32 wait_event_info)
Definition: latch.c:1381
void ResetLatch(Latch *latch)
Definition: latch.c:697
#define WL_SOCKET_READABLE
Definition: latch.h:126
#define WL_POSTMASTER_DEATH
Definition: latch.h:129
#define WL_SOCKET_WRITEABLE
Definition: latch.h:127
#define pq_flush()
Definition: libpq.h:46
#define PQ_SMALL_MESSAGE_LIMIT
Definition: libpq.h:30
#define pq_flush_if_writable()
Definition: libpq.h:47
#define pq_is_send_pending()
Definition: libpq.h:48
#define PQ_LARGE_MESSAGE_LIMIT
Definition: libpq.h:31
#define pq_putmessage_noblock(msgtype, s, len)
Definition: libpq.h:51
#define FeBeWaitSetSocketPos
Definition: libpq.h:63
Assert(fmt[strlen(fmt) - 1] !='\n')
void list_free_deep(List *list)
Definition: list.c:1559
void LogicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: logical.c:1815
void FreeDecodingContext(LogicalDecodingContext *ctx)
Definition: logical.c:674
void DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
Definition: logical.c:630
void CheckLogicalDecodingRequirements(void)
Definition: logical.c:108
LogicalDecodingContext * CreateInitDecodingContext(const char *plugin, List *output_plugin_options, bool need_full_snapshot, XLogRecPtr restart_lsn, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:328
LogicalDecodingContext * CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, bool fast_forward, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:494
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1195
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1808
void LWLockReleaseAll(void)
Definition: lwlock.c:1903
@ LW_SHARED
Definition: lwlock.h:117
@ LW_EXCLUSIVE
Definition: lwlock.h:116
char * pstrdup(const char *in)
Definition: mcxt.c:1644
void pfree(void *pointer)
Definition: mcxt.c:1456
MemoryContext TopMemoryContext
Definition: mcxt.c:141
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:1064
MemoryContext CurrentMemoryContext
Definition: mcxt.c:135
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:403
void * palloc(Size size)
Definition: mcxt.c:1226
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:153
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:121
Oid GetUserId(void)
Definition: miscinit.c:509
@ CMD_SELECT
Definition: nodes.h:276
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:138
void * arg
#define ERRCODE_DATA_CORRUPTED
Definition: pg_basebackup.c:41
#define MAXPGPATH
const void size_t len
#define lfirst(lc)
Definition: pg_list.h:172
#define NIL
Definition: pg_list.h:68
static Datum LSNGetDatum(XLogRecPtr X)
Definition: pg_lsn.h:28
static bool two_phase
#define die(msg)
Definition: pg_test_fsync.c:95
static char * buf
Definition: pg_test_fsync.c:67
void SendPostmasterSignal(PMSignalReason reason)
Definition: pmsignal.c:181
void MarkPostmasterChildWalSender(void)
Definition: pmsignal.c:339
@ PMSIGNAL_ADVANCE_STATE_MACHINE
Definition: pmsignal.h:42
pqsigfunc pqsignal(int signo, pqsigfunc func)
#define snprintf
Definition: port.h:238
void StatementCancelHandler(SIGNAL_ARGS)
Definition: postgres.c:3008
CommandDest whereToSendOutput
Definition: postgres.c:88
const char * debug_query_string
Definition: postgres.c:85
uintptr_t Datum
Definition: postgres.h:64
static Datum Int32GetDatum(int32 X)
Definition: postgres.h:212
#define InvalidOid
Definition: postgres_ext.h:36
int pq_getbyte_if_available(unsigned char *c)
Definition: pqcomm.c:1020
int pq_getmessage(StringInfo s, int maxlen)
Definition: pqcomm.c:1212
WaitEventSet * FeBeWaitSet
Definition: pqcomm.c:164
void pq_endmsgread(void)
Definition: pqcomm.c:1174
void pq_startmsgread(void)
Definition: pqcomm.c:1150
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:418
void pq_sendbytes(StringInfo buf, const void *data, int datalen)
Definition: pqformat.c:126
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:299
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:402
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:88
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:456
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
static void pq_sendint16(StringInfo buf, uint16 i)
Definition: pqformat.h:137
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PROC_AFFECTS_ALL_HORIZONS
Definition: proc.h:61
int SendProcSignal(pid_t pid, ProcSignalReason reason, BackendId backendId)
Definition: procsignal.c:262
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition: procsignal.c:639
@ PROCSIG_WALSND_INIT_STOPPING
Definition: procsignal.h:35
#define PqMsg_CopyDone
Definition: protocol.h:64
#define PqMsg_CopyData
Definition: protocol.h:65
#define PqMsg_CopyBothResponse
Definition: protocol.h:54
#define PqMsg_DataRow
Definition: protocol.h:43
#define PqMsg_Terminate
Definition: protocol.h:28
bool update_process_title
Definition: ps_status.c:31
static void set_ps_display(const char *activity)
Definition: ps_status.h:40
@ REPLICATION_KIND_PHYSICAL
Definition: replnodes.h:22
@ REPLICATION_KIND_LOGICAL
Definition: replnodes.h:23
ResourceOwner CurrentResourceOwner
Definition: resowner.c:147
void ResourceOwnerRelease(ResourceOwner owner, ResourceReleasePhase phase, bool isCommit, bool isTopLevel)
Definition: resowner.c:488
void ResourceOwnerDelete(ResourceOwner owner)
Definition: resowner.c:762
@ RESOURCE_RELEASE_LOCKS
Definition: resowner.h:49
@ RESOURCE_RELEASE_BEFORE_LOCKS
Definition: resowner.h:48
@ RESOURCE_RELEASE_AFTER_LOCKS
Definition: resowner.h:50
Size add_size(Size s1, Size s2)
Definition: shmem.c:502
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:396
Size mul_size(Size s1, Size s2)
Definition: shmem.c:519
void pg_usleep(long microsec)
Definition: signal.c:53
ReplicationSlot * SearchNamedReplicationSlot(const char *name, bool need_lock)
Definition: slot.c:376
void ReplicationSlotCleanup(void)
Definition: slot.c:605
void ReplicationSlotMarkDirty(void)
Definition: slot.c:798
void ReplicationSlotReserveWal(void)
Definition: slot.c:1175
void ReplicationSlotAcquire(const char *name, bool nowait)
Definition: slot.c:452
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition: slot.c:837
void ReplicationSlotPersist(void)
Definition: slot.c:815
ReplicationSlot * MyReplicationSlot
Definition: slot.c:99
void ReplicationSlotDrop(const char *name, bool nowait)
Definition: slot.c:643
void ReplicationSlotSave(void)
Definition: slot.c:780
void ReplicationSlotRelease(void)
Definition: slot.c:549
void ReplicationSlotsComputeRequiredLSN(void)
Definition: slot.c:893
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, bool two_phase)
Definition: slot.c:253
@ RS_PERSISTENT
Definition: slot.h:35
@ RS_EPHEMERAL
Definition: slot.h:36
@ RS_TEMPORARY
Definition: slot.h:37
#define SlotIsLogical(slot)
Definition: slot.h:191
const char * SnapBuildExportSnapshot(SnapBuild *builder)
Definition: snapbuild.c:669
Snapshot SnapBuildInitialSnapshot(SnapBuild *builder)
Definition: snapbuild.c:570
void SnapBuildClearExportedSnapshot(void)
Definition: snapbuild.c:730
bool FirstSnapshotSet
Definition: snapmgr.c:141
void RestoreTransactionSnapshot(Snapshot snapshot, void *source_pgproc)
Definition: snapmgr.c:1815
#define SpinLockInit(lock)
Definition: spin.h:60
#define SpinLockRelease(lock)
Definition: spin.h:64
#define SpinLockAcquire(lock)
Definition: spin.h:62
PGPROC * MyProc
Definition: proc.c:66
PROC_HDR * ProcGlobal
Definition: proc.c:78
char * dbname
Definition: streamutil.c:51
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
void enlargeStringInfo(StringInfo str, int needed)
Definition: stringinfo.c:283
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
ReplicationKind kind
Definition: replnodes.h:56
char * defname
Definition: parsenodes.h:809
int32 day
Definition: timestamp.h:51
int32 month
Definition: timestamp.h:52
TimeOffset time
Definition: timestamp.h:49
WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]
Definition: walsender.c:214
int read_heads[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:216
WalTimeSample last_read[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:217
int write_head
Definition: walsender.c:215
XLogRecPtr last_lsn
Definition: walsender.c:213
Definition: pg_list.h:54
XLogReaderState * reader
Definition: logical.h:42
struct SnapBuild * snapshot_builder
Definition: logical.h:44
StringInfo out
Definition: logical.h:71
Definition: nodes.h:129
NodeTag type
Definition: nodes.h:130
TransactionId xmin
Definition: proc.h:178
uint8 statusFlags
Definition: proc.h:233
int pgxactoff
Definition: proc.h:188
Latch procLatch
Definition: proc.h:170
uint8 * statusFlags
Definition: proc.h:377
TransactionId xmin
Definition: slot.h:77
TransactionId catalog_xmin
Definition: slot.h:85
XLogRecPtr restart_lsn
Definition: slot.h:88
XLogRecPtr confirmed_flush
Definition: slot.h:99
TransactionId effective_catalog_xmin
Definition: slot.h:159
slock_t mutex
Definition: slot.h:135
bool in_use
Definition: slot.h:138
TransactionId effective_xmin
Definition: slot.h:158
ReplicationSlotPersistentData data
Definition: slot.h:162
TupleDesc setDesc
Definition: execnodes.h:334
Tuplestorestate * setResult
Definition: execnodes.h:333
XLogRecPtr startpoint
Definition: replnodes.h:85
ReplicationKind kind
Definition: replnodes.h:82
TimeLineID timeline
Definition: replnodes.h:84
uint8 syncrep_method
Definition: syncrep.h:68
TimeLineID timeline
Definition: replnodes.h:108
TimeLineID ws_tli
Definition: xlogreader.h:49
uint32 events
Definition: latch.h:153
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
ConditionVariable wal_replay_cv
dlist_head SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]
ConditionVariable wal_flush_cv
TimeOffset writeLag
slock_t mutex
XLogRecPtr flush
XLogRecPtr sentPtr
TimeOffset flushLag
WalSndState state
ReplicationKind kind
XLogRecPtr write
TimeOffset applyLag
int sync_standby_priority
bool needreload
Latch * latch
TimestampTz replyTime
XLogRecPtr apply
TimestampTz time
Definition: walsender.c:204
XLogRecPtr lsn
Definition: walsender.c:203
WALSegmentContext segcxt
Definition: xlogreader.h:271
XLogRecPtr EndRecPtr
Definition: xlogreader.h:207
WALOpenSegment seg
Definition: xlogreader.h:272
Definition: regguts.h:323
void SyncRepInitConfig(void)
Definition: syncrep.c:403
SyncRepConfigData * SyncRepConfig
Definition: syncrep.c:98
int SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys)
Definition: syncrep.c:717
void SyncRepReleaseWaiters(void)
Definition: syncrep.c:432
#define SYNC_REP_PRIORITY
Definition: syncrep.h:35
#define NUM_SYNC_REP_WAIT_MODE
Definition: syncrep.h:27
#define SyncRepRequested()
Definition: syncrep.h:18
#define SYNC_REP_WAIT_WRITE
Definition: syncrep.h:23
#define SYNC_REP_WAIT_FLUSH
Definition: syncrep.h:24
#define SYNC_REP_WAIT_APPLY
Definition: syncrep.h:25
void InitializeTimeouts(void)
Definition: timeout.c:474
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:280
bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
Definition: transam.c:299
#define InvalidTransactionId
Definition: transam.h:31
#define EpochFromFullTransactionId(x)
Definition: transam.h:47
#define XidFromFullTransactionId(x)
Definition: transam.h:48
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
TupleDesc CreateTemplateTupleDesc(int natts)
Definition: tupdesc.c:45
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:659
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, Datum *values, bool *isnull)
Definition: tuplestore.c:750
char data[BLCKSZ]
Definition: c.h:1132
static Datum TimestampTzGetDatum(TimestampTz X)
Definition: timestamp.h:52
static Datum IntervalPGetDatum(const Interval *X)
Definition: timestamp.h:58
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:85
FullTransactionId ReadNextFullTransactionId(void)
Definition: varsup.c:261
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: wait_event.h:88
static void pgstat_report_wait_end(void)
Definition: wait_event.h:104
XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
static void ProcessPendingWrites(void)
Definition: walsender.c:1416
static XLogRecPtr sentPtr
Definition: walsender.c:155
#define READ_REPLICATION_SLOT_COLS
static Interval * offset_to_interval(TimeOffset offset)
Definition: walsender.c:3508
static void WalSndWait(uint32 socket_events, long timeout, uint32 wait_event)
Definition: walsender.c:3354
static void WalSndLastCycleHandler(SIGNAL_ARGS)
Definition: walsender.c:3246
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:187
static void WalSndCheckTimeOut(void)
Definition: walsender.c:2419
static void XLogSendPhysical(void)
Definition: walsender.c:2745
static void ProcessRepliesIfAny(void)
Definition: walsender.c:1890
static bool waiting_for_ping_response
Definition: walsender.c:172
static void SendTimeLineHistory(TimeLineHistoryCmd *cmd)
Definition: walsender.c:577
void WalSndErrorCleanup(void)
Definition: walsender.c:315
static void InitWalSenderSlot(void)
Definition: walsender.c:2572
WalSnd * MyWalSnd
Definition: walsender.c:113
static void ProcessStandbyHSFeedbackMessage(void)
Definition: walsender.c:2251
static void ReadReplicationSlot(ReadReplicationSlotCmd *cmd)
Definition: walsender.c:478
static StringInfoData tmpbuf
Definition: walsender.c:160
static void PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
Definition: walsender.c:2171
static LagTracker * lag_tracker
Definition: walsender.c:220
static void PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: walsender.c:2034
static void IdentifySystem(void)
Definition: walsender.c:395
static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p)
Definition: walsender.c:2667
static StringInfoData reply_message
Definition: walsender.c:159
static void WalSndKeepaliveIfNecessary(void)
Definition: walsender.c:3718
static void parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action, bool *two_phase)
Definition: walsender.c:974
bool am_walsender
Definition: walsender.c:116
void WalSndSetState(WalSndState state)
Definition: walsender.c:3470
static StringInfoData output_message
Definition: walsender.c:158
static TimeLineID sendTimeLine
Definition: walsender.c:146
static void WalSndLoop(WalSndSendDataCallback send_data)
Definition: walsender.c:2446
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1374
void WalSndWakeup(bool physical, bool logical)
Definition: walsender.c:3333
static LogicalDecodingContext * logical_decoding_ctx
Definition: walsender.c:198
static void XLogSendLogical(void)
Definition: walsender.c:3045
void WalSndShmemInit(void)
Definition: walsender.c:3289
bool am_db_walsender
Definition: walsender.c:119
static volatile sig_atomic_t replication_active
Definition: walsender.c:196
bool wake_wal_senders
Definition: walsender.c:131
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:188
int max_wal_senders
Definition: walsender.c:122
static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch)
Definition: walsender.c:2220
static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool skipped_xact)
Definition: walsender.c:1470
bool exec_replication_command(const char *cmd_string)
Definition: walsender.c:1672
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS
#define PG_STAT_GET_WAL_SENDERS_COLS
void(* WalSndSendDataCallback)(void)
Definition: walsender.c:226
Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
Definition: walsender.c:3524
void WalSndInitStopping(void)
Definition: walsender.c:3406
void WalSndWaitStopping(void)
Definition: walsender.c:3432
static bool sendTimeLineIsHistoric
Definition: walsender.c:148
void WalSndResourceCleanup(bool isCommit)
Definition: walsender.c:350
void WalSndRqstFileReload(void)
Definition: walsender.c:3201
static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc)
Definition: walsender.c:1538
bool am_cascading_walsender
Definition: walsender.c:117
static TimestampTz last_processing
Definition: walsender.c:163
Size WalSndShmemSize(void)
Definition: walsender.c:3277
bool log_replication_commands
Definition: walsender.c:126
void HandleWalSndInitStopping(void)
Definition: walsender.c:3224
static TimeLineID sendTimeLineNextTLI
Definition: walsender.c:147
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Definition: walsender.c:1041
static int logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
Definition: walsender.c:902
static XLogRecPtr GetStandbyFlushRecPtr(TimeLineID *tli)
Definition: walsender.c:3170
static void ProcessStandbyReplyMessage(void)
Definition: walsender.c:2066
static void WalSndKeepalive(bool requestReply, XLogRecPtr writePtr)
Definition: walsender.c:3695
static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)
Definition: walsender.c:3756
void WalSndSignals(void)
Definition: walsender.c:3258
static bool streamingDoneSending
Definition: walsender.c:180
static void StartLogicalReplication(StartReplicationCmd *cmd)
Definition: walsender.c:1254
static void WalSndShutdown(void)
Definition: walsender.c:230
static void WalSndKill(int code, Datum arg)
Definition: walsender.c:2649
int wal_sender_timeout
Definition: walsender.c:124
#define MAX_SEND_SIZE
Definition: walsender.c:107
static bool WalSndCaughtUp
Definition: walsender.c:184
static XLogRecPtr sendTimeLineValidUpto
Definition: walsender.c:149
static void ProcessStandbyMessage(void)
Definition: walsender.c:2003
static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1347
static void DropReplicationSlot(DropReplicationSlotCmd *cmd)
Definition: walsender.c:1244
#define LAG_TRACKER_BUFFER_SIZE
Definition: walsender.c:208
static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
Definition: walsender.c:3821
static long WalSndComputeSleeptime(TimestampTz now)
Definition: walsender.c:2375
static bool streamingDoneReceiving
Definition: walsender.c:181
static void StartReplication(StartReplicationCmd *cmd)
Definition: walsender.c:670
static void WalSndDone(WalSndSendDataCallback send_data)
Definition: walsender.c:3130
static const char * WalSndGetStateString(WalSndState state)
Definition: walsender.c:3489
static XLogReaderState * xlogreader
Definition: walsender.c:138
static TimestampTz last_reply_timestamp
Definition: walsender.c:169
WalSndCtlData * WalSndCtl
Definition: walsender.c:110
void InitWalSender(void)
CRSSnapshotAction
Definition: walsender.h:21
@ CRS_USE_SNAPSHOT
Definition: walsender.h:24
@ CRS_NOEXPORT_SNAPSHOT
Definition: walsender.h:23
@ CRS_EXPORT_SNAPSHOT
Definition: walsender.h:22
PGDLLIMPORT Node * replication_parse_result
WalSndState
@ WALSNDSTATE_STREAMING
@ WALSNDSTATE_BACKUP
@ WALSNDSTATE_CATCHUP
@ WALSNDSTATE_STARTUP
@ WALSNDSTATE_STOPPING
void replication_scanner_finish(void)
int replication_yyparse(void)
void replication_scanner_init(const char *str)
bool replication_scanner_is_replication_command(void)
#define SIGCHLD
Definition: win32_port.h:178
#define SIGHUP
Definition: win32_port.h:168
#define SIG_DFL
Definition: win32_port.h:163
#define SIGPIPE
Definition: win32_port.h:173
#define kill(pid, sig)
Definition: win32_port.h:485
#define SIGUSR1
Definition: win32_port.h:180
#define SIGUSR2
Definition: win32_port.h:181
#define SIG_IGN
Definition: win32_port.h:165
static const unsigned __int64 epoch
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4834
bool XactReadOnly
Definition: xact.c:82
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
Definition: xact.c:3481
void StartTransactionCommand(void)
Definition: xact.c:2937
bool IsAbortedTransactionBlockState(void)
Definition: xact.c:398
int XactIsoLevel
Definition: xact.c:79
bool IsSubTransaction(void)
Definition: xact.c:4889
bool IsTransactionBlock(void)
Definition: xact.c:4816
void CommitTransactionCommand(void)
Definition: xact.c:3034
#define XACT_REPEATABLE_READ
Definition: xact.h:38
uint64 GetSystemIdentifier(void)
Definition: xlog.c:4203
bool RecoveryInProgress(void)
Definition: xlog.c:5948
TimeLineID GetWALInsertionTimeLine(void)
Definition: xlog.c:6136
void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli)
Definition: xlog.c:3460
int wal_segment_size
Definition: xlog.c:146
XLogRecPtr GetFlushRecPtr(TimeLineID *insertTLI)
Definition: xlog.c:6113
bool XLogBackgroundFlush(void)
Definition: xlog.c:2725
#define MAXFNAMELEN
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
static void XLogFilePath(char *path, TimeLineID tli, XLogSegNo logSegNo, int wal_segsz_bytes)
static void XLogFileName(char *fname, TimeLineID tli, XLogSegNo logSegNo, int wal_segsz_bytes)
static void TLHistoryFilePath(char *path, TimeLineID tli)
static void TLHistoryFileName(char *fname, TimeLineID tli)
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
uint32 TimeLineID
Definition: xlogdefs.h:59
uint64 XLogSegNo
Definition: xlogdefs.h:48
XLogRecord * XLogReadRecord(XLogReaderState *state, char **errormsg)
Definition: xlogreader.c:406
bool WALRead(XLogReaderState *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli, WALReadError *errinfo)
Definition: xlogreader.c:1511
XLogReaderState * XLogReaderAllocate(int wal_segment_size, const char *waldir, XLogReaderRoutine *routine, void *private_data)
Definition: xlogreader.c:108
void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
Definition: xlogreader.c:248
#define XL_ROUTINE(...)
Definition: xlogreader.h:117
static TimeLineID receiveTLI
Definition: xlogrecovery.c:263
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
void wal_segment_close(XLogReaderState *state)
Definition: xlogutils.c:844
void XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength, TimeLineID currTLI)
Definition: xlogutils.c:720
void WALReadRaiseError(WALReadError *errinfo)
Definition: xlogutils.c:1027