PostgreSQL Source Code  git master
walsender.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * walsender.c
4  *
5  * The WAL sender process (walsender) is new as of Postgres 9.0. It takes
6  * care of sending XLOG from the primary server to a single recipient.
7  * (Note that there can be more than one walsender process concurrently.)
8  * It is started by the postmaster when the walreceiver of a standby server
9  * connects to the primary server and requests XLOG streaming replication.
10  *
11  * A walsender is similar to a regular backend, ie. there is a one-to-one
12  * relationship between a connection and a walsender process, but instead
13  * of processing SQL queries, it understands a small set of special
14  * replication-mode commands. The START_REPLICATION command begins streaming
15  * WAL to the client. While streaming, the walsender keeps reading XLOG
16  * records from the disk and sends them to the standby server over the
17  * COPY protocol, until either side ends the replication by exiting COPY
18  * mode (or until the connection is closed).
19  *
20  * Normal termination is by SIGTERM, which instructs the walsender to
21  * close the connection and exit(0) at the next convenient moment. Emergency
22  * termination is by SIGQUIT; like any backend, the walsender will simply
23  * abort and exit on SIGQUIT. A close of the connection and a FATAL error
24  * are treated as not a crash but approximately normal termination;
25  * the walsender will exit quickly without sending any more XLOG records.
26  *
27  * If the server is shut down, checkpointer sends us
28  * PROCSIG_WALSND_INIT_STOPPING after all regular backends have exited. If
29  * the backend is idle or runs an SQL query, this causes the backend to
30  * shut down; if logical replication is in progress, all existing WAL records
31  * are processed, followed by a shutdown. Otherwise this causes the walsender
32  * to switch to the "stopping" state. In this state, the walsender will reject
33  * any further replication commands. The checkpointer begins the shutdown
34  * checkpoint once all walsenders are confirmed as stopping. When the shutdown
35  * checkpoint finishes, the postmaster sends us SIGUSR2. This instructs
36  * walsender to send any outstanding WAL, including the shutdown checkpoint
37  * record, wait for it to be replicated to the standby, and then exit.
38  *
39  *
40  * Portions Copyright (c) 2010-2020, PostgreSQL Global Development Group
41  *
42  * IDENTIFICATION
43  * src/backend/replication/walsender.c
44  *
45  *-------------------------------------------------------------------------
46  */
47 #include "postgres.h"
48 
49 #include <signal.h>
50 #include <unistd.h>
51 
52 #include "access/printtup.h"
53 #include "access/timeline.h"
54 #include "access/transam.h"
55 #include "access/xact.h"
56 #include "access/xlog_internal.h"
57 #include "access/xlogutils.h"
58 
59 #include "catalog/pg_authid.h"
60 #include "catalog/pg_type.h"
61 #include "commands/dbcommands.h"
62 #include "commands/defrem.h"
63 #include "funcapi.h"
64 #include "libpq/libpq.h"
65 #include "libpq/pqformat.h"
66 #include "miscadmin.h"
67 #include "nodes/replnodes.h"
68 #include "pgstat.h"
69 #include "postmaster/interrupt.h"
70 #include "replication/basebackup.h"
71 #include "replication/decode.h"
72 #include "replication/logical.h"
74 #include "replication/slot.h"
75 #include "replication/snapbuild.h"
76 #include "replication/syncrep.h"
78 #include "replication/walsender.h"
81 #include "storage/fd.h"
82 #include "storage/ipc.h"
83 #include "storage/pmsignal.h"
84 #include "storage/proc.h"
85 #include "storage/procarray.h"
86 #include "tcop/dest.h"
87 #include "tcop/tcopprot.h"
88 #include "utils/builtins.h"
89 #include "utils/guc.h"
90 #include "utils/memutils.h"
91 #include "utils/pg_lsn.h"
92 #include "utils/portal.h"
93 #include "utils/ps_status.h"
94 #include "utils/timeout.h"
95 #include "utils/timestamp.h"
96 
/*
 * File-scope definitions and process-local state for the walsender.
 *
 * NOTE(review): this text was extracted from a rendered source listing (see
 * the "Go to the documentation of this file." residue at the top).  The
 * leading numbers are the listing's own line numbers; numbers that are
 * skipped mark source lines the extraction dropped.  Several comments below
 * are therefore left without the declarations they describe, and each such
 * spot is flagged.  Do not treat this copy as compilable.
 */
97 /*
98  * Maximum data payload in a WAL data message. Must be >= XLOG_BLCKSZ.
99  *
100  * We don't have a good idea of what a good value would be; there's some
101  * overhead per message in both walsender and walreceiver, but on the other
102  * hand sending large batches makes walsender less responsive to signals
103  * because signals are checked only between messages. 128kB (with
104  * default 8k blocks) seems like a reasonable guess for now.
105  */
106 #define MAX_SEND_SIZE (XLOG_BLCKSZ * 16)
107 
108 /* Array of WalSnds in shared memory */
/* NOTE(review): the declaration for the comment above (listing line 109) is missing from this copy. */
110 
111 /* My slot in the shared memory array */
112 WalSnd *MyWalSnd = NULL;
113 
114 /* Global state */
115 bool am_walsender = false; /* Am I a walsender process? */
116 bool am_cascading_walsender = false; /* Am I cascading WAL to another
117  * standby? */
118 bool am_db_walsender = false; /* Connected to a database? */
119 
120 /* User-settable parameters for walsender */
121 int max_wal_senders = 0; /* the maximum number of concurrent
122  * walsenders */
123 int wal_sender_timeout = 60 * 1000; /* maximum time to send one WAL
124  * data message */
/* NOTE(review): listing line 125 is missing from this copy. */
126 
127 /*
128  * State for WalSndWakeupRequest
129  */
130 bool wake_wal_senders = false;
131 
132 static WALOpenSegment *sendSeg = NULL;
133 static WALSegmentContext *sendCxt = NULL;
134 
135 /*
136  * These variables keep track of the state of the timeline we're currently
137  * sending. sendTimeLine identifies the timeline. If sendTimeLineIsHistoric,
138  * the timeline is not the latest timeline on this server, and the server's
139  * history forked off from that timeline at sendTimeLineValidUpto.
140  */
/*
 * NOTE(review): listing lines 141, 142 and 144 are missing from this copy.
 * As a result, sendTimeLine and sendTimeLineValidUpto, which the comment
 * above refers to, are not declared here.
 */
143 static bool sendTimeLineIsHistoric = false;
145 
146 /*
147  * How far have we sent WAL already? This is also advertised in
148  * MyWalSnd->sentPtr. (Actually, this is the next WAL location to send.)
149  */
150 static XLogRecPtr sentPtr = 0;
151 
152 /* Buffers for constructing outgoing messages and processing reply messages. */
/* NOTE(review): the declarations this comment describes (listing lines 153-155) are missing from this copy. */
156 
157 /* Timestamp of last ProcessRepliesIfAny(). */
/* NOTE(review): the declaration for the comment above (listing line 158) is missing from this copy. */
159 
160 /*
161  * Timestamp of last ProcessRepliesIfAny() that saw a reply from the
162  * standby. Set to 0 if wal_sender_timeout doesn't need to be active.
163  */
/* NOTE(review): the declaration for the comment above (listing line 164) is missing from this copy. */
165 
166 /* Have we sent a heartbeat message asking for reply, since last reply? */
167 static bool waiting_for_ping_response = false;
168 
169 /*
170  * While streaming WAL in Copy mode, streamingDoneSending is set to true
171  * after we have sent CopyDone. We should not send any more CopyData messages
172  * after that. streamingDoneReceiving is set to true when we receive CopyDone
173  * from the other end. When both become true, it's time to exit Copy mode.
174  */
/*
 * NOTE(review): listing lines 175-176 are missing from this copy.
 * As a result, streamingDoneSending and streamingDoneReceiving, named in the
 * comment above, are not declared here.
 */
177 
178 /* Are we there yet? */
179 static bool WalSndCaughtUp = false;
180 
181 /* Flags set by signal handlers for later service in main loop */
182 static volatile sig_atomic_t got_SIGUSR2 = false;
183 static volatile sig_atomic_t got_STOPPING = false;
184 
185 /*
186  * This is set while we are streaming. When not set
187  * PROCSIG_WALSND_INIT_STOPPING signal will be handled like SIGTERM. When set,
188  * the main loop is responsible for checking got_STOPPING and terminating when
189  * it's set (after streaming any remaining WAL).
190  */
191 static volatile sig_atomic_t replication_active = false;
192 
195 
196 /* A sample associating a WAL location with the time it was written. */
197 typedef struct
198 {
/* NOTE(review): the member declarations (listing lines 199-200) are missing from this copy. */
201 } WalTimeSample;
202 
203 /* The size of our buffer of time samples. */
204 #define LAG_TRACKER_BUFFER_SIZE 8192
205 
206 /* A mechanism for tracking replication lag. */
/*
 * Only read_heads survives in this copy: an int array with one entry per
 * NUM_SYNC_REP_WAIT_MODE.
 *
 * NOTE(review): listing lines 209-211 and 213 (the remaining members) are
 * missing from this copy, as is line 216 just after this typedef.
 * lag_tracker, which InitWalSender() assigns using sizeof(LagTracker), is
 * not declared anywhere in this copy.
 */
207 typedef struct
208 {
212  int read_heads[NUM_SYNC_REP_WAIT_MODE];
214 } LagTracker;
217 
/*
 * Forward declarations for this file's static functions.
 *
 * NOTE(review): listing lines 219, 226, 232-233, 235, 243, 246-247 and 250
 * are missing from this copy.  Some prototypes are therefore absent,
 * including whatever followed the "Signal handlers" comment.
 */
218 /* Signal handlers */
220 
221 /* Prototypes for private functions */
/*
 * Signature shared by XLogSendPhysical() and XLogSendLogical(); WalSndLoop()
 * and WalSndDone() each take one as their send_data argument.
 */
222 typedef void (*WalSndSendDataCallback) (void);
223 static void WalSndLoop(WalSndSendDataCallback send_data);
224 static void InitWalSenderSlot(void);
225 static void WalSndKill(int code, Datum arg);
227 static void XLogSendPhysical(void);
228 static void XLogSendLogical(void);
229 static void WalSndDone(WalSndSendDataCallback send_data);
230 static XLogRecPtr GetStandbyFlushRecPtr(void);
231 static void IdentifySystem(void);
234 static void StartReplication(StartReplicationCmd *cmd);
236 static void ProcessStandbyMessage(void);
237 static void ProcessStandbyReplyMessage(void);
238 static void ProcessStandbyHSFeedbackMessage(void);
239 static void ProcessRepliesIfAny(void);
240 static void WalSndKeepalive(bool requestReply);
241 static void WalSndKeepaliveIfNecessary(void);
242 static void WalSndCheckTimeOut(void);
244 static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
245 static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
248 static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
249 static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
251 
252 static int WalSndSegmentOpen(XLogSegNo nextSegNo, WALSegmentContext *segcxt,
253  TimeLineID *tli_p);
254 static void UpdateSpillStats(LogicalDecodingContext *ctx);
255 
256 
257 /* Initialize walsender process before entering the main command loop */
/*
 * Visible effects:
 *  - allocates a zero-filled LagTracker in TopMemoryContext and assigns it
 *    to lag_tracker;
 *  - sets up sendSeg/sendCxt and initializes them with
 *    WALOpenSegmentInit(sendSeg, sendCxt, wal_segment_size, NULL).
 *
 * NOTE(review): listing lines 261, 264, 278-279, 286 and 288 are missing
 * from this copy.  These include the statements described by the "Create a
 * per-walsender data structure" and "Let postmaster know" comments, and the
 * allocation expressions for sendSeg and sendCxt.
 */
258 void
259 InitWalSender(void)
260 {
262 
263  /* Create a per-walsender data structure in shared memory */
265 
266  /*
267  * We don't currently need any ResourceOwner in a walsender process, but
268  * if we did, we could call CreateAuxProcessResourceOwner here.
269  */
270 
271  /*
272  * Let postmaster know that we're a WAL sender. Once we've declared us as
273  * a WAL sender process, postmaster will let us outlive the bgwriter and
274  * kill us last in the shutdown sequence, so we get a chance to stream all
275  * remaining WAL at shutdown, including the shutdown checkpoint. Note that
276  * there's no going back, and we mustn't write any WAL records after this.
277  */
280 
281  /* Initialize empty timestamp buffer for lag tracking. */
282  lag_tracker = MemoryContextAllocZero(TopMemoryContext, sizeof(LagTracker));
283 
284  /* Make sure we can remember the current read position in XLOG. */
285  sendSeg = (WALOpenSegment *)
287  sendCxt = (WALSegmentContext *)
289  WALOpenSegmentInit(sendSeg, sendCxt, wal_segment_size, NULL);
290 }
291 
292 /*
293  * Clean up after an error.
294  *
295  * WAL sender processes don't use transactions like regular backends do.
296  * This function does any cleanup required after an error in a WAL sender
297  * process, similar to what transaction abort does in a regular backend.
 *
 * Visible steps:
 *  - if sendSeg has an open file (ws_file >= 0), close it and reset ws_file
 *    to -1 so it is treated as closed;
 *  - clear replication_active;
 *  - exit via proc_exit(0) if got_STOPPING or got_SIGUSR2 is set.
 *
 * NOTE(review): listing lines 300, 302-304, 313, 315 and 323 are missing
 * from this copy.  These include the function name line, the statement
 * guarded by "if (MyReplicationSlot != NULL)" (313), and the statement for
 * "Revert back to startup state" (323).  As listed, that "if" would instead
 * guard "replication_active = false;", which is an extraction artifact.
298  */
299 void
301 {
305 
306  if (sendSeg->ws_file >= 0)
307  {
308  close(sendSeg->ws_file);
309  sendSeg->ws_file = -1;
310  }
311 
312  if (MyReplicationSlot != NULL)
314 
316 
317  replication_active = false;
318 
319  if (got_STOPPING || got_SIGUSR2)
320  proc_exit(0);
321 
322  /* Revert back to startup state */
324 }
325 
326 /*
327  * Handle a client's connection abort in an orderly manner.
 *
 * Ends the process via proc_exit(0).  The abort() after it exists only to
 * keep the compiler quiet, as its own comment says.
 *
 * NOTE(review): listing lines 336-337 are missing from this copy.  They
 * presumably hold the reset of whereToSendOutput described by the comment
 * in the body.
328  */
329 static void
330 WalSndShutdown(void)
331 {
332  /*
333  * Reset whereToSendOutput to prevent ereport from attempting to send any
334  * more messages to the standby.
335  */
338 
339  proc_exit(0);
340  abort(); /* keep the compiler quiet */
341 }
342 
343 /*
344  * Handle the IDENTIFY_SYSTEM command.
 *
 * Sends one row with four columns:
 *  - systemid: text, formatted with UINT64_FORMAT;
 *  - timeline: int4, ThisTimeLineID;
 *  - xlogpos: text, "%X/%X" of either GetStandbyFlushRecPtr() or
 *    GetFlushRecPtr();
 *  - dbname: text, or NULL when dbname is still NULL.
 *
 * NOTE(review): listing lines 347, 353, 366, 368-369, 381, 384, 386-388,
 * 390 and 393 are missing from this copy.  These include:
 *  - the function name line;
 *  - a declaration (dest, used below, is not declared here);
 *  - the value formatted into sysid;
 *  - the condition choosing between the two flush-pointer calls;
 *  - the body of the MyDatabaseId block that presumably sets dbname.
345  */
346 static void
348 {
349  char sysid[32];
350  char xloc[MAXFNAMELEN];
351  XLogRecPtr logptr;
352  char *dbname = NULL;
354  TupOutputState *tstate;
355  TupleDesc tupdesc;
356  Datum values[4];
357  bool nulls[4];
358 
359  /*
360  * Reply with a result set with one row, four columns. First col is system
361  * ID, second is timeline ID, third is current xlog location and the
362  * fourth contains the database name if we are connected to one.
363  */
364 
365  snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
367 
370  {
371  /* this also updates ThisTimeLineID */
372  logptr = GetStandbyFlushRecPtr();
373  }
374  else
375  logptr = GetFlushRecPtr();
376 
 /* Print the 64-bit LSN as its high and low 32-bit halves: "%X/%X". */
377  snprintf(xloc, sizeof(xloc), "%X/%X", (uint32) (logptr >> 32), (uint32) logptr);
378 
379  if (MyDatabaseId != InvalidOid)
380  {
382 
383  /* syscache access needs a transaction env. */
385  /* make dbname live outside TX context */
389  /* CommitTransactionCommand switches to TopMemoryContext */
391  }
392 
394  MemSet(nulls, false, sizeof(nulls));
395 
396  /* need a tuple descriptor representing four columns */
397  tupdesc = CreateTemplateTupleDesc(4);
398  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "systemid",
399  TEXTOID, -1, 0);
400  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "timeline",
401  INT4OID, -1, 0);
402  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "xlogpos",
403  TEXTOID, -1, 0);
404  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "dbname",
405  TEXTOID, -1, 0);
406 
407  /* prepare for projection of tuples */
408  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
409 
410  /* column 1: system identifier */
411  values[0] = CStringGetTextDatum(sysid);
412 
413  /* column 2: timeline */
414  values[1] = Int32GetDatum(ThisTimeLineID);
415 
416  /* column 3: wal location */
417  values[2] = CStringGetTextDatum(xloc);
418 
419  /* column 4: database name, or NULL if none */
420  if (dbname)
421  values[3] = CStringGetTextDatum(dbname);
422  else
423  nulls[3] = true;
424 
425  /* send it to dest */
426  do_tup_output(tstate, values, nulls);
427 
428  end_tup_output(tstate);
429 }
430 
431 
432 /*
433  * Handle TIMELINE_HISTORY command.
 *
 * For cmd->timeline, sends a RowDescription with two columns (filename:
 * text, content: bytea), followed by a DataRow holding the history file's
 * name and its entire contents.  The content length comes from an lseek()
 * to the end of the file, taken before the contents are read.  Open, seek,
 * read, short-read (read() returning 0) and close failures are all reported
 * with ereport(ERROR).
 *
 * NOTE(review): listing lines 436, 438, 487, 494, 498, 509, 511, 514, 519
 * and 529 are missing from this copy.  These include:
 *  - the function name line;
 *  - a declaration (buf, used throughout, is not declared here);
 *  - the lines just before and after the read() at 510;
 *  - the first argument line of several ereport() calls, which leaves their
 *    parentheses unbalanced as listed.
434  */
435 static void
437 {
439  char histfname[MAXFNAMELEN];
440  char path[MAXPGPATH];
441  int fd;
442  off_t histfilelen;
443  off_t bytesleft;
444  Size len;
445 
446  /*
447  * Reply with a result set with one row, and two columns. The first col is
448  * the name of the history file, 2nd is the contents.
449  */
450 
451  TLHistoryFileName(histfname, cmd->timeline);
452  TLHistoryFilePath(path, cmd->timeline);
453 
454  /* Send a RowDescription message */
455  pq_beginmessage(&buf, 'T');
456  pq_sendint16(&buf, 2); /* 2 fields */
457 
458  /* first field */
459  pq_sendstring(&buf, "filename"); /* col name */
460  pq_sendint32(&buf, 0); /* table oid */
461  pq_sendint16(&buf, 0); /* attnum */
462  pq_sendint32(&buf, TEXTOID); /* type oid */
463  pq_sendint16(&buf, -1); /* typlen */
464  pq_sendint32(&buf, 0); /* typmod */
465  pq_sendint16(&buf, 0); /* format code */
466 
467  /* second field */
468  pq_sendstring(&buf, "content"); /* col name */
469  pq_sendint32(&buf, 0); /* table oid */
470  pq_sendint16(&buf, 0); /* attnum */
471  pq_sendint32(&buf, BYTEAOID); /* type oid */
472  pq_sendint16(&buf, -1); /* typlen */
473  pq_sendint32(&buf, 0); /* typmod */
474  pq_sendint16(&buf, 0); /* format code */
475  pq_endmessage(&buf);
476 
477  /* Send a DataRow message */
478  pq_beginmessage(&buf, 'D');
479  pq_sendint16(&buf, 2); /* # of columns */
480  len = strlen(histfname);
481  pq_sendint32(&buf, len); /* col1 len */
482  pq_sendbytes(&buf, histfname, len);
483 
484  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
485  if (fd < 0)
486  ereport(ERROR,
488  errmsg("could not open file \"%s\": %m", path)));
489 
490  /* Determine file length and send it to client */
491  histfilelen = lseek(fd, 0, SEEK_END);
492  if (histfilelen < 0)
493  ereport(ERROR,
495  errmsg("could not seek to end of file \"%s\": %m", path)));
496  if (lseek(fd, 0, SEEK_SET) != 0)
497  ereport(ERROR,
499  errmsg("could not seek to beginning of file \"%s\": %m", path)));
500 
501  pq_sendint32(&buf, histfilelen); /* col2 len */
502 
503  bytesleft = histfilelen;
504  while (bytesleft > 0)
505  {
506  PGAlignedBlock rbuf;
507  int nread;
508 
 /*
  * NOTE(review): nread is not capped at bytesleft.  If the file grew after
  * the lseek() size check, the extra bytes would still be appended, and the
  * DataRow would disagree with the col2 length already sent.  Presumably
  * history files are not modified once written -- confirm.
  */
510  nread = read(fd, rbuf.data, sizeof(rbuf));
512  if (nread < 0)
513  ereport(ERROR,
515  errmsg("could not read file \"%s\": %m",
516  path)));
517  else if (nread == 0)
518  ereport(ERROR,
520  errmsg("could not read file \"%s\": read %d of %zu",
521  path, nread, (Size) bytesleft)));
522 
523  pq_sendbytes(&buf, rbuf.data, nread);
524  bytesleft -= nread;
525  }
526 
527  if (CloseTransientFile(fd) != 0)
528  ereport(ERROR,
530  errmsg("could not close file \"%s\": %m", path)));
531 
532  pq_endmessage(&buf);
533 }
534 
535 /*
536  * Handle START_REPLICATION command.
537  *
538  * At the moment, this never returns, but an ereport(ERROR) will take us back
539  * to the main loop.
 *
 * Visible flow:
 *  1. Require that ThisTimeLineID is set; otherwise error out, asking for
 *     IDENTIFY_SYSTEM first.
 *  2. Optionally acquire cmd->slotname.
 *  3. Choose the flush position and the timeline to send (sendTimeLine,
 *     sendTimeLineIsHistoric, sendTimeLineValidUpto).  For a historic
 *     timeline, reject a start point past the point where this server's
 *     history forked from cmd->timeline.
 *  4. Inside the COPY-mode block:
 *     - send CopyBothResponse;
 *     - reject a start point ahead of the flush position;
 *     - store sentPtr in MyWalSnd while holding MyWalSnd->mutex;
 *     - set replication_active around the (missing) main-loop call.
 *  5. Afterwards, possibly send a one-row result (next_tli int8,
 *     next_tli_startpos text).
 *  6. Finally send a "START_STREAMING" CommandComplete.
 *
 * NOTE(review): the statement "this never returns" above does not match the
 * visible code.  After streaming ends, the code falls through to that
 * CommandComplete and returns normally.
 *
 * NOTE(review): this copy is missing listing lines 542, 544, 564, 575, 588,
 * 591, 605, 644-645, 649, 652, 663, 694, 699, 704, 706, 710, 716, 719, 727
 * and 729.  Among them are:
 *  - the function name/parameter line;
 *  - the condition guarding the logical-slot error (564);
 *  - the conditions paired with the "else" branches (575, 588);
 *  - the condition for the COPY-mode block (652);
 *  - the main-loop call (presumably 699);
 *  - the statement guarded by "if (cmd->slotname)" (710);
 *  - the condition for the next-timeline result block (716).
 * The control flow as listed is therefore not the real control flow.
540  */
541 static void
543 {
545  XLogRecPtr FlushPtr;
546 
547  if (ThisTimeLineID == 0)
548  ereport(ERROR,
549  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
550  errmsg("IDENTIFY_SYSTEM has not been run before START_REPLICATION")));
551 
552  /*
553  * We assume here that we're logging enough information in the WAL for
554  * log-shipping, since this is checked in PostmasterMain().
555  *
556  * NOTE: wal_level can only change at shutdown, so in most cases it is
557  * difficult for there to be WAL data that we can still see that was
558  * written at wal_level='minimal'.
559  */
560 
561  if (cmd->slotname)
562  {
563  ReplicationSlotAcquire(cmd->slotname, true);
 /* NOTE(review): the condition for this error (listing line 564) is missing; as listed it would always fire. */
565  ereport(ERROR,
566  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
567  (errmsg("cannot use a logical replication slot for physical replication"))));
568  }
569 
570  /*
571  * Select the timeline. If it was given explicitly by the client, use
572  * that. Otherwise use the timeline of the last replayed record, which is
573  * kept in ThisTimeLineID.
574  */
576  {
577  /* this also updates ThisTimeLineID */
578  FlushPtr = GetStandbyFlushRecPtr();
579  }
580  else
581  FlushPtr = GetFlushRecPtr();
582 
583  if (cmd->timeline != 0)
584  {
585  XLogRecPtr switchpoint;
586 
587  sendTimeLine = cmd->timeline;
589  {
590  sendTimeLineIsHistoric = false;
592  }
593  else
594  {
595  List *timeLineHistory;
596 
597  sendTimeLineIsHistoric = true;
598 
599  /*
600  * Check that the timeline the client requested exists, and the
601  * requested start location is on that timeline.
602  */
603  timeLineHistory = readTimeLineHistory(ThisTimeLineID);
604  switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
606  list_free_deep(timeLineHistory);
607 
608  /*
609  * Found the requested timeline in the history. Check that
610  * requested startpoint is on that timeline in our history.
611  *
612  * This is quite loose on purpose. We only check that we didn't
613  * fork off the requested timeline before the switchpoint. We
614  * don't check that we switched *to* it before the requested
615  * starting point. This is because the client can legitimately
616  * request to start replication from the beginning of the WAL
617  * segment that contains switchpoint, but on the new timeline, so
618  * that it doesn't end up with a partial segment. If you ask for
619  * too old a starting point, you'll get an error later when we
620  * fail to find the requested WAL segment in pg_wal.
621  *
622  * XXX: we could be more strict here and only allow a startpoint
623  * that's older than the switchpoint, if it's still in the same
624  * WAL segment.
625  */
626  if (!XLogRecPtrIsInvalid(switchpoint) &&
627  switchpoint < cmd->startpoint)
628  {
629  ereport(ERROR,
630  (errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
631  (uint32) (cmd->startpoint >> 32),
632  (uint32) (cmd->startpoint),
633  cmd->timeline),
634  errdetail("This server's history forked from timeline %u at %X/%X.",
635  cmd->timeline,
636  (uint32) (switchpoint >> 32),
637  (uint32) (switchpoint))));
638  }
639  sendTimeLineValidUpto = switchpoint;
640  }
641  }
642  else
643  {
646  sendTimeLineIsHistoric = false;
647  }
648 
650 
651  /* If there is nothing to stream, don't even enter COPY mode */
653  {
654  /*
655  * When we first start replication the standby will be behind the
656  * primary. For some applications, for example synchronous
657  * replication, it is important to have a clear state for this initial
658  * catchup mode, so we can trigger actions when we change streaming
659  * state later. We may stay in this state for a long time, which is
660  * exactly why we want to be able to monitor whether or not we are
661  * still here.
662  */
664 
665  /* Send a CopyBothResponse message, and start streaming */
666  pq_beginmessage(&buf, 'W');
667  pq_sendbyte(&buf, 0);
668  pq_sendint16(&buf, 0);
669  pq_endmessage(&buf);
670  pq_flush();
671 
672  /*
673  * Don't allow a request to stream from a future point in WAL that
674  * hasn't been flushed to disk in this server yet.
675  */
676  if (FlushPtr < cmd->startpoint)
677  {
678  ereport(ERROR,
679  (errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
680  (uint32) (cmd->startpoint >> 32),
681  (uint32) (cmd->startpoint),
682  (uint32) (FlushPtr >> 32),
683  (uint32) (FlushPtr))));
684  }
685 
686  /* Start streaming from the requested point */
687  sentPtr = cmd->startpoint;
688 
689  /* Initialize shared memory status, too */
690  SpinLockAcquire(&MyWalSnd->mutex);
691  MyWalSnd->sentPtr = sentPtr;
692  SpinLockRelease(&MyWalSnd->mutex);
693 
695 
696  /* Main loop of walsender */
697  replication_active = true;
698 
700 
701  replication_active = false;
702  if (got_STOPPING)
703  proc_exit(0);
705 
707  }
708 
709  if (cmd->slotname)
 /* NOTE(review): the statement guarded by this "if" (listing line 710) is missing; as listed it would guard the block below. */
711 
712  /*
713  * Copy is finished now. Send a single-row result set indicating the next
714  * timeline.
715  */
717  {
 /* "%X/%X" of two 32-bit values: up to 8 hex digits, '/', 8 hex digits, NUL. */
718  char startpos_str[8 + 1 + 8 + 1];
720  TupOutputState *tstate;
721  TupleDesc tupdesc;
722  Datum values[2];
723  bool nulls[2];
724 
725  snprintf(startpos_str, sizeof(startpos_str), "%X/%X",
726  (uint32) (sendTimeLineValidUpto >> 32),
728 
730  MemSet(nulls, false, sizeof(nulls));
731 
732  /*
733  * Need a tuple descriptor representing two columns. int8 may seem
734  * like a surprising data type for this, but in theory int4 would not
735  * be wide enough for this, as TimeLineID is unsigned.
736  */
737  tupdesc = CreateTemplateTupleDesc(2);
738  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli",
739  INT8OID, -1, 0);
740  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos",
741  TEXTOID, -1, 0);
742 
743  /* prepare for projection of tuple */
744  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
745 
746  values[0] = Int64GetDatum((int64) sendTimeLineNextTLI);
747  values[1] = CStringGetTextDatum(startpos_str);
748 
749  /* send it to dest */
750  do_tup_output(tstate, values, nulls);
751 
752  end_tup_output(tstate);
753  }
754 
755  /* Send CommandComplete message */
756  pq_puttextmessage('C', "START_STREAMING");
757 }
758 
759 /*
760  * read_page callback for logical decoding contexts, as a walsender process.
761  *
762  * Inside the walsender we can do better than logical_read_local_xlog_page,
763  * which has to do a plain sleep/busy loop, because the walsender's latch gets
764  * set every time WAL is flushed.
 *
 * Side effect: copies state->currTLI and state->nextTLI (as determined by
 * XLogReadDetermineTimeline()) into sendTimeLine and sendTimeLineNextTLI.
 *
 * Return value:
 *  - -1 when WalSndWaitForWal() reports a flush position short of
 *    targetPagePtr + reqLen (per the comment below, this implies shutdown);
 *  - otherwise, after reading the page at targetPagePtr into cur_page, the
 *    number of bytes available: XLOG_BLCKSZ, or fewer when the flush
 *    position lies inside the page.
 *
 * Read errors are raised via WALReadRaiseError().  CheckXLogRemoved() is
 * then called for the segment containing targetPagePtr.
 *
 * NOTE(review): listing lines 767 (the function name and leading
 * parameters), 776, 778 and 802 (one WALRead() argument) are missing from
 * this copy.
765  */
766 static int
768  XLogRecPtr targetRecPtr, char *cur_page)
769 {
770  XLogRecPtr flushptr;
771  int count;
772  WALReadError errinfo;
773  XLogSegNo segno;
774 
775  XLogReadDetermineTimeline(state, targetPagePtr, reqLen);
777  sendTimeLine = state->currTLI;
779  sendTimeLineNextTLI = state->nextTLI;
780 
781  /* make sure we have enough WAL available */
782  flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
783 
784  /* fail if not (implies we are going to shut down) */
785  if (flushptr < targetPagePtr + reqLen)
786  return -1;
787 
788  if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
789  count = XLOG_BLCKSZ; /* at least one full block available */
790  else
791  count = flushptr - targetPagePtr; /* part of the page available */
792 
793  /* now actually read the data, we know it's there */
 /*
  * A full XLOG_BLCKSZ is requested even when count is smaller; only count
  * bytes are reported back to the caller via the return value.
  */
794  if (!WALRead(cur_page,
795  targetPagePtr,
796  XLOG_BLCKSZ,
797  sendSeg->ws_tli, /* Pass the current TLI because only
798  * WalSndSegmentOpen controls whether new
799  * TLI is needed. */
800  sendSeg,
801  sendCxt,
803  &errinfo))
804  WALReadRaiseError(&errinfo);
805 
806  /*
807  * After reading into the buffer, check that what we read was valid. We do
808  * this after reading, because even though the segment was present when we
809  * opened it, it might get recycled or removed while we read it. The
810  * read() succeeds in that case, but the data we tried to read might
811  * already have been overwritten with new WAL records.
812  */
813  XLByteToSeg(targetPagePtr, segno, sendCxt->ws_segsize);
814  CheckXLogRemoved(segno, sendSeg->ws_tli);
815 
816  return count;
817 }
818 
819 /*
820  * Process extra options given to CREATE_REPLICATION_SLOT.
 *
 * Walks cmd->options and records results through the out-parameters:
 *  - export_snapshot: *snapshot_action = CRS_EXPORT_SNAPSHOT when the option
 *    is true (the value used when false is on listing line 845, which is
 *    missing from this copy);
 *  - use_snapshot: *snapshot_action = CRS_USE_SNAPSHOT;
 *  - reserve_wal: *reserve_wal = true.
 * Out-parameters are written only for options that are present, so the
 * caller must initialize them to their defaults first.
 *
 * Errors:
 *  - "conflicting or redundant options" when an option is repeated, when
 *    export_snapshot and use_snapshot are combined, or when an option does
 *    not apply to cmd->kind (snapshot options are logical-only, reserve_wal
 *    is physical-only);
 *  - "unrecognized option" for any other option name.
 *
 * NOTE(review): listing line 823 (the function name and the cmd parameter)
 * is also missing from this copy.
821  */
822 static void
824  bool *reserve_wal,
825  CRSSnapshotAction *snapshot_action)
826 {
827  ListCell *lc;
828  bool snapshot_action_given = false;
829  bool reserve_wal_given = false;
830 
831  /* Parse options */
832  foreach(lc, cmd->options)
833  {
834  DefElem *defel = (DefElem *) lfirst(lc);
835 
836  if (strcmp(defel->defname, "export_snapshot") == 0)
837  {
838  if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
839  ereport(ERROR,
840  (errcode(ERRCODE_SYNTAX_ERROR),
841  errmsg("conflicting or redundant options")));
842 
843  snapshot_action_given = true;
844  *snapshot_action = defGetBoolean(defel) ? CRS_EXPORT_SNAPSHOT :
846  }
847  else if (strcmp(defel->defname, "use_snapshot") == 0)
848  {
849  if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
850  ereport(ERROR,
851  (errcode(ERRCODE_SYNTAX_ERROR),
852  errmsg("conflicting or redundant options")));
853 
854  snapshot_action_given = true;
855  *snapshot_action = CRS_USE_SNAPSHOT;
856  }
857  else if (strcmp(defel->defname, "reserve_wal") == 0)
858  {
859  if (reserve_wal_given || cmd->kind != REPLICATION_KIND_PHYSICAL)
860  ereport(ERROR,
861  (errcode(ERRCODE_SYNTAX_ERROR),
862  errmsg("conflicting or redundant options")));
863 
864  reserve_wal_given = true;
865  *reserve_wal = true;
866  }
867  else
868  elog(ERROR, "unrecognized option: %s", defel->defname);
869  }
870 }
871 
872 /*
873  * Create a new replication slot.
 *
 * Visible flow:
 *  1. Parse options.  Defaults are reserve_wal = false and snapshot action
 *     CRS_EXPORT_SNAPSHOT.
 *  2. Create the slot.  ReplicationSlotCreate() gets a different second
 *     argument for physical slots (false) than for logical slots (true).
 *  3. For logical slots:
 *     - validate the snapshot action against the transaction state before
 *       creating the decoding context;
 *     - export or use the snapshot;
 *     - free the context.
 *  4. Send one row with four text columns:
 *     - slot_name;
 *     - consistent_point ("%X/%X");
 *     - snapshot_name (NULL unless a snapshot was exported);
 *     - output_plugin (NULL when cmd->plugin is NULL).
 *
 * NOTE(review): this copy is missing listing lines 876, 883, 889, 895, 900,
 * 904, 914, 919, 944, 966-969, 978, 981, 997-998, 1005, 1009, 1011, 1015,
 * 1019-1020, 1022 and 1069.  In particular:
 *  - the condition for the REPEATABLE READ error (944) is gone, so that
 *    ereport appears unconditional;
 *  - the statements guarded by the two "if (!cmd->temporary)" tests (1005,
 *    1015) are gone, leaving each "if" directly before a closing brace.
874  */
875 static void
877 {
878  const char *snapshot_name = NULL;
879  char xloc[MAXFNAMELEN];
880  char *slot_name;
881  bool reserve_wal = false;
882  CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT;
884  TupOutputState *tstate;
885  TupleDesc tupdesc;
886  Datum values[4];
887  bool nulls[4];
888 
890 
891  parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action);
892 
893  /* setup state for XLogRead */
894  sendTimeLineIsHistoric = false;
896 
897  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
898  {
899  ReplicationSlotCreate(cmd->slotname, false,
901  }
902  else
903  {
905 
906  /*
907  * Initially create persistent slot as ephemeral - that allows us to
908  * nicely handle errors during initialization because it'll get
909  * dropped if this transaction fails. We'll make it persistent at the
910  * end. Temporary slots can be created as temporary from beginning as
911  * they get dropped on error as well.
912  */
913  ReplicationSlotCreate(cmd->slotname, true,
915  }
916 
917  if (cmd->kind == REPLICATION_KIND_LOGICAL)
918  {
920  bool need_full_snapshot = false;
921 
922  /*
923  * Do options check early so that we can bail before calling the
924  * DecodingContextFindStartpoint which can take long time.
925  */
926  if (snapshot_action == CRS_EXPORT_SNAPSHOT)
927  {
928  if (IsTransactionBlock())
929  ereport(ERROR,
930  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
931  (errmsg("%s must not be called inside a transaction",
932  "CREATE_REPLICATION_SLOT ... EXPORT_SNAPSHOT")));
933 
934  need_full_snapshot = true;
935  }
936  else if (snapshot_action == CRS_USE_SNAPSHOT)
937  {
938  if (!IsTransactionBlock())
939  ereport(ERROR,
940  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
941  (errmsg("%s must be called inside a transaction",
942  "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
943 
 /* NOTE(review): the condition for this error (listing line 944) is missing from this copy. */
945  ereport(ERROR,
946  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
947  (errmsg("%s must be called in REPEATABLE READ isolation mode transaction",
948  "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
949 
950  if (FirstSnapshotSet)
951  ereport(ERROR,
952  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
953  (errmsg("%s must be called before any query",
954  "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
955 
956  if (IsSubTransaction())
957  ereport(ERROR,
958  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
959  (errmsg("%s must not be called in a subtransaction",
960  "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
961 
962  need_full_snapshot = true;
963  }
964 
965  ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
970 
971  /*
972  * Signal that we don't need the timeout mechanism. We're just
973  * creating the replication slot and don't yet accept feedback
974  * messages or send keepalives. As we possibly need to wait for
975  * further WAL the walsender would otherwise possibly be killed too
976  * soon.
977  */
979 
980  /* build initial snapshot, might take a while */
982 
983  /*
984  * Export or use the snapshot if we've been asked to do so.
985  *
986  * NB. We will convert the snapbuild.c kind of snapshot to normal
987  * snapshot when doing this.
988  */
989  if (snapshot_action == CRS_EXPORT_SNAPSHOT)
990  {
991  snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder);
992  }
993  else if (snapshot_action == CRS_USE_SNAPSHOT)
994  {
995  Snapshot snap;
996 
999  }
1000 
1001  /* don't need the decoding context anymore */
1002  FreeDecodingContext(ctx);
1003 
1004  if (!cmd->temporary)
1006  }
1007  else if (cmd->kind == REPLICATION_KIND_PHYSICAL && reserve_wal)
1008  {
1010 
1012 
1013  /* Write this slot to disk if it's a permanent one. */
1014  if (!cmd->temporary)
1016  }
1017 
1018  snprintf(xloc, sizeof(xloc), "%X/%X",
1021 
1023  MemSet(nulls, false, sizeof(nulls));
1024 
1025  /*----------
1026  * Need a tuple descriptor representing four columns:
1027  * - first field: the slot name
1028  * - second field: LSN at which we became consistent
1029  * - third field: exported snapshot's name
1030  * - fourth field: output plugin
1031  *----------
1032  */
1033  tupdesc = CreateTemplateTupleDesc(4);
1034  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_name",
1035  TEXTOID, -1, 0);
1036  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "consistent_point",
1037  TEXTOID, -1, 0);
1038  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "snapshot_name",
1039  TEXTOID, -1, 0);
1040  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "output_plugin",
1041  TEXTOID, -1, 0);
1042 
1043  /* prepare for projection of tuples */
1044  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
1045 
1046  /* slot_name */
1047  slot_name = NameStr(MyReplicationSlot->data.name);
1048  values[0] = CStringGetTextDatum(slot_name);
1049 
1050  /* consistent wal location */
1051  values[1] = CStringGetTextDatum(xloc);
1052 
1053  /* snapshot name, or NULL if none */
1054  if (snapshot_name != NULL)
1055  values[2] = CStringGetTextDatum(snapshot_name);
1056  else
1057  nulls[2] = true;
1058 
1059  /* plugin, or NULL if none */
1060  if (cmd->plugin != NULL)
1061  values[3] = CStringGetTextDatum(cmd->plugin);
1062  else
1063  nulls[3] = true;
1064 
1065  /* send it to dest */
1066  do_tup_output(tstate, values, nulls);
1067  end_tup_output(tstate);
1068 
1070 }
1071 
1072 /*
1073  * Get rid of a replication slot that is no longer wanted.
 *
 * Calls ReplicationSlotDrop(cmd->slotname, !cmd->wait).  The second argument
 * presumably means "don't wait"; confirm against its definition.  Then
 * reports completion with EndCommand("DROP_REPLICATION_SLOT", DestRemote).
 *
 * NOTE(review): listing line 1076 (the function name and parameter list) is
 * missing from this copy.
1074  */
1075 static void
1077 {
1078  ReplicationSlotDrop(cmd->slotname, !cmd->wait);
1079  EndCommand("DROP_REPLICATION_SLOT", DestRemote);
1080 }
1081 
1082 /*
1083  * Load previously initiated logical slot and prepare for sending data (via
1084  * WalSndLoop).
 *
 * Visible flow:
 *  1. Acquire cmd->slotname.
 *  2. After promotion, log a message and set got_STOPPING.  The guarding
 *     condition (listing line 1103) is missing here.
 *  3. Create the decoding context at cmd->startpoint with cmd->options,
 *     before sending CopyBothResponse so that errors surface first.
 *  4. Update the sent position in shared memory while holding
 *     MyWalSnd->mutex.
 *  5. Set replication_active around the main loop.
 *  6. Free the context and clear replication_active.
 *  7. Exit if got_STOPPING; otherwise leave COPY mode with
 *     EndCommand("COPY 0").
 *
 * NOTE(review): listing lines 1087, 1089, 1092, 1094, 1103, 1119-1121, 1124,
 * 1135, 1141, 1145, 1150, 1153, 1156 and 1161 are missing from this copy.
 * These include:
 *  - the function name line;
 *  - the check under "make sure that our requirements are still fulfilled";
 *  - the remaining CreateDecodingContext() arguments;
 *  - the assignment between SpinLockAcquire() and SpinLockRelease();
 *  - the main-loop call (presumably 1153).
1085  */
1086 static void
1088 {
1090 
1091  /* make sure that our requirements are still fulfilled */
1093 
1095 
1096  ReplicationSlotAcquire(cmd->slotname, true);
1097 
1098  /*
1099  * Force a disconnect, so that the decoding code doesn't need to care
1100  * about an eventual switch from running in recovery, to running in a
1101  * normal environment. Client code is expected to handle reconnects.
1102  */
1104  {
1105  ereport(LOG,
1106  (errmsg("terminating walsender process after promotion")));
1107  got_STOPPING = true;
1108  }
1109 
1110  /*
1111  * Create our decoding context, making it start at the previously ack'ed
1112  * position.
1113  *
1114  * Do this before sending a CopyBothResponse message, so that any errors
1115  * are reported early.
1116  */
1117  logical_decoding_ctx =
1118  CreateDecodingContext(cmd->startpoint, cmd->options, false,
1122 
1123 
1125 
1126  /* Send a CopyBothResponse message, and start streaming */
1127  pq_beginmessage(&buf, 'W');
1128  pq_sendbyte(&buf, 0);
1129  pq_sendint16(&buf, 0);
1130  pq_endmessage(&buf);
1131  pq_flush();
1132 
1133 
1134  /* Start reading WAL from the oldest required WAL. */
1136 
1137  /*
1138  * Report the location after which we'll send out further commits as the
1139  * current sentPtr.
1140  */
1142 
1143  /* Also update the sent position status in shared memory */
1144  SpinLockAcquire(&MyWalSnd->mutex);
1146  SpinLockRelease(&MyWalSnd->mutex);
1147 
1148  replication_active = true;
1149 
1151 
1152  /* Main loop of walsender */
1154 
1155  FreeDecodingContext(logical_decoding_ctx);
1157 
1158  replication_active = false;
1159  if (got_STOPPING)
1160  proc_exit(0);
1162 
1163  /* Get out of COPY mode (CommandComplete). */
1164  EndCommand("COPY 0", DestRemote);
1165 }
1166 
1167 /*
1168  * LogicalDecodingContext 'prepare_write' callback.
1169  *
1170  * Prepare a write into a StringInfo.
1171  *
1172  * Don't do anything lasting in here, it's quite possible that nothing will be done
1173  * with the data.
 *
 * Resets ctx->out and writes a 'w' (XLogData) message header into it: the
 * message-type byte followed by three int64 fields (dataStart, walEnd,
 * sendtime).  The sendtime field is written as 0 here; WalSndWriteData
 * overwrites it just before the message is sent.  Unless last_write is
 * set, lsn is replaced by InvalidXLogRecPtr in both LSN fields.
1174  */
1175 static void
1177 {
1178  /* can't have sync rep confused by sending the same LSN several times */
1179  if (!last_write)
1180  lsn = InvalidXLogRecPtr;
1181 
1182  resetStringInfo(ctx->out);
1183 
1184  pq_sendbyte(ctx->out, 'w');
1185  pq_sendint64(ctx->out, lsn); /* dataStart */
1186  pq_sendint64(ctx->out, lsn); /* walEnd */
1187 
1188  /*
1189  * Fill out the sendtime later, just as it's done in XLogSendPhysical, but
1190  * reserve space here.
1191  */
1192  pq_sendint64(ctx->out, 0); /* sendtime */
1193 }
1194 
1195 /*
1196  * LogicalDecodingContext 'write' callback.
1197  *
1198  * Actually write out data previously prepared by WalSndPrepareWrite to the
1199  * network. Take as long as needed, but process replies from the other
1200  * side and check timeouts during that.
 *
 * Returns early (fast path) once all output has been flushed and the
 * walsender timeout is not close; otherwise it loops, waiting on the
 * latch/socket, until no output is pending any more.
1201  */
1202 static void
1204  bool last_write)
1205 {
1206  TimestampTz now;
1207 
1208  /*
1209  * Fill the send timestamp last, so that it is taken as late as possible.
1210  * This is somewhat ugly, but the protocol is set as it's already used for
1211  * several releases by streaming physical replication.
1212  */
 /* sendtime sits after the 'w' type byte and the two int64 LSN fields */
1213  resetStringInfo(&tmpbuf);
1214  now = GetCurrentTimestamp();
1215  pq_sendint64(&tmpbuf, now);
1216  memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
1217  tmpbuf.data, sizeof(int64));
1218 
1219  /* output previously gathered data in a CopyData packet */
1220  pq_putmessage_noblock('d', ctx->out->data, ctx->out->len);
1221 
1223 
1224  /* Try to flush pending output to the client */
1225  if (pq_flush_if_writable() != 0)
1226  WalSndShutdown();
1227 
1228  /* Try taking fast path unless we get too close to walsender timeout. */
1230  wal_sender_timeout / 2) &&
1231  !pq_is_send_pending())
1232  {
1233  return;
1234  }
1235 
1236  /* If we have pending write here, go to slow path */
1237  for (;;)
1238  {
1239  int wakeEvents;
1240  long sleeptime;
1241 
1242  /* Check for input from the client */
1244 
1245  /* die if timeout was reached */
1247 
1248  /* Send keepalive if the time has come */
1250 
1251  if (!pq_is_send_pending())
1252  break;
1253 
1255 
1256  wakeEvents = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH |
1258 
1259  /* Sleep until something happens or we time out */
1260  (void) WaitLatchOrSocket(MyLatch, wakeEvents,
1261  MyProcPort->sock, sleeptime,
1263 
1264  /* Clear any already-pending wakeups */
1266 
1268 
1269  /* Process any requests or signals received recently */
1270  if (ConfigReloadPending)
1271  {
1272  ConfigReloadPending = false;
1275  }
1276 
1277  /* Try to flush pending output to the client */
1278  if (pq_flush_if_writable() != 0)
1279  WalSndShutdown();
1280  }
1281 
1282  /* reactivate latch so WalSndLoop knows to continue */
1283  SetLatch(MyLatch);
1284 }
1285 
1286 /*
1287  * LogicalDecodingContext 'update_progress' callback.
1288  *
1289  * Write the current position to the lag tracker (see XLogSendPhysical),
1290  * and update the spill statistics.
 *
 * Both actions are rate-limited together: when the interval since the
 * last tracked call (remembered in the static sendTime) has not yet been
 * exceeded, the function returns before touching either the lag tracker
 * or the spill statistics.
1291  */
1292 static void
1294 {
1295  static TimestampTz sendTime = 0;
1297 
1298  /*
1299  * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to
1300  * avoid flooding the lag tracker when we commit frequently.
1301  */
1302 #define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000
1303  if (!TimestampDifferenceExceeds(sendTime, now,
1305  return;
1306 
1307  LagTrackerWrite(lsn, now);
1308  sendTime = now;
1309 
1310  /*
1311  * Update statistics about transactions that spilled to disk.
1312  */
1313  UpdateSpillStats(ctx);
1314 }
1315 
1316 /*
1317  * Wait till WAL < loc is flushed to disk so it can be safely sent to client.
1318  *
1319  * Returns end LSN of flushed WAL. Normally this will be >= loc, but
1320  * if we detect a shutdown request (either from postmaster or client)
1321  * we will return early, so caller must always check.
 *
 * The last flush position seen is cached in the static RecentFlushPtr, so
 * repeated calls for WAL that is already known to be available return
 * without re-reading it.  While recovery is in progress, the replay
 * position is used as the flush position.  WalSndCaughtUp is set to true
 * whenever we actually have to wait.
1322  */
1323 static XLogRecPtr
1325 {
1326  int wakeEvents;
1327  static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
1328 
1329  /*
1330  * Fast path to avoid acquiring the spinlock in case we already know we
1331  * have enough WAL available. This is particularly interesting if we're
1332  * far behind.
1333  */
1334  if (RecentFlushPtr != InvalidXLogRecPtr &&
1335  loc <= RecentFlushPtr)
1336  return RecentFlushPtr;
1337 
1338  /* Get a more recent flush pointer. */
1339  if (!RecoveryInProgress())
1340  RecentFlushPtr = GetFlushRecPtr();
1341  else
1342  RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1343 
1344  for (;;)
1345  {
1346  long sleeptime;
1347 
1348  /* Clear any already-pending wakeups */
1350 
1352 
1353  /* Process any requests or signals received recently */
1354  if (ConfigReloadPending)
1355  {
1356  ConfigReloadPending = false;
1359  }
1360 
1361  /* Check for input from the client */
1363 
1364  /*
1365  * If we're shutting down, trigger pending WAL to be written out,
1366  * otherwise we'd possibly end up waiting for WAL that never gets
1367  * written, because walwriter has shut down already.
1368  */
1369  if (got_STOPPING)
1371 
1372  /* Update our idea of the currently flushed position. */
1373  if (!RecoveryInProgress())
1374  RecentFlushPtr = GetFlushRecPtr();
1375  else
1376  RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1377 
1378  /*
1379  * If postmaster asked us to stop, don't wait anymore.
1380  *
1381  * It's important to do this check after the recomputation of
1382  * RecentFlushPtr, so we can send all remaining data before shutting
1383  * down.
1384  */
1385  if (got_STOPPING)
1386  break;
1387 
1388  /*
1389  * We only send regular messages to the client for fully decoded
1390  * transactions, but synchronous replication and walsender shutdown
1391  * may be waiting for a later location. So we send pings
1392  * containing the flush location every now and then.
1393  */
1394  if (MyWalSnd->flush < sentPtr &&
1395  MyWalSnd->write < sentPtr &&
1397  {
1398  WalSndKeepalive(false);
1400  }
1401 
1402  /* check whether we're done */
1403  if (loc <= RecentFlushPtr)
1404  break;
1405 
1406  /* Waiting for new WAL. Since we need to wait, we're now caught up. */
1407  WalSndCaughtUp = true;
1408 
1409  /*
1410  * Try to flush any pending output to the client.
1411  */
1412  if (pq_flush_if_writable() != 0)
1413  WalSndShutdown();
1414 
1415  /*
1416  * If we have received CopyDone from the client, sent CopyDone
1417  * ourselves, and the output buffer is empty, it's time to exit
1418  * streaming, so fail the current WAL fetch request.
1419  */
1421  !pq_is_send_pending())
1422  break;
1423 
1424  /* die if timeout was reached */
1426 
1427  /* Send keepalive if the time has come */
1429 
1430  /*
1431  * Sleep until something happens or we time out. Also wait for the
1432  * socket becoming writable, if there's still pending output.
1433  * Otherwise we might sit on sendable output data while waiting for
1434  * new WAL to be generated. (But if we have nothing to send, we don't
1435  * want to wake on socket-writable.)
1436  */
1438 
1439  wakeEvents = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH |
1441 
1442  if (pq_is_send_pending())
1443  wakeEvents |= WL_SOCKET_WRITEABLE;
1444 
1445  (void) WaitLatchOrSocket(MyLatch, wakeEvents,
1446  MyProcPort->sock, sleeptime,
1448  }
1449 
1450  /* reactivate latch so WalSndLoop knows to continue */
1451  SetLatch(MyLatch);
1452  return RecentFlushPtr;
1453 }
1454 
1455 /*
1456  * Execute an incoming replication command.
1457  *
1458  * Returns true if the cmd_string was recognized as WalSender command, false
1459  * if not.
 *
 * The command is parsed and executed inside a dedicated "Replication
 * command context"; on the normal (walsender command) path that context
 * is deleted and a "SELECT" CommandComplete is sent before returning.
1460  */
1461 bool
1462 exec_replication_command(const char *cmd_string)
1463 {
1464  int parse_rc;
1465  Node *cmd_node;
1466  MemoryContext cmd_context;
1467  MemoryContext old_context;
1468 
1469  /*
1470  * If WAL sender has been told that shutdown is getting close, switch its
1471  * status accordingly to handle the next replication commands correctly.
1472  */
1473  if (got_STOPPING)
1475 
1476  /*
1477  * Throw error if in stopping mode. We need to prevent commands that could
1478  * generate WAL while the shutdown checkpoint is being written. To be
1479  * safe, we just prohibit all new commands.
1480  */
1481  if (MyWalSnd->state == WALSNDSTATE_STOPPING)
1482  ereport(ERROR,
1483  (errmsg("cannot execute new commands while WAL sender is in stopping mode")));
1484 
1485  /*
1486  * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
1487  * command arrives. Clean up the old stuff if there's anything.
1488  */
1490 
1492 
1494  "Replication command context",
1496  old_context = MemoryContextSwitchTo(cmd_context);
1497 
1498  replication_scanner_init(cmd_string);
1499  parse_rc = replication_yyparse();
1500  if (parse_rc != 0)
1501  ereport(ERROR,
1502  (errcode(ERRCODE_SYNTAX_ERROR),
1503  (errmsg_internal("replication command parser returned %d",
1504  parse_rc))));
1505 
1506  cmd_node = replication_parse_result;
1507 
1508  /*
1509  * Log replication command if log_replication_commands is enabled. Even
1510  * when it's disabled, log the command with DEBUG1 level for backward
1511  * compatibility. Note that SQL commands are not logged here, and will be
1512  * logged later if log_statement is enabled.
1513  */
1514  if (cmd_node->type != T_SQLCmd)
1516  (errmsg("received replication command: %s", cmd_string)));
1517 
1518  /*
1519  * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot. If it was
1520  * called outside of transaction the snapshot should be cleared here.
1521  */
1522  if (!IsTransactionBlock())
1524 
1525  /*
1526  * For aborted transactions, don't allow anything except pure SQL, the
1527  * exec_simple_query() will handle it correctly.
1528  */
1529  if (IsAbortedTransactionBlockState() && !IsA(cmd_node, SQLCmd))
1530  ereport(ERROR,
1531  (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
1532  errmsg("current transaction is aborted, "
1533  "commands ignored until end of transaction block")));
1534 
1536 
1537  /*
1538  * Allocate buffers that will be used for each outgoing and incoming
1539  * message. We do this just once per command to reduce palloc overhead.
1540  */
1541  initStringInfo(&output_message);
1542  initStringInfo(&reply_message);
1543  initStringInfo(&tmpbuf);
1544 
1545  /* Report to pgstat that this process is running */
1547 
1548  switch (cmd_node->type)
1549  {
1550  case T_IdentifySystemCmd:
1551  IdentifySystem();
1552  break;
1553 
1554  case T_BaseBackupCmd:
1555  PreventInTransactionBlock(true, "BASE_BACKUP");
1556  SendBaseBackup((BaseBackupCmd *) cmd_node);
1557  break;
1558 
1561  break;
1562 
1565  break;
1566 
1567  case T_StartReplicationCmd:
1568  {
1569  StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
1570 
1571  PreventInTransactionBlock(true, "START_REPLICATION");
1572 
1573  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
1574  StartReplication(cmd);
1575  else
1577  break;
1578  }
1579 
1580  case T_TimeLineHistoryCmd:
1581  PreventInTransactionBlock(true, "TIMELINE_HISTORY");
1583  break;
1584 
1585  case T_VariableShowStmt:
1586  {
1588  VariableShowStmt *n = (VariableShowStmt *) cmd_node;
1589 
1590  /* syscache access needs a transaction environment */
1592  GetPGVariable(n->name, dest);
1594  }
1595  break;
1596 
1597  case T_SQLCmd:
1598  if (MyDatabaseId == InvalidOid)
1599  ereport(ERROR,
1600  (errmsg("cannot execute SQL commands in WAL sender for physical replication")));
1601 
1602  /* Report to pgstat that this process is now idle */
1604 
1605  /* Tell the caller that this wasn't a WalSender command. */
 /*
  * NOTE(review): unlike the other commands, this path returns without
  * switching back to old_context or deleting cmd_context (that cleanup
  * only happens after the switch statement), so the caller continues
  * running in cmd_context and the context is not freed here -- confirm
  * that this is intended.
  */
1606  return false;
1607 
1608  default:
1609  elog(ERROR, "unrecognized replication command node tag: %u",
1610  cmd_node->type);
1611  }
1612 
1613  /* done */
1614  MemoryContextSwitchTo(old_context);
1615  MemoryContextDelete(cmd_context);
1616 
1617  /* Send CommandComplete message */
1618  EndCommand("SELECT", DestRemote);
1619 
1620  /* Report to pgstat that this process is now idle */
1622 
1623  return true;
1624 }
1625 
1626 /*
1627  * Process any incoming messages while streaming. Also checks if the remote
1628  * end has closed the connection.
 *
 * Reads every message that is available without blocking.  CopyData ('d')
 * carries a standby reply; CopyDone ('c') is answered with our own CopyDone
 * if not sent yet; Terminate ('X') exits the process.  Any other type, or
 * anything other than 'X' after CopyDone was received, is a FATAL protocol
 * violation.  EOF or a read error also exits the process.
1629  */
1630 static void
1632 {
1633  unsigned char firstchar;
1634  int r;
1635  bool received = false;
1636 
1638 
1639  for (;;)
1640  {
1641  pq_startmsgread();
1642  r = pq_getbyte_if_available(&firstchar);
1643  if (r < 0)
1644  {
1645  /* unexpected error or EOF */
1647  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1648  errmsg("unexpected EOF on standby connection")));
1649  proc_exit(0);
1650  }
1651  if (r == 0)
1652  {
1653  /* no data available without blocking */
1654  pq_endmsgread();
1655  break;
1656  }
1657 
1658  /* Read the message contents */
1659  resetStringInfo(&reply_message);
1660  if (pq_getmessage(&reply_message, 0))
1661  {
1663  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1664  errmsg("unexpected EOF on standby connection")));
1665  proc_exit(0);
1666  }
1667 
1668  /*
1669  * If we already received a CopyDone from the frontend, the frontend
1670  * should not send us anything until we've closed our end of the COPY.
1671  * XXX: In theory, the frontend could already send the next command
1672  * before receiving the CopyDone, but libpq doesn't currently allow
1673  * that.
1674  */
1675  if (streamingDoneReceiving && firstchar != 'X')
1676  ereport(FATAL,
1677  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1678  errmsg("unexpected standby message type \"%c\", after receiving CopyDone",
1679  firstchar)));
1680 
1681  /* Handle the very limited subset of commands expected in this phase */
1682  switch (firstchar)
1683  {
1684  /*
1685  * 'd' means a standby reply wrapped in a CopyData packet.
1686  */
1687  case 'd':
1689  received = true;
1690  break;
1691 
1692  /*
1693  * CopyDone means the standby requested to finish streaming.
1694  * Reply with CopyDone, if we had not sent that already.
1695  */
1696  case 'c':
1697  if (!streamingDoneSending)
1698  {
1699  pq_putmessage_noblock('c', NULL, 0);
1700  streamingDoneSending = true;
1701  }
1702 
1703  streamingDoneReceiving = true;
1704  received = true;
1705  break;
1706 
1707  /*
1708  * 'X' means that the standby is closing down the socket.
1709  */
1710  case 'X':
1711  proc_exit(0);
1712 
1713  default:
1714  ereport(FATAL,
1715  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1716  errmsg("invalid standby message type \"%c\"",
1717  firstchar)));
1718  }
1719  }
1720 
1721  /*
1722  * Save the last reply timestamp if we've received at least one reply.
1723  */
1724  if (received)
1725  {
1727  waiting_for_ping_response = false;
1728  }
1729 }
1730 
1731 /*
1732  * Process a status update message received from standby.
 *
 * The first byte of reply_message selects the message kind: 'r' for a
 * regular status reply and 'h' for hot standby feedback (presumably handled
 * by ProcessStandbyReplyMessage and ProcessStandbyHSFeedbackMessage --
 * confirm).  Any other type is reported as a protocol violation and the
 * process exits.
1733  */
1734 static void
1736 {
1737  char msgtype;
1738 
1739  /*
1740  * Check message type from the first byte.
1741  */
1742  msgtype = pq_getmsgbyte(&reply_message);
1743 
1744  switch (msgtype)
1745  {
1746  case 'r':
1748  break;
1749 
1750  case 'h':
1752  break;
1753 
1754  default:
1756  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1757  errmsg("unexpected message type \"%c\"", msgtype)));
1758  proc_exit(0);
1759  }
1760 }
1761 
1762 /*
1763  * Remember that a walreceiver just confirmed receipt of lsn `lsn`.
 *
 * Stores lsn as the slot's restart_lsn, under the slot's spinlock, if it
 * differs from the current value.  lsn must be valid (asserted).
1764  */
1765 static void
1767 {
1768  bool changed = false;
1770 
1771  Assert(lsn != InvalidXLogRecPtr);
1772  SpinLockAcquire(&slot->mutex);
1773  if (slot->data.restart_lsn != lsn)
1774  {
1775  changed = true;
1776  slot->data.restart_lsn = lsn;
1777  }
1778  SpinLockRelease(&slot->mutex);
1779 
1780  if (changed)
1781  {
1784  }
1785 
1786  /*
1787  * One could argue that the slot should be saved to disk now, but that'd
1788  * be wasted effort - the worst that losing this information can do is
1789  * give us wrong information in a statistics view - we'll just potentially
1790  * be more conservative in removing files.
1791  */
1792 }
1793 
1794 /*
1795  * Regular reply from standby advising of WAL locations on standby server.
 *
 * Reads, in order, the standby's write, flush and apply positions (int64
 * each), its reply timestamp and a reply-requested flag byte.  It computes
 * round-trip lag through the lag tracker, answers with a keepalive when a
 * reply was requested, and publishes the values in this walsender's
 * shared-memory entry.  The static fullyAppliedLastTime remembers whether
 * the previous reply had applyPtr == sentPtr, which is used to clear
 * stale lag times.
1796  */
1797 static void
1799 {
1800  XLogRecPtr writePtr,
1801  flushPtr,
1802  applyPtr;
1803  bool replyRequested;
1804  TimeOffset writeLag,
1805  flushLag,
1806  applyLag;
1807  bool clearLagTimes;
1808  TimestampTz now;
1809  TimestampTz replyTime;
1810 
1811  static bool fullyAppliedLastTime = false;
1812 
1813  /* the caller already consumed the msgtype byte */
1814  writePtr = pq_getmsgint64(&reply_message);
1815  flushPtr = pq_getmsgint64(&reply_message);
1816  applyPtr = pq_getmsgint64(&reply_message);
1817  replyTime = pq_getmsgint64(&reply_message);
1818  replyRequested = pq_getmsgbyte(&reply_message);
1819 
1820  if (log_min_messages <= DEBUG2)
1821  {
1822  char *replyTimeStr;
1823 
1824  /* Copy because timestamptz_to_str returns a static buffer */
1825  replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
1826 
1827  elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s reply_time %s",
1828  (uint32) (writePtr >> 32), (uint32) writePtr,
1829  (uint32) (flushPtr >> 32), (uint32) flushPtr,
1830  (uint32) (applyPtr >> 32), (uint32) applyPtr,
1831  replyRequested ? " (reply requested)" : "",
1832  replyTimeStr);
1833 
1834  pfree(replyTimeStr);
1835  }
1836 
1837  /* See if we can compute the round-trip lag for these positions. */
1838  now = GetCurrentTimestamp();
1839  writeLag = LagTrackerRead(SYNC_REP_WAIT_WRITE, writePtr, now);
1840  flushLag = LagTrackerRead(SYNC_REP_WAIT_FLUSH, flushPtr, now);
1841  applyLag = LagTrackerRead(SYNC_REP_WAIT_APPLY, applyPtr, now);
1842 
1843  /*
1844  * If the standby reports that it has fully replayed the WAL in two
1845  * consecutive reply messages, then the second such message must result
1846  * from wal_receiver_status_interval expiring on the standby. This is a
1847  * convenient time to forget the lag times measured when it last
1848  * wrote/flushed/applied a WAL record, to avoid displaying stale lag data
1849  * until more WAL traffic arrives.
1850  */
1851  clearLagTimes = false;
1852  if (applyPtr == sentPtr)
1853  {
1854  if (fullyAppliedLastTime)
1855  clearLagTimes = true;
1856  fullyAppliedLastTime = true;
1857  }
1858  else
1859  fullyAppliedLastTime = false;
1860 
1861  /* Send a reply if the standby requested one. */
1862  if (replyRequested)
1863  WalSndKeepalive(false);
1864 
1865  /*
1866  * Update shared state for this WalSender process based on reply data from
1867  * standby.
1868  */
1869  {
1870  WalSnd *walsnd = MyWalSnd;
1871 
1872  SpinLockAcquire(&walsnd->mutex);
1873  walsnd->write = writePtr;
1874  walsnd->flush = flushPtr;
1875  walsnd->apply = applyPtr;
1876  if (writeLag != -1 || clearLagTimes)
1877  walsnd->writeLag = writeLag;
1878  if (flushLag != -1 || clearLagTimes)
1879  walsnd->flushLag = flushLag;
1880  if (applyLag != -1 || clearLagTimes)
1881  walsnd->applyLag = applyLag;
1882  walsnd->replyTime = replyTime;
1883  SpinLockRelease(&walsnd->mutex);
1884  }
1885 
1888 
1889  /*
1890  * Advance our local xmin horizon when the client confirmed a flush.
1891  */
1892  if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr)
1893  {
1896  else
1898  }
1899 }
1900 
1901 /* compute new replication slot xmin horizon if needed */
/*
 * Each of xmin and catalog_xmin is replaced when the slot's current value is
 * not a normal xid, the new value is not a normal xid, or the current value
 * precedes the new one.  A normal xmin is therefore never moved back to an
 * older normal xid.
 */
1902 static void
1904 {
1905  bool changed = false;
1907 
1908  SpinLockAcquire(&slot->mutex);
1910 
1911  /*
1912  * For physical replication we don't need the interlock provided by xmin
1913  * and effective_xmin since the consequences of a missed increase are
1914  * limited to query cancellations, so set both at once.
1915  */
1916  if (!TransactionIdIsNormal(slot->data.xmin) ||
1917  !TransactionIdIsNormal(feedbackXmin) ||
1918  TransactionIdPrecedes(slot->data.xmin, feedbackXmin))
1919  {
1920  changed = true;
1921  slot->data.xmin = feedbackXmin;
1922  slot->effective_xmin = feedbackXmin;
1923  }
1924  if (!TransactionIdIsNormal(slot->data.catalog_xmin) ||
1925  !TransactionIdIsNormal(feedbackCatalogXmin) ||
1926  TransactionIdPrecedes(slot->data.catalog_xmin, feedbackCatalogXmin))
1927  {
1928  changed = true;
1929  slot->data.catalog_xmin = feedbackCatalogXmin;
1930  slot->effective_catalog_xmin = feedbackCatalogXmin;
1931  }
1932  SpinLockRelease(&slot->mutex);
1933 
1934  if (changed)
1935  {
1938  }
1939 }
1940 
1941 /*
1942  * Check that the provided xmin/epoch are sane, that is, not in the future
1943  * and not so far back as to be already wrapped around.
1944  *
1945  * Epoch of nextXid should be same as standby, or if the counter has
1946  * wrapped, then one greater than standby.
1947  *
1948  * This check doesn't care about whether clog exists for these xids
1949  * at all.
 *
 * Returns true if the xid/epoch pair passes both checks, false otherwise.
1950  */
1951 static bool
1953 {
1954  FullTransactionId nextFullXid;
1955  TransactionId nextXid;
1956  uint32 nextEpoch;
1957 
1958  nextFullXid = ReadNextFullTransactionId();
1959  nextXid = XidFromFullTransactionId(nextFullXid);
1960  nextEpoch = EpochFromFullTransactionId(nextFullXid);
1961 
1962  if (xid <= nextXid)
1963  {
 /* xid is numerically at or below nextXid: the epochs must match */
1964  if (epoch != nextEpoch)
1965  return false;
1966  }
1967  else
1968  {
 /* xid is numerically above nextXid: it can only be from the previous epoch */
1969  if (epoch + 1 != nextEpoch)
1970  return false;
1971  }
1972 
1973  if (!TransactionIdPrecedesOrEquals(xid, nextXid))
1974  return false; /* epoch OK, but it's wrapped around */
1975 
1976  return true;
1977 }
1978 
1979 /*
1980  * Hot Standby feedback
 *
 * The message body (after the msgtype byte) holds an int64 reply time,
 * followed by four 32-bit values: xmin, its epoch, catalog_xmin and its
 * epoch.  The reply time is always stored in shared memory.  If both xmins
 * are not normal, the walsender's xmins are unset.  Otherwise, values that
 * fail the TransactionIdInRecentPast check cause the message to be ignored.
 * Values that pass are applied via the replication slot if there is one,
 * else via MyPgXact->xmin.
1981  */
1982 static void
1984 {
1985  TransactionId feedbackXmin;
1986  uint32 feedbackEpoch;
1987  TransactionId feedbackCatalogXmin;
1988  uint32 feedbackCatalogEpoch;
1989  TimestampTz replyTime;
1990 
1991  /*
1992  * Decipher the reply message. The caller already consumed the msgtype
1993  * byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation
1994  * of this message.
1995  */
1996  replyTime = pq_getmsgint64(&reply_message);
1997  feedbackXmin = pq_getmsgint(&reply_message, 4);
1998  feedbackEpoch = pq_getmsgint(&reply_message, 4);
1999  feedbackCatalogXmin = pq_getmsgint(&reply_message, 4);
2000  feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4);
2001 
2002  if (log_min_messages <= DEBUG2)
2003  {
2004  char *replyTimeStr;
2005 
2006  /* Copy because timestamptz_to_str returns a static buffer */
2007  replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
2008 
2009  elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u reply_time %s",
2010  feedbackXmin,
2011  feedbackEpoch,
2012  feedbackCatalogXmin,
2013  feedbackCatalogEpoch,
2014  replyTimeStr);
2015 
2016  pfree(replyTimeStr);
2017  }
2018 
2019  /*
2020  * Update shared state for this WalSender process based on reply data from
2021  * standby.
2022  */
2023  {
2024  WalSnd *walsnd = MyWalSnd;
2025 
2026  SpinLockAcquire(&walsnd->mutex);
2027  walsnd->replyTime = replyTime;
2028  SpinLockRelease(&walsnd->mutex);
2029  }
2030 
2031  /*
2032  * Unset WalSender's xmins if the feedback message values are invalid.
2033  * This happens when the downstream turned hot_standby_feedback off.
2034  */
2035  if (!TransactionIdIsNormal(feedbackXmin)
2036  && !TransactionIdIsNormal(feedbackCatalogXmin))
2037  {
2039  if (MyReplicationSlot != NULL)
2040  PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
2041  return;
2042  }
2043 
2044  /*
2045  * Check that the provided xmin/epoch are sane, that is, not in the future
2046  * and not so far back as to be already wrapped around. Ignore if not.
2047  */
2048  if (TransactionIdIsNormal(feedbackXmin) &&
2049  !TransactionIdInRecentPast(feedbackXmin, feedbackEpoch))
2050  return;
2051 
2052  if (TransactionIdIsNormal(feedbackCatalogXmin) &&
2053  !TransactionIdInRecentPast(feedbackCatalogXmin, feedbackCatalogEpoch))
2054  return;
2055 
2056  /*
2057  * Set the WalSender's xmin equal to the standby's requested xmin, so that
2058  * the xmin will be taken into account by GetOldestXmin. This will hold
2059  * back the removal of dead rows and thereby prevent the generation of
2060  * cleanup conflicts on the standby server.
2061  *
2062  * There is a small window for a race condition here: although we just
2063  * checked that feedbackXmin precedes nextXid, the nextXid could have
2064  * gotten advanced between our fetching it and applying the xmin below,
2065  * perhaps far enough to make feedbackXmin wrap around. In that case the
2066  * xmin we set here would be "in the future" and have no effect. No point
2067  * in worrying about this since it's too late to save the desired data
2068  * anyway. Assuming that the standby sends us an increasing sequence of
2069  * xmins, this could only happen during the first reply cycle, else our
2070  * own xmin would prevent nextXid from advancing so far.
2071  *
2072  * We don't bother taking the ProcArrayLock here. Setting the xmin field
2073  * is assumed atomic, and there's no real need to prevent a concurrent
2074  * GetOldestXmin. (If we're moving our xmin forward, this is obviously
2075  * safe, and if we're moving it backwards, well, the data is at risk
2076  * already since a VACUUM could have just finished calling GetOldestXmin.)
2077  *
2078  * If we're using a replication slot we reserve the xmin via that,
2079  * otherwise via the walsender's PGXACT entry. We can only track the
2080  * catalog xmin separately when using a slot, so we store the least of the
2081  * two provided when not using a slot.
2082  *
2083  * XXX: It might make sense to generalize the ephemeral slot concept and
2084  * always use the slot mechanism to handle the feedback xmin.
2085  */
2086  if (MyReplicationSlot != NULL) /* XXX: persistency configurable? */
2087  PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
2088  else
2089  {
2090  if (TransactionIdIsNormal(feedbackCatalogXmin)
2091  && TransactionIdPrecedes(feedbackCatalogXmin, feedbackXmin))
2092  MyPgXact->xmin = feedbackCatalogXmin;
2093  else
2094  MyPgXact->xmin = feedbackXmin;
2095  }
2096 }
2097 
2098 /*
2099  * Compute how long send/receive loops should sleep.
2100  *
2101  * If wal_sender_timeout is enabled we want to wake up in time to send
2102  * keepalives and to abort the connection if wal_sender_timeout has been
2103  * reached.
 *
 * The result is in milliseconds.  It defaults to 10000 (10 s) and is
 * otherwise derived from the time remaining between now and the computed
 * wakeup time.
2104  */
2105 static long
2107 {
2108  long sleeptime = 10000; /* 10 s */
2109 
2111  {
2112  TimestampTz wakeup_time;
2113  long sec_to_timeout;
2114  int microsec_to_timeout;
2115 
2116  /*
2117  * At the latest stop sleeping once wal_sender_timeout has been
2118  * reached.
2119  */
2122 
2123  /*
2124  * If no ping has been sent yet, wakeup when it's time to do so.
2125  * WalSndKeepaliveIfNecessary() wants to send a keepalive once half of
2126  * the timeout passed without a response.
2127  */
2130  wal_sender_timeout / 2);
2131 
2132  /* Compute relative time until wakeup. */
2133  TimestampDifference(now, wakeup_time,
2134  &sec_to_timeout, &microsec_to_timeout);
2135 
 /* convert the seconds + microseconds difference to milliseconds */
2136  sleeptime = sec_to_timeout * 1000 +
2137  microsec_to_timeout / 1000;
2138  }
2139 
2140  return sleeptime;
2141 }
2142 
2143 /*
2144  * Check whether there have been responses by the client within
2145  * wal_sender_timeout and shutdown if not. Using last_processing as the
2146  * reference point avoids counting server-side stalls against the client.
2147  * However, a long server-side stall can make WalSndKeepaliveIfNecessary()
2148  * postdate last_processing by more than wal_sender_timeout. If that happens,
2149  * the client must reply almost immediately to avoid a timeout. This rarely
2150  * affects the default configuration, under which clients spontaneously send a
2151  * message every wal_receiver_status_interval = wal_sender_timeout/6 = 10s. We
2152  * could eliminate that problem by recognizing timeout expiration at
2153  * wal_sender_timeout/2 after the keepalive.
 *
 * Nothing is checked while last_reply_timestamp <= 0; it is initialized
 * when the walsender main loop starts, which enables timeout processing.
2154  */
2155 static void
2157 {
2158  TimestampTz timeout;
2159 
2160  /* don't bail out if we're doing something that doesn't require timeouts */
2161  if (last_reply_timestamp <= 0)
2162  return;
2163 
2166 
2167  if (wal_sender_timeout > 0 && last_processing >= timeout)
2168  {
2169  /*
2170  * Since typically expiration of replication timeout means
2171  * communication problem, we don't send the error message to the
2172  * standby.
2173  */
2175  (errmsg("terminating walsender process due to replication timeout")));
2176 
2177  WalSndShutdown();
2178  }
2179 }
2180 
2181 /* Main loop of walsender process that streams the WAL over Copy messages. */
/*
 * send_data is the callback that produces the next chunk of output; it is
 * called only while no output is pending.  On each iteration the loop:
 * - processes client input;
 * - sends or flushes data;
 * - switches the walsender from CATCHUP to STREAMING once caught up;
 * - handles SIGUSR2 via WalSndDone(send_data);
 * - checks the reply timeout and sends keepalives;
 * - sleeps on the latch/socket when appropriate.
 * It returns once CopyDone has been both received and sent and no output
 * is pending.
 */
2182 static void
2184 {
2185  /*
2186  * Initialize the last reply timestamp. That enables timeout processing
2187  * from hereon.
2188  */
2190  waiting_for_ping_response = false;
2191 
2192  /*
2193  * Loop until we reach the end of this timeline or the client requests to
2194  * stop streaming.
2195  */
2196  for (;;)
2197  {
2198  /* Clear any already-pending wakeups */
2200 
2202 
2203  /* Process any requests or signals received recently */
2204  if (ConfigReloadPending)
2205  {
2206  ConfigReloadPending = false;
2209  }
2210 
2211  /* Check for input from the client */
2213 
2214  /*
2215  * If we have received CopyDone from the client, sent CopyDone
2216  * ourselves, and the output buffer is empty, it's time to exit
2217  * streaming.
2218  */
2220  !pq_is_send_pending())
2221  break;
2222 
2223  /*
2224  * If we don't have any pending data in the output buffer, try to send
2225  * some more. If there is some, we don't bother to call send_data
2226  * again until we've flushed it ... but we'd better assume we are not
2227  * caught up.
2228  */
2229  if (!pq_is_send_pending())
2230  send_data();
2231  else
2232  WalSndCaughtUp = false;
2233 
2234  /* Try to flush pending output to the client */
2235  if (pq_flush_if_writable() != 0)
2236  WalSndShutdown();
2237 
2238  /* If nothing remains to be sent right now ... */
2240  {
2241  /*
2242  * If we're in catchup state, move to streaming. This is an
2243  * important state change for users to know about, since before
2244  * this point data loss might occur if the primary dies and we
2245  * need to failover to the standby. The state change is also
2246  * important for synchronous replication, since commits that
2247  * started to wait at that point might wait for some time.
2248  */
2249  if (MyWalSnd->state == WALSNDSTATE_CATCHUP)
2250  {
2251  ereport(DEBUG1,
2252  (errmsg("\"%s\" has now caught up with upstream server",
2253  application_name)));
2255  }
2256 
2257  /*
2258  * When SIGUSR2 arrives, we send any outstanding logs up to the
2259  * shutdown checkpoint record (i.e., the latest record), wait for
2260  * them to be replicated to the standby, and exit. This may be a
2261  * normal termination at shutdown, or a promotion, the walsender
2262  * is not sure which.
2263  */
2264  if (got_SIGUSR2)
2265  WalSndDone(send_data);
2266  }
2267 
2268  /* Check for replication timeout. */
2270 
2271  /* Send keepalive if the time has come */
2273 
2274  /*
2275  * We don't block if not caught up, unless there is unsent data
2276  * pending in which case we'd better block until the socket is
2277  * write-ready. This test is only needed for the case where the
2278  * send_data callback handled a subset of the available data but then
2279  * pq_flush_if_writable flushed it all --- we should immediately try
2280  * to send more.
2281  */
2283  {
2284  long sleeptime;
2285  int wakeEvents;
2286 
2287  wakeEvents = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH | WL_TIMEOUT |
2289 
2290  /*
2291  * Use fresh timestamp, not last_processing, to reduce the chance
2292  * of reaching wal_sender_timeout before sending a keepalive.
2293  */
2295 
2296  if (pq_is_send_pending())
2297  wakeEvents |= WL_SOCKET_WRITEABLE;
2298 
2299  /* Sleep until something happens or we time out */
2300  (void) WaitLatchOrSocket(MyLatch, wakeEvents,
2301  MyProcPort->sock, sleeptime,
2303  }
2304  }
2305 }
2306 
2307 /* Initialize a per-walsender data structure for this walsender process */
2308 static void
2310 {
2311  int i;
2312 
2313  /*
2314  * WalSndCtl should be set up already (we inherit this by fork() or
2315  * EXEC_BACKEND mechanism from the postmaster).
2316  */
2317  Assert(WalSndCtl != NULL);
2318  Assert(MyWalSnd == NULL);
2319 
2320  /*
2321  * Find a free walsender slot and reserve it. This must not fail due to
2322  * the prior check for free WAL senders in InitProcess().
2323  */
2324  for (i = 0; i < max_wal_senders; i++)
2325  {
2326  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2327 
2328  SpinLockAcquire(&walsnd->mutex);
2329 
2330  if (walsnd->pid != 0)
2331  {
2332  SpinLockRelease(&walsnd->mutex);
2333  continue;
2334  }
2335  else
2336  {
2337  /*
2338  * Found a free slot. Reserve it for us.
2339  */
2340  walsnd->pid = MyProcPid;
2341  walsnd->sentPtr = InvalidXLogRecPtr;
2342  walsnd->write = InvalidXLogRecPtr;
2343  walsnd->flush = InvalidXLogRecPtr;
2344  walsnd->apply = InvalidXLogRecPtr;
2345  walsnd->writeLag = -1;
2346  walsnd->flushLag = -1;
2347  walsnd->applyLag = -1;
2348  walsnd->state = WALSNDSTATE_STARTUP;
2349  walsnd->latch = &MyProc->procLatch;
2350  walsnd->replyTime = 0;
2351  walsnd->spillTxns = 0;
2352  walsnd->spillCount = 0;
2353  walsnd->spillBytes = 0;
2354  SpinLockRelease(&walsnd->mutex);
2355  /* don't need the lock anymore */
2356  MyWalSnd = (WalSnd *) walsnd;
2357 
2358  break;
2359  }
2360  }
2361 
2362  Assert(MyWalSnd != NULL);
2363 
2364  /* Arrange to clean up at walsender exit */
2366 }
2367 
2368 /* Destroy the per-walsender data structure for this walsender process */
2369 static void
2371 {
2372  WalSnd *walsnd = MyWalSnd;
2373 
2374  Assert(walsnd != NULL);
2375 
2376  MyWalSnd = NULL;
2377 
2378  SpinLockAcquire(&walsnd->mutex);
2379  /* clear latch while holding the spinlock, so it can safely be read */
2380  walsnd->latch = NULL;
2381  /* Mark WalSnd struct as no longer being in use. */
2382  walsnd->pid = 0;
2383  SpinLockRelease(&walsnd->mutex);
2384 }
2385 
2386 /* walsender's openSegment callback for WALRead */
2387 static int
2389  TimeLineID *tli_p)
2390 {
2391  char path[MAXPGPATH];
2392  int fd;
2393 
2394  /*-------
2395  * When reading from a historic timeline, and there is a timeline switch
2396  * within this segment, read from the WAL segment belonging to the new
2397  * timeline.
2398  *
2399  * For example, imagine that this server is currently on timeline 5, and
2400  * we're streaming timeline 4. The switch from timeline 4 to 5 happened at
2401  * 0/13002088. In pg_wal, we have these files:
2402  *
2403  * ...
2404  * 000000040000000000000012
2405  * 000000040000000000000013
2406  * 000000050000000000000013
2407  * 000000050000000000000014
2408  * ...
2409  *
2410  * In this situation, when requested to send the WAL from segment 0x13, on
2411  * timeline 4, we read the WAL from file 000000050000000000000013. Archive
2412  * recovery prefers files from newer timelines, so if the segment was
2413  * restored from the archive on this server, the file belonging to the old
2414  * timeline, 000000040000000000000013, might not exist. Their contents are
2415  * equal up to the switchpoint, because at a timeline switch, the used
2416  * portion of the old segment is copied to the new file. -------
2417  */
2418  *tli_p = sendTimeLine;
2420  {
2421  XLogSegNo endSegNo;
2422 
2423  XLByteToSeg(sendTimeLineValidUpto, endSegNo, segcxt->ws_segsize);
2424  if (sendSeg->ws_segno == endSegNo)
2425  *tli_p = sendTimeLineNextTLI;
2426  }
2427 
2428  XLogFilePath(path, *tli_p, nextSegNo, segcxt->ws_segsize);
2429  fd = BasicOpenFile(path, O_RDONLY | PG_BINARY);
2430  if (fd >= 0)
2431  return fd;
2432 
2433  /*
2434  * If the file is not found, assume it's because the standby asked for a
2435  * too old WAL segment that has already been removed or recycled.
2436  */
2437  if (errno == ENOENT)
2438  {
2439  char xlogfname[MAXFNAMELEN];
2440  int save_errno = errno;
2441 
2442  XLogFileName(xlogfname, *tli_p, nextSegNo, wal_segment_size);
2443  errno = save_errno;
2444  ereport(ERROR,
2446  errmsg("requested WAL segment %s has already been removed",
2447  xlogfname)));
2448  }
2449  else
2450  ereport(ERROR,
2452  errmsg("could not open file \"%s\": %m",
2453  path)));
2454  return -1; /* keep compiler quiet */
2455 }
2456 
2457 /*
2458  * Send out the WAL in its normal physical/stored form.
2459  *
2460  * Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk,
2461  * but not yet sent to the client, and buffer it in the libpq output
2462  * buffer.
2463  *
2464  * If there is no unsent WAL remaining, WalSndCaughtUp is set to true,
2465  * otherwise WalSndCaughtUp is set to false.
 *
 * If the timeline being sent is historic and everything up to its switch
 * point has already been sent, this instead closes the open WAL file, queues
 * CopyDone, sets streamingDoneSending and returns.  After queueing a WAL
 * data ('w') message, sentPtr is advanced to the end of the data sent and
 * published in MyWalSnd->sentPtr, and the process title may be updated.
2466  */
2467 static void
2469 {
2470  XLogRecPtr SendRqstPtr;
2472  XLogRecPtr endptr;
2473  Size nbytes;
2474  XLogSegNo segno;
2475  WALReadError errinfo;
2476 
2477  /* If requested switch the WAL sender to the stopping state. */
2478  if (got_STOPPING)
2480 
2482  {
2483  WalSndCaughtUp = true;
2484  return;
2485  }
2486 
2487  /* Figure out how far we can safely send the WAL. */
2489  {
2490  /*
2491  * Streaming an old timeline that's in this server's history, but is
2492  * not the one we're currently inserting or replaying. It can be
2493  * streamed up to the point where we switched off that timeline.
2494  */
2495  SendRqstPtr = sendTimeLineValidUpto;
2496  }
2497  else if (am_cascading_walsender)
2498  {
2499  /*
2500  * Streaming the latest timeline on a standby.
2501  *
2502  * Attempt to send all WAL that has already been replayed, so that we
2503  * know it's valid. If we're receiving WAL through streaming
2504  * replication, it's also OK to send any WAL that has been received
2505  * but not replayed.
2506  *
2507  * The timeline we're recovering from can change, or we can be
2508  * promoted. In either case, the current timeline becomes historic. We
2509  * need to detect that so that we don't try to stream past the point
2510  * where we switched to another timeline. We check for promotion or
2511  * timeline switch after calculating FlushPtr, to avoid a race
2512  * condition: if the timeline becomes historic just after we checked
2513  * that it was still current, it's still OK to stream it up to the
2514  * FlushPtr that was calculated before it became historic.
2515  */
2516  bool becameHistoric = false;
2517 
2518  SendRqstPtr = GetStandbyFlushRecPtr();
2519 
2520  if (!RecoveryInProgress())
2521  {
2522  /*
2523  * We have been promoted. RecoveryInProgress() updated
2524  * ThisTimeLineID to the new current timeline.
2525  */
2526  am_cascading_walsender = false;
2527  becameHistoric = true;
2528  }
2529  else
2530  {
2531  /*
2532  * Still a cascading standby. But is the timeline we're sending
2533  * still the one recovery is recovering from? ThisTimeLineID was
2534  * updated by the GetStandbyFlushRecPtr() call above.
2535  */
2537  becameHistoric = true;
2538  }
2539 
2540  if (becameHistoric)
2541  {
2542  /*
2543  * The timeline we were sending has become historic. Read the
2544  * timeline history file of the new timeline to see where exactly
2545  * we forked off from the timeline we were sending.
2546  */
2547  List *history;
2548 
2551 
2553  list_free_deep(history);
2554 
2555  sendTimeLineIsHistoric = true;
2556 
2557  SendRqstPtr = sendTimeLineValidUpto;
2558  }
2559  }
2560  else
2561  {
2562  /*
2563  * Streaming the current timeline on a master.
2564  *
2565  * Attempt to send all data that's already been written out and
2566  * fsync'd to disk. We cannot go further than what's been written out
2567  * given the current implementation of WALRead(). And in any case
2568  * it's unsafe to send WAL that is not securely down to disk on the
2569  * master: if the master subsequently crashes and restarts, standbys
2570  * must not have applied any WAL that got lost on the master.
2571  */
2572  SendRqstPtr = GetFlushRecPtr();
2573  }
2574 
2575  /*
2576  * Record the current system time as an approximation of the time at which
2577  * this WAL location was written for the purposes of lag tracking.
2578  *
2579  * In theory we could make XLogFlush() record a time in shmem whenever WAL
2580  * is flushed and we could get that time as well as the LSN when we call
2581  * GetFlushRecPtr() above (and likewise for the cascading standby
2582  * equivalent), but rather than putting any new code into the hot WAL path
2583  * it seems good enough to capture the time here. We should reach this
2584  * after XLogFlush() runs WalSndWakeupProcessRequests(), and although that
2585  * may take some time, we read the WAL flush pointer and take the time
2586  * very close together here so that we'll get a later position if it is
2587  * still moving.
2588  *
2589  * Because LagTrackerWrite ignores samples when the LSN hasn't advanced,
2590  * this gives us a cheap approximation for the WAL flush time for this
2591  * LSN.
2592  *
2593  * Note that the LSN is not necessarily the LSN for the data contained in
2594  * the present message; it's the end of the WAL, which might be further
2595  * ahead. All the lag tracking machinery cares about is finding out when
2596  * that arbitrary LSN is eventually reported as written, flushed and
2597  * applied, so that it can measure the elapsed time.
2598  */
2599  LagTrackerWrite(SendRqstPtr, GetCurrentTimestamp());
2600 
2601  /*
2602  * If this is a historic timeline and we've reached the point where we
2603  * forked to the next timeline, stop streaming.
2604  *
2605  * Note: We might already have sent WAL > sendTimeLineValidUpto. The
2606  * startup process will normally replay all WAL that has been received
2607  * from the master, before promoting, but if the WAL streaming is
2608  * terminated at a WAL page boundary, the valid portion of the timeline
2609  * might end in the middle of a WAL record. We might've already sent the
2610  * first half of that partial WAL record to the cascading standby, so that
2611  * sentPtr > sendTimeLineValidUpto. That's OK; the cascading standby can't
2612  * replay the partial WAL record either, so it can still follow our
2613  * timeline switch.
2614  */
2616  {
2617  /* close the current file. */
2618  if (sendSeg->ws_file >= 0)
2619  close(sendSeg->ws_file);
2620  sendSeg->ws_file = -1;
2621 
2622  /* Send CopyDone */
2623  pq_putmessage_noblock('c', NULL, 0);
2624  streamingDoneSending = true;
2625 
2626  WalSndCaughtUp = true;
2627 
2628  elog(DEBUG1, "walsender reached end of timeline at %X/%X (sent up to %X/%X)",
2630  (uint32) (sentPtr >> 32), (uint32) sentPtr);
2631  return;
2632  }
2633 
2634  /* Do we have any work to do? */
2635  Assert(sentPtr <= SendRqstPtr);
2636  if (SendRqstPtr <= sentPtr)
2637  {
2638  WalSndCaughtUp = true;
2639  return;
2640  }
2641 
2642  /*
2643  * Figure out how much to send in one message. If there's no more than
2644  * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
2645  * MAX_SEND_SIZE bytes, but round back to logfile or page boundary.
2646  *
2647  * The rounding is not only for performance reasons. Walreceiver relies on
2648  * the fact that we never split a WAL record across two messages. Since a
2649  * long WAL record is split at page boundary into continuation records,
2650  * page boundary is always a safe cut-off point. We also assume that
2651  * SendRqstPtr never points to the middle of a WAL record.
2652  */
2653  startptr = sentPtr;
2654  endptr = startptr;
2655  endptr += MAX_SEND_SIZE;
2656 
2657  /* if we went beyond SendRqstPtr, back off */
2658  if (SendRqstPtr <= endptr)
2659  {
2660  endptr = SendRqstPtr;
2662  WalSndCaughtUp = false;
2663  else
2664  WalSndCaughtUp = true;
2665  }
2666  else
2667  {
2668  /* round down to page boundary. */
2669  endptr -= (endptr % XLOG_BLCKSZ);
2670  WalSndCaughtUp = false;
2671  }
2672 
2673  nbytes = endptr - startptr;
2674  Assert(nbytes <= MAX_SEND_SIZE);
2675 
2676  /*
2677  * OK to read and send the slice.
2678  */
2679  resetStringInfo(&output_message);
2680  pq_sendbyte(&output_message, 'w');
2681 
2682  pq_sendint64(&output_message, startptr); /* dataStart */
2683  pq_sendint64(&output_message, SendRqstPtr); /* walEnd */
2684  pq_sendint64(&output_message, 0); /* sendtime, filled in last */
2685 
2686  /*
2687  * Read the log directly into the output buffer to avoid extra memcpy
2688  * calls.
2689  */
2690  enlargeStringInfo(&output_message, nbytes);
2691 
/*
 * Re-entered via "goto retry" below when the open WAL file had to be
 * closed because a reload was requested; the same range is then read again.
 */
2692 retry:
2693  if (!WALRead(&output_message.data[output_message.len],
2694  startptr,
2695  nbytes,
2696  sendSeg->ws_tli, /* Pass the current TLI because only
2697  * WalSndSegmentOpen controls whether new
2698  * TLI is needed. */
2699  sendSeg,
2700  sendCxt,
2702  &errinfo))
2703  WALReadRaiseError(&errinfo);
2704 
2705  /* See logical_read_xlog_page(). */
2706  XLByteToSeg(startptr, segno, sendCxt->ws_segsize);
2707  CheckXLogRemoved(segno, sendSeg->ws_tli);
2708 
2709  /*
2710  * During recovery, the currently-open WAL file might be replaced with the
2711  * file of the same name retrieved from archive. So we always need to
2712  * check what we read was valid after reading into the buffer. If it's
2713  * invalid, we try to open and read the file again.
2714  */
2716  {
2717  WalSnd *walsnd = MyWalSnd;
2718  bool reload;
2719 
2720  SpinLockAcquire(&walsnd->mutex);
2721  reload = walsnd->needreload;
2722  walsnd->needreload = false;
2723  SpinLockRelease(&walsnd->mutex);
2724 
2725  if (reload && sendSeg->ws_file >= 0)
2726  {
2727  close(sendSeg->ws_file);
2728  sendSeg->ws_file = -1;
2729 
2730  goto retry;
2731  }
2732  }
2733 
/*
 * WALRead() wrote the WAL bytes directly past the message header; account
 * for them in the StringInfo and keep it NUL-terminated.
 */
2734  output_message.len += nbytes;
2735  output_message.data[output_message.len] = '\0';
2736 
2737  /*
2738  * Fill the send timestamp last, so that it is taken as late as possible.
 * The offset 1 + 2 * sizeof(int64) skips the 'w' type byte, dataStart and
 * walEnd, landing on the zero sendtime placeholder written above.
2739  */
2740  resetStringInfo(&tmpbuf);
2741  pq_sendint64(&tmpbuf, GetCurrentTimestamp());
2742  memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64)],
2743  tmpbuf.data, sizeof(int64));
2744 
2745  pq_putmessage_noblock('d', output_message.data, output_message.len);
2746 
2747  sentPtr = endptr;
2748 
2749  /* Update shared memory status */
2750  {
2751  WalSnd *walsnd = MyWalSnd;
2752 
2753  SpinLockAcquire(&walsnd->mutex);
2754  walsnd->sentPtr = sentPtr;
2755  SpinLockRelease(&walsnd->mutex);
2756  }
2757 
2758  /* Report progress of XLOG streaming in PS display */
2760  {
2761  char activitymsg[50];
2762 
2763  snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
2764  (uint32) (sentPtr >> 32), (uint32) sentPtr);
2765  set_ps_display(activitymsg, false);
2766  }
2767 }
2768 
2769 /*
2770  * Stream out logically decoded data.
 *
 * Reads at most one WAL record through logical_decoding_ctx->reader, starting
 * at logical_startptr, and feeds it to LogicalDecodingProcessRecord(); output
 * reaches the client through the output plugin's write callbacks, not here.
 * Afterwards sentPtr is the reader's EndRecPtr (when a record was read),
 * WalSndCaughtUp is set once EndRecPtr has reached the cached flush point,
 * and sentPtr is published in MyWalSnd->sentPtr.  An invalid record raises
 * ERROR.
2771  */
2772 static void
2774 {
2775  XLogRecord *record;
2776  char *errm;
2777 
2778  /*
2779  * We'll use the current flush point to determine whether we've caught up.
2780  * This variable is static in order to cache it across calls. Caching is
2781  * helpful because GetFlushRecPtr() needs to acquire a heavily-contended
2782  * spinlock.
2783  */
2784  static XLogRecPtr flushPtr = InvalidXLogRecPtr;
2785 
2786  /*
2787  * Don't know whether we've caught up yet. We'll set WalSndCaughtUp to
2788  * true in WalSndWaitForWal, if we're actually waiting. We also set it to
2789  * true if XLogReadRecord() had to stop reading but WalSndWaitForWal
2790  * didn't wait - i.e. when we're shutting down.
2791  */
2792  WalSndCaughtUp = false;
2793 
2794  record = XLogReadRecord(logical_decoding_ctx->reader, logical_startptr, &errm);
2796 
2797  /* xlog record was invalid */
2798  if (errm != NULL)
2799  elog(ERROR, "%s", errm);
2800 
2801  if (record != NULL)
2802  {
2803  /*
2804  * Note the lack of any call to LagTrackerWrite() which is handled by
2805  * WalSndUpdateProgress which is called by output plugin through
2806  * logical decoding write api.
2807  */
2808  LogicalDecodingProcessRecord(logical_decoding_ctx, logical_decoding_ctx->reader);
2809 
2810  sentPtr = logical_decoding_ctx->reader->EndRecPtr;
2811  }
2812 
2813  /*
2814  * If first time through in this session, initialize flushPtr. Otherwise,
2815  * we only need to update flushPtr if EndRecPtr is past it.
2816  */
2817  if (flushPtr == InvalidXLogRecPtr)
2818  flushPtr = GetFlushRecPtr();
2819  else if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
2820  flushPtr = GetFlushRecPtr();
2821 
2822  /* If EndRecPtr is still past our flushPtr, it means we caught up. */
2823  if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
2824  WalSndCaughtUp = true;
2825 
2826  /*
2827  * If we're caught up and have been requested to stop, have WalSndLoop()
2828  * terminate the connection in an orderly manner, after writing out all
2829  * the pending data.
2830  */
2832  got_SIGUSR2 = true;
2833 
2834  /* Update shared memory status */
2835  {
2836  WalSnd *walsnd = MyWalSnd;
2837 
2838  SpinLockAcquire(&walsnd->mutex);
2839  walsnd->sentPtr = sentPtr;
2840  SpinLockRelease(&walsnd->mutex);
2841  }
2842 }
2843 
2844 /*
2845  * Shutdown if the sender is caught up.
2846  *
2847  * NB: This should only be called when the shutdown signal has been received
2848  * from postmaster.
2849  *
2850  * Note that if we determine that there's still more data to send, this
2851  * function will return control to the caller.
2852  */
2853 static void
2855 {
2856  XLogRecPtr replicatedPtr;
2857 
2858  /* ... let's just be real sure we're caught up ... */
2859  send_data();
2860 
2861  /*
2862  * To figure out whether all WAL has successfully been replicated, check
2863  * flush location if valid, write otherwise. Tools like pg_receivewal will
2864  * usually (unless in synchronous mode) return an invalid flush location.
2865  */
2866  replicatedPtr = XLogRecPtrIsInvalid(MyWalSnd->flush) ?
2867  MyWalSnd->write : MyWalSnd->flush;
2868 
2869  if (WalSndCaughtUp && sentPtr == replicatedPtr &&
2870  !pq_is_send_pending())
2871  {
2872  /* Inform the standby that XLOG streaming is done */
2873  EndCommand("COPY 0", DestRemote);
2874  pq_flush();
2875 
2876  proc_exit(0);
2877  }
2879  {
2880  WalSndKeepalive(true);
2882  }
2883 }
2884 
2885 /*
2886  * Returns the latest point in WAL that has been safely flushed to disk, and
2887  * can be sent to the standby. This should only be called when in recovery,
2888  * ie. we're streaming to a cascaded standby.
2889  *
2890  * As a side-effect, ThisTimeLineID is updated to the TLI of the last
2891  * replayed WAL record.
2892  */
2893 static XLogRecPtr
2895 {
2896  XLogRecPtr replayPtr;
2897  TimeLineID replayTLI;
2898  XLogRecPtr receivePtr;
2900  XLogRecPtr result;
2901 
2902  /*
2903  * We can safely send what's already been replayed. Also, if walreceiver
2904  * is streaming WAL from the same timeline, we can send anything that it
2905  * has streamed, but hasn't been replayed yet.
2906  */
2907 
2908  receivePtr = GetWalRcvWriteRecPtr(NULL, &receiveTLI);
2909  replayPtr = GetXLogReplayRecPtr(&replayTLI);
2910 
2911  ThisTimeLineID = replayTLI;
2912 
2913  result = replayPtr;
2914  if (receiveTLI == ThisTimeLineID && receivePtr > replayPtr)
2915  result = receivePtr;
2916 
2917  return result;
2918 }
2919 
2920 /*
2921  * Request walsenders to reload the currently-open WAL file
2922  */
2923 void
2925 {
2926  int i;
2927 
2928  for (i = 0; i < max_wal_senders; i++)
2929  {
2930  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2931 
2932  SpinLockAcquire(&walsnd->mutex);
2933  if (walsnd->pid == 0)
2934  {
2935  SpinLockRelease(&walsnd->mutex);
2936  continue;
2937  }
2938  walsnd->needreload = true;
2939  SpinLockRelease(&walsnd->mutex);
2940  }
2941 }
2942 
2943 /*
2944  * Handle PROCSIG_WALSND_INIT_STOPPING signal.
2945  */
2946 void
2948 {
2950 
2951  /*
2952  * If replication has not yet started, die like with SIGTERM. If
2953  * replication is active, only set a flag and wake up the main loop. It
2954  * will send any outstanding WAL, wait for it to be replicated to the
2955  * standby, and then exit gracefully.
2956  */
2957  if (!replication_active)
2958  kill(MyProcPid, SIGTERM);
2959  else
2960  got_STOPPING = true;
2961 }
2962 
2963 /*
2964  * SIGUSR2: set flag to do a last cycle and shut down afterwards. The WAL
2965  * sender should already have been switched to WALSNDSTATE_STOPPING at
2966  * this point.
2967  */
2968 static void
2970 {
2971  int save_errno = errno;
2972 
2973  got_SIGUSR2 = true;
2974  SetLatch(MyLatch);
2975 
2976  errno = save_errno;
2977 }
2978 
2979 /* Set up signal handlers */
/*
 * Installs the walsender's signal dispositions: query cancel, shutdown
 * request, hard crash, the timeout machinery's SIGALRM handler, and SIGUSR2
 * to request a final send cycle before shutdown.
 */
2980 void
2982 {
2983  /* Set up signal handlers */
2985  pqsignal(SIGINT, StatementCancelHandler); /* query cancel */
2986  pqsignal(SIGTERM, die); /* request shutdown */
2987  pqsignal(SIGQUIT, quickdie); /* hard crash time */
2988  InitializeTimeouts(); /* establishes SIGALRM handler */
2991  pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
2992  * shutdown */
2993 
2994  /* Reset some signals that are accepted by postmaster but not here */
2996 }
2997 
2998 /* Report shared-memory space needed by WalSndShmemInit */
2999 Size
3001 {
3002  Size size = 0;
3003 
3004  size = offsetof(WalSndCtlData, walsnds);
3005  size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
3006 
3007  return size;
3008 }
3009 
3010 /* Allocate and initialize walsender-related shared memory */
3011 void
3013 {
3014  bool found;
3015  int i;
3016 
3017  WalSndCtl = (WalSndCtlData *)
3018  ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
3019 
3020  if (!found)
3021  {
3022  /* First time through, so initialize */
3023  MemSet(WalSndCtl, 0, WalSndShmemSize());
3024 
3025  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
3026  SHMQueueInit(&(WalSndCtl->SyncRepQueue[i]));
3027 
3028  for (i = 0; i < max_wal_senders; i++)
3029  {
3030  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3031 
3032  SpinLockInit(&walsnd->mutex);
3033  }
3034  }
3035 }
3036 
3037 /*
3038  * Wake up all walsenders
3039  *
3040  * This will be called inside critical sections, so throwing an error is not
3041  * advisable.
3042  */
3043 void
3045 {
3046  int i;
3047 
3048  for (i = 0; i < max_wal_senders; i++)
3049  {
3050  Latch *latch;
3051  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3052 
3053  /*
3054  * Get latch pointer with spinlock held, for the unlikely case that
3055  * pointer reads aren't atomic (as they're 8 bytes).
3056  */
3057  SpinLockAcquire(&walsnd->mutex);
3058  latch = walsnd->latch;
3059  SpinLockRelease(&walsnd->mutex);
3060 
3061  if (latch != NULL)
3062  SetLatch(latch);
3063  }
3064 }
3065 
3066 /*
3067  * Signal all walsenders to move to stopping state.
3068  *
3069  * This will trigger walsenders to move to a state where no further WAL can be
3070  * generated. See this file's header for details.
3071  */
3072 void
3074 {
3075  int i;
3076 
3077  for (i = 0; i < max_wal_senders; i++)
3078  {
3079  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3080  pid_t pid;
3081 
3082  SpinLockAcquire(&walsnd->mutex);
3083  pid = walsnd->pid;
3084  SpinLockRelease(&walsnd->mutex);
3085 
3086  if (pid == 0)
3087  continue;
3088 
3090  }
3091 }
3092 
3093 /*
3094  * Wait that all the WAL senders have quit or reached the stopping state. This
3095  * is used by the checkpointer to control when the shutdown checkpoint can
3096  * safely be performed.
3097  */
3098 void
3100 {
3101  for (;;)
3102  {
3103  int i;
3104  bool all_stopped = true;
3105 
3106  for (i = 0; i < max_wal_senders; i++)
3107  {
3108  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3109 
3110  SpinLockAcquire(&walsnd->mutex);
3111 
3112  if (walsnd->pid == 0)
3113  {
3114  SpinLockRelease(&walsnd->mutex);
3115  continue;
3116  }
3117 
3118  if (walsnd->state != WALSNDSTATE_STOPPING)
3119  {
3120  all_stopped = false;
3121  SpinLockRelease(&walsnd->mutex);
3122  break;
3123  }
3124  SpinLockRelease(&walsnd->mutex);
3125  }
3126 
3127  /* safe to leave if confirmation is done for all WAL senders */
3128  if (all_stopped)
3129  return;
3130 
3131  pg_usleep(10000L); /* wait for 10 msec */
3132  }
3133 }
3134 
3135 /* Set state for current walsender (only called in walsender) */
3136 void
3138 {
3139  WalSnd *walsnd = MyWalSnd;
3140 
3142 
3143  if (walsnd->state == state)
3144  return;
3145 
3146  SpinLockAcquire(&walsnd->mutex);
3147  walsnd->state = state;
3148  SpinLockRelease(&walsnd->mutex);
3149 }
3150 
3151 /*
3152  * Return a string constant representing the state. This is used
3153  * in system views, and should *not* be translated.
3154  */
3155 static const char *
3157 {
3158  switch (state)
3159  {
3160  case WALSNDSTATE_STARTUP:
3161  return "startup";
3162  case WALSNDSTATE_BACKUP:
3163  return "backup";
3164  case WALSNDSTATE_CATCHUP:
3165  return "catchup";
3166  case WALSNDSTATE_STREAMING:
3167  return "streaming";
3168  case WALSNDSTATE_STOPPING:
3169  return "stopping";
3170  }
3171  return "UNKNOWN";
3172 }
3173 
3174 static Interval *
3176 {
3177  Interval *result = palloc(sizeof(Interval));
3178 
3179  result->month = 0;
3180  result->day = 0;
3181  result->time = offset;
3182 
3183  return result;
3184 }
3185 
3186 /*
3187  * Returns activity of walsenders, including pids and xlog locations sent to
3188  * standby servers.
 *
 * Set-returning function in materialize mode: one row per walsender slot
 * with pid != 0, written to a tuplestore allocated in the per-query memory
 * context.  Each slot's fields are copied under its spinlock and formatted
 * after the lock is released.  Callers who are not members of
 * pg_read_all_stats see only the pid; all other columns are NULL.
 * Invalid LSNs, negative lag values and a zero reply time are returned as
 * NULL.
3189  */
3190 Datum
3192 {
3193 #define PG_STAT_GET_WAL_SENDERS_COLS 15
3194  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
3195  TupleDesc tupdesc;
3196  Tuplestorestate *tupstore;
3197  MemoryContext per_query_ctx;
3198  MemoryContext oldcontext;
3199  List *sync_standbys;
3200  int i;
3201 
3202  /* check to see if caller supports us returning a tuplestore */
3203  if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
3204  ereport(ERROR,
3205  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3206  errmsg("set-valued function called in context that cannot accept a set")));
3207  if (!(rsinfo->allowedModes & SFRM_Materialize))
3208  ereport(ERROR,
3209  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3210  errmsg("materialize mode required, but it is not allowed in this context")));
3211 
3212  /* Build a tuple descriptor for our result type */
3213  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
3214  elog(ERROR, "return type must be a row type");
3215 
/* The tuplestore must outlive this call, so build it in per-query memory. */
3216  per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
3217  oldcontext = MemoryContextSwitchTo(per_query_ctx);
3218 
3219  tupstore = tuplestore_begin_heap(true, false, work_mem);
3220  rsinfo->returnMode = SFRM_Materialize;
3221  rsinfo->setResult = tupstore;
3222  rsinfo->setDesc = tupdesc;
3223 
3224  MemoryContextSwitchTo(oldcontext);
3225 
3226  /*
3227  * Get the currently active synchronous standbys.
 * The result is checked below with list_member_int() against walsnds[]
 * indexes.
3228  */
3229  LWLockAcquire(SyncRepLock, LW_SHARED);
3230  sync_standbys = SyncRepGetSyncStandbys(NULL);
3231  LWLockRelease(SyncRepLock);
3232 
3233  for (i = 0; i < max_wal_senders; i++)
3234  {
3235  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3237  XLogRecPtr write;
3238  XLogRecPtr flush;
3239  XLogRecPtr apply;
3240  TimeOffset writeLag;
3241  TimeOffset flushLag;
3242  TimeOffset applyLag;
3243  int priority;
3244  int pid;
3246  TimestampTz replyTime;
3247  int64 spillTxns;
3248  int64 spillCount;
3249  int64 spillBytes;
3251  bool nulls[PG_STAT_GET_WAL_SENDERS_COLS];
3252 
/*
 * Take a consistent snapshot of the slot under its spinlock; all formatting
 * (which may allocate) happens after the lock is released.
 */
3253  SpinLockAcquire(&walsnd->mutex);
3254  if (walsnd->pid == 0)
3255  {
3256  SpinLockRelease(&walsnd->mutex);
3257  continue;
3258  }
3259  pid = walsnd->pid;
3260  sentPtr = walsnd->sentPtr;
3261  state = walsnd->state;
3262  write = walsnd->write;
3263  flush = walsnd->flush;
3264  apply = walsnd->apply;
3265  writeLag = walsnd->writeLag;
3266  flushLag = walsnd->flushLag;
3267  applyLag = walsnd->applyLag;
3268  priority = walsnd->sync_standby_priority;
3269  replyTime = walsnd->replyTime;
3270  spillTxns = walsnd->spillTxns;
3271  spillCount = walsnd->spillCount;
3272  spillBytes = walsnd->spillBytes;
3273  SpinLockRelease(&walsnd->mutex);
3274 
3275  memset(nulls, 0, sizeof(nulls));
3276  values[0] = Int32GetDatum(pid);
3277 
3278  if (!is_member_of_role(GetUserId(), DEFAULT_ROLE_READ_ALL_STATS))
3279  {
3280  /*
3281  * Only superusers and members of pg_read_all_stats can see
3282  * details. Other users only get the pid value to know it's a
3283  * walsender, but no details.
3284  */
3285  MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1);
3286  }
3287  else
3288  {
3289  values[1] = CStringGetTextDatum(WalSndGetStateString(state));
3290 
/* Columns 2-5: sent/write/flush/apply positions; invalid LSNs become NULL. */
3291  if (XLogRecPtrIsInvalid(sentPtr))
3292  nulls[2] = true;
3293  values[2] = LSNGetDatum(sentPtr);
3294 
3295  if (XLogRecPtrIsInvalid(write))
3296  nulls[3] = true;
3297  values[3] = LSNGetDatum(write);
3298 
3299  if (XLogRecPtrIsInvalid(flush))
3300  nulls[4] = true;
3301  values[4] = LSNGetDatum(flush);
3302 
3303  if (XLogRecPtrIsInvalid(apply))
3304  nulls[5] = true;
3305  values[5] = LSNGetDatum(apply);
3306 
3307  /*
3308  * Treat a standby such as a pg_basebackup background process
3309  * which always returns an invalid flush location, as an
3310  * asynchronous standby.
3311  */
3312  priority = XLogRecPtrIsInvalid(flush) ? 0 : priority;
3313 
/* Columns 6-8: write/flush/apply lag; a negative value means none known. */
3314  if (writeLag < 0)
3315  nulls[6] = true;
3316  else
3317  values[6] = IntervalPGetDatum(offset_to_interval(writeLag));
3318 
3319  if (flushLag < 0)
3320  nulls[7] = true;
3321  else
3322  values[7] = IntervalPGetDatum(offset_to_interval(flushLag));
3323 
3324  if (applyLag < 0)
3325  nulls[8] = true;
3326  else
3327  values[8] = IntervalPGetDatum(offset_to_interval(applyLag));
3328 
3329  values[9] = Int32GetDatum(priority);
3330 
3331  /*
3332  * More easily understood version of standby state. This is purely
3333  * informational.
3334  *
3335  * In quorum-based sync replication, the role of each standby
3336  * listed in synchronous_standby_names can be changing very
3337  * frequently. Any standbys considered as "sync" at one moment can
3338  * be switched to "potential" ones at the next moment. So, it's
3339  * basically useless to report "sync" or "potential" as their sync
3340  * states. We report just "quorum" for them.
3341  */
3342  if (priority == 0)
3343  values[10] = CStringGetTextDatum("async");
3344  else if (list_member_int(sync_standbys, i))
3346  CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
3347  else
3348  values[10] = CStringGetTextDatum("potential");
3349 
3350  if (replyTime == 0)
3351  nulls[11] = true;
3352  else
3353  values[11] = TimestampTzGetDatum(replyTime);
3354 
3355  /* spill to disk */
3356  values[12] = Int64GetDatum(spillTxns);
3357  values[13] = Int64GetDatum(spillCount);
3358  values[14] = Int64GetDatum(spillBytes);
3359  }
3360 
3361  tuplestore_putvalues(tupstore, tupdesc, values, nulls);
3362  }
3363 
3364  /* clean up and return the tuplestore */
3365  tuplestore_donestoring(tupstore);
3366 
3367  return (Datum) 0;
3368 }
3369 
3370 /*
3371  * This function is used to send a keepalive message to standby.
3372  * If requestReply is set, sets a flag in the message requesting the standby
3373  * to send a message back to us, for heartbeat purposes.
3374  */
3375 static void
3376 WalSndKeepalive(bool requestReply)
3377 {
3378  elog(DEBUG2, "sending replication keepalive");
3379 
3380  /* construct the message... */
3381  resetStringInfo(&output_message);
3382  pq_sendbyte(&output_message, 'k');
3383  pq_sendint64(&output_message, sentPtr);
3384  pq_sendint64(&output_message, GetCurrentTimestamp());
3385  pq_sendbyte(&output_message, requestReply ? 1 : 0);
3386 
3387  /* ... and send it wrapped in CopyData */
3388  pq_putmessage_noblock('d', output_message.data, output_message.len);
3389 }
3390 
3391 /*
3392  * Send keepalive message if too much time has elapsed.
3393  */
3394 static void
3396 {
3397  TimestampTz ping_time;
3398 
3399  /*
3400  * Don't send keepalive messages if timeouts are globally disabled or
3401  * we're doing something not partaking in timeouts.
3402  */
3403  if (wal_sender_timeout <= 0 || last_reply_timestamp <= 0)
3404  return;
3405 
3407  return;
3408 
3409  /*
3410  * If half of wal_sender_timeout has lapsed without receiving any reply
3411  * from the standby, send a keep-alive message to the standby requesting
3412  * an immediate reply.
3413  */
3415  wal_sender_timeout / 2);
3416  if (last_processing >= ping_time)
3417  {
3418  WalSndKeepalive(true);
3420 
3421  /* Try to flush pending output to the client */
3422  if (pq_flush_if_writable() != 0)
3423  WalSndShutdown();
3424  }
3425 }
3426 
3427 /*
3428  * Record the end of the WAL and the time it was flushed locally, so that
3429  * LagTrackerRead can compute the elapsed time (lag) when this WAL location is
3430  * eventually reported to have been written, flushed and applied by the
3431  * standby in a reply message.
3432  */
3433 static void
3435 {
3436  bool buffer_full;
3437  int new_write_head;
3438  int i;
3439 
3440  if (!am_walsender)
3441  return;
3442 
3443  /*
3444  * If the lsn hasn't advanced since last time, then do nothing. This way
3445  * we only record a new sample when new WAL has been written.
3446  */
3447  if (lag_tracker->last_lsn == lsn)
3448  return;
3449  lag_tracker->last_lsn = lsn;
3450 
3451  /*
3452  * If advancing the write head of the circular buffer would crash into any
3453  * of the read heads, then the buffer is full. In other words, the
3454  * slowest reader (presumably apply) is the one that controls the release
3455  * of space.
3456  */
3457  new_write_head = (lag_tracker->write_head + 1) % LAG_TRACKER_BUFFER_SIZE;
3458  buffer_full = false;
3459  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; ++i)
3460  {
3461  if (new_write_head == lag_tracker->read_heads[i])
3462  buffer_full = true;
3463  }
3464 
3465  /*
3466  * If the buffer is full, for now we just rewind by one slot and overwrite
3467  * the last sample, as a simple (if somewhat uneven) way to lower the
3468  * sampling rate. There may be better adaptive compaction algorithms.
3469  */
3470  if (buffer_full)
3471  {
3472  new_write_head = lag_tracker->write_head;
3473  if (lag_tracker->write_head > 0)
3474  lag_tracker->write_head--;
3475  else
3476  lag_tracker->write_head = LAG_TRACKER_BUFFER_SIZE - 1;
3477  }
3478 
3479  /* Store a sample at the current write head position. */
3480  lag_tracker->buffer[lag_tracker->write_head].lsn = lsn;
3481  lag_tracker->buffer[lag_tracker->write_head].time = local_flush_time;
3482  lag_tracker->write_head = new_write_head;
3483 }
3484 
3485 /*
3486  * Find out how much time has elapsed between the moment WAL location 'lsn'
3487  * (or the highest known earlier LSN) was flushed locally and the time 'now'.
3488  * We have a separate read head for each of the reported LSN locations we
3489  * receive in replies from standby; 'head' controls which read head is
3490  * used. Whenever a read head crosses an LSN which was written into the
3491  * lag buffer with LagTrackerWrite, we can use the associated timestamp to
3492  * find out the time this LSN (or an earlier one) was flushed locally, and
3493  * therefore compute the lag.
3494  *
3495  * Return -1 if no new sample data is available, and otherwise the elapsed
3496  * time in microseconds.
3497  */
3498 static TimeOffset
3500 {
3501  TimestampTz time = 0;
3502 
3503  /* Read all unread samples up to this LSN or end of buffer. */
3504  while (lag_tracker->read_heads[head] != lag_tracker->write_head &&
3505  lag_tracker->buffer[lag_tracker->read_heads[head]].lsn <= lsn)
3506  {
3507  time = lag_tracker->buffer[lag_tracker->read_heads[head]].time;
3508  lag_tracker->last_read[head] =
3509  lag_tracker->buffer[lag_tracker->read_heads[head]];
3510  lag_tracker->read_heads[head] =
3511  (lag_tracker->read_heads[head] + 1) % LAG_TRACKER_BUFFER_SIZE;
3512  }
3513 
3514  /*
3515  * If the lag tracker is empty, that means the standby has processed
3516  * everything we've ever sent so we should now clear 'last_read'. If we
3517  * didn't do that, we'd risk using a stale and irrelevant sample for
3518  * interpolation at the beginning of the next burst of WAL after a period
3519  * of idleness.
3520  */
3521  if (lag_tracker->read_heads[head] == lag_tracker->write_head)
3522  lag_tracker->last_read[head].time = 0;
3523 
3524  if (time > now)
3525  {
3526  /* If the clock somehow went backwards, treat as not found. */
3527  return -1;
3528  }
3529  else if (time == 0)
3530  {
3531  /*
3532  * We didn't cross a time. If there is a future sample that we
3533  * haven't reached yet, and we've already reached at least one sample,
3534  * let's interpolate the local flushed time. This is mainly useful
3535  * for reporting a completely stuck apply position as having
3536  * increasing lag, since otherwise we'd have to wait for it to
3537  * eventually start moving again and cross one of our samples before
3538  * we can show the lag increasing.
3539  */
3540  if (lag_tracker->read_heads[head] == lag_tracker->write_head)
3541  {
3542  /* There are no future samples, so we can't interpolate. */
3543  return -1;
3544  }
3545  else if (lag_tracker->last_read[head].time != 0)
3546  {
3547  /* We can interpolate between last_read and the next sample. */
3548  double fraction;
3549  WalTimeSample prev = lag_tracker->last_read[head];
3550  WalTimeSample next = lag_tracker->buffer[lag_tracker->read_heads[head]];
3551 
3552  if (lsn < prev.lsn)
3553  {
3554  /*
3555  * Reported LSNs shouldn't normally go backwards, but it's
3556  * possible when there is a timeline change. Treat as not
3557  * found.
3558  */
3559  return -1;
3560  }
3561 
3562  Assert(prev.lsn < next.lsn);
3563 
3564  if (prev.time > next.time)
3565  {
3566  /* If the clock somehow went backwards, treat as not found. */
3567  return -1;
3568  }
3569 
3570  /* See how far we are between the previous and next samples. */
3571  fraction =
3572  (double) (lsn - prev.lsn) / (double) (next.lsn - prev.lsn);
3573 
3574  /* Scale the local flush time proportionally. */
3575  time = (TimestampTz)
3576  ((double) prev.time + (next.time - prev.time) * fraction);
3577  }
3578  else
3579  {
3580  /*
3581  * We have only a future sample, implying that we were entirely
3582  * caught up but and now there is a new burst of WAL and the
3583  * standby hasn't processed the first sample yet. Until the
3584  * standby reaches the future sample the best we can do is report
3585  * the hypothetical lag if that sample were to be replayed now.
3586  */
3587  time = lag_tracker->buffer[lag_tracker->read_heads[head]].time;
3588  }
3589  }
3590 
3591  /* Return the elapsed time since local flush time in microseconds. */
3592  Assert(time != 0);
3593  return now - time;
3594 }
3595 
3596 static void
3598 {
3599  ReorderBuffer *rb = ctx->reorder;
3600 
3601  SpinLockAcquire(&MyWalSnd->mutex);
3602 
3603  MyWalSnd->spillTxns = rb->spillTxns;
3604  MyWalSnd->spillCount = rb->spillCount;
3605  MyWalSnd->spillBytes = rb->spillBytes;
3606 
3607  elog(DEBUG2, "UpdateSpillStats: updating stats %p %lld %lld %lld",
3608  rb,
3609  (long long) rb->spillTxns,
3610  (long long) rb->spillCount,
3611  (long long) rb->spillBytes);
3612 
3613  SpinLockRelease(&MyWalSnd->mutex);
3614 }
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, Datum *values, bool *isnull)
Definition: tuplestore.c:750
static void InitWalSenderSlot(void)
Definition: walsender.c:2309
void InitializeTimeouts(void)
Definition: timeout.c:346
#define pq_is_send_pending()
Definition: libpq.h:41
#define NIL
Definition: pg_list.h:65
static void pq_sendint16(StringInfo buf, uint16 i)
Definition: pqformat.h:137
XLogRecPtr write
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS
WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]
Definition: walsender.c:210
static void ProcessStandbyMessage(void)
Definition: walsender.c:1735
#define SIGQUIT
Definition: win32_port.h:155
#define WL_SOCKET_WRITEABLE
Definition: latch.h:126
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define IsA(nodeptr, _type_)
Definition: nodes.h:576
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:211
#define AllocSetContextCreate
Definition: memutils.h:170
XLogRecPtr startpoint
Definition: replnodes.h:85
#define DEBUG1
Definition: elog.h:25
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:196
int MyProcPid
Definition: globals.c:40
int wal_sender_timeout
Definition: walsender.c:123
struct ReorderBuffer * reorder
Definition: logical.h:42
uint32 TimeLineID
Definition: xlogdefs.h:52
#define pq_flush()
Definition: libpq.h:39
static int32 next
Definition: blutils.c:217
XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
struct Port * MyProcPort
Definition: globals.c:43
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
int wal_segment_size
Definition: xlog.c:112
int write_head
Definition: walsender.c:211
uint32 TransactionId
Definition: c.h:514
TupleDesc CreateTemplateTupleDesc(int natts)
Definition: tupdesc.c:44
bool wake_wal_senders
Definition: walsender.c:130
#define SYNC_REP_PRIORITY
Definition: syncrep.h:36
bool exec_replication_command(const char *cmd_string)
Definition: walsender.c:1462
Oid GetUserId(void)
Definition: miscinit.c:380
#define SIGUSR1
Definition: win32_port.h:166
bool update_process_title
Definition: ps_status.c:35
#define write(a, b, c)
Definition: win32.h:14
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1583
TransactionId xmin
Definition: proc.h:228
static void WalSndLastCycleHandler(SIGNAL_ARGS)
Definition: walsender.c:2969
void pgstat_report_activity(BackendState state, const char *cmd_str)
Definition: pgstat.c:3114
static void SendTimeLineHistory(TimeLineHistoryCmd *cmd)
Definition: walsender.c:436
static void parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action)
Definition: walsender.c:823
static void DropReplicationSlot(DropReplicationSlotCmd *cmd)
Definition: walsender.c:1076
static WALSegmentContext * sendCxt
Definition: walsender.c:133
#define SIGCHLD
Definition: win32_port.h:164
PGPROC * MyProc
Definition: proc.c:67
int64 TimestampTz
Definition: timestamp.h:39
WalTimeSample last_read[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:213
Size WalSndShmemSize(void)
Definition: walsender.c:3000
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:56
void MarkPostmasterChildWalSender(void)
Definition: pmsignal.c:272
char * pstrdup(const char *in)
Definition: mcxt.c:1186
void CommitTransactionCommand(void)
Definition: xact.c:2898
#define XACT_REPEATABLE_READ
Definition: xact.h:38
#define SpinLockInit(lock)
Definition: spin.h:60
void RestoreTransactionSnapshot(Snapshot snapshot, void *master_pgproc)
Definition: snapmgr.c:2225
void ReplicationSlotAcquire(const char *name, bool nowait)
Definition: slot.c:330
uint8 syncrep_method
Definition: syncrep.h:51
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:83
void WALReadRaiseError(WALReadError *errinfo)
Definition: xlogutils.c:944
List * readTimeLineHistory(TimeLineID targetTLI)
Definition: timeline.c:75
bool IsAbortedTransactionBlockState(void)
Definition: xact.c:375
static void XLogSendPhysical(void)
Definition: walsender.c:2468
void do_tup_output(TupOutputState *tstate, Datum *values, bool *isnull)
Definition: execTuples.c:2256
#define tuplestore_donestoring(state)
Definition: tuplestore.h:60
TimeOffset flushLag
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency)
Definition: slot.c:221
void set_ps_display(const char *activity, bool force)
Definition: ps_status.c:335
Definition: nodes.h:525
static StringInfoData output_message
Definition: walsender.c:153
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
struct cursor * cur
Definition: ecpg.c:28
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:183
void WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt, int segsize, const char *waldir)
Definition: xlogreader.c:209
void proc_exit(int code)
Definition: ipc.c:104
int errcode(int sqlerrcode)
Definition: elog.c:608
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
#define MemSet(start, val, len)
Definition: c.h:962
static void WalSndKill(int code, Datum arg)
Definition: walsender.c:2370
#define kill(pid, sig)
Definition: win32_port.h:426
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
static void WalSndDone(WalSndSendDataCallback send_data)
Definition: walsender.c:2854
#define WL_SOCKET_READABLE
Definition: latch.h:125
void ReplicationSlotSave(void)
Definition: slot.c:645
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8262
List * SyncRepGetSyncStandbys(bool *am_sync)
Definition: syncrep.c:700
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:197
TimestampTz time
Definition: walsender.c:200
int sleeptime
Definition: pg_standby.c:41
#define SIGPIPE
Definition: win32_port.h:159
ReplicationSlotPersistentData data
Definition: slot.h:132
TimeOffset writeLag
#define SIGUSR2
Definition: win32_port.h:167
#define LOG
Definition: elog.h:26
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
bool RecoveryInProgress(void)
Definition: xlog.c:7930
void SetLatch(Latch *latch)
Definition: latch.c:436
SyncRepConfigData * SyncRepConfig
Definition: syncrep.c:97
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1682
void list_free_deep(List *list)
Definition: list.c:1391
static int fd(const char *x, int i)
Definition: preproc-init.c:105
pgsocket sock
Definition: libpq-be.h:122
#define PG_BINARY
Definition: c.h:1221
void GetPGVariable(const char *name, DestReceiver *dest)
Definition: guc.c:8814
void ResetLatch(Latch *latch)
Definition: latch.c:519
slock_t mutex
Node * replication_parse_result
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:87
void StatementCancelHandler(SIGNAL_ARGS)
Definition: postgres.c:2833
Latch procLatch
Definition: proc.h:104
void XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength)
Definition: xlogutils.c:681
XLogRecord * XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
Definition: xlogreader.c:238
XLogRecPtr confirmed_flush
Definition: slot.h:80
PGXACT * MyPgXact
Definition: proc.c:68
XLogRecPtr EndRecPtr
Definition: xlogreader.h:132
int32 day
Definition: timestamp.h:47
#define XidFromFullTransactionId(x)
Definition: transam.h:48
static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch)
Definition: walsender.c:1952
static LagTracker * lag_tracker
Definition: walsender.c:216
TimeLineID timeline
Definition: replnodes.h:97
int64 spillCount
ReplicationKind kind
Definition: replnodes.h:56
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1726
bool IsTransactionBlock(void)
Definition: xact.c:4635
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
int64 spillBytes
const char * SnapBuildExportSnapshot(SnapBuild *builder)
Definition: snapbuild.c:631
bool am_walsender
Definition: walsender.c:115
#define SpinLockAcquire(lock)
Definition: spin.h:62
void pg_usleep(long microsec)
Definition: signal.c:53
void HandleWalSndInitStopping(void)
Definition: walsender.c:2947
void ReplicationSlotReserveWal(void)
Definition: slot.c:997
int SendProcSignal(pid_t pid, ProcSignalReason reason, BackendId backendId)
Definition: procsignal.c:250
XLogRecPtr flush
bool defGetBoolean(DefElem *def)
Definition: define.c:111
char data[BLCKSZ]
Definition: c.h:1091
void DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
Definition: logical.c:462
void ReplicationSlotsComputeRequiredLSN(void)
Definition: slot.c:748
ReplicationKind kind
Definition: replnodes.h:82
void WalSndRqstFileReload(void)
Definition: walsender.c:2924
void pfree(void *pointer)
Definition: mcxt.c:1056
void ConditionVariableCancelSleep(void)
bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
Definition: transam.c:319
static void WalSndKeepalive(bool requestReply)
Definition: walsender.c:3376
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:2314
void EndCommand(const char *commandTag, CommandDest dest)
Definition: dest.c:166
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:676
#define ERROR
Definition: elog.h:43
void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
Definition: decode.c:94
static TimestampTz last_processing
Definition: walsender.c:158
void pq_startmsgread(void)
Definition: pqcomm.c:1211
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2292
static void PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: walsender.c:1766
#define IntervalPGetDatum(X)
Definition: timestamp.h:33
#define SYNC_REP_WAIT_APPLY
Definition: syncrep.h:26
void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli)
Definition: xlog.c:3859
bool FirstSnapshotSet
Definition: snapmgr.c:205
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:392
void WalSndSignals(void)
Definition: walsender.c:2981
#define TimestampTzGetDatum(X)
Definition: timestamp.h:32
#define FATAL
Definition: elog.h:52
void SendBaseBackup(BaseBackupCmd *cmd)
Definition: basebackup.c:747
Latch * latch
void(* WalSndSendDataCallback)(void)
Definition: walsender.c:222
int64 spillTxns
#define MAXPGPATH
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:11193
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:2236
void ReplicationSlotPersist(void)
Definition: slot.c:680
TransactionId effective_xmin
Definition: slot.h:128
int pq_getbyte_if_available(unsigned char *c)
Definition: pqcomm.c:1041
static LogicalDecodingContext * logical_decoding_ctx
Definition: walsender.c:193
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:192
char * get_database_name(Oid dbid)
Definition: dbcommands.c:2155
#define DEBUG2
Definition: elog.h:24
bool XLogBackgroundFlush(void)
Definition: xlog.c:2993
bool list_member_int(const List *list, int datum)
Definition: list.c:655
Definition: latch.h:110
Definition: dest.h:88
#define TLHistoryFileName(fname, tli)
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:361
WalSndState state
NodeTag type
Definition: nodes.h:527
static char * buf
Definition: pg_test_fsync.c:67
void WalSndWaitStopping(void)
Definition: walsender.c:3099
static bool streamingDoneSending
Definition: walsender.c:175
uint64 XLogSegNo
Definition: xlogdefs.h:41
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
bool WALRead(char *buf, XLogRecPtr startptr, Size count, TimeLineID tli, WALOpenSegment *seg, WALSegmentContext *segcxt, WALSegmentOpen openSegment, WALReadError *errinfo)
Definition: xlogreader.c:1033
int errdetail(const char *fmt,...)
Definition: elog.c:955
XLogSegNo ws_segno
Definition: xlogreader.h:38
#define COMMERROR
Definition: elog.h:30
int errcode_for_file_access(void)
Definition: elog.c:631
static void StartReplication(StartReplicationCmd *cmd)
Definition: walsender.c:542
#define SIGHUP
Definition: win32_port.h:154
TransactionId catalog_xmin
Definition: slot.h:69
#define pq_flush_if_writable()
Definition: libpq.h:40
TimeOffset time
Definition: timestamp.h:45
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
Definition: xact.c:3331
XLogRecPtr currTLIValidUntil
Definition: xlogreader.h:192
#define InvalidTransactionId
Definition: transam.h:31
void enlargeStringInfo(StringInfo str, int needed)
Definition: stringinfo.c:283
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:369
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1203
static XLogRecPtr logical_startptr
Definition: walsender.c:194
unsigned int uint32
Definition: c.h:359
void ReplicationSlotRelease(void)
Definition: slot.c:424
static WALOpenSegment * sendSeg
Definition: walsender.c:132
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1344
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
TransactionId xmin
Definition: slot.h:61
SHM_QUEUE SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]
void InitWalSender(void)
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:182
#define SlotIsLogical(slot)
Definition: slot.h:154
Datum Int64GetDatum(int64 X)
Definition: fmgr.c:1699
void SyncRepInitConfig(void)
Definition: syncrep.c:398
#define ereport(elevel, rest)
Definition: elog.h:141
static void UpdateSpillStats(LogicalDecodingContext *ctx)
Definition: walsender.c:3597
static bool WalSndCaughtUp
Definition: walsender.c:179
#define SYNC_REP_WAIT_FLUSH
Definition: syncrep.h:25
MemoryContext TopMemoryContext
Definition: mcxt.c:44
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)
Definition: walsender.c:3434
XLogRecPtr lsn
Definition: walsender.c:199
int replication_yyparse(void)
FullTransactionId ReadNextFullTransactionId(void)
Definition: varsup.c:246
Definition: guc.h:72
XLogRecPtr last_lsn
Definition: walsender.c:209
#define ERRCODE_DATA_CORRUPTED
Definition: pg_basebackup.c:45
static int WalSndSegmentOpen(XLogSegNo nextSegNo, WALSegmentContext *segcxt, TimeLineID *tli_p)
Definition: walsender.c:2388
int32 month
Definition: timestamp.h:48
int max_wal_senders
Definition: walsender.c:121
static void WalSndLoop(WalSndSendDataCallback send_data)
Definition: walsender.c:2183
static void WalSndKeepaliveIfNecessary(void)
Definition: walsender.c:3395
int CloseTransientFile(int fd)
Definition: fd.c:2469
#define SIG_IGN
Definition: win32_port.h:151
static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1176
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
int pq_getmessage(StringInfo s, int maxlen)
Definition: pqcomm.c:1273
static StringInfoData reply_message
Definition: walsender.c:154
#define MAXFNAMELEN
TimeLineID nextTLI
Definition: xlogreader.h:198
#define SpinLockRelease(lock)
Definition: spin.h:64
Tuplestorestate * tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
Definition: tuplestore.c:318
int64 TimeOffset
Definition: timestamp.h:40
static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc)
Definition: walsender.c:1324
static void WalSndCheckTimeOut(void)
Definition: walsender.c:2156
Size mul_size(Size s1, Size s2)
Definition: shmem.c:515
#define InvalidBackendId
Definition: backendid.h:23
LogicalDecodingContext * CreateInitDecodingContext(char *plugin, List *output_plugin_options, bool need_full_snapshot, XLogRecPtr restart_lsn, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:227
static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
Definition: walsender.c:3499
void WalSndErrorCleanup(void)
Definition: walsender.c:300
static volatile sig_atomic_t replication_active
Definition: walsender.c:191
uintptr_t Datum
Definition: postgres.h:367
static const char * WalSndGetStateString(WalSndState state)
Definition: walsender.c:3156
XLogRecPtr sentPtr
Size add_size(Size s1, Size s2)
Definition: shmem.c:498
bool am_db_walsender
Definition: walsender.c:118
TransactionId effective_catalog_xmin
Definition: slot.h:129
static void StartLogicalReplication(StartReplicationCmd *cmd)
Definition: walsender.c:1087
LogicalDecodingContext * CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, bool fast_forward, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:370
Oid MyDatabaseId
Definition: globals.c:85
static void WalSndShutdown(void)
Definition: walsender.c:226
static TimeLineID receiveTLI
Definition: xlog.c:209
int work_mem
Definition: globals.c:121
XLogRecPtr tliSwitchPoint(TimeLineID tli, List *history, TimeLineID *nextTLI)
Definition: timeline.c:563
#define EpochFromFullTransactionId(x)
Definition: transam.h:47
WalSnd * MyWalSnd
Definition: walsender.c:112
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:839
static XLogRecPtr sentPtr
Definition: walsender.c:150
int log_min_messages
Definition: guc.c:518
void pq_endmsgread(void)
Definition: pqcomm.c:1235
#define InvalidOid
Definition: postgres_ext.h:36
TimeLineID ThisTimeLineID
Definition: xlog.c:187
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
int allowedModes
Definition: execnodes.h:302
void WalSndInitStopping(void)
Definition: walsender.c:3073
TimeLineID currTLI
Definition: xlogreader.h:182
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:170
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
bool is_member_of_role(Oid member, Oid role)
Definition: acl.c:4924
TimeOffset applyLag
struct SnapBuild * snapshot_builder
Definition: logical.h:43
int errmsg_internal(const char *fmt,...)
Definition: elog.c:909
SetFunctionReturnMode returnMode
Definition: execnodes.h:304
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
static long WalSndComputeSleeptime(TimestampTz now)
Definition: walsender.c:2106
#define SIG_DFL
Definition: win32_port.h:149
#define SIGNAL_ARGS
Definition: c.h:1287
static TimeLineID sendTimeLine
Definition: walsender.c:141
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:739
#define lfirst(lc)
Definition: pg_list.h:190
void WalSndSetState(WalSndState state)
Definition: walsender.c:3137
int sync_standby_priority
Definition: regguts.h:298
void FreeDecodingContext(LogicalDecodingContext *ctx)
Definition: logical.c:508
static void XLogSendLogical(void)
Definition: walsender.c:2773
XLogRecPtr restart_lsn
Definition: slot.h:72
int read_heads[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:212
void StartTransactionCommand(void)
Definition: xact.c:2797
bool needreload
static void ProcessStandbyReplyMessage(void)
Definition: walsender.c:1798
size_t Size
Definition: c.h:467
#define XLogFileName(fname, tli, logSegNo, wal_segsz_bytes)
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: pgstat.h:1320
char * dbname
Definition: streamutil.c:50
#define LAG_TRACKER_BUFFER_SIZE
Definition: walsender.c:204
int XactIsoLevel
Definition: xact.c:74
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1122
static TimeLineID sendTimeLineNextTLI
Definition: walsender.c:142
static XLogRecPtr sendTimeLineValidUpto
Definition: walsender.c:144
void LogicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: logical.c:1001
MemoryContext ecxt_per_query_memory
Definition: execnodes.h:230
WalSndState
Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
Definition: walsender.c:3191
TimeLineID ws_tli
Definition: xlogreader.h:39
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Definition: walsender.c:876
static bool streamingDoneReceiving
Definition: walsender.c:176
Tuplestorestate * setResult
Definition: execnodes.h:307
#define pg_attribute_noreturn()
Definition: c.h:147
static StringInfoData tmpbuf
Definition: walsender.c:155
int BasicOpenFile(const char *fileName, int fileFlags)
Definition: fd.c:981
#define XLogFilePath(path, tli, logSegNo, wal_segsz_bytes)
bool IsSubTransaction(void)
Definition: xact.c:4708
Snapshot SnapBuildInitialSnapshot(SnapBuild *builder)
Definition: snapbuild.c:539
static Datum values[MAXATTR]
Definition: bootstrap.c:167
#define PG_STAT_GET_WAL_SENDERS_COLS
ExprContext * econtext
Definition: execnodes.h:300
bool log_replication_commands
Definition: walsender.c:125
uint64 GetSystemIdentifier(void)
Definition: xlog.c:4798
#define Int32GetDatum(X)
Definition: postgres.h:479
void ReplicationSlotDrop(const char *name, bool nowait)
Definition: slot.c:517
char * application_name
Definition: guc.c:541
TupleDesc setDesc
Definition: execnodes.h:308
void pq_sendbytes(StringInfo buf, const char *data, int datalen)
Definition: pqformat.c:125
void * palloc(Size size)
Definition: mcxt.c:949
int errmsg(const char *fmt,...)
Definition: elog.c:822
XLogReaderState * reader
Definition: logical.h:41
void SHMQueueInit(SHM_QUEUE *queue)
Definition: shmqueue.c:36
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:298
#define pq_putmessage_noblock(msgtype, s, len)
Definition: libpq.h:44
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:796
CRSSnapshotAction
Definition: walsender.h:20
#define elog(elevel,...)
Definition: elog.h:228
StringInfo out
Definition: logical.h:70
static const unsigned __int64 epoch
Definition: gettimeofday.c:34
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:26
int i
void WalSndShmemInit(void)
Definition: walsender.c:3012
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
#define NameStr(name)
Definition: c.h:616
TimeLineID timeline
Definition: replnodes.h:84
#define CStringGetTextDatum(s)
Definition: builtins.h:83
void * arg
static void PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
Definition: walsender.c:1903
struct Latch * MyLatch
Definition: globals.c:54
void ReplicationSlotCleanup(void)
Definition: slot.c:479
#define PG_FUNCTION_ARGS
Definition: fmgr.h:188
char * defname
Definition: parsenodes.h:730
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:99
static XLogRecPtr GetStandbyFlushRecPtr(void)
Definition: walsender.c:2894
static int logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
Definition: walsender.c:767
void SendPostmasterSignal(PMSignalReason reason)
Definition: pmsignal.c:146
TimestampTz replyTime
void LWLockReleaseAll(void)
Definition: lwlock.c:1825
#define NUM_SYNC_REP_WAIT_MODE
Definition: syncrep.h:28
static Interval * offset_to_interval(TimeOffset offset)
Definition: walsender.c:3175
slock_t mutex
Definition: slot.h:105
#define close(a)
Definition: win32.h:12
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition: procsignal.c:533
CommandDest whereToSendOutput
Definition: postgres.c:91
void TimestampDifference(TimestampTz start_time, TimestampTz stop_time, long *secs, int *microsecs)
Definition: timestamp.c:1657
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
static bool waiting_for_ping_response
Definition: walsender.c:167
void pq_puttextmessage(char msgtype, const char *str)
Definition: pqformat.c:369
XLogRecPtr apply
static void IdentifySystem(void)
Definition: walsender.c:347
Definition: pg_list.h:50
void CheckLogicalDecodingRequirements(void)
Definition: logical.c:75
#define snprintf
Definition: port.h:192
#define WL_LATCH_SET
Definition: latch.h:124
int16 AttrNumber
Definition: attnum.h:21
void quickdie(SIGNAL_ARGS)
Definition: postgres.c:2735
#define UINT64_FORMAT
Definition: c.h:402
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition: slot.c:702
void SyncRepReleaseWaiters(void)
Definition: syncrep.c:426
static TimestampTz last_reply_timestamp
Definition: walsender.c:164
static void ProcessRepliesIfAny(void)
Definition: walsender.c:1631
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1547
#define MAX_SEND_SIZE
Definition: walsender.c:106
#define die(msg)
Definition: pg_test_fsync.c:96
#define read(a, b, c)
Definition: win32.h:13
static bool sendTimeLineIsHistoric
Definition: walsender.c:143
#define offsetof(type, field)
Definition: c.h:662
void WalSndWakeup(void)
Definition: walsender.c:3044
void ReplicationSlotMarkDirty(void)
Definition: slot.c:663
bool am_cascading_walsender
Definition: walsender.c:116
static void ProcessStandbyHSFeedbackMessage(void)
Definition: walsender.c:1983
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:129
#define SYNC_REP_WAIT_WRITE
Definition: syncrep.h:24
void SnapBuildClearExportedSnapshot(void)
Definition: snapbuild.c:692
static XLogRecPtr startptr
Definition: basebackup.c:116
static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
Definition: walsender.c:1293
void replication_scanner_init(const char *query_string)
#define TLHistoryFilePath(path, tli)
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1743
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)