PostgreSQL Source Code  git master
walsender.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * walsender.c
4  *
5  * The WAL sender process (walsender) is new as of Postgres 9.0. It takes
6  * care of sending XLOG from the primary server to a single recipient.
7  * (Note that there can be more than one walsender process concurrently.)
8  * It is started by the postmaster when the walreceiver of a standby server
9  * connects to the primary server and requests XLOG streaming replication.
10  *
11  * A walsender is similar to a regular backend, ie. there is a one-to-one
12  * relationship between a connection and a walsender process, but instead
13  * of processing SQL queries, it understands a small set of special
14  * replication-mode commands. The START_REPLICATION command begins streaming
15  * WAL to the client. While streaming, the walsender keeps reading XLOG
16  * records from the disk and sends them to the standby server over the
17  * COPY protocol, until either side ends the replication by exiting COPY
18  * mode (or until the connection is closed).
19  *
20  * Normal termination is by SIGTERM, which instructs the walsender to
21  * close the connection and exit(0) at the next convenient moment. Emergency
22  * termination is by SIGQUIT; like any backend, the walsender will simply
23  * abort and exit on SIGQUIT. A close of the connection and a FATAL error
24  * are treated as not a crash but approximately normal termination;
25  * the walsender will exit quickly without sending any more XLOG records.
26  *
27  * If the server is shut down, checkpointer sends us
28  * PROCSIG_WALSND_INIT_STOPPING after all regular backends have exited. If
29  * the backend is idle or runs an SQL query, this causes the backend to
30  * shut down; if logical replication is in progress, all existing WAL records
31  * are processed, followed by a shutdown. Otherwise, this causes the walsender
32  * to switch to the "stopping" state. In this state, the walsender will reject
33  * any further replication commands. The checkpointer begins the shutdown
34  * checkpoint once all walsenders are confirmed as stopping. When the shutdown
35  * checkpoint finishes, the postmaster sends us SIGUSR2. This instructs
36  * walsender to send any outstanding WAL, including the shutdown checkpoint
37  * record, wait for it to be replicated to the standby, and then exit.
38  *
39  *
40  * Portions Copyright (c) 2010-2024, PostgreSQL Global Development Group
41  *
42  * IDENTIFICATION
43  * src/backend/replication/walsender.c
44  *
45  *-------------------------------------------------------------------------
46  */
47 #include "postgres.h"
48 
49 #include <signal.h>
50 #include <unistd.h>
51 
52 #include "access/printtup.h"
53 #include "access/timeline.h"
54 #include "access/transam.h"
55 #include "access/xact.h"
56 #include "access/xlog_internal.h"
57 #include "access/xlogreader.h"
58 #include "access/xlogrecovery.h"
59 #include "access/xlogutils.h"
60 #include "backup/basebackup.h"
62 #include "catalog/pg_authid.h"
63 #include "catalog/pg_type.h"
64 #include "commands/dbcommands.h"
65 #include "commands/defrem.h"
66 #include "funcapi.h"
67 #include "libpq/libpq.h"
68 #include "libpq/pqformat.h"
69 #include "miscadmin.h"
70 #include "nodes/replnodes.h"
71 #include "pgstat.h"
72 #include "postmaster/interrupt.h"
73 #include "replication/decode.h"
74 #include "replication/logical.h"
75 #include "replication/slotsync.h"
76 #include "replication/slot.h"
77 #include "replication/snapbuild.h"
78 #include "replication/syncrep.h"
80 #include "replication/walsender.h"
83 #include "storage/fd.h"
84 #include "storage/ipc.h"
85 #include "storage/pmsignal.h"
86 #include "storage/proc.h"
87 #include "storage/procarray.h"
88 #include "tcop/dest.h"
89 #include "tcop/tcopprot.h"
90 #include "utils/acl.h"
91 #include "utils/builtins.h"
92 #include "utils/guc.h"
93 #include "utils/memutils.h"
94 #include "utils/pg_lsn.h"
95 #include "utils/portal.h"
96 #include "utils/ps_status.h"
97 #include "utils/timeout.h"
98 #include "utils/timestamp.h"
99 
100 /*
101  * Maximum data payload in a WAL data message. Must be >= XLOG_BLCKSZ.
102  *
103  * We don't have a good idea of what a good value would be; there's some
104  * overhead per message in both walsender and walreceiver, but on the other
105  * hand sending large batches makes walsender less responsive to signals
106  * because signals are checked only between messages. 128kB (with
107  * default 8k blocks) seems like a reasonable guess for now.
108  */
109 #define MAX_SEND_SIZE (XLOG_BLCKSZ * 16)
110 
111 /* Array of WalSnds in shared memory */
113 
114 /* My slot in the shared memory array */
115 WalSnd *MyWalSnd = NULL;
116 
117 /* Global state */
118 bool am_walsender = false; /* Am I a walsender process? */
119 bool am_cascading_walsender = false; /* Am I cascading WAL to another
120  * standby? */
121 bool am_db_walsender = false; /* Connected to a database? */
122 
123 /* GUC variables */
124 int max_wal_senders = 10; /* the maximum number of concurrent
125  * walsenders */
126 int wal_sender_timeout = 60 * 1000; /* maximum time to send one WAL
127  * data message */
129 
130 /*
131  * State for WalSndWakeupRequest
132  */
133 bool wake_wal_senders = false;
134 
135 /*
136  * xlogreader used for replication. Note that a WAL sender doing physical
137  * replication does not need xlogreader to read WAL, but it needs one to
138  * keep a state of its work.
139  */
141 
142 /*
143  * If the UPLOAD_MANIFEST command is used to provide a backup manifest in
144  * preparation for an incremental backup, uploaded_manifest will point
145  * to an object containing information about its contents, and
146  * uploaded_manifest_mcxt will point to the memory context that contains
147  * that object and all of its subordinate data. Otherwise, both values will
148  * be NULL.
149  */
152 
153 /*
154  * These variables keep track of the state of the timeline we're currently
155  * sending. sendTimeLine identifies the timeline. If sendTimeLineIsHistoric,
156  * the timeline is not the latest timeline on this server, and the server's
157  * history forked off from that timeline at sendTimeLineValidUpto.
158  */
161 static bool sendTimeLineIsHistoric = false;
163 
164 /*
165  * How far have we sent WAL already? This is also advertised in
166  * MyWalSnd->sentPtr. (Actually, this is the next WAL location to send.)
167  */
169 
170 /* Buffers for constructing outgoing messages and processing reply messages. */
174 
175 /* Timestamp of last ProcessRepliesIfAny(). */
177 
178 /*
179  * Timestamp of last ProcessRepliesIfAny() that saw a reply from the
180  * standby. Set to 0 if wal_sender_timeout doesn't need to be active.
181  */
183 
184 /* Have we sent a heartbeat message asking for reply, since last reply? */
185 static bool waiting_for_ping_response = false;
186 
187 /*
188  * While streaming WAL in Copy mode, streamingDoneSending is set to true
189  * after we have sent CopyDone. We should not send any more CopyData messages
190  * after that. streamingDoneReceiving is set to true when we receive CopyDone
191  * from the other end. When both become true, it's time to exit Copy mode.
192  */
195 
196 /* Are we there yet? */
197 static bool WalSndCaughtUp = false;
198 
199 /* Flags set by signal handlers for later service in main loop */
200 static volatile sig_atomic_t got_SIGUSR2 = false;
201 static volatile sig_atomic_t got_STOPPING = false;
202 
203 /*
204  * This is set while we are streaming. When not set
205  * PROCSIG_WALSND_INIT_STOPPING signal will be handled like SIGTERM. When set,
206  * the main loop is responsible for checking got_STOPPING and terminating when
207  * it's set (after streaming any remaining WAL).
208  */
209 static volatile sig_atomic_t replication_active = false;
210 
212 
213 /* A sample associating a WAL location with the time it was written. */
214 typedef struct
215 {
218 } WalTimeSample;
219 
220 /* The size of our buffer of time samples. */
221 #define LAG_TRACKER_BUFFER_SIZE 8192
222 
223 /* A mechanism for tracking replication lag. */
224 typedef struct
225 {
229  int read_heads[NUM_SYNC_REP_WAIT_MODE];
231 } LagTracker;
232 
234 
235 /* Signal handlers */
237 
238 /* Prototypes for private functions */
239 typedef void (*WalSndSendDataCallback) (void);
240 static void WalSndLoop(WalSndSendDataCallback send_data);
241 static void InitWalSenderSlot(void);
242 static void WalSndKill(int code, Datum arg);
244 static void XLogSendPhysical(void);
245 static void XLogSendLogical(void);
246 static void WalSndDone(WalSndSendDataCallback send_data);
247 static void IdentifySystem(void);
248 static void UploadManifest(void);
249 static bool HandleUploadManifestPacket(StringInfo buf, off_t *offset,
254 static void StartReplication(StartReplicationCmd *cmd);
256 static void ProcessStandbyMessage(void);
257 static void ProcessStandbyReplyMessage(void);
258 static void ProcessStandbyHSFeedbackMessage(void);
259 static void ProcessRepliesIfAny(void);
260 static void ProcessPendingWrites(void);
261 static void WalSndKeepalive(bool requestReply, XLogRecPtr writePtr);
262 static void WalSndKeepaliveIfNecessary(void);
263 static void WalSndCheckTimeOut(void);
265 static void WalSndWait(uint32 socket_events, long timeout, uint32 wait_event);
266 static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
267 static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
269  bool skipped_xact);
271 static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
272 static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
274 
275 static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo,
276  TimeLineID *tli_p);
277 
278 
279 /* Initialize walsender process before entering the main command loop */
280 void
281 InitWalSender(void)
282 {
284 
285  /* Create a per-walsender data structure in shared memory */
287 
288  /*
289  * We don't currently need any ResourceOwner in a walsender process, but
290  * if we did, we could call CreateAuxProcessResourceOwner here.
291  */
292 
293  /*
294  * Let postmaster know that we're a WAL sender. Once we've declared us as
295  * a WAL sender process, postmaster will let us outlive the bgwriter and
296  * kill us last in the shutdown sequence, so we get a chance to stream all
297  * remaining WAL at shutdown, including the shutdown checkpoint. Note that
298  * there's no going back, and we mustn't write any WAL records after this.
299  */
302 
303  /*
304  * If the client didn't specify a database to connect to, show in PGPROC
305  * that our advertised xmin should affect vacuum horizons in all
306  * databases. This allows physical replication clients to send hot
307  * standby feedback that will delay vacuum cleanup in all databases.
308  */
309  if (MyDatabaseId == InvalidOid)
310  {
312  LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
315  LWLockRelease(ProcArrayLock);
316  }
317 
318  /* Initialize empty timestamp buffer for lag tracking. */
320 }
321 
322 /*
323  * Clean up after an error.
324  *
325  * WAL sender processes don't use transactions like regular backends do.
326  * This function does any cleanup required after an error in a WAL sender
327  * process, similar to what transaction abort does in a regular backend.
328  */
329 void
331 {
335 
336  if (xlogreader != NULL && xlogreader->seg.ws_file >= 0)
338 
339  if (MyReplicationSlot != NULL)
341 
343 
344  replication_active = false;
345 
346  /*
347  * If there is a transaction in progress, it will clean up our
348  * ResourceOwner, but if a replication command set up a resource owner
349  * without a transaction, we've got to clean that up now.
350  */
352  WalSndResourceCleanup(false);
353 
354  if (got_STOPPING || got_SIGUSR2)
355  proc_exit(0);
356 
357  /* Revert back to startup state */
359 }
360 
361 /*
362  * Clean up any ResourceOwner we created; a no-op if CurrentResourceOwner is NULL.
363  */
364 void
365 WalSndResourceCleanup(bool isCommit)
366 {
367  ResourceOwner resowner;
368 
369  if (CurrentResourceOwner == NULL)
370  return;
371 
372  /*
373  * Deleting CurrentResourceOwner is not allowed, so we must save a pointer
374  * in a local variable and clear it first.
375  */
376  resowner = CurrentResourceOwner;
377  CurrentResourceOwner = NULL;
378 
379  /* Release all three phases in order (passing isCommit through), then delete it. */
380  ResourceOwnerRelease(resowner,
381  RESOURCE_RELEASE_BEFORE_LOCKS, isCommit, true);
382  ResourceOwnerRelease(resowner,
383  RESOURCE_RELEASE_LOCKS, isCommit, true);
384  ResourceOwnerRelease(resowner,
385  RESOURCE_RELEASE_AFTER_LOCKS, isCommit, true);
386  ResourceOwnerDelete(resowner);
387 }
388 
389 /*
390  * Handle a client's connection abort in an orderly manner.
391  */
392 static void
393 WalSndShutdown(void)
394 {
395  /*
396  * Reset whereToSendOutput to prevent ereport from attempting to send any
397  * more messages to the standby.
398  */
401 
402  proc_exit(0);
403  abort(); /* keep the compiler quiet */
404 }
405 
406 /*
407  * Handle the IDENTIFY_SYSTEM command.
408  */
409 static void
411 {
412  char sysid[32];
413  char xloc[MAXFNAMELEN];
414  XLogRecPtr logptr;
415  char *dbname = NULL;
417  TupOutputState *tstate;
418  TupleDesc tupdesc;
419  Datum values[4];
420  bool nulls[4] = {0};
421  TimeLineID currTLI;
422 
423  /*
424  * Reply with a result set with one row, four columns. First col is system
425  * ID, second is timeline ID, third is current xlog location and the
426  * fourth contains the database name if we are connected to one.
427  */
428 
429  snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
431 
434  logptr = GetStandbyFlushRecPtr(&currTLI);
435  else
436  logptr = GetFlushRecPtr(&currTLI);
437 
438  snprintf(xloc, sizeof(xloc), "%X/%X", LSN_FORMAT_ARGS(logptr));
439 
440  if (MyDatabaseId != InvalidOid)
441  {
443 
444  /* syscache access needs a transaction env. */
446  /* make dbname live outside TX context */
450  /* CommitTransactionCommand switches to TopMemoryContext */
452  }
453 
455 
456  /* need a tuple descriptor representing four columns */
457  tupdesc = CreateTemplateTupleDesc(4);
458  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "systemid",
459  TEXTOID, -1, 0);
460  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "timeline",
461  INT8OID, -1, 0);
462  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "xlogpos",
463  TEXTOID, -1, 0);
464  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "dbname",
465  TEXTOID, -1, 0);
466 
467  /* prepare for projection of tuples */
468  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
469 
470  /* column 1: system identifier */
471  values[0] = CStringGetTextDatum(sysid);
472 
473  /* column 2: timeline */
474  values[1] = Int64GetDatum(currTLI);
475 
476  /* column 3: wal location */
477  values[2] = CStringGetTextDatum(xloc);
478 
479  /* column 4: database name, or NULL if none */
480  if (dbname)
482  else
483  nulls[3] = true;
484 
485  /* send it to dest */
486  do_tup_output(tstate, values, nulls);
487 
488  end_tup_output(tstate);
489 }
490 
491 /* Handle READ_REPLICATION_SLOT command */
492 static void
494 {
495 #define READ_REPLICATION_SLOT_COLS 3
496  ReplicationSlot *slot;
498  TupOutputState *tstate;
499  TupleDesc tupdesc;
501  bool nulls[READ_REPLICATION_SLOT_COLS];
502 
504  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_type",
505  TEXTOID, -1, 0);
506  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "restart_lsn",
507  TEXTOID, -1, 0);
508  /* TimeLineID is unsigned, so int4 is not wide enough. */
509  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "restart_tli",
510  INT8OID, -1, 0);
511 
512  memset(nulls, true, READ_REPLICATION_SLOT_COLS * sizeof(bool));
513 
514  LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
515  slot = SearchNamedReplicationSlot(cmd->slotname, false);
516  if (slot == NULL || !slot->in_use)
517  {
518  LWLockRelease(ReplicationSlotControlLock);
519  }
520  else
521  {
522  ReplicationSlot slot_contents;
523  int i = 0;
524 
525  /* Copy slot contents while holding spinlock */
526  SpinLockAcquire(&slot->mutex);
527  slot_contents = *slot;
528  SpinLockRelease(&slot->mutex);
529  LWLockRelease(ReplicationSlotControlLock);
530 
531  if (OidIsValid(slot_contents.data.database))
532  ereport(ERROR,
533  errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
534  errmsg("cannot use %s with a logical replication slot",
535  "READ_REPLICATION_SLOT"));
536 
537  /* slot type */
538  values[i] = CStringGetTextDatum("physical");
539  nulls[i] = false;
540  i++;
541 
542  /* start LSN */
543  if (!XLogRecPtrIsInvalid(slot_contents.data.restart_lsn))
544  {
545  char xloc[64];
546 
547  snprintf(xloc, sizeof(xloc), "%X/%X",
548  LSN_FORMAT_ARGS(slot_contents.data.restart_lsn));
549  values[i] = CStringGetTextDatum(xloc);
550  nulls[i] = false;
551  }
552  i++;
553 
554  /* timeline this WAL was produced on */
555  if (!XLogRecPtrIsInvalid(slot_contents.data.restart_lsn))
556  {
557  TimeLineID slots_position_timeline;
558  TimeLineID current_timeline;
559  List *timeline_history = NIL;
560 
561  /*
562  * While in recovery, use as timeline the currently-replaying one
563  * to get the LSN position's history.
564  */
565  if (RecoveryInProgress())
566  (void) GetXLogReplayRecPtr(&current_timeline);
567  else
568  current_timeline = GetWALInsertionTimeLine();
569 
570  timeline_history = readTimeLineHistory(current_timeline);
571  slots_position_timeline = tliOfPointInHistory(slot_contents.data.restart_lsn,
572  timeline_history);
573  values[i] = Int64GetDatum((int64) slots_position_timeline);
574  nulls[i] = false;
575  }
576  i++;
577 
579  }
580 
582  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
583  do_tup_output(tstate, values, nulls);
584  end_tup_output(tstate);
585 }
586 
587 
588 /*
589  * Handle TIMELINE_HISTORY command.
590  */
591 static void
593 {
595  TupleDesc tupdesc;
597  char histfname[MAXFNAMELEN];
598  char path[MAXPGPATH];
599  int fd;
600  off_t histfilelen;
601  off_t bytesleft;
602  Size len;
603 
605 
606  /*
607  * Reply with a result set with one row, and two columns. The first col is
608  * the name of the history file, 2nd is the contents.
609  */
610  tupdesc = CreateTemplateTupleDesc(2);
611  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "filename", TEXTOID, -1, 0);
612  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "content", TEXTOID, -1, 0);
613 
614  TLHistoryFileName(histfname, cmd->timeline);
615  TLHistoryFilePath(path, cmd->timeline);
616 
617  /* Send a RowDescription message */
618  dest->rStartup(dest, CMD_SELECT, tupdesc);
619 
620  /* Send a DataRow message */
622  pq_sendint16(&buf, 2); /* # of columns */
623  len = strlen(histfname);
624  pq_sendint32(&buf, len); /* col1 len */
625  pq_sendbytes(&buf, histfname, len);
626 
627  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
628  if (fd < 0)
629  ereport(ERROR,
631  errmsg("could not open file \"%s\": %m", path)));
632 
633  /* Determine file length and send it to client */
634  histfilelen = lseek(fd, 0, SEEK_END);
635  if (histfilelen < 0)
636  ereport(ERROR,
638  errmsg("could not seek to end of file \"%s\": %m", path)));
639  if (lseek(fd, 0, SEEK_SET) != 0)
640  ereport(ERROR,
642  errmsg("could not seek to beginning of file \"%s\": %m", path)));
643 
644  pq_sendint32(&buf, histfilelen); /* col2 len */
645 
646  bytesleft = histfilelen;
647  while (bytesleft > 0)
648  {
649  PGAlignedBlock rbuf;
650  int nread;
651 
652  pgstat_report_wait_start(WAIT_EVENT_WALSENDER_TIMELINE_HISTORY_READ);
653  nread = read(fd, rbuf.data, sizeof(rbuf));
655  if (nread < 0)
656  ereport(ERROR,
658  errmsg("could not read file \"%s\": %m",
659  path)));
660  else if (nread == 0)
661  ereport(ERROR,
663  errmsg("could not read file \"%s\": read %d of %zu",
664  path, nread, (Size) bytesleft)));
665 
666  pq_sendbytes(&buf, rbuf.data, nread);
667  bytesleft -= nread;
668  }
669 
670  if (CloseTransientFile(fd) != 0)
671  ereport(ERROR,
673  errmsg("could not close file \"%s\": %m", path)));
674 
675  pq_endmessage(&buf);
676 }
677 
678 /*
679  * Handle UPLOAD_MANIFEST command.
680  */
681 static void
683 {
684  MemoryContext mcxt;
686  off_t offset = 0;
688 
689  /*
690  * parsing the manifest will use the cryptohash stuff, which requires a
691  * resource owner
692  */
693  Assert(CurrentResourceOwner == NULL);
694  CurrentResourceOwner = ResourceOwnerCreate(NULL, "base backup");
695 
696  /* Prepare to read manifest data into a temporary context. */
698  "incremental backup information",
700  ib = CreateIncrementalBackupInfo(mcxt);
701 
702  /* Send a CopyInResponse message */
703  pq_beginmessage(&buf, 'G');
704  pq_sendbyte(&buf, 0);
705  pq_sendint16(&buf, 0);
707  pq_flush();
708 
709  /* Receive packets from client until done. */
710  while (HandleUploadManifestPacket(&buf, &offset, ib))
711  ;
712 
713  /* Finish up manifest processing. */
715 
716  /*
717  * Discard any old manifest information and arrange to preserve the new
718  * information we just got.
719  *
720  * We assume that MemoryContextDelete and MemoryContextSetParent won't
721  * fail, and thus we shouldn't end up bailing out of here in such a way as
722  * to leave dangling pointers.
723  */
724  if (uploaded_manifest_mcxt != NULL)
727  uploaded_manifest = ib;
728  uploaded_manifest_mcxt = mcxt;
729 
730  /* clean up the resource owner we created */
731  WalSndResourceCleanup(true);
732 }
733 
734 /*
735  * Process one packet received during the handling of an UPLOAD_MANIFEST
736  * operation.
737  *
738  * 'buf' is scratch space. This function expects it to be initialized, doesn't
739  * care what the current contents are, and may override them with completely
740  * new contents.
741  *
742  * The return value is true if the caller should continue processing
743  * additional packets and false if the UPLOAD_MANIFEST operation is complete.
744  */
745 static bool
748 {
749  int mtype;
750  int maxmsglen;
751 
753 
754  pq_startmsgread();
755  mtype = pq_getbyte();
756  if (mtype == EOF)
757  ereport(ERROR,
758  (errcode(ERRCODE_CONNECTION_FAILURE),
759  errmsg("unexpected EOF on client connection with an open transaction")));
760 
761  switch (mtype)
762  {
763  case 'd': /* CopyData */
764  maxmsglen = PQ_LARGE_MESSAGE_LIMIT;
765  break;
766  case 'c': /* CopyDone */
767  case 'f': /* CopyFail */
768  case 'H': /* Flush */
769  case 'S': /* Sync */
770  maxmsglen = PQ_SMALL_MESSAGE_LIMIT;
771  break;
772  default:
773  ereport(ERROR,
774  (errcode(ERRCODE_PROTOCOL_VIOLATION),
775  errmsg("unexpected message type 0x%02X during COPY from stdin",
776  mtype)));
777  maxmsglen = 0; /* keep compiler quiet */
778  break;
779  }
780 
781  /* Now collect the message body */
782  if (pq_getmessage(buf, maxmsglen))
783  ereport(ERROR,
784  (errcode(ERRCODE_CONNECTION_FAILURE),
785  errmsg("unexpected EOF on client connection with an open transaction")));
787 
788  /* Process the message */
789  switch (mtype)
790  {
791  case 'd': /* CopyData */
792  AppendIncrementalManifestData(ib, buf->data, buf->len);
793  return true;
794 
795  case 'c': /* CopyDone */
796  return false;
797 
798  case 'H': /* Flush */
799  case 'S': /* Sync */
800  /* Ignore these while in COPY IN mode as we do elsewhere. */
801  return true;
802 
803  case 'f': /* CopyFail */
804  ereport(ERROR,
805  (errcode(ERRCODE_QUERY_CANCELED),
806  errmsg("COPY from stdin failed: %s",
807  pq_getmsgstring(buf))));
808  }
809 
810  /* Not reached. */
811  Assert(false);
812  return false;
813 }
814 
815 /*
816  * Handle START_REPLICATION command.
817  *
818  * At the moment, this never returns, but an ereport(ERROR) will take us back
819  * to the main loop.
820  */
821 static void
823 {
825  XLogRecPtr FlushPtr;
826  TimeLineID FlushTLI;
827 
828  /* create xlogreader for physical replication */
829  xlogreader =
831  XL_ROUTINE(.segment_open = WalSndSegmentOpen,
832  .segment_close = wal_segment_close),
833  NULL);
834 
835  if (!xlogreader)
836  ereport(ERROR,
837  (errcode(ERRCODE_OUT_OF_MEMORY),
838  errmsg("out of memory"),
839  errdetail("Failed while allocating a WAL reading processor.")));
840 
841  /*
842  * We assume here that we're logging enough information in the WAL for
843  * log-shipping, since this is checked in PostmasterMain().
844  *
845  * NOTE: wal_level can only change at shutdown, so in most cases it is
846  * difficult for there to be WAL data that we can still see that was
847  * written at wal_level='minimal'.
848  */
849 
850  if (cmd->slotname)
851  {
852  ReplicationSlotAcquire(cmd->slotname, true);
854  ereport(ERROR,
855  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
856  errmsg("cannot use a logical replication slot for physical replication")));
857 
858  /*
859  * We don't need to verify the slot's restart_lsn here; instead we
860  * rely on the caller requesting the starting point to use. If the
861  * WAL segment doesn't exist, we'll fail later.
862  */
863  }
864 
865  /*
866  * Select the timeline. If it was given explicitly by the client, use
867  * that. Otherwise use the timeline of the last replayed record.
868  */
871  FlushPtr = GetStandbyFlushRecPtr(&FlushTLI);
872  else
873  FlushPtr = GetFlushRecPtr(&FlushTLI);
874 
875  if (cmd->timeline != 0)
876  {
877  XLogRecPtr switchpoint;
878 
879  sendTimeLine = cmd->timeline;
880  if (sendTimeLine == FlushTLI)
881  {
882  sendTimeLineIsHistoric = false;
884  }
885  else
886  {
887  List *timeLineHistory;
888 
889  sendTimeLineIsHistoric = true;
890 
891  /*
892  * Check that the timeline the client requested exists, and the
893  * requested start location is on that timeline.
894  */
895  timeLineHistory = readTimeLineHistory(FlushTLI);
896  switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
898  list_free_deep(timeLineHistory);
899 
900  /*
901  * Found the requested timeline in the history. Check that
902  * requested startpoint is on that timeline in our history.
903  *
904  * This is quite loose on purpose. We only check that we didn't
905  * fork off the requested timeline before the switchpoint. We
906  * don't check that we switched *to* it before the requested
907  * starting point. This is because the client can legitimately
908  * request to start replication from the beginning of the WAL
909  * segment that contains switchpoint, but on the new timeline, so
910  * that it doesn't end up with a partial segment. If you ask for
911  * too old a starting point, you'll get an error later when we
912  * fail to find the requested WAL segment in pg_wal.
913  *
914  * XXX: we could be more strict here and only allow a startpoint
915  * that's older than the switchpoint, if it's still in the same
916  * WAL segment.
917  */
918  if (!XLogRecPtrIsInvalid(switchpoint) &&
919  switchpoint < cmd->startpoint)
920  {
921  ereport(ERROR,
922  (errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
924  cmd->timeline),
925  errdetail("This server's history forked from timeline %u at %X/%X.",
926  cmd->timeline,
927  LSN_FORMAT_ARGS(switchpoint))));
928  }
929  sendTimeLineValidUpto = switchpoint;
930  }
931  }
932  else
933  {
934  sendTimeLine = FlushTLI;
936  sendTimeLineIsHistoric = false;
937  }
938 
940 
941  /* If there is nothing to stream, don't even enter COPY mode */
943  {
944  /*
945  * When we first start replication the standby will be behind the
946  * primary. For some applications, for example synchronous
947  * replication, it is important to have a clear state for this initial
948  * catchup mode, so we can trigger actions when we change streaming
949  * state later. We may stay in this state for a long time, which is
950  * exactly why we want to be able to monitor whether or not we are
951  * still here.
952  */
954 
955  /* Send a CopyBothResponse message, and start streaming */
957  pq_sendbyte(&buf, 0);
958  pq_sendint16(&buf, 0);
959  pq_endmessage(&buf);
960  pq_flush();
961 
962  /*
963  * Don't allow a request to stream from a future point in WAL that
964  * hasn't been flushed to disk in this server yet.
965  */
966  if (FlushPtr < cmd->startpoint)
967  {
968  ereport(ERROR,
969  (errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
971  LSN_FORMAT_ARGS(FlushPtr))));
972  }
973 
974  /* Start streaming from the requested point */
975  sentPtr = cmd->startpoint;
976 
977  /* Initialize shared memory status, too */
981 
983 
984  /* Main loop of walsender */
985  replication_active = true;
986 
988 
989  replication_active = false;
990  if (got_STOPPING)
991  proc_exit(0);
993 
995  }
996 
997  if (cmd->slotname)
999 
1000  /*
1001  * Copy is finished now. Send a single-row result set indicating the next
1002  * timeline.
1003  */
1005  {
1006  char startpos_str[8 + 1 + 8 + 1];
1007  DestReceiver *dest;
1008  TupOutputState *tstate;
1009  TupleDesc tupdesc;
1010  Datum values[2];
1011  bool nulls[2] = {0};
1012 
1013  snprintf(startpos_str, sizeof(startpos_str), "%X/%X",
1015 
1017 
1018  /*
1019  * Need a tuple descriptor representing two columns. int8 may seem
1020  * like a surprising data type for this, but in theory int4 would not
1021  * be wide enough for this, as TimeLineID is unsigned.
1022  */
1023  tupdesc = CreateTemplateTupleDesc(2);
1024  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli",
1025  INT8OID, -1, 0);
1026  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos",
1027  TEXTOID, -1, 0);
1028 
1029  /* prepare for projection of tuple */
1030  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
1031 
1033  values[1] = CStringGetTextDatum(startpos_str);
1034 
1035  /* send it to dest */
1036  do_tup_output(tstate, values, nulls);
1037 
1038  end_tup_output(tstate);
1039  }
1040 
1041  /* Send CommandComplete message */
1042  EndReplicationCommand("START_STREAMING");
1043 }
1044 
1045 /*
1046  * XLogReaderRoutine->page_read callback for logical decoding contexts, as a
1047  * walsender process.
1048  *
1049  * Inside the walsender we can do better than read_local_xlog_page,
1050  * which has to do a plain sleep/busy loop, because the walsender's latch gets
1051  * set every time WAL is flushed.
1052  */
1053 static int
1055  XLogRecPtr targetRecPtr, char *cur_page)
1056 {
1057  XLogRecPtr flushptr;
1058  int count;
1059  WALReadError errinfo;
1060  XLogSegNo segno;
1061  TimeLineID currTLI;
1062 
1063  /*
1064  * Make sure we have enough WAL available before retrieving the current
1065  * timeline. This is needed to determine am_cascading_walsender accurately
1066  * which is needed to determine the current timeline.
1067  */
1068  flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
1069 
1070  /*
1071  * Since logical decoding is also permitted on a standby server, we need
1072  * to check if the server is in recovery to decide how to get the current
1073  * timeline ID (so that it also covers the promotion or timeline change
1074  * cases).
1075  */
1077 
1079  GetXLogReplayRecPtr(&currTLI);
1080  else
1081  currTLI = GetWALInsertionTimeLine();
1082 
1083  XLogReadDetermineTimeline(state, targetPagePtr, reqLen, currTLI);
1084  sendTimeLineIsHistoric = (state->currTLI != currTLI);
1085  sendTimeLine = state->currTLI;
1086  sendTimeLineValidUpto = state->currTLIValidUntil;
1087  sendTimeLineNextTLI = state->nextTLI;
1088 
1089  /* fail if not enough WAL was flushed (implies we are going to shut down) */
1090  if (flushptr < targetPagePtr + reqLen)
1091  return -1;
1092 
1093  if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
1094  count = XLOG_BLCKSZ; /* at least one full block available */
1095  else
1096  count = flushptr - targetPagePtr; /* part of the page available */
1097 
1098  /* now actually read the data, we know it's there */
1099  if (!WALRead(state,
1100  cur_page,
1101  targetPagePtr,
1102  count,
1103  currTLI, /* Pass the current TLI because only
1104  * WalSndSegmentOpen controls whether new TLI
1105  * is needed. */
1106  &errinfo))
1107  WALReadRaiseError(&errinfo);
1108 
1109  /*
1110  * After reading into the buffer, check that what we read was valid. We do
1111  * this after reading, because even though the segment was present when we
1112  * opened it, it might get recycled or removed while we read it. The
1113  * read() succeeds in that case, but the data we tried to read might
1114  * already have been overwritten with new WAL records.
1115  */
1116  XLByteToSeg(targetPagePtr, segno, state->segcxt.ws_segsize);
1117  CheckXLogRemoved(segno, state->seg.ws_tli);
1118 
1119  return count;
1120 }
1121 
1122 /*
1123  * Process extra options given to CREATE_REPLICATION_SLOT.
 *
 * Walks cmd->options and stores each recognized option through the matching
 * output pointer. An output is written only when its option is present, so
 * callers must preset their defaults.
 *
 * "snapshot", "two_phase" and "failover" are accepted only for logical slots
 * and "reserve_wal" only for physical slots. Giving an option twice, or for
 * the wrong slot kind, raises "conflicting or redundant options". An unknown
 * option name or an unknown "snapshot" value also raises an error.
1124  */
1125 static void
1127  bool *reserve_wal,
1128  CRSSnapshotAction *snapshot_action,
1129  bool *two_phase, bool *failover)
1130 {
1131  ListCell *lc;
      /* one "seen" flag per option, used to reject duplicates */
1132  bool snapshot_action_given = false;
1133  bool reserve_wal_given = false;
1134  bool two_phase_given = false;
1135  bool failover_given = false;
1136 
1137  /* Parse options */
1138  foreach(lc, cmd->options)
1139  {
1140  DefElem *defel = (DefElem *) lfirst(lc);
1141 
1142  if (strcmp(defel->defname, "snapshot") == 0)
1143  {
1144  char *action;
1145 
1146  if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
1147  ereport(ERROR,
1148  (errcode(ERRCODE_SYNTAX_ERROR),
1149  errmsg("conflicting or redundant options")));
1150 
1151  action = defGetString(defel);
1152  snapshot_action_given = true;
1153 
      /* map the textual action onto the CRSSnapshotAction enum */
1154  if (strcmp(action, "export") == 0)
1155  *snapshot_action = CRS_EXPORT_SNAPSHOT;
1156  else if (strcmp(action, "nothing") == 0)
1157  *snapshot_action = CRS_NOEXPORT_SNAPSHOT;
1158  else if (strcmp(action, "use") == 0)
1159  *snapshot_action = CRS_USE_SNAPSHOT;
1160  else
1161  ereport(ERROR,
1162  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1163  errmsg("unrecognized value for CREATE_REPLICATION_SLOT option \"%s\": \"%s\"",
1164  defel->defname, action)));
1165  }
1166  else if (strcmp(defel->defname, "reserve_wal") == 0)
1167  {
1168  if (reserve_wal_given || cmd->kind != REPLICATION_KIND_PHYSICAL)
1169  ereport(ERROR,
1170  (errcode(ERRCODE_SYNTAX_ERROR),
1171  errmsg("conflicting or redundant options")));
1172 
1173  reserve_wal_given = true;
1174  *reserve_wal = defGetBoolean(defel);
1175  }
1176  else if (strcmp(defel->defname, "two_phase") == 0)
1177  {
1178  if (two_phase_given || cmd->kind != REPLICATION_KIND_LOGICAL)
1179  ereport(ERROR,
1180  (errcode(ERRCODE_SYNTAX_ERROR),
1181  errmsg("conflicting or redundant options")));
1182  two_phase_given = true;
1183  *two_phase = defGetBoolean(defel);
1184  }
1185  else if (strcmp(defel->defname, "failover") == 0)
1186  {
1187  if (failover_given || cmd->kind != REPLICATION_KIND_LOGICAL)
1188  ereport(ERROR,
1189  (errcode(ERRCODE_SYNTAX_ERROR),
1190  errmsg("conflicting or redundant options")));
1191  failover_given = true;
1192  *failover = defGetBoolean(defel);
1193  }
1194  else
1195  elog(ERROR, "unrecognized option: %s", defel->defname);
1196  }
1197 }
1198 
1199 /*
1200  * Create a new replication slot.
 *
 * Handles both physical and logical slots, selected by cmd->kind, and then
 * sends a single four-column result row (slot name, consistent point,
 * snapshot name, output plugin) to the client.
1201  */
1202 static void
1204 {
1205  const char *snapshot_name = NULL;
1206  char xloc[MAXFNAMELEN];
1207  char *slot_name;
      /* option defaults; parseCreateReplSlotOptions overrides those given */
1208  bool reserve_wal = false;
1209  bool two_phase = false;
1210  bool failover = false;
1211  CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT;
1212  DestReceiver *dest;
1213  TupOutputState *tstate;
1214  TupleDesc tupdesc;
1215  Datum values[4];
      /* all columns start out non-null; set to true below where needed */
1216  bool nulls[4] = {0};
1217 
1219 
1220  parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action, &two_phase,
1221  &failover);
1222 
1223  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
1224  {
1225  ReplicationSlotCreate(cmd->slotname, false,
1227  false, false, false);
1228 
1229  if (reserve_wal)
1230  {
1232 
1234 
1235  /* Write this slot to disk if it's a permanent one. */
1236  if (!cmd->temporary)
1238  }
1239  }
1240  else
1241  {
1243  bool need_full_snapshot = false;
1244 
1246 
1248 
1249  /*
1250  * Initially create persistent slot as ephemeral - that allows us to
1251  * nicely handle errors during initialization because it'll get
1252  * dropped if this transaction fails. We'll make it persistent at the
1253  * end. Temporary slots can be created as temporary from the beginning
1254  * as they get dropped on error as well.
1255  */
1256  ReplicationSlotCreate(cmd->slotname, true,
1258  two_phase, failover, false);
1259 
1260  /*
1261  * Do options check early so that we can bail before calling the
1262  * DecodingContextFindStartpoint which can take a long time.
1263  */
1264  if (snapshot_action == CRS_EXPORT_SNAPSHOT)
1265  {
1266  if (IsTransactionBlock())
1267  ereport(ERROR,
1268  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1269  (errmsg("%s must not be called inside a transaction",
1270  "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'export')")));
1271 
1272  need_full_snapshot = true;
1273  }
1274  else if (snapshot_action == CRS_USE_SNAPSHOT)
1275  {
      /*
       * SNAPSHOT 'use' is only accepted inside a transaction block that
       * passes each of the checks below.
       */
1276  if (!IsTransactionBlock())
1277  ereport(ERROR,
1278  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1279  (errmsg("%s must be called inside a transaction",
1280  "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1281 
1283  ereport(ERROR,
1284  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1285  (errmsg("%s must be called in REPEATABLE READ isolation mode transaction",
1286  "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1287  if (!XactReadOnly)
1288  ereport(ERROR,
1289  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1290  (errmsg("%s must be called in a read-only transaction",
1291  "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1292 
1293  if (FirstSnapshotSet)
1294  ereport(ERROR,
1295  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1296  (errmsg("%s must be called before any query",
1297  "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1298 
1299  if (IsSubTransaction())
1300  ereport(ERROR,
1301  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1302  (errmsg("%s must not be called in a subtransaction",
1303  "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1304 
1305  need_full_snapshot = true;
1306  }
1307 
1308  ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
1310  XL_ROUTINE(.page_read = logical_read_xlog_page,
1311  .segment_open = WalSndSegmentOpen,
1312  .segment_close = wal_segment_close),
1315 
1316  /*
1317  * Signal that we don't need the timeout mechanism. We're just
1318  * creating the replication slot and don't yet accept feedback
1319  * messages or send keepalives. As we possibly need to wait for
1320  * further WAL the walsender would otherwise possibly be killed too
1321  * soon.
1322  */
1324 
1325  /* build initial snapshot, might take a while */
1327 
1328  /*
1329  * Export or use the snapshot if we've been asked to do so.
1330  *
1331  * NB. We will convert the snapbuild.c kind of snapshot to normal
1332  * snapshot when doing this.
1333  */
1334  if (snapshot_action == CRS_EXPORT_SNAPSHOT)
1335  {
1336  snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder);
1337  }
1338  else if (snapshot_action == CRS_USE_SNAPSHOT)
1339  {
1340  Snapshot snap;
1341 
1344  }
1345 
1346  /* don't need the decoding context anymore */
1347  FreeDecodingContext(ctx);
1348 
1349  if (!cmd->temporary)
1351  }
1352 
1353  snprintf(xloc, sizeof(xloc), "%X/%X",
1355 
1357 
1358  /*----------
1359  * Need a tuple descriptor representing four columns:
1360  * - first field: the slot name
1361  * - second field: LSN at which we became consistent
1362  * - third field: exported snapshot's name
1363  * - fourth field: output plugin
1364  */
1365  tupdesc = CreateTemplateTupleDesc(4);
1366  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_name",
1367  TEXTOID, -1, 0);
1368  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "consistent_point",
1369  TEXTOID, -1, 0);
1370  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "snapshot_name",
1371  TEXTOID, -1, 0);
1372  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "output_plugin",
1373  TEXTOID, -1, 0);
1374 
1375  /* prepare for projection of tuples */
1376  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
1377 
1378  /* slot_name */
1379  slot_name = NameStr(MyReplicationSlot->data.name);
1380  values[0] = CStringGetTextDatum(slot_name);
1381 
1382  /* consistent wal location */
1383  values[1] = CStringGetTextDatum(xloc);
1384 
1385  /* snapshot name, or NULL if none */
1386  if (snapshot_name != NULL)
1387  values[2] = CStringGetTextDatum(snapshot_name);
1388  else
1389  nulls[2] = true;
1390 
1391  /* plugin, or NULL if none */
1392  if (cmd->plugin != NULL)
1393  values[3] = CStringGetTextDatum(cmd->plugin);
1394  else
1395  nulls[3] = true;
1396 
1397  /* send it to dest */
1398  do_tup_output(tstate, values, nulls);
1399  end_tup_output(tstate);
1400 
1402 }
1403 
1404 /*
1405  * Get rid of a replication slot that is no longer wanted.
 *
 * Thin wrapper: passes the slot name and the negation of cmd->wait to
 * ReplicationSlotDrop.
1406  */
1407 static void
1409 {
      /* NOTE(review): second argument is presumably a "nowait" flag - confirm */
1410  ReplicationSlotDrop(cmd->slotname, !cmd->wait);
1411 }
1412 
1413 /*
1414  * Process extra options given to ALTER_REPLICATION_SLOT.
 *
 * "failover" is the only recognized option and may be given at most once.
 * *failover is written only when the option is present, so the caller must
 * preset its default. Any other option name raises an error.
1415  */
1416 static void
1418 {
      /* set once "failover" has been seen, to reject a second occurrence */
1419  bool failover_given = false;
1420 
1421  /* Parse options */
1422  foreach_ptr(DefElem, defel, cmd->options)
1423  {
1424  if (strcmp(defel->defname, "failover") == 0)
1425  {
1426  if (failover_given)
1427  ereport(ERROR,
1428  (errcode(ERRCODE_SYNTAX_ERROR),
1429  errmsg("conflicting or redundant options")));
1430  failover_given = true;
1431  *failover = defGetBoolean(defel);
1432  }
1433  else
1434  elog(ERROR, "unrecognized option: %s", defel->defname);
1435  }
1436 }
1437 
1438 /*
1439  * Change the definition of a replication slot.
 *
 * Parses the command's options and applies the resulting failover setting to
 * the slot named by cmd->slotname.
1440  */
1441 static void
1443 {
      /* stays false unless the command carries a "failover" option */
1444  bool failover = false;
1445 
1446  ParseAlterReplSlotOptions(cmd, &failover);
1447  ReplicationSlotAlter(cmd->slotname, failover);
1448 }
1449 
1450 /*
1451  * Load previously initiated logical slot and prepare for sending data (via
1452  * WalSndLoop).
 *
 * Acquires the slot named by cmd->slotname, creates the decoding context
 * starting at cmd->startpoint, sends the CopyBothResponse, and runs the
 * walsender main loop. When the loop ends it either exits the process (if a
 * stop was requested) or finishes the command with a COPY command tag.
1453  */
1454 static void
1456 {
1458  QueryCompletion qc;
1459 
1460  /* make sure that our requirements are still fulfilled */
1462 
1464 
1465  ReplicationSlotAcquire(cmd->slotname, true);
1466 
1467  /*
1468  * Force a disconnect, so that the decoding code doesn't need to care
1469  * about an eventual switch from running in recovery, to running in a
1470  * normal environment. Client code is expected to handle reconnects.
1471  */
1473  {
1474  ereport(LOG,
1475  (errmsg("terminating walsender process after promotion")));
1476  got_STOPPING = true;
1477  }
1478 
1479  /*
1480  * Create our decoding context, making it start at the previously ack'ed
1481  * position.
1482  *
1483  * Do this before sending a CopyBothResponse message, so that any errors
1484  * are reported early.
1485  */
1487  CreateDecodingContext(cmd->startpoint, cmd->options, false,
1488  XL_ROUTINE(.page_read = logical_read_xlog_page,
1489  .segment_open = WalSndSegmentOpen,
1490  .segment_close = wal_segment_close),
1494 
1496 
1497  /* Send a CopyBothResponse message, and start streaming */
      /*
       * NOTE(review): the two zero fields are presumably the overall copy
       * format and the column count - confirm against the protocol docs.
       */
1499  pq_sendbyte(&buf, 0);
1500  pq_sendint16(&buf, 0);
1501  pq_endmessage(&buf);
1502  pq_flush();
1503 
1504  /* Start reading WAL from the oldest required WAL. */
1507 
1508  /*
1509  * Report the location after which we'll send out further commits as the
1510  * current sentPtr.
1511  */
1513 
1514  /* Also update the sent position status in shared memory */
1518 
1519  replication_active = true;
1520 
1522 
1523  /* Main loop of walsender */
1525 
1528 
1529  replication_active = false;
      /* a requested stop ends the process instead of returning to the caller */
1530  if (got_STOPPING)
1531  proc_exit(0);
1533 
1534  /* Get out of COPY mode (CommandComplete). */
1535  SetQueryCompletion(&qc, CMDTAG_COPY, 0);
1536  EndCommand(&qc, DestRemote, false);
1537 }
1538 
1539 /*
1540  * LogicalDecodingContext 'prepare_write' callback.
1541  *
1542  * Prepare a write into a StringInfo.
1543  *
1544  * Don't do anything lasting in here, it's quite possible that nothing will be done
1545  * with the data.
 *
 * Resets ctx->out and writes the message header into it: one 'w' type byte
 * followed by three int64 fields (dataStart, walEnd, sendtime). The sendtime
 * field is written as 0 here.
1546  */
1547 static void
1549 {
1550  /* can't have sync rep confused by sending the same LSN several times */
1551  if (!last_write)
1552  lsn = InvalidXLogRecPtr;
1553 
1554  resetStringInfo(ctx->out);
1555 
1556  pq_sendbyte(ctx->out, 'w');
1557  pq_sendint64(ctx->out, lsn); /* dataStart */
1558  pq_sendint64(ctx->out, lsn); /* walEnd */
1559 
1560  /*
1561  * Fill out the sendtime later, just as it's done in XLogSendPhysical, but
1562  * reserve space here.
1563  */
1564  pq_sendint64(ctx->out, 0); /* sendtime */
1565 }
1566 
1567 /*
1568  * LogicalDecodingContext 'write' callback.
1569  *
1570  * Actually write out data previously prepared by WalSndPrepareWrite out to
1571  * the network. Take as long as needed, but process replies from the other
1572  * side and check timeouts during that.
 *
 * Returns right after the first flush attempt when no output remains pending
 * and the timeout check below passes; otherwise continues on the slow path.
1573  */
1574 static void
1576  bool last_write)
1577 {
1578  TimestampTz now;
1579 
1580  /*
1581  * Fill the send timestamp last, so that it is taken as late as possible.
1582  * This is somewhat ugly, but the protocol is set as it's already used for
1583  * several releases by streaming physical replication.
1584  */
1587  pq_sendint64(&tmpbuf, now);
      /*
       * Overwrite the reserved sendtime field in place: it follows the 1-byte
       * message type and the two int64 LSN fields at the start of ctx->out.
       */
1588  memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
1589  tmpbuf.data, sizeof(int64));
1590 
1591  /* output previously gathered data in a CopyData packet */
1592  pq_putmessage_noblock('d', ctx->out->data, ctx->out->len);
1593 
1595 
1596  /* Try to flush pending output to the client */
1597  if (pq_flush_if_writable() != 0)
1598  WalSndShutdown();
1599 
1600  /* Try taking fast path unless we get too close to walsender timeout. */
1602  wal_sender_timeout / 2) &&
1603  !pq_is_send_pending())
1604  {
1605  return;
1606  }
1607 
1608  /* If we have pending write here, go to slow path */
1610 }
1611 
1612 /*
1613  * Wait until there is no pending write. Also process replies from the other
1614  * side and check timeouts during that.
 *
 * Loops until pq_is_send_pending() reports nothing left to send, then sets
 * the process latch before returning.
1615  */
1616 static void
1618 {
1619  for (;;)
1620  {
1621  long sleeptime;
1622 
1623  /* Check for input from the client */
1625 
1626  /* die if timeout was reached */
1628 
1629  /* Send keepalive if the time has come */
1631 
      /* the only exit from the loop: the output buffer has drained */
1632  if (!pq_is_send_pending())
1633  break;
1634 
1636 
1637  /* Sleep until something happens or we time out */
1639  WAIT_EVENT_WAL_SENDER_WRITE_DATA);
1640 
1641  /* Clear any already-pending wakeups */
1643 
1645 
1646  /* Process any requests or signals received recently */
1647  if (ConfigReloadPending)
1648  {
1649  ConfigReloadPending = false;
1652  }
1653 
1654  /* Try to flush pending output to the client */
1655  if (pq_flush_if_writable() != 0)
1656  WalSndShutdown();
1657  }
1658 
1659  /* reactivate latch so WalSndLoop knows to continue */
1660  SetLatch(MyLatch);
1661 }
1662 
1663 /*
1664  * LogicalDecodingContext 'update_progress' callback.
1665  *
1666  * Write the current position to the lag tracker (see XLogSendPhysical).
1667  *
1668  * When skipping empty transactions, send a keepalive message if necessary.
1669  */
1670 static void
1672  bool skipped_xact)
1673 {
      /* static: remembers the last lag-tracker write across calls */
1674  static TimestampTz sendTime = 0;
1676  bool pending_writes = false;
1677  bool end_xact = ctx->end_xact;
1678 
1679  /*
1680  * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to
1681  * avoid flooding the lag tracker when we commit frequently.
1682  *
1683  * We don't have a mechanism to get the ack for any LSN other than end
1684  * xact LSN from the downstream. So, we track lag only for end of
1685  * transaction LSN.
1686  */
1687 #define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000
1688  if (end_xact && TimestampDifferenceExceeds(sendTime, now,
1690  {
1691  LagTrackerWrite(lsn, now);
1692  sendTime = now;
1693  }
1694 
1695  /*
1696  * When skipping empty transactions in synchronous replication, we send a
1697  * keepalive message to avoid delaying such transactions.
1698  *
1699  * It is okay to check sync_standbys_defined flag without lock here as in
1700  * the worst case we will just send an extra keepalive message when it is
1701  * really not required.
1702  */
1703  if (skipped_xact &&
1704  SyncRepRequested() &&
1705  ((volatile WalSndCtlData *) WalSndCtl)->sync_standbys_defined)
1706  {
1707  WalSndKeepalive(false, lsn);
1708 
1709  /* Try to flush pending output to the client */
1710  if (pq_flush_if_writable() != 0)
1711  WalSndShutdown();
1712 
1713  /* If we have pending write here, make sure it's actually flushed */
1714  if (pq_is_send_pending())
1715  pending_writes = true;
1716  }
1717 
1718  /*
1719  * Process pending writes if any or try to send a keepalive if required.
1720  * We don't need to try sending keep alive messages at the transaction end
1721  * as that will be done at a later point in time. This is required only
1722  * for large transactions where we don't send any changes to the
1723  * downstream and the receiver can timeout due to that.
1724  */
1725  if (pending_writes || (!end_xact &&
1727  wal_sender_timeout / 2)))
1729 }
1730 
1731 /*
1732  * Wait till WAL < loc is flushed to disk so it can be safely sent to client.
1733  *
1734  * Returns end LSN of flushed WAL. Normally this will be >= loc, but
1735  * if we detect a shutdown request (either from postmaster or client)
1736  * we will return early, so caller must always check.
 *
 * While the server is in recovery the replay position from
 * GetXLogReplayRecPtr is used in place of the flush position.
1737  */
1738 static XLogRecPtr
1740 {
1741  int wakeEvents;
      /* static: cached across calls so the fast path below can use it */
1742  static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
1743 
1744  /*
1745  * Fast path to avoid acquiring the spinlock in case we already know we
1746  * have enough WAL available. This is particularly interesting if we're
1747  * far behind.
1748  */
1749  if (RecentFlushPtr != InvalidXLogRecPtr &&
1750  loc <= RecentFlushPtr)
1751  return RecentFlushPtr;
1752 
1753  /* Get a more recent flush pointer. */
1754  if (!RecoveryInProgress())
1755  RecentFlushPtr = GetFlushRecPtr(NULL);
1756  else
1757  RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1758 
1759  for (;;)
1760  {
1761  long sleeptime;
1762 
1763  /* Clear any already-pending wakeups */
1765 
1767 
1768  /* Process any requests or signals received recently */
1769  if (ConfigReloadPending)
1770  {
1771  ConfigReloadPending = false;
1774  }
1775 
1776  /* Check for input from the client */
1778 
1779  /*
1780  * If we're shutting down, trigger pending WAL to be written out,
1781  * otherwise we'd possibly end up waiting for WAL that never gets
1782  * written, because walwriter has shut down already.
1783  */
1784  if (got_STOPPING)
1786 
1787  /* Update our idea of the currently flushed position. */
1788  if (!RecoveryInProgress())
1789  RecentFlushPtr = GetFlushRecPtr(NULL);
1790  else
1791  RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1792 
1793  /*
1794  * If postmaster asked us to stop, don't wait anymore.
1795  *
1796  * It's important to do this check after the recomputation of
1797  * RecentFlushPtr, so we can send all remaining data before shutting
1798  * down.
1799  */
1800  if (got_STOPPING)
1801  break;
1802 
1803  /*
1804  * We only send regular messages to the client for full decoded
1805  * transactions, but a synchronous replication and walsender shutdown
1806  * possibly are waiting for a later location. So, before sleeping, we
1807  * send a ping containing the flush location. If the receiver is
1808  * otherwise idle, this keepalive will trigger a reply. Processing the
1809  * reply will update these MyWalSnd locations.
1810  */
1811  if (MyWalSnd->flush < sentPtr &&
1812  MyWalSnd->write < sentPtr &&
1815 
1816  /* check whether we're done */
1817  if (loc <= RecentFlushPtr)
1818  break;
1819 
1820  /* Waiting for new WAL. Since we need to wait, we're now caught up. */
1821  WalSndCaughtUp = true;
1822 
1823  /*
1824  * Try to flush any pending output to the client.
1825  */
1826  if (pq_flush_if_writable() != 0)
1827  WalSndShutdown();
1828 
1829  /*
1830  * If we have received CopyDone from the client, sent CopyDone
1831  * ourselves, and the output buffer is empty, it's time to exit
1832  * streaming, so fail the current WAL fetch request.
1833  */
1835  !pq_is_send_pending())
1836  break;
1837 
1838  /* die if timeout was reached */
1840 
1841  /* Send keepalive if the time has come */
1843 
1844  /*
1845  * Sleep until something happens or we time out. Also wait for the
1846  * socket becoming writable, if there's still pending output.
1847  * Otherwise we might sit on sendable output data while waiting for
1848  * new WAL to be generated. (But if we have nothing to send, we don't
1849  * want to wake on socket-writable.)
1850  */
1852 
1853  wakeEvents = WL_SOCKET_READABLE;
1854 
1855  if (pq_is_send_pending())
1856  wakeEvents |= WL_SOCKET_WRITEABLE;
1857 
1858  WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_WAIT_FOR_WAL);
1859  }
1860 
1861  /* reactivate latch so WalSndLoop knows to continue */
1862  SetLatch(MyLatch);
1863  return RecentFlushPtr;
1864 }
1865 
1866 /*
1867  * Execute an incoming replication command.
1868  *
1869  * Returns true if the cmd_string was recognized as WalSender command, false
1870  * if not.
 *
 * Parsing and execution run inside a dedicated memory context that is
 * deleted before returning on both the true and the false path.
1871  */
1872 bool
1873 exec_replication_command(const char *cmd_string)
1874 {
1875  int parse_rc;
1876  Node *cmd_node;
1877  const char *cmdtag;
1878  MemoryContext cmd_context;
1879  MemoryContext old_context;
1880 
1881  /*
1882  * If WAL sender has been told that shutdown is getting close, switch its
1883  * status accordingly to handle the next replication commands correctly.
1884  */
1885  if (got_STOPPING)
1887 
1888  /*
1889  * Throw error if in stopping mode. We need to prevent commands that
1890  * could generate WAL while the shutdown checkpoint is being written. To
1891  * be safe, we just prohibit all new commands.
1892  */
1894  ereport(ERROR,
1895  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1896  errmsg("cannot execute new commands while WAL sender is in stopping mode")));
1897 
1898  /*
1899  * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
1900  * command arrives. Clean up the old stuff if there's anything.
1901  */
1903 
1905 
1906  /*
1907  * Prepare to parse and execute the command.
1908  */
1910  "Replication command context",
1912  old_context = MemoryContextSwitchTo(cmd_context);
1913 
1914  replication_scanner_init(cmd_string);
1915 
1916  /*
1917  * Is it a WalSender command?
1918  */
1920  {
1921  /* Nope; clean up and get out. */
1923 
1924  MemoryContextSwitchTo(old_context);
1925  MemoryContextDelete(cmd_context);
1926 
1927  /* XXX this is a pretty random place to make this check */
1928  if (MyDatabaseId == InvalidOid)
1929  ereport(ERROR,
1930  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1931  errmsg("cannot execute SQL commands in WAL sender for physical replication")));
1932 
1933  /* Tell the caller that this wasn't a WalSender command. */
1934  return false;
1935  }
1936 
1937  /*
1938  * Looks like a WalSender command, so parse it.
1939  */
1940  parse_rc = replication_yyparse();
1941  if (parse_rc != 0)
1942  ereport(ERROR,
1943  (errcode(ERRCODE_SYNTAX_ERROR),
1944  errmsg_internal("replication command parser returned %d",
1945  parse_rc)));
1947 
1948  cmd_node = replication_parse_result;
1949 
1950  /*
1951  * Report query to various monitoring facilities. For this purpose, we
1952  * report replication commands just like SQL commands.
1953  */
1954  debug_query_string = cmd_string;
1955 
1957 
1958  /*
1959  * Log replication command if log_replication_commands is enabled. Even
1960  * when it's disabled, log the command with DEBUG1 level for backward
1961  * compatibility.
1962  */
1964  (errmsg("received replication command: %s", cmd_string)));
1965 
1966  /*
1967  * Disallow replication commands in aborted transaction blocks.
1968  */
1970  ereport(ERROR,
1971  (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
1972  errmsg("current transaction is aborted, "
1973  "commands ignored until end of transaction block")));
1974 
1976 
1977  /*
1978  * Allocate buffers that will be used for each outgoing and incoming
1979  * message. We do this just once per command to reduce palloc overhead.
1980  */
1984 
      /*
       * Dispatch on the parsed node type. Each case sets the command tag and
       * ps display, runs the command, and ends with EndReplicationCommand.
       */
1985  switch (cmd_node->type)
1986  {
1987  case T_IdentifySystemCmd:
1988  cmdtag = "IDENTIFY_SYSTEM";
1989  set_ps_display(cmdtag);
1990  IdentifySystem();
1991  EndReplicationCommand(cmdtag);
1992  break;
1993 
1994  case T_ReadReplicationSlotCmd:
1995  cmdtag = "READ_REPLICATION_SLOT";
1996  set_ps_display(cmdtag);
1998  EndReplicationCommand(cmdtag);
1999  break;
2000 
2001  case T_BaseBackupCmd:
2002  cmdtag = "BASE_BACKUP";
2003  set_ps_display(cmdtag);
2004  PreventInTransactionBlock(true, cmdtag);
2006  EndReplicationCommand(cmdtag);
2007  break;
2008 
2009  case T_CreateReplicationSlotCmd:
2010  cmdtag = "CREATE_REPLICATION_SLOT";
2011  set_ps_display(cmdtag);
2013  EndReplicationCommand(cmdtag);
2014  break;
2015 
2016  case T_DropReplicationSlotCmd:
2017  cmdtag = "DROP_REPLICATION_SLOT";
2018  set_ps_display(cmdtag);
2020  EndReplicationCommand(cmdtag);
2021  break;
2022 
2023  case T_AlterReplicationSlotCmd:
2024  cmdtag = "ALTER_REPLICATION_SLOT";
2025  set_ps_display(cmdtag);
2027  EndReplicationCommand(cmdtag);
2028  break;
2029 
2030  case T_StartReplicationCmd:
2031  {
2032  StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
2033 
2034  cmdtag = "START_REPLICATION";
2035  set_ps_display(cmdtag);
2036  PreventInTransactionBlock(true, cmdtag);
2037 
2038  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
2039  StartReplication(cmd);
2040  else
2042 
2043  /* dupe, but necessary per libpqrcv_endstreaming */
2044  EndReplicationCommand(cmdtag);
2045 
2046  Assert(xlogreader != NULL);
2047  break;
2048  }
2049 
2050  case T_TimeLineHistoryCmd:
2051  cmdtag = "TIMELINE_HISTORY";
2052  set_ps_display(cmdtag);
2053  PreventInTransactionBlock(true, cmdtag);
2055  EndReplicationCommand(cmdtag);
2056  break;
2057 
2058  case T_VariableShowStmt:
2059  {
2061  VariableShowStmt *n = (VariableShowStmt *) cmd_node;
2062 
2063  cmdtag = "SHOW";
2064  set_ps_display(cmdtag);
2065 
2066  /* syscache access needs a transaction environment */
2068  GetPGVariable(n->name, dest);
2070  EndReplicationCommand(cmdtag);
2071  }
2072  break;
2073 
2074  case T_UploadManifestCmd:
2075  cmdtag = "UPLOAD_MANIFEST";
2076  set_ps_display(cmdtag);
2077  PreventInTransactionBlock(true, cmdtag);
2078  UploadManifest();
2079  EndReplicationCommand(cmdtag);
2080  break;
2081 
2082  default:
2083  elog(ERROR, "unrecognized replication command node tag: %u",
2084  cmd_node->type);
2085  }
2086 
2087  /* done */
2088  MemoryContextSwitchTo(old_context);
2089  MemoryContextDelete(cmd_context);
2090 
2091  /*
2092  * We need not update ps display or pg_stat_activity, because PostgresMain
2093  * will reset those to "idle". But we must reset debug_query_string to
2094  * ensure it doesn't become a dangling pointer.
2095  */
2096  debug_query_string = NULL;
2097 
2098  return true;
2099 }
2100 
2101 /*
2102  * Process any incoming messages while streaming. Also checks if the remote
2103  * end has closed the connection.
 *
 * Reads messages without blocking until no more input is available or a
 * CopyDone has been received. EOF, a read failure, or a Terminate message
 * ends the process via proc_exit(0).
2104  */
2105 static void
2107 {
2108  unsigned char firstchar;
2109  int maxmsglen;
2110  int r;
      /* set when at least one CopyData or CopyDone message was handled */
2111  bool received = false;
2112 
2114 
2115  /*
2116  * If we already received a CopyDone from the frontend, any subsequent
2117  * message is the beginning of a new command, and should be processed in
2118  * the main processing loop.
2119  */
2120  while (!streamingDoneReceiving)
2121  {
2122  pq_startmsgread();
2123  r = pq_getbyte_if_available(&firstchar);
2124  if (r < 0)
2125  {
2126  /* unexpected error or EOF */
2128  (errcode(ERRCODE_PROTOCOL_VIOLATION),
2129  errmsg("unexpected EOF on standby connection")));
2130  proc_exit(0);
2131  }
2132  if (r == 0)
2133  {
2134  /* no data available without blocking */
2135  pq_endmsgread();
2136  break;
2137  }
2138 
2139  /* Validate message type and set packet size limit */
2140  switch (firstchar)
2141  {
2142  case PqMsg_CopyData:
2143  maxmsglen = PQ_LARGE_MESSAGE_LIMIT;
2144  break;
2145  case PqMsg_CopyDone:
2146  case PqMsg_Terminate:
2147  maxmsglen = PQ_SMALL_MESSAGE_LIMIT;
2148  break;
2149  default:
2150  ereport(FATAL,
2151  (errcode(ERRCODE_PROTOCOL_VIOLATION),
2152  errmsg("invalid standby message type \"%c\"",
2153  firstchar)));
2154  maxmsglen = 0; /* keep compiler quiet */
2155  break;
2156  }
2157 
2158  /* Read the message contents */
2160  if (pq_getmessage(&reply_message, maxmsglen))
2161  {
2163  (errcode(ERRCODE_PROTOCOL_VIOLATION),
2164  errmsg("unexpected EOF on standby connection")));
2165  proc_exit(0);
2166  }
2167 
2168  /* ... and process it */
2169  switch (firstchar)
2170  {
2171  /*
2172  * 'd' means a standby reply wrapped in a CopyData packet.
2173  */
2174  case PqMsg_CopyData:
2176  received = true;
2177  break;
2178 
2179  /*
2180  * CopyDone means the standby requested to finish streaming.
2181  * Reply with CopyDone, if we had not sent that already.
2182  */
2183  case PqMsg_CopyDone:
2184  if (!streamingDoneSending)
2185  {
2186  pq_putmessage_noblock('c', NULL, 0);
2187  streamingDoneSending = true;
2188  }
2189 
2190  streamingDoneReceiving = true;
2191  received = true;
2192  break;
2193 
2194  /*
2195  * 'X' means that the standby is closing down the socket.
2196  */
2197  case PqMsg_Terminate:
2198  proc_exit(0);
2199 
2200  default:
2201  Assert(false); /* NOT REACHED */
2202  }
2203  }
2204 
2205  /*
2206  * Save the last reply timestamp if we've received at least one reply.
2207  */
2208  if (received)
2209  {
2211  waiting_for_ping_response = false;
2212  }
2213 }
2214 
2215 /*
2216  * Process a status update message received from standby.
 *
 * Dispatches on the first byte of reply_message. Any type other than 'r' or
 * 'h' ends the process via proc_exit(0).
2217  */
2218 static void
2220 {
2221  char msgtype;
2222 
2223  /*
2224  * Check message type from the first byte.
2225  */
2226  msgtype = pq_getmsgbyte(&reply_message);
2227 
      /*
       * NOTE(review): 'r' is presumably the standby status reply and 'h' the
       * hot standby feedback message - confirm against the protocol docs.
       */
2228  switch (msgtype)
2229  {
2230  case 'r':
2232  break;
2233 
2234  case 'h':
2236  break;
2237 
2238  default:
2240  (errcode(ERRCODE_PROTOCOL_VIOLATION),
2241  errmsg("unexpected message type \"%c\"", msgtype)));
2242  proc_exit(0);
2243  }
2244 }
2245 
2246 /*
2247  * Remember that a walreceiver just confirmed receipt of lsn `lsn`.
 *
 * Stores lsn as the slot's restart_lsn if it differs from the current value.
 * lsn must not be InvalidXLogRecPtr.
2248  */
2249 static void
2251 {
2252  bool changed = false;
2254 
2255  Assert(lsn != InvalidXLogRecPtr);
      /*
       * Compare and update under the slot's spinlock; "changed" carries the
       * result out so any follow-up work runs after the lock is released.
       */
2256  SpinLockAcquire(&slot->mutex);
2257  if (slot->data.restart_lsn != lsn)
2258  {
2259  changed = true;
2260  slot->data.restart_lsn = lsn;
2261  }
2262  SpinLockRelease(&slot->mutex);
2263 
2264  if (changed)
2265  {
2268  }
2269 
2270  /*
2271  * One could argue that the slot should be saved to disk now, but that'd
2272  * be energy wasted - the worst thing lost information could cause here is
2273  * to give wrong information in a statistics view - we'll just potentially
2274  * be more conservative in removing files.
2275  */
2276 }
2277 
2278 /*
2279  * Regular reply from standby advising of WAL locations on standby server.
 *
 * Reads the write, flush and apply positions, the reply timestamp and the
 * reply-requested flag from reply_message, computes lag values for the three
 * positions, and publishes the results in this walsender's shared WalSnd
 * entry.
2280  */
2281 static void
2283 {
2284  XLogRecPtr writePtr,
2285  flushPtr,
2286  applyPtr;
2287  bool replyRequested;
2288  TimeOffset writeLag,
2289  flushLag,
2290  applyLag;
2291  bool clearLagTimes;
2292  TimestampTz now;
2293  TimestampTz replyTime;
2294 
      /* static: whether the previous reply already reported applyPtr == sentPtr */
2295  static bool fullyAppliedLastTime = false;
2296 
2297  /* the caller already consumed the msgtype byte */
2298  writePtr = pq_getmsgint64(&reply_message);
2299  flushPtr = pq_getmsgint64(&reply_message);
2300  applyPtr = pq_getmsgint64(&reply_message);
2301  replyTime = pq_getmsgint64(&reply_message);
2302  replyRequested = pq_getmsgbyte(&reply_message);
2303 
2305  {
2306  char *replyTimeStr;
2307 
2308  /* Copy because timestamptz_to_str returns a static buffer */
2309  replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
2310 
2311  elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s reply_time %s",
2312  LSN_FORMAT_ARGS(writePtr),
2313  LSN_FORMAT_ARGS(flushPtr),
2314  LSN_FORMAT_ARGS(applyPtr),
2315  replyRequested ? " (reply requested)" : "",
2316  replyTimeStr);
2317 
2318  pfree(replyTimeStr);
2319  }
2320 
2321  /* See if we can compute the round-trip lag for these positions. */
2323  writeLag = LagTrackerRead(SYNC_REP_WAIT_WRITE, writePtr, now);
2324  flushLag = LagTrackerRead(SYNC_REP_WAIT_FLUSH, flushPtr, now);
2325  applyLag = LagTrackerRead(SYNC_REP_WAIT_APPLY, applyPtr, now);
2326 
2327  /*
2328  * If the standby reports that it has fully replayed the WAL in two
2329  * consecutive reply messages, then the second such message must result
2330  * from wal_receiver_status_interval expiring on the standby. This is a
2331  * convenient time to forget the lag times measured when it last
2332  * wrote/flushed/applied a WAL record, to avoid displaying stale lag data
2333  * until more WAL traffic arrives.
2334  */
2335  clearLagTimes = false;
2336  if (applyPtr == sentPtr)
2337  {
2338  if (fullyAppliedLastTime)
2339  clearLagTimes = true;
2340  fullyAppliedLastTime = true;
2341  }
2342  else
2343  fullyAppliedLastTime = false;
2344 
2345  /* Send a reply if the standby requested one. */
2346  if (replyRequested)
2348 
2349  /*
2350  * Update shared state for this WalSender process based on reply data from
2351  * standby.
2352  */
2353  {
2354  WalSnd *walsnd = MyWalSnd;
2355 
2356  SpinLockAcquire(&walsnd->mutex);
2357  walsnd->write = writePtr;
2358  walsnd->flush = flushPtr;
2359  walsnd->apply = applyPtr;
      /*
       * A lag value of -1 is stored only when clearLagTimes is set
       * (presumably -1 means no sample was available - confirm against
       * LagTrackerRead).
       */
2360  if (writeLag != -1 || clearLagTimes)
2361  walsnd->writeLag = writeLag;
2362  if (flushLag != -1 || clearLagTimes)
2363  walsnd->flushLag = flushLag;
2364  if (applyLag != -1 || clearLagTimes)
2365  walsnd->applyLag = applyLag;
2366  walsnd->replyTime = replyTime;
2367  SpinLockRelease(&walsnd->mutex);
2368  }
2369 
2372 
2373  /*
2374  * Advance our local xmin horizon when the client confirmed a flush.
2375  */
2376  if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr)
2377  {
2380  else
2382  }
2383 }
2384 
2385 /* compute new replication slot xmin horizon if needed */
2386 static void
2388 {
2389  bool changed = false;
2391 
2392  SpinLockAcquire(&slot->mutex);
2394 
2395  /*
2396  * For physical replication we don't need the interlock provided by xmin
2397  * and effective_xmin since the consequences of a missed increase are
2398  * limited to query cancellations, so set both at once.
2399  */
2400  if (!TransactionIdIsNormal(slot->data.xmin) ||
2401  !TransactionIdIsNormal(feedbackXmin) ||
2402  TransactionIdPrecedes(slot->data.xmin, feedbackXmin))
2403  {
2404  changed = true;
2405  slot->data.xmin = feedbackXmin;
2406  slot->effective_xmin = feedbackXmin;
2407  }
2408  if (!TransactionIdIsNormal(slot->data.catalog_xmin) ||
2409  !TransactionIdIsNormal(feedbackCatalogXmin) ||
2410  TransactionIdPrecedes(slot->data.catalog_xmin, feedbackCatalogXmin))
2411  {
2412  changed = true;
2413  slot->data.catalog_xmin = feedbackCatalogXmin;
2414  slot->effective_catalog_xmin = feedbackCatalogXmin;
2415  }
2416  SpinLockRelease(&slot->mutex);
2417 
2418  if (changed)
2419  {
2422  }
2423 }
2424 
2425 /*
2426  * Check that the provided xmin/epoch are sane, that is, not in the future
2427  * and not so far back as to be already wrapped around.
2428  *
2429  * Epoch of nextXid should be same as standby, or if the counter has
2430  * wrapped, then one greater than standby.
2431  *
2432  * This check doesn't care about whether clog exists for these xids
2433  * at all.
2434  */
2435 static bool
2437 {
2438  FullTransactionId nextFullXid;
2439  TransactionId nextXid;
2440  uint32 nextEpoch;
2441 
2442  nextFullXid = ReadNextFullTransactionId();
2443  nextXid = XidFromFullTransactionId(nextFullXid);
2444  nextEpoch = EpochFromFullTransactionId(nextFullXid);
2445 
2446  if (xid <= nextXid)
2447  {
2448  if (epoch != nextEpoch)
2449  return false;
2450  }
2451  else
2452  {
2453  if (epoch + 1 != nextEpoch)
2454  return false;
2455  }
2456 
2457  if (!TransactionIdPrecedesOrEquals(xid, nextXid))
2458  return false; /* epoch OK, but it's wrapped around */
2459 
2460  return true;
2461 }
2462 
2463 /*
2464  * Hot Standby feedback
2465  */
2466 static void
2468 {
2469  TransactionId feedbackXmin;
2470  uint32 feedbackEpoch;
2471  TransactionId feedbackCatalogXmin;
2472  uint32 feedbackCatalogEpoch;
2473  TimestampTz replyTime;
2474 
2475  /*
2476  * Decipher the reply message. The caller already consumed the msgtype
2477  * byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation
2478  * of this message.
2479  */
2480  replyTime = pq_getmsgint64(&reply_message);
2481  feedbackXmin = pq_getmsgint(&reply_message, 4);
2482  feedbackEpoch = pq_getmsgint(&reply_message, 4);
2483  feedbackCatalogXmin = pq_getmsgint(&reply_message, 4);
2484  feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4);
2485 
2487  {
2488  char *replyTimeStr;
2489 
2490  /* Copy because timestamptz_to_str returns a static buffer */
2491  replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
2492 
2493  elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u reply_time %s",
2494  feedbackXmin,
2495  feedbackEpoch,
2496  feedbackCatalogXmin,
2497  feedbackCatalogEpoch,
2498  replyTimeStr);
2499 
2500  pfree(replyTimeStr);
2501  }
2502 
2503  /*
2504  * Update shared state for this WalSender process based on reply data from
2505  * standby.
2506  */
2507  {
2508  WalSnd *walsnd = MyWalSnd;
2509 
2510  SpinLockAcquire(&walsnd->mutex);
2511  walsnd->replyTime = replyTime;
2512  SpinLockRelease(&walsnd->mutex);
2513  }
2514 
2515  /*
2516  * Unset WalSender's xmins if the feedback message values are invalid.
2517  * This happens when the downstream turned hot_standby_feedback off.
2518  */
2519  if (!TransactionIdIsNormal(feedbackXmin)
2520  && !TransactionIdIsNormal(feedbackCatalogXmin))
2521  {
2523  if (MyReplicationSlot != NULL)
2524  PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
2525  return;
2526  }
2527 
2528  /*
2529  * Check that the provided xmin/epoch are sane, that is, not in the future
2530  * and not so far back as to be already wrapped around. Ignore if not.
2531  */
2532  if (TransactionIdIsNormal(feedbackXmin) &&
2533  !TransactionIdInRecentPast(feedbackXmin, feedbackEpoch))
2534  return;
2535 
2536  if (TransactionIdIsNormal(feedbackCatalogXmin) &&
2537  !TransactionIdInRecentPast(feedbackCatalogXmin, feedbackCatalogEpoch))
2538  return;
2539 
2540  /*
2541  * Set the WalSender's xmin equal to the standby's requested xmin, so that
2542  * the xmin will be taken into account by GetSnapshotData() /
2543  * ComputeXidHorizons(). This will hold back the removal of dead rows and
2544  * thereby prevent the generation of cleanup conflicts on the standby
2545  * server.
2546  *
2547  * There is a small window for a race condition here: although we just
2548  * checked that feedbackXmin precedes nextXid, the nextXid could have
2549  * gotten advanced between our fetching it and applying the xmin below,
2550  * perhaps far enough to make feedbackXmin wrap around. In that case the
2551  * xmin we set here would be "in the future" and have no effect. No point
2552  * in worrying about this since it's too late to save the desired data
2553  * anyway. Assuming that the standby sends us an increasing sequence of
2554  * xmins, this could only happen during the first reply cycle, else our
2555  * own xmin would prevent nextXid from advancing so far.
2556  *
2557  * We don't bother taking the ProcArrayLock here. Setting the xmin field
2558  * is assumed atomic, and there's no real need to prevent concurrent
2559  * horizon determinations. (If we're moving our xmin forward, this is
2560  * obviously safe, and if we're moving it backwards, well, the data is at
2561  * risk already since a VACUUM could already have determined the horizon.)
2562  *
2563  * If we're using a replication slot we reserve the xmin via that,
2564  * otherwise via the walsender's PGPROC entry. We can only track the
2565  * catalog xmin separately when using a slot, so we store the least of the
2566  * two provided when not using a slot.
2567  *
2568  * XXX: It might make sense to generalize the ephemeral slot concept and
2569  * always use the slot mechanism to handle the feedback xmin.
2570  */
2571  if (MyReplicationSlot != NULL) /* XXX: persistency configurable? */
2572  PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
2573  else
2574  {
2575  if (TransactionIdIsNormal(feedbackCatalogXmin)
2576  && TransactionIdPrecedes(feedbackCatalogXmin, feedbackXmin))
2577  MyProc->xmin = feedbackCatalogXmin;
2578  else
2579  MyProc->xmin = feedbackXmin;
2580  }
2581 }
2582 
2583 /*
2584  * Compute how long send/receive loops should sleep.
2585  *
2586  * If wal_sender_timeout is enabled we want to wake up in time to send
2587  * keepalives and to abort the connection if wal_sender_timeout has been
2588  * reached.
2589  */
2590 static long
2592 {
2593  long sleeptime = 10000; /* 10 s */
2594 
2596  {
2597  TimestampTz wakeup_time;
2598 
2599  /*
2600  * At the latest stop sleeping once wal_sender_timeout has been
2601  * reached.
2602  */
2605 
2606  /*
2607  * If no ping has been sent yet, wakeup when it's time to do so.
2608  * WalSndKeepaliveIfNecessary() wants to send a keepalive once half of
2609  * the timeout passed without a response.
2610  */
2613  wal_sender_timeout / 2);
2614 
2615  /* Compute relative time until wakeup. */
2616  sleeptime = TimestampDifferenceMilliseconds(now, wakeup_time);
2617  }
2618 
2619  return sleeptime;
2620 }
2621 
2622 /*
2623  * Check whether there have been responses by the client within
2624  * wal_sender_timeout and shutdown if not. Using last_processing as the
2625  * reference point avoids counting server-side stalls against the client.
2626  * However, a long server-side stall can make WalSndKeepaliveIfNecessary()
2627  * postdate last_processing by more than wal_sender_timeout. If that happens,
2628  * the client must reply almost immediately to avoid a timeout. This rarely
2629  * affects the default configuration, under which clients spontaneously send a
2630  * message every standby_message_timeout = wal_sender_timeout/6 = 10s. We
2631  * could eliminate that problem by recognizing timeout expiration at
2632  * wal_sender_timeout/2 after the keepalive.
2633  */
2634 static void
2636 {
2637  TimestampTz timeout;
2638 
2639  /* don't bail out if we're doing something that doesn't require timeouts */
2640  if (last_reply_timestamp <= 0)
2641  return;
2642 
2645 
2646  if (wal_sender_timeout > 0 && last_processing >= timeout)
2647  {
2648  /*
2649  * Since typically expiration of replication timeout means
2650  * communication problem, we don't send the error message to the
2651  * standby.
2652  */
2654  (errmsg("terminating walsender process due to replication timeout")));
2655 
2656  WalSndShutdown();
2657  }
2658 }
2659 
2660 /* Main loop of walsender process that streams the WAL over Copy messages. */
2661 static void
2663 {
2664  /*
2665  * Initialize the last reply timestamp. That enables timeout processing
2666  * from hereon.
2667  */
2669  waiting_for_ping_response = false;
2670 
2671  /*
2672  * Loop until we reach the end of this timeline or the client requests to
2673  * stop streaming.
2674  */
2675  for (;;)
2676  {
2677  /* Clear any already-pending wakeups */
2679 
2681 
2682  /* Process any requests or signals received recently */
2683  if (ConfigReloadPending)
2684  {
2685  ConfigReloadPending = false;
2688  }
2689 
2690  /* Check for input from the client */
2692 
2693  /*
2694  * If we have received CopyDone from the client, sent CopyDone
2695  * ourselves, and the output buffer is empty, it's time to exit
2696  * streaming.
2697  */
2699  !pq_is_send_pending())
2700  break;
2701 
2702  /*
2703  * If we don't have any pending data in the output buffer, try to send
2704  * some more. If there is some, we don't bother to call send_data
2705  * again until we've flushed it ... but we'd better assume we are not
2706  * caught up.
2707  */
2708  if (!pq_is_send_pending())
2709  send_data();
2710  else
2711  WalSndCaughtUp = false;
2712 
2713  /* Try to flush pending output to the client */
2714  if (pq_flush_if_writable() != 0)
2715  WalSndShutdown();
2716 
2717  /* If nothing remains to be sent right now ... */
2719  {
2720  /*
2721  * If we're in catchup state, move to streaming. This is an
2722  * important state change for users to know about, since before
2723  * this point data loss might occur if the primary dies and we
2724  * need to failover to the standby. The state change is also
2725  * important for synchronous replication, since commits that
2726  * started to wait at that point might wait for some time.
2727  */
2729  {
2730  ereport(DEBUG1,
2731  (errmsg_internal("\"%s\" has now caught up with upstream server",
2732  application_name)));
2734  }
2735 
2736  /*
2737  * When SIGUSR2 arrives, we send any outstanding logs up to the
2738  * shutdown checkpoint record (i.e., the latest record), wait for
2739  * them to be replicated to the standby, and exit. This may be a
2740  * normal termination at shutdown, or a promotion; the walsender
2741  * is not sure which.
2742  */
2743  if (got_SIGUSR2)
2744  WalSndDone(send_data);
2745  }
2746 
2747  /* Check for replication timeout. */
2749 
2750  /* Send keepalive if the time has come */
2752 
2753  /*
2754  * Block if we have unsent data. XXX For logical replication, let
2755  * WalSndWaitForWal() handle any other blocking; idle receivers need
2756  * its additional actions. For physical replication, also block if
2757  * caught up; its send_data does not block.
2758  */
2759  if ((WalSndCaughtUp && send_data != XLogSendLogical &&
2762  {
2763  long sleeptime;
2764  int wakeEvents;
2765 
2767  wakeEvents = WL_SOCKET_READABLE;
2768  else
2769  wakeEvents = 0;
2770 
2771  /*
2772  * Use fresh timestamp, not last_processing, to reduce the chance
2773  * of reaching wal_sender_timeout before sending a keepalive.
2774  */
2776 
2777  if (pq_is_send_pending())
2778  wakeEvents |= WL_SOCKET_WRITEABLE;
2779 
2780  /* Sleep until something happens or we time out */
2781  WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_MAIN);
2782  }
2783  }
2784 }
2785 
2786 /* Initialize a per-walsender data structure for this walsender process */
2787 static void
2789 {
2790  int i;
2791 
2792  /*
2793  * WalSndCtl should be set up already (we inherit this by fork() or
2794  * EXEC_BACKEND mechanism from the postmaster).
2795  */
2796  Assert(WalSndCtl != NULL);
2797  Assert(MyWalSnd == NULL);
2798 
2799  /*
2800  * Find a free walsender slot and reserve it. This must not fail due to
2801  * the prior check for free WAL senders in InitProcess().
2802  */
2803  for (i = 0; i < max_wal_senders; i++)
2804  {
2805  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2806 
2807  SpinLockAcquire(&walsnd->mutex);
2808 
2809  if (walsnd->pid != 0)
2810  {
2811  SpinLockRelease(&walsnd->mutex);
2812  continue;
2813  }
2814  else
2815  {
2816  /*
2817  * Found a free slot. Reserve it for us.
2818  */
2819  walsnd->pid = MyProcPid;
2820  walsnd->state = WALSNDSTATE_STARTUP;
2821  walsnd->sentPtr = InvalidXLogRecPtr;
2822  walsnd->needreload = false;
2823  walsnd->write = InvalidXLogRecPtr;
2824  walsnd->flush = InvalidXLogRecPtr;
2825  walsnd->apply = InvalidXLogRecPtr;
2826  walsnd->writeLag = -1;
2827  walsnd->flushLag = -1;
2828  walsnd->applyLag = -1;
2829  walsnd->sync_standby_priority = 0;
2830  walsnd->latch = &MyProc->procLatch;
2831  walsnd->replyTime = 0;
2832 
2833  /*
2834  * The kind assignment is done here and not in StartReplication()
2835  * and StartLogicalReplication(). Indeed, the logical walsender
2836  * needs to read WAL records (like snapshot of running
2837  * transactions) during the slot creation. So it needs to be woken
2838  * up based on its kind.
2839  *
2840  * The kind assignment could also be done in StartReplication(),
2841  * StartLogicalReplication() and CREATE_REPLICATION_SLOT but it
2842  * seems better to set it in one place.
2843  */
2844  if (MyDatabaseId == InvalidOid)
2845  walsnd->kind = REPLICATION_KIND_PHYSICAL;
2846  else
2847  walsnd->kind = REPLICATION_KIND_LOGICAL;
2848 
2849  SpinLockRelease(&walsnd->mutex);
2850  /* don't need the lock anymore */
2851  MyWalSnd = (WalSnd *) walsnd;
2852 
2853  break;
2854  }
2855  }
2856 
2857  Assert(MyWalSnd != NULL);
2858 
2859  /* Arrange to clean up at walsender exit */
2861 }
2862 
2863 /* Destroy the per-walsender data structure for this walsender process */
2864 static void
2866 {
2867  WalSnd *walsnd = MyWalSnd;
2868 
2869  Assert(walsnd != NULL);
2870 
2871  MyWalSnd = NULL;
2872 
2873  SpinLockAcquire(&walsnd->mutex);
2874  /* clear latch while holding the spinlock, so it can safely be read */
2875  walsnd->latch = NULL;
2876  /* Mark WalSnd struct as no longer being in use. */
2877  walsnd->pid = 0;
2878  SpinLockRelease(&walsnd->mutex);
2879 }
2880 
2881 /* XLogReaderRoutine->segment_open callback */
2882 static void
2884  TimeLineID *tli_p)
2885 {
2886  char path[MAXPGPATH];
2887 
2888  /*-------
2889  * When reading from a historic timeline, and there is a timeline switch
2890  * within this segment, read from the WAL segment belonging to the new
2891  * timeline.
2892  *
2893  * For example, imagine that this server is currently on timeline 5, and
2894  * we're streaming timeline 4. The switch from timeline 4 to 5 happened at
2895  * 0/13002088. In pg_wal, we have these files:
2896  *
2897  * ...
2898  * 000000040000000000000012
2899  * 000000040000000000000013
2900  * 000000050000000000000013
2901  * 000000050000000000000014
2902  * ...
2903  *
2904  * In this situation, when requested to send the WAL from segment 0x13, on
2905  * timeline 4, we read the WAL from file 000000050000000000000013. Archive
2906  * recovery prefers files from newer timelines, so if the segment was
2907  * restored from the archive on this server, the file belonging to the old
2908  * timeline, 000000040000000000000013, might not exist. Their contents are
2909  * equal up to the switchpoint, because at a timeline switch, the used
2910  * portion of the old segment is copied to the new file.
2911  */
2912  *tli_p = sendTimeLine;
2914  {
2915  XLogSegNo endSegNo;
2916 
2917  XLByteToSeg(sendTimeLineValidUpto, endSegNo, state->segcxt.ws_segsize);
2918  if (nextSegNo == endSegNo)
2919  *tli_p = sendTimeLineNextTLI;
2920  }
2921 
2922  XLogFilePath(path, *tli_p, nextSegNo, state->segcxt.ws_segsize);
2923  state->seg.ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY);
2924  if (state->seg.ws_file >= 0)
2925  return;
2926 
2927  /*
2928  * If the file is not found, assume it's because the standby asked for a
2929  * too old WAL segment that has already been removed or recycled.
2930  */
2931  if (errno == ENOENT)
2932  {
2933  char xlogfname[MAXFNAMELEN];
2934  int save_errno = errno;
2935 
2936  XLogFileName(xlogfname, *tli_p, nextSegNo, wal_segment_size);
2937  errno = save_errno;
2938  ereport(ERROR,
2940  errmsg("requested WAL segment %s has already been removed",
2941  xlogfname)));
2942  }
2943  else
2944  ereport(ERROR,
2946  errmsg("could not open file \"%s\": %m",
2947  path)));
2948 }
2949 
2950 /*
2951  * Send out the WAL in its normal physical/stored form.
2952  *
2953  * Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk,
2954  * but not yet sent to the client, and buffer it in the libpq output
2955  * buffer.
2956  *
2957  * If there is no unsent WAL remaining, WalSndCaughtUp is set to true,
2958  * otherwise WalSndCaughtUp is set to false.
2959  */
2960 static void
2962 {
2963  XLogRecPtr SendRqstPtr;
2964  XLogRecPtr startptr;
2965  XLogRecPtr endptr;
2966  Size nbytes;
2967  XLogSegNo segno;
2968  WALReadError errinfo;
2969  Size rbytes;
2970 
2971  /* If requested switch the WAL sender to the stopping state. */
2972  if (got_STOPPING)
2974 
2976  {
2977  WalSndCaughtUp = true;
2978  return;
2979  }
2980 
2981  /* Figure out how far we can safely send the WAL. */
2983  {
2984  /*
2985  * Streaming an old timeline that's in this server's history, but is
2986  * not the one we're currently inserting or replaying. It can be
2987  * streamed up to the point where we switched off that timeline.
2988  */
2989  SendRqstPtr = sendTimeLineValidUpto;
2990  }
2991  else if (am_cascading_walsender)
2992  {
2993  TimeLineID SendRqstTLI;
2994 
2995  /*
2996  * Streaming the latest timeline on a standby.
2997  *
2998  * Attempt to send all WAL that has already been replayed, so that we
2999  * know it's valid. If we're receiving WAL through streaming
3000  * replication, it's also OK to send any WAL that has been received
3001  * but not replayed.
3002  *
3003  * The timeline we're recovering from can change, or we can be
3004  * promoted. In either case, the current timeline becomes historic. We
3005  * need to detect that so that we don't try to stream past the point
3006  * where we switched to another timeline. We check for promotion or
3007  * timeline switch after calculating FlushPtr, to avoid a race
3008  * condition: if the timeline becomes historic just after we checked
3009  * that it was still current, it's still OK to stream it up to the
3010  * FlushPtr that was calculated before it became historic.
3011  */
3012  bool becameHistoric = false;
3013 
3014  SendRqstPtr = GetStandbyFlushRecPtr(&SendRqstTLI);
3015 
3016  if (!RecoveryInProgress())
3017  {
3018  /* We have been promoted. */
3019  SendRqstTLI = GetWALInsertionTimeLine();
3020  am_cascading_walsender = false;
3021  becameHistoric = true;
3022  }
3023  else
3024  {
3025  /*
3026  * Still a cascading standby. But is the timeline we're sending
3027  * still the one recovery is recovering from?
3028  */
3029  if (sendTimeLine != SendRqstTLI)
3030  becameHistoric = true;
3031  }
3032 
3033  if (becameHistoric)
3034  {
3035  /*
3036  * The timeline we were sending has become historic. Read the
3037  * timeline history file of the new timeline to see where exactly
3038  * we forked off from the timeline we were sending.
3039  */
3040  List *history;
3041 
3042  history = readTimeLineHistory(SendRqstTLI);
3044 
3046  list_free_deep(history);
3047 
3048  sendTimeLineIsHistoric = true;
3049 
3050  SendRqstPtr = sendTimeLineValidUpto;
3051  }
3052  }
3053  else
3054  {
3055  /*
3056  * Streaming the current timeline on a primary.
3057  *
3058  * Attempt to send all data that's already been written out and
3059  * fsync'd to disk. We cannot go further than what's been written out
3060  * given the current implementation of WALRead(). And in any case
3061  * it's unsafe to send WAL that is not securely down to disk on the
3062  * primary: if the primary subsequently crashes and restarts, standbys
3063  * must not have applied any WAL that got lost on the primary.
3064  */
3065  SendRqstPtr = GetFlushRecPtr(NULL);
3066  }
3067 
3068  /*
3069  * Record the current system time as an approximation of the time at which
3070  * this WAL location was written for the purposes of lag tracking.
3071  *
3072  * In theory we could make XLogFlush() record a time in shmem whenever WAL
3073  * is flushed and we could get that time as well as the LSN when we call
3074  * GetFlushRecPtr() above (and likewise for the cascading standby
3075  * equivalent), but rather than putting any new code into the hot WAL path
3076  * it seems good enough to capture the time here. We should reach this
3077  * after XLogFlush() runs WalSndWakeupProcessRequests(), and although that
3078  * may take some time, we read the WAL flush pointer and take the time
3079  * very close together here so that we'll get a later position if it is
3080  * still moving.
3081  *
3082  * Because LagTrackerWrite ignores samples when the LSN hasn't advanced,
3083  * this gives us a cheap approximation for the WAL flush time for this
3084  * LSN.
3085  *
3086  * Note that the LSN is not necessarily the LSN for the data contained in
3087  * the present message; it's the end of the WAL, which might be further
3088  * ahead. All the lag tracking machinery cares about is finding out when
3089  * that arbitrary LSN is eventually reported as written, flushed and
3090  * applied, so that it can measure the elapsed time.
3091  */
3092  LagTrackerWrite(SendRqstPtr, GetCurrentTimestamp());
3093 
3094  /*
3095  * If this is a historic timeline and we've reached the point where we
3096  * forked to the next timeline, stop streaming.
3097  *
3098  * Note: We might already have sent WAL > sendTimeLineValidUpto. The
3099  * startup process will normally replay all WAL that has been received
3100  * from the primary, before promoting, but if the WAL streaming is
3101  * terminated at a WAL page boundary, the valid portion of the timeline
3102  * might end in the middle of a WAL record. We might've already sent the
3103  * first half of that partial WAL record to the cascading standby, so that
3104  * sentPtr > sendTimeLineValidUpto. That's OK; the cascading standby can't
3105  * replay the partial WAL record either, so it can still follow our
3106  * timeline switch.
3107  */
3109  {
3110  /* close the current file. */
3111  if (xlogreader->seg.ws_file >= 0)
3113 
3114  /* Send CopyDone */
3115  pq_putmessage_noblock('c', NULL, 0);
3116  streamingDoneSending = true;
3117 
3118  WalSndCaughtUp = true;
3119 
3120  elog(DEBUG1, "walsender reached end of timeline at %X/%X (sent up to %X/%X)",
3123  return;
3124  }
3125 
3126  /* Do we have any work to do? */
3127  Assert(sentPtr <= SendRqstPtr);
3128  if (SendRqstPtr <= sentPtr)
3129  {
3130  WalSndCaughtUp = true;
3131  return;
3132  }
3133 
3134  /*
3135  * Figure out how much to send in one message. If there's no more than
3136  * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
3137  * MAX_SEND_SIZE bytes, but round back to logfile or page boundary.
3138  *
3139  * The rounding is not only for performance reasons. Walreceiver relies on
3140  * the fact that we never split a WAL record across two messages. Since a
3141  * long WAL record is split at page boundary into continuation records,
3142  * page boundary is always a safe cut-off point. We also assume that
3143  * SendRqstPtr never points to the middle of a WAL record.
3144  */
3145  startptr = sentPtr;
3146  endptr = startptr;
3147  endptr += MAX_SEND_SIZE;
3148 
3149  /* if we went beyond SendRqstPtr, back off */
3150  if (SendRqstPtr <= endptr)
3151  {
3152  endptr = SendRqstPtr;
3154  WalSndCaughtUp = false;
3155  else
3156  WalSndCaughtUp = true;
3157  }
3158  else
3159  {
3160  /* round down to page boundary. */
3161  endptr -= (endptr % XLOG_BLCKSZ);
3162  WalSndCaughtUp = false;
3163  }
3164 
3165  nbytes = endptr - startptr;
3166  Assert(nbytes <= MAX_SEND_SIZE);
3167 
3168  /*
3169  * OK to read and send the slice.
3170  */
3172  pq_sendbyte(&output_message, 'w');
3173 
3174  pq_sendint64(&output_message, startptr); /* dataStart */
3175  pq_sendint64(&output_message, SendRqstPtr); /* walEnd */
3176  pq_sendint64(&output_message, 0); /* sendtime, filled in last */
3177 
3178  /*
3179  * Read the log directly into the output buffer to avoid extra memcpy
3180  * calls.
3181  */
3183 
3184 retry:
3185  /* attempt to read WAL from WAL buffers first */
3187  startptr, nbytes, xlogreader->seg.ws_tli);
3188  output_message.len += rbytes;
3189  startptr += rbytes;
3190  nbytes -= rbytes;
3191 
3192  /* now read the remaining WAL from WAL file */
3193  if (nbytes > 0 &&
3196  startptr,
3197  nbytes,
3198  xlogreader->seg.ws_tli, /* Pass the current TLI because
3199  * only WalSndSegmentOpen controls
3200  * whether new TLI is needed. */
3201  &errinfo))
3202  WALReadRaiseError(&errinfo);
3203 
3204  /* See logical_read_xlog_page(). */
3205  XLByteToSeg(startptr, segno, xlogreader->segcxt.ws_segsize);
3207 
3208  /*
3209  * During recovery, the currently-open WAL file might be replaced with the
3210  * file of the same name retrieved from archive. So we always need to
3211  * check what we read was valid after reading into the buffer. If it's
3212  * invalid, we try to open and read the file again.
3213  */
3215  {
3216  WalSnd *walsnd = MyWalSnd;
3217  bool reload;
3218 
3219  SpinLockAcquire(&walsnd->mutex);
3220  reload = walsnd->needreload;
3221  walsnd->needreload = false;
3222  SpinLockRelease(&walsnd->mutex);
3223 
3224  if (reload && xlogreader->seg.ws_file >= 0)
3225  {
3227 
3228  goto retry;
3229  }
3230  }
3231 
3232  output_message.len += nbytes;
3234 
3235  /*
3236  * Fill the send timestamp last, so that it is taken as late as possible.
3237  */
3240  memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64)],
3241  tmpbuf.data, sizeof(int64));
3242 
3244 
3245  sentPtr = endptr;
3246 
3247  /* Update shared memory status */
3248  {
3249  WalSnd *walsnd = MyWalSnd;
3250 
3251  SpinLockAcquire(&walsnd->mutex);
3252  walsnd->sentPtr = sentPtr;
3253  SpinLockRelease(&walsnd->mutex);
3254  }
3255 
3256  /* Report progress of XLOG streaming in PS display */
3258  {
3259  char activitymsg[50];
3260 
3261  snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
3263  set_ps_display(activitymsg);
3264  }
3265 }
3266 
3267 /*
3268  * Stream out logically decoded data.
3269  */
3270 static void
3272 {
3273  XLogRecord *record;
3274  char *errm;
3275 
3276  /*
3277  * We'll use the current flush point to determine whether we've caught up.
3278  * This variable is static in order to cache it across calls. Caching is
3279  * helpful because GetFlushRecPtr() needs to acquire a heavily-contended
3280  * spinlock.
3281  */
3282  static XLogRecPtr flushPtr = InvalidXLogRecPtr;
3283 
3284  /*
3285  * Don't know whether we've caught up yet. We'll set WalSndCaughtUp to
3286  * true in WalSndWaitForWal, if we're actually waiting. We also set to
3287  * true in WalSndWaitForWal, if we're actually waiting. We also set it to
3288  * didn't wait - i.e. when we're shutting down.
3289  */
3290  WalSndCaughtUp = false;
3291 
3292  record = XLogReadRecord(logical_decoding_ctx->reader, &errm);
3293 
3294  /* xlog record was invalid */
3295  if (errm != NULL)
3296  elog(ERROR, "could not find record while sending logically-decoded data: %s",
3297  errm);
3298 
3299  if (record != NULL)
3300  {
3301  /*
3302  * Note the lack of any call to LagTrackerWrite() which is handled by
3303  * WalSndUpdateProgress which is called by output plugin through
3304  * logical decoding write api.
3305  */
3307 
3309  }
3310 
3311  /*
3312  * If first time through in this session, initialize flushPtr. Otherwise,
3313  * we only need to update flushPtr if EndRecPtr is past it.
3314  */
3315  if (flushPtr == InvalidXLogRecPtr ||
3316  logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
3317  {
3319  flushPtr = GetStandbyFlushRecPtr(NULL);
3320  else
3321  flushPtr = GetFlushRecPtr(NULL);
3322  }
3323 
3324  /* If EndRecPtr is still past our flushPtr, it means we caught up. */
3325  if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
3326  WalSndCaughtUp = true;
3327 
3328  /*
3329  * If we're caught up and have been requested to stop, have WalSndLoop()
3330  * terminate the connection in an orderly manner, after writing out all
3331  * the pending data.
3332  */
3334  got_SIGUSR2 = true;
3335 
3336  /* Update shared memory status */
3337  {
3338  WalSnd *walsnd = MyWalSnd;
3339 
3340  SpinLockAcquire(&walsnd->mutex);
3341  walsnd->sentPtr = sentPtr;
3342  SpinLockRelease(&walsnd->mutex);
3343  }
3344 }
3345 
3346 /*
3347  * Shutdown if the sender is caught up.
3348  *
3349  * NB: This should only be called when the shutdown signal has been received
3350  * from postmaster.
3351  *
3352  * Note that if we determine that there's still more data to send, this
3353  * function will return control to the caller.
 *
 * When the exit condition holds, the function does not return: it reports
 * completion of the COPY to the client, flushes the connection and calls
 * proc_exit(0).
 *
 * NOTE(review): original lines 3356, 3369, 3383 and 3384 are absent from
 * this listing, so the signature, the second half of the replicatedPtr
 * expression and whatever followed the if-block cannot be seen here.
3354  */
3355 static void
3357 {
3358  XLogRecPtr replicatedPtr;
3359 
3360  /* ... let's just be real sure we're caught up ... */
3361  send_data();
3362 
3363  /*
3364  * To figure out whether all WAL has successfully been replicated, check
3365  * flush location if valid, write otherwise. Tools like pg_receivewal will
3366  * usually (unless in synchronous mode) return an invalid flush location.
3367  */
3368  replicatedPtr = XLogRecPtrIsInvalid(MyWalSnd->flush) ?
3370 
  /*
   * Exit only when all three hold: we are caught up, the position we have
   * sent equals the position the peer reported, and no output is still
   * waiting to be sent.
   */
3371  if (WalSndCaughtUp && sentPtr == replicatedPtr &&
3372  !pq_is_send_pending())
3373  {
3374  QueryCompletion qc;
3375 
3376  /* Inform the standby that XLOG streaming is done */
3377  SetQueryCompletion(&qc, CMDTAG_COPY, 0);
3378  EndCommand(&qc, DestRemote, false);
3379  pq_flush();
3380 
3381  proc_exit(0);
3382  }
3385 }
3386 
3387 /*
3388  * Returns the latest point in WAL that has been safely flushed to disk.
3389  * This should only be called when in recovery.
3390  *
3391  * This is called either by a cascading walsender to find the WAL position to
3392  * be sent to a cascaded standby, or by a slot synchronization operation to
3393  * validate a remote slot's LSN before syncing it locally.
3394  *
3395  * As a side-effect, *tli is updated to the TLI of the last
3396  * replayed WAL record. tli may be NULL, in which case nothing is stored.
 *
 * NOTE(review): original lines 3399, 3404 and 3407 are absent from this
 * listing; the signature and the declaration of receiveTLI are among the
 * things not visible here.
3397  */
3398 XLogRecPtr
3400 {
3401  XLogRecPtr replayPtr;
3402  TimeLineID replayTLI;
3403  XLogRecPtr receivePtr;
3405  XLogRecPtr result;
3406 
3408 
3409  /*
3410  * We can safely send what's already been replayed. Also, if walreceiver
3411  * is streaming WAL from the same timeline, we can send anything that it
3412  * has streamed, but hasn't been replayed yet.
3413  */
3414 
3415  receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI);
3416  replayPtr = GetXLogReplayRecPtr(&replayTLI);
3417 
3418  if (tli)
3419  *tli = replayTLI;
3420 
  /*
   * Start from the replay position; use the received position instead only
   * when it is on the replay timeline and strictly further ahead.
   */
3421  result = replayPtr;
3422  if (receiveTLI == replayTLI && receivePtr > replayPtr)
3423  result = receivePtr;
3424 
3425  return result;
3426 }
3427 
3428 /*
3429  * Request walsenders to reload the currently-open WAL file
 *
 * Sets the needreload flag in every slot whose pid is nonzero. Each slot is
 * examined and updated while holding that slot's spinlock.
 *
 * NOTE(review): original line 3432 (the function name and parameter list)
 * is absent from this listing.
3430  */
3431 void
3433 {
3434  int i;
3435 
3436  for (i = 0; i < max_wal_senders; i++)
3437  {
3438  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3439 
3440  SpinLockAcquire(&walsnd->mutex);
  /* pid == 0: presumably a slot with no walsender attached; skip it */
3441  if (walsnd->pid == 0)
3442  {
3443  SpinLockRelease(&walsnd->mutex);
3444  continue;
3445  }
3446  walsnd->needreload = true;
3447  SpinLockRelease(&walsnd->mutex);
3448  }
3449 }
3450 
3451 /*
3452  * Handle PROCSIG_WALSND_INIT_STOPPING signal.
 *
 * NOTE(review): original lines 3455 (function name and parameter list) and
 * 3457 are absent from this listing.
3453  */
3454 void
3456 {
3458 
3459  /*
3460  * If replication has not yet started, die like with SIGTERM. If
3461  * replication is active, only set a flag and wake up the main loop. It
3462  * will send any outstanding WAL, wait for it to be replicated to the
3463  * standby, and then exit gracefully.
3464  */
3465  if (!replication_active)
  /* deliver SIGTERM to this very process */
3466  kill(MyProcPid, SIGTERM);
3467  else
3468  got_STOPPING = true;
3469 }
3470 
3471 /*
3472  * SIGUSR2: set flag to do a last cycle and shut down afterwards. The WAL
3473  * sender should already have been switched to WALSNDSTATE_STOPPING at
3474  * this point.
 *
 * The handler only records the request in got_SIGUSR2 and sets this
 * process's latch, presumably so that a loop sleeping on the latch notices
 * the flag promptly.
 *
 * NOTE(review): original line 3477 (the function name and parameter list)
 * is absent from this listing.
3475  */
3476 static void
3478 {
3479  got_SIGUSR2 = true;
3480  SetLatch(MyLatch);
3481 }
3482 
3483 /*
 * Set up signal handlers
 *
 * Of the handlers visible here: SIGINT cancels the current query, SIGTERM
 * requests shutdown, SIGALRM is established by InitializeTimeouts(), and
 * SIGUSR2 requests a last cycle followed by shutdown.
 *
 * NOTE(review): original lines 3485, 3488, 3493, 3494 and 3499 are absent
 * from this listing, so further pqsignal() calls (including whatever the
 * final "Reset some signals" comment introduces) are not visible here.
 */
3484 void
3486 {
3487  /* Install the walsender's handlers, one signal at a time */
3489  pqsignal(SIGINT, StatementCancelHandler); /* query cancel */
3490  pqsignal(SIGTERM, die); /* request shutdown */
3491  /* SIGQUIT handler was already set up by InitPostmasterChild */
3492  InitializeTimeouts(); /* establishes SIGALRM handler */
3495  pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
3496  * shutdown */
3497 
3498  /* Reset some signals that are accepted by postmaster but not here */
3500 }
3501 
3502 /*
 * Report shared-memory space needed by WalSndShmemInit
 *
 * The result is the size of WalSndCtlData up to its walsnds member plus
 * max_wal_senders entries of sizeof(WalSnd) each.
 *
 * NOTE(review): original line 3504 (the function name and parameter list)
 * is absent from this listing.
 */
3503 Size
3505 {
3506  Size size = 0;
3507 
  /* fixed part: everything that precedes the walsnds array */
3508  size = offsetof(WalSndCtlData, walsnds);
  /* variable part: one WalSnd per configured sender (add_size/mul_size presumably guard against overflow - confirm) */
3509  size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
3510 
3511  return size;
3512 }
3513 
3514 /*
 * Allocate and initialize walsender-related shared memory
 *
 * WalSndCtl is pointed at the "Wal Sender Ctl" shared structure. The
 * initialization branch runs only when ShmemInitStruct() reports the
 * structure as not found, i.e. the first time through.
 *
 * NOTE(review): original lines 3516, 3527, 3530, 3539 and 3540 are absent
 * from this listing. In particular the body of the NUM_SYNC_REP_WAIT_MODE
 * loop is missing, so as printed the two for-loops read as nested; do not
 * take the nesting shown here at face value.
 */
3515 void
3517 {
3518  bool found;
3519  int i;
3520 
3521  WalSndCtl = (WalSndCtlData *)
3522  ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
3523 
3524  if (!found)
3525  {
3526  /* First time through, so initialize */
3528 
3529  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
3531 
  /* give every walsender slot an initialized spinlock */
3532  for (i = 0; i < max_wal_senders; i++)
3533  {
3534  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3535 
3536  SpinLockInit(&walsnd->mutex);
3537  }
3538 
3541  }
3542 }
3543 
3544 /*
3545  * Wake up physical, logical or both kinds of walsenders
3546  *
3547  * The distinction between physical and logical walsenders is done, because:
3548  * - physical walsenders can't send data until it's been flushed
3549  * - logical walsenders on standby can't decode and send data until it's been
3550  * applied
3551  *
3552  * For cascading replication we need to wake up physical walsenders separately
3553  * from logical walsenders (see the comment before calling WalSndWakeup() in
3554  * ApplyWalRecord() for more details).
3555  *
3556  * This will be called inside critical sections, so throwing an error is not
3557  * advisable.
 *
 * NOTE(review): original lines 3569 and 3572 are absent from this listing.
 * They held the statements controlled by the two if-tests below, so as
 * printed "if (logical)" looks like the body of "if (physical)"; do not take
 * the nesting shown here at face value.
3558  */
3559 void
3560 WalSndWakeup(bool physical, bool logical)
3561 {
3562  /*
3563  * Wake up all the walsenders waiting on WAL being flushed or replayed
3564  * respectively. Note that waiting walsender would have prepared to sleep
3565  * on the CV (i.e., added itself to the CV's waitlist) in WalSndWait()
3566  * before actually waiting.
3567  */
3568  if (physical)
3570 
3571  if (logical)
3573 }
3574 
3575 /*
3576  * Wait for readiness on the FeBe socket, or a timeout. The mask should be
3577  * composed of optional WL_SOCKET_WRITEABLE and WL_SOCKET_READABLE flags. Exit
3578  * on postmaster death.
 *
 * socket_events is installed on the FeBe socket entry of FeBeWaitSet,
 * timeout is handed to WaitEventSetWait() unchanged, and wait_event is the
 * wait-event identifier passed along to it.
 *
 * NOTE(review): original lines 3611, 3612, 3614, 3619 and 3623 are absent
 * from this listing. That includes the test that opens the if/else-if
 * chain on MyWalSnd->kind and the statements it controls, so the chain as
 * printed is incomplete.
3579  */
3580 static void
3581 WalSndWait(uint32 socket_events, long timeout, uint32 wait_event)
3582 {
3583  WaitEvent event;
3584 
3585  ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, socket_events, NULL);
3586 
3587  /*
3588  * We use a condition variable to efficiently wake up walsenders in
3589  * WalSndWakeup().
3590  *
3591  * Every walsender prepares to sleep on a shared memory CV. Note that it
3592  * just prepares to sleep on the CV (i.e., adds itself to the CV's
3593  * waitlist), but does not actually wait on the CV (IOW, it never calls
3594  * ConditionVariableSleep()). It still uses WaitEventSetWait() for
3595  * waiting, because we also need to wait for socket events. The processes
3596  * (startup process, walreceiver etc.) wanting to wake up walsenders use
3597  * ConditionVariableBroadcast(), which in turn calls SetLatch(), helping
3598  * walsenders come out of WaitEventSetWait().
3599  *
3600  * This approach is simple and efficient because one doesn't have to loop
3601  * through all the walsender slots, with a spinlock acquisition and
3602  * release for every iteration, just to wake up only the waiting
3603  * walsenders. It makes life easy for WalSndWakeup() callers.
3604  *
3605  * XXX: A desirable future improvement would be to add support for CVs
3606  * into WaitEventSetWait().
3607  *
3608  * And, we use separate shared memory CVs for physical and logical
3609  * walsenders for selective wake ups, see WalSndWakeup() for more details.
3610  */
3613  else if (MyWalSnd->kind == REPLICATION_KIND_LOGICAL)
3615 
  /* at most one event is collected; postmaster death ends this process */
3616  if (WaitEventSetWait(FeBeWaitSet, timeout, &event, 1, wait_event) == 1 &&
3617  (event.events & WL_POSTMASTER_DEATH))
3618  {
3620  proc_exit(1);
3621  }
3622 
3624 }
3625 
3626 /*
3627  * Signal all walsenders to move to stopping state.
3628  *
3629  * This will trigger walsenders to move to a state where no further WAL can be
3630  * generated. See this file's header for details.
 *
 * NOTE(review): original lines 3633 (function name and parameter list) and
 * 3649 are absent from this listing. Line 3649 is the only statement after
 * the pid test, so the code that acts on pid is not visible here.
3631  */
3632 void
3634 {
3635  int i;
3636 
3637  for (i = 0; i < max_wal_senders; i++)
3638  {
3639  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3640  pid_t pid;
3641 
  /* copy the pid under the spinlock; it is tested after the lock is released */
3642  SpinLockAcquire(&walsnd->mutex);
3643  pid = walsnd->pid;
3644  SpinLockRelease(&walsnd->mutex);
3645 
3646  if (pid == 0)
3647  continue;
3648 
3650  }
3651 }
3652 
3653 /*
3654  * Wait until all the WAL senders have quit or reached the stopping state.
3655  * This is used by the checkpointer to control when the shutdown checkpoint
3656  * can safely be performed.
 *
 * The wait is a polling loop: all slots are rescanned, and if any slot with
 * a nonzero pid is not yet in WALSNDSTATE_STOPPING the function sleeps for
 * 10 msec and scans again. No upper bound on the total wait is visible in
 * this function.
 *
 * NOTE(review): original line 3659 (the function name and parameter list)
 * is absent from this listing.
3657  */
3658 void
3660 {
3661  for (;;)
3662  {
3663  int i;
3664  bool all_stopped = true;
3665 
3666  for (i = 0; i < max_wal_senders; i++)
3667  {
3668  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3669 
3670  SpinLockAcquire(&walsnd->mutex);
3671 
  /* slots with pid == 0 do not hold up the wait */
3672  if (walsnd->pid == 0)
3673  {
3674  SpinLockRelease(&walsnd->mutex);
3675  continue;
3676  }
3677 
  /* one sender that is not stopping is enough; abandon this scan */
3678  if (walsnd->state != WALSNDSTATE_STOPPING)
3679  {
3680  all_stopped = false;
3681  SpinLockRelease(&walsnd->mutex);
3682  break;
3683  }
3684  SpinLockRelease(&walsnd->mutex);
3685  }
3686 
3687  /* safe to leave if confirmation is done for all WAL senders */
3688  if (all_stopped)
3689  return;
3690 
3691  pg_usleep(10000L); /* wait for 10 msec */
3692  }
3693 }
3694 
3695 /*
 * Set state for current walsender (only called in walsender)
 *
 * Does nothing when the slot already holds the requested state; otherwise
 * the new state is stored while holding the slot's spinlock.
 *
 * NOTE(review): original lines 3697 (function name and parameter list) and
 * 3701 are absent from this listing.
 */
3696 void
3698 {
3699  WalSnd *walsnd = MyWalSnd;
3700 
3702 
  /*
   * This comparison reads state without taking the spinlock; presumably
   * that is acceptable because only this walsender writes its own state -
   * confirm.
   */
3703  if (walsnd->state == state)
3704  return;
3705 
3706  SpinLockAcquire(&walsnd->mutex);
3707  walsnd->state = state;
3708  SpinLockRelease(&walsnd->mutex);
3709 }
3710 
3711 /*
3712  * Return a string constant representing the state. This is used
3713  * in system views, and should *not* be translated.
 *
 * A value that matches none of the cases below yields "UNKNOWN".
 *
 * NOTE(review): original line 3716 (the function name and parameter list)
 * is absent from this listing.
3714  */
3715 static const char *
3717 {
3718  switch (state)
3719  {
3720  case WALSNDSTATE_STARTUP:
3721  return "startup";
3722  case WALSNDSTATE_BACKUP:
3723  return "backup";
3724  case WALSNDSTATE_CATCHUP:
3725  return "catchup";
3726  case WALSNDSTATE_STREAMING:
3727  return "streaming";
3728  case WALSNDSTATE_STOPPING:
3729  return "stopping";
3730  }
  /* reached only when no case above returned */
3731  return "UNKNOWN";
3732 }
3733 
/*
 * Build a freshly palloc'd Interval from 'offset': the month and day fields
 * are zero and the time field is set to 'offset' unchanged.
 *
 * NOTE(review): original line 3735 (the function name and parameter list)
 * is absent from this listing.
 */
3734 static Interval *
3736 {
3737  Interval *result = palloc(sizeof(Interval));
3738 
3739  result->month = 0;
3740  result->day = 0;
3741  result->time = offset;
3742 
3743  return result;
3744 }
3745 
3746 /*
3747  * Returns activity of walsenders, including pids and xlog locations sent to
3748  * standby servers.
 *
 * One row of PG_STAT_GET_WAL_SENDERS_COLS columns is stored into
 * rsinfo->setResult for every slot whose pid is nonzero. Callers without
 * the privileges of pg_read_all_stats get the pid in column 0 and NULL in
 * every other column.
 *
 * NOTE(review): original lines 3751, 3779, 3782, 3835, 3841, 3863, 3868,
 * 3873 and 3891 are absent from this listing. Among other things the
 * declarations of 'state' and 'values', the assignment of column 1, the
 * test guarding nulls[3], the else-branches that fill columns 6-8 and the
 * start of the "sync"/"quorum" assignment are not visible here.
3749  */
3750 Datum
3752 {
3753 #define PG_STAT_GET_WAL_SENDERS_COLS 12
3754  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
3755  SyncRepStandbyData *sync_standbys;
3756  int num_standbys;
3757  int i;
3758 
3759  InitMaterializedSRF(fcinfo, 0);
3760 
3761  /*
3762  * Get the currently active synchronous standbys. This could be out of
3763  * date before we're done, but we'll use the data anyway.
3764  */
3765  num_standbys = SyncRepGetCandidateStandbys(&sync_standbys);
3766 
3767  for (i = 0; i < max_wal_senders; i++)
3768  {
3769  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3770  XLogRecPtr sent_ptr;
3771  XLogRecPtr write;
3772  XLogRecPtr flush;
3773  XLogRecPtr apply;
3774  TimeOffset writeLag;
3775  TimeOffset flushLag;
3776  TimeOffset applyLag;
3777  int priority;
3778  int pid;
3780  TimestampTz replyTime;
3781  bool is_sync_standby;
3783  bool nulls[PG_STAT_GET_WAL_SENDERS_COLS] = {0};
3784  int j;
3785 
3786  /* Collect data from shared memory */
  /* everything is copied into locals so the spinlock is held only briefly */
3787  SpinLockAcquire(&walsnd->mutex);
3788  if (walsnd->pid == 0)
3789  {
3790  SpinLockRelease(&walsnd->mutex);
3791  continue;
3792  }
3793  pid = walsnd->pid;
3794  sent_ptr = walsnd->sentPtr;
3795  state = walsnd->state;
3796  write = walsnd->write;
3797  flush = walsnd->flush;
3798  apply = walsnd->apply;
3799  writeLag = walsnd->writeLag;
3800  flushLag = walsnd->flushLag;
3801  applyLag = walsnd->applyLag;
3802  priority = walsnd->sync_standby_priority;
3803  replyTime = walsnd->replyTime;
3804  SpinLockRelease(&walsnd->mutex);
3805 
3806  /*
3807  * Detect whether walsender is/was considered synchronous. We can
3808  * provide some protection against stale data by checking the PID
3809  * along with walsnd_index.
3810  */
3811  is_sync_standby = false;
3812  for (j = 0; j < num_standbys; j++)
3813  {
3814  if (sync_standbys[j].walsnd_index == i &&
3815  sync_standbys[j].pid == pid)
3816  {
3817  is_sync_standby = true;
3818  break;
3819  }
3820  }
3821 
3822  values[0] = Int32GetDatum(pid);
3823 
3824  if (!has_privs_of_role(GetUserId(), ROLE_PG_READ_ALL_STATS))
3825  {
3826  /*
3827  * Only superusers and roles with privileges of pg_read_all_stats
3828  * can see details. Other users only get the pid value to know
3829  * it's a walsender, but no details.
3830  */
  /*
   * The length passed here is an element count used as a byte count, so
   * this marks columns 1..11 only if sizeof(bool) is 1 - presumably the
   * case; confirm.
   */
3831  MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1);
3832  }
3833  else
3834  {
3836 
  /*
   * For the LSN columns an invalid pointer is flagged as NULL; values[]
   * is assigned regardless, presumably ignored when nulls[] is set.
   */
3837  if (XLogRecPtrIsInvalid(sent_ptr))
3838  nulls[2] = true;
3839  values[2] = LSNGetDatum(sent_ptr);
3840 
3842  nulls[3] = true;
3843  values[3] = LSNGetDatum(write);
3844 
3845  if (XLogRecPtrIsInvalid(flush))
3846  nulls[4] = true;
3847  values[4] = LSNGetDatum(flush);
3848 
3849  if (XLogRecPtrIsInvalid(apply))
3850  nulls[5] = true;
3851  values[5] = LSNGetDatum(apply);
3852 
3853  /*
3854  * Treat a standby such as a pg_basebackup background process
3855  * which always returns an invalid flush location, as an
3856  * asynchronous standby.
3857  */
3858  priority = XLogRecPtrIsInvalid(flush) ? 0 : priority;
3859 
  /* a negative lag is reported as NULL */
3860  if (writeLag < 0)
3861  nulls[6] = true;
3862  else
3864 
3865  if (flushLag < 0)
3866  nulls[7] = true;
3867  else
3869 
3870  if (applyLag < 0)
3871  nulls[8] = true;
3872  else
3874 
3875  values[9] = Int32GetDatum(priority);
3876 
3877  /*
3878  * More easily understood version of standby state. This is purely
3879  * informational.
3880  *
3881  * In quorum-based sync replication, the role of each standby
3882  * listed in synchronous_standby_names can be changing very
3883  * frequently. Any standbys considered as "sync" at one moment can
3884  * be switched to "potential" ones at the next moment. So, it's
3885  * basically useless to report "sync" or "potential" as their sync
3886  * states. We report just "quorum" for them.
3887  */
3888  if (priority == 0)
3889  values[10] = CStringGetTextDatum("async");
3890  else if (is_sync_standby)
3892  CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
3893  else
3894  values[10] = CStringGetTextDatum("potential");
3895 
  /* a zero reply time is reported as NULL */
3896  if (replyTime == 0)
3897  nulls[11] = true;
3898  else
3899  values[11] = TimestampTzGetDatum(replyTime);
3900  }
3901 
3902  tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
3903  values, nulls);
3904  }
3905 
3906  return (Datum) 0;
3907 }
3908 
3909 /*
3910  * Send a keepalive message to standby.
3911  *
3912  * If requestReply is set, the message requests the other party to send
3913  * a message back to us, for heartbeat purposes. We also set a flag to
3914  * let nearby code know that we're waiting for that response, to avoid
3915  * repeated requests.
3916  *
3917  * writePtr is the location up to which the WAL is sent. It is essentially
3918  * the same as sentPtr but in some cases, we need to send keep alive before
3919  * sentPtr is updated like when skipping empty transactions.
 *
 * Of the message fields visible here, the first is the byte 'k', then a
 * 64-bit position (writePtr, or sentPtr when writePtr is invalid), and the
 * last is one byte that is 1 when a reply is requested and 0 otherwise.
 *
 * NOTE(review): original lines 3927, 3930, 3934 and 3938 are absent from
 * this listing. Line 3930 sits between the position and the reply byte, so
 * a field appended there is not visible; the statements that reset and
 * send the buffer and that set the local flag are likewise not visible.
3920  */
3921 static void
3922 WalSndKeepalive(bool requestReply, XLogRecPtr writePtr)
3923 {
3924  elog(DEBUG2, "sending replication keepalive");
3925 
3926  /* construct the message... */
3928  pq_sendbyte(&output_message, 'k');
3929  pq_sendint64(&output_message, XLogRecPtrIsInvalid(writePtr) ? sentPtr : writePtr);
3931  pq_sendbyte(&output_message, requestReply ? 1 : 0);
3932 
3933  /* ... and send it wrapped in CopyData */
3935 
3936  /* Set local flag */
3937  if (requestReply)
3939 }
3940 
3941 /*
3942  * Send keepalive message if too much time has elapsed.
 *
 * Returns without doing anything when wal_sender_timeout or
 * last_reply_timestamp is not positive.
 *
 * NOTE(review): original lines 3945, 3956, 3964 and 3968 are absent from
 * this listing. That covers the signature, the condition guarding the
 * second early return, the start of the ping_time computation and the
 * first statement executed once ping_time has been reached.
3943  */
3944 static void
3946 {
3947  TimestampTz ping_time;
3948 
3949  /*
3950  * Don't send keepalive messages if timeouts are globally disabled or
3951  * we're doing something not partaking in timeouts.
3952  */
3953  if (wal_sender_timeout <= 0 || last_reply_timestamp <= 0)
3954  return;
3955 
3957  return;
3958 
3959  /*
3960  * If half of wal_sender_timeout has lapsed without receiving any reply
3961  * from the standby, send a keep-alive message to the standby requesting
3962  * an immediate reply.
3963  */
3965  wal_sender_timeout / 2);
3966  if (last_processing >= ping_time)
3967  {
3969 
3970  /* Try to flush pending output to the client */
  /* a nonzero result from the flush attempt leads to WalSndShutdown() */
3971  if (pq_flush_if_writable() != 0)
3972  WalSndShutdown();
3973  }
3974 }
3975 
3976 /*
3977  * Record the end of the WAL and the time it was flushed locally, so that
3978  * LagTrackerRead can compute the elapsed time (lag) when this WAL location is
3979  * eventually reported to have been written, flushed and applied by the
3980  * standby in a reply message.
 *
 * Does nothing when not running as a walsender, or when 'lsn' equals the
 * LSN recorded by the previous call.
 *
 * NOTE(review): original lines 3983, 4023, 4025 and 4029 are absent from
 * this listing. That covers the signature, both branches of the write-head
 * rewind in the buffer-full case, and the first statement of the final
 * sample store.
3981  */
3982 static void
3984 {
3985  bool buffer_full;
3986  int new_write_head;
3987  int i;
3988 
3989  if (!am_walsender)
3990  return;
3991 
3992  /*
3993  * If the lsn hasn't advanced since last time, then do nothing. This way
3994  * we only record a new sample when new WAL has been written.
3995  */
3996  if (lag_tracker->last_lsn == lsn)
3997  return;
3998  lag_tracker->last_lsn = lsn;
3999 
4000  /*
4001  * If advancing the write head of the circular buffer would crash into any
4002  * of the read heads, then the buffer is full. In other words, the
4003  * slowest reader (presumably apply) is the one that controls the release
4004  * of space.
4005  */
4006  new_write_head = (lag_tracker->write_head + 1) % LAG_TRACKER_BUFFER_SIZE;
4007  buffer_full = false;
  /* every read head is checked; the loop does not stop at the first hit */
4008  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; ++i)
4009  {
4010  if (new_write_head == lag_tracker->read_heads[i])
4011  buffer_full = true;
4012  }
4013 
4014  /*
4015  * If the buffer is full, for now we just rewind by one slot and overwrite
4016  * the last sample, as a simple (if somewhat uneven) way to lower the
4017  * sampling rate. There may be better adaptive compaction algorithms.
4018  */
4019  if (buffer_full)
4020  {
  /* the head will end up where it is now, i.e. it does not advance */
4021  new_write_head = lag_tracker->write_head;
4022  if (lag_tracker->write_head > 0)
4024  else
4026  }
4027 
4028  /* Store a sample at the current write head position. */
4030  lag_tracker->buffer[lag_tracker->write_head].time = local_flush_time;
4031  lag_tracker->write_head = new_write_head;
4032 }
4033 
4034 /*
4035  * Find out how much time has elapsed between the moment WAL location 'lsn'
4036  * (or the highest known earlier LSN) was flushed locally and the time 'now'.
4037  * We have a separate read head for each of the reported LSN locations we
4038  * receive in replies from standby; 'head' controls which read head is
4039  * used. Whenever a read head crosses an LSN which was written into the
4040  * lag buffer with LagTrackerWrite, we can use the associated timestamp to
4041  * find out the time this LSN (or an earlier one) was flushed locally, and
4042  * therefore compute the lag.
4043  *
4044  * Return -1 if no new sample data is available, and otherwise the elapsed
4045  * time in microseconds.
 *
 * NOTE(review): original lines 4048, 4058, 4060, 4070, 4089 and 4099 are
 * absent from this listing. That covers the signature, the right-hand
 * sides of the two assignments inside the while loop, the conditions
 * guarding the last_read reset and the "no future samples" return, and the
 * declaration of 'next' used by the interpolation.
4046  */
4047 static TimeOffset
4049 {
4050  TimestampTz time = 0;
4051 
4052  /* Read all unread samples up to this LSN or end of buffer. */
4053  while (lag_tracker->read_heads[head] != lag_tracker->write_head &&
4054  lag_tracker->buffer[lag_tracker->read_heads[head]].lsn <= lsn)
4055  {
  /* 'time' keeps the timestamp of the latest sample consumed so far */
4056  time = lag_tracker->buffer[lag_tracker->read_heads[head]].time;
4057  lag_tracker->last_read[head] =
4059  lag_tracker->read_heads[head] =
4061  }
4062 
4063  /*
4064  * If the lag tracker is empty, that means the standby has processed
4065  * everything we've ever sent so we should now clear 'last_read'. If we
4066  * didn't do that, we'd risk using a stale and irrelevant sample for
4067  * interpolation at the beginning of the next burst of WAL after a period
4068  * of idleness.
4069  */
4071  lag_tracker->last_read[head].time = 0;
4072 
4073  if (time > now)
4074  {
4075  /* If the clock somehow went backwards, treat as not found. */
4076  return -1;
4077  }
4078  else if (time == 0)
4079  {
4080  /*
4081  * We didn't cross a time. If there is a future sample that we
4082  * haven't reached yet, and we've already reached at least one sample,
4083  * let's interpolate the local flushed time. This is mainly useful
4084  * for reporting a completely stuck apply position as having
4085  * increasing lag, since otherwise we'd have to wait for it to
4086  * eventually start moving again and cross one of our samples before
4087  * we can show the lag increasing.
4088  */
4090  {
4091  /* There are no future samples, so we can't interpolate. */
4092  return -1;
4093  }
4094  else if (lag_tracker->last_read[head].time != 0)
4095  {
4096  /* We can interpolate between last_read and the next sample. */
4097  double fraction;
4098  WalTimeSample prev = lag_tracker->last_read[head];
4100 
4101  if (lsn < prev.lsn)
4102  {
4103  /*
4104  * Reported LSNs shouldn't normally go backwards, but it's
4105  * possible when there is a timeline change. Treat as not
4106  * found.
4107  */
4108  return -1;
4109  }
4110 
4111  Assert(prev.lsn < next.lsn);
4112 
4113  if (prev.time > next.time)
4114  {
4115  /* If the clock somehow went backwards, treat as not found. */
4116  return -1;
4117  }
4118 
4119  /* See how far we are between the previous and next samples. */
4120  fraction =
4121  (double) (lsn - prev.lsn) / (double) (next.lsn - prev.lsn);
4122 
4123  /* Scale the local flush time proportionally. */
4124  time = (TimestampTz)
4125  ((double) prev.time + (next.time - prev.time) * fraction);
4126  }
4127  else
4128  {
4129  /*
4130  * We have only a future sample, implying that we were entirely
4131  * caught up, and now there is a new burst of WAL and the
4132  * standby hasn't processed the first sample yet. Until the
4133  * standby reaches the future sample the best we can do is report
4134  * the hypothetical lag if that sample were to be replayed now.
4135  */
4136  time = lag_tracker->buffer[lag_tracker->read_heads[head]].time;
4137  }
4138  }
4139 
4140  /* Return the elapsed time since local flush time in microseconds. */
4141  Assert(time != 0);
4142  return now - time;
4143 }
bool has_privs_of_role(Oid member, Oid role)
Definition: acl.c:5060
int16 AttrNumber
Definition: attnum.h:21
TimeLineID tliOfPointInHistory(XLogRecPtr ptr, List *history)
Definition: timeline.c:544
XLogRecPtr tliSwitchPoint(TimeLineID tli, List *history, TimeLineID *nextTLI)
Definition: timeline.c:572
List * readTimeLineHistory(TimeLineID targetTLI)
Definition: timeline.c:76
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition: timestamp.c:1767
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1791
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1655
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1619
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1854
void pgstat_report_activity(BackendState state, const char *cmd_str)
@ STATE_RUNNING
#define InvalidBackendId
Definition: backendid.h:23
void SendBaseBackup(BaseBackupCmd *cmd, IncrementalBackupInfo *ib)
Definition: basebackup.c:991
void AppendIncrementalManifestData(IncrementalBackupInfo *ib, const char *data, int len)
IncrementalBackupInfo * CreateIncrementalBackupInfo(MemoryContext mcxt)
void FinalizeIncrementalManifest(IncrementalBackupInfo *ib)
static int32 next
Definition: blutils.c:221
static Datum values[MAXATTR]
Definition: bootstrap.c:156
#define CStringGetTextDatum(s)
Definition: builtins.h:97
#define NameStr(name)
Definition: c.h:735
unsigned int uint32
Definition: c.h:495
#define SIGNAL_ARGS
Definition: c.h:1334
#define PG_BINARY
Definition: c.h:1262
#define pg_attribute_noreturn()
Definition: c.h:206
#define UINT64_FORMAT
Definition: c.h:538
#define MemSet(start, val, len)
Definition: c.h:1009
uint32 TransactionId
Definition: c.h:641
#define OidIsValid(objectId)
Definition: c.h:764
size_t Size
Definition: c.h:594
static void SetQueryCompletion(QueryCompletion *qc, CommandTag commandTag, uint64 nprocessed)
Definition: cmdtag.h:38
bool ConditionVariableCancelSleep(void)
void ConditionVariableBroadcast(ConditionVariable *cv)
void ConditionVariablePrepareToSleep(ConditionVariable *cv)
void ConditionVariableInit(ConditionVariable *cv)
int64 TimestampTz
Definition: timestamp.h:39
int64 TimeOffset
Definition: timestamp.h:40
char * get_database_name(Oid dbid)
Definition: dbcommands.c:3089
void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
Definition: decode.c:91
bool defGetBoolean(DefElem *def)
Definition: define.c:108
char * defGetString(DefElem *def)
Definition: define.c:49
void EndCommand(const QueryCompletion *qc, CommandDest dest, bool force_undecorated_output)
Definition: dest.c:166
void EndReplicationCommand(const char *commandTag)
Definition: dest.c:201
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
@ DestRemote
Definition: dest.h:89
@ DestRemoteSimple
Definition: dest.h:91
@ DestNone
Definition: dest.h:87
struct cursor * cur
Definition: ecpg.c:28
int errmsg_internal(const char *fmt,...)
Definition: elog.c:1162
int errcode_for_file_access(void)
Definition: elog.c:883
int errdetail(const char *fmt,...)
Definition: elog.c:1208
bool message_level_is_interesting(int elevel)
Definition: elog.c:277
int errcode(int sqlerrcode)
Definition: elog.c:860
int errmsg(const char *fmt,...)
Definition: elog.c:1075
#define LOG
Definition: elog.h:31
#define COMMERROR
Definition: elog.h:33
#define FATAL
Definition: elog.h:41
#define DEBUG2
Definition: elog.h:29
#define DEBUG1
Definition: elog.h:30
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:149
void do_tup_output(TupOutputState *tstate, const Datum *values, const bool *isnull)
Definition: execTuples.c:2274
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:83
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:2332
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:2254
int CloseTransientFile(int fd)
Definition: fd.c:2809
int BasicOpenFile(const char *fileName, int fileFlags)
Definition: fd.c:1087
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2633
Datum Int64GetDatum(int64 X)
Definition: fmgr.c:1807
#define PG_FUNCTION_ARGS
Definition: fmgr.h:193
void InitMaterializedSRF(FunctionCallInfo fcinfo, bits32 flags)
Definition: funcapi.c:76
int MyProcPid
Definition: globals.c:45
struct Latch * MyLatch
Definition: globals.c:59
Oid MyDatabaseId
Definition: globals.c:90
@ PGC_SIGHUP
Definition: guc.h:71
void ProcessConfigFile(GucContext context)
void GetPGVariable(const char *name, DestReceiver *dest)
Definition: guc_funcs.c:382
char * application_name
Definition: guc_tables.c:543
static void dlist_init(dlist_head *head)
Definition: ilist.h:314
#define write(a, b, c)
Definition: win32.h:14
#define read(a, b, c)
Definition: win32.h:13
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:27
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:61
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:365
void proc_exit(int code)
Definition: ipc.c:104
int j
Definition: isn.c:74
int i
Definition: isn.c:73
void ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch)
Definition: latch.c:1050
void SetLatch(Latch *latch)
Definition: latch.c:633
int WaitEventSetWait(WaitEventSet *set, long timeout, WaitEvent *occurred_events, int nevents, uint32 wait_event_info)
Definition: latch.c:1425
void ResetLatch(Latch *latch)
Definition: latch.c:725
#define WL_SOCKET_READABLE
Definition: latch.h:128
#define WL_POSTMASTER_DEATH
Definition: latch.h:131
#define WL_SOCKET_WRITEABLE
Definition: latch.h:129
#define pq_flush()
Definition: libpq.h:46
#define PQ_SMALL_MESSAGE_LIMIT
Definition: libpq.h:30
#define pq_flush_if_writable()
Definition: libpq.h:47
#define pq_is_send_pending()
Definition: libpq.h:48
#define PQ_LARGE_MESSAGE_LIMIT
Definition: libpq.h:31
#define pq_putmessage_noblock(msgtype, s, len)
Definition: libpq.h:51
#define FeBeWaitSetSocketPos
Definition: libpq.h:63
Assert(fmt[strlen(fmt) - 1] !='\n')
void list_free_deep(List *list)
Definition: list.c:1560
void LogicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: logical.c:1829
void FreeDecodingContext(LogicalDecodingContext *ctx)
Definition: logical.c:688
void DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
Definition: logical.c:644
void CheckLogicalDecodingRequirements(void)
Definition: logical.c:110
LogicalDecodingContext * CreateInitDecodingContext(const char *plugin, List *output_plugin_options, bool need_full_snapshot, XLogRecPtr restart_lsn, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:330
LogicalDecodingContext * CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, bool fast_forward, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:496
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1168
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1781
void LWLockReleaseAll(void)
Definition: lwlock.c:1876
@ LW_SHARED
Definition: lwlock.h:117
@ LW_EXCLUSIVE
Definition: lwlock.h:116
void MemoryContextSetParent(MemoryContext context, MemoryContext new_parent)
Definition: mcxt.c:546
char * pstrdup(const char *in)
Definition: mcxt.c:1619
void pfree(void *pointer)
Definition: mcxt.c:1431
MemoryContext TopMemoryContext
Definition: mcxt.c:141
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:1077
MemoryContext CurrentMemoryContext
Definition: mcxt.c:135
MemoryContext CacheMemoryContext
Definition: mcxt.c:144
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:403
void * palloc(Size size)
Definition: mcxt.c:1201
#define AllocSetContextCreate
Definition: memutils.h:128
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:152
#define HOLD_CANCEL_INTERRUPTS()
Definition: miscadmin.h:141
#define RESUME_CANCEL_INTERRUPTS()
Definition: miscadmin.h:143
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:122
Oid GetUserId(void)
Definition: miscinit.c:515
@ CMD_SELECT
Definition: nodes.h:255
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:124
void * arg
#define ERRCODE_DATA_CORRUPTED
Definition: pg_basebackup.c:41
#define MAXPGPATH
const void size_t len
#define lfirst(lc)
Definition: pg_list.h:172
#define NIL
Definition: pg_list.h:68
#define foreach_ptr(type, var, lst)
Definition: pg_list.h:469
static Datum LSNGetDatum(XLogRecPtr X)
Definition: pg_lsn.h:28
static bool two_phase
#define die(msg)
static char * buf
Definition: pg_test_fsync.c:73
void SendPostmasterSignal(PMSignalReason reason)
Definition: pmsignal.c:181
void MarkPostmasterChildWalSender(void)
Definition: pmsignal.c:339
@ PMSIGNAL_ADVANCE_STATE_MACHINE
Definition: pmsignal.h:42
pqsigfunc pqsignal(int signo, pqsigfunc func)
#define snprintf
Definition: port.h:238
void StatementCancelHandler(SIGNAL_ARGS)
Definition: postgres.c:3001
CommandDest whereToSendOutput
Definition: postgres.c:89
const char * debug_query_string
Definition: postgres.c:86
uintptr_t Datum
Definition: postgres.h:64
static Datum Int32GetDatum(int32 X)
Definition: postgres.h:212
#define InvalidOid
Definition: postgres_ext.h:36
int pq_getbyte_if_available(unsigned char *c)
Definition: pqcomm.c:1021
int pq_getmessage(StringInfo s, int maxlen)
Definition: pqcomm.c:1218
WaitEventSet * FeBeWaitSet
Definition: pqcomm.c:164
void pq_endmsgread(void)
Definition: pqcomm.c:1180
int pq_getbyte(void)
Definition: pqcomm.c:981
void pq_startmsgread(void)
Definition: pqcomm.c:1156
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:418
void pq_sendbytes(StringInfo buf, const void *data, int datalen)
Definition: pqformat.c:126
const char * pq_getmsgstring(StringInfo msg)
Definition: pqformat.c:582
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:299
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:402
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:88
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:456
void pq_endmessage_reuse(StringInfo buf)
Definition: pqformat.c:317
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
static void pq_sendint16(StringInfo buf, uint16 i)
Definition: pqformat.h:137
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PROC_AFFECTS_ALL_HORIZONS
Definition: proc.h:61
int SendProcSignal(pid_t pid, ProcSignalReason reason, BackendId backendId)
Definition: procsignal.c:262
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition: procsignal.c:639
@ PROCSIG_WALSND_INIT_STOPPING
Definition: procsignal.h:35
#define PqMsg_CopyDone
Definition: protocol.h:64
#define PqMsg_CopyData
Definition: protocol.h:65
#define PqMsg_CopyBothResponse
Definition: protocol.h:54
#define PqMsg_DataRow
Definition: protocol.h:43
#define PqMsg_Terminate
Definition: protocol.h:28
bool update_process_title
Definition: ps_status.c:31
static void set_ps_display(const char *activity)
Definition: ps_status.h:40
@ REPLICATION_KIND_PHYSICAL
Definition: replnodes.h:22
@ REPLICATION_KIND_LOGICAL
Definition: replnodes.h:23
ResourceOwner ResourceOwnerCreate(ResourceOwner parent, const char *name)
Definition: resowner.c:413
ResourceOwner CurrentResourceOwner
Definition: resowner.c:165
void ResourceOwnerRelease(ResourceOwner owner, ResourceReleasePhase phase, bool isCommit, bool isTopLevel)
Definition: resowner.c:648
void ResourceOwnerDelete(ResourceOwner owner)
Definition: resowner.c:854
@ RESOURCE_RELEASE_LOCKS
Definition: resowner.h:55
@ RESOURCE_RELEASE_BEFORE_LOCKS
Definition: resowner.h:54
@ RESOURCE_RELEASE_AFTER_LOCKS
Definition: resowner.h:56
Size add_size(Size s1, Size s2)
Definition: shmem.c:494
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:388
Size mul_size(Size s1, Size s2)
Definition: shmem.c:511
void pg_usleep(long microsec)
Definition: signal.c:53
ReplicationSlot * SearchNamedReplicationSlot(const char *name, bool need_lock)
Definition: slot.c:426
void ReplicationSlotAlter(const char *name, bool failover)
Definition: slot.c:743
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, bool two_phase, bool failover, bool synced)
Definition: slot.c:272
void ReplicationSlotCleanup(void)
Definition: slot.c:682
void ReplicationSlotMarkDirty(void)
Definition: slot.c:946
void ReplicationSlotReserveWal(void)
Definition: slot.c:1337
void ReplicationSlotAcquire(const char *name, bool nowait)
Definition: slot.c:502
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition: slot.c:985
void ReplicationSlotPersist(void)
Definition: slot.c:963
ReplicationSlot * MyReplicationSlot
Definition: slot.c:116
void ReplicationSlotDrop(const char *name, bool nowait)
Definition: slot.c:720
void ReplicationSlotSave(void)
Definition: slot.c:928
void ReplicationSlotRelease(void)
Definition: slot.c:606
void ReplicationSlotsComputeRequiredLSN(void)
Definition: slot.c:1041
@ RS_PERSISTENT
Definition: slot.h:35
@ RS_EPHEMERAL
Definition: slot.h:36
@ RS_TEMPORARY
Definition: slot.h:37
#define SlotIsLogical(slot)
Definition: slot.h:207
bool IsSyncingReplicationSlots(void)
Definition: slotsync.c:1429
const char * SnapBuildExportSnapshot(SnapBuild *builder)
Definition: snapbuild.c:669
Snapshot SnapBuildInitialSnapshot(SnapBuild *builder)
Definition: snapbuild.c:570
void SnapBuildClearExportedSnapshot(void)
Definition: snapbuild.c:730
bool FirstSnapshotSet
Definition: snapmgr.c:142
void RestoreTransactionSnapshot(Snapshot snapshot, void *source_pgproc)
Definition: snapmgr.c:1846
#define SpinLockInit(lock)
Definition: spin.h:60
#define SpinLockRelease(lock)
Definition: spin.h:64
#define SpinLockAcquire(lock)
Definition: spin.h:62
PGPROC * MyProc
Definition: proc.c:68
PROC_HDR * ProcGlobal
Definition: proc.c:81
char * dbname
Definition: streamutil.c:51
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:78
void enlargeStringInfo(StringInfo str, int needed)
Definition: stringinfo.c:289
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
ReplicationKind kind
Definition: replnodes.h:56
char * defname
Definition: parsenodes.h:802
int32 day
Definition: timestamp.h:51
int32 month
Definition: timestamp.h:52
TimeOffset time
Definition: timestamp.h:49
WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]
Definition: walsender.c:227
int read_heads[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:229
WalTimeSample last_read[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:230
int write_head
Definition: walsender.c:228
XLogRecPtr last_lsn
Definition: walsender.c:226
Definition: pg_list.h:54
XLogReaderState * reader
Definition: logical.h:42
struct SnapBuild * snapshot_builder
Definition: logical.h:44
StringInfo out
Definition: logical.h:71
Definition: nodes.h:129
NodeTag type
Definition: nodes.h:130
TransactionId xmin
Definition: proc.h:178
uint8 statusFlags
Definition: proc.h:228
int pgxactoff
Definition: proc.h:188
Latch procLatch
Definition: proc.h:170
uint8 * statusFlags
Definition: proc.h:373
TransactionId xmin
Definition: slot.h:82
TransactionId catalog_xmin
Definition: slot.h:90
XLogRecPtr restart_lsn
Definition: slot.h:93
XLogRecPtr confirmed_flush
Definition: slot.h:104
TransactionId effective_catalog_xmin
Definition: slot.h:175
slock_t mutex
Definition: slot.h:151
bool in_use
Definition: slot.h:154
TransactionId effective_xmin
Definition: slot.h:174
ReplicationSlotPersistentData data
Definition: slot.h:178
TupleDesc setDesc
Definition: execnodes.h:340
Tuplestorestate * setResult
Definition: execnodes.h:339
XLogRecPtr startpoint
Definition: replnodes.h:97
ReplicationKind kind
Definition: replnodes.h:94
TimeLineID timeline
Definition: replnodes.h:96
uint8 syncrep_method
Definition: syncrep.h:68
TimeLineID timeline
Definition: replnodes.h:120
TimeLineID ws_tli
Definition: xlogreader.h:49
uint32 events
Definition: latch.h:155
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
ConditionVariable wal_replay_cv
dlist_head SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]
ConditionVariable wal_flush_cv
TimeOffset writeLag
slock_t mutex
XLogRecPtr flush
XLogRecPtr sentPtr
TimeOffset flushLag
WalSndState state
ReplicationKind kind
XLogRecPtr write
TimeOffset applyLag
int sync_standby_priority
bool needreload
Latch * latch
TimestampTz replyTime
XLogRecPtr apply
TimestampTz time
Definition: walsender.c:217
XLogRecPtr lsn
Definition: walsender.c:216
WALSegmentContext segcxt
Definition: xlogreader.h:271
XLogRecPtr EndRecPtr
Definition: xlogreader.h:207
WALOpenSegment seg
Definition: xlogreader.h:272
Definition: regguts.h:323
void SyncRepInitConfig(void)
Definition: syncrep.c:404
SyncRepConfigData * SyncRepConfig
Definition: syncrep.c:99
int SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys)
Definition: syncrep.c:713
void SyncRepReleaseWaiters(void)
Definition: syncrep.c:433
#define SYNC_REP_PRIORITY
Definition: syncrep.h:35
#define NUM_SYNC_REP_WAIT_MODE
Definition: syncrep.h:27
#define SyncRepRequested()
Definition: syncrep.h:18
#define SYNC_REP_WAIT_WRITE
Definition: syncrep.h:23
#define SYNC_REP_WAIT_FLUSH
Definition: syncrep.h:24
#define SYNC_REP_WAIT_APPLY
Definition: syncrep.h:25
void InitializeTimeouts(void)
Definition: timeout.c:470
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:280
bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
Definition: transam.c:299
#define InvalidTransactionId
Definition: transam.h:31
#define EpochFromFullTransactionId(x)
Definition: transam.h:47
#define XidFromFullTransactionId(x)
Definition: transam.h:48
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
TupleDesc CreateTemplateTupleDesc(int natts)
Definition: tupdesc.c:67
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:678
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, const Datum *values, const bool *isnull)
Definition: tuplestore.c:750
char data[BLCKSZ]
Definition: c.h:1108
static Datum TimestampTzGetDatum(TimestampTz X)
Definition: timestamp.h:52
static Datum IntervalPGetDatum(const Interval *X)
Definition: timestamp.h:58
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:85
FullTransactionId ReadNextFullTransactionId(void)
Definition: varsup.c:288
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: wait_event.h:88
static void pgstat_report_wait_end(void)
Definition: wait_event.h:104
XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
static void ProcessPendingWrites(void)
Definition: walsender.c:1617
static XLogRecPtr sentPtr
Definition: walsender.c:168
#define READ_REPLICATION_SLOT_COLS
static void AlterReplicationSlot(AlterReplicationSlotCmd *cmd)
Definition: walsender.c:1442
static Interval * offset_to_interval(TimeOffset offset)
Definition: walsender.c:3735
static void WalSndWait(uint32 socket_events, long timeout, uint32 wait_event)
Definition: walsender.c:3581
static void WalSndLastCycleHandler(SIGNAL_ARGS)
Definition: walsender.c:3477
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:200
static void WalSndCheckTimeOut(void)
Definition: walsender.c:2635
static void XLogSendPhysical(void)
Definition: walsender.c:2961
static void ProcessRepliesIfAny(void)
Definition: walsender.c:2106
static bool waiting_for_ping_response
Definition: walsender.c:185
static void SendTimeLineHistory(TimeLineHistoryCmd *cmd)
Definition: walsender.c:592
void WalSndErrorCleanup(void)
Definition: walsender.c:330
static void InitWalSenderSlot(void)
Definition: walsender.c:2788
static void parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action, bool *two_phase, bool *failover)
Definition: walsender.c:1126
WalSnd * MyWalSnd
Definition: walsender.c:115
static void ProcessStandbyHSFeedbackMessage(void)
Definition: walsender.c:2467
static void ReadReplicationSlot(ReadReplicationSlotCmd *cmd)
Definition: walsender.c:493
static StringInfoData tmpbuf
Definition: walsender.c:173
static void PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
Definition: walsender.c:2387
static LagTracker * lag_tracker
Definition: walsender.c:233
static void PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: walsender.c:2250
static void IdentifySystem(void)
Definition: walsender.c:410
static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p)
Definition: walsender.c:2883
static StringInfoData reply_message
Definition: walsender.c:172
static void WalSndKeepaliveIfNecessary(void)
Definition: walsender.c:3945
bool am_walsender
Definition: walsender.c:118
void WalSndSetState(WalSndState state)
Definition: walsender.c:3697
static StringInfoData output_message
Definition: walsender.c:171
static TimeLineID sendTimeLine
Definition: walsender.c:159
static bool HandleUploadManifestPacket(StringInfo buf, off_t *offset, IncrementalBackupInfo *ib)
Definition: walsender.c:746
static void WalSndLoop(WalSndSendDataCallback send_data)
Definition: walsender.c:2662
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1575
void WalSndWakeup(bool physical, bool logical)
Definition: walsender.c:3560
static LogicalDecodingContext * logical_decoding_ctx
Definition: walsender.c:211
static void XLogSendLogical(void)
Definition: walsender.c:3271
void WalSndShmemInit(void)
Definition: walsender.c:3516
bool am_db_walsender
Definition: walsender.c:121
static volatile sig_atomic_t replication_active
Definition: walsender.c:209
static void UploadManifest(void)
Definition: walsender.c:682
bool wake_wal_senders
Definition: walsender.c:133
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:201
int max_wal_senders
Definition: walsender.c:124
static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch)
Definition: walsender.c:2436
static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool skipped_xact)
Definition: walsender.c:1671
bool exec_replication_command(const char *cmd_string)
Definition: walsender.c:1873
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS
#define PG_STAT_GET_WAL_SENDERS_COLS
void(* WalSndSendDataCallback)(void)
Definition: walsender.c:239
Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
Definition: walsender.c:3751
void WalSndInitStopping(void)
Definition: walsender.c:3633
void WalSndWaitStopping(void)
Definition: walsender.c:3659
static bool sendTimeLineIsHistoric
Definition: walsender.c:161
void WalSndResourceCleanup(bool isCommit)
Definition: walsender.c:365
void WalSndRqstFileReload(void)
Definition: walsender.c:3432
static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc)
Definition: walsender.c:1739
bool am_cascading_walsender
Definition: walsender.c:119
static TimestampTz last_processing
Definition: walsender.c:176
Size WalSndShmemSize(void)
Definition: walsender.c:3504
bool log_replication_commands
Definition: walsender.c:128
void HandleWalSndInitStopping(void)
Definition: walsender.c:3455
static TimeLineID sendTimeLineNextTLI
Definition: walsender.c:160
static MemoryContext uploaded_manifest_mcxt
Definition: walsender.c:151
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Definition: walsender.c:1203
static int logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
Definition: walsender.c:1054
static void ProcessStandbyReplyMessage(void)
Definition: walsender.c:2282
static void WalSndKeepalive(bool requestReply, XLogRecPtr writePtr)
Definition: walsender.c:3922
static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)
Definition: walsender.c:3983
void WalSndSignals(void)
Definition: walsender.c:3485
static bool streamingDoneSending
Definition: walsender.c:193
static void StartLogicalReplication(StartReplicationCmd *cmd)
Definition: walsender.c:1455
static IncrementalBackupInfo * uploaded_manifest
Definition: walsender.c:150
static void WalSndShutdown(void)
Definition: walsender.c:243
static void WalSndKill(int code, Datum arg)
Definition: walsender.c:2865
int wal_sender_timeout
Definition: walsender.c:126
#define MAX_SEND_SIZE
Definition: walsender.c:109
static void ParseAlterReplSlotOptions(AlterReplicationSlotCmd *cmd, bool *failover)
Definition: walsender.c:1417
static bool WalSndCaughtUp
Definition: walsender.c:197
static XLogRecPtr sendTimeLineValidUpto
Definition: walsender.c:162
static void ProcessStandbyMessage(void)
Definition: walsender.c:2219
static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1548
static void DropReplicationSlot(DropReplicationSlotCmd *cmd)
Definition: walsender.c:1408
#define LAG_TRACKER_BUFFER_SIZE
Definition: walsender.c:221
static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
Definition: walsender.c:4048
static long WalSndComputeSleeptime(TimestampTz now)
Definition: walsender.c:2591
static bool streamingDoneReceiving
Definition: walsender.c:194
static void StartReplication(StartReplicationCmd *cmd)
Definition: walsender.c:822
static void WalSndDone(WalSndSendDataCallback send_data)
Definition: walsender.c:3356
static const char * WalSndGetStateString(WalSndState state)
Definition: walsender.c:3716
static XLogReaderState * xlogreader
Definition: walsender.c:140
static TimestampTz last_reply_timestamp
Definition: walsender.c:182
XLogRecPtr GetStandbyFlushRecPtr(TimeLineID *tli)
Definition: walsender.c:3399
WalSndCtlData * WalSndCtl
Definition: walsender.c:112
void InitWalSender(void)
CRSSnapshotAction
Definition: walsender.h:21
@ CRS_USE_SNAPSHOT
Definition: walsender.h:24
@ CRS_NOEXPORT_SNAPSHOT
Definition: walsender.h:23
@ CRS_EXPORT_SNAPSHOT
Definition: walsender.h:22
PGDLLIMPORT Node * replication_parse_result
WalSndState
@ WALSNDSTATE_STREAMING
@ WALSNDSTATE_BACKUP
@ WALSNDSTATE_CATCHUP
@ WALSNDSTATE_STARTUP
@ WALSNDSTATE_STOPPING
void replication_scanner_finish(void)
int replication_yyparse(void)
void replication_scanner_init(const char *str)
bool replication_scanner_is_replication_command(void)
#define SIGCHLD
Definition: win32_port.h:178
#define SIGHUP
Definition: win32_port.h:168
#define SIG_DFL
Definition: win32_port.h:163
#define SIGPIPE
Definition: win32_port.h:173
#define kill(pid, sig)
Definition: win32_port.h:485
#define SIGUSR1
Definition: win32_port.h:180
#define SIGUSR2
Definition: win32_port.h:181
#define SIG_IGN
Definition: win32_port.h:165
static const unsigned __int64 epoch
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4850
bool XactReadOnly
Definition: xact.c:82
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
Definition: xact.c:3497
void StartTransactionCommand(void)
Definition: xact.c:2953
bool IsAbortedTransactionBlockState(void)
Definition: xact.c:398
int XactIsoLevel
Definition: xact.c:79
bool IsSubTransaction(void)
Definition: xact.c:4905
bool IsTransactionBlock(void)
Definition: xact.c:4832
void CommitTransactionCommand(void)
Definition: xact.c:3050
#define XACT_REPEATABLE_READ
Definition: xact.h:38
uint64 GetSystemIdentifier(void)
Definition: xlog.c:4463
bool RecoveryInProgress(void)
Definition: xlog.c:6211
TimeLineID GetWALInsertionTimeLine(void)
Definition: xlog.c:6399
Size WALReadFromBuffers(char *dstbuf, XLogRecPtr startptr, Size count, TimeLineID tli)
Definition: xlog.c:1723
void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli)
Definition: xlog.c:3662
int wal_segment_size
Definition: xlog.c:147
XLogRecPtr GetFlushRecPtr(TimeLineID *insertTLI)
Definition: xlog.c:6376
bool XLogBackgroundFlush(void)
Definition: xlog.c:2923
#define MAXFNAMELEN
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
static void XLogFilePath(char *path, TimeLineID tli, XLogSegNo logSegNo, int wal_segsz_bytes)
static void XLogFileName(char *fname, TimeLineID tli, XLogSegNo logSegNo, int wal_segsz_bytes)
static void TLHistoryFilePath(char *path, TimeLineID tli)
static void TLHistoryFileName(char *fname, TimeLineID tli)
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
uint32 TimeLineID
Definition: xlogdefs.h:59
uint64 XLogSegNo
Definition: xlogdefs.h:48
XLogRecord * XLogReadRecord(XLogReaderState *state, char **errormsg)
Definition: xlogreader.c:391
bool WALRead(XLogReaderState *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli, WALReadError *errinfo)
Definition: xlogreader.c:1505
XLogReaderState * XLogReaderAllocate(int wal_segment_size, const char *waldir, XLogReaderRoutine *routine, void *private_data)
Definition: xlogreader.c:108
void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
Definition: xlogreader.c:233
#define XL_ROUTINE(...)
Definition: xlogreader.h:117
static TimeLineID receiveTLI
Definition: xlogrecovery.c:264
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
void wal_segment_close(XLogReaderState *state)
Definition: xlogutils.c:845
void XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength, TimeLineID currTLI)
Definition: xlogutils.c:721
void WALReadRaiseError(WALReadError *errinfo)
Definition: xlogutils.c:1023