PostgreSQL Source Code  git master
walsender.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * walsender.c
4  *
5  * The WAL sender process (walsender) is new as of Postgres 9.0. It takes
6  * care of sending XLOG from the primary server to a single recipient.
7  * (Note that there can be more than one walsender process concurrently.)
8  * It is started by the postmaster when the walreceiver of a standby server
9  * connects to the primary server and requests XLOG streaming replication.
10  *
11  * A walsender is similar to a regular backend, i.e. there is a one-to-one
12  * relationship between a connection and a walsender process, but instead
13  * of processing SQL queries, it understands a small set of special
14  * replication-mode commands. The START_REPLICATION command begins streaming
15  * WAL to the client. While streaming, the walsender keeps reading XLOG
16  * records from the disk and sends them to the standby server over the
17  * COPY protocol, until either side ends the replication by exiting COPY
18  * mode (or until the connection is closed).
19  *
20  * Normal termination is by SIGTERM, which instructs the walsender to
21  * close the connection and exit(0) at the next convenient moment. Emergency
22  * termination is by SIGQUIT; like any backend, the walsender will simply
23  * abort and exit on SIGQUIT. A close of the connection and a FATAL error
24  * are treated as not a crash but approximately normal termination;
25  * the walsender will exit quickly without sending any more XLOG records.
26  *
27  * If the server is shut down, the checkpointer sends us
28  * PROCSIG_WALSND_INIT_STOPPING after all regular backends have exited. If
29  * the backend is idle or running an SQL query, this causes the backend to
30  * shut down; if logical replication is in progress, all existing WAL records
31  * are processed, followed by a shutdown. Otherwise, this causes the walsender
32  * to switch to the "stopping" state. In this state, the walsender will reject
33  * any further replication commands. The checkpointer begins the shutdown
34  * checkpoint once all walsenders are confirmed as stopping. When the shutdown
35  * checkpoint finishes, the postmaster sends us SIGUSR2. This instructs
36  * walsender to send any outstanding WAL, including the shutdown checkpoint
37  * record, wait for it to be replicated to the standby, and then exit.
38  *
39  *
40  * Portions Copyright (c) 2010-2018, PostgreSQL Global Development Group
41  *
42  * IDENTIFICATION
43  * src/backend/replication/walsender.c
44  *
45  *-------------------------------------------------------------------------
46  */
47 #include "postgres.h"
48 
49 #include <signal.h>
50 #include <unistd.h>
51 
52 #include "access/printtup.h"
53 #include "access/timeline.h"
54 #include "access/transam.h"
55 #include "access/xact.h"
56 #include "access/xlog_internal.h"
57 #include "access/xlogutils.h"
58 
59 #include "catalog/pg_authid.h"
60 #include "catalog/pg_type.h"
61 #include "commands/dbcommands.h"
62 #include "commands/defrem.h"
63 #include "funcapi.h"
64 #include "libpq/libpq.h"
65 #include "libpq/pqformat.h"
66 #include "miscadmin.h"
67 #include "nodes/replnodes.h"
68 #include "pgstat.h"
69 #include "replication/basebackup.h"
70 #include "replication/decode.h"
71 #include "replication/logical.h"
73 #include "replication/slot.h"
74 #include "replication/snapbuild.h"
75 #include "replication/syncrep.h"
77 #include "replication/walsender.h"
80 #include "storage/fd.h"
81 #include "storage/ipc.h"
82 #include "storage/pmsignal.h"
83 #include "storage/proc.h"
84 #include "storage/procarray.h"
85 #include "tcop/dest.h"
86 #include "tcop/tcopprot.h"
87 #include "utils/builtins.h"
88 #include "utils/guc.h"
89 #include "utils/memutils.h"
90 #include "utils/pg_lsn.h"
91 #include "utils/portal.h"
92 #include "utils/ps_status.h"
93 #include "utils/resowner.h"
94 #include "utils/timeout.h"
95 #include "utils/timestamp.h"
96 
97 /*
98  * Maximum data payload in a WAL data message. Must be >= XLOG_BLCKSZ.
99  *
100  * We don't have a good idea of what a good value would be; there's some
101  * overhead per message in both walsender and walreceiver, but on the other
102  * hand sending large batches makes walsender less responsive to signals
103  * because signals are checked only between messages. 128kB (with
104  * default 8k blocks) seems like a reasonable guess for now.
105  */
106 #define MAX_SEND_SIZE (XLOG_BLCKSZ * 16)
107 
108 /* Array of WalSnds in shared memory */
110 
111 /* My slot in the shared memory array */
112 WalSnd *MyWalSnd = NULL;
113 
114 /* Global state */
115 bool am_walsender = false; /* Am I a walsender process? */
116 bool am_cascading_walsender = false; /* Am I cascading WAL to another
117  * standby? */
118 bool am_db_walsender = false; /* Connected to a database? */
119 
120 /* User-settable parameters for walsender */
121 int max_wal_senders = 0; /* the maximum number of concurrent
122  * walsenders */
123 int wal_sender_timeout = 60 * 1000; /* maximum time to send one WAL
124  * data message */
126 
127 /*
128  * State for WalSndWakeupRequest
129  */
130 bool wake_wal_senders = false;
131 
132 /*
133  * These variables are used similarly to openLogFile/SegNo/Off,
134  * but for walsender to read the XLOG.
135  */
136 static int sendFile = -1;
137 static XLogSegNo sendSegNo = 0;
138 static uint32 sendOff = 0;
139 
140 /* Timeline ID of the currently open file */
142 
143 /*
144  * These variables keep track of the state of the timeline we're currently
145  * sending. sendTimeLine identifies the timeline. If sendTimeLineIsHistoric,
146  * the timeline is not the latest timeline on this server, and the server's
147  * history forked off from that timeline at sendTimeLineValidUpto.
148  */
151 static bool sendTimeLineIsHistoric = false;
153 
154 /*
155  * How far have we sent WAL already? This is also advertised in
156  * MyWalSnd->sentPtr. (Actually, this is the next WAL location to send.)
157  */
158 static XLogRecPtr sentPtr = 0;
159 
160 /* Buffers for constructing outgoing messages and processing reply messages. */
164 
165 /*
166  * Timestamp of the last receipt of the reply from the standby. Set to 0 if
167  * wal_sender_timeout doesn't need to be active.
168  */
170 
171 /* Have we sent a heartbeat message asking for reply, since last reply? */
172 static bool waiting_for_ping_response = false;
173 
174 /*
175  * While streaming WAL in Copy mode, streamingDoneSending is set to true
176  * after we have sent CopyDone. We should not send any more CopyData messages
177  * after that. streamingDoneReceiving is set to true when we receive CopyDone
178  * from the other end. When both become true, it's time to exit Copy mode.
179  */
182 
183 /* Are we there yet? */
184 static bool WalSndCaughtUp = false;
185 
186 /* Flags set by signal handlers for later service in main loop */
187 static volatile sig_atomic_t got_SIGUSR2 = false;
188 static volatile sig_atomic_t got_STOPPING = false;
189 
190 /*
191  * This is set while we are streaming. When not set
192  * PROCSIG_WALSND_INIT_STOPPING signal will be handled like SIGTERM. When set,
193  * the main loop is responsible for checking got_STOPPING and terminating when
194  * it's set (after streaming any remaining WAL).
195  */
196 static volatile sig_atomic_t replication_active = false;
197 
200 
201 /* A sample associating a WAL location with the time it was written. */
202 typedef struct
203 {
206 } WalTimeSample;
207 
208 /* The size of our buffer of time samples. */
209 #define LAG_TRACKER_BUFFER_SIZE 8192
210 
211 /* A mechanism for tracking replication lag. */
212 static struct
213 {
219 } LagTracker;
220 
221 /* Signal handlers */
223 
224 /* Prototypes for private functions */
225 typedef void (*WalSndSendDataCallback) (void);
226 static void WalSndLoop(WalSndSendDataCallback send_data);
227 static void InitWalSenderSlot(void);
228 static void WalSndKill(int code, Datum arg);
230 static void XLogSendPhysical(void);
231 static void XLogSendLogical(void);
232 static void WalSndDone(WalSndSendDataCallback send_data);
233 static XLogRecPtr GetStandbyFlushRecPtr(void);
234 static void IdentifySystem(void);
237 static void StartReplication(StartReplicationCmd *cmd);
239 static void ProcessStandbyMessage(void);
240 static void ProcessStandbyReplyMessage(void);
241 static void ProcessStandbyHSFeedbackMessage(void);
242 static void ProcessRepliesIfAny(void);
243 static void WalSndKeepalive(bool requestReply);
245 static void WalSndCheckTimeOut(TimestampTz now);
247 static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
248 static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
251 static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
252 static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
254 
255 static void XLogRead(char *buf, XLogRecPtr startptr, Size count);
256 
257 
258 /* Initialize walsender process before entering the main command loop */
259 void
260 InitWalSender(void)
261 {
263 
264  /* Create a per-walsender data structure in shared memory */
266 
267  /* Set up resource owner */
268  CurrentResourceOwner = ResourceOwnerCreate(NULL, "walsender top-level resource owner");
269 
270  /*
271  * Let postmaster know that we're a WAL sender. Once we've declared us as
272  * a WAL sender process, postmaster will let us outlive the bgwriter and
273  * kill us last in the shutdown sequence, so we get a chance to stream all
274  * remaining WAL at shutdown, including the shutdown checkpoint. Note that
275  * there's no going back, and we mustn't write any WAL records after this.
276  */
279 
280  /* Initialize empty timestamp buffer for lag tracking. */
281  memset(&LagTracker, 0, sizeof(LagTracker));
282 }
283 
284 /*
285  * Clean up after an error.
286  *
287  * WAL sender processes don't use transactions like regular backends do.
288  * This function does any cleanup required after an error in a WAL sender
289  * process, similar to what transaction abort does in a regular backend.
290  */
291 void
293 {
297 
298  if (sendFile >= 0)
299  {
300  close(sendFile);
301  sendFile = -1;
302  }
303 
304  if (MyReplicationSlot != NULL)
306 
308 
309  replication_active = false;
310 
311  if (got_STOPPING || got_SIGUSR2)
312  proc_exit(0);
313 
314  /* Revert back to startup state */
316 }
317 
318 /*
319  * Handle a client's connection abort in an orderly manner.
320  */
321 static void
322 WalSndShutdown(void)
323 {
324  /*
325  * Reset whereToSendOutput to prevent ereport from attempting to send any
326  * more messages to the standby.
327  */
330 
331  proc_exit(0);
332  abort(); /* keep the compiler quiet */
333 }
334 
335 /*
336  * Handle the IDENTIFY_SYSTEM command.
337  */
338 static void
340 {
341  char sysid[32];
342  char xloc[MAXFNAMELEN];
343  XLogRecPtr logptr;
344  char *dbname = NULL;
346  TupOutputState *tstate;
347  TupleDesc tupdesc;
348  Datum values[4];
349  bool nulls[4];
350 
351  /*
352  * Reply with a result set with one row, four columns. First col is system
353  * ID, second is timeline ID, third is current xlog location and the
354  * fourth contains the database name if we are connected to one.
355  */
356 
357  snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
359 
362  {
363  /* this also updates ThisTimeLineID */
364  logptr = GetStandbyFlushRecPtr();
365  }
366  else
367  logptr = GetFlushRecPtr();
368 
369  snprintf(xloc, sizeof(xloc), "%X/%X", (uint32) (logptr >> 32), (uint32) logptr);
370 
371  if (MyDatabaseId != InvalidOid)
372  {
374 
375  /* syscache access needs a transaction env. */
377  /* make dbname live outside TX context */
381  /* CommitTransactionCommand switches to TopMemoryContext */
383  }
384 
386  MemSet(nulls, false, sizeof(nulls));
387 
388  /* need a tuple descriptor representing four columns */
389  tupdesc = CreateTemplateTupleDesc(4, false);
390  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "systemid",
391  TEXTOID, -1, 0);
392  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "timeline",
393  INT4OID, -1, 0);
394  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "xlogpos",
395  TEXTOID, -1, 0);
396  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "dbname",
397  TEXTOID, -1, 0);
398 
399  /* prepare for projection of tuples */
400  tstate = begin_tup_output_tupdesc(dest, tupdesc);
401 
402  /* column 1: system identifier */
403  values[0] = CStringGetTextDatum(sysid);
404 
405  /* column 2: timeline */
406  values[1] = Int32GetDatum(ThisTimeLineID);
407 
408  /* column 3: wal location */
409  values[2] = CStringGetTextDatum(xloc);
410 
411  /* column 4: database name, or NULL if none */
412  if (dbname)
413  values[3] = CStringGetTextDatum(dbname);
414  else
415  nulls[3] = true;
416 
417  /* send it to dest */
418  do_tup_output(tstate, values, nulls);
419 
420  end_tup_output(tstate);
421 }
422 
423 
424 /*
425  * Handle TIMELINE_HISTORY command.
426  */
427 static void
429 {
431  char histfname[MAXFNAMELEN];
432  char path[MAXPGPATH];
433  int fd;
434  off_t histfilelen;
435  off_t bytesleft;
436  Size len;
437 
438  /*
439  * Reply with a result set with one row, and two columns. The first col is
440  * the name of the history file, 2nd is the contents.
441  */
442 
443  TLHistoryFileName(histfname, cmd->timeline);
444  TLHistoryFilePath(path, cmd->timeline);
445 
446  /* Send a RowDescription message */
447  pq_beginmessage(&buf, 'T');
448  pq_sendint16(&buf, 2); /* 2 fields */
449 
450  /* first field */
451  pq_sendstring(&buf, "filename"); /* col name */
452  pq_sendint32(&buf, 0); /* table oid */
453  pq_sendint16(&buf, 0); /* attnum */
454  pq_sendint32(&buf, TEXTOID); /* type oid */
455  pq_sendint16(&buf, -1); /* typlen */
456  pq_sendint32(&buf, 0); /* typmod */
457  pq_sendint16(&buf, 0); /* format code */
458 
459  /* second field */
460  pq_sendstring(&buf, "content"); /* col name */
461  pq_sendint32(&buf, 0); /* table oid */
462  pq_sendint16(&buf, 0); /* attnum */
463  pq_sendint32(&buf, BYTEAOID); /* type oid */
464  pq_sendint16(&buf, -1); /* typlen */
465  pq_sendint32(&buf, 0); /* typmod */
466  pq_sendint16(&buf, 0); /* format code */
467  pq_endmessage(&buf);
468 
469  /* Send a DataRow message */
470  pq_beginmessage(&buf, 'D');
471  pq_sendint16(&buf, 2); /* # of columns */
472  len = strlen(histfname);
473  pq_sendint32(&buf, len); /* col1 len */
474  pq_sendbytes(&buf, histfname, len);
475 
476  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
477  if (fd < 0)
478  ereport(ERROR,
480  errmsg("could not open file \"%s\": %m", path)));
481 
482  /* Determine file length and send it to client */
483  histfilelen = lseek(fd, 0, SEEK_END);
484  if (histfilelen < 0)
485  ereport(ERROR,
487  errmsg("could not seek to end of file \"%s\": %m", path)));
488  if (lseek(fd, 0, SEEK_SET) != 0)
489  ereport(ERROR,
491  errmsg("could not seek to beginning of file \"%s\": %m", path)));
492 
 /*
  * NOTE(review): histfilelen is an off_t but is sent as a 32-bit column
  * length, so this assumes the history file is smaller than 2GB (presumably
  * always true for timeline history files) -- confirm.
  */
493  pq_sendint32(&buf, histfilelen); /* col2 len */
494 
495  bytesleft = histfilelen;
 /* Stream the file contents into the DataRow in BLCKSZ-sized chunks. */
496  while (bytesleft > 0)
497  {
498  char rbuf[BLCKSZ];
499  int nread;
500 
502  nread = read(fd, rbuf, sizeof(rbuf));
 /*
  * NOTE(review): nread == 0 means EOF was reached before histfilelen
  * bytes were read (e.g. the file shrank after the lseek above). read()
  * does not set errno in that case, so the %m below may print a stale or
  * misleading error; consider reporting the short read separately from
  * nread < 0.
  */
504  if (nread <= 0)
505  ereport(ERROR,
507  errmsg("could not read file \"%s\": %m",
508  path)));
509  pq_sendbytes(&buf, rbuf, nread);
510  bytesleft -= nread;
511  }
512  CloseTransientFile(fd);
513 
514  pq_endmessage(&buf);
515 }
516 
517 /*
518  * Handle START_REPLICATION command.
519  *
520  * Returns once the COPY streaming phase ends (or exits if we are stopping);
521  * an ereport(ERROR) will take us back to the main loop.
522  */
523 static void
525 {
527  XLogRecPtr FlushPtr;
528 
529  if (ThisTimeLineID == 0)
530  ereport(ERROR,
531  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
532  errmsg("IDENTIFY_SYSTEM has not been run before START_REPLICATION")));
533 
534  /*
535  * We assume here that we're logging enough information in the WAL for
536  * log-shipping, since this is checked in PostmasterMain().
537  *
538  * NOTE: wal_level can only change at shutdown, so in most cases it is
539  * difficult for there to be WAL data that we can still see that was
540  * written at wal_level='minimal'.
541  */
542 
543  if (cmd->slotname)
544  {
545  ReplicationSlotAcquire(cmd->slotname, true);
547  ereport(ERROR,
548  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
549  (errmsg("cannot use a logical replication slot for physical replication"))));
550  }
551 
552  /*
553  * Select the timeline. If it was given explicitly by the client, use
554  * that. Otherwise use the timeline of the last replayed record, which is
555  * kept in ThisTimeLineID.
556  */
558  {
559  /* this also updates ThisTimeLineID */
560  FlushPtr = GetStandbyFlushRecPtr();
561  }
562  else
563  FlushPtr = GetFlushRecPtr();
564 
565  if (cmd->timeline != 0)
566  {
567  XLogRecPtr switchpoint;
568 
569  sendTimeLine = cmd->timeline;
571  {
572  sendTimeLineIsHistoric = false;
574  }
575  else
576  {
577  List *timeLineHistory;
578 
579  sendTimeLineIsHistoric = true;
580 
581  /*
582  * Check that the timeline the client requested exists, and the
583  * requested start location is on that timeline.
584  */
585  timeLineHistory = readTimeLineHistory(ThisTimeLineID);
586  switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
588  list_free_deep(timeLineHistory);
589 
590  /*
591  * Found the requested timeline in the history. Check that
592  * requested startpoint is on that timeline in our history.
593  *
594  * This is quite loose on purpose. We only check that we didn't
595  * fork off the requested timeline before the switchpoint. We
596  * don't check that we switched *to* it before the requested
597  * starting point. This is because the client can legitimately
598  * request to start replication from the beginning of the WAL
599  * segment that contains switchpoint, but on the new timeline, so
600  * that it doesn't end up with a partial segment. If you ask for
601  * too old a starting point, you'll get an error later when we
602  * fail to find the requested WAL segment in pg_wal.
603  *
604  * XXX: we could be more strict here and only allow a startpoint
605  * that's older than the switchpoint, if it's still in the same
606  * WAL segment.
607  */
608  if (!XLogRecPtrIsInvalid(switchpoint) &&
609  switchpoint < cmd->startpoint)
610  {
611  ereport(ERROR,
612  (errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
613  (uint32) (cmd->startpoint >> 32),
614  (uint32) (cmd->startpoint),
615  cmd->timeline),
616  errdetail("This server's history forked from timeline %u at %X/%X.",
617  cmd->timeline,
618  (uint32) (switchpoint >> 32),
619  (uint32) (switchpoint))));
620  }
621  sendTimeLineValidUpto = switchpoint;
622  }
623  }
624  else
625  {
628  sendTimeLineIsHistoric = false;
629  }
630 
632 
633  /* If there is nothing to stream, don't even enter COPY mode */
635  {
636  /*
637  * When we first start replication the standby will be behind the
638  * primary. For some applications, for example synchronous
639  * replication, it is important to have a clear state for this initial
640  * catchup mode, so we can trigger actions when we change streaming
641  * state later. We may stay in this state for a long time, which is
642  * exactly why we want to be able to monitor whether or not we are
643  * still here.
644  */
646 
647  /* Send a CopyBothResponse message, and start streaming */
648  pq_beginmessage(&buf, 'W');
649  pq_sendbyte(&buf, 0);
650  pq_sendint16(&buf, 0);
651  pq_endmessage(&buf);
652  pq_flush();
653 
654  /*
655  * Don't allow a request to stream from a future point in WAL that
656  * hasn't been flushed to disk in this server yet.
657  */
658  if (FlushPtr < cmd->startpoint)
659  {
660  ereport(ERROR,
661  (errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
662  (uint32) (cmd->startpoint >> 32),
663  (uint32) (cmd->startpoint),
664  (uint32) (FlushPtr >> 32),
665  (uint32) (FlushPtr))));
666  }
667 
668  /* Start streaming from the requested point */
669  sentPtr = cmd->startpoint;
670 
671  /* Initialize shared memory status, too */
672  SpinLockAcquire(&MyWalSnd->mutex);
673  MyWalSnd->sentPtr = sentPtr;
674  SpinLockRelease(&MyWalSnd->mutex);
675 
677 
678  /* Main loop of walsender */
679  replication_active = true;
680 
682 
683  replication_active = false;
684  if (got_STOPPING)
685  proc_exit(0);
687 
689  }
690 
691  if (cmd->slotname)
693 
694  /*
695  * Copy is finished now. Send a single-row result set indicating the next
696  * timeline.
697  */
699  {
700  char startpos_str[8 + 1 + 8 + 1];
702  TupOutputState *tstate;
703  TupleDesc tupdesc;
704  Datum values[2];
705  bool nulls[2];
706 
707  snprintf(startpos_str, sizeof(startpos_str), "%X/%X",
708  (uint32) (sendTimeLineValidUpto >> 32),
710 
712  MemSet(nulls, false, sizeof(nulls));
713 
714  /*
715  * Need a tuple descriptor representing two columns. int8 may seem
716  * like a surprising data type for this, but in theory int4 would not
717  * be wide enough for this, as TimeLineID is unsigned.
718  */
719  tupdesc = CreateTemplateTupleDesc(2, false);
720  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli",
721  INT8OID, -1, 0);
722  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos",
723  TEXTOID, -1, 0);
724 
725  /* prepare for projection of tuple */
726  tstate = begin_tup_output_tupdesc(dest, tupdesc);
727 
728  values[0] = Int64GetDatum((int64) sendTimeLineNextTLI);
729  values[1] = CStringGetTextDatum(startpos_str);
730 
731  /* send it to dest */
732  do_tup_output(tstate, values, nulls);
733 
734  end_tup_output(tstate);
735  }
736 
737  /* Send CommandComplete message */
738  pq_puttextmessage('C', "START_STREAMING");
739 }
740 
741 /*
742  * read_page callback for logical decoding contexts, as a walsender process.
743  *
744  * Inside the walsender we can do better than logical_read_local_xlog_page,
745  * which has to do a plain sleep/busy loop, because the walsender's latch gets
746  * set every time WAL is flushed.
747  */
748 static int
750  XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
751 {
 /*
  * Returns the number of valid bytes starting at targetPagePtr (at most
  * XLOG_BLCKSZ), or -1 if WalSndWaitForWal could not make at least
  * targetPagePtr + reqLen available.
  */
752  XLogRecPtr flushptr;
753  int count;
754 
 /* Let the reader pick the timeline for this page and mirror it locally. */
755  XLogReadDetermineTimeline(state, targetPagePtr, reqLen);
757  sendTimeLine = state->currTLI;
759  sendTimeLineNextTLI = state->nextTLI;
760 
761  /* make sure we have enough WAL available */
762  flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
763 
764  /* fail if not (implies we are going to shut down) */
765  if (flushptr < targetPagePtr + reqLen)
766  return -1;
767 
768  if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
769  count = XLOG_BLCKSZ; /* a full block is available */
770  else
771  count = flushptr - targetPagePtr; /* part of the page available */
772 
773  /* now actually read the data, we know it's there */
 /*
  * NOTE(review): a full XLOG_BLCKSZ page is read even when count is
  * smaller; bytes past count are not guaranteed to be flushed WAL, and the
  * caller presumably trusts only the first count bytes -- confirm.
  */
774  XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ);
775 
776  return count;
777 }
778 
779 /*
780  * Process extra options given to CREATE_REPLICATION_SLOT.
781  */
782 static void
784  bool *reserve_wal,
785  CRSSnapshotAction *snapshot_action)
786 {
787  ListCell *lc;
788  bool snapshot_action_given = false;
789  bool reserve_wal_given = false;
790 
791  /* Parse options */
792  foreach(lc, cmd->options)
793  {
794  DefElem *defel = (DefElem *) lfirst(lc);
795 
796  if (strcmp(defel->defname, "export_snapshot") == 0)
797  {
798  if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
799  ereport(ERROR,
800  (errcode(ERRCODE_SYNTAX_ERROR),
801  errmsg("conflicting or redundant options")));
802 
803  snapshot_action_given = true;
804  *snapshot_action = defGetBoolean(defel) ? CRS_EXPORT_SNAPSHOT :
806  }
807  else if (strcmp(defel->defname, "use_snapshot") == 0)
808  {
809  if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
810  ereport(ERROR,
811  (errcode(ERRCODE_SYNTAX_ERROR),
812  errmsg("conflicting or redundant options")));
813 
814  snapshot_action_given = true;
815  *snapshot_action = CRS_USE_SNAPSHOT;
816  }
817  else if (strcmp(defel->defname, "reserve_wal") == 0)
818  {
819  if (reserve_wal_given || cmd->kind != REPLICATION_KIND_PHYSICAL)
820  ereport(ERROR,
821  (errcode(ERRCODE_SYNTAX_ERROR),
822  errmsg("conflicting or redundant options")));
823 
824  reserve_wal_given = true;
825  *reserve_wal = true;
826  }
827  else
828  elog(ERROR, "unrecognized option: %s", defel->defname);
829  }
830 }
831 
832 /*
833  * Create a new replication slot.
834  */
835 static void
837 {
838  const char *snapshot_name = NULL;
839  char xloc[MAXFNAMELEN];
840  char *slot_name;
841  bool reserve_wal = false;
842  CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT;
844  TupOutputState *tstate;
845  TupleDesc tupdesc;
846  Datum values[4];
847  bool nulls[4];
848 
850 
851  parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action);
852 
853  /* setup state for XLogReadPage */
854  sendTimeLineIsHistoric = false;
856 
857  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
858  {
859  ReplicationSlotCreate(cmd->slotname, false,
861  }
862  else
863  {
865 
866  /*
867  * Initially create persistent slot as ephemeral - that allows us to
868  * nicely handle errors during initialization because it'll get
869  * dropped if this transaction fails. We'll make it persistent at the
870  * end. Temporary slots can be created as temporary from beginning as
871  * they get dropped on error as well.
872  */
873  ReplicationSlotCreate(cmd->slotname, true,
875  }
876 
877  if (cmd->kind == REPLICATION_KIND_LOGICAL)
878  {
880  bool need_full_snapshot = false;
881 
882  /*
883  * Do options check early so that we can bail before calling the
884  * DecodingContextFindStartpoint which can take long time.
885  */
886  if (snapshot_action == CRS_EXPORT_SNAPSHOT)
887  {
888  if (IsTransactionBlock())
889  ereport(ERROR,
890  (errmsg("CREATE_REPLICATION_SLOT ... EXPORT_SNAPSHOT "
891  "must not be called inside a transaction")));
892 
893  need_full_snapshot = true;
894  }
895  else if (snapshot_action == CRS_USE_SNAPSHOT)
896  {
897  if (!IsTransactionBlock())
898  ereport(ERROR,
899  (errmsg("CREATE_REPLICATION_SLOT ... USE_SNAPSHOT "
900  "must be called inside a transaction")));
901 
903  ereport(ERROR,
904  (errmsg("CREATE_REPLICATION_SLOT ... USE_SNAPSHOT "
905  "must be called in REPEATABLE READ isolation mode transaction")));
906 
907  if (FirstSnapshotSet)
908  ereport(ERROR,
909  (errmsg("CREATE_REPLICATION_SLOT ... USE_SNAPSHOT "
910  "must be called before any query")));
911 
912  if (IsSubTransaction())
913  ereport(ERROR,
914  (errmsg("CREATE_REPLICATION_SLOT ... USE_SNAPSHOT "
915  "must not be called in a subtransaction")));
916 
917  need_full_snapshot = true;
918  }
919 
920  ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
924 
925  /*
926  * Signal that we don't need the timeout mechanism. We're just
927  * creating the replication slot and don't yet accept feedback
928  * messages or send keepalives. As we possibly need to wait for
929  * further WAL the walsender would otherwise possibly be killed too
930  * soon.
931  */
933 
934  /* build initial snapshot, might take a while */
936 
937  /*
938  * Export or use the snapshot if we've been asked to do so.
939  *
940  * NB. We will convert the snapbuild.c kind of snapshot to normal
941  * snapshot when doing this.
942  */
943  if (snapshot_action == CRS_EXPORT_SNAPSHOT)
944  {
945  snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder);
946  }
947  else if (snapshot_action == CRS_USE_SNAPSHOT)
948  {
949  Snapshot snap;
950 
953  }
954 
955  /* don't need the decoding context anymore */
956  FreeDecodingContext(ctx);
957 
958  if (!cmd->temporary)
960  }
961  else if (cmd->kind == REPLICATION_KIND_PHYSICAL && reserve_wal)
962  {
964 
966 
967  /* Write this slot to disk if it's a permanent one. */
968  if (!cmd->temporary)
970  }
971 
972  snprintf(xloc, sizeof(xloc), "%X/%X",
975 
977  MemSet(nulls, false, sizeof(nulls));
978 
979  /*----------
980  * Need a tuple descriptor representing four columns:
981  * - first field: the slot name
982  * - second field: LSN at which we became consistent
983  * - third field: exported snapshot's name
984  * - fourth field: output plugin
985  *----------
986  */
987  tupdesc = CreateTemplateTupleDesc(4, false);
988  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_name",
989  TEXTOID, -1, 0);
990  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "consistent_point",
991  TEXTOID, -1, 0);
992  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "snapshot_name",
993  TEXTOID, -1, 0);
994  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "output_plugin",
995  TEXTOID, -1, 0);
996 
997  /* prepare for projection of tuples */
998  tstate = begin_tup_output_tupdesc(dest, tupdesc);
999 
1000  /* slot_name */
1001  slot_name = NameStr(MyReplicationSlot->data.name);
1002  values[0] = CStringGetTextDatum(slot_name);
1003 
1004  /* consistent wal location */
1005  values[1] = CStringGetTextDatum(xloc);
1006 
1007  /* snapshot name, or NULL if none */
1008  if (snapshot_name != NULL)
1009  values[2] = CStringGetTextDatum(snapshot_name);
1010  else
1011  nulls[2] = true;
1012 
1013  /* plugin, or NULL if none */
1014  if (cmd->plugin != NULL)
1015  values[3] = CStringGetTextDatum(cmd->plugin);
1016  else
1017  nulls[3] = true;
1018 
1019  /* send it to dest */
1020  do_tup_output(tstate, values, nulls);
1021  end_tup_output(tstate);
1022 
1024 }
1025 
1026 /*
1027  * Get rid of a replication slot that is no longer wanted.
1028  */
1029 static void
1031 {
 /*
  * The client's WAIT option is passed inverted; presumably the second
  * argument of ReplicationSlotDrop is a "nowait" flag -- confirm in slot.c.
  */
1032  ReplicationSlotDrop(cmd->slotname, !cmd->wait);
 /* Send the DROP_REPLICATION_SLOT completion tag to the remote client. */
1033  EndCommand("DROP_REPLICATION_SLOT", DestRemote);
1034 }
1035 
1036 /*
1037  * Load previously initiated logical slot and prepare for sending data (via
1038  * WalSndLoop).
1039  */
1040 static void
1042 {
1044 
1045  /* make sure that our requirements are still fulfilled */
1047 
1049 
1050  ReplicationSlotAcquire(cmd->slotname, true);
1051 
1052  /*
1053  * Force a disconnect, so that the decoding code doesn't need to care
1054  * about an eventual switch from running in recovery, to running in a
1055  * normal environment. Client code is expected to handle reconnects.
1056  */
1058  {
1059  ereport(LOG,
1060  (errmsg("terminating walsender process after promotion")));
1061  got_STOPPING = true;
1062  }
1063 
1065 
1066  /* Send a CopyBothResponse message, and start streaming */
1067  pq_beginmessage(&buf, 'W');
1068  pq_sendbyte(&buf, 0);
1069  pq_sendint16(&buf, 0);
1070  pq_endmessage(&buf);
1071  pq_flush();
1072 
1073  /*
1074  * Initialize position to the last ack'ed one, then the xlog records begin
1075  * to be shipped from that position.
1076  */
1077  logical_decoding_ctx = CreateDecodingContext(cmd->startpoint, cmd->options,
1078  false,
1083 
1084  /* Start reading WAL from the oldest required WAL. */
1086 
1087  /*
1088  * Report the location after which we'll send out further commits as the
1089  * current sentPtr.
1090  */
1092 
1093  /* Also update the sent position status in shared memory */
1094  SpinLockAcquire(&MyWalSnd->mutex);
1096  SpinLockRelease(&MyWalSnd->mutex);
1097 
1098  replication_active = true;
1099 
1101 
1102  /* Main loop of walsender */
1104 
1105  FreeDecodingContext(logical_decoding_ctx);
1107 
1108  replication_active = false;
1109  if (got_STOPPING)
1110  proc_exit(0);
1112 
1113  /* Get out of COPY mode (CommandComplete). */
1114  EndCommand("COPY 0", DestRemote);
1115 }
1116 
1117 /*
1118  * LogicalDecodingContext 'prepare_write' callback.
1119  *
1120  * Prepare a write into a StringInfo.
1121  *
1122  * Don't do anything lasting in here; it's quite possible that nothing will be done
1123  * with the data.
1124  */
1125 static void
1127 {
 /*
  * Builds the fixed header of a 'w' (WAL data) message in ctx->out: one
  * type byte followed by three int64 fields (dataStart, walEnd, sendtime),
  * i.e. 1 + 3 * 8 bytes. The sendtime is written as 0 here and is
  * overwritten by WalSndWriteData at offset 1 + 2 * sizeof(int64) just
  * before the message is flushed.
  */
1128  /* can't have sync rep confused by sending the same LSN several times */
1129  if (!last_write)
1130  lsn = InvalidXLogRecPtr;
1131 
 /* Discard anything left over from a previous message. */
1132  resetStringInfo(ctx->out);
1133 
1134  pq_sendbyte(ctx->out, 'w');
1135  pq_sendint64(ctx->out, lsn); /* dataStart */
1136  pq_sendint64(ctx->out, lsn); /* walEnd */
1137 
1138  /*
1139  * Fill out the sendtime later, just as it's done in XLogSendPhysical, but
1140  * reserve space here.
1141  */
1142  pq_sendint64(ctx->out, 0); /* sendtime */
1143 }
1144 
1145 /*
1146  * LogicalDecodingContext 'write' callback.
1147  *
1148  * Actually write out data previously prepared by WalSndPrepareWrite to
1149  * the network. Take as long as needed, but process replies from the other
1150  * side and check timeouts during that.
 *
 * The send-time placeholder that WalSndPrepareWrite reserved at offset
 * 1 + 2 * sizeof(int64) of ctx->out is overwritten with the current time.
 * Only then is the message queued as a CopyData ('d') packet.  If a flush
 * attempt fails, WalSndShutdown() is called.  Unless the fast path applies
 * (no output pending and, per its comment, not too close to the walsender
 * timeout), loop: check the timeout, wait on the latch/socket, and retry
 * the flush until nothing is pending.  On return the process latch is set
 * so that the caller's loop continues.
1151  */
1152 static void
1154  bool last_write)
1155 {
1156  TimestampTz now;
1157 
1161  /*
1162  * Fill the send timestamp last, so that it is taken as late as possible.
1163  * This is somewhat ugly, but the protocol is set as it's already used for
1164  * several releases by streaming physical replication.
 *
 * The timestamp must be patched into ctx->out BEFORE the buffer is handed
 * to pq_putmessage_noblock(), which copies it into the send queue.
 * Patching it afterwards would leave the client with the zero placeholder.
1165  */
1166  resetStringInfo(&tmpbuf);
1167  now = GetCurrentTimestamp();
1168  pq_sendint64(&tmpbuf, now);
1169  memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
1170  tmpbuf.data, sizeof(int64));
1160 
1158  /* output previously gathered data in a CopyData packet */
1159  pq_putmessage_noblock('d', ctx->out->data, ctx->out->len);
1171 
1173 
1174  /* Try to flush pending output to the client */
1175  if (pq_flush_if_writable() != 0)
1176  WalSndShutdown();
1177 
1178  /* Try taking fast path unless we get too close to walsender timeout. */
1180  wal_sender_timeout / 2) &&
1181  !pq_is_send_pending())
1182  {
1183  return;
1184  }
1185 
1186  /* If we have pending write here, go to slow path */
1187  for (;;)
1188  {
1189  int wakeEvents;
1190  long sleeptime;
1191 
1192  /* Check for input from the client */
1194 
1195  now = GetCurrentTimestamp();
1196 
1197  /* die if timeout was reached */
1198  WalSndCheckTimeOut(now);
1199 
1200  /* Send keepalive if the time has come */
1202 
1203  if (!pq_is_send_pending())
1204  break;
1205 
1206  sleeptime = WalSndComputeSleeptime(now);
1207 
1208  wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
1210 
1211  /* Sleep until something happens or we time out */
1212  WaitLatchOrSocket(MyLatch, wakeEvents,
1213  MyProcPort->sock, sleeptime,
1215 
1216  /*
1217  * Emergency bailout if postmaster has died. This is to avoid the
1218  * necessity for manual cleanup of all postmaster children.
1219  */
1220  if (!PostmasterIsAlive())
1221  exit(1);
1222 
1223  /* Clear any already-pending wakeups */
1225 
1227 
1228  /* Process any requests or signals received recently */
1229  if (ConfigReloadPending)
1230  {
1231  ConfigReloadPending = false;
1234  }
1235 
1236  /* Try to flush pending output to the client */
1237  if (pq_flush_if_writable() != 0)
1238  WalSndShutdown();
1239  }
1240 
1241  /* reactivate latch so WalSndLoop knows to continue */
1242  SetLatch(MyLatch);
1243 }
1244 
1245 /*
1246  * LogicalDecodingContext 'update_progress' callback.
1247  *
1248  * Write the current position to the lag tracker (see XLogSendPhysical).
 *
 * Calls are rate-limited: LagTrackerWrite() is invoked at most once per
 * WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS (1000 ms).  The time of the last
 * write is kept in the function-static sendTime.
1249  */
1250 static void
1252 {
1253  static TimestampTz sendTime = 0;
1255 
1256  /*
1257  * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to
1258  * avoid flooding the lag tracker when we commit frequently.
1259  */
1260 #define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000
1261  if (!TimestampDifferenceExceeds(sendTime, now,
1263  return;
1264 
1265  LagTrackerWrite(lsn, now);
1266  sendTime = now;
1267 }
1268 
1269 /*
1270  * Wait till WAL < loc is flushed to disk so it can be safely sent to client.
1271  *
1272  * Returns end LSN of flushed WAL. Normally this will be >= loc, but
1273  * if we detect a shutdown request (either from postmaster or client)
1274  * we will return early, so caller must always check.
 *
 * The last known flush position is cached in a function-static variable,
 * so a request for an already-flushed location returns immediately.
 * During recovery the replay position is used in place of the flush
 * position.  While waiting, this function:
 *   - flushes pending output;
 *   - sends keepalives when due;
 *   - enforces the walsender timeout;
 *   - sets WalSndCaughtUp to true.
 * The loop ends when loc is flushed, when got_STOPPING is set, or when
 * streaming is done in both directions.  The process latch is set again
 * before returning.
1275  */
1276 static XLogRecPtr
1278 {
1279  int wakeEvents;
     /* last known flush (or, in recovery, replay) position; kept across calls */
1280  static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
1281 
1282 
1283  /*
1284  * Fast path to avoid acquiring the spinlock in case we already know we
1285  * have enough WAL available. This is particularly interesting if we're
1286  * far behind.
1287  */
1288  if (RecentFlushPtr != InvalidXLogRecPtr &&
1289  loc <= RecentFlushPtr)
1290  return RecentFlushPtr;
1291 
1292  /* Get a more recent flush pointer. */
1293  if (!RecoveryInProgress())
1294  RecentFlushPtr = GetFlushRecPtr();
1295  else
1296  RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1297 
1298  for (;;)
1299  {
1300  long sleeptime;
1301  TimestampTz now;
1302 
1303  /*
1304  * Emergency bailout if postmaster has died. This is to avoid the
1305  * necessity for manual cleanup of all postmaster children.
1306  */
1307  if (!PostmasterIsAlive())
1308  exit(1);
1309 
1310  /* Clear any already-pending wakeups */
1312 
1314 
1315  /* Process any requests or signals received recently */
1316  if (ConfigReloadPending)
1317  {
1318  ConfigReloadPending = false;
1321  }
1322 
1323  /* Check for input from the client */
1325 
1326  /*
1327  * If we're shutting down, trigger pending WAL to be written out,
1328  * otherwise we'd possibly end up waiting for WAL that never gets
1329  * written, because walwriter has shut down already.
1330  */
1331  if (got_STOPPING)
1333 
1334  /* Update our idea of the currently flushed position. */
1335  if (!RecoveryInProgress())
1336  RecentFlushPtr = GetFlushRecPtr();
1337  else
1338  RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1339 
1340  /*
1341  * If postmaster asked us to stop, don't wait anymore.
1342  *
1343  * It's important to do this check after the recomputation of
1344  * RecentFlushPtr, so we can send all remaining data before shutting
1345  * down.
1346  */
1347  if (got_STOPPING)
1348  break;
1349 
1350  /*
1351  * We only send regular messages to the client for full decoded
1352  * transactions, but synchronous replication and walsender shutdown
1353  * may be waiting for a later location. So we send pings
1354  * containing the flush location every now and then.
1355  */
1356  if (MyWalSnd->flush < sentPtr &&
1357  MyWalSnd->write < sentPtr &&
1359  {
1360  WalSndKeepalive(false);
1362  }
1363 
1364  /* check whether we're done */
1365  if (loc <= RecentFlushPtr)
1366  break;
1367 
1368  /* Waiting for new WAL. Since we need to wait, we're now caught up. */
1369  WalSndCaughtUp = true;
1370 
1371  /*
1372  * Try to flush any pending output to the client.
1373  */
1374  if (pq_flush_if_writable() != 0)
1375  WalSndShutdown();
1376 
1377  /*
1378  * If we have received CopyDone from the client, sent CopyDone
1379  * ourselves, and the output buffer is empty, it's time to exit
1380  * streaming, so fail the current WAL fetch request.
1381  */
1383  !pq_is_send_pending())
1384  break;
1385 
1386  now = GetCurrentTimestamp();
1387 
1388  /* die if timeout was reached */
1389  WalSndCheckTimeOut(now);
1390 
1391  /* Send keepalive if the time has come */
1393 
1394  /*
1395  * Sleep until something happens or we time out. Also wait for the
1396  * socket becoming writable, if there's still pending output.
1397  * Otherwise we might sit on sendable output data while waiting for
1398  * new WAL to be generated. (But if we have nothing to send, we don't
1399  * want to wake on socket-writable.)
1400  */
1401  sleeptime = WalSndComputeSleeptime(now);
1402 
1403  wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
1405 
1406  if (pq_is_send_pending())
1407  wakeEvents |= WL_SOCKET_WRITEABLE;
1408 
1409  WaitLatchOrSocket(MyLatch, wakeEvents,
1410  MyProcPort->sock, sleeptime,
1412  }
1413 
1414  /* reactivate latch so WalSndLoop knows to continue */
1415  SetLatch(MyLatch);
1416  return RecentFlushPtr;
1417 }
1418 
1419 /*
1420  * Execute an incoming replication command.
1421  *
1422  * Returns true if the cmd_string was recognized as WalSender command, false
1423  * if not.
 *
 * The command is parsed and executed inside a dedicated "Replication
 * command context".  On a normal return, the caller's memory context is
 * restored and this context is deleted.  SQL commands (T_SQLCmd) are not
 * executed here.  After checking that this walsender is connected to a
 * database, the context is cleaned up and false is returned, so the
 * caller can run the query itself.  Errors are raised with ereport(ERROR)
 * when:
 *   - the walsender is in stopping mode;
 *   - the command fails to parse;
 *   - the transaction is aborted;
 *   - the node tag is unknown.
1424  */
1425 bool
1426 exec_replication_command(const char *cmd_string)
1427 {
1428  int parse_rc;
1429  Node *cmd_node;
1430  MemoryContext cmd_context;
1431  MemoryContext old_context;
1432 
1433  /*
1434  * If WAL sender has been told that shutdown is getting close, switch its
1435  * status accordingly to handle the next replication commands correctly.
1436  */
1437  if (got_STOPPING)
1439 
1440  /*
1441  * Throw error if in stopping mode. We need to prevent commands that could
1442  * generate WAL while the shutdown checkpoint is being written. To be
1443  * safe, we just prohibit all new commands.
1444  */
1445  if (MyWalSnd->state == WALSNDSTATE_STOPPING)
1446  ereport(ERROR,
1447  (errmsg("cannot execute new commands while WAL sender is in stopping mode")));
1448 
1449  /*
1450  * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
1451  * command arrives. Clean up the old stuff if there's anything.
1452  */
1454 
1456 
1458  "Replication command context",
1460  old_context = MemoryContextSwitchTo(cmd_context);
1461 
1462  replication_scanner_init(cmd_string);
1463  parse_rc = replication_yyparse();
1464  if (parse_rc != 0)
1465  ereport(ERROR,
1466  (errcode(ERRCODE_SYNTAX_ERROR),
1467  (errmsg_internal("replication command parser returned %d",
1468  parse_rc))));
1469 
1470  cmd_node = replication_parse_result;
1471 
1472  /*
1473  * Log replication command if log_replication_commands is enabled. Even
1474  * when it's disabled, log the command with DEBUG1 level for backward
1475  * compatibility. Note that SQL commands are not logged here, and will be
1476  * logged later if log_statement is enabled.
1477  */
1478  if (cmd_node->type != T_SQLCmd)
1480  (errmsg("received replication command: %s", cmd_string)));
1481 
1482  /*
1483  * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot. If it was
1484  * called outside of transaction the snapshot should be cleared here.
1485  */
1486  if (!IsTransactionBlock())
1488 
1489  /*
1490  * For aborted transactions, don't allow anything except pure SQL, the
1491  * exec_simple_query() will handle it correctly.
1492  */
1493  if (IsAbortedTransactionBlockState() && !IsA(cmd_node, SQLCmd))
1494  ereport(ERROR,
1495  (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
1496  errmsg("current transaction is aborted, "
1497  "commands ignored until end of transaction block")));
1498 
1500 
1501  /*
1502  * Allocate buffers that will be used for each outgoing and incoming
1503  * message. We do this just once per command to reduce palloc overhead.
1504  */
1505  initStringInfo(&output_message);
1506  initStringInfo(&reply_message);
1507  initStringInfo(&tmpbuf);
1508 
1509  /* Report to pgstat that this process is running */
1511 
1512  switch (cmd_node->type)
1513  {
1514  case T_IdentifySystemCmd:
1515  IdentifySystem();
1516  break;
1517 
1518  case T_BaseBackupCmd:
1519  PreventTransactionChain(true, "BASE_BACKUP");
1520  SendBaseBackup((BaseBackupCmd *) cmd_node);
1521  break;
1522 
1525  break;
1526 
1529  break;
1530 
1531  case T_StartReplicationCmd:
1532  {
1533  StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
1534 
1535  PreventTransactionChain(true, "START_REPLICATION");
1536 
1537  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
1538  StartReplication(cmd);
1539  else
1541  break;
1542  }
1543 
1544  case T_TimeLineHistoryCmd:
1545  PreventTransactionChain(true, "TIMELINE_HISTORY");
1547  break;
1548 
1549  case T_VariableShowStmt:
1550  {
1552  VariableShowStmt *n = (VariableShowStmt *) cmd_node;
1553 
1554  GetPGVariable(n->name, dest);
1555  }
1556  break;
1557 
1558  case T_SQLCmd:
1559  if (MyDatabaseId == InvalidOid)
1560  ereport(ERROR,
1561  (errmsg("cannot execute SQL commands in WAL sender for physical replication")));
1562 
1563  /* Report to pgstat that this process is now idle */
1565 
     /*
      * Clean up our command context, as the normal exit path below does,
      * so the caller runs the SQL query in its own memory context rather
      * than in a leftover child context.  Nothing allocated in cmd_context
      * (such as cmd_node) is used after this point.
      */
     MemoryContextSwitchTo(old_context);
     MemoryContextDelete(cmd_context);

1566  /* Tell the caller that this wasn't a WalSender command. */
1567  return false;
1568 
1569  default:
1570  elog(ERROR, "unrecognized replication command node tag: %u",
1571  cmd_node->type);
1572  }
1573 
1574  /* done */
1575  MemoryContextSwitchTo(old_context);
1576  MemoryContextDelete(cmd_context);
1577 
1578  /* Send CommandComplete message */
1579  EndCommand("SELECT", DestRemote);
1580 
1581  /* Report to pgstat that this process is now idle */
1583 
1584  return true;
1585 }
1586 
1587 /*
1588  * Process any incoming messages while streaming. Also checks if the remote
1589  * end has closed the connection.
 *
 * Messages are read without blocking until none is available:
 *   - 'd' (CopyData) carries a standby message;
 *   - 'c' (CopyDone) ends streaming from the client side; we reply with our
 *     own CopyDone if we have not sent one yet;
 *   - 'X' terminates the process.
 * EOF, a read error, or any other message type also ends the process.
 * After CopyDone has been received, anything other than 'X' is a protocol
 * violation.  If at least one message was handled, the last-reply
 * timestamp is refreshed and any outstanding ping counts as answered.
1590  */
1591 static void
1593 {
1594  unsigned char firstchar;
1595  int r;
1596  bool received = false;
1597 
1598  for (;;)
1599  {
1600  pq_startmsgread();
1601  r = pq_getbyte_if_available(&firstchar);
1602  if (r < 0)
1603  {
1604  /* unexpected error or EOF */
1606  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1607  errmsg("unexpected EOF on standby connection")));
1608  proc_exit(0);
1609  }
1610  if (r == 0)
1611  {
1612  /* no data available without blocking */
1613  pq_endmsgread();
1614  break;
1615  }
1616 
1617  /* Read the message contents */
1618  resetStringInfo(&reply_message);
1619  if (pq_getmessage(&reply_message, 0))
1620  {
1622  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1623  errmsg("unexpected EOF on standby connection")));
1624  proc_exit(0);
1625  }
1626 
1627  /*
1628  * If we already received a CopyDone from the frontend, the frontend
1629  * should not send us anything until we've closed our end of the COPY.
1630  * XXX: In theory, the frontend could already send the next command
1631  * before receiving the CopyDone, but libpq doesn't currently allow
1632  * that.
1633  */
1634  if (streamingDoneReceiving && firstchar != 'X')
1635  ereport(FATAL,
1636  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1637  errmsg("unexpected standby message type \"%c\", after receiving CopyDone",
1638  firstchar)));
1639 
1640  /* Handle the very limited subset of commands expected in this phase */
1641  switch (firstchar)
1642  {
1643  /*
1644  * 'd' means a standby reply wrapped in a CopyData packet.
1645  */
1646  case 'd':
1648  received = true;
1649  break;
1650 
1651  /*
1652  * CopyDone means the standby requested to finish streaming.
1653  * Reply with CopyDone, if we had not sent that already.
1654  */
1655  case 'c':
1656  if (!streamingDoneSending)
1657  {
1658  pq_putmessage_noblock('c', NULL, 0);
1659  streamingDoneSending = true;
1660  }
1661 
1662  streamingDoneReceiving = true;
1663  received = true;
1664  break;
1665 
1666  /*
1667  * 'X' means that the standby is closing down the socket.
1668  */
1669  case 'X':
             /* no break needed: proc_exit() does not return */
1670  proc_exit(0);
1671 
1672  default:
1673  ereport(FATAL,
1674  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1675  errmsg("invalid standby message type \"%c\"",
1676  firstchar)));
1677  }
1678  }
1679 
1680  /*
1681  * Save the last reply timestamp if we've received at least one reply.
1682  */
1683  if (received)
1684  {
1686  waiting_for_ping_response = false;
1687  }
1688 }
1689 
1690 /*
1691  * Process a status update message received from standby.
 *
 * Consumes the first byte of reply_message as the message type and
 * dispatches on it.  'r' and 'h' go to their handlers.  The handler calls
 * are not visible in this listing; presumably they are the standby reply
 * and hot standby feedback processors, which document that the caller has
 * already consumed the type byte (confirm).  Any other type is reported
 * as a protocol violation, and the process exits with proc_exit(0).
1692  */
1693 static void
1695 {
1696  char msgtype;
1697 
1698  /*
1699  * Check message type from the first byte.
1700  */
1701  msgtype = pq_getmsgbyte(&reply_message);
1702 
1703  switch (msgtype)
1704  {
1705  case 'r':
1707  break;
1708 
1709  case 'h':
1711  break;
1712 
1713  default:
1715  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1716  errmsg("unexpected message type \"%c\"", msgtype)));
1717  proc_exit(0);
1718  }
1719 }
1720 
1721 /*
1722  * Remember that a walreceiver just confirmed receipt of lsn `lsn`.
 *
 * lsn must be valid (asserted).  Under the slot's spinlock, lsn is stored
 * as the slot's restart_lsn if it differs from the current value.  Further
 * bookkeeping runs only when the value actually changed.  The declaration
 * of `slot` is not visible here; presumably it is MyReplicationSlot
 * (confirm).  The slot is deliberately not saved to disk.
1723  */
1724 static void
1726 {
1727  bool changed = false;
1729 
1730  Assert(lsn != InvalidXLogRecPtr);
1731  SpinLockAcquire(&slot->mutex);
1732  if (slot->data.restart_lsn != lsn)
1733  {
1734  changed = true;
1735  slot->data.restart_lsn = lsn;
1736  }
1737  SpinLockRelease(&slot->mutex);
1738 
1739  if (changed)
1740  {
1743  }
1744 
1745  /*
1746  * One could argue that the slot should be saved to disk now, but that'd
1747  * be energy wasted - the worst that losing this information can do is give
1748  * us wrong information in a statistics view - we'll just potentially be more
1749  * conservative in removing files.
1750  */
1751 }
1752 
1753 /*
1754  * Regular reply from standby advising of WAL locations on standby server.
 *
 * Reads from reply_message, in this order:
 *   - the write, flush and apply positions;
 *   - the send time (unused);
 *   - a reply-requested flag.
 * It then computes write/flush/apply lag through the lag tracker and sends
 * a keepalive if the standby asked for a reply.  Finally it publishes the
 * positions and lags in this walsender's shared state, under its spinlock.
 * When a replication slot is in use and a valid flush position was
 * reported, the slot is advanced as well (the exact calls are not visible
 * in this listing).
1755  */
1756 static void
1758 {
1759  XLogRecPtr writePtr,
1760  flushPtr,
1761  applyPtr;
1762  bool replyRequested;
1763  TimeOffset writeLag,
1764  flushLag,
1765  applyLag;
1766  bool clearLagTimes;
1767  TimestampTz now;
1768 
     /* whether the previous reply reported applyPtr == sentPtr */
1769  static bool fullyAppliedLastTime = false;
1770 
1771  /* the caller already consumed the msgtype byte */
1772  writePtr = pq_getmsgint64(&reply_message);
1773  flushPtr = pq_getmsgint64(&reply_message);
1774  applyPtr = pq_getmsgint64(&reply_message);
1775  (void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */
1776  replyRequested = pq_getmsgbyte(&reply_message);
1777 
1778  elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s",
1779  (uint32) (writePtr >> 32), (uint32) writePtr,
1780  (uint32) (flushPtr >> 32), (uint32) flushPtr,
1781  (uint32) (applyPtr >> 32), (uint32) applyPtr,
1782  replyRequested ? " (reply requested)" : "");
1783 
1784  /* See if we can compute the round-trip lag for these positions. */
1785  now = GetCurrentTimestamp();
1786  writeLag = LagTrackerRead(SYNC_REP_WAIT_WRITE, writePtr, now);
1787  flushLag = LagTrackerRead(SYNC_REP_WAIT_FLUSH, flushPtr, now);
1788  applyLag = LagTrackerRead(SYNC_REP_WAIT_APPLY, applyPtr, now);
1789 
1790  /*
1791  * If the standby reports that it has fully replayed the WAL in two
1792  * consecutive reply messages, then the second such message must result
1793  * from wal_receiver_status_interval expiring on the standby. This is a
1794  * convenient time to forget the lag times measured when it last
1795  * wrote/flushed/applied a WAL record, to avoid displaying stale lag data
1796  * until more WAL traffic arrives.
1797  */
1798  clearLagTimes = false;
1799  if (applyPtr == sentPtr)
1800  {
1801  if (fullyAppliedLastTime)
1802  clearLagTimes = true;
1803  fullyAppliedLastTime = true;
1804  }
1805  else
1806  fullyAppliedLastTime = false;
1807 
1808  /* Send a reply if the standby requested one. */
1809  if (replyRequested)
1810  WalSndKeepalive(false);
1811 
1812  /*
1813  * Update shared state for this WalSender process based on reply data from
1814  * standby.
     *
     * A lag of -1 means no new measurement.  It leaves the stored value
     * untouched unless clearLagTimes asks for the stored lags to be reset.
1815  */
1816  {
1817  WalSnd *walsnd = MyWalSnd;
1818 
1819  SpinLockAcquire(&walsnd->mutex);
1820  walsnd->write = writePtr;
1821  walsnd->flush = flushPtr;
1822  walsnd->apply = applyPtr;
1823  if (writeLag != -1 || clearLagTimes)
1824  walsnd->writeLag = writeLag;
1825  if (flushLag != -1 || clearLagTimes)
1826  walsnd->flushLag = flushLag;
1827  if (applyLag != -1 || clearLagTimes)
1828  walsnd->applyLag = applyLag;
1829  SpinLockRelease(&walsnd->mutex);
1830  }
1831 
1834 
1835  /*
1836  * Advance our local xmin horizon when the client confirmed a flush.
1837  */
1838  if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr)
1839  {
1842  else
1844  }
1845 }
1846 
1847 /* compute new replication slot xmin horizon if needed */
/*
 * The slot's xmin and catalog_xmin are each replaced by the corresponding
 * feedback value when:
 *   - the current value is not a normal xid, or
 *   - the new value is not a normal xid, or
 *   - the current value precedes the new one.
 * So between two normal xids the horizon only moves forward, and a
 * non-normal value on either side always replaces it.  The update is done
 * under the slot's spinlock.  Further bookkeeping happens only if something
 * changed (those calls are not visible in this listing).
 *
 * NOTE(review): `changed` is also set when a non-normal value is replaced
 * by the same non-normal value (e.g. repeated feedback with invalid xids),
 * so that bookkeeping presumably reruns on every such message — confirm
 * whether that is intended.
 */
1848 static void
1850 {
1851  bool changed = false;
1853 
1854  SpinLockAcquire(&slot->mutex);
1856 
1857  /*
1858  * For physical replication we don't need the interlock provided by xmin
1859  * and effective_xmin since the consequences of a missed increase are
1860  * limited to query cancellations, so set both at once.
1861  */
1862  if (!TransactionIdIsNormal(slot->data.xmin) ||
1863  !TransactionIdIsNormal(feedbackXmin) ||
1864  TransactionIdPrecedes(slot->data.xmin, feedbackXmin))
1865  {
1866  changed = true;
1867  slot->data.xmin = feedbackXmin;
1868  slot->effective_xmin = feedbackXmin;
1869  }
1870  if (!TransactionIdIsNormal(slot->data.catalog_xmin) ||
1871  !TransactionIdIsNormal(feedbackCatalogXmin) ||
1872  TransactionIdPrecedes(slot->data.catalog_xmin, feedbackCatalogXmin))
1873  {
1874  changed = true;
1875  slot->data.catalog_xmin = feedbackCatalogXmin;
1876  slot->effective_catalog_xmin = feedbackCatalogXmin;
1877  }
1878  SpinLockRelease(&slot->mutex);
1879 
1880  if (changed)
1881  {
1884  }
1885 }
1886 
1887 /*
1888  * Check that the provided xmin/epoch are sane, that is, not in the future
1889  * and not so far back as to be already wrapped around.
1890  *
1891  * Epoch of nextXid should be same as standby, or if the counter has
1892  * wrapped, then one greater than standby.
1893  *
1894  * This check doesn't care about whether clog exists for these xids
1895  * at all.
 *
 * Returns true only if the epoch matches and xid does not logically
 * follow nextXid.
1896  */
1897 static bool
1899 {
1900  TransactionId nextXid;
1901  uint32 nextEpoch;
1902 
1903  GetNextXidAndEpoch(&nextXid, &nextEpoch);
1904 
     /* numerically at or below nextXid: must come from the current epoch */
1905  if (xid <= nextXid)
1906  {
1907  if (epoch != nextEpoch)
1908  return false;
1909  }
1910  else
1911  {
         /* numerically above nextXid: only valid if the counter wrapped since */
1912  if (epoch + 1 != nextEpoch)
1913  return false;
1914  }
1915 
1916  if (!TransactionIdPrecedesOrEquals(xid, nextXid))
1917  return false; /* epoch OK, but it's wrapped around */
1918 
1919  return true;
1920 }
1921 
1922 /*
1923  * Hot Standby feedback
 *
 * Reads the standby's xmin and catalog_xmin, each with its epoch, from
 * reply_message.  Three outcomes are possible:
 *   - Both xids non-normal: hot_standby_feedback was turned off, and our
 *     horizons are reset.
 *   - A normal xid fails TransactionIdInRecentPast(): the message is
 *     ignored.
 *   - Otherwise: the horizon is held back through the replication slot
 *     when one is in use, or else through MyPgXact->xmin.
1924  */
1925 static void
1927 {
1928  TransactionId feedbackXmin;
1929  uint32 feedbackEpoch;
1930  TransactionId feedbackCatalogXmin;
1931  uint32 feedbackCatalogEpoch;
1932 
1933  /*
1934  * Decipher the reply message. The caller already consumed the msgtype
1935  * byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation
1936  * of this message.
1937  */
1938  (void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */
1939  feedbackXmin = pq_getmsgint(&reply_message, 4);
1940  feedbackEpoch = pq_getmsgint(&reply_message, 4);
1941  feedbackCatalogXmin = pq_getmsgint(&reply_message, 4);
1942  feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4);
1943 
1944  elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u",
1945  feedbackXmin,
1946  feedbackEpoch,
1947  feedbackCatalogXmin,
1948  feedbackCatalogEpoch);
1949 
1950  /*
1951  * Unset WalSender's xmins if the feedback message values are invalid.
1952  * This happens when the downstream turned hot_standby_feedback off.
1953  */
1954  if (!TransactionIdIsNormal(feedbackXmin)
1955  && !TransactionIdIsNormal(feedbackCatalogXmin))
1956  {
1958  if (MyReplicationSlot != NULL)
1959  PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
1960  return;
1961  }
1962 
1963  /*
1964  * Check that the provided xmin/epoch are sane, that is, not in the future
1965  * and not so far back as to be already wrapped around. Ignore if not.
1966  */
1967  if (TransactionIdIsNormal(feedbackXmin) &&
1968  !TransactionIdInRecentPast(feedbackXmin, feedbackEpoch))
1969  return;
1970 
1971  if (TransactionIdIsNormal(feedbackCatalogXmin) &&
1972  !TransactionIdInRecentPast(feedbackCatalogXmin, feedbackCatalogEpoch))
1973  return;
1974 
1975  /*
1976  * Set the WalSender's xmin equal to the standby's requested xmin, so that
1977  * the xmin will be taken into account by GetOldestXmin. This will hold
1978  * back the removal of dead rows and thereby prevent the generation of
1979  * cleanup conflicts on the standby server.
1980  *
1981  * There is a small window for a race condition here: although we just
1982  * checked that feedbackXmin precedes nextXid, the nextXid could have
1983  * gotten advanced between our fetching it and applying the xmin below,
1984  * perhaps far enough to make feedbackXmin wrap around. In that case the
1985  * xmin we set here would be "in the future" and have no effect. No point
1986  * in worrying about this since it's too late to save the desired data
1987  * anyway. Assuming that the standby sends us an increasing sequence of
1988  * xmins, this could only happen during the first reply cycle, else our
1989  * own xmin would prevent nextXid from advancing so far.
1990  *
1991  * We don't bother taking the ProcArrayLock here. Setting the xmin field
1992  * is assumed atomic, and there's no real need to prevent a concurrent
1993  * GetOldestXmin. (If we're moving our xmin forward, this is obviously
1994  * safe, and if we're moving it backwards, well, the data is at risk
1995  * already since a VACUUM could have just finished calling GetOldestXmin.)
1996  *
1997  * If we're using a replication slot we reserve the xmin via that,
1998  * otherwise via the walsender's PGXACT entry. We can only track the
1999  * catalog xmin separately when using a slot, so we store the least of the
2000  * two provided when not using a slot.
2001  *
2002  * XXX: It might make sense to generalize the ephemeral slot concept and
2003  * always use the slot mechanism to handle the feedback xmin.
2004  */
2005  if (MyReplicationSlot != NULL) /* XXX: persistency configurable? */
2006  PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
2007  else
2008  {
         /*
          * NOTE(review): if feedbackXmin is not normal while
          * feedbackCatalogXmin is, TransactionIdPrecedes() presumably
          * compares the raw values.  In that case the else branch runs and
          * the catalog xmin is dropped.  Confirm whether a standby can send
          * that combination.
          */
2009  if (TransactionIdIsNormal(feedbackCatalogXmin)
2010  && TransactionIdPrecedes(feedbackCatalogXmin, feedbackXmin))
2011  MyPgXact->xmin = feedbackCatalogXmin;
2012  else
2013  MyPgXact->xmin = feedbackXmin;
2014  }
2015 }
2016 
2017 /*
2018  * Compute how long send/receive loops should sleep.
2019  *
2020  * If wal_sender_timeout is enabled we want to wake up in time to send
2021  * keepalives and to abort the connection if wal_sender_timeout has been
2022  * reached.
 *
 * Returns the sleep time in milliseconds.  The default is 10000 (10 s).
 * When the timeout logic applies (the guarding condition is not visible
 * in this listing), the result is the time remaining from `now` until the
 * computed wakeup time.
2023  */
2024 static long
2026 {
2027  long sleeptime = 10000; /* 10 s */
2028 
2030  {
2031  TimestampTz wakeup_time;
2032  long sec_to_timeout;
2033  int microsec_to_timeout;
2034 
2035  /*
2036  * At the latest stop sleeping once wal_sender_timeout has been
2037  * reached.
2038  */
2041 
2042  /*
2043  * If no ping has been sent yet, wakeup when it's time to do so.
2044  * WalSndKeepaliveIfNecessary() wants to send a keepalive once half of
2045  * the timeout passed without a response.
2046  */
2049  wal_sender_timeout / 2);
2050 
2051  /* Compute relative time until wakeup. */
2052  TimestampDifference(now, wakeup_time,
2053  &sec_to_timeout, &microsec_to_timeout);
2054 
         /* convert seconds + microseconds to milliseconds */
2055  sleeptime = sec_to_timeout * 1000 +
2056  microsec_to_timeout / 1000;
2057  }
2058 
2059  return sleeptime;
2060 }
2061 
2062 /*
2063  * Check whether there have been responses by the client within
2064  * wal_sender_timeout and shut down if not.
 *
 * `now` is the current time supplied by the caller.  Nothing happens while
 * last_reply_timestamp <= 0 or when wal_sender_timeout is 0.  Once the
 * deadline has passed, the timeout is logged and WalSndShutdown() is
 * called.
2065  */
2066 static void
2068 {
2069  TimestampTz timeout;
2070 
2071  /* don't bail out if we're doing something that doesn't require timeouts */
2072  if (last_reply_timestamp <= 0)
2073  return;
2074 
2077 
2078  if (wal_sender_timeout > 0 && now >= timeout)
2079  {
2080  /*
2081  * Since typically expiration of replication timeout means
2082  * communication problem, we don't send the error message to the
2083  * standby.
2084  */
2086  (errmsg("terminating walsender process due to replication timeout")));
2087 
2088  WalSndShutdown();
2089  }
2090 }
2091 
2092 /* Main loop of walsender process that streams the WAL over Copy messages. */
/*
 * send_data is the callback that produces the next chunk of output.  Each
 * iteration:
 *   - bails out if the postmaster died;
 *   - handles configuration reloads;
 *   - processes client input;
 *   - leaves the loop once CopyDone has been exchanged in both directions
 *     and no output is pending;
 *   - calls send_data only when the output buffer is empty, then flushes;
 *   - when nothing remains to send, moves from CATCHUP to STREAMING and,
 *     after SIGUSR2, finishes via WalSndDone();
 *   - enforces the timeout and sends keepalives;
 *   - sleeps on the latch/socket when appropriate, per the comment below.
 */
2093 static void
2095 {
2096  /*
2097  * Initialize the last reply timestamp. That enables timeout processing
2098  * from hereon.
2099  */
2101  waiting_for_ping_response = false;
2102 
2103  /*
2104  * Loop until we reach the end of this timeline or the client requests to
2105  * stop streaming.
2106  */
2107  for (;;)
2108  {
2109  TimestampTz now;
2110 
2111  /*
2112  * Emergency bailout if postmaster has died. This is to avoid the
2113  * necessity for manual cleanup of all postmaster children.
2114  */
2115  if (!PostmasterIsAlive())
2116  exit(1);
2117 
2118  /* Clear any already-pending wakeups */
2120 
2122 
2123  /* Process any requests or signals received recently */
2124  if (ConfigReloadPending)
2125  {
2126  ConfigReloadPending = false;
2129  }
2130 
2131  /* Check for input from the client */
2133 
2134  /*
2135  * If we have received CopyDone from the client, sent CopyDone
2136  * ourselves, and the output buffer is empty, it's time to exit
2137  * streaming.
2138  */
2140  !pq_is_send_pending())
2141  break;
2142 
2143  /*
2144  * If we don't have any pending data in the output buffer, try to send
2145  * some more. If there is some, we don't bother to call send_data
2146  * again until we've flushed it ... but we'd better assume we are not
2147  * caught up.
2148  */
2149  if (!pq_is_send_pending())
2150  send_data();
2151  else
2152  WalSndCaughtUp = false;
2153 
2154  /* Try to flush pending output to the client */
2155  if (pq_flush_if_writable() != 0)
2156  WalSndShutdown();
2157 
2158  /* If nothing remains to be sent right now ... */
2160  {
2161  /*
2162  * If we're in catchup state, move to streaming. This is an
2163  * important state change for users to know about, since before
2164  * this point data loss might occur if the primary dies and we
2165  * need to failover to the standby. The state change is also
2166  * important for synchronous replication, since commits that
2167  * started to wait at that point might wait for some time.
2168  */
2169  if (MyWalSnd->state == WALSNDSTATE_CATCHUP)
2170  {
2171  ereport(DEBUG1,
2172  (errmsg("standby \"%s\" has now caught up with primary",
2173  application_name)));
2175  }
2176 
2177  /*
2178  * When SIGUSR2 arrives, we send any outstanding logs up to the
2179  * shutdown checkpoint record (i.e., the latest record), wait for
2180  * them to be replicated to the standby, and exit. This may be a
2181  * normal termination at shutdown, or a promotion, the walsender
2182  * is not sure which.
2183  */
2184  if (got_SIGUSR2)
2185  WalSndDone(send_data);
2186  }
2187 
2188  now = GetCurrentTimestamp();
2189 
2190  /* Check for replication timeout. */
2191  WalSndCheckTimeOut(now);
2192 
2193  /* Send keepalive if the time has come */
2195 
2196  /*
2197  * We don't block if not caught up, unless there is unsent data
2198  * pending in which case we'd better block until the socket is
2199  * write-ready. This test is only needed for the case where the
2200  * send_data callback handled a subset of the available data but then
2201  * pq_flush_if_writable flushed it all --- we should immediately try
2202  * to send more.
2203  */
2205  {
2206  long sleeptime;
2207  int wakeEvents;
2208 
2209  wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH | WL_TIMEOUT |
2211 
2212  sleeptime = WalSndComputeSleeptime(now);
2213 
2214  if (pq_is_send_pending())
2215  wakeEvents |= WL_SOCKET_WRITEABLE;
2216 
2217  /* Sleep until something happens or we time out */
2218  WaitLatchOrSocket(MyLatch, wakeEvents,
2219  MyProcPort->sock, sleeptime,
2221  }
2222  }
2223  return;
2224 }
2225 
2226 /* Initialize a per-walsender data structure for this walsender process */
/*
 * Claims the first WalSnd entry whose pid is 0.  The check and the claim
 * happen under that entry's spinlock.  The claim records our pid, resets
 * positions, lag values and state, and points the entry's latch at this
 * process's latch.  Raises FATAL if all max_wal_senders entries are in
 * use.  Must not be called twice (asserts MyWalSnd == NULL).
 */
2227 static void
2229 {
2230  int i;
2231 
2232  /*
2233  * WalSndCtl should be set up already (we inherit this by fork() or
2234  * EXEC_BACKEND mechanism from the postmaster).
2235  */
2236  Assert(WalSndCtl != NULL);
2237  Assert(MyWalSnd == NULL);
2238 
2239  /*
2240  * Find a free walsender slot and reserve it. If this fails, we must be
2241  * out of WalSnd structures.
2242  */
2243  for (i = 0; i < max_wal_senders; i++)
2244  {
2245  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2246 
2247  SpinLockAcquire(&walsnd->mutex);
2248 
2249  if (walsnd->pid != 0)
2250  {
2251  SpinLockRelease(&walsnd->mutex);
2252  continue;
2253  }
2254  else
2255  {
2256  /*
2257  * Found a free slot. Reserve it for us.
2258  */
2259  walsnd->pid = MyProcPid;
2260  walsnd->sentPtr = InvalidXLogRecPtr;
2261  walsnd->write = InvalidXLogRecPtr;
2262  walsnd->flush = InvalidXLogRecPtr;
2263  walsnd->apply = InvalidXLogRecPtr;
2264  walsnd->writeLag = -1;
2265  walsnd->flushLag = -1;
2266  walsnd->applyLag = -1;
2267  walsnd->state = WALSNDSTATE_STARTUP;
2268  walsnd->latch = &MyProc->procLatch;
2269  SpinLockRelease(&walsnd->mutex);
2270  /* don't need the lock anymore */
2271  MyWalSnd = (WalSnd *) walsnd;
2272 
2273  break;
2274  }
2275  }
2276  if (MyWalSnd == NULL)
2277  ereport(FATAL,
2278  (errcode(ERRCODE_TOO_MANY_CONNECTIONS),
2279  errmsg("number of requested standby connections "
2280  "exceeds max_wal_senders (currently %d)",
2281  max_wal_senders)));
2282 
2283  /* Arrange to clean up at walsender exit */
2285 }
2286 
2287 /* Destroy the per-walsender data structure for this walsender process */
/*
 * Releases MyWalSnd.  The local pointer is cleared first.  Then, under the
 * entry's spinlock, the latch pointer and pid are reset, so the entry
 * counts as free for the pid == 0 scan in InitWalSenderSlot().  Presumably
 * this is the exit callback that InitWalSenderSlot() registers (that line
 * is not visible in this listing).
 */
2288 static void
2290 {
2291  WalSnd *walsnd = MyWalSnd;
2292 
2293  Assert(walsnd != NULL);
2294 
2295  MyWalSnd = NULL;
2296 
2297  SpinLockAcquire(&walsnd->mutex);
2298  /* clear latch while holding the spinlock, so it can safely be read */
2299  walsnd->latch = NULL;
2300  /* Mark WalSnd struct as no longer being in use. */
2301  walsnd->pid = 0;
2302  SpinLockRelease(&walsnd->mutex);
2303 }
2304 
2305 /*
2306  * Read 'count' bytes from WAL into 'buf', starting at location 'startptr'
2307  *
2308  * XXX probably this should be improved to suck data directly from the
2309  * WAL buffers when possible.
2310  *
2311  * Will open, and keep open, one WAL segment stored in the global file
2312  * descriptor sendFile. This means if XLogRead is used once, there will
2313  * always be one descriptor left open until the process ends, but never
2314  * more than one.
2315  */
2316 static void
2317 XLogRead(char *buf, XLogRecPtr startptr, Size count)
2318 {
2319  char *p;
2320  XLogRecPtr recptr;
2321  Size nbytes;
2322  XLogSegNo segno;
2323 
2324 retry:
2325  p = buf;
2326  recptr = startptr;
2327  nbytes = count;
2328 
2329  while (nbytes > 0)
2330  {
2331  uint32 startoff;
2332  int segbytes;
2333  int readbytes;
2334 
2335  startoff = XLogSegmentOffset(recptr, wal_segment_size);
2336 
2337  if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo, wal_segment_size))
2338  {
2339  char path[MAXPGPATH];
2340 
2341  /* Switch to another logfile segment */
2342  if (sendFile >= 0)
2343  close(sendFile);
2344 
2346 
2347  /*-------
2348  * When reading from a historic timeline, and there is a timeline
2349  * switch within this segment, read from the WAL segment belonging
2350  * to the new timeline.
2351  *
2352  * For example, imagine that this server is currently on timeline
2353  * 5, and we're streaming timeline 4. The switch from timeline 4
2354  * to 5 happened at 0/13002088. In pg_wal, we have these files:
2355  *
2356  * ...
2357  * 000000040000000000000012
2358  * 000000040000000000000013
2359  * 000000050000000000000013
2360  * 000000050000000000000014
2361  * ...
2362  *
2363  * In this situation, when requested to send the WAL from
2364  * segment 0x13, on timeline 4, we read the WAL from file
2365  * 000000050000000000000013. Archive recovery prefers files from
2366  * newer timelines, so if the segment was restored from the
2367  * archive on this server, the file belonging to the old timeline,
2368  * 000000040000000000000013, might not exist. Their contents are
2369  * equal up to the switchpoint, because at a timeline switch, the
2370  * used portion of the old segment is copied to the new file.
2371  *-------
2372  */
2375  {
2376  XLogSegNo endSegNo;
2377 
2379  if (sendSegNo == endSegNo)
2381  }
2382 
2384 
2385  sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY);
2386  if (sendFile < 0)
2387  {
2388  /*
2389  * If the file is not found, assume it's because the standby
2390  * asked for a too old WAL segment that has already been
2391  * removed or recycled.
2392  */
2393  if (errno == ENOENT)
2394  ereport(ERROR,
2396  errmsg("requested WAL segment %s has already been removed",
2398  else
2399  ereport(ERROR,
2401  errmsg("could not open file \"%s\": %m",
2402  path)));
2403  }
2404  sendOff = 0;
2405  }
2406 
2407  /* Need to seek in the file? */
2408  if (sendOff != startoff)
2409  {
2410  if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
2411  ereport(ERROR,
2413  errmsg("could not seek in log segment %s to offset %u: %m",
2415  startoff)));
2416  sendOff = startoff;
2417  }
2418 
2419  /* How many bytes are within this segment? */
2420  if (nbytes > (wal_segment_size - startoff))
2421  segbytes = wal_segment_size - startoff;
2422  else
2423  segbytes = nbytes;
2424 
2426  readbytes = read(sendFile, p, segbytes);
2428  if (readbytes <= 0)
2429  {
2430  ereport(ERROR,
2432  errmsg("could not read from log segment %s, offset %u, length %lu: %m",
2434  sendOff, (unsigned long) segbytes)));
2435  }
2436 
2437  /* Update state for read */
2438  recptr += readbytes;
2439 
2440  sendOff += readbytes;
2441  nbytes -= readbytes;
2442  p += readbytes;
2443  }
2444 
2445  /*
2446  * After reading into the buffer, check that what we read was valid. We do
2447  * this after reading, because even though the segment was present when we
2448  * opened it, it might get recycled or removed while we read it. The
2449  * read() succeeds in that case, but the data we tried to read might
2450  * already have been overwritten with new WAL records.
2451  */
2452  XLByteToSeg(startptr, segno, wal_segment_size);
2454 
2455  /*
2456  * During recovery, the currently-open WAL file might be replaced with the
2457  * file of the same name retrieved from archive. So we always need to
2458  * check what we read was valid after reading into the buffer. If it's
2459  * invalid, we try to open and read the file again.
2460  */
2462  {
2463  WalSnd *walsnd = MyWalSnd;
2464  bool reload;
2465 
2466  SpinLockAcquire(&walsnd->mutex);
2467  reload = walsnd->needreload;
2468  walsnd->needreload = false;
2469  SpinLockRelease(&walsnd->mutex);
2470 
2471  if (reload && sendFile >= 0)
2472  {
2473  close(sendFile);
2474  sendFile = -1;
2475 
2476  goto retry;
2477  }
2478  }
2479 }
2480 
2481 /*
2482  * Send out the WAL in its normal physical/stored form.
2483  *
2484  * Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk,
2485  * but not yet sent to the client, and buffer it in the libpq output
2486  * buffer.
2487  *
2488  * If there is no unsent WAL remaining, WalSndCaughtUp is set to true,
2489  * otherwise WalSndCaughtUp is set to false.
2490  */
2491 static void
2493 {
2494  XLogRecPtr SendRqstPtr;
2495  XLogRecPtr startptr;
2496  XLogRecPtr endptr;
2497  Size nbytes;
2498 
2499  /* If requested switch the WAL sender to the stopping state. */
2500  if (got_STOPPING)
2502 
2504  {
2505  WalSndCaughtUp = true;
2506  return;
2507  }
2508 
2509  /* Figure out how far we can safely send the WAL. */
2511  {
2512  /*
2513  * Streaming an old timeline that's in this server's history, but is
2514  * not the one we're currently inserting or replaying. It can be
2515  * streamed up to the point where we switched off that timeline.
2516  */
2517  SendRqstPtr = sendTimeLineValidUpto;
2518  }
2519  else if (am_cascading_walsender)
2520  {
2521  /*
2522  * Streaming the latest timeline on a standby.
2523  *
2524  * Attempt to send all WAL that has already been replayed, so that we
2525  * know it's valid. If we're receiving WAL through streaming
2526  * replication, it's also OK to send any WAL that has been received
2527  * but not replayed.
2528  *
2529  * The timeline we're recovering from can change, or we can be
2530  * promoted. In either case, the current timeline becomes historic. We
2531  * need to detect that so that we don't try to stream past the point
2532  * where we switched to another timeline. We check for promotion or
2533  * timeline switch after calculating FlushPtr, to avoid a race
2534  * condition: if the timeline becomes historic just after we checked
2535  * that it was still current, it's still be OK to stream it up to the
2536  * FlushPtr that was calculated before it became historic.
2537  */
2538  bool becameHistoric = false;
2539 
2540  SendRqstPtr = GetStandbyFlushRecPtr();
2541 
2542  if (!RecoveryInProgress())
2543  {
2544  /*
2545  * We have been promoted. RecoveryInProgress() updated
2546  * ThisTimeLineID to the new current timeline.
2547  */
2548  am_cascading_walsender = false;
2549  becameHistoric = true;
2550  }
2551  else
2552  {
2553  /*
2554  * Still a cascading standby. But is the timeline we're sending
2555  * still the one recovery is recovering from? ThisTimeLineID was
2556  * updated by the GetStandbyFlushRecPtr() call above.
2557  */
2559  becameHistoric = true;
2560  }
2561 
2562  if (becameHistoric)
2563  {
2564  /*
2565  * The timeline we were sending has become historic. Read the
2566  * timeline history file of the new timeline to see where exactly
2567  * we forked off from the timeline we were sending.
2568  */
2569  List *history;
2570 
2573 
2575  list_free_deep(history);
2576 
2577  sendTimeLineIsHistoric = true;
2578 
2579  SendRqstPtr = sendTimeLineValidUpto;
2580  }
2581  }
2582  else
2583  {
2584  /*
2585  * Streaming the current timeline on a master.
2586  *
2587  * Attempt to send all data that's already been written out and
2588  * fsync'd to disk. We cannot go further than what's been written out
2589  * given the current implementation of XLogRead(). And in any case
2590  * it's unsafe to send WAL that is not securely down to disk on the
2591  * master: if the master subsequently crashes and restarts, standbys
2592  * must not have applied any WAL that got lost on the master.
2593  */
2594  SendRqstPtr = GetFlushRecPtr();
2595  }
2596 
2597  /*
2598  * Record the current system time as an approximation of the time at which
2599  * this WAL location was written for the purposes of lag tracking.
2600  *
2601  * In theory we could make XLogFlush() record a time in shmem whenever WAL
2602  * is flushed and we could get that time as well as the LSN when we call
2603  * GetFlushRecPtr() above (and likewise for the cascading standby
2604  * equivalent), but rather than putting any new code into the hot WAL path
2605  * it seems good enough to capture the time here. We should reach this
2606  * after XLogFlush() runs WalSndWakeupProcessRequests(), and although that
2607  * may take some time, we read the WAL flush pointer and take the time
2608  * very close to together here so that we'll get a later position if it is
2609  * still moving.
2610  *
2611  * Because LagTrackerWriter ignores samples when the LSN hasn't advanced,
2612  * this gives us a cheap approximation for the WAL flush time for this
2613  * LSN.
2614  *
2615  * Note that the LSN is not necessarily the LSN for the data contained in
2616  * the present message; it's the end of the WAL, which might be further
2617  * ahead. All the lag tracking machinery cares about is finding out when
2618  * that arbitrary LSN is eventually reported as written, flushed and
2619  * applied, so that it can measure the elapsed time.
2620  */
2621  LagTrackerWrite(SendRqstPtr, GetCurrentTimestamp());
2622 
2623  /*
2624  * If this is a historic timeline and we've reached the point where we
2625  * forked to the next timeline, stop streaming.
2626  *
2627  * Note: We might already have sent WAL > sendTimeLineValidUpto. The
2628  * startup process will normally replay all WAL that has been received
2629  * from the master, before promoting, but if the WAL streaming is
2630  * terminated at a WAL page boundary, the valid portion of the timeline
2631  * might end in the middle of a WAL record. We might've already sent the
2632  * first half of that partial WAL record to the cascading standby, so that
2633  * sentPtr > sendTimeLineValidUpto. That's OK; the cascading standby can't
2634  * replay the partial WAL record either, so it can still follow our
2635  * timeline switch.
2636  */
2638  {
2639  /* close the current file. */
2640  if (sendFile >= 0)
2641  close(sendFile);
2642  sendFile = -1;
2643 
2644  /* Send CopyDone */
2645  pq_putmessage_noblock('c', NULL, 0);
2646  streamingDoneSending = true;
2647 
2648  WalSndCaughtUp = true;
2649 
2650  elog(DEBUG1, "walsender reached end of timeline at %X/%X (sent up to %X/%X)",
2652  (uint32) (sentPtr >> 32), (uint32) sentPtr);
2653  return;
2654  }
2655 
2656  /* Do we have any work to do? */
2657  Assert(sentPtr <= SendRqstPtr);
2658  if (SendRqstPtr <= sentPtr)
2659  {
2660  WalSndCaughtUp = true;
2661  return;
2662  }
2663 
2664  /*
2665  * Figure out how much to send in one message. If there's no more than
2666  * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
2667  * MAX_SEND_SIZE bytes, but round back to logfile or page boundary.
2668  *
2669  * The rounding is not only for performance reasons. Walreceiver relies on
2670  * the fact that we never split a WAL record across two messages. Since a
2671  * long WAL record is split at page boundary into continuation records,
2672  * page boundary is always a safe cut-off point. We also assume that
2673  * SendRqstPtr never points to the middle of a WAL record.
2674  */
2675  startptr = sentPtr;
2676  endptr = startptr;
2677  endptr += MAX_SEND_SIZE;
2678 
2679  /* if we went beyond SendRqstPtr, back off */
2680  if (SendRqstPtr <= endptr)
2681  {
2682  endptr = SendRqstPtr;
2684  WalSndCaughtUp = false;
2685  else
2686  WalSndCaughtUp = true;
2687  }
2688  else
2689  {
2690  /* round down to page boundary. */
2691  endptr -= (endptr % XLOG_BLCKSZ);
2692  WalSndCaughtUp = false;
2693  }
2694 
2695  nbytes = endptr - startptr;
2696  Assert(nbytes <= MAX_SEND_SIZE);
2697 
2698  /*
2699  * OK to read and send the slice.
2700  */
2701  resetStringInfo(&output_message);
2702  pq_sendbyte(&output_message, 'w');
2703 
2704  pq_sendint64(&output_message, startptr); /* dataStart */
2705  pq_sendint64(&output_message, SendRqstPtr); /* walEnd */
2706  pq_sendint64(&output_message, 0); /* sendtime, filled in last */
2707 
2708  /*
2709  * Read the log directly into the output buffer to avoid extra memcpy
2710  * calls.
2711  */
2712  enlargeStringInfo(&output_message, nbytes);
2713  XLogRead(&output_message.data[output_message.len], startptr, nbytes);
2714  output_message.len += nbytes;
2715  output_message.data[output_message.len] = '\0';
2716 
2717  /*
2718  * Fill the send timestamp last, so that it is taken as late as possible.
2719  */
2720  resetStringInfo(&tmpbuf);
2721  pq_sendint64(&tmpbuf, GetCurrentTimestamp());
2722  memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64)],
2723  tmpbuf.data, sizeof(int64));
2724 
2725  pq_putmessage_noblock('d', output_message.data, output_message.len);
2726 
2727  sentPtr = endptr;
2728 
2729  /* Update shared memory status */
2730  {
2731  WalSnd *walsnd = MyWalSnd;
2732 
2733  SpinLockAcquire(&walsnd->mutex);
2734  walsnd->sentPtr = sentPtr;
2735  SpinLockRelease(&walsnd->mutex);
2736  }
2737 
2738  /* Report progress of XLOG streaming in PS display */
2740  {
2741  char activitymsg[50];
2742 
2743  snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
2744  (uint32) (sentPtr >> 32), (uint32) sentPtr);
2745  set_ps_display(activitymsg, false);
2746  }
2747 
2748  return;
2749 }
2750 
2751 /*
2752  * Stream out logically decoded data.
2753  */
2754 static void
2756 {
2757  XLogRecord *record;
2758  char *errm;
2759 
2760  /*
2761  * Don't know whether we've caught up yet. We'll set it to true in
2762  * WalSndWaitForWal, if we're actually waiting. We also set to true if
2763  * XLogReadRecord() had to stop reading but WalSndWaitForWal didn't wait -
2764  * i.e. when we're shutting down.
2765  */
2766  WalSndCaughtUp = false;
2767 
2768  record = XLogReadRecord(logical_decoding_ctx->reader, logical_startptr, &errm);
2770 
2771  /* xlog record was invalid */
2772  if (errm != NULL)
2773  elog(ERROR, "%s", errm);
2774 
2775  if (record != NULL)
2776  {
2777  /*
2778  * Note the lack of any call to LagTrackerWrite() which is handled by
2779  * WalSndUpdateProgress which is called by output plugin through
2780  * logical decoding write api.
2781  */
2782  LogicalDecodingProcessRecord(logical_decoding_ctx, logical_decoding_ctx->reader);
2783 
2784  sentPtr = logical_decoding_ctx->reader->EndRecPtr;
2785  }
2786  else
2787  {
2788  /*
2789  * If the record we just wanted read is at or beyond the flushed
2790  * point, then we're caught up.
2791  */
2792  if (logical_decoding_ctx->reader->EndRecPtr >= GetFlushRecPtr())
2793  {
2794  WalSndCaughtUp = true;
2795 
2796  /*
2797  * Have WalSndLoop() terminate the connection in an orderly
2798  * manner, after writing out all the pending data.
2799  */
2800  if (got_STOPPING)
2801  got_SIGUSR2 = true;
2802  }
2803  }
2804 
2805  /* Update shared memory status */
2806  {
2807  WalSnd *walsnd = MyWalSnd;
2808 
2809  SpinLockAcquire(&walsnd->mutex);
2810  walsnd->sentPtr = sentPtr;
2811  SpinLockRelease(&walsnd->mutex);
2812  }
2813 }
2814 
2815 /*
2816  * Shutdown if the sender is caught up.
2817  *
2818  * NB: This should only be called when the shutdown signal has been received
2819  * from postmaster.
2820  *
2821  * Note that if we determine that there's still more data to send, this
2822  * function will return control to the caller.
2823  */
2824 static void
2826 {
2827  XLogRecPtr replicatedPtr;
2828 
2829  /* ... let's just be real sure we're caught up ... */
2830  send_data();
2831 
2832  /*
2833  * To figure out whether all WAL has successfully been replicated, check
2834  * flush location if valid, write otherwise. Tools like pg_receivewal will
2835  * usually (unless in synchronous mode) return an invalid flush location.
2836  */
2837  replicatedPtr = XLogRecPtrIsInvalid(MyWalSnd->flush) ?
2838  MyWalSnd->write : MyWalSnd->flush;
2839 
2840  if (WalSndCaughtUp && sentPtr == replicatedPtr &&
2841  !pq_is_send_pending())
2842  {
2843  /* Inform the standby that XLOG streaming is done */
2844  EndCommand("COPY 0", DestRemote);
2845  pq_flush();
2846 
2847  proc_exit(0);
2848  }
2850  {
2851  WalSndKeepalive(true);
2853  }
2854 }
2855 
2856 /*
2857  * Returns the latest point in WAL that has been safely flushed to disk, and
2858  * can be sent to the standby. This should only be called when in recovery,
2859  * ie. we're streaming to a cascaded standby.
2860  *
2861  * As a side-effect, ThisTimeLineID is updated to the TLI of the last
2862  * replayed WAL record.
2863  */
2864 static XLogRecPtr
2866 {
2867  XLogRecPtr replayPtr;
2868  TimeLineID replayTLI;
2869  XLogRecPtr receivePtr;
2871  XLogRecPtr result;
2872 
2873  /*
2874  * We can safely send what's already been replayed. Also, if walreceiver
2875  * is streaming WAL from the same timeline, we can send anything that it
2876  * has streamed, but hasn't been replayed yet.
2877  */
2878 
2879  receivePtr = GetWalRcvWriteRecPtr(NULL, &receiveTLI);
2880  replayPtr = GetXLogReplayRecPtr(&replayTLI);
2881 
2882  ThisTimeLineID = replayTLI;
2883 
2884  result = replayPtr;
2885  if (receiveTLI == ThisTimeLineID && receivePtr > replayPtr)
2886  result = receivePtr;
2887 
2888  return result;
2889 }
2890 
2891 /*
2892  * Request walsenders to reload the currently-open WAL file
2893  */
2894 void
2896 {
2897  int i;
2898 
2899  for (i = 0; i < max_wal_senders; i++)
2900  {
2901  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2902 
2903  SpinLockAcquire(&walsnd->mutex);
2904  if (walsnd->pid == 0)
2905  {
2906  SpinLockRelease(&walsnd->mutex);
2907  continue;
2908  }
2909  walsnd->needreload = true;
2910  SpinLockRelease(&walsnd->mutex);
2911  }
2912 }
2913 
2914 /*
2915  * Handle PROCSIG_WALSND_INIT_STOPPING signal.
2916  */
2917 void
2919 {
2921 
2922  /*
2923  * If replication has not yet started, die like with SIGTERM. If
2924  * replication is active, only set a flag and wake up the main loop. It
2925  * will send any outstanding WAL, wait for it to be replicated to the
2926  * standby, and then exit gracefully.
2927  */
2928  if (!replication_active)
2929  kill(MyProcPid, SIGTERM);
2930  else
2931  got_STOPPING = true;
2932 }
2933 
2934 /*
2935  * SIGUSR2: set flag to do a last cycle and shut down afterwards. The WAL
2936  * sender should already have been switched to WALSNDSTATE_STOPPING at
2937  * this point.
2938  */
2939 static void
2941 {
2942  int save_errno = errno;
2943 
2944  got_SIGUSR2 = true;
2945  SetLatch(MyLatch);
2946 
2947  errno = save_errno;
2948 }
2949 
2950 /* Set up signal handlers */
2951 void
2953 {
2954  /* Set up signal handlers */
2955  pqsignal(SIGHUP, PostgresSigHupHandler); /* set flag to read config
2956  * file */
2957  pqsignal(SIGINT, StatementCancelHandler); /* query cancel */
2958  pqsignal(SIGTERM, die); /* request shutdown */
2959  pqsignal(SIGQUIT, quickdie); /* hard crash time */
2960  InitializeTimeouts(); /* establishes SIGALRM handler */
2963  pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
2964  * shutdown */
2965 
2966  /* Reset some signals that are accepted by postmaster but not here */
2972 }
2973 
2974 /* Report shared-memory space needed by WalSndShmemInit */
2975 Size
2977 {
2978  Size size = 0;
2979 
2980  size = offsetof(WalSndCtlData, walsnds);
2981  size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
2982 
2983  return size;
2984 }
2985 
2986 /* Allocate and initialize walsender-related shared memory */
2987 void
2989 {
2990  bool found;
2991  int i;
2992 
2993  WalSndCtl = (WalSndCtlData *)
2994  ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
2995 
2996  if (!found)
2997  {
2998  /* First time through, so initialize */
2999  MemSet(WalSndCtl, 0, WalSndShmemSize());
3000 
3001  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
3002  SHMQueueInit(&(WalSndCtl->SyncRepQueue[i]));
3003 
3004  for (i = 0; i < max_wal_senders; i++)
3005  {
3006  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3007 
3008  SpinLockInit(&walsnd->mutex);
3009  }
3010  }
3011 }
3012 
3013 /*
3014  * Wake up all walsenders
3015  *
3016  * This will be called inside critical sections, so throwing an error is not
3017  * advisable.
3018  */
3019 void
3021 {
3022  int i;
3023 
3024  for (i = 0; i < max_wal_senders; i++)
3025  {
3026  Latch *latch;
3027  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3028 
3029  /*
3030  * Get latch pointer with spinlock held, for the unlikely case that
3031  * pointer reads aren't atomic (as they're 8 bytes).
3032  */
3033  SpinLockAcquire(&walsnd->mutex);
3034  latch = walsnd->latch;
3035  SpinLockRelease(&walsnd->mutex);
3036 
3037  if (latch != NULL)
3038  SetLatch(latch);
3039  }
3040 }
3041 
3042 /*
3043  * Signal all walsenders to move to stopping state.
3044  *
3045  * This will trigger walsenders to move to a state where no further WAL can be
3046  * generated. See this file's header for details.
3047  */
3048 void
3050 {
3051  int i;
3052 
3053  for (i = 0; i < max_wal_senders; i++)
3054  {
3055  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3056  pid_t pid;
3057 
3058  SpinLockAcquire(&walsnd->mutex);
3059  pid = walsnd->pid;
3060  SpinLockRelease(&walsnd->mutex);
3061 
3062  if (pid == 0)
3063  continue;
3064 
3066  }
3067 }
3068 
3069 /*
3070  * Wait that all the WAL senders have quit or reached the stopping state. This
3071  * is used by the checkpointer to control when the shutdown checkpoint can
3072  * safely be performed.
3073  */
3074 void
3076 {
3077  for (;;)
3078  {
3079  int i;
3080  bool all_stopped = true;
3081 
3082  for (i = 0; i < max_wal_senders; i++)
3083  {
3084  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3085 
3086  SpinLockAcquire(&walsnd->mutex);
3087 
3088  if (walsnd->pid == 0)
3089  {
3090  SpinLockRelease(&walsnd->mutex);
3091  continue;
3092  }
3093 
3094  if (walsnd->state != WALSNDSTATE_STOPPING)
3095  {
3096  all_stopped = false;
3097  SpinLockRelease(&walsnd->mutex);
3098  break;
3099  }
3100  SpinLockRelease(&walsnd->mutex);
3101  }
3102 
3103  /* safe to leave if confirmation is done for all WAL senders */
3104  if (all_stopped)
3105  return;
3106 
3107  pg_usleep(10000L); /* wait for 10 msec */
3108  }
3109 }
3110 
3111 /* Set state for current walsender (only called in walsender) */
3112 void
3114 {
3115  WalSnd *walsnd = MyWalSnd;
3116 
3118 
3119  if (walsnd->state == state)
3120  return;
3121 
3122  SpinLockAcquire(&walsnd->mutex);
3123  walsnd->state = state;
3124  SpinLockRelease(&walsnd->mutex);
3125 }
3126 
3127 /*
3128  * Return a string constant representing the state. This is used
3129  * in system views, and should *not* be translated.
3130  */
3131 static const char *
3133 {
3134  switch (state)
3135  {
3136  case WALSNDSTATE_STARTUP:
3137  return "startup";
3138  case WALSNDSTATE_BACKUP:
3139  return "backup";
3140  case WALSNDSTATE_CATCHUP:
3141  return "catchup";
3142  case WALSNDSTATE_STREAMING:
3143  return "streaming";
3144  case WALSNDSTATE_STOPPING:
3145  return "stopping";
3146  }
3147  return "UNKNOWN";
3148 }
3149 
3150 static Interval *
3152 {
3153  Interval *result = palloc(sizeof(Interval));
3154 
3155  result->month = 0;
3156  result->day = 0;
3157  result->time = offset;
3158 
3159  return result;
3160 }
3161 
3162 /*
3163  * Returns activity of walsenders, including pids and xlog locations sent to
3164  * standby servers.
3165  */
3166 Datum
3168 {
3169 #define PG_STAT_GET_WAL_SENDERS_COLS 11
3170  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
3171  TupleDesc tupdesc;
3172  Tuplestorestate *tupstore;
3173  MemoryContext per_query_ctx;
3174  MemoryContext oldcontext;
3175  List *sync_standbys;
3176  int i;
3177 
3178  /* check to see if caller supports us returning a tuplestore */
3179  if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
3180  ereport(ERROR,
3181  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3182  errmsg("set-valued function called in context that cannot accept a set")));
3183  if (!(rsinfo->allowedModes & SFRM_Materialize))
3184  ereport(ERROR,
3185  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3186  errmsg("materialize mode required, but it is not " \
3187  "allowed in this context")));
3188 
3189  /* Build a tuple descriptor for our result type */
3190  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
3191  elog(ERROR, "return type must be a row type");
3192 
3193  per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
3194  oldcontext = MemoryContextSwitchTo(per_query_ctx);
3195 
3196  tupstore = tuplestore_begin_heap(true, false, work_mem);
3197  rsinfo->returnMode = SFRM_Materialize;
3198  rsinfo->setResult = tupstore;
3199  rsinfo->setDesc = tupdesc;
3200 
3201  MemoryContextSwitchTo(oldcontext);
3202 
3203  /*
3204  * Get the currently active synchronous standbys.
3205  */
3206  LWLockAcquire(SyncRepLock, LW_SHARED);
3207  sync_standbys = SyncRepGetSyncStandbys(NULL);
3208  LWLockRelease(SyncRepLock);
3209 
3210  for (i = 0; i < max_wal_senders; i++)
3211  {
3212  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3214  XLogRecPtr write;
3215  XLogRecPtr flush;
3216  XLogRecPtr apply;
3217  TimeOffset writeLag;
3218  TimeOffset flushLag;
3219  TimeOffset applyLag;
3220  int priority;
3221  int pid;
3224  bool nulls[PG_STAT_GET_WAL_SENDERS_COLS];
3225 
3226  SpinLockAcquire(&walsnd->mutex);
3227  if (walsnd->pid == 0)
3228  {
3229  SpinLockRelease(&walsnd->mutex);
3230  continue;
3231  }
3232  pid = walsnd->pid;
3233  sentPtr = walsnd->sentPtr;
3234  state = walsnd->state;
3235  write = walsnd->write;
3236  flush = walsnd->flush;
3237  apply = walsnd->apply;
3238  writeLag = walsnd->writeLag;
3239  flushLag = walsnd->flushLag;
3240  applyLag = walsnd->applyLag;
3241  priority = walsnd->sync_standby_priority;
3242  SpinLockRelease(&walsnd->mutex);
3243 
3244  memset(nulls, 0, sizeof(nulls));
3245  values[0] = Int32GetDatum(pid);
3246 
3248  {
3249  /*
3250  * Only superusers and members of pg_read_all_stats can see details.
3251  * Other users only get the pid value to know it's a walsender,
3252  * but no details.
3253  */
3254  MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1);
3255  }
3256  else
3257  {
3258  values[1] = CStringGetTextDatum(WalSndGetStateString(state));
3259 
3260  if (XLogRecPtrIsInvalid(sentPtr))
3261  nulls[2] = true;
3262  values[2] = LSNGetDatum(sentPtr);
3263 
3264  if (XLogRecPtrIsInvalid(write))
3265  nulls[3] = true;
3266  values[3] = LSNGetDatum(write);
3267 
3268  if (XLogRecPtrIsInvalid(flush))
3269  nulls[4] = true;
3270  values[4] = LSNGetDatum(flush);
3271 
3272  if (XLogRecPtrIsInvalid(apply))
3273  nulls[5] = true;
3274  values[5] = LSNGetDatum(apply);
3275 
3276  /*
3277  * Treat a standby such as a pg_basebackup background process
3278  * which always returns an invalid flush location, as an
3279  * asynchronous standby.
3280  */
3281  priority = XLogRecPtrIsInvalid(flush) ? 0 : priority;
3282 
3283  if (writeLag < 0)
3284  nulls[6] = true;
3285  else
3286  values[6] = IntervalPGetDatum(offset_to_interval(writeLag));
3287 
3288  if (flushLag < 0)
3289  nulls[7] = true;
3290  else
3291  values[7] = IntervalPGetDatum(offset_to_interval(flushLag));
3292 
3293  if (applyLag < 0)
3294  nulls[8] = true;
3295  else
3296  values[8] = IntervalPGetDatum(offset_to_interval(applyLag));
3297 
3298  values[9] = Int32GetDatum(priority);
3299 
3300  /*
3301  * More easily understood version of standby state. This is purely
3302  * informational.
3303  *
3304  * In quorum-based sync replication, the role of each standby
3305  * listed in synchronous_standby_names can be changing very
3306  * frequently. Any standbys considered as "sync" at one moment can
3307  * be switched to "potential" ones at the next moment. So, it's
3308  * basically useless to report "sync" or "potential" as their sync
3309  * states. We report just "quorum" for them.
3310  */
3311  if (priority == 0)
3312  values[10] = CStringGetTextDatum("async");
3313  else if (list_member_int(sync_standbys, i))
3315  CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
3316  else
3317  values[10] = CStringGetTextDatum("potential");
3318  }
3319 
3320  tuplestore_putvalues(tupstore, tupdesc, values, nulls);
3321  }
3322 
3323  /* clean up and return the tuplestore */
3324  tuplestore_donestoring(tupstore);
3325 
3326  return (Datum) 0;
3327 }
3328 
3329 /*
3330  * This function is used to send a keepalive message to standby.
3331  * If requestReply is set, sets a flag in the message requesting the standby
3332  * to send a message back to us, for heartbeat purposes.
3333  */
3334 static void
3335 WalSndKeepalive(bool requestReply)
3336 {
3337  elog(DEBUG2, "sending replication keepalive");
3338 
3339  /* construct the message... */
3340  resetStringInfo(&output_message);
3341  pq_sendbyte(&output_message, 'k');
3342  pq_sendint64(&output_message, sentPtr);
3343  pq_sendint64(&output_message, GetCurrentTimestamp());
3344  pq_sendbyte(&output_message, requestReply ? 1 : 0);
3345 
3346  /* ... and send it wrapped in CopyData */
3347  pq_putmessage_noblock('d', output_message.data, output_message.len);
3348 }
3349 
3350 /*
3351  * Send keepalive message if too much time has elapsed.
3352  */
3353 static void
3355 {
3356  TimestampTz ping_time;
3357 
3358  /*
3359  * Don't send keepalive messages if timeouts are globally disabled or
3360  * we're doing something not partaking in timeouts.
3361  */
3362  if (wal_sender_timeout <= 0 || last_reply_timestamp <= 0)
3363  return;
3364 
3366  return;
3367 
3368  /*
3369  * If half of wal_sender_timeout has lapsed without receiving any reply
3370  * from the standby, send a keep-alive message to the standby requesting
3371  * an immediate reply.
3372  */
3374  wal_sender_timeout / 2);
3375  if (now >= ping_time)
3376  {
3377  WalSndKeepalive(true);
3379 
3380  /* Try to flush pending output to the client */
3381  if (pq_flush_if_writable() != 0)
3382  WalSndShutdown();
3383  }
3384 }
3385 
3386 /*
3387  * Record the end of the WAL and the time it was flushed locally, so that
3388  * LagTrackerRead can compute the elapsed time (lag) when this WAL location is
3389  * eventually reported to have been written, flushed and applied by the
3390  * standby in a reply message.
3391  */
3392 static void
3394 {
3395  bool buffer_full;
3396  int new_write_head;
3397  int i;
3398 
3399  if (!am_walsender)
3400  return;
3401 
3402  /*
3403  * If the lsn hasn't advanced since last time, then do nothing. This way
3404  * we only record a new sample when new WAL has been written.
3405  */
3406  if (LagTracker.last_lsn == lsn)
3407  return;
3408  LagTracker.last_lsn = lsn;
3409 
3410  /*
3411  * If advancing the write head of the circular buffer would crash into any
3412  * of the read heads, then the buffer is full. In other words, the
3413  * slowest reader (presumably apply) is the one that controls the release
3414  * of space.
3415  */
3416  new_write_head = (LagTracker.write_head + 1) % LAG_TRACKER_BUFFER_SIZE;
3417  buffer_full = false;
3418  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; ++i)
3419  {
3420  if (new_write_head == LagTracker.read_heads[i])
3421  buffer_full = true;
3422  }
3423 
3424  /*
3425  * If the buffer is full, for now we just rewind by one slot and overwrite
3426  * the last sample, as a simple (if somewhat uneven) way to lower the
3427  * sampling rate. There may be better adaptive compaction algorithms.
3428  */
3429  if (buffer_full)
3430  {
3431  new_write_head = LagTracker.write_head;
3432  if (LagTracker.write_head > 0)
3433  LagTracker.write_head--;
3434  else
3435  LagTracker.write_head = LAG_TRACKER_BUFFER_SIZE - 1;
3436  }
3437 
3438  /* Store a sample at the current write head position. */
3439  LagTracker.buffer[LagTracker.write_head].lsn = lsn;
3440  LagTracker.buffer[LagTracker.write_head].time = local_flush_time;
3441  LagTracker.write_head = new_write_head;
3442 }
3443 
3444 /*
3445  * Find out how much time has elapsed between the moment WAL location 'lsn'
3446  * (or the highest known earlier LSN) was flushed locally and the time 'now'.
3447  * We have a separate read head for each of the reported LSN locations we
3448  * receive in replies from standby; 'head' controls which read head is
3449  * used. Whenever a read head crosses an LSN which was written into the
3450  * lag buffer with LagTrackerWrite, we can use the associated timestamp to
3451  * find out the time this LSN (or an earlier one) was flushed locally, and
3452  * therefore compute the lag.
3453  *
3454  * Return -1 if no new sample data is available, and otherwise the elapsed
3455  * time in microseconds.
3456  */
3457 static TimeOffset
3459 {
3460  TimestampTz time = 0;
3461 
3462  /* Read all unread samples up to this LSN or end of buffer. */
3463  while (LagTracker.read_heads[head] != LagTracker.write_head &&
3464  LagTracker.buffer[LagTracker.read_heads[head]].lsn <= lsn)
3465  {
3466  time = LagTracker.buffer[LagTracker.read_heads[head]].time;
3467  LagTracker.last_read[head] =
3468  LagTracker.buffer[LagTracker.read_heads[head]];
3469  LagTracker.read_heads[head] =
3470  (LagTracker.read_heads[head] + 1) % LAG_TRACKER_BUFFER_SIZE;
3471  }
3472 
3473  /*
3474  * If the lag tracker is empty, that means the standby has processed
3475  * everything we've ever sent so we should now clear 'last_read'. If we
3476  * didn't do that, we'd risk using a stale and irrelevant sample for
3477  * interpolation at the beginning of the next burst of WAL after a period
3478  * of idleness.
3479  */
3480  if (LagTracker.read_heads[head] == LagTracker.write_head)
3481  LagTracker.last_read[head].time = 0;
3482 
3483  if (time > now)
3484  {
3485  /* If the clock somehow went backwards, treat as not found. */
3486  return -1;
3487  }
3488  else if (time == 0)
3489  {
3490  /*
3491  * We didn't cross a time. If there is a future sample that we
3492  * haven't reached yet, and we've already reached at least one sample,
3493  * let's interpolate the local flushed time. This is mainly useful
3494  * for reporting a completely stuck apply position as having
3495  * increasing lag, since otherwise we'd have to wait for it to
3496  * eventually start moving again and cross one of our samples before
3497  * we can show the lag increasing.
3498  */
3499  if (LagTracker.read_heads[head] == LagTracker.write_head)
3500  {
3501  /* There are no future samples, so we can't interpolate. */
3502  return -1;
3503  }
3504  else if (LagTracker.last_read[head].time != 0)
3505  {
3506  /* We can interpolate between last_read and the next sample. */
3507  double fraction;
3508  WalTimeSample prev = LagTracker.last_read[head];
3509  WalTimeSample next = LagTracker.buffer[LagTracker.read_heads[head]];
3510 
3511  if (lsn < prev.lsn)
3512  {
3513  /*
3514  * Reported LSNs shouldn't normally go backwards, but it's
3515  * possible when there is a timeline change. Treat as not
3516  * found.
3517  */
3518  return -1;
3519  }
3520 
3521  Assert(prev.lsn < next.lsn);
3522 
3523  if (prev.time > next.time)
3524  {
3525  /* If the clock somehow went backwards, treat as not found. */
3526  return -1;
3527  }
3528 
3529  /* See how far we are between the previous and next samples. */
3530  fraction =
3531  (double) (lsn - prev.lsn) / (double) (next.lsn - prev.lsn);
3532 
3533  /* Scale the local flush time proportionally. */
3534  time = (TimestampTz)
3535  ((double) prev.time + (next.time - prev.time) * fraction);
3536  }
3537  else
3538  {
3539  /*
3540  * We have only a future sample, implying that we were entirely
3541  * caught up but and now there is a new burst of WAL and the
3542  * standby hasn't processed the first sample yet. Until the
3543  * standby reaches the future sample the best we can do is report
3544  * the hypothetical lag if that sample were to be replayed now.
3545  */
3546  time = LagTracker.buffer[LagTracker.read_heads[head]].time;
3547  }
3548  }
3549 
3550  /* Return the elapsed time since local flush time in microseconds. */
3551  Assert(time != 0);
3552  return now - time;
3553 }
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, Datum *values, bool *isnull)
Definition: tuplestore.c:750
static void InitWalSenderSlot(void)
Definition: walsender.c:2228
void InitializeTimeouts(void)
Definition: timeout.c:340
#define pq_is_send_pending()
Definition: libpq.h:41
#define NIL
Definition: pg_list.h:69
XLogRecPtr write
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS
static void ProcessStandbyMessage(void)
Definition: walsender.c:1694
#define SIGQUIT
Definition: win32_port.h:164
#define WL_SOCKET_WRITEABLE
Definition: latch.h:126
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define IsA(nodeptr, _type_)
Definition: nodes.h:564
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:198
XLogRecPtr startpoint
Definition: replnodes.h:85
#define DEBUG1
Definition: elog.h:25
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:211
int MyProcPid
Definition: globals.c:39
int wal_sender_timeout
Definition: walsender.c:123
uint32 TimeLineID
Definition: xlogdefs.h:45
#define pq_flush()
Definition: libpq.h:39
static int32 next
Definition: blutils.c:210
#define SIGTTOU
Definition: win32_port.h:175
XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
struct Port * MyProcPort
Definition: globals.c:41
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
int wal_segment_size
Definition: xlog.c:113
uint32 TransactionId
Definition: c.h:463
bool wake_wal_senders
Definition: walsender.c:130
#define SYNC_REP_PRIORITY
Definition: syncrep.h:36
bool exec_replication_command(const char *cmd_string)
Definition: walsender.c:1426
Oid GetUserId(void)
Definition: miscinit.c:284
#define SIGTTIN
Definition: win32_port.h:174
#define SIGUSR1
Definition: win32_port.h:177
bool update_process_title
Definition: ps_status.c:35
#define write(a, b, c)
Definition: win32.h:14
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1570
TransactionId xmin
Definition: proc.h:225
static void WalSndLastCycleHandler(SIGNAL_ARGS)
Definition: walsender.c:2940
void pgstat_report_activity(BackendState state, const char *cmd_str)
Definition: pgstat.c:2994
static void SendTimeLineHistory(TimeLineHistoryCmd *cmd)
Definition: walsender.c:428
#define TEXTOID
Definition: pg_type.h:324
static void parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action)
Definition: walsender.c:783
static void DropReplicationSlot(DropReplicationSlotCmd *cmd)
Definition: walsender.c:1030
#define SIGCHLD
Definition: win32_port.h:173
PGPROC * MyProc
Definition: proc.c:67
int64 TimestampTz
Definition: timestamp.h:39
static void pq_sendint32(StringInfo buf, int32 i)
Definition: pqformat.h:148
#define SIGWINCH
Definition: win32_port.h:176
#define SIGCONT
Definition: win32_port.h:172
ResourceOwner CurrentResourceOwner
Definition: resowner.c:138
Size WalSndShmemSize(void)
Definition: walsender.c:2976
void MarkPostmasterChildWalSender(void)
Definition: pmsignal.c:239
void CommitTransactionCommand(void)
Definition: xact.c:2745
#define XACT_REPEATABLE_READ
Definition: xact.h:30
#define SpinLockInit(lock)
Definition: spin.h:60
void RestoreTransactionSnapshot(Snapshot snapshot, void *master_pgproc)
Definition: snapmgr.c:2191
void ReplicationSlotAcquire(const char *name, bool nowait)
Definition: slot.c:330
uint8 syncrep_method
Definition: syncrep.h:51
List * readTimeLineHistory(TimeLineID targetTLI)
Definition: timeline.c:75
static int sendFile
Definition: walsender.c:136
bool IsAbortedTransactionBlockState(void)
Definition: xact.c:371
static void XLogSendPhysical(void)
Definition: walsender.c:2492
void do_tup_output(TupOutputState *tstate, Datum *values, bool *isnull)
Definition: execTuples.c:1309
#define tuplestore_donestoring(state)
Definition: tuplestore.h:60
TimeOffset flushLag
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency)
Definition: slot.c:221
void set_ps_display(const char *activity, bool force)
Definition: ps_status.c:326
#define INT4OID
Definition: pg_type.h:316
Definition: nodes.h:513
static StringInfoData output_message
Definition: walsender.c:161
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
struct cursor * cur
Definition: ecpg.c:28
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:188
void proc_exit(int code)
Definition: ipc.c:104
static void pq_sendint64(StringInfo buf, int64 i)
Definition: pqformat.h:156
int errcode(int sqlerrcode)
Definition: elog.c:575
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
#define MemSet(start, val, len)
Definition: c.h:897
static void WalSndKill(int code, Datum arg)
Definition: walsender.c:2289
#define kill(pid, sig)
Definition: win32_port.h:437
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
static void WalSndDone(WalSndSendDataCallback send_data)
Definition: walsender.c:2825
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
#define WL_SOCKET_READABLE
Definition: latch.h:125
void ReplicationSlotSave(void)
Definition: slot.c:638
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8251
List * SyncRepGetSyncStandbys(bool *am_sync)
Definition: syncrep.c:678
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:197
LogicalDecodingContext * CreateInitDecodingContext(char *plugin, List *output_plugin_options, bool need_full_snapshot, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:220
WalTimeSample last_read[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:218
TimestampTz time
Definition: walsender.c:205
static void pq_sendbyte(StringInfo buf, int8 byt)
Definition: pqformat.h:164
int sleeptime
Definition: pg_standby.c:42
#define SIGPIPE
Definition: win32_port.h:168
ReplicationSlotPersistentData data
Definition: slot.h:120
TimeOffset writeLag
#define SIGUSR2
Definition: win32_port.h:178
void ResetLatch(volatile Latch *latch)
Definition: latch.c:497
#define LOG
Definition: elog.h:26
bool RecoveryInProgress(void)
Definition: xlog.c:7919
SyncRepConfigData * SyncRepConfig
Definition: syncrep.c:97
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1649
static struct @26 LagTracker
XLogRecPtr last_lsn
Definition: walsender.c:214
void list_free_deep(List *list)
Definition: list.c:1147
static int fd(const char *x, int i)
Definition: preproc-init.c:105
pgsocket sock
Definition: libpq-be.h:118
#define PG_BINARY
Definition: c.h:1069
void GetPGVariable(const char *name, DestReceiver *dest)
Definition: guc.c:7941
slock_t mutex
Node * replication_parse_result
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:87
void StatementCancelHandler(SIGNAL_ARGS)
Definition: postgres.c:2687
Latch procLatch
Definition: proc.h:104
void XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength)
Definition: xlogutils.c:802
XLogRecord * XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
Definition: xlogreader.c:195
XLogRecPtr confirmed_flush
Definition: slot.h:81
PGXACT * MyPgXact
Definition: proc.c:68
XLogRecPtr EndRecPtr
Definition: xlogreader.h:120
int32 day
Definition: timestamp.h:47
static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch)
Definition: walsender.c:1898
#define XLByteInSeg(xlrp, logSegNo, wal_segsz_bytes)
TimeLineID timeline
Definition: replnodes.h:97
ReplicationKind kind
Definition: replnodes.h:56
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1724
static int logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
Definition: walsender.c:749
bool IsTransactionBlock(void)
Definition: xact.c:4448
const char * SnapBuildExportSnapshot(SnapBuild *builder)
Definition: snapbuild.c:635
bool am_walsender
Definition: walsender.c:115
#define SpinLockAcquire(lock)
Definition: spin.h:62
void pg_usleep(long microsec)
Definition: signal.c:53
void HandleWalSndInitStopping(void)
Definition: walsender.c:2918
void ReplicationSlotReserveWal(void)
Definition: slot.c:985
int SendProcSignal(pid_t pid, ProcSignalReason reason, BackendId backendId)
Definition: procsignal.c:180
XLogRecPtr flush
bool defGetBoolean(DefElem *def)
Definition: define.c:111
void DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
Definition: logical.c:438
void ReplicationSlotsComputeRequiredLSN(void)
Definition: slot.c:741
void GetNextXidAndEpoch(TransactionId *xid, uint32 *epoch)
Definition: xlog.c:8320
ReplicationKind kind
Definition: replnodes.h:82
void WalSndRqstFileReload(void)
Definition: walsender.c:2895
void ConditionVariableCancelSleep(void)
bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
Definition: transam.c:319
static void WalSndKeepalive(bool requestReply)
Definition: walsender.c:3335
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:1367
void EndCommand(const char *commandTag, CommandDest dest)
Definition: dest.c:157
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:614
#define ERROR
Definition: elog.h:43
void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
Definition: decode.c:96
void pq_startmsgread(void)
Definition: pqcomm.c:1210
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2403
static void PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: walsender.c:1725
#define IntervalPGetDatum(X)
Definition: timestamp.h:33
#define SYNC_REP_WAIT_APPLY
Definition: syncrep.h:26
void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli)
Definition: xlog.c:3774
bool FirstSnapshotSet
Definition: snapmgr.c:203
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:372
void WalSndSignals(void)
Definition: walsender.c:2952
#define FATAL
Definition: elog.h:52
void SendBaseBackup(BaseBackupCmd *cmd)
Definition: basebackup.c:691
Latch * latch
void(* WalSndSendDataCallback)(void)
Definition: walsender.c:225
#define MAXPGPATH
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:11155
void ReplicationSlotPersist(void)
Definition: slot.c:673
TransactionId effective_xmin
Definition: slot.h:116
int pq_getbyte_if_available(unsigned char *c)
Definition: pqcomm.c:1040
static LogicalDecodingContext * logical_decoding_ctx
Definition: walsender.c:198
static TimeLineID curFileTimeLine
Definition: walsender.c:141
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:197
char * get_database_name(Oid dbid)
Definition: dbcommands.c:2056
#define DEBUG2
Definition: elog.h:24
bool XLogBackgroundFlush(void)
Definition: xlog.c:2952
bool list_member_int(const List *list, int datum)
Definition: list.c:485
Definition: latch.h:110
Definition: dest.h:88
#define TLHistoryFileName(fname, tli)
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:355
WalSndState state
NodeTag type
Definition: nodes.h:515
char * XLogFileNameP(TimeLineID tli, XLogSegNo segno)
Definition: xlog.c:10158
static char * buf
Definition: pg_test_fsync.c:67
void WalSndWaitStopping(void)
Definition: walsender.c:3075
static bool streamingDoneSending
Definition: walsender.c:180
uint64 XLogSegNo
Definition: xlogdefs.h:34
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:109
bool PostmasterIsAlive(void)
Definition: pmsignal.c:272
int errdetail(const char *fmt,...)
Definition: elog.c:873
#define COMMERROR
Definition: elog.h:30
int errcode_for_file_access(void)
Definition: elog.c:598
static void StartReplication(StartReplicationCmd *cmd)
Definition: walsender.c:524
#define SIGHUP
Definition: win32_port.h:163
TransactionId catalog_xmin
Definition: slot.h:70
#define pq_flush_if_writable()
Definition: libpq.h:40
TimeOffset time
Definition: timestamp.h:45
XLogRecPtr currTLIValidUntil
Definition: xlogreader.h:180
#define InvalidTransactionId
Definition: transam.h:31
void enlargeStringInfo(StringInfo str, int needed)
Definition: stringinfo.c:264
void PostgresSigHupHandler(SIGNAL_ARGS)
Definition: postgres.c:2727
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:62
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1153
static XLogRecPtr logical_startptr
Definition: walsender.c:199
unsigned int uint32
Definition: c.h:314
void ReplicationSlotRelease(void)
Definition: slot.c:416
static void XLogRead(char *buf, XLogRecPtr startptr, Size count)
Definition: walsender.c:2317
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1260
MemoryContext CurrentMemoryContext
Definition: mcxt.c:37
TransactionId xmin
Definition: slot.h:62
SHM_QUEUE SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]
void InitWalSender(void)
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:187
#define SlotIsLogical(slot)
Definition: slot.h:142
Datum Int64GetDatum(int64 X)
Definition: fmgr.c:1786
void SyncRepInitConfig(void)
Definition: syncrep.c:382
#define ereport(elevel, rest)
Definition: elog.h:122
static bool WalSndCaughtUp
Definition: walsender.c:184
#define SYNC_REP_WAIT_FLUSH
Definition: syncrep.h:25
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)
Definition: walsender.c:3393
XLogRecPtr lsn
Definition: walsender.c:204
int replication_yyparse(void)
Definition: guc.h:72
int32 month
Definition: timestamp.h:48
volatile sig_atomic_t ConfigReloadPending
Definition: globals.c:34
int max_wal_senders
Definition: walsender.c:121
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc)
Definition: execTuples.c:1291
static void WalSndLoop(WalSndSendDataCallback send_data)
Definition: walsender.c:2094
int CloseTransientFile(int fd)
Definition: fd.c:2573
#define SIG_IGN
Definition: win32_port.h:160
static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1126
#define AllocSetContextCreate(parent, name, allocparams)
Definition: memutils.h:165
void initStringInfo(StringInfo str)
Definition: stringinfo.c:46
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
int pq_getmessage(StringInfo s, int maxlen)
Definition: pqcomm.c:1272
static StringInfoData reply_message
Definition: walsender.c:162
#define MAXFNAMELEN
TimeLineID nextTLI
Definition: xlogreader.h:186
#define SpinLockRelease(lock)
Definition: spin.h:64
Tuplestorestate * tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
Definition: tuplestore.c:318
int64 TimeOffset
Definition: timestamp.h:40
static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc)
Definition: walsender.c:1277
Size mul_size(Size s1, Size s2)
Definition: shmem.c:492
#define InvalidBackendId
Definition: backendid.h:23
#define WL_POSTMASTER_DEATH
Definition: latch.h:128
static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
Definition: walsender.c:3458
void WalSndErrorCleanup(void)
Definition: walsender.c:292
static volatile sig_atomic_t replication_active
Definition: walsender.c:196
uintptr_t Datum
Definition: postgres.h:365
static const char * WalSndGetStateString(WalSndState state)
Definition: walsender.c:3132
XLogRecPtr sentPtr
Size add_size(Size s1, Size s2)
Definition: shmem.c:475
bool am_db_walsender
Definition: walsender.c:118
TransactionId effective_catalog_xmin
Definition: slot.h:117
static void StartLogicalReplication(StartReplicationCmd *cmd)
Definition: walsender.c:1041
LogicalDecodingContext * CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, bool fast_forward, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:348
Oid MyDatabaseId
Definition: globals.c:77
static void WalSndShutdown(void)
Definition: walsender.c:229
static TimeLineID receiveTLI
Definition: xlog.c:203
#define XLogSegmentOffset(xlogptr, wal_segsz_bytes)
int work_mem
Definition: globals.c:113
XLogRecPtr tliSwitchPoint(TimeLineID tli, List *history, TimeLineID *nextTLI)
Definition: timeline.c:561
WalSnd * MyWalSnd
Definition: walsender.c:112
#define DEFAULT_ROLE_READ_ALL_STATS
Definition: pg_authid.h:108
static XLogRecPtr sentPtr
Definition: walsender.c:158
void pq_endmsgread(void)
Definition: pqcomm.c:1234
#define InvalidOid
Definition: postgres_ext.h:36
TimeLineID ThisTimeLineID
Definition: xlog.c:181
int read_heads[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:217
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
int allowedModes
Definition: execnodes.h:282
void WalSndInitStopping(void)
Definition: walsender.c:3049
TimeLineID currTLI
Definition: xlogreader.h:170
#define INT8OID
Definition: pg_type.h:304
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:168
static void WalSndKeepaliveIfNecessary(TimestampTz now)
Definition: walsender.c:3354
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
bool is_member_of_role(Oid member, Oid role)
Definition: acl.c:4857
TimeOffset applyLag
struct SnapBuild * snapshot_builder
Definition: logical.h:46
int errmsg_internal(const char *fmt,...)
Definition: elog.c:827
SetFunctionReturnMode returnMode
Definition: execnodes.h:284
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
static long WalSndComputeSleeptime(TimestampTz now)
Definition: walsender.c:2025
void SetLatch(volatile Latch *latch)
Definition: latch.c:414
#define SIG_DFL
Definition: win32_port.h:158
#define SIGNAL_ARGS
Definition: c.h:1110
static TimeLineID sendTimeLine
Definition: walsender.c:149
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:688
static void WalSndCheckTimeOut(TimestampTz now)
Definition: walsender.c:2067
#define lfirst(lc)
Definition: pg_list.h:106
void WalSndSetState(WalSndState state)
Definition: walsender.c:3113
int WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:356
int sync_standby_priority
Definition: regguts.h:298
void FreeDecodingContext(LogicalDecodingContext *ctx)
Definition: logical.c:481
static void XLogSendLogical(void)
Definition: walsender.c:2755
XLogRecPtr restart_lsn
Definition: slot.h:73
void StartTransactionCommand(void)
Definition: xact.c:2674
WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]
Definition: walsender.c:215
bool needreload
static void ProcessStandbyReplyMessage(void)
Definition: walsender.c:1757
size_t Size
Definition: c.h:422
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: pgstat.h:1236
char * dbname
Definition: streamutil.c:42
#define LAG_TRACKER_BUFFER_SIZE
Definition: walsender.c:209
int XactIsoLevel
Definition: xact.c:74
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1120
static TimeLineID sendTimeLineNextTLI
Definition: walsender.c:150
static XLogRecPtr sendTimeLineValidUpto
Definition: walsender.c:152
void LogicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: logical.c:934
MemoryContext ecxt_per_query_memory
Definition: execnodes.h:216
TupleDesc CreateTemplateTupleDesc(int natts, bool hasoid)
Definition: tupdesc.c:43
WalSndState
static XLogSegNo sendSegNo
Definition: walsender.c:137
Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
Definition: walsender.c:3167
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Definition: walsender.c:836
static bool streamingDoneReceiving
Definition: walsender.c:181
#define BYTEAOID
Definition: pg_type.h:292
Tuplestorestate * setResult
Definition: execnodes.h:287
#define pg_attribute_noreturn()
Definition: c.h:147
static StringInfoData tmpbuf
Definition: walsender.c:163
int BasicOpenFile(const char *fileName, int fileFlags)
Definition: fd.c:939
static void pq_sendint16(StringInfo buf, int16 i)
Definition: pqformat.h:140
#define XLogFilePath(path, tli, logSegNo, wal_segsz_bytes)
bool IsSubTransaction(void)
Definition: xact.c:4521
Snapshot SnapBuildInitialSnapshot(SnapBuild *builder)
Definition: snapbuild.c:545
static Datum values[MAXATTR]
Definition: bootstrap.c:164
#define PG_STAT_GET_WAL_SENDERS_COLS
ExprContext * econtext
Definition: execnodes.h:280
bool log_replication_commands
Definition: walsender.c:125
uint64 GetSystemIdentifier(void)
Definition: xlog.c:4711
#define Int32GetDatum(X)
Definition: postgres.h:462
void ReplicationSlotDrop(const char *name, bool nowait)
Definition: slot.c:509
char * application_name
Definition: guc.c:470
TupleDesc setDesc
Definition: execnodes.h:288
void pq_sendbytes(StringInfo buf, const char *data, int datalen)
Definition: pqformat.c:125
void * palloc(Size size)
Definition: mcxt.c:835
int errmsg(const char *fmt,...)
Definition: elog.c:797
XLogReaderState * reader
Definition: logical.h:44
void SHMQueueInit(SHM_QUEUE *queue)
Definition: shmqueue.c:36
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:298
#define pq_putmessage_noblock(msgtype, s, len)
Definition: libpq.h:44
void die(SIGNAL_ARGS)
Definition: postgres.c:2656
CRSSnapshotAction
Definition: walsender.h:22
StringInfo out
Definition: logical.h:73
static const unsigned __int64 epoch
Definition: gettimeofday.c:34
int i
void WalSndShmemInit(void)
Definition: walsender.c:2988
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
#define NameStr(name)
Definition: c.h:565
TimeLineID timeline
Definition: replnodes.h:84
#define CStringGetTextDatum(s)
Definition: builtins.h:91
void * arg
static void PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
Definition: walsender.c:1849
struct Latch * MyLatch
Definition: globals.c:52
void ReplicationSlotCleanup(void)
Definition: slot.c:471
#define PG_FUNCTION_ARGS
Definition: fmgr.h:158
char * defname
Definition: parsenodes.h:727
static uint32 sendOff
Definition: walsender.c:138
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:98
static XLogRecPtr GetStandbyFlushRecPtr(void)
Definition: walsender.c:2865
void SendPostmasterSignal(PMSignalReason reason)
Definition: pmsignal.c:113
#define elog
Definition: elog.h:219
void LWLockReleaseAll(void)
Definition: lwlock.c:1823
#define NUM_SYNC_REP_WAIT_MODE
Definition: syncrep.h:28
static Interval * offset_to_interval(TimeOffset offset)
Definition: walsender.c:3151
slock_t mutex
Definition: slot.h:93
#define close(a)
Definition: win32.h:12
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition: procsignal.c:261
CommandDest whereToSendOutput
Definition: postgres.c:88
void TimestampDifference(TimestampTz start_time, TimestampTz stop_time, long *secs, int *microsecs)
Definition: timestamp.c:1624
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
static bool waiting_for_ping_response
Definition: walsender.c:172
void pq_puttextmessage(char msgtype, const char *str)
Definition: pqformat.c:369
XLogRecPtr apply
static void IdentifySystem(void)
Definition: walsender.c:339
Definition: pg_list.h:45
void CheckLogicalDecodingRequirements(void)
Definition: logical.c:76
#define WL_LATCH_SET
Definition: latch.h:124
int16 AttrNumber
Definition: attnum.h:21
void quickdie(SIGNAL_ARGS)
Definition: postgres.c:2595
#define UINT64_FORMAT
Definition: c.h:357
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition: slot.c:695
void SyncRepReleaseWaiters(void)
Definition: syncrep.c:410
static TimestampTz last_reply_timestamp
Definition: walsender.c:169
int write_head
Definition: walsender.c:216
static void ProcessRepliesIfAny(void)
Definition: walsender.c:1592
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534
#define MAX_SEND_SIZE
Definition: walsender.c:106
#define read(a, b, c)
Definition: win32.h:13
static bool sendTimeLineIsHistoric
Definition: walsender.c:151
#define offsetof(type, field)
Definition: c.h:611
void WalSndWakeup(void)
Definition: walsender.c:3020
void ReplicationSlotMarkDirty(void)
Definition: slot.c:656
bool am_cascading_walsender
Definition: walsender.c:116
static void ProcessStandbyHSFeedbackMessage(void)
Definition: walsender.c:1926
#define SYNC_REP_WAIT_WRITE
Definition: syncrep.h:24
void SnapBuildClearExportedSnapshot(void)
Definition: snapbuild.c:696
static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
Definition: walsender.c:1251
void PreventTransactionChain(bool isTopLevel, const char *stmtType)
Definition: xact.c:3154
void replication_scanner_init(const char *query_string)
ResourceOwner ResourceOwnerCreate(ResourceOwner parent, const char *name)
Definition: resowner.c:416
#define TLHistoryFilePath(path, tli)
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)