PostgreSQL Source Code  git master
walsender.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * walsender.c
4  *
5  * The WAL sender process (walsender) is new as of Postgres 9.0. It takes
6  * care of sending XLOG from the primary server to a single recipient.
7  * (Note that there can be more than one walsender process concurrently.)
8  * It is started by the postmaster when the walreceiver of a standby server
9  * connects to the primary server and requests XLOG streaming replication.
10  *
11  * A walsender is similar to a regular backend, i.e. there is a one-to-one
12  * relationship between a connection and a walsender process, but instead
13  * of processing SQL queries, it understands a small set of special
14  * replication-mode commands. The START_REPLICATION command begins streaming
15  * WAL to the client. While streaming, the walsender keeps reading XLOG
16  * records from the disk and sends them to the standby server over the
17  * COPY protocol, until either side ends the replication by exiting COPY
18  * mode (or until the connection is closed).
19  *
20  * Normal termination is by SIGTERM, which instructs the walsender to
21  * close the connection and exit(0) at the next convenient moment. Emergency
22  * termination is by SIGQUIT; like any backend, the walsender will simply
23  * abort and exit on SIGQUIT. A close of the connection and a FATAL error
24  * are treated as not a crash but approximately normal termination;
25  * the walsender will exit quickly without sending any more XLOG records.
26  *
27  * If the server is shut down, checkpointer sends us
28  * PROCSIG_WALSND_INIT_STOPPING after all regular backends have exited. If
29  * the backend is idle or runs an SQL query, this causes the backend to
30  * shut down; if logical replication is in progress, all existing WAL records
31  * are processed, followed by a shutdown. Otherwise this causes the walsender
32  * to switch to the "stopping" state. In this state, the walsender will reject
33  * any further replication commands. The checkpointer begins the shutdown
34  * checkpoint once all walsenders are confirmed as stopping. When the shutdown
35  * checkpoint finishes, the postmaster sends us SIGUSR2. This instructs
36  * walsender to send any outstanding WAL, including the shutdown checkpoint
37  * record, wait for it to be replicated to the standby, and then exit.
38  *
39  *
40  * Portions Copyright (c) 2010-2019, PostgreSQL Global Development Group
41  *
42  * IDENTIFICATION
43  * src/backend/replication/walsender.c
44  *
45  *-------------------------------------------------------------------------
46  */
47 #include "postgres.h"
48 
49 #include <signal.h>
50 #include <unistd.h>
51 
52 #include "access/printtup.h"
53 #include "access/timeline.h"
54 #include "access/transam.h"
55 #include "access/xact.h"
56 #include "access/xlog_internal.h"
57 #include "access/xlogutils.h"
58 
59 #include "catalog/pg_authid.h"
60 #include "catalog/pg_type.h"
61 #include "commands/dbcommands.h"
62 #include "commands/defrem.h"
63 #include "funcapi.h"
64 #include "libpq/libpq.h"
65 #include "libpq/pqformat.h"
66 #include "miscadmin.h"
67 #include "nodes/replnodes.h"
68 #include "pgstat.h"
69 #include "replication/basebackup.h"
70 #include "replication/decode.h"
71 #include "replication/logical.h"
73 #include "replication/slot.h"
74 #include "replication/snapbuild.h"
75 #include "replication/syncrep.h"
77 #include "replication/walsender.h"
80 #include "storage/fd.h"
81 #include "storage/ipc.h"
82 #include "storage/pmsignal.h"
83 #include "storage/proc.h"
84 #include "storage/procarray.h"
85 #include "tcop/dest.h"
86 #include "tcop/tcopprot.h"
87 #include "utils/builtins.h"
88 #include "utils/guc.h"
89 #include "utils/memutils.h"
90 #include "utils/pg_lsn.h"
91 #include "utils/portal.h"
92 #include "utils/ps_status.h"
93 #include "utils/timeout.h"
94 #include "utils/timestamp.h"
95 
96 /*
97  * Maximum data payload in a WAL data message. Must be >= XLOG_BLCKSZ.
98  *
99  * We don't have a good idea of what a good value would be; there's some
100  * overhead per message in both walsender and walreceiver, but on the other
101  * hand sending large batches makes walsender less responsive to signals
102  * because signals are checked only between messages. 128kB (with
103  * default 8k blocks) seems like a reasonable guess for now.
104  */
105 #define MAX_SEND_SIZE (XLOG_BLCKSZ * 16)
106 
107 /* Array of WalSnds in shared memory */
109 
110 /* My slot in the shared memory array */
111 WalSnd *MyWalSnd = NULL;
112 
113 /* Global state */
114 bool am_walsender = false; /* Am I a walsender process? */
115 bool am_cascading_walsender = false; /* Am I cascading WAL to another
116  * standby? */
117 bool am_db_walsender = false; /* Connected to a database? */
118 
119 /* User-settable parameters for walsender */
120 int max_wal_senders = 0; /* the maximum number of concurrent
121  * walsenders */
122 int wal_sender_timeout = 60 * 1000; /* maximum time to send one WAL
123  * data message */
125 
126 /*
127  * State for WalSndWakeupRequest
128  */
129 bool wake_wal_senders = false;
130 
131 static WALOpenSegment *sendSeg = NULL;
132 static WALSegmentContext *sendCxt = NULL;
133 
134 /*
135  * These variables keep track of the state of the timeline we're currently
136  * sending. sendTimeLine identifies the timeline. If sendTimeLineIsHistoric,
137  * the timeline is not the latest timeline on this server, and the server's
138  * history forked off from that timeline at sendTimeLineValidUpto.
139  */
142 static bool sendTimeLineIsHistoric = false;
144 
145 /*
146  * How far have we sent WAL already? This is also advertised in
147  * MyWalSnd->sentPtr. (Actually, this is the next WAL location to send.)
148  */
149 static XLogRecPtr sentPtr = 0;
150 
151 /* Buffers for constructing outgoing messages and processing reply messages. */
155 
156 /* Timestamp of last ProcessRepliesIfAny(). */
158 
159 /*
160  * Timestamp of last ProcessRepliesIfAny() that saw a reply from the
161  * standby. Set to 0 if wal_sender_timeout doesn't need to be active.
162  */
164 
165 /* Have we sent a heartbeat message asking for reply, since last reply? */
166 static bool waiting_for_ping_response = false;
167 
168 /*
169  * While streaming WAL in Copy mode, streamingDoneSending is set to true
170  * after we have sent CopyDone. We should not send any more CopyData messages
171  * after that. streamingDoneReceiving is set to true when we receive CopyDone
172  * from the other end. When both become true, it's time to exit Copy mode.
173  */
176 
177 /* Are we there yet? */
178 static bool WalSndCaughtUp = false;
179 
180 /* Flags set by signal handlers for later service in main loop */
181 static volatile sig_atomic_t got_SIGUSR2 = false;
182 static volatile sig_atomic_t got_STOPPING = false;
183 
184 /*
185  * This is set while we are streaming. When not set
186  * PROCSIG_WALSND_INIT_STOPPING signal will be handled like SIGTERM. When set,
187  * the main loop is responsible for checking got_STOPPING and terminating when
188  * it's set (after streaming any remaining WAL).
189  */
190 static volatile sig_atomic_t replication_active = false;
191 
194 
195 /* A sample associating a WAL location with the time it was written. */
196 typedef struct
197 {
200 } WalTimeSample;
201 
202 /* The size of our buffer of time samples. */
203 #define LAG_TRACKER_BUFFER_SIZE 8192
204 
205 /* A mechanism for tracking replication lag. */
206 typedef struct
207 {
211  int read_heads[NUM_SYNC_REP_WAIT_MODE];
213 } LagTracker;
214 
216 
217 /* Signal handlers */
219 
220 /* Prototypes for private functions */
221 typedef void (*WalSndSendDataCallback) (void);
222 static void WalSndLoop(WalSndSendDataCallback send_data);
223 static void InitWalSenderSlot(void);
224 static void WalSndKill(int code, Datum arg);
226 static void XLogSendPhysical(void);
227 static void XLogSendLogical(void);
228 static void WalSndDone(WalSndSendDataCallback send_data);
229 static XLogRecPtr GetStandbyFlushRecPtr(void);
230 static void IdentifySystem(void);
233 static void StartReplication(StartReplicationCmd *cmd);
235 static void ProcessStandbyMessage(void);
236 static void ProcessStandbyReplyMessage(void);
237 static void ProcessStandbyHSFeedbackMessage(void);
238 static void ProcessRepliesIfAny(void);
239 static void WalSndKeepalive(bool requestReply);
240 static void WalSndKeepaliveIfNecessary(void);
241 static void WalSndCheckTimeOut(void);
243 static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
244 static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
247 static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
248 static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
250 
251 static void XLogRead(WALSegmentContext *segcxt, char *buf, XLogRecPtr startptr, Size count);
252 
253 
254 /* Initialize walsender process before entering the main command loop */
255 void
256 InitWalSender(void)
257 {
259 
260  /* Create a per-walsender data structure in shared memory */
262 
263  /*
264  * We don't currently need any ResourceOwner in a walsender process, but
265  * if we did, we could call CreateAuxProcessResourceOwner here.
266  */
267 
268  /*
269  * Let postmaster know that we're a WAL sender. Once we've declared ourselves
270  * as a WAL sender process, postmaster will let us outlive the bgwriter and
271  * kill us last in the shutdown sequence, so we get a chance to stream all
272  * remaining WAL at shutdown, including the shutdown checkpoint. Note that
273  * there's no going back, and we mustn't write any WAL records after this.
274  */
277 
278  /* Initialize empty timestamp buffer for lag tracking. */
279  lag_tracker = MemoryContextAllocZero(TopMemoryContext, sizeof(LagTracker));
280 
281  /* Make sure we can remember the current read position in XLOG. */
282  sendSeg = (WALOpenSegment *)
284  sendCxt = (WALSegmentContext *)
286  WALOpenSegmentInit(sendSeg, sendCxt, wal_segment_size, NULL);
287 }
288 
289 /*
290  * Clean up after an error.
291  *
292  * WAL sender processes don't use transactions like regular backends do.
293  * This function does any cleanup required after an error in a WAL sender
294  * process, similar to what transaction abort does in a regular backend.
295  */
296 void
298 {
302 
303  if (sendSeg->ws_file >= 0)
304  {
305  close(sendSeg->ws_file);
306  sendSeg->ws_file = -1;
307  }
308 
309  if (MyReplicationSlot != NULL)
311 
313 
314  replication_active = false;
315 
316  if (got_STOPPING || got_SIGUSR2)
317  proc_exit(0);
318 
319  /* Revert back to startup state */
321 }
322 
323 /*
324  * Handle a client's connection abort in an orderly manner.
     *
     * Ends the process through proc_exit(0), i.e. with a zero exit status.
     * Control is not expected to come back from that call.
325  */
326 static void
327 WalSndShutdown(void)
328 {
329  /*
330  * Reset whereToSendOutput to prevent ereport from attempting to send any
331  * more messages to the standby.
332  */
335 
336  proc_exit(0);
337  abort(); /* only here to keep the compiler quiet; see proc_exit(0) above */
338 }
339 
340 /*
341  * Handle the IDENTIFY_SYSTEM command.
342  */
343 static void
345 {
346  char sysid[32];
347  char xloc[MAXFNAMELEN];
348  XLogRecPtr logptr;
349  char *dbname = NULL;
351  TupOutputState *tstate;
352  TupleDesc tupdesc;
353  Datum values[4];
354  bool nulls[4];
355 
356  /*
357  * Reply with a result set with one row, four columns. First col is system
358  * ID, second is timeline ID, third is current xlog location and the
359  * fourth contains the database name if we are connected to one.
360  */
361 
362  snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
364 
367  {
368  /* this also updates ThisTimeLineID */
369  logptr = GetStandbyFlushRecPtr();
370  }
371  else
372  logptr = GetFlushRecPtr();
373 
374  snprintf(xloc, sizeof(xloc), "%X/%X", (uint32) (logptr >> 32), (uint32) logptr);
375 
376  if (MyDatabaseId != InvalidOid)
377  {
379 
380  /* syscache access needs a transaction env. */
382  /* make dbname live outside TX context */
386  /* CommitTransactionCommand switches to TopMemoryContext */
388  }
389 
391  MemSet(nulls, false, sizeof(nulls));
392 
393  /* need a tuple descriptor representing four columns */
394  tupdesc = CreateTemplateTupleDesc(4);
395  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "systemid",
396  TEXTOID, -1, 0);
397  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "timeline",
398  INT4OID, -1, 0);
399  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "xlogpos",
400  TEXTOID, -1, 0);
401  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "dbname",
402  TEXTOID, -1, 0);
403 
404  /* prepare for projection of tuples */
405  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
406 
407  /* column 1: system identifier */
408  values[0] = CStringGetTextDatum(sysid);
409 
410  /* column 2: timeline */
411  values[1] = Int32GetDatum(ThisTimeLineID);
412 
413  /* column 3: wal location */
414  values[2] = CStringGetTextDatum(xloc);
415 
416  /* column 4: database name, or NULL if none */
417  if (dbname)
418  values[3] = CStringGetTextDatum(dbname);
419  else
420  nulls[3] = true;
421 
422  /* send it to dest */
423  do_tup_output(tstate, values, nulls);
424 
425  end_tup_output(tstate);
426 }
427 
428 
429 /*
430  * Handle TIMELINE_HISTORY command.
431  */
432 static void
434 {
436  char histfname[MAXFNAMELEN];
437  char path[MAXPGPATH];
438  int fd;
439  off_t histfilelen;
440  off_t bytesleft;
441  Size len;
442 
443  /*
444  * Reply with a result set with one row, and two columns. The first col is
445  * the name of the history file, 2nd is the contents.
446  */
447 
448  TLHistoryFileName(histfname, cmd->timeline);
449  TLHistoryFilePath(path, cmd->timeline);
450 
451  /* Send a RowDescription message */
452  pq_beginmessage(&buf, 'T');
453  pq_sendint16(&buf, 2); /* 2 fields */
454 
455  /* first field */
456  pq_sendstring(&buf, "filename"); /* col name */
457  pq_sendint32(&buf, 0); /* table oid */
458  pq_sendint16(&buf, 0); /* attnum */
459  pq_sendint32(&buf, TEXTOID); /* type oid */
460  pq_sendint16(&buf, -1); /* typlen */
461  pq_sendint32(&buf, 0); /* typmod */
462  pq_sendint16(&buf, 0); /* format code */
463 
464  /* second field */
465  pq_sendstring(&buf, "content"); /* col name */
466  pq_sendint32(&buf, 0); /* table oid */
467  pq_sendint16(&buf, 0); /* attnum */
468  pq_sendint32(&buf, BYTEAOID); /* type oid */
469  pq_sendint16(&buf, -1); /* typlen */
470  pq_sendint32(&buf, 0); /* typmod */
471  pq_sendint16(&buf, 0); /* format code */
472  pq_endmessage(&buf);
473 
474  /* Send a DataRow message */
475  pq_beginmessage(&buf, 'D');
476  pq_sendint16(&buf, 2); /* # of columns */
477  len = strlen(histfname);
478  pq_sendint32(&buf, len); /* col1 len */
479  pq_sendbytes(&buf, histfname, len);
480 
481  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
482  if (fd < 0)
483  ereport(ERROR,
485  errmsg("could not open file \"%s\": %m", path)));
486 
487  /* Determine file length and send it to client */
488  histfilelen = lseek(fd, 0, SEEK_END);
489  if (histfilelen < 0)
490  ereport(ERROR,
492  errmsg("could not seek to end of file \"%s\": %m", path)));
493  if (lseek(fd, 0, SEEK_SET) != 0)
494  ereport(ERROR,
496  errmsg("could not seek to beginning of file \"%s\": %m", path)));
497 
498  pq_sendint32(&buf, histfilelen); /* col2 len */
499 
500  bytesleft = histfilelen;
501  while (bytesleft > 0)
502  {
503  PGAlignedBlock rbuf;
504  int nread;
505 
507  nread = read(fd, rbuf.data, sizeof(rbuf));
509  if (nread < 0)
510  ereport(ERROR,
512  errmsg("could not read file \"%s\": %m",
513  path)));
514  else if (nread == 0)
515  ereport(ERROR,
517  errmsg("could not read file \"%s\": read %d of %zu",
518  path, nread, (Size) bytesleft)));
519 
520  pq_sendbytes(&buf, rbuf.data, nread);
521  bytesleft -= nread;
522  }
523 
524  if (CloseTransientFile(fd) != 0)
525  ereport(ERROR,
527  errmsg("could not close file \"%s\": %m", path)));
528 
529  pq_endmessage(&buf);
530 }
531 
532 /*
533  * Handle START_REPLICATION command.
534  *
535  * Once streaming has ended, this sends CommandComplete and returns; an
536  * ereport(ERROR) will take us back to the main loop.
537  */
538 static void
540 {
542  XLogRecPtr FlushPtr;
543 
544  if (ThisTimeLineID == 0)
545  ereport(ERROR,
546  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
547  errmsg("IDENTIFY_SYSTEM has not been run before START_REPLICATION")));
548 
549  /*
550  * We assume here that we're logging enough information in the WAL for
551  * log-shipping, since this is checked in PostmasterMain().
552  *
553  * NOTE: wal_level can only change at shutdown, so in most cases it is
554  * difficult for there to be WAL data that we can still see that was
555  * written at wal_level='minimal'.
556  */
557 
558  if (cmd->slotname)
559  {
560  ReplicationSlotAcquire(cmd->slotname, true);
562  ereport(ERROR,
563  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
564  (errmsg("cannot use a logical replication slot for physical replication"))));
565  }
566 
567  /*
568  * Select the timeline. If it was given explicitly by the client, use
569  * that. Otherwise use the timeline of the last replayed record, which is
570  * kept in ThisTimeLineID.
571  */
573  {
574  /* this also updates ThisTimeLineID */
575  FlushPtr = GetStandbyFlushRecPtr();
576  }
577  else
578  FlushPtr = GetFlushRecPtr();
579 
580  if (cmd->timeline != 0)
581  {
582  XLogRecPtr switchpoint;
583 
584  sendTimeLine = cmd->timeline;
586  {
587  sendTimeLineIsHistoric = false;
589  }
590  else
591  {
592  List *timeLineHistory;
593 
594  sendTimeLineIsHistoric = true;
595 
596  /*
597  * Check that the timeline the client requested exists, and the
598  * requested start location is on that timeline.
599  */
600  timeLineHistory = readTimeLineHistory(ThisTimeLineID);
601  switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
603  list_free_deep(timeLineHistory);
604 
605  /*
606  * Found the requested timeline in the history. Check that
607  * requested startpoint is on that timeline in our history.
608  *
609  * This is quite loose on purpose. We only check that we didn't
610  * fork off the requested timeline before the switchpoint. We
611  * don't check that we switched *to* it before the requested
612  * starting point. This is because the client can legitimately
613  * request to start replication from the beginning of the WAL
614  * segment that contains switchpoint, but on the new timeline, so
615  * that it doesn't end up with a partial segment. If you ask for
616  * too old a starting point, you'll get an error later when we
617  * fail to find the requested WAL segment in pg_wal.
618  *
619  * XXX: we could be more strict here and only allow a startpoint
620  * that's older than the switchpoint, if it's still in the same
621  * WAL segment.
622  */
623  if (!XLogRecPtrIsInvalid(switchpoint) &&
624  switchpoint < cmd->startpoint)
625  {
626  ereport(ERROR,
627  (errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
628  (uint32) (cmd->startpoint >> 32),
629  (uint32) (cmd->startpoint),
630  cmd->timeline),
631  errdetail("This server's history forked from timeline %u at %X/%X.",
632  cmd->timeline,
633  (uint32) (switchpoint >> 32),
634  (uint32) (switchpoint))));
635  }
636  sendTimeLineValidUpto = switchpoint;
637  }
638  }
639  else
640  {
643  sendTimeLineIsHistoric = false;
644  }
645 
647 
648  /* If there is nothing to stream, don't even enter COPY mode */
650  {
651  /*
652  * When we first start replication the standby will be behind the
653  * primary. For some applications, for example synchronous
654  * replication, it is important to have a clear state for this initial
655  * catchup mode, so we can trigger actions when we change streaming
656  * state later. We may stay in this state for a long time, which is
657  * exactly why we want to be able to monitor whether or not we are
658  * still here.
659  */
661 
662  /* Send a CopyBothResponse message, and start streaming */
663  pq_beginmessage(&buf, 'W');
664  pq_sendbyte(&buf, 0);
665  pq_sendint16(&buf, 0);
666  pq_endmessage(&buf);
667  pq_flush();
668 
669  /*
670  * Don't allow a request to stream from a future point in WAL that
671  * hasn't been flushed to disk in this server yet.
672  */
673  if (FlushPtr < cmd->startpoint)
674  {
675  ereport(ERROR,
676  (errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
677  (uint32) (cmd->startpoint >> 32),
678  (uint32) (cmd->startpoint),
679  (uint32) (FlushPtr >> 32),
680  (uint32) (FlushPtr))));
681  }
682 
683  /* Start streaming from the requested point */
684  sentPtr = cmd->startpoint;
685 
686  /* Initialize shared memory status, too */
687  SpinLockAcquire(&MyWalSnd->mutex);
688  MyWalSnd->sentPtr = sentPtr;
689  SpinLockRelease(&MyWalSnd->mutex);
690 
692 
693  /* Main loop of walsender */
694  replication_active = true;
695 
697 
698  replication_active = false;
699  if (got_STOPPING)
700  proc_exit(0);
702 
704  }
705 
706  if (cmd->slotname)
708 
709  /*
710  * Copy is finished now. Send a single-row result set indicating the next
711  * timeline.
712  */
714  {
715  char startpos_str[8 + 1 + 8 + 1];
717  TupOutputState *tstate;
718  TupleDesc tupdesc;
719  Datum values[2];
720  bool nulls[2];
721 
722  snprintf(startpos_str, sizeof(startpos_str), "%X/%X",
723  (uint32) (sendTimeLineValidUpto >> 32),
725 
727  MemSet(nulls, false, sizeof(nulls));
728 
729  /*
730  * Need a tuple descriptor representing two columns. int8 may seem
731  * like a surprising data type for this, but in theory int4 would not
732  * be wide enough for this, as TimeLineID is unsigned.
733  */
734  tupdesc = CreateTemplateTupleDesc(2);
735  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli",
736  INT8OID, -1, 0);
737  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos",
738  TEXTOID, -1, 0);
739 
740  /* prepare for projection of tuple */
741  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
742 
743  values[0] = Int64GetDatum((int64) sendTimeLineNextTLI);
744  values[1] = CStringGetTextDatum(startpos_str);
745 
746  /* send it to dest */
747  do_tup_output(tstate, values, nulls);
748 
749  end_tup_output(tstate);
750  }
751 
752  /* Send CommandComplete message */
753  pq_puttextmessage('C', "START_STREAMING");
754 }
755 
756 /*
757  * read_page callback for logical decoding contexts, as a walsender process.
758  *
759  * Inside the walsender we can do better than logical_read_local_xlog_page,
760  * which has to do a plain sleep/busy loop, because the walsender's latch gets
761  * set every time WAL is flushed.
     *
     * Waits, via WalSndWaitForWal(), for WAL up to targetPagePtr + reqLen, then
     * reads the page that starts at targetPagePtr into cur_page.
     *
     * Returns -1 if the position handed back by WalSndWaitForWal() is still
     * short of targetPagePtr + reqLen. Otherwise returns the number of bytes of
     * the page that lie below that position: XLOG_BLCKSZ for a whole page, or
     * flushptr - targetPagePtr for a partial one.
762  */
763 static int
765  XLogRecPtr targetRecPtr, char *cur_page)
766 {
767  XLogRecPtr flushptr;
768  int count;
769 
770  XLogReadDetermineTimeline(state, targetPagePtr, reqLen);
     /* copy the reader's timeline state into the file-level send state */
772  sendTimeLine = state->currTLI;
774  sendTimeLineNextTLI = state->nextTLI;
775 
776  /* make sure we have enough WAL available */
777  flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
778 
779  /* fail if not (implies we are going to shut down) */
780  if (flushptr < targetPagePtr + reqLen)
781  return -1;
782 
783  if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
784  count = XLOG_BLCKSZ; /* the whole page is available */
785  else
786  count = flushptr - targetPagePtr; /* part of the page available */
787 
788  /* now actually read the data, we know it's there */
     /*
      * NOTE(review): XLOG_BLCKSZ bytes are requested even when count is
      * smaller, so the read can extend beyond flushptr; only count is
      * returned to the caller. Confirm this is intended.
      */
789  XLogRead(sendCxt, cur_page, targetPagePtr, XLOG_BLCKSZ);
790 
791  return count;
792 }
793 
794 /*
795  * Process extra options given to CREATE_REPLICATION_SLOT.
     *
     * Walks cmd->options and recognizes three option names:
     *   "export_snapshot" - logical slots only; takes a boolean value
     *   "use_snapshot"    - logical slots only; selects CRS_USE_SNAPSHOT
     *   "reserve_wal"     - physical slots only; sets *reserve_wal to true
     *
     * "export_snapshot" and "use_snapshot" share a single "already given" flag,
     * so giving either one twice, or both together, is rejected. A repeated
     * option, or one used with the wrong slot kind, is reported with
     * ERRCODE_SYNTAX_ERROR; an unknown option name is reported with
     * elog(ERROR).
     *
     * *reserve_wal and *snapshot_action are written only when the matching
     * option is present, so the caller has to store its defaults in them
     * before calling.
796  */
797 static void
799  bool *reserve_wal,
800  CRSSnapshotAction *snapshot_action)
801 {
802  ListCell *lc;
803  bool snapshot_action_given = false;
804  bool reserve_wal_given = false;
805 
806  /* Parse options */
807  foreach(lc, cmd->options)
808  {
809  DefElem *defel = (DefElem *) lfirst(lc);
810 
811  if (strcmp(defel->defname, "export_snapshot") == 0)
812  {
     /* only valid once, and only for logical slots */
813  if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
814  ereport(ERROR,
815  (errcode(ERRCODE_SYNTAX_ERROR),
816  errmsg("conflicting or redundant options")));
817 
818  snapshot_action_given = true;
819  *snapshot_action = defGetBoolean(defel) ? CRS_EXPORT_SNAPSHOT :
821  }
822  else if (strcmp(defel->defname, "use_snapshot") == 0)
823  {
     /* shares snapshot_action_given with "export_snapshot" above */
824  if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
825  ereport(ERROR,
826  (errcode(ERRCODE_SYNTAX_ERROR),
827  errmsg("conflicting or redundant options")));
828 
829  snapshot_action_given = true;
830  *snapshot_action = CRS_USE_SNAPSHOT;
831  }
832  else if (strcmp(defel->defname, "reserve_wal") == 0)
833  {
     /* only valid once, and only for physical slots */
834  if (reserve_wal_given || cmd->kind != REPLICATION_KIND_PHYSICAL)
835  ereport(ERROR,
836  (errcode(ERRCODE_SYNTAX_ERROR),
837  errmsg("conflicting or redundant options")));
838 
839  reserve_wal_given = true;
840  *reserve_wal = true;
841  }
842  else
843  elog(ERROR, "unrecognized option: %s", defel->defname);
844  }
845 }
846 
847 /*
848  * Create a new replication slot.
849  */
850 static void
852 {
853  const char *snapshot_name = NULL;
854  char xloc[MAXFNAMELEN];
855  char *slot_name;
856  bool reserve_wal = false;
857  CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT;
859  TupOutputState *tstate;
860  TupleDesc tupdesc;
861  Datum values[4];
862  bool nulls[4];
863 
865 
866  parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action);
867 
868  /* setup state for XLogRead */
869  sendTimeLineIsHistoric = false;
871 
872  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
873  {
874  ReplicationSlotCreate(cmd->slotname, false,
876  }
877  else
878  {
880 
881  /*
882  * Initially create persistent slot as ephemeral - that allows us to
883  * nicely handle errors during initialization because it'll get
884  * dropped if this transaction fails. We'll make it persistent at the
885  * end. Temporary slots can be created as temporary from the beginning as
886  * they get dropped on error as well.
887  */
888  ReplicationSlotCreate(cmd->slotname, true,
890  }
891 
892  if (cmd->kind == REPLICATION_KIND_LOGICAL)
893  {
895  bool need_full_snapshot = false;
896 
897  /*
898  * Do the options check early so that we can bail out before calling
899  * DecodingContextFindStartpoint, which can take a long time.
900  */
901  if (snapshot_action == CRS_EXPORT_SNAPSHOT)
902  {
903  if (IsTransactionBlock())
904  ereport(ERROR,
905  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
906  (errmsg("%s must not be called inside a transaction",
907  "CREATE_REPLICATION_SLOT ... EXPORT_SNAPSHOT")));
908 
909  need_full_snapshot = true;
910  }
911  else if (snapshot_action == CRS_USE_SNAPSHOT)
912  {
913  if (!IsTransactionBlock())
914  ereport(ERROR,
915  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
916  (errmsg("%s must be called inside a transaction",
917  "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
918 
920  ereport(ERROR,
921  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
922  (errmsg("%s must be called in REPEATABLE READ isolation mode transaction",
923  "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
924 
925  if (FirstSnapshotSet)
926  ereport(ERROR,
927  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
928  (errmsg("%s must be called before any query",
929  "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
930 
931  if (IsSubTransaction())
932  ereport(ERROR,
933  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
934  (errmsg("%s must not be called in a subtransaction",
935  "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
936 
937  need_full_snapshot = true;
938  }
939 
940  ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
945 
946  /*
947  * Signal that we don't need the timeout mechanism. We're just
948  * creating the replication slot and don't yet accept feedback
949  * messages or send keepalives. As we possibly need to wait for
950  * further WAL the walsender would otherwise possibly be killed too
951  * soon.
952  */
954 
955  /* build initial snapshot, might take a while */
957 
958  /*
959  * Export or use the snapshot if we've been asked to do so.
960  *
961  * NB. We will convert the snapbuild.c kind of snapshot to normal
962  * snapshot when doing this.
963  */
964  if (snapshot_action == CRS_EXPORT_SNAPSHOT)
965  {
966  snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder);
967  }
968  else if (snapshot_action == CRS_USE_SNAPSHOT)
969  {
970  Snapshot snap;
971 
974  }
975 
976  /* don't need the decoding context anymore */
977  FreeDecodingContext(ctx);
978 
979  if (!cmd->temporary)
981  }
982  else if (cmd->kind == REPLICATION_KIND_PHYSICAL && reserve_wal)
983  {
985 
987 
988  /* Write this slot to disk if it's a permanent one. */
989  if (!cmd->temporary)
991  }
992 
993  snprintf(xloc, sizeof(xloc), "%X/%X",
996 
998  MemSet(nulls, false, sizeof(nulls));
999 
1000  /*----------
1001  * Need a tuple descriptor representing four columns:
1002  * - first field: the slot name
1003  * - second field: LSN at which we became consistent
1004  * - third field: exported snapshot's name
1005  * - fourth field: output plugin
1006  *----------
1007  */
1008  tupdesc = CreateTemplateTupleDesc(4);
1009  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_name",
1010  TEXTOID, -1, 0);
1011  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "consistent_point",
1012  TEXTOID, -1, 0);
1013  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "snapshot_name",
1014  TEXTOID, -1, 0);
1015  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "output_plugin",
1016  TEXTOID, -1, 0);
1017 
1018  /* prepare for projection of tuples */
1019  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
1020 
1021  /* slot_name */
1022  slot_name = NameStr(MyReplicationSlot->data.name);
1023  values[0] = CStringGetTextDatum(slot_name);
1024 
1025  /* consistent wal location */
1026  values[1] = CStringGetTextDatum(xloc);
1027 
1028  /* snapshot name, or NULL if none */
1029  if (snapshot_name != NULL)
1030  values[2] = CStringGetTextDatum(snapshot_name);
1031  else
1032  nulls[2] = true;
1033 
1034  /* plugin, or NULL if none */
1035  if (cmd->plugin != NULL)
1036  values[3] = CStringGetTextDatum(cmd->plugin);
1037  else
1038  nulls[3] = true;
1039 
1040  /* send it to dest */
1041  do_tup_output(tstate, values, nulls);
1042  end_tup_output(tstate);
1043 
1045 }
1046 
1047 /*
1048  * Get rid of a replication slot that is no longer wanted.
      *
      * Drops the slot named by cmd->slotname and then reports completion of
      * the command to the remote destination with the "DROP_REPLICATION_SLOT"
      * tag.
1049  */
1050 static void
1052 {
      /*
       * The second argument is the negation of cmd->wait -- presumably a
       * "don't wait" flag for ReplicationSlotDrop; confirm against its
       * declaration.
       */
1053  ReplicationSlotDrop(cmd->slotname, !cmd->wait);
1054  EndCommand("DROP_REPLICATION_SLOT", DestRemote);
1055 }
1056 
1057 /*
1058  * Load previously initiated logical slot and prepare for sending data (via
1059  * WalSndLoop).
 *
 * Steps visible here, in order: acquire the slot named by cmd->slotname,
 * create the decoding context starting at cmd->startpoint, send and flush a
 * CopyBothResponse ('W') message, update shared state under MyWalSnd->mutex,
 * mark replication active, run the main loop, and free the decoding context.
 * Afterwards the process exits with proc_exit(0) if got_STOPPING is set;
 * otherwise the command is completed with the "COPY 0" tag.
1060  */
1061 static void
1063 {
1065 
1066  /* make sure that our requirements are still fulfilled */
1068 
1070 
1071  ReplicationSlotAcquire(cmd->slotname, true);
1072 
1073  /*
1074  * Force a disconnect, so that the decoding code doesn't need to care
1075  * about an eventual switch from running in recovery, to running in a
1076  * normal environment. Client code is expected to handle reconnects.
1077  */
1079  {
1080  ereport(LOG,
1081  (errmsg("terminating walsender process after promotion")));
 /* only a flag is set here; the proc_exit(0) is at the end of this function */
1082  got_STOPPING = true;
1083  }
1084 
1085  /*
1086  * Create our decoding context, making it start at the previously ack'ed
1087  * position.
1088  *
1089  * Do this before sending a CopyBothResponse message, so that any errors
1090  * are reported early.
1091  */
1092  logical_decoding_ctx =
1093  CreateDecodingContext(cmd->startpoint, cmd->options, false,
1097 
1098 
1100 
1101  /* Send a CopyBothResponse message, and start streaming */
1102  pq_beginmessage(&buf, 'W');
1103  pq_sendbyte(&buf, 0);
1104  pq_sendint16(&buf, 0);
1105  pq_endmessage(&buf);
1106  pq_flush();
1107 
1108 
1109  /* Start reading WAL from the oldest required WAL. */
1111 
1112  /*
1113  * Report the location after which we'll send out further commits as the
1114  * current sentPtr.
1115  */
1117 
1118  /* Also update the sent position status in shared memory */
1119  SpinLockAcquire(&MyWalSnd->mutex);
1121  SpinLockRelease(&MyWalSnd->mutex);
1122 
1123  replication_active = true;
1124 
1126 
1127  /* Main loop of walsender */
1129 
1130  FreeDecodingContext(logical_decoding_ctx);
1132 
1133  replication_active = false;
 /*
  * got_STOPPING may have been set above (after promotion); in that case
  * leave the process instead of returning to command mode.
  */
1134  if (got_STOPPING)
1135  proc_exit(0);
1137 
1138  /* Get out of COPY mode (CommandComplete). */
1139  EndCommand("COPY 0", DestRemote);
1140 }
1141 
1142 /*
1143  * LogicalDecodingContext 'prepare_write' callback.
1144  *
1145  * Prepare a write into a StringInfo.
1146  *
1147  * Don't do anything lasting in here, it's quite possible that nothing will be done
1148  * with the data.
 *
 * The header written into ctx->out is, in order: one byte 'w', an int64
 * dataStart, an int64 walEnd (both set to lsn), and an int64 send time that
 * is written as 0 here and is meant to be overwritten later.  Unless
 * last_write is true, lsn is replaced by InvalidXLogRecPtr first.
1149  */
1150 static void
1152 {
1153  /* can't have sync rep confused by sending the same LSN several times */
1154  if (!last_write)
1155  lsn = InvalidXLogRecPtr;
1156 
 /* start from an empty buffer */
1157  resetStringInfo(ctx->out);
1158 
1159  pq_sendbyte(ctx->out, 'w');
1160  pq_sendint64(ctx->out, lsn); /* dataStart */
1161  pq_sendint64(ctx->out, lsn); /* walEnd */
1162 
1163  /*
1164  * Fill out the sendtime later, just as it's done in XLogSendPhysical, but
1165  * reserve space here.
1166  */
1167  pq_sendint64(ctx->out, 0); /* sendtime */
1168 }
1169 
1170 /*
1171  * LogicalDecodingContext 'write' callback.
1172  *
1173  * Send the data previously prepared by WalSndPrepareWrite to the network.
1174  * Take as long as needed, but process replies from the other
1175  * side and check timeouts during that.
 *
 * Structure: queue the CopyData message, attempt a non-blocking flush, and
 * return at once if the fast-path test passes (it requires, among other
 * things, that no output is pending).  Otherwise loop, sleeping on the latch
 * and socket, until pq_is_send_pending() reports nothing left, then set the
 * latch again before returning.
1176  */
1177 static void
1179  bool last_write)
1180 {
1181  TimestampTz now;
1182 
1183  /* output previously gathered data in a CopyData packet */
1184  pq_putmessage_noblock('d', ctx->out->data, ctx->out->len);
1185 
1186  /*
1187  * Fill the send timestamp last, so that it is taken as late as possible.
1188  * This is somewhat ugly, but the protocol is set as it's already used for
1189  * several releases by streaming physical replication.
 *
 * The destination offset skips the 1-byte 'w' tag and the two int64
 * fields (dataStart, walEnd) laid down by the prepare_write callback,
 * landing on the reserved send-time slot.
 *
 * NOTE(review): as ordered here, ctx->out has already been handed to
 * pq_putmessage_noblock() above before the timestamp is patched in.  If
 * that call copies the buffer, the client receives the zero placeholder
 * rather than this timestamp -- confirm, and consider patching the
 * timestamp before queuing the message.
1190  */
1191  resetStringInfo(&tmpbuf);
1192  now = GetCurrentTimestamp();
1193  pq_sendint64(&tmpbuf, now);
1194  memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
1195  tmpbuf.data, sizeof(int64));
1196 
1198 
1199  /* Try to flush pending output to the client */
1200  if (pq_flush_if_writable() != 0)
1201  WalSndShutdown();
1202 
1203  /* Try taking fast path unless we get too close to walsender timeout. */
1205  wal_sender_timeout / 2) &&
1206  !pq_is_send_pending())
1207  {
1208  return;
1209  }
1210 
1211  /* If we have pending write here, go to slow path */
1212  for (;;)
1213  {
1214  int wakeEvents;
1215  long sleeptime;
1216 
1217  /* Check for input from the client */
1219 
1220  /* die if timeout was reached */
1222 
1223  /* Send keepalive if the time has come */
1225 
 /* the only exit from this loop: everything queued has been sent */
1226  if (!pq_is_send_pending())
1227  break;
1228 
1230 
1231  wakeEvents = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH |
1233 
1234  /* Sleep until something happens or we time out */
1235  (void) WaitLatchOrSocket(MyLatch, wakeEvents,
1236  MyProcPort->sock, sleeptime,
1238 
1239  /* Clear any already-pending wakeups */
1241 
1243 
1244  /* Process any requests or signals received recently */
1245  if (ConfigReloadPending)
1246  {
1247  ConfigReloadPending = false;
1250  }
1251 
1252  /* Try to flush pending output to the client */
1253  if (pq_flush_if_writable() != 0)
1254  WalSndShutdown();
1255  }
1256 
1257  /* reactivate latch so WalSndLoop knows to continue */
1258  SetLatch(MyLatch);
1259 }
1260 
1261 /*
1262  * LogicalDecodingContext 'update_progress' callback.
1263  *
1264  * Write the current position to the lag tracker (see XLogSendPhysical).
 *
 * Calls are rate-limited: LagTrackerWrite(lsn, now) runs only when the
 * time since the previous recorded write exceeds the interval checked below.
1265  */
1266 static void
1268 {
 /* time of the last LagTrackerWrite() made from here; kept across calls */
1269  static TimestampTz sendTime = 0;
1271 
1272  /*
1273  * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to
1274  * avoid flooding the lag tracker when we commit frequently.
1275  */
1276 #define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000
1277  if (!TimestampDifferenceExceeds(sendTime, now,
1279  return;
1280 
1281  LagTrackerWrite(lsn, now);
1282  sendTime = now;
1283 }
1284 
1285 /*
1286  * Wait till WAL < loc is flushed to disk so it can be safely sent to client.
1287  *
1288  * Returns end LSN of flushed WAL. Normally this will be >= loc, but
1289  * if we detect a shutdown request (either from postmaster or client)
1290  * we will return early, so caller must always check.
 *
 * While in recovery the replay position (GetXLogReplayRecPtr) is used in
 * place of the flush position (GetFlushRecPtr).  The latch is set again on
 * the way out so the caller's loop does not go to sleep.
1291  */
1292 static XLogRecPtr
1294 {
1295  int wakeEvents;
 /* cached across calls; this is what makes the fast path below possible */
1296  static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
1297 
1298  /*
1299  * Fast path to avoid acquiring the spinlock in case we already know we
1300  * have enough WAL available. This is particularly interesting if we're
1301  * far behind.
1302  */
1303  if (RecentFlushPtr != InvalidXLogRecPtr &&
1304  loc <= RecentFlushPtr)
1305  return RecentFlushPtr;
1306 
1307  /* Get a more recent flush pointer. */
1308  if (!RecoveryInProgress())
1309  RecentFlushPtr = GetFlushRecPtr();
1310  else
1311  RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1312 
1313  for (;;)
1314  {
1315  long sleeptime;
1316 
1317  /* Clear any already-pending wakeups */
1319 
1321 
1322  /* Process any requests or signals received recently */
1323  if (ConfigReloadPending)
1324  {
1325  ConfigReloadPending = false;
1328  }
1329 
1330  /* Check for input from the client */
1332 
1333  /*
1334  * If we're shutting down, trigger pending WAL to be written out,
1335  * otherwise we'd possibly end up waiting for WAL that never gets
1336  * written, because walwriter has shut down already.
1337  */
1338  if (got_STOPPING)
1340 
1341  /* Update our idea of the currently flushed position. */
1342  if (!RecoveryInProgress())
1343  RecentFlushPtr = GetFlushRecPtr();
1344  else
1345  RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1346 
1347  /*
1348  * If postmaster asked us to stop, don't wait anymore.
1349  *
1350  * It's important to do this check after the recomputation of
1351  * RecentFlushPtr, so we can send all remaining data before shutting
1352  * down.
1353  */
1354  if (got_STOPPING)
1355  break;
1356 
1357  /*
1358  * We only send regular messages to the client for fully decoded
1359  * transactions, but synchronous replication and walsender shutdown
1360  * may be waiting for a later location. So we send pings
1361  * containing the flush location every now and then.
1362  */
1363  if (MyWalSnd->flush < sentPtr &&
1364  MyWalSnd->write < sentPtr &&
1366  {
1367  WalSndKeepalive(false);
1369  }
1370 
1371  /* check whether we're done */
1372  if (loc <= RecentFlushPtr)
1373  break;
1374 
1375  /* Waiting for new WAL. Since we need to wait, we're now caught up. */
1376  WalSndCaughtUp = true;
1377 
1378  /*
1379  * Try to flush any pending output to the client.
1380  */
1381  if (pq_flush_if_writable() != 0)
1382  WalSndShutdown();
1383 
1384  /*
1385  * If we have received CopyDone from the client, sent CopyDone
1386  * ourselves, and the output buffer is empty, it's time to exit
1387  * streaming, so fail the current WAL fetch request.
1388  */
1390  !pq_is_send_pending())
1391  break;
1392 
1393  /* die if timeout was reached */
1395 
1396  /* Send keepalive if the time has come */
1398 
1399  /*
1400  * Sleep until something happens or we time out. Also wait for the
1401  * socket becoming writable, if there's still pending output.
1402  * Otherwise we might sit on sendable output data while waiting for
1403  * new WAL to be generated. (But if we have nothing to send, we don't
1404  * want to wake on socket-writable.)
1405  */
1407 
1408  wakeEvents = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH |
1410 
1411  if (pq_is_send_pending())
1412  wakeEvents |= WL_SOCKET_WRITEABLE;
1413 
1414  (void) WaitLatchOrSocket(MyLatch, wakeEvents,
1415  MyProcPort->sock, sleeptime,
1417  }
1418 
1419  /* reactivate latch so WalSndLoop knows to continue */
1420  SetLatch(MyLatch);
1421  return RecentFlushPtr;
1422 }
1423 
1424 /*
1425  * Execute an incoming replication command.
1426  *
1427  * Returns true if the cmd_string was recognized as a WalSender command, false
1428  * if not.
 *
 * cmd_string is parsed inside a per-command memory context (cmd_context),
 * which is deleted again on the normal exit path.  false is returned only
 * for a T_SQLCmd node; a parse failure, an unrecognized node tag, or a
 * command arriving while the walsender is in stopping mode raises ERROR.
1429  */
1430 bool
1431 exec_replication_command(const char *cmd_string)
1432 {
1433  int parse_rc;
1434  Node *cmd_node;
1435  MemoryContext cmd_context;
1436  MemoryContext old_context;
1437 
1438  /*
1439  * If WAL sender has been told that shutdown is getting close, switch its
1440  * status accordingly to handle the next replication commands correctly.
1441  */
1442  if (got_STOPPING)
1444 
1445  /*
1446  * Throw error if in stopping mode. We need to prevent commands that could
1447  * generate WAL while the shutdown checkpoint is being written. To be
1448  * safe, we just prohibit all new commands.
1449  */
1450  if (MyWalSnd->state == WALSNDSTATE_STOPPING)
1451  ereport(ERROR,
1452  (errmsg("cannot execute new commands while WAL sender is in stopping mode")));
1453 
1454  /*
1455  * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
1456  * command arrives. Clean up the old stuff if there's anything.
1457  */
1459 
1461 
1463  "Replication command context",
 /* everything allocated from here on lives in cmd_context */
1465  old_context = MemoryContextSwitchTo(cmd_context);
1466 
1467  replication_scanner_init(cmd_string);
1468  parse_rc = replication_yyparse();
1469  if (parse_rc != 0)
1470  ereport(ERROR,
1471  (errcode(ERRCODE_SYNTAX_ERROR),
1472  (errmsg_internal("replication command parser returned %d",
1473  parse_rc))));
1474 
1475  cmd_node = replication_parse_result;
1476 
1477  /*
1478  * Log replication command if log_replication_commands is enabled. Even
1479  * when it's disabled, log the command with DEBUG1 level for backward
1480  * compatibility. Note that SQL commands are not logged here, and will be
1481  * logged later if log_statement is enabled.
1482  */
1483  if (cmd_node->type != T_SQLCmd)
1485  (errmsg("received replication command: %s", cmd_string)));
1486 
1487  /*
1488  * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot. If it was
1489  * called outside of transaction the snapshot should be cleared here.
1490  */
1491  if (!IsTransactionBlock())
1493 
1494  /*
1495  * For aborted transactions, don't allow anything except pure SQL, the
1496  * exec_simple_query() will handle it correctly.
1497  */
1498  if (IsAbortedTransactionBlockState() && !IsA(cmd_node, SQLCmd))
1499  ereport(ERROR,
1500  (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
1501  errmsg("current transaction is aborted, "
1502  "commands ignored until end of transaction block")));
1503 
1505 
1506  /*
1507  * Allocate buffers that will be used for each outgoing and incoming
1508  * message. We do this just once per command to reduce palloc overhead.
1509  */
1510  initStringInfo(&output_message);
1511  initStringInfo(&reply_message);
1512  initStringInfo(&tmpbuf);
1513 
1514  /* Report to pgstat that this process is running */
1516 
 /* dispatch on the parsed node type */
1517  switch (cmd_node->type)
1518  {
1519  case T_IdentifySystemCmd:
1520  IdentifySystem();
1521  break;
1522 
1523  case T_BaseBackupCmd:
1524  PreventInTransactionBlock(true, "BASE_BACKUP");
1525  SendBaseBackup((BaseBackupCmd *) cmd_node);
1526  break;
1527 
1530  break;
1531 
1534  break;
1535 
1536  case T_StartReplicationCmd:
1537  {
1538  StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
1539 
1540  PreventInTransactionBlock(true, "START_REPLICATION");
1541 
1542  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
1543  StartReplication(cmd);
1544  else
1546  break;
1547  }
1548 
1549  case T_TimeLineHistoryCmd:
1550  PreventInTransactionBlock(true, "TIMELINE_HISTORY");
1552  break;
1553 
1554  case T_VariableShowStmt:
1555  {
1557  VariableShowStmt *n = (VariableShowStmt *) cmd_node;
1558 
1559  /* syscache access needs a transaction environment */
1561  GetPGVariable(n->name, dest);
1563  }
1564  break;
1565 
1566  case T_SQLCmd:
 /* MyDatabaseId == InvalidOid is treated as a physical-replication connection */
1567  if (MyDatabaseId == InvalidOid)
1568  ereport(ERROR,
1569  (errmsg("cannot execute SQL commands in WAL sender for physical replication")));
1570 
1571  /* Report to pgstat that this process is now idle */
1573 
 /*
  * NOTE(review): this early return skips the
  * MemoryContextSwitchTo(old_context) / MemoryContextDelete(cmd_context)
  * pair performed on the normal exit path below, so as far as is
  * visible here the caller continues in cmd_context and the context is
  * not freed -- confirm whether the caller compensates.
  */
1574  /* Tell the caller that this wasn't a WalSender command. */
1575  return false;
1576 
1577  default:
1578  elog(ERROR, "unrecognized replication command node tag: %u",
1579  cmd_node->type);
1580  }
1581 
1582  /* done */
1583  MemoryContextSwitchTo(old_context);
1584  MemoryContextDelete(cmd_context);
1585 
1586  /* Send CommandComplete message */
1587  EndCommand("SELECT", DestRemote);
1588 
1589  /* Report to pgstat that this process is now idle */
1591 
1592  return true;
1593 }
1594 
1595 /*
1596  * Process any incoming messages while streaming. Also checks if the remote
1597  * end has closed the connection.
 *
 * Messages are drained in a loop until pq_getbyte_if_available() reports
 * that nothing more can be read without blocking.  Accepted message types
 * are 'd' (CopyData), 'c' (CopyDone) and 'X' (terminate); anything else is
 * a FATAL protocol violation.  EOF or a read error ends the process via
 * proc_exit(0).  If at least one 'd' or 'c' message was handled,
 * waiting_for_ping_response is cleared.
1598  */
1599 static void
1601 {
1602  unsigned char firstchar;
1603  int r;
1604  bool received = false;
1605 
1607 
1608  for (;;)
1609  {
1610  pq_startmsgread();
1611  r = pq_getbyte_if_available(&firstchar);
1612  if (r < 0)
1613  {
1614  /* unexpected error or EOF */
1616  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1617  errmsg("unexpected EOF on standby connection")));
1618  proc_exit(0);
1619  }
1620  if (r == 0)
1621  {
1622  /* no data available without blocking */
1623  pq_endmsgread();
1624  break;
1625  }
1626 
1627  /* Read the message contents */
1628  resetStringInfo(&reply_message);
1629  if (pq_getmessage(&reply_message, 0))
1630  {
1632  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1633  errmsg("unexpected EOF on standby connection")));
1634  proc_exit(0);
1635  }
1636 
1637  /*
1638  * If we already received a CopyDone from the frontend, the frontend
1639  * should not send us anything until we've closed our end of the COPY.
1640  * XXX: In theory, the frontend could already send the next command
1641  * before receiving the CopyDone, but libpq doesn't currently allow
1642  * that.
1643  */
1644  if (streamingDoneReceiving && firstchar != 'X')
1645  ereport(FATAL,
1646  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1647  errmsg("unexpected standby message type \"%c\", after receiving CopyDone",
1648  firstchar)));
1649 
1650  /* Handle the very limited subset of commands expected in this phase */
1651  switch (firstchar)
1652  {
1653  /*
1654  * 'd' means a standby reply wrapped in a CopyData packet.
1655  */
1656  case 'd':
1658  received = true;
1659  break;
1660 
1661  /*
1662  * CopyDone means the standby requested to finish streaming.
1663  * Reply with CopyDone, if we had not sent that already.
1664  */
1665  case 'c':
1666  if (!streamingDoneSending)
1667  {
1668  pq_putmessage_noblock('c', NULL, 0);
1669  streamingDoneSending = true;
1670  }
1671 
1672  streamingDoneReceiving = true;
1673  received = true;
1674  break;
1675 
1676  /*
1677  * 'X' means that the standby is closing down the socket.
 * (There is no break after proc_exit(); presumably it does not
 * return -- confirm.)
1678  */
1679  case 'X':
1680  proc_exit(0);
1681 
1682  default:
1683  ereport(FATAL,
1684  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1685  errmsg("invalid standby message type \"%c\"",
1686  firstchar)));
1687  }
1688  }
1689 
1690  /*
1691  * Save the last reply timestamp if we've received at least one reply.
1692  */
1693  if (received)
1694  {
1696  waiting_for_ping_response = false;
1697  }
1698 }
1699 
1700 /*
1701  * Process a status update message received from standby.
 *
 * Consumes the first byte of reply_message and dispatches on it.  'r' and
 * 'h' are the accepted types (presumably the regular standby reply and the
 * hot standby feedback message -- the handler calls are not visible here);
 * any other type is reported as a protocol violation and the process exits.
1702  */
1703 static void
1705 {
1706  char msgtype;
1707 
1708  /*
1709  * Check message type from the first byte.
1710  */
1711  msgtype = pq_getmsgbyte(&reply_message);
1712 
1713  switch (msgtype)
1714  {
1715  case 'r':
1717  break;
1718 
1719  case 'h':
1721  break;
1722 
1723  default:
1725  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1726  errmsg("unexpected message type \"%c\"", msgtype)));
1727  proc_exit(0);
1728  }
1729 }
1730 
1731 /*
1732  * Remember that a walreceiver just confirmed receipt of lsn `lsn`.
 *
 * lsn must be valid (asserted).  slot->data.restart_lsn is compared and
 * updated under slot->mutex; the follow-up work in the `changed` branch
 * runs only when the stored value actually changed.
1733  */
1734 static void
1736 {
1737  bool changed = false;
1739 
1740  Assert(lsn != InvalidXLogRecPtr);
1741  SpinLockAcquire(&slot->mutex);
1742  if (slot->data.restart_lsn != lsn)
1743  {
1744  changed = true;
1745  slot->data.restart_lsn = lsn;
1746  }
1747  SpinLockRelease(&slot->mutex);
1748 
 /* acted on only after the spinlock has been released */
1749  if (changed)
1750  {
1753  }
1754 
1755  /*
1756  * One could argue that the slot should be saved to disk now, but that'd
1757  * be energy wasted - the worst that lost information can do here is give
1758  * us wrong information in a statistics view - we'll just potentially be
1759  * more conservative in removing files.
1760  */
1761 }
1762 
1763 /*
1764  * Regular reply from standby advising of WAL locations on standby server.
 *
 * Fields read from reply_message, in order: write, flush and apply
 * positions (int64 each), the standby's reply time (int64), and a
 * reply-requested flag (one byte).  The positions, reply time and any
 * newly measured lag values are stored in MyWalSnd under its mutex; a
 * keepalive is sent back if the standby asked for a reply.
1765  */
1766 static void
1768 {
1769  XLogRecPtr writePtr,
1770  flushPtr,
1771  applyPtr;
1772  bool replyRequested;
1773  TimeOffset writeLag,
1774  flushLag,
1775  applyLag;
1776  bool clearLagTimes;
1777  TimestampTz now;
1778  TimestampTz replyTime;
1779 
 /* remembers across calls whether the previous reply had applyPtr == sentPtr */
1780  static bool fullyAppliedLastTime = false;
1781 
1782  /* the caller already consumed the msgtype byte */
1783  writePtr = pq_getmsgint64(&reply_message);
1784  flushPtr = pq_getmsgint64(&reply_message);
1785  applyPtr = pq_getmsgint64(&reply_message);
1786  replyTime = pq_getmsgint64(&reply_message);
1787  replyRequested = pq_getmsgbyte(&reply_message);
1788 
 /* skip building the debug string unless it would actually be logged */
1789  if (log_min_messages <= DEBUG2)
1790  {
1791  char *replyTimeStr;
1792 
1793  /* Copy because timestamptz_to_str returns a static buffer */
1794  replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
1795 
1796  elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s reply_time %s",
1797  (uint32) (writePtr >> 32), (uint32) writePtr,
1798  (uint32) (flushPtr >> 32), (uint32) flushPtr,
1799  (uint32) (applyPtr >> 32), (uint32) applyPtr,
1800  replyRequested ? " (reply requested)" : "",
1801  replyTimeStr);
1802 
1803  pfree(replyTimeStr);
1804  }
1805 
1806  /* See if we can compute the round-trip lag for these positions. */
1807  now = GetCurrentTimestamp();
1808  writeLag = LagTrackerRead(SYNC_REP_WAIT_WRITE, writePtr, now);
1809  flushLag = LagTrackerRead(SYNC_REP_WAIT_FLUSH, flushPtr, now);
1810  applyLag = LagTrackerRead(SYNC_REP_WAIT_APPLY, applyPtr, now);
1811 
1812  /*
1813  * If the standby reports that it has fully replayed the WAL in two
1814  * consecutive reply messages, then the second such message must result
1815  * from wal_receiver_status_interval expiring on the standby. This is a
1816  * convenient time to forget the lag times measured when it last
1817  * wrote/flushed/applied a WAL record, to avoid displaying stale lag data
1818  * until more WAL traffic arrives.
1819  */
1820  clearLagTimes = false;
1821  if (applyPtr == sentPtr)
1822  {
1823  if (fullyAppliedLastTime)
1824  clearLagTimes = true;
1825  fullyAppliedLastTime = true;
1826  }
1827  else
1828  fullyAppliedLastTime = false;
1829 
1830  /* Send a reply if the standby requested one. */
1831  if (replyRequested)
1832  WalSndKeepalive(false);
1833 
1834  /*
1835  * Update shared state for this WalSender process based on reply data from
1836  * standby.
 *
 * A lag value of -1 is not stored unless clearLagTimes is set
 * (presumably -1 means "no measurement available" -- confirm against
 * LagTrackerRead).
1837  */
1838  {
1839  WalSnd *walsnd = MyWalSnd;
1840 
1841  SpinLockAcquire(&walsnd->mutex);
1842  walsnd->write = writePtr;
1843  walsnd->flush = flushPtr;
1844  walsnd->apply = applyPtr;
1845  if (writeLag != -1 || clearLagTimes)
1846  walsnd->writeLag = writeLag;
1847  if (flushLag != -1 || clearLagTimes)
1848  walsnd->flushLag = flushLag;
1849  if (applyLag != -1 || clearLagTimes)
1850  walsnd->applyLag = applyLag;
1851  walsnd->replyTime = replyTime;
1852  SpinLockRelease(&walsnd->mutex);
1853  }
1854 
1857 
1858  /*
1859  * Advance our local xmin horizon when the client confirmed a flush.
1860  */
1861  if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr)
1862  {
1865  else
1867  }
1868 }
1869 
1870 /*
 * Compute new replication slot xmin horizon if needed.
 *
 * For each of xmin and catalog_xmin, the slot's stored and effective values
 * are overwritten with the feedback value when the stored value is not a
 * normal XID, the feedback value is not a normal XID, or the stored value
 * precedes the feedback value.  Both comparisons and assignments happen
 * under slot->mutex; the follow-up in the `changed` branch runs after the
 * lock is released.
 */
1871 static void
1873 {
1874  bool changed = false;
1876 
1877  SpinLockAcquire(&slot->mutex);
1879 
1880  /*
1881  * For physical replication we don't need the interlock provided by xmin
1882  * and effective_xmin since the consequences of a missed increase are
1883  * limited to query cancellations, so set both at once.
1884  */
1885  if (!TransactionIdIsNormal(slot->data.xmin) ||
1886  !TransactionIdIsNormal(feedbackXmin) ||
1887  TransactionIdPrecedes(slot->data.xmin, feedbackXmin))
1888  {
1889  changed = true;
1890  slot->data.xmin = feedbackXmin;
1891  slot->effective_xmin = feedbackXmin;
1892  }
 /* same rule, applied independently to the catalog xmin */
1893  if (!TransactionIdIsNormal(slot->data.catalog_xmin) ||
1894  !TransactionIdIsNormal(feedbackCatalogXmin) ||
1895  TransactionIdPrecedes(slot->data.catalog_xmin, feedbackCatalogXmin))
1896  {
1897  changed = true;
1898  slot->data.catalog_xmin = feedbackCatalogXmin;
1899  slot->effective_catalog_xmin = feedbackCatalogXmin;
1900  }
1901  SpinLockRelease(&slot->mutex);
1902 
1903  if (changed)
1904  {
1907  }
1908 }
1909 
1910 /*
1911  * Check that the provided xmin/epoch are sane, that is, not in the future
1912  * and not so far back as to be already wrapped around.
1913  *
1914  * Epoch of nextXid should be same as standby, or if the counter has
1915  * wrapped, then one greater than standby.
1916  *
1917  * This check doesn't care about whether clog exists for these xids
1918  * at all.
 *
 * Returns true when (xid, epoch) passes both the epoch test and the
 * wraparound test against the current next full transaction id, false
 * otherwise.
1919  */
1920 static bool
1922 {
1923  FullTransactionId nextFullXid;
1924  TransactionId nextXid;
1925  uint32 nextEpoch;
1926 
1927  nextFullXid = ReadNextFullTransactionId();
1928  nextXid = XidFromFullTransactionId(nextFullXid);
1929  nextEpoch = EpochFromFullTransactionId(nextFullXid);
1930 
 /*
  * Plain numeric comparison: an xid numerically at or below nextXid must
  * come from the current epoch, while a numerically larger xid can only
  * be valid if it comes from the epoch just before the counter wrapped.
  */
1931  if (xid <= nextXid)
1932  {
1933  if (epoch != nextEpoch)
1934  return false;
1935  }
1936  else
1937  {
1938  if (epoch + 1 != nextEpoch)
1939  return false;
1940  }
1941 
1942  if (!TransactionIdPrecedesOrEquals(xid, nextXid))
1943  return false; /* epoch OK, but it's wrapped around */
1944 
1945  return true;
1946 }
1947 
1948 /*
1949  * Hot Standby feedback
 *
 * Fields read from reply_message, in order: reply time (int64), then four
 * 4-byte integers: xmin, xmin epoch, catalog xmin, catalog xmin epoch.
 * The reply time is always recorded in MyWalSnd.  The xmins are then either
 * cleared (both values not normal), ignored (a value fails the
 * recent-past sanity check), or applied to the replication slot if one is
 * in use, otherwise to MyPgXact->xmin.
1950  */
1951 static void
1953 {
1954  TransactionId feedbackXmin;
1955  uint32 feedbackEpoch;
1956  TransactionId feedbackCatalogXmin;
1957  uint32 feedbackCatalogEpoch;
1958  TimestampTz replyTime;
1959 
1960  /*
1961  * Decipher the reply message. The caller already consumed the msgtype
1962  * byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation
1963  * of this message.
1964  */
1965  replyTime = pq_getmsgint64(&reply_message);
1966  feedbackXmin = pq_getmsgint(&reply_message, 4);
1967  feedbackEpoch = pq_getmsgint(&reply_message, 4);
1968  feedbackCatalogXmin = pq_getmsgint(&reply_message, 4);
1969  feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4);
1970 
 /* skip building the debug string unless it would actually be logged */
1971  if (log_min_messages <= DEBUG2)
1972  {
1973  char *replyTimeStr;
1974 
1975  /* Copy because timestamptz_to_str returns a static buffer */
1976  replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
1977 
1978  elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u reply_time %s",
1979  feedbackXmin,
1980  feedbackEpoch,
1981  feedbackCatalogXmin,
1982  feedbackCatalogEpoch,
1983  replyTimeStr);
1984 
1985  pfree(replyTimeStr);
1986  }
1987 
1988  /*
1989  * Update shared state for this WalSender process based on reply data from
1990  * standby.
1991  */
1992  {
1993  WalSnd *walsnd = MyWalSnd;
1994 
1995  SpinLockAcquire(&walsnd->mutex);
1996  walsnd->replyTime = replyTime;
1997  SpinLockRelease(&walsnd->mutex);
1998  }
1999 
2000  /*
2001  * Unset WalSender's xmins if the feedback message values are invalid.
2002  * This happens when the downstream turned hot_standby_feedback off.
2003  */
2004  if (!TransactionIdIsNormal(feedbackXmin)
2005  && !TransactionIdIsNormal(feedbackCatalogXmin))
2006  {
2008  if (MyReplicationSlot != NULL)
2009  PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
2010  return;
2011  }
2012 
2013  /*
2014  * Check that the provided xmin/epoch are sane, that is, not in the future
2015  * and not so far back as to be already wrapped around. Ignore if not.
 * A value that is not a normal XID is exempt from the check.
2016  */
2017  if (TransactionIdIsNormal(feedbackXmin) &&
2018  !TransactionIdInRecentPast(feedbackXmin, feedbackEpoch))
2019  return;
2020 
2021  if (TransactionIdIsNormal(feedbackCatalogXmin) &&
2022  !TransactionIdInRecentPast(feedbackCatalogXmin, feedbackCatalogEpoch))
2023  return;
2024 
2025  /*
2026  * Set the WalSender's xmin equal to the standby's requested xmin, so that
2027  * the xmin will be taken into account by GetOldestXmin. This will hold
2028  * back the removal of dead rows and thereby prevent the generation of
2029  * cleanup conflicts on the standby server.
2030  *
2031  * There is a small window for a race condition here: although we just
2032  * checked that feedbackXmin precedes nextXid, the nextXid could have
2033  * gotten advanced between our fetching it and applying the xmin below,
2034  * perhaps far enough to make feedbackXmin wrap around. In that case the
2035  * xmin we set here would be "in the future" and have no effect. No point
2036  * in worrying about this since it's too late to save the desired data
2037  * anyway. Assuming that the standby sends us an increasing sequence of
2038  * xmins, this could only happen during the first reply cycle, else our
2039  * own xmin would prevent nextXid from advancing so far.
2040  *
2041  * We don't bother taking the ProcArrayLock here. Setting the xmin field
2042  * is assumed atomic, and there's no real need to prevent a concurrent
2043  * GetOldestXmin. (If we're moving our xmin forward, this is obviously
2044  * safe, and if we're moving it backwards, well, the data is at risk
2045  * already since a VACUUM could have just finished calling GetOldestXmin.)
2046  *
2047  * If we're using a replication slot we reserve the xmin via that,
2048  * otherwise via the walsender's PGXACT entry. We can only track the
2049  * catalog xmin separately when using a slot, so we store the least of the
2050  * two provided when not using a slot.
2051  *
2052  * XXX: It might make sense to generalize the ephemeral slot concept and
2053  * always use the slot mechanism to handle the feedback xmin.
2054  */
2055  if (MyReplicationSlot != NULL) /* XXX: persistency configurable? */
2056  PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
2057  else
2058  {
 /* the catalog xmin wins only if it is normal and older than feedbackXmin */
2059  if (TransactionIdIsNormal(feedbackCatalogXmin)
2060  && TransactionIdPrecedes(feedbackCatalogXmin, feedbackXmin))
2061  MyPgXact->xmin = feedbackCatalogXmin;
2062  else
2063  MyPgXact->xmin = feedbackXmin;
2064  }
2065 }
2066 
2067 /*
2068  * Compute how long send/receive loops should sleep.
2069  *
2070  * If wal_sender_timeout is enabled we want to wake up in time to send
2071  * keepalives and to abort the connection if wal_sender_timeout has been
2072  * reached.
 *
 * Returns the sleep time in milliseconds.  The default is 10000 ms; when
 * the timeout branch below is taken, the result is the time from `now`
 * until the computed wakeup time.
2073  */
2074 static long
2076 {
2077  long sleeptime = 10000; /* 10 s */
2078 
2080  {
2081  TimestampTz wakeup_time;
2082  long sec_to_timeout;
2083  int microsec_to_timeout;
2084 
2085  /*
2086  * At the latest stop sleeping once wal_sender_timeout has been
2087  * reached.
2088  */
2091 
2092  /*
2093  * If no ping has been sent yet, wakeup when it's time to do so.
2094  * WalSndKeepaliveIfNecessary() wants to send a keepalive once half of
2095  * the timeout passed without a response.
2096  */
2099  wal_sender_timeout / 2);
2100 
2101  /* Compute relative time until wakeup. */
2102  TimestampDifference(now, wakeup_time,
2103  &sec_to_timeout, &microsec_to_timeout);
2104 
 /* convert seconds + microseconds to whole milliseconds */
2105  sleeptime = sec_to_timeout * 1000 +
2106  microsec_to_timeout / 1000;
2107  }
2108 
2109  return sleeptime;
2110 }
2111 
2112 /*
2113  * Check whether there have been responses by the client within
2114  * wal_sender_timeout and shutdown if not. Using last_processing as the
2115  * reference point avoids counting server-side stalls against the client.
2116  * However, a long server-side stall can make WalSndKeepaliveIfNecessary()
2117  * postdate last_processing by more than wal_sender_timeout. If that happens,
2118  * the client must reply almost immediately to avoid a timeout. This rarely
2119  * affects the default configuration, under which clients spontaneously send a
2120  * message every standby_message_timeout = wal_sender_timeout/6 = 10s. We
2121  * could eliminate that problem by recognizing timeout expiration at
2122  * wal_sender_timeout/2 after the keepalive.
 *
 * Nothing happens when last_reply_timestamp <= 0 or when wal_sender_timeout
 * is not positive.
2123  */
2124 static void
2126 {
2127  TimestampTz timeout;
2128 
2129  /* don't bail out if we're doing something that doesn't require timeouts */
2130  if (last_reply_timestamp <= 0)
2131  return;
2132 
2135 
2136  if (wal_sender_timeout > 0 && last_processing >= timeout)
2137  {
2138  /*
2139  * Since typically expiration of replication timeout means
2140  * communication problem, we don't send the error message to the
2141  * standby.
2142  */
2144  (errmsg("terminating walsender process due to replication timeout")));
2145 
2146  WalSndShutdown();
2147  }
2148 }
2149 
2150 /*
 * Main loop of walsender process that streams the WAL over Copy messages.
 *
 * Each iteration: handle config reload and client input, call send_data()
 * when no output is pending, try a non-blocking flush, handle the
 * caught-up and SIGUSR2 cases, check the timeout and keepalive, and then
 * possibly sleep on the latch and socket.  The loop is left once the
 * CopyDone exchange described below is complete and the output buffer is
 * empty.
 */
2151 static void
2153 {
2154  /*
2155  * Initialize the last reply timestamp. That enables timeout processing
2156  * from hereon.
2157  */
2159  waiting_for_ping_response = false;
2160 
2161  /*
2162  * Loop until we reach the end of this timeline or the client requests to
2163  * stop streaming.
2164  */
2165  for (;;)
2166  {
2167  /* Clear any already-pending wakeups */
2169 
2171 
2172  /* Process any requests or signals received recently */
2173  if (ConfigReloadPending)
2174  {
2175  ConfigReloadPending = false;
2178  }
2179 
2180  /* Check for input from the client */
2182 
2183  /*
2184  * If we have received CopyDone from the client, sent CopyDone
2185  * ourselves, and the output buffer is empty, it's time to exit
2186  * streaming.
2187  */
2189  !pq_is_send_pending())
2190  break;
2191 
2192  /*
2193  * If we don't have any pending data in the output buffer, try to send
2194  * some more. If there is some, we don't bother to call send_data
2195  * again until we've flushed it ... but we'd better assume we are not
2196  * caught up.
2197  */
2198  if (!pq_is_send_pending())
2199  send_data();
2200  else
2201  WalSndCaughtUp = false;
2202 
2203  /* Try to flush pending output to the client */
2204  if (pq_flush_if_writable() != 0)
2205  WalSndShutdown();
2206 
2207  /* If nothing remains to be sent right now ... */
2209  {
2210  /*
2211  * If we're in catchup state, move to streaming. This is an
2212  * important state change for users to know about, since before
2213  * this point data loss might occur if the primary dies and we
2214  * need to failover to the standby. The state change is also
2215  * important for synchronous replication, since commits that
2216  * started to wait at that point might wait for some time.
2217  */
2218  if (MyWalSnd->state == WALSNDSTATE_CATCHUP)
2219  {
2220  ereport(DEBUG1,
2221  (errmsg("\"%s\" has now caught up with upstream server",
2222  application_name)));
2224  }
2225 
2226  /*
2227  * When SIGUSR2 arrives, we send any outstanding logs up to the
2228  * shutdown checkpoint record (i.e., the latest record), wait for
2229  * them to be replicated to the standby, and exit. This may be a
2230  * normal termination at shutdown, or a promotion, the walsender
2231  * is not sure which.
2232  */
2233  if (got_SIGUSR2)
2234  WalSndDone(send_data);
2235  }
2236 
2237  /* Check for replication timeout. */
2239 
2240  /* Send keepalive if the time has come */
2242 
2243  /*
2244  * We don't block if not caught up, unless there is unsent data
2245  * pending in which case we'd better block until the socket is
2246  * write-ready. This test is only needed for the case where the
2247  * send_data callback handled a subset of the available data but then
2248  * pq_flush_if_writable flushed it all --- we should immediately try
2249  * to send more.
2250  */
2252  {
2253  long sleeptime;
2254  int wakeEvents;
2255 
2256  wakeEvents = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH | WL_TIMEOUT |
2258 
2259  /*
2260  * Use fresh timestamp, not last_processing, to reduce the chance
2261  * of reaching wal_sender_timeout before sending a keepalive.
2262  */
2264 
 /* only wait for socket-writable when there is something to write */
2265  if (pq_is_send_pending())
2266  wakeEvents |= WL_SOCKET_WRITEABLE;
2267 
2268  /* Sleep until something happens or we time out */
2269  (void) WaitLatchOrSocket(MyLatch, wakeEvents,
2270  MyProcPort->sock, sleeptime,
2272  }
2273  }
2274  return;
2275 }
2276 
2277 /*
      * Initialize a per-walsender data structure for this walsender process.
      *
      * Walks WalSndCtl->walsnds[0 .. max_wal_senders-1] and claims the first
      * entry whose pid is 0: stores MyProcPid in it, resets its progress and
      * lag fields, and points MyWalSnd at it.  Each entry is examined and
      * modified only while holding that entry's own spinlock.
      */
2278 static void
2280 {
2281  int i;
2282 
2283  /*
2284  * WalSndCtl should be set up already (we inherit this by fork() or
2285  * EXEC_BACKEND mechanism from the postmaster).
2286  */
2287  Assert(WalSndCtl != NULL);
2288  Assert(MyWalSnd == NULL);
2289 
2290  /*
2291  * Find a free walsender slot and reserve it. This must not fail due to
2292  * the prior check for free WAL senders in InitProcess().
2293  */
2294  for (i = 0; i < max_wal_senders; i++)
2295  {
2296  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2297 
2298  SpinLockAcquire(&walsnd->mutex);
2299 
2300  if (walsnd->pid != 0)
2301  {
      /* Nonzero pid: the entry already belongs to a process; try the next. */
2302  SpinLockRelease(&walsnd->mutex);
2303  continue;
2304  }
2305  else
2306  {
2307  /*
2308  * Found a free slot. Reserve it for us.
2309  */
2310  walsnd->pid = MyProcPid;
2311  walsnd->sentPtr = InvalidXLogRecPtr;
2312  walsnd->write = InvalidXLogRecPtr;
2313  walsnd->flush = InvalidXLogRecPtr;
2314  walsnd->apply = InvalidXLogRecPtr;
      /*
       * Lag fields start out negative; the statistics function in this
       * file reports a negative lag as NULL.
       */
2315  walsnd->writeLag = -1;
2316  walsnd->flushLag = -1;
2317  walsnd->applyLag = -1;
2318  walsnd->state = WALSNDSTATE_STARTUP;
2319  walsnd->latch = &MyProc->procLatch;
2320  walsnd->replyTime = 0;
2321  SpinLockRelease(&walsnd->mutex);
2322  /* don't need the lock anymore */
2323  MyWalSnd = (WalSnd *) walsnd;
2324 
2325  break;
2326  }
2327  }
2328 
      /* The loop above must have claimed an entry. */
2329  Assert(MyWalSnd != NULL);
2330 
2331  /* Arrange to clean up at walsender exit */
2333 }
2334 
2335 /*
      * Destroy the per-walsender data structure for this walsender process.
      *
      * Forgets the process-local MyWalSnd pointer, then, under the entry's
      * spinlock, clears its latch pointer and resets pid to 0 so that the
      * entry reads as unused again.
      */
2336 static void
2338 {
2339  WalSnd *walsnd = MyWalSnd;
2340 
2341  Assert(walsnd != NULL);
2342 
      /* 'walsnd' keeps the address; from here on MyWalSnd reads as unset. */
2343  MyWalSnd = NULL;
2344 
2345  SpinLockAcquire(&walsnd->mutex);
2346  /* clear latch while holding the spinlock, so it can safely be read */
2347  walsnd->latch = NULL;
2348  /* Mark WalSnd struct as no longer being in use. */
2349  walsnd->pid = 0;
2350  SpinLockRelease(&walsnd->mutex);
2351 }
2352 
2353 /*
2354  * Read 'count' bytes from WAL into 'buf', starting at location 'startptr'
2355  *
2356  * XXX probably this should be improved to suck data directly from the
2357  * WAL buffers when possible.
2358  *
2359  * Will open, and keep open, one WAL segment stored in the global file
2360  * descriptor sendFile. This means if XLogRead is used once, there will
2361  * always be one descriptor left open until the process ends, but never
2362  * more than one.
      *
      * NOTE(review): the body below keeps the open descriptor in
      * sendSeg->ws_file (with ws_segno, ws_tli and ws_off alongside it);
      * "sendFile" above looks like an older name for the same thing --
      * confirm and update.
      *
      * Failures to open, seek or read are raised with ereport(ERROR), so the
      * function only returns normally once all 'count' bytes are in 'buf'.
2363  */
2364 static void
2366 {
2367  char *p;
2368  XLogRecPtr recptr;
2369  Size nbytes;
2370  XLogSegNo segno;
2371 
      /*
       * Start (or restart) the whole read from 'startptr'.  The reload check
       * at the bottom of the function jumps back here.
       */
2372 retry:
2373  p = buf;
2374  recptr = startptr;
2375  nbytes = count;
2376 
      /* Each iteration reads from one segment file; short reads just loop. */
2377  while (nbytes > 0)
2378  {
2379  uint32 startoff;
2380  int segbytes;
2381  int readbytes;
2382 
2383  startoff = XLogSegmentOffset(recptr, segcxt->ws_segsize);
2384 
      /* No file open yet, or recptr has moved out of the open segment. */
2385  if (sendSeg->ws_file < 0 ||
2386  !XLByteInSeg(recptr, sendSeg->ws_segno, segcxt->ws_segsize))
2387  {
2388  char path[MAXPGPATH];
2389 
2390  /* Switch to another logfile segment */
2391  if (sendSeg->ws_file >= 0)
2392  close(sendSeg->ws_file);
2393 
2394  XLByteToSeg(recptr, sendSeg->ws_segno, segcxt->ws_segsize);
2395 
2396  /*-------
2397  * When reading from a historic timeline, and there is a timeline
2398  * switch within this segment, read from the WAL segment belonging
2399  * to the new timeline.
2400  *
2401  * For example, imagine that this server is currently on timeline
2402  * 5, and we're streaming timeline 4. The switch from timeline 4
2403  * to 5 happened at 0/13002088. In pg_wal, we have these files:
2404  *
2405  * ...
2406  * 000000040000000000000012
2407  * 000000040000000000000013
2408  * 000000050000000000000013
2409  * 000000050000000000000014
2410  * ...
2411  *
2412  * In this situation, when requested to send the WAL from
2413  * segment 0x13, on timeline 4, we read the WAL from file
2414  * 000000050000000000000013. Archive recovery prefers files from
2415  * newer timelines, so if the segment was restored from the
2416  * archive on this server, the file belonging to the old timeline,
2417  * 000000040000000000000013, might not exist. Their contents are
2418  * equal up to the switchpoint, because at a timeline switch, the
2419  * used portion of the old segment is copied to the new file.
2420  *-------
2421  */
2422  sendSeg->ws_tli = sendTimeLine;
2424  {
2425  XLogSegNo endSegNo;
2426 
2427  XLByteToSeg(sendTimeLineValidUpto, endSegNo, segcxt->ws_segsize);
2428  if (sendSeg->ws_segno == endSegNo)
2429  sendSeg->ws_tli = sendTimeLineNextTLI;
2430  }
2431 
2432  XLogFilePath(path, sendSeg->ws_tli, sendSeg->ws_segno, segcxt->ws_segsize);
2433 
2434  sendSeg->ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY);
2435  if (sendSeg->ws_file < 0)
2436  {
2437  /*
2438  * If the file is not found, assume it's because the standby
2439  * asked for a too old WAL segment that has already been
2440  * removed or recycled.
2441  */
2442  if (errno == ENOENT)
2443  ereport(ERROR,
2445  errmsg("requested WAL segment %s has already been removed",
2446  XLogFileNameP(sendSeg->ws_tli, sendSeg->ws_segno))));
2447  else
2448  ereport(ERROR,
2450  errmsg("could not open file \"%s\": %m",
2451  path)));
2452  }
      /* A freshly opened file is positioned at its start. */
2453  sendSeg->ws_off = 0;
2454  }
2455 
2456  /* Need to seek in the file? */
2457  if (sendSeg->ws_off != startoff)
2458  {
2459  if (lseek(sendSeg->ws_file, (off_t) startoff, SEEK_SET) < 0)
2460  ereport(ERROR,
2462  errmsg("could not seek in log segment %s to offset %u: %m",
2463  XLogFileNameP(sendSeg->ws_tli, sendSeg->ws_segno),
2464  startoff)));
2465  sendSeg->ws_off = startoff;
2466  }
2467 
2468  /* How many bytes are within this segment? */
2469  if (nbytes > (segcxt->ws_segsize - startoff))
2470  segbytes = segcxt->ws_segsize - startoff;
2471  else
2472  segbytes = nbytes;
2473 
2475  readbytes = read(sendSeg->ws_file, p, segbytes);
2477  if (readbytes < 0)
2478  {
2479  ereport(ERROR,
2481  errmsg("could not read from log segment %s, offset %u, length %zu: %m",
2482  XLogFileNameP(sendSeg->ws_tli, sendSeg->ws_segno),
2483  sendSeg->ws_off, (Size) segbytes)));
2484  }
2485  else if (readbytes == 0)
2486  {
      /* read() returned 0 although segbytes > 0: treated as an error. */
2487  ereport(ERROR,
2489  errmsg("could not read from log segment %s, offset %u: read %d of %zu",
2490  XLogFileNameP(sendSeg->ws_tli, sendSeg->ws_segno),
2491  sendSeg->ws_off, readbytes, (Size) segbytes)));
2492  }
2493 
2494  /* Update state for read */
2495  recptr += readbytes;
2496 
2497  sendSeg->ws_off += readbytes;
2498  nbytes -= readbytes;
2499  p += readbytes;
2500  }
2501 
2502  /*
2503  * After reading into the buffer, check that what we read was valid. We do
2504  * this after reading, because even though the segment was present when we
2505  * opened it, it might get recycled or removed while we read it. The
2506  * read() succeeds in that case, but the data we tried to read might
2507  * already have been overwritten with new WAL records.
2508  */
2509  XLByteToSeg(startptr, segno, segcxt->ws_segsize);
2511 
2512  /*
2513  * During recovery, the currently-open WAL file might be replaced with the
2514  * file of the same name retrieved from archive. So we always need to
2515  * check what we read was valid after reading into the buffer. If it's
2516  * invalid, we try to open and read the file again.
2517  */
2519  {
2520  WalSnd *walsnd = MyWalSnd;
2521  bool reload;
2522 
      /* Fetch and clear the flag in one step under the entry's spinlock. */
2523  SpinLockAcquire(&walsnd->mutex);
2524  reload = walsnd->needreload;
2525  walsnd->needreload = false;
2526  SpinLockRelease(&walsnd->mutex);
2527 
2528  if (reload && sendSeg->ws_file >= 0)
2529  {
      /* Drop the descriptor so that the retry reopens the file by name. */
2530  close(sendSeg->ws_file);
2531  sendSeg->ws_file = -1;
2532 
2533  goto retry;
2534  }
2535  }
2536 }
2537 
2538 /*
2539  * Send out the WAL in its normal physical/stored form.
2540  *
2541  * Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk,
2542  * but not yet sent to the client, and buffer it in the libpq output
2543  * buffer.
2544  *
2545  * If there is no unsent WAL remaining, WalSndCaughtUp is set to true,
2546  * otherwise WalSndCaughtUp is set to false.
      *
      * On a successful send, sentPtr (both the process-local variable and the
      * copy in this walsender's shared-memory entry) advances to the end of
      * the slice that was queued.
2547  */
2548 static void
2550 {
2551  XLogRecPtr SendRqstPtr;
2553  XLogRecPtr endptr;
2554  Size nbytes;
2555 
2556  /* If requested switch the WAL sender to the stopping state. */
2557  if (got_STOPPING)
2559 
2561  {
2562  WalSndCaughtUp = true;
2563  return;
2564  }
2565 
2566  /* Figure out how far we can safely send the WAL. */
2568  {
2569  /*
2570  * Streaming an old timeline that's in this server's history, but is
2571  * not the one we're currently inserting or replaying. It can be
2572  * streamed up to the point where we switched off that timeline.
2573  */
2574  SendRqstPtr = sendTimeLineValidUpto;
2575  }
2576  else if (am_cascading_walsender)
2577  {
2578  /*
2579  * Streaming the latest timeline on a standby.
2580  *
2581  * Attempt to send all WAL that has already been replayed, so that we
2582  * know it's valid. If we're receiving WAL through streaming
2583  * replication, it's also OK to send any WAL that has been received
2584  * but not replayed.
2585  *
2586  * The timeline we're recovering from can change, or we can be
2587  * promoted. In either case, the current timeline becomes historic. We
2588  * need to detect that so that we don't try to stream past the point
2589  * where we switched to another timeline. We check for promotion or
2590  * timeline switch after calculating FlushPtr, to avoid a race
2591  * condition: if the timeline becomes historic just after we checked
2592  * that it was still current, it's still OK to stream it up to the
2593  * FlushPtr that was calculated before it became historic.
2594  */
2595  bool becameHistoric = false;
2596 
2597  SendRqstPtr = GetStandbyFlushRecPtr();
2598 
2599  if (!RecoveryInProgress())
2600  {
2601  /*
2602  * We have been promoted. RecoveryInProgress() updated
2603  * ThisTimeLineID to the new current timeline.
2604  */
2605  am_cascading_walsender = false;
2606  becameHistoric = true;
2607  }
2608  else
2609  {
2610  /*
2611  * Still a cascading standby. But is the timeline we're sending
2612  * still the one recovery is recovering from? ThisTimeLineID was
2613  * updated by the GetStandbyFlushRecPtr() call above.
2614  */
2616  becameHistoric = true;
2617  }
2618 
2619  if (becameHistoric)
2620  {
2621  /*
2622  * The timeline we were sending has become historic. Read the
2623  * timeline history file of the new timeline to see where exactly
2624  * we forked off from the timeline we were sending.
2625  */
2626  List *history;
2627 
2630 
2632  list_free_deep(history);
2633 
2634  sendTimeLineIsHistoric = true;
2635 
      /* From now on the send limit is the fork point, not the flush point. */
2636  SendRqstPtr = sendTimeLineValidUpto;
2637  }
2638  }
2639  else
2640  {
2641  /*
2642  * Streaming the current timeline on a master.
2643  *
2644  * Attempt to send all data that's already been written out and
2645  * fsync'd to disk. We cannot go further than what's been written out
2646  * given the current implementation of XLogRead(). And in any case
2647  * it's unsafe to send WAL that is not securely down to disk on the
2648  * master: if the master subsequently crashes and restarts, standbys
2649  * must not have applied any WAL that got lost on the master.
2650  */
2651  SendRqstPtr = GetFlushRecPtr();
2652  }
2653 
2654  /*
2655  * Record the current system time as an approximation of the time at which
2656  * this WAL location was written for the purposes of lag tracking.
2657  *
2658  * In theory we could make XLogFlush() record a time in shmem whenever WAL
2659  * is flushed and we could get that time as well as the LSN when we call
2660  * GetFlushRecPtr() above (and likewise for the cascading standby
2661  * equivalent), but rather than putting any new code into the hot WAL path
2662  * it seems good enough to capture the time here. We should reach this
2663  * after XLogFlush() runs WalSndWakeupProcessRequests(), and although that
2664  * may take some time, we read the WAL flush pointer and take the time
2665  * very close together here so that we'll get a later position if it is
2666  * still moving.
2667  *
2668  * Because LagTrackerWrite ignores samples when the LSN hasn't advanced,
2669  * this gives us a cheap approximation for the WAL flush time for this
2670  * LSN.
2671  *
2672  * Note that the LSN is not necessarily the LSN for the data contained in
2673  * the present message; it's the end of the WAL, which might be further
2674  * ahead. All the lag tracking machinery cares about is finding out when
2675  * that arbitrary LSN is eventually reported as written, flushed and
2676  * applied, so that it can measure the elapsed time.
2677  */
2678  LagTrackerWrite(SendRqstPtr, GetCurrentTimestamp());
2679 
2680  /*
2681  * If this is a historic timeline and we've reached the point where we
2682  * forked to the next timeline, stop streaming.
2683  *
2684  * Note: We might already have sent WAL > sendTimeLineValidUpto. The
2685  * startup process will normally replay all WAL that has been received
2686  * from the master, before promoting, but if the WAL streaming is
2687  * terminated at a WAL page boundary, the valid portion of the timeline
2688  * might end in the middle of a WAL record. We might've already sent the
2689  * first half of that partial WAL record to the cascading standby, so that
2690  * sentPtr > sendTimeLineValidUpto. That's OK; the cascading standby can't
2691  * replay the partial WAL record either, so it can still follow our
2692  * timeline switch.
2693  */
2695  {
2696  /* close the current file. */
2697  if (sendSeg->ws_file >= 0)
2698  close(sendSeg->ws_file);
2699  sendSeg->ws_file = -1;
2700 
2701  /* Send CopyDone */
2702  pq_putmessage_noblock('c', NULL, 0);
2703  streamingDoneSending = true;
2704 
2705  WalSndCaughtUp = true;
2706 
2707  elog(DEBUG1, "walsender reached end of timeline at %X/%X (sent up to %X/%X)",
2709  (uint32) (sentPtr >> 32), (uint32) sentPtr);
2710  return;
2711  }
2712 
2713  /* Do we have any work to do? */
      /*
       * The Assert states the expected invariant; the test below uses <= so
       * that the function still returns quietly if it does not hold.
       */
2714  Assert(sentPtr <= SendRqstPtr);
2715  if (SendRqstPtr <= sentPtr)
2716  {
2717  WalSndCaughtUp = true;
2718  return;
2719  }
2720 
2721  /*
2722  * Figure out how much to send in one message. If there's no more than
2723  * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
2724  * MAX_SEND_SIZE bytes, but round back to logfile or page boundary.
2725  *
2726  * The rounding is not only for performance reasons. Walreceiver relies on
2727  * the fact that we never split a WAL record across two messages. Since a
2728  * long WAL record is split at page boundary into continuation records,
2729  * page boundary is always a safe cut-off point. We also assume that
2730  * SendRqstPtr never points to the middle of a WAL record.
2731  */
2732  startptr = sentPtr;
2733  endptr = startptr;
2734  endptr += MAX_SEND_SIZE;
2735 
2736  /* if we went beyond SendRqstPtr, back off */
2737  if (SendRqstPtr <= endptr)
2738  {
2739  endptr = SendRqstPtr;
2741  WalSndCaughtUp = false;
2742  else
2743  WalSndCaughtUp = true;
2744  }
2745  else
2746  {
2747  /* round down to page boundary. */
2748  endptr -= (endptr % XLOG_BLCKSZ);
2749  WalSndCaughtUp = false;
2750  }
2751 
2752  nbytes = endptr - startptr;
2753  Assert(nbytes <= MAX_SEND_SIZE);
2754 
2755  /*
2756  * OK to read and send the slice.
      *
      * The message built here is: one 'w' byte, then three 64-bit fields
      * (dataStart, walEnd, sendtime), then nbytes of WAL.
2757  */
2758  resetStringInfo(&output_message);
2759  pq_sendbyte(&output_message, 'w');
2760 
2761  pq_sendint64(&output_message, startptr); /* dataStart */
2762  pq_sendint64(&output_message, SendRqstPtr); /* walEnd */
2763  pq_sendint64(&output_message, 0); /* sendtime, filled in last */
2764 
2765  /*
2766  * Read the log directly into the output buffer to avoid extra memcpy
2767  * calls.
2768  */
2769  enlargeStringInfo(&output_message, nbytes);
2770  XLogRead(sendCxt, &output_message.data[output_message.len], startptr, nbytes);
      /* XLogRead wrote past 'len'; account for the bytes and re-terminate. */
2771  output_message.len += nbytes;
2772  output_message.data[output_message.len] = '\0';
2773 
2774  /*
2775  * Fill the send timestamp last, so that it is taken as late as possible.
      *
      * tmpbuf is used to get the timestamp in pq_sendint64's encoding; the
      * eight bytes are then copied over the placeholder, which sits after the
      * 'w' byte and the first two 64-bit fields.
2776  */
2777  resetStringInfo(&tmpbuf);
2778  pq_sendint64(&tmpbuf, GetCurrentTimestamp());
2779  memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64)],
2780  tmpbuf.data, sizeof(int64));
2781 
2782  pq_putmessage_noblock('d', output_message.data, output_message.len);
2783 
2784  sentPtr = endptr;
2785 
2786  /* Update shared memory status */
2787  {
2788  WalSnd *walsnd = MyWalSnd;
2789 
2790  SpinLockAcquire(&walsnd->mutex);
2791  walsnd->sentPtr = sentPtr;
2792  SpinLockRelease(&walsnd->mutex);
2793  }
2794 
2795  /* Report progress of XLOG streaming in PS display */
2797  {
2798  char activitymsg[50];
2799 
2800  snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
2801  (uint32) (sentPtr >> 32), (uint32) sentPtr);
2802  set_ps_display(activitymsg, false);
2803  }
2804 
2805  return;
2806 }
2807 
2808 /*
2809  * Stream out logically decoded data.
      *
      * Reads at most one WAL record per call through the logical decoding
      * context's reader and, if a record was returned, hands it to
      * LogicalDecodingProcessRecord().  Afterwards updates WalSndCaughtUp,
      * sentPtr and the sentPtr copy in this walsender's shared-memory entry.
2810  */
2811 static void
2813 {
2814  XLogRecord *record;
2815  char *errm;
2816  XLogRecPtr flushPtr;
2817 
2818  /*
2819  * Don't know whether we've caught up yet. We'll set WalSndCaughtUp to
2820  * true in WalSndWaitForWal, if we're actually waiting. We also set it to
2821  * true if XLogReadRecord() had to stop reading but WalSndWaitForWal
2822  * didn't wait - i.e. when we're shutting down.
2823  */
2824  WalSndCaughtUp = false;
2825 
2826  record = XLogReadRecord(logical_decoding_ctx->reader, logical_startptr, &errm);
2828 
2829  /* xlog record was invalid */
2830  if (errm != NULL)
2831  elog(ERROR, "%s", errm);
2832 
2833  /*
2834  * We'll use the current flush point to determine whether we've caught up.
2835  */
2836  flushPtr = GetFlushRecPtr();
2837 
2838  if (record != NULL)
2839  {
2840  /*
2841  * Note the lack of any call to LagTrackerWrite() which is handled by
2842  * WalSndUpdateProgress which is called by output plugin through
2843  * logical decoding write api.
2844  */
2845  LogicalDecodingProcessRecord(logical_decoding_ctx, logical_decoding_ctx->reader);
2846 
      /* Progress is tracked as the end of the record just processed. */
2847  sentPtr = logical_decoding_ctx->reader->EndRecPtr;
2848  }
2849 
2850  /* Set flag if we're caught up. */
2851  if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
2852  WalSndCaughtUp = true;
2853 
2854  /*
2855  * If we're caught up and have been requested to stop, have WalSndLoop()
2856  * terminate the connection in an orderly manner, after writing out all
2857  * the pending data.
2858  */
2860  got_SIGUSR2 = true;
2861 
2862  /* Update shared memory status */
2863  {
2864  WalSnd *walsnd = MyWalSnd;
2865 
2866  SpinLockAcquire(&walsnd->mutex);
2867  walsnd->sentPtr = sentPtr;
2868  SpinLockRelease(&walsnd->mutex);
2869  }
2870 }
2871 
2872 /*
2873  * Shutdown if the sender is caught up.
2874  *
2875  * NB: This should only be called when the shutdown signal has been received
2876  * from postmaster.
2877  *
2878  * Note that if we determine that there's still more data to send, this
2879  * function will return control to the caller.
      *
      * 'send_data' is the callback used to push out any remaining WAL; it is
      * invoked once before the caught-up test.  When the test succeeds the
      * process exits via proc_exit(0) and this function does not return.
2880  */
2881 static void
2883 {
2884  XLogRecPtr replicatedPtr;
2885 
2886  /* ... let's just be real sure we're caught up ... */
2887  send_data();
2888 
2889  /*
2890  * To figure out whether all WAL has successfully been replicated, check
2891  * flush location if valid, write otherwise. Tools like pg_receivewal will
2892  * usually (unless in synchronous mode) return an invalid flush location.
2893  */
2894  replicatedPtr = XLogRecPtrIsInvalid(MyWalSnd->flush) ?
2895  MyWalSnd->write : MyWalSnd->flush;
2896 
      /*
       * Exit only when nothing is left to send, the position confirmed by the
       * receiver equals what we sent, and no output is still queued.
       */
2897  if (WalSndCaughtUp && sentPtr == replicatedPtr &&
2898  !pq_is_send_pending())
2899  {
2900  /* Inform the standby that XLOG streaming is done */
2901  EndCommand("COPY 0", DestRemote);
2902  pq_flush();
2903 
2904  proc_exit(0);
2905  }
2907  {
      /* Ask the receiver for a reply (requestReply = true). */
2908  WalSndKeepalive(true);
2910  }
2911 }
2912 
2913 /*
2914  * Returns the latest point in WAL that has been safely flushed to disk, and
2915  * can be sent to the standby. This should only be called when in recovery,
2916  * ie. we're streaming to a cascaded standby.
2917  *
2918  * As a side-effect, ThisTimeLineID is updated to the TLI of the last
2919  * replayed WAL record.
      *
      * The result is the replay position, or the walreceiver's write position
      * when that is further ahead and on the same timeline as replay.
2920  */
2921 static XLogRecPtr
2923 {
2924  XLogRecPtr replayPtr;
2925  TimeLineID replayTLI;
2926  XLogRecPtr receivePtr;
2928  XLogRecPtr result;
2929 
2930  /*
2931  * We can safely send what's already been replayed. Also, if walreceiver
2932  * is streaming WAL from the same timeline, we can send anything that it
2933  * has streamed, but hasn't been replayed yet.
2934  */
2935 
2936  receivePtr = GetWalRcvWriteRecPtr(NULL, &receiveTLI);
2937  replayPtr = GetXLogReplayRecPtr(&replayTLI);
2938 
2939  ThisTimeLineID = replayTLI;
2940 
      /* Default to the replay position; prefer receivePtr only if it is ahead. */
2941  result = replayPtr;
2942  if (receiveTLI == ThisTimeLineID && receivePtr > replayPtr)
2943  result = receivePtr;
2944 
2945  return result;
2946 }
2947 
2948 /*
2949  * Request walsenders to reload the currently-open WAL file
      *
      * Sets the needreload flag, under each entry's spinlock, on every
      * walsender entry that is in use (pid != 0).  The flag is only set here;
      * the owning walsender consumes it the next time it reads WAL.
2950  */
2951 void
2953 {
2954  int i;
2955 
2956  for (i = 0; i < max_wal_senders; i++)
2957  {
2958  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2959 
2960  SpinLockAcquire(&walsnd->mutex);
2961  if (walsnd->pid == 0)
2962  {
      /* unused entry: nothing to flag */
2963  SpinLockRelease(&walsnd->mutex);
2964  continue;
2965  }
2966  walsnd->needreload = true;
2967  SpinLockRelease(&walsnd->mutex);
2968  }
2969 }
2970 
2971 /*
2972  * Handle PROCSIG_WALSND_INIT_STOPPING signal.
      *
      * Depending on replication_active, either sends SIGTERM to this process
      * or sets the got_STOPPING flag for the main loop to act on.
2973  */
2974 void
2976 {
2978 
2979  /*
2980  * If replication has not yet started, die like with SIGTERM. If
2981  * replication is active, only set a flag and wake up the main loop. It
2982  * will send any outstanding WAL, wait for it to be replicated to the
2983  * standby, and then exit gracefully.
2984  */
2985  if (!replication_active)
2986  kill(MyProcPid, SIGTERM);
2987  else
2988  got_STOPPING = true;
2989 }
2990 
2991 /*
2992  * SIGUSR2: set flag to do a last cycle and shut down afterwards. The WAL
2993  * sender should already have been switched to WALSNDSTATE_STOPPING at
2994  * this point.
      *
      * The handler only sets got_SIGUSR2 and sets the process latch; the
      * actual shutdown work is done by the code that checks the flag.
2995  */
2996 static void
2998 {
      /* Save errno so the interrupted code sees it unchanged on return. */
2999  int save_errno = errno;
3000 
3001  got_SIGUSR2 = true;
3002  SetLatch(MyLatch);
3003 
3004  errno = save_errno;
3005 }
3006 
3007 /*
      * Set up signal handlers
      *
      * Installs the walsender's handlers with pqsignal() and calls
      * InitializeTimeouts().  SIGUSR2 is routed to WalSndLastCycleHandler.
      */
3008 void
3010 {
3011  /* Set up signal handlers */
3012  pqsignal(SIGHUP, PostgresSigHupHandler); /* set flag to read config
3013  * file */
3014  pqsignal(SIGINT, StatementCancelHandler); /* query cancel */
3015  pqsignal(SIGTERM, die); /* request shutdown */
3016  pqsignal(SIGQUIT, quickdie); /* hard crash time */
3017  InitializeTimeouts(); /* establishes SIGALRM handler */
3020  pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
3021  * shutdown */
3022 
3023  /* Reset some signals that are accepted by postmaster but not here */
3025 }
3026 
3027 /*
      * Report shared-memory space needed by WalSndShmemInit
      *
      * The size is the fixed part of WalSndCtlData up to its walsnds array,
      * plus max_wal_senders entries of type WalSnd.  add_size() and mul_size()
      * are used for the arithmetic instead of plain + and *.
      */
3028 Size
3030 {
3031  Size size = 0;
3032 
3033  size = offsetof(WalSndCtlData, walsnds);
3034  size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
3035 
3036  return size;
3037 }
3038 
3039 /*
      * Allocate and initialize walsender-related shared memory
      *
      * Looks up (or creates) the "Wal Sender Ctl" structure and stores its
      * address in WalSndCtl.  Only when the structure did not exist before is
      * it zeroed and are its queues and per-entry spinlocks initialized.
      */
3040 void
3042 {
3043  bool found;
3044  int i;
3045 
3046  WalSndCtl = (WalSndCtlData *)
3047  ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
3048 
3049  if (!found)
3050  {
3051  /* First time through, so initialize */
3052  MemSet(WalSndCtl, 0, WalSndShmemSize());
3053 
3054  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
3055  SHMQueueInit(&(WalSndCtl->SyncRepQueue[i]));
3056 
      /* Zeroing left every entry with pid == 0; now set up each spinlock. */
3057  for (i = 0; i < max_wal_senders; i++)
3058  {
3059  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3060 
3061  SpinLockInit(&walsnd->mutex);
3062  }
3063  }
3064 }
3065 
3066 /*
3067  * Wake up all walsenders
3068  *
3069  * This will be called inside critical sections, so throwing an error is not
3070  * advisable.
      *
      * For every walsender entry, reads the latch pointer under the entry's
      * spinlock and, if it is not NULL, sets that latch after releasing the
      * spinlock.
3071  */
3072 void
3074 {
3075  int i;
3076 
3077  for (i = 0; i < max_wal_senders; i++)
3078  {
3079  Latch *latch;
3080  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3081 
3082  /*
3083  * Get latch pointer with spinlock held, for the unlikely case that
3084  * pointer reads aren't atomic (as they're 8 bytes).
3085  */
3086  SpinLockAcquire(&walsnd->mutex);
3087  latch = walsnd->latch;
3088  SpinLockRelease(&walsnd->mutex);
3089 
      /* A NULL latch means there is nobody to wake for this entry. */
3090  if (latch != NULL)
3091  SetLatch(latch);
3092  }
3093 }
3094 
3095 /*
3096  * Signal all walsenders to move to stopping state.
3097  *
3098  * This will trigger walsenders to move to a state where no further WAL can be
3099  * generated. See this file's header for details.
3100  */
3101 void
3103 {
3104  int i;
3105 
3106  for (i = 0; i < max_wal_senders; i++)
3107  {
3108  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3109  pid_t pid;
3110 
      /* Copy the pid out under the spinlock; it is used after release. */
3111  SpinLockAcquire(&walsnd->mutex);
3112  pid = walsnd->pid;
3113  SpinLockRelease(&walsnd->mutex);
3114 
      /* pid 0 marks an unused entry */
3115  if (pid == 0)
3116  continue;
3117 
3119  }
3120 }
3121 
3122 /*
3123  * Wait until all the WAL senders have quit or reached the stopping state.
3124  * This is used by the checkpointer to control when the shutdown checkpoint
3125  * can safely be performed.
      *
      * Polls the walsender array, sleeping 10 msec between passes, and
      * returns only once every in-use entry is in WALSNDSTATE_STOPPING.
3126  */
3127 void
3129 {
3130  for (;;)
3131  {
3132  int i;
3133  bool all_stopped = true;
3134 
3135  for (i = 0; i < max_wal_senders; i++)
3136  {
3137  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3138 
3139  SpinLockAcquire(&walsnd->mutex);
3140 
      /* unused entry: counts as "quit" */
3141  if (walsnd->pid == 0)
3142  {
3143  SpinLockRelease(&walsnd->mutex);
3144  continue;
3145  }
3146 
3147  if (walsnd->state != WALSNDSTATE_STOPPING)
3148  {
      /* One straggler is enough; stop scanning and sleep. */
3149  all_stopped = false;
3150  SpinLockRelease(&walsnd->mutex);
3151  break;
3152  }
3153  SpinLockRelease(&walsnd->mutex);
3154  }
3155 
3156  /* safe to leave if confirmation is done for all WAL senders */
3157  if (all_stopped)
3158  return;
3159 
3160  pg_usleep(10000L); /* wait for 10 msec */
3161  }
3162 }
3163 
3164 /*
      * Set state for current walsender (only called in walsender)
      *
      * Stores 'state' in this process's shared-memory entry (MyWalSnd) under
      * the entry's spinlock.  Returns early, without taking the spinlock, when
      * the entry already holds that state.
      */
3165 void
3167 {
3168  WalSnd *walsnd = MyWalSnd;
3169 
3171 
      /* Unlocked read of our own entry; nothing to do if unchanged. */
3172  if (walsnd->state == state)
3173  return;
3174 
3175  SpinLockAcquire(&walsnd->mutex);
3176  walsnd->state = state;
3177  SpinLockRelease(&walsnd->mutex);
3178 }
3179 
3180 /*
3181  * Return a string constant representing the state. This is used
3182  * in system views, and should *not* be translated.
      *
      * The returned pointer refers to a string literal; any value not covered
      * by the switch yields "UNKNOWN".
3183  */
3184 static const char *
3186 {
3187  switch (state)
3188  {
3189  case WALSNDSTATE_STARTUP:
3190  return "startup";
3191  case WALSNDSTATE_BACKUP:
3192  return "backup";
3193  case WALSNDSTATE_CATCHUP:
3194  return "catchup";
3195  case WALSNDSTATE_STREAMING:
3196  return "streaming";
3197  case WALSNDSTATE_STOPPING:
3198  return "stopping";
3199  }
      /* Reached only when 'state' matches none of the cases above. */
3200  return "UNKNOWN";
3201 }
3202 
/*
 * Build a palloc'd Interval whose time field is 'offset' and whose month and
 * day fields are zero.  The caller receives ownership of the allocation.
 */
3203 static Interval *
3205 {
3206  Interval *result = palloc(sizeof(Interval));
3207 
3208  result->month = 0;
3209  result->day = 0;
3210  result->time = offset;
3211 
3212  return result;
3213 }
3214 
3215 /*
3216  * Returns activity of walsenders, including pids and xlog locations sent to
3217  * standby servers.
      *
      * Set-returning function in materialize mode: one row is stored in a
      * tuplestore for every walsender entry that is in use.  Column layout, as
      * filled in below: 0 pid, 1 state, 2 sent, 3 write, 4 flush, 5 apply,
      * 6 write lag, 7 flush lag, 8 apply lag, 9 sync priority, 10 sync state,
      * 11 reply time.  The return value itself is always (Datum) 0; the rows
      * travel through rsinfo->setResult.
3218  */
3219 Datum
3221 {
3222 #define PG_STAT_GET_WAL_SENDERS_COLS 12
3223  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
3224  TupleDesc tupdesc;
3225  Tuplestorestate *tupstore;
3226  MemoryContext per_query_ctx;
3227  MemoryContext oldcontext;
3228  List *sync_standbys;
3229  int i;
3230 
3231  /* check to see if caller supports us returning a tuplestore */
3232  if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
3233  ereport(ERROR,
3234  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3235  errmsg("set-valued function called in context that cannot accept a set")));
3236  if (!(rsinfo->allowedModes & SFRM_Materialize))
3237  ereport(ERROR,
3238  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3239  errmsg("materialize mode required, but it is not " \
3240  "allowed in this context")));
3241 
3242  /* Build a tuple descriptor for our result type */
3243  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
3244  elog(ERROR, "return type must be a row type");
3245 
      /* Create the tuplestore while the per-query memory context is current. */
3246  per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
3247  oldcontext = MemoryContextSwitchTo(per_query_ctx);
3248 
3249  tupstore = tuplestore_begin_heap(true, false, work_mem);
3250  rsinfo->returnMode = SFRM_Materialize;
3251  rsinfo->setResult = tupstore;
3252  rsinfo->setDesc = tupdesc;
3253 
3254  MemoryContextSwitchTo(oldcontext);
3255 
3256  /*
3257  * Get the currently active synchronous standbys.
3258  */
3259  LWLockAcquire(SyncRepLock, LW_SHARED);
3260  sync_standbys = SyncRepGetSyncStandbys(NULL);
3261  LWLockRelease(SyncRepLock);
3262 
3263  for (i = 0; i < max_wal_senders; i++)
3264  {
3265  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3267  XLogRecPtr write;
3268  XLogRecPtr flush;
3269  XLogRecPtr apply;
3270  TimeOffset writeLag;
3271  TimeOffset flushLag;
3272  TimeOffset applyLag;
3273  int priority;
3274  int pid;
3276  TimestampTz replyTime;
3278  bool nulls[PG_STAT_GET_WAL_SENDERS_COLS];
3279 
      /*
       * Copy everything we need out of the shared entry while holding its
       * spinlock; all formatting happens on the local copies afterwards.
       */
3280  SpinLockAcquire(&walsnd->mutex);
3281  if (walsnd->pid == 0)
3282  {
3283  SpinLockRelease(&walsnd->mutex);
3284  continue;
3285  }
3286  pid = walsnd->pid;
3287  sentPtr = walsnd->sentPtr;
3288  state = walsnd->state;
3289  write = walsnd->write;
3290  flush = walsnd->flush;
3291  apply = walsnd->apply;
3292  writeLag = walsnd->writeLag;
3293  flushLag = walsnd->flushLag;
3294  applyLag = walsnd->applyLag;
3295  priority = walsnd->sync_standby_priority;
3296  replyTime = walsnd->replyTime;
3297  SpinLockRelease(&walsnd->mutex);
3298 
3299  memset(nulls, 0, sizeof(nulls));
3300  values[0] = Int32GetDatum(pid);
3301 
3302  if (!is_member_of_role(GetUserId(), DEFAULT_ROLE_READ_ALL_STATS))
3303  {
3304  /*
3305  * Only superusers and members of pg_read_all_stats can see
3306  * details. Other users only get the pid value to know it's a
3307  * walsender, but no details.
3308  */
3309  MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1);
3310  }
3311  else
3312  {
3313  values[1] = CStringGetTextDatum(WalSndGetStateString(state));
3314 
      /*
       * For the four LSN columns the value is stored unconditionally and
       * the null flag is set in addition when the LSN is invalid.
       */
3315  if (XLogRecPtrIsInvalid(sentPtr))
3316  nulls[2] = true;
3317  values[2] = LSNGetDatum(sentPtr);
3318 
3319  if (XLogRecPtrIsInvalid(write))
3320  nulls[3] = true;
3321  values[3] = LSNGetDatum(write);
3322 
3323  if (XLogRecPtrIsInvalid(flush))
3324  nulls[4] = true;
3325  values[4] = LSNGetDatum(flush);
3326 
3327  if (XLogRecPtrIsInvalid(apply))
3328  nulls[5] = true;
3329  values[5] = LSNGetDatum(apply);
3330 
3331  /*
3332  * Treat a standby such as a pg_basebackup background process
3333  * which always returns an invalid flush location, as an
3334  * asynchronous standby.
3335  */
3336  priority = XLogRecPtrIsInvalid(flush) ? 0 : priority;
3337 
      /* A negative lag is reported as NULL. */
3338  if (writeLag < 0)
3339  nulls[6] = true;
3340  else
3341  values[6] = IntervalPGetDatum(offset_to_interval(writeLag));
3342 
3343  if (flushLag < 0)
3344  nulls[7] = true;
3345  else
3346  values[7] = IntervalPGetDatum(offset_to_interval(flushLag));
3347 
3348  if (applyLag < 0)
3349  nulls[8] = true;
3350  else
3351  values[8] = IntervalPGetDatum(offset_to_interval(applyLag));
3352 
3353  values[9] = Int32GetDatum(priority);
3354 
3355  /*
3356  * More easily understood version of standby state. This is purely
3357  * informational.
3358  *
3359  * In quorum-based sync replication, the role of each standby
3360  * listed in synchronous_standby_names can be changing very
3361  * frequently. Any standbys considered as "sync" at one moment can
3362  * be switched to "potential" ones at the next moment. So, it's
3363  * basically useless to report "sync" or "potential" as their sync
3364  * states. We report just "quorum" for them.
3365  */
3366  if (priority == 0)
3367  values[10] = CStringGetTextDatum("async");
3368  else if (list_member_int(sync_standbys, i))
3370  CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
3371  else
3372  values[10] = CStringGetTextDatum("potential");
3373 
      /* replyTime of 0 is reported as NULL. */
3374  if (replyTime == 0)
3375  nulls[11] = true;
3376  else
3377  values[11] = TimestampTzGetDatum(replyTime);
3378  }
3379 
3380  tuplestore_putvalues(tupstore, tupdesc, values, nulls);
3381  }
3382 
3383  /* clean up and return the tuplestore */
3384  tuplestore_donestoring(tupstore);
3385 
3386  return (Datum) 0;
3387 }
3388 
3389 /*
3390  * This function is used to send a keepalive message to standby.
3391  * If requestReply is set, sets a flag in the message requesting the standby
3392  * to send a message back to us, for heartbeat purposes.
3393  */
3394 static void
3395 WalSndKeepalive(bool requestReply)
3396 {
3397  elog(DEBUG2, "sending replication keepalive");
3398 
3399  /* construct the message... */
3400  resetStringInfo(&output_message);
3401  pq_sendbyte(&output_message, 'k');
3402  pq_sendint64(&output_message, sentPtr);
3403  pq_sendint64(&output_message, GetCurrentTimestamp());
3404  pq_sendbyte(&output_message, requestReply ? 1 : 0);
3405 
3406  /* ... and send it wrapped in CopyData */
3407  pq_putmessage_noblock('d', output_message.data, output_message.len);
3408 }
3409 
3410 /*
3411  * Send keepalive message if too much time has elapsed.
3412  */
3413 static void
3415 {
3416  TimestampTz ping_time;
3417 
3418  /*
3419  * Don't send keepalive messages if timeouts are globally disabled or
3420  * we're doing something not partaking in timeouts.
3421  */
3422  if (wal_sender_timeout <= 0 || last_reply_timestamp <= 0)
3423  return;
3424 
3426  return;
3427 
3428  /*
3429  * If half of wal_sender_timeout has lapsed without receiving any reply
3430  * from the standby, send a keep-alive message to the standby requesting
3431  * an immediate reply.
3432  */
3434  wal_sender_timeout / 2);
3435  if (last_processing >= ping_time)
3436  {
3437  WalSndKeepalive(true);
3439 
3440  /* Try to flush pending output to the client */
3441  if (pq_flush_if_writable() != 0)
3442  WalSndShutdown();
3443  }
3444 }
3445 
3446 /*
3447  * Record the end of the WAL and the time it was flushed locally, so that
3448  * LagTrackerRead can compute the elapsed time (lag) when this WAL location is
3449  * eventually reported to have been written, flushed and applied by the
3450  * standby in a reply message.
3451  */
3452 static void
3454 {
3455  bool buffer_full;
3456  int new_write_head;
3457  int i;
3458 
3459  if (!am_walsender)
3460  return;
3461 
3462  /*
3463  * If the lsn hasn't advanced since last time, then do nothing. This way
3464  * we only record a new sample when new WAL has been written.
3465  */
3466  if (lag_tracker->last_lsn == lsn)
3467  return;
3468  lag_tracker->last_lsn = lsn;
3469 
3470  /*
3471  * If advancing the write head of the circular buffer would crash into any
3472  * of the read heads, then the buffer is full. In other words, the
3473  * slowest reader (presumably apply) is the one that controls the release
3474  * of space.
3475  */
3476  new_write_head = (lag_tracker->write_head + 1) % LAG_TRACKER_BUFFER_SIZE;
3477  buffer_full = false;
3478  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; ++i)
3479  {
3480  if (new_write_head == lag_tracker->read_heads[i])
3481  buffer_full = true;
3482  }
3483 
3484  /*
3485  * If the buffer is full, for now we just rewind by one slot and overwrite
3486  * the last sample, as a simple (if somewhat uneven) way to lower the
3487  * sampling rate. There may be better adaptive compaction algorithms.
3488  */
3489  if (buffer_full)
3490  {
3491  new_write_head = lag_tracker->write_head;
3492  if (lag_tracker->write_head > 0)
3493  lag_tracker->write_head--;
3494  else
3495  lag_tracker->write_head = LAG_TRACKER_BUFFER_SIZE - 1;
3496  }
3497 
3498  /* Store a sample at the current write head position. */
3499  lag_tracker->buffer[lag_tracker->write_head].lsn = lsn;
3500  lag_tracker->buffer[lag_tracker->write_head].time = local_flush_time;
3501  lag_tracker->write_head = new_write_head;
3502 }
3503 
3504 /*
3505  * Find out how much time has elapsed between the moment WAL location 'lsn'
3506  * (or the highest known earlier LSN) was flushed locally and the time 'now'.
3507  * We have a separate read head for each of the reported LSN locations we
3508  * receive in replies from standby; 'head' controls which read head is
3509  * used. Whenever a read head crosses an LSN which was written into the
3510  * lag buffer with LagTrackerWrite, we can use the associated timestamp to
3511  * find out the time this LSN (or an earlier one) was flushed locally, and
3512  * therefore compute the lag.
3513  *
3514  * Return -1 if no new sample data is available, and otherwise the elapsed
3515  * time in microseconds.
3516  */
3517 static TimeOffset
3519 {
3520  TimestampTz time = 0;
3521 
3522  /* Read all unread samples up to this LSN or end of buffer. */
3523  while (lag_tracker->read_heads[head] != lag_tracker->write_head &&
3524  lag_tracker->buffer[lag_tracker->read_heads[head]].lsn <= lsn)
3525  {
3526  time = lag_tracker->buffer[lag_tracker->read_heads[head]].time;
3527  lag_tracker->last_read[head] =
3528  lag_tracker->buffer[lag_tracker->read_heads[head]];
3529  lag_tracker->read_heads[head] =
3530  (lag_tracker->read_heads[head] + 1) % LAG_TRACKER_BUFFER_SIZE;
3531  }
3532 
3533  /*
3534  * If the lag tracker is empty, that means the standby has processed
3535  * everything we've ever sent so we should now clear 'last_read'. If we
3536  * didn't do that, we'd risk using a stale and irrelevant sample for
3537  * interpolation at the beginning of the next burst of WAL after a period
3538  * of idleness.
3539  */
3540  if (lag_tracker->read_heads[head] == lag_tracker->write_head)
3541  lag_tracker->last_read[head].time = 0;
3542 
3543  if (time > now)
3544  {
3545  /* If the clock somehow went backwards, treat as not found. */
3546  return -1;
3547  }
3548  else if (time == 0)
3549  {
3550  /*
3551  * We didn't cross a time. If there is a future sample that we
3552  * haven't reached yet, and we've already reached at least one sample,
3553  * let's interpolate the local flushed time. This is mainly useful
3554  * for reporting a completely stuck apply position as having
3555  * increasing lag, since otherwise we'd have to wait for it to
3556  * eventually start moving again and cross one of our samples before
3557  * we can show the lag increasing.
3558  */
3559  if (lag_tracker->read_heads[head] == lag_tracker->write_head)
3560  {
3561  /* There are no future samples, so we can't interpolate. */
3562  return -1;
3563  }
3564  else if (lag_tracker->last_read[head].time != 0)
3565  {
3566  /* We can interpolate between last_read and the next sample. */
3567  double fraction;
3568  WalTimeSample prev = lag_tracker->last_read[head];
3569  WalTimeSample next = lag_tracker->buffer[lag_tracker->read_heads[head]];
3570 
3571  if (lsn < prev.lsn)
3572  {
3573  /*
3574  * Reported LSNs shouldn't normally go backwards, but it's
3575  * possible when there is a timeline change. Treat as not
3576  * found.
3577  */
3578  return -1;
3579  }
3580 
3581  Assert(prev.lsn < next.lsn);
3582 
3583  if (prev.time > next.time)
3584  {
3585  /* If the clock somehow went backwards, treat as not found. */
3586  return -1;
3587  }
3588 
3589  /* See how far we are between the previous and next samples. */
3590  fraction =
3591  (double) (lsn - prev.lsn) / (double) (next.lsn - prev.lsn);
3592 
3593  /* Scale the local flush time proportionally. */
3594  time = (TimestampTz)
3595  ((double) prev.time + (next.time - prev.time) * fraction);
3596  }
3597  else
3598  {
3599  /*
3600  * We have only a future sample, implying that we were entirely
3601  * caught up but and now there is a new burst of WAL and the
3602  * standby hasn't processed the first sample yet. Until the
3603  * standby reaches the future sample the best we can do is report
3604  * the hypothetical lag if that sample were to be replayed now.
3605  */
3606  time = lag_tracker->buffer[lag_tracker->read_heads[head]].time;
3607  }
3608  }
3609 
3610  /* Return the elapsed time since local flush time in microseconds. */
3611  Assert(time != 0);
3612  return now - time;
3613 }
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, Datum *values, bool *isnull)
Definition: tuplestore.c:750
static void InitWalSenderSlot(void)
Definition: walsender.c:2279
void InitializeTimeouts(void)
Definition: timeout.c:340
#define pq_is_send_pending()
Definition: libpq.h:41
#define NIL
Definition: pg_list.h:65
static void pq_sendint16(StringInfo buf, uint16 i)
Definition: pqformat.h:137
XLogRecPtr write
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS
WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]
Definition: walsender.c:209
static void ProcessStandbyMessage(void)
Definition: walsender.c:1704
#define SIGQUIT
Definition: win32_port.h:155
#define WL_SOCKET_WRITEABLE
Definition: latch.h:126
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define IsA(nodeptr, _type_)
Definition: nodes.h:576
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:211
#define AllocSetContextCreate
Definition: memutils.h:170
XLogRecPtr startpoint
Definition: replnodes.h:85
#define DEBUG1
Definition: elog.h:25
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:196
int MyProcPid
Definition: globals.c:40
int wal_sender_timeout
Definition: walsender.c:122
uint32 TimeLineID
Definition: xlogdefs.h:52
#define pq_flush()
Definition: libpq.h:39
static int32 next
Definition: blutils.c:215
XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
struct Port * MyProcPort
Definition: globals.c:43
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
int wal_segment_size
Definition: xlog.c:112
int write_head
Definition: walsender.c:210
uint32 TransactionId
Definition: c.h:507
TupleDesc CreateTemplateTupleDesc(int natts)
Definition: tupdesc.c:44
bool wake_wal_senders
Definition: walsender.c:129
#define SYNC_REP_PRIORITY
Definition: syncrep.h:36
bool exec_replication_command(const char *cmd_string)
Definition: walsender.c:1431
Oid GetUserId(void)
Definition: miscinit.c:380
#define SIGUSR1
Definition: win32_port.h:166
bool update_process_title
Definition: ps_status.c:35
#define write(a, b, c)
Definition: win32.h:14
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1583
TransactionId xmin
Definition: proc.h:228
static void WalSndLastCycleHandler(SIGNAL_ARGS)
Definition: walsender.c:2997
void pgstat_report_activity(BackendState state, const char *cmd_str)
Definition: pgstat.c:3121
static void SendTimeLineHistory(TimeLineHistoryCmd *cmd)
Definition: walsender.c:433
static void parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action)
Definition: walsender.c:798
static void DropReplicationSlot(DropReplicationSlotCmd *cmd)
Definition: walsender.c:1051
static WALSegmentContext * sendCxt
Definition: walsender.c:132
#define SIGCHLD
Definition: win32_port.h:164
PGPROC * MyProc
Definition: proc.c:68
int64 TimestampTz
Definition: timestamp.h:39
WalTimeSample last_read[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:212
Size WalSndShmemSize(void)
Definition: walsender.c:3029
void MarkPostmasterChildWalSender(void)
Definition: pmsignal.c:272
char * pstrdup(const char *in)
Definition: mcxt.c:1186
void CommitTransactionCommand(void)
Definition: xact.c:2895
#define XACT_REPEATABLE_READ
Definition: xact.h:38
#define SpinLockInit(lock)
Definition: spin.h:60
void RestoreTransactionSnapshot(Snapshot snapshot, void *master_pgproc)
Definition: snapmgr.c:2225
void ReplicationSlotAcquire(const char *name, bool nowait)
Definition: slot.c:330
uint8 syncrep_method
Definition: syncrep.h:51
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:84
List * readTimeLineHistory(TimeLineID targetTLI)
Definition: timeline.c:75
bool IsAbortedTransactionBlockState(void)
Definition: xact.c:376
static void XLogSendPhysical(void)
Definition: walsender.c:2549
void do_tup_output(TupOutputState *tstate, Datum *values, bool *isnull)
Definition: execTuples.c:2252
#define tuplestore_donestoring(state)
Definition: tuplestore.h:60
TimeOffset flushLag
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency)
Definition: slot.c:221
void set_ps_display(const char *activity, bool force)
Definition: ps_status.c:331
Definition: nodes.h:525
static StringInfoData output_message
Definition: walsender.c:152
WalSndCtlData * WalSndCtl
Definition: walsender.c:108
struct cursor * cur
Definition: ecpg.c:28
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:182
void WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt, int segsize, const char *waldir)
Definition: xlogreader.c:207
void proc_exit(int code)
Definition: ipc.c:104
int errcode(int sqlerrcode)
Definition: elog.c:570
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
#define MemSet(start, val, len)
Definition: c.h:955
static void WalSndKill(int code, Datum arg)
Definition: walsender.c:2337
#define kill(pid, sig)
Definition: win32_port.h:426
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
static void WalSndDone(WalSndSendDataCallback send_data)
Definition: walsender.c:2882
#define WL_SOCKET_READABLE
Definition: latch.h:125
void ReplicationSlotSave(void)
Definition: slot.c:645
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8249
List * SyncRepGetSyncStandbys(bool *am_sync)
Definition: syncrep.c:681
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:197
TimestampTz time
Definition: walsender.c:199
int sleeptime
Definition: pg_standby.c:42
#define SIGPIPE
Definition: win32_port.h:159
ReplicationSlotPersistentData data
Definition: slot.h:132
TimeOffset writeLag
#define SIGUSR2
Definition: win32_port.h:167
#define LOG
Definition: elog.h:26
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
bool RecoveryInProgress(void)
Definition: xlog.c:7917
void SetLatch(Latch *latch)
Definition: latch.c:436
SyncRepConfigData * SyncRepConfig
Definition: syncrep.c:97
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1682
void list_free_deep(List *list)
Definition: list.c:1391
static int fd(const char *x, int i)
Definition: preproc-init.c:105
pgsocket sock
Definition: libpq-be.h:122
#define PG_BINARY
Definition: c.h:1191
void GetPGVariable(const char *name, DestReceiver *dest)
Definition: guc.c:8738
void ResetLatch(Latch *latch)
Definition: latch.c:519
slock_t mutex
Node * replication_parse_result
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:87
void StatementCancelHandler(SIGNAL_ARGS)
Definition: postgres.c:2792
Latch procLatch
Definition: proc.h:104
void XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength)
Definition: xlogutils.c:803
XLogRecord * XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
Definition: xlogreader.c:237
XLogRecPtr confirmed_flush
Definition: slot.h:80
PGXACT * MyPgXact
Definition: proc.c:69
XLogRecPtr EndRecPtr
Definition: xlogreader.h:133
int32 day
Definition: timestamp.h:47
#define XidFromFullTransactionId(x)
Definition: transam.h:48
static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch)
Definition: walsender.c:1921
static LagTracker * lag_tracker
Definition: walsender.c:215
#define XLByteInSeg(xlrp, logSegNo, wal_segsz_bytes)
TimeLineID timeline
Definition: replnodes.h:97
ReplicationKind kind
Definition: replnodes.h:56
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1726
bool IsTransactionBlock(void)
Definition: xact.c:4633
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
const char * SnapBuildExportSnapshot(SnapBuild *builder)
Definition: snapbuild.c:636
bool am_walsender
Definition: walsender.c:114
#define SpinLockAcquire(lock)
Definition: spin.h:62
void pg_usleep(long microsec)
Definition: signal.c:53
void HandleWalSndInitStopping(void)
Definition: walsender.c:2975
void ReplicationSlotReserveWal(void)
Definition: slot.c:997
int SendProcSignal(pid_t pid, ProcSignalReason reason, BackendId backendId)
Definition: procsignal.c:180
XLogRecPtr flush
bool defGetBoolean(DefElem *def)
Definition: define.c:111
char data[BLCKSZ]
Definition: c.h:1060
void DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
Definition: logical.c:466
void ReplicationSlotsComputeRequiredLSN(void)
Definition: slot.c:748
ReplicationKind kind
Definition: replnodes.h:82
void WalSndRqstFileReload(void)
Definition: walsender.c:2952
void pfree(void *pointer)
Definition: mcxt.c:1056
void ConditionVariableCancelSleep(void)
bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
Definition: transam.c:319
static void WalSndKeepalive(bool requestReply)
Definition: walsender.c:3395
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:2310
void EndCommand(const char *commandTag, CommandDest dest)
Definition: dest.c:166
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:676
#define ERROR
Definition: elog.h:43
void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
Definition: decode.c:97
static TimestampTz last_processing
Definition: walsender.c:157
void pq_startmsgread(void)
Definition: pqcomm.c:1211
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2255
static void PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: walsender.c:1735
#define IntervalPGetDatum(X)
Definition: timestamp.h:33
#define SYNC_REP_WAIT_APPLY
Definition: syncrep.h:26
void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli)
Definition: xlog.c:3846
bool FirstSnapshotSet
Definition: snapmgr.c:205
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:372
void WalSndSignals(void)
Definition: walsender.c:3009
#define TimestampTzGetDatum(X)
Definition: timestamp.h:32
#define FATAL
Definition: elog.h:52
void SendBaseBackup(BaseBackupCmd *cmd)
Definition: basebackup.c:760
Latch * latch
void(* WalSndSendDataCallback)(void)
Definition: walsender.c:221
#define MAXPGPATH
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:11149
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:2232
void ReplicationSlotPersist(void)
Definition: slot.c:680
TransactionId effective_xmin
Definition: slot.h:128
int pq_getbyte_if_available(unsigned char *c)
Definition: pqcomm.c:1041
static LogicalDecodingContext * logical_decoding_ctx
Definition: walsender.c:192
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:192
char * get_database_name(Oid dbid)
Definition: dbcommands.c:2100
#define DEBUG2
Definition: elog.h:24
bool XLogBackgroundFlush(void)
Definition: xlog.c:2987
bool list_member_int(const List *list, int datum)
Definition: list.c:655
Definition: latch.h:110
Definition: dest.h:88
#define TLHistoryFileName(fname, tli)
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:361
WalSndState state
NodeTag type
Definition: nodes.h:527
char * XLogFileNameP(TimeLineID tli, XLogSegNo segno)
Definition: xlog.c:10149
static char * buf
Definition: pg_test_fsync.c:68
void WalSndWaitStopping(void)
Definition: walsender.c:3128
static bool streamingDoneSending
Definition: walsender.c:174
uint64 XLogSegNo
Definition: xlogdefs.h:41
static void XLogRead(WALSegmentContext *segcxt, char *buf, XLogRecPtr startptr, Size count)
Definition: walsender.c:2365
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
int errdetail(const char *fmt,...)
Definition: elog.c:860
XLogSegNo ws_segno
Definition: xlogreader.h:38
#define COMMERROR
Definition: elog.h:30
int errcode_for_file_access(void)
Definition: elog.c:593
static void StartReplication(StartReplicationCmd *cmd)
Definition: walsender.c:539
#define SIGHUP
Definition: win32_port.h:154
TransactionId catalog_xmin
Definition: slot.h:69
#define pq_flush_if_writable()
Definition: libpq.h:40
TimeOffset time
Definition: timestamp.h:45
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
Definition: xact.c:3328
XLogRecPtr currTLIValidUntil
Definition: xlogreader.h:192
#define InvalidTransactionId
Definition: transam.h:31
void enlargeStringInfo(StringInfo str, int needed)
Definition: stringinfo.c:270
void PostgresSigHupHandler(SIGNAL_ARGS)
Definition: postgres.c:2832
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:369
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:62
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1178
static XLogRecPtr logical_startptr
Definition: walsender.c:193
unsigned int uint32
Definition: c.h:358
void ReplicationSlotRelease(void)
Definition: slot.c:424
static WALOpenSegment * sendSeg
Definition: walsender.c:131
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1342
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
TransactionId xmin
Definition: slot.h:61
SHM_QUEUE SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]
void InitWalSender(void)
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:181
#define SlotIsLogical(slot)
Definition: slot.h:154
Datum Int64GetDatum(int64 X)
Definition: fmgr.c:1699
void SyncRepInitConfig(void)
Definition: syncrep.c:383
#define ereport(elevel, rest)
Definition: elog.h:141
static bool WalSndCaughtUp
Definition: walsender.c:178
#define SYNC_REP_WAIT_FLUSH
Definition: syncrep.h:25
MemoryContext TopMemoryContext
Definition: mcxt.c:44
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)
Definition: walsender.c:3453
XLogRecPtr lsn
Definition: walsender.c:198
int replication_yyparse(void)
FullTransactionId ReadNextFullTransactionId(void)
Definition: varsup.c:246
Definition: guc.h:72
XLogRecPtr last_lsn
Definition: walsender.c:208
#define ERRCODE_DATA_CORRUPTED
Definition: pg_basebackup.c:45
int32 month
Definition: timestamp.h:48
volatile sig_atomic_t ConfigReloadPending
Definition: globals.c:35
int max_wal_senders
Definition: walsender.c:120
static void WalSndLoop(WalSndSendDataCallback send_data)
Definition: walsender.c:2152
static void WalSndKeepaliveIfNecessary(void)
Definition: walsender.c:3414
int CloseTransientFile(int fd)
Definition: fd.c:2432
#define SIG_IGN
Definition: win32_port.h:151
static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1151
void initStringInfo(StringInfo str)
Definition: stringinfo.c:46
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
int pq_getmessage(StringInfo s, int maxlen)
Definition: pqcomm.c:1273
static StringInfoData reply_message
Definition: walsender.c:153
#define MAXFNAMELEN
TimeLineID nextTLI
Definition: xlogreader.h:198
#define SpinLockRelease(lock)
Definition: spin.h:64
Tuplestorestate * tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
Definition: tuplestore.c:318
int64 TimeOffset
Definition: timestamp.h:40
static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc)
Definition: walsender.c:1293
static void WalSndCheckTimeOut(void)
Definition: walsender.c:2125
Size mul_size(Size s1, Size s2)
Definition: shmem.c:492
#define InvalidBackendId
Definition: backendid.h:23
LogicalDecodingContext * CreateInitDecodingContext(char *plugin, List *output_plugin_options, bool need_full_snapshot, XLogRecPtr restart_lsn, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:231
static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
Definition: walsender.c:3518
void WalSndErrorCleanup(void)
Definition: walsender.c:297
static volatile sig_atomic_t replication_active
Definition: walsender.c:190
uintptr_t Datum
Definition: postgres.h:367
static const char * WalSndGetStateString(WalSndState state)
Definition: walsender.c:3185
XLogRecPtr sentPtr
Size add_size(Size s1, Size s2)
Definition: shmem.c:475
bool am_db_walsender
Definition: walsender.c:117
TransactionId effective_catalog_xmin
Definition: slot.h:129
static void StartLogicalReplication(StartReplicationCmd *cmd)
Definition: walsender.c:1062
LogicalDecodingContext * CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, bool fast_forward, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:374
Oid MyDatabaseId
Definition: globals.c:85
static void WalSndShutdown(void)
Definition: walsender.c:225
static TimeLineID receiveTLI
Definition: xlog.c:209
#define XLogSegmentOffset(xlogptr, wal_segsz_bytes)
int work_mem
Definition: globals.c:121
XLogRecPtr tliSwitchPoint(TimeLineID tli, List *history, TimeLineID *nextTLI)
Definition: timeline.c:563
#define EpochFromFullTransactionId(x)
Definition: transam.h:47
WalSnd * MyWalSnd
Definition: walsender.c:111
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:839
static XLogRecPtr sentPtr
Definition: walsender.c:149
int log_min_messages
Definition: guc.c:510
void pq_endmsgread(void)
Definition: pqcomm.c:1235
#define InvalidOid
Definition: postgres_ext.h:36
TimeLineID ThisTimeLineID
Definition: xlog.c:187
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
int allowedModes
Definition: execnodes.h:303
void WalSndInitStopping(void)
Definition: walsender.c:3102
TimeLineID currTLI
Definition: xlogreader.h:182
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:170
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
bool is_member_of_role(Oid member, Oid role)
Definition: acl.c:4932
TimeOffset applyLag
struct SnapBuild * snapshot_builder
Definition: logical.h:44
int errmsg_internal(const char *fmt,...)
Definition: elog.c:814
SetFunctionReturnMode returnMode
Definition: execnodes.h:305
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
static long WalSndComputeSleeptime(TimestampTz now)
Definition: walsender.c:2075
#define SIG_DFL
Definition: win32_port.h:149
#define SIGNAL_ARGS
Definition: c.h:1259
static TimeLineID sendTimeLine
Definition: walsender.c:140
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:732
#define lfirst(lc)
Definition: pg_list.h:190
void WalSndSetState(WalSndState state)
Definition: walsender.c:3166
int sync_standby_priority
Definition: regguts.h:298
void FreeDecodingContext(LogicalDecodingContext *ctx)
Definition: logical.c:512
static void XLogSendLogical(void)
Definition: walsender.c:2812
XLogRecPtr restart_lsn
Definition: slot.h:72
int read_heads[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:211
void StartTransactionCommand(void)
Definition: xact.c:2794
bool needreload
static void ProcessStandbyReplyMessage(void)
Definition: walsender.c:1767
size_t Size
Definition: c.h:466
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: pgstat.h:1318
char * dbname
Definition: streamutil.c:52
#define LAG_TRACKER_BUFFER_SIZE
Definition: walsender.c:203
int XactIsoLevel
Definition: xact.c:75
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1122
static TimeLineID sendTimeLineNextTLI
Definition: walsender.c:141
static XLogRecPtr sendTimeLineValidUpto
Definition: walsender.c:143
void LogicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: logical.c:1005
MemoryContext ecxt_per_query_memory
Definition: execnodes.h:231
WalSndState
Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
Definition: walsender.c:3220
TimeLineID ws_tli
Definition: xlogreader.h:40
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Definition: walsender.c:851
static bool streamingDoneReceiving
Definition: walsender.c:175
Tuplestorestate * setResult
Definition: execnodes.h:308
#define pg_attribute_noreturn()
Definition: c.h:147
static StringInfoData tmpbuf
Definition: walsender.c:154
int BasicOpenFile(const char *fileName, int fileFlags)
Definition: fd.c:944
#define XLogFilePath(path, tli, logSegNo, wal_segsz_bytes)
bool IsSubTransaction(void)
Definition: xact.c:4706
Snapshot SnapBuildInitialSnapshot(SnapBuild *builder)
Definition: snapbuild.c:544
static Datum values[MAXATTR]
Definition: bootstrap.c:167
#define PG_STAT_GET_WAL_SENDERS_COLS
ExprContext * econtext
Definition: execnodes.h:301
bool log_replication_commands
Definition: walsender.c:124
uint64 GetSystemIdentifier(void)
Definition: xlog.c:4802
#define Int32GetDatum(X)
Definition: postgres.h:479
void ReplicationSlotDrop(const char *name, bool nowait)
Definition: slot.c:517
char * application_name
Definition: guc.c:529
TupleDesc setDesc
Definition: execnodes.h:309
void pq_sendbytes(StringInfo buf, const char *data, int datalen)
Definition: pqformat.c:125
void * palloc(Size size)
Definition: mcxt.c:949
int errmsg(const char *fmt,...)
Definition: elog.c:784
XLogReaderState * reader
Definition: logical.h:42
void SHMQueueInit(SHM_QUEUE *queue)
Definition: shmqueue.c:36
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:298
#define pq_putmessage_noblock(msgtype, s, len)
Definition: libpq.h:44
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:796
CRSSnapshotAction
Definition: walsender.h:20
#define elog(elevel,...)
Definition: elog.h:226
StringInfo out
Definition: logical.h:71
static const unsigned __int64 epoch
Definition: gettimeofday.c:34
int i
void WalSndShmemInit(void)
Definition: walsender.c:3041
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
#define NameStr(name)
Definition: c.h:609
TimeLineID timeline
Definition: replnodes.h:84
#define CStringGetTextDatum(s)
Definition: builtins.h:83
void * arg
static void PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
Definition: walsender.c:1872
struct Latch * MyLatch
Definition: globals.c:54
void ReplicationSlotCleanup(void)
Definition: slot.c:479
#define PG_FUNCTION_ARGS
Definition: fmgr.h:188
char * defname
Definition: parsenodes.h:730
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:99
static XLogRecPtr GetStandbyFlushRecPtr(void)
Definition: walsender.c:2922
static int logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
Definition: walsender.c:764
void SendPostmasterSignal(PMSignalReason reason)
Definition: pmsignal.c:146
TimestampTz replyTime
void LWLockReleaseAll(void)
Definition: lwlock.c:1825
#define NUM_SYNC_REP_WAIT_MODE
Definition: syncrep.h:28
static Interval * offset_to_interval(TimeOffset offset)
Definition: walsender.c:3204
slock_t mutex
Definition: slot.h:105
#define close(a)
Definition: win32.h:12
uint32 ws_off
Definition: xlogreader.h:39
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition: procsignal.c:261
CommandDest whereToSendOutput
Definition: postgres.c:90
void TimestampDifference(TimestampTz start_time, TimestampTz stop_time, long *secs, int *microsecs)
Definition: timestamp.c:1657
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
static bool waiting_for_ping_response
Definition: walsender.c:166
void pq_puttextmessage(char msgtype, const char *str)
Definition: pqformat.c:369
XLogRecPtr apply
static void IdentifySystem(void)
Definition: walsender.c:344
Definition: pg_list.h:50
void CheckLogicalDecodingRequirements(void)
Definition: logical.c:79
#define snprintf
Definition: port.h:192
#define WL_LATCH_SET
Definition: latch.h:124
int16 AttrNumber
Definition: attnum.h:21
void quickdie(SIGNAL_ARGS)
Definition: postgres.c:2694
#define UINT64_FORMAT
Definition: c.h:401
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition: slot.c:702
void SyncRepReleaseWaiters(void)
Definition: syncrep.c:411
static TimestampTz last_reply_timestamp
Definition: walsender.c:163
static void ProcessRepliesIfAny(void)
Definition: walsender.c:1600
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1547
#define MAX_SEND_SIZE
Definition: walsender.c:105
#define die(msg)
Definition: pg_test_fsync.c:97
#define read(a, b, c)
Definition: win32.h:13
static bool sendTimeLineIsHistoric
Definition: walsender.c:142
#define offsetof(type, field)
Definition: c.h:655
void WalSndWakeup(void)
Definition: walsender.c:3073
void ReplicationSlotMarkDirty(void)
Definition: slot.c:663
bool am_cascading_walsender
Definition: walsender.c:115
static void ProcessStandbyHSFeedbackMessage(void)
Definition: walsender.c:1952
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:129
#define SYNC_REP_WAIT_WRITE
Definition: syncrep.h:24
void SnapBuildClearExportedSnapshot(void)
Definition: snapbuild.c:697
static XLogRecPtr startptr
Definition: basebackup.c:118
static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
Definition: walsender.c:1267
void replication_scanner_init(const char *query_string)
#define TLHistoryFilePath(path, tli)
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1743
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)