PostgreSQL Source Code  git master
walsender.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * walsender.c
4  *
5  * The WAL sender process (walsender) is new as of Postgres 9.0. It takes
6  * care of sending XLOG from the primary server to a single recipient.
7  * (Note that there can be more than one walsender process concurrently.)
8  * It is started by the postmaster when the walreceiver of a standby server
9  * connects to the primary server and requests XLOG streaming replication.
10  *
11  * A walsender is similar to a regular backend, ie. there is a one-to-one
12  * relationship between a connection and a walsender process, but instead
13  * of processing SQL queries, it understands a small set of special
14  * replication-mode commands. The START_REPLICATION command begins streaming
15  * WAL to the client. While streaming, the walsender keeps reading XLOG
16  * records from the disk and sends them to the standby server over the
17  * COPY protocol, until either side ends the replication by exiting COPY
18  * mode (or until the connection is closed).
19  *
20  * Normal termination is by SIGTERM, which instructs the walsender to
21  * close the connection and exit(0) at the next convenient moment. Emergency
22  * termination is by SIGQUIT; like any backend, the walsender will simply
23  * abort and exit on SIGQUIT. A close of the connection and a FATAL error
24  * are treated as not a crash but approximately normal termination;
25  * the walsender will exit quickly without sending any more XLOG records.
26  *
27  * If the server is shut down, checkpointer sends us
28  * PROCSIG_WALSND_INIT_STOPPING after all regular backends have exited. If
29  * the backend is idle or runs an SQL query, this causes the backend to
30  * shut down; if logical replication is in progress, all existing WAL records
31  * are processed, followed by a shutdown. Otherwise this causes the walsender
32  * to switch to the "stopping" state. In this state, the walsender will reject
33  * any further replication commands. The checkpointer begins the shutdown
34  * checkpoint once all walsenders are confirmed as stopping. When the shutdown
35  * checkpoint finishes, the postmaster sends us SIGUSR2. This instructs
36  * walsender to send any outstanding WAL, including the shutdown checkpoint
37  * record, wait for it to be replicated to the standby, and then exit.
38  *
39  *
40  * Portions Copyright (c) 2010-2021, PostgreSQL Global Development Group
41  *
42  * IDENTIFICATION
43  * src/backend/replication/walsender.c
44  *
45  *-------------------------------------------------------------------------
46  */
47 #include "postgres.h"
48 
49 #include <signal.h>
50 #include <unistd.h>
51 
52 #include "access/printtup.h"
53 #include "access/timeline.h"
54 #include "access/transam.h"
55 #include "access/xact.h"
56 #include "access/xlog_internal.h"
57 #include "access/xlogreader.h"
58 #include "access/xlogutils.h"
59 #include "catalog/pg_authid.h"
60 #include "catalog/pg_type.h"
61 #include "commands/dbcommands.h"
62 #include "commands/defrem.h"
63 #include "funcapi.h"
64 #include "libpq/libpq.h"
65 #include "libpq/pqformat.h"
66 #include "miscadmin.h"
67 #include "nodes/replnodes.h"
68 #include "pgstat.h"
69 #include "postmaster/interrupt.h"
70 #include "replication/basebackup.h"
71 #include "replication/decode.h"
72 #include "replication/logical.h"
73 #include "replication/slot.h"
74 #include "replication/snapbuild.h"
75 #include "replication/syncrep.h"
77 #include "replication/walsender.h"
80 #include "storage/fd.h"
81 #include "storage/ipc.h"
82 #include "storage/pmsignal.h"
83 #include "storage/proc.h"
84 #include "storage/procarray.h"
85 #include "tcop/dest.h"
86 #include "tcop/tcopprot.h"
87 #include "utils/acl.h"
88 #include "utils/builtins.h"
89 #include "utils/guc.h"
90 #include "utils/memutils.h"
91 #include "utils/pg_lsn.h"
92 #include "utils/portal.h"
93 #include "utils/ps_status.h"
94 #include "utils/timeout.h"
95 #include "utils/timestamp.h"
96 
97 /*
98  * Maximum data payload in a WAL data message. Must be >= XLOG_BLCKSZ.
99  *
100  * We don't have a good idea of what a good value would be; there's some
101  * overhead per message in both walsender and walreceiver, but on the other
102  * hand sending large batches makes walsender less responsive to signals
103  * because signals are checked only between messages. 128kB (with
104  * default 8k blocks) seems like a reasonable guess for now.
105  */
106 #define MAX_SEND_SIZE (XLOG_BLCKSZ * 16)
107 
108 /* Array of WalSnds in shared memory */
110 
111 /* My slot in the shared memory array */
112 WalSnd *MyWalSnd = NULL;
113 
114 /* Global state */
115 bool am_walsender = false; /* Am I a walsender process? */
116 bool am_cascading_walsender = false; /* Am I cascading WAL to another
117  * standby? */
118 bool am_db_walsender = false; /* Connected to a database? */
119 
120 /* User-settable parameters for walsender */
121 int max_wal_senders = 0; /* the maximum number of concurrent
122  * walsenders */
/*
 * NOTE(review): the unit of wal_sender_timeout is presumably milliseconds
 * (default 60 * 1000, i.e. 60 s). TODO: confirm against the GUC definition.
 */
123 int wal_sender_timeout = 60 * 1000; /* maximum time to send one WAL
124  * data message */
126 
127 /*
128  * State for WalSndWakeupRequest
129  */
130 bool wake_wal_senders = false;
131 
132 /*
133  * xlogreader used for replication. Note that a WAL sender doing physical
134  * replication does not need xlogreader to read WAL, but it needs one to
135  * keep a state of its work.
136  */
138 
139 /*
140  * These variables keep track of the state of the timeline we're currently
141  * sending. sendTimeLine identifies the timeline. If sendTimeLineIsHistoric,
142  * the timeline is not the latest timeline on this server, and the server's
143  * history forked off from that timeline at sendTimeLineValidUpto.
144  */
147 static bool sendTimeLineIsHistoric = false;
149 
150 /*
151  * How far have we sent WAL already? This is also advertised in
152  * MyWalSnd->sentPtr. (Actually, this is the next WAL location to send.)
153  */
155 
156 /* Buffers for constructing outgoing messages and processing reply messages. */
160 
161 /* Timestamp of last ProcessRepliesIfAny(). */
163 
164 /*
165  * Timestamp of last ProcessRepliesIfAny() that saw a reply from the
166  * standby. Set to 0 if wal_sender_timeout doesn't need to be active.
167  */
169 
170 /* Have we sent a heartbeat message asking for reply, since last reply? */
171 static bool waiting_for_ping_response = false;
172 
173 /*
174  * While streaming WAL in Copy mode, streamingDoneSending is set to true
175  * after we have sent CopyDone. We should not send any more CopyData messages
176  * after that. streamingDoneReceiving is set to true when we receive CopyDone
177  * from the other end. When both become true, it's time to exit Copy mode.
178  */
181 
182 /* Are we there yet? */
/*
 * NOTE(review): presumably true once all WAL currently available has been
 * sent. The code that sets this flag is not visible in this section.
 */
183 static bool WalSndCaughtUp = false;
184 
185 /* Flags set by signal handlers for later service in main loop */
/*
 * got_SIGUSR2: per the file header, the postmaster sends SIGUSR2 after the
 * shutdown checkpoint finishes.
 * got_STOPPING: tied to PROCSIG_WALSND_INIT_STOPPING; see the
 * replication_active comment below for how it is consumed.
 * Both are volatile sig_atomic_t because signal handlers write them.
 */
186 static volatile sig_atomic_t got_SIGUSR2 = false;
187 static volatile sig_atomic_t got_STOPPING = false;
188 
189 /*
190  * This is set while we are streaming. When not set
191  * PROCSIG_WALSND_INIT_STOPPING signal will be handled like SIGTERM. When set,
192  * the main loop is responsible for checking got_STOPPING and terminating when
193  * it's set (after streaming any remaining WAL).
194  */
195 static volatile sig_atomic_t replication_active = false;
196 
198 
199 /* A sample associating a WAL location with the time it was written. */
200 typedef struct
201 {
204 } WalTimeSample;
205 
206 /* The size of our buffer of time samples. */
207 #define LAG_TRACKER_BUFFER_SIZE 8192
208 
209 /* A mechanism for tracking replication lag. */
/* read_heads: one read position per SyncRepWaitMode (NUM_SYNC_REP_WAIT_MODE). */
210 typedef struct
211 {
215  int read_heads[NUM_SYNC_REP_WAIT_MODE];
217 } LagTracker;
218 
220 
221 /* Signal handlers */
223 
224 /* Prototypes for private functions */
/*
 * Data-sending step passed to WalSndLoop() and WalSndDone().
 * XLogSendPhysical() and XLogSendLogical() both have this void(void) shape.
 */
225 typedef void (*WalSndSendDataCallback) (void);
226 static void WalSndLoop(WalSndSendDataCallback send_data);
227 static void InitWalSenderSlot(void);
228 static void WalSndKill(int code, Datum arg);
230 static void XLogSendPhysical(void);
231 static void XLogSendLogical(void);
232 static void WalSndDone(WalSndSendDataCallback send_data);
233 static XLogRecPtr GetStandbyFlushRecPtr(void);
234 static void IdentifySystem(void);
237 static void StartReplication(StartReplicationCmd *cmd);
239 static void ProcessStandbyMessage(void);
240 static void ProcessStandbyReplyMessage(void);
241 static void ProcessStandbyHSFeedbackMessage(void);
242 static void ProcessRepliesIfAny(void);
243 static void WalSndKeepalive(bool requestReply);
244 static void WalSndKeepaliveIfNecessary(void);
245 static void WalSndCheckTimeOut(void);
247 static void WalSndWait(uint32 socket_events, long timeout, uint32 wait_event);
248 static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
249 static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
252 static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
253 static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
255 
/* Segment-open callback installed in the xlogreaders created in this file. */
256 static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo,
257  TimeLineID *tli_p);
258 
259 
260 /* Initialize walsender process before entering the main command loop */
/*
 * The steps visible here are:
 *  - set up this process's per-walsender structure in shared memory;
 *  - tell the postmaster that we are a WAL sender, after which we must not
 *    write any WAL;
 *  - allocate a zero-filled LagTracker in TopMemoryContext.
 */
261 void
262 InitWalSender(void)
263 {
265 
266  /* Create a per-walsender data structure in shared memory */
268 
269  /*
270  * We don't currently need any ResourceOwner in a walsender process, but
271  * if we did, we could call CreateAuxProcessResourceOwner here.
272  */
273 
274  /*
275  * Let postmaster know that we're a WAL sender. Once we've declared us as
276  * a WAL sender process, postmaster will let us outlive the bgwriter and
277  * kill us last in the shutdown sequence, so we get a chance to stream all
278  * remaining WAL at shutdown, including the shutdown checkpoint. Note that
279  * there's no going back, and we mustn't write any WAL records after this.
280  */
283 
284  /* Initialize empty timestamp buffer for lag tracking. */
285  lag_tracker = MemoryContextAllocZero(TopMemoryContext, sizeof(LagTracker));
286 }
287 
288 /*
289  * Clean up after an error.
290  *
291  * WAL sender processes don't use transactions like regular backends do.
292  * This function does any cleanup required after an error in a WAL sender
293  * process, similar to what transaction abort does in a regular backend.
294  */
/*
 * The steps visible here are:
 *  - close any WAL segment file the xlogreader still has open;
 *  - handle an acquired replication slot (the statement is not shown in
 *    this listing; presumably it releases the slot);
 *  - clear replication_active;
 *  - clean up a ResourceOwner left behind by a replication command;
 *  - exit with status 0 if a shutdown was already requested, instead of
 *    returning.
 *
 * NOTE(review): several statements of this function, including its name
 * line, are missing from this listing.
 */
295 void
297 {
301 
 /* Close a WAL segment file left open by an interrupted read. */
302  if (xlogreader != NULL && xlogreader->seg.ws_file >= 0)
303  wal_segment_close(xlogreader);
304 
305  if (MyReplicationSlot != NULL)
307 
309 
310  replication_active = false;
311 
312  /*
313  * If there is a transaction in progress, it will clean up our
314  * ResourceOwner, but if a replication command set up a resource owner
315  * without a transaction, we've got to clean that up now.
316  */
318  WalSndResourceCleanup(false);
319 
 /*
  * A shutdown was requested (PROCSIG_WALSND_INIT_STOPPING or SIGUSR2):
  * exit now instead of reverting to the startup state.
  */
320  if (got_STOPPING || got_SIGUSR2)
321  proc_exit(0);
322 
323  /* Revert back to startup state */
325 }
326 
327 /*
328  * Clean up any ResourceOwner we created.
329  */
330 void
331 WalSndResourceCleanup(bool isCommit)
332 {
333  ResourceOwner resowner;
334 
335  if (CurrentResourceOwner == NULL)
336  return;
337 
338  /*
339  * Deleting CurrentResourceOwner is not allowed, so we must save a pointer
340  * in a local variable and clear it first.
341  */
342  resowner = CurrentResourceOwner;
343  CurrentResourceOwner = NULL;
344 
345  /* Now we can release resources and delete it. */
346  ResourceOwnerRelease(resowner,
347  RESOURCE_RELEASE_BEFORE_LOCKS, isCommit, true);
348  ResourceOwnerRelease(resowner,
349  RESOURCE_RELEASE_LOCKS, isCommit, true);
350  ResourceOwnerRelease(resowner,
351  RESOURCE_RELEASE_AFTER_LOCKS, isCommit, true);
352  ResourceOwnerDelete(resowner);
353 }
354 
355 /*
356  * Handle a client's connection abort in an orderly manner.
357  */
/*
 * Exits the process with status 0 via proc_exit() and does not return.
 * The abort() after it is never reached.
 */
358 static void
359 WalSndShutdown(void)
360 {
361  /*
362  * Reset whereToSendOutput to prevent ereport from attempting to send any
363  * more messages to the standby.
364  */
367 
368  proc_exit(0);
369  abort(); /* keep the compiler quiet */
370 }
371 
372 /*
373  * Handle the IDENTIFY_SYSTEM command.
374  */
/*
 * Sends a one-row result set to the walsender's destination with four
 * columns:
 *  - systemid (text);
 *  - timeline (int4, from ThisTimeLineID);
 *  - xlogpos (text, the flush position formatted as "%X/%X");
 *  - dbname (text, NULL when not connected to a database).
 *
 * NOTE(review): TimeLineID is unsigned, and START_REPLICATION reports its
 * next_tli column as int8 for exactly that reason. Reporting the timeline
 * here as int4 would make IDs above INT32_MAX appear negative. Consider
 * INT8OID/Int64GetDatum if protocol compatibility allows.
 */
375 static void
377 {
378  char sysid[32];
379  char xloc[MAXFNAMELEN];
380  XLogRecPtr logptr;
381  char *dbname = NULL;
383  TupOutputState *tstate;
384  TupleDesc tupdesc;
385  Datum values[4];
386  bool nulls[4];
387 
388  /*
389  * Reply with a result set with one row, four columns. First col is system
390  * ID, second is timeline ID, third is current xlog location and the
391  * fourth contains the database name if we are connected to one.
392  */
393 
394  snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
396 
399  {
400  /* this also updates ThisTimeLineID */
401  logptr = GetStandbyFlushRecPtr();
402  }
403  else
404  logptr = GetFlushRecPtr();
405 
406  snprintf(xloc, sizeof(xloc), "%X/%X", LSN_FORMAT_ARGS(logptr));
407 
 /* Only look up the database name when connected to a database. */
408  if (MyDatabaseId != InvalidOid)
409  {
411 
412  /* syscache access needs a transaction env. */
414  /* make dbname live outside TX context */
418  /* CommitTransactionCommand switches to TopMemoryContext */
420  }
421 
423  MemSet(nulls, false, sizeof(nulls));
424 
425  /* need a tuple descriptor representing four columns */
426  tupdesc = CreateTemplateTupleDesc(4);
427  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "systemid",
428  TEXTOID, -1, 0);
429  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "timeline",
430  INT4OID, -1, 0);
431  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "xlogpos",
432  TEXTOID, -1, 0);
433  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "dbname",
434  TEXTOID, -1, 0);
435 
436  /* prepare for projection of tuples */
437  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
438 
439  /* column 1: system identifier */
440  values[0] = CStringGetTextDatum(sysid);
441 
442  /* column 2: timeline */
443  values[1] = Int32GetDatum(ThisTimeLineID);
444 
445  /* column 3: wal location */
446  values[2] = CStringGetTextDatum(xloc);
447 
448  /* column 4: database name, or NULL if none */
449  if (dbname)
450  values[3] = CStringGetTextDatum(dbname);
451  else
452  nulls[3] = true;
453 
454  /* send it to dest */
455  do_tup_output(tstate, values, nulls);
456 
457  end_tup_output(tstate);
458 }
459 
460 
461 /*
462  * Handle TIMELINE_HISTORY command.
463  */
/*
 * The reply is assembled directly with pq_* calls:
 *  - a RowDescription ('T') with two text columns, "filename" and
 *    "content";
 *  - one DataRow ('D') holding the history file name and the raw bytes of
 *    the timeline history file for cmd->timeline.
 * Any failure to open, seek, read or close the file raises ERROR.
 */
464 static void
466 {
468  char histfname[MAXFNAMELEN];
469  char path[MAXPGPATH];
470  int fd;
471  off_t histfilelen;
472  off_t bytesleft;
473  Size len;
474 
475  /*
476  * Reply with a result set with one row, and two columns. The first col is
477  * the name of the history file, 2nd is the contents.
478  */
479 
480  TLHistoryFileName(histfname, cmd->timeline);
481  TLHistoryFilePath(path, cmd->timeline);
482 
483  /* Send a RowDescription message */
484  pq_beginmessage(&buf, 'T');
485  pq_sendint16(&buf, 2); /* 2 fields */
486 
487  /* first field */
488  pq_sendstring(&buf, "filename"); /* col name */
489  pq_sendint32(&buf, 0); /* table oid */
490  pq_sendint16(&buf, 0); /* attnum */
491  pq_sendint32(&buf, TEXTOID); /* type oid */
492  pq_sendint16(&buf, -1); /* typlen */
493  pq_sendint32(&buf, 0); /* typmod */
494  pq_sendint16(&buf, 0); /* format code */
495 
496  /* second field */
497  pq_sendstring(&buf, "content"); /* col name */
498  pq_sendint32(&buf, 0); /* table oid */
499  pq_sendint16(&buf, 0); /* attnum */
500  pq_sendint32(&buf, TEXTOID); /* type oid */
501  pq_sendint16(&buf, -1); /* typlen */
502  pq_sendint32(&buf, 0); /* typmod */
503  pq_sendint16(&buf, 0); /* format code */
504  pq_endmessage(&buf);
505 
506  /* Send a DataRow message */
507  pq_beginmessage(&buf, 'D');
508  pq_sendint16(&buf, 2); /* # of columns */
509  len = strlen(histfname);
510  pq_sendint32(&buf, len); /* col1 len */
511  pq_sendbytes(&buf, histfname, len);
512 
513  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
514  if (fd < 0)
515  ereport(ERROR,
517  errmsg("could not open file \"%s\": %m", path)));
518 
519  /* Determine file length and send it to client */
520  histfilelen = lseek(fd, 0, SEEK_END);
521  if (histfilelen < 0)
522  ereport(ERROR,
524  errmsg("could not seek to end of file \"%s\": %m", path)));
525  if (lseek(fd, 0, SEEK_SET) != 0)
526  ereport(ERROR,
528  errmsg("could not seek to beginning of file \"%s\": %m", path)));
529 
 /*
  * NOTE(review): the off_t length is sent as a 32-bit field. This assumes
  * history files stay far below 2GB.
  */
530  pq_sendint32(&buf, histfilelen); /* col2 len */
531 
532  bytesleft = histfilelen;
 /*
  * Copy the file in PGAlignedBlock-sized chunks. A zero-byte read before
  * bytesleft reaches 0 (the file shrank) is reported as an error.
  *
  * NOTE(review): each read() asks for sizeof(rbuf) even when bytesleft is
  * smaller. If the file grew after the lseek() above, more bytes than the
  * advertised col2 length could be appended to the message. Capping the
  * request at Min(sizeof(rbuf), bytesleft) would prevent that.
  */
533  while (bytesleft > 0)
534  {
535  PGAlignedBlock rbuf;
536  int nread;
537 
539  nread = read(fd, rbuf.data, sizeof(rbuf));
541  if (nread < 0)
542  ereport(ERROR,
544  errmsg("could not read file \"%s\": %m",
545  path)));
546  else if (nread == 0)
547  ereport(ERROR,
549  errmsg("could not read file \"%s\": read %d of %zu",
550  path, nread, (Size) bytesleft)));
551 
552  pq_sendbytes(&buf, rbuf.data, nread);
553  bytesleft -= nread;
554  }
555 
556  if (CloseTransientFile(fd) != 0)
557  ereport(ERROR,
559  errmsg("could not close file \"%s\": %m", path)));
560 
561  pq_endmessage(&buf);
562 }
563 
564 /*
565  * Handle START_REPLICATION command.
566  *
567  * At the moment, this never returns, but an ereport(ERROR) will take us back
568  * to the main loop.
569  */
/*
 * Outline of the visible code:
 *  1. Allocate the physical-replication xlogreader. Only the
 *     segment_open/segment_close callbacks are supplied.
 *  2. If a slot is named, acquire it and reject logical slots.
 *  3. Choose the timeline to send. For a historic timeline, verify that
 *     the requested start point does not lie after the point where this
 *     server's history forked off that timeline.
 *  4. Unless there is nothing to stream:
 *     - send CopyBothResponse;
 *     - reject start points beyond the local flush position;
 *     - run the streaming loop, exiting with status 0 if got_STOPPING was
 *       set meanwhile.
 *  5. Possibly send a one-row result (next_tli int8, next_tli_startpos
 *     text), then send CommandComplete "START_STREAMING".
 */
570 static void
572 {
574  XLogRecPtr FlushPtr;
575 
576  /* create xlogreader for physical replication */
577  xlogreader =
579  XL_ROUTINE(.segment_open = WalSndSegmentOpen,
580  .segment_close = wal_segment_close),
581  NULL);
582 
583  if (!xlogreader)
584  ereport(ERROR,
585  (errcode(ERRCODE_OUT_OF_MEMORY),
586  errmsg("out of memory")));
587 
588  /*
589  * We assume here that we're logging enough information in the WAL for
590  * log-shipping, since this is checked in PostmasterMain().
591  *
592  * NOTE: wal_level can only change at shutdown, so in most cases it is
593  * difficult for there to be WAL data that we can still see that was
594  * written at wal_level='minimal'.
595  */
596 
597  if (cmd->slotname)
598  {
599  ReplicationSlotAcquire(cmd->slotname, true);
601  ereport(ERROR,
602  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
603  errmsg("cannot use a logical replication slot for physical replication")));
604 
605  /*
606  * We don't need to verify the slot's restart_lsn here; instead we
607  * rely on the caller requesting the starting point to use. If the
608  * WAL segment doesn't exist, we'll fail later.
609  */
610  }
611 
612  /*
613  * Select the timeline. If it was given explicitly by the client, use
614  * that. Otherwise use the timeline of the last replayed record, which is
615  * kept in ThisTimeLineID.
616  */
619  {
620  /* this also updates ThisTimeLineID */
621  FlushPtr = GetStandbyFlushRecPtr();
622  }
623  else
624  FlushPtr = GetFlushRecPtr();
625 
626  if (cmd->timeline != 0)
627  {
628  XLogRecPtr switchpoint;
629 
630  sendTimeLine = cmd->timeline;
632  {
633  sendTimeLineIsHistoric = false;
635  }
636  else
637  {
638  List *timeLineHistory;
639 
640  sendTimeLineIsHistoric = true;
641 
642  /*
643  * Check that the timeline the client requested exists, and the
644  * requested start location is on that timeline.
645  */
646  timeLineHistory = readTimeLineHistory(ThisTimeLineID);
647  switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
649  list_free_deep(timeLineHistory);
650 
651  /*
652  * Found the requested timeline in the history. Check that
653  * requested startpoint is on that timeline in our history.
654  *
655  * This is quite loose on purpose. We only check that we didn't
656  * fork off the requested timeline before the switchpoint. We
657  * don't check that we switched *to* it before the requested
658  * starting point. This is because the client can legitimately
659  * request to start replication from the beginning of the WAL
660  * segment that contains switchpoint, but on the new timeline, so
661  * that it doesn't end up with a partial segment. If you ask for
662  * too old a starting point, you'll get an error later when we
663  * fail to find the requested WAL segment in pg_wal.
664  *
665  * XXX: we could be more strict here and only allow a startpoint
666  * that's older than the switchpoint, if it's still in the same
667  * WAL segment.
668  */
669  if (!XLogRecPtrIsInvalid(switchpoint) &&
670  switchpoint < cmd->startpoint)
671  {
672  ereport(ERROR,
673  (errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
675  cmd->timeline),
676  errdetail("This server's history forked from timeline %u at %X/%X.",
677  cmd->timeline,
678  LSN_FORMAT_ARGS(switchpoint))));
679  }
680  sendTimeLineValidUpto = switchpoint;
681  }
682  }
683  else
684  {
687  sendTimeLineIsHistoric = false;
688  }
689 
691 
692  /* If there is nothing to stream, don't even enter COPY mode */
694  {
695  /*
696  * When we first start replication the standby will be behind the
697  * primary. For some applications, for example synchronous
698  * replication, it is important to have a clear state for this initial
699  * catchup mode, so we can trigger actions when we change streaming
700  * state later. We may stay in this state for a long time, which is
701  * exactly why we want to be able to monitor whether or not we are
702  * still here.
703  */
705 
706  /* Send a CopyBothResponse message, and start streaming */
  /* Body: overall format byte 0, followed by a column count of 0. */
707  pq_beginmessage(&buf, 'W');
708  pq_sendbyte(&buf, 0);
709  pq_sendint16(&buf, 0);
710  pq_endmessage(&buf);
711  pq_flush();
712 
713  /*
714  * Don't allow a request to stream from a future point in WAL that
715  * hasn't been flushed to disk in this server yet.
716  */
  /*
   * NOTE(review): this check runs after CopyBothResponse has already been
   * sent, so the client receives the error while in COPY BOTH mode.
   */
717  if (FlushPtr < cmd->startpoint)
718  {
719  ereport(ERROR,
720  (errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
722  LSN_FORMAT_ARGS(FlushPtr))));
723  }
724 
725  /* Start streaming from the requested point */
726  sentPtr = cmd->startpoint;
727 
728  /* Initialize shared memory status, too */
729  SpinLockAcquire(&MyWalSnd->mutex);
730  MyWalSnd->sentPtr = sentPtr;
731  SpinLockRelease(&MyWalSnd->mutex);
732 
734 
735  /* Main loop of walsender */
736  replication_active = true;
737 
739 
740  replication_active = false;
  /* A stop was requested while streaming; exit instead of continuing. */
741  if (got_STOPPING)
742  proc_exit(0);
744 
746  }
747 
748  if (cmd->slotname)
750 
751  /*
752  * Copy is finished now. Send a single-row result set indicating the next
753  * timeline.
754  */
756  {
  /* Sized for "%X/%X": up to 8 hex digits, '/', up to 8 hex digits, NUL. */
757  char startpos_str[8 + 1 + 8 + 1];
759  TupOutputState *tstate;
760  TupleDesc tupdesc;
761  Datum values[2];
762  bool nulls[2];
763 
764  snprintf(startpos_str, sizeof(startpos_str), "%X/%X",
766 
768  MemSet(nulls, false, sizeof(nulls));
769 
770  /*
771  * Need a tuple descriptor representing two columns. int8 may seem
772  * like a surprising data type for this, but in theory int4 would not
773  * be wide enough for this, as TimeLineID is unsigned.
774  */
775  tupdesc = CreateTemplateTupleDesc(2);
776  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli",
777  INT8OID, -1, 0);
778  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos",
779  TEXTOID, -1, 0);
780 
781  /* prepare for projection of tuple */
782  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
783 
784  values[0] = Int64GetDatum((int64) sendTimeLineNextTLI);
785  values[1] = CStringGetTextDatum(startpos_str);
786 
787  /* send it to dest */
788  do_tup_output(tstate, values, nulls);
789 
790  end_tup_output(tstate);
791  }
792 
793  /* Send CommandComplete message */
794  EndReplicationCommand("START_STREAMING");
795 }
796 
797 /*
798  * XLogReaderRoutine->page_read callback for logical decoding contexts, as a
799  * walsender process.
800  *
801  * Inside the walsender we can do better than read_local_xlog_page,
802  * which has to do a plain sleep/busy loop, because the walsender's latch gets
803  * set every time WAL is flushed.
804  */
/*
 * Waits via WalSndWaitForWal() until targetPagePtr + reqLen is flushed,
 * then reads the page at targetPagePtr into cur_page.
 *
 * Returns the number of valid bytes in cur_page: XLOG_BLCKSZ, or fewer if
 * the page is only partly flushed. Returns -1 if the wait ended before
 * enough WAL was flushed, which per the comment below implies we are
 * shutting down.
 *
 * As a side effect it updates sendTimeLine and sendTimeLineNextTLI (among
 * other state not visible in this listing) from the reader's timeline.
 * Read errors raise ERROR via WALReadRaiseError().
 */
805 static int
807  XLogRecPtr targetRecPtr, char *cur_page)
808 {
809  XLogRecPtr flushptr;
810  int count;
811  WALReadError errinfo;
812  XLogSegNo segno;
813 
814  XLogReadDetermineTimeline(state, targetPagePtr, reqLen);
816  sendTimeLine = state->currTLI;
818  sendTimeLineNextTLI = state->nextTLI;
819 
820  /* make sure we have enough WAL available */
821  flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
822 
823  /* fail if not (implies we are going to shut down) */
824  if (flushptr < targetPagePtr + reqLen)
825  return -1;
826 
827  if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
828  count = XLOG_BLCKSZ; /* more than one block available */
829  else
830  count = flushptr - targetPagePtr; /* part of the page available */
831 
 /*
  * NOTE(review): WALRead() is asked for a full XLOG_BLCKSZ bytes even when
  * count < XLOG_BLCKSZ. Only the first count bytes are reported as valid
  * to the caller, through the return value.
  */
832  /* now actually read the data, we know it's there */
833  if (!WALRead(state,
834  cur_page,
835  targetPagePtr,
836  XLOG_BLCKSZ,
837  state->seg.ws_tli, /* Pass the current TLI because only
838  * WalSndSegmentOpen controls whether new
839  * TLI is needed. */
840  &errinfo))
841  WALReadRaiseError(&errinfo);
842 
843  /*
844  * After reading into the buffer, check that what we read was valid. We do
845  * this after reading, because even though the segment was present when we
846  * opened it, it might get recycled or removed while we read it. The
847  * read() succeeds in that case, but the data we tried to read might
848  * already have been overwritten with new WAL records.
849  */
850  XLByteToSeg(targetPagePtr, segno, state->segcxt.ws_segsize);
851  CheckXLogRemoved(segno, state->seg.ws_tli);
852 
853  return count;
854 }
855 
856 /*
857  * Process extra options given to CREATE_REPLICATION_SLOT.
858  */
/*
 * Walks cmd->options and stores the requested settings through the output
 * pointers. Options that are not given leave the caller's defaults
 * untouched.
 *
 *  - export_snapshot (boolean value) and use_snapshot both set
 *    *snapshot_action and are mutually exclusive.
 *  - reserve_wal is accepted only for physical slots.
 *  - export_snapshot, use_snapshot and two_phase are accepted only for
 *    logical slots.
 *
 * Repeating an option, or using one with the wrong slot kind, raises
 * "conflicting or redundant options". An unknown option name raises
 * "unrecognized option".
 *
 * NOTE(review): reserve_wal and two_phase are switched on by their mere
 * presence; defGetBoolean() is not consulted for them. A slot-kind mismatch
 * also reuses the "conflicting or redundant options" message, which may
 * mislead users.
 */
859 static void
861  bool *reserve_wal,
862  CRSSnapshotAction *snapshot_action,
863  bool *two_phase)
864 {
865  ListCell *lc;
866  bool snapshot_action_given = false;
867  bool reserve_wal_given = false;
868  bool two_phase_given = false;
869 
870  /* Parse options */
871  foreach(lc, cmd->options)
872  {
873  DefElem *defel = (DefElem *) lfirst(lc);
874 
875  if (strcmp(defel->defname, "export_snapshot") == 0)
876  {
877  if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
878  ereport(ERROR,
879  (errcode(ERRCODE_SYNTAX_ERROR),
880  errmsg("conflicting or redundant options")));
881 
882  snapshot_action_given = true;
883  *snapshot_action = defGetBoolean(defel) ? CRS_EXPORT_SNAPSHOT :
885  }
886  else if (strcmp(defel->defname, "use_snapshot") == 0)
887  {
888  if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
889  ereport(ERROR,
890  (errcode(ERRCODE_SYNTAX_ERROR),
891  errmsg("conflicting or redundant options")));
892 
893  snapshot_action_given = true;
894  *snapshot_action = CRS_USE_SNAPSHOT;
895  }
896  else if (strcmp(defel->defname, "reserve_wal") == 0)
897  {
898  if (reserve_wal_given || cmd->kind != REPLICATION_KIND_PHYSICAL)
899  ereport(ERROR,
900  (errcode(ERRCODE_SYNTAX_ERROR),
901  errmsg("conflicting or redundant options")));
902 
903  reserve_wal_given = true;
904  *reserve_wal = true;
905  }
906  else if (strcmp(defel->defname, "two_phase") == 0)
907  {
908  if (two_phase_given || cmd->kind != REPLICATION_KIND_LOGICAL)
909  ereport(ERROR,
910  (errcode(ERRCODE_SYNTAX_ERROR),
911  errmsg("conflicting or redundant options")));
912  two_phase_given = true;
913  *two_phase = true;
914  }
915  else
916  elog(ERROR, "unrecognized option: %s", defel->defname);
917  }
918 }
919 
920 /*
921  * Create a new replication slot.
922  */
/*
 * Parses the command options and creates the slot. For a logical slot it
 * also:
 *  - builds an initial decoding context to find a consistent start point;
 *  - optionally exports or uses the resulting snapshot.
 *
 * A persistent logical slot is first created as ephemeral, so that a
 * failure during initialization drops it.
 *
 * Replies with one row:
 *  - slot_name;
 *  - consistent_point ("%X/%X");
 *  - snapshot_name (NULL if no snapshot name was produced);
 *  - output_plugin (NULL when cmd->plugin is NULL).
 */
923 static void
925 {
926  const char *snapshot_name = NULL;
927  char xloc[MAXFNAMELEN];
928  char *slot_name;
929  bool reserve_wal = false;
930  bool two_phase = false;
931  CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT;
933  TupOutputState *tstate;
934  TupleDesc tupdesc;
935  Datum values[4];
936  bool nulls[4];
937 
939 
940  parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action, &two_phase);
941 
942  /* setup state for WalSndSegmentOpen */
943  sendTimeLineIsHistoric = false;
945 
946  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
947  {
948  ReplicationSlotCreate(cmd->slotname, false,
950  false);
951  }
952  else
953  {
955 
956  /*
957  * Initially create persistent slot as ephemeral - that allows us to
958  * nicely handle errors during initialization because it'll get
959  * dropped if this transaction fails. We'll make it persistent at the
960  * end. Temporary slots can be created as temporary from beginning as
961  * they get dropped on error as well.
962  */
963  ReplicationSlotCreate(cmd->slotname, true,
965  two_phase);
966  }
967 
968  if (cmd->kind == REPLICATION_KIND_LOGICAL)
969  {
971  bool need_full_snapshot = false;
972 
973  /*
974  * Do options check early so that we can bail before calling the
975  * DecodingContextFindStartpoint which can take long time.
976  */
977  if (snapshot_action == CRS_EXPORT_SNAPSHOT)
978  {
979  if (IsTransactionBlock())
980  ereport(ERROR,
981  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
982  (errmsg("%s must not be called inside a transaction",
983  "CREATE_REPLICATION_SLOT ... EXPORT_SNAPSHOT")));
984 
985  need_full_snapshot = true;
986  }
987  else if (snapshot_action == CRS_USE_SNAPSHOT)
988  {
989  if (!IsTransactionBlock())
990  ereport(ERROR,
991  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
992  (errmsg("%s must be called inside a transaction",
993  "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
994 
996  ereport(ERROR,
997  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
998  (errmsg("%s must be called in REPEATABLE READ isolation mode transaction",
999  "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
1000 
1001  if (FirstSnapshotSet)
1002  ereport(ERROR,
1003  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1004  (errmsg("%s must be called before any query",
1005  "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
1006 
1007  if (IsSubTransaction())
1008  ereport(ERROR,
1009  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1010  (errmsg("%s must not be called in a subtransaction",
1011  "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
1012 
1013  need_full_snapshot = true;
1014  }
1015 
1016  ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
1018  XL_ROUTINE(.page_read = logical_read_xlog_page,
1019  .segment_open = WalSndSegmentOpen,
1020  .segment_close = wal_segment_close),
1023 
1024  /*
1025  * Signal that we don't need the timeout mechanism. We're just
1026  * creating the replication slot and don't yet accept feedback
1027  * messages or send keepalives. As we possibly need to wait for
1028  * further WAL the walsender would otherwise possibly be killed too
1029  * soon.
1030  */
1032 
1033  /* build initial snapshot, might take a while */
1035 
1036  /*
1037  * Export or use the snapshot if we've been asked to do so.
1038  *
1039  * NB. We will convert the snapbuild.c kind of snapshot to normal
1040  * snapshot when doing this.
1041  */
1042  if (snapshot_action == CRS_EXPORT_SNAPSHOT)
1043  {
1044  snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder);
1045  }
1046  else if (snapshot_action == CRS_USE_SNAPSHOT)
1047  {
1048  Snapshot snap;
1049 
1052  }
1053 
1054  /* don't need the decoding context anymore */
1055  FreeDecodingContext(ctx);
1056 
1057  if (!cmd->temporary)
1059  }
1060  else if (cmd->kind == REPLICATION_KIND_PHYSICAL && reserve_wal)
1061  {
1063 
1065 
1066  /* Write this slot to disk if it's a permanent one. */
1067  if (!cmd->temporary)
1069  }
1070 
1071  snprintf(xloc, sizeof(xloc), "%X/%X",
1073 
1075  MemSet(nulls, false, sizeof(nulls));
1076 
1077  /*----------
1078  * Need a tuple descriptor representing four columns:
1079  * - first field: the slot name
1080  * - second field: LSN at which we became consistent
1081  * - third field: exported snapshot's name
1082  * - fourth field: output plugin
1083  *----------
1084  */
1085  tupdesc = CreateTemplateTupleDesc(4);
1086  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_name",
1087  TEXTOID, -1, 0);
1088  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "consistent_point",
1089  TEXTOID, -1, 0);
1090  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "snapshot_name",
1091  TEXTOID, -1, 0);
1092  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "output_plugin",
1093  TEXTOID, -1, 0);
1094 
1095  /* prepare for projection of tuples */
1096  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
1097 
1098  /* slot_name */
1099  slot_name = NameStr(MyReplicationSlot->data.name);
1100  values[0] = CStringGetTextDatum(slot_name);
1101 
1102  /* consistent wal location */
1103  values[1] = CStringGetTextDatum(xloc);
1104 
1105  /* snapshot name, or NULL if none */
1106  if (snapshot_name != NULL)
1107  values[2] = CStringGetTextDatum(snapshot_name);
1108  else
1109  nulls[2] = true;
1110 
1111  /* plugin, or NULL if none */
1112  if (cmd->plugin != NULL)
1113  values[3] = CStringGetTextDatum(cmd->plugin);
1114  else
1115  nulls[3] = true;
1116 
1117  /* send it to dest */
1118  do_tup_output(tstate, values, nulls);
1119  end_tup_output(tstate);
1120 
1122 }
1123 
1124 /*
1125  * Get rid of a replication slot that is no longer wanted.
 *
 * cmd->slotname names the slot to drop.  The negation of cmd->wait is
 * passed as the second argument of ReplicationSlotDrop() -- presumably a
 * "nowait" flag; confirm against ReplicationSlotDrop().
1126  */
1127 static void
1129 {
1130  ReplicationSlotDrop(cmd->slotname, !cmd->wait);
1131 }
1132 
1133 /*
1134  * Load previously initiated logical slot and prepare for sending data (via
1135  * WalSndLoop).
 *
 * Acquires the slot named by cmd->slotname and refuses to proceed if that
 * slot has been invalidated.  Creates the logical decoding context from
 * cmd->startpoint and cmd->options, switches the connection into COPY BOTH
 * mode and runs the walsender main loop.  When the loop returns, the
 * decoding context is freed.  If a stop was requested the process exits;
 * otherwise a CommandComplete for COPY is sent.
1136  */
1137 static void
1139 {
1141  QueryCompletion qc;
1142 
1143  /* make sure that our requirements are still fulfilled */
1145 
1147 
1148  ReplicationSlotAcquire(cmd->slotname, true);
1149 
1151  ereport(ERROR,
1152  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1153  errmsg("cannot read from logical replication slot \"%s\"",
1154  cmd->slotname),
1155  errdetail("This slot has been invalidated because it exceeded the maximum reserved size.")));
1156 
1157  /*
1158  * Force a disconnect, so that the decoding code doesn't need to care
1159  * about an eventual switch from running in recovery, to running in a
1160  * normal environment. Client code is expected to handle reconnects.
1161  */
1163  {
1164  ereport(LOG,
1165  (errmsg("terminating walsender process after promotion")));
1166  got_STOPPING = true;
1167  }
1168 
1169  /*
1170  * Create our decoding context, making it start at the previously ack'ed
1171  * position.
1172  *
1173  * Do this before sending a CopyBothResponse message, so that any errors
1174  * are reported early.
1175  */
1176  logical_decoding_ctx =
1177  CreateDecodingContext(cmd->startpoint, cmd->options, false,
1178  XL_ROUTINE(.page_read = logical_read_xlog_page,
1179  .segment_open = WalSndSegmentOpen,
1180  .segment_close = wal_segment_close),
1183  xlogreader = logical_decoding_ctx->reader;
1184 
1186 
1187  /* Send a CopyBothResponse message (overall format 0 = textual, zero columns), and start streaming */
1188  pq_beginmessage(&buf, 'W');
1189  pq_sendbyte(&buf, 0);
1190  pq_sendint16(&buf, 0);
1191  pq_endmessage(&buf);
1192  pq_flush();
1193 
1194  /* Start reading WAL from the oldest required WAL. */
1195  XLogBeginRead(logical_decoding_ctx->reader,
1197 
1198  /*
1199  * Report the location after which we'll send out further commits as the
1200  * current sentPtr.
1201  */
1203 
1204  /* Also update the sent position status in shared memory */
1205  SpinLockAcquire(&MyWalSnd->mutex);
1207  SpinLockRelease(&MyWalSnd->mutex);
1208 
1209  replication_active = true;
1210 
1212 
1213  /* Main loop of walsender */
1215 
1216  FreeDecodingContext(logical_decoding_ctx);
1218 
1219  replication_active = false;
/*
 * A stop was requested (shutdown, or the promotion case above): exit
 * instead of ending COPY mode.
 */
1220  if (got_STOPPING)
1221  proc_exit(0);
1223 
1224  /* Get out of COPY mode (CommandComplete). */
1225  SetQueryCompletion(&qc, CMDTAG_COPY, 0);
1226  EndCommand(&qc, DestRemote, false);
1227 }
1228 
1229 /*
1230  * LogicalDecodingContext 'prepare_write' callback.
1231  *
1232  * Prepare a write into a StringInfo.
 *
 * Resets ctx->out and writes the header of a 'w' (WAL data) message.  The
 * header is dataStart and walEnd (both set to lsn), followed by an 8-byte
 * placeholder for the send time.  The 'write' callback overwrites that
 * placeholder just before the message is sent.  lsn is replaced by
 * InvalidXLogRecPtr unless last_write is true.
1233  *
1234  * Don't do anything lasting in here, it's quite possible that nothing will be done
1235  * with the data.
1236  */
1237 static void
1239 {
1240  /* can't have sync rep confused by sending the same LSN several times */
1241  if (!last_write)
1242  lsn = InvalidXLogRecPtr;
1243 
1244  resetStringInfo(ctx->out);
1245 
1246  pq_sendbyte(ctx->out, 'w');
1247  pq_sendint64(ctx->out, lsn); /* dataStart */
1248  pq_sendint64(ctx->out, lsn); /* walEnd */
1249 
1250  /*
1251  * Fill out the sendtime later, just as it's done in XLogSendPhysical, but
1252  * reserve space here.
1253  */
1254  pq_sendint64(ctx->out, 0); /* sendtime */
1255 }
1256 
1257 /*
1258  * LogicalDecodingContext 'write' callback.
1259  *
1260  * Actually write the data previously prepared by WalSndPrepareWrite out to
1261  * the network. Take as long as needed, but process replies from the other
1262  * side and check timeouts during that.
 *
 * Returns right after queueing the message if all output could be flushed
 * and we are not getting close to wal_sender_timeout.  Otherwise it loops,
 * waiting and flushing, until no output is pending.  The process latch is
 * set before returning so that the main loop keeps going.
1263  */
1264 static void
1266  bool last_write)
1267 {
1268  TimestampTz now;
1269 
1270  /*
1271  * Fill the send timestamp last, so that it is taken as late as possible.
1272  * This is somewhat ugly, but the message format is fixed: physical
1273  * streaming replication has used it for several releases.
 *
 * The timestamp is serialized through tmpbuf with pq_sendint64(), so it
 * ends up in wire byte order.  It is then copied over the placeholder that
 * follows the 'w' byte and the two int64 LSN fields written by the
 * prepare callback.
1274  */
1275  resetStringInfo(&tmpbuf);
1276  now = GetCurrentTimestamp();
1277  pq_sendint64(&tmpbuf, now);
1278  memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
1279  tmpbuf.data, sizeof(int64));
1280 
1281  /* output previously gathered data in a CopyData packet */
1282  pq_putmessage_noblock('d', ctx->out->data, ctx->out->len);
1283 
1285 
1286  /* Try to flush pending output to the client */
1287  if (pq_flush_if_writable() != 0)
1288  WalSndShutdown();
1289 
1290  /* Try taking fast path unless we get too close to walsender timeout. */
1292  wal_sender_timeout / 2) &&
1293  !pq_is_send_pending())
1294  {
1295  return;
1296  }
1297 
1298  /* If we have pending write here, go to slow path */
1299  for (;;)
1300  {
1301  long sleeptime;
1302 
1303  /* Check for input from the client */
1305 
1306  /* die if timeout was reached */
1308 
1309  /* Send keepalive if the time has come */
1311 
1312  if (!pq_is_send_pending())
1313  break;
1314 
1316 
1317  /* Sleep until something happens or we time out */
1320 
1321  /* Clear any already-pending wakeups */
1323 
1325 
1326  /* Process any requests or signals received recently */
1327  if (ConfigReloadPending)
1328  {
1329  ConfigReloadPending = false;
1332  }
1333 
1334  /* Try to flush pending output to the client */
1335  if (pq_flush_if_writable() != 0)
1336  WalSndShutdown();
1337  }
1338 
1339  /* reactivate latch so WalSndLoop knows to continue */
1340  SetLatch(MyLatch);
1341 }
1342 
1343 /*
1344  * LogicalDecodingContext 'update_progress' callback.
1345  *
1346  * Write the current position to the lag tracker (see XLogSendPhysical).
 *
 * sendTime is function-static, so it remembers across calls when lag was
 * last recorded.  Calls that arrive within
 * WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS of that time return without
 * recording anything.
1347  */
1348 static void
1350 {
1351  static TimestampTz sendTime = 0;
1353 
1354  /*
1355  * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to
1356  * avoid flooding the lag tracker when we commit frequently.
1357  */
1358 #define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000
1359  if (!TimestampDifferenceExceeds(sendTime, now,
1361  return;
1362 
1363  LagTrackerWrite(lsn, now);
1364  sendTime = now;
1365 }
1366 
1367 /*
1368  * Wait till WAL < loc is flushed to disk so it can be safely sent to client.
1369  *
1370  * Returns end LSN of flushed WAL. Normally this will be >= loc, but
1371  * if we detect a shutdown request (either from postmaster or client)
1372  * we will return early, so caller must always check.
 *
 * While recovery is in progress, the "flushed" position is the replay
 * position (GetXLogReplayRecPtr); otherwise it is GetFlushRecPtr().  The
 * last value seen is cached in the function-static RecentFlushPtr, so a
 * request for WAL already known to be flushed returns at once without
 * acquiring the spinlock.  WalSndCaughtUp is set when the function
 * actually has to wait.
1373  */
1374 static XLogRecPtr
1376 {
1377  int wakeEvents;
1378  static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
1379 
1380  /*
1381  * Fast path to avoid acquiring the spinlock in case we already know we
1382  * have enough WAL available. This is particularly interesting if we're
1383  * far behind.
1384  */
1385  if (RecentFlushPtr != InvalidXLogRecPtr &&
1386  loc <= RecentFlushPtr)
1387  return RecentFlushPtr;
1388 
1389  /* Get a more recent flush pointer. */
1390  if (!RecoveryInProgress())
1391  RecentFlushPtr = GetFlushRecPtr();
1392  else
1393  RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1394 
1395  for (;;)
1396  {
1397  long sleeptime;
1398 
1399  /* Clear any already-pending wakeups */
1401 
1403 
1404  /* Process any requests or signals received recently */
1405  if (ConfigReloadPending)
1406  {
1407  ConfigReloadPending = false;
1410  }
1411 
1412  /* Check for input from the client */
1414 
1415  /*
1416  * If we're shutting down, trigger pending WAL to be written out,
1417  * otherwise we'd possibly end up waiting for WAL that never gets
1418  * written, because walwriter has shut down already.
1419  */
1420  if (got_STOPPING)
1422 
1423  /* Update our idea of the currently flushed position. */
1424  if (!RecoveryInProgress())
1425  RecentFlushPtr = GetFlushRecPtr();
1426  else
1427  RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1428 
1429  /*
1430  * If postmaster asked us to stop, don't wait anymore.
1431  *
1432  * It's important to do this check after the recomputation of
1433  * RecentFlushPtr, so we can send all remaining data before shutting
1434  * down.
1435  */
1436  if (got_STOPPING)
1437  break;
1438 
1439  /*
1440  * We only send regular messages to the client for full decoded
1441  * transactions, but a synchronous replication and walsender shutdown
1442  * possibly are waiting for a later location. So, before sleeping, we
1443  * send a ping containing the flush location. If the receiver is
1444  * otherwise idle, this keepalive will trigger a reply. Processing the
1445  * reply will update these MyWalSnd locations.
1446  */
1447  if (MyWalSnd->flush < sentPtr &&
1448  MyWalSnd->write < sentPtr &&
1450  WalSndKeepalive(false);
1451 
1452  /* check whether we're done */
1453  if (loc <= RecentFlushPtr)
1454  break;
1455 
1456  /* Waiting for new WAL. Since we need to wait, we're now caught up. */
1457  WalSndCaughtUp = true;
1458 
1459  /*
1460  * Try to flush any pending output to the client.
1461  */
1462  if (pq_flush_if_writable() != 0)
1463  WalSndShutdown();
1464 
1465  /*
1466  * If we have received CopyDone from the client, sent CopyDone
1467  * ourselves, and the output buffer is empty, it's time to exit
1468  * streaming, so fail the current WAL fetch request.
1469  */
1471  !pq_is_send_pending())
1472  break;
1473 
1474  /* die if timeout was reached */
1476 
1477  /* Send keepalive if the time has come */
1479 
1480  /*
1481  * Sleep until something happens or we time out. Also wait for the
1482  * socket becoming writable, if there's still pending output.
1483  * Otherwise we might sit on sendable output data while waiting for
1484  * new WAL to be generated. (But if we have nothing to send, we don't
1485  * want to wake on socket-writable.)
1486  */
1488 
1489  wakeEvents = WL_SOCKET_READABLE;
1490 
1491  if (pq_is_send_pending())
1492  wakeEvents |= WL_SOCKET_WRITEABLE;
1493 
1494  WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_WAIT_WAL);
1495  }
1496 
1497  /* reactivate latch so WalSndLoop knows to continue */
1498  SetLatch(MyLatch);
1499  return RecentFlushPtr;
1500 }
1501 
1502 /*
1503  * Execute an incoming replication command.
1504  *
1505  * Returns true if the cmd_string was recognized as WalSender command, false
1506  * if not.
 *
 * Raises ERROR in these cases:
 *  - the walsender is in stopping mode;
 *  - the command cannot be parsed;
 *  - a SQL command arrives at a walsender for physical replication;
 *  - the current transaction block is aborted;
 *  - the parsed node is an unrecognized command.
 * Parsing and execution happen in a dedicated "Replication command
 * context" memory context, which is deleted before returning.
1507  */
1508 bool
1509 exec_replication_command(const char *cmd_string)
1510 {
1511  int parse_rc;
1512  Node *cmd_node;
1513  const char *cmdtag;
1514  MemoryContext cmd_context;
1515  MemoryContext old_context;
1516 
1517  /*
1518  * If WAL sender has been told that shutdown is getting close, switch its
1519  * status accordingly to handle the next replication commands correctly.
1520  */
1521  if (got_STOPPING)
1523 
1524  /*
1525  * Throw error if in stopping mode. We need to prevent commands that could
1526  * generate WAL while the shutdown checkpoint is being written. To be
1527  * safe, we just prohibit all new commands.
1528  */
1529  if (MyWalSnd->state == WALSNDSTATE_STOPPING)
1530  ereport(ERROR,
1531  (errmsg("cannot execute new commands while WAL sender is in stopping mode")));
1532 
1533  /*
1534  * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
1535  * command arrives. Clean up the old stuff if there's anything.
1536  */
1538 
1540 
1541  /*
1542  * Parse the command.
1543  */
1545  "Replication command context",
1547  old_context = MemoryContextSwitchTo(cmd_context);
1548 
1549  replication_scanner_init(cmd_string);
1550  parse_rc = replication_yyparse();
1551  if (parse_rc != 0)
1552  ereport(ERROR,
1553  (errcode(ERRCODE_SYNTAX_ERROR),
1554  errmsg_internal("replication command parser returned %d",
1555  parse_rc)));
1557 
1558  cmd_node = replication_parse_result;
1559 
1560  /*
1561  * If it's a SQL command, just clean up our mess and return false; the
1562  * caller will take care of executing it.
1563  */
1564  if (IsA(cmd_node, SQLCmd))
1565  {
1566  if (MyDatabaseId == InvalidOid)
1567  ereport(ERROR,
1568  (errmsg("cannot execute SQL commands in WAL sender for physical replication")));
1569 
1570  MemoryContextSwitchTo(old_context);
1571  MemoryContextDelete(cmd_context);
1572 
1573  /* Tell the caller that this wasn't a WalSender command. */
1574  return false;
1575  }
1576 
1577  /*
1578  * Report query to various monitoring facilities. For this purpose, we
1579  * report replication commands just like SQL commands.
1580  */
1581  debug_query_string = cmd_string;
1582 
1584 
1585  /*
1586  * Log replication command if log_replication_commands is enabled. Even
1587  * when it's disabled, log the command with DEBUG1 level for backward
1588  * compatibility.
1589  */
1591  (errmsg("received replication command: %s", cmd_string)));
1592 
1593  /*
1594  * Disallow replication commands in aborted transaction blocks.
1595  */
1597  ereport(ERROR,
1598  (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
1599  errmsg("current transaction is aborted, "
1600  "commands ignored until end of transaction block")));
1601 
1603 
1604  /*
1605  * Allocate buffers that will be used for each outgoing and incoming
1606  * message. We do this just once per command to reduce palloc overhead.
1607  */
1608  initStringInfo(&output_message);
1609  initStringInfo(&reply_message);
1610  initStringInfo(&tmpbuf);
1611 
/*
 * Each case sets cmdtag, shows it in the ps display, runs the command and
 * reports completion with EndReplicationCommand().
 */
1612  switch (cmd_node->type)
1613  {
1614  case T_IdentifySystemCmd:
1615  cmdtag = "IDENTIFY_SYSTEM";
1616  set_ps_display(cmdtag);
1617  IdentifySystem();
1618  EndReplicationCommand(cmdtag);
1619  break;
1620 
1621  case T_BaseBackupCmd:
1622  cmdtag = "BASE_BACKUP";
1623  set_ps_display(cmdtag);
1624  PreventInTransactionBlock(true, cmdtag);
1625  SendBaseBackup((BaseBackupCmd *) cmd_node);
1626  EndReplicationCommand(cmdtag);
1627  break;
1628 
1630  cmdtag = "CREATE_REPLICATION_SLOT";
1631  set_ps_display(cmdtag);
1633  EndReplicationCommand(cmdtag);
1634  break;
1635 
1637  cmdtag = "DROP_REPLICATION_SLOT";
1638  set_ps_display(cmdtag);
1640  EndReplicationCommand(cmdtag);
1641  break;
1642 
1643  case T_StartReplicationCmd:
1644  {
1645  StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
1646 
1647  cmdtag = "START_REPLICATION";
1648  set_ps_display(cmdtag);
1649  PreventInTransactionBlock(true, cmdtag);
1650 
1651  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
1652  StartReplication(cmd);
1653  else
1655 
1656  /* dupe, but necessary per libpqrcv_endstreaming */
1657  EndReplicationCommand(cmdtag);
1658 
1659  Assert(xlogreader != NULL);
1660  break;
1661  }
1662 
1663  case T_TimeLineHistoryCmd:
1664  cmdtag = "TIMELINE_HISTORY";
1665  set_ps_display(cmdtag);
1666  PreventInTransactionBlock(true, cmdtag);
1668  EndReplicationCommand(cmdtag);
1669  break;
1670 
1671  case T_VariableShowStmt:
1672  {
1674  VariableShowStmt *n = (VariableShowStmt *) cmd_node;
1675 
1676  cmdtag = "SHOW";
1677  set_ps_display(cmdtag);
1678 
1679  /* syscache access needs a transaction environment */
1681  GetPGVariable(n->name, dest);
1683  EndReplicationCommand(cmdtag);
1684  }
1685  break;
1686 
1687  default:
1688  elog(ERROR, "unrecognized replication command node tag: %u",
1689  cmd_node->type);
1690  }
1691 
1692  /* done */
1693  MemoryContextSwitchTo(old_context);
1694  MemoryContextDelete(cmd_context);
1695 
1696  /*
1697  * We need not update ps display or pg_stat_activity, because PostgresMain
1698  * will reset those to "idle". But we must reset debug_query_string to
1699  * ensure it doesn't become a dangling pointer.
1700  */
1701  debug_query_string = NULL;
1702 
1703  return true;
1704 }
1705 
1706 /*
1707  * Process any incoming messages while streaming. Also checks if the remote
1708  * end has closed the connection.
 *
 * Reads messages without blocking until none is available or a CopyDone
 * has been received.  Accepted message types:
 *  - 'd' (CopyData), which carries a standby message;
 *  - 'c' (CopyDone);
 *  - 'X' (Terminate).
 * Any other type is a FATAL protocol violation.  EOF on the connection is
 * reported and the process exits via proc_exit(0); a Terminate message
 * also exits via proc_exit(0).
1709  */
1710 static void
1712 {
1713  unsigned char firstchar;
1714  int maxmsglen;
1715  int r;
1716  bool received = false;
1717 
1719 
1720  /*
1721  * If we already received a CopyDone from the frontend, any subsequent
1722  * message is the beginning of a new command, and should be processed in
1723  * the main processing loop.
1724  */
1725  while (!streamingDoneReceiving)
1726  {
1727  pq_startmsgread();
1728  r = pq_getbyte_if_available(&firstchar);
1729  if (r < 0)
1730  {
1731  /* unexpected error or EOF */
1733  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1734  errmsg("unexpected EOF on standby connection")));
1735  proc_exit(0);
1736  }
1737  if (r == 0)
1738  {
1739  /* no data available without blocking */
1740  pq_endmsgread();
1741  break;
1742  }
1743 
1744  /* Validate message type and set packet size limit (CopyData may be large; CopyDone and Terminate must be small) */
1745  switch (firstchar)
1746  {
1747  case 'd':
1748  maxmsglen = PQ_LARGE_MESSAGE_LIMIT;
1749  break;
1750  case 'c':
1751  case 'X':
1752  maxmsglen = PQ_SMALL_MESSAGE_LIMIT;
1753  break;
1754  default:
1755  ereport(FATAL,
1756  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1757  errmsg("invalid standby message type \"%c\"",
1758  firstchar)));
1759  maxmsglen = 0; /* keep compiler quiet */
1760  break;
1761  }
1762 
1763  /* Read the message contents */
1764  resetStringInfo(&reply_message);
1765  if (pq_getmessage(&reply_message, maxmsglen))
1766  {
1768  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1769  errmsg("unexpected EOF on standby connection")));
1770  proc_exit(0);
1771  }
1772 
1773  /* ... and process it */
1774  switch (firstchar)
1775  {
1776  /*
1777  * 'd' means a standby reply wrapped in a CopyData packet.
1778  */
1779  case 'd':
1781  received = true;
1782  break;
1783 
1784  /*
1785  * CopyDone means the standby requested to finish streaming.
1786  * Reply with CopyDone, if we had not sent that already.
1787  */
1788  case 'c':
1789  if (!streamingDoneSending)
1790  {
1791  pq_putmessage_noblock('c', NULL, 0);
1792  streamingDoneSending = true;
1793  }
1794 
1795  streamingDoneReceiving = true;
1796  received = true;
1797  break;
1798 
1799  /*
1800  * 'X' means that the standby is closing down the socket.
1801  */
1802  case 'X':
1803  proc_exit(0);
1804 
1805  default:
1806  Assert(false); /* NOT REACHED */
1807  }
1808  }
1809 
1810  /*
1811  * Save the last reply timestamp if we've received at least one reply.
1812  */
1813  if (received)
1814  {
1816  waiting_for_ping_response = false;
1817  }
1818 }
1819 
1820 /*
1821  * Process a status update message received from standby.
 *
 * The message body is in reply_message, and its first byte selects the
 * subtype.  Per the streaming replication protocol, 'r' is a standby
 * status update and 'h' is hot standby feedback.  Any other subtype is
 * reported as a protocol violation and the walsender exits via
 * proc_exit(0).
1822  */
1823 static void
1825 {
1826  char msgtype;
1827 
1828  /*
1829  * Check message type from the first byte.
1830  */
1831  msgtype = pq_getmsgbyte(&reply_message);
1832 
1833  switch (msgtype)
1834  {
1835  case 'r':
1837  break;
1838 
1839  case 'h':
1841  break;
1842 
1843  default:
1845  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1846  errmsg("unexpected message type \"%c\"", msgtype)));
1847  proc_exit(0);
1848  }
1849 }
1850 
1851 /*
1852  * Remember that a walreceiver just confirmed receipt of lsn `lsn`.
 *
 * Stores lsn as the slot's restart_lsn, under the slot's spinlock, when it
 * differs from the current value.  `slot` is presumably MyReplicationSlot
 * -- confirm.  The extra work done only when the value changed is
 * presumably marking the slot dirty and recomputing the required LSN --
 * confirm.
1853  */
1854 static void
1856 {
1857  bool changed = false;
1859 
1860  Assert(lsn != InvalidXLogRecPtr);
1861  SpinLockAcquire(&slot->mutex);
1862  if (slot->data.restart_lsn != lsn)
1863  {
1864  changed = true;
1865  slot->data.restart_lsn = lsn;
1866  }
1867  SpinLockRelease(&slot->mutex);
1868 
1869  if (changed)
1870  {
1873  }
1874 
1875  /*
1876  * One could argue that the slot should be saved to disk now, but that'd
1877  * be energy wasted - the worst lost information can do here is give us
1878  * wrong information in a statistics view - we'll just potentially be more
1879  * conservative in removing files.
1880  */
1881 }
1882 
1883 /*
1884  * Regular reply from standby advising of WAL locations on standby server.
 *
 * The function works in these steps:
 *  1. Read, in order, the write, flush and apply LSNs, the standby's reply
 *     timestamp and a reply-requested flag from reply_message.
 *  2. Compute lag for each position with LagTrackerRead().
 *  3. Answer with a keepalive if the standby requested a reply.
 *  4. Publish the positions, lags and reply time in MyWalSnd under its
 *     spinlock.
 * A lag of -1 leaves the stored lag unchanged, unless stale lag times are
 * being cleared.
1885  */
1886 static void
1888 {
1889  XLogRecPtr writePtr,
1890  flushPtr,
1891  applyPtr;
1892  bool replyRequested;
1893  TimeOffset writeLag,
1894  flushLag,
1895  applyLag;
1896  bool clearLagTimes;
1897  TimestampTz now;
1898  TimestampTz replyTime;
1899 
/* Remembers, across calls, whether the previous reply had applyPtr == sentPtr. */
1900  static bool fullyAppliedLastTime = false;
1901 
1902  /* the caller already consumed the msgtype byte */
1903  writePtr = pq_getmsgint64(&reply_message);
1904  flushPtr = pq_getmsgint64(&reply_message);
1905  applyPtr = pq_getmsgint64(&reply_message);
1906  replyTime = pq_getmsgint64(&reply_message);
1907  replyRequested = pq_getmsgbyte(&reply_message);
1908 
1910  {
1911  char *replyTimeStr;
1912 
1913  /* Copy because timestamptz_to_str returns a static buffer */
1914  replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
1915 
1916  elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s reply_time %s",
1917  LSN_FORMAT_ARGS(writePtr),
1918  LSN_FORMAT_ARGS(flushPtr),
1919  LSN_FORMAT_ARGS(applyPtr),
1920  replyRequested ? " (reply requested)" : "",
1921  replyTimeStr);
1922 
1923  pfree(replyTimeStr);
1924  }
1925 
1926  /* See if we can compute the round-trip lag for these positions. */
1927  now = GetCurrentTimestamp();
1928  writeLag = LagTrackerRead(SYNC_REP_WAIT_WRITE, writePtr, now);
1929  flushLag = LagTrackerRead(SYNC_REP_WAIT_FLUSH, flushPtr, now);
1930  applyLag = LagTrackerRead(SYNC_REP_WAIT_APPLY, applyPtr, now);
1931 
1932  /*
1933  * If the standby reports that it has fully replayed the WAL in two
1934  * consecutive reply messages, then the second such message must result
1935  * from wal_receiver_status_interval expiring on the standby. This is a
1936  * convenient time to forget the lag times measured when it last
1937  * wrote/flushed/applied a WAL record, to avoid displaying stale lag data
1938  * until more WAL traffic arrives.
1939  */
1940  clearLagTimes = false;
1941  if (applyPtr == sentPtr)
1942  {
1943  if (fullyAppliedLastTime)
1944  clearLagTimes = true;
1945  fullyAppliedLastTime = true;
1946  }
1947  else
1948  fullyAppliedLastTime = false;
1949 
1950  /* Send a reply if the standby requested one. */
1951  if (replyRequested)
1952  WalSndKeepalive(false);
1953 
1954  /*
1955  * Update shared state for this WalSender process based on reply data from
1956  * standby.
1957  */
1958  {
1959  WalSnd *walsnd = MyWalSnd;
1960 
1961  SpinLockAcquire(&walsnd->mutex);
1962  walsnd->write = writePtr;
1963  walsnd->flush = flushPtr;
1964  walsnd->apply = applyPtr;
1965  if (writeLag != -1 || clearLagTimes)
1966  walsnd->writeLag = writeLag;
1967  if (flushLag != -1 || clearLagTimes)
1968  walsnd->flushLag = flushLag;
1969  if (applyLag != -1 || clearLagTimes)
1970  walsnd->applyLag = applyLag;
1971  walsnd->replyTime = replyTime;
1972  SpinLockRelease(&walsnd->mutex);
1973  }
1974 
1977 
1978  /*
1979  * Advance our local xmin horizon when the client confirmed a flush.
1980  */
1981  if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr)
1982  {
1985  else
1987  }
1988 }
1989 
1990 /*
 * compute new replication slot xmin horizon if needed
 *
 * xmin and catalog_xmin are each overwritten with the feedback value when
 * any of these holds:
 *  - the slot's current value is not a normal xid;
 *  - the feedback value is not a normal xid;
 *  - the current value precedes the feedback value.
 * A non-normal feedback value (for example InvalidTransactionId) therefore
 * clears that horizon.  Both updates are made under the slot's spinlock.
 */
1991 static void
1993 {
1994  bool changed = false;
1996 
1997  SpinLockAcquire(&slot->mutex);
1999 
2000  /*
2001  * For physical replication we don't need the interlock provided by xmin
2002  * and effective_xmin since the consequences of a missed increase are
2003  * limited to query cancellations, so set both at once.
2004  */
2005  if (!TransactionIdIsNormal(slot->data.xmin) ||
2006  !TransactionIdIsNormal(feedbackXmin) ||
2007  TransactionIdPrecedes(slot->data.xmin, feedbackXmin))
2008  {
2009  changed = true;
2010  slot->data.xmin = feedbackXmin;
2011  slot->effective_xmin = feedbackXmin;
2012  }
2013  if (!TransactionIdIsNormal(slot->data.catalog_xmin) ||
2014  !TransactionIdIsNormal(feedbackCatalogXmin) ||
2015  TransactionIdPrecedes(slot->data.catalog_xmin, feedbackCatalogXmin))
2016  {
2017  changed = true;
2018  slot->data.catalog_xmin = feedbackCatalogXmin;
2019  slot->effective_catalog_xmin = feedbackCatalogXmin;
2020  }
2021  SpinLockRelease(&slot->mutex);
2022 
2023  if (changed)
2024  {
2027  }
2028 }
2029 
2030 /*
2031  * Check that the provided xmin/epoch are sane, that is, not in the future
2032  * and not so far back as to be already wrapped around.
2033  *
2034  * Epoch of nextXid should be same as standby, or if the counter has
2035  * wrapped, then one greater than standby.
2036  *
2037  * This check doesn't care about whether clog exists for these xids
2038  * at all.
 *
 * Returns true if (xid, epoch) passes both checks, false otherwise.
2039  */
2040 static bool
2042 {
2043  FullTransactionId nextFullXid;
2044  TransactionId nextXid;
2045  uint32 nextEpoch;
2046 
2047  nextFullXid = ReadNextFullTransactionId();
2048  nextXid = XidFromFullTransactionId(nextFullXid);
2049  nextEpoch = EpochFromFullTransactionId(nextFullXid);
2050 
/*
 * An xid numerically greater than nextXid can only be valid if it comes
 * from the previous epoch (the counter has wrapped since).  Otherwise the
 * epochs must match exactly.
 */
2051  if (xid <= nextXid)
2052  {
2053  if (epoch != nextEpoch)
2054  return false;
2055  }
2056  else
2057  {
2058  if (epoch + 1 != nextEpoch)
2059  return false;
2060  }
2061 
2062  if (!TransactionIdPrecedesOrEquals(xid, nextXid))
2063  return false; /* epoch OK, but it's wrapped around */
2064 
2065  return true;
2066 }
2067 
2068 /*
2069  * Hot Standby feedback
 *
 * The function works in these steps:
 *  1. Parse the reply time, xmin/epoch and catalog_xmin/epoch from
 *     reply_message, and record the reply time in MyWalSnd.
 *  2. If neither xmin is a normal xid (the downstream turned
 *     hot_standby_feedback off), clear the slot's horizon when a slot is
 *     in use, and return.  Presumably MyProc->xmin is reset as well --
 *     confirm.
 *  3. Ignore feedback whose xmin or catalog_xmin fails
 *     TransactionIdInRecentPast().
 *  4. Otherwise hold back the horizon through the replication slot if one
 *     is in use.  Without a slot, set MyProc->xmin to feedbackCatalogXmin
 *     when that is normal and precedes feedbackXmin, and to feedbackXmin
 *     otherwise.
2070  */
2071 static void
2073 {
2074  TransactionId feedbackXmin;
2075  uint32 feedbackEpoch;
2076  TransactionId feedbackCatalogXmin;
2077  uint32 feedbackCatalogEpoch;
2078  TimestampTz replyTime;
2079 
2080  /*
2081  * Decipher the reply message. The caller already consumed the msgtype
2082  * byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation
2083  * of this message.
2084  */
2085  replyTime = pq_getmsgint64(&reply_message);
2086  feedbackXmin = pq_getmsgint(&reply_message, 4);
2087  feedbackEpoch = pq_getmsgint(&reply_message, 4);
2088  feedbackCatalogXmin = pq_getmsgint(&reply_message, 4);
2089  feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4);
2090 
2092  {
2093  char *replyTimeStr;
2094 
2095  /* Copy because timestamptz_to_str returns a static buffer */
2096  replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
2097 
2098  elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u reply_time %s",
2099  feedbackXmin,
2100  feedbackEpoch,
2101  feedbackCatalogXmin,
2102  feedbackCatalogEpoch,
2103  replyTimeStr);
2104 
2105  pfree(replyTimeStr);
2106  }
2107 
2108  /*
2109  * Update shared state for this WalSender process based on reply data from
2110  * standby.
2111  */
2112  {
2113  WalSnd *walsnd = MyWalSnd;
2114 
2115  SpinLockAcquire(&walsnd->mutex);
2116  walsnd->replyTime = replyTime;
2117  SpinLockRelease(&walsnd->mutex);
2118  }
2119 
2120  /*
2121  * Unset WalSender's xmins if the feedback message values are invalid.
2122  * This happens when the downstream turned hot_standby_feedback off.
2123  */
2124  if (!TransactionIdIsNormal(feedbackXmin)
2125  && !TransactionIdIsNormal(feedbackCatalogXmin))
2126  {
2128  if (MyReplicationSlot != NULL)
2129  PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
2130  return;
2131  }
2132 
2133  /*
2134  * Check that the provided xmin/epoch are sane, that is, not in the future
2135  * and not so far back as to be already wrapped around. Ignore if not.
2136  */
2137  if (TransactionIdIsNormal(feedbackXmin) &&
2138  !TransactionIdInRecentPast(feedbackXmin, feedbackEpoch))
2139  return;
2140 
2141  if (TransactionIdIsNormal(feedbackCatalogXmin) &&
2142  !TransactionIdInRecentPast(feedbackCatalogXmin, feedbackCatalogEpoch))
2143  return;
2144 
2145  /*
2146  * Set the WalSender's xmin equal to the standby's requested xmin, so that
2147  * the xmin will be taken into account by GetSnapshotData() /
2148  * ComputeXidHorizons(). This will hold back the removal of dead rows and
2149  * thereby prevent the generation of cleanup conflicts on the standby
2150  * server.
2151  *
2152  * There is a small window for a race condition here: although we just
2153  * checked that feedbackXmin precedes nextXid, the nextXid could have
2154  * gotten advanced between our fetching it and applying the xmin below,
2155  * perhaps far enough to make feedbackXmin wrap around. In that case the
2156  * xmin we set here would be "in the future" and have no effect. No point
2157  * in worrying about this since it's too late to save the desired data
2158  * anyway. Assuming that the standby sends us an increasing sequence of
2159  * xmins, this could only happen during the first reply cycle, else our
2160  * own xmin would prevent nextXid from advancing so far.
2161  *
2162  * We don't bother taking the ProcArrayLock here. Setting the xmin field
2163  * is assumed atomic, and there's no real need to prevent concurrent
2164  * horizon determinations. (If we're moving our xmin forward, this is
2165  * obviously safe, and if we're moving it backwards, well, the data is at
2166  * risk already since a VACUUM could already have determined the horizon.)
2167  *
2168  * If we're using a replication slot we reserve the xmin via that,
2169  * otherwise via the walsender's PGPROC entry. We can only track the
2170  * catalog xmin separately when using a slot, so we store the least of the
2171  * two provided when not using a slot.
2172  *
2173  * XXX: It might make sense to generalize the ephemeral slot concept and
2174  * always use the slot mechanism to handle the feedback xmin.
2175  */
2176  if (MyReplicationSlot != NULL) /* XXX: persistency configurable? */
2177  PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
2178  else
2179  {
2180  if (TransactionIdIsNormal(feedbackCatalogXmin)
2181  && TransactionIdPrecedes(feedbackCatalogXmin, feedbackXmin))
2182  MyProc->xmin = feedbackCatalogXmin;
2183  else
2184  MyProc->xmin = feedbackXmin;
2185  }
2186 }
2187 
2188 /*
2189  * Compute how long send/receive loops should sleep.
2190  *
2191  * If wal_sender_timeout is enabled we want to wake up in time to send
2192  * keepalives and to abort the connection if wal_sender_timeout has been
2193  * reached.
 *
 * Returns a sleep time in milliseconds, measured from `now`.  When the
 * timeout-based computation does not apply, the default is 10000 ms
 * (10 s).  The enabling condition presumably also requires
 * last_reply_timestamp > 0 -- confirm.
2194  */
2195 static long
2197 {
2198  long sleeptime = 10000; /* 10 s */
2199 
2201  {
2202  TimestampTz wakeup_time;
2203 
2204  /*
2205  * At the latest stop sleeping once wal_sender_timeout has been
2206  * reached.
2207  */
2210 
2211  /*
2212  * If no ping has been sent yet, wakeup when it's time to do so.
2213  * WalSndKeepaliveIfNecessary() wants to send a keepalive once half of
2214  * the timeout passed without a response.
2215  */
2218  wal_sender_timeout / 2);
2219 
2220  /* Compute relative time until wakeup. */
2221  sleeptime = TimestampDifferenceMilliseconds(now, wakeup_time);
2222  }
2223 
2224  return sleeptime;
2225 }
2226 
2227 /*
2228  * Check whether there have been responses by the client within
2229  * wal_sender_timeout and shutdown if not. Using last_processing as the
2230  * reference point avoids counting server-side stalls against the client.
2231  * However, a long server-side stall can make WalSndKeepaliveIfNecessary()
2232  * postdate last_processing by more than wal_sender_timeout. If that happens,
2233  * the client must reply almost immediately to avoid a timeout. This rarely
2234  * affects the default configuration, under which clients spontaneously send a
2235  * message every standby_message_timeout = wal_sender_timeout/6 = 10s. We
2236  * could eliminate that problem by recognizing timeout expiration at
2237  * wal_sender_timeout/2 after the keepalive.
 *
 * Does nothing while last_reply_timestamp <= 0, which is how callers
 * disable timeout processing.  The check is also skipped when
 * wal_sender_timeout is not positive.
2238  */
2239 static void
2241 {
2242  TimestampTz timeout;
2243 
2244  /* don't bail out if we're doing something that doesn't require timeouts */
2245  if (last_reply_timestamp <= 0)
2246  return;
2247 
2250 
2251  if (wal_sender_timeout > 0 && last_processing >= timeout)
2252  {
2253  /*
2254  * Since typically expiration of replication timeout means
2255  * communication problem, we don't send the error message to the
2256  * standby.
2257  */
2259  (errmsg("terminating walsender process due to replication timeout")));
2260 
2261  WalSndShutdown();
2262  }
2263 }
2264 
2265 /* Main loop of walsender process that streams the WAL over Copy messages. */
2266 static void
2268 {
2269  /*
2270  * Initialize the last reply timestamp. That enables timeout processing
2271  * from hereon.
2272  */
2274  waiting_for_ping_response = false;
2275 
2276  /*
2277  * Loop until we reach the end of this timeline or the client requests to
2278  * stop streaming.
2279  */
2280  for (;;)
2281  {
2282  /* Clear any already-pending wakeups */
2284 
2286 
2287  /* Process any requests or signals received recently */
2288  if (ConfigReloadPending)
2289  {
2290  ConfigReloadPending = false;
2293  }
2294 
2295  /* Check for input from the client */
2297 
2298  /*
2299  * If we have received CopyDone from the client, sent CopyDone
2300  * ourselves, and the output buffer is empty, it's time to exit
2301  * streaming.
2302  */
2304  !pq_is_send_pending())
2305  break;
2306 
2307  /*
2308  * If we don't have any pending data in the output buffer, try to send
2309  * some more. If there is some, we don't bother to call send_data
2310  * again until we've flushed it ... but we'd better assume we are not
2311  * caught up.
2312  */
2313  if (!pq_is_send_pending())
2314  send_data();
2315  else
2316  WalSndCaughtUp = false;
2317 
2318  /* Try to flush pending output to the client */
2319  if (pq_flush_if_writable() != 0)
2320  WalSndShutdown();
2321 
2322  /* If nothing remains to be sent right now ... */
2324  {
2325  /*
2326  * If we're in catchup state, move to streaming. This is an
2327  * important state change for users to know about, since before
2328  * this point data loss might occur if the primary dies and we
2329  * need to failover to the standby. The state change is also
2330  * important for synchronous replication, since commits that
2331  * started to wait at that point might wait for some time.
2332  */
2333  if (MyWalSnd->state == WALSNDSTATE_CATCHUP)
2334  {
2335  ereport(DEBUG1,
2336  (errmsg_internal("\"%s\" has now caught up with upstream server",
2337  application_name)));
2339  }
2340 
2341  /*
2342  * When SIGUSR2 arrives, we send any outstanding logs up to the
2343  * shutdown checkpoint record (i.e., the latest record), wait for
2344  * them to be replicated to the standby, and exit. This may be a
2345  * normal termination at shutdown, or a promotion, the walsender
2346  * is not sure which.
2347  */
2348  if (got_SIGUSR2)
2349  WalSndDone(send_data);
2350  }
2351 
2352  /* Check for replication timeout. */
2354 
2355  /* Send keepalive if the time has come */
2357 
2358  /*
2359  * Block if we have unsent data. XXX For logical replication, let
2360  * WalSndWaitForWal() handle any other blocking; idle receivers need
2361  * its additional actions. For physical replication, also block if
2362  * caught up; its send_data does not block.
2363  */
2364  if ((WalSndCaughtUp && send_data != XLogSendLogical &&
2367  {
2368  long sleeptime;
2369  int wakeEvents;
2370 
2372  wakeEvents = WL_SOCKET_READABLE;
2373  else
2374  wakeEvents = 0;
2375 
2376  /*
2377  * Use fresh timestamp, not last_processing, to reduce the chance
2378  * of reaching wal_sender_timeout before sending a keepalive.
2379  */
2381 
2382  if (pq_is_send_pending())
2383  wakeEvents |= WL_SOCKET_WRITEABLE;
2384 
2385  /* Sleep until something happens or we time out */
2386  WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_MAIN);
2387  }
2388  }
2389 }
2390 
2391 /* Initialize a per-walsender data structure for this walsender process */
2392 static void
2394 {
2395  int i;
2396 
2397  /*
2398  * WalSndCtl should be set up already (we inherit this by fork() or
2399  * EXEC_BACKEND mechanism from the postmaster).
2400  */
2401  Assert(WalSndCtl != NULL);
2402  Assert(MyWalSnd == NULL);
2403 
2404  /*
2405  * Find a free walsender slot and reserve it. This must not fail due to
2406  * the prior check for free WAL senders in InitProcess().
2407  */
2408  for (i = 0; i < max_wal_senders; i++)
2409  {
2410  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2411 
2412  SpinLockAcquire(&walsnd->mutex);
2413 
2414  if (walsnd->pid != 0)
2415  {
2416  SpinLockRelease(&walsnd->mutex);
2417  continue;
2418  }
2419  else
2420  {
2421  /*
2422  * Found a free slot. Reserve it for us.
2423  */
2424  walsnd->pid = MyProcPid;
2425  walsnd->state = WALSNDSTATE_STARTUP;
2426  walsnd->sentPtr = InvalidXLogRecPtr;
2427  walsnd->needreload = false;
2428  walsnd->write = InvalidXLogRecPtr;
2429  walsnd->flush = InvalidXLogRecPtr;
2430  walsnd->apply = InvalidXLogRecPtr;
2431  walsnd->writeLag = -1;
2432  walsnd->flushLag = -1;
2433  walsnd->applyLag = -1;
2434  walsnd->sync_standby_priority = 0;
2435  walsnd->latch = &MyProc->procLatch;
2436  walsnd->replyTime = 0;
2437  SpinLockRelease(&walsnd->mutex);
2438  /* don't need the lock anymore */
2439  MyWalSnd = (WalSnd *) walsnd;
2440 
2441  break;
2442  }
2443  }
2444 
2445  Assert(MyWalSnd != NULL);
2446 
2447  /* Arrange to clean up at walsender exit */
2449 }
2450 
2451 /* Destroy the per-walsender data structure for this walsender process */
2452 static void
2454 {
2455  WalSnd *walsnd = MyWalSnd;
2456 
2457  Assert(walsnd != NULL);
2458 
2459  MyWalSnd = NULL;
2460 
2461  SpinLockAcquire(&walsnd->mutex);
2462  /* clear latch while holding the spinlock, so it can safely be read */
2463  walsnd->latch = NULL;
2464  /* Mark WalSnd struct as no longer being in use. */
2465  walsnd->pid = 0;
2466  SpinLockRelease(&walsnd->mutex);
2467 }
2468 
2469 /* XLogReaderRoutine->segment_open callback */
2470 static void
2472  TimeLineID *tli_p)
2473 {
2474  char path[MAXPGPATH];
2475 
2476  /*-------
2477  * When reading from a historic timeline, and there is a timeline switch
2478  * within this segment, read from the WAL segment belonging to the new
2479  * timeline.
2480  *
2481  * For example, imagine that this server is currently on timeline 5, and
2482  * we're streaming timeline 4. The switch from timeline 4 to 5 happened at
2483  * 0/13002088. In pg_wal, we have these files:
2484  *
2485  * ...
2486  * 000000040000000000000012
2487  * 000000040000000000000013
2488  * 000000050000000000000013
2489  * 000000050000000000000014
2490  * ...
2491  *
2492  * In this situation, when requested to send the WAL from segment 0x13, on
2493  * timeline 4, we read the WAL from file 000000050000000000000013. Archive
2494  * recovery prefers files from newer timelines, so if the segment was
2495  * restored from the archive on this server, the file belonging to the old
2496  * timeline, 000000040000000000000013, might not exist. Their contents are
2497  * equal up to the switchpoint, because at a timeline switch, the used
2498  * portion of the old segment is copied to the new file. -------
2499  */
2500  *tli_p = sendTimeLine;
2502  {
2503  XLogSegNo endSegNo;
2504 
2506  if (nextSegNo == endSegNo)
2507  *tli_p = sendTimeLineNextTLI;
2508  }
2509 
2510  XLogFilePath(path, *tli_p, nextSegNo, state->segcxt.ws_segsize);
2511  state->seg.ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY);
2512  if (state->seg.ws_file >= 0)
2513  return;
2514 
2515  /*
2516  * If the file is not found, assume it's because the standby asked for a
2517  * too old WAL segment that has already been removed or recycled.
2518  */
2519  if (errno == ENOENT)
2520  {
2521  char xlogfname[MAXFNAMELEN];
2522  int save_errno = errno;
2523 
2524  XLogFileName(xlogfname, *tli_p, nextSegNo, wal_segment_size);
2525  errno = save_errno;
2526  ereport(ERROR,
2528  errmsg("requested WAL segment %s has already been removed",
2529  xlogfname)));
2530  }
2531  else
2532  ereport(ERROR,
2534  errmsg("could not open file \"%s\": %m",
2535  path)));
2536 }
2537 
2538 /*
2539  * Send out the WAL in its normal physical/stored form.
2540  *
2541  * Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk,
2542  * but not yet sent to the client, and buffer it in the libpq output
2543  * buffer.
2544  *
2545  * If there is no unsent WAL remaining, WalSndCaughtUp is set to true,
2546  * otherwise WalSndCaughtUp is set to false.
2547  */
2548 static void
2550 {
2551  XLogRecPtr SendRqstPtr;
2553  XLogRecPtr endptr;
2554  Size nbytes;
2555  XLogSegNo segno;
2556  WALReadError errinfo;
2557 
2558  /* If requested switch the WAL sender to the stopping state. */
2559  if (got_STOPPING)
2561 
2563  {
2564  WalSndCaughtUp = true;
2565  return;
2566  }
2567 
2568  /* Figure out how far we can safely send the WAL. */
2570  {
2571  /*
2572  * Streaming an old timeline that's in this server's history, but is
2573  * not the one we're currently inserting or replaying. It can be
2574  * streamed up to the point where we switched off that timeline.
2575  */
2576  SendRqstPtr = sendTimeLineValidUpto;
2577  }
2578  else if (am_cascading_walsender)
2579  {
2580  /*
2581  * Streaming the latest timeline on a standby.
2582  *
2583  * Attempt to send all WAL that has already been replayed, so that we
2584  * know it's valid. If we're receiving WAL through streaming
2585  * replication, it's also OK to send any WAL that has been received
2586  * but not replayed.
2587  *
2588  * The timeline we're recovering from can change, or we can be
2589  * promoted. In either case, the current timeline becomes historic. We
2590  * need to detect that so that we don't try to stream past the point
2591  * where we switched to another timeline. We check for promotion or
2592  * timeline switch after calculating FlushPtr, to avoid a race
2593  * condition: if the timeline becomes historic just after we checked
2594  * that it was still current, it's still be OK to stream it up to the
2595  * FlushPtr that was calculated before it became historic.
2596  */
2597  bool becameHistoric = false;
2598 
2599  SendRqstPtr = GetStandbyFlushRecPtr();
2600 
2601  if (!RecoveryInProgress())
2602  {
2603  /*
2604  * We have been promoted. RecoveryInProgress() updated
2605  * ThisTimeLineID to the new current timeline.
2606  */
2607  am_cascading_walsender = false;
2608  becameHistoric = true;
2609  }
2610  else
2611  {
2612  /*
2613  * Still a cascading standby. But is the timeline we're sending
2614  * still the one recovery is recovering from? ThisTimeLineID was
2615  * updated by the GetStandbyFlushRecPtr() call above.
2616  */
2618  becameHistoric = true;
2619  }
2620 
2621  if (becameHistoric)
2622  {
2623  /*
2624  * The timeline we were sending has become historic. Read the
2625  * timeline history file of the new timeline to see where exactly
2626  * we forked off from the timeline we were sending.
2627  */
2628  List *history;
2629 
2632 
2634  list_free_deep(history);
2635 
2636  sendTimeLineIsHistoric = true;
2637 
2638  SendRqstPtr = sendTimeLineValidUpto;
2639  }
2640  }
2641  else
2642  {
2643  /*
2644  * Streaming the current timeline on a primary.
2645  *
2646  * Attempt to send all data that's already been written out and
2647  * fsync'd to disk. We cannot go further than what's been written out
2648  * given the current implementation of WALRead(). And in any case
2649  * it's unsafe to send WAL that is not securely down to disk on the
2650  * primary: if the primary subsequently crashes and restarts, standbys
2651  * must not have applied any WAL that got lost on the primary.
2652  */
2653  SendRqstPtr = GetFlushRecPtr();
2654  }
2655 
2656  /*
2657  * Record the current system time as an approximation of the time at which
2658  * this WAL location was written for the purposes of lag tracking.
2659  *
2660  * In theory we could make XLogFlush() record a time in shmem whenever WAL
2661  * is flushed and we could get that time as well as the LSN when we call
2662  * GetFlushRecPtr() above (and likewise for the cascading standby
2663  * equivalent), but rather than putting any new code into the hot WAL path
2664  * it seems good enough to capture the time here. We should reach this
2665  * after XLogFlush() runs WalSndWakeupProcessRequests(), and although that
2666  * may take some time, we read the WAL flush pointer and take the time
2667  * very close to together here so that we'll get a later position if it is
2668  * still moving.
2669  *
2670  * Because LagTrackerWrite ignores samples when the LSN hasn't advanced,
2671  * this gives us a cheap approximation for the WAL flush time for this
2672  * LSN.
2673  *
2674  * Note that the LSN is not necessarily the LSN for the data contained in
2675  * the present message; it's the end of the WAL, which might be further
2676  * ahead. All the lag tracking machinery cares about is finding out when
2677  * that arbitrary LSN is eventually reported as written, flushed and
2678  * applied, so that it can measure the elapsed time.
2679  */
2680  LagTrackerWrite(SendRqstPtr, GetCurrentTimestamp());
2681 
2682  /*
2683  * If this is a historic timeline and we've reached the point where we
2684  * forked to the next timeline, stop streaming.
2685  *
2686  * Note: We might already have sent WAL > sendTimeLineValidUpto. The
2687  * startup process will normally replay all WAL that has been received
2688  * from the primary, before promoting, but if the WAL streaming is
2689  * terminated at a WAL page boundary, the valid portion of the timeline
2690  * might end in the middle of a WAL record. We might've already sent the
2691  * first half of that partial WAL record to the cascading standby, so that
2692  * sentPtr > sendTimeLineValidUpto. That's OK; the cascading standby can't
2693  * replay the partial WAL record either, so it can still follow our
2694  * timeline switch.
2695  */
2697  {
2698  /* close the current file. */
2699  if (xlogreader->seg.ws_file >= 0)
2700  wal_segment_close(xlogreader);
2701 
2702  /* Send CopyDone */
2703  pq_putmessage_noblock('c', NULL, 0);
2704  streamingDoneSending = true;
2705 
2706  WalSndCaughtUp = true;
2707 
2708  elog(DEBUG1, "walsender reached end of timeline at %X/%X (sent up to %X/%X)",
2711  return;
2712  }
2713 
2714  /* Do we have any work to do? */
2715  Assert(sentPtr <= SendRqstPtr);
2716  if (SendRqstPtr <= sentPtr)
2717  {
2718  WalSndCaughtUp = true;
2719  return;
2720  }
2721 
2722  /*
2723  * Figure out how much to send in one message. If there's no more than
2724  * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
2725  * MAX_SEND_SIZE bytes, but round back to logfile or page boundary.
2726  *
2727  * The rounding is not only for performance reasons. Walreceiver relies on
2728  * the fact that we never split a WAL record across two messages. Since a
2729  * long WAL record is split at page boundary into continuation records,
2730  * page boundary is always a safe cut-off point. We also assume that
2731  * SendRqstPtr never points to the middle of a WAL record.
2732  */
2733  startptr = sentPtr;
2734  endptr = startptr;
2735  endptr += MAX_SEND_SIZE;
2736 
2737  /* if we went beyond SendRqstPtr, back off */
2738  if (SendRqstPtr <= endptr)
2739  {
2740  endptr = SendRqstPtr;
2742  WalSndCaughtUp = false;
2743  else
2744  WalSndCaughtUp = true;
2745  }
2746  else
2747  {
2748  /* round down to page boundary. */
2749  endptr -= (endptr % XLOG_BLCKSZ);
2750  WalSndCaughtUp = false;
2751  }
2752 
2753  nbytes = endptr - startptr;
2754  Assert(nbytes <= MAX_SEND_SIZE);
2755 
2756  /*
2757  * OK to read and send the slice.
2758  */
2759  resetStringInfo(&output_message);
2760  pq_sendbyte(&output_message, 'w');
2761 
2762  pq_sendint64(&output_message, startptr); /* dataStart */
2763  pq_sendint64(&output_message, SendRqstPtr); /* walEnd */
2764  pq_sendint64(&output_message, 0); /* sendtime, filled in last */
2765 
2766  /*
2767  * Read the log directly into the output buffer to avoid extra memcpy
2768  * calls.
2769  */
2770  enlargeStringInfo(&output_message, nbytes);
2771 
2772 retry:
2773  if (!WALRead(xlogreader,
2774  &output_message.data[output_message.len],
2775  startptr,
2776  nbytes,
2777  xlogreader->seg.ws_tli, /* Pass the current TLI because
2778  * only WalSndSegmentOpen controls
2779  * whether new TLI is needed. */
2780  &errinfo))
2781  WALReadRaiseError(&errinfo);
2782 
2783  /* See logical_read_xlog_page(). */
2784  XLByteToSeg(startptr, segno, xlogreader->segcxt.ws_segsize);
2785  CheckXLogRemoved(segno, xlogreader->seg.ws_tli);
2786 
2787  /*
2788  * During recovery, the currently-open WAL file might be replaced with the
2789  * file of the same name retrieved from archive. So we always need to
2790  * check what we read was valid after reading into the buffer. If it's
2791  * invalid, we try to open and read the file again.
2792  */
2794  {
2795  WalSnd *walsnd = MyWalSnd;
2796  bool reload;
2797 
2798  SpinLockAcquire(&walsnd->mutex);
2799  reload = walsnd->needreload;
2800  walsnd->needreload = false;
2801  SpinLockRelease(&walsnd->mutex);
2802 
2803  if (reload && xlogreader->seg.ws_file >= 0)
2804  {
2805  wal_segment_close(xlogreader);
2806 
2807  goto retry;
2808  }
2809  }
2810 
2811  output_message.len += nbytes;
2812  output_message.data[output_message.len] = '\0';
2813 
2814  /*
2815  * Fill the send timestamp last, so that it is taken as late as possible.
2816  */
2817  resetStringInfo(&tmpbuf);
2818  pq_sendint64(&tmpbuf, GetCurrentTimestamp());
2819  memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64)],
2820  tmpbuf.data, sizeof(int64));
2821 
2822  pq_putmessage_noblock('d', output_message.data, output_message.len);
2823 
2824  sentPtr = endptr;
2825 
2826  /* Update shared memory status */
2827  {
2828  WalSnd *walsnd = MyWalSnd;
2829 
2830  SpinLockAcquire(&walsnd->mutex);
2831  walsnd->sentPtr = sentPtr;
2832  SpinLockRelease(&walsnd->mutex);
2833  }
2834 
2835  /* Report progress of XLOG streaming in PS display */
2837  {
2838  char activitymsg[50];
2839 
2840  snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
2842  set_ps_display(activitymsg);
2843  }
2844 }
2845 
2846 /*
2847  * Stream out logically decoded data.
2848  */
2849 static void
2851 {
2852  XLogRecord *record;
2853  char *errm;
2854 
2855  /*
2856  * We'll use the current flush point to determine whether we've caught up.
2857  * This variable is static in order to cache it across calls. Caching is
2858  * helpful because GetFlushRecPtr() needs to acquire a heavily-contended
2859  * spinlock.
2860  */
2861  static XLogRecPtr flushPtr = InvalidXLogRecPtr;
2862 
2863  /*
2864  * Don't know whether we've caught up yet. We'll set WalSndCaughtUp to
2865  * true in WalSndWaitForWal, if we're actually waiting. We also set to
2866  * true if XLogReadRecord() had to stop reading but WalSndWaitForWal
2867  * didn't wait - i.e. when we're shutting down.
2868  */
2869  WalSndCaughtUp = false;
2870 
2871  record = XLogReadRecord(logical_decoding_ctx->reader, &errm);
2872 
2873  /* xlog record was invalid */
2874  if (errm != NULL)
2875  elog(ERROR, "%s", errm);
2876 
2877  if (record != NULL)
2878  {
2879  /*
2880  * Note the lack of any call to LagTrackerWrite() which is handled by
2881  * WalSndUpdateProgress which is called by output plugin through
2882  * logical decoding write api.
2883  */
2884  LogicalDecodingProcessRecord(logical_decoding_ctx, logical_decoding_ctx->reader);
2885 
2886  sentPtr = logical_decoding_ctx->reader->EndRecPtr;
2887  }
2888 
2889  /*
2890  * If first time through in this session, initialize flushPtr. Otherwise,
2891  * we only need to update flushPtr if EndRecPtr is past it.
2892  */
2893  if (flushPtr == InvalidXLogRecPtr)
2894  flushPtr = GetFlushRecPtr();
2895  else if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
2896  flushPtr = GetFlushRecPtr();
2897 
2898  /* If EndRecPtr is still past our flushPtr, it means we caught up. */
2899  if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
2900  WalSndCaughtUp = true;
2901 
2902  /*
2903  * If we're caught up and have been requested to stop, have WalSndLoop()
2904  * terminate the connection in an orderly manner, after writing out all
2905  * the pending data.
2906  */
2908  got_SIGUSR2 = true;
2909 
2910  /* Update shared memory status */
2911  {
2912  WalSnd *walsnd = MyWalSnd;
2913 
2914  SpinLockAcquire(&walsnd->mutex);
2915  walsnd->sentPtr = sentPtr;
2916  SpinLockRelease(&walsnd->mutex);
2917  }
2918 }
2919 
2920 /*
2921  * Shutdown if the sender is caught up.
2922  *
2923  * NB: This should only be called when the shutdown signal has been received
2924  * from postmaster.
2925  *
2926  * Note that if we determine that there's still more data to send, this
2927  * function will return control to the caller.
2928  */
2929 static void
2931 {
2932  XLogRecPtr replicatedPtr;
2933 
2934  /* ... let's just be real sure we're caught up ... */
2935  send_data();
2936 
2937  /*
2938  * To figure out whether all WAL has successfully been replicated, check
2939  * flush location if valid, write otherwise. Tools like pg_receivewal will
2940  * usually (unless in synchronous mode) return an invalid flush location.
2941  */
2942  replicatedPtr = XLogRecPtrIsInvalid(MyWalSnd->flush) ?
2943  MyWalSnd->write : MyWalSnd->flush;
2944 
2945  if (WalSndCaughtUp && sentPtr == replicatedPtr &&
2946  !pq_is_send_pending())
2947  {
2948  QueryCompletion qc;
2949 
2950  /* Inform the standby that XLOG streaming is done */
2951  SetQueryCompletion(&qc, CMDTAG_COPY, 0);
2952  EndCommand(&qc, DestRemote, false);
2953  pq_flush();
2954 
2955  proc_exit(0);
2956  }
2958  WalSndKeepalive(true);
2959 }
2960 
2961 /*
2962  * Returns the latest point in WAL that has been safely flushed to disk, and
2963  * can be sent to the standby. This should only be called when in recovery,
2964  * ie. we're streaming to a cascaded standby.
2965  *
2966  * As a side-effect, ThisTimeLineID is updated to the TLI of the last
2967  * replayed WAL record.
2968  */
2969 static XLogRecPtr
2971 {
2972  XLogRecPtr replayPtr;
2973  TimeLineID replayTLI;
2974  XLogRecPtr receivePtr;
2976  XLogRecPtr result;
2977 
2978  /*
2979  * We can safely send what's already been replayed. Also, if walreceiver
2980  * is streaming WAL from the same timeline, we can send anything that it
2981  * has streamed, but hasn't been replayed yet.
2982  */
2983 
2984  receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI);
2985  replayPtr = GetXLogReplayRecPtr(&replayTLI);
2986 
2987  ThisTimeLineID = replayTLI;
2988 
2989  result = replayPtr;
2990  if (receiveTLI == ThisTimeLineID && receivePtr > replayPtr)
2991  result = receivePtr;
2992 
2993  return result;
2994 }
2995 
2996 /*
2997  * Request walsenders to reload the currently-open WAL file
2998  */
2999 void
3001 {
3002  int i;
3003 
3004  for (i = 0; i < max_wal_senders; i++)
3005  {
3006  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3007 
3008  SpinLockAcquire(&walsnd->mutex);
3009  if (walsnd->pid == 0)
3010  {
3011  SpinLockRelease(&walsnd->mutex);
3012  continue;
3013  }
3014  walsnd->needreload = true;
3015  SpinLockRelease(&walsnd->mutex);
3016  }
3017 }
3018 
3019 /*
3020  * Handle PROCSIG_WALSND_INIT_STOPPING signal.
3021  */
3022 void
3024 {
3026 
3027  /*
3028  * If replication has not yet started, die like with SIGTERM. If
3029  * replication is active, only set a flag and wake up the main loop. It
3030  * will send any outstanding WAL, wait for it to be replicated to the
3031  * standby, and then exit gracefully.
3032  */
3033  if (!replication_active)
3034  kill(MyProcPid, SIGTERM);
3035  else
3036  got_STOPPING = true;
3037 }
3038 
3039 /*
3040  * SIGUSR2: set flag to do a last cycle and shut down afterwards. The WAL
3041  * sender should already have been switched to WALSNDSTATE_STOPPING at
3042  * this point.
3043  */
3044 static void
3046 {
3047  int save_errno = errno;
3048 
3049  got_SIGUSR2 = true;
3050  SetLatch(MyLatch);
3051 
3052  errno = save_errno;
3053 }
3054 
3055 /* Set up signal handlers */
3056 void
3058 {
3059  /* Set up signal handlers */
3061  pqsignal(SIGINT, StatementCancelHandler); /* query cancel */
3062  pqsignal(SIGTERM, die); /* request shutdown */
3063  /* SIGQUIT handler was already set up by InitPostmasterChild */
3064  InitializeTimeouts(); /* establishes SIGALRM handler */
3067  pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
3068  * shutdown */
3069 
3070  /* Reset some signals that are accepted by postmaster but not here */
3072 }
3073 
3074 /* Report shared-memory space needed by WalSndShmemInit */
3075 Size
3077 {
3078  Size size = 0;
3079 
3080  size = offsetof(WalSndCtlData, walsnds);
3081  size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
3082 
3083  return size;
3084 }
3085 
3086 /* Allocate and initialize walsender-related shared memory */
3087 void
3089 {
3090  bool found;
3091  int i;
3092 
3093  WalSndCtl = (WalSndCtlData *)
3094  ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
3095 
3096  if (!found)
3097  {
3098  /* First time through, so initialize */
3099  MemSet(WalSndCtl, 0, WalSndShmemSize());
3100 
3101  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
3102  SHMQueueInit(&(WalSndCtl->SyncRepQueue[i]));
3103 
3104  for (i = 0; i < max_wal_senders; i++)
3105  {
3106  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3107 
3108  SpinLockInit(&walsnd->mutex);
3109  }
3110  }
3111 }
3112 
3113 /*
3114  * Wake up all walsenders
3115  *
3116  * This will be called inside critical sections, so throwing an error is not
3117  * advisable.
3118  */
3119 void
3121 {
3122  int i;
3123 
3124  for (i = 0; i < max_wal_senders; i++)
3125  {
3126  Latch *latch;
3127  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3128 
3129  /*
3130  * Get latch pointer with spinlock held, for the unlikely case that
3131  * pointer reads aren't atomic (as they're 8 bytes).
3132  */
3133  SpinLockAcquire(&walsnd->mutex);
3134  latch = walsnd->latch;
3135  SpinLockRelease(&walsnd->mutex);
3136 
3137  if (latch != NULL)
3138  SetLatch(latch);
3139  }
3140 }
3141 
3142 /*
3143  * Wait for readiness on the FeBe socket, or a timeout. The mask should be
3144  * composed of optional WL_SOCKET_WRITEABLE and WL_SOCKET_READABLE flags. Exit
3145  * on postmaster death.
3146  */
3147 static void
3148 WalSndWait(uint32 socket_events, long timeout, uint32 wait_event)
3149 {
3150  WaitEvent event;
3151 
3152  ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, socket_events, NULL);
3153  if (WaitEventSetWait(FeBeWaitSet, timeout, &event, 1, wait_event) == 1 &&
3154  (event.events & WL_POSTMASTER_DEATH))
3155  proc_exit(1);
3156 }
3157 
3158 /*
3159  * Signal all walsenders to move to stopping state.
3160  *
3161  * This will trigger walsenders to move to a state where no further WAL can be
3162  * generated. See this file's header for details.
3163  */
3164 void
3166 {
3167  int i;
3168 
3169  for (i = 0; i < max_wal_senders; i++)
3170  {
3171  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3172  pid_t pid;
3173 
3174  SpinLockAcquire(&walsnd->mutex);
3175  pid = walsnd->pid;
3176  SpinLockRelease(&walsnd->mutex);
3177 
3178  if (pid == 0)
3179  continue;
3180 
3182  }
3183 }
3184 
3185 /*
3186  * Wait that all the WAL senders have quit or reached the stopping state. This
3187  * is used by the checkpointer to control when the shutdown checkpoint can
3188  * safely be performed.
3189  */
3190 void
3192 {
3193  for (;;)
3194  {
3195  int i;
3196  bool all_stopped = true;
3197 
3198  for (i = 0; i < max_wal_senders; i++)
3199  {
3200  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3201 
3202  SpinLockAcquire(&walsnd->mutex);
3203 
3204  if (walsnd->pid == 0)
3205  {
3206  SpinLockRelease(&walsnd->mutex);
3207  continue;
3208  }
3209 
3210  if (walsnd->state != WALSNDSTATE_STOPPING)
3211  {
3212  all_stopped = false;
3213  SpinLockRelease(&walsnd->mutex);
3214  break;
3215  }
3216  SpinLockRelease(&walsnd->mutex);
3217  }
3218 
3219  /* safe to leave if confirmation is done for all WAL senders */
3220  if (all_stopped)
3221  return;
3222 
3223  pg_usleep(10000L); /* wait for 10 msec */
3224  }
3225 }
3226 
3227 /* Set state for current walsender (only called in walsender) */
3228 void
3230 {
3231  WalSnd *walsnd = MyWalSnd;
3232 
3234 
3235  if (walsnd->state == state)
3236  return;
3237 
3238  SpinLockAcquire(&walsnd->mutex);
3239  walsnd->state = state;
3240  SpinLockRelease(&walsnd->mutex);
3241 }
3242 
3243 /*
3244  * Return a string constant representing the state. This is used
3245  * in system views, and should *not* be translated.
3246  */
3247 static const char *
3249 {
3250  switch (state)
3251  {
3252  case WALSNDSTATE_STARTUP:
3253  return "startup";
3254  case WALSNDSTATE_BACKUP:
3255  return "backup";
3256  case WALSNDSTATE_CATCHUP:
3257  return "catchup";
3258  case WALSNDSTATE_STREAMING:
3259  return "streaming";
3260  case WALSNDSTATE_STOPPING:
3261  return "stopping";
3262  }
3263  return "UNKNOWN";
3264 }
3265 
3266 static Interval *
3268 {
3269  Interval *result = palloc(sizeof(Interval));
3270 
3271  result->month = 0;
3272  result->day = 0;
3273  result->time = offset;
3274 
3275  return result;
3276 }
3277 
3278 /*
3279  * Returns activity of walsenders, including pids and xlog locations sent to
3280  * standby servers.
3281  */
3282 Datum
3284 {
3285 #define PG_STAT_GET_WAL_SENDERS_COLS 12
3286  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
3287  TupleDesc tupdesc;
3288  Tuplestorestate *tupstore;
3289  MemoryContext per_query_ctx;
3290  MemoryContext oldcontext;
3291  SyncRepStandbyData *sync_standbys;
3292  int num_standbys;
3293  int i;
3294 
3295  /* check to see if caller supports us returning a tuplestore */
3296  if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
3297  ereport(ERROR,
3298  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3299  errmsg("set-valued function called in context that cannot accept a set")));
3300  if (!(rsinfo->allowedModes & SFRM_Materialize))
3301  ereport(ERROR,
3302  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3303  errmsg("materialize mode required, but it is not allowed in this context")));
3304 
3305  /* Build a tuple descriptor for our result type */
3306  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
3307  elog(ERROR, "return type must be a row type");
3308 
3309  per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
3310  oldcontext = MemoryContextSwitchTo(per_query_ctx);
3311 
3312  tupstore = tuplestore_begin_heap(true, false, work_mem);
3313  rsinfo->returnMode = SFRM_Materialize;
3314  rsinfo->setResult = tupstore;
3315  rsinfo->setDesc = tupdesc;
3316 
3317  MemoryContextSwitchTo(oldcontext);
3318 
3319  /*
3320  * Get the currently active synchronous standbys. This could be out of
3321  * date before we're done, but we'll use the data anyway.
3322  */
3323  num_standbys = SyncRepGetCandidateStandbys(&sync_standbys);
3324 
3325  for (i = 0; i < max_wal_senders; i++)
3326  {
3327  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3329  XLogRecPtr write;
3330  XLogRecPtr flush;
3331  XLogRecPtr apply;
3332  TimeOffset writeLag;
3333  TimeOffset flushLag;
3334  TimeOffset applyLag;
3335  int priority;
3336  int pid;
3338  TimestampTz replyTime;
3339  bool is_sync_standby;
3341  bool nulls[PG_STAT_GET_WAL_SENDERS_COLS];
3342  int j;
3343 
3344  /* Collect data from shared memory */
3345  SpinLockAcquire(&walsnd->mutex);
3346  if (walsnd->pid == 0)
3347  {
3348  SpinLockRelease(&walsnd->mutex);
3349  continue;
3350  }
3351  pid = walsnd->pid;
3352  sentPtr = walsnd->sentPtr;
3353  state = walsnd->state;
3354  write = walsnd->write;
3355  flush = walsnd->flush;
3356  apply = walsnd->apply;
3357  writeLag = walsnd->writeLag;
3358  flushLag = walsnd->flushLag;
3359  applyLag = walsnd->applyLag;
3360  priority = walsnd->sync_standby_priority;
3361  replyTime = walsnd->replyTime;
3362  SpinLockRelease(&walsnd->mutex);
3363 
3364  /*
3365  * Detect whether walsender is/was considered synchronous. We can
3366  * provide some protection against stale data by checking the PID
3367  * along with walsnd_index.
3368  */
3369  is_sync_standby = false;
3370  for (j = 0; j < num_standbys; j++)
3371  {
3372  if (sync_standbys[j].walsnd_index == i &&
3373  sync_standbys[j].pid == pid)
3374  {
3375  is_sync_standby = true;
3376  break;
3377  }
3378  }
3379 
3380  memset(nulls, 0, sizeof(nulls));
3381  values[0] = Int32GetDatum(pid);
3382 
3383  if (!is_member_of_role(GetUserId(), ROLE_PG_READ_ALL_STATS))
3384  {
3385  /*
3386  * Only superusers and members of pg_read_all_stats can see
3387  * details. Other users only get the pid value to know it's a
3388  * walsender, but no details.
3389  */
3390  MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1);
3391  }
3392  else
3393  {
3394  values[1] = CStringGetTextDatum(WalSndGetStateString(state));
3395 
3396  if (XLogRecPtrIsInvalid(sentPtr))
3397  nulls[2] = true;
3398  values[2] = LSNGetDatum(sentPtr);
3399 
3400  if (XLogRecPtrIsInvalid(write))
3401  nulls[3] = true;
3402  values[3] = LSNGetDatum(write);
3403 
3404  if (XLogRecPtrIsInvalid(flush))
3405  nulls[4] = true;
3406  values[4] = LSNGetDatum(flush);
3407 
3408  if (XLogRecPtrIsInvalid(apply))
3409  nulls[5] = true;
3410  values[5] = LSNGetDatum(apply);
3411 
3412  /*
3413  * Treat a standby such as a pg_basebackup background process
3414  * which always returns an invalid flush location, as an
3415  * asynchronous standby.
3416  */
3417  priority = XLogRecPtrIsInvalid(flush) ? 0 : priority;
3418 
3419  if (writeLag < 0)
3420  nulls[6] = true;
3421  else
3422  values[6] = IntervalPGetDatum(offset_to_interval(writeLag));
3423 
3424  if (flushLag < 0)
3425  nulls[7] = true;
3426  else
3427  values[7] = IntervalPGetDatum(offset_to_interval(flushLag));
3428 
3429  if (applyLag < 0)
3430  nulls[8] = true;
3431  else
3432  values[8] = IntervalPGetDatum(offset_to_interval(applyLag));
3433 
3434  values[9] = Int32GetDatum(priority);
3435 
3436  /*
3437  * More easily understood version of standby state. This is purely
3438  * informational.
3439  *
3440  * In quorum-based sync replication, the role of each standby
3441  * listed in synchronous_standby_names can be changing very
3442  * frequently. Any standbys considered as "sync" at one moment can
3443  * be switched to "potential" ones at the next moment. So, it's
3444  * basically useless to report "sync" or "potential" as their sync
3445  * states. We report just "quorum" for them.
3446  */
3447  if (priority == 0)
3448  values[10] = CStringGetTextDatum("async");
3449  else if (is_sync_standby)
3451  CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
3452  else
3453  values[10] = CStringGetTextDatum("potential");
3454 
3455  if (replyTime == 0)
3456  nulls[11] = true;
3457  else
3458  values[11] = TimestampTzGetDatum(replyTime);
3459  }
3460 
3461  tuplestore_putvalues(tupstore, tupdesc, values, nulls);
3462  }
3463 
3464  /* clean up and return the tuplestore */
3465  tuplestore_donestoring(tupstore);
3466 
3467  return (Datum) 0;
3468 }
3469 
3470 /*
3471  * Send a keepalive message to standby.
3472  *
3473  * If requestReply is set, the message requests the other party to send
3474  * a message back to us, for heartbeat purposes. We also set a flag to
3475  * let nearby code that we're waiting for that response, to avoid
3476  * repeated requests.
3477  */
3478 static void
3479 WalSndKeepalive(bool requestReply)
3480 {
3481  elog(DEBUG2, "sending replication keepalive");
3482 
3483  /* construct the message... */
3484  resetStringInfo(&output_message);
3485  pq_sendbyte(&output_message, 'k');
3486  pq_sendint64(&output_message, sentPtr);
3487  pq_sendint64(&output_message, GetCurrentTimestamp());
3488  pq_sendbyte(&output_message, requestReply ? 1 : 0);
3489 
3490  /* ... and send it wrapped in CopyData */
3491  pq_putmessage_noblock('d', output_message.data, output_message.len);
3492 
3493  /* Set local flag */
3494  if (requestReply)
3496 }
3497 
3498 /*
3499  * Send keepalive message if too much time has elapsed.
3500  */
3501 static void
3503 {
3504  TimestampTz ping_time;
3505 
3506  /*
3507  * Don't send keepalive messages if timeouts are globally disabled or
3508  * we're doing something not partaking in timeouts.
3509  */
3510  if (wal_sender_timeout <= 0 || last_reply_timestamp <= 0)
3511  return;
3512 
3514  return;
3515 
3516  /*
3517  * If half of wal_sender_timeout has lapsed without receiving any reply
3518  * from the standby, send a keep-alive message to the standby requesting
3519  * an immediate reply.
3520  */
3522  wal_sender_timeout / 2);
3523  if (last_processing >= ping_time)
3524  {
3525  WalSndKeepalive(true);
3526 
3527  /* Try to flush pending output to the client */
3528  if (pq_flush_if_writable() != 0)
3529  WalSndShutdown();
3530  }
3531 }
3532 
3533 /*
3534  * Record the end of the WAL and the time it was flushed locally, so that
3535  * LagTrackerRead can compute the elapsed time (lag) when this WAL location is
3536  * eventually reported to have been written, flushed and applied by the
3537  * standby in a reply message.
3538  */
3539 static void
3541 {
3542  bool buffer_full;
3543  int new_write_head;
3544  int i;
3545 
3546  if (!am_walsender)
3547  return;
3548 
3549  /*
3550  * If the lsn hasn't advanced since last time, then do nothing. This way
3551  * we only record a new sample when new WAL has been written.
3552  */
3553  if (lag_tracker->last_lsn == lsn)
3554  return;
3555  lag_tracker->last_lsn = lsn;
3556 
3557  /*
3558  * If advancing the write head of the circular buffer would crash into any
3559  * of the read heads, then the buffer is full. In other words, the
3560  * slowest reader (presumably apply) is the one that controls the release
3561  * of space.
3562  */
3563  new_write_head = (lag_tracker->write_head + 1) % LAG_TRACKER_BUFFER_SIZE;
3564  buffer_full = false;
3565  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; ++i)
3566  {
3567  if (new_write_head == lag_tracker->read_heads[i])
3568  buffer_full = true;
3569  }
3570 
3571  /*
3572  * If the buffer is full, for now we just rewind by one slot and overwrite
3573  * the last sample, as a simple (if somewhat uneven) way to lower the
3574  * sampling rate. There may be better adaptive compaction algorithms.
3575  */
3576  if (buffer_full)
3577  {
3578  new_write_head = lag_tracker->write_head;
3579  if (lag_tracker->write_head > 0)
3580  lag_tracker->write_head--;
3581  else
3582  lag_tracker->write_head = LAG_TRACKER_BUFFER_SIZE - 1;
3583  }
3584 
3585  /* Store a sample at the current write head position. */
3586  lag_tracker->buffer[lag_tracker->write_head].lsn = lsn;
3587  lag_tracker->buffer[lag_tracker->write_head].time = local_flush_time;
3588  lag_tracker->write_head = new_write_head;
3589 }
3590 
3591 /*
3592  * Find out how much time has elapsed between the moment WAL location 'lsn'
3593  * (or the highest known earlier LSN) was flushed locally and the time 'now'.
3594  * We have a separate read head for each of the reported LSN locations we
3595  * receive in replies from standby; 'head' controls which read head is
3596  * used. Whenever a read head crosses an LSN which was written into the
3597  * lag buffer with LagTrackerWrite, we can use the associated timestamp to
3598  * find out the time this LSN (or an earlier one) was flushed locally, and
3599  * therefore compute the lag.
3600  *
3601  * Return -1 if no new sample data is available, and otherwise the elapsed
3602  * time in microseconds.
3603  */
3604 static TimeOffset
3606 {
3607  TimestampTz time = 0;
3608 
3609  /* Read all unread samples up to this LSN or end of buffer. */
3610  while (lag_tracker->read_heads[head] != lag_tracker->write_head &&
3611  lag_tracker->buffer[lag_tracker->read_heads[head]].lsn <= lsn)
3612  {
3613  time = lag_tracker->buffer[lag_tracker->read_heads[head]].time;
3614  lag_tracker->last_read[head] =
3615  lag_tracker->buffer[lag_tracker->read_heads[head]];
3616  lag_tracker->read_heads[head] =
3617  (lag_tracker->read_heads[head] + 1) % LAG_TRACKER_BUFFER_SIZE;
3618  }
3619 
3620  /*
3621  * If the lag tracker is empty, that means the standby has processed
3622  * everything we've ever sent so we should now clear 'last_read'. If we
3623  * didn't do that, we'd risk using a stale and irrelevant sample for
3624  * interpolation at the beginning of the next burst of WAL after a period
3625  * of idleness.
3626  */
3627  if (lag_tracker->read_heads[head] == lag_tracker->write_head)
3628  lag_tracker->last_read[head].time = 0;
3629 
3630  if (time > now)
3631  {
3632  /* If the clock somehow went backwards, treat as not found. */
3633  return -1;
3634  }
3635  else if (time == 0)
3636  {
3637  /*
3638  * We didn't cross a time. If there is a future sample that we
3639  * haven't reached yet, and we've already reached at least one sample,
3640  * let's interpolate the local flushed time. This is mainly useful
3641  * for reporting a completely stuck apply position as having
3642  * increasing lag, since otherwise we'd have to wait for it to
3643  * eventually start moving again and cross one of our samples before
3644  * we can show the lag increasing.
3645  */
3646  if (lag_tracker->read_heads[head] == lag_tracker->write_head)
3647  {
3648  /* There are no future samples, so we can't interpolate. */
3649  return -1;
3650  }
3651  else if (lag_tracker->last_read[head].time != 0)
3652  {
3653  /* We can interpolate between last_read and the next sample. */
3654  double fraction;
3655  WalTimeSample prev = lag_tracker->last_read[head];
3656  WalTimeSample next = lag_tracker->buffer[lag_tracker->read_heads[head]];
3657 
3658  if (lsn < prev.lsn)
3659  {
3660  /*
3661  * Reported LSNs shouldn't normally go backwards, but it's
3662  * possible when there is a timeline change. Treat as not
3663  * found.
3664  */
3665  return -1;
3666  }
3667 
3668  Assert(prev.lsn < next.lsn);
3669 
3670  if (prev.time > next.time)
3671  {
3672  /* If the clock somehow went backwards, treat as not found. */
3673  return -1;
3674  }
3675 
3676  /* See how far we are between the previous and next samples. */
3677  fraction =
3678  (double) (lsn - prev.lsn) / (double) (next.lsn - prev.lsn);
3679 
3680  /* Scale the local flush time proportionally. */
3681  time = (TimestampTz)
3682  ((double) prev.time + (next.time - prev.time) * fraction);
3683  }
3684  else
3685  {
3686  /*
3687  * We have only a future sample, implying that we were entirely
3688  * caught up but and now there is a new burst of WAL and the
3689  * standby hasn't processed the first sample yet. Until the
3690  * standby reaches the future sample the best we can do is report
3691  * the hypothetical lag if that sample were to be replayed now.
3692  */
3693  time = lag_tracker->buffer[lag_tracker->read_heads[head]].time;
3694  }
3695  }
3696 
3697  /* Return the elapsed time since local flush time in microseconds. */
3698  Assert(time != 0);
3699  return now - time;
3700 }
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, Datum *values, bool *isnull)
Definition: tuplestore.c:750
static void InitWalSenderSlot(void)
Definition: walsender.c:2393
void InitializeTimeouts(void)
Definition: timeout.c:435
#define pq_is_send_pending()
Definition: libpq.h:48
#define NIL
Definition: pg_list.h:65
static void pq_sendint16(StringInfo buf, uint16 i)
Definition: pqformat.h:137
XLogRecPtr write
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS
WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]
Definition: walsender.c:213
static void ProcessStandbyMessage(void)
Definition: walsender.c:1824
#define pg_attribute_noreturn()
Definition: c.h:179
#define FeBeWaitSetSocketPos
Definition: libpq.h:63
#define WL_SOCKET_WRITEABLE
Definition: latch.h:127
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define IsA(nodeptr, _type_)
Definition: nodes.h:590
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:218
#define AllocSetContextCreate
Definition: memutils.h:173
static void WalSndWait(uint32 socket_events, long timeout, uint32 wait_event)
Definition: walsender.c:3148
XLogRecPtr startpoint
Definition: replnodes.h:85
#define DEBUG1
Definition: elog.h:25
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:207
int MyProcPid
Definition: globals.c:43
int wal_sender_timeout
Definition: walsender.c:123
void wal_segment_close(XLogReaderState *state)
Definition: xlogutils.c:829
uint32 TimeLineID
Definition: xlogdefs.h:59
#define pq_flush()
Definition: libpq.h:46
static int32 next
Definition: blutils.c:219
static void pgstat_report_wait_end(void)
Definition: wait_event.h:278
void ProcessConfigFile(GucContext context)
int wal_segment_size
Definition: xlog.c:119
int write_head
Definition: walsender.c:214
uint32 TransactionId
Definition: c.h:587
TupleDesc CreateTemplateTupleDesc(int natts)
Definition: tupdesc.c:45
bool wake_wal_senders
Definition: walsender.c:130
#define SYNC_REP_PRIORITY
Definition: syncrep.h:36
bool exec_replication_command(const char *cmd_string)
Definition: walsender.c:1509
Oid GetUserId(void)
Definition: miscinit.c:478
#define SIGUSR1
Definition: win32_port.h:171
bool update_process_title
Definition: ps_status.c:36
#define write(a, b, c)
Definition: win32.h:14
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1580
static void WalSndLastCycleHandler(SIGNAL_ARGS)
Definition: walsender.c:3045
static void SendTimeLineHistory(TimeLineHistoryCmd *cmd)
Definition: walsender.c:465
static void DropReplicationSlot(DropReplicationSlotCmd *cmd)
Definition: walsender.c:1128
#define SIGCHLD
Definition: win32_port.h:169
PGPROC * MyProc
Definition: proc.c:68
int64 TimestampTz
Definition: timestamp.h:39
#define PQ_SMALL_MESSAGE_LIMIT
Definition: libpq.h:30
WalTimeSample last_read[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:216
ResourceOwner CurrentResourceOwner
Definition: resowner.c:146
Size WalSndShmemSize(void)
Definition: walsender.c:3076
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:56
void MarkPostmasterChildWalSender(void)
Definition: pmsignal.c:307
char * pstrdup(const char *in)
Definition: mcxt.c:1299
void CommitTransactionCommand(void)
Definition: xact.c:2939
#define XACT_REPEATABLE_READ
Definition: xact.h:38
#define SpinLockInit(lock)
Definition: spin.h:60
void ReplicationSlotAcquire(const char *name, bool nowait)
Definition: slot.c:380
uint8 syncrep_method
Definition: syncrep.h:69
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:83
void WALReadRaiseError(WALReadError *errinfo)
Definition: xlogutils.c:975
List * readTimeLineHistory(TimeLineID targetTLI)
Definition: timeline.c:76
bool IsAbortedTransactionBlockState(void)
Definition: xact.c:391
static void XLogSendPhysical(void)
Definition: walsender.c:2549
void do_tup_output(TupOutputState *tstate, Datum *values, bool *isnull)
Definition: execTuples.c:2286
#define tuplestore_donestoring(state)
Definition: tuplestore.h:60
TimeOffset flushLag
void ResourceOwnerDelete(ResourceOwner owner)
Definition: resowner.c:737
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
Definition: nodes.h:539
static StringInfoData output_message
Definition: walsender.c:157
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
struct cursor * cur
Definition: ecpg.c:28
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:187
void proc_exit(int code)
Definition: ipc.c:104
int errcode(int sqlerrcode)
Definition: elog.c:698
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4701
#define MemSet(start, val, len)
Definition: c.h:1008
void ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch)
Definition: latch.c:948
static void WalSndKill(int code, Datum arg)
Definition: walsender.c:2453
#define kill(pid, sig)
Definition: win32_port.h:454
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, bool two_phase)
Definition: slot.c:226
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
static void WalSndDone(WalSndSendDataCallback send_data)
Definition: walsender.c:2930
#define WL_SOCKET_READABLE
Definition: latch.h:126
void ReplicationSlotSave(void)
Definition: slot.c:710
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8569
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:197
TimestampTz time
Definition: walsender.c:203
WaitEventSet * FeBeWaitSet
Definition: pqcomm.c:168
void RestoreTransactionSnapshot(Snapshot snapshot, void *source_pgproc)
Definition: snapmgr.c:2226
#define SIGPIPE
Definition: win32_port.h:164
ReplicationSlotPersistentData data
Definition: slot.h:147
TimeOffset writeLag
#define SIGUSR2
Definition: win32_port.h:172
#define LOG
Definition: elog.h:26
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
bool RecoveryInProgress(void)
Definition: xlog.c:8217
void SetLatch(Latch *latch)
Definition: latch.c:567
SyncRepConfigData * SyncRepConfig
Definition: syncrep.c:97
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1711
void list_free_deep(List *list)
Definition: list.c:1405
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1271
void GetPGVariable(const char *name, DestReceiver *dest)
Definition: guc.c:9316
void ResetLatch(Latch *latch)
Definition: latch.c:660
slock_t mutex
Node * replication_parse_result
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:87
void StatementCancelHandler(SIGNAL_ARGS)
Definition: postgres.c:2949
Latch procLatch
Definition: proc.h:130
void XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength)
Definition: xlogutils.c:707
XLogRecPtr confirmed_flush
Definition: slot.h:84
XLogRecPtr EndRecPtr
Definition: xlogreader.h:176
int32 day
Definition: timestamp.h:47
#define XidFromFullTransactionId(x)
Definition: transam.h:48
static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch)
Definition: walsender.c:2041
static LagTracker * lag_tracker
Definition: walsender.c:219
TimeLineID timeline
Definition: replnodes.h:97
ReplicationKind kind
Definition: replnodes.h:56
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
void set_ps_display(const char *activity)
Definition: ps_status.c:349
bool IsTransactionBlock(void)
Definition: xact.c:4683
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
WALOpenSegment seg
Definition: xlogreader.h:215
const char * SnapBuildExportSnapshot(SnapBuild *builder)
Definition: snapbuild.c:622
bool am_walsender
Definition: walsender.c:115
#define SpinLockAcquire(lock)
Definition: spin.h:62
void pg_usleep(long microsec)
Definition: signal.c:53
void HandleWalSndInitStopping(void)
Definition: walsender.c:3023
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
void ReplicationSlotReserveWal(void)
Definition: slot.c:1069
int SendProcSignal(pid_t pid, ProcSignalReason reason, BackendId backendId)
Definition: procsignal.c:261
XLogRecPtr flush
bool defGetBoolean(DefElem *def)
Definition: define.c:111
char data[BLCKSZ]
Definition: c.h:1141
void DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
Definition: logical.c:589
void ReplicationSlotsComputeRequiredLSN(void)
Definition: slot.c:817
ReplicationKind kind
Definition: replnodes.h:82
void WalSndRqstFileReload(void)
Definition: walsender.c:3000
void pfree(void *pointer)
Definition: mcxt.c:1169
XLogRecord * XLogReadRecord(XLogReaderState *state, char **errormsg)
Definition: xlogreader.c:271
#define PQ_LARGE_MESSAGE_LIMIT
Definition: libpq.h:31
void ConditionVariableCancelSleep(void)
bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
Definition: transam.c:319
static void WalSndKeepalive(bool requestReply)
Definition: walsender.c:3479
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:2344
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:657
#define ERROR
Definition: elog.h:46
void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
Definition: decode.c:106
static TimestampTz last_processing
Definition: walsender.c:162
void pq_startmsgread(void)
Definition: pqcomm.c:1152
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2467
static void PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: walsender.c:1855
#define IntervalPGetDatum(X)
Definition: timestamp.h:33
#define SYNC_REP_WAIT_APPLY
Definition: syncrep.h:26
void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli)
Definition: xlog.c:3972
bool FirstSnapshotSet
Definition: snapmgr.c:149
void EndCommand(const QueryCompletion *qc, CommandDest dest, bool force_undecorated_output)
Definition: dest.c:166
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:396
void WalSndSignals(void)
Definition: walsender.c:3057
#define TimestampTzGetDatum(X)
Definition: timestamp.h:32
#define FATAL
Definition: elog.h:49
void SendBaseBackup(BaseBackupCmd *cmd)
Definition: basebackup.c:932
Latch * latch
void(* WalSndSendDataCallback)(void)
Definition: walsender.c:225
uint32 events
Definition: latch.h:145
#define MAXPGPATH
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:11742
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:2266
void ReplicationSlotPersist(void)
Definition: slot.c:745
TransactionId effective_xmin
Definition: slot.h:143
int pq_getbyte_if_available(unsigned char *c)
Definition: pqcomm.c:1034
static LogicalDecodingContext * logical_decoding_ctx
Definition: walsender.c:197
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:195
LogicalDecodingContext * CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, bool fast_forward, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:478
TransactionId xmin
Definition: proc.h:138
char * get_database_name(Oid dbid)
Definition: dbcommands.c:2113
#define DEBUG2
Definition: elog.h:24
bool XLogBackgroundFlush(void)
Definition: xlog.c:3049
Definition: latch.h:110
bool message_level_is_interesting(int elevel)
Definition: elog.c:270
Definition: dest.h:89
#define TLHistoryFileName(fname, tli)
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:361
WalSndState state
NodeTag type
Definition: nodes.h:541
static char * buf
Definition: pg_test_fsync.c:68
void WalSndWaitStopping(void)
Definition: walsender.c:3191
static bool streamingDoneSending
Definition: walsender.c:179
uint64 XLogSegNo
Definition: xlogdefs.h:48
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
int errdetail(const char *fmt,...)
Definition: elog.c:1042
#define COMMERROR
Definition: elog.h:30
int errcode_for_file_access(void)
Definition: elog.c:721
static void StartReplication(StartReplicationCmd *cmd)
Definition: walsender.c:571
#define SIGHUP
Definition: win32_port.h:159
TransactionId catalog_xmin
Definition: slot.h:70
#define pq_flush_if_writable()
Definition: libpq.h:47
TimeOffset time
Definition: timestamp.h:45
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
Definition: xact.c:3379
XLogRecPtr currTLIValidUntil
Definition: xlogreader.h:238
#define InvalidTransactionId
Definition: transam.h:31
void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
Definition: xlogreader.c:243
void enlargeStringInfo(StringInfo str, int needed)
Definition: stringinfo.c:283
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1265
unsigned int uint32
Definition: c.h:441
void ReplicationSlotRelease(void)
Definition: slot.c:469
MemoryContext CurrentMemoryContext
Definition: mcxt.c:42
XLogReaderState * XLogReaderAllocate(int wal_segment_size, const char *waldir, XLogReaderRoutine *routine, void *private_data)
Definition: xlogreader.c:78
TransactionId xmin
Definition: slot.h:62
SHM_QUEUE SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]
void InitWalSender(void)
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:186
#define SlotIsLogical(slot)
Definition: slot.h:169
Datum Int64GetDatum(int64 X)
Definition: fmgr.c:1697
static XLogReaderState * xlogreader
Definition: walsender.c:137
void SyncRepInitConfig(void)
Definition: syncrep.c:411
void EndReplicationCommand(const char *commandTag)
Definition: dest.c:221
static bool WalSndCaughtUp
Definition: walsender.c:183
#define SYNC_REP_WAIT_FLUSH
Definition: syncrep.h:25
MemoryContext TopMemoryContext
Definition: mcxt.c:48
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)
Definition: walsender.c:3540
XLogRecPtr lsn
Definition: walsender.c:202
int replication_yyparse(void)
FullTransactionId ReadNextFullTransactionId(void)
Definition: varsup.c:261
Definition: guc.h:72
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: wait_event.h:262
XLogRecPtr last_lsn
Definition: walsender.c:212
#define ERRCODE_DATA_CORRUPTED
Definition: pg_basebackup.c:47
int32 month
Definition: timestamp.h:48
#define XL_ROUTINE(...)
Definition: xlogreader.h:116
int max_wal_senders
Definition: walsender.c:121
static void WalSndLoop(WalSndSendDataCallback send_data)
Definition: walsender.c:2267
static void WalSndKeepaliveIfNecessary(void)
Definition: walsender.c:3502
int CloseTransientFile(int fd)
Definition: fd.c:2644
#define SIG_IGN
Definition: win32_port.h:156
static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1238
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
const char * debug_query_string
Definition: postgres.c:89
int pq_getmessage(StringInfo s, int maxlen)
Definition: pqcomm.c:1214
static StringInfoData reply_message
Definition: walsender.c:158
#define MAXFNAMELEN
TimeLineID nextTLI
Definition: xlogreader.h:244
#define SpinLockRelease(lock)
Definition: spin.h:64
Tuplestorestate * tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
Definition: tuplestore.c:318
int64 TimeOffset
Definition: timestamp.h:40
static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc)
Definition: walsender.c:1375
static void WalSndCheckTimeOut(void)
Definition: walsender.c:2240
Size mul_size(Size s1, Size s2)
Definition: shmem.c:519
#define InvalidBackendId
Definition: backendid.h:23
#define WL_POSTMASTER_DEATH
Definition: latch.h:129
static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
Definition: walsender.c:3605
void WalSndErrorCleanup(void)
Definition: walsender.c:296
static volatile sig_atomic_t replication_active
Definition: walsender.c:195
uintptr_t Datum
Definition: postgres.h:411
static const char * WalSndGetStateString(WalSndState state)
Definition: walsender.c:3248
XLogRecPtr sentPtr
Size add_size(Size s1, Size s2)
Definition: shmem.c:502
bool am_db_walsender
Definition: walsender.c:118
TransactionId effective_catalog_xmin
Definition: slot.h:144
static void StartLogicalReplication(StartReplicationCmd *cmd)
Definition: walsender.c:1138
Oid MyDatabaseId
Definition: globals.c:88
static void WalSndShutdown(void)
Definition: walsender.c:229
static TimeLineID receiveTLI
Definition: xlog.c:200
int work_mem
Definition: globals.c:124
XLogRecPtr tliSwitchPoint(TimeLineID tli, List *history, TimeLineID *nextTLI)
Definition: timeline.c:580
void WalSndResourceCleanup(bool isCommit)
Definition: walsender.c:331
#define EpochFromFullTransactionId(x)
Definition: transam.h:47
static bool two_phase
WalSnd * MyWalSnd
Definition: walsender.c:112
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:906
static XLogRecPtr sentPtr
Definition: walsender.c:154
void pq_endmsgread(void)
Definition: pqcomm.c:1176
#define InvalidOid
Definition: postgres_ext.h:36
static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p)
Definition: walsender.c:2471
TimeLineID ThisTimeLineID
Definition: xlog.c:194
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
#define ereport(elevel,...)
Definition: elog.h:157
int allowedModes
Definition: execnodes.h:305
void WalSndInitStopping(void)
Definition: walsender.c:3165
TimeLineID currTLI
Definition: xlogreader.h:228
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:170
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
bool is_member_of_role(Oid member, Oid role)
Definition: acl.c:4869
TimeOffset applyLag
struct SnapBuild * snapshot_builder
Definition: logical.h:43
int errmsg_internal(const char *fmt,...)
Definition: elog.c:996
SetFunctionReturnMode returnMode
Definition: execnodes.h:307
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
static long WalSndComputeSleeptime(TimestampTz now)
Definition: walsender.c:2196
#define SIG_DFL
Definition: win32_port.h:154
#define SIGNAL_ARGS
Definition: c.h:1333
static TimeLineID sendTimeLine
Definition: walsender.c:145
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:804
#define lfirst(lc)
Definition: pg_list.h:169
void WalSndSetState(WalSndState state)
Definition: walsender.c:3229
int sync_standby_priority
Definition: regguts.h:317
void ResourceOwnerRelease(ResourceOwner owner, ResourceReleasePhase phase, bool isCommit, bool isTopLevel)
Definition: resowner.c:486
void FreeDecodingContext(LogicalDecodingContext *ctx)
Definition: logical.c:633
static void XLogSendLogical(void)
Definition: walsender.c:2850
XLogRecPtr restart_lsn
Definition: slot.h:73
static void SetQueryCompletion(QueryCompletion *qc, CommandTag commandTag, uint64 nprocessed)
Definition: cmdtag.h:36
int read_heads[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:215
void StartTransactionCommand(void)
Definition: xact.c:2838
bool needreload
static void ProcessStandbyReplyMessage(void)
Definition: walsender.c:1887
size_t Size
Definition: c.h:540
#define XLogFileName(fname, tli, logSegNo, wal_segsz_bytes)
char * dbname
Definition: streamutil.c:51
#define LAG_TRACKER_BUFFER_SIZE
Definition: walsender.c:207
int XactIsoLevel
Definition: xact.c:75
static TimeLineID sendTimeLineNextTLI
Definition: walsender.c:146
static XLogRecPtr sendTimeLineValidUpto
Definition: walsender.c:148
void LogicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: logical.c:1695
MemoryContext ecxt_per_query_memory
Definition: execnodes.h:233
XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
WalSndState
Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
Definition: walsender.c:3283
TimeLineID ws_tli
Definition: xlogreader.h:48
static void parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action, bool *two_phase)
Definition: walsender.c:860
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Definition: walsender.c:924
static bool streamingDoneReceiving
Definition: walsender.c:180
Tuplestorestate * setResult
Definition: execnodes.h:310
static StringInfoData tmpbuf
Definition: walsender.c:159
int BasicOpenFile(const char *fileName, int fileFlags)
Definition: fd.c:1033
#define XLogFilePath(path, tli, logSegNo, wal_segsz_bytes)
int SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys)
Definition: syncrep.c:725
bool IsSubTransaction(void)
Definition: xact.c:4756
Snapshot SnapBuildInitialSnapshot(SnapBuild *builder)
Definition: snapbuild.c:530
static Datum values[MAXATTR]
Definition: bootstrap.c:166
#define PG_STAT_GET_WAL_SENDERS_COLS
ExprContext * econtext
Definition: execnodes.h:303
bool log_replication_commands
Definition: walsender.c:125
uint64 GetSystemIdentifier(void)
Definition: xlog.c:4958
#define Int32GetDatum(X)
Definition: postgres.h:523
void ReplicationSlotDrop(const char *name, bool nowait)
Definition: slot.c:563
char * application_name
Definition: guc.c:621
TupleDesc setDesc
Definition: execnodes.h:311
void pq_sendbytes(StringInfo buf, const char *data, int datalen)
Definition: pqformat.c:125
void * palloc(Size size)
Definition: mcxt.c:1062
int errmsg(const char *fmt,...)
Definition: elog.c:909
XLogReaderState * reader
Definition: logical.h:41
void pgstat_report_activity(BackendState state, const char *cmd_str)
void SHMQueueInit(SHM_QUEUE *queue)
Definition: shmqueue.c:36
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:298
#define pq_putmessage_noblock(msgtype, s, len)
Definition: libpq.h:51
CRSSnapshotAction
Definition: walsender.h:20
#define elog(elevel,...)
Definition: elog.h:232
StringInfo out
Definition: logical.h:70
static const unsigned __int64 epoch
Definition: gettimeofday.c:34
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:26
int i
void WalSndShmemInit(void)
Definition: walsender.c:3088
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
#define NameStr(name)
Definition: c.h:681
TimeLineID timeline
Definition: replnodes.h:84
#define CStringGetTextDatum(s)
Definition: builtins.h:82
void * arg
static void PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
Definition: walsender.c:1992
struct Latch * MyLatch
Definition: globals.c:57
bool WALRead(XLogReaderState *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli, WALReadError *errinfo)
Definition: xlogreader.c:1062
void ReplicationSlotCleanup(void)
Definition: slot.c:525
WALSegmentContext segcxt
Definition: xlogreader.h:214
#define PG_FUNCTION_ARGS
Definition: fmgr.h:193
char * defname
Definition: parsenodes.h:746
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:120
static XLogRecPtr GetStandbyFlushRecPtr(void)
Definition: walsender.c:2970
static int logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
Definition: walsender.c:806
void SendPostmasterSignal(PMSignalReason reason)
Definition: pmsignal.c:153
TimestampTz replyTime
void LWLockReleaseAll(void)
Definition: lwlock.c:1902
#define NUM_SYNC_REP_WAIT_MODE
Definition: syncrep.h:28
static Interval * offset_to_interval(TimeOffset offset)
Definition: walsender.c:3267
slock_t mutex
Definition: slot.h:120
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition: procsignal.c:642
CommandDest whereToSendOutput
Definition: postgres.c:92
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
static bool waiting_for_ping_response
Definition: walsender.c:171
XLogRecPtr apply
static void IdentifySystem(void)
Definition: walsender.c:376
Definition: pg_list.h:50
void CheckLogicalDecodingRequirements(void)
Definition: logical.c:103
#define snprintf
Definition: port.h:216
LogicalDecodingContext * CreateInitDecodingContext(const char *plugin, List *output_plugin_options, bool need_full_snapshot, XLogRecPtr restart_lsn, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:318
int16 AttrNumber
Definition: attnum.h:21
#define UINT64_FORMAT
Definition: c.h:484
void replication_scanner_finish(void)
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition: slot.c:767
void SyncRepReleaseWaiters(void)
Definition: syncrep.c:440
static TimestampTz last_reply_timestamp
Definition: walsender.c:168
static void ProcessRepliesIfAny(void)
Definition: walsender.c:1711
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1544
#define MAX_SEND_SIZE
Definition: walsender.c:106
#define die(msg)
Definition: pg_test_fsync.c:97
#define read(a, b, c)
Definition: win32.h:13
static bool sendTimeLineIsHistoric
Definition: walsender.c:147
#define offsetof(type, field)
Definition: c.h:727
void WalSndWakeup(void)
Definition: walsender.c:3120
void ReplicationSlotMarkDirty(void)
Definition: slot.c:728
bool am_cascading_walsender
Definition: walsender.c:116
static void ProcessStandbyHSFeedbackMessage(void)
Definition: walsender.c:2072
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition: timestamp.c:1693
#define SYNC_REP_WAIT_WRITE
Definition: syncrep.h:24
int WaitEventSetWait(WaitEventSet *set, long timeout, WaitEvent *occurred_events, int nevents, uint32 wait_event_info)
Definition: latch.c:1306
void SnapBuildClearExportedSnapshot(void)
Definition: snapbuild.c:683
static XLogRecPtr startptr
Definition: basebackup.c:116
static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
Definition: walsender.c:1349
void replication_scanner_init(const char *query_string)
#define TLHistoryFilePath(path, tli)
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1774
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)