PostgreSQL Source Code  git master
walsender.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * walsender.c
4  *
5  * The WAL sender process (walsender) is new as of Postgres 9.0. It takes
6  * care of sending XLOG from the primary server to a single recipient.
7  * (Note that there can be more than one walsender process concurrently.)
8  * It is started by the postmaster when the walreceiver of a standby server
9  * connects to the primary server and requests XLOG streaming replication.
10  *
11  * A walsender is similar to a regular backend, ie. there is a one-to-one
12  * relationship between a connection and a walsender process, but instead
13  * of processing SQL queries, it understands a small set of special
14  * replication-mode commands. The START_REPLICATION command begins streaming
15  * WAL to the client. While streaming, the walsender keeps reading XLOG
16  * records from the disk and sends them to the standby server over the
17  * COPY protocol, until either side ends the replication by exiting COPY
18  * mode (or until the connection is closed).
19  *
20  * Normal termination is by SIGTERM, which instructs the walsender to
21  * close the connection and exit(0) at the next convenient moment. Emergency
22  * termination is by SIGQUIT; like any backend, the walsender will simply
23  * abort and exit on SIGQUIT. A close of the connection and a FATAL error
24  * are treated not as a crash but as approximately normal termination;
25  * the walsender will exit quickly without sending any more XLOG records.
26  *
27  * If the server is shut down, checkpointer sends us
28  * PROCSIG_WALSND_INIT_STOPPING after all regular backends have exited. If
29  * the backend is idle or runs an SQL query, this causes the backend to
30  * shut down; if logical replication is in progress, all existing WAL records
31  * are processed, followed by a shutdown. Otherwise this causes the walsender
32  * to switch to the "stopping" state. In this state, the walsender will reject
33  * any further replication commands. The checkpointer begins the shutdown
34  * checkpoint once all walsenders are confirmed as stopping. When the shutdown
35  * checkpoint finishes, the postmaster sends us SIGUSR2. This instructs
36  * walsender to send any outstanding WAL, including the shutdown checkpoint
37  * record, wait for it to be replicated to the standby, and then exit.
38  *
39  *
40  * Portions Copyright (c) 2010-2019, PostgreSQL Global Development Group
41  *
42  * IDENTIFICATION
43  * src/backend/replication/walsender.c
44  *
45  *-------------------------------------------------------------------------
46  */
47 #include "postgres.h"
48 
49 #include <signal.h>
50 #include <unistd.h>
51 
52 #include "access/printtup.h"
53 #include "access/timeline.h"
54 #include "access/transam.h"
55 #include "access/xact.h"
56 #include "access/xlog_internal.h"
57 #include "access/xlogutils.h"
58 
59 #include "catalog/pg_authid.h"
60 #include "catalog/pg_type.h"
61 #include "commands/dbcommands.h"
62 #include "commands/defrem.h"
63 #include "funcapi.h"
64 #include "libpq/libpq.h"
65 #include "libpq/pqformat.h"
66 #include "miscadmin.h"
67 #include "nodes/replnodes.h"
68 #include "pgstat.h"
69 #include "replication/basebackup.h"
70 #include "replication/decode.h"
71 #include "replication/logical.h"
73 #include "replication/slot.h"
74 #include "replication/snapbuild.h"
75 #include "replication/syncrep.h"
77 #include "replication/walsender.h"
80 #include "storage/fd.h"
81 #include "storage/ipc.h"
82 #include "storage/pmsignal.h"
83 #include "storage/proc.h"
84 #include "storage/procarray.h"
85 #include "tcop/dest.h"
86 #include "tcop/tcopprot.h"
87 #include "utils/builtins.h"
88 #include "utils/guc.h"
89 #include "utils/memutils.h"
90 #include "utils/pg_lsn.h"
91 #include "utils/portal.h"
92 #include "utils/ps_status.h"
93 #include "utils/timeout.h"
94 #include "utils/timestamp.h"
95 
96 /*
97  * Maximum data payload in a WAL data message. Must be >= XLOG_BLCKSZ.
98  *
99  * We don't have a good idea of what a good value would be; there's some
100  * overhead per message in both walsender and walreceiver, but on the other
101  * hand sending large batches makes walsender less responsive to signals
102  * because signals are checked only between messages. 128kB (with
103  * default 8k blocks) seems like a reasonable guess for now.
104  */
105 #define MAX_SEND_SIZE (XLOG_BLCKSZ * 16)
106 
107 /* Array of WalSnds in shared memory */
109 
110 /* My slot in the shared memory array */
111 WalSnd *MyWalSnd = NULL;
112 
113 /* Global state */
114 bool am_walsender = false; /* Am I a walsender process? */
115 bool am_cascading_walsender = false; /* Am I cascading WAL to another
116  * standby? */
117 bool am_db_walsender = false; /* Connected to a database? */
118 
119 /* User-settable parameters for walsender */
120 int max_wal_senders = 0; /* the maximum number of concurrent
121  * walsenders */
122 int wal_sender_timeout = 60 * 1000; /* maximum time to send one WAL
123  * data message */
125 
126 /*
127  * State for WalSndWakeupRequest
128  */
129 bool wake_wal_senders = false;
130 
131 /*
132  * These variables are used similarly to openLogFile/SegNo/Off,
133  * but for walsender to read the XLOG.
134  */
135 static int sendFile = -1;
136 static XLogSegNo sendSegNo = 0;
137 static uint32 sendOff = 0;
138 
139 /* Timeline ID of the currently open file */
141 
142 /*
143  * These variables keep track of the state of the timeline we're currently
144  * sending. sendTimeLine identifies the timeline. If sendTimeLineIsHistoric,
145  * the timeline is not the latest timeline on this server, and the server's
146  * history forked off from that timeline at sendTimeLineValidUpto.
147  */
150 static bool sendTimeLineIsHistoric = false;
152 
153 /*
154  * How far have we sent WAL already? This is also advertised in
155  * MyWalSnd->sentPtr. (Actually, this is the next WAL location to send.)
156  */
157 static XLogRecPtr sentPtr = 0;
158 
159 /* Buffers for constructing outgoing messages and processing reply messages. */
163 
164 /* Timestamp of last ProcessRepliesIfAny(). */
166 
167 /*
168  * Timestamp of last ProcessRepliesIfAny() that saw a reply from the
169  * standby. Set to 0 if wal_sender_timeout doesn't need to be active.
170  */
172 
173 /* Have we sent a heartbeat message asking for reply, since last reply? */
174 static bool waiting_for_ping_response = false;
175 
176 /*
177  * While streaming WAL in Copy mode, streamingDoneSending is set to true
178  * after we have sent CopyDone. We should not send any more CopyData messages
179  * after that. streamingDoneReceiving is set to true when we receive CopyDone
180  * from the other end. When both become true, it's time to exit Copy mode.
181  */
184 
185 /* Are we there yet? */
186 static bool WalSndCaughtUp = false;
187 
188 /* Flags set by signal handlers for later service in main loop */
189 static volatile sig_atomic_t got_SIGUSR2 = false;
190 static volatile sig_atomic_t got_STOPPING = false;
191 
192 /*
193  * This is set while we are streaming. When not set
194  * PROCSIG_WALSND_INIT_STOPPING signal will be handled like SIGTERM. When set,
195  * the main loop is responsible for checking got_STOPPING and terminating when
196  * it's set (after streaming any remaining WAL).
197  */
198 static volatile sig_atomic_t replication_active = false;
199 
202 
203 /* A sample associating a WAL location with the time it was written. */
204 typedef struct
205 {
208 } WalTimeSample;
209 
210 /* The size of our buffer of time samples. */
211 #define LAG_TRACKER_BUFFER_SIZE 8192
212 
213 /* A mechanism for tracking replication lag. */
214 typedef struct
215 {
219  int read_heads[NUM_SYNC_REP_WAIT_MODE];
221 } LagTracker;
222 
224 
225 /* Signal handlers */
227 
228 /* Prototypes for private functions */
229 typedef void (*WalSndSendDataCallback) (void);
230 static void WalSndLoop(WalSndSendDataCallback send_data);
231 static void InitWalSenderSlot(void);
232 static void WalSndKill(int code, Datum arg);
234 static void XLogSendPhysical(void);
235 static void XLogSendLogical(void);
236 static void WalSndDone(WalSndSendDataCallback send_data);
237 static XLogRecPtr GetStandbyFlushRecPtr(void);
238 static void IdentifySystem(void);
241 static void StartReplication(StartReplicationCmd *cmd);
243 static void ProcessStandbyMessage(void);
244 static void ProcessStandbyReplyMessage(void);
245 static void ProcessStandbyHSFeedbackMessage(void);
246 static void ProcessRepliesIfAny(void);
247 static void WalSndKeepalive(bool requestReply);
248 static void WalSndKeepaliveIfNecessary(void);
249 static void WalSndCheckTimeOut(void);
251 static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
252 static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
255 static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
256 static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
258 
259 static void XLogRead(char *buf, XLogRecPtr startptr, Size count);
260 
261 
262 /*
 * Initialize walsender process before entering the main command loop.
 *
 * Takes no arguments and returns nothing. The last step visible here
 * allocates the lag tracker, zero-filled, in TopMemoryContext.
 */
263 void
264 InitWalSender(void)
265 {
267 
268  /* Create a per-walsender data structure in shared memory */
270 
271  /*
272  * We don't currently need any ResourceOwner in a walsender process, but
273  * if we did, we could call CreateAuxProcessResourceOwner here.
274  */
275 
276  /*
277  * Let postmaster know that we're a WAL sender. Once we've declared
278  * ourselves a WAL sender process, postmaster will let us outlive the
279  * bgwriter and kill us last in the shutdown sequence, so we get a chance
280  * to stream all remaining WAL at shutdown, including the shutdown
281  * checkpoint. Note that there's no going back, and we mustn't write any
282  * WAL records after this.
 */
285 
286  /*
 * Initialize empty timestamp buffer for lag tracking. The allocation is
 * zeroed, so the tracker starts out with all fields at zero.
 */
287  lag_tracker = MemoryContextAllocZero(TopMemoryContext, sizeof(LagTracker));
288 }
289 
290 /*
291  * Clean up after an error.
292  *
293  * WAL sender processes don't use transactions like regular backends do.
294  * This function does any cleanup required after an error in a WAL sender
295  * process, similar to what transaction abort does in a regular backend.
 *
 * Among the steps visible here: the currently open WAL segment file (if any)
 * is closed, replication_active is cleared, and the process exits with
 * status 0 if a stop request (got_STOPPING or got_SIGUSR2) is pending.
296  */
297 void
299 {
303 
 /* Close the WAL file we were reading from, and mark it as not open. */
304  if (sendFile >= 0)
305  {
306  close(sendFile);
307  sendFile = -1;
308  }
309 
310  if (MyReplicationSlot != NULL)
312 
314 
 /* We are no longer streaming. */
315  replication_active = false;
316 
 /*
 * If a stop was requested (either flag is set), exit now instead of
 * returning to the caller.
 */
317  if (got_STOPPING || got_SIGUSR2)
318  proc_exit(0);
319 
320  /* Revert back to startup state */
322 }
323 
324 /*
325  * Handle a client's connection abort in an orderly manner.
 *
 * Exits the process with status 0 through proc_exit(); control is not
 * expected to come back to the caller.
326  */
327 static void
328 WalSndShutdown(void)
329 {
330  /*
331  * Reset whereToSendOutput to prevent ereport from attempting to send any
332  * more messages to the standby.
333  */
336 
337  proc_exit(0);
 /* Only here so the compiler sees that this function never returns. */
338  abort(); /* keep the compiler quiet */
339 }
340 
341 /*
342  * Handle the IDENTIFY_SYSTEM command.
 *
 * Sends a one-row result set with four columns to the destination:
 *   systemid (text), timeline (int4), xlogpos (text), dbname (text).
 * xlogpos is formatted as "%X/%X" from the high and low 32 bits of the
 * flush position. dbname is sent as NULL when no database name was
 * obtained (dbname stays NULL).
343  */
344 static void
346 {
347  char sysid[32];
348  char xloc[MAXFNAMELEN];
349  XLogRecPtr logptr;
350  char *dbname = NULL;
352  TupOutputState *tstate;
353  TupleDesc tupdesc;
354  Datum values[4];
355  bool nulls[4];
356 
357  /*
358  * Reply with a result set with one row, four columns. First col is system
359  * ID, second is timeline ID, third is current xlog location and the
360  * fourth contains the database name if we are connected to one.
361  */
362 
363  snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
365 
 /*
 * Pick the position to report: the standby flush pointer in the first
 * branch, the regular flush pointer otherwise.
 */
368  {
369  /* this also updates ThisTimeLineID */
370  logptr = GetStandbyFlushRecPtr();
371  }
372  else
373  logptr = GetFlushRecPtr();
374 
 /* Render the 64-bit position as two 32-bit hex halves, "hi/lo". */
375  snprintf(xloc, sizeof(xloc), "%X/%X", (uint32) (logptr >> 32), (uint32) logptr);
376 
 /* Only look up a database name when we are connected to a database. */
377  if (MyDatabaseId != InvalidOid)
378  {
380 
381  /* syscache access needs a transaction env. */
383  /* make dbname live outside TX context */
387  /* CommitTransactionCommand switches to TopMemoryContext */
389  }
390 
 /* Start with every column marked non-NULL; dbname may be flipped below. */
392  MemSet(nulls, false, sizeof(nulls));
393 
394  /* need a tuple descriptor representing four columns */
395  tupdesc = CreateTemplateTupleDesc(4);
396  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "systemid",
397  TEXTOID, -1, 0);
398  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "timeline",
399  INT4OID, -1, 0);
400  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "xlogpos",
401  TEXTOID, -1, 0);
402  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "dbname",
403  TEXTOID, -1, 0);
404 
405  /* prepare for projection of tuples */
406  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
407 
408  /* column 1: system identifier */
409  values[0] = CStringGetTextDatum(sysid);
410 
411  /* column 2: timeline */
412  values[1] = Int32GetDatum(ThisTimeLineID);
413 
414  /* column 3: wal location */
415  values[2] = CStringGetTextDatum(xloc);
416 
417  /* column 4: database name, or NULL if none */
418  if (dbname)
419  values[3] = CStringGetTextDatum(dbname);
420  else
421  nulls[3] = true;
422 
423  /* send it to dest */
424  do_tup_output(tstate, values, nulls);
425 
426  end_tup_output(tstate);
427 }
428 
429 
430 /*
431  * Handle TIMELINE_HISTORY command.
 *
 * Sends the history file of timeline cmd->timeline to the client as a
 * hand-built result set: a RowDescription ('T') message describing two
 * columns, "filename" (text) and "content" (bytea), followed by a single
 * DataRow ('D') message holding the file's name and its raw contents.
 *
 * Raises ERROR if the file cannot be opened, seeked, read or closed, or if
 * end-of-file is reached before the length determined up front has been
 * sent.
432  */
433 static void
435 {
437  char histfname[MAXFNAMELEN];
438  char path[MAXPGPATH];
439  int fd;
440  off_t histfilelen;
441  off_t bytesleft;
442  Size len;
443 
444  /*
445  * Reply with a result set with one row, and two columns. The first col is
446  * the name of the history file, 2nd is the contents.
447  */
448 
449  TLHistoryFileName(histfname, cmd->timeline);
450  TLHistoryFilePath(path, cmd->timeline);
451 
452  /* Send a RowDescription message */
453  pq_beginmessage(&buf, 'T');
454  pq_sendint16(&buf, 2); /* 2 fields */
455 
456  /* first field */
457  pq_sendstring(&buf, "filename"); /* col name */
458  pq_sendint32(&buf, 0); /* table oid */
459  pq_sendint16(&buf, 0); /* attnum */
460  pq_sendint32(&buf, TEXTOID); /* type oid */
461  pq_sendint16(&buf, -1); /* typlen */
462  pq_sendint32(&buf, 0); /* typmod */
463  pq_sendint16(&buf, 0); /* format code */
464 
465  /* second field */
466  pq_sendstring(&buf, "content"); /* col name */
467  pq_sendint32(&buf, 0); /* table oid */
468  pq_sendint16(&buf, 0); /* attnum */
469  pq_sendint32(&buf, BYTEAOID); /* type oid */
470  pq_sendint16(&buf, -1); /* typlen */
471  pq_sendint32(&buf, 0); /* typmod */
472  pq_sendint16(&buf, 0); /* format code */
473  pq_endmessage(&buf);
474 
475  /* Send a DataRow message */
476  pq_beginmessage(&buf, 'D');
477  pq_sendint16(&buf, 2); /* # of columns */
478  len = strlen(histfname);
479  pq_sendint32(&buf, len); /* col1 len */
480  pq_sendbytes(&buf, histfname, len);
481 
482  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
483  if (fd < 0)
484  ereport(ERROR,
486  errmsg("could not open file \"%s\": %m", path)));
487 
488  /*
 * Determine file length and send it to client. The length is found by
 * seeking to the end; the file position is then rewound to the start so
 * the read loop below sees the whole file.
 */
489  histfilelen = lseek(fd, 0, SEEK_END);
490  if (histfilelen < 0)
491  ereport(ERROR,
493  errmsg("could not seek to end of file \"%s\": %m", path)));
494  if (lseek(fd, 0, SEEK_SET) != 0)
495  ereport(ERROR,
497  errmsg("could not seek to beginning of file \"%s\": %m", path)));
498 
 /*
 * NOTE(review): histfilelen is an off_t but is sent as a 32-bit length;
 * presumably history files are always far smaller than 2GB -- confirm.
 */
499  pq_sendint32(&buf, histfilelen); /* col2 len */
500 
 /*
 * Copy the file into the DataRow message in chunks of at most
 * sizeof(rbuf) bytes until the advertised length has been appended.
 *
 * NOTE(review): each read() asks for sizeof(rbuf) bytes and is not capped
 * at bytesleft, so if the file grew after its length was taken, more
 * bytes than advertised could be appended. Presumably the file cannot
 * change while we read it -- confirm.
 */
501  bytesleft = histfilelen;
502  while (bytesleft > 0)
503  {
504  PGAlignedBlock rbuf;
505  int nread;
506 
508  nread = read(fd, rbuf.data, sizeof(rbuf));
510  if (nread < 0)
511  ereport(ERROR,
513  errmsg("could not read file \"%s\": %m",
514  path)));
 /* EOF while bytes are still owed: the file is shorter than measured. */
515  else if (nread == 0)
516  ereport(ERROR,
518  errmsg("could not read file \"%s\": read %d of %zu",
519  path, nread, (Size) bytesleft)));
520 
521  pq_sendbytes(&buf, rbuf.data, nread);
522  bytesleft -= nread;
523  }
524 
525  if (CloseTransientFile(fd) != 0)
526  ereport(ERROR,
528  errmsg("could not close file \"%s\": %m", path)));
529 
 /* The DataRow is complete only now that the file contents are in it. */
530  pq_endmessage(&buf);
531 }
532 
533 /*
534  * Handle START_REPLICATION command.
535  *
536  * At the moment, this never returns, but an ereport(ERROR) will take us back
537  * to the main loop.
538  */
539 static void
541 {
543  XLogRecPtr FlushPtr;
544 
545  if (ThisTimeLineID == 0)
546  ereport(ERROR,
547  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
548  errmsg("IDENTIFY_SYSTEM has not been run before START_REPLICATION")));
549 
550  /*
551  * We assume here that we're logging enough information in the WAL for
552  * log-shipping, since this is checked in PostmasterMain().
553  *
554  * NOTE: wal_level can only change at shutdown, so in most cases it is
555  * difficult for there to be WAL data that we can still see that was
556  * written at wal_level='minimal'.
557  */
558 
559  if (cmd->slotname)
560  {
561  ReplicationSlotAcquire(cmd->slotname, true);
563  ereport(ERROR,
564  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
565  (errmsg("cannot use a logical replication slot for physical replication"))));
566  }
567 
568  /*
569  * Select the timeline. If it was given explicitly by the client, use
570  * that. Otherwise use the timeline of the last replayed record, which is
571  * kept in ThisTimeLineID.
572  */
574  {
575  /* this also updates ThisTimeLineID */
576  FlushPtr = GetStandbyFlushRecPtr();
577  }
578  else
579  FlushPtr = GetFlushRecPtr();
580 
581  if (cmd->timeline != 0)
582  {
583  XLogRecPtr switchpoint;
584 
585  sendTimeLine = cmd->timeline;
587  {
588  sendTimeLineIsHistoric = false;
590  }
591  else
592  {
593  List *timeLineHistory;
594 
595  sendTimeLineIsHistoric = true;
596 
597  /*
598  * Check that the timeline the client requested exists, and the
599  * requested start location is on that timeline.
600  */
601  timeLineHistory = readTimeLineHistory(ThisTimeLineID);
602  switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
604  list_free_deep(timeLineHistory);
605 
606  /*
607  * Found the requested timeline in the history. Check that
608  * requested startpoint is on that timeline in our history.
609  *
610  * This is quite loose on purpose. We only check that we didn't
611  * fork off the requested timeline before the switchpoint. We
612  * don't check that we switched *to* it before the requested
613  * starting point. This is because the client can legitimately
614  * request to start replication from the beginning of the WAL
615  * segment that contains switchpoint, but on the new timeline, so
616  * that it doesn't end up with a partial segment. If you ask for
617  * too old a starting point, you'll get an error later when we
618  * fail to find the requested WAL segment in pg_wal.
619  *
620  * XXX: we could be more strict here and only allow a startpoint
621  * that's older than the switchpoint, if it's still in the same
622  * WAL segment.
623  */
624  if (!XLogRecPtrIsInvalid(switchpoint) &&
625  switchpoint < cmd->startpoint)
626  {
627  ereport(ERROR,
628  (errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
629  (uint32) (cmd->startpoint >> 32),
630  (uint32) (cmd->startpoint),
631  cmd->timeline),
632  errdetail("This server's history forked from timeline %u at %X/%X.",
633  cmd->timeline,
634  (uint32) (switchpoint >> 32),
635  (uint32) (switchpoint))));
636  }
637  sendTimeLineValidUpto = switchpoint;
638  }
639  }
640  else
641  {
644  sendTimeLineIsHistoric = false;
645  }
646 
648 
649  /* If there is nothing to stream, don't even enter COPY mode */
651  {
652  /*
653  * When we first start replication the standby will be behind the
654  * primary. For some applications, for example synchronous
655  * replication, it is important to have a clear state for this initial
656  * catchup mode, so we can trigger actions when we change streaming
657  * state later. We may stay in this state for a long time, which is
658  * exactly why we want to be able to monitor whether or not we are
659  * still here.
660  */
662 
663  /* Send a CopyBothResponse message, and start streaming */
664  pq_beginmessage(&buf, 'W');
665  pq_sendbyte(&buf, 0);
666  pq_sendint16(&buf, 0);
667  pq_endmessage(&buf);
668  pq_flush();
669 
670  /*
671  * Don't allow a request to stream from a future point in WAL that
672  * hasn't been flushed to disk in this server yet.
673  */
674  if (FlushPtr < cmd->startpoint)
675  {
676  ereport(ERROR,
677  (errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
678  (uint32) (cmd->startpoint >> 32),
679  (uint32) (cmd->startpoint),
680  (uint32) (FlushPtr >> 32),
681  (uint32) (FlushPtr))));
682  }
683 
684  /* Start streaming from the requested point */
685  sentPtr = cmd->startpoint;
686 
687  /* Initialize shared memory status, too */
688  SpinLockAcquire(&MyWalSnd->mutex);
689  MyWalSnd->sentPtr = sentPtr;
690  SpinLockRelease(&MyWalSnd->mutex);
691 
693 
694  /* Main loop of walsender */
695  replication_active = true;
696 
698 
699  replication_active = false;
700  if (got_STOPPING)
701  proc_exit(0);
703 
705  }
706 
707  if (cmd->slotname)
709 
710  /*
711  * Copy is finished now. Send a single-row result set indicating the next
712  * timeline.
713  */
715  {
716  char startpos_str[8 + 1 + 8 + 1];
718  TupOutputState *tstate;
719  TupleDesc tupdesc;
720  Datum values[2];
721  bool nulls[2];
722 
723  snprintf(startpos_str, sizeof(startpos_str), "%X/%X",
724  (uint32) (sendTimeLineValidUpto >> 32),
726 
728  MemSet(nulls, false, sizeof(nulls));
729 
730  /*
731  * Need a tuple descriptor representing two columns. int8 may seem
732  * like a surprising data type for this, but in theory int4 would not
733  * be wide enough for this, as TimeLineID is unsigned.
734  */
735  tupdesc = CreateTemplateTupleDesc(2);
736  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli",
737  INT8OID, -1, 0);
738  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos",
739  TEXTOID, -1, 0);
740 
741  /* prepare for projection of tuple */
742  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
743 
744  values[0] = Int64GetDatum((int64) sendTimeLineNextTLI);
745  values[1] = CStringGetTextDatum(startpos_str);
746 
747  /* send it to dest */
748  do_tup_output(tstate, values, nulls);
749 
750  end_tup_output(tstate);
751  }
752 
753  /* Send CommandComplete message */
754  pq_puttextmessage('C', "START_STREAMING");
755 }
756 
757 /*
758  * read_page callback for logical decoding contexts, as a walsender process.
759  *
760  * Inside the walsender we can do better than logical_read_local_xlog_page,
761  * which has to do a plain sleep/busy loop, because the walsender's latch gets
762  * set every time WAL is flushed.
 *
 * Waits until WAL up to targetPagePtr + reqLen is available, then reads the
 * page starting at targetPagePtr into cur_page.
 *
 * Returns the number of bytes of the page that are covered by flushed WAL:
 * XLOG_BLCKSZ when the whole page lies at or before the flush position,
 * otherwise the distance from targetPagePtr to the flush position. Returns
 * -1 if the wait ended with less WAL available than was requested.
763  */
764 static int
766  XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
767 {
768  XLogRecPtr flushptr;
769  int count;
770 
 /* Work out the timeline to read from and mirror it in the send state. */
771  XLogReadDetermineTimeline(state, targetPagePtr, reqLen);
773  sendTimeLine = state->currTLI;
775  sendTimeLineNextTLI = state->nextTLI;
776 
777  /* make sure we have enough WAL available */
778  flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
779 
780  /* fail if not (implies we are going to shut down) */
781  if (flushptr < targetPagePtr + reqLen)
782  return -1;
783 
784  if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
785  count = XLOG_BLCKSZ; /* more than one block available */
786  else
787  count = flushptr - targetPagePtr; /* part of the page available */
788 
789  /*
 * now actually read the data, we know it's there. Note that a full
 * XLOG_BLCKSZ is always read, even when count is smaller; count only
 * tells the caller how much of the page is backed by flushed WAL.
 */
790  XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ);
791 
792  return count;
793 }
794 
795 /*
796  * Process extra options given to CREATE_REPLICATION_SLOT.
 *
 * Walks cmd->options and stores the result through the output pointers:
 *
 *   reserve_wal      set to true if "reserve_wal" was given.
 *   snapshot_action  set according to "export_snapshot" or "use_snapshot".
 *
 * An output is written only when its option appears, so callers must
 * initialize both to their defaults beforehand.
 *
 * "export_snapshot" and "use_snapshot" are accepted only for logical slots
 * and are mutually exclusive; "reserve_wal" is accepted only for physical
 * slots. A repeated, conflicting or wrong-kind option raises a syntax
 * ERROR; an unknown option name raises an internal ERROR.
797  */
798 static void
800  bool *reserve_wal,
801  CRSSnapshotAction *snapshot_action)
802 {
803  ListCell *lc;
804  bool snapshot_action_given = false;
805  bool reserve_wal_given = false;
806 
807  /* Parse options */
808  foreach(lc, cmd->options)
809  {
810  DefElem *defel = (DefElem *) lfirst(lc);
811 
812  if (strcmp(defel->defname, "export_snapshot") == 0)
813  {
 /* Shares its "given" flag with use_snapshot: only one may appear. */
814  if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
815  ereport(ERROR,
816  (errcode(ERRCODE_SYNTAX_ERROR),
817  errmsg("conflicting or redundant options")));
818 
819  snapshot_action_given = true;
820  *snapshot_action = defGetBoolean(defel) ? CRS_EXPORT_SNAPSHOT :
822  }
823  else if (strcmp(defel->defname, "use_snapshot") == 0)
824  {
825  if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
826  ereport(ERROR,
827  (errcode(ERRCODE_SYNTAX_ERROR),
828  errmsg("conflicting or redundant options")));
829 
830  snapshot_action_given = true;
831  *snapshot_action = CRS_USE_SNAPSHOT;
832  }
833  else if (strcmp(defel->defname, "reserve_wal") == 0)
834  {
 /* Only meaningful for physical slots, and only once. */
835  if (reserve_wal_given || cmd->kind != REPLICATION_KIND_PHYSICAL)
836  ereport(ERROR,
837  (errcode(ERRCODE_SYNTAX_ERROR),
838  errmsg("conflicting or redundant options")));
839 
840  reserve_wal_given = true;
841  *reserve_wal = true;
842  }
843  else
844  elog(ERROR, "unrecognized option: %s", defel->defname);
845  }
846 }
847 
848 /*
849  * Create a new replication slot.
850  */
851 static void
853 {
854  const char *snapshot_name = NULL;
855  char xloc[MAXFNAMELEN];
856  char *slot_name;
857  bool reserve_wal = false;
858  CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT;
860  TupOutputState *tstate;
861  TupleDesc tupdesc;
862  Datum values[4];
863  bool nulls[4];
864 
866 
867  parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action);
868 
869  /* setup state for XLogReadPage */
870  sendTimeLineIsHistoric = false;
872 
873  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
874  {
875  ReplicationSlotCreate(cmd->slotname, false,
877  }
878  else
879  {
881 
882  /*
883  * Initially create persistent slot as ephemeral - that allows us to
884  * nicely handle errors during initialization because it'll get
885  * dropped if this transaction fails. We'll make it persistent at the
886  * end. Temporary slots can be created as temporary from the beginning, as
887  * they get dropped on error as well.
888  */
889  ReplicationSlotCreate(cmd->slotname, true,
891  }
892 
893  if (cmd->kind == REPLICATION_KIND_LOGICAL)
894  {
896  bool need_full_snapshot = false;
897 
898  /*
899  * Do the options check early so that we can bail out before calling
900  * DecodingContextFindStartpoint, which can take a long time.
901  */
902  if (snapshot_action == CRS_EXPORT_SNAPSHOT)
903  {
904  if (IsTransactionBlock())
905  ereport(ERROR,
906  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
907  (errmsg("%s must not be called inside a transaction",
908  "CREATE_REPLICATION_SLOT ... EXPORT_SNAPSHOT")));
909 
910  need_full_snapshot = true;
911  }
912  else if (snapshot_action == CRS_USE_SNAPSHOT)
913  {
914  if (!IsTransactionBlock())
915  ereport(ERROR,
916  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
917  (errmsg("%s must be called inside a transaction",
918  "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
919 
921  ereport(ERROR,
922  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
923  (errmsg("%s must be called in REPEATABLE READ isolation mode transaction",
924  "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
925 
926  if (FirstSnapshotSet)
927  ereport(ERROR,
928  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
929  (errmsg("%s must be called before any query",
930  "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
931 
932  if (IsSubTransaction())
933  ereport(ERROR,
934  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
935  (errmsg("%s must not be called in a subtransaction",
936  "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
937 
938  need_full_snapshot = true;
939  }
940 
941  ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
946 
947  /*
948  * Signal that we don't need the timeout mechanism. We're just
949  * creating the replication slot and don't yet accept feedback
950  * messages or send keepalives. As we possibly need to wait for
951  * further WAL the walsender would otherwise possibly be killed too
952  * soon.
953  */
955 
956  /* build initial snapshot, might take a while */
958 
959  /*
960  * Export or use the snapshot if we've been asked to do so.
961  *
962  * NB. We will convert the snapbuild.c kind of snapshot to normal
963  * snapshot when doing this.
964  */
965  if (snapshot_action == CRS_EXPORT_SNAPSHOT)
966  {
967  snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder);
968  }
969  else if (snapshot_action == CRS_USE_SNAPSHOT)
970  {
971  Snapshot snap;
972 
975  }
976 
977  /* don't need the decoding context anymore */
978  FreeDecodingContext(ctx);
979 
980  if (!cmd->temporary)
982  }
983  else if (cmd->kind == REPLICATION_KIND_PHYSICAL && reserve_wal)
984  {
986 
988 
989  /* Write this slot to disk if it's a permanent one. */
990  if (!cmd->temporary)
992  }
993 
994  snprintf(xloc, sizeof(xloc), "%X/%X",
997 
999  MemSet(nulls, false, sizeof(nulls));
1000 
1001  /*----------
1002  * Need a tuple descriptor representing four columns:
1003  * - first field: the slot name
1004  * - second field: LSN at which we became consistent
1005  * - third field: exported snapshot's name
1006  * - fourth field: output plugin
1007  *----------
1008  */
1009  tupdesc = CreateTemplateTupleDesc(4);
1010  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_name",
1011  TEXTOID, -1, 0);
1012  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "consistent_point",
1013  TEXTOID, -1, 0);
1014  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "snapshot_name",
1015  TEXTOID, -1, 0);
1016  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "output_plugin",
1017  TEXTOID, -1, 0);
1018 
1019  /* prepare for projection of tuples */
1020  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
1021 
1022  /* slot_name */
1023  slot_name = NameStr(MyReplicationSlot->data.name);
1024  values[0] = CStringGetTextDatum(slot_name);
1025 
1026  /* consistent wal location */
1027  values[1] = CStringGetTextDatum(xloc);
1028 
1029  /* snapshot name, or NULL if none */
1030  if (snapshot_name != NULL)
1031  values[2] = CStringGetTextDatum(snapshot_name);
1032  else
1033  nulls[2] = true;
1034 
1035  /* plugin, or NULL if none */
1036  if (cmd->plugin != NULL)
1037  values[3] = CStringGetTextDatum(cmd->plugin);
1038  else
1039  nulls[3] = true;
1040 
1041  /* send it to dest */
1042  do_tup_output(tstate, values, nulls);
1043  end_tup_output(tstate);
1044 
1046 }
1047 
1048 /*
1049  * Get rid of a replication slot that is no longer wanted.
 *
 * Drops the slot named cmd->slotname and then reports completion of
 * DROP_REPLICATION_SLOT to the remote client. The second argument passed to
 * ReplicationSlotDrop is the negation of cmd->wait (presumably a "nowait"
 * flag -- confirm against ReplicationSlotDrop's declaration).
1050  */
1051 static void
1053 {
1054  ReplicationSlotDrop(cmd->slotname, !cmd->wait);
1055  EndCommand("DROP_REPLICATION_SLOT", DestRemote);
1056 }
1057 
1058 /*
1059  * Load previously initiated logical slot and prepare for sending data (via
1060  * WalSndLoop).
      *
      * Visible steps: acquire the slot named by cmd->slotname, create a decoding
      * context from cmd->startpoint and cmd->options, send and flush a
      * CopyBothResponse ('W') message, set replication_active while streaming,
      * free the decoding context afterwards, exit the process if got_STOPPING
      * is set, and otherwise report "COPY 0" via EndCommand().
      *
      * NOTE(review): the inner line numbering has gaps throughout this function
      * (for example 1063, 1079, 1095-1097 and 1129). The function's
      * name/parameter line, the condition guarding the promotion block, the
      * tail of the CreateDecodingContext() call and the statement under
      * "Main loop of walsender" are therefore not present in this listing;
      * restore them from the upstream file before building.
1061  */
1062 static void
1064 {
1066 
1067  /* make sure that our requirements are still fulfilled */
1069 
1071 
1072  ReplicationSlotAcquire(cmd->slotname, true);
1073 
1074  /*
1075  * Force a disconnect, so that the decoding code doesn't need to care
1076  * about an eventual switch from running in recovery, to running in a
1077  * normal environment. Client code is expected to handle reconnects.
1078  */
      /* NOTE(review): the condition for this block (line 1079) is missing here. */
1080  {
1081  ereport(LOG,
1082  (errmsg("terminating walsender process after promotion")));
1083  got_STOPPING = true;
1084  }
1085 
1086  /*
1087  * Create our decoding context, making it start at the previously ack'ed
1088  * position.
1089  *
1090  * Do this before sending a CopyBothResponse message, so that any errors
1091  * are reported early.
1092  */
1093  logical_decoding_ctx =
1094  CreateDecodingContext(cmd->startpoint, cmd->options, false,
1098 
1099 
1101 
1102  /* Send a CopyBothResponse message, and start streaming */
1103  pq_beginmessage(&buf, 'W');
1104  pq_sendbyte(&buf, 0);
1105  pq_sendint16(&buf, 0);
1106  pq_endmessage(&buf);
1107  pq_flush();
1108 
1109 
1110  /* Start reading WAL from the oldest required WAL. */
1112 
1113  /*
1114  * Report the location after which we'll send out further commits as the
1115  * current sentPtr.
1116  */
1118 
1119  /* Also update the sent position status in shared memory */
1120  SpinLockAcquire(&MyWalSnd->mutex);
1122  SpinLockRelease(&MyWalSnd->mutex);
1123 
1124  replication_active = true;
1125 
1127 
1128  /* Main loop of walsender */
1130 
1131  FreeDecodingContext(logical_decoding_ctx);
1133 
1134  replication_active = false;
      /* A stop request seen during streaming ends the process here. */
1135  if (got_STOPPING)
1136  proc_exit(0);
1138 
1139  /* Get out of COPY mode (CommandComplete). */
1140  EndCommand("COPY 0", DestRemote);
1141 }
1142 
1143 /*
1144  * LogicalDecodingContext 'prepare_write' callback.
1145  *
1146  * Prepare a write into a StringInfo.
1147  *
1148  * Don't do anything lasting in here, it's quite possible that nothing will be done
1149  * with the data.
      *
      * The header written into ctx->out is: one byte 'w', then lsn twice as
      * 64-bit values (dataStart and walEnd), then a 64-bit zero placeholder for
      * the send time. When last_write is false, lsn is replaced by
      * InvalidXLogRecPtr before being written.
      *
      * NOTE(review): the inner line numbering skips 1152, so the line carrying
      * this function's name and parameter list (ctx, lsn and last_write are
      * used below) is not present in this listing.
1150  */
1151 static void
1153 {
1154  /* can't have sync rep confused by sending the same LSN several times */
1155  if (!last_write)
1156  lsn = InvalidXLogRecPtr;
1157 
      /* Start from an empty buffer; anything previously in ctx->out is discarded. */
1158  resetStringInfo(ctx->out);
1159 
1160  pq_sendbyte(ctx->out, 'w');
1161  pq_sendint64(ctx->out, lsn); /* dataStart */
1162  pq_sendint64(ctx->out, lsn); /* walEnd */
1163 
1164  /*
1165  * Fill out the sendtime later, just as it's done in XLogSendPhysical, but
1166  * reserve space here.
1167  */
1168  pq_sendint64(ctx->out, 0); /* sendtime */
1169 }
1170 
1171 /*
1172  * LogicalDecodingContext 'write' callback.
1173  *
1174  * Actually write out data previously prepared by WalSndPrepareWrite out to
1175  * the network. Take as long as needed, but process replies from the other
1176  * side and check timeouts during that.
      *
      * The send timestamp is patched into the header held in ctx->out first,
      * and only then is the message handed to pq_putmessage_noblock(). The
      * order matters: queueing the message before patching the timestamp
      * would put the zero placeholder reserved by the 'prepare_write' callback
      * on the wire instead of the real send time.
      *
      * After queueing, a flush is attempted. If output is still pending (or
      * the fast-path test fails), the function loops: it waits on the latch
      * and socket, handles a pending config reload, and retries the flush
      * until pq_is_send_pending() reports nothing left. A non-zero result
      * from pq_flush_if_writable() leads to WalSndShutdown(). MyLatch is set
      * before returning from the slow path.
      *
      * NOTE(review): the inner line numbering has gaps throughout this function
      * (for example 1179, 1198, 1205, 1219 and 1249-1250), so the first line of
      * the parameter list and several statements are not present in this
      * listing; restore them from the upstream file before building.
1177  */
1178 static void
1180  bool last_write)
1181 {
1182  TimestampTz now;
1183 
1184  /*
1185  * Fill the send timestamp last, so that it is taken as late as possible.
1186  * This is somewhat ugly, but the protocol is set as it's already used for
1187  * several releases by streaming physical replication.
      *
      * The offset skips the one-byte message type and the two 64-bit LSN
      * fields that precede the sendtime field in the header. This has to
      * happen before the message is queued below.
1188  */
1189  resetStringInfo(&tmpbuf);
1190  now = GetCurrentTimestamp();
1191  pq_sendint64(&tmpbuf, now);
1192  memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
1193  tmpbuf.data, sizeof(int64));
1194 
1195  /* output previously gathered data in a CopyData packet */
1196  pq_putmessage_noblock('d', ctx->out->data, ctx->out->len);
1197 
1199 
1200  /* Try to flush pending output to the client */
1201  if (pq_flush_if_writable() != 0)
1202  WalSndShutdown();
1203 
1204  /* Try taking fast path unless we get too close to walsender timeout. */
1206  wal_sender_timeout / 2) &&
1207  !pq_is_send_pending())
1208  {
1209  return;
1210  }
1211 
1212  /* If we have pending write here, go to slow path */
1213  for (;;)
1214  {
1215  int wakeEvents;
1216  long sleeptime;
1217 
1218  /* Check for input from the client */
1220 
1221  /* die if timeout was reached */
1223 
1224  /* Send keepalive if the time has come */
1226 
      /* Everything has been handed to the socket: leave the slow path. */
1227  if (!pq_is_send_pending())
1228  break;
1229 
1231 
1232  wakeEvents = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH |
1234 
1235  /* Sleep until something happens or we time out */
1236  (void) WaitLatchOrSocket(MyLatch, wakeEvents,
1237  MyProcPort->sock, sleeptime,
1239 
1240  /* Clear any already-pending wakeups */
1242 
1244 
1245  /* Process any requests or signals received recently */
1246  if (ConfigReloadPending)
1247  {
1248  ConfigReloadPending = false;
1251  }
1252 
1253  /* Try to flush pending output to the client */
1254  if (pq_flush_if_writable() != 0)
1255  WalSndShutdown();
1256  }
1257 
1258  /* reactivate latch so WalSndLoop knows to continue */
1259  SetLatch(MyLatch);
1260 }
1261 
1262 /*
1263  * LogicalDecodingContext 'update_progress' callback.
1264  *
1265  * Write the current position to the lag tracker (see XLogSendPhysical).
      *
      * The time of the last recorded sample is kept in the function-local
      * static sendTime; a new sample is written with LagTrackerWrite(lsn, now)
      * only when the TimestampDifferenceExceeds() test against sendTime passes.
      *
      * NOTE(review): the inner line numbering skips 1268, 1271 and 1279, so the
      * function's name/parameter line, the line that presumably declares and
      * sets "now", and the final argument of the TimestampDifferenceExceeds()
      * call are not present in this listing.
1266  */
1267 static void
1269 {
1270  static TimestampTz sendTime = 0;
1272 
1273  /*
1274  * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to
1275  * avoid flooding the lag tracker when we commit frequently.
1276  */
1277 #define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000
1278  if (!TimestampDifferenceExceeds(sendTime, now,
1280  return;
1281 
1282  LagTrackerWrite(lsn, now);
      /* Remember when we last sampled, for the rate limit above. */
1283  sendTime = now;
1284 }
1285 
1286 /*
1287  * Wait till WAL < loc is flushed to disk so it can be safely sent to client.
1288  *
1289  * Returns end LSN of flushed WAL. Normally this will be >= loc, but
1290  * if we detect a shutdown request (either from postmaster or client)
1291  * we will return early, so caller must always check.
      *
      * The last known position is cached in the function-local static
      * RecentFlushPtr, so a request at or below it returns immediately. The
      * position is taken from GetFlushRecPtr() when RecoveryInProgress() is
      * false and from GetXLogReplayRecPtr(NULL) otherwise. MyLatch is set
      * again before returning from the wait loop.
      *
      * NOTE(review): the inner line numbering has gaps throughout this function
      * (for example 1294, 1320, 1341, 1391 and 1418), so the function's
      * name/parameter line and several statements are not present in this
      * listing; restore them from the upstream file before building.
1292  */
1293 static XLogRecPtr
1295 {
1296  int wakeEvents;
1297  static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
1298 
1299 
1300  /*
1301  * Fast path to avoid acquiring the spinlock in case we already know we
1302  * have enough WAL available. This is particularly interesting if we're
1303  * far behind.
1304  */
1305  if (RecentFlushPtr != InvalidXLogRecPtr &&
1306  loc <= RecentFlushPtr)
1307  return RecentFlushPtr;
1308 
1309  /* Get a more recent flush pointer. */
1310  if (!RecoveryInProgress())
1311  RecentFlushPtr = GetFlushRecPtr();
1312  else
1313  RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1314 
1315  for (;;)
1316  {
1317  long sleeptime;
1318 
1319  /* Clear any already-pending wakeups */
1321 
1323 
1324  /* Process any requests or signals received recently */
1325  if (ConfigReloadPending)
1326  {
1327  ConfigReloadPending = false;
1330  }
1331 
1332  /* Check for input from the client */
1334 
1335  /*
1336  * If we're shutting down, trigger pending WAL to be written out,
1337  * otherwise we'd possibly end up waiting for WAL that never gets
1338  * written, because walwriter has shut down already.
1339  */
      /* NOTE(review): the statement controlled by this "if" (line 1341) is missing here. */
1340  if (got_STOPPING)
1342 
1343  /* Update our idea of the currently flushed position. */
1344  if (!RecoveryInProgress())
1345  RecentFlushPtr = GetFlushRecPtr();
1346  else
1347  RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1348 
1349  /*
1350  * If postmaster asked us to stop, don't wait anymore.
1351  *
1352  * It's important to do this check after the recomputation of
1353  * RecentFlushPtr, so we can send all remaining data before shutting
1354  * down.
1355  */
1356  if (got_STOPPING)
1357  break;
1358 
1359  /*
1360  * We only send regular messages to the client for full decoded
1361  * transactions, but synchronous replication and walsender shutdown
1362  * may be waiting for a later location. So we send pings
1363  * containing the flush location every now and then.
1364  */
1365  if (MyWalSnd->flush < sentPtr &&
1366  MyWalSnd->write < sentPtr &&
1368  {
1369  WalSndKeepalive(false);
1371  }
1372 
1373  /* check whether we're done */
1374  if (loc <= RecentFlushPtr)
1375  break;
1376 
1377  /* Waiting for new WAL. Since we need to wait, we're now caught up. */
1378  WalSndCaughtUp = true;
1379 
1380  /*
1381  * Try to flush any pending output to the client.
1382  */
1383  if (pq_flush_if_writable() != 0)
1384  WalSndShutdown();
1385 
1386  /*
1387  * If we have received CopyDone from the client, sent CopyDone
1388  * ourselves, and the output buffer is empty, it's time to exit
1389  * streaming, so fail the current WAL fetch request.
1390  */
1392  !pq_is_send_pending())
1393  break;
1394 
1395  /* die if timeout was reached */
1397 
1398  /* Send keepalive if the time has come */
1400 
1401  /*
1402  * Sleep until something happens or we time out. Also wait for the
1403  * socket becoming writable, if there's still pending output.
1404  * Otherwise we might sit on sendable output data while waiting for
1405  * new WAL to be generated. (But if we have nothing to send, we don't
1406  * want to wake on socket-writable.)
1407  */
1409 
1410  wakeEvents = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH |
1412 
1413  if (pq_is_send_pending())
1414  wakeEvents |= WL_SOCKET_WRITEABLE;
1415 
1416  (void) WaitLatchOrSocket(MyLatch, wakeEvents,
1417  MyProcPort->sock, sleeptime,
1419  }
1420 
1421  /* reactivate latch so WalSndLoop knows to continue */
1422  SetLatch(MyLatch);
1423  return RecentFlushPtr;
1424 }
1425 
1426 /*
1427  * Execute an incoming replication command.
1428  *
1429  * Returns true if the cmd_string was recognized as WalSender command, false
1430  * if not.
      *
      * The command is parsed inside a dedicated memory context (cmd_context).
      * On the normal path that context is deleted again and a "SELECT"
      * completion tag is sent before returning true. A T_SQLCmd node makes the
      * function return false instead, leaving execution to the caller.
      *
      * Errors are raised when the WAL sender state is WALSNDSTATE_STOPPING,
      * when the parser returns non-zero, when a non-SQL command arrives in an
      * aborted transaction block, when an SQL command arrives while
      * MyDatabaseId is InvalidOid, and for an unrecognized node tag.
      *
      * NOTE(review): the inner line numbering has gaps throughout this function
      * (for example 1445, 1460-1466, 1530-1531, 1534-1535 and 1592), so several
      * statements and two "case" labels are not present in this listing;
      * restore them from the upstream file before building.
1431  */
1432 bool
1433 exec_replication_command(const char *cmd_string)
1434 {
1435  int parse_rc;
1436  Node *cmd_node;
1437  MemoryContext cmd_context;
1438  MemoryContext old_context;
1439 
1440  /*
1441  * If WAL sender has been told that shutdown is getting close, switch its
1442  * status accordingly to handle the next replication commands correctly.
1443  */
      /* NOTE(review): the statement controlled by this "if" (line 1445) is missing here. */
1444  if (got_STOPPING)
1446 
1447  /*
1448  * Throw error if in stopping mode. We need to prevent commands that could
1449  * generate WAL while the shutdown checkpoint is being written. To be
1450  * safe, we just prohibit all new commands.
1451  */
1452  if (MyWalSnd->state == WALSNDSTATE_STOPPING)
1453  ereport(ERROR,
1454  (errmsg("cannot execute new commands while WAL sender is in stopping mode")));
1455 
1456  /*
1457  * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
1458  * command arrives. Clean up the old stuff if there's anything.
1459  */
1461 
1463 
1465  "Replication command context",
1467  old_context = MemoryContextSwitchTo(cmd_context);
1468 
1469  replication_scanner_init(cmd_string);
1470  parse_rc = replication_yyparse();
1471  if (parse_rc != 0)
1472  ereport(ERROR,
1473  (errcode(ERRCODE_SYNTAX_ERROR),
1474  (errmsg_internal("replication command parser returned %d",
1475  parse_rc))));
1476 
1477  cmd_node = replication_parse_result;
1478 
1479  /*
1480  * Log replication command if log_replication_commands is enabled. Even
1481  * when it's disabled, log the command with DEBUG1 level for backward
1482  * compatibility. Note that SQL commands are not logged here, and will be
1483  * logged later if log_statement is enabled.
1484  */
1485  if (cmd_node->type != T_SQLCmd)
1487  (errmsg("received replication command: %s", cmd_string)));
1488 
1489  /*
1490  * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot. If it was
1491  * called outside of transaction the snapshot should be cleared here.
1492  */
1493  if (!IsTransactionBlock())
1495 
1496  /*
1497  * For aborted transactions, don't allow anything except pure SQL, the
1498  * exec_simple_query() will handle it correctly.
1499  */
1500  if (IsAbortedTransactionBlockState() && !IsA(cmd_node, SQLCmd))
1501  ereport(ERROR,
1502  (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
1503  errmsg("current transaction is aborted, "
1504  "commands ignored until end of transaction block")));
1505 
1507 
1508  /*
1509  * Allocate buffers that will be used for each outgoing and incoming
1510  * message. We do this just once per command to reduce palloc overhead.
1511  */
1512  initStringInfo(&output_message);
1513  initStringInfo(&reply_message);
1514  initStringInfo(&tmpbuf);
1515 
1516  /* Report to pgstat that this process is running */
1518 
1519  switch (cmd_node->type)
1520  {
1521  case T_IdentifySystemCmd:
1522  IdentifySystem();
1523  break;
1524 
1525  case T_BaseBackupCmd:
1526  PreventInTransactionBlock(true, "BASE_BACKUP");
1527  SendBaseBackup((BaseBackupCmd *) cmd_node);
1528  break;
1529 
1532  break;
1533 
1536  break;
1537 
1538  case T_StartReplicationCmd:
1539  {
1540  StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
1541 
1542  PreventInTransactionBlock(true, "START_REPLICATION");
1543 
1544  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
1545  StartReplication(cmd);
1546  else
1548  break;
1549  }
1550 
1551  case T_TimeLineHistoryCmd:
1552  PreventInTransactionBlock(true, "TIMELINE_HISTORY");
1554  break;
1555 
1556  case T_VariableShowStmt:
1557  {
1559  VariableShowStmt *n = (VariableShowStmt *) cmd_node;
1560 
1561  /* syscache access needs a transaction environment */
1563  GetPGVariable(n->name, dest);
1565  }
1566  break;
1567 
1568  case T_SQLCmd:
1569  if (MyDatabaseId == InvalidOid)
1570  ereport(ERROR,
1571  (errmsg("cannot execute SQL commands in WAL sender for physical replication")));
1572 
1573  /* Report to pgstat that this process is now idle */
1575 
      /*
       * NOTE(review): this early return does not pass through the
       * MemoryContextSwitchTo(old_context) / MemoryContextDelete(cmd_context)
       * cleanup at the end of the function, so cmd_context appears to stay
       * current and allocated -- confirm the caller copes with that.
       */
1576  /* Tell the caller that this wasn't a WalSender command. */
1577  return false;
1578 
1579  default:
1580  elog(ERROR, "unrecognized replication command node tag: %u",
1581  cmd_node->type);
1582  }
1583 
1584  /* done */
1585  MemoryContextSwitchTo(old_context);
1586  MemoryContextDelete(cmd_context);
1587 
1588  /* Send CommandComplete message */
1589  EndCommand("SELECT", DestRemote);
1590 
1591  /* Report to pgstat that this process is now idle */
1593 
1594  return true;
1595 }
1596 
1597 /*
1598  * Process any incoming messages while streaming. Also checks if the remote
1599  * end has closed the connection.
      *
      * Messages are read without blocking until pq_getbyte_if_available()
      * reports that nothing more is available. Accepted message types are 'd'
      * (CopyData), 'c' (CopyDone) and 'X' (terminate); anything else raises
      * FATAL. Once CopyDone has been received, only 'X' is accepted. A read
      * error or EOF ends the process via proc_exit(0).
      *
      * NOTE(review): the inner line numbering skips 1602, 1608, 1617, 1633,
      * 1659 and 1697, so the function's name line, the opening of both
      * EOF reports, the 'd' handler call and the statement that records the
      * reply timestamp are not present in this listing.
1600  */
1601 static void
1603 {
1604  unsigned char firstchar;
1605  int r;
1606  bool received = false;
1607 
1609 
1610  for (;;)
1611  {
1612  pq_startmsgread();
1613  r = pq_getbyte_if_available(&firstchar);
1614  if (r < 0)
1615  {
1616  /* unexpected error or EOF */
1618  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1619  errmsg("unexpected EOF on standby connection")));
1620  proc_exit(0);
1621  }
1622  if (r == 0)
1623  {
1624  /* no data available without blocking */
1625  pq_endmsgread();
1626  break;
1627  }
1628 
1629  /* Read the message contents */
1630  resetStringInfo(&reply_message);
1631  if (pq_getmessage(&reply_message, 0))
1632  {
1634  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1635  errmsg("unexpected EOF on standby connection")));
1636  proc_exit(0);
1637  }
1638 
1639  /*
1640  * If we already received a CopyDone from the frontend, the frontend
1641  * should not send us anything until we've closed our end of the COPY.
1642  * XXX: In theory, the frontend could already send the next command
1643  * before receiving the CopyDone, but libpq doesn't currently allow
1644  * that.
1645  */
1646  if (streamingDoneReceiving && firstchar != 'X')
1647  ereport(FATAL,
1648  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1649  errmsg("unexpected standby message type \"%c\", after receiving CopyDone",
1650  firstchar)));
1651 
1652  /* Handle the very limited subset of commands expected in this phase */
1653  switch (firstchar)
1654  {
1655  /*
1656  * 'd' means a standby reply wrapped in a CopyData packet.
1657  */
1658  case 'd':
1660  received = true;
1661  break;
1662 
1663  /*
1664  * CopyDone means the standby requested to finish streaming.
1665  * Reply with CopyDone, if we had not sent that already.
1666  */
1667  case 'c':
1668  if (!streamingDoneSending)
1669  {
1670  pq_putmessage_noblock('c', NULL, 0);
1671  streamingDoneSending = true;
1672  }
1673 
1674  streamingDoneReceiving = true;
1675  received = true;
1676  break;
1677 
1678  /*
1679  * 'X' means that the standby is closing down the socket.
1680  */
1681  case 'X':
1682  proc_exit(0);
1683 
1684  default:
1685  ereport(FATAL,
1686  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1687  errmsg("invalid standby message type \"%c\"",
1688  firstchar)));
1689  }
1690  }
1691 
1692  /*
1693  * Save the last reply timestamp if we've received at least one reply.
1694  */
1695  if (received)
1696  {
1698  waiting_for_ping_response = false;
1699  }
1700 }
1701 
1702 /*
1703  * Process a status update message received from standby.
      *
      * Reads the first byte of reply_message as the message type and
      * dispatches on it: 'r' and 'h' are accepted, any other value is reported
      * as a protocol violation and the process exits via proc_exit(0).
      *
      * NOTE(review): the inner line numbering skips 1706, 1718, 1722 and 1726,
      * so the function's name line, the handler calls for 'r' and 'h', and the
      * opening of the error report are not present in this listing.
1704  */
1705 static void
1707 {
1708  char msgtype;
1709 
1710  /*
1711  * Check message type from the first byte.
1712  */
1713  msgtype = pq_getmsgbyte(&reply_message);
1714 
1715  switch (msgtype)
1716  {
1717  case 'r':
1719  break;
1720 
1721  case 'h':
1723  break;
1724 
1725  default:
1727  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1728  errmsg("unexpected message type \"%c\"", msgtype)));
1729  proc_exit(0);
1730  }
1731 }
1732 
1733 /*
1734  * Remember that a walreceiver just confirmed receipt of lsn `lsn`.
      *
      * Under the slot's spinlock, slot->data.restart_lsn is set to lsn if it
      * differs; lsn must not be InvalidXLogRecPtr (asserted). Follow-up work
      * is done outside the lock, and only when the value actually changed.
      *
      * NOTE(review): the inner line numbering skips 1737, 1740 and 1753-1754,
      * so the function's name/parameter line, the declaration of "slot" and
      * the statements run when the value changed are not present in this
      * listing.
1735  */
1736 static void
1738 {
1739  bool changed = false;
1741 
1742  Assert(lsn != InvalidXLogRecPtr);
1743  SpinLockAcquire(&slot->mutex);
1744  if (slot->data.restart_lsn != lsn)
1745  {
1746  changed = true;
1747  slot->data.restart_lsn = lsn;
1748  }
1749  SpinLockRelease(&slot->mutex);
1750 
1751  if (changed)
1752  {
1755  }
1756 
1757  /*
1758  * One could argue that the slot should be saved to disk now, but that'd
1759  * be energy wasted - the worst lost information can do here is give us
1760  * wrong information in a statistics view - we'll just potentially be more
1761  * conservative in removing files.
1762  */
1763 }
1764 
1765 /*
1766  * Regular reply from standby advising of WAL locations on standby server.
      *
      * Message layout as read below (after the type byte consumed by the
      * caller): write, flush and apply positions and the reply time as 64-bit
      * values, followed by a one-byte "reply requested" flag.
      *
      * The positions, the reply time and any newly computed lag values are
      * stored in MyWalSnd under its spinlock. A lag value of -1 leaves the
      * stored lag untouched unless clearLagTimes is set, which happens when two
      * consecutive replies report applyPtr == sentPtr (tracked in the
      * function-local static fullyAppliedLastTime).
      *
      * NOTE(review): the inner line numbering skips 1769, 1857-1858, 1865-1866
      * and 1868, so the function's name line and the statements that act on
      * the flush position for a replication slot are not present in this
      * listing.
1767  */
1768 static void
1770 {
1771  XLogRecPtr writePtr,
1772  flushPtr,
1773  applyPtr;
1774  bool replyRequested;
1775  TimeOffset writeLag,
1776  flushLag,
1777  applyLag;
1778  bool clearLagTimes;
1779  TimestampTz now;
1780  TimestampTz replyTime;
1781 
1782  static bool fullyAppliedLastTime = false;
1783 
1784  /* the caller already consumed the msgtype byte */
1785  writePtr = pq_getmsgint64(&reply_message);
1786  flushPtr = pq_getmsgint64(&reply_message);
1787  applyPtr = pq_getmsgint64(&reply_message);
1788  replyTime = pq_getmsgint64(&reply_message);
1789  replyRequested = pq_getmsgbyte(&reply_message);
1790 
      /* Only build the debug string when DEBUG2 output is enabled. */
1791  if (log_min_messages <= DEBUG2)
1792  {
1793  char *replyTimeStr;
1794 
1795  /* Copy because timestamptz_to_str returns a static buffer */
1796  replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
1797 
1798  elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s reply_time %s",
1799  (uint32) (writePtr >> 32), (uint32) writePtr,
1800  (uint32) (flushPtr >> 32), (uint32) flushPtr,
1801  (uint32) (applyPtr >> 32), (uint32) applyPtr,
1802  replyRequested ? " (reply requested)" : "",
1803  replyTimeStr);
1804 
1805  pfree(replyTimeStr);
1806  }
1807 
1808  /* See if we can compute the round-trip lag for these positions. */
1809  now = GetCurrentTimestamp();
1810  writeLag = LagTrackerRead(SYNC_REP_WAIT_WRITE, writePtr, now);
1811  flushLag = LagTrackerRead(SYNC_REP_WAIT_FLUSH, flushPtr, now);
1812  applyLag = LagTrackerRead(SYNC_REP_WAIT_APPLY, applyPtr, now);
1813 
1814  /*
1815  * If the standby reports that it has fully replayed the WAL in two
1816  * consecutive reply messages, then the second such message must result
1817  * from wal_receiver_status_interval expiring on the standby. This is a
1818  * convenient time to forget the lag times measured when it last
1819  * wrote/flushed/applied a WAL record, to avoid displaying stale lag data
1820  * until more WAL traffic arrives.
1821  */
1822  clearLagTimes = false;
1823  if (applyPtr == sentPtr)
1824  {
1825  if (fullyAppliedLastTime)
1826  clearLagTimes = true;
1827  fullyAppliedLastTime = true;
1828  }
1829  else
1830  fullyAppliedLastTime = false;
1831 
1832  /* Send a reply if the standby requested one. */
1833  if (replyRequested)
1834  WalSndKeepalive(false);
1835 
1836  /*
1837  * Update shared state for this WalSender process based on reply data from
1838  * standby.
1839  */
1840  {
1841  WalSnd *walsnd = MyWalSnd;
1842 
1843  SpinLockAcquire(&walsnd->mutex);
1844  walsnd->write = writePtr;
1845  walsnd->flush = flushPtr;
1846  walsnd->apply = applyPtr;
1847  if (writeLag != -1 || clearLagTimes)
1848  walsnd->writeLag = writeLag;
1849  if (flushLag != -1 || clearLagTimes)
1850  walsnd->flushLag = flushLag;
1851  if (applyLag != -1 || clearLagTimes)
1852  walsnd->applyLag = applyLag;
1853  walsnd->replyTime = replyTime;
1854  SpinLockRelease(&walsnd->mutex);
1855  }
1856 
1859 
1860  /*
1861  * Advance our local xmin horizon when the client confirmed a flush.
1862  */
1863  if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr)
1864  {
1867  else
1869  }
1870 }
1871 
1872 /* compute new replication slot xmin horizon if needed */
     /*
      * Under the slot's spinlock, slot->data.xmin / slot->effective_xmin are
      * set to feedbackXmin, and slot->data.catalog_xmin /
      * slot->effective_catalog_xmin to feedbackCatalogXmin. Each pair is
      * updated when the stored value is not a normal xid, when the feedback
      * value is not a normal xid, or when the stored value precedes the
      * feedback value. Follow-up work happens outside the lock, and only if
      * something changed.
      *
      * NOTE(review): the inner line numbering skips 1874, 1877, 1880 and
      * 1907-1908, so the function's name/parameter line, the declaration of
      * "slot", one statement under the lock and the statements run on change
      * are not present in this listing.
      */
1873 static void
1875 {
1876  bool changed = false;
1878 
1879  SpinLockAcquire(&slot->mutex);
1881 
1882  /*
1883  * For physical replication we don't need the interlock provided by xmin
1884  * and effective_xmin since the consequences of a missed increase are
1885  * limited to query cancellations, so set both at once.
1886  */
1887  if (!TransactionIdIsNormal(slot->data.xmin) ||
1888  !TransactionIdIsNormal(feedbackXmin) ||
1889  TransactionIdPrecedes(slot->data.xmin, feedbackXmin))
1890  {
1891  changed = true;
1892  slot->data.xmin = feedbackXmin;
1893  slot->effective_xmin = feedbackXmin;
1894  }
1895  if (!TransactionIdIsNormal(slot->data.catalog_xmin) ||
1896  !TransactionIdIsNormal(feedbackCatalogXmin) ||
1897  TransactionIdPrecedes(slot->data.catalog_xmin, feedbackCatalogXmin))
1898  {
1899  changed = true;
1900  slot->data.catalog_xmin = feedbackCatalogXmin;
1901  slot->effective_catalog_xmin = feedbackCatalogXmin;
1902  }
1903  SpinLockRelease(&slot->mutex);
1904 
1905  if (changed)
1906  {
1909  }
1910 }
1911 
1912 /*
1913  * Check that the provided xmin/epoch are sane, that is, not in the future
1914  * and not so far back as to be already wrapped around.
1915  *
1916  * Epoch of nextXid should be same as standby, or if the counter has
1917  * wrapped, then one greater than standby.
1918  *
1919  * This check doesn't care about whether clog exists for these xids
1920  * at all.
      *
      * Returns true only if the epoch test passes and
      * TransactionIdPrecedesOrEquals(xid, nextXid) holds, where nextXid and
      * its epoch are taken from ReadNextFullTransactionId().
      *
      * NOTE(review): the inner line numbering skips 1923, so the line carrying
      * this function's name and parameter list (xid and epoch are used below)
      * is not present in this listing.
1921  */
1922 static bool
1924 {
1925  FullTransactionId nextFullXid;
1926  TransactionId nextXid;
1927  uint32 nextEpoch;
1928 
1929  nextFullXid = ReadNextFullTransactionId();
1930  nextXid = XidFromFullTransactionId(nextFullXid);
1931  nextEpoch = EpochFromFullTransactionId(nextFullXid);
1932 
      /*
       * xid numerically at or below nextXid: must be in the same epoch.
       * Otherwise the counter has wrapped since, so xid must belong to the
       * epoch just before the current one.
       */
1933  if (xid <= nextXid)
1934  {
1935  if (epoch != nextEpoch)
1936  return false;
1937  }
1938  else
1939  {
1940  if (epoch + 1 != nextEpoch)
1941  return false;
1942  }
1943 
1944  if (!TransactionIdPrecedesOrEquals(xid, nextXid))
1945  return false; /* epoch OK, but it's wrapped around */
1946 
1947  return true;
1948 }
1949 
1950 /*
1951  * Hot Standby feedback
      *
      * Message layout as read below (after the type byte consumed by the
      * caller): a 64-bit reply time, then four 32-bit values -- xmin, xmin
      * epoch, catalog xmin, catalog xmin epoch.
      *
      * The reply time is always stored in MyWalSnd. If neither xmin is a
      * normal xid, the feedback is treated as "feedback turned off". If a
      * normal xmin fails TransactionIdInRecentPast(), the message is ignored.
      * Otherwise the xmins are applied through the replication slot when one
      * is in use, or else the older of the two is stored in MyPgXact->xmin.
      *
      * NOTE(review): the inner line numbering skips 1954 and 2009, so the
      * function's name line and the first statement of the "unset" branch are
      * not present in this listing.
1952  */
1953 static void
1955 {
1956  TransactionId feedbackXmin;
1957  uint32 feedbackEpoch;
1958  TransactionId feedbackCatalogXmin;
1959  uint32 feedbackCatalogEpoch;
1960  TimestampTz replyTime;
1961 
1962  /*
1963  * Decipher the reply message. The caller already consumed the msgtype
1964  * byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation
1965  * of this message.
1966  */
1967  replyTime = pq_getmsgint64(&reply_message);
1968  feedbackXmin = pq_getmsgint(&reply_message, 4);
1969  feedbackEpoch = pq_getmsgint(&reply_message, 4);
1970  feedbackCatalogXmin = pq_getmsgint(&reply_message, 4);
1971  feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4);
1972 
      /* Only build the debug string when DEBUG2 output is enabled. */
1973  if (log_min_messages <= DEBUG2)
1974  {
1975  char *replyTimeStr;
1976 
1977  /* Copy because timestamptz_to_str returns a static buffer */
1978  replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
1979 
1980  elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u reply_time %s",
1981  feedbackXmin,
1982  feedbackEpoch,
1983  feedbackCatalogXmin,
1984  feedbackCatalogEpoch,
1985  replyTimeStr);
1986 
1987  pfree(replyTimeStr);
1988  }
1989 
1990  /*
1991  * Update shared state for this WalSender process based on reply data from
1992  * standby.
1993  */
1994  {
1995  WalSnd *walsnd = MyWalSnd;
1996 
1997  SpinLockAcquire(&walsnd->mutex);
1998  walsnd->replyTime = replyTime;
1999  SpinLockRelease(&walsnd->mutex);
2000  }
2001 
2002  /*
2003  * Unset WalSender's xmins if the feedback message values are invalid.
2004  * This happens when the downstream turned hot_standby_feedback off.
2005  */
2006  if (!TransactionIdIsNormal(feedbackXmin)
2007  && !TransactionIdIsNormal(feedbackCatalogXmin))
2008  {
2010  if (MyReplicationSlot != NULL)
2011  PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
2012  return;
2013  }
2014 
2015  /*
2016  * Check that the provided xmin/epoch are sane, that is, not in the future
2017  * and not so far back as to be already wrapped around. Ignore if not.
2018  */
2019  if (TransactionIdIsNormal(feedbackXmin) &&
2020  !TransactionIdInRecentPast(feedbackXmin, feedbackEpoch))
2021  return;
2022 
2023  if (TransactionIdIsNormal(feedbackCatalogXmin) &&
2024  !TransactionIdInRecentPast(feedbackCatalogXmin, feedbackCatalogEpoch))
2025  return;
2026 
2027  /*
2028  * Set the WalSender's xmin equal to the standby's requested xmin, so that
2029  * the xmin will be taken into account by GetOldestXmin. This will hold
2030  * back the removal of dead rows and thereby prevent the generation of
2031  * cleanup conflicts on the standby server.
2032  *
2033  * There is a small window for a race condition here: although we just
2034  * checked that feedbackXmin precedes nextXid, the nextXid could have
2035  * gotten advanced between our fetching it and applying the xmin below,
2036  * perhaps far enough to make feedbackXmin wrap around. In that case the
2037  * xmin we set here would be "in the future" and have no effect. No point
2038  * in worrying about this since it's too late to save the desired data
2039  * anyway. Assuming that the standby sends us an increasing sequence of
2040  * xmins, this could only happen during the first reply cycle, else our
2041  * own xmin would prevent nextXid from advancing so far.
2042  *
2043  * We don't bother taking the ProcArrayLock here. Setting the xmin field
2044  * is assumed atomic, and there's no real need to prevent a concurrent
2045  * GetOldestXmin. (If we're moving our xmin forward, this is obviously
2046  * safe, and if we're moving it backwards, well, the data is at risk
2047  * already since a VACUUM could have just finished calling GetOldestXmin.)
2048  *
2049  * If we're using a replication slot we reserve the xmin via that,
2050  * otherwise via the walsender's PGXACT entry. We can only track the
2051  * catalog xmin separately when using a slot, so we store the least of the
2052  * two provided when not using a slot.
2053  *
2054  * XXX: It might make sense to generalize the ephemeral slot concept and
2055  * always use the slot mechanism to handle the feedback xmin.
2056  */
2057  if (MyReplicationSlot != NULL) /* XXX: persistency configurable? */
2058  PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
2059  else
2060  {
2061  if (TransactionIdIsNormal(feedbackCatalogXmin)
2062  && TransactionIdPrecedes(feedbackCatalogXmin, feedbackXmin))
2063  MyPgXact->xmin = feedbackCatalogXmin;
2064  else
2065  MyPgXact->xmin = feedbackXmin;
2066  }
2067 }
2068 
2069 /*
2070  * Compute how long send/receive loops should sleep.
2071  *
2072  * If wal_sender_timeout is enabled we want to wake up in time to send
2073  * keepalives and to abort the connection if wal_sender_timeout has been
2074  * reached.
      *
      * Returns the sleep time in milliseconds: 10000 by default, or the time
      * from "now" until wakeup_time when the timeout branch below is taken.
      *
      * NOTE(review): the inner line numbering skips 2077, 2081, 2091-2092 and
      * 2099-2100, so the function's name/parameter line, the condition of the
      * block below and the start of both wakeup_time computations are not
      * present in this listing.
2075  */
2076 static long
2078 {
2079  long sleeptime = 10000; /* 10 s */
2080 
2082  {
2083  TimestampTz wakeup_time;
2084  long sec_to_timeout;
2085  int microsec_to_timeout;
2086 
2087  /*
2088  * At the latest stop sleeping once wal_sender_timeout has been
2089  * reached.
2090  */
2093 
2094  /*
2095  * If no ping has been sent yet, wakeup when it's time to do so.
2096  * WalSndKeepaliveIfNecessary() wants to send a keepalive once half of
2097  * the timeout passed without a response.
2098  */
2101  wal_sender_timeout / 2);
2102 
2103  /* Compute relative time until wakeup. */
2104  TimestampDifference(now, wakeup_time,
2105  &sec_to_timeout, &microsec_to_timeout);
2106 
      /* Convert the seconds + microseconds pair to whole milliseconds. */
2107  sleeptime = sec_to_timeout * 1000 +
2108  microsec_to_timeout / 1000;
2109  }
2110 
2111  return sleeptime;
2112 }
2113 
2114 /*
2115  * Check whether there have been responses by the client within
2116  * wal_sender_timeout and shutdown if not. Using last_processing as the
2117  * reference point avoids counting server-side stalls against the client.
2118  * However, a long server-side stall can make WalSndKeepaliveIfNecessary()
2119  * postdate last_processing by more than wal_sender_timeout. If that happens,
2120  * the client must reply almost immediately to avoid a timeout. This rarely
2121  * affects the default configuration, under which clients spontaneously send a
2122  * message every standby_message_timeout = wal_sender_timeout/6 = 10s. We
2123  * could eliminate that problem by recognizing timeout expiration at
2124  * wal_sender_timeout/2 after the keepalive.
      *
      * Does nothing when last_reply_timestamp is <= 0 or wal_sender_timeout is
      * not positive. Otherwise, once last_processing has reached the computed
      * timeout, WalSndShutdown() is called.
      *
      * NOTE(review): the inner line numbering skips 2127, 2135-2136 and 2145,
      * so the function's name line, the computation of "timeout" and the
      * opening of the error report are not present in this listing.
2125  */
2126 static void
2128 {
2129  TimestampTz timeout;
2130 
2131  /* don't bail out if we're doing something that doesn't require timeouts */
2132  if (last_reply_timestamp <= 0)
2133  return;
2134 
2137 
2138  if (wal_sender_timeout > 0 && last_processing >= timeout)
2139  {
2140  /*
2141  * Since typically expiration of replication timeout means
2142  * communication problem, we don't send the error message to the
2143  * standby.
2144  */
2146  (errmsg("terminating walsender process due to replication timeout")));
2147 
2148  WalSndShutdown();
2149  }
2150 }
2151 
2152 /* Main loop of walsender process that streams the WAL over Copy messages. */
     /*
      * Each iteration: handle a pending config reload, call send_data() when
      * no output is pending (otherwise clear WalSndCaughtUp), try to flush,
      * and, when got_SIGUSR2 is set inside the "nothing remains to be sent"
      * branch, call WalSndDone(send_data). The loop sleeps on the latch and
      * socket inside the final block, adding WL_SOCKET_WRITEABLE when output
      * is pending. It ends when the CopyDone test near the top is satisfied.
      *
      * NOTE(review): the inner line numbering has gaps throughout this function
      * (for example 2154, 2160, 2190, 2210, 2253 and 2273), so the function's
      * name/parameter line, several conditions and several statements are not
      * present in this listing; restore them from the upstream file before
      * building.
      */
2153 static void
2155 {
2156  /*
2157  * Initialize the last reply timestamp. That enables timeout processing
2158  * from hereon.
2159  */
2161  waiting_for_ping_response = false;
2162 
2163  /*
2164  * Loop until we reach the end of this timeline or the client requests to
2165  * stop streaming.
2166  */
2167  for (;;)
2168  {
2169  /* Clear any already-pending wakeups */
2171 
2173 
2174  /* Process any requests or signals received recently */
2175  if (ConfigReloadPending)
2176  {
2177  ConfigReloadPending = false;
2180  }
2181 
2182  /* Check for input from the client */
2184 
2185  /*
2186  * If we have received CopyDone from the client, sent CopyDone
2187  * ourselves, and the output buffer is empty, it's time to exit
2188  * streaming.
2189  */
2191  !pq_is_send_pending())
2192  break;
2193 
2194  /*
2195  * If we don't have any pending data in the output buffer, try to send
2196  * some more. If there is some, we don't bother to call send_data
2197  * again until we've flushed it ... but we'd better assume we are not
2198  * caught up.
2199  */
2200  if (!pq_is_send_pending())
2201  send_data();
2202  else
2203  WalSndCaughtUp = false;
2204 
2205  /* Try to flush pending output to the client */
2206  if (pq_flush_if_writable() != 0)
2207  WalSndShutdown();
2208 
2209  /* If nothing remains to be sent right now ... */
2211  {
2212  /*
2213  * If we're in catchup state, move to streaming. This is an
2214  * important state change for users to know about, since before
2215  * this point data loss might occur if the primary dies and we
2216  * need to failover to the standby. The state change is also
2217  * important for synchronous replication, since commits that
2218  * started to wait at that point might wait for some time.
2219  */
2220  if (MyWalSnd->state == WALSNDSTATE_CATCHUP)
2221  {
2222  ereport(DEBUG1,
2223  (errmsg("\"%s\" has now caught up with upstream server",
2224  application_name)));
2226  }
2227 
2228  /*
2229  * When SIGUSR2 arrives, we send any outstanding logs up to the
2230  * shutdown checkpoint record (i.e., the latest record), wait for
2231  * them to be replicated to the standby, and exit. This may be a
2232  * normal termination at shutdown, or a promotion, the walsender
2233  * is not sure which.
2234  */
2235  if (got_SIGUSR2)
2236  WalSndDone(send_data);
2237  }
2238 
2239  /* Check for replication timeout. */
2241 
2242  /* Send keepalive if the time has come */
2244 
2245  /*
2246  * We don't block if not caught up, unless there is unsent data
2247  * pending in which case we'd better block until the socket is
2248  * write-ready. This test is only needed for the case where the
2249  * send_data callback handled a subset of the available data but then
2250  * pq_flush_if_writable flushed it all --- we should immediately try
2251  * to send more.
2252  */
2254  {
2255  long sleeptime;
2256  int wakeEvents;
2257 
2258  wakeEvents = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH | WL_TIMEOUT |
2260 
2261  /*
2262  * Use fresh timestamp, not last_processing, to reduce the chance
2263  * of reaching wal_sender_timeout before sending a keepalive.
2264  */
2266 
2267  if (pq_is_send_pending())
2268  wakeEvents |= WL_SOCKET_WRITEABLE;
2269 
2270  /* Sleep until something happens or we time out */
2271  (void) WaitLatchOrSocket(MyLatch, wakeEvents,
2272  MyProcPort->sock, sleeptime,
2274  }
2275  }
2276  return;
2277 }
2278 
2279 /* Initialize a per-walsender data structure for this walsender process */
2280 static void
2282 {
2283  int i;
2284 
2285  /*
2286  * WalSndCtl should be set up already (we inherit this by fork() or
2287  * EXEC_BACKEND mechanism from the postmaster).
2288  */
2289  Assert(WalSndCtl != NULL);
2290  Assert(MyWalSnd == NULL);
2291 
2292  /*
2293  * Find a free walsender slot and reserve it. This must not fail due to
2294  * the prior check for free WAL senders in InitProcess().
2295  */
2296  for (i = 0; i < max_wal_senders; i++)
2297  {
2298  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2299 
2300  SpinLockAcquire(&walsnd->mutex);
2301 
2302  if (walsnd->pid != 0)
2303  {
2304  SpinLockRelease(&walsnd->mutex);
2305  continue;
2306  }
2307  else
2308  {
2309  /*
2310  * Found a free slot. Reserve it for us.
2311  */
2312  walsnd->pid = MyProcPid;
2313  walsnd->sentPtr = InvalidXLogRecPtr;
2314  walsnd->write = InvalidXLogRecPtr;
2315  walsnd->flush = InvalidXLogRecPtr;
2316  walsnd->apply = InvalidXLogRecPtr;
2317  walsnd->writeLag = -1;
2318  walsnd->flushLag = -1;
2319  walsnd->applyLag = -1;
2320  walsnd->state = WALSNDSTATE_STARTUP;
2321  walsnd->latch = &MyProc->procLatch;
2322  walsnd->replyTime = 0;
2323  SpinLockRelease(&walsnd->mutex);
2324  /* don't need the lock anymore */
2325  MyWalSnd = (WalSnd *) walsnd;
2326 
2327  break;
2328  }
2329  }
2330 
2331  Assert(MyWalSnd != NULL);
2332 
2333  /* Arrange to clean up at walsender exit */
2335 }
2336 
2337 /* Destroy the per-walsender data structure for this walsender process */
2338 static void
2340 {
2341  WalSnd *walsnd = MyWalSnd;
2342 
2343  Assert(walsnd != NULL);
2344 
2345  MyWalSnd = NULL;
2346 
2347  SpinLockAcquire(&walsnd->mutex);
2348  /* clear latch while holding the spinlock, so it can safely be read */
2349  walsnd->latch = NULL;
2350  /* Mark WalSnd struct as no longer being in use. */
2351  walsnd->pid = 0;
2352  SpinLockRelease(&walsnd->mutex);
2353 }
2354 
2355 /*
2356  * Read 'count' bytes from WAL into 'buf', starting at location 'startptr'
2357  *
2358  * XXX probably this should be improved to suck data directly from the
2359  * WAL buffers when possible.
2360  *
2361  * Will open, and keep open, one WAL segment stored in the global file
2362  * descriptor sendFile. This means if XLogRead is used once, there will
2363  * always be one descriptor left open until the process ends, but never
2364  * more than one.
2365  */
2366 static void
2368 {
2369  char *p;
2370  XLogRecPtr recptr;
2371  Size nbytes;
2372  XLogSegNo segno;
2373 
2374 retry:
2375  p = buf;
2376  recptr = startptr;
2377  nbytes = count;
2378 
2379  while (nbytes > 0)
2380  {
2381  uint32 startoff;
2382  int segbytes;
2383  int readbytes;
2384 
2385  startoff = XLogSegmentOffset(recptr, wal_segment_size);
2386 
2387  if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo, wal_segment_size))
2388  {
2389  char path[MAXPGPATH];
2390 
2391  /* Switch to another logfile segment */
2392  if (sendFile >= 0)
2393  close(sendFile);
2394 
2396 
2397  /*-------
2398  * When reading from a historic timeline, and there is a timeline
2399  * switch within this segment, read from the WAL segment belonging
2400  * to the new timeline.
2401  *
2402  * For example, imagine that this server is currently on timeline
2403  * 5, and we're streaming timeline 4. The switch from timeline 4
2404  * to 5 happened at 0/13002088. In pg_wal, we have these files:
2405  *
2406  * ...
2407  * 000000040000000000000012
2408  * 000000040000000000000013
2409  * 000000050000000000000013
2410  * 000000050000000000000014
2411  * ...
2412  *
2413  * In this situation, when requested to send the WAL from
2414  * segment 0x13, on timeline 4, we read the WAL from file
2415  * 000000050000000000000013. Archive recovery prefers files from
2416  * newer timelines, so if the segment was restored from the
2417  * archive on this server, the file belonging to the old timeline,
2418  * 000000040000000000000013, might not exist. Their contents are
2419  * equal up to the switchpoint, because at a timeline switch, the
2420  * used portion of the old segment is copied to the new file.
2421  *-------
2422  */
2425  {
2426  XLogSegNo endSegNo;
2427 
2429  if (sendSegNo == endSegNo)
2431  }
2432 
2434 
2435  sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY);
2436  if (sendFile < 0)
2437  {
2438  /*
2439  * If the file is not found, assume it's because the standby
2440  * asked for a too old WAL segment that has already been
2441  * removed or recycled.
2442  */
2443  if (errno == ENOENT)
2444  ereport(ERROR,
2446  errmsg("requested WAL segment %s has already been removed",
2448  else
2449  ereport(ERROR,
2451  errmsg("could not open file \"%s\": %m",
2452  path)));
2453  }
2454  sendOff = 0;
2455  }
2456 
2457  /* Need to seek in the file? */
2458  if (sendOff != startoff)
2459  {
2460  if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
2461  ereport(ERROR,
2463  errmsg("could not seek in log segment %s to offset %u: %m",
2465  startoff)));
2466  sendOff = startoff;
2467  }
2468 
2469  /* How many bytes are within this segment? */
2470  if (nbytes > (wal_segment_size - startoff))
2471  segbytes = wal_segment_size - startoff;
2472  else
2473  segbytes = nbytes;
2474 
2476  readbytes = read(sendFile, p, segbytes);
2478  if (readbytes < 0)
2479  {
2480  ereport(ERROR,
2482  errmsg("could not read from log segment %s, offset %u, length %zu: %m",
2484  sendOff, (Size) segbytes)));
2485  }
2486  else if (readbytes == 0)
2487  {
2488  ereport(ERROR,
2490  errmsg("could not read from log segment %s, offset %u: read %d of %zu",
2492  sendOff, readbytes, (Size) segbytes)));
2493  }
2494 
2495  /* Update state for read */
2496  recptr += readbytes;
2497 
2498  sendOff += readbytes;
2499  nbytes -= readbytes;
2500  p += readbytes;
2501  }
2502 
2503  /*
2504  * After reading into the buffer, check that what we read was valid. We do
2505  * this after reading, because even though the segment was present when we
2506  * opened it, it might get recycled or removed while we read it. The
2507  * read() succeeds in that case, but the data we tried to read might
2508  * already have been overwritten with new WAL records.
2509  */
2510  XLByteToSeg(startptr, segno, wal_segment_size);
2512 
2513  /*
2514  * During recovery, the currently-open WAL file might be replaced with the
2515  * file of the same name retrieved from archive. So we always need to
2516  * check what we read was valid after reading into the buffer. If it's
2517  * invalid, we try to open and read the file again.
2518  */
2520  {
2521  WalSnd *walsnd = MyWalSnd;
2522  bool reload;
2523 
2524  SpinLockAcquire(&walsnd->mutex);
2525  reload = walsnd->needreload;
2526  walsnd->needreload = false;
2527  SpinLockRelease(&walsnd->mutex);
2528 
2529  if (reload && sendFile >= 0)
2530  {
2531  close(sendFile);
2532  sendFile = -1;
2533 
2534  goto retry;
2535  }
2536  }
2537 }
2538 
2539 /*
2540  * Send out the WAL in its normal physical/stored form.
2541  *
2542  * Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk,
2543  * but not yet sent to the client, and buffer it in the libpq output
2544  * buffer.
2545  *
2546  * If there is no unsent WAL remaining, WalSndCaughtUp is set to true,
2547  * otherwise WalSndCaughtUp is set to false.
2548  */
2549 static void
2551 {
2552  XLogRecPtr SendRqstPtr;
2554  XLogRecPtr endptr;
2555  Size nbytes;
2556 
2557  /* If requested switch the WAL sender to the stopping state. */
2558  if (got_STOPPING)
2560 
2562  {
2563  WalSndCaughtUp = true;
2564  return;
2565  }
2566 
2567  /* Figure out how far we can safely send the WAL. */
2569  {
2570  /*
2571  * Streaming an old timeline that's in this server's history, but is
2572  * not the one we're currently inserting or replaying. It can be
2573  * streamed up to the point where we switched off that timeline.
2574  */
2575  SendRqstPtr = sendTimeLineValidUpto;
2576  }
2577  else if (am_cascading_walsender)
2578  {
2579  /*
2580  * Streaming the latest timeline on a standby.
2581  *
2582  * Attempt to send all WAL that has already been replayed, so that we
2583  * know it's valid. If we're receiving WAL through streaming
2584  * replication, it's also OK to send any WAL that has been received
2585  * but not replayed.
2586  *
2587  * The timeline we're recovering from can change, or we can be
2588  * promoted. In either case, the current timeline becomes historic. We
2589  * need to detect that so that we don't try to stream past the point
2590  * where we switched to another timeline. We check for promotion or
2591  * timeline switch after calculating FlushPtr, to avoid a race
2592  * condition: if the timeline becomes historic just after we checked
2593  * that it was still current, it's still OK to stream it up to the
2594  * FlushPtr that was calculated before it became historic.
2595  */
2596  bool becameHistoric = false;
2597 
2598  SendRqstPtr = GetStandbyFlushRecPtr();
2599 
2600  if (!RecoveryInProgress())
2601  {
2602  /*
2603  * We have been promoted. RecoveryInProgress() updated
2604  * ThisTimeLineID to the new current timeline.
2605  */
2606  am_cascading_walsender = false;
2607  becameHistoric = true;
2608  }
2609  else
2610  {
2611  /*
2612  * Still a cascading standby. But is the timeline we're sending
2613  * still the one recovery is recovering from? ThisTimeLineID was
2614  * updated by the GetStandbyFlushRecPtr() call above.
2615  */
2617  becameHistoric = true;
2618  }
2619 
2620  if (becameHistoric)
2621  {
2622  /*
2623  * The timeline we were sending has become historic. Read the
2624  * timeline history file of the new timeline to see where exactly
2625  * we forked off from the timeline we were sending.
2626  */
2627  List *history;
2628 
2631 
2633  list_free_deep(history);
2634 
2635  sendTimeLineIsHistoric = true;
2636 
2637  SendRqstPtr = sendTimeLineValidUpto;
2638  }
2639  }
2640  else
2641  {
2642  /*
2643  * Streaming the current timeline on a master.
2644  *
2645  * Attempt to send all data that's already been written out and
2646  * fsync'd to disk. We cannot go further than what's been written out
2647  * given the current implementation of XLogRead(). And in any case
2648  * it's unsafe to send WAL that is not securely down to disk on the
2649  * master: if the master subsequently crashes and restarts, standbys
2650  * must not have applied any WAL that got lost on the master.
2651  */
2652  SendRqstPtr = GetFlushRecPtr();
2653  }
2654 
2655  /*
2656  * Record the current system time as an approximation of the time at which
2657  * this WAL location was written for the purposes of lag tracking.
2658  *
2659  * In theory we could make XLogFlush() record a time in shmem whenever WAL
2660  * is flushed and we could get that time as well as the LSN when we call
2661  * GetFlushRecPtr() above (and likewise for the cascading standby
2662  * equivalent), but rather than putting any new code into the hot WAL path
2663  * it seems good enough to capture the time here. We should reach this
2664  * after XLogFlush() runs WalSndWakeupProcessRequests(), and although that
2665  * may take some time, we read the WAL flush pointer and take the time
2666  * very close together here so that we'll get a later position if it is
2667  * still moving.
2668  *
2669  * Because LagTrackerWrite ignores samples when the LSN hasn't advanced,
2670  * this gives us a cheap approximation for the WAL flush time for this
2671  * LSN.
2672  *
2673  * Note that the LSN is not necessarily the LSN for the data contained in
2674  * the present message; it's the end of the WAL, which might be further
2675  * ahead. All the lag tracking machinery cares about is finding out when
2676  * that arbitrary LSN is eventually reported as written, flushed and
2677  * applied, so that it can measure the elapsed time.
2678  */
2679  LagTrackerWrite(SendRqstPtr, GetCurrentTimestamp());
2680 
2681  /*
2682  * If this is a historic timeline and we've reached the point where we
2683  * forked to the next timeline, stop streaming.
2684  *
2685  * Note: We might already have sent WAL > sendTimeLineValidUpto. The
2686  * startup process will normally replay all WAL that has been received
2687  * from the master, before promoting, but if the WAL streaming is
2688  * terminated at a WAL page boundary, the valid portion of the timeline
2689  * might end in the middle of a WAL record. We might've already sent the
2690  * first half of that partial WAL record to the cascading standby, so that
2691  * sentPtr > sendTimeLineValidUpto. That's OK; the cascading standby can't
2692  * replay the partial WAL record either, so it can still follow our
2693  * timeline switch.
2694  */
2696  {
2697  /* close the current file. */
2698  if (sendFile >= 0)
2699  close(sendFile);
2700  sendFile = -1;
2701 
2702  /* Send CopyDone */
2703  pq_putmessage_noblock('c', NULL, 0);
2704  streamingDoneSending = true;
2705 
2706  WalSndCaughtUp = true;
2707 
2708  elog(DEBUG1, "walsender reached end of timeline at %X/%X (sent up to %X/%X)",
2710  (uint32) (sentPtr >> 32), (uint32) sentPtr);
2711  return;
2712  }
2713 
2714  /* Do we have any work to do? */
2715  Assert(sentPtr <= SendRqstPtr);
2716  if (SendRqstPtr <= sentPtr)
2717  {
2718  WalSndCaughtUp = true;
2719  return;
2720  }
2721 
2722  /*
2723  * Figure out how much to send in one message. If there's no more than
2724  * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
2725  * MAX_SEND_SIZE bytes, but round back to logfile or page boundary.
2726  *
2727  * The rounding is not only for performance reasons. Walreceiver relies on
2728  * the fact that we never split a WAL record across two messages. Since a
2729  * long WAL record is split at page boundary into continuation records,
2730  * page boundary is always a safe cut-off point. We also assume that
2731  * SendRqstPtr never points to the middle of a WAL record.
2732  */
2733  startptr = sentPtr;
2734  endptr = startptr;
2735  endptr += MAX_SEND_SIZE;
2736 
2737  /* if we went beyond SendRqstPtr, back off */
2738  if (SendRqstPtr <= endptr)
2739  {
2740  endptr = SendRqstPtr;
2742  WalSndCaughtUp = false;
2743  else
2744  WalSndCaughtUp = true;
2745  }
2746  else
2747  {
2748  /* round down to page boundary. */
2749  endptr -= (endptr % XLOG_BLCKSZ);
2750  WalSndCaughtUp = false;
2751  }
2752 
2753  nbytes = endptr - startptr;
2754  Assert(nbytes <= MAX_SEND_SIZE);
2755 
2756  /*
2757  * OK to read and send the slice.
2758  */
2759  resetStringInfo(&output_message);
2760  pq_sendbyte(&output_message, 'w');
2761 
2762  pq_sendint64(&output_message, startptr); /* dataStart */
2763  pq_sendint64(&output_message, SendRqstPtr); /* walEnd */
2764  pq_sendint64(&output_message, 0); /* sendtime, filled in last */
2765 
2766  /*
2767  * Read the log directly into the output buffer to avoid extra memcpy
2768  * calls.
2769  */
2770  enlargeStringInfo(&output_message, nbytes);
2771  XLogRead(&output_message.data[output_message.len], startptr, nbytes);
2772  output_message.len += nbytes;
2773  output_message.data[output_message.len] = '\0';
2774 
2775  /*
2776  * Fill the send timestamp last, so that it is taken as late as possible.
2777  */
2778  resetStringInfo(&tmpbuf);
2779  pq_sendint64(&tmpbuf, GetCurrentTimestamp());
2780  memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64)],
2781  tmpbuf.data, sizeof(int64));
2782 
2783  pq_putmessage_noblock('d', output_message.data, output_message.len);
2784 
2785  sentPtr = endptr;
2786 
2787  /* Update shared memory status */
2788  {
2789  WalSnd *walsnd = MyWalSnd;
2790 
2791  SpinLockAcquire(&walsnd->mutex);
2792  walsnd->sentPtr = sentPtr;
2793  SpinLockRelease(&walsnd->mutex);
2794  }
2795 
2796  /* Report progress of XLOG streaming in PS display */
2798  {
2799  char activitymsg[50];
2800 
2801  snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
2802  (uint32) (sentPtr >> 32), (uint32) sentPtr);
2803  set_ps_display(activitymsg, false);
2804  }
2805 
2806  return;
2807 }
2808 
2809 /*
2810  * Stream out logically decoded data.
2811  */
2812 static void
2814 {
2815  XLogRecord *record;
2816  char *errm;
2817 
2818  /*
2819  * Don't know whether we've caught up yet. We'll set WalSndCaughtUp to
2820  * true in WalSndWaitForWal, if we're actually waiting. We also set it to
2821  * true if XLogReadRecord() had to stop reading but WalSndWaitForWal
2822  * didn't wait - i.e. when we're shutting down.
2823  */
2824  WalSndCaughtUp = false;
2825 
2826  record = XLogReadRecord(logical_decoding_ctx->reader, logical_startptr, &errm);
2828 
2829  /* xlog record was invalid */
2830  if (errm != NULL)
2831  elog(ERROR, "%s", errm);
2832 
2833  if (record != NULL)
2834  {
2835  /* XXX: Note that logical decoding cannot be used while in recovery */
2836  XLogRecPtr flushPtr = GetFlushRecPtr();
2837 
2838  /*
2839  * Note the lack of any call to LagTrackerWrite() which is handled by
2840  * WalSndUpdateProgress which is called by output plugin through
2841  * logical decoding write api.
2842  */
2843  LogicalDecodingProcessRecord(logical_decoding_ctx, logical_decoding_ctx->reader);
2844 
2845  sentPtr = logical_decoding_ctx->reader->EndRecPtr;
2846 
2847  /*
2848  * If we have sent a record that is at or beyond the flushed point, we
2849  * have caught up.
2850  */
2851  if (sentPtr >= flushPtr)
2852  WalSndCaughtUp = true;
2853  }
2854  else
2855  {
2856  /*
2857  * If the record we just wanted to read is at or beyond the flushed
2858  * point, then we're caught up.
2859  */
2860  if (logical_decoding_ctx->reader->EndRecPtr >= GetFlushRecPtr())
2861  {
2862  WalSndCaughtUp = true;
2863 
2864  /*
2865  * Have WalSndLoop() terminate the connection in an orderly
2866  * manner, after writing out all the pending data.
2867  */
2868  if (got_STOPPING)
2869  got_SIGUSR2 = true;
2870  }
2871  }
2872 
2873  /* Update shared memory status */
2874  {
2875  WalSnd *walsnd = MyWalSnd;
2876 
2877  SpinLockAcquire(&walsnd->mutex);
2878  walsnd->sentPtr = sentPtr;
2879  SpinLockRelease(&walsnd->mutex);
2880  }
2881 }
2882 
2883 /*
2884  * Shutdown if the sender is caught up.
2885  *
2886  * NB: This should only be called when the shutdown signal has been received
2887  * from postmaster.
2888  *
2889  * Note that if we determine that there's still more data to send, this
2890  * function will return control to the caller.
2891  */
2892 static void
2894 {
2895  XLogRecPtr replicatedPtr;
2896 
2897  /* ... let's just be real sure we're caught up ... */
2898  send_data();
2899 
2900  /*
2901  * To figure out whether all WAL has successfully been replicated, check
2902  * flush location if valid, write otherwise. Tools like pg_receivewal will
2903  * usually (unless in synchronous mode) return an invalid flush location.
2904  */
2905  replicatedPtr = XLogRecPtrIsInvalid(MyWalSnd->flush) ?
2906  MyWalSnd->write : MyWalSnd->flush;
2907 
2908  if (WalSndCaughtUp && sentPtr == replicatedPtr &&
2909  !pq_is_send_pending())
2910  {
2911  /* Inform the standby that XLOG streaming is done */
2912  EndCommand("COPY 0", DestRemote);
2913  pq_flush();
2914 
2915  proc_exit(0);
2916  }
2918  {
2919  WalSndKeepalive(true);
2921  }
2922 }
2923 
2924 /*
2925  * Returns the latest point in WAL that has been safely flushed to disk, and
2926  * can be sent to the standby. This should only be called when in recovery,
2927  * ie. we're streaming to a cascaded standby.
2928  *
2929  * As a side-effect, ThisTimeLineID is updated to the TLI of the last
2930  * replayed WAL record.
2931  */
2932 static XLogRecPtr
2934 {
2935  XLogRecPtr replayPtr;
2936  TimeLineID replayTLI;
2937  XLogRecPtr receivePtr;
2939  XLogRecPtr result;
2940 
2941  /*
2942  * We can safely send what's already been replayed. Also, if walreceiver
2943  * is streaming WAL from the same timeline, we can send anything that it
2944  * has streamed, but hasn't been replayed yet.
2945  */
2946 
2947  receivePtr = GetWalRcvWriteRecPtr(NULL, &receiveTLI);
2948  replayPtr = GetXLogReplayRecPtr(&replayTLI);
2949 
2950  ThisTimeLineID = replayTLI;
2951 
2952  result = replayPtr;
2953  if (receiveTLI == ThisTimeLineID && receivePtr > replayPtr)
2954  result = receivePtr;
2955 
2956  return result;
2957 }
2958 
2959 /*
2960  * Request walsenders to reload the currently-open WAL file
2961  */
2962 void
2964 {
2965  int i;
2966 
2967  for (i = 0; i < max_wal_senders; i++)
2968  {
2969  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2970 
2971  SpinLockAcquire(&walsnd->mutex);
2972  if (walsnd->pid == 0)
2973  {
2974  SpinLockRelease(&walsnd->mutex);
2975  continue;
2976  }
2977  walsnd->needreload = true;
2978  SpinLockRelease(&walsnd->mutex);
2979  }
2980 }
2981 
2982 /*
2983  * Handle PROCSIG_WALSND_INIT_STOPPING signal.
2984  */
2985 void
2987 {
2989 
2990  /*
2991  * If replication has not yet started, die like with SIGTERM. If
2992  * replication is active, only set a flag and wake up the main loop. It
2993  * will send any outstanding WAL, wait for it to be replicated to the
2994  * standby, and then exit gracefully.
2995  */
2996  if (!replication_active)
2997  kill(MyProcPid, SIGTERM);
2998  else
2999  got_STOPPING = true;
3000 }
3001 
3002 /*
3003  * SIGUSR2: set flag to do a last cycle and shut down afterwards. The WAL
3004  * sender should already have been switched to WALSNDSTATE_STOPPING at
3005  * this point.
3006  */
3007 static void
3009 {
3010  int save_errno = errno;
3011 
3012  got_SIGUSR2 = true;
3013  SetLatch(MyLatch);
3014 
3015  errno = save_errno;
3016 }
3017 
3018 /* Set up signal handlers */
3019 void
3021 {
3022  /* Set up signal handlers */
3023  pqsignal(SIGHUP, PostgresSigHupHandler); /* set flag to read config
3024  * file */
3025  pqsignal(SIGINT, StatementCancelHandler); /* query cancel */
3026  pqsignal(SIGTERM, die); /* request shutdown */
3027  pqsignal(SIGQUIT, quickdie); /* hard crash time */
3028  InitializeTimeouts(); /* establishes SIGALRM handler */
3031  pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
3032  * shutdown */
3033 
3034  /* Reset some signals that are accepted by postmaster but not here */
3036 }
3037 
3038 /* Report shared-memory space needed by WalSndShmemInit */
3039 Size
3041 {
3042  Size size = 0;
3043 
3044  size = offsetof(WalSndCtlData, walsnds);
3045  size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
3046 
3047  return size;
3048 }
3049 
3050 /* Allocate and initialize walsender-related shared memory */
3051 void
3053 {
3054  bool found;
3055  int i;
3056 
3057  WalSndCtl = (WalSndCtlData *)
3058  ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
3059 
3060  if (!found)
3061  {
3062  /* First time through, so initialize */
3063  MemSet(WalSndCtl, 0, WalSndShmemSize());
3064 
3065  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
3066  SHMQueueInit(&(WalSndCtl->SyncRepQueue[i]));
3067 
3068  for (i = 0; i < max_wal_senders; i++)
3069  {
3070  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3071 
3072  SpinLockInit(&walsnd->mutex);
3073  }
3074  }
3075 }
3076 
3077 /*
3078  * Wake up all walsenders
3079  *
3080  * This will be called inside critical sections, so throwing an error is not
3081  * advisable.
3082  */
3083 void
3085 {
3086  int i;
3087 
3088  for (i = 0; i < max_wal_senders; i++)
3089  {
3090  Latch *latch;
3091  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3092 
3093  /*
3094  * Get latch pointer with spinlock held, for the unlikely case that
3095  * pointer reads aren't atomic (as they're 8 bytes).
3096  */
3097  SpinLockAcquire(&walsnd->mutex);
3098  latch = walsnd->latch;
3099  SpinLockRelease(&walsnd->mutex);
3100 
3101  if (latch != NULL)
3102  SetLatch(latch);
3103  }
3104 }
3105 
3106 /*
3107  * Signal all walsenders to move to stopping state.
3108  *
3109  * This will trigger walsenders to move to a state where no further WAL can be
3110  * generated. See this file's header for details.
3111  */
3112 void
3114 {
3115  int i;
3116 
3117  for (i = 0; i < max_wal_senders; i++)
3118  {
3119  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3120  pid_t pid;
3121 
3122  SpinLockAcquire(&walsnd->mutex);
3123  pid = walsnd->pid;
3124  SpinLockRelease(&walsnd->mutex);
3125 
3126  if (pid == 0)
3127  continue;
3128 
3130  }
3131 }
3132 
3133 /*
3134  * Wait until all the WAL senders have quit or reached the stopping state. This
3135  * is used by the checkpointer to control when the shutdown checkpoint can
3136  * safely be performed.
3137  */
3138 void
3140 {
3141  for (;;)
3142  {
3143  int i;
3144  bool all_stopped = true;
3145 
3146  for (i = 0; i < max_wal_senders; i++)
3147  {
3148  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3149 
3150  SpinLockAcquire(&walsnd->mutex);
3151 
3152  if (walsnd->pid == 0)
3153  {
3154  SpinLockRelease(&walsnd->mutex);
3155  continue;
3156  }
3157 
3158  if (walsnd->state != WALSNDSTATE_STOPPING)
3159  {
3160  all_stopped = false;
3161  SpinLockRelease(&walsnd->mutex);
3162  break;
3163  }
3164  SpinLockRelease(&walsnd->mutex);
3165  }
3166 
3167  /* safe to leave if confirmation is done for all WAL senders */
3168  if (all_stopped)
3169  return;
3170 
3171  pg_usleep(10000L); /* wait for 10 msec */
3172  }
3173 }
3174 
3175 /* Set state for current walsender (only called in walsender) */
3176 void
3178 {
3179  WalSnd *walsnd = MyWalSnd;
3180 
3182 
3183  if (walsnd->state == state)
3184  return;
3185 
3186  SpinLockAcquire(&walsnd->mutex);
3187  walsnd->state = state;
3188  SpinLockRelease(&walsnd->mutex);
3189 }
3190 
3191 /*
3192  * Return a string constant representing the state. This is used
3193  * in system views, and should *not* be translated.
3194  */
3195 static const char *
3197 {
3198  switch (state)
3199  {
3200  case WALSNDSTATE_STARTUP:
3201  return "startup";
3202  case WALSNDSTATE_BACKUP:
3203  return "backup";
3204  case WALSNDSTATE_CATCHUP:
3205  return "catchup";
3206  case WALSNDSTATE_STREAMING:
3207  return "streaming";
3208  case WALSNDSTATE_STOPPING:
3209  return "stopping";
3210  }
3211  return "UNKNOWN";
3212 }
3213 
3214 static Interval *
3216 {
3217  Interval *result = palloc(sizeof(Interval));
3218 
3219  result->month = 0;
3220  result->day = 0;
3221  result->time = offset;
3222 
3223  return result;
3224 }
3225 
3226 /*
3227  * Returns activity of walsenders, including pids and xlog locations sent to
3228  * standby servers.
3229  */
3230 Datum
3232 {
3233 #define PG_STAT_GET_WAL_SENDERS_COLS 12
3234  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
3235  TupleDesc tupdesc;
3236  Tuplestorestate *tupstore;
3237  MemoryContext per_query_ctx;
3238  MemoryContext oldcontext;
3239  List *sync_standbys;
3240  int i;
3241 
3242  /* check to see if caller supports us returning a tuplestore */
3243  if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
3244  ereport(ERROR,
3245  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3246  errmsg("set-valued function called in context that cannot accept a set")));
3247  if (!(rsinfo->allowedModes & SFRM_Materialize))
3248  ereport(ERROR,
3249  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3250  errmsg("materialize mode required, but it is not " \
3251  "allowed in this context")));
3252 
3253  /* Build a tuple descriptor for our result type */
3254  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
3255  elog(ERROR, "return type must be a row type");
3256 
3257  per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
3258  oldcontext = MemoryContextSwitchTo(per_query_ctx);
3259 
3260  tupstore = tuplestore_begin_heap(true, false, work_mem);
3261  rsinfo->returnMode = SFRM_Materialize;
3262  rsinfo->setResult = tupstore;
3263  rsinfo->setDesc = tupdesc;
3264 
3265  MemoryContextSwitchTo(oldcontext);
3266 
3267  /*
3268  * Get the currently active synchronous standbys.
3269  */
3270  LWLockAcquire(SyncRepLock, LW_SHARED);
3271  sync_standbys = SyncRepGetSyncStandbys(NULL);
3272  LWLockRelease(SyncRepLock);
3273 
3274  for (i = 0; i < max_wal_senders; i++)
3275  {
3276  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3278  XLogRecPtr write;
3279  XLogRecPtr flush;
3280  XLogRecPtr apply;
3281  TimeOffset writeLag;
3282  TimeOffset flushLag;
3283  TimeOffset applyLag;
3284  int priority;
3285  int pid;
3287  TimestampTz replyTime;
3289  bool nulls[PG_STAT_GET_WAL_SENDERS_COLS];
3290 
3291  SpinLockAcquire(&walsnd->mutex);
3292  if (walsnd->pid == 0)
3293  {
3294  SpinLockRelease(&walsnd->mutex);
3295  continue;
3296  }
3297  pid = walsnd->pid;
3298  sentPtr = walsnd->sentPtr;
3299  state = walsnd->state;
3300  write = walsnd->write;
3301  flush = walsnd->flush;
3302  apply = walsnd->apply;
3303  writeLag = walsnd->writeLag;
3304  flushLag = walsnd->flushLag;
3305  applyLag = walsnd->applyLag;
3306  priority = walsnd->sync_standby_priority;
3307  replyTime = walsnd->replyTime;
3308  SpinLockRelease(&walsnd->mutex);
3309 
3310  memset(nulls, 0, sizeof(nulls));
3311  values[0] = Int32GetDatum(pid);
3312 
3313  if (!is_member_of_role(GetUserId(), DEFAULT_ROLE_READ_ALL_STATS))
3314  {
3315  /*
3316  * Only superusers and members of pg_read_all_stats can see
3317  * details. Other users only get the pid value to know it's a
3318  * walsender, but no details.
3319  */
3320  MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1);
3321  }
3322  else
3323  {
3324  values[1] = CStringGetTextDatum(WalSndGetStateString(state));
3325 
3326  if (XLogRecPtrIsInvalid(sentPtr))
3327  nulls[2] = true;
3328  values[2] = LSNGetDatum(sentPtr);
3329 
3330  if (XLogRecPtrIsInvalid(write))
3331  nulls[3] = true;
3332  values[3] = LSNGetDatum(write);
3333 
3334  if (XLogRecPtrIsInvalid(flush))
3335  nulls[4] = true;
3336  values[4] = LSNGetDatum(flush);
3337 
3338  if (XLogRecPtrIsInvalid(apply))
3339  nulls[5] = true;
3340  values[5] = LSNGetDatum(apply);
3341 
3342  /*
3343  * Treat a standby such as a pg_basebackup background process
3344  * which always returns an invalid flush location, as an
3345  * asynchronous standby.
3346  */
3347  priority = XLogRecPtrIsInvalid(flush) ? 0 : priority;
3348 
3349  if (writeLag < 0)
3350  nulls[6] = true;
3351  else
3352  values[6] = IntervalPGetDatum(offset_to_interval(writeLag));
3353 
3354  if (flushLag < 0)
3355  nulls[7] = true;
3356  else
3357  values[7] = IntervalPGetDatum(offset_to_interval(flushLag));
3358 
3359  if (applyLag < 0)
3360  nulls[8] = true;
3361  else
3362  values[8] = IntervalPGetDatum(offset_to_interval(applyLag));
3363 
3364  values[9] = Int32GetDatum(priority);
3365 
3366  /*
3367  * More easily understood version of standby state. This is purely
3368  * informational.
3369  *
3370  * In quorum-based sync replication, the role of each standby
3371  * listed in synchronous_standby_names can be changing very
3372  * frequently. Any standbys considered as "sync" at one moment can
3373  * be switched to "potential" ones at the next moment. So, it's
3374  * basically useless to report "sync" or "potential" as their sync
3375  * states. We report just "quorum" for them.
3376  */
3377  if (priority == 0)
3378  values[10] = CStringGetTextDatum("async");
3379  else if (list_member_int(sync_standbys, i))
3381  CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
3382  else
3383  values[10] = CStringGetTextDatum("potential");
3384 
3385  if (replyTime == 0)
3386  nulls[11] = true;
3387  else
3388  values[11] = TimestampTzGetDatum(replyTime);
3389  }
3390 
3391  tuplestore_putvalues(tupstore, tupdesc, values, nulls);
3392  }
3393 
3394  /* clean up and return the tuplestore */
3395  tuplestore_donestoring(tupstore);
3396 
3397  return (Datum) 0;
3398 }
3399 
3400 /*
3401  * Build a keepalive ('k') message carrying sentPtr and the current timestamp
3402  * and pass it on for sending to the standby. If requestReply is set, the last
3403  * byte is 1, asking the standby to send a message back to us (heartbeat).
3404  */
3405 static void
3406 WalSndKeepalive(bool requestReply)
3407 {
3408  elog(DEBUG2, "sending replication keepalive");
3409 
3410  /* construct the message: type, sentPtr, send time, reply-request flag */
3411  resetStringInfo(&output_message);
3412  pq_sendbyte(&output_message, 'k');
3413  pq_sendint64(&output_message, sentPtr);
3414  pq_sendint64(&output_message, GetCurrentTimestamp());
3415  pq_sendbyte(&output_message, requestReply ? 1 : 0);
3416 
3417  /* ... and send it wrapped in CopyData */
3418  pq_putmessage_noblock('d', output_message.data, output_message.len);
3419 }
3420 
3421 /*
3422  * Send keepalive message if too much time has elapsed.
3423  */
3424 static void
3426 {
3427  TimestampTz ping_time;
3428 
3429  /*
3430  * Don't send keepalive messages if timeouts are globally disabled or
3431  * we're doing something not partaking in timeouts.
3432  */
3433  if (wal_sender_timeout <= 0 || last_reply_timestamp <= 0)
3434  return;
3435 
3437  return;
3438 
3439  /*
3440  * If half of wal_sender_timeout has lapsed without receiving any reply
3441  * from the standby, send a keep-alive message to the standby requesting
3442  * an immediate reply.
3443  */
3445  wal_sender_timeout / 2);
3446  if (last_processing >= ping_time)
3447  {
3448  WalSndKeepalive(true);
3450 
3451  /* Try to flush pending output to the client */
3452  if (pq_flush_if_writable() != 0)
3453  WalSndShutdown();
3454  }
3455 }
3456 
3457 /*
3458  * Record the end of the WAL and the time it was flushed locally, so that
3459  * LagTrackerRead can compute the elapsed time (lag) when this WAL location is
3460  * eventually reported to have been written, flushed and applied by the
3461  * standby in a reply message.
3462  */
3463 static void
3465 {
3466  bool buffer_full;
3467  int new_write_head;
3468  int i;
3469 
3470  if (!am_walsender)
3471  return;
3472 
3473  /*
3474  * If the lsn hasn't advanced since last time, then do nothing. This way
3475  * we only record a new sample when new WAL has been written.
3476  */
3477  if (lag_tracker->last_lsn == lsn)
3478  return;
3479  lag_tracker->last_lsn = lsn;
3480 
3481  /*
3482  * If advancing the write head of the circular buffer would crash into any
3483  * of the read heads, then the buffer is full. In other words, the
3484  * slowest reader (presumably apply) is the one that controls the release
3485  * of space.
3486  */
3487  new_write_head = (lag_tracker->write_head + 1) % LAG_TRACKER_BUFFER_SIZE;
3488  buffer_full = false;
3489  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; ++i)
3490  {
3491  if (new_write_head == lag_tracker->read_heads[i])
3492  buffer_full = true;
3493  }
3494 
3495  /*
3496  * If the buffer is full, for now we just rewind by one slot and overwrite
3497  * the last sample, as a simple (if somewhat uneven) way to lower the
3498  * sampling rate. There may be better adaptive compaction algorithms.
3499  */
3500  if (buffer_full)
3501  {
3502  new_write_head = lag_tracker->write_head;
3503  if (lag_tracker->write_head > 0)
3504  lag_tracker->write_head--;
3505  else
3506  lag_tracker->write_head = LAG_TRACKER_BUFFER_SIZE - 1;
3507  }
3508 
3509  /* Store a sample at the current write head position. */
3510  lag_tracker->buffer[lag_tracker->write_head].lsn = lsn;
3511  lag_tracker->buffer[lag_tracker->write_head].time = local_flush_time;
3512  lag_tracker->write_head = new_write_head;
3513 }
3514 
3515 /*
3516  * Find out how much time has elapsed between the moment WAL location 'lsn'
3517  * (or the highest known earlier LSN) was flushed locally and the time 'now'.
3518  * We have a separate read head for each of the reported LSN locations we
3519  * receive in replies from standby; 'head' controls which read head is
3520  * used. Whenever a read head crosses an LSN which was written into the
3521  * lag buffer with LagTrackerWrite, we can use the associated timestamp to
3522  * find out the time this LSN (or an earlier one) was flushed locally, and
3523  * therefore compute the lag.
3524  *
3525  * Return -1 if no new sample data is available, and otherwise the elapsed
3526  * time in microseconds.
3527  */
3528 static TimeOffset
3530 {
3531  TimestampTz time = 0;
3532 
3533  /* Read all unread samples up to this LSN or end of buffer. */
3534  while (lag_tracker->read_heads[head] != lag_tracker->write_head &&
3535  lag_tracker->buffer[lag_tracker->read_heads[head]].lsn <= lsn)
3536  {
3537  time = lag_tracker->buffer[lag_tracker->read_heads[head]].time;
3538  lag_tracker->last_read[head] =
3539  lag_tracker->buffer[lag_tracker->read_heads[head]];
3540  lag_tracker->read_heads[head] =
3541  (lag_tracker->read_heads[head] + 1) % LAG_TRACKER_BUFFER_SIZE;
3542  }
3543 
3544  /*
3545  * If the lag tracker is empty, that means the standby has processed
3546  * everything we've ever sent so we should now clear 'last_read'. If we
3547  * didn't do that, we'd risk using a stale and irrelevant sample for
3548  * interpolation at the beginning of the next burst of WAL after a period
3549  * of idleness.
3550  */
3551  if (lag_tracker->read_heads[head] == lag_tracker->write_head)
3552  lag_tracker->last_read[head].time = 0;
3553 
3554  if (time > now)
3555  {
3556  /* If the clock somehow went backwards, treat as not found. */
3557  return -1;
3558  }
3559  else if (time == 0)
3560  {
3561  /*
3562  * We didn't cross a time. If there is a future sample that we
3563  * haven't reached yet, and we've already reached at least one sample,
3564  * let's interpolate the local flushed time. This is mainly useful
3565  * for reporting a completely stuck apply position as having
3566  * increasing lag, since otherwise we'd have to wait for it to
3567  * eventually start moving again and cross one of our samples before
3568  * we can show the lag increasing.
3569  */
3570  if (lag_tracker->read_heads[head] == lag_tracker->write_head)
3571  {
3572  /* There are no future samples, so we can't interpolate. */
3573  return -1;
3574  }
3575  else if (lag_tracker->last_read[head].time != 0)
3576  {
3577  /* We can interpolate between last_read and the next sample. */
3578  double fraction;
3579  WalTimeSample prev = lag_tracker->last_read[head];
3580  WalTimeSample next = lag_tracker->buffer[lag_tracker->read_heads[head]];
3581 
3582  if (lsn < prev.lsn)
3583  {
3584  /*
3585  * Reported LSNs shouldn't normally go backwards, but it's
3586  * possible when there is a timeline change. Treat as not
3587  * found.
3588  */
3589  return -1;
3590  }
3591 
3592  Assert(prev.lsn < next.lsn);
3593 
3594  if (prev.time > next.time)
3595  {
3596  /* If the clock somehow went backwards, treat as not found. */
3597  return -1;
3598  }
3599 
3600  /* See how far we are between the previous and next samples. */
3601  fraction =
3602  (double) (lsn - prev.lsn) / (double) (next.lsn - prev.lsn);
3603 
3604  /* Scale the local flush time proportionally. */
3605  time = (TimestampTz)
3606  ((double) prev.time + (next.time - prev.time) * fraction);
3607  }
3608  else
3609  {
3610  /*
3611  * We have only a future sample, implying that we were entirely
3612  * caught up but and now there is a new burst of WAL and the
3613  * standby hasn't processed the first sample yet. Until the
3614  * standby reaches the future sample the best we can do is report
3615  * the hypothetical lag if that sample were to be replayed now.
3616  */
3617  time = lag_tracker->buffer[lag_tracker->read_heads[head]].time;
3618  }
3619  }
3620 
3621  /* Return the elapsed time since local flush time in microseconds. */
3622  Assert(time != 0);
3623  return now - time;
3624 }
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, Datum *values, bool *isnull)
Definition: tuplestore.c:750
static void InitWalSenderSlot(void)
Definition: walsender.c:2281
void InitializeTimeouts(void)
Definition: timeout.c:340
#define pq_is_send_pending()
Definition: libpq.h:41
#define NIL
Definition: pg_list.h:65
static void pq_sendint16(StringInfo buf, uint16 i)
Definition: pqformat.h:137
XLogRecPtr write
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS
WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]
Definition: walsender.c:217
static void ProcessStandbyMessage(void)
Definition: walsender.c:1706
#define SIGQUIT
Definition: win32_port.h:164
#define WL_SOCKET_WRITEABLE
Definition: latch.h:126
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define IsA(nodeptr, _type_)
Definition: nodes.h:575
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:211
#define AllocSetContextCreate
Definition: memutils.h:169
XLogRecPtr startpoint
Definition: replnodes.h:85
#define DEBUG1
Definition: elog.h:25
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:196
int MyProcPid
Definition: globals.c:40
int wal_sender_timeout
Definition: walsender.c:122
uint32 TimeLineID
Definition: xlogdefs.h:52
#define pq_flush()
Definition: libpq.h:39
static int32 next
Definition: blutils.c:213
XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
struct Port * MyProcPort
Definition: globals.c:43
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
int wal_segment_size
Definition: xlog.c:112
int write_head
Definition: walsender.c:218
uint32 TransactionId
Definition: c.h:507
TupleDesc CreateTemplateTupleDesc(int natts)
Definition: tupdesc.c:44
bool wake_wal_senders
Definition: walsender.c:129
#define SYNC_REP_PRIORITY
Definition: syncrep.h:36
bool exec_replication_command(const char *cmd_string)
Definition: walsender.c:1433
Oid GetUserId(void)
Definition: miscinit.c:380
#define SIGUSR1
Definition: win32_port.h:175
bool update_process_title
Definition: ps_status.c:35
#define write(a, b, c)
Definition: win32.h:14
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1569
TransactionId xmin
Definition: proc.h:228
static void WalSndLastCycleHandler(SIGNAL_ARGS)
Definition: walsender.c:3008
void pgstat_report_activity(BackendState state, const char *cmd_str)
Definition: pgstat.c:3121
static void SendTimeLineHistory(TimeLineHistoryCmd *cmd)
Definition: walsender.c:434
static void parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action)
Definition: walsender.c:799
static void DropReplicationSlot(DropReplicationSlotCmd *cmd)
Definition: walsender.c:1052
#define SIGCHLD
Definition: win32_port.h:173
PGPROC * MyProc
Definition: proc.c:68
int64 TimestampTz
Definition: timestamp.h:39
WalTimeSample last_read[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:220
Size WalSndShmemSize(void)
Definition: walsender.c:3040
void MarkPostmasterChildWalSender(void)
Definition: pmsignal.c:272
char * pstrdup(const char *in)
Definition: mcxt.c:1161
void CommitTransactionCommand(void)
Definition: xact.c:2895
#define XACT_REPEATABLE_READ
Definition: xact.h:38
#define SpinLockInit(lock)
Definition: spin.h:60
void RestoreTransactionSnapshot(Snapshot snapshot, void *master_pgproc)
Definition: snapmgr.c:2225
void ReplicationSlotAcquire(const char *name, bool nowait)
Definition: slot.c:330
uint8 syncrep_method
Definition: syncrep.h:51
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:84
List * readTimeLineHistory(TimeLineID targetTLI)
Definition: timeline.c:75
static int sendFile
Definition: walsender.c:135
bool IsAbortedTransactionBlockState(void)
Definition: xact.c:376
static void XLogSendPhysical(void)
Definition: walsender.c:2550
void do_tup_output(TupOutputState *tstate, Datum *values, bool *isnull)
Definition: execTuples.c:2252
#define tuplestore_donestoring(state)
Definition: tuplestore.h:60
TimeOffset flushLag
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency)
Definition: slot.c:221
void set_ps_display(const char *activity, bool force)
Definition: ps_status.c:331
Definition: nodes.h:524
static StringInfoData output_message
Definition: walsender.c:160
WalSndCtlData * WalSndCtl
Definition: walsender.c:108
struct cursor * cur
Definition: ecpg.c:28
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:190
void proc_exit(int code)
Definition: ipc.c:104
int errcode(int sqlerrcode)
Definition: elog.c:570
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
#define MemSet(start, val, len)
Definition: c.h:955
static void WalSndKill(int code, Datum arg)
Definition: walsender.c:2339
#define kill(pid, sig)
Definition: win32_port.h:435
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
static void WalSndDone(WalSndSendDataCallback send_data)
Definition: walsender.c:2893
#define WL_SOCKET_READABLE
Definition: latch.h:125
void ReplicationSlotSave(void)
Definition: slot.c:645
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8230
List * SyncRepGetSyncStandbys(bool *am_sync)
Definition: syncrep.c:681
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:197
TimestampTz time
Definition: walsender.c:207
int sleeptime
Definition: pg_standby.c:42
#define SIGPIPE
Definition: win32_port.h:168
ReplicationSlotPersistentData data
Definition: slot.h:132
TimeOffset writeLag
#define SIGUSR2
Definition: win32_port.h:176
#define LOG
Definition: elog.h:26
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
bool RecoveryInProgress(void)
Definition: xlog.c:7898
void SetLatch(Latch *latch)
Definition: latch.c:436
SyncRepConfigData * SyncRepConfig
Definition: syncrep.c:97
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1668
void list_free_deep(List *list)
Definition: list.c:1387
static int fd(const char *x, int i)
Definition: preproc-init.c:105
pgsocket sock
Definition: libpq-be.h:122
#define PG_BINARY
Definition: c.h:1191
void GetPGVariable(const char *name, DestReceiver *dest)
Definition: guc.c:8738
void ResetLatch(Latch *latch)
Definition: latch.c:519
slock_t mutex
Node * replication_parse_result
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:87
void StatementCancelHandler(SIGNAL_ARGS)
Definition: postgres.c:2792
Latch procLatch
Definition: proc.h:104
void XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength)
Definition: xlogutils.c:803
XLogRecord * XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
Definition: xlogreader.c:217
XLogRecPtr confirmed_flush
Definition: slot.h:80
PGXACT * MyPgXact
Definition: proc.c:69
XLogRecPtr EndRecPtr
Definition: xlogreader.h:124
int32 day
Definition: timestamp.h:47
#define XidFromFullTransactionId(x)
Definition: transam.h:48
static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch)
Definition: walsender.c:1923
static LagTracker * lag_tracker
Definition: walsender.c:223
#define XLByteInSeg(xlrp, logSegNo, wal_segsz_bytes)
TimeLineID timeline
Definition: replnodes.h:97
ReplicationKind kind
Definition: replnodes.h:56
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1726
static int logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
Definition: walsender.c:765
bool IsTransactionBlock(void)
Definition: xact.c:4609
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
const char * SnapBuildExportSnapshot(SnapBuild *builder)
Definition: snapbuild.c:636
bool am_walsender
Definition: walsender.c:114
#define SpinLockAcquire(lock)
Definition: spin.h:62
void pg_usleep(long microsec)
Definition: signal.c:53
void HandleWalSndInitStopping(void)
Definition: walsender.c:2986
void ReplicationSlotReserveWal(void)
Definition: slot.c:997
int SendProcSignal(pid_t pid, ProcSignalReason reason, BackendId backendId)
Definition: procsignal.c:180
XLogRecPtr flush
bool defGetBoolean(DefElem *def)
Definition: define.c:111
char data[BLCKSZ]
Definition: c.h:1060
void DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
Definition: logical.c:466
void ReplicationSlotsComputeRequiredLSN(void)
Definition: slot.c:748
ReplicationKind kind
Definition: replnodes.h:82
void WalSndRqstFileReload(void)
Definition: walsender.c:2963
void pfree(void *pointer)
Definition: mcxt.c:1031
void ConditionVariableCancelSleep(void)
bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
Definition: transam.c:319
static void WalSndKeepalive(bool requestReply)
Definition: walsender.c:3406
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:2310
void EndCommand(const char *commandTag, CommandDest dest)
Definition: dest.c:166
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:676
#define ERROR
Definition: elog.h:43
void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
Definition: decode.c:97
static TimestampTz last_processing
Definition: walsender.c:165
void pq_startmsgread(void)
Definition: pqcomm.c:1211
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2257
static void PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: walsender.c:1737
#define IntervalPGetDatum(X)
Definition: timestamp.h:33
#define SYNC_REP_WAIT_APPLY
Definition: syncrep.h:26
void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli)
Definition: xlog.c:3846
bool FirstSnapshotSet
Definition: snapmgr.c:205
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:372
void WalSndSignals(void)
Definition: walsender.c:3020
#define TimestampTzGetDatum(X)
Definition: timestamp.h:32
#define FATAL
Definition: elog.h:52
void SendBaseBackup(BaseBackupCmd *cmd)
Definition: basebackup.c:746
Latch * latch
void(* WalSndSendDataCallback)(void)
Definition: walsender.c:229
#define MAXPGPATH
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:11130
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:2232
void ReplicationSlotPersist(void)
Definition: slot.c:680
TransactionId effective_xmin
Definition: slot.h:128
int pq_getbyte_if_available(unsigned char *c)
Definition: pqcomm.c:1041
static LogicalDecodingContext * logical_decoding_ctx
Definition: walsender.c:200
static TimeLineID curFileTimeLine
Definition: walsender.c:140
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:191
char * get_database_name(Oid dbid)
Definition: dbcommands.c:2099
#define DEBUG2
Definition: elog.h:24
bool XLogBackgroundFlush(void)
Definition: xlog.c:2987
bool list_member_int(const List *list, int datum)
Definition: list.c:654
Definition: latch.h:110
Definition: dest.h:88
#define TLHistoryFileName(fname, tli)
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:361
WalSndState state
NodeTag type
Definition: nodes.h:526
char * XLogFileNameP(TimeLineID tli, XLogSegNo segno)
Definition: xlog.c:10130
static char * buf
Definition: pg_test_fsync.c:68
void WalSndWaitStopping(void)
Definition: walsender.c:3139
static bool streamingDoneSending
Definition: walsender.c:182
uint64 XLogSegNo
Definition: xlogdefs.h:41
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
int errdetail(const char *fmt,...)
Definition: elog.c:860
#define COMMERROR
Definition: elog.h:30
int errcode_for_file_access(void)
Definition: elog.c:593
static void StartReplication(StartReplicationCmd *cmd)
Definition: walsender.c:540
#define SIGHUP
Definition: win32_port.h:163
TransactionId catalog_xmin
Definition: slot.h:69
#define pq_flush_if_writable()
Definition: libpq.h:40
TimeOffset time
Definition: timestamp.h:45
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
Definition: xact.c:3328
XLogRecPtr currTLIValidUntil
Definition: xlogreader.h:184
#define InvalidTransactionId
Definition: transam.h:31
void enlargeStringInfo(StringInfo str, int needed)
Definition: stringinfo.c:270
void PostgresSigHupHandler(SIGNAL_ARGS)
Definition: postgres.c:2832
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:369
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:62
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1179
static XLogRecPtr logical_startptr
Definition: walsender.c:201
unsigned int uint32
Definition: c.h:358
void ReplicationSlotRelease(void)
Definition: slot.c:424
static void XLogRead(char *buf, XLogRecPtr startptr, Size count)
Definition: walsender.c:2367
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1342
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
TransactionId xmin
Definition: slot.h:61
SHM_QUEUE SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]
void InitWalSender(void)
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:189
#define SlotIsLogical(slot)
Definition: slot.h:154
Datum Int64GetDatum(int64 X)
Definition: fmgr.c:1699
void SyncRepInitConfig(void)
Definition: syncrep.c:383
#define ereport(elevel, rest)
Definition: elog.h:141
static bool WalSndCaughtUp
Definition: walsender.c:186
#define SYNC_REP_WAIT_FLUSH
Definition: syncrep.h:25
MemoryContext TopMemoryContext
Definition: mcxt.c:44
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)
Definition: walsender.c:3464
XLogRecPtr lsn
Definition: walsender.c:206
int replication_yyparse(void)
FullTransactionId ReadNextFullTransactionId(void)
Definition: varsup.c:246
Definition: guc.h:72
XLogRecPtr last_lsn
Definition: walsender.c:216
#define ERRCODE_DATA_CORRUPTED
Definition: pg_basebackup.c:44
int32 month
Definition: timestamp.h:48
volatile sig_atomic_t ConfigReloadPending
Definition: globals.c:35
int max_wal_senders
Definition: walsender.c:120
static void WalSndLoop(WalSndSendDataCallback send_data)
Definition: walsender.c:2154
static void WalSndKeepaliveIfNecessary(void)
Definition: walsender.c:3425
int CloseTransientFile(int fd)
Definition: fd.c:2434
#define SIG_IGN
Definition: win32_port.h:160
static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1152
void initStringInfo(StringInfo str)
Definition: stringinfo.c:46
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
int pq_getmessage(StringInfo s, int maxlen)
Definition: pqcomm.c:1273
static StringInfoData reply_message
Definition: walsender.c:161
#define MAXFNAMELEN
TimeLineID nextTLI
Definition: xlogreader.h:190
#define SpinLockRelease(lock)
Definition: spin.h:64
Tuplestorestate * tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
Definition: tuplestore.c:318
int64 TimeOffset
Definition: timestamp.h:40
static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc)
Definition: walsender.c:1294
static void WalSndCheckTimeOut(void)
Definition: walsender.c:2127
Size mul_size(Size s1, Size s2)
Definition: shmem.c:492
#define InvalidBackendId
Definition: backendid.h:23
LogicalDecodingContext * CreateInitDecodingContext(char *plugin, List *output_plugin_options, bool need_full_snapshot, XLogRecPtr restart_lsn, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:231
static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
Definition: walsender.c:3529
void WalSndErrorCleanup(void)
Definition: walsender.c:298
static volatile sig_atomic_t replication_active
Definition: walsender.c:198
uintptr_t Datum
Definition: postgres.h:367
static const char * WalSndGetStateString(WalSndState state)
Definition: walsender.c:3196
XLogRecPtr sentPtr
Size add_size(Size s1, Size s2)
Definition: shmem.c:475
bool am_db_walsender
Definition: walsender.c:117
TransactionId effective_catalog_xmin
Definition: slot.h:129
static void StartLogicalReplication(StartReplicationCmd *cmd)
Definition: walsender.c:1063
LogicalDecodingContext * CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, bool fast_forward, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:374
Oid MyDatabaseId
Definition: globals.c:85
static void WalSndShutdown(void)
Definition: walsender.c:233
static TimeLineID receiveTLI
Definition: xlog.c:209
#define XLogSegmentOffset(xlogptr, wal_segsz_bytes)
int work_mem
Definition: globals.c:121
XLogRecPtr tliSwitchPoint(TimeLineID tli, List *history, TimeLineID *nextTLI)
Definition: timeline.c:563
#define EpochFromFullTransactionId(x)
Definition: transam.h:47
WalSnd * MyWalSnd
Definition: walsender.c:111
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:814
static XLogRecPtr sentPtr
Definition: walsender.c:157
int log_min_messages
Definition: guc.c:510
void pq_endmsgread(void)
Definition: pqcomm.c:1235
#define InvalidOid
Definition: postgres_ext.h:36
TimeLineID ThisTimeLineID
Definition: xlog.c:187
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
int allowedModes
Definition: execnodes.h:303
void WalSndInitStopping(void)
Definition: walsender.c:3113
TimeLineID currTLI
Definition: xlogreader.h:174
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:170
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
bool is_member_of_role(Oid member, Oid role)
Definition: acl.c:4932
TimeOffset applyLag
struct SnapBuild * snapshot_builder
Definition: logical.h:44
int errmsg_internal(const char *fmt,...)
Definition: elog.c:814
SetFunctionReturnMode returnMode
Definition: execnodes.h:305
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
static long WalSndComputeSleeptime(TimestampTz now)
Definition: walsender.c:2077
#define SIG_DFL
Definition: win32_port.h:158
#define SIGNAL_ARGS
Definition: c.h:1259
static TimeLineID sendTimeLine
Definition: walsender.c:148
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:732
#define lfirst(lc)
Definition: pg_list.h:190
void WalSndSetState(WalSndState state)
Definition: walsender.c:3177
int sync_standby_priority
Definition: regguts.h:298
void FreeDecodingContext(LogicalDecodingContext *ctx)
Definition: logical.c:512
static void XLogSendLogical(void)
Definition: walsender.c:2813
XLogRecPtr restart_lsn
Definition: slot.h:72
int read_heads[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:219
void StartTransactionCommand(void)
Definition: xact.c:2794
bool needreload
static void ProcessStandbyReplyMessage(void)
Definition: walsender.c:1769
size_t Size
Definition: c.h:466
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: pgstat.h:1318
char * dbname
Definition: streamutil.c:52
#define LAG_TRACKER_BUFFER_SIZE
Definition: walsender.c:211
int XactIsoLevel
Definition: xact.c:75
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1122
static TimeLineID sendTimeLineNextTLI
Definition: walsender.c:149
static XLogRecPtr sendTimeLineValidUpto
Definition: walsender.c:151
void LogicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: logical.c:1005
MemoryContext ecxt_per_query_memory
Definition: execnodes.h:231
WalSndState
static XLogSegNo sendSegNo
Definition: walsender.c:136
Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
Definition: walsender.c:3231
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Definition: walsender.c:852
static bool streamingDoneReceiving
Definition: walsender.c:183
Tuplestorestate * setResult
Definition: execnodes.h:308
#define pg_attribute_noreturn()
Definition: c.h:147
static StringInfoData tmpbuf
Definition: walsender.c:162
int BasicOpenFile(const char *fileName, int fileFlags)
Definition: fd.c:946
#define XLogFilePath(path, tli, logSegNo, wal_segsz_bytes)
bool IsSubTransaction(void)
Definition: xact.c:4682
Snapshot SnapBuildInitialSnapshot(SnapBuild *builder)
Definition: snapbuild.c:544
static Datum values[MAXATTR]
Definition: bootstrap.c:167
#define PG_STAT_GET_WAL_SENDERS_COLS
ExprContext * econtext
Definition: execnodes.h:301
bool log_replication_commands
Definition: walsender.c:124
uint64 GetSystemIdentifier(void)
Definition: xlog.c:4802
#define Int32GetDatum(X)
Definition: postgres.h:479
void ReplicationSlotDrop(const char *name, bool nowait)
Definition: slot.c:517
char * application_name
Definition: guc.c:529
TupleDesc setDesc
Definition: execnodes.h:309
void pq_sendbytes(StringInfo buf, const char *data, int datalen)
Definition: pqformat.c:125
void * palloc(Size size)
Definition: mcxt.c:924
int errmsg(const char *fmt,...)
Definition: elog.c:784
XLogReaderState * reader
Definition: logical.h:42
void SHMQueueInit(SHM_QUEUE *queue)
Definition: shmqueue.c:36
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:298
#define pq_putmessage_noblock(msgtype, s, len)
Definition: libpq.h:44
CRSSnapshotAction
Definition: walsender.h:20
#define elog(elevel,...)
Definition: elog.h:226
StringInfo out
Definition: logical.h:71
static const unsigned __int64 epoch
Definition: gettimeofday.c:34
int i
void WalSndShmemInit(void)
Definition: walsender.c:3052
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
#define NameStr(name)
Definition: c.h:609
TimeLineID timeline
Definition: replnodes.h:84
#define CStringGetTextDatum(s)
Definition: builtins.h:83
void * arg
static void PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
Definition: walsender.c:1874
struct Latch * MyLatch
Definition: globals.c:54
void ReplicationSlotCleanup(void)
Definition: slot.c:479
#define PG_FUNCTION_ARGS
Definition: fmgr.h:188
char * defname
Definition: parsenodes.h:730
static uint32 sendOff
Definition: walsender.c:137
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:99
static XLogRecPtr GetStandbyFlushRecPtr(void)
Definition: walsender.c:2933
void SendPostmasterSignal(PMSignalReason reason)
Definition: pmsignal.c:146
TimestampTz replyTime
void LWLockReleaseAll(void)
Definition: lwlock.c:1825
#define NUM_SYNC_REP_WAIT_MODE
Definition: syncrep.h:28
static Interval * offset_to_interval(TimeOffset offset)
Definition: walsender.c:3215
slock_t mutex
Definition: slot.h:105
#define close(a)
Definition: win32.h:12
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition: procsignal.c:261
CommandDest whereToSendOutput
Definition: postgres.c:90
void TimestampDifference(TimestampTz start_time, TimestampTz stop_time, long *secs, int *microsecs)
Definition: timestamp.c:1643
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
static bool waiting_for_ping_response
Definition: walsender.c:174
void pq_puttextmessage(char msgtype, const char *str)
Definition: pqformat.c:369
XLogRecPtr apply
static void IdentifySystem(void)
Definition: walsender.c:345
Definition: pg_list.h:50
void CheckLogicalDecodingRequirements(void)
Definition: logical.c:79
#define snprintf
Definition: port.h:192
#define WL_LATCH_SET
Definition: latch.h:124
int16 AttrNumber
Definition: attnum.h:21
void quickdie(SIGNAL_ARGS)
Definition: postgres.c:2694
#define UINT64_FORMAT
Definition: c.h:401
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition: slot.c:702
void SyncRepReleaseWaiters(void)
Definition: syncrep.c:411
static TimestampTz last_reply_timestamp
Definition: walsender.c:171
static void ProcessRepliesIfAny(void)
Definition: walsender.c:1602
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1533
#define MAX_SEND_SIZE
Definition: walsender.c:105
#define die(msg)
Definition: pg_test_fsync.c:97
#define read(a, b, c)
Definition: win32.h:13
static bool sendTimeLineIsHistoric
Definition: walsender.c:150
#define offsetof(type, field)
Definition: c.h:655
void WalSndWakeup(void)
Definition: walsender.c:3084
void ReplicationSlotMarkDirty(void)
Definition: slot.c:663
bool am_cascading_walsender
Definition: walsender.c:115
static void ProcessStandbyHSFeedbackMessage(void)
Definition: walsender.c:1954
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:129
#define SYNC_REP_WAIT_WRITE
Definition: syncrep.h:24
void SnapBuildClearExportedSnapshot(void)
Definition: snapbuild.c:697
static XLogRecPtr startptr
Definition: basebackup.c:106
static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
Definition: walsender.c:1268
void replication_scanner_init(const char *query_string)
#define TLHistoryFilePath(path, tli)
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1729
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)