PostgreSQL Source Code  git master
walsender.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * walsender.c
4  *
5  * The WAL sender process (walsender) is new as of Postgres 9.0. It takes
6  * care of sending XLOG from the primary server to a single recipient.
7  * (Note that there can be more than one walsender process concurrently.)
8  * It is started by the postmaster when the walreceiver of a standby server
9  * connects to the primary server and requests XLOG streaming replication.
10  *
11  * A walsender is similar to a regular backend, ie. there is a one-to-one
12  * relationship between a connection and a walsender process, but instead
13  * of processing SQL queries, it understands a small set of special
14  * replication-mode commands. The START_REPLICATION command begins streaming
15  * WAL to the client. While streaming, the walsender keeps reading XLOG
16  * records from the disk and sends them to the standby server over the
17  * COPY protocol, until either side ends the replication by exiting COPY
18  * mode (or until the connection is closed).
19  *
20  * Normal termination is by SIGTERM, which instructs the walsender to
21  * close the connection and exit(0) at the next convenient moment. Emergency
22  * termination is by SIGQUIT; like any backend, the walsender will simply
23  * abort and exit on SIGQUIT. A close of the connection and a FATAL error
24  * are treated as not a crash but approximately normal termination;
25  * the walsender will exit quickly without sending any more XLOG records.
26  *
27  * If the server is shut down, checkpointer sends us
28  * PROCSIG_WALSND_INIT_STOPPING after all regular backends have exited. If
29  * the backend is idle or runs an SQL query this causes the backend to
30  * shut down; if logical replication is in progress, all existing WAL records
31  * are processed followed by a shutdown. Otherwise this causes the walsender
32  * to switch to the "stopping" state. In this state, the walsender will reject
33  * any further replication commands. The checkpointer begins the shutdown
34  * checkpoint once all walsenders are confirmed as stopping. When the shutdown
35  * checkpoint finishes, the postmaster sends us SIGUSR2. This instructs
36  * walsender to send any outstanding WAL, including the shutdown checkpoint
37  * record, wait for it to be replicated to the standby, and then exit.
38  *
39  *
40  * Portions Copyright (c) 2010-2020, PostgreSQL Global Development Group
41  *
42  * IDENTIFICATION
43  * src/backend/replication/walsender.c
44  *
45  *-------------------------------------------------------------------------
46  */
47 #include "postgres.h"
48 
49 #include <signal.h>
50 #include <unistd.h>
51 
52 #include "access/printtup.h"
53 #include "access/timeline.h"
54 #include "access/transam.h"
55 #include "access/xact.h"
56 #include "access/xlog_internal.h"
57 #include "access/xlogreader.h"
58 #include "access/xlogutils.h"
59 #include "catalog/pg_authid.h"
60 #include "catalog/pg_type.h"
61 #include "commands/dbcommands.h"
62 #include "commands/defrem.h"
63 #include "funcapi.h"
64 #include "libpq/libpq.h"
65 #include "libpq/pqformat.h"
66 #include "miscadmin.h"
67 #include "nodes/replnodes.h"
68 #include "pgstat.h"
69 #include "postmaster/interrupt.h"
70 #include "replication/basebackup.h"
71 #include "replication/decode.h"
72 #include "replication/logical.h"
73 #include "replication/slot.h"
74 #include "replication/snapbuild.h"
75 #include "replication/syncrep.h"
77 #include "replication/walsender.h"
80 #include "storage/fd.h"
81 #include "storage/ipc.h"
82 #include "storage/pmsignal.h"
83 #include "storage/proc.h"
84 #include "storage/procarray.h"
85 #include "tcop/dest.h"
86 #include "tcop/tcopprot.h"
87 #include "utils/acl.h"
88 #include "utils/builtins.h"
89 #include "utils/guc.h"
90 #include "utils/memutils.h"
91 #include "utils/pg_lsn.h"
92 #include "utils/portal.h"
93 #include "utils/ps_status.h"
94 #include "utils/timeout.h"
95 #include "utils/timestamp.h"
96 
97 /*
98  * Maximum data payload in a WAL data message. Must be >= XLOG_BLCKSZ.
99  *
100  * We don't have a good idea of what a good value would be; there's some
101  * overhead per message in both walsender and walreceiver, but on the other
102  * hand sending large batches makes walsender less responsive to signals
103  * because signals are checked only between messages. 128kB (with
104  * default 8k blocks) seems like a reasonable guess for now.
105  */
106 #define MAX_SEND_SIZE (XLOG_BLCKSZ * 16)
107 
108 /* Array of WalSnds in shared memory */
110 
111 /* My slot in the shared memory array */
112 WalSnd *MyWalSnd = NULL;
113 
114 /* Global state */
115 bool am_walsender = false; /* Am I a walsender process? */
116 bool am_cascading_walsender = false; /* Am I cascading WAL to another
117  * standby? */
118 bool am_db_walsender = false; /* Connected to a database? */
119 
120 /* User-settable parameters for walsender */
121 int max_wal_senders = 0; /* the maximum number of concurrent
122  * walsenders */
123 int wal_sender_timeout = 60 * 1000; /* maximum time to send one WAL
124  * data message */
126 
127 /*
128  * State for WalSndWakeupRequest
129  */
130 bool wake_wal_senders = false;
131 
132 /*
133  * xlogreader used for replication. Note that a WAL sender doing physical
134  * replication does not need xlogreader to read WAL, but it needs one to
135  * keep a state of its work.
136  */
138 
139 /*
140  * These variables keep track of the state of the timeline we're currently
141  * sending. sendTimeLine identifies the timeline. If sendTimeLineIsHistoric,
142  * the timeline is not the latest timeline on this server, and the server's
143  * history forked off from that timeline at sendTimeLineValidUpto.
144  */
147 static bool sendTimeLineIsHistoric = false;
149 
150 /*
151  * How far have we sent WAL already? This is also advertised in
152  * MyWalSnd->sentPtr. (Actually, this is the next WAL location to send.)
153  */
154 static XLogRecPtr sentPtr = 0;
155 
156 /* Buffers for constructing outgoing messages and processing reply messages. */
160 
161 /* Timestamp of last ProcessRepliesIfAny(). */
163 
164 /*
165  * Timestamp of last ProcessRepliesIfAny() that saw a reply from the
166  * standby. Set to 0 if wal_sender_timeout doesn't need to be active.
167  */
169 
170 /* Have we sent a heartbeat message asking for reply, since last reply? */
171 static bool waiting_for_ping_response = false;
172 
173 /*
174  * While streaming WAL in Copy mode, streamingDoneSending is set to true
175  * after we have sent CopyDone. We should not send any more CopyData messages
176  * after that. streamingDoneReceiving is set to true when we receive CopyDone
177  * from the other end. When both become true, it's time to exit Copy mode.
178  */
181 
182 /* Are we there yet? */
183 static bool WalSndCaughtUp = false;
184 
185 /* Flags set by signal handlers for later service in main loop */
186 static volatile sig_atomic_t got_SIGUSR2 = false;
187 static volatile sig_atomic_t got_STOPPING = false;
188 
189 /*
190  * This is set while we are streaming. When not set
191  * PROCSIG_WALSND_INIT_STOPPING signal will be handled like SIGTERM. When set,
192  * the main loop is responsible for checking got_STOPPING and terminating when
193  * it's set (after streaming any remaining WAL).
194  */
195 static volatile sig_atomic_t replication_active = false;
196 
198 
199 /* A sample associating a WAL location with the time it was written. */
200 typedef struct
201 {
204 } WalTimeSample;
205 
206 /* The size of our buffer of time samples. */
207 #define LAG_TRACKER_BUFFER_SIZE 8192
208 
209 /* A mechanism for tracking replication lag. */
210 typedef struct
211 {
215  int read_heads[NUM_SYNC_REP_WAIT_MODE];
217 } LagTracker;
218 
220 
221 /* Signal handlers */
223 
224 /* Prototypes for private functions */
225 typedef void (*WalSndSendDataCallback) (void);
226 static void WalSndLoop(WalSndSendDataCallback send_data);
227 static void InitWalSenderSlot(void);
228 static void WalSndKill(int code, Datum arg);
230 static void XLogSendPhysical(void);
231 static void XLogSendLogical(void);
232 static void WalSndDone(WalSndSendDataCallback send_data);
233 static XLogRecPtr GetStandbyFlushRecPtr(void);
234 static void IdentifySystem(void);
237 static void StartReplication(StartReplicationCmd *cmd);
239 static void ProcessStandbyMessage(void);
240 static void ProcessStandbyReplyMessage(void);
241 static void ProcessStandbyHSFeedbackMessage(void);
242 static void ProcessRepliesIfAny(void);
243 static void WalSndKeepalive(bool requestReply);
244 static void WalSndKeepaliveIfNecessary(void);
245 static void WalSndCheckTimeOut(void);
247 static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
248 static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
251 static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
252 static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
254 
255 static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo,
256  TimeLineID *tli_p);
257 static void UpdateSpillStats(LogicalDecodingContext *ctx);
258 
259 
260 /* Initialize walsender process before entering the main command loop */
261 void
262 InitWalSender(void)
263 {
265 
266  /* Create a per-walsender data structure in shared memory */
268 
269  /*
270  * We don't currently need any ResourceOwner in a walsender process, but
271  * if we did, we could call CreateAuxProcessResourceOwner here.
272  */
273 
274  /*
275  * Let postmaster know that we're a WAL sender. Once we've declared us as
276  * a WAL sender process, postmaster will let us outlive the bgwriter and
277  * kill us last in the shutdown sequence, so we get a chance to stream all
278  * remaining WAL at shutdown, including the shutdown checkpoint. Note that
279  * there's no going back, and we mustn't write any WAL records after this.
280  */
283 
284  /* Initialize empty timestamp buffer for lag tracking. */
285  lag_tracker = MemoryContextAllocZero(TopMemoryContext, sizeof(LagTracker));
286 }
287 
288 /*
289  * Clean up after an error.
290  *
291  * WAL sender processes don't use transactions like regular backends do.
292  * This function does any cleanup required after an error in a WAL sender
293  * process, similar to what transaction abort does in a regular backend.
294  */
295 void
297 {
301 
302  if (xlogreader != NULL && xlogreader->seg.ws_file >= 0)
303  wal_segment_close(xlogreader);
304 
305  if (MyReplicationSlot != NULL)
307 
309 
310  replication_active = false;
311 
312  /*
313  * If there is a transaction in progress, it will clean up our
314  * ResourceOwner, but if a replication command set up a resource owner
315  * without a transaction, we've got to clean that up now.
316  */
318  WalSndResourceCleanup(false);
319 
320  if (got_STOPPING || got_SIGUSR2)
321  proc_exit(0);
322 
323  /* Revert back to startup state */
325 }
326 
327 /*
328  * Clean up any ResourceOwner we created.
329  */
330 void
331 WalSndResourceCleanup(bool isCommit)
332 {
333  ResourceOwner resowner;
334 
335  if (CurrentResourceOwner == NULL)
336  return;
337 
338  /*
339  * Deleting CurrentResourceOwner is not allowed, so we must save a pointer
340  * in a local variable and clear it first.
341  */
342  resowner = CurrentResourceOwner;
343  CurrentResourceOwner = NULL;
344 
345  /* Now we can release resources and delete it. */
346  ResourceOwnerRelease(resowner,
347  RESOURCE_RELEASE_BEFORE_LOCKS, isCommit, true);
348  ResourceOwnerRelease(resowner,
349  RESOURCE_RELEASE_LOCKS, isCommit, true);
350  ResourceOwnerRelease(resowner,
351  RESOURCE_RELEASE_AFTER_LOCKS, isCommit, true);
352  ResourceOwnerDelete(resowner);
353 }
354 
355 /*
356  * Handle a client's connection abort in an orderly manner.
357  */
358 static void
359 WalSndShutdown(void)
360 {
361  /*
362  * Reset whereToSendOutput to prevent ereport from attempting to send any
363  * more messages to the standby.
364  */
367 
368  proc_exit(0);
369  abort(); /* keep the compiler quiet */
370 }
371 
372 /*
373  * Handle the IDENTIFY_SYSTEM command.
374  */
375 static void
377 {
378  char sysid[32];
379  char xloc[MAXFNAMELEN];
380  XLogRecPtr logptr;
381  char *dbname = NULL;
383  TupOutputState *tstate;
384  TupleDesc tupdesc;
385  Datum values[4];
386  bool nulls[4];
387 
388  /*
389  * Reply with a result set with one row, four columns. First col is system
390  * ID, second is timeline ID, third is current xlog location and the
391  * fourth contains the database name if we are connected to one.
392  */
393 
394  snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
396 
399  {
400  /* this also updates ThisTimeLineID */
401  logptr = GetStandbyFlushRecPtr();
402  }
403  else
404  logptr = GetFlushRecPtr();
405 
406  snprintf(xloc, sizeof(xloc), "%X/%X", (uint32) (logptr >> 32), (uint32) logptr);
407 
408  if (MyDatabaseId != InvalidOid)
409  {
411 
412  /* syscache access needs a transaction env. */
414  /* make dbname live outside TX context */
418  /* CommitTransactionCommand switches to TopMemoryContext */
420  }
421 
423  MemSet(nulls, false, sizeof(nulls));
424 
425  /* need a tuple descriptor representing four columns */
426  tupdesc = CreateTemplateTupleDesc(4);
427  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "systemid",
428  TEXTOID, -1, 0);
429  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "timeline",
430  INT4OID, -1, 0);
431  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "xlogpos",
432  TEXTOID, -1, 0);
433  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "dbname",
434  TEXTOID, -1, 0);
435 
436  /* prepare for projection of tuples */
437  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
438 
439  /* column 1: system identifier */
440  values[0] = CStringGetTextDatum(sysid);
441 
442  /* column 2: timeline */
443  values[1] = Int32GetDatum(ThisTimeLineID);
444 
445  /* column 3: wal location */
446  values[2] = CStringGetTextDatum(xloc);
447 
448  /* column 4: database name, or NULL if none */
449  if (dbname)
450  values[3] = CStringGetTextDatum(dbname);
451  else
452  nulls[3] = true;
453 
454  /* send it to dest */
455  do_tup_output(tstate, values, nulls);
456 
457  end_tup_output(tstate);
458 }
459 
460 
461 /*
462  * Handle TIMELINE_HISTORY command.
463  */
464 static void
466 {
468  char histfname[MAXFNAMELEN];
469  char path[MAXPGPATH];
470  int fd;
471  off_t histfilelen;
472  off_t bytesleft;
473  Size len;
474 
475  /*
476  * Reply with a result set with one row, and two columns. The first col is
477  * the name of the history file, 2nd is the contents.
478  */
479 
480  TLHistoryFileName(histfname, cmd->timeline);
481  TLHistoryFilePath(path, cmd->timeline);
482 
483  /* Send a RowDescription message */
484  pq_beginmessage(&buf, 'T');
485  pq_sendint16(&buf, 2); /* 2 fields */
486 
487  /* first field */
488  pq_sendstring(&buf, "filename"); /* col name */
489  pq_sendint32(&buf, 0); /* table oid */
490  pq_sendint16(&buf, 0); /* attnum */
491  pq_sendint32(&buf, TEXTOID); /* type oid */
492  pq_sendint16(&buf, -1); /* typlen */
493  pq_sendint32(&buf, 0); /* typmod */
494  pq_sendint16(&buf, 0); /* format code */
495 
496  /* second field */
497  pq_sendstring(&buf, "content"); /* col name */
498  pq_sendint32(&buf, 0); /* table oid */
499  pq_sendint16(&buf, 0); /* attnum */
500  pq_sendint32(&buf, BYTEAOID); /* type oid */
501  pq_sendint16(&buf, -1); /* typlen */
502  pq_sendint32(&buf, 0); /* typmod */
503  pq_sendint16(&buf, 0); /* format code */
504  pq_endmessage(&buf);
505 
506  /* Send a DataRow message */
507  pq_beginmessage(&buf, 'D');
508  pq_sendint16(&buf, 2); /* # of columns */
509  len = strlen(histfname);
510  pq_sendint32(&buf, len); /* col1 len */
511  pq_sendbytes(&buf, histfname, len);
512 
513  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
514  if (fd < 0)
515  ereport(ERROR,
517  errmsg("could not open file \"%s\": %m", path)));
518 
519  /* Determine file length and send it to client */
520  histfilelen = lseek(fd, 0, SEEK_END);
521  if (histfilelen < 0)
522  ereport(ERROR,
524  errmsg("could not seek to end of file \"%s\": %m", path)));
525  if (lseek(fd, 0, SEEK_SET) != 0)
526  ereport(ERROR,
528  errmsg("could not seek to beginning of file \"%s\": %m", path)));
529 
530  pq_sendint32(&buf, histfilelen); /* col2 len */
531 
532  bytesleft = histfilelen;
533  while (bytesleft > 0)
534  {
535  PGAlignedBlock rbuf;
536  int nread;
537 
539  nread = read(fd, rbuf.data, sizeof(rbuf));
541  if (nread < 0)
542  ereport(ERROR,
544  errmsg("could not read file \"%s\": %m",
545  path)));
546  else if (nread == 0)
547  ereport(ERROR,
549  errmsg("could not read file \"%s\": read %d of %zu",
550  path, nread, (Size) bytesleft)));
551 
552  pq_sendbytes(&buf, rbuf.data, nread);
553  bytesleft -= nread;
554  }
555 
556  if (CloseTransientFile(fd) != 0)
557  ereport(ERROR,
559  errmsg("could not close file \"%s\": %m", path)));
560 
561  pq_endmessage(&buf);
562 }
563 
564 /*
565  * Handle START_REPLICATION command.
566  *
567  * Returns after streaming has ended and CommandComplete has been sent; an
568  * ereport(ERROR) instead takes us back to the main loop.
569  */
570 static void
572 {
574  XLogRecPtr FlushPtr;
575 
576  if (ThisTimeLineID == 0)
577  ereport(ERROR,
578  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
579  errmsg("IDENTIFY_SYSTEM has not been run before START_REPLICATION")));
580 
581  /* create xlogreader for physical replication */
582  xlogreader =
584  XL_ROUTINE(.segment_open = WalSndSegmentOpen,
585  .segment_close = wal_segment_close),
586  NULL);
587 
588  if (!xlogreader)
589  ereport(ERROR,
590  (errcode(ERRCODE_OUT_OF_MEMORY),
591  errmsg("out of memory")));
592 
593  /*
594  * We assume here that we're logging enough information in the WAL for
595  * log-shipping, since this is checked in PostmasterMain().
596  *
597  * NOTE: wal_level can only change at shutdown, so in most cases it is
598  * difficult for there to be WAL data that we can still see that was
599  * written at wal_level='minimal'.
600  */
601 
602  if (cmd->slotname)
603  {
606  ereport(ERROR,
607  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
608  errmsg("cannot use a logical replication slot for physical replication")));
609 
610  /*
611  * We don't need to verify the slot's restart_lsn here; instead we
612  * rely on the caller requesting the starting point to use. If the
613  * WAL segment doesn't exist, we'll fail later.
614  */
615  }
616 
617  /*
618  * Select the timeline. If it was given explicitly by the client, use
619  * that. Otherwise use the timeline of the last replayed record, which is
620  * kept in ThisTimeLineID.
621  */
623  {
624  /* this also updates ThisTimeLineID */
625  FlushPtr = GetStandbyFlushRecPtr();
626  }
627  else
628  FlushPtr = GetFlushRecPtr();
629 
630  if (cmd->timeline != 0)
631  {
632  XLogRecPtr switchpoint;
633 
634  sendTimeLine = cmd->timeline;
636  {
637  sendTimeLineIsHistoric = false;
639  }
640  else
641  {
642  List *timeLineHistory;
643 
644  sendTimeLineIsHistoric = true;
645 
646  /*
647  * Check that the timeline the client requested exists, and the
648  * requested start location is on that timeline.
649  */
650  timeLineHistory = readTimeLineHistory(ThisTimeLineID);
651  switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
653  list_free_deep(timeLineHistory);
654 
655  /*
656  * Found the requested timeline in the history. Check that
657  * requested startpoint is on that timeline in our history.
658  *
659  * This is quite loose on purpose. We only check that we didn't
660  * fork off the requested timeline before the switchpoint. We
661  * don't check that we switched *to* it before the requested
662  * starting point. This is because the client can legitimately
663  * request to start replication from the beginning of the WAL
664  * segment that contains switchpoint, but on the new timeline, so
665  * that it doesn't end up with a partial segment. If you ask for
666  * too old a starting point, you'll get an error later when we
667  * fail to find the requested WAL segment in pg_wal.
668  *
669  * XXX: we could be more strict here and only allow a startpoint
670  * that's older than the switchpoint, if it's still in the same
671  * WAL segment.
672  */
673  if (!XLogRecPtrIsInvalid(switchpoint) &&
674  switchpoint < cmd->startpoint)
675  {
676  ereport(ERROR,
677  (errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
678  (uint32) (cmd->startpoint >> 32),
679  (uint32) (cmd->startpoint),
680  cmd->timeline),
681  errdetail("This server's history forked from timeline %u at %X/%X.",
682  cmd->timeline,
683  (uint32) (switchpoint >> 32),
684  (uint32) (switchpoint))));
685  }
686  sendTimeLineValidUpto = switchpoint;
687  }
688  }
689  else
690  {
693  sendTimeLineIsHistoric = false;
694  }
695 
697 
698  /* If there is nothing to stream, don't even enter COPY mode */
700  {
701  /*
702  * When we first start replication the standby will be behind the
703  * primary. For some applications, for example synchronous
704  * replication, it is important to have a clear state for this initial
705  * catchup mode, so we can trigger actions when we change streaming
706  * state later. We may stay in this state for a long time, which is
707  * exactly why we want to be able to monitor whether or not we are
708  * still here.
709  */
711 
712  /* Send a CopyBothResponse message, and start streaming */
713  pq_beginmessage(&buf, 'W');
714  pq_sendbyte(&buf, 0);
715  pq_sendint16(&buf, 0);
716  pq_endmessage(&buf);
717  pq_flush();
718 
719  /*
720  * Don't allow a request to stream from a future point in WAL that
721  * hasn't been flushed to disk in this server yet.
722  */
723  if (FlushPtr < cmd->startpoint)
724  {
725  ereport(ERROR,
726  (errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
727  (uint32) (cmd->startpoint >> 32),
728  (uint32) (cmd->startpoint),
729  (uint32) (FlushPtr >> 32),
730  (uint32) (FlushPtr))));
731  }
732 
733  /* Start streaming from the requested point */
734  sentPtr = cmd->startpoint;
735 
736  /* Initialize shared memory status, too */
737  SpinLockAcquire(&MyWalSnd->mutex);
738  MyWalSnd->sentPtr = sentPtr;
739  SpinLockRelease(&MyWalSnd->mutex);
740 
742 
743  /* Main loop of walsender */
744  replication_active = true;
745 
747 
748  replication_active = false;
749  if (got_STOPPING)
750  proc_exit(0);
752 
754  }
755 
756  if (cmd->slotname)
758 
759  /*
760  * Copy is finished now. Send a single-row result set indicating the next
761  * timeline.
762  */
764  {
765  char startpos_str[8 + 1 + 8 + 1];
767  TupOutputState *tstate;
768  TupleDesc tupdesc;
769  Datum values[2];
770  bool nulls[2];
771 
772  snprintf(startpos_str, sizeof(startpos_str), "%X/%X",
773  (uint32) (sendTimeLineValidUpto >> 32),
775 
777  MemSet(nulls, false, sizeof(nulls));
778 
779  /*
780  * Need a tuple descriptor representing two columns. int8 may seem
781  * like a surprising data type for this, but in theory int4 would not
782  * be wide enough for this, as TimeLineID is unsigned.
783  */
784  tupdesc = CreateTemplateTupleDesc(2);
785  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli",
786  INT8OID, -1, 0);
787  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos",
788  TEXTOID, -1, 0);
789 
790  /* prepare for projection of tuple */
791  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
792 
793  values[0] = Int64GetDatum((int64) sendTimeLineNextTLI);
794  values[1] = CStringGetTextDatum(startpos_str);
795 
796  /* send it to dest */
797  do_tup_output(tstate, values, nulls);
798 
799  end_tup_output(tstate);
800  }
801 
802  /* Send CommandComplete message */
803  pq_puttextmessage('C', "START_STREAMING");
804 }
805 
806 /*
807  * XLogReaderRoutine->page_read callback for logical decoding contexts, as a
808  * walsender process.
809  *
810  * Inside the walsender we can do better than read_local_xlog_page,
811  * which has to do a plain sleep/busy loop, because the walsender's latch gets
812  * set every time WAL is flushed.
813  */
814 static int
816  XLogRecPtr targetRecPtr, char *cur_page)
817 {
818  XLogRecPtr flushptr;
819  int count;
820  WALReadError errinfo;
821  XLogSegNo segno;
822 
823  XLogReadDetermineTimeline(state, targetPagePtr, reqLen);
825  sendTimeLine = state->currTLI;
827  sendTimeLineNextTLI = state->nextTLI;
828 
829  /* make sure we have enough WAL available */
830  flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
831 
832  /* fail if not (implies we are going to shut down) */
833  if (flushptr < targetPagePtr + reqLen)
834  return -1;
835 
836  if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
837  count = XLOG_BLCKSZ; /* more than one block available */
838  else
839  count = flushptr - targetPagePtr; /* part of the page available */
840 
841  /* now actually read the data, we know it's there */
842  if (!WALRead(state,
843  cur_page,
844  targetPagePtr,
845  XLOG_BLCKSZ,
846  state->seg.ws_tli, /* Pass the current TLI because only
847  * WalSndSegmentOpen controls whether new
848  * TLI is needed. */
849  &errinfo))
850  WALReadRaiseError(&errinfo);
851 
852  /*
853  * After reading into the buffer, check that what we read was valid. We do
854  * this after reading, because even though the segment was present when we
855  * opened it, it might get recycled or removed while we read it. The
856  * read() succeeds in that case, but the data we tried to read might
857  * already have been overwritten with new WAL records.
858  */
859  XLByteToSeg(targetPagePtr, segno, state->segcxt.ws_segsize);
860  CheckXLogRemoved(segno, state->seg.ws_tli);
861 
862  return count;
863 }
864 
865 /*
866  * Process extra options given to CREATE_REPLICATION_SLOT.
867  */
868 static void
870  bool *reserve_wal,
871  CRSSnapshotAction *snapshot_action)
872 {
873  ListCell *lc;
874  bool snapshot_action_given = false;
875  bool reserve_wal_given = false;
876 
877  /* Parse options */
878  foreach(lc, cmd->options)
879  {
880  DefElem *defel = (DefElem *) lfirst(lc);
881 
882  if (strcmp(defel->defname, "export_snapshot") == 0)
883  {
884  if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
885  ereport(ERROR,
886  (errcode(ERRCODE_SYNTAX_ERROR),
887  errmsg("conflicting or redundant options")));
888 
889  snapshot_action_given = true;
890  *snapshot_action = defGetBoolean(defel) ? CRS_EXPORT_SNAPSHOT :
892  }
893  else if (strcmp(defel->defname, "use_snapshot") == 0)
894  {
895  if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
896  ereport(ERROR,
897  (errcode(ERRCODE_SYNTAX_ERROR),
898  errmsg("conflicting or redundant options")));
899 
900  snapshot_action_given = true;
901  *snapshot_action = CRS_USE_SNAPSHOT;
902  }
903  else if (strcmp(defel->defname, "reserve_wal") == 0)
904  {
905  if (reserve_wal_given || cmd->kind != REPLICATION_KIND_PHYSICAL)
906  ereport(ERROR,
907  (errcode(ERRCODE_SYNTAX_ERROR),
908  errmsg("conflicting or redundant options")));
909 
910  reserve_wal_given = true;
911  *reserve_wal = true;
912  }
913  else
914  elog(ERROR, "unrecognized option: %s", defel->defname);
915  }
916 }
917 
918 /*
919  * Create a new replication slot.
920  */
921 static void
923 {
924  const char *snapshot_name = NULL;
925  char xloc[MAXFNAMELEN];
926  char *slot_name;
927  bool reserve_wal = false;
928  CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT;
930  TupOutputState *tstate;
931  TupleDesc tupdesc;
932  Datum values[4];
933  bool nulls[4];
934 
936 
937  parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action);
938 
939  /* setup state for WalSndSegmentOpen */
940  sendTimeLineIsHistoric = false;
942 
943  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
944  {
945  ReplicationSlotCreate(cmd->slotname, false,
947  }
948  else
949  {
951 
952  /*
953  * Initially create persistent slot as ephemeral - that allows us to
954  * nicely handle errors during initialization because it'll get
955  * dropped if this transaction fails. We'll make it persistent at the
956  * end. Temporary slots can be created as temporary from beginning as
957  * they get dropped on error as well.
958  */
959  ReplicationSlotCreate(cmd->slotname, true,
961  }
962 
963  if (cmd->kind == REPLICATION_KIND_LOGICAL)
964  {
966  bool need_full_snapshot = false;
967 
968  /*
969  * Do options check early so that we can bail before calling the
970  * DecodingContextFindStartpoint which can take long time.
971  */
972  if (snapshot_action == CRS_EXPORT_SNAPSHOT)
973  {
974  if (IsTransactionBlock())
975  ereport(ERROR,
976  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
977  (errmsg("%s must not be called inside a transaction",
978  "CREATE_REPLICATION_SLOT ... EXPORT_SNAPSHOT")));
979 
980  need_full_snapshot = true;
981  }
982  else if (snapshot_action == CRS_USE_SNAPSHOT)
983  {
984  if (!IsTransactionBlock())
985  ereport(ERROR,
986  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
987  (errmsg("%s must be called inside a transaction",
988  "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
989 
991  ereport(ERROR,
992  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
993  (errmsg("%s must be called in REPEATABLE READ isolation mode transaction",
994  "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
995 
996  if (FirstSnapshotSet)
997  ereport(ERROR,
998  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
999  (errmsg("%s must be called before any query",
1000  "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
1001 
1002  if (IsSubTransaction())
1003  ereport(ERROR,
1004  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1005  (errmsg("%s must not be called in a subtransaction",
1006  "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
1007 
1008  need_full_snapshot = true;
1009  }
1010 
1011  ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
1013  XL_ROUTINE(.page_read = logical_read_xlog_page,
1014  .segment_open = WalSndSegmentOpen,
1015  .segment_close = wal_segment_close),
1018 
1019  /*
1020  * Signal that we don't need the timeout mechanism. We're just
1021  * creating the replication slot and don't yet accept feedback
1022  * messages or send keepalives. As we possibly need to wait for
1023  * further WAL the walsender would otherwise possibly be killed too
1024  * soon.
1025  */
1027 
1028  /* build initial snapshot, might take a while */
1030 
1031  /*
1032  * Export or use the snapshot if we've been asked to do so.
1033  *
1034  * NB. We will convert the snapbuild.c kind of snapshot to normal
1035  * snapshot when doing this.
1036  */
1037  if (snapshot_action == CRS_EXPORT_SNAPSHOT)
1038  {
1039  snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder);
1040  }
1041  else if (snapshot_action == CRS_USE_SNAPSHOT)
1042  {
1043  Snapshot snap;
1044 
1047  }
1048 
1049  /* don't need the decoding context anymore */
1050  FreeDecodingContext(ctx);
1051 
1052  if (!cmd->temporary)
1054  }
1055  else if (cmd->kind == REPLICATION_KIND_PHYSICAL && reserve_wal)
1056  {
1058 
1060 
1061  /* Write this slot to disk if it's a permanent one. */
1062  if (!cmd->temporary)
1064  }
1065 
1066  snprintf(xloc, sizeof(xloc), "%X/%X",
1069 
1071  MemSet(nulls, false, sizeof(nulls));
1072 
1073  /*----------
1074  * Need a tuple descriptor representing four columns:
1075  * - first field: the slot name
1076  * - second field: LSN at which we became consistent
1077  * - third field: exported snapshot's name
1078  * - fourth field: output plugin
1079  *----------
1080  */
1081  tupdesc = CreateTemplateTupleDesc(4);
1082  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_name",
1083  TEXTOID, -1, 0);
1084  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "consistent_point",
1085  TEXTOID, -1, 0);
1086  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "snapshot_name",
1087  TEXTOID, -1, 0);
1088  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "output_plugin",
1089  TEXTOID, -1, 0);
1090 
1091  /* prepare for projection of tuples */
1092  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
1093 
1094  /* slot_name */
1095  slot_name = NameStr(MyReplicationSlot->data.name);
1096  values[0] = CStringGetTextDatum(slot_name);
1097 
1098  /* consistent wal location */
1099  values[1] = CStringGetTextDatum(xloc);
1100 
1101  /* snapshot name, or NULL if none */
1102  if (snapshot_name != NULL)
1103  values[2] = CStringGetTextDatum(snapshot_name);
1104  else
1105  nulls[2] = true;
1106 
1107  /* plugin, or NULL if none */
1108  if (cmd->plugin != NULL)
1109  values[3] = CStringGetTextDatum(cmd->plugin);
1110  else
1111  nulls[3] = true;
1112 
1113  /* send it to dest */
1114  do_tup_output(tstate, values, nulls);
1115  end_tup_output(tstate);
1116 
1118 }
1119 
1120 /*
1121  * Get rid of a replication slot that is no longer wanted.
      *
      * Drops the slot named by cmd->slotname via ReplicationSlotDrop(), passing
      * the negation of cmd->wait as its second argument, and then reports
      * completion to the remote destination using the
      * CMDTAG_DROP_REPLICATION_SLOT command tag.
1122  */
1123 static void
1125 {
1126  QueryCompletion qc;
1127 
1128  ReplicationSlotDrop(cmd->slotname, !cmd->wait);
1129  SetQueryCompletion(&qc, CMDTAG_DROP_REPLICATION_SLOT, 0);
1130  EndCommand(&qc, DestRemote, false);
1131 }
1132 
1133 /*
1134  * Load previously initiated logical slot and prepare for sending data (via
1135  * WalSndLoop).
      *
      * Visible sequence: raise an ERROR if the slot can no longer be read from,
      * create the decoding context from cmd->startpoint and cmd->options, send
      * a CopyBothResponse, position the reader, publish the sent position in
      * shared memory under MyWalSnd->mutex, and mark replication as active
      * around the main loop. Afterwards the decoding context is freed and the
      * process either exits (when got_STOPPING is set) or ends COPY mode with a
      * CommandComplete carrying the COPY tag.
      *
      * NOTE(review): several statements of this function are not visible in
      * this listing; the summary above covers only what can be seen here.
1136  */
1137 static void
1139 {
1141  QueryCompletion qc;
1142 
1143  /* make sure that our requirements are still fulfilled */
1145 
1147 
1149 
1151  ereport(ERROR,
1152  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1153  errmsg("cannot read from logical replication slot \"%s\"",
1154  cmd->slotname),
1155  errdetail("This slot has been invalidated because it exceeded the maximum reserved size.")));
1156 
1157  /*
1158  * Force a disconnect, so that the decoding code doesn't need to care
1159  * about an eventual switch from running in recovery, to running in a
1160  * normal environment. Client code is expected to handle reconnects.
       *
       * Setting got_STOPPING here is what makes the proc_exit(0) near the end
       * of this function fire once streaming has finished.
1161  */
1163  {
1164  ereport(LOG,
1165  (errmsg("terminating walsender process after promotion")));
1166  got_STOPPING = true;
1167  }
1168 
1169  /*
1170  * Create our decoding context, making it start at the previously ack'ed
1171  * position.
1172  *
1173  * Do this before sending a CopyBothResponse message, so that any errors
1174  * are reported early.
1175  */
1176  logical_decoding_ctx =
1177  CreateDecodingContext(cmd->startpoint, cmd->options, false,
1178  XL_ROUTINE(.page_read = logical_read_xlog_page,
1179  .segment_open = WalSndSegmentOpen,
1180  .segment_close = wal_segment_close),
1183  xlogreader = logical_decoding_ctx->reader;
1184 
1186 
1187  /*
       * Send a CopyBothResponse message, and start streaming. The message
       * body is one zero byte followed by a zero int16 (presumably the
       * overall copy format and the column count -- confirm against the
       * protocol documentation).
       */
1188  pq_beginmessage(&buf, 'W');
1189  pq_sendbyte(&buf, 0);
1190  pq_sendint16(&buf, 0);
1191  pq_endmessage(&buf);
1192  pq_flush();
1193 
1194  /* Start reading WAL from the oldest required WAL. */
1195  XLogBeginRead(logical_decoding_ctx->reader,
1197 
1198  /*
1199  * Report the location after which we'll send out further commits as the
1200  * current sentPtr.
1201  */
1203 
1204  /* Also update the sent position status in shared memory */
1205  SpinLockAcquire(&MyWalSnd->mutex);
1207  SpinLockRelease(&MyWalSnd->mutex);
1208 
1209  replication_active = true;
1210 
1212 
1213  /* Main loop of walsender */
1215 
1216  FreeDecodingContext(logical_decoding_ctx);
1218 
1219  replication_active = false;
      /* Exit instead of returning to command processing when stopping. */
1220  if (got_STOPPING)
1221  proc_exit(0);
1223 
1224  /* Get out of COPY mode (CommandComplete). */
1225  SetQueryCompletion(&qc, CMDTAG_COPY, 0);
1226  EndCommand(&qc, DestRemote, false);
1227 }
1228 
1229 /*
1230  * LogicalDecodingContext 'prepare_write' callback.
1231  *
1232  * Prepare a write into a StringInfo.
1233  *
1234  * Don't do anything lasting in here, it's quite possible that nothing will be done
1235  * with the data.
      *
      * The header built in ctx->out is, in order: the byte 'w', lsn as
      * dataStart (int64), lsn as walEnd (int64), and an int64 placeholder for
      * the send time. When last_write is false, lsn is replaced by
      * InvalidXLogRecPtr before it is written. The 'write' callback later
      * overwrites the placeholder at offset 1 + 2 * sizeof(int64).
1236  */
1237 static void
1239 {
1240  /* can't have sync rep confused by sending the same LSN several times */
1241  if (!last_write)
1242  lsn = InvalidXLogRecPtr;
1243 
      /* Start from an empty buffer; any previous content is discarded. */
1244  resetStringInfo(ctx->out);
1245 
1246  pq_sendbyte(ctx->out, 'w');
1247  pq_sendint64(ctx->out, lsn); /* dataStart */
1248  pq_sendint64(ctx->out, lsn); /* walEnd */
1249 
1250  /*
1251  * Fill out the sendtime later, just as it's done in XLogSendPhysical, but
1252  * reserve space here.
1253  */
1254  pq_sendint64(ctx->out, 0); /* sendtime */
1255 }
1256 
1257 /*
1258  * LogicalDecodingContext 'write' callback.
1259  *
1260  * Actually write the data previously prepared by WalSndPrepareWrite out to
1261  * the network. Take as long as needed, but process replies from the other
1262  * side and check timeouts during that.
      *
      * The function first stamps the send time into the already-built message
      * and queues it as a CopyData packet. If nothing is left pending after a
      * flush attempt (and the fast-path condition holds) it returns at once;
      * otherwise it loops, sleeping on the latch/socket, until
      * pq_is_send_pending() reports that the output has drained. MyLatch is set
      * again before returning from the slow path.
      *
      * NOTE(review): several statements of this function are not visible in
      * this listing; the summary above covers only what can be seen here.
1263  */
1264 static void
1266  bool last_write)
1267 {
1268  TimestampTz now;
1269 
1270  /*
1271  * Fill the send timestamp last, so that it is taken as late as possible.
1272  * This is somewhat ugly, but the protocol is set as it's already used for
1273  * several releases by streaming physical replication.
       *
       * The destination offset skips the leading message-type byte and the
       * two int64 LSN fields, i.e. it addresses the placeholder reserved by
       * the 'prepare_write' callback.
1274  */
1275  resetStringInfo(&tmpbuf);
1276  now = GetCurrentTimestamp();
1277  pq_sendint64(&tmpbuf, now);
1278  memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
1279  tmpbuf.data, sizeof(int64));
1280 
1281  /* output previously gathered data in a CopyData packet */
1282  pq_putmessage_noblock('d', ctx->out->data, ctx->out->len);
1283 
1285 
1286  /* Try to flush pending output to the client */
1287  if (pq_flush_if_writable() != 0)
1288  WalSndShutdown();
1289 
1290  /* Try taking fast path unless we get too close to walsender timeout. */
1292  wal_sender_timeout / 2) &&
1293  !pq_is_send_pending())
1294  {
1295  return;
1296  }
1297 
1298  /* If we have pending write here, go to slow path */
1299  for (;;)
1300  {
1301  int wakeEvents;
1302  long sleeptime;
1303 
1304  /* Check for input from the client */
1306 
1307  /* die if timeout was reached */
1309 
1310  /* Send keepalive if the time has come */
1312 
      /* Everything queued has been sent: leave the slow path. */
1313  if (!pq_is_send_pending())
1314  break;
1315 
1317 
1318  wakeEvents = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH |
1320 
1321  /* Sleep until something happens or we time out */
1322  (void) WaitLatchOrSocket(MyLatch, wakeEvents,
1323  MyProcPort->sock, sleeptime,
1325 
1326  /* Clear any already-pending wakeups */
1328 
1330 
1331  /* Process any requests or signals received recently */
1332  if (ConfigReloadPending)
1333  {
1334  ConfigReloadPending = false;
1337  }
1338 
1339  /* Try to flush pending output to the client */
1340  if (pq_flush_if_writable() != 0)
1341  WalSndShutdown();
1342  }
1343 
1344  /* reactivate latch so WalSndLoop knows to continue */
1345  SetLatch(MyLatch);
1346 }
1347 
1348 /*
1349  * LogicalDecodingContext 'update_progress' callback.
1350  *
1351  * Write the current position to the lag tracker (see XLogSendPhysical),
1352  * and update the spill statistics.
      *
      * Both actions are rate-limited together: when less than
      * WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS has passed since the last recorded
      * sample, the function returns early and neither the lag tracker nor the
      * spill statistics are touched.
1353  */
1354 static void
1356 {
      /* Time of the last lag-tracker sample; persists across calls. */
1357  static TimestampTz sendTime = 0;
1359 
1360  /*
1361  * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to
1362  * avoid flooding the lag tracker when we commit frequently.
1363  */
1364 #define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000
1365  if (!TimestampDifferenceExceeds(sendTime, now,
1367  return;
1368 
1369  LagTrackerWrite(lsn, now);
1370  sendTime = now;
1371 
1372  /*
1373  * Update statistics about transactions that spilled to disk.
1374  */
1375  UpdateSpillStats(ctx);
1376 }
1377 
1378 /*
1379  * Wait till WAL < loc is flushed to disk so it can be safely sent to client.
1380  *
1381  * Returns end LSN of flushed WAL. Normally this will be >= loc, but
1382  * if we detect a shutdown request (either from postmaster or client)
1383  * we will return early, so caller must always check.
      *
      * The most recently observed flush position is cached in a function-local
      * static variable, so a later call whose loc is already covered returns
      * immediately without consulting shared state. The position comes from
      * GetFlushRecPtr() when not in recovery, and from GetXLogReplayRecPtr()
      * otherwise. MyLatch is set again before returning from the wait loop.
      *
      * NOTE(review): several statements of this function are not visible in
      * this listing; the summary above covers only what can be seen here.
1384  */
1385 static XLogRecPtr
1387 {
1388  int wakeEvents;
1389  static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
1390 
1391  /*
1392  * Fast path to avoid acquiring the spinlock in case we already know we
1393  * have enough WAL available. This is particularly interesting if we're
1394  * far behind.
1395  */
1396  if (RecentFlushPtr != InvalidXLogRecPtr &&
1397  loc <= RecentFlushPtr)
1398  return RecentFlushPtr;
1399 
1400  /* Get a more recent flush pointer. */
1401  if (!RecoveryInProgress())
1402  RecentFlushPtr = GetFlushRecPtr();
1403  else
1404  RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1405 
1406  for (;;)
1407  {
1408  long sleeptime;
1409 
1410  /* Clear any already-pending wakeups */
1412 
1414 
1415  /* Process any requests or signals received recently */
1416  if (ConfigReloadPending)
1417  {
1418  ConfigReloadPending = false;
1421  }
1422 
1423  /* Check for input from the client */
1425 
1426  /*
1427  * If we're shutting down, trigger pending WAL to be written out,
1428  * otherwise we'd possibly end up waiting for WAL that never gets
1429  * written, because walwriter has shut down already.
1430  */
1431  if (got_STOPPING)
1433 
1434  /* Update our idea of the currently flushed position. */
1435  if (!RecoveryInProgress())
1436  RecentFlushPtr = GetFlushRecPtr();
1437  else
1438  RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1439 
1440  /*
1441  * If postmaster asked us to stop, don't wait anymore.
1442  *
1443  * It's important to do this check after the recomputation of
1444  * RecentFlushPtr, so we can send all remaining data before shutting
1445  * down.
1446  */
1447  if (got_STOPPING)
1448  break;
1449 
1450  /*
1451  * We only send regular messages to the client for full decoded
1452  * transactions, but synchronous replication and walsender shutdown
1453  * possibly are waiting for a later location. So, before sleeping, we
1454  * send a ping containing the flush location. If the receiver is
1455  * otherwise idle, this keepalive will trigger a reply. Processing the
1456  * reply will update these MyWalSnd locations.
1457  */
1458  if (MyWalSnd->flush < sentPtr &&
1459  MyWalSnd->write < sentPtr &&
1461  {
1462  WalSndKeepalive(false);
1464  }
1465 
1466  /* check whether we're done */
1467  if (loc <= RecentFlushPtr)
1468  break;
1469 
1470  /* Waiting for new WAL. Since we need to wait, we're now caught up. */
1471  WalSndCaughtUp = true;
1472 
1473  /*
1474  * Try to flush any pending output to the client.
1475  */
1476  if (pq_flush_if_writable() != 0)
1477  WalSndShutdown();
1478 
1479  /*
1480  * If we have received CopyDone from the client, sent CopyDone
1481  * ourselves, and the output buffer is empty, it's time to exit
1482  * streaming, so fail the current WAL fetch request.
1483  */
1485  !pq_is_send_pending())
1486  break;
1487 
1488  /* die if timeout was reached */
1490 
1491  /* Send keepalive if the time has come */
1493 
1494  /*
1495  * Sleep until something happens or we time out. Also wait for the
1496  * socket becoming writable, if there's still pending output.
1497  * Otherwise we might sit on sendable output data while waiting for
1498  * new WAL to be generated. (But if we have nothing to send, we don't
1499  * want to wake on socket-writable.)
1500  */
1502 
1503  wakeEvents = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH |
1505 
1506  if (pq_is_send_pending())
1507  wakeEvents |= WL_SOCKET_WRITEABLE;
1508 
1509  (void) WaitLatchOrSocket(MyLatch, wakeEvents,
1510  MyProcPort->sock, sleeptime,
1512  }
1513 
1514  /* reactivate latch so WalSndLoop knows to continue */
1515  SetLatch(MyLatch);
1516  return RecentFlushPtr;
1517 }
1518 
1519 /*
1520  * Execute an incoming replication command.
1521  *
1522  * Returns true if the cmd_string was recognized as WalSender command, false
1523  * if not.
      *
      * cmd_string is parsed with the replication grammar inside a dedicated
      * memory context, and the resulting node is dispatched on its type. The
      * only path that returns false is the T_SQLCmd case; every other
      * recognized command finishes by sending a CommandComplete message and
      * returning true. A parse failure, an unrecognized node tag, a command
      * arriving while the walsender is in stopping mode, or a non-SQL command
      * inside an aborted transaction block raises an ERROR.
      *
      * NOTE(review): several statements of this function are not visible in
      * this listing; the summary above covers only what can be seen here.
1524  */
1525 bool
1526 exec_replication_command(const char *cmd_string)
1527 {
1528  int parse_rc;
1529  Node *cmd_node;
1530  MemoryContext cmd_context;
1531  MemoryContext old_context;
1532  QueryCompletion qc;
1533 
1534  /*
1535  * If WAL sender has been told that shutdown is getting close, switch its
1536  * status accordingly to handle the next replication commands correctly.
1537  */
1538  if (got_STOPPING)
1540 
1541  /*
1542  * Throw error if in stopping mode. We need to prevent commands that
1543  * could generate WAL while the shutdown checkpoint is being written. To
1544  * be safe, we just prohibit all new commands.
1545  */
1546  if (MyWalSnd->state == WALSNDSTATE_STOPPING)
1547  ereport(ERROR,
1548  (errmsg("cannot execute new commands while WAL sender is in stopping mode")));
1549 
1550  /*
1551  * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
1552  * command arrives. Clean up the old stuff if there's anything.
1553  */
1555 
1557 
1559  "Replication command context",
1561  old_context = MemoryContextSwitchTo(cmd_context);
1562 
      /* Parse the command; a nonzero parser result is reported as an error. */
1563  replication_scanner_init(cmd_string);
1564  parse_rc = replication_yyparse();
1565  if (parse_rc != 0)
1566  ereport(ERROR,
1567  (errcode(ERRCODE_SYNTAX_ERROR),
1568  errmsg_internal("replication command parser returned %d",
1569  parse_rc)));
1570 
1571  cmd_node = replication_parse_result;
1572 
1573  /*
1574  * Log replication command if log_replication_commands is enabled. Even
1575  * when it's disabled, log the command with DEBUG1 level for backward
1576  * compatibility. Note that SQL commands are not logged here, and will be
1577  * logged later if log_statement is enabled.
1578  */
1579  if (cmd_node->type != T_SQLCmd)
1581  (errmsg("received replication command: %s", cmd_string)));
1582 
1583  /*
1584  * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot. If it was
1585  * called outside of transaction the snapshot should be cleared here.
1586  */
1587  if (!IsTransactionBlock())
1589 
1590  /*
1591  * For aborted transactions, don't allow anything except pure SQL, the
1592  * exec_simple_query() will handle it correctly.
1593  */
1594  if (IsAbortedTransactionBlockState() && !IsA(cmd_node, SQLCmd))
1595  ereport(ERROR,
1596  (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
1597  errmsg("current transaction is aborted, "
1598  "commands ignored until end of transaction block")));
1599 
1601 
1602  /*
1603  * Allocate buffers that will be used for each outgoing and incoming
1604  * message. We do this just once per command to reduce palloc overhead.
1605  */
1606  initStringInfo(&output_message);
1607  initStringInfo(&reply_message);
1608  initStringInfo(&tmpbuf);
1609 
1610  /* Report to pgstat that this process is running */
1612 
1613  switch (cmd_node->type)
1614  {
1615  case T_IdentifySystemCmd:
1616  IdentifySystem();
1617  break;
1618 
1619  case T_BaseBackupCmd:
1620  PreventInTransactionBlock(true, "BASE_BACKUP");
1621  SendBaseBackup((BaseBackupCmd *) cmd_node);
1622  break;
1623 
1626  break;
1627 
1630  break;
1631 
1632  case T_StartReplicationCmd:
1633  {
1634  StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
1635 
1636  PreventInTransactionBlock(true, "START_REPLICATION");
1637 
1638  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
1639  StartReplication(cmd);
1640  else
1642 
1643  Assert(xlogreader != NULL);
1644  break;
1645  }
1646 
1647  case T_TimeLineHistoryCmd:
1648  PreventInTransactionBlock(true, "TIMELINE_HISTORY");
1650  break;
1651 
1652  case T_VariableShowStmt:
1653  {
1655  VariableShowStmt *n = (VariableShowStmt *) cmd_node;
1656 
1657  /* syscache access needs a transaction environment */
1659  GetPGVariable(n->name, dest);
1661  }
1662  break;
1663 
1664  case T_SQLCmd:
      /* Plain SQL requires being connected to a database. */
1665  if (MyDatabaseId == InvalidOid)
1666  ereport(ERROR,
1667  (errmsg("cannot execute SQL commands in WAL sender for physical replication")));
1668 
1669  /* Report to pgstat that this process is now idle */
1671 
1672  /*
       * Tell the caller that this wasn't a WalSender command.
       *
       * NOTE(review): as far as this listing shows, this path returns
       * without the MemoryContextSwitchTo(old_context) and
       * MemoryContextDelete(cmd_context) that the normal exit below
       * performs -- confirm that this is intended.
       */
1673  return false;
1674 
1675  default:
1676  elog(ERROR, "unrecognized replication command node tag: %u",
1677  cmd_node->type);
1678  }
1679 
1680  /* done */
1681  MemoryContextSwitchTo(old_context);
1682  MemoryContextDelete(cmd_context);
1683 
1684  /* Send CommandComplete message */
1685  SetQueryCompletion(&qc, CMDTAG_SELECT, 0);
1686  EndCommand(&qc, DestRemote, true);
1687 
1688  /* Report to pgstat that this process is now idle */
1690 
1691  return true;
1692 }
1693 
1694 /*
1695  * Process any incoming messages while streaming. Also checks if the remote
1696  * end has closed the connection.
      *
      * Messages are consumed in a loop until pq_getbyte_if_available() reports
      * that no more input is available without blocking. Three message types
      * are accepted: 'd' (a standby reply wrapped in CopyData), 'c' (CopyDone)
      * and 'X' (the standby is closing the socket, which ends this process).
      * Any other type is a FATAL protocol violation, as is anything but 'X'
      * once CopyDone has been received. EOF or a read error ends the process
      * via proc_exit(0).
      *
      * If at least one 'd' or 'c' message was handled,
      * waiting_for_ping_response is cleared.
1697  */
1698 static void
1700 {
1701  unsigned char firstchar;
1702  int r;
1703  bool received = false;
1704 
1706 
1707  for (;;)
1708  {
1709  pq_startmsgread();
1710  r = pq_getbyte_if_available(&firstchar);
1711  if (r < 0)
1712  {
1713  /* unexpected error or EOF */
1715  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1716  errmsg("unexpected EOF on standby connection")));
1717  proc_exit(0);
1718  }
1719  if (r == 0)
1720  {
1721  /* no data available without blocking */
1722  pq_endmsgread();
1723  break;
1724  }
1725 
1726  /* Read the message contents */
1727  resetStringInfo(&reply_message);
1728  if (pq_getmessage(&reply_message, 0))
1729  {
1731  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1732  errmsg("unexpected EOF on standby connection")));
1733  proc_exit(0);
1734  }
1735 
1736  /*
1737  * If we already received a CopyDone from the frontend, the frontend
1738  * should not send us anything until we've closed our end of the COPY.
1739  * XXX: In theory, the frontend could already send the next command
1740  * before receiving the CopyDone, but libpq doesn't currently allow
1741  * that.
1742  */
1743  if (streamingDoneReceiving && firstchar != 'X')
1744  ereport(FATAL,
1745  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1746  errmsg("unexpected standby message type \"%c\", after receiving CopyDone",
1747  firstchar)));
1748 
1749  /* Handle the very limited subset of commands expected in this phase */
1750  switch (firstchar)
1751  {
1752  /*
1753  * 'd' means a standby reply wrapped in a CopyData packet.
1754  */
1755  case 'd':
1757  received = true;
1758  break;
1759 
1760  /*
1761  * CopyDone means the standby requested to finish streaming.
1762  * Reply with CopyDone, if we had not sent that already.
1763  */
1764  case 'c':
1765  if (!streamingDoneSending)
1766  {
1767  pq_putmessage_noblock('c', NULL, 0);
1768  streamingDoneSending = true;
1769  }
1770 
1771  streamingDoneReceiving = true;
1772  received = true;
1773  break;
1774 
1775  /*
1776  * 'X' means that the standby is closing down the socket.
1777  */
1778  case 'X':
1779  proc_exit(0);
1780 
1781  default:
1782  ereport(FATAL,
1783  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1784  errmsg("invalid standby message type \"%c\"",
1785  firstchar)));
1786  }
1787  }
1788 
1789  /*
1790  * Save the last reply timestamp if we've received at least one reply.
1791  */
1792  if (received)
1793  {
1795  waiting_for_ping_response = false;
1796  }
1797 }
1798 
1799 /*
1800  * Process a status update message received from standby.
      *
      * The first byte of reply_message selects the handler: 'r' and 'h' are
      * the two accepted message types. Any other byte is reported as a
      * protocol violation, after which the process exits with status 0.
      *
      * NOTE(review): the handler calls themselves are not visible in this
      * listing.
1801  */
1802 static void
1804 {
1805  char msgtype;
1806 
1807  /*
1808  * Check message type from the first byte.
1809  */
1810  msgtype = pq_getmsgbyte(&reply_message);
1811 
1812  switch (msgtype)
1813  {
1814  case 'r':
1816  break;
1817 
1818  case 'h':
1820  break;
1821 
1822  default:
1824  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1825  errmsg("unexpected message type \"%c\"", msgtype)));
1826  proc_exit(0);
1827  }
1828 }
1829 
1830 /*
1831  * Remember that a walreceiver just confirmed receipt of lsn `lsn`.
      *
      * lsn must not be InvalidXLogRecPtr (asserted). The slot's
      * data.restart_lsn is compared and updated while holding the slot's
      * spinlock; follow-up work for a changed value is done only after the
      * spinlock has been released.
      *
      * NOTE(review): the follow-up statements are not visible in this listing.
1832  */
1833 static void
1835 {
1836  bool changed = false;
1838 
1839  Assert(lsn != InvalidXLogRecPtr);
1840  SpinLockAcquire(&slot->mutex);
1841  if (slot->data.restart_lsn != lsn)
1842  {
1843  changed = true;
1844  slot->data.restart_lsn = lsn;
1845  }
1846  SpinLockRelease(&slot->mutex);
1847 
1848  if (changed)
1849  {
1852  }
1853 
1854  /*
1855  * One could argue that the slot should be saved to disk now, but that'd
1856  * be energy wasted - the worst lost information can do here is give us
1857  * wrong information in a statistics view - we'll just potentially be more
1858  * conservative in removing files.
1859  */
1860 }
1861 
1862 /*
1863  * Regular reply from standby advising of WAL locations on standby server.
      *
      * The message body is read from reply_message in this order: write
      * position, flush position, apply position, reply time (all int64) and a
      * one-byte "reply requested" flag. The positions, the reply time and the
      * derived lag values are then stored in this walsender's shared-memory
      * entry under its spinlock. A keepalive is sent first if the standby
      * asked for a reply.
      *
      * A lag value of -1 from LagTrackerRead() leaves the previously stored
      * lag untouched, unless the standby reported being fully caught up
      * (applyPtr == sentPtr) in two consecutive replies, in which case the
      * stored lag values are overwritten regardless.
1864  */
1865 static void
1867 {
1868  XLogRecPtr writePtr,
1869  flushPtr,
1870  applyPtr;
1871  bool replyRequested;
1872  TimeOffset writeLag,
1873  flushLag,
1874  applyLag;
1875  bool clearLagTimes;
1876  TimestampTz now;
1877  TimestampTz replyTime;
1878 
      /* Remembers across calls whether the previous reply was fully applied. */
1879  static bool fullyAppliedLastTime = false;
1880 
1881  /* the caller already consumed the msgtype byte */
1882  writePtr = pq_getmsgint64(&reply_message);
1883  flushPtr = pq_getmsgint64(&reply_message);
1884  applyPtr = pq_getmsgint64(&reply_message);
1885  replyTime = pq_getmsgint64(&reply_message);
1886  replyRequested = pq_getmsgbyte(&reply_message);
1887 
      /* Skip the string formatting entirely unless DEBUG2 output is enabled. */
1888  if (log_min_messages <= DEBUG2)
1889  {
1890  char *replyTimeStr;
1891 
1892  /* Copy because timestamptz_to_str returns a static buffer */
1893  replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
1894 
1895  elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s reply_time %s",
1896  (uint32) (writePtr >> 32), (uint32) writePtr,
1897  (uint32) (flushPtr >> 32), (uint32) flushPtr,
1898  (uint32) (applyPtr >> 32), (uint32) applyPtr,
1899  replyRequested ? " (reply requested)" : "",
1900  replyTimeStr);
1901 
1902  pfree(replyTimeStr);
1903  }
1904 
1905  /* See if we can compute the round-trip lag for these positions. */
1906  now = GetCurrentTimestamp();
1907  writeLag = LagTrackerRead(SYNC_REP_WAIT_WRITE, writePtr, now);
1908  flushLag = LagTrackerRead(SYNC_REP_WAIT_FLUSH, flushPtr, now);
1909  applyLag = LagTrackerRead(SYNC_REP_WAIT_APPLY, applyPtr, now);
1910 
1911  /*
1912  * If the standby reports that it has fully replayed the WAL in two
1913  * consecutive reply messages, then the second such message must result
1914  * from wal_receiver_status_interval expiring on the standby. This is a
1915  * convenient time to forget the lag times measured when it last
1916  * wrote/flushed/applied a WAL record, to avoid displaying stale lag data
1917  * until more WAL traffic arrives.
1918  */
1919  clearLagTimes = false;
1920  if (applyPtr == sentPtr)
1921  {
1922  if (fullyAppliedLastTime)
1923  clearLagTimes = true;
1924  fullyAppliedLastTime = true;
1925  }
1926  else
1927  fullyAppliedLastTime = false;
1928 
1929  /* Send a reply if the standby requested one. */
1930  if (replyRequested)
1931  WalSndKeepalive(false);
1932 
1933  /*
1934  * Update shared state for this WalSender process based on reply data from
1935  * standby.
1936  */
1937  {
1938  WalSnd *walsnd = MyWalSnd;
1939 
1940  SpinLockAcquire(&walsnd->mutex);
1941  walsnd->write = writePtr;
1942  walsnd->flush = flushPtr;
1943  walsnd->apply = applyPtr;
1944  if (writeLag != -1 || clearLagTimes)
1945  walsnd->writeLag = writeLag;
1946  if (flushLag != -1 || clearLagTimes)
1947  walsnd->flushLag = flushLag;
1948  if (applyLag != -1 || clearLagTimes)
1949  walsnd->applyLag = applyLag;
1950  walsnd->replyTime = replyTime;
1951  SpinLockRelease(&walsnd->mutex);
1952  }
1953 
1956 
1957  /*
1958  * Advance our local xmin horizon when the client confirmed a flush.
1959  */
1960  if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr)
1961  {
1964  else
1966  }
1967 }
1968 
1969 /*
      * Compute new replication slot xmin horizon if needed.
      *
      * For each of xmin and catalog_xmin, the slot's stored value is replaced
      * by the feedback value when the stored value is not a normal XID, the
      * feedback value is not a normal XID, or the stored value precedes the
      * feedback value. Both the data field and the matching effective field
      * are set together, under the slot's spinlock. Follow-up work for a
      * changed value happens only after the spinlock has been released.
      *
      * NOTE(review): the follow-up statements are not visible in this listing.
      */
1970 static void
1972 {
1973  bool changed = false;
1975 
1976  SpinLockAcquire(&slot->mutex);
1978 
1979  /*
1980  * For physical replication we don't need the interlock provided by xmin
1981  * and effective_xmin since the consequences of a missed increase are
1982  * limited to query cancellations, so set both at once.
1983  */
1984  if (!TransactionIdIsNormal(slot->data.xmin) ||
1985  !TransactionIdIsNormal(feedbackXmin) ||
1986  TransactionIdPrecedes(slot->data.xmin, feedbackXmin))
1987  {
1988  changed = true;
1989  slot->data.xmin = feedbackXmin;
1990  slot->effective_xmin = feedbackXmin;
1991  }
      /* Same rule, applied independently to the catalog xmin. */
1992  if (!TransactionIdIsNormal(slot->data.catalog_xmin) ||
1993  !TransactionIdIsNormal(feedbackCatalogXmin) ||
1994  TransactionIdPrecedes(slot->data.catalog_xmin, feedbackCatalogXmin))
1995  {
1996  changed = true;
1997  slot->data.catalog_xmin = feedbackCatalogXmin;
1998  slot->effective_catalog_xmin = feedbackCatalogXmin;
1999  }
2000  SpinLockRelease(&slot->mutex);
2001 
2002  if (changed)
2003  {
2006  }
2007 }
2008 
2009 /*
2010  * Check that the provided xmin/epoch are sane, that is, not in the future
2011  * and not so far back as to be already wrapped around.
2012  *
2013  * Epoch of nextXid should be same as standby, or if the counter has
2014  * wrapped, then one greater than standby.
2015  *
2016  * This check doesn't care about whether clog exists for these xids
2017  * at all.
      *
      * xid and epoch are the values being checked. They are compared against
      * the XID and epoch halves of the value returned by
      * ReadNextFullTransactionId(). Returns true when the pair passes both
      * the epoch check and the wraparound check, false otherwise.
2018  */
2019 static bool
2021 {
2022  FullTransactionId nextFullXid;
2023  TransactionId nextXid;
2024  uint32 nextEpoch;
2025 
2026  nextFullXid = ReadNextFullTransactionId();
2027  nextXid = XidFromFullTransactionId(nextFullXid);
2028  nextEpoch = EpochFromFullTransactionId(nextFullXid);
2029 
      /*
       * A numerically smaller-or-equal xid must carry the current epoch; a
       * numerically larger one must carry the epoch just before it.
       */
2030  if (xid <= nextXid)
2031  {
2032  if (epoch != nextEpoch)
2033  return false;
2034  }
2035  else
2036  {
2037  if (epoch + 1 != nextEpoch)
2038  return false;
2039  }
2040 
2041  if (!TransactionIdPrecedesOrEquals(xid, nextXid))
2042  return false; /* epoch OK, but it's wrapped around */
2043 
2044  return true;
2045 }
2046 
2047 /*
2048  * Hot Standby feedback
      *
      * Reads a feedback message from reply_message (reply time, xmin and its
      * epoch, catalog xmin and its epoch), records the reply time in this
      * walsender's shared-memory entry, and then applies the reported xmins:
      *
      * - If neither xmin is a normal XID, the feedback values are passed on to
      *   the replication slot (when one is in use) and the function returns.
      * - If either normal xmin fails the TransactionIdInRecentPast() sanity
      *   check, the message is ignored.
      * - Otherwise the xmins go to the replication slot if there is one, or
      *   else the older of the two is stored in MyPgXact->xmin.
2049  */
2050 static void
2052 {
2053  TransactionId feedbackXmin;
2054  uint32 feedbackEpoch;
2055  TransactionId feedbackCatalogXmin;
2056  uint32 feedbackCatalogEpoch;
2057  TimestampTz replyTime;
2058 
2059  /*
2060  * Decipher the reply message. The caller already consumed the msgtype
2061  * byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation
2062  * of this message.
2063  */
2064  replyTime = pq_getmsgint64(&reply_message);
2065  feedbackXmin = pq_getmsgint(&reply_message, 4);
2066  feedbackEpoch = pq_getmsgint(&reply_message, 4);
2067  feedbackCatalogXmin = pq_getmsgint(&reply_message, 4);
2068  feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4);
2069 
      /* Skip the string formatting entirely unless DEBUG2 output is enabled. */
2070  if (log_min_messages <= DEBUG2)
2071  {
2072  char *replyTimeStr;
2073 
2074  /* Copy because timestamptz_to_str returns a static buffer */
2075  replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
2076 
2077  elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u reply_time %s",
2078  feedbackXmin,
2079  feedbackEpoch,
2080  feedbackCatalogXmin,
2081  feedbackCatalogEpoch,
2082  replyTimeStr);
2083 
2084  pfree(replyTimeStr);
2085  }
2086 
2087  /*
2088  * Update shared state for this WalSender process based on reply data from
2089  * standby.
2090  */
2091  {
2092  WalSnd *walsnd = MyWalSnd;
2093 
2094  SpinLockAcquire(&walsnd->mutex);
2095  walsnd->replyTime = replyTime;
2096  SpinLockRelease(&walsnd->mutex);
2097  }
2098 
2099  /*
2100  * Unset WalSender's xmins if the feedback message values are invalid.
2101  * This happens when the downstream turned hot_standby_feedback off.
2102  */
2103  if (!TransactionIdIsNormal(feedbackXmin)
2104  && !TransactionIdIsNormal(feedbackCatalogXmin))
2105  {
2107  if (MyReplicationSlot != NULL)
2108  PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
2109  return;
2110  }
2111 
2112  /*
2113  * Check that the provided xmin/epoch are sane, that is, not in the future
2114  * and not so far back as to be already wrapped around. Ignore if not.
2115  */
2116  if (TransactionIdIsNormal(feedbackXmin) &&
2117  !TransactionIdInRecentPast(feedbackXmin, feedbackEpoch))
2118  return;
2119 
2120  if (TransactionIdIsNormal(feedbackCatalogXmin) &&
2121  !TransactionIdInRecentPast(feedbackCatalogXmin, feedbackCatalogEpoch))
2122  return;
2123 
2124  /*
2125  * Set the WalSender's xmin equal to the standby's requested xmin, so that
2126  * the xmin will be taken into account by GetOldestXmin. This will hold
2127  * back the removal of dead rows and thereby prevent the generation of
2128  * cleanup conflicts on the standby server.
2129  *
2130  * There is a small window for a race condition here: although we just
2131  * checked that feedbackXmin precedes nextXid, the nextXid could have
2132  * gotten advanced between our fetching it and applying the xmin below,
2133  * perhaps far enough to make feedbackXmin wrap around. In that case the
2134  * xmin we set here would be "in the future" and have no effect. No point
2135  * in worrying about this since it's too late to save the desired data
2136  * anyway. Assuming that the standby sends us an increasing sequence of
2137  * xmins, this could only happen during the first reply cycle, else our
2138  * own xmin would prevent nextXid from advancing so far.
2139  *
2140  * We don't bother taking the ProcArrayLock here. Setting the xmin field
2141  * is assumed atomic, and there's no real need to prevent a concurrent
2142  * GetOldestXmin. (If we're moving our xmin forward, this is obviously
2143  * safe, and if we're moving it backwards, well, the data is at risk
2144  * already since a VACUUM could have just finished calling GetOldestXmin.)
2145  *
2146  * If we're using a replication slot we reserve the xmin via that,
2147  * otherwise via the walsender's PGXACT entry. We can only track the
2148  * catalog xmin separately when using a slot, so we store the least of the
2149  * two provided when not using a slot.
2150  *
2151  * XXX: It might make sense to generalize the ephemeral slot concept and
2152  * always use the slot mechanism to handle the feedback xmin.
2153  */
2154  if (MyReplicationSlot != NULL) /* XXX: persistency configurable? */
2155  PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
2156  else
2157  {
2158  if (TransactionIdIsNormal(feedbackCatalogXmin)
2159  && TransactionIdPrecedes(feedbackCatalogXmin, feedbackXmin))
2160  MyPgXact->xmin = feedbackCatalogXmin;
2161  else
2162  MyPgXact->xmin = feedbackXmin;
2163  }
2164 }
2165 
2166 /*
2167  * Compute how long send/receive loops should sleep.
2168  *
2169  * If wal_sender_timeout is enabled we want to wake up in time to send
2170  * keepalives and to abort the connection if wal_sender_timeout has been
2171  * reached.
      *
      * The result is in milliseconds. It defaults to 10 seconds; inside the
      * timeout branch it is instead the distance from `now` to the computed
      * wakeup time, built from whole seconds (times 1000) plus microseconds
      * (divided by 1000).
      *
      * NOTE(review): the condition guarding the timeout branch and the
      * wakeup_time computation are not fully visible in this listing.
2172  */
2173 static long
2175 {
2176  long sleeptime = 10000; /* 10 s */
2177 
2179  {
2180  TimestampTz wakeup_time;
2181  long sec_to_timeout;
2182  int microsec_to_timeout;
2183 
2184  /*
2185  * At the latest stop sleeping once wal_sender_timeout has been
2186  * reached.
2187  */
2190 
2191  /*
2192  * If no ping has been sent yet, wakeup when it's time to do so.
2193  * WalSndKeepaliveIfNecessary() wants to send a keepalive once half of
2194  * the timeout passed without a response.
2195  */
2198  wal_sender_timeout / 2);
2199 
2200  /* Compute relative time until wakeup. */
2201  TimestampDifference(now, wakeup_time,
2202  &sec_to_timeout, &microsec_to_timeout);
2203 
      /* Convert the (seconds, microseconds) pair to milliseconds. */
2204  sleeptime = sec_to_timeout * 1000 +
2205  microsec_to_timeout / 1000;
2206  }
2207 
2208  return sleeptime;
2209 }
2210 
2211 /*
2212  * Check whether there have been responses by the client within
2213  * wal_sender_timeout and shutdown if not. Using last_processing as the
2214  * reference point avoids counting server-side stalls against the client.
2215  * However, a long server-side stall can make WalSndKeepaliveIfNecessary()
2216  * postdate last_processing by more than wal_sender_timeout. If that happens,
2217  * the client must reply almost immediately to avoid a timeout. This rarely
2218  * affects the default configuration, under which clients spontaneously send a
2219  * message every standby_message_timeout = wal_sender_timeout/6 = 10s. We
2220  * could eliminate that problem by recognizing timeout expiration at
2221  * wal_sender_timeout/2 after the keepalive.
      *
      * Nothing is checked while last_reply_timestamp is <= 0, and the
      * shutdown branch is taken only when wal_sender_timeout is greater than
      * zero.
      *
      * NOTE(review): the statement that computes `timeout` is not visible in
      * this listing.
2222  */
2223 static void
2225 {
2226  TimestampTz timeout;
2227 
2228  /* don't bail out if we're doing something that doesn't require timeouts */
2229  if (last_reply_timestamp <= 0)
2230  return;
2231 
2234 
2235  if (wal_sender_timeout > 0 && last_processing >= timeout)
2236  {
2237  /*
2238  * Since typically expiration of replication timeout means
2239  * communication problem, we don't send the error message to the
2240  * standby.
2241  */
2243  (errmsg("terminating walsender process due to replication timeout")));
2244 
2245  WalSndShutdown();
2246  }
2247 }
2248 
2249 /*
 * Main loop of walsender process that streams the WAL over Copy messages.
 *
 * send_data is invoked as a callback (send_data()) whenever the output
 * buffer is empty, and is compared against XLogSendLogical to decide whether
 * the loop itself should block when caught up.
 *
 * Each iteration: handle a pending config reload, try to send more data,
 * flush what is writable (calling WalSndShutdown() if the flush fails), and
 * finally sleep in WaitLatchOrSocket() when the blocking condition holds.
 * The loop is left via "break" once the CopyDone exchange described below is
 * complete and no output is pending.
 *
 * NOTE(review): listing lines 2251 (function name/parameters), 2257, 2267,
 * 2269, 2275-2276, 2280, 2287, 2307, 2322, 2337, 2340, 2349-2350, 2356, 2362
 * and 2370 are absent from this rendered extract; the statements announced
 * by several comments below are therefore not visible here.
 */
2250 static void
2252 {
2253  /*
2254  * Initialize the last reply timestamp. That enables timeout processing
2255  * from here on.
2256  */
2258  waiting_for_ping_response = false;
2259 
2260  /*
2261  * Loop until we reach the end of this timeline or the client requests to
2262  * stop streaming.
2263  */
2264  for (;;)
2265  {
2266  /* Clear any already-pending wakeups */
2268 
2270 
2271  /* Process any requests or signals received recently */
2272  if (ConfigReloadPending)
2273  {
2274  ConfigReloadPending = false;
2277  }
2278 
2279  /* Check for input from the client */
2281 
2282  /*
2283  * If we have received CopyDone from the client, sent CopyDone
2284  * ourselves, and the output buffer is empty, it's time to exit
2285  * streaming.
2286  */
2288  !pq_is_send_pending())
2289  break;
2290 
2291  /*
2292  * If we don't have any pending data in the output buffer, try to send
2293  * some more. If there is some, we don't bother to call send_data
2294  * again until we've flushed it ... but we'd better assume we are not
2295  * caught up.
2296  */
2297  if (!pq_is_send_pending())
2298  send_data();
2299  else
2300  WalSndCaughtUp = false;
2301 
2302  /* Try to flush pending output to the client */
2303  if (pq_flush_if_writable() != 0)
2304  WalSndShutdown();
2305 
2306  /* If nothing remains to be sent right now ... */
2308  {
2309  /*
2310  * If we're in catchup state, move to streaming. This is an
2311  * important state change for users to know about, since before
2312  * this point data loss might occur if the primary dies and we
2313  * need to failover to the standby. The state change is also
2314  * important for synchronous replication, since commits that
2315  * started to wait at that point might wait for some time.
2316  */
2317  if (MyWalSnd->state == WALSNDSTATE_CATCHUP)
2318  {
2319  ereport(DEBUG1,
2320  (errmsg("\"%s\" has now caught up with upstream server",
2321  application_name)));
2323  }
2324 
2325  /*
2326  * When SIGUSR2 arrives, we send any outstanding logs up to the
2327  * shutdown checkpoint record (i.e., the latest record), wait for
2328  * them to be replicated to the standby, and exit. This may be a
2329  * normal termination at shutdown, or a promotion; the walsender
2330  * is not sure which.
2331  */
2332  if (got_SIGUSR2)
2333  WalSndDone(send_data);
2334  }
2335 
2336  /* Check for replication timeout. */
2338 
2339  /* Send keepalive if the time has come */
2341 
2342  /*
2343  * Block if we have unsent data. XXX For logical replication, let
2344  * WalSndWaitForWal() handle any other blocking; idle receivers need
2345  * its additional actions. For physical replication, also block if
2346  * caught up; its send_data does not block.
2347  */
2348  if ((WalSndCaughtUp && send_data != XLogSendLogical &&
2351  {
2352  long sleeptime;
2353  int wakeEvents;
2354 
2355  wakeEvents = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH | WL_TIMEOUT |
2357 
2358  /*
2359  * Use fresh timestamp, not last_processing, to reduce the chance
2360  * of reaching wal_sender_timeout before sending a keepalive.
2361  */
2363 
 /* only wait for socket writability while output is still queued */
2364  if (pq_is_send_pending())
2365  wakeEvents |= WL_SOCKET_WRITEABLE;
2366 
2367  /* Sleep until something happens or we time out */
2368  (void) WaitLatchOrSocket(MyLatch, wakeEvents,
2369  MyProcPort->sock, sleeptime,
2371  }
2372  }
2373 }
2374 
2375 /*
 * Initialize a per-walsender data structure for this walsender process.
 *
 * Scans WalSndCtl->walsnds[0 .. max_wal_senders - 1], taking each slot's
 * spinlock in turn. The first slot whose pid is 0 is claimed by storing
 * MyProcPid in it and resetting its other fields; MyWalSnd is then pointed at
 * that slot. Both "WalSndCtl is set up" and "a slot was found" are enforced
 * only by Assert().
 *
 * NOTE(review): listing lines 2377 (function name/parameters) and 2435 (the
 * statement announced by the "Arrange to clean up" comment) are absent from
 * this rendered extract.
 */
2376 static void
2378 {
2379  int i;
2380 
2381  /*
2382  * WalSndCtl should be set up already (we inherit this by fork() or
2383  * EXEC_BACKEND mechanism from the postmaster).
2384  */
2385  Assert(WalSndCtl != NULL);
2386  Assert(MyWalSnd == NULL);
2387 
2388  /*
2389  * Find a free walsender slot and reserve it. This must not fail due to
2390  * the prior check for free WAL senders in InitProcess().
2391  */
2392  for (i = 0; i < max_wal_senders; i++)
2393  {
2394  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2395 
2396  SpinLockAcquire(&walsnd->mutex);
2397 
2398  if (walsnd->pid != 0)
2399  {
 /* slot is in use by another process; try the next one */
2400  SpinLockRelease(&walsnd->mutex);
2401  continue;
2402  }
2403  else
2404  {
2405  /*
2406  * Found a free slot. Reserve it for us.
2407  */
2408  walsnd->pid = MyProcPid;
2409  walsnd->state = WALSNDSTATE_STARTUP;
2410  walsnd->sentPtr = InvalidXLogRecPtr;
2411  walsnd->needreload = false;
2412  walsnd->write = InvalidXLogRecPtr;
2413  walsnd->flush = InvalidXLogRecPtr;
2414  walsnd->apply = InvalidXLogRecPtr;
2415  walsnd->writeLag = -1;
2416  walsnd->flushLag = -1;
2417  walsnd->applyLag = -1;
2418  walsnd->sync_standby_priority = 0;
2419  walsnd->latch = &MyProc->procLatch;
2420  walsnd->replyTime = 0;
2421  walsnd->spillTxns = 0;
2422  walsnd->spillCount = 0;
2423  walsnd->spillBytes = 0;
2424  SpinLockRelease(&walsnd->mutex);
2425  /* don't need the lock anymore */
2426  MyWalSnd = (WalSnd *) walsnd;
2427 
2428  break;
2429  }
2430  }
2431 
2432  Assert(MyWalSnd != NULL);
2433 
2434  /* Arrange to clean up at walsender exit */
2436 }
2437 
2438 /*
 * Destroy the per-walsender data structure for this walsender process.
 *
 * Clears the process-local MyWalSnd pointer first, then, under the slot's
 * spinlock, sets the slot's latch to NULL and its pid to 0. A pid of 0 is
 * what the slot scans elsewhere in this file treat as "unused".
 *
 * NOTE(review): listing line 2440 (function name/parameters) is absent from
 * this rendered extract.
 */
2439 static void
2441 {
2442  WalSnd *walsnd = MyWalSnd;
2443 
2444  Assert(walsnd != NULL);
2445 
2446  MyWalSnd = NULL;
2447 
2448  SpinLockAcquire(&walsnd->mutex);
2449  /* clear latch while holding the spinlock, so it can safely be read */
2450  walsnd->latch = NULL;
2451  /* Mark WalSnd struct as no longer being in use. */
2452  walsnd->pid = 0;
2453  SpinLockRelease(&walsnd->mutex);
2454 }
2455 
2456 /*
 * XLogReaderRoutine->segment_open callback.
 *
 * Chooses the timeline to read from, stores it in *tli_p, builds the segment
 * path for nextSegNo and opens it read-only into state->seg.ws_file. Returns
 * normally only when the open succeeded; otherwise an ERROR is reported,
 * with a dedicated message when errno is ENOENT.
 *
 * NOTE(review): listing lines 2458 (function name and leading parameters;
 * 'state' and 'nextSegNo' are used below and presumably declared there),
 * 2488, 2492, 2514 and 2520 are absent from this rendered extract.
 */
2457 static void
2459  TimeLineID *tli_p)
2460 {
2461  char path[MAXPGPATH];
2462 
2463  /*-------
2464  * When reading from a historic timeline, and there is a timeline switch
2465  * within this segment, read from the WAL segment belonging to the new
2466  * timeline.
2467  *
2468  * For example, imagine that this server is currently on timeline 5, and
2469  * we're streaming timeline 4. The switch from timeline 4 to 5 happened at
2470  * 0/13002088. In pg_wal, we have these files:
2471  *
2472  * ...
2473  * 000000040000000000000012
2474  * 000000040000000000000013
2475  * 000000050000000000000013
2476  * 000000050000000000000014
2477  * ...
2478  *
2479  * In this situation, when requested to send the WAL from segment 0x13, on
2480  * timeline 4, we read the WAL from file 000000050000000000000013. Archive
2481  * recovery prefers files from newer timelines, so if the segment was
2482  * restored from the archive on this server, the file belonging to the old
2483  * timeline, 000000040000000000000013, might not exist. Their contents are
2484  * equal up to the switchpoint, because at a timeline switch, the used
2485  * portion of the old segment is copied to the new file. -------
2486  */
2487  *tli_p = sendTimeLine;
2489  {
2490  XLogSegNo endSegNo;
2491 
2493  if (state->seg.ws_segno == endSegNo)
2494  *tli_p = sendTimeLineNextTLI;
2495  }
2496 
2497  XLogFilePath(path, *tli_p, nextSegNo, state->segcxt.ws_segsize);
2498  state->seg.ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY);
2499  if (state->seg.ws_file >= 0)
2500  return;
2501 
2502  /*
2503  * If the file is not found, assume it's because the standby asked for a
2504  * too old WAL segment that has already been removed or recycled.
2505  */
2506  if (errno == ENOENT)
2507  {
2508  char xlogfname[MAXFNAMELEN];
2509  int save_errno = errno;
2510 
 /*
  * errno is saved and restored around XLogFileName(), presumably so
  * that the report below still sees the open failure's errno.
  */
2511  XLogFileName(xlogfname, *tli_p, nextSegNo, wal_segment_size);
2512  errno = save_errno;
2513  ereport(ERROR,
2515  errmsg("requested WAL segment %s has already been removed",
2516  xlogfname)));
2517  }
2518  else
2519  ereport(ERROR,
2521  errmsg("could not open file \"%s\": %m",
2522  path)));
2523 }
2524 
2525 /*
2526  * Send out the WAL in its normal physical/stored form.
2527  *
2528  * Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk,
2529  * but not yet sent to the client, and buffer it in the libpq output
2530  * buffer.
2531  *
2532  * If there is no unsent WAL remaining, WalSndCaughtUp is set to true,
2533  * otherwise WalSndCaughtUp is set to false.
 *
 * On the normal path the function also advances sentPtr to the end of the
 * slice just queued and publishes it in MyWalSnd->sentPtr under the slot's
 * spinlock.
 *
 * NOTE(review): listing lines 2536 (function name/parameters), 2539
 * (presumably the declaration of 'startptr'), 2547, 2549, 2556, 2604,
 * 2617-2618, 2620, 2683, 2696, 2728, 2780 and 2823 are absent from this
 * rendered extract, so several conditions guarding the blocks below are not
 * visible here.
2534  */
2535 static void
2537 {
2538  XLogRecPtr SendRqstPtr;
2540  XLogRecPtr endptr;
2541  Size nbytes;
2542  XLogSegNo segno;
2543  WALReadError errinfo;
2544 
2545  /* If requested switch the WAL sender to the stopping state. */
2546  if (got_STOPPING)
2548 
2550  {
2551  WalSndCaughtUp = true;
2552  return;
2553  }
2554 
2555  /* Figure out how far we can safely send the WAL. */
2557  {
2558  /*
2559  * Streaming an old timeline that's in this server's history, but is
2560  * not the one we're currently inserting or replaying. It can be
2561  * streamed up to the point where we switched off that timeline.
2562  */
2563  SendRqstPtr = sendTimeLineValidUpto;
2564  }
2565  else if (am_cascading_walsender)
2566  {
2567  /*
2568  * Streaming the latest timeline on a standby.
2569  *
2570  * Attempt to send all WAL that has already been replayed, so that we
2571  * know it's valid. If we're receiving WAL through streaming
2572  * replication, it's also OK to send any WAL that has been received
2573  * but not replayed.
2574  *
2575  * The timeline we're recovering from can change, or we can be
2576  * promoted. In either case, the current timeline becomes historic. We
2577  * need to detect that so that we don't try to stream past the point
2578  * where we switched to another timeline. We check for promotion or
2579  * timeline switch after calculating FlushPtr, to avoid a race
2580  * condition: if the timeline becomes historic just after we checked
2581  * that it was still current, it's still OK to stream it up to the
2582  * FlushPtr that was calculated before it became historic.
2583  */
2584  bool becameHistoric = false;
2585 
2586  SendRqstPtr = GetStandbyFlushRecPtr();
2587 
2588  if (!RecoveryInProgress())
2589  {
2590  /*
2591  * We have been promoted. RecoveryInProgress() updated
2592  * ThisTimeLineID to the new current timeline.
2593  */
2594  am_cascading_walsender = false;
2595  becameHistoric = true;
2596  }
2597  else
2598  {
2599  /*
2600  * Still a cascading standby. But is the timeline we're sending
2601  * still the one recovery is recovering from? ThisTimeLineID was
2602  * updated by the GetStandbyFlushRecPtr() call above.
2603  */
2605  becameHistoric = true;
2606  }
2607 
2608  if (becameHistoric)
2609  {
2610  /*
2611  * The timeline we were sending has become historic. Read the
2612  * timeline history file of the new timeline to see where exactly
2613  * we forked off from the timeline we were sending.
2614  */
2615  List *history;
2616 
2619 
2621  list_free_deep(history);
2622 
2623  sendTimeLineIsHistoric = true;
2624 
2625  SendRqstPtr = sendTimeLineValidUpto;
2626  }
2627  }
2628  else
2629  {
2630  /*
2631  * Streaming the current timeline on a master.
2632  *
2633  * Attempt to send all data that's already been written out and
2634  * fsync'd to disk. We cannot go further than what's been written out
2635  * given the current implementation of WALRead(). And in any case
2636  * it's unsafe to send WAL that is not securely down to disk on the
2637  * master: if the master subsequently crashes and restarts, standbys
2638  * must not have applied any WAL that got lost on the master.
2639  */
2640  SendRqstPtr = GetFlushRecPtr();
2641  }
2642 
2643  /*
2644  * Record the current system time as an approximation of the time at which
2645  * this WAL location was written for the purposes of lag tracking.
2646  *
2647  * In theory we could make XLogFlush() record a time in shmem whenever WAL
2648  * is flushed and we could get that time as well as the LSN when we call
2649  * GetFlushRecPtr() above (and likewise for the cascading standby
2650  * equivalent), but rather than putting any new code into the hot WAL path
2651  * it seems good enough to capture the time here. We should reach this
2652  * after XLogFlush() runs WalSndWakeupProcessRequests(), and although that
2653  * may take some time, we read the WAL flush pointer and take the time
2654  * very close together here so that we'll get a later position if it is
2655  * still moving.
2656  *
2657  * Because LagTrackerWrite ignores samples when the LSN hasn't advanced,
2658  * this gives us a cheap approximation for the WAL flush time for this
2659  * LSN.
2660  *
2661  * Note that the LSN is not necessarily the LSN for the data contained in
2662  * the present message; it's the end of the WAL, which might be further
2663  * ahead. All the lag tracking machinery cares about is finding out when
2664  * that arbitrary LSN is eventually reported as written, flushed and
2665  * applied, so that it can measure the elapsed time.
2666  */
2667  LagTrackerWrite(SendRqstPtr, GetCurrentTimestamp());
2668 
2669  /*
2670  * If this is a historic timeline and we've reached the point where we
2671  * forked to the next timeline, stop streaming.
2672  *
2673  * Note: We might already have sent WAL > sendTimeLineValidUpto. The
2674  * startup process will normally replay all WAL that has been received
2675  * from the master, before promoting, but if the WAL streaming is
2676  * terminated at a WAL page boundary, the valid portion of the timeline
2677  * might end in the middle of a WAL record. We might've already sent the
2678  * first half of that partial WAL record to the cascading standby, so that
2679  * sentPtr > sendTimeLineValidUpto. That's OK; the cascading standby can't
2680  * replay the partial WAL record either, so it can still follow our
2681  * timeline switch.
2682  */
2684  {
2685  /* close the current file. */
2686  if (xlogreader->seg.ws_file >= 0)
2687  wal_segment_close(xlogreader);
2688 
2689  /* Send CopyDone */
2690  pq_putmessage_noblock('c', NULL, 0);
2691  streamingDoneSending = true;
2692 
2693  WalSndCaughtUp = true;
2694 
2695  elog(DEBUG1, "walsender reached end of timeline at %X/%X (sent up to %X/%X)",
2697  (uint32) (sentPtr >> 32), (uint32) sentPtr);
2698  return;
2699  }
2700 
2701  /* Do we have any work to do? */
2702  Assert(sentPtr <= SendRqstPtr);
2703  if (SendRqstPtr <= sentPtr)
2704  {
2705  WalSndCaughtUp = true;
2706  return;
2707  }
2708 
2709  /*
2710  * Figure out how much to send in one message. If there's no more than
2711  * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
2712  * MAX_SEND_SIZE bytes, but round back to logfile or page boundary.
2713  *
2714  * The rounding is not only for performance reasons. Walreceiver relies on
2715  * the fact that we never split a WAL record across two messages. Since a
2716  * long WAL record is split at page boundary into continuation records,
2717  * page boundary is always a safe cut-off point. We also assume that
2718  * SendRqstPtr never points to the middle of a WAL record.
2719  */
2720  startptr = sentPtr;
2721  endptr = startptr;
2722  endptr += MAX_SEND_SIZE;
2723 
2724  /* if we went beyond SendRqstPtr, back off */
2725  if (SendRqstPtr <= endptr)
2726  {
2727  endptr = SendRqstPtr;
2729  WalSndCaughtUp = false;
2730  else
2731  WalSndCaughtUp = true;
2732  }
2733  else
2734  {
2735  /* round down to page boundary. */
2736  endptr -= (endptr % XLOG_BLCKSZ);
2737  WalSndCaughtUp = false;
2738  }
2739 
2740  nbytes = endptr - startptr;
2741  Assert(nbytes <= MAX_SEND_SIZE);
2742 
2743  /*
2744  * OK to read and send the slice.
 *
 * The message starts with the byte 'w' followed by three 64-bit fields:
 * dataStart, walEnd and sendtime. sendtime is written as 0 here and
 * patched in just before the message is queued.
2745  */
2746  resetStringInfo(&output_message);
2747  pq_sendbyte(&output_message, 'w');
2748 
2749  pq_sendint64(&output_message, startptr); /* dataStart */
2750  pq_sendint64(&output_message, SendRqstPtr); /* walEnd */
2751  pq_sendint64(&output_message, 0); /* sendtime, filled in last */
2752 
2753  /*
2754  * Read the log directly into the output buffer to avoid extra memcpy
2755  * calls.
2756  */
2757  enlargeStringInfo(&output_message, nbytes);
2758 
2759 retry:
2760  if (!WALRead(xlogreader,
2761  &output_message.data[output_message.len],
2762  startptr,
2763  nbytes,
2764  xlogreader->seg.ws_tli, /* Pass the current TLI because
2765  * only WalSndSegmentOpen controls
2766  * whether new TLI is needed. */
2767  &errinfo))
2768  WALReadRaiseError(&errinfo);
2769 
2770  /* See logical_read_xlog_page(). */
2771  XLByteToSeg(startptr, segno, xlogreader->segcxt.ws_segsize);
2772  CheckXLogRemoved(segno, xlogreader->seg.ws_tli);
2773 
2774  /*
2775  * During recovery, the currently-open WAL file might be replaced with the
2776  * file of the same name retrieved from archive. So we always need to
2777  * check what we read was valid after reading into the buffer. If it's
2778  * invalid, we try to open and read the file again.
2779  */
2781  {
2782  WalSnd *walsnd = MyWalSnd;
2783  bool reload;
2784 
 /* fetch and clear the reload request in one critical section */
2785  SpinLockAcquire(&walsnd->mutex);
2786  reload = walsnd->needreload;
2787  walsnd->needreload = false;
2788  SpinLockRelease(&walsnd->mutex);
2789 
2790  if (reload && xlogreader->seg.ws_file >= 0)
2791  {
2792  wal_segment_close(xlogreader);
2793 
2794  goto retry;
2795  }
2796  }
2797 
 /* the WAL bytes were read in place above; account for them now */
2798  output_message.len += nbytes;
2799  output_message.data[output_message.len] = '\0';
2800 
2801  /*
2802  * Fill the send timestamp last, so that it is taken as late as possible.
 * Its offset skips the 1-byte message type and the two 64-bit fields
 * (dataStart, walEnd) that precede it.
2803  */
2804  resetStringInfo(&tmpbuf);
2805  pq_sendint64(&tmpbuf, GetCurrentTimestamp());
2806  memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64)],
2807  tmpbuf.data, sizeof(int64));
2808 
2809  pq_putmessage_noblock('d', output_message.data, output_message.len);
2810 
2811  sentPtr = endptr;
2812 
2813  /* Update shared memory status */
2814  {
2815  WalSnd *walsnd = MyWalSnd;
2816 
2817  SpinLockAcquire(&walsnd->mutex);
2818  walsnd->sentPtr = sentPtr;
2819  SpinLockRelease(&walsnd->mutex);
2820  }
2821 
2822  /* Report progress of XLOG streaming in PS display */
2824  {
2825  char activitymsg[50];
2826 
2827  snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
2828  (uint32) (sentPtr >> 32), (uint32) sentPtr);
2829  set_ps_display(activitymsg);
2830  }
2831 }
2832 
2833 /*
2834  * Stream out logically decoded data.
 *
 * Reads one record through logical_decoding_ctx->reader. A non-NULL error
 * string from the reader is raised as an ERROR; a non-NULL record is handed
 * to LogicalDecodingProcessRecord() and sentPtr is advanced to the reader's
 * EndRecPtr. WalSndCaughtUp is set to true only when EndRecPtr has reached
 * the (cached, refreshed on demand) flush pointer. sentPtr is finally
 * published in MyWalSnd->sentPtr under the slot's spinlock.
 *
 * NOTE(review): listing lines 2837 (function name/parameters) and 2894
 * (the condition guarding "got_SIGUSR2 = true") are absent from this
 * rendered extract.
2835  */
2836 static void
2838 {
2839  XLogRecord *record;
2840  char *errm;
2841 
2842  /*
2843  * We'll use the current flush point to determine whether we've caught up.
2844  * This variable is static in order to cache it across calls. Caching is
2845  * helpful because GetFlushRecPtr() needs to acquire a heavily-contended
2846  * spinlock.
2847  */
2848  static XLogRecPtr flushPtr = InvalidXLogRecPtr;
2849 
2850  /*
2851  * Don't know whether we've caught up yet. We'll set WalSndCaughtUp to
2852  * true in WalSndWaitForWal, if we're actually waiting. We also set to
2853  * true if XLogReadRecord() had to stop reading but WalSndWaitForWal
2854  * didn't wait - i.e. when we're shutting down.
2855  */
2856  WalSndCaughtUp = false;
2857 
2858  record = XLogReadRecord(logical_decoding_ctx->reader, &errm);
2859 
2860  /* xlog record was invalid */
2861  if (errm != NULL)
2862  elog(ERROR, "%s", errm);
2863 
2864  if (record != NULL)
2865  {
2866  /*
2867  * Note the lack of any call to LagTrackerWrite() which is handled by
2868  * WalSndUpdateProgress which is called by output plugin through
2869  * logical decoding write api.
2870  */
2871  LogicalDecodingProcessRecord(logical_decoding_ctx, logical_decoding_ctx->reader);
2872 
2873  sentPtr = logical_decoding_ctx->reader->EndRecPtr;
2874  }
2875 
2876  /*
2877  * If first time through in this session, initialize flushPtr. Otherwise,
2878  * we only need to update flushPtr if EndRecPtr is past it.
2879  */
2880  if (flushPtr == InvalidXLogRecPtr)
2881  flushPtr = GetFlushRecPtr();
2882  else if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
2883  flushPtr = GetFlushRecPtr();
2884 
2885  /* If EndRecPtr is still past our flushPtr, it means we caught up. */
2886  if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
2887  WalSndCaughtUp = true;
2888 
2889  /*
2890  * If we're caught up and have been requested to stop, have WalSndLoop()
2891  * terminate the connection in an orderly manner, after writing out all
2892  * the pending data.
2893  */
2895  got_SIGUSR2 = true;
2896 
2897  /* Update shared memory status */
2898  {
2899  WalSnd *walsnd = MyWalSnd;
2900 
2901  SpinLockAcquire(&walsnd->mutex);
2902  walsnd->sentPtr = sentPtr;
2903  SpinLockRelease(&walsnd->mutex);
2904  }
2905 }
2906 
2907 /*
2908  * Shutdown if the sender is caught up.
2909  *
2910  * NB: This should only be called when the shutdown signal has been received
2911  * from postmaster.
2912  *
2913  * Note that if we determine that there's still more data to send, this
2914  * function will return control to the caller.
 *
 * send_data is invoked once as a callback before the caught-up test. When
 * the test passes, the function completes the COPY command towards the
 * client, flushes, and ends the process with proc_exit(0), so it does not
 * return on that path.
 *
 * NOTE(review): listing lines 2917 (function name/parameters), 2944 (the
 * condition guarding the keepalive block) and 2947 are absent from this
 * rendered extract.
2915  */
2916 static void
2918 {
2919  XLogRecPtr replicatedPtr;
2920 
2921  /* ... let's just be real sure we're caught up ... */
2922  send_data();
2923 
2924  /*
2925  * To figure out whether all WAL has successfully been replicated, check
2926  * flush location if valid, write otherwise. Tools like pg_receivewal will
2927  * usually (unless in synchronous mode) return an invalid flush location.
2928  */
2929  replicatedPtr = XLogRecPtrIsInvalid(MyWalSnd->flush) ?
2930  MyWalSnd->write : MyWalSnd->flush;
2931 
 /* exit only when everything sent has been confirmed and nothing is queued */
2932  if (WalSndCaughtUp && sentPtr == replicatedPtr &&
2933  !pq_is_send_pending())
2934  {
2935  QueryCompletion qc;
2936 
2937  /* Inform the standby that XLOG streaming is done */
2938  SetQueryCompletion(&qc, CMDTAG_COPY, 0);
2939  EndCommand(&qc, DestRemote, false);
2940  pq_flush();
2941 
2942  proc_exit(0);
2943  }
2945  {
2946  WalSndKeepalive(true);
2948  }
2949 }
2950 
2951 /*
2952  * Returns the latest point in WAL that has been safely flushed to disk, and
2953  * can be sent to the standby. This should only be called when in recovery,
2954  * ie. we're streaming to a cascaded standby.
2955  *
2956  * As a side-effect, ThisTimeLineID is updated to the TLI of the last
2957  * replayed WAL record.
 *
 * The result is the replay pointer, unless the walreceiver's flush pointer
 * is on the same timeline and further ahead, in which case that pointer is
 * returned instead.
 *
 * NOTE(review): listing lines 2960 (function name/parameters) and 2965
 * (presumably the declaration of 'receiveTLI', which is used below) are
 * absent from this rendered extract.
2958  */
2959 static XLogRecPtr
2961 {
2962  XLogRecPtr replayPtr;
2963  TimeLineID replayTLI;
2964  XLogRecPtr receivePtr;
2966  XLogRecPtr result;
2967 
2968  /*
2969  * We can safely send what's already been replayed. Also, if walreceiver
2970  * is streaming WAL from the same timeline, we can send anything that it
2971  * has streamed, but hasn't been replayed yet.
2972  */
2973 
2974  receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI);
2975  replayPtr = GetXLogReplayRecPtr(&replayTLI);
2976 
2977  ThisTimeLineID = replayTLI;
2978 
 /* prefer the received position only on the same timeline and when ahead */
2979  result = replayPtr;
2980  if (receiveTLI == ThisTimeLineID && receivePtr > replayPtr)
2981  result = receivePtr;
2982 
2983  return result;
2984 }
2985 
2986 /*
2987  * Request walsenders to reload the currently-open WAL file
 *
 * Walks all max_wal_senders slots and, under each slot's spinlock, sets
 * needreload = true on every slot whose pid is nonzero. Slots with pid 0
 * are skipped.
 *
 * NOTE(review): listing line 2990 (function name/parameters) is absent from
 * this rendered extract.
2988  */
2989 void
2991 {
2992  int i;
2993 
2994  for (i = 0; i < max_wal_senders; i++)
2995  {
2996  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2997 
2998  SpinLockAcquire(&walsnd->mutex);
2999  if (walsnd->pid == 0)
3000  {
 /* unused slot */
3001  SpinLockRelease(&walsnd->mutex);
3002  continue;
3003  }
3004  walsnd->needreload = true;
3005  SpinLockRelease(&walsnd->mutex);
3006  }
3007 }
3008 
3009 /*
3010  * Handle PROCSIG_WALSND_INIT_STOPPING signal.
 *
 * Depending on replication_active, either sends SIGTERM to this process
 * itself or sets the got_STOPPING flag.
 *
 * NOTE(review): listing lines 3013 (function name/parameters) and 3015 are
 * absent from this rendered extract. The comment below mentions waking up
 * the main loop, but no wakeup call is visible in the lines shown here.
3011  */
3012 void
3014 {
3016 
3017  /*
3018  * If replication has not yet started, die like with SIGTERM. If
3019  * replication is active, only set a flag and wake up the main loop. It
3020  * will send any outstanding WAL, wait for it to be replicated to the
3021  * standby, and then exit gracefully.
3022  */
3023  if (!replication_active)
3024  kill(MyProcPid, SIGTERM);
3025  else
3026  got_STOPPING = true;
3027 }
3028 
3029 /*
3030  * SIGUSR2: set flag to do a last cycle and shut down afterwards. The WAL
3031  * sender should already have been switched to WALSNDSTATE_STOPPING at
3032  * this point.
 *
 * The handler only sets got_SIGUSR2 and sets MyLatch; errno is saved on
 * entry and restored on exit so the interrupted code does not observe a
 * changed value.
 *
 * NOTE(review): listing line 3035 (function name/parameters) is absent from
 * this rendered extract.
3033  */
3034 static void
3036 {
3037  int save_errno = errno;
3038 
3039  got_SIGUSR2 = true;
3040  SetLatch(MyLatch);
3041 
3042  errno = save_errno;
3043 }
3044 
3045 /*
 * Set up signal handlers.
 *
 * Of the registrations visible here: SIGINT -> StatementCancelHandler,
 * SIGTERM -> die, SIGQUIT -> quickdie, SIGUSR2 -> WalSndLastCycleHandler;
 * InitializeTimeouts() is called in between.
 *
 * NOTE(review): listing lines 3047 (function name/parameters), 3050,
 * 3055-3056 and 3061 are absent from this rendered extract, so further
 * registrations (including the "reset" announced at the end) are not
 * visible here.
 */
3046 void
3048 {
3049  /* Set up signal handlers */
3051  pqsignal(SIGINT, StatementCancelHandler); /* query cancel */
3052  pqsignal(SIGTERM, die); /* request shutdown */
3053  pqsignal(SIGQUIT, quickdie); /* hard crash time */
3054  InitializeTimeouts(); /* establishes SIGALRM handler */
3057  pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
3058  * shutdown */
3059 
3060  /* Reset some signals that are accepted by postmaster but not here */
3062 }
3063 
3064 /*
 * Report shared-memory space needed by WalSndShmemInit.
 *
 * The result is the offset of the walsnds array within WalSndCtlData plus
 * max_wal_senders elements of sizeof(WalSnd), combined with add_size() and
 * mul_size().
 *
 * NOTE(review): listing line 3066 (function name/parameters) is absent from
 * this rendered extract.
 */
3065 Size
3067 {
3068  Size size = 0;
3069 
 /* fixed header part, then one WalSnd per allowed walsender */
3070  size = offsetof(WalSndCtlData, walsnds);
3071  size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
3072 
3073  return size;
3074 }
3075 
3076 /*
 * Allocate and initialize walsender-related shared memory.
 *
 * Obtains the "Wal Sender Ctl" structure via ShmemInitStruct() and stores it
 * in WalSndCtl. Only when the structure was not found already (found ==
 * false) is it zeroed and are the sync-rep queues and per-slot spinlocks
 * initialized.
 *
 * NOTE(review): listing line 3078 (function name/parameters) is absent from
 * this rendered extract.
 */
3077 void
3079 {
3080  bool found;
3081  int i;
3082 
3083  WalSndCtl = (WalSndCtlData *)
3084  ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
3085 
3086  if (!found)
3087  {
3088  /* First time through, so initialize */
3089  MemSet(WalSndCtl, 0, WalSndShmemSize());
3090 
 /* one wait queue per synchronous-replication wait mode */
3091  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
3092  SHMQueueInit(&(WalSndCtl->SyncRepQueue[i]));
3093 
 /* each slot carries its own spinlock */
3094  for (i = 0; i < max_wal_senders; i++)
3095  {
3096  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3097 
3098  SpinLockInit(&walsnd->mutex);
3099  }
3100  }
3101 }
3102 
3103 /*
3104  * Wake up all walsenders
3105  *
3106  * This will be called inside critical sections, so throwing an error is not
3107  * advisable.
 *
 * For every slot, the latch pointer is copied out under the slot's spinlock
 * and SetLatch() is called on it after the lock has been released; slots
 * whose latch is NULL are skipped.
 *
 * NOTE(review): listing line 3110 (function name/parameters) is absent from
 * this rendered extract.
3108  */
3109 void
3111 {
3112  int i;
3113 
3114  for (i = 0; i < max_wal_senders; i++)
3115  {
3116  Latch *latch;
3117  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3118 
3119  /*
3120  * Get latch pointer with spinlock held, for the unlikely case that
3121  * pointer reads aren't atomic (as they're 8 bytes).
3122  */
3123  SpinLockAcquire(&walsnd->mutex);
3124  latch = walsnd->latch;
3125  SpinLockRelease(&walsnd->mutex);
3126 
3127  if (latch != NULL)
3128  SetLatch(latch);
3129  }
3130 }
3131 
3132 /*
3133  * Signal all walsenders to move to stopping state.
3134  *
3135  * This will trigger walsenders to move to a state where no further WAL can be
3136  * generated. See this file's header for details.
 *
 * Each slot's pid is read under the slot's spinlock; slots with pid 0 are
 * skipped.
 *
 * NOTE(review): listing lines 3139 (function name/parameters) and 3155
 * (presumably the statement that actually signals 'pid') are absent from
 * this rendered extract.
3137  */
3138 void
3140 {
3141  int i;
3142 
3143  for (i = 0; i < max_wal_senders; i++)
3144  {
3145  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3146  pid_t pid;
3147 
3148  SpinLockAcquire(&walsnd->mutex);
3149  pid = walsnd->pid;
3150  SpinLockRelease(&walsnd->mutex);
3151 
 /* unused slot: nobody to signal */
3152  if (pid == 0)
3153  continue;
3154 
3156  }
3157 }
3158 
3159 /*
3160  * Wait until all the WAL senders have quit or reached the stopping state. This
3161  * is used by the checkpointer to control when the shutdown checkpoint can
3162  * safely be performed.
 *
 * Implemented as a polling loop: every pass inspects each slot under its
 * spinlock and gives up on the pass at the first in-use slot (pid != 0)
 * whose state is not WALSNDSTATE_STOPPING; the loop then sleeps 10 msec and
 * retries. There is no upper bound on the number of retries.
 *
 * NOTE(review): listing line 3165 (function name/parameters) is absent from
 * this rendered extract.
3163  */
3164 void
3166 {
3167  for (;;)
3168  {
3169  int i;
3170  bool all_stopped = true;
3171 
3172  for (i = 0; i < max_wal_senders; i++)
3173  {
3174  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3175 
3176  SpinLockAcquire(&walsnd->mutex);
3177 
3178  if (walsnd->pid == 0)
3179  {
 /* slot not in use, counts as "quit" */
3180  SpinLockRelease(&walsnd->mutex);
3181  continue;
3182  }
3183 
3184  if (walsnd->state != WALSNDSTATE_STOPPING)
3185  {
3186  all_stopped = false;
3187  SpinLockRelease(&walsnd->mutex);
3188  break;
3189  }
3190  SpinLockRelease(&walsnd->mutex);
3191  }
3192 
3193  /* safe to leave if confirmation is done for all WAL senders */
3194  if (all_stopped)
3195  return;
3196 
3197  pg_usleep(10000L); /* wait for 10 msec */
3198  }
3199 }
3200 
3201 /*
 * Set state for current walsender (only called in walsender).
 *
 * Does nothing when the slot already holds the requested state; that
 * comparison is made before the spinlock is taken. Otherwise the new state
 * is stored in MyWalSnd->state under the slot's spinlock.
 *
 * NOTE(review): listing lines 3203 (function name/parameters; 'state' is
 * used below and presumably declared there) and 3207 are absent from this
 * rendered extract.
 */
3202 void
3204 {
3205  WalSnd *walsnd = MyWalSnd;
3206 
3208 
3209  if (walsnd->state == state)
3210  return;
3211 
3212  SpinLockAcquire(&walsnd->mutex);
3213  walsnd->state = state;
3214  SpinLockRelease(&walsnd->mutex);
3215 }
3216 
3217 /*
3218  * Return a string constant representing the state. This is used
3219  * in system views, and should *not* be translated.
 *
 * The five WALSNDSTATE_* values handled by the switch map to lower-case
 * names; any other value falls out of the switch and yields "UNKNOWN".
 *
 * NOTE(review): listing line 3222 (function name/parameters) is absent from
 * this rendered extract.
3220  */
3221 static const char *
3223 {
3224  switch (state)
3225  {
3226  case WALSNDSTATE_STARTUP:
3227  return "startup";
3228  case WALSNDSTATE_BACKUP:
3229  return "backup";
3230  case WALSNDSTATE_CATCHUP:
3231  return "catchup";
3232  case WALSNDSTATE_STREAMING:
3233  return "streaming";
3234  case WALSNDSTATE_STOPPING:
3235  return "stopping";
3236  }
 /* reached only for a value not listed in the switch above */
3237  return "UNKNOWN";
3238 }
3239 
3240 static Interval *
 /*
  * Build a freshly palloc'd Interval whose month and day fields are 0 and
  * whose time field is set to 'offset'. The caller receives the pointer;
  * nothing in this function frees it.
  *
  * NOTE(review): listing line 3241 (function name and parameter list,
  * presumably declaring 'offset') is absent from this rendered extract.
  */
3242 {
3243  Interval *result = palloc(sizeof(Interval));
3244 
3245  result->month = 0;
3246  result->day = 0;
3247  result->time = offset;
3248 
3249  return result;
3250 }
3251 
3252 /*
3253  * Returns activity of walsenders, including pids and xlog locations sent to
3254  * standby servers.
3255  */
3256 Datum
3258 {
3259 #define PG_STAT_GET_WAL_SENDERS_COLS 15
3260  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
3261  TupleDesc tupdesc;
3262  Tuplestorestate *tupstore;
3263  MemoryContext per_query_ctx;
3264  MemoryContext oldcontext;
3265  SyncRepStandbyData *sync_standbys;
3266  int num_standbys;
3267  int i;
3268 
3269  /* check to see if caller supports us returning a tuplestore */
3270  if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
3271  ereport(ERROR,
3272  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3273  errmsg("set-valued function called in context that cannot accept a set")));
3274  if (!(rsinfo->allowedModes & SFRM_Materialize))
3275  ereport(ERROR,
3276  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3277  errmsg("materialize mode required, but it is not allowed in this context")));
3278 
3279  /* Build a tuple descriptor for our result type */
3280  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
3281  elog(ERROR, "return type must be a row type");
3282 
3283  per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
3284  oldcontext = MemoryContextSwitchTo(per_query_ctx);
3285 
3286  tupstore = tuplestore_begin_heap(true, false, work_mem);
3287  rsinfo->returnMode = SFRM_Materialize;
3288  rsinfo->setResult = tupstore;
3289  rsinfo->setDesc = tupdesc;
3290 
3291  MemoryContextSwitchTo(oldcontext);
3292 
3293  /*
3294  * Get the currently active synchronous standbys. This could be out of
3295  * date before we're done, but we'll use the data anyway.
3296  */
3297  num_standbys = SyncRepGetCandidateStandbys(&sync_standbys);
3298 
3299  for (i = 0; i < max_wal_senders; i++)
3300  {
3301  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3303  XLogRecPtr write;
3304  XLogRecPtr flush;
3305  XLogRecPtr apply;
3306  TimeOffset writeLag;
3307  TimeOffset flushLag;
3308  TimeOffset applyLag;
3309  int priority;
3310  int pid;
3312  TimestampTz replyTime;
3313  int64 spillTxns;
3314  int64 spillCount;
3315  int64 spillBytes;
3316  bool is_sync_standby;
3318  bool nulls[PG_STAT_GET_WAL_SENDERS_COLS];
3319  int j;
3320 
3321  /* Collect data from shared memory */
3322  SpinLockAcquire(&walsnd->mutex);
3323  if (walsnd->pid == 0)
3324  {
3325  SpinLockRelease(&walsnd->mutex);
3326  continue;
3327  }
3328  pid = walsnd->pid;
3329  sentPtr = walsnd->sentPtr;
3330  state = walsnd->state;
3331  write = walsnd->write;
3332  flush = walsnd->flush;
3333  apply = walsnd->apply;
3334  writeLag = walsnd->writeLag;
3335  flushLag = walsnd->flushLag;
3336  applyLag = walsnd->applyLag;
3337  priority = walsnd->sync_standby_priority;
3338  replyTime = walsnd->replyTime;
3339  spillTxns = walsnd->spillTxns;
3340  spillCount = walsnd->spillCount;
3341  spillBytes = walsnd->spillBytes;
3342  SpinLockRelease(&walsnd->mutex);
3343 
3344  /*
3345  * Detect whether walsender is/was considered synchronous. We can
3346  * provide some protection against stale data by checking the PID
3347  * along with walsnd_index.
3348  */
3349  is_sync_standby = false;
3350  for (j = 0; j < num_standbys; j++)
3351  {
3352  if (sync_standbys[j].walsnd_index == i &&
3353  sync_standbys[j].pid == pid)
3354  {
3355  is_sync_standby = true;
3356  break;
3357  }
3358  }
3359 
3360  memset(nulls, 0, sizeof(nulls));
3361  values[0] = Int32GetDatum(pid);
3362 
3363  if (!is_member_of_role(GetUserId(), DEFAULT_ROLE_READ_ALL_STATS))
3364  {
3365  /*
3366  * Only superusers and members of pg_read_all_stats can see
3367  * details. Other users only get the pid value to know it's a
3368  * walsender, but no details.
3369  */
3370  MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1);
3371  }
3372  else
3373  {
3374  values[1] = CStringGetTextDatum(WalSndGetStateString(state));
3375 
3376  if (XLogRecPtrIsInvalid(sentPtr))
3377  nulls[2] = true;
3378  values[2] = LSNGetDatum(sentPtr);
3379 
3380  if (XLogRecPtrIsInvalid(write))
3381  nulls[3] = true;
3382  values[3] = LSNGetDatum(write);
3383 
3384  if (XLogRecPtrIsInvalid(flush))
3385  nulls[4] = true;
3386  values[4] = LSNGetDatum(flush);
3387 
3388  if (XLogRecPtrIsInvalid(apply))
3389  nulls[5] = true;
3390  values[5] = LSNGetDatum(apply);
3391 
3392  /*
3393  * Treat a standby such as a pg_basebackup background process
3394  * which always returns an invalid flush location, as an
3395  * asynchronous standby.
3396  */
3397  priority = XLogRecPtrIsInvalid(flush) ? 0 : priority;
3398 
3399  if (writeLag < 0)
3400  nulls[6] = true;
3401  else
3402  values[6] = IntervalPGetDatum(offset_to_interval(writeLag));
3403 
3404  if (flushLag < 0)
3405  nulls[7] = true;
3406  else
3407  values[7] = IntervalPGetDatum(offset_to_interval(flushLag));
3408 
3409  if (applyLag < 0)
3410  nulls[8] = true;
3411  else
3412  values[8] = IntervalPGetDatum(offset_to_interval(applyLag));
3413 
3414  values[9] = Int32GetDatum(priority);
3415 
3416  /*
3417  * More easily understood version of standby state. This is purely
3418  * informational.
3419  *
3420  * In quorum-based sync replication, the role of each standby
3421  * listed in synchronous_standby_names can be changing very
3422  * frequently. Any standbys considered as "sync" at one moment can
3423  * be switched to "potential" ones at the next moment. So, it's
3424  * basically useless to report "sync" or "potential" as their sync
3425  * states. We report just "quorum" for them.
3426  */
3427  if (priority == 0)
3428  values[10] = CStringGetTextDatum("async");
3429  else if (is_sync_standby)
3431  CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
3432  else
3433  values[10] = CStringGetTextDatum("potential");
3434 
3435  if (replyTime == 0)
3436  nulls[11] = true;
3437  else
3438  values[11] = TimestampTzGetDatum(replyTime);
3439 
3440  /* spill to disk */
3441  values[12] = Int64GetDatum(spillTxns);
3442  values[13] = Int64GetDatum(spillCount);
3443  values[14] = Int64GetDatum(spillBytes);
3444  }
3445 
3446  tuplestore_putvalues(tupstore, tupdesc, values, nulls);
3447  }
3448 
3449  /* clean up and return the tuplestore */
3450  tuplestore_donestoring(tupstore);
3451 
3452  return (Datum) 0;
3453 }
3454 
3455 /*
3456  * This function is used to send a keepalive message to standby.
3457  * If requestReply is set, sets a flag in the message requesting the standby
3458  * to send a message back to us, for heartbeat purposes.
3459  */
3460 static void
3461 WalSndKeepalive(bool requestReply)
3462 {
3463  elog(DEBUG2, "sending replication keepalive");
3464 
3465  /* construct the message... */
3466  resetStringInfo(&output_message);
3467  pq_sendbyte(&output_message, 'k');
3468  pq_sendint64(&output_message, sentPtr);
3469  pq_sendint64(&output_message, GetCurrentTimestamp());
3470  pq_sendbyte(&output_message, requestReply ? 1 : 0);
3471 
3472  /* ... and send it wrapped in CopyData */
3473  pq_putmessage_noblock('d', output_message.data, output_message.len);
3474 }
3475 
3476 /*
3477  * Send keepalive message if too much time has elapsed.
3478  */
3479 static void
3481 {
3482  TimestampTz ping_time;
3483 
3484  /*
3485  * Don't send keepalive messages if timeouts are globally disabled or
3486  * we're doing something not partaking in timeouts.
3487  */
3488  if (wal_sender_timeout <= 0 || last_reply_timestamp <= 0)
3489  return;
3490 
3492  return;
3493 
3494  /*
3495  * If half of wal_sender_timeout has lapsed without receiving any reply
3496  * from the standby, send a keep-alive message to the standby requesting
3497  * an immediate reply.
3498  */
3500  wal_sender_timeout / 2);
3501  if (last_processing >= ping_time)
3502  {
3503  WalSndKeepalive(true);
3505 
3506  /* Try to flush pending output to the client */
3507  if (pq_flush_if_writable() != 0)
3508  WalSndShutdown();
3509  }
3510 }
3511 
3512 /*
3513  * Record the end of the WAL and the time it was flushed locally, so that
3514  * LagTrackerRead can compute the elapsed time (lag) when this WAL location is
3515  * eventually reported to have been written, flushed and applied by the
3516  * standby in a reply message.
3517  */
3518 static void
3520 {
3521  bool buffer_full;
3522  int new_write_head;
3523  int i;
3524 
3525  if (!am_walsender)
3526  return;
3527 
3528  /*
3529  * If the lsn hasn't advanced since last time, then do nothing. This way
3530  * we only record a new sample when new WAL has been written.
3531  */
3532  if (lag_tracker->last_lsn == lsn)
3533  return;
3534  lag_tracker->last_lsn = lsn;
3535 
3536  /*
3537  * If advancing the write head of the circular buffer would crash into any
3538  * of the read heads, then the buffer is full. In other words, the
3539  * slowest reader (presumably apply) is the one that controls the release
3540  * of space.
3541  */
3542  new_write_head = (lag_tracker->write_head + 1) % LAG_TRACKER_BUFFER_SIZE;
3543  buffer_full = false;
3544  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; ++i)
3545  {
3546  if (new_write_head == lag_tracker->read_heads[i])
3547  buffer_full = true;
3548  }
3549 
3550  /*
3551  * If the buffer is full, for now we just rewind by one slot and overwrite
3552  * the last sample, as a simple (if somewhat uneven) way to lower the
3553  * sampling rate. There may be better adaptive compaction algorithms.
3554  */
3555  if (buffer_full)
3556  {
3557  new_write_head = lag_tracker->write_head;
3558  if (lag_tracker->write_head > 0)
3559  lag_tracker->write_head--;
3560  else
3561  lag_tracker->write_head = LAG_TRACKER_BUFFER_SIZE - 1;
3562  }
3563 
3564  /* Store a sample at the current write head position. */
3565  lag_tracker->buffer[lag_tracker->write_head].lsn = lsn;
3566  lag_tracker->buffer[lag_tracker->write_head].time = local_flush_time;
3567  lag_tracker->write_head = new_write_head;
3568 }
3569 
3570 /*
3571  * Find out how much time has elapsed between the moment WAL location 'lsn'
3572  * (or the highest known earlier LSN) was flushed locally and the time 'now'.
3573  * We have a separate read head for each of the reported LSN locations we
3574  * receive in replies from standby; 'head' controls which read head is
3575  * used. Whenever a read head crosses an LSN which was written into the
3576  * lag buffer with LagTrackerWrite, we can use the associated timestamp to
3577  * find out the time this LSN (or an earlier one) was flushed locally, and
3578  * therefore compute the lag.
3579  *
3580  * Return -1 if no new sample data is available, and otherwise the elapsed
3581  * time in microseconds.
3582  */
3583 static TimeOffset
3585 {
3586  TimestampTz time = 0;
3587 
3588  /* Read all unread samples up to this LSN or end of buffer. */
3589  while (lag_tracker->read_heads[head] != lag_tracker->write_head &&
3590  lag_tracker->buffer[lag_tracker->read_heads[head]].lsn <= lsn)
3591  {
3592  time = lag_tracker->buffer[lag_tracker->read_heads[head]].time;
3593  lag_tracker->last_read[head] =
3594  lag_tracker->buffer[lag_tracker->read_heads[head]];
3595  lag_tracker->read_heads[head] =
3596  (lag_tracker->read_heads[head] + 1) % LAG_TRACKER_BUFFER_SIZE;
3597  }
3598 
3599  /*
3600  * If the lag tracker is empty, that means the standby has processed
3601  * everything we've ever sent so we should now clear 'last_read'. If we
3602  * didn't do that, we'd risk using a stale and irrelevant sample for
3603  * interpolation at the beginning of the next burst of WAL after a period
3604  * of idleness.
3605  */
3606  if (lag_tracker->read_heads[head] == lag_tracker->write_head)
3607  lag_tracker->last_read[head].time = 0;
3608 
3609  if (time > now)
3610  {
3611  /* If the clock somehow went backwards, treat as not found. */
3612  return -1;
3613  }
3614  else if (time == 0)
3615  {
3616  /*
3617  * We didn't cross a time. If there is a future sample that we
3618  * haven't reached yet, and we've already reached at least one sample,
3619  * let's interpolate the local flushed time. This is mainly useful
3620  * for reporting a completely stuck apply position as having
3621  * increasing lag, since otherwise we'd have to wait for it to
3622  * eventually start moving again and cross one of our samples before
3623  * we can show the lag increasing.
3624  */
3625  if (lag_tracker->read_heads[head] == lag_tracker->write_head)
3626  {
3627  /* There are no future samples, so we can't interpolate. */
3628  return -1;
3629  }
3630  else if (lag_tracker->last_read[head].time != 0)
3631  {
3632  /* We can interpolate between last_read and the next sample. */
3633  double fraction;
3634  WalTimeSample prev = lag_tracker->last_read[head];
3635  WalTimeSample next = lag_tracker->buffer[lag_tracker->read_heads[head]];
3636 
3637  if (lsn < prev.lsn)
3638  {
3639  /*
3640  * Reported LSNs shouldn't normally go backwards, but it's
3641  * possible when there is a timeline change. Treat as not
3642  * found.
3643  */
3644  return -1;
3645  }
3646 
3647  Assert(prev.lsn < next.lsn);
3648 
3649  if (prev.time > next.time)
3650  {
3651  /* If the clock somehow went backwards, treat as not found. */
3652  return -1;
3653  }
3654 
3655  /* See how far we are between the previous and next samples. */
3656  fraction =
3657  (double) (lsn - prev.lsn) / (double) (next.lsn - prev.lsn);
3658 
3659  /* Scale the local flush time proportionally. */
3660  time = (TimestampTz)
3661  ((double) prev.time + (next.time - prev.time) * fraction);
3662  }
3663  else
3664  {
3665  /*
3666  * We have only a future sample, implying that we were entirely
3667  * caught up but and now there is a new burst of WAL and the
3668  * standby hasn't processed the first sample yet. Until the
3669  * standby reaches the future sample the best we can do is report
3670  * the hypothetical lag if that sample were to be replayed now.
3671  */
3672  time = lag_tracker->buffer[lag_tracker->read_heads[head]].time;
3673  }
3674  }
3675 
3676  /* Return the elapsed time since local flush time in microseconds. */
3677  Assert(time != 0);
3678  return now - time;
3679 }
3680 
3681 static void
3683 {
3684  ReorderBuffer *rb = ctx->reorder;
3685 
3686  elog(DEBUG2, "UpdateSpillStats: updating stats %p %lld %lld %lld",
3687  rb,
3688  (long long) rb->spillTxns,
3689  (long long) rb->spillCount,
3690  (long long) rb->spillBytes);
3691 
3692  SpinLockAcquire(&MyWalSnd->mutex);
3693  MyWalSnd->spillTxns = rb->spillTxns;
3694  MyWalSnd->spillCount = rb->spillCount;
3695  MyWalSnd->spillBytes = rb->spillBytes;
3696  SpinLockRelease(&MyWalSnd->mutex);
3697 }
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, Datum *values, bool *isnull)
Definition: tuplestore.c:750
static void InitWalSenderSlot(void)
Definition: walsender.c:2377
void InitializeTimeouts(void)
Definition: timeout.c:346
#define pq_is_send_pending()
Definition: libpq.h:41
#define NIL
Definition: pg_list.h:65
static void pq_sendint16(StringInfo buf, uint16 i)
Definition: pqformat.h:137
XLogRecPtr write
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS
WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]
Definition: walsender.c:213
static void ProcessStandbyMessage(void)
Definition: walsender.c:1803
#define pg_attribute_noreturn()
Definition: c.h:145
#define SIGQUIT
Definition: win32_port.h:154
#define WL_SOCKET_WRITEABLE
Definition: latch.h:126
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define IsA(nodeptr, _type_)
Definition: nodes.h:580
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:211
#define AllocSetContextCreate
Definition: memutils.h:170
XLogRecPtr startpoint
Definition: replnodes.h:85
#define DEBUG1
Definition: elog.h:25
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:205
int MyProcPid
Definition: globals.c:40
int wal_sender_timeout
Definition: walsender.c:123
struct ReorderBuffer * reorder
Definition: logical.h:42
void wal_segment_close(XLogReaderState *state)
Definition: xlogutils.c:813
uint32 TimeLineID
Definition: xlogdefs.h:52
#define pq_flush()
Definition: libpq.h:39
static int32 next
Definition: blutils.c:218
struct Port * MyProcPort
Definition: globals.c:43
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
int wal_segment_size
Definition: xlog.c:116
int write_head
Definition: walsender.c:214
uint32 TransactionId
Definition: c.h:513
TupleDesc CreateTemplateTupleDesc(int natts)
Definition: tupdesc.c:44
bool wake_wal_senders
Definition: walsender.c:130
#define SYNC_REP_PRIORITY
Definition: syncrep.h:36
bool exec_replication_command(const char *cmd_string)
Definition: walsender.c:1526
Oid GetUserId(void)
Definition: miscinit.c:448
#define SIGUSR1
Definition: win32_port.h:165
bool update_process_title
Definition: ps_status.c:36
#define write(a, b, c)
Definition: win32.h:14
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1574
TransactionId xmin
Definition: proc.h:235
static void WalSndLastCycleHandler(SIGNAL_ARGS)
Definition: walsender.c:3035
void pgstat_report_activity(BackendState state, const char *cmd_str)
Definition: pgstat.c:3132
static void SendTimeLineHistory(TimeLineHistoryCmd *cmd)
Definition: walsender.c:465
static void parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action)
Definition: walsender.c:869
static void DropReplicationSlot(DropReplicationSlotCmd *cmd)
Definition: walsender.c:1124
#define SIGCHLD
Definition: win32_port.h:163
PGPROC * MyProc
Definition: proc.c:67
int64 TimestampTz
Definition: timestamp.h:39
WalTimeSample last_read[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:216
ResourceOwner CurrentResourceOwner
Definition: resowner.c:142
Size WalSndShmemSize(void)
Definition: walsender.c:3066
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:56
void MarkPostmasterChildWalSender(void)
Definition: pmsignal.c:272
char * pstrdup(const char *in)
Definition: mcxt.c:1186
void CommitTransactionCommand(void)
Definition: xact.c:2919
#define XACT_REPEATABLE_READ
Definition: xact.h:38
#define SpinLockInit(lock)
Definition: spin.h:60
void RestoreTransactionSnapshot(Snapshot snapshot, void *master_pgproc)
Definition: snapmgr.c:2225
uint8 syncrep_method
Definition: syncrep.h:69
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:83
void WALReadRaiseError(WALReadError *errinfo)
Definition: xlogutils.c:959
List * readTimeLineHistory(TimeLineID targetTLI)
Definition: timeline.c:76
bool IsAbortedTransactionBlockState(void)
Definition: xact.c:376
static void XLogSendPhysical(void)
Definition: walsender.c:2536
void do_tup_output(TupOutputState *tstate, Datum *values, bool *isnull)
Definition: execTuples.c:2256
#define tuplestore_donestoring(state)
Definition: tuplestore.h:60
TimeOffset flushLag
void ResourceOwnerDelete(ResourceOwner owner)
Definition: resowner.c:712
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency)
Definition: slot.c:222
Definition: nodes.h:529
static StringInfoData output_message
Definition: walsender.c:157
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
struct cursor * cur
Definition: ecpg.c:28
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:187
void proc_exit(int code)
Definition: ipc.c:104
int errcode(int sqlerrcode)
Definition: elog.c:610
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4674
#define MemSet(start, val, len)
Definition: c.h:971
static void WalSndKill(int code, Datum arg)
Definition: walsender.c:2440
#define kill(pid, sig)
Definition: win32_port.h:426
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
static void WalSndDone(WalSndSendDataCallback send_data)
Definition: walsender.c:2917
#define WL_SOCKET_READABLE
Definition: latch.h:125
void ReplicationSlotSave(void)
Definition: slot.c:697
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8422
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:197
TimestampTz time
Definition: walsender.c:203
int sleeptime
Definition: pg_standby.c:41
#define SIGPIPE
Definition: win32_port.h:158
ReplicationSlotPersistentData data
Definition: slot.h:143
TimeOffset writeLag
#define SIGUSR2
Definition: win32_port.h:166
#define LOG
Definition: elog.h:26
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
bool RecoveryInProgress(void)
Definition: xlog.c:8071
void SetLatch(Latch *latch)
Definition: latch.c:457
SyncRepConfigData * SyncRepConfig
Definition: syncrep.c:97
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1673
void list_free_deep(List *list)
Definition: list.c:1390
static int fd(const char *x, int i)
Definition: preproc-init.c:105
pgsocket sock
Definition: libpq-be.h:122
#define PG_BINARY
Definition: c.h:1233
void GetPGVariable(const char *name, DestReceiver *dest)
Definition: guc.c:8936
void ResetLatch(Latch *latch)
Definition: latch.c:540
slock_t mutex
Node * replication_parse_result
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:87
void StatementCancelHandler(SIGNAL_ARGS)
Definition: postgres.c:2857
Latch procLatch
Definition: proc.h:111
void XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength)
Definition: xlogutils.c:690
XLogRecPtr confirmed_flush
Definition: slot.h:91
PGXACT * MyPgXact
Definition: proc.c:68
XLogRecPtr EndRecPtr
Definition: xlogreader.h:176
int32 day
Definition: timestamp.h:47
#define XidFromFullTransactionId(x)
Definition: transam.h:48
static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch)
Definition: walsender.c:2020
static LagTracker * lag_tracker
Definition: walsender.c:219
TimeLineID timeline
Definition: replnodes.h:97
int64 spillCount
ReplicationKind kind
Definition: replnodes.h:56
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
void set_ps_display(const char *activity)
Definition: ps_status.c:349
bool IsTransactionBlock(void)
Definition: xact.c:4656
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
int64 spillBytes
WALOpenSegment seg
Definition: xlogreader.h:213
const char * SnapBuildExportSnapshot(SnapBuild *builder)
Definition: snapbuild.c:631
bool am_walsender
Definition: walsender.c:115
#define SpinLockAcquire(lock)
Definition: spin.h:62
void pg_usleep(long microsec)
Definition: signal.c:53
void HandleWalSndInitStopping(void)
Definition: walsender.c:3013
void ReplicationSlotReserveWal(void)
Definition: slot.c:1056
int SendProcSignal(pid_t pid, ProcSignalReason reason, BackendId backendId)
Definition: procsignal.c:250
XLogRecPtr flush
bool defGetBoolean(DefElem *def)
Definition: define.c:111
char data[BLCKSZ]
Definition: c.h:1104
void DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
Definition: logical.c:468
void ReplicationSlotsComputeRequiredLSN(void)
Definition: slot.c:804
ReplicationKind kind
Definition: replnodes.h:82
void WalSndRqstFileReload(void)
Definition: walsender.c:2990
void pfree(void *pointer)
Definition: mcxt.c:1056
XLogRecord * XLogReadRecord(XLogReaderState *state, char **errormsg)
Definition: xlogreader.c:268
void ConditionVariableCancelSleep(void)
bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
Definition: transam.c:319
static void WalSndKeepalive(bool requestReply)
Definition: walsender.c:3461
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:2314
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:676
#define ERROR
Definition: elog.h:43
void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
Definition: decode.c:94
static TimestampTz last_processing
Definition: walsender.c:162
void pq_startmsgread(void)
Definition: pqcomm.c:1197
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2372
static void PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: walsender.c:1834
#define IntervalPGetDatum(X)
Definition: timestamp.h:33
#define SYNC_REP_WAIT_APPLY
Definition: syncrep.h:26
void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli)
Definition: xlog.c:3927
bool FirstSnapshotSet
Definition: snapmgr.c:205
void EndCommand(const QueryCompletion *qc, CommandDest dest, bool force_undecorated_output)
Definition: dest.c:166
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:392
void WalSndSignals(void)
Definition: walsender.c:3047
#define TimestampTzGetDatum(X)
Definition: timestamp.h:32
#define FATAL
Definition: elog.h:52
void SendBaseBackup(BaseBackupCmd *cmd)
Definition: basebackup.c:922
Latch * latch
void(* WalSndSendDataCallback)(void)
Definition: walsender.c:225
int64 spillTxns
#define MAXPGPATH
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:11467
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:2236
void ReplicationSlotPersist(void)
Definition: slot.c:732
TransactionId effective_xmin
Definition: slot.h:139
int pq_getbyte_if_available(unsigned char *c)
Definition: pqcomm.c:1027
static LogicalDecodingContext * logical_decoding_ctx
Definition: walsender.c:197
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:192
LogicalDecodingContext * CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, bool fast_forward, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:376
char * get_database_name(Oid dbid)
Definition: dbcommands.c:2155
#define DEBUG2
Definition: elog.h:24
bool XLogBackgroundFlush(void)
Definition: xlog.c:3035
Definition: latch.h:110
Definition: dest.h:89
#define TLHistoryFileName(fname, tli)
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:361
WalSndState state
NodeTag type
Definition: nodes.h:531
static char * buf
Definition: pg_test_fsync.c:67
void WalSndWaitStopping(void)
Definition: walsender.c:3165
static bool streamingDoneSending
Definition: walsender.c:179
uint64 XLogSegNo
Definition: xlogdefs.h:41
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
int errdetail(const char *fmt,...)
Definition: elog.c:957
XLogSegNo ws_segno
Definition: xlogreader.h:47
#define COMMERROR
Definition: elog.h:30
int errcode_for_file_access(void)
Definition: elog.c:633
static void StartReplication(StartReplicationCmd *cmd)
Definition: walsender.c:571
#define SIGHUP
Definition: win32_port.h:153
TransactionId catalog_xmin
Definition: slot.h:77
#define pq_flush_if_writable()
Definition: libpq.h:40
TimeOffset time
Definition: timestamp.h:45
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
Definition: xact.c:3352
XLogRecPtr currTLIValidUntil
Definition: xlogreader.h:236
#define InvalidTransactionId
Definition: transam.h:31
void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
Definition: xlogreader.c:240
void enlargeStringInfo(StringInfo str, int needed)
Definition: stringinfo.c:283
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:390
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1265
unsigned int uint32
Definition: c.h:367
void ReplicationSlotRelease(void)
Definition: slot.c:476
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1381
LogicalDecodingContext * CreateInitDecodingContext(char *plugin, List *output_plugin_options, bool need_full_snapshot, XLogRecPtr restart_lsn, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:230
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
XLogReaderState * XLogReaderAllocate(int wal_segment_size, const char *waldir, XLogReaderRoutine *routine, void *private_data)
Definition: xlogreader.c:75
TransactionId xmin
Definition: slot.h:69
SHM_QUEUE SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]
void InitWalSender(void)
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:186
#define SlotIsLogical(slot)
Definition: slot.h:165
Datum Int64GetDatum(int64 X)
Definition: fmgr.c:1701
static XLogReaderState * xlogreader
Definition: walsender.c:137
void SyncRepInitConfig(void)
Definition: syncrep.c:400
static void UpdateSpillStats(LogicalDecodingContext *ctx)
Definition: walsender.c:3682
static bool WalSndCaughtUp
Definition: walsender.c:183
#define SYNC_REP_WAIT_FLUSH
Definition: syncrep.h:25
MemoryContext TopMemoryContext
Definition: mcxt.c:44
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)
Definition: walsender.c:3519
XLogRecPtr lsn
Definition: walsender.c:202
int replication_yyparse(void)
FullTransactionId ReadNextFullTransactionId(void)
Definition: varsup.c:246
Definition: guc.h:72
XLogRecPtr last_lsn
Definition: walsender.c:212
#define ERRCODE_DATA_CORRUPTED
Definition: pg_basebackup.c:45
int32 month
Definition: timestamp.h:48
#define XL_ROUTINE(...)
Definition: xlogreader.h:116
int max_wal_senders
Definition: walsender.c:121
static void WalSndLoop(WalSndSendDataCallback send_data)
Definition: walsender.c:2251
static void WalSndKeepaliveIfNecessary(void)
Definition: walsender.c:3480
int CloseTransientFile(int fd)
Definition: fd.c:2549
#define SIG_IGN
Definition: win32_port.h:150
static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1238
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
int pq_getmessage(StringInfo s, int maxlen)
Definition: pqcomm.c:1259
static StringInfoData reply_message
Definition: walsender.c:158
#define MAXFNAMELEN
TimeLineID nextTLI
Definition: xlogreader.h:242
#define SpinLockRelease(lock)
Definition: spin.h:64
Tuplestorestate * tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
Definition: tuplestore.c:318
int64 TimeOffset
Definition: timestamp.h:40
static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc)
Definition: walsender.c:1386
static void WalSndCheckTimeOut(void)
Definition: walsender.c:2224
Size mul_size(Size s1, Size s2)
Definition: shmem.c:515
#define InvalidBackendId
Definition: backendid.h:23
static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
Definition: walsender.c:3584
void WalSndErrorCleanup(void)
Definition: walsender.c:296
static volatile sig_atomic_t replication_active
Definition: walsender.c:195
uintptr_t Datum
Definition: postgres.h:367
static const char * WalSndGetStateString(WalSndState state)
Definition: walsender.c:3222
XLogRecPtr sentPtr
Size add_size(Size s1, Size s2)
Definition: shmem.c:498
bool am_db_walsender
Definition: walsender.c:118
TransactionId effective_catalog_xmin
Definition: slot.h:140
static void StartLogicalReplication(StartReplicationCmd *cmd)
Definition: walsender.c:1138
Oid MyDatabaseId
Definition: globals.c:85
static void WalSndShutdown(void)
Definition: walsender.c:229
static TimeLineID receiveTLI
Definition: xlog.c:213
int work_mem
Definition: globals.c:121
XLogRecPtr tliSwitchPoint(TimeLineID tli, List *history, TimeLineID *nextTLI)
Definition: timeline.c:580
void WalSndResourceCleanup(bool isCommit)
Definition: walsender.c:331
#define EpochFromFullTransactionId(x)
Definition: transam.h:47
WalSnd * MyWalSnd
Definition: walsender.c:112
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:839
static XLogRecPtr sentPtr
Definition: walsender.c:154
int log_min_messages
Definition: guc.c:534
void pq_endmsgread(void)
Definition: pqcomm.c:1221
#define InvalidOid
Definition: postgres_ext.h:36
static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p)
Definition: walsender.c:2458
TimeLineID ThisTimeLineID
Definition: xlog.c:191
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
#define ereport(elevel,...)
Definition: elog.h:144
int allowedModes
Definition: execnodes.h:305
void WalSndInitStopping(void)
Definition: walsender.c:3139
TimeLineID currTLI
Definition: xlogreader.h:226
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:170
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
bool is_member_of_role(Oid member, Oid role)
Definition: acl.c:4916
TimeOffset applyLag
struct SnapBuild * snapshot_builder
Definition: logical.h:43
int errmsg_internal(const char *fmt,...)
Definition: elog.c:911
SetFunctionReturnMode returnMode
Definition: execnodes.h:307
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
static long WalSndComputeSleeptime(TimestampTz now)
Definition: walsender.c:2174
#define SIG_DFL
Definition: win32_port.h:148
#define SIGNAL_ARGS
Definition: c.h:1295
static TimeLineID sendTimeLine
Definition: walsender.c:145
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:738
#define lfirst(lc)
Definition: pg_list.h:190
void WalSndSetState(WalSndState state)
Definition: walsender.c:3203
int sync_standby_priority
Definition: regguts.h:298
void ResourceOwnerRelease(ResourceOwner owner, ResourceReleasePhase phase, bool isCommit, bool isTopLevel)
Definition: resowner.c:478
void FreeDecodingContext(LogicalDecodingContext *ctx)
Definition: logical.c:511
static void XLogSendLogical(void)
Definition: walsender.c:2837
XLogRecPtr restart_lsn
Definition: slot.h:80
static void SetQueryCompletion(QueryCompletion *qc, CommandTag commandTag, uint64 nprocessed)
Definition: cmdtag.h:36
int read_heads[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:215
void StartTransactionCommand(void)
Definition: xact.c:2818
bool needreload
static void ProcessStandbyReplyMessage(void)
Definition: walsender.c:1866
size_t Size
Definition: c.h:466
#define XLogFileName(fname, tli, logSegNo, wal_segsz_bytes)
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: pgstat.h:1357
char * dbname
Definition: streamutil.c:50
#define LAG_TRACKER_BUFFER_SIZE
Definition: walsender.c:207
int XactIsoLevel
Definition: xact.c:75
static TimeLineID sendTimeLineNextTLI
Definition: walsender.c:146
static XLogRecPtr sendTimeLineValidUpto
Definition: walsender.c:148
void LogicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: logical.c:1012
MemoryContext ecxt_per_query_memory
Definition: execnodes.h:233
XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
WalSndState
Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
Definition: walsender.c:3257
TimeLineID ws_tli
Definition: xlogreader.h:48
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Definition: walsender.c:922
static bool streamingDoneReceiving
Definition: walsender.c:180
Tuplestorestate * setResult
Definition: execnodes.h:310
static StringInfoData tmpbuf
Definition: walsender.c:159
int BasicOpenFile(const char *fileName, int fileFlags)
Definition: fd.c:985
#define XLogFilePath(path, tli, logSegNo, wal_segsz_bytes)
int SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys)
Definition: syncrep.c:714
bool IsSubTransaction(void)
Definition: xact.c:4729
Snapshot SnapBuildInitialSnapshot(SnapBuild *builder)
Definition: snapbuild.c:539
static Datum values[MAXATTR]
Definition: bootstrap.c:167
#define PG_STAT_GET_WAL_SENDERS_COLS
ExprContext * econtext
Definition: execnodes.h:303
bool log_replication_commands
Definition: walsender.c:125
uint64 GetSystemIdentifier(void)
Definition: xlog.c:4912
#define Int32GetDatum(X)
Definition: postgres.h:479
void ReplicationSlotDrop(const char *name, bool nowait)
Definition: slot.c:569
char * application_name
Definition: guc.c:559
TupleDesc setDesc
Definition: execnodes.h:311
void pq_sendbytes(StringInfo buf, const char *data, int datalen)
Definition: pqformat.c:125
void * palloc(Size size)
Definition: mcxt.c:949
int errmsg(const char *fmt,...)
Definition: elog.c:824
XLogReaderState * reader
Definition: logical.h:41
void SHMQueueInit(SHM_QUEUE *queue)
Definition: shmqueue.c:36
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:298
int ReplicationSlotAcquire(const char *name, SlotAcquireBehavior behavior)
Definition: slot.c:367
#define pq_putmessage_noblock(msgtype, s, len)
Definition: libpq.h:44
CRSSnapshotAction
Definition: walsender.h:20
#define elog(elevel,...)
Definition: elog.h:214
StringInfo out
Definition: logical.h:70
static const unsigned __int64 epoch
Definition: gettimeofday.c:34
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:26
int i
void WalSndShmemInit(void)
Definition: walsender.c:3078
Definition: slot.h:42
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
#define NameStr(name)
Definition: c.h:615
TimeLineID timeline
Definition: replnodes.h:84
#define CStringGetTextDatum(s)
Definition: builtins.h:87
void * arg
static void PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
Definition: walsender.c:1971
struct Latch * MyLatch
Definition: globals.c:54
bool WALRead(XLogReaderState *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli, WALReadError *errinfo)
Definition: xlogreader.c:1062
void ReplicationSlotCleanup(void)
Definition: slot.c:531
WALSegmentContext segcxt
Definition: xlogreader.h:212
#define PG_FUNCTION_ARGS
Definition: fmgr.h:193
char * defname
Definition: parsenodes.h:732
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:99
static XLogRecPtr GetStandbyFlushRecPtr(void)
Definition: walsender.c:2960
static int logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
Definition: walsender.c:815
void SendPostmasterSignal(PMSignalReason reason)
Definition: pmsignal.c:146
TimestampTz replyTime
void LWLockReleaseAll(void)
Definition: lwlock.c:1911
#define NUM_SYNC_REP_WAIT_MODE
Definition: syncrep.h:28
static Interval * offset_to_interval(TimeOffset offset)
Definition: walsender.c:3241
slock_t mutex
Definition: slot.h:116
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition: procsignal.c:551
CommandDest whereToSendOutput
Definition: postgres.c:91
void TimestampDifference(TimestampTz start_time, TimestampTz stop_time, long *secs, int *microsecs)
Definition: timestamp.c:1648
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
static bool waiting_for_ping_response
Definition: walsender.c:171
void pq_puttextmessage(char msgtype, const char *str)
Definition: pqformat.c:369
XLogRecPtr apply
static void IdentifySystem(void)
Definition: walsender.c:376
Definition: pg_list.h:50
void CheckLogicalDecodingRequirements(void)
Definition: logical.c:75
#define snprintf
Definition: port.h:193
#define WL_LATCH_SET
Definition: latch.h:124
int16 AttrNumber
Definition: attnum.h:21
void quickdie(SIGNAL_ARGS)
Definition: postgres.c:2759
#define UINT64_FORMAT
Definition: c.h:410
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition: slot.c:754
void SyncRepReleaseWaiters(void)
Definition: syncrep.c:429
static TimestampTz last_reply_timestamp
Definition: walsender.c:168
static void ProcessRepliesIfAny(void)
Definition: walsender.c:1699
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1538
#define MAX_SEND_SIZE
Definition: walsender.c:106
#define die(msg)
Definition: pg_test_fsync.c:96
#define read(a, b, c)
Definition: win32.h:13
static bool sendTimeLineIsHistoric
Definition: walsender.c:147
#define offsetof(type, field)
Definition: c.h:661
void WalSndWakeup(void)
Definition: walsender.c:3110
void ReplicationSlotMarkDirty(void)
Definition: slot.c:715
bool am_cascading_walsender
Definition: walsender.c:116
static void ProcessStandbyHSFeedbackMessage(void)
Definition: walsender.c:2051
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:129
#define SYNC_REP_WAIT_WRITE
Definition: syncrep.h:24
void SnapBuildClearExportedSnapshot(void)
Definition: snapbuild.c:692
static XLogRecPtr startptr
Definition: basebackup.c:116
static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
Definition: walsender.c:1355
void replication_scanner_init(const char *query_string)
#define TLHistoryFilePath(path, tli)
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1736
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)