PostgreSQL Source Code  git master
walsender.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * walsender.c
4  *
5  * The WAL sender process (walsender) is new as of Postgres 9.0. It takes
6  * care of sending XLOG from the primary server to a single recipient.
7  * (Note that there can be more than one walsender process concurrently.)
8  * It is started by the postmaster when the walreceiver of a standby server
9  * connects to the primary server and requests XLOG streaming replication.
10  *
11  * A walsender is similar to a regular backend, i.e. there is a one-to-one
12  * relationship between a connection and a walsender process, but instead
13  * of processing SQL queries, it understands a small set of special
14  * replication-mode commands. The START_REPLICATION command begins streaming
15  * WAL to the client. While streaming, the walsender keeps reading XLOG
16  * records from the disk and sends them to the standby server over the
17  * COPY protocol, until either side ends the replication by exiting COPY
18  * mode (or until the connection is closed).
19  *
20  * Normal termination is by SIGTERM, which instructs the walsender to
21  * close the connection and exit(0) at the next convenient moment. Emergency
22  * termination is by SIGQUIT; like any backend, the walsender will simply
23  * abort and exit on SIGQUIT. A close of the connection and a FATAL error
24  * are treated as not a crash but approximately normal termination;
25  * the walsender will exit quickly without sending any more XLOG records.
26  *
27  * If the server is shut down, checkpointer sends us
28  * PROCSIG_WALSND_INIT_STOPPING after all regular backends have exited. If
29  * the walsender is idle or running an SQL query, this causes it to shut
30  * down; if logical replication is in progress, all existing WAL records are
31  * processed first and then it shuts down. Otherwise this causes the walsender
32  * to switch to the "stopping" state. In this state, the walsender will reject
33  * any further replication commands. The checkpointer begins the shutdown
34  * checkpoint once all walsenders are confirmed as stopping. When the shutdown
35  * checkpoint finishes, the postmaster sends us SIGUSR2. This instructs
36  * walsender to send any outstanding WAL, including the shutdown checkpoint
37  * record, wait for it to be replicated to the standby, and then exit.
38  *
39  *
40  * Portions Copyright (c) 2010-2019, PostgreSQL Global Development Group
41  *
42  * IDENTIFICATION
43  * src/backend/replication/walsender.c
44  *
45  *-------------------------------------------------------------------------
46  */
47 #include "postgres.h"
48 
49 #include <signal.h>
50 #include <unistd.h>
51 
52 #include "access/printtup.h"
53 #include "access/timeline.h"
54 #include "access/transam.h"
55 #include "access/xact.h"
56 #include "access/xlog_internal.h"
57 #include "access/xlogutils.h"
58 
59 #include "catalog/pg_authid.h"
60 #include "catalog/pg_type.h"
61 #include "commands/dbcommands.h"
62 #include "commands/defrem.h"
63 #include "funcapi.h"
64 #include "libpq/libpq.h"
65 #include "libpq/pqformat.h"
66 #include "miscadmin.h"
67 #include "nodes/replnodes.h"
68 #include "pgstat.h"
69 #include "replication/basebackup.h"
70 #include "replication/decode.h"
71 #include "replication/logical.h"
73 #include "replication/slot.h"
74 #include "replication/snapbuild.h"
75 #include "replication/syncrep.h"
77 #include "replication/walsender.h"
80 #include "storage/fd.h"
81 #include "storage/ipc.h"
82 #include "storage/pmsignal.h"
83 #include "storage/proc.h"
84 #include "storage/procarray.h"
85 #include "tcop/dest.h"
86 #include "tcop/tcopprot.h"
87 #include "utils/builtins.h"
88 #include "utils/guc.h"
89 #include "utils/memutils.h"
90 #include "utils/pg_lsn.h"
91 #include "utils/portal.h"
92 #include "utils/ps_status.h"
93 #include "utils/timeout.h"
94 #include "utils/timestamp.h"
95 
96 /*
97  * Maximum data payload in a WAL data message. Must be >= XLOG_BLCKSZ.
98  *
99  * We don't have a good idea of what a good value would be; there's some
100  * overhead per message in both walsender and walreceiver, but on the other
101  * hand sending large batches makes walsender less responsive to signals
102  * because signals are checked only between messages. 128kB (with
103  * default 8k blocks) seems like a reasonable guess for now.
 *
 * The value is written as a multiple of XLOG_BLCKSZ (16 blocks), so the
 * ">= XLOG_BLCKSZ" requirement above holds for any configured block size.
104  */
105 #define MAX_SEND_SIZE (XLOG_BLCKSZ * 16)
106 
/*
 * NOTE(review): in this copy of the file, several comments in this section
 * are not followed by the declarations they describe: the WalSnd array, the
 * message buffers, the two ProcessRepliesIfAny() timestamps and the
 * streamingDoneSending/streamingDoneReceiving flags. Confirm against the
 * complete source before relying on this section.
 */
107 /* Array of WalSnds in shared memory */
109 
110 /* My slot in the shared memory array */
111 WalSnd *MyWalSnd = NULL;
112 
113 /* Global state */
114 bool am_walsender = false; /* Am I a walsender process? */
115 bool am_cascading_walsender = false; /* Am I cascading WAL to another
116  * standby? */
117 bool am_db_walsender = false; /* Connected to a database? */
118 
119 /* User-settable parameters for walsender */
120 int max_wal_senders = 0; /* the maximum number of concurrent
121  * walsenders */
122 int wal_sender_timeout = 60 * 1000; /* maximum time to send one WAL
123  * data message */
125 
126 /*
127  * State for WalSndWakeupRequest
128  */
129 bool wake_wal_senders = false;
130 
/*
 * WAL segment currently open for reading (ws_file is -1 when no file is
 * open) and the segment-size context used with it. Both are allocated in
 * InitWalSender() and are passed to WALRead() when reading WAL to send.
 */
131 static WALOpenSegment *sendSeg = NULL;
132 static WALSegmentContext *sendCxt = NULL;
133 
134 /*
135  * These variables keep track of the state of the timeline we're currently
136  * sending. sendTimeLine identifies the timeline. If sendTimeLineIsHistoric,
137  * the timeline is not the latest timeline on this server, and the server's
138  * history forked off from that timeline at sendTimeLineValidUpto.
139  */
142 static bool sendTimeLineIsHistoric = false;
144 
145 /*
146  * How far have we sent WAL already? This is also advertised in
147  * MyWalSnd->sentPtr. (Actually, this is the next WAL location to send.)
148  */
149 static XLogRecPtr sentPtr = 0;
150 
151 /* Buffers for constructing outgoing messages and processing reply messages. */
155 
156 /* Timestamp of last ProcessRepliesIfAny(). */
158 
159 /*
160  * Timestamp of last ProcessRepliesIfAny() that saw a reply from the
161  * standby. Set to 0 if wal_sender_timeout doesn't need to be active.
162  */
164 
165 /* Have we sent a heartbeat message asking for reply, since last reply? */
166 static bool waiting_for_ping_response = false;
167 
168 /*
169  * While streaming WAL in Copy mode, streamingDoneSending is set to true
170  * after we have sent CopyDone. We should not send any more CopyData messages
171  * after that. streamingDoneReceiving is set to true when we receive CopyDone
172  * from the other end. When both become true, it's time to exit Copy mode.
173  */
176 
177 /* Are we there yet? */
178 static bool WalSndCaughtUp = false;
179 
180 /*
 * Flags set by signal handlers for later service in main loop. They are
 * volatile sig_atomic_t because that is the type C guarantees can be safely
 * written from a signal handler.
 */
181 static volatile sig_atomic_t got_SIGUSR2 = false;
182 static volatile sig_atomic_t got_STOPPING = false;
183 
184 /*
185  * This is set while we are streaming. When it is not set, the
186  * PROCSIG_WALSND_INIT_STOPPING signal is handled like SIGTERM. When it is
187  * set, the main loop is responsible for checking got_STOPPING and
188  * terminating once it is set (after streaming any remaining WAL).
189  */
190 static volatile sig_atomic_t replication_active = false;
191 
194 
/*
 * NOTE(review): in this copy of the file, the following are missing: the
 * member declarations of WalTimeSample, every LagTracker member except
 * read_heads, the declaration of lag_tracker (which InitWalSender() assigns),
 * the declarations after the "Signal handlers" comment, and several lines of
 * the prototype list. Confirm against the complete source.
 */
195 /* A sample associating a WAL location with the time it was written. */
196 typedef struct
197 {
200 } WalTimeSample;
201 
202 /* The size of our buffer of time samples. */
203 #define LAG_TRACKER_BUFFER_SIZE 8192
204 
205 /* A mechanism for tracking replication lag. */
206 typedef struct
207 {
	/*
	 * One read position per sync-rep wait mode; presumably an index into the
	 * sample buffer (TODO confirm against the missing members).
	 */
211  int read_heads[NUM_SYNC_REP_WAIT_MODE];
213 } LagTracker;
214 
216 
217 /* Signal handlers */
219 
220 /* Prototypes for private functions */
/*
 * Callback type taken by WalSndLoop() and WalSndDone(); XLogSendPhysical()
 * and XLogSendLogical() below have this signature.
 */
221 typedef void (*WalSndSendDataCallback) (void);
222 static void WalSndLoop(WalSndSendDataCallback send_data);
223 static void InitWalSenderSlot(void);
224 static void WalSndKill(int code, Datum arg);
226 static void XLogSendPhysical(void);
227 static void XLogSendLogical(void);
228 static void WalSndDone(WalSndSendDataCallback send_data);
229 static XLogRecPtr GetStandbyFlushRecPtr(void);
230 static void IdentifySystem(void);
233 static void StartReplication(StartReplicationCmd *cmd);
235 static void ProcessStandbyMessage(void);
236 static void ProcessStandbyReplyMessage(void);
237 static void ProcessStandbyHSFeedbackMessage(void);
238 static void ProcessRepliesIfAny(void);
239 static void WalSndKeepalive(bool requestReply);
240 static void WalSndKeepaliveIfNecessary(void);
241 static void WalSndCheckTimeOut(void);
243 static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
244 static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
247 static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
248 static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
250 
251 static int WalSndSegmentOpen(XLogSegNo nextSegNo, WALSegmentContext *segcxt,
252  TimeLineID *tli_p);
253 static void UpdateSpillStats(LogicalDecodingContext *ctx);
254 
255 
256 /*
 * Initialize walsender process before entering the main command loop.
 *
 * Visible steps:
 * - allocate a zero-filled LagTracker in TopMemoryContext;
 * - assign sendSeg and sendCxt (WALOpenSegment / WALSegmentContext);
 * - initialize both with WALOpenSegmentInit(), using wal_segment_size as the
 *   segment size.
 *
 * NOTE(review): in this copy, the following are missing; confirm against the
 * complete source:
 * - the statements that should follow the "Create a per-walsender data
 *   structure" and "Let postmaster know" comments;
 * - the allocation expressions completing the sendSeg/sendCxt assignments.
 */
257 void
258 InitWalSender(void)
259 {
261 
262  /* Create a per-walsender data structure in shared memory */
264 
265  /*
266  * We don't currently need any ResourceOwner in a walsender process, but
267  * if we did, we could call CreateAuxProcessResourceOwner here.
268  */
269 
270  /*
271  * Let postmaster know that we're a WAL sender. Once we've declared us as
272  * a WAL sender process, postmaster will let us outlive the bgwriter and
273  * kill us last in the shutdown sequence, so we get a chance to stream all
274  * remaining WAL at shutdown, including the shutdown checkpoint. Note that
275  * there's no going back, and we mustn't write any WAL records after this.
276  */
279 
280  /* Initialize empty timestamp buffer for lag tracking. */
281  lag_tracker = MemoryContextAllocZero(TopMemoryContext, sizeof(LagTracker));
282 
283  /* Make sure we can remember the current read position in XLOG. */
284  sendSeg = (WALOpenSegment *)
286  sendCxt = (WALSegmentContext *)
288  WALOpenSegmentInit(sendSeg, sendCxt, wal_segment_size, NULL);
289 }
290 
291 /*
292  * Clean up after an error.
293  *
294  * WAL sender processes don't use transactions like regular backends do.
295  * This function does any cleanup required after an error in a WAL sender
296  * process, similar to what transaction abort does in a regular backend.
 *
 * Visible steps:
 * - close any WAL segment file left open by sendSeg;
 * - act on MyReplicationSlot if one is held;
 * - clear replication_active;
 * - exit the process if a stop was already requested (got_STOPPING or
 *   got_SIGUSR2).
 *
 * NOTE(review): in this copy, the following are missing:
 * - the function-name line;
 * - the first statements of the body;
 * - the statement guarded by "if (MyReplicationSlot != NULL)";
 * - the statement after "Revert back to startup state".
297  */
298 void
300 {
304 
	/* Close a WAL segment left open by an interrupted read; -1 means "none". */
305  if (sendSeg->ws_file >= 0)
306  {
307  close(sendSeg->ws_file);
308  sendSeg->ws_file = -1;
309  }
310 
311  if (MyReplicationSlot != NULL)
	/*
	 * NOTE(review): the statement this "if" guards is missing in this copy;
	 * as shown, it would guard the replication_active assignment below.
	 */
313 
315 
316  replication_active = false;
317 
	/*
	 * A shutdown was already requested (see the file header comment), so
	 * exit instead of going back to accepting replication commands.
	 */
318  if (got_STOPPING || got_SIGUSR2)
319  proc_exit(0);
320 
321  /* Revert back to startup state */
323 }
324 
325 /*
326  * Handle a client's connection abort in an orderly manner.
 *
 * Never returns: the process exits via proc_exit(0). Exit status 0 matches
 * the file header, which treats a closed connection as approximately normal
 * termination.
 *
 * NOTE(review): the statement that resets whereToSendOutput, which the
 * comment in the body describes, is missing in this copy.
327  */
328 static void
329 WalSndShutdown(void)
330 {
331  /*
332  * Reset whereToSendOutput to prevent ereport from attempting to send any
333  * more messages to the standby.
334  */
337 
338  proc_exit(0);
339  abort(); /* keep the compiler quiet */
340 }
341 
342 /*
343  * Handle the IDENTIFY_SYSTEM command.
 *
 * Sends a one-row result set to dest with these columns:
 *   systemid (text) - system identifier, formatted with UINT64_FORMAT
 *   timeline (int4) - ThisTimeLineID
 *   xlogpos  (text) - a flush LSN, formatted as "%X/%X"
 *   dbname   (text) - database name, or NULL when dbname was not set
 *
 * NOTE(review): in this copy, the following are missing:
 * - the function-name line;
 * - the declaration at source line 352;
 * - the second snprintf() argument for sysid;
 * - the condition that chooses GetStandbyFlushRecPtr() over GetFlushRecPtr();
 * - the statements of the dbname lookup block (only its comments remain);
 * - the line before MemSet().
344  */
345 static void
347 {
348  char sysid[32];
349  char xloc[MAXFNAMELEN];
350  XLogRecPtr logptr;
351  char *dbname = NULL;
353  TupOutputState *tstate;
354  TupleDesc tupdesc;
355  Datum values[4];
356  bool nulls[4];
357 
358  /*
359  * Reply with a result set with one row, four columns. First col is system
360  * ID, second is timeline ID, third is current xlog location and the
361  * fourth contains the database name if we are connected to one.
362  */
363 
364  snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
366 
369  {
370  /* this also updates ThisTimeLineID */
371  logptr = GetStandbyFlushRecPtr();
372  }
373  else
374  logptr = GetFlushRecPtr();
375 
	/* Print the 64-bit LSN as its high and low 32-bit halves in hex. */
376  snprintf(xloc, sizeof(xloc), "%X/%X", (uint32) (logptr >> 32), (uint32) logptr);
377 
378  if (MyDatabaseId != InvalidOid)
379  {
381 
382  /* syscache access needs a transaction env. */
384  /* make dbname live outside TX context */
388  /* CommitTransactionCommand switches to TopMemoryContext */
390  }
391 
393  MemSet(nulls, false, sizeof(nulls));
394 
395  /* need a tuple descriptor representing four columns */
396  tupdesc = CreateTemplateTupleDesc(4);
397  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "systemid",
398  TEXTOID, -1, 0);
399  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "timeline",
400  INT4OID, -1, 0);
401  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "xlogpos",
402  TEXTOID, -1, 0);
403  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "dbname",
404  TEXTOID, -1, 0);
405 
406  /* prepare for projection of tuples */
407  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
408 
409  /* column 1: system identifier */
410  values[0] = CStringGetTextDatum(sysid);
411 
412  /* column 2: timeline */
413  values[1] = Int32GetDatum(ThisTimeLineID);
414 
415  /* column 3: wal location */
416  values[2] = CStringGetTextDatum(xloc);
417 
418  /* column 4: database name, or NULL if none */
419  if (dbname)
420  values[3] = CStringGetTextDatum(dbname);
421  else
422  nulls[3] = true;
423 
424  /* send it to dest */
425  do_tup_output(tstate, values, nulls);
426 
427  end_tup_output(tstate);
428 }
429 
430 
431 /*
432  * Handle TIMELINE_HISTORY command.
 *
 * Unlike IDENTIFY_SYSTEM, the reply protocol messages are built by hand:
 * - a RowDescription ('T') with two columns: filename (text) and content
 *   (bytea);
 * - a single DataRow ('D') holding the history file name and the file
 *   contents, streamed from disk.
 * Any I/O failure raises ereport(ERROR).
 *
 * NOTE(review): in this copy, the following are missing:
 * - the function-name line;
 * - the declaration of buf;
 * - the errcode lines of the ereport() calls;
 * - the lines around the read() call.
433  */
434 static void
436 {
438  char histfname[MAXFNAMELEN];
439  char path[MAXPGPATH];
440  int fd;
441  off_t histfilelen;
442  off_t bytesleft;
443  Size len;
444 
445  /*
446  * Reply with a result set with one row, and two columns. The first col is
447  * the name of the history file, 2nd is the contents.
448  */
449 
450  TLHistoryFileName(histfname, cmd->timeline);
451  TLHistoryFilePath(path, cmd->timeline);
452 
453  /* Send a RowDescription message */
454  pq_beginmessage(&buf, 'T');
455  pq_sendint16(&buf, 2); /* 2 fields */
456 
457  /* first field */
458  pq_sendstring(&buf, "filename"); /* col name */
459  pq_sendint32(&buf, 0); /* table oid */
460  pq_sendint16(&buf, 0); /* attnum */
461  pq_sendint32(&buf, TEXTOID); /* type oid */
462  pq_sendint16(&buf, -1); /* typlen */
463  pq_sendint32(&buf, 0); /* typmod */
464  pq_sendint16(&buf, 0); /* format code */
465 
466  /* second field */
467  pq_sendstring(&buf, "content"); /* col name */
468  pq_sendint32(&buf, 0); /* table oid */
469  pq_sendint16(&buf, 0); /* attnum */
470  pq_sendint32(&buf, BYTEAOID); /* type oid */
471  pq_sendint16(&buf, -1); /* typlen */
472  pq_sendint32(&buf, 0); /* typmod */
473  pq_sendint16(&buf, 0); /* format code */
474  pq_endmessage(&buf);
475 
476  /* Send a DataRow message */
477  pq_beginmessage(&buf, 'D');
478  pq_sendint16(&buf, 2); /* # of columns */
479  len = strlen(histfname);
480  pq_sendint32(&buf, len); /* col1 len */
481  pq_sendbytes(&buf, histfname, len);
482 
483  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
484  if (fd < 0)
485  ereport(ERROR,
487  errmsg("could not open file \"%s\": %m", path)));
488 
489  /* Determine file length and send it to client */
490  histfilelen = lseek(fd, 0, SEEK_END);
491  if (histfilelen < 0)
492  ereport(ERROR,
494  errmsg("could not seek to end of file \"%s\": %m", path)));
495  if (lseek(fd, 0, SEEK_SET) != 0)
496  ereport(ERROR,
498  errmsg("could not seek to beginning of file \"%s\": %m", path)));
499 
	/*
	 * histfilelen is an off_t but is sent with pq_sendint32(), so it is
	 * assumed to fit in 32 bits. NOTE(review): presumably fine for timeline
	 * history files; confirm.
	 */
500  pq_sendint32(&buf, histfilelen); /* col2 len */
501 
	/*
	 * Copy the file into the message in PGAlignedBlock-sized chunks. Each
	 * read() asks for a full sizeof(rbuf) regardless of bytesleft, so this
	 * assumes the file does not grow after the lseek() above. Hitting EOF
	 * before the announced length (nread == 0) is reported as an error.
	 */
502  bytesleft = histfilelen;
503  while (bytesleft > 0)
504  {
505  PGAlignedBlock rbuf;
506  int nread;
507 
509  nread = read(fd, rbuf.data, sizeof(rbuf));
511  if (nread < 0)
512  ereport(ERROR,
514  errmsg("could not read file \"%s\": %m",
515  path)));
516  else if (nread == 0)
517  ereport(ERROR,
519  errmsg("could not read file \"%s\": read %d of %zu",
520  path, nread, (Size) bytesleft)));
521 
522  pq_sendbytes(&buf, rbuf.data, nread);
523  bytesleft -= nread;
524  }
525 
526  if (CloseTransientFile(fd) != 0)
527  ereport(ERROR,
529  errmsg("could not close file \"%s\": %m", path)));
530 
531  pq_endmessage(&buf);
532 }
533 
534 /*
535  * Handle START_REPLICATION command.
536  *
537  * Errors are raised with ereport(ERROR), which takes us back to the main
538  * loop. If got_STOPPING is set when streaming ends, the process exits via
 * proc_exit(0).
 *
 * Otherwise this function returns normally. Before returning, it may send a
 * single-row result set (next_tli int8, next_tli_startpos text) describing
 * the next timeline, and then a START_STREAMING CommandComplete.
 *
 * ThisTimeLineID must already be set (IDENTIFY_SYSTEM must have run), and a
 * named slot must not be a logical one.
 *
 * NOTE(review): several lines are missing in this copy, including:
 * - the function-name line;
 * - the buf declaration;
 * - the condition of the slot-kind check;
 * - the condition choosing GetStandbyFlushRecPtr();
 * - the condition separating the current timeline from a historic one;
 * - the last argument of tliSwitchPoint();
 * - the "nothing to stream" condition;
 * - the statement introduced by "Main loop of walsender";
 * - the statement guarded by the second "if (cmd->slotname)";
 * - the condition guarding the next-timeline result set.
539  */
540 static void
542 {
544  XLogRecPtr FlushPtr;
545 
546  if (ThisTimeLineID == 0)
547  ereport(ERROR,
548  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
549  errmsg("IDENTIFY_SYSTEM has not been run before START_REPLICATION")));
550 
551  /*
552  * We assume here that we're logging enough information in the WAL for
553  * log-shipping, since this is checked in PostmasterMain().
554  *
555  * NOTE: wal_level can only change at shutdown, so in most cases it is
556  * difficult for there to be WAL data that we can still see that was
557  * written at wal_level='minimal'.
558  */
559 
560  if (cmd->slotname)
561  {
562  ReplicationSlotAcquire(cmd->slotname, true);
564  ereport(ERROR,
565  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
566  (errmsg("cannot use a logical replication slot for physical replication"))));
567  }
568 
569  /*
570  * Select the timeline. If it was given explicitly by the client, use
571  * that. Otherwise use the timeline of the last replayed record, which is
572  * kept in ThisTimeLineID.
573  */
575  {
576  /* this also updates ThisTimeLineID */
577  FlushPtr = GetStandbyFlushRecPtr();
578  }
579  else
580  FlushPtr = GetFlushRecPtr();
581 
582  if (cmd->timeline != 0)
583  {
584  XLogRecPtr switchpoint;
585 
586  sendTimeLine = cmd->timeline;
588  {
589  sendTimeLineIsHistoric = false;
591  }
592  else
593  {
594  List *timeLineHistory;
595 
596  sendTimeLineIsHistoric = true;
597 
598  /*
599  * Check that the timeline the client requested exists, and the
600  * requested start location is on that timeline.
601  */
602  timeLineHistory = readTimeLineHistory(ThisTimeLineID);
603  switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
605  list_free_deep(timeLineHistory);
606 
607  /*
608  * Found the requested timeline in the history. Check that
609  * requested startpoint is on that timeline in our history.
610  *
611  * This is quite loose on purpose. We only check that we didn't
612  * fork off the requested timeline before the switchpoint. We
613  * don't check that we switched *to* it before the requested
614  * starting point. This is because the client can legitimately
615  * request to start replication from the beginning of the WAL
616  * segment that contains switchpoint, but on the new timeline, so
617  * that it doesn't end up with a partial segment. If you ask for
618  * too old a starting point, you'll get an error later when we
619  * fail to find the requested WAL segment in pg_wal.
620  *
621  * XXX: we could be more strict here and only allow a startpoint
622  * that's older than the switchpoint, if it's still in the same
623  * WAL segment.
624  */
625  if (!XLogRecPtrIsInvalid(switchpoint) &&
626  switchpoint < cmd->startpoint)
627  {
628  ereport(ERROR,
629  (errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
630  (uint32) (cmd->startpoint >> 32),
631  (uint32) (cmd->startpoint),
632  cmd->timeline),
633  errdetail("This server's history forked from timeline %u at %X/%X.",
634  cmd->timeline,
635  (uint32) (switchpoint >> 32),
636  (uint32) (switchpoint))));
637  }
638  sendTimeLineValidUpto = switchpoint;
639  }
640  }
641  else
642  {
645  sendTimeLineIsHistoric = false;
646  }
647 
649 
650  /* If there is nothing to stream, don't even enter COPY mode */
652  {
653  /*
654  * When we first start replication the standby will be behind the
655  * primary. For some applications, for example synchronous
656  * replication, it is important to have a clear state for this initial
657  * catchup mode, so we can trigger actions when we change streaming
658  * state later. We may stay in this state for a long time, which is
659  * exactly why we want to be able to monitor whether or not we are
660  * still here.
661  */
663 
664  /* Send a CopyBothResponse message, and start streaming */
665  pq_beginmessage(&buf, 'W');
666  pq_sendbyte(&buf, 0);
667  pq_sendint16(&buf, 0);
668  pq_endmessage(&buf);
669  pq_flush();
670 
671  /*
672  * Don't allow a request to stream from a future point in WAL that
673  * hasn't been flushed to disk in this server yet.
		 *
		 * Note that this check runs after the CopyBothResponse above has
		 * already been sent, so the error reaches a client that is already
		 * in COPY mode.
674  */
675  if (FlushPtr < cmd->startpoint)
676  {
677  ereport(ERROR,
678  (errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
679  (uint32) (cmd->startpoint >> 32),
680  (uint32) (cmd->startpoint),
681  (uint32) (FlushPtr >> 32),
682  (uint32) (FlushPtr))));
683  }
684 
685  /* Start streaming from the requested point */
686  sentPtr = cmd->startpoint;
687 
688  /* Initialize shared memory status, too */
689  SpinLockAcquire(&MyWalSnd->mutex);
690  MyWalSnd->sentPtr = sentPtr;
691  SpinLockRelease(&MyWalSnd->mutex);
692 
694 
695  /* Main loop of walsender */
696  replication_active = true;
697 
699 
700  replication_active = false;
		/* A stop was requested while streaming: exit, don't return. */
701  if (got_STOPPING)
702  proc_exit(0);
704 
706  }
707 
708  if (cmd->slotname)
710 
711  /*
712  * Copy is finished now. Send a single-row result set indicating the next
713  * timeline.
714  */
716  {
		/* Room for "%X/%X": two 8-digit hex halves, '/', and the NUL. */
717  char startpos_str[8 + 1 + 8 + 1];
719  TupOutputState *tstate;
720  TupleDesc tupdesc;
721  Datum values[2];
722  bool nulls[2];
723 
724  snprintf(startpos_str, sizeof(startpos_str), "%X/%X",
725  (uint32) (sendTimeLineValidUpto >> 32),
727 
729  MemSet(nulls, false, sizeof(nulls));
730 
731  /*
732  * Need a tuple descriptor representing two columns. int8 may seem
733  * like a surprising data type for this, but in theory int4 would not
734  * be wide enough for this, as TimeLineID is unsigned.
735  */
736  tupdesc = CreateTemplateTupleDesc(2);
737  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli",
738  INT8OID, -1, 0);
739  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos",
740  TEXTOID, -1, 0);
741 
742  /* prepare for projection of tuple */
743  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
744 
745  values[0] = Int64GetDatum((int64) sendTimeLineNextTLI);
746  values[1] = CStringGetTextDatum(startpos_str);
747 
748  /* send it to dest */
749  do_tup_output(tstate, values, nulls);
750 
751  end_tup_output(tstate);
752  }
753 
754  /* Send CommandComplete message */
755  pq_puttextmessage('C', "START_STREAMING");
756 }
757 
758 /*
759  * read_page callback for logical decoding contexts, as a walsender process.
760  *
761  * Inside the walsender we can do better than logical_read_local_xlog_page,
762  * which has to do a plain sleep/busy loop, because the walsender's latch gets
763  * set every time WAL is flushed.
 *
 * Before waiting for WAL, it:
 * - determines the timeline for the page;
 * - mirrors state->currTLI into sendTimeLine;
 * - mirrors state->nextTLI into sendTimeLineNextTLI.
 *
 * It then waits until WAL up to targetPagePtr + reqLen is flushed.
 *
 * Returns -1 if that WAL did not become available. The comment below says
 * this implies shutdown. Otherwise it returns the number of valid bytes in
 * the page at targetPagePtr: XLOG_BLCKSZ if the whole block is flushed, or
 * fewer.
 *
 * A full XLOG_BLCKSZ bytes is always read into cur_page, even when fewer
 * bytes are valid, so cur_page must have room for XLOG_BLCKSZ bytes.
 *
 * NOTE(review): in this copy, the following are missing:
 * - the function-name / first-parameter line;
 * - two lines between the timeline assignments;
 * - one WALRead() argument line.
764  */
765 static int
767  XLogRecPtr targetRecPtr, char *cur_page)
768 {
769  XLogRecPtr flushptr;
770  int count;
771  WALReadError errinfo;
772  XLogSegNo segno;
773 
774  XLogReadDetermineTimeline(state, targetPagePtr, reqLen);
776  sendTimeLine = state->currTLI;
778  sendTimeLineNextTLI = state->nextTLI;
779 
780  /* make sure we have enough WAL available */
781  flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
782 
783  /* fail if not (implies we are going to shut down) */
784  if (flushptr < targetPagePtr + reqLen)
785  return -1;
786 
787  if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
788  count = XLOG_BLCKSZ; /* a full block is available */
789  else
790  count = flushptr - targetPagePtr; /* part of the page available */
791 
792  /* now actually read the data, we know it's there */
793  if (!WALRead(cur_page,
794  targetPagePtr,
795  XLOG_BLCKSZ,
796  sendSeg->ws_tli, /* Pass the current TLI because only
797  * WalSndSegmentOpen controls whether new
798  * TLI is needed. */
799  sendSeg,
800  sendCxt,
802  &errinfo))
803  WALReadRaiseError(&errinfo);
804 
805  /*
806  * After reading into the buffer, check that what we read was valid. We do
807  * this after reading, because even though the segment was present when we
808  * opened it, it might get recycled or removed while we read it. The
809  * read() succeeds in that case, but the data we tried to read might
810  * already have been overwritten with new WAL records.
811  */
812  XLByteToSeg(targetPagePtr, segno, sendCxt->ws_segsize);
813  CheckXLogRemoved(segno, sendSeg->ws_tli);
814 
815  return count;
816 }
817 
818 /*
819  * Process extra options given to CREATE_REPLICATION_SLOT.
 *
 * Recognized options:
 *   export_snapshot - boolean, logical slots only; true selects
 *                     CRS_EXPORT_SNAPSHOT
 *   use_snapshot    - logical slots only; selects CRS_USE_SNAPSHOT
 *   reserve_wal     - physical slots only; sets *reserve_wal = true
 *
 * export_snapshot and use_snapshot share one "given" flag, so at most one of
 * them may appear, and only once. Repeating an option, or using it with the
 * wrong slot kind, raises "conflicting or redundant options". Any other
 * option name raises elog(ERROR).
 *
 * *reserve_wal and *snapshot_action are written only when the corresponding
 * option is present, so the caller's initial values act as defaults.
 *
 * NOTE(review): in this copy, the following are missing:
 * - the function-name / first-parameter line;
 * - the false branch of the export_snapshot conditional expression.
820  */
821 static void
823  bool *reserve_wal,
824  CRSSnapshotAction *snapshot_action)
825 {
826  ListCell *lc;
827  bool snapshot_action_given = false;
828  bool reserve_wal_given = false;
829 
830  /* Parse options */
831  foreach(lc, cmd->options)
832  {
833  DefElem *defel = (DefElem *) lfirst(lc);
834 
835  if (strcmp(defel->defname, "export_snapshot") == 0)
836  {
837  if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
838  ereport(ERROR,
839  (errcode(ERRCODE_SYNTAX_ERROR),
840  errmsg("conflicting or redundant options")));
841 
842  snapshot_action_given = true;
843  *snapshot_action = defGetBoolean(defel) ? CRS_EXPORT_SNAPSHOT :
845  }
846  else if (strcmp(defel->defname, "use_snapshot") == 0)
847  {
848  if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
849  ereport(ERROR,
850  (errcode(ERRCODE_SYNTAX_ERROR),
851  errmsg("conflicting or redundant options")));
852 
853  snapshot_action_given = true;
854  *snapshot_action = CRS_USE_SNAPSHOT;
855  }
856  else if (strcmp(defel->defname, "reserve_wal") == 0)
857  {
858  if (reserve_wal_given || cmd->kind != REPLICATION_KIND_PHYSICAL)
859  ereport(ERROR,
860  (errcode(ERRCODE_SYNTAX_ERROR),
861  errmsg("conflicting or redundant options")));
862 
863  reserve_wal_given = true;
864  *reserve_wal = true;
865  }
866  else
867  elog(ERROR, "unrecognized option: %s", defel->defname);
868  }
869 }
870 
871 /*
872  * Create a new replication slot.
 *
 * Options are parsed by parseCreateReplSlotOptions(). snapshot_action
 * defaults to CRS_EXPORT_SNAPSHOT, so a logical slot created without a
 * snapshot option exports a snapshot and must not be created inside a
 * transaction block.
 *
 * Steps:
 * - Create the slot. A physical slot is created with ReplicationSlotCreate
 *   second argument false. Any other kind uses true; per the comment below,
 *   a persistent logical slot starts out ephemeral.
 * - For logical slots:
 *   - validate the snapshot action against the current transaction state;
 *   - build a decoding context;
 *   - export or use the initial snapshot;
 *   - free the context.
 * - For physical slots with reserve_wal, the steps are mostly missing here;
 *   only the "write this slot to disk" comment remains.
 *
 * Replies with a one-row result set, all columns text:
 *   slot_name        - the created slot's name
 *   consistent_point - an LSN as "%X/%X"
 *   snapshot_name    - NULL unless a snapshot was exported
 *   output_plugin    - NULL when cmd->plugin is NULL
 *
 * NOTE(review): several lines are missing in this copy, including:
 * - the function-name line and two declarations;
 * - the trailing arguments of both ReplicationSlotCreate() calls and of
 *   CreateInitDecodingContext();
 * - the isolation-level check condition;
 * - the timeout reset and DecodingContextFindStartpoint() statements that
 *   the comments describe;
 * - the USE_SNAPSHOT body;
 * - the persistence/reservation statements;
 * - the snprintf() arguments for xloc;
 * - the line after end_tup_output().
873  */
874 static void
876 {
877  const char *snapshot_name = NULL;
878  char xloc[MAXFNAMELEN];
879  char *slot_name;
880  bool reserve_wal = false;
881  CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT;
883  TupOutputState *tstate;
884  TupleDesc tupdesc;
885  Datum values[4];
886  bool nulls[4];
887 
889 
890  parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action);
891 
892  /* setup state for XLogRead */
893  sendTimeLineIsHistoric = false;
895 
896  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
897  {
898  ReplicationSlotCreate(cmd->slotname, false,
900  }
901  else
902  {
904 
905  /*
906  * Initially create persistent slot as ephemeral - that allows us to
907  * nicely handle errors during initialization because it'll get
908  * dropped if this transaction fails. We'll make it persistent at the
909  * end. Temporary slots can be created as temporary from beginning as
910  * they get dropped on error as well.
911  */
912  ReplicationSlotCreate(cmd->slotname, true,
914  }
915 
916  if (cmd->kind == REPLICATION_KIND_LOGICAL)
917  {
919  bool need_full_snapshot = false;
920 
921  /*
922  * Do options check early so that we can bail before calling the
923  * DecodingContextFindStartpoint which can take long time.
924  */
925  if (snapshot_action == CRS_EXPORT_SNAPSHOT)
926  {
927  if (IsTransactionBlock())
928  ereport(ERROR,
929  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
930  (errmsg("%s must not be called inside a transaction",
931  "CREATE_REPLICATION_SLOT ... EXPORT_SNAPSHOT")));
932 
933  need_full_snapshot = true;
934  }
935  else if (snapshot_action == CRS_USE_SNAPSHOT)
936  {
937  if (!IsTransactionBlock())
938  ereport(ERROR,
939  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
940  (errmsg("%s must be called inside a transaction",
941  "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
942 
944  ereport(ERROR,
945  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
946  (errmsg("%s must be called in REPEATABLE READ isolation mode transaction",
947  "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
948 
949  if (FirstSnapshotSet)
950  ereport(ERROR,
951  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
952  (errmsg("%s must be called before any query",
953  "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
954 
955  if (IsSubTransaction())
956  ereport(ERROR,
957  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
958  (errmsg("%s must not be called in a subtransaction",
959  "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
960 
961  need_full_snapshot = true;
962  }
963 
964  ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
969 
970  /*
971  * Signal that we don't need the timeout mechanism. We're just
972  * creating the replication slot and don't yet accept feedback
973  * messages or send keepalives. As we possibly need to wait for
974  * further WAL the walsender would otherwise possibly be killed too
975  * soon.
976  */
978 
979  /* build initial snapshot, might take a while */
981 
982  /*
983  * Export or use the snapshot if we've been asked to do so.
984  *
985  * NB. We will convert the snapbuild.c kind of snapshot to normal
986  * snapshot when doing this.
987  */
988  if (snapshot_action == CRS_EXPORT_SNAPSHOT)
989  {
990  snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder);
991  }
992  else if (snapshot_action == CRS_USE_SNAPSHOT)
993  {
994  Snapshot snap;
995 
998  }
999 
1000  /* don't need the decoding context anymore */
1001  FreeDecodingContext(ctx);
1002 
1003  if (!cmd->temporary)
1005  }
1006  else if (cmd->kind == REPLICATION_KIND_PHYSICAL && reserve_wal)
1007  {
1009 
1011 
1012  /* Write this slot to disk if it's a permanent one. */
1013  if (!cmd->temporary)
1015  }
1016 
1017  snprintf(xloc, sizeof(xloc), "%X/%X",
1020 
1022  MemSet(nulls, false, sizeof(nulls));
1023 
1024  /*----------
1025  * Need a tuple descriptor representing four columns:
1026  * - first field: the slot name
1027  * - second field: LSN at which we became consistent
1028  * - third field: exported snapshot's name
1029  * - fourth field: output plugin
1030  *----------
1031  */
1032  tupdesc = CreateTemplateTupleDesc(4);
1033  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_name",
1034  TEXTOID, -1, 0);
1035  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "consistent_point",
1036  TEXTOID, -1, 0);
1037  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "snapshot_name",
1038  TEXTOID, -1, 0);
1039  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "output_plugin",
1040  TEXTOID, -1, 0);
1041 
1042  /* prepare for projection of tuples */
1043  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
1044 
1045  /* slot_name */
1046  slot_name = NameStr(MyReplicationSlot->data.name);
1047  values[0] = CStringGetTextDatum(slot_name);
1048 
1049  /* consistent wal location */
1050  values[1] = CStringGetTextDatum(xloc);
1051 
1052  /* snapshot name, or NULL if none */
1053  if (snapshot_name != NULL)
1054  values[2] = CStringGetTextDatum(snapshot_name);
1055  else
1056  nulls[2] = true;
1057 
1058  /* plugin, or NULL if none */
1059  if (cmd->plugin != NULL)
1060  values[3] = CStringGetTextDatum(cmd->plugin);
1061  else
1062  nulls[3] = true;
1063 
1064  /* send it to dest */
1065  do_tup_output(tstate, values, nulls);
1066  end_tup_output(tstate);
1067 
1069 }
1070 
1071 /*
1072  * Get rid of a replication slot that is no longer wanted.
 *
 * Drops the slot named by cmd->slotname via ReplicationSlotDrop(), passing
 * !cmd->wait as the second argument. NOTE(review): that argument presumably
 * means "don't wait if the slot is in use"; confirm against slot.c.
 *
 * Then sends a DROP_REPLICATION_SLOT CommandComplete to the remote client.
 *
 * NOTE(review): the function-name line is missing in this copy.
1073  */
1074 static void
1076 {
1077  ReplicationSlotDrop(cmd->slotname, !cmd->wait);
1078  EndCommand("DROP_REPLICATION_SLOT", DestRemote);
1079 }
1080 
1081 /*
1082  * Load previously initiated logical slot and prepare for sending data (via
1083  * WalSndLoop).
 *
 * Visible flow:
 * - Acquire the named slot.
 * - Possibly set got_STOPPING after a promotion. The condition is missing
 *   here; see the comment below.
 * - Create the logical decoding context starting at cmd->startpoint with
 *   cmd->options, before any CopyBothResponse is sent.
 * - Send a CopyBothResponse ('W') and publish the start position in shared
 *   memory.
 * - Set replication_active while the main loop runs.
 * - Afterwards, free the decoding context.
 * - If got_STOPPING is set, exit via proc_exit(0). Otherwise leave COPY mode
 *   with a "COPY 0" CommandComplete.
 *
 * NOTE(review): several lines are missing in this copy, including:
 * - the function-name line and the buf declaration;
 * - the requirement checks;
 * - the promotion condition;
 * - the trailing CreateDecodingContext() arguments;
 * - the sentPtr/read-position assignments;
 * - the MyWalSnd->sentPtr store between the spinlock calls;
 * - the main-loop call.
1084  */
1085 static void
1087 {
1089 
1090  /* make sure that our requirements are still fulfilled */
1092 
1094 
1095  ReplicationSlotAcquire(cmd->slotname, true);
1096 
1097  /*
1098  * Force a disconnect, so that the decoding code doesn't need to care
1099  * about an eventual switch from running in recovery, to running in a
1100  * normal environment. Client code is expected to handle reconnects.
1101  */
1103  {
1104  ereport(LOG,
1105  (errmsg("terminating walsender process after promotion")));
1106  got_STOPPING = true;
1107  }
1108 
1109  /*
1110  * Create our decoding context, making it start at the previously ack'ed
1111  * position.
1112  *
1113  * Do this before sending a CopyBothResponse message, so that any errors
1114  * are reported early.
1115  */
1116  logical_decoding_ctx =
1117  CreateDecodingContext(cmd->startpoint, cmd->options, false,
1121 
1122 
1124 
1125  /* Send a CopyBothResponse message, and start streaming */
1126  pq_beginmessage(&buf, 'W');
1127  pq_sendbyte(&buf, 0);
1128  pq_sendint16(&buf, 0);
1129  pq_endmessage(&buf);
1130  pq_flush();
1131 
1132 
1133  /* Start reading WAL from the oldest required WAL. */
1135 
1136  /*
1137  * Report the location after which we'll send out further commits as the
1138  * current sentPtr.
1139  */
1141 
1142  /* Also update the sent position status in shared memory */
1143  SpinLockAcquire(&MyWalSnd->mutex);
1145  SpinLockRelease(&MyWalSnd->mutex);
1146 
1147  replication_active = true;
1148 
1150 
1151  /* Main loop of walsender */
1153 
1154  FreeDecodingContext(logical_decoding_ctx);
1156 
1157  replication_active = false;
	/* A stop was requested (possibly the promotion case above): exit. */
1158  if (got_STOPPING)
1159  proc_exit(0);
1161 
1162  /* Get out of COPY mode (CommandComplete). */
1163  EndCommand("COPY 0", DestRemote);
1164 }
1165 
1166 /*
 * LogicalDecodingContext 'prepare_write' callback.
 *
 * Prepare a write into a StringInfo.
 *
 * Resets ctx->out and writes a message header into it:
 *   - a 'w' message type byte,
 *   - the LSN as dataStart (8 bytes),
 *   - the LSN again as walEnd (8 bytes),
 *   - an 8-byte zero placeholder for the send time.
 * The send time is filled in later by WalSndWriteData, which overwrites the
 * bytes at offset 1 + 2 * sizeof(int64) just before sending, so that the
 * timestamp is taken as late as possible.
 *
 * When last_write is false the LSN is replaced by InvalidXLogRecPtr. Only a
 * write flagged as last_write carries a real position, which keeps
 * synchronous replication from seeing the same LSN several times.
 *
 * Don't do anything lasting in here, it's quite possible that nothing will be
 * done with the data.
 */
1174 static void
1176 {
1177  /* can't have sync rep confused by sending the same LSN several times */
1178  if (!last_write)
1179  lsn = InvalidXLogRecPtr;
1180 
1181  resetStringInfo(ctx->out);
1182 
1183  pq_sendbyte(ctx->out, 'w');
1184  pq_sendint64(ctx->out, lsn); /* dataStart */
1185  pq_sendint64(ctx->out, lsn); /* walEnd */
1186 
1187  /*
1188  * Fill out the sendtime later, just as it's done in XLogSendPhysical, but
1189  * reserve space here.
1190  */
1191  pq_sendint64(ctx->out, 0); /* sendtime placeholder, overwritten in WalSndWriteData */
1192 }
1193 
1194 /*
1195  * LogicalDecodingContext 'write' callback.
1196  *
1197  * Actually write out data previously prepared by WalSndPrepareWrite out to
1198  * the network. Take as long as needed, but process replies from the other
1199  * side and check timeouts during that.
1200  */
1201 static void
1203  bool last_write)
1204 {
1205  TimestampTz now;
1206 
1207  /*
1208  * Fill the send timestamp last, so that it is taken as late as possible.
1209  * This is somewhat ugly, but the protocol is set as it's already used for
1210  * several releases by streaming physical replication.
1211  */
1212  resetStringInfo(&tmpbuf);
1213  now = GetCurrentTimestamp();
1214  pq_sendint64(&tmpbuf, now);
1215  memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
1216  tmpbuf.data, sizeof(int64));
1217 
1218  /* output previously gathered data in a CopyData packet */
1219  pq_putmessage_noblock('d', ctx->out->data, ctx->out->len);
1220 
1222 
1223  /* Try to flush pending output to the client */
1224  if (pq_flush_if_writable() != 0)
1225  WalSndShutdown();
1226 
1227  /* Try taking fast path unless we get too close to walsender timeout. */
1229  wal_sender_timeout / 2) &&
1230  !pq_is_send_pending())
1231  {
1232  return;
1233  }
1234 
1235  /* If we have pending write here, go to slow path */
1236  for (;;)
1237  {
1238  int wakeEvents;
1239  long sleeptime;
1240 
1241  /* Check for input from the client */
1243 
1244  /* die if timeout was reached */
1246 
1247  /* Send keepalive if the time has come */
1249 
1250  if (!pq_is_send_pending())
1251  break;
1252 
1254 
1255  wakeEvents = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH |
1257 
1258  /* Sleep until something happens or we time out */
1259  (void) WaitLatchOrSocket(MyLatch, wakeEvents,
1260  MyProcPort->sock, sleeptime,
1262 
1263  /* Clear any already-pending wakeups */
1265 
1267 
1268  /* Process any requests or signals received recently */
1269  if (ConfigReloadPending)
1270  {
1271  ConfigReloadPending = false;
1274  }
1275 
1276  /* Try to flush pending output to the client */
1277  if (pq_flush_if_writable() != 0)
1278  WalSndShutdown();
1279  }
1280 
1281  /* reactivate latch so WalSndLoop knows to continue */
1282  SetLatch(MyLatch);
1283 }
1284 
1285 /*
1286  * LogicalDecodingContext 'update_progress' callback.
1287  *
1288  * Write the current position to the lag tracker (see XLogSendPhysical),
1289  * and update the spill statistics.
1290  */
1291 static void
1293 {
1294  static TimestampTz sendTime = 0;
1296 
1297  /*
1298  * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to
1299  * avoid flooding the lag tracker when we commit frequently.
1300  */
1301 #define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000
1302  if (!TimestampDifferenceExceeds(sendTime, now,
1304  return;
1305 
1306  LagTrackerWrite(lsn, now);
1307  sendTime = now;
1308 
1309  /*
1310  * Update statistics about transactions that spilled to disk.
1311  */
1312  UpdateSpillStats(ctx);
1313 }
1314 
1315 /*
1316  * Wait till WAL < loc is flushed to disk so it can be safely sent to client.
1317  *
1318  * Returns end LSN of flushed WAL. Normally this will be >= loc, but
1319  * if we detect a shutdown request (either from postmaster or client)
1320  * we will return early, so caller must always check.
1321  */
1322 static XLogRecPtr
1324 {
1325  int wakeEvents;
1326  static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
1327 
1328  /*
1329  * Fast path to avoid acquiring the spinlock in case we already know we
1330  * have enough WAL available. This is particularly interesting if we're
1331  * far behind.
1332  */
1333  if (RecentFlushPtr != InvalidXLogRecPtr &&
1334  loc <= RecentFlushPtr)
1335  return RecentFlushPtr;
1336 
1337  /* Get a more recent flush pointer. */
1338  if (!RecoveryInProgress())
1339  RecentFlushPtr = GetFlushRecPtr();
1340  else
1341  RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1342 
1343  for (;;)
1344  {
1345  long sleeptime;
1346 
1347  /* Clear any already-pending wakeups */
1349 
1351 
1352  /* Process any requests or signals received recently */
1353  if (ConfigReloadPending)
1354  {
1355  ConfigReloadPending = false;
1358  }
1359 
1360  /* Check for input from the client */
1362 
1363  /*
1364  * If we're shutting down, trigger pending WAL to be written out,
1365  * otherwise we'd possibly end up waiting for WAL that never gets
1366  * written, because walwriter has shut down already.
1367  */
1368  if (got_STOPPING)
1370 
1371  /* Update our idea of the currently flushed position. */
1372  if (!RecoveryInProgress())
1373  RecentFlushPtr = GetFlushRecPtr();
1374  else
1375  RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1376 
1377  /*
1378  * If postmaster asked us to stop, don't wait anymore.
1379  *
1380  * It's important to do this check after the recomputation of
1381  * RecentFlushPtr, so we can send all remaining data before shutting
1382  * down.
1383  */
1384  if (got_STOPPING)
1385  break;
1386 
1387  /*
1388  * We only send regular messages to the client for full decoded
1389  * transactions, but a synchronous replication and walsender shutdown
1390  * possibly are waiting for a later location. So we send pings
1391  * containing the flush location every now and then.
1392  */
1393  if (MyWalSnd->flush < sentPtr &&
1394  MyWalSnd->write < sentPtr &&
1396  {
1397  WalSndKeepalive(false);
1399  }
1400 
1401  /* check whether we're done */
1402  if (loc <= RecentFlushPtr)
1403  break;
1404 
1405  /* Waiting for new WAL. Since we need to wait, we're now caught up. */
1406  WalSndCaughtUp = true;
1407 
1408  /*
1409  * Try to flush any pending output to the client.
1410  */
1411  if (pq_flush_if_writable() != 0)
1412  WalSndShutdown();
1413 
1414  /*
1415  * If we have received CopyDone from the client, sent CopyDone
1416  * ourselves, and the output buffer is empty, it's time to exit
1417  * streaming, so fail the current WAL fetch request.
1418  */
1420  !pq_is_send_pending())
1421  break;
1422 
1423  /* die if timeout was reached */
1425 
1426  /* Send keepalive if the time has come */
1428 
1429  /*
1430  * Sleep until something happens or we time out. Also wait for the
1431  * socket becoming writable, if there's still pending output.
1432  * Otherwise we might sit on sendable output data while waiting for
1433  * new WAL to be generated. (But if we have nothing to send, we don't
1434  * want to wake on socket-writable.)
1435  */
1437 
1438  wakeEvents = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH |
1440 
1441  if (pq_is_send_pending())
1442  wakeEvents |= WL_SOCKET_WRITEABLE;
1443 
1444  (void) WaitLatchOrSocket(MyLatch, wakeEvents,
1445  MyProcPort->sock, sleeptime,
1447  }
1448 
1449  /* reactivate latch so WalSndLoop knows to continue */
1450  SetLatch(MyLatch);
1451  return RecentFlushPtr;
1452 }
1453 
1454 /*
1455  * Execute an incoming replication command.
1456  *
1457  * Returns true if the cmd_string was recognized as WalSender command, false
1458  * if not.
1459  */
1460 bool
1461 exec_replication_command(const char *cmd_string)
1462 {
1463  int parse_rc;
1464  Node *cmd_node;
1465  MemoryContext cmd_context;
1466  MemoryContext old_context;
1467 
1468  /*
1469  * If WAL sender has been told that shutdown is getting close, switch its
1470  * status accordingly to handle the next replication commands correctly.
1471  */
1472  if (got_STOPPING)
1474 
1475  /*
1476  * Throw error if in stopping mode. We need prevent commands that could
1477  * generate WAL while the shutdown checkpoint is being written. To be
1478  * safe, we just prohibit all new commands.
1479  */
1480  if (MyWalSnd->state == WALSNDSTATE_STOPPING)
1481  ereport(ERROR,
1482  (errmsg("cannot execute new commands while WAL sender is in stopping mode")));
1483 
1484  /*
1485  * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
1486  * command arrives. Clean up the old stuff if there's anything.
1487  */
1489 
1491 
1493  "Replication command context",
1495  old_context = MemoryContextSwitchTo(cmd_context);
1496 
1497  replication_scanner_init(cmd_string);
1498  parse_rc = replication_yyparse();
1499  if (parse_rc != 0)
1500  ereport(ERROR,
1501  (errcode(ERRCODE_SYNTAX_ERROR),
1502  (errmsg_internal("replication command parser returned %d",
1503  parse_rc))));
1504 
1505  cmd_node = replication_parse_result;
1506 
1507  /*
1508  * Log replication command if log_replication_commands is enabled. Even
1509  * when it's disabled, log the command with DEBUG1 level for backward
1510  * compatibility. Note that SQL commands are not logged here, and will be
1511  * logged later if log_statement is enabled.
1512  */
1513  if (cmd_node->type != T_SQLCmd)
1515  (errmsg("received replication command: %s", cmd_string)));
1516 
1517  /*
1518  * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot. If it was
1519  * called outside of transaction the snapshot should be cleared here.
1520  */
1521  if (!IsTransactionBlock())
1523 
1524  /*
1525  * For aborted transactions, don't allow anything except pure SQL, the
1526  * exec_simple_query() will handle it correctly.
1527  */
1528  if (IsAbortedTransactionBlockState() && !IsA(cmd_node, SQLCmd))
1529  ereport(ERROR,
1530  (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
1531  errmsg("current transaction is aborted, "
1532  "commands ignored until end of transaction block")));
1533 
1535 
1536  /*
1537  * Allocate buffers that will be used for each outgoing and incoming
1538  * message. We do this just once per command to reduce palloc overhead.
1539  */
1540  initStringInfo(&output_message);
1541  initStringInfo(&reply_message);
1542  initStringInfo(&tmpbuf);
1543 
1544  /* Report to pgstat that this process is running */
1546 
1547  switch (cmd_node->type)
1548  {
1549  case T_IdentifySystemCmd:
1550  IdentifySystem();
1551  break;
1552 
1553  case T_BaseBackupCmd:
1554  PreventInTransactionBlock(true, "BASE_BACKUP");
1555  SendBaseBackup((BaseBackupCmd *) cmd_node);
1556  break;
1557 
1560  break;
1561 
1564  break;
1565 
1566  case T_StartReplicationCmd:
1567  {
1568  StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
1569 
1570  PreventInTransactionBlock(true, "START_REPLICATION");
1571 
1572  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
1573  StartReplication(cmd);
1574  else
1576  break;
1577  }
1578 
1579  case T_TimeLineHistoryCmd:
1580  PreventInTransactionBlock(true, "TIMELINE_HISTORY");
1582  break;
1583 
1584  case T_VariableShowStmt:
1585  {
1587  VariableShowStmt *n = (VariableShowStmt *) cmd_node;
1588 
1589  /* syscache access needs a transaction environment */
1591  GetPGVariable(n->name, dest);
1593  }
1594  break;
1595 
1596  case T_SQLCmd:
1597  if (MyDatabaseId == InvalidOid)
1598  ereport(ERROR,
1599  (errmsg("cannot execute SQL commands in WAL sender for physical replication")));
1600 
1601  /* Report to pgstat that this process is now idle */
1603 
1604  /* Tell the caller that this wasn't a WalSender command. */
1605  return false;
1606 
1607  default:
1608  elog(ERROR, "unrecognized replication command node tag: %u",
1609  cmd_node->type);
1610  }
1611 
1612  /* done */
1613  MemoryContextSwitchTo(old_context);
1614  MemoryContextDelete(cmd_context);
1615 
1616  /* Send CommandComplete message */
1617  EndCommand("SELECT", DestRemote);
1618 
1619  /* Report to pgstat that this process is now idle */
1621 
1622  return true;
1623 }
1624 
1625 /*
1626  * Process any incoming messages while streaming. Also checks if the remote
1627  * end has closed the connection.
1628  */
1629 static void
1631 {
1632  unsigned char firstchar;
1633  int r;
1634  bool received = false;
1635 
1637 
1638  for (;;)
1639  {
1640  pq_startmsgread();
1641  r = pq_getbyte_if_available(&firstchar);
1642  if (r < 0)
1643  {
1644  /* unexpected error or EOF */
1646  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1647  errmsg("unexpected EOF on standby connection")));
1648  proc_exit(0);
1649  }
1650  if (r == 0)
1651  {
1652  /* no data available without blocking */
1653  pq_endmsgread();
1654  break;
1655  }
1656 
1657  /* Read the message contents */
1658  resetStringInfo(&reply_message);
1659  if (pq_getmessage(&reply_message, 0))
1660  {
1662  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1663  errmsg("unexpected EOF on standby connection")));
1664  proc_exit(0);
1665  }
1666 
1667  /*
1668  * If we already received a CopyDone from the frontend, the frontend
1669  * should not send us anything until we've closed our end of the COPY.
1670  * XXX: In theory, the frontend could already send the next command
1671  * before receiving the CopyDone, but libpq doesn't currently allow
1672  * that.
1673  */
1674  if (streamingDoneReceiving && firstchar != 'X')
1675  ereport(FATAL,
1676  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1677  errmsg("unexpected standby message type \"%c\", after receiving CopyDone",
1678  firstchar)));
1679 
1680  /* Handle the very limited subset of commands expected in this phase */
1681  switch (firstchar)
1682  {
1683  /*
1684  * 'd' means a standby reply wrapped in a CopyData packet.
1685  */
1686  case 'd':
1688  received = true;
1689  break;
1690 
1691  /*
1692  * CopyDone means the standby requested to finish streaming.
1693  * Reply with CopyDone, if we had not sent that already.
1694  */
1695  case 'c':
1696  if (!streamingDoneSending)
1697  {
1698  pq_putmessage_noblock('c', NULL, 0);
1699  streamingDoneSending = true;
1700  }
1701 
1702  streamingDoneReceiving = true;
1703  received = true;
1704  break;
1705 
1706  /*
1707  * 'X' means that the standby is closing down the socket.
1708  */
1709  case 'X':
1710  proc_exit(0);
1711 
1712  default:
1713  ereport(FATAL,
1714  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1715  errmsg("invalid standby message type \"%c\"",
1716  firstchar)));
1717  }
1718  }
1719 
1720  /*
1721  * Save the last reply timestamp if we've received at least one reply.
1722  */
1723  if (received)
1724  {
1726  waiting_for_ping_response = false;
1727  }
1728 }
1729 
1730 /*
1731  * Process a status update message received from standby.
1732  */
1733 static void
1735 {
1736  char msgtype;
1737 
1738  /*
1739  * Check message type from the first byte.
1740  */
1741  msgtype = pq_getmsgbyte(&reply_message);
1742 
1743  switch (msgtype)
1744  {
1745  case 'r':
1747  break;
1748 
1749  case 'h':
1751  break;
1752 
1753  default:
1755  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1756  errmsg("unexpected message type \"%c\"", msgtype)));
1757  proc_exit(0);
1758  }
1759 }
1760 
1761 /*
1762  * Remember that a walreceiver just confirmed receipt of lsn `lsn`.
1763  */
1764 static void
1766 {
1767  bool changed = false;
1769 
1770  Assert(lsn != InvalidXLogRecPtr);
1771  SpinLockAcquire(&slot->mutex);
1772  if (slot->data.restart_lsn != lsn)
1773  {
1774  changed = true;
1775  slot->data.restart_lsn = lsn;
1776  }
1777  SpinLockRelease(&slot->mutex);
1778 
1779  if (changed)
1780  {
1783  }
1784 
1785  /*
1786  * One could argue that the slot should be saved to disk now, but that'd
1787  * be energy wasted - the worst lost information can do here is give us
1788  * wrong information in a statistics view - we'll just potentially be more
1789  * conservative in removing files.
1790  */
1791 }
1792 
1793 /*
1794  * Regular reply from standby advising of WAL locations on standby server.
1795  */
1796 static void
1798 {
1799  XLogRecPtr writePtr,
1800  flushPtr,
1801  applyPtr;
1802  bool replyRequested;
1803  TimeOffset writeLag,
1804  flushLag,
1805  applyLag;
1806  bool clearLagTimes;
1807  TimestampTz now;
1808  TimestampTz replyTime;
1809 
1810  static bool fullyAppliedLastTime = false;
1811 
1812  /* the caller already consumed the msgtype byte */
1813  writePtr = pq_getmsgint64(&reply_message);
1814  flushPtr = pq_getmsgint64(&reply_message);
1815  applyPtr = pq_getmsgint64(&reply_message);
1816  replyTime = pq_getmsgint64(&reply_message);
1817  replyRequested = pq_getmsgbyte(&reply_message);
1818 
1819  if (log_min_messages <= DEBUG2)
1820  {
1821  char *replyTimeStr;
1822 
1823  /* Copy because timestamptz_to_str returns a static buffer */
1824  replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
1825 
1826  elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s reply_time %s",
1827  (uint32) (writePtr >> 32), (uint32) writePtr,
1828  (uint32) (flushPtr >> 32), (uint32) flushPtr,
1829  (uint32) (applyPtr >> 32), (uint32) applyPtr,
1830  replyRequested ? " (reply requested)" : "",
1831  replyTimeStr);
1832 
1833  pfree(replyTimeStr);
1834  }
1835 
1836  /* See if we can compute the round-trip lag for these positions. */
1837  now = GetCurrentTimestamp();
1838  writeLag = LagTrackerRead(SYNC_REP_WAIT_WRITE, writePtr, now);
1839  flushLag = LagTrackerRead(SYNC_REP_WAIT_FLUSH, flushPtr, now);
1840  applyLag = LagTrackerRead(SYNC_REP_WAIT_APPLY, applyPtr, now);
1841 
1842  /*
1843  * If the standby reports that it has fully replayed the WAL in two
1844  * consecutive reply messages, then the second such message must result
1845  * from wal_receiver_status_interval expiring on the standby. This is a
1846  * convenient time to forget the lag times measured when it last
1847  * wrote/flushed/applied a WAL record, to avoid displaying stale lag data
1848  * until more WAL traffic arrives.
1849  */
1850  clearLagTimes = false;
1851  if (applyPtr == sentPtr)
1852  {
1853  if (fullyAppliedLastTime)
1854  clearLagTimes = true;
1855  fullyAppliedLastTime = true;
1856  }
1857  else
1858  fullyAppliedLastTime = false;
1859 
1860  /* Send a reply if the standby requested one. */
1861  if (replyRequested)
1862  WalSndKeepalive(false);
1863 
1864  /*
1865  * Update shared state for this WalSender process based on reply data from
1866  * standby.
1867  */
1868  {
1869  WalSnd *walsnd = MyWalSnd;
1870 
1871  SpinLockAcquire(&walsnd->mutex);
1872  walsnd->write = writePtr;
1873  walsnd->flush = flushPtr;
1874  walsnd->apply = applyPtr;
1875  if (writeLag != -1 || clearLagTimes)
1876  walsnd->writeLag = writeLag;
1877  if (flushLag != -1 || clearLagTimes)
1878  walsnd->flushLag = flushLag;
1879  if (applyLag != -1 || clearLagTimes)
1880  walsnd->applyLag = applyLag;
1881  walsnd->replyTime = replyTime;
1882  SpinLockRelease(&walsnd->mutex);
1883  }
1884 
1887 
1888  /*
1889  * Advance our local xmin horizon when the client confirmed a flush.
1890  */
1891  if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr)
1892  {
1895  else
1897  }
1898 }
1899 
1900 /* compute new replication slot xmin horizon if needed */
1901 static void
1903 {
1904  bool changed = false;
1906 
1907  SpinLockAcquire(&slot->mutex);
1909 
1910  /*
1911  * For physical replication we don't need the interlock provided by xmin
1912  * and effective_xmin since the consequences of a missed increase are
1913  * limited to query cancellations, so set both at once.
1914  */
1915  if (!TransactionIdIsNormal(slot->data.xmin) ||
1916  !TransactionIdIsNormal(feedbackXmin) ||
1917  TransactionIdPrecedes(slot->data.xmin, feedbackXmin))
1918  {
1919  changed = true;
1920  slot->data.xmin = feedbackXmin;
1921  slot->effective_xmin = feedbackXmin;
1922  }
1923  if (!TransactionIdIsNormal(slot->data.catalog_xmin) ||
1924  !TransactionIdIsNormal(feedbackCatalogXmin) ||
1925  TransactionIdPrecedes(slot->data.catalog_xmin, feedbackCatalogXmin))
1926  {
1927  changed = true;
1928  slot->data.catalog_xmin = feedbackCatalogXmin;
1929  slot->effective_catalog_xmin = feedbackCatalogXmin;
1930  }
1931  SpinLockRelease(&slot->mutex);
1932 
1933  if (changed)
1934  {
1937  }
1938 }
1939 
1940 /*
 * Check that the provided xmin/epoch are sane, that is, not in the future
 * and not so far back as to be already wrapped around.
 *
 * Epoch of nextXid should be same as standby, or if the counter has
 * wrapped, then one greater than standby.
 *
 * xid is the transaction ID to validate, and epoch is the epoch the standby
 * reported alongside it. The result is compared against the current next
 * full transaction ID from ReadNextFullTransactionId().
 *
 * Returns true only if both of the following hold:
 *   - epoch equals nextXid's epoch when xid <= nextXid numerically, or is
 *     exactly one less than it when xid > nextXid;
 *   - xid precedes or equals nextXid according to
 *     TransactionIdPrecedesOrEquals().
 * Otherwise returns false. ProcessStandbyHSFeedbackMessage uses this to
 * ignore hot standby feedback whose xmin/catalog_xmin fails the check.
 *
 * This check doesn't care about whether clog exists for these xids
 * at all.
 */
1950 static bool
1952 {
1953  FullTransactionId nextFullXid;
1954  TransactionId nextXid;
1955  uint32 nextEpoch;
1956 
1957  nextFullXid = ReadNextFullTransactionId();
1958  nextXid = XidFromFullTransactionId(nextFullXid);
1959  nextEpoch = EpochFromFullTransactionId(nextFullXid);
1960 
1961  if (xid <= nextXid) /* numerically not above nextXid: must be in nextXid's epoch */
1962  {
1963  if (epoch != nextEpoch)
1964  return false;
1965  }
1966  else /* numerically above nextXid: only acceptable if from the previous epoch */
1967  {
1968  if (epoch + 1 != nextEpoch)
1969  return false;
1970  }
1971 
1972  if (!TransactionIdPrecedesOrEquals(xid, nextXid))
1973  return false; /* epoch OK, but it's wrapped around */
1974 
1975  return true;
1976 }
1977 
1978 /*
1979  * Hot Standby feedback
1980  */
1981 static void
1983 {
1984  TransactionId feedbackXmin;
1985  uint32 feedbackEpoch;
1986  TransactionId feedbackCatalogXmin;
1987  uint32 feedbackCatalogEpoch;
1988  TimestampTz replyTime;
1989 
1990  /*
1991  * Decipher the reply message. The caller already consumed the msgtype
1992  * byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation
1993  * of this message.
1994  */
1995  replyTime = pq_getmsgint64(&reply_message);
1996  feedbackXmin = pq_getmsgint(&reply_message, 4);
1997  feedbackEpoch = pq_getmsgint(&reply_message, 4);
1998  feedbackCatalogXmin = pq_getmsgint(&reply_message, 4);
1999  feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4);
2000 
2001  if (log_min_messages <= DEBUG2)
2002  {
2003  char *replyTimeStr;
2004 
2005  /* Copy because timestamptz_to_str returns a static buffer */
2006  replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
2007 
2008  elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u reply_time %s",
2009  feedbackXmin,
2010  feedbackEpoch,
2011  feedbackCatalogXmin,
2012  feedbackCatalogEpoch,
2013  replyTimeStr);
2014 
2015  pfree(replyTimeStr);
2016  }
2017 
2018  /*
2019  * Update shared state for this WalSender process based on reply data from
2020  * standby.
2021  */
2022  {
2023  WalSnd *walsnd = MyWalSnd;
2024 
2025  SpinLockAcquire(&walsnd->mutex);
2026  walsnd->replyTime = replyTime;
2027  SpinLockRelease(&walsnd->mutex);
2028  }
2029 
2030  /*
2031  * Unset WalSender's xmins if the feedback message values are invalid.
2032  * This happens when the downstream turned hot_standby_feedback off.
2033  */
2034  if (!TransactionIdIsNormal(feedbackXmin)
2035  && !TransactionIdIsNormal(feedbackCatalogXmin))
2036  {
2038  if (MyReplicationSlot != NULL)
2039  PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
2040  return;
2041  }
2042 
2043  /*
2044  * Check that the provided xmin/epoch are sane, that is, not in the future
2045  * and not so far back as to be already wrapped around. Ignore if not.
2046  */
2047  if (TransactionIdIsNormal(feedbackXmin) &&
2048  !TransactionIdInRecentPast(feedbackXmin, feedbackEpoch))
2049  return;
2050 
2051  if (TransactionIdIsNormal(feedbackCatalogXmin) &&
2052  !TransactionIdInRecentPast(feedbackCatalogXmin, feedbackCatalogEpoch))
2053  return;
2054 
2055  /*
2056  * Set the WalSender's xmin equal to the standby's requested xmin, so that
2057  * the xmin will be taken into account by GetOldestXmin. This will hold
2058  * back the removal of dead rows and thereby prevent the generation of
2059  * cleanup conflicts on the standby server.
2060  *
2061  * There is a small window for a race condition here: although we just
2062  * checked that feedbackXmin precedes nextXid, the nextXid could have
2063  * gotten advanced between our fetching it and applying the xmin below,
2064  * perhaps far enough to make feedbackXmin wrap around. In that case the
2065  * xmin we set here would be "in the future" and have no effect. No point
2066  * in worrying about this since it's too late to save the desired data
2067  * anyway. Assuming that the standby sends us an increasing sequence of
2068  * xmins, this could only happen during the first reply cycle, else our
2069  * own xmin would prevent nextXid from advancing so far.
2070  *
2071  * We don't bother taking the ProcArrayLock here. Setting the xmin field
2072  * is assumed atomic, and there's no real need to prevent a concurrent
2073  * GetOldestXmin. (If we're moving our xmin forward, this is obviously
2074  * safe, and if we're moving it backwards, well, the data is at risk
2075  * already since a VACUUM could have just finished calling GetOldestXmin.)
2076  *
2077  * If we're using a replication slot we reserve the xmin via that,
2078  * otherwise via the walsender's PGXACT entry. We can only track the
2079  * catalog xmin separately when using a slot, so we store the least of the
2080  * two provided when not using a slot.
2081  *
2082  * XXX: It might make sense to generalize the ephemeral slot concept and
2083  * always use the slot mechanism to handle the feedback xmin.
2084  */
2085  if (MyReplicationSlot != NULL) /* XXX: persistency configurable? */
2086  PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
2087  else
2088  {
2089  if (TransactionIdIsNormal(feedbackCatalogXmin)
2090  && TransactionIdPrecedes(feedbackCatalogXmin, feedbackXmin))
2091  MyPgXact->xmin = feedbackCatalogXmin;
2092  else
2093  MyPgXact->xmin = feedbackXmin;
2094  }
2095 }
2096 
2097 /*
2098  * Compute how long send/receive loops should sleep.
2099  *
2100  * If wal_sender_timeout is enabled we want to wake up in time to send
2101  * keepalives and to abort the connection if wal_sender_timeout has been
2102  * reached.
2103  */
2104 static long
2106 {
2107  long sleeptime = 10000; /* 10 s */
2108 
2110  {
2111  TimestampTz wakeup_time;
2112  long sec_to_timeout;
2113  int microsec_to_timeout;
2114 
2115  /*
2116  * At the latest stop sleeping once wal_sender_timeout has been
2117  * reached.
2118  */
2121 
2122  /*
2123  * If no ping has been sent yet, wakeup when it's time to do so.
2124  * WalSndKeepaliveIfNecessary() wants to send a keepalive once half of
2125  * the timeout passed without a response.
2126  */
2129  wal_sender_timeout / 2);
2130 
2131  /* Compute relative time until wakeup. */
2132  TimestampDifference(now, wakeup_time,
2133  &sec_to_timeout, &microsec_to_timeout);
2134 
2135  sleeptime = sec_to_timeout * 1000 +
2136  microsec_to_timeout / 1000;
2137  }
2138 
2139  return sleeptime;
2140 }
2141 
2142 /*
2143  * Check whether there have been responses by the client within
2144  * wal_sender_timeout and shutdown if not. Using last_processing as the
2145  * reference point avoids counting server-side stalls against the client.
2146  * However, a long server-side stall can make WalSndKeepaliveIfNecessary()
2147  * postdate last_processing by more than wal_sender_timeout. If that happens,
2148  * the client must reply almost immediately to avoid a timeout. This rarely
2149  * affects the default configuration, under which clients spontaneously send a
2150  * message every standby_message_timeout = wal_sender_timeout/6 = 10s. We
2151  * could eliminate that problem by recognizing timeout expiration at
2152  * wal_sender_timeout/2 after the keepalive.
2153  */
2154 static void
2156 {
2157  TimestampTz timeout;
2158 
2159  /* don't bail out if we're doing something that doesn't require timeouts */
2160  if (last_reply_timestamp <= 0)
2161  return;
2162 
2165 
2166  if (wal_sender_timeout > 0 && last_processing >= timeout)
2167  {
2168  /*
2169  * Since typically expiration of replication timeout means
2170  * communication problem, we don't send the error message to the
2171  * standby.
2172  */
2174  (errmsg("terminating walsender process due to replication timeout")));
2175 
2176  WalSndShutdown();
2177  }
2178 }
2179 
2180 /* Main loop of walsender process that streams the WAL over Copy messages. */
2181 static void
2183 {
2184  /*
2185  * Initialize the last reply timestamp. That enables timeout processing
2186  * from hereon.
2187  */
2189  waiting_for_ping_response = false;
2190 
2191  /*
2192  * Loop until we reach the end of this timeline or the client requests to
2193  * stop streaming.
2194  */
2195  for (;;)
2196  {
2197  /* Clear any already-pending wakeups */
2199 
2201 
2202  /* Process any requests or signals received recently */
2203  if (ConfigReloadPending)
2204  {
2205  ConfigReloadPending = false;
2208  }
2209 
2210  /* Check for input from the client */
2212 
2213  /*
2214  * If we have received CopyDone from the client, sent CopyDone
2215  * ourselves, and the output buffer is empty, it's time to exit
2216  * streaming.
2217  */
2219  !pq_is_send_pending())
2220  break;
2221 
2222  /*
2223  * If we don't have any pending data in the output buffer, try to send
2224  * some more. If there is some, we don't bother to call send_data
2225  * again until we've flushed it ... but we'd better assume we are not
2226  * caught up.
2227  */
2228  if (!pq_is_send_pending())
2229  send_data();
2230  else
2231  WalSndCaughtUp = false;
2232 
2233  /* Try to flush pending output to the client */
2234  if (pq_flush_if_writable() != 0)
2235  WalSndShutdown();
2236 
2237  /* If nothing remains to be sent right now ... */
2239  {
2240  /*
2241  * If we're in catchup state, move to streaming. This is an
2242  * important state change for users to know about, since before
2243  * this point data loss might occur if the primary dies and we
2244  * need to failover to the standby. The state change is also
2245  * important for synchronous replication, since commits that
2246  * started to wait at that point might wait for some time.
2247  */
2248  if (MyWalSnd->state == WALSNDSTATE_CATCHUP)
2249  {
2250  ereport(DEBUG1,
2251  (errmsg("\"%s\" has now caught up with upstream server",
2252  application_name)));
2254  }
2255 
2256  /*
2257  * When SIGUSR2 arrives, we send any outstanding logs up to the
2258  * shutdown checkpoint record (i.e., the latest record), wait for
2259  * them to be replicated to the standby, and exit. This may be a
2260  * normal termination at shutdown, or a promotion, the walsender
2261  * is not sure which.
2262  */
2263  if (got_SIGUSR2)
2264  WalSndDone(send_data);
2265  }
2266 
2267  /* Check for replication timeout. */
2269 
2270  /* Send keepalive if the time has come */
2272 
2273  /*
2274  * We don't block if not caught up, unless there is unsent data
2275  * pending in which case we'd better block until the socket is
2276  * write-ready. This test is only needed for the case where the
2277  * send_data callback handled a subset of the available data but then
2278  * pq_flush_if_writable flushed it all --- we should immediately try
2279  * to send more.
2280  */
2282  {
2283  long sleeptime;
2284  int wakeEvents;
2285 
2286  wakeEvents = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH | WL_TIMEOUT |
2288 
2289  /*
2290  * Use fresh timestamp, not last_processing, to reduce the chance
2291  * of reaching wal_sender_timeout before sending a keepalive.
2292  */
2294 
2295  if (pq_is_send_pending())
2296  wakeEvents |= WL_SOCKET_WRITEABLE;
2297 
2298  /* Sleep until something happens or we time out */
2299  (void) WaitLatchOrSocket(MyLatch, wakeEvents,
2300  MyProcPort->sock, sleeptime,
2302  }
2303  }
2304 }
2305 
2306 /* Initialize a per-walsender data structure for this walsender process.
 *
 * Scans WalSndCtl->walsnds[0 .. max_wal_senders - 1] for a slot whose pid is
 * 0 and, while holding that slot's spinlock, claims it by storing MyProcPid.
 * In the claimed slot:
 *  - the WAL positions are reset to InvalidXLogRecPtr;
 *  - the lag values are set to -1;
 *  - the reply time and spill counters are set to 0;
 *  - the state is set to WALSNDSTATE_STARTUP;
 *  - the latch is pointed at this process's procLatch.
 * The slot is then published in MyWalSnd.
 *
 * Expects WalSndCtl to be set up and MyWalSnd to be NULL on entry (both are
 * asserted), and asserts afterwards that a free slot was found.
 */
2307 static void
2309 {
2310  int i;
2311 
2312  /*
2313  * WalSndCtl should be set up already (we inherit this by fork() or
2314  * EXEC_BACKEND mechanism from the postmaster).
2315  */
2316  Assert(WalSndCtl != NULL);
2317  Assert(MyWalSnd == NULL);
2318 
2319  /*
2320  * Find a free walsender slot and reserve it. This must not fail due to
2321  * the prior check for free WAL senders in InitProcess().
2322  */
2323  for (i = 0; i < max_wal_senders; i++)
2324  {
2325  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2326 
2327  SpinLockAcquire(&walsnd->mutex);
2328 
2329  if (walsnd->pid != 0)
2330  {
2331  SpinLockRelease(&walsnd->mutex);
2332  continue;
2333  }
2334  else
2335  {
2336  /*
2337  * Found a free slot. Reserve it for us.
2338  */
2339  walsnd->pid = MyProcPid;
2340  walsnd->sentPtr = InvalidXLogRecPtr;
2341  walsnd->write = InvalidXLogRecPtr;
2342  walsnd->flush = InvalidXLogRecPtr;
2343  walsnd->apply = InvalidXLogRecPtr;
 /* -1 means "no lag measured yet"; the stats function reports it as NULL. */
2344  walsnd->writeLag = -1;
2345  walsnd->flushLag = -1;
2346  walsnd->applyLag = -1;
2347  walsnd->state = WALSNDSTATE_STARTUP;
 /* Publishing our latch lets other processes wake us with SetLatch(). */
2348  walsnd->latch = &MyProc->procLatch;
2349  walsnd->replyTime = 0;
2350  walsnd->spillTxns = 0;
2351  walsnd->spillCount = 0;
2352  walsnd->spillBytes = 0;
2353  SpinLockRelease(&walsnd->mutex);
2354  /* don't need the lock anymore */
2355  MyWalSnd = (WalSnd *) walsnd;
2356 
2357  break;
2358  }
2359  }
2360 
2361  Assert(MyWalSnd != NULL);
2362 
2363  /* Arrange to clean up at walsender exit */
 /*
  * NOTE(review): the registration statement that belongs here is missing
  * from this listing; presumably it schedules the slot-release routine
  * defined next. Confirm against the real source.
  */
2365 }
2366 
2367 /* Destroy the per-walsender data structure for this walsender process.
 *
 * Releases the slot this process claimed:
 *  - MyWalSnd is reset to NULL;
 *  - under the slot's spinlock, the published latch is cleared and pid is
 *    set to 0.
 * The slot scans in this file treat pid == 0 as a free slot, so the slot
 * can be claimed again afterwards. Asserts that this process owned a slot.
 */
2368 static void
2370 {
2371  WalSnd *walsnd = MyWalSnd;
2372 
2373  Assert(walsnd != NULL);
2374 
2375  MyWalSnd = NULL;
2376 
2377  SpinLockAcquire(&walsnd->mutex);
2378  /* clear latch while holding the spinlock, so it can safely be read */
2379  walsnd->latch = NULL;
2380  /* Mark WalSnd struct as no longer being in use. */
2381  walsnd->pid = 0;
2382  SpinLockRelease(&walsnd->mutex);
2383 }
2384 
2385 /* walsender's openSegment callback for WALRead
 *
 * Opens WAL segment nextSegNo read-only and returns its file descriptor.
 *
 * On return *tli_p holds the timeline whose file was opened. Normally that is
 * sendTimeLine. When streaming a historic timeline and nextSegNo is the
 * segment containing the switch point (sendTimeLineValidUpto), it is
 * sendTimeLineNextTLI instead (see the example below).
 *
 * Raises ERROR if the file cannot be opened. A missing file (ENOENT) is
 * reported as a WAL segment that has already been removed.
 */
2386 static int
2388  TimeLineID *tli_p)
2389 {
2390  char path[MAXPGPATH];
2391  int fd;
2392 
2393  /*-------
2394  * When reading from a historic timeline, and there is a timeline switch
2395  * within this segment, read from the WAL segment belonging to the new
2396  * timeline.
2397  *
2398  * For example, imagine that this server is currently on timeline 5, and
2399  * we're streaming timeline 4. The switch from timeline 4 to 5 happened at
2400  * 0/13002088. In pg_wal, we have these files:
2401  *
2402  * ...
2403  * 000000040000000000000012
2404  * 000000040000000000000013
2405  * 000000050000000000000013
2406  * 000000050000000000000014
2407  * ...
2408  *
2409  * In this situation, when requested to send the WAL from segment 0x13, on
2410  * timeline 4, we read the WAL from file 000000050000000000000013. Archive
2411  * recovery prefers files from newer timelines, so if the segment was
2412  * restored from the archive on this server, the file belonging to the old
2413  * timeline, 000000040000000000000013, might not exist. Their contents are
2414  * equal up to the switchpoint, because at a timeline switch, the used
2415  * portion of the old segment is copied to the new file. -------
2416  */
2417  *tli_p = sendTimeLine;
2419  {
2420  XLogSegNo endSegNo;
2421 
2422  XLByteToSeg(sendTimeLineValidUpto, endSegNo, segcxt->ws_segsize);
 /*
  * Decide based on the segment being opened (nextSegNo), not on
  * sendSeg->ws_segno. The latter describes the caller's current
  * open-segment state, not the segment requested here. NOTE(review):
  * presumably WALRead updates it only after this callback returns, so
  * testing it made the switch segment open on the old timeline.
  */
2423  if (nextSegNo == endSegNo)
2424  *tli_p = sendTimeLineNextTLI;
2425  }
2426 
2427  XLogFilePath(path, *tli_p, nextSegNo, segcxt->ws_segsize);
2428  fd = BasicOpenFile(path, O_RDONLY | PG_BINARY);
2429  if (fd >= 0)
2430  return fd;
2431 
2432  /*
2433  * If the file is not found, assume it's because the standby asked for a
2434  * too old WAL segment that has already been removed or recycled.
2435  */
2436  if (errno == ENOENT)
2437  {
2438  char xlogfname[MAXFNAMELEN];
2439  int save_errno = errno;
2440 
 /* Use the same segment size that built the path above. */
2441  XLogFileName(xlogfname, *tli_p, nextSegNo, segcxt->ws_segsize);
2442  errno = save_errno;
2443  ereport(ERROR,
2445  errmsg("requested WAL segment %s has already been removed",
2446  xlogfname)));
2447  }
2448  else
2449  ereport(ERROR,
2451  errmsg("could not open file \"%s\": %m",
2452  path)));
2453  return -1; /* keep compiler quiet */
2454 }
2455 
2456 /*
2457  * Send out the WAL in its normal physical/stored form.
2458  *
2459  * Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk,
2460  * but not yet sent to the client, and buffer it in the libpq output
2461  * buffer.
2462  *
2463  * If there is no unsent WAL remaining, WalSndCaughtUp is set to true,
2464  * otherwise WalSndCaughtUp is set to false.
 *
 * Outline:
 *  - Compute SendRqstPtr, the furthest position that is safe to send. How it
 *    is computed depends on the case: a historic timeline, the latest
 *    timeline on a cascading standby, or the current timeline on a primary.
 *  - Record a lag-tracking sample for SendRqstPtr.
 *  - If a historic timeline has been sent completely, close the open
 *    segment, queue CopyDone and set streamingDoneSending.
 *  - Otherwise read [sentPtr, endptr) with WALRead() into a 'w' message
 *    whose header holds dataStart, walEnd and sendtime, and queue it as
 *    CopyData. Then advance sentPtr, publish it in the slot's shared memory,
 *    and report it in the PS display (the guarding condition is not visible
 *    in this listing).
 *
 * Read failures are raised through WALReadRaiseError().
2465  */
2466 static void
2468 {
2469  XLogRecPtr SendRqstPtr;
2471  XLogRecPtr endptr;
2472  Size nbytes;
2473  XLogSegNo segno;
2474  WALReadError errinfo;
2475 
2476  /* If requested switch the WAL sender to the stopping state. */
2477  if (got_STOPPING)
2479 
2481  {
2482  WalSndCaughtUp = true;
2483  return;
2484  }
2485 
2486  /* Figure out how far we can safely send the WAL. */
2488  {
2489  /*
2490  * Streaming an old timeline that's in this server's history, but is
2491  * not the one we're currently inserting or replaying. It can be
2492  * streamed up to the point where we switched off that timeline.
2493  */
2494  SendRqstPtr = sendTimeLineValidUpto;
2495  }
2496  else if (am_cascading_walsender)
2497  {
2498  /*
2499  * Streaming the latest timeline on a standby.
2500  *
2501  * Attempt to send all WAL that has already been replayed, so that we
2502  * know it's valid. If we're receiving WAL through streaming
2503  * replication, it's also OK to send any WAL that has been received
2504  * but not replayed.
2505  *
2506  * The timeline we're recovering from can change, or we can be
2507  * promoted. In either case, the current timeline becomes historic. We
2508  * need to detect that so that we don't try to stream past the point
2509  * where we switched to another timeline. We check for promotion or
2510  * timeline switch after calculating FlushPtr, to avoid a race
2511  * condition: if the timeline becomes historic just after we checked
2512  * that it was still current, it's still OK to stream it up to the
2513  * FlushPtr that was calculated before it became historic.
2514  */
2515  bool becameHistoric = false;
2516 
2517  SendRqstPtr = GetStandbyFlushRecPtr();
2518 
2519  if (!RecoveryInProgress())
2520  {
2521  /*
2522  * We have been promoted. RecoveryInProgress() updated
2523  * ThisTimeLineID to the new current timeline.
2524  */
2525  am_cascading_walsender = false;
2526  becameHistoric = true;
2527  }
2528  else
2529  {
2530  /*
2531  * Still a cascading standby. But is the timeline we're sending
2532  * still the one recovery is recovering from? ThisTimeLineID was
2533  * updated by the GetStandbyFlushRecPtr() call above.
2534  */
2536  becameHistoric = true;
2537  }
2538 
2539  if (becameHistoric)
2540  {
2541  /*
2542  * The timeline we were sending has become historic. Read the
2543  * timeline history file of the new timeline to see where exactly
2544  * we forked off from the timeline we were sending.
2545  */
2546  List *history;
2547 
2550 
2552  list_free_deep(history);
2553 
2554  sendTimeLineIsHistoric = true;
2555 
2556  SendRqstPtr = sendTimeLineValidUpto;
2557  }
2558  }
2559  else
2560  {
2561  /*
2562  * Streaming the current timeline on a master.
2563  *
2564  * Attempt to send all data that's already been written out and
2565  * fsync'd to disk. We cannot go further than what's been written out
2566  * given the current implementation of WALRead(). And in any case
2567  * it's unsafe to send WAL that is not securely down to disk on the
2568  * master: if the master subsequently crashes and restarts, standbys
2569  * must not have applied any WAL that got lost on the master.
2570  */
2571  SendRqstPtr = GetFlushRecPtr();
2572  }
2573 
2574  /*
2575  * Record the current system time as an approximation of the time at which
2576  * this WAL location was written for the purposes of lag tracking.
2577  *
2578  * In theory we could make XLogFlush() record a time in shmem whenever WAL
2579  * is flushed and we could get that time as well as the LSN when we call
2580  * GetFlushRecPtr() above (and likewise for the cascading standby
2581  * equivalent), but rather than putting any new code into the hot WAL path
2582  * it seems good enough to capture the time here. We should reach this
2583  * after XLogFlush() runs WalSndWakeupProcessRequests(), and although that
2584  * may take some time, we read the WAL flush pointer and take the time
2585  * very close together here so that we'll get a later position if it is
2586  * still moving.
2587  *
2588  * Because LagTrackerWrite ignores samples when the LSN hasn't advanced,
2589  * this gives us a cheap approximation for the WAL flush time for this
2590  * LSN.
2591  *
2592  * Note that the LSN is not necessarily the LSN for the data contained in
2593  * the present message; it's the end of the WAL, which might be further
2594  * ahead. All the lag tracking machinery cares about is finding out when
2595  * that arbitrary LSN is eventually reported as written, flushed and
2596  * applied, so that it can measure the elapsed time.
2597  */
2598  LagTrackerWrite(SendRqstPtr, GetCurrentTimestamp());
2599 
2600  /*
2601  * If this is a historic timeline and we've reached the point where we
2602  * forked to the next timeline, stop streaming.
2603  *
2604  * Note: We might already have sent WAL > sendTimeLineValidUpto. The
2605  * startup process will normally replay all WAL that has been received
2606  * from the master, before promoting, but if the WAL streaming is
2607  * terminated at a WAL page boundary, the valid portion of the timeline
2608  * might end in the middle of a WAL record. We might've already sent the
2609  * first half of that partial WAL record to the cascading standby, so that
2610  * sentPtr > sendTimeLineValidUpto. That's OK; the cascading standby can't
2611  * replay the partial WAL record either, so it can still follow our
2612  * timeline switch.
2613  */
2615  {
2616  /* close the current file. */
2617  if (sendSeg->ws_file >= 0)
2618  close(sendSeg->ws_file);
2619  sendSeg->ws_file = -1;
2620 
2621  /* Send CopyDone */
2622  pq_putmessage_noblock('c', NULL, 0);
2623  streamingDoneSending = true;
2624 
2625  WalSndCaughtUp = true;
2626 
2627  elog(DEBUG1, "walsender reached end of timeline at %X/%X (sent up to %X/%X)",
2629  (uint32) (sentPtr >> 32), (uint32) sentPtr);
2630  return;
2631  }
2632 
2633  /* Do we have any work to do? */
2634  Assert(sentPtr <= SendRqstPtr);
2635  if (SendRqstPtr <= sentPtr)
2636  {
2637  WalSndCaughtUp = true;
2638  return;
2639  }
2640 
2641  /*
2642  * Figure out how much to send in one message. If there's no more than
2643  * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
2644  * MAX_SEND_SIZE bytes, but round back to logfile or page boundary.
2645  *
2646  * The rounding is not only for performance reasons. Walreceiver relies on
2647  * the fact that we never split a WAL record across two messages. Since a
2648  * long WAL record is split at page boundary into continuation records,
2649  * page boundary is always a safe cut-off point. We also assume that
2650  * SendRqstPtr never points to the middle of a WAL record.
2651  */
2652  startptr = sentPtr;
2653  endptr = startptr;
2654  endptr += MAX_SEND_SIZE;
2655 
2656  /* if we went beyond SendRqstPtr, back off */
2657  if (SendRqstPtr <= endptr)
2658  {
2659  endptr = SendRqstPtr;
2661  WalSndCaughtUp = false;
2662  else
2663  WalSndCaughtUp = true;
2664  }
2665  else
2666  {
2667  /* round down to page boundary. */
2668  endptr -= (endptr % XLOG_BLCKSZ);
2669  WalSndCaughtUp = false;
2670  }
2671 
2672  nbytes = endptr - startptr;
2673  Assert(nbytes <= MAX_SEND_SIZE);
2674 
2675  /*
2676  * OK to read and send the slice.
2677  */
2678  resetStringInfo(&output_message);
2679  pq_sendbyte(&output_message, 'w');
2680 
2681  pq_sendint64(&output_message, startptr); /* dataStart */
2682  pq_sendint64(&output_message, SendRqstPtr); /* walEnd */
2683  pq_sendint64(&output_message, 0); /* sendtime, filled in last */
2684 
2685  /*
2686  * Read the log directly into the output buffer to avoid extra memcpy
2687  * calls.
2688  */
2689  enlargeStringInfo(&output_message, nbytes);
2690 
2691 retry:
2692  if (!WALRead(&output_message.data[output_message.len],
2693  startptr,
2694  nbytes,
2695  sendSeg->ws_tli, /* Pass the current TLI because only
2696  * WalSndSegmentOpen controls whether new
2697  * TLI is needed. */
2698  sendSeg,
2699  sendCxt,
2701  &errinfo))
2702  WALReadRaiseError(&errinfo);
2703 
2704  /* See logical_read_xlog_page(). */
2705  XLByteToSeg(startptr, segno, sendCxt->ws_segsize);
2706  CheckXLogRemoved(segno, sendSeg->ws_tli);
2707 
2708  /*
2709  * During recovery, the currently-open WAL file might be replaced with the
2710  * file of the same name retrieved from archive. So we always need to
2711  * check what we read was valid after reading into the buffer. If it's
2712  * invalid, we try to open and read the file again.
2713  */
2715  {
2716  WalSnd *walsnd = MyWalSnd;
2717  bool reload;
2718 
 /* Consume the reload request (set by the file-reload requester) atomically. */
2719  SpinLockAcquire(&walsnd->mutex);
2720  reload = walsnd->needreload;
2721  walsnd->needreload = false;
2722  SpinLockRelease(&walsnd->mutex);
2723 
2724  if (reload && sendSeg->ws_file >= 0)
2725  {
2726  close(sendSeg->ws_file);
2727  sendSeg->ws_file = -1;
2728 
2729  goto retry;
2730  }
2731  }
2732 
 /* The WAL bytes were read in place; account for them and re-terminate. */
2733  output_message.len += nbytes;
2734  output_message.data[output_message.len] = '\0';
2735 
2736  /*
2737  * Fill the send timestamp last, so that it is taken as late as possible.
2738  */
 /* Offset skips the 'w' type byte plus the dataStart and walEnd fields. */
2739  resetStringInfo(&tmpbuf);
2740  pq_sendint64(&tmpbuf, GetCurrentTimestamp());
2741  memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64)],
2742  tmpbuf.data, sizeof(int64));
2743 
2744  pq_putmessage_noblock('d', output_message.data, output_message.len);
2745 
2746  sentPtr = endptr;
2747 
2748  /* Update shared memory status */
2749  {
2750  WalSnd *walsnd = MyWalSnd;
2751 
2752  SpinLockAcquire(&walsnd->mutex);
2753  walsnd->sentPtr = sentPtr;
2754  SpinLockRelease(&walsnd->mutex);
2755  }
2756 
2757  /* Report progress of XLOG streaming in PS display */
2759  {
2760  char activitymsg[50];
2761 
2762  snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
2763  (uint32) (sentPtr >> 32), (uint32) sentPtr);
2764  set_ps_display(activitymsg, false);
2765  }
2766 }
2767 
2768 /*
2769  * Stream out logically decoded data.
 *
 * Each call does the following:
 *  - Reads at most one WAL record with XLogReadRecord(), starting at
 *    logical_startptr. An error message from the reader is raised as ERROR.
 *  - If a record was returned, passes it to LogicalDecodingProcessRecord()
 *    and advances sentPtr to the reader's EndRecPtr.
 *  - Sets WalSndCaughtUp once EndRecPtr has reached GetFlushRecPtr().
 *  - May set got_SIGUSR2 so that the main loop terminates in an orderly way.
 *    The exact condition is not visible in this listing; per the comment
 *    below it is "caught up and asked to stop".
 *  - Publishes sentPtr in this walsender's shared slot.
2770  */
2771 static void
2773 {
2774  XLogRecord *record;
2775  char *errm;
2776  XLogRecPtr flushPtr;
2777 
2778  /*
2779  * Don't know whether we've caught up yet. We'll set WalSndCaughtUp to
2780  * true in WalSndWaitForWal, if we're actually waiting. We also set to
2781  * true if XLogReadRecord() had to stop reading but WalSndWaitForWal
2782  * didn't wait - i.e. when we're shutting down.
2783  */
2784  WalSndCaughtUp = false;
2785 
2786  record = XLogReadRecord(logical_decoding_ctx->reader, logical_startptr, &errm);
2788 
2789  /* xlog record was invalid */
2790  if (errm != NULL)
2791  elog(ERROR, "%s", errm);
2792 
2793  /*
2794  * We'll use the current flush point to determine whether we've caught up.
2795  */
2796  flushPtr = GetFlushRecPtr();
2797 
2798  if (record != NULL)
2799  {
2800  /*
2801  * Note the lack of any call to LagTrackerWrite() which is handled by
2802  * WalSndUpdateProgress which is called by output plugin through
2803  * logical decoding write api.
2804  */
2805  LogicalDecodingProcessRecord(logical_decoding_ctx, logical_decoding_ctx->reader);
2806 
2807  sentPtr = logical_decoding_ctx->reader->EndRecPtr;
2808  }
2809 
2810  /* Set flag if we're caught up. */
2811  if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
2812  WalSndCaughtUp = true;
2813 
2814  /*
2815  * If we're caught up and have been requested to stop, have WalSndLoop()
2816  * terminate the connection in an orderly manner, after writing out all
2817  * the pending data.
2818  */
2820  got_SIGUSR2 = true;
2821 
2822  /* Update shared memory status */
2823  {
2824  WalSnd *walsnd = MyWalSnd;
2825 
2826  SpinLockAcquire(&walsnd->mutex);
2827  walsnd->sentPtr = sentPtr;
2828  SpinLockRelease(&walsnd->mutex);
2829  }
2830 }
2831 
2832 /*
2833  * Shutdown if the sender is caught up.
2834  *
2835  * NB: This should only be called when the shutdown signal has been received
2836  * from postmaster.
2837  *
2838  * Note that if we determine that there's still more data to send, this
2839  * function will return control to the caller.
 *
 * send_data: the send callback, which is invoked once more before deciding.
 *
 * The replicated position is the standby's reported flush position, or its
 * write position when flush is invalid. If all of the following hold, the
 * function sends "COPY 0", flushes the output and exits with status 0:
 *  - WalSndCaughtUp is set;
 *  - the replicated position equals sentPtr;
 *  - no output is pending.
 * Otherwise a reply-requesting keepalive may be sent; the condition guarding
 * that branch is not visible in this listing.
2840  */
2841 static void
2843 {
2844  XLogRecPtr replicatedPtr;
2845 
2846  /* ... let's just be real sure we're caught up ... */
2847  send_data();
2848 
2849  /*
2850  * To figure out whether all WAL has successfully been replicated, check
2851  * flush location if valid, write otherwise. Tools like pg_receivewal will
2852  * usually (unless in synchronous mode) return an invalid flush location.
2853  */
2854  replicatedPtr = XLogRecPtrIsInvalid(MyWalSnd->flush) ?
2855  MyWalSnd->write : MyWalSnd->flush;
2856 
2857  if (WalSndCaughtUp && sentPtr == replicatedPtr &&
2858  !pq_is_send_pending())
2859  {
2860  /* Inform the standby that XLOG streaming is done */
2861  EndCommand("COPY 0", DestRemote);
2862  pq_flush();
2863 
2864  proc_exit(0);
2865  }
2867  {
2868  WalSndKeepalive(true);
2870  }
2871 }
2872 
2873 /*
2874  * Returns the latest point in WAL that has been safely flushed to disk, and
2875  * can be sent to the standby. This should only be called when in recovery,
2876  * ie. we're streaming to a cascaded standby.
2877  *
2878  * As a side-effect, ThisTimeLineID is updated to the TLI of the last
2879  * replayed WAL record.
 *
 * The result is the replay position, or the walreceiver's write position
 * when that is further ahead and was received on the replayed timeline.
2880  */
2881 static XLogRecPtr
2883 {
2884  XLogRecPtr replayPtr;
2885  TimeLineID replayTLI;
2886  XLogRecPtr receivePtr;
2888  XLogRecPtr result;
2889 
2890  /*
2891  * We can safely send what's already been replayed. Also, if walreceiver
2892  * is streaming WAL from the same timeline, we can send anything that it
2893  * has streamed, but hasn't been replayed yet.
2894  */
2895 
2896  receivePtr = GetWalRcvWriteRecPtr(NULL, &receiveTLI);
2897  replayPtr = GetXLogReplayRecPtr(&replayTLI);
2898 
2899  ThisTimeLineID = replayTLI;
2900 
2901  result = replayPtr;
 /* Received-but-unreplayed WAL only counts if it is on the replayed timeline. */
2902  if (receiveTLI == ThisTimeLineID && receivePtr > replayPtr)
2903  result = receivePtr;
2904 
2905  return result;
2906 }
2907 
2908 /*
2909  * Request walsenders to reload the currently-open WAL file
 *
 * Sets needreload, under each slot's spinlock, on every slot that is in use
 * (pid != 0). Free slots are skipped. The physical WAL send path consumes
 * the flag after reading: when it is set, that path closes the open segment
 * and reads again.
2910  */
2911 void
2913 {
2914  int i;
2915 
2916  for (i = 0; i < max_wal_senders; i++)
2917  {
2918  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2919 
2920  SpinLockAcquire(&walsnd->mutex);
2921  if (walsnd->pid == 0)
2922  {
2923  SpinLockRelease(&walsnd->mutex);
2924  continue;
2925  }
2926  walsnd->needreload = true;
2927  SpinLockRelease(&walsnd->mutex);
2928  }
2929 }
2930 
2931 /*
2932  * Handle PROCSIG_WALSND_INIT_STOPPING signal.
 *
 * Before replication is active, the process sends itself SIGTERM. Once
 * replication is active, it only sets got_STOPPING; the physical send path
 * checks that flag and switches to the stopping state.
2933  */
2934 void
2936 {
2938 
2939  /*
2940  * If replication has not yet started, die like with SIGTERM. If
2941  * replication is active, only set a flag and wake up the main loop. It
2942  * will send any outstanding WAL, wait for it to be replicated to the
2943  * standby, and then exit gracefully.
2944  */
2945  if (!replication_active)
2946  kill(MyProcPid, SIGTERM);
2947  else
2948  got_STOPPING = true;
2949 }
2950 
2951 /*
2952  * SIGUSR2: set flag to do a last cycle and shut down afterwards. The WAL
2953  * sender should already have been switched to WALSNDSTATE_STOPPING at
2954  * this point.
 *
 * Signal handler: sets got_SIGUSR2 and the process latch so that the main
 * loop wakes up. errno is saved and restored so that the interrupted code
 * does not see it change.
2955  */
2956 static void
2958 {
2959  int save_errno = errno;
2960 
2961  got_SIGUSR2 = true;
2962  SetLatch(MyLatch);
2963 
2964  errno = save_errno;
2965 }
2966 
2967 /* Set up signal handlers
 *
 * Visible registrations:
 *  - SIGHUP:  config-reload flag;
 *  - SIGINT:  query cancel;
 *  - SIGTERM: die;
 *  - SIGQUIT: quickdie;
 *  - SIGALRM: via InitializeTimeouts();
 *  - SIGUSR2: WalSndLastCycleHandler.
 * NOTE(review): a few registrations/resets are missing from this listing.
 */
2968 void
2970 {
2971  /* Set up signal handlers */
2972  pqsignal(SIGHUP, PostgresSigHupHandler); /* set flag to read config
2973  * file */
2974  pqsignal(SIGINT, StatementCancelHandler); /* query cancel */
2975  pqsignal(SIGTERM, die); /* request shutdown */
2976  pqsignal(SIGQUIT, quickdie); /* hard crash time */
2977  InitializeTimeouts(); /* establishes SIGALRM handler */
2980  pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
2981  * shutdown */
2982 
2983  /* Reset some signals that are accepted by postmaster but not here */
2985 }
2986 
2987 /* Report shared-memory space needed by WalSndShmemInit
 *
 * The size is the WalSndCtlData header up to its walsnds array, plus one
 * WalSnd per max_wal_senders, combined with add_size()/mul_size().
 */
2988 Size
2990 {
2991  Size size = 0;
2992 
2993  size = offsetof(WalSndCtlData, walsnds);
2994  size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
2995 
2996  return size;
2997 }
2998 
2999 /* Allocate and initialize walsender-related shared memory
 *
 * Attaches to the "Wal Sender Ctl" shared struct, creating it if needed.
 * Only on first creation is it zeroed, each sync-rep wait queue initialized,
 * and each slot's spinlock initialized. Zeroing leaves every slot with
 * pid 0, i.e. free.
 */
3000 void
3002 {
3003  bool found;
3004  int i;
3005 
3006  WalSndCtl = (WalSndCtlData *)
3007  ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
3008 
3009  if (!found)
3010  {
3011  /* First time through, so initialize */
3012  MemSet(WalSndCtl, 0, WalSndShmemSize());
3013 
3014  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
3015  SHMQueueInit(&(WalSndCtl->SyncRepQueue[i]));
3016 
3017  for (i = 0; i < max_wal_senders; i++)
3018  {
3019  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3020 
3021  SpinLockInit(&walsnd->mutex);
3022  }
3023  }
3024 }
3025 
3026 /*
3027  * Wake up all walsenders
3028  *
3029  * This will be called inside critical sections, so throwing an error is not
3030  * advisable.
 *
 * Slots whose latch is NULL (never claimed, or already released) are
 * skipped. SetLatch() is called after the slot's spinlock is released.
3031  */
3032 void
3034 {
3035  int i;
3036 
3037  for (i = 0; i < max_wal_senders; i++)
3038  {
3039  Latch *latch;
3040  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3041 
3042  /*
3043  * Get latch pointer with spinlock held, for the unlikely case that
3044  * pointer reads aren't atomic (as they're 8 bytes).
3045  */
3046  SpinLockAcquire(&walsnd->mutex);
3047  latch = walsnd->latch;
3048  SpinLockRelease(&walsnd->mutex);
3049 
3050  if (latch != NULL)
3051  SetLatch(latch);
3052  }
3053 }
3054 
3055 /*
3056  * Signal all walsenders to move to stopping state.
3057  *
3058  * This will trigger walsenders to move to a state where no further WAL can be
3059  * generated. See this file's header for details.
 *
 * Each slot's pid is read under its spinlock, and free slots (pid 0) are
 * skipped. NOTE(review): the call that signals each remaining pid is missing
 * from this listing. Per the file header it presumably sends
 * PROCSIG_WALSND_INIT_STOPPING; confirm.
3060  */
3061 void
3063 {
3064  int i;
3065 
3066  for (i = 0; i < max_wal_senders; i++)
3067  {
3068  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3069  pid_t pid;
3070 
3071  SpinLockAcquire(&walsnd->mutex);
3072  pid = walsnd->pid;
3073  SpinLockRelease(&walsnd->mutex);
3074 
3075  if (pid == 0)
3076  continue;
3077 
3079  }
3080 }
3081 
3082 /*
3083  * Wait until all the WAL senders have quit or reached the stopping state. This
3084  * is used by the checkpointer to control when the shutdown checkpoint can
3085  * safely be performed.
 *
 * Polls every 10 ms with no upper time limit. A slot counts as done when its
 * pid is 0 or its state is WALSNDSTATE_STOPPING. A scan stops at the first
 * slot that is not done.
3086  */
3087 void
3089 {
3090  for (;;)
3091  {
3092  int i;
3093  bool all_stopped = true;
3094 
3095  for (i = 0; i < max_wal_senders; i++)
3096  {
3097  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3098 
3099  SpinLockAcquire(&walsnd->mutex);
3100 
3101  if (walsnd->pid == 0)
3102  {
3103  SpinLockRelease(&walsnd->mutex);
3104  continue;
3105  }
3106 
3107  if (walsnd->state != WALSNDSTATE_STOPPING)
3108  {
3109  all_stopped = false;
3110  SpinLockRelease(&walsnd->mutex);
3111  break;
3112  }
3113  SpinLockRelease(&walsnd->mutex);
3114  }
3115 
3116  /* safe to leave if confirmation is done for all WAL senders */
3117  if (all_stopped)
3118  return;
3119 
3120  pg_usleep(10000L); /* wait for 10 msec */
3121  }
3122 }
3123 
3124 /* Set state for current walsender (only called in walsender)
 *
 * Writes the new state into this process's own slot (MyWalSnd) under its
 * spinlock, and does nothing when the state is unchanged.
 * NOTE(review): the unlocked comparison presumably relies on this process
 * being the only writer of its own slot's state; confirm.
 */
3125 void
3127 {
3128  WalSnd *walsnd = MyWalSnd;
3129 
3131 
3132  if (walsnd->state == state)
3133  return;
3134 
3135  SpinLockAcquire(&walsnd->mutex);
3136  walsnd->state = state;
3137  SpinLockRelease(&walsnd->mutex);
3138 }
3139 
3140 /*
3141  * Return a string constant representing the state. This is used
3142  * in system views, and should *not* be translated.
 *
 * Values not covered by the switch yield "UNKNOWN".
3143  */
3144 static const char *
3146 {
3147  switch (state)
3148  {
3149  case WALSNDSTATE_STARTUP:
3150  return "startup";
3151  case WALSNDSTATE_BACKUP:
3152  return "backup";
3153  case WALSNDSTATE_CATCHUP:
3154  return "catchup";
3155  case WALSNDSTATE_STREAMING:
3156  return "streaming";
3157  case WALSNDSTATE_STOPPING:
3158  return "stopping";
3159  }
3160  return "UNKNOWN";
3161 }
3162 
/*
 * Wrap a TimeOffset in a palloc'd Interval with zero months and zero days.
 * The offset is stored unchanged in the Interval's time field.
 * NOTE(review): presumably the offset is in microseconds; confirm against
 * how the lag values are produced.
 */
3163 static Interval *
3165 {
3166  Interval *result = palloc(sizeof(Interval));
3167 
3168  result->month = 0;
3169  result->day = 0;
3170  result->time = offset;
3171 
3172  return result;
3173 }
3174 
3175 /*
3176  * Returns activity of walsenders, including pids and xlog locations sent to
3177  * standby servers.
 *
 * Set-returning function in materialize mode. It emits one row of
 * PG_STAT_GET_WAL_SENDERS_COLS columns per in-use slot (pid != 0). Each
 * slot's fields are copied under its spinlock and formatted after the lock
 * is released.
 *
 * Callers that are not members of DEFAULT_ROLE_READ_ALL_STATS see only the
 * pid; every other column is NULL for them. An invalid LSN, a negative lag
 * or a zero reply time is returned as NULL.
 *
 * Raises ERROR if the caller cannot accept a materialized set, or if the
 * result type is not composite.
3178  */
3179 Datum
3181 {
3182 #define PG_STAT_GET_WAL_SENDERS_COLS 15
3183  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
3184  TupleDesc tupdesc;
3185  Tuplestorestate *tupstore;
3186  MemoryContext per_query_ctx;
3187  MemoryContext oldcontext;
3188  List *sync_standbys;
3189  int i;
3190 
3191  /* check to see if caller supports us returning a tuplestore */
3192  if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
3193  ereport(ERROR,
3194  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3195  errmsg("set-valued function called in context that cannot accept a set")));
3196  if (!(rsinfo->allowedModes & SFRM_Materialize))
3197  ereport(ERROR,
3198  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3199  errmsg("materialize mode required, but it is not " \
3200  "allowed in this context")));
3201 
3202  /* Build a tuple descriptor for our result type */
3203  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
3204  elog(ERROR, "return type must be a row type");
3205 
 /* The tuplestore must outlive this call, so build it in per-query memory. */
3206  per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
3207  oldcontext = MemoryContextSwitchTo(per_query_ctx);
3208 
3209  tupstore = tuplestore_begin_heap(true, false, work_mem);
3210  rsinfo->returnMode = SFRM_Materialize;
3211  rsinfo->setResult = tupstore;
3212  rsinfo->setDesc = tupdesc;
3213 
3214  MemoryContextSwitchTo(oldcontext);
3215 
3216  /*
3217  * Get the currently active synchronous standbys.
3218  */
 /* The list holds walsnds[] indexes; it is tested with list_member_int() below. */
3219  LWLockAcquire(SyncRepLock, LW_SHARED);
3220  sync_standbys = SyncRepGetSyncStandbys(NULL);
3221  LWLockRelease(SyncRepLock);
3222 
3223  for (i = 0; i < max_wal_senders; i++)
3224  {
3225  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3227  XLogRecPtr write;
3228  XLogRecPtr flush;
3229  XLogRecPtr apply;
3230  TimeOffset writeLag;
3231  TimeOffset flushLag;
3232  TimeOffset applyLag;
3233  int priority;
3234  int pid;
3236  TimestampTz replyTime;
3237  int64 spillTxns;
3238  int64 spillCount;
3239  int64 spillBytes;
3241  bool nulls[PG_STAT_GET_WAL_SENDERS_COLS];
3242 
 /* Take a consistent snapshot of the slot; skip it if the slot is free. */
3243  SpinLockAcquire(&walsnd->mutex);
3244  if (walsnd->pid == 0)
3245  {
3246  SpinLockRelease(&walsnd->mutex);
3247  continue;
3248  }
3249  pid = walsnd->pid;
3250  sentPtr = walsnd->sentPtr;
3251  state = walsnd->state;
3252  write = walsnd->write;
3253  flush = walsnd->flush;
3254  apply = walsnd->apply;
3255  writeLag = walsnd->writeLag;
3256  flushLag = walsnd->flushLag;
3257  applyLag = walsnd->applyLag;
3258  priority = walsnd->sync_standby_priority;
3259  replyTime = walsnd->replyTime;
3260  spillTxns = walsnd->spillTxns;
3261  spillCount = walsnd->spillCount;
3262  spillBytes = walsnd->spillBytes;
3263  SpinLockRelease(&walsnd->mutex);
3264 
3265  memset(nulls, 0, sizeof(nulls));
3266  values[0] = Int32GetDatum(pid);
3267 
3268  if (!is_member_of_role(GetUserId(), DEFAULT_ROLE_READ_ALL_STATS))
3269  {
3270  /*
3271  * Only superusers and members of pg_read_all_stats can see
3272  * details. Other users only get the pid value to know it's a
3273  * walsender, but no details.
3274  */
3275  MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1);
3276  }
3277  else
3278  {
3279  values[1] = CStringGetTextDatum(WalSndGetStateString(state));
3280 
3281  if (XLogRecPtrIsInvalid(sentPtr))
3282  nulls[2] = true;
3283  values[2] = LSNGetDatum(sentPtr);
3284 
3285  if (XLogRecPtrIsInvalid(write))
3286  nulls[3] = true;
3287  values[3] = LSNGetDatum(write);
3288 
3289  if (XLogRecPtrIsInvalid(flush))
3290  nulls[4] = true;
3291  values[4] = LSNGetDatum(flush);
3292 
3293  if (XLogRecPtrIsInvalid(apply))
3294  nulls[5] = true;
3295  values[5] = LSNGetDatum(apply);
3296 
3297  /*
3298  * Treat a standby such as a pg_basebackup background process
3299  * which always returns an invalid flush location, as an
3300  * asynchronous standby.
3301  */
3302  priority = XLogRecPtrIsInvalid(flush) ? 0 : priority;
3303 
3304  if (writeLag < 0)
3305  nulls[6] = true;
3306  else
3307  values[6] = IntervalPGetDatum(offset_to_interval(writeLag));
3308 
3309  if (flushLag < 0)
3310  nulls[7] = true;
3311  else
3312  values[7] = IntervalPGetDatum(offset_to_interval(flushLag));
3313 
3314  if (applyLag < 0)
3315  nulls[8] = true;
3316  else
3317  values[8] = IntervalPGetDatum(offset_to_interval(applyLag));
3318 
3319  values[9] = Int32GetDatum(priority);
3320 
3321  /*
3322  * More easily understood version of standby state. This is purely
3323  * informational.
3324  *
3325  * In quorum-based sync replication, the role of each standby
3326  * listed in synchronous_standby_names can be changing very
3327  * frequently. Any standbys considered as "sync" at one moment can
3328  * be switched to "potential" ones at the next moment. So, it's
3329  * basically useless to report "sync" or "potential" as their sync
3330  * states. We report just "quorum" for them.
3331  */
3332  if (priority == 0)
3333  values[10] = CStringGetTextDatum("async");
3334  else if (list_member_int(sync_standbys, i))
3336  CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
3337  else
3338  values[10] = CStringGetTextDatum("potential");
3339 
3340  if (replyTime == 0)
3341  nulls[11] = true;
3342  else
3343  values[11] = TimestampTzGetDatum(replyTime);
3344 
3345  /* spill to disk */
3346  values[12] = Int64GetDatum(spillTxns);
3347  values[13] = Int64GetDatum(spillCount);
3348  values[14] = Int64GetDatum(spillBytes);
3349  }
3350 
3351  tuplestore_putvalues(tupstore, tupdesc, values, nulls);
3352  }
3353 
3354  /* clean up and return the tuplestore */
3355  tuplestore_donestoring(tupstore);
3356 
3357  return (Datum) 0;
3358 }
3359 
3360 /*
3361  * This function is used to send a keepalive message to standby.
3362  * If requestReply is set, sets a flag in the message requesting the standby
3363  * to send a message back to us, for heartbeat purposes.
3364  */
3365 static void
3366 WalSndKeepalive(bool requestReply)
3367 {
3368  elog(DEBUG2, "sending replication keepalive");
3369 
3370  /* construct the message... */
3371  resetStringInfo(&output_message);
3372  pq_sendbyte(&output_message, 'k');
3373  pq_sendint64(&output_message, sentPtr);
3374  pq_sendint64(&output_message, GetCurrentTimestamp());
3375  pq_sendbyte(&output_message, requestReply ? 1 : 0);
3376 
3377  /* ... and send it wrapped in CopyData */
3378  pq_putmessage_noblock('d', output_message.data, output_message.len);
3379 }
3380 
3381 /*
3382  * Send keepalive message if too much time has elapsed.
3383  */
3384 static void
3386 {
3387  TimestampTz ping_time;
3388 
3389  /*
3390  * Don't send keepalive messages if timeouts are globally disabled or
3391  * we're doing something not partaking in timeouts.
3392  */
3393  if (wal_sender_timeout <= 0 || last_reply_timestamp <= 0)
3394  return;
3395 
3397  return;
3398 
3399  /*
3400  * If half of wal_sender_timeout has lapsed without receiving any reply
3401  * from the standby, send a keep-alive message to the standby requesting
3402  * an immediate reply.
3403  */
3405  wal_sender_timeout / 2);
3406  if (last_processing >= ping_time)
3407  {
3408  WalSndKeepalive(true);
3410 
3411  /* Try to flush pending output to the client */
3412  if (pq_flush_if_writable() != 0)
3413  WalSndShutdown();
3414  }
3415 }
3416 
3417 /*
3418  * Record the end of the WAL and the time it was flushed locally, so that
3419  * LagTrackerRead can compute the elapsed time (lag) when this WAL location is
3420  * eventually reported to have been written, flushed and applied by the
3421  * standby in a reply message.
3422  */
3423 static void
3425 {
3426  bool buffer_full;
3427  int new_write_head;
3428  int i;
3429 
3430  if (!am_walsender)
3431  return;
3432 
3433  /*
3434  * If the lsn hasn't advanced since last time, then do nothing. This way
3435  * we only record a new sample when new WAL has been written.
3436  */
3437  if (lag_tracker->last_lsn == lsn)
3438  return;
3439  lag_tracker->last_lsn = lsn;
3440 
3441  /*
3442  * If advancing the write head of the circular buffer would crash into any
3443  * of the read heads, then the buffer is full. In other words, the
3444  * slowest reader (presumably apply) is the one that controls the release
3445  * of space.
3446  */
3447  new_write_head = (lag_tracker->write_head + 1) % LAG_TRACKER_BUFFER_SIZE;
3448  buffer_full = false;
3449  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; ++i)
3450  {
3451  if (new_write_head == lag_tracker->read_heads[i])
3452  buffer_full = true;
3453  }
3454 
3455  /*
3456  * If the buffer is full, for now we just rewind by one slot and overwrite
3457  * the last sample, as a simple (if somewhat uneven) way to lower the
3458  * sampling rate. There may be better adaptive compaction algorithms.
3459  */
3460  if (buffer_full)
3461  {
3462  new_write_head = lag_tracker->write_head;
3463  if (lag_tracker->write_head > 0)
3464  lag_tracker->write_head--;
3465  else
3466  lag_tracker->write_head = LAG_TRACKER_BUFFER_SIZE - 1;
3467  }
3468 
3469  /* Store a sample at the current write head position. */
3470  lag_tracker->buffer[lag_tracker->write_head].lsn = lsn;
3471  lag_tracker->buffer[lag_tracker->write_head].time = local_flush_time;
3472  lag_tracker->write_head = new_write_head;
3473 }
3474 
3475 /*
3476  * Find out how much time has elapsed between the moment WAL location 'lsn'
3477  * (or the highest known earlier LSN) was flushed locally and the time 'now'.
3478  * We have a separate read head for each of the reported LSN locations we
3479  * receive in replies from standby; 'head' controls which read head is
3480  * used. Whenever a read head crosses an LSN which was written into the
3481  * lag buffer with LagTrackerWrite, we can use the associated timestamp to
3482  * find out the time this LSN (or an earlier one) was flushed locally, and
3483  * therefore compute the lag.
3484  *
3485  * Return -1 if no new sample data is available, and otherwise the elapsed
3486  * time in microseconds.
3487  */
3488 static TimeOffset
3490 {
3491  TimestampTz time = 0;
3492 
3493  /* Read all unread samples up to this LSN or end of buffer. */
3494  while (lag_tracker->read_heads[head] != lag_tracker->write_head &&
3495  lag_tracker->buffer[lag_tracker->read_heads[head]].lsn <= lsn)
3496  {
3497  time = lag_tracker->buffer[lag_tracker->read_heads[head]].time;
3498  lag_tracker->last_read[head] =
3499  lag_tracker->buffer[lag_tracker->read_heads[head]];
3500  lag_tracker->read_heads[head] =
3501  (lag_tracker->read_heads[head] + 1) % LAG_TRACKER_BUFFER_SIZE;
3502  }
3503 
3504  /*
3505  * If the lag tracker is empty, that means the standby has processed
3506  * everything we've ever sent so we should now clear 'last_read'. If we
3507  * didn't do that, we'd risk using a stale and irrelevant sample for
3508  * interpolation at the beginning of the next burst of WAL after a period
3509  * of idleness.
3510  */
3511  if (lag_tracker->read_heads[head] == lag_tracker->write_head)
3512  lag_tracker->last_read[head].time = 0;
3513 
3514  if (time > now)
3515  {
3516  /* If the clock somehow went backwards, treat as not found. */
3517  return -1;
3518  }
3519  else if (time == 0)
3520  {
3521  /*
3522  * We didn't cross a time. If there is a future sample that we
3523  * haven't reached yet, and we've already reached at least one sample,
3524  * let's interpolate the local flushed time. This is mainly useful
3525  * for reporting a completely stuck apply position as having
3526  * increasing lag, since otherwise we'd have to wait for it to
3527  * eventually start moving again and cross one of our samples before
3528  * we can show the lag increasing.
3529  */
3530  if (lag_tracker->read_heads[head] == lag_tracker->write_head)
3531  {
3532  /* There are no future samples, so we can't interpolate. */
3533  return -1;
3534  }
3535  else if (lag_tracker->last_read[head].time != 0)
3536  {
3537  /* We can interpolate between last_read and the next sample. */
3538  double fraction;
3539  WalTimeSample prev = lag_tracker->last_read[head];
3540  WalTimeSample next = lag_tracker->buffer[lag_tracker->read_heads[head]];
3541 
3542  if (lsn < prev.lsn)
3543  {
3544  /*
3545  * Reported LSNs shouldn't normally go backwards, but it's
3546  * possible when there is a timeline change. Treat as not
3547  * found.
3548  */
3549  return -1;
3550  }
3551 
3552  Assert(prev.lsn < next.lsn);
3553 
3554  if (prev.time > next.time)
3555  {
3556  /* If the clock somehow went backwards, treat as not found. */
3557  return -1;
3558  }
3559 
3560  /* See how far we are between the previous and next samples. */
3561  fraction =
3562  (double) (lsn - prev.lsn) / (double) (next.lsn - prev.lsn);
3563 
3564  /* Scale the local flush time proportionally. */
3565  time = (TimestampTz)
3566  ((double) prev.time + (next.time - prev.time) * fraction);
3567  }
3568  else
3569  {
3570  /*
3571  * We have only a future sample, implying that we were entirely
3572  * caught up but and now there is a new burst of WAL and the
3573  * standby hasn't processed the first sample yet. Until the
3574  * standby reaches the future sample the best we can do is report
3575  * the hypothetical lag if that sample were to be replayed now.
3576  */
3577  time = lag_tracker->buffer[lag_tracker->read_heads[head]].time;
3578  }
3579  }
3580 
3581  /* Return the elapsed time since local flush time in microseconds. */
3582  Assert(time != 0);
3583  return now - time;
3584 }
3585 
3586 static void
3588 {
3589  ReorderBuffer *rb = ctx->reorder;
3590 
3591  SpinLockAcquire(&MyWalSnd->mutex);
3592 
3593  MyWalSnd->spillTxns = rb->spillTxns;
3594  MyWalSnd->spillCount = rb->spillCount;
3595  MyWalSnd->spillBytes = rb->spillBytes;
3596 
3597  elog(DEBUG2, "UpdateSpillStats: updating stats %p %lld %lld %lld",
3598  rb,
3599  (long long) rb->spillTxns,
3600  (long long) rb->spillCount,
3601  (long long) rb->spillBytes);
3602 
3603  SpinLockRelease(&MyWalSnd->mutex);
3604 }
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, Datum *values, bool *isnull)
Definition: tuplestore.c:750
static void InitWalSenderSlot(void)
Definition: walsender.c:2308
void InitializeTimeouts(void)
Definition: timeout.c:346
#define pq_is_send_pending()
Definition: libpq.h:41
#define NIL
Definition: pg_list.h:65
static void pq_sendint16(StringInfo buf, uint16 i)
Definition: pqformat.h:137
XLogRecPtr write
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS
WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]
Definition: walsender.c:209
static void ProcessStandbyMessage(void)
Definition: walsender.c:1734
#define SIGQUIT
Definition: win32_port.h:155
#define WL_SOCKET_WRITEABLE
Definition: latch.h:126
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define IsA(nodeptr, _type_)
Definition: nodes.h:576
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:211
#define AllocSetContextCreate
Definition: memutils.h:170
XLogRecPtr startpoint
Definition: replnodes.h:85
#define DEBUG1
Definition: elog.h:25
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:196
int MyProcPid
Definition: globals.c:40
int wal_sender_timeout
Definition: walsender.c:122
struct ReorderBuffer * reorder
Definition: logical.h:42
uint32 TimeLineID
Definition: xlogdefs.h:52
#define pq_flush()
Definition: libpq.h:39
static int32 next
Definition: blutils.c:213
XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
struct Port * MyProcPort
Definition: globals.c:43
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
int wal_segment_size
Definition: xlog.c:112
int write_head
Definition: walsender.c:210
uint32 TransactionId
Definition: c.h:514
TupleDesc CreateTemplateTupleDesc(int natts)
Definition: tupdesc.c:44
bool wake_wal_senders
Definition: walsender.c:129
#define SYNC_REP_PRIORITY
Definition: syncrep.h:36
bool exec_replication_command(const char *cmd_string)
Definition: walsender.c:1461
Oid GetUserId(void)
Definition: miscinit.c:380
#define SIGUSR1
Definition: win32_port.h:166
bool update_process_title
Definition: ps_status.c:35
#define write(a, b, c)
Definition: win32.h:14
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1583
TransactionId xmin
Definition: proc.h:228
static void WalSndLastCycleHandler(SIGNAL_ARGS)
Definition: walsender.c:2957
void pgstat_report_activity(BackendState state, const char *cmd_str)
Definition: pgstat.c:3119
static void SendTimeLineHistory(TimeLineHistoryCmd *cmd)
Definition: walsender.c:435
static void parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action)
Definition: walsender.c:822
static void DropReplicationSlot(DropReplicationSlotCmd *cmd)
Definition: walsender.c:1075
static WALSegmentContext * sendCxt
Definition: walsender.c:132
#define SIGCHLD
Definition: win32_port.h:164
PGPROC * MyProc
Definition: proc.c:67
int64 TimestampTz
Definition: timestamp.h:39
WalTimeSample last_read[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:212
Size WalSndShmemSize(void)
Definition: walsender.c:2989
void MarkPostmasterChildWalSender(void)
Definition: pmsignal.c:272
char * pstrdup(const char *in)
Definition: mcxt.c:1186
void CommitTransactionCommand(void)
Definition: xact.c:2898
#define XACT_REPEATABLE_READ
Definition: xact.h:38
#define SpinLockInit(lock)
Definition: spin.h:60
void RestoreTransactionSnapshot(Snapshot snapshot, void *master_pgproc)
Definition: snapmgr.c:2225
void ReplicationSlotAcquire(const char *name, bool nowait)
Definition: slot.c:330
uint8 syncrep_method
Definition: syncrep.h:51
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:83
void WALReadRaiseError(WALReadError *errinfo)
Definition: xlogutils.c:944
List * readTimeLineHistory(TimeLineID targetTLI)
Definition: timeline.c:75
bool IsAbortedTransactionBlockState(void)
Definition: xact.c:375
static void XLogSendPhysical(void)
Definition: walsender.c:2467
void do_tup_output(TupOutputState *tstate, Datum *values, bool *isnull)
Definition: execTuples.c:2256
#define tuplestore_donestoring(state)
Definition: tuplestore.h:60
TimeOffset flushLag
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency)
Definition: slot.c:221
void set_ps_display(const char *activity, bool force)
Definition: ps_status.c:331
Definition: nodes.h:525
static StringInfoData output_message
Definition: walsender.c:152
WalSndCtlData * WalSndCtl
Definition: walsender.c:108
struct cursor * cur
Definition: ecpg.c:28
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:182
void WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt, int segsize, const char *waldir)
Definition: xlogreader.c:209
void proc_exit(int code)
Definition: ipc.c:104
int errcode(int sqlerrcode)
Definition: elog.c:608
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
#define MemSet(start, val, len)
Definition: c.h:962
static void WalSndKill(int code, Datum arg)
Definition: walsender.c:2369
#define kill(pid, sig)
Definition: win32_port.h:426
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
static void WalSndDone(WalSndSendDataCallback send_data)
Definition: walsender.c:2842
#define WL_SOCKET_READABLE
Definition: latch.h:125
void ReplicationSlotSave(void)
Definition: slot.c:645
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8267
List * SyncRepGetSyncStandbys(bool *am_sync)
Definition: syncrep.c:700
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:197
TimestampTz time
Definition: walsender.c:199
int sleeptime
Definition: pg_standby.c:41
#define SIGPIPE
Definition: win32_port.h:159
ReplicationSlotPersistentData data
Definition: slot.h:132
TimeOffset writeLag
#define SIGUSR2
Definition: win32_port.h:167
#define LOG
Definition: elog.h:26
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
bool RecoveryInProgress(void)
Definition: xlog.c:7935
void SetLatch(Latch *latch)
Definition: latch.c:436
SyncRepConfigData * SyncRepConfig
Definition: syncrep.c:97
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1682
void list_free_deep(List *list)
Definition: list.c:1391
static int fd(const char *x, int i)
Definition: preproc-init.c:105
pgsocket sock
Definition: libpq-be.h:122
#define PG_BINARY
Definition: c.h:1222
void GetPGVariable(const char *name, DestReceiver *dest)
Definition: guc.c:8801
void ResetLatch(Latch *latch)
Definition: latch.c:519
slock_t mutex
Node * replication_parse_result
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:87
void StatementCancelHandler(SIGNAL_ARGS)
Definition: postgres.c:2832
Latch procLatch
Definition: proc.h:104
void XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength)
Definition: xlogutils.c:681
XLogRecord * XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
Definition: xlogreader.c:238
XLogRecPtr confirmed_flush
Definition: slot.h:80
PGXACT * MyPgXact
Definition: proc.c:68
XLogRecPtr EndRecPtr
Definition: xlogreader.h:132
int32 day
Definition: timestamp.h:47
#define XidFromFullTransactionId(x)
Definition: transam.h:48
static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch)
Definition: walsender.c:1951
static LagTracker * lag_tracker
Definition: walsender.c:215
TimeLineID timeline
Definition: replnodes.h:97
int64 spillCount
ReplicationKind kind
Definition: replnodes.h:56
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1726
bool IsTransactionBlock(void)
Definition: xact.c:4635
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
int64 spillBytes
const char * SnapBuildExportSnapshot(SnapBuild *builder)
Definition: snapbuild.c:631
bool am_walsender
Definition: walsender.c:114
#define SpinLockAcquire(lock)
Definition: spin.h:62
void pg_usleep(long microsec)
Definition: signal.c:53
void HandleWalSndInitStopping(void)
Definition: walsender.c:2935
void ReplicationSlotReserveWal(void)
Definition: slot.c:997
int SendProcSignal(pid_t pid, ProcSignalReason reason, BackendId backendId)
Definition: procsignal.c:179
XLogRecPtr flush
bool defGetBoolean(DefElem *def)
Definition: define.c:111
char data[BLCKSZ]
Definition: c.h:1091
void DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
Definition: logical.c:462
void ReplicationSlotsComputeRequiredLSN(void)
Definition: slot.c:748
ReplicationKind kind
Definition: replnodes.h:82
void WalSndRqstFileReload(void)
Definition: walsender.c:2912
void pfree(void *pointer)
Definition: mcxt.c:1056
void ConditionVariableCancelSleep(void)
bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
Definition: transam.c:319
static void WalSndKeepalive(bool requestReply)
Definition: walsender.c:3366
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:2314
void EndCommand(const char *commandTag, CommandDest dest)
Definition: dest.c:166
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:676
#define ERROR
Definition: elog.h:43
void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
Definition: decode.c:94
static TimestampTz last_processing
Definition: walsender.c:157
void pq_startmsgread(void)
Definition: pqcomm.c:1211
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2292
static void PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: walsender.c:1765
#define IntervalPGetDatum(X)
Definition: timestamp.h:33
#define SYNC_REP_WAIT_APPLY
Definition: syncrep.h:26
void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli)
Definition: xlog.c:3860
bool FirstSnapshotSet
Definition: snapmgr.c:205
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:372
void WalSndSignals(void)
Definition: walsender.c:2969
#define TimestampTzGetDatum(X)
Definition: timestamp.h:32
#define FATAL
Definition: elog.h:52
void SendBaseBackup(BaseBackupCmd *cmd)
Definition: basebackup.c:759
Latch * latch
void(* WalSndSendDataCallback)(void)
Definition: walsender.c:221
int64 spillTxns
#define MAXPGPATH
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:11172
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:2236
void ReplicationSlotPersist(void)
Definition: slot.c:680
TransactionId effective_xmin
Definition: slot.h:128
int pq_getbyte_if_available(unsigned char *c)
Definition: pqcomm.c:1041
static LogicalDecodingContext * logical_decoding_ctx
Definition: walsender.c:192
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:192
char * get_database_name(Oid dbid)
Definition: dbcommands.c:2155
#define DEBUG2
Definition: elog.h:24
bool XLogBackgroundFlush(void)
Definition: xlog.c:2994
bool list_member_int(const List *list, int datum)
Definition: list.c:655
Definition: latch.h:110
Definition: dest.h:88
#define TLHistoryFileName(fname, tli)
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:361
WalSndState state
NodeTag type
Definition: nodes.h:527
static char * buf
Definition: pg_test_fsync.c:67
void WalSndWaitStopping(void)
Definition: walsender.c:3088
static bool streamingDoneSending
Definition: walsender.c:174
uint64 XLogSegNo
Definition: xlogdefs.h:41
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
bool WALRead(char *buf, XLogRecPtr startptr, Size count, TimeLineID tli, WALOpenSegment *seg, WALSegmentContext *segcxt, WALSegmentOpen openSegment, WALReadError *errinfo)
Definition: xlogreader.c:1033
int errdetail(const char *fmt,...)
Definition: elog.c:955
XLogSegNo ws_segno
Definition: xlogreader.h:38
#define COMMERROR
Definition: elog.h:30
int errcode_for_file_access(void)
Definition: elog.c:631
static void StartReplication(StartReplicationCmd *cmd)
Definition: walsender.c:541
#define SIGHUP
Definition: win32_port.h:154
TransactionId catalog_xmin
Definition: slot.h:69
#define pq_flush_if_writable()
Definition: libpq.h:40
TimeOffset time
Definition: timestamp.h:45
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
Definition: xact.c:3331
XLogRecPtr currTLIValidUntil
Definition: xlogreader.h:192
#define InvalidTransactionId
Definition: transam.h:31
void enlargeStringInfo(StringInfo str, int needed)
Definition: stringinfo.c:283
void PostgresSigHupHandler(SIGNAL_ARGS)
Definition: postgres.c:2872
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:369
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1202
static XLogRecPtr logical_startptr
Definition: walsender.c:193
unsigned int uint32
Definition: c.h:359
void ReplicationSlotRelease(void)
Definition: slot.c:424
static WALOpenSegment * sendSeg
Definition: walsender.c:131
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1342
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
TransactionId xmin
Definition: slot.h:61
SHM_QUEUE SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]
void InitWalSender(void)
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:181
#define SlotIsLogical(slot)
Definition: slot.h:154
Datum Int64GetDatum(int64 X)
Definition: fmgr.c:1699
void SyncRepInitConfig(void)
Definition: syncrep.c:398
#define ereport(elevel, rest)
Definition: elog.h:141
static void UpdateSpillStats(LogicalDecodingContext *ctx)
Definition: walsender.c:3587
static bool WalSndCaughtUp
Definition: walsender.c:178
#define SYNC_REP_WAIT_FLUSH
Definition: syncrep.h:25
MemoryContext TopMemoryContext
Definition: mcxt.c:44
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)
Definition: walsender.c:3424
XLogRecPtr lsn
Definition: walsender.c:198
int replication_yyparse(void)
FullTransactionId ReadNextFullTransactionId(void)
Definition: varsup.c:246
Definition: guc.h:72
XLogRecPtr last_lsn
Definition: walsender.c:208
#define ERRCODE_DATA_CORRUPTED
Definition: pg_basebackup.c:45
static int WalSndSegmentOpen(XLogSegNo nextSegNo, WALSegmentContext *segcxt, TimeLineID *tli_p)
Definition: walsender.c:2387
int32 month
Definition: timestamp.h:48
volatile sig_atomic_t ConfigReloadPending
Definition: globals.c:35
int max_wal_senders
Definition: walsender.c:120
static void WalSndLoop(WalSndSendDataCallback send_data)
Definition: walsender.c:2182
static void WalSndKeepaliveIfNecessary(void)
Definition: walsender.c:3385
int CloseTransientFile(int fd)
Definition: fd.c:2469
#define SIG_IGN
Definition: win32_port.h:151
static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1175
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
int pq_getmessage(StringInfo s, int maxlen)
Definition: pqcomm.c:1273
static StringInfoData reply_message
Definition: walsender.c:153
#define MAXFNAMELEN
TimeLineID nextTLI
Definition: xlogreader.h:198
#define SpinLockRelease(lock)
Definition: spin.h:64
Tuplestorestate * tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
Definition: tuplestore.c:318
int64 TimeOffset
Definition: timestamp.h:40
static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc)
Definition: walsender.c:1323
static void WalSndCheckTimeOut(void)
Definition: walsender.c:2155
Size mul_size(Size s1, Size s2)
Definition: shmem.c:492
#define InvalidBackendId
Definition: backendid.h:23
LogicalDecodingContext * CreateInitDecodingContext(char *plugin, List *output_plugin_options, bool need_full_snapshot, XLogRecPtr restart_lsn, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:227
static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
Definition: walsender.c:3489
void WalSndErrorCleanup(void)
Definition: walsender.c:299
static volatile sig_atomic_t replication_active
Definition: walsender.c:190
uintptr_t Datum
Definition: postgres.h:367
static const char * WalSndGetStateString(WalSndState state)
Definition: walsender.c:3145
XLogRecPtr sentPtr
Size add_size(Size s1, Size s2)
Definition: shmem.c:475
bool am_db_walsender
Definition: walsender.c:117
TransactionId effective_catalog_xmin
Definition: slot.h:129
static void StartLogicalReplication(StartReplicationCmd *cmd)
Definition: walsender.c:1086
LogicalDecodingContext * CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, bool fast_forward, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:370
Oid MyDatabaseId
Definition: globals.c:85
static void WalSndShutdown(void)
Definition: walsender.c:225
static TimeLineID receiveTLI
Definition: xlog.c:209
int work_mem
Definition: globals.c:121
XLogRecPtr tliSwitchPoint(TimeLineID tli, List *history, TimeLineID *nextTLI)
Definition: timeline.c:563
#define EpochFromFullTransactionId(x)
Definition: transam.h:47
WalSnd * MyWalSnd
Definition: walsender.c:111
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:839
static XLogRecPtr sentPtr
Definition: walsender.c:149
int log_min_messages
Definition: guc.c:514
void pq_endmsgread(void)
Definition: pqcomm.c:1235
#define InvalidOid
Definition: postgres_ext.h:36
TimeLineID ThisTimeLineID
Definition: xlog.c:187
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
int allowedModes
Definition: execnodes.h:302
void WalSndInitStopping(void)
Definition: walsender.c:3062
TimeLineID currTLI
Definition: xlogreader.h:182
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:170
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
bool is_member_of_role(Oid member, Oid role)
Definition: acl.c:4924
TimeOffset applyLag
struct SnapBuild * snapshot_builder
Definition: logical.h:43
int errmsg_internal(const char *fmt,...)
Definition: elog.c:909
SetFunctionReturnMode returnMode
Definition: execnodes.h:304
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
static long WalSndComputeSleeptime(TimestampTz now)
Definition: walsender.c:2105
#define SIG_DFL
Definition: win32_port.h:149
#define SIGNAL_ARGS
Definition: c.h:1288
static TimeLineID sendTimeLine
Definition: walsender.c:140
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:739
#define lfirst(lc)
Definition: pg_list.h:190
void WalSndSetState(WalSndState state)
Definition: walsender.c:3126
int sync_standby_priority
Definition: regguts.h:298
void FreeDecodingContext(LogicalDecodingContext *ctx)
Definition: logical.c:508
static void XLogSendLogical(void)
Definition: walsender.c:2772
XLogRecPtr restart_lsn
Definition: slot.h:72
int read_heads[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:211
void StartTransactionCommand(void)
Definition: xact.c:2797
bool needreload
static void ProcessStandbyReplyMessage(void)
Definition: walsender.c:1797
size_t Size
Definition: c.h:467
#define XLogFileName(fname, tli, logSegNo, wal_segsz_bytes)
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: pgstat.h:1318
char * dbname
Definition: streamutil.c:50
#define LAG_TRACKER_BUFFER_SIZE
Definition: walsender.c:203
int XactIsoLevel
Definition: xact.c:74
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1122
static TimeLineID sendTimeLineNextTLI
Definition: walsender.c:141
static XLogRecPtr sendTimeLineValidUpto
Definition: walsender.c:143
void LogicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: logical.c:1001
MemoryContext ecxt_per_query_memory
Definition: execnodes.h:230
WalSndState
Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
Definition: walsender.c:3180
TimeLineID ws_tli
Definition: xlogreader.h:39
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Definition: walsender.c:875
static bool streamingDoneReceiving
Definition: walsender.c:175
Tuplestorestate * setResult
Definition: execnodes.h:307
#define pg_attribute_noreturn()
Definition: c.h:147
static StringInfoData tmpbuf
Definition: walsender.c:154
int BasicOpenFile(const char *fileName, int fileFlags)
Definition: fd.c:981
#define XLogFilePath(path, tli, logSegNo, wal_segsz_bytes)
bool IsSubTransaction(void)
Definition: xact.c:4708
Snapshot SnapBuildInitialSnapshot(SnapBuild *builder)
Definition: snapbuild.c:539
static Datum values[MAXATTR]
Definition: bootstrap.c:167
#define PG_STAT_GET_WAL_SENDERS_COLS
ExprContext * econtext
Definition: execnodes.h:300
bool log_replication_commands
Definition: walsender.c:124
uint64 GetSystemIdentifier(void)
Definition: xlog.c:4799
#define Int32GetDatum(X)
Definition: postgres.h:479
void ReplicationSlotDrop(const char *name, bool nowait)
Definition: slot.c:517
char * application_name
Definition: guc.c:537
TupleDesc setDesc
Definition: execnodes.h:308
void pq_sendbytes(StringInfo buf, const char *data, int datalen)
Definition: pqformat.c:125
void * palloc(Size size)
Definition: mcxt.c:949
int errmsg(const char *fmt,...)
Definition: elog.c:822
XLogReaderState * reader
Definition: logical.h:41
void SHMQueueInit(SHM_QUEUE *queue)
Definition: shmqueue.c:36
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:298
#define pq_putmessage_noblock(msgtype, s, len)
Definition: libpq.h:44
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:796
CRSSnapshotAction
Definition: walsender.h:20
#define elog(elevel,...)
Definition: elog.h:228
StringInfo out
Definition: logical.h:70
static const unsigned __int64 epoch
Definition: gettimeofday.c:34
int i
void WalSndShmemInit(void)
Definition: walsender.c:3001
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
#define NameStr(name)
Definition: c.h:616
TimeLineID timeline
Definition: replnodes.h:84
#define CStringGetTextDatum(s)
Definition: builtins.h:83
void * arg
static void PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
Definition: walsender.c:1902
struct Latch * MyLatch
Definition: globals.c:54
void ReplicationSlotCleanup(void)
Definition: slot.c:479
#define PG_FUNCTION_ARGS
Definition: fmgr.h:188
char * defname
Definition: parsenodes.h:730
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:99
static XLogRecPtr GetStandbyFlushRecPtr(void)
Definition: walsender.c:2882
static int logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
Definition: walsender.c:766
void SendPostmasterSignal(PMSignalReason reason)
Definition: pmsignal.c:146
TimestampTz replyTime
void LWLockReleaseAll(void)
Definition: lwlock.c:1825
#define NUM_SYNC_REP_WAIT_MODE
Definition: syncrep.h:28
static Interval * offset_to_interval(TimeOffset offset)
Definition: walsender.c:3164
slock_t mutex
Definition: slot.h:105
#define close(a)
Definition: win32.h:12
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition: procsignal.c:260
CommandDest whereToSendOutput
Definition: postgres.c:90
void TimestampDifference(TimestampTz start_time, TimestampTz stop_time, long *secs, int *microsecs)
Definition: timestamp.c:1657
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
static bool waiting_for_ping_response
Definition: walsender.c:166
void pq_puttextmessage(char msgtype, const char *str)
Definition: pqformat.c:369
XLogRecPtr apply
static void IdentifySystem(void)
Definition: walsender.c:346
Definition: pg_list.h:50
void CheckLogicalDecodingRequirements(void)
Definition: logical.c:75
#define snprintf
Definition: port.h:192
#define WL_LATCH_SET
Definition: latch.h:124
int16 AttrNumber
Definition: attnum.h:21
void quickdie(SIGNAL_ARGS)
Definition: postgres.c:2734
#define UINT64_FORMAT
Definition: c.h:402
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition: slot.c:702
void SyncRepReleaseWaiters(void)
Definition: syncrep.c:426
static TimestampTz last_reply_timestamp
Definition: walsender.c:163
static void ProcessRepliesIfAny(void)
Definition: walsender.c:1630
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1547
#define MAX_SEND_SIZE
Definition: walsender.c:105
#define die(msg)
Definition: pg_test_fsync.c:96
#define read(a, b, c)
Definition: win32.h:13
static bool sendTimeLineIsHistoric
Definition: walsender.c:142
#define offsetof(type, field)
Definition: c.h:662
void WalSndWakeup(void)
Definition: walsender.c:3033
void ReplicationSlotMarkDirty(void)
Definition: slot.c:663
bool am_cascading_walsender
Definition: walsender.c:115
static void ProcessStandbyHSFeedbackMessage(void)
Definition: walsender.c:1982
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:129
#define SYNC_REP_WAIT_WRITE
Definition: syncrep.h:24
void SnapBuildClearExportedSnapshot(void)
Definition: snapbuild.c:692
static XLogRecPtr startptr
Definition: basebackup.c:117
static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
Definition: walsender.c:1292
void replication_scanner_init(const char *query_string)
#define TLHistoryFilePath(path, tli)
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1743
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)