PostgreSQL Source Code  git master
walsender.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * walsender.c
4  *
5  * The WAL sender process (walsender) is new as of Postgres 9.0. It takes
6  * care of sending XLOG from the primary server to a single recipient.
7  * (Note that there can be more than one walsender process concurrently.)
8  * It is started by the postmaster when the walreceiver of a standby server
9  * connects to the primary server and requests XLOG streaming replication.
10  *
11  * A walsender is similar to a regular backend, i.e., there is a one-to-one
12  * relationship between a connection and a walsender process, but instead
13  * of processing SQL queries, it understands a small set of special
14  * replication-mode commands. The START_REPLICATION command begins streaming
15  * WAL to the client. While streaming, the walsender keeps reading XLOG
16  * records from the disk and sends them to the standby server over the
17  * COPY protocol, until either side ends the replication by exiting COPY
18  * mode (or until the connection is closed).
19  *
20  * Normal termination is by SIGTERM, which instructs the walsender to
21  * close the connection and exit(0) at the next convenient moment. Emergency
22  * termination is by SIGQUIT; like any backend, the walsender will simply
23  * abort and exit on SIGQUIT. A close of the connection and a FATAL error
24  * are treated as not a crash but approximately normal termination;
25  * the walsender will exit quickly without sending any more XLOG records.
26  *
27  * If the server is shut down, checkpointer sends us
28  * PROCSIG_WALSND_INIT_STOPPING after all regular backends have exited. If
29  * the backend is idle or runs an SQL query, this causes the backend to
30  * shut down; if logical replication is in progress, all existing WAL records
31  * are processed, followed by a shutdown. Otherwise, this causes the walsender
32  * to switch to the "stopping" state. In this state, the walsender will reject
33  * any further replication commands. The checkpointer begins the shutdown
34  * checkpoint once all walsenders are confirmed as stopping. When the shutdown
35  * checkpoint finishes, the postmaster sends us SIGUSR2. This instructs
36  * walsender to send any outstanding WAL, including the shutdown checkpoint
37  * record, wait for it to be replicated to the standby, and then exit.
38  *
39  *
40  * Portions Copyright (c) 2010-2021, PostgreSQL Global Development Group
41  *
42  * IDENTIFICATION
43  * src/backend/replication/walsender.c
44  *
45  *-------------------------------------------------------------------------
46  */
47 #include "postgres.h"
48 
49 #include <signal.h>
50 #include <unistd.h>
51 
52 #include "access/printtup.h"
53 #include "access/timeline.h"
54 #include "access/transam.h"
55 #include "access/xact.h"
56 #include "access/xlog_internal.h"
57 #include "access/xlogreader.h"
58 #include "access/xlogutils.h"
59 #include "catalog/pg_authid.h"
60 #include "catalog/pg_type.h"
61 #include "commands/dbcommands.h"
62 #include "commands/defrem.h"
63 #include "funcapi.h"
64 #include "libpq/libpq.h"
65 #include "libpq/pqformat.h"
66 #include "miscadmin.h"
67 #include "nodes/replnodes.h"
68 #include "pgstat.h"
69 #include "postmaster/interrupt.h"
70 #include "replication/basebackup.h"
71 #include "replication/decode.h"
72 #include "replication/logical.h"
73 #include "replication/slot.h"
74 #include "replication/snapbuild.h"
75 #include "replication/syncrep.h"
77 #include "replication/walsender.h"
80 #include "storage/fd.h"
81 #include "storage/ipc.h"
82 #include "storage/pmsignal.h"
83 #include "storage/proc.h"
84 #include "storage/procarray.h"
85 #include "tcop/dest.h"
86 #include "tcop/tcopprot.h"
87 #include "utils/acl.h"
88 #include "utils/builtins.h"
89 #include "utils/guc.h"
90 #include "utils/memutils.h"
91 #include "utils/pg_lsn.h"
92 #include "utils/portal.h"
93 #include "utils/ps_status.h"
94 #include "utils/timeout.h"
95 #include "utils/timestamp.h"
96 
97 /*
98  * Maximum data payload in a WAL data message. Must be >= XLOG_BLCKSZ.
99  *
100  * We don't have a good idea of what a good value would be; there's some
101  * overhead per message in both walsender and walreceiver, but on the other
102  * hand sending large batches makes walsender less responsive to signals
103  * because signals are checked only between messages. 128kB (with
104  * default 8k blocks) seems like a reasonable guess for now.
105  */
106 #define MAX_SEND_SIZE (XLOG_BLCKSZ * 16)
107 
108 /* Array of WalSnds in shared memory */
110 
111 /* My slot in the shared memory array */
112 WalSnd *MyWalSnd = NULL;
113 
114 /* Global state */
115 bool am_walsender = false; /* Am I a walsender process? */
116 bool am_cascading_walsender = false; /* Am I cascading WAL to another
117  * standby? */
118 bool am_db_walsender = false; /* Connected to a database? */
119 
120 /* User-settable parameters for walsender */
121 int max_wal_senders = 0; /* the maximum number of concurrent
122  * walsenders */
123 int wal_sender_timeout = 60 * 1000; /* maximum time to send one WAL
124  * data message */
126 
127 /*
128  * State for WalSndWakeupRequest
129  */
130 bool wake_wal_senders = false;
131 
132 /*
133  * xlogreader used for replication. Note that a WAL sender doing physical
134  * replication does not need xlogreader to read WAL, but it needs one to
135  * keep a state of its work.
136  */
138 
139 /*
140  * These variables keep track of the state of the timeline we're currently
141  * sending. sendTimeLine identifies the timeline. If sendTimeLineIsHistoric,
142  * the timeline is not the latest timeline on this server, and the server's
143  * history forked off from that timeline at sendTimeLineValidUpto.
144  */
147 static bool sendTimeLineIsHistoric = false;
149 
150 /*
151  * How far have we sent WAL already? This is also advertised in
152  * MyWalSnd->sentPtr. (Actually, this is the next WAL location to send.)
153  */
155 
156 /* Buffers for constructing outgoing messages and processing reply messages. */
160 
161 /* Timestamp of last ProcessRepliesIfAny(). */
163 
164 /*
165  * Timestamp of last ProcessRepliesIfAny() that saw a reply from the
166  * standby. Set to 0 if wal_sender_timeout doesn't need to be active.
167  */
169 
170 /* Have we sent a heartbeat message asking for reply, since last reply? */
171 static bool waiting_for_ping_response = false;
172 
173 /*
174  * While streaming WAL in Copy mode, streamingDoneSending is set to true
175  * after we have sent CopyDone. We should not send any more CopyData messages
176  * after that. streamingDoneReceiving is set to true when we receive CopyDone
177  * from the other end. When both become true, it's time to exit Copy mode.
178  */
181 
182 /* Are we there yet? */
183 static bool WalSndCaughtUp = false;
184 
185 /* Flags set by signal handlers for later service in main loop */
186 static volatile sig_atomic_t got_SIGUSR2 = false;
187 static volatile sig_atomic_t got_STOPPING = false;
188 
189 /*
190  * This is set while we are streaming. When not set
191  * PROCSIG_WALSND_INIT_STOPPING signal will be handled like SIGTERM. When set,
192  * the main loop is responsible for checking got_STOPPING and terminating when
193  * it's set (after streaming any remaining WAL).
194  */
195 static volatile sig_atomic_t replication_active = false;
196 
198 
199 /* A sample associating a WAL location with the time it was written. */
200 typedef struct
201 {
204 } WalTimeSample;
205 
206 /* The size of our buffer of time samples. */
207 #define LAG_TRACKER_BUFFER_SIZE 8192
208 
209 /* A mechanism for tracking replication lag. */
210 typedef struct
211 {
215  int read_heads[NUM_SYNC_REP_WAIT_MODE];
217 } LagTracker;
218 
220 
221 /* Signal handlers */
223 
224 /* Prototypes for private functions */
225 typedef void (*WalSndSendDataCallback) (void);
226 static void WalSndLoop(WalSndSendDataCallback send_data);
227 static void InitWalSenderSlot(void);
228 static void WalSndKill(int code, Datum arg);
230 static void XLogSendPhysical(void);
231 static void XLogSendLogical(void);
232 static void WalSndDone(WalSndSendDataCallback send_data);
233 static XLogRecPtr GetStandbyFlushRecPtr(void);
234 static void IdentifySystem(void);
238 static void StartReplication(StartReplicationCmd *cmd);
240 static void ProcessStandbyMessage(void);
241 static void ProcessStandbyReplyMessage(void);
242 static void ProcessStandbyHSFeedbackMessage(void);
243 static void ProcessRepliesIfAny(void);
244 static void WalSndKeepalive(bool requestReply);
245 static void WalSndKeepaliveIfNecessary(void);
246 static void WalSndCheckTimeOut(void);
248 static void WalSndWait(uint32 socket_events, long timeout, uint32 wait_event);
249 static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
250 static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
253 static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
254 static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
256 
257 static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo,
258  TimeLineID *tli_p);
259 
260 
261 /* Initialize walsender process before entering the main command loop */
262 void
263 InitWalSender(void)
264 {
266 
267  /* Create a per-walsender data structure in shared memory */
269 
270  /*
271  * We don't currently need any ResourceOwner in a walsender process, but
272  * if we did, we could call CreateAuxProcessResourceOwner here.
273  */
274 
275  /*
276  * Let postmaster know that we're a WAL sender. Once we've declared us as
277  * a WAL sender process, postmaster will let us outlive the bgwriter and
278  * kill us last in the shutdown sequence, so we get a chance to stream all
279  * remaining WAL at shutdown, including the shutdown checkpoint. Note that
280  * there's no going back, and we mustn't write any WAL records after this.
281  */
284 
285  /* Initialize empty timestamp buffer for lag tracking. */
286  lag_tracker = MemoryContextAllocZero(TopMemoryContext, sizeof(LagTracker));
287 }
288 
289 /*
290  * Clean up after an error.
291  *
292  * WAL sender processes don't use transactions like regular backends do.
293  * This function does any cleanup required after an error in a WAL sender
294  * process, similar to what transaction abort does in a regular backend.
295  */
296 void
298 {
302 
303  if (xlogreader != NULL && xlogreader->seg.ws_file >= 0)
304  wal_segment_close(xlogreader);
305 
306  if (MyReplicationSlot != NULL)
308 
310 
311  replication_active = false;
312 
313  /*
314  * If there is a transaction in progress, it will clean up our
315  * ResourceOwner, but if a replication command set up a resource owner
316  * without a transaction, we've got to clean that up now.
317  */
319  WalSndResourceCleanup(false);
320 
321  if (got_STOPPING || got_SIGUSR2)
322  proc_exit(0);
323 
324  /* Revert back to startup state */
326 }
327 
328 /*
329  * Clean up any ResourceOwner we created.
330  */
331 void
332 WalSndResourceCleanup(bool isCommit)
333 {
334  ResourceOwner resowner;
335 
336  if (CurrentResourceOwner == NULL)
337  return;
338 
339  /*
340  * Deleting CurrentResourceOwner is not allowed, so we must save a pointer
341  * in a local variable and clear it first.
342  */
343  resowner = CurrentResourceOwner;
344  CurrentResourceOwner = NULL;
345 
346  /* Now we can release resources and delete it. */
347  ResourceOwnerRelease(resowner,
348  RESOURCE_RELEASE_BEFORE_LOCKS, isCommit, true);
349  ResourceOwnerRelease(resowner,
350  RESOURCE_RELEASE_LOCKS, isCommit, true);
351  ResourceOwnerRelease(resowner,
352  RESOURCE_RELEASE_AFTER_LOCKS, isCommit, true);
353  ResourceOwnerDelete(resowner);
354 }
355 
356 /*
357  * Handle a client's connection abort in an orderly manner.
358  */
359 static void
360 WalSndShutdown(void)
361 {
362  /*
363  * Reset whereToSendOutput to prevent ereport from attempting to send any
364  * more messages to the standby.
365  */
368 
369  proc_exit(0);
370  abort(); /* keep the compiler quiet */
371 }
372 
373 /*
374  * Handle the IDENTIFY_SYSTEM command.
375  */
376 static void
378 {
379  char sysid[32];
380  char xloc[MAXFNAMELEN];
381  XLogRecPtr logptr;
382  char *dbname = NULL;
384  TupOutputState *tstate;
385  TupleDesc tupdesc;
386  Datum values[4];
387  bool nulls[4];
388 
389  /*
390  * Reply with a result set with one row, four columns. First col is system
391  * ID, second is timeline ID, third is current xlog location and the
392  * fourth contains the database name if we are connected to one.
393  */
394 
395  snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
397 
400  {
401  /* this also updates ThisTimeLineID */
402  logptr = GetStandbyFlushRecPtr();
403  }
404  else
405  logptr = GetFlushRecPtr();
406 
407  snprintf(xloc, sizeof(xloc), "%X/%X", LSN_FORMAT_ARGS(logptr));
408 
409  if (MyDatabaseId != InvalidOid)
410  {
412 
413  /* syscache access needs a transaction env. */
415  /* make dbname live outside TX context */
419  /* CommitTransactionCommand switches to TopMemoryContext */
421  }
422 
424  MemSet(nulls, false, sizeof(nulls));
425 
426  /* need a tuple descriptor representing four columns */
427  tupdesc = CreateTemplateTupleDesc(4);
428  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "systemid",
429  TEXTOID, -1, 0);
430  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "timeline",
431  INT4OID, -1, 0);
432  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "xlogpos",
433  TEXTOID, -1, 0);
434  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "dbname",
435  TEXTOID, -1, 0);
436 
437  /* prepare for projection of tuples */
438  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
439 
440  /* column 1: system identifier */
441  values[0] = CStringGetTextDatum(sysid);
442 
443  /* column 2: timeline */
444  values[1] = Int32GetDatum(ThisTimeLineID);
445 
446  /* column 3: wal location */
447  values[2] = CStringGetTextDatum(xloc);
448 
449  /* column 4: database name, or NULL if none */
450  if (dbname)
451  values[3] = CStringGetTextDatum(dbname);
452  else
453  nulls[3] = true;
454 
455  /* send it to dest */
456  do_tup_output(tstate, values, nulls);
457 
458  end_tup_output(tstate);
459 }
460 
461 /* Handle READ_REPLICATION_SLOT command */
462 static void
464 {
465 #define READ_REPLICATION_SLOT_COLS 3
466  ReplicationSlot *slot;
468  TupOutputState *tstate;
469  TupleDesc tupdesc;
471  bool nulls[READ_REPLICATION_SLOT_COLS];
472 
474  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_type",
475  TEXTOID, -1, 0);
476  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "restart_lsn",
477  TEXTOID, -1, 0);
478  /* TimeLineID is unsigned, so int4 is not wide enough. */
479  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "restart_tli",
480  INT8OID, -1, 0);
481 
482  MemSet(values, 0, READ_REPLICATION_SLOT_COLS * sizeof(Datum));
483  MemSet(nulls, true, READ_REPLICATION_SLOT_COLS * sizeof(bool));
484 
485  LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
486  slot = SearchNamedReplicationSlot(cmd->slotname, false);
487  if (slot == NULL || !slot->in_use)
488  {
489  LWLockRelease(ReplicationSlotControlLock);
490  }
491  else
492  {
493  ReplicationSlot slot_contents;
494  int i = 0;
495 
496  /* Copy slot contents while holding spinlock */
497  SpinLockAcquire(&slot->mutex);
498  slot_contents = *slot;
499  SpinLockRelease(&slot->mutex);
500  LWLockRelease(ReplicationSlotControlLock);
501 
502  if (OidIsValid(slot_contents.data.database))
503  ereport(ERROR,
504  errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
505  errmsg("cannot use \"%s\" with logical replication slot \"%s\"",
506  "READ_REPLICATION_SLOT",
507  NameStr(slot_contents.data.name)));
508 
509  /* slot type */
510  values[i] = CStringGetTextDatum("physical");
511  nulls[i] = false;
512  i++;
513 
514  /* start LSN */
515  if (!XLogRecPtrIsInvalid(slot_contents.data.restart_lsn))
516  {
517  char xloc[64];
518 
519  snprintf(xloc, sizeof(xloc), "%X/%X",
520  LSN_FORMAT_ARGS(slot_contents.data.restart_lsn));
521  values[i] = CStringGetTextDatum(xloc);
522  nulls[i] = false;
523  }
524  i++;
525 
526  /* timeline this WAL was produced on */
527  if (!XLogRecPtrIsInvalid(slot_contents.data.restart_lsn))
528  {
529  TimeLineID slots_position_timeline;
530  TimeLineID current_timeline;
531  List *timeline_history = NIL;
532 
533  /*
534  * While in recovery, use as timeline the currently-replaying one
535  * to get the LSN position's history.
536  */
537  if (RecoveryInProgress())
538  (void) GetXLogReplayRecPtr(&current_timeline);
539  else
540  current_timeline = ThisTimeLineID;
541 
542  timeline_history = readTimeLineHistory(current_timeline);
543  slots_position_timeline = tliOfPointInHistory(slot_contents.data.restart_lsn,
544  timeline_history);
545  values[i] = Int64GetDatum((int64) slots_position_timeline);
546  nulls[i] = false;
547  }
548  i++;
549 
551  }
552 
554  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
555  do_tup_output(tstate, values, nulls);
556  end_tup_output(tstate);
557 }
558 
559 
560 /*
561  * Handle TIMELINE_HISTORY command.
562  */
563 static void
565 {
567  char histfname[MAXFNAMELEN];
568  char path[MAXPGPATH];
569  int fd;
570  off_t histfilelen;
571  off_t bytesleft;
572  Size len;
573 
574  /*
575  * Reply with a result set with one row, and two columns. The first col is
576  * the name of the history file, 2nd is the contents.
577  */
578 
579  TLHistoryFileName(histfname, cmd->timeline);
580  TLHistoryFilePath(path, cmd->timeline);
581 
582  /* Send a RowDescription message */
583  pq_beginmessage(&buf, 'T');
584  pq_sendint16(&buf, 2); /* 2 fields */
585 
586  /* first field */
587  pq_sendstring(&buf, "filename"); /* col name */
588  pq_sendint32(&buf, 0); /* table oid */
589  pq_sendint16(&buf, 0); /* attnum */
590  pq_sendint32(&buf, TEXTOID); /* type oid */
591  pq_sendint16(&buf, -1); /* typlen */
592  pq_sendint32(&buf, 0); /* typmod */
593  pq_sendint16(&buf, 0); /* format code */
594 
595  /* second field */
596  pq_sendstring(&buf, "content"); /* col name */
597  pq_sendint32(&buf, 0); /* table oid */
598  pq_sendint16(&buf, 0); /* attnum */
599  pq_sendint32(&buf, TEXTOID); /* type oid */
600  pq_sendint16(&buf, -1); /* typlen */
601  pq_sendint32(&buf, 0); /* typmod */
602  pq_sendint16(&buf, 0); /* format code */
603  pq_endmessage(&buf);
604 
605  /* Send a DataRow message */
606  pq_beginmessage(&buf, 'D');
607  pq_sendint16(&buf, 2); /* # of columns */
608  len = strlen(histfname);
609  pq_sendint32(&buf, len); /* col1 len */
610  pq_sendbytes(&buf, histfname, len);
611 
612  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
613  if (fd < 0)
614  ereport(ERROR,
616  errmsg("could not open file \"%s\": %m", path)));
617 
618  /* Determine file length and send it to client */
619  histfilelen = lseek(fd, 0, SEEK_END);
620  if (histfilelen < 0)
621  ereport(ERROR,
623  errmsg("could not seek to end of file \"%s\": %m", path)));
624  if (lseek(fd, 0, SEEK_SET) != 0)
625  ereport(ERROR,
627  errmsg("could not seek to beginning of file \"%s\": %m", path)));
628 
629  pq_sendint32(&buf, histfilelen); /* col2 len */
630 
631  bytesleft = histfilelen;
632  while (bytesleft > 0)
633  {
634  PGAlignedBlock rbuf;
635  int nread;
636 
638  nread = read(fd, rbuf.data, sizeof(rbuf));
640  if (nread < 0)
641  ereport(ERROR,
643  errmsg("could not read file \"%s\": %m",
644  path)));
645  else if (nread == 0)
646  ereport(ERROR,
648  errmsg("could not read file \"%s\": read %d of %zu",
649  path, nread, (Size) bytesleft)));
650 
651  pq_sendbytes(&buf, rbuf.data, nread);
652  bytesleft -= nread;
653  }
654 
655  if (CloseTransientFile(fd) != 0)
656  ereport(ERROR,
658  errmsg("could not close file \"%s\": %m", path)));
659 
660  pq_endmessage(&buf);
661 }
662 
663 /*
664  * Handle START_REPLICATION command.
665  *
666  * At the moment, this never returns, but an ereport(ERROR) will take us back
667  * to the main loop.
668  */
669 static void
671 {
673  XLogRecPtr FlushPtr;
674 
675  /* create xlogreader for physical replication */
676  xlogreader =
678  XL_ROUTINE(.segment_open = WalSndSegmentOpen,
679  .segment_close = wal_segment_close),
680  NULL);
681 
682  if (!xlogreader)
683  ereport(ERROR,
684  (errcode(ERRCODE_OUT_OF_MEMORY),
685  errmsg("out of memory")));
686 
687  /*
688  * We assume here that we're logging enough information in the WAL for
689  * log-shipping, since this is checked in PostmasterMain().
690  *
691  * NOTE: wal_level can only change at shutdown, so in most cases it is
692  * difficult for there to be WAL data that we can still see that was
693  * written at wal_level='minimal'.
694  */
695 
696  if (cmd->slotname)
697  {
698  ReplicationSlotAcquire(cmd->slotname, true);
700  ereport(ERROR,
701  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
702  errmsg("cannot use a logical replication slot for physical replication")));
703 
704  /*
705  * We don't need to verify the slot's restart_lsn here; instead we
706  * rely on the caller requesting the starting point to use. If the
707  * WAL segment doesn't exist, we'll fail later.
708  */
709  }
710 
711  /*
712  * Select the timeline. If it was given explicitly by the client, use
713  * that. Otherwise use the timeline of the last replayed record, which is
714  * kept in ThisTimeLineID.
715  */
718  {
719  /* this also updates ThisTimeLineID */
720  FlushPtr = GetStandbyFlushRecPtr();
721  }
722  else
723  FlushPtr = GetFlushRecPtr();
724 
725  if (cmd->timeline != 0)
726  {
727  XLogRecPtr switchpoint;
728 
729  sendTimeLine = cmd->timeline;
731  {
732  sendTimeLineIsHistoric = false;
734  }
735  else
736  {
737  List *timeLineHistory;
738 
739  sendTimeLineIsHistoric = true;
740 
741  /*
742  * Check that the timeline the client requested exists, and the
743  * requested start location is on that timeline.
744  */
745  timeLineHistory = readTimeLineHistory(ThisTimeLineID);
746  switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
748  list_free_deep(timeLineHistory);
749 
750  /*
751  * Found the requested timeline in the history. Check that
752  * requested startpoint is on that timeline in our history.
753  *
754  * This is quite loose on purpose. We only check that we didn't
755  * fork off the requested timeline before the switchpoint. We
756  * don't check that we switched *to* it before the requested
757  * starting point. This is because the client can legitimately
758  * request to start replication from the beginning of the WAL
759  * segment that contains switchpoint, but on the new timeline, so
760  * that it doesn't end up with a partial segment. If you ask for
761  * too old a starting point, you'll get an error later when we
762  * fail to find the requested WAL segment in pg_wal.
763  *
764  * XXX: we could be more strict here and only allow a startpoint
765  * that's older than the switchpoint, if it's still in the same
766  * WAL segment.
767  */
768  if (!XLogRecPtrIsInvalid(switchpoint) &&
769  switchpoint < cmd->startpoint)
770  {
771  ereport(ERROR,
772  (errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
774  cmd->timeline),
775  errdetail("This server's history forked from timeline %u at %X/%X.",
776  cmd->timeline,
777  LSN_FORMAT_ARGS(switchpoint))));
778  }
779  sendTimeLineValidUpto = switchpoint;
780  }
781  }
782  else
783  {
786  sendTimeLineIsHistoric = false;
787  }
788 
790 
791  /* If there is nothing to stream, don't even enter COPY mode */
793  {
794  /*
795  * When we first start replication the standby will be behind the
796  * primary. For some applications, for example synchronous
797  * replication, it is important to have a clear state for this initial
798  * catchup mode, so we can trigger actions when we change streaming
799  * state later. We may stay in this state for a long time, which is
800  * exactly why we want to be able to monitor whether or not we are
801  * still here.
802  */
804 
805  /* Send a CopyBothResponse message, and start streaming */
806  pq_beginmessage(&buf, 'W');
807  pq_sendbyte(&buf, 0);
808  pq_sendint16(&buf, 0);
809  pq_endmessage(&buf);
810  pq_flush();
811 
812  /*
813  * Don't allow a request to stream from a future point in WAL that
814  * hasn't been flushed to disk in this server yet.
815  */
816  if (FlushPtr < cmd->startpoint)
817  {
818  ereport(ERROR,
819  (errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
821  LSN_FORMAT_ARGS(FlushPtr))));
822  }
823 
824  /* Start streaming from the requested point */
825  sentPtr = cmd->startpoint;
826 
827  /* Initialize shared memory status, too */
828  SpinLockAcquire(&MyWalSnd->mutex);
829  MyWalSnd->sentPtr = sentPtr;
830  SpinLockRelease(&MyWalSnd->mutex);
831 
833 
834  /* Main loop of walsender */
835  replication_active = true;
836 
838 
839  replication_active = false;
840  if (got_STOPPING)
841  proc_exit(0);
843 
845  }
846 
847  if (cmd->slotname)
849 
850  /*
851  * Copy is finished now. Send a single-row result set indicating the next
852  * timeline.
853  */
855  {
856  char startpos_str[8 + 1 + 8 + 1];
858  TupOutputState *tstate;
859  TupleDesc tupdesc;
860  Datum values[2];
861  bool nulls[2];
862 
863  snprintf(startpos_str, sizeof(startpos_str), "%X/%X",
865 
867  MemSet(nulls, false, sizeof(nulls));
868 
869  /*
870  * Need a tuple descriptor representing two columns. int8 may seem
871  * like a surprising data type for this, but in theory int4 would not
872  * be wide enough for this, as TimeLineID is unsigned.
873  */
874  tupdesc = CreateTemplateTupleDesc(2);
875  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli",
876  INT8OID, -1, 0);
877  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos",
878  TEXTOID, -1, 0);
879 
880  /* prepare for projection of tuple */
881  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
882 
883  values[0] = Int64GetDatum((int64) sendTimeLineNextTLI);
884  values[1] = CStringGetTextDatum(startpos_str);
885 
886  /* send it to dest */
887  do_tup_output(tstate, values, nulls);
888 
889  end_tup_output(tstate);
890  }
891 
892  /* Send CommandComplete message */
893  EndReplicationCommand("START_STREAMING");
894 }
895 
896 /*
897  * XLogReaderRoutine->page_read callback for logical decoding contexts, as a
898  * walsender process.
899  *
900  * Inside the walsender we can do better than read_local_xlog_page,
901  * which has to do a plain sleep/busy loop, because the walsender's latch gets
902  * set every time WAL is flushed.
903  */
904 static int
906  XLogRecPtr targetRecPtr, char *cur_page)
907 {
908  XLogRecPtr flushptr;
909  int count;
910  WALReadError errinfo;
911  XLogSegNo segno;
912 
913  XLogReadDetermineTimeline(state, targetPagePtr, reqLen);
915  sendTimeLine = state->currTLI;
917  sendTimeLineNextTLI = state->nextTLI;
918 
919  /* make sure we have enough WAL available */
920  flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
921 
922  /* fail if not (implies we are going to shut down) */
923  if (flushptr < targetPagePtr + reqLen)
924  return -1;
925 
926  if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
927  count = XLOG_BLCKSZ; /* more than one block available */
928  else
929  count = flushptr - targetPagePtr; /* part of the page available */
930 
931  /* now actually read the data, we know it's there */
932  if (!WALRead(state,
933  cur_page,
934  targetPagePtr,
935  XLOG_BLCKSZ,
936  state->seg.ws_tli, /* Pass the current TLI because only
937  * WalSndSegmentOpen controls whether new
938  * TLI is needed. */
939  &errinfo))
940  WALReadRaiseError(&errinfo);
941 
942  /*
943  * After reading into the buffer, check that what we read was valid. We do
944  * this after reading, because even though the segment was present when we
945  * opened it, it might get recycled or removed while we read it. The
946  * read() succeeds in that case, but the data we tried to read might
947  * already have been overwritten with new WAL records.
948  */
949  XLByteToSeg(targetPagePtr, segno, state->segcxt.ws_segsize);
950  CheckXLogRemoved(segno, state->seg.ws_tli);
951 
952  return count;
953 }
954 
955 /*
956  * Process extra options given to CREATE_REPLICATION_SLOT.
957  */
958 static void
960  bool *reserve_wal,
961  CRSSnapshotAction *snapshot_action,
962  bool *two_phase)
963 {
964  ListCell *lc;
965  bool snapshot_action_given = false;
966  bool reserve_wal_given = false;
967  bool two_phase_given = false;
968 
969  /* Parse options */
970  foreach(lc, cmd->options)
971  {
972  DefElem *defel = (DefElem *) lfirst(lc);
973 
974  if (strcmp(defel->defname, "snapshot") == 0)
975  {
976  char *action;
977 
978  if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
979  ereport(ERROR,
980  (errcode(ERRCODE_SYNTAX_ERROR),
981  errmsg("conflicting or redundant options")));
982 
983  action = defGetString(defel);
984  snapshot_action_given = true;
985 
986  if (strcmp(action, "export") == 0)
987  *snapshot_action = CRS_EXPORT_SNAPSHOT;
988  else if (strcmp(action, "nothing") == 0)
989  *snapshot_action = CRS_NOEXPORT_SNAPSHOT;
990  else if (strcmp(action, "use") == 0)
991  *snapshot_action = CRS_USE_SNAPSHOT;
992  else
993  ereport(ERROR,
994  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
995  errmsg("unrecognized value for CREATE_REPLICATION_SLOT option \"%s\": \"%s\"",
996  defel->defname, action)));
997 
998  }
999  else if (strcmp(defel->defname, "reserve_wal") == 0)
1000  {
1001  if (reserve_wal_given || cmd->kind != REPLICATION_KIND_PHYSICAL)
1002  ereport(ERROR,
1003  (errcode(ERRCODE_SYNTAX_ERROR),
1004  errmsg("conflicting or redundant options")));
1005 
1006  reserve_wal_given = true;
1007  *reserve_wal = defGetBoolean(defel);
1008  }
1009  else if (strcmp(defel->defname, "two_phase") == 0)
1010  {
1011  if (two_phase_given || cmd->kind != REPLICATION_KIND_LOGICAL)
1012  ereport(ERROR,
1013  (errcode(ERRCODE_SYNTAX_ERROR),
1014  errmsg("conflicting or redundant options")));
1015  two_phase_given = true;
1016  *two_phase = defGetBoolean(defel);
1017  }
1018  else
1019  elog(ERROR, "unrecognized option: %s", defel->defname);
1020  }
1021 }
1022 
1023 /*
1024  * Create a new replication slot.
 *
 * Options are parsed by parseCreateReplSlotOptions() into reserve_wal,
 * snapshot_action and two_phase.  A slot is then created for either the
 * physical or the logical case, depending on cmd->kind.  For a logical slot
 * the requested snapshot action is validated against the current
 * transaction state, an initial decoding context is created, and the
 * snapshot is exported when snapshot_action is CRS_EXPORT_SNAPSHOT.
 *
 * The result is sent to 'dest' as a single row with four text columns:
 * slot_name, consistent_point, snapshot_name and output_plugin.  The last
 * two are sent as NULL when snapshot_name / cmd->plugin is NULL.
 *
 * NOTE(review): the signature line and several statements (e.g. the
 * declaration of 'ctx' and the assignment of 'dest') are not visible in
 * this listing; confirm details against the complete source.
1025  */
1026 static void
1028 {
1029  const char *snapshot_name = NULL;
1030  char xloc[MAXFNAMELEN];
1031  char *slot_name;
1032  bool reserve_wal = false;
1033  bool two_phase = false;
1034  CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT;
1035  DestReceiver *dest;
1036  TupOutputState *tstate;
1037  TupleDesc tupdesc;
1038  Datum values[4];
1039  bool nulls[4];
1040 
1042 
 /* the defaults initialized above apply to any option the command omits */
1043  parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action, &two_phase);
1044 
1045  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
1046  {
1047  ReplicationSlotCreate(cmd->slotname, false,
1049  false);
1050  }
1051  else
1052  {
1054 
1055  /*
1056  * Initially create persistent slot as ephemeral - that allows us to
1057  * nicely handle errors during initialization because it'll get
1058  * dropped if this transaction fails. We'll make it persistent at the
1059  * end. Temporary slots can be created as temporary from beginning as
1060  * they get dropped on error as well.
1061  */
1062  ReplicationSlotCreate(cmd->slotname, true,
1064  two_phase);
1065  }
1066 
1067  if (cmd->kind == REPLICATION_KIND_LOGICAL)
1068  {
1070  bool need_full_snapshot = false;
1071 
1072  /*
1073  * Do options check early so that we can bail before calling the
1074  * DecodingContextFindStartpoint which can take a long time.
1075  */
1076  if (snapshot_action == CRS_EXPORT_SNAPSHOT)
1077  {
1078  if (IsTransactionBlock())
1079  ereport(ERROR,
1080  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1081  (errmsg("%s must not be called inside a transaction",
1082  "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'export')")));
1083 
1084  need_full_snapshot = true;
1085  }
1086  else if (snapshot_action == CRS_USE_SNAPSHOT)
1087  {
1088  if (!IsTransactionBlock())
1089  ereport(ERROR,
1090  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1091  (errmsg("%s must be called inside a transaction",
1092  "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1093 
1095  ereport(ERROR,
1096  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1097  (errmsg("%s must be called in REPEATABLE READ isolation mode transaction",
1098  "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1099 
1100  if (FirstSnapshotSet)
1101  ereport(ERROR,
1102  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1103  (errmsg("%s must be called before any query",
1104  "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1105 
1106  if (IsSubTransaction())
1107  ereport(ERROR,
1108  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1109  (errmsg("%s must not be called in a subtransaction",
1110  "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1111 
1112  need_full_snapshot = true;
1113  }
1114 
 /* need_full_snapshot stays false unless a snapshot is exported or used */
1115  ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
1117  XL_ROUTINE(.page_read = logical_read_xlog_page,
1118  .segment_open = WalSndSegmentOpen,
1119  .segment_close = wal_segment_close),
1122 
1123  /*
1124  * Signal that we don't need the timeout mechanism. We're just
1125  * creating the replication slot and don't yet accept feedback
1126  * messages or send keepalives. As we possibly need to wait for
1127  * further WAL the walsender would otherwise possibly be killed too
1128  * soon.
1129  */
1131 
1132  /* build initial snapshot, might take a while */
1134 
1135  /*
1136  * Export or use the snapshot if we've been asked to do so.
1137  *
1138  * NB. We will convert the snapbuild.c kind of snapshot to normal
1139  * snapshot when doing this.
1140  */
1141  if (snapshot_action == CRS_EXPORT_SNAPSHOT)
1142  {
1143  snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder);
1144  }
1145  else if (snapshot_action == CRS_USE_SNAPSHOT)
1146  {
1147  Snapshot snap;
1148 
1151  }
1152 
1153  /* don't need the decoding context anymore */
1154  FreeDecodingContext(ctx);
1155 
1156  if (!cmd->temporary)
1158  }
1159  else if (cmd->kind == REPLICATION_KIND_PHYSICAL && reserve_wal)
1160  {
1162 
1164 
1165  /* Write this slot to disk if it's a permanent one. */
1166  if (!cmd->temporary)
1168  }
1169 
 /* format the reported location as text in the "%X/%X" LSN style */
1170  snprintf(xloc, sizeof(xloc), "%X/%X",
1172 
 /* start with all four columns non-NULL; entries are flipped below */
1174  MemSet(nulls, false, sizeof(nulls));
1175 
1176  /*----------
1177  * Need a tuple descriptor representing four columns:
1178  * - first field: the slot name
1179  * - second field: LSN at which we became consistent
1180  * - third field: exported snapshot's name
1181  * - fourth field: output plugin
1182  *----------
1183  */
1184  tupdesc = CreateTemplateTupleDesc(4);
1185  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_name",
1186  TEXTOID, -1, 0);
1187  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "consistent_point",
1188  TEXTOID, -1, 0);
1189  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "snapshot_name",
1190  TEXTOID, -1, 0);
1191  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "output_plugin",
1192  TEXTOID, -1, 0);
1193 
1194  /* prepare for projection of tuples */
1195  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
1196 
1197  /* slot_name */
1198  slot_name = NameStr(MyReplicationSlot->data.name);
1199  values[0] = CStringGetTextDatum(slot_name);
1200 
1201  /* consistent wal location */
1202  values[1] = CStringGetTextDatum(xloc);
1203 
1204  /* snapshot name, or NULL if none */
1205  if (snapshot_name != NULL)
1206  values[2] = CStringGetTextDatum(snapshot_name);
1207  else
1208  nulls[2] = true;
1209 
1210  /* plugin, or NULL if none */
1211  if (cmd->plugin != NULL)
1212  values[3] = CStringGetTextDatum(cmd->plugin);
1213  else
1214  nulls[3] = true;
1215 
1216  /* send it to dest */
1217  do_tup_output(tstate, values, nulls);
1218  end_tup_output(tstate);
1219 
1221 }
1222 
1223 /*
1224  * Get rid of a replication slot that is no longer wanted.
 *
 * Thin wrapper: forwards cmd->slotname and the negation of cmd->wait to
 * ReplicationSlotDrop().
 *
 * NOTE(review): the second argument is presumably a "don't wait" flag, so
 * that cmd->wait == true means "block until the slot can be dropped" --
 * confirm against ReplicationSlotDrop()'s declaration.
1225  */
1226 static void
1228 {
1229  ReplicationSlotDrop(cmd->slotname, !cmd->wait);
1230 }
1231 
1232 /*
1233  * Load previously initiated logical slot and prepare for sending data (via
1234  * WalSndLoop).
 *
 * Sequence visible here: acquire the slot named by cmd->slotname, create a
 * decoding context starting at cmd->startpoint with cmd->options, send a
 * CopyBothResponse to the client, run the main loop, free the decoding
 * context, and finally report command completion with the COPY tag.  If
 * got_STOPPING is set once streaming has ended, the process exits instead
 * of returning.
 *
 * NOTE(review): the signature line and several statements are not visible
 * in this listing; confirm details against the complete source.
1235  */
1236 static void
1238 {
1240  QueryCompletion qc;
1241 
1242  /* make sure that our requirements are still fulfilled */
1244 
1246 
1247  ReplicationSlotAcquire(cmd->slotname, true);
1248 
1250  ereport(ERROR,
1251  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1252  errmsg("cannot read from logical replication slot \"%s\"",
1253  cmd->slotname),
1254  errdetail("This slot has been invalidated because it exceeded the maximum reserved size.")));
1255 
1256  /*
1257  * Force a disconnect, so that the decoding code doesn't need to care
1258  * about an eventual switch from running in recovery, to running in a
1259  * normal environment. Client code is expected to handle reconnects.
1260  */
1262  {
1263  ereport(LOG,
1264  (errmsg("terminating walsender process after promotion")));
 /* setting the flag here makes the got_STOPPING check below exit the process */
1265  got_STOPPING = true;
1266  }
1267 
1268  /*
1269  * Create our decoding context, making it start at the previously ack'ed
1270  * position.
1271  *
1272  * Do this before sending a CopyBothResponse message, so that any errors
1273  * are reported early.
1274  */
1275  logical_decoding_ctx =
1276  CreateDecodingContext(cmd->startpoint, cmd->options, false,
1277  XL_ROUTINE(.page_read = logical_read_xlog_page,
1278  .segment_open = WalSndSegmentOpen,
1279  .segment_close = wal_segment_close),
 /* share the decoding context's reader through the file-level xlogreader */
1282  xlogreader = logical_decoding_ctx->reader;
1283 
1285 
1286  /* Send a CopyBothResponse message, and start streaming */
 /*
  * Message body is one zero byte followed by a zero int16.
  * NOTE(review): per the protocol's CopyBothResponse layout these are
  * presumably the overall copy format and the column count -- confirm
  * against the protocol documentation.
  */
1287  pq_beginmessage(&buf, 'W');
1288  pq_sendbyte(&buf, 0);
1289  pq_sendint16(&buf, 0);
1290  pq_endmessage(&buf);
1291  pq_flush();
1292 
1293  /* Start reading WAL from the oldest required WAL. */
1294  XLogBeginRead(logical_decoding_ctx->reader,
1296 
1297  /*
1298  * Report the location after which we'll send out further commits as the
1299  * current sentPtr.
1300  */
1302 
1303  /* Also update the sent position status in shared memory */
1304  SpinLockAcquire(&MyWalSnd->mutex);
1306  SpinLockRelease(&MyWalSnd->mutex);
1307 
 /* replication_active is true only while the main loop below runs */
1308  replication_active = true;
1309 
1311 
1312  /* Main loop of walsender */
1314 
1315  FreeDecodingContext(logical_decoding_ctx);
1317 
1318  replication_active = false;
 /* a stop was requested (or forced above): exit rather than return */
1319  if (got_STOPPING)
1320  proc_exit(0);
1322 
1323  /* Get out of COPY mode (CommandComplete). */
1324  SetQueryCompletion(&qc, CMDTAG_COPY, 0);
1325  EndCommand(&qc, DestRemote, false);
1326 }
1327 
1328 /*
1329  * LogicalDecodingContext 'prepare_write' callback.
1330  *
1331  * Prepare a write into a StringInfo.
1332  *
1333  * Don't do anything lasting in here, it's quite possible that nothing will be done
1334  * with the data.
 *
 * ctx->out is reset and then filled with a message header: the byte 'w',
 * followed by three int64 fields (dataStart, walEnd, sendtime).  dataStart
 * and walEnd both carry 'lsn', which is replaced by InvalidXLogRecPtr
 * unless 'last_write' is true.  sendtime is written as 0 and is meant to be
 * overwritten later.
1335  */
1336 static void
1338 {
1339  /* can't have sync rep confused by sending the same LSN several times */
1340  if (!last_write)
1341  lsn = InvalidXLogRecPtr;
1342 
 /* discard whatever an earlier prepare left in the output buffer */
1343  resetStringInfo(ctx->out);
1344 
1345  pq_sendbyte(ctx->out, 'w');
1346  pq_sendint64(ctx->out, lsn); /* dataStart */
1347  pq_sendint64(ctx->out, lsn); /* walEnd */
1348 
1349  /*
1350  * Fill out the sendtime later, just as it's done in XLogSendPhysical, but
1351  * reserve space here.
1352  */
1353  pq_sendint64(ctx->out, 0); /* sendtime */
1354 }
1355 
1356 /*
1357  * LogicalDecodingContext 'write' callback.
1358  *
1359  * Actually write out data previously prepared by WalSndPrepareWrite out to
1360  * the network. Take as long as needed, but process replies from the other
1361  * side and check timeouts during that.
 *
 * Two paths are visible below: a fast path that returns right after the
 * first flush attempt when nothing is left pending (and the timeout is not
 * close), and a slow path that loops until pq_is_send_pending() reports no
 * pending output.  A non-zero result from pq_flush_if_writable() leads to
 * WalSndShutdown() on either path.
 *
 * NOTE(review): the signature line and several statements inside the loop
 * are not visible in this listing; confirm details against the complete
 * source.
1362  */
1363 static void
1365  bool last_write)
1366 {
1367  TimestampTz now;
1368 
1369  /*
1370  * Fill the send timestamp last, so that it is taken as late as possible.
1371  * This is somewhat ugly, but the protocol is set as it's already used for
1372  * several releases by streaming physical replication.
1373  */
 /*
  * The timestamp is serialized into tmpbuf first and then copied over the
  * placeholder in ctx->out.  The destination offset skips one leading byte
  * and two int64 fields, i.e. the 'w' byte, dataStart and walEnd as laid
  * out by WalSndPrepareWrite.
  */
1374  resetStringInfo(&tmpbuf);
1375  now = GetCurrentTimestamp();
1376  pq_sendint64(&tmpbuf, now);
1377  memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
1378  tmpbuf.data, sizeof(int64));
1379 
1380  /* output previously gathered data in a CopyData packet */
1381  pq_putmessage_noblock('d', ctx->out->data, ctx->out->len);
1382 
1384 
1385  /* Try to flush pending output to the client */
1386  if (pq_flush_if_writable() != 0)
1387  WalSndShutdown();
1388 
1389  /* Try taking fast path unless we get too close to walsender timeout. */
1391  wal_sender_timeout / 2) &&
1392  !pq_is_send_pending())
1393  {
1394  return;
1395  }
1396 
1397  /* If we have pending write here, go to slow path */
1398  for (;;)
1399  {
1400  long sleeptime;
1401 
1402  /* Check for input from the client */
1404 
1405  /* die if timeout was reached */
1407 
1408  /* Send keepalive if the time has come */
1410 
 /* the only exit from the loop: everything queued has been sent */
1411  if (!pq_is_send_pending())
1412  break;
1413 
1415 
1416  /* Sleep until something happens or we time out */
1419 
1420  /* Clear any already-pending wakeups */
1422 
1424 
1425  /* Process any requests or signals received recently */
1426  if (ConfigReloadPending)
1427  {
1428  ConfigReloadPending = false;
1431  }
1432 
1433  /* Try to flush pending output to the client */
1434  if (pq_flush_if_writable() != 0)
1435  WalSndShutdown();
1436  }
1437 
1438  /* reactivate latch so WalSndLoop knows to continue */
1439  SetLatch(MyLatch);
1440 }
1441 
1442 /*
1443  * LogicalDecodingContext 'update_progress' callback.
1444  *
1445  * Write the current position to the lag tracker (see XLogSendPhysical).
 *
 * Writes are rate-limited: the function returns without writing unless
 * TimestampDifferenceExceeds() reports that enough time has passed since
 * the previous write.
 *
 * NOTE(review): the signature line and the declaration of 'now' are not
 * visible in this listing.
1446  */
1447 static void
1449 {
 /* static: remembers the time of the last lag-tracker write across calls */
1450  static TimestampTz sendTime = 0;
1452 
1453  /*
1454  * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to
1455  * avoid flooding the lag tracker when we commit frequently.
1456  */
1457 #define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000
1458  if (!TimestampDifferenceExceeds(sendTime, now,
1460  return;
1461 
1462  LagTrackerWrite(lsn, now);
1463  sendTime = now;
1464 }
1465 
1466 /*
1467  * Wait till WAL < loc is flushed to disk so it can be safely sent to client.
1468  *
1469  * Returns end LSN of flushed WAL. Normally this will be >= loc, but
1470  * if we detect a shutdown request (either from postmaster or client)
1471  * we will return early, so caller must always check.
 *
 * The most recently observed flush position is cached in a static variable,
 * so a call whose 'loc' is already covered returns without entering the
 * wait loop.  The position is taken from GetFlushRecPtr() when not in
 * recovery and from GetXLogReplayRecPtr() otherwise.
 *
 * NOTE(review): the signature line and several statements inside the loop
 * are not visible in this listing; confirm details against the complete
 * source.
1472  */
1473 static XLogRecPtr
1475 {
1476  int wakeEvents;
 /* static: cached across calls to enable the fast path below */
1477  static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
1478 
1479  /*
1480  * Fast path to avoid acquiring the spinlock in case we already know we
1481  * have enough WAL available. This is particularly interesting if we're
1482  * far behind.
1483  */
1484  if (RecentFlushPtr != InvalidXLogRecPtr &&
1485  loc <= RecentFlushPtr)
1486  return RecentFlushPtr;
1487 
1488  /* Get a more recent flush pointer. */
1489  if (!RecoveryInProgress())
1490  RecentFlushPtr = GetFlushRecPtr();
1491  else
1492  RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1493 
1494  for (;;)
1495  {
1496  long sleeptime;
1497 
1498  /* Clear any already-pending wakeups */
1500 
1502 
1503  /* Process any requests or signals received recently */
1504  if (ConfigReloadPending)
1505  {
1506  ConfigReloadPending = false;
1509  }
1510 
1511  /* Check for input from the client */
1513 
1514  /*
1515  * If we're shutting down, trigger pending WAL to be written out,
1516  * otherwise we'd possibly end up waiting for WAL that never gets
1517  * written, because walwriter has shut down already.
1518  */
1519  if (got_STOPPING)
1521 
1522  /* Update our idea of the currently flushed position. */
1523  if (!RecoveryInProgress())
1524  RecentFlushPtr = GetFlushRecPtr();
1525  else
1526  RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1527 
1528  /*
1529  * If postmaster asked us to stop, don't wait anymore.
1530  *
1531  * It's important to do this check after the recomputation of
1532  * RecentFlushPtr, so we can send all remaining data before shutting
1533  * down.
1534  */
1535  if (got_STOPPING)
1536  break;
1537 
1538  /*
1539  * We only send regular messages to the client for full decoded
1540  * transactions, but a synchronous replication and walsender shutdown
1541  * possibly are waiting for a later location. So, before sleeping, we
1542  * send a ping containing the flush location. If the receiver is
1543  * otherwise idle, this keepalive will trigger a reply. Processing the
1544  * reply will update these MyWalSnd locations.
1545  */
1546  if (MyWalSnd->flush < sentPtr &&
1547  MyWalSnd->write < sentPtr &&
1549  WalSndKeepalive(false);
1550 
1551  /* check whether we're done */
1552  if (loc <= RecentFlushPtr)
1553  break;
1554 
1555  /* Waiting for new WAL. Since we need to wait, we're now caught up. */
1556  WalSndCaughtUp = true;
1557 
1558  /*
1559  * Try to flush any pending output to the client.
1560  */
1561  if (pq_flush_if_writable() != 0)
1562  WalSndShutdown();
1563 
1564  /*
1565  * If we have received CopyDone from the client, sent CopyDone
1566  * ourselves, and the output buffer is empty, it's time to exit
1567  * streaming, so fail the current WAL fetch request.
1568  */
1570  !pq_is_send_pending())
1571  break;
1572 
1573  /* die if timeout was reached */
1575 
1576  /* Send keepalive if the time has come */
1578 
1579  /*
1580  * Sleep until something happens or we time out. Also wait for the
1581  * socket becoming writable, if there's still pending output.
1582  * Otherwise we might sit on sendable output data while waiting for
1583  * new WAL to be generated. (But if we have nothing to send, we don't
1584  * want to wake on socket-writable.)
1585  */
1587 
1588  wakeEvents = WL_SOCKET_READABLE;
1589 
1590  if (pq_is_send_pending())
1591  wakeEvents |= WL_SOCKET_WRITEABLE;
1592 
1593  WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_WAIT_WAL);
1594  }
1595 
1596  /* reactivate latch so WalSndLoop knows to continue */
1597  SetLatch(MyLatch);
 /* may be < loc when the loop was left because of a stop/CopyDone */
1598  return RecentFlushPtr;
1599 }
1600 
1601 /*
1602  * Execute an incoming replication command.
1603  *
1604  * Returns true if the cmd_string was recognized as WalSender command, false
1605  * if not.
 *
 * The command is parsed with the replication grammar while cmd_context is
 * the current memory context; that context is deleted again before
 * returning, on both the SQL-command path (returns false) and the normal
 * path (returns true).  debug_query_string points at cmd_string while a
 * replication command runs and is reset to NULL before returning true.
 *
 * Errors are raised when the walsender is in stopping mode, when the parser
 * returns non-zero, when an SQL command arrives while MyDatabaseId is
 * InvalidOid, and for an unrecognized command node type.
 *
 * NOTE(review): several statements (including most of the handler calls in
 * the switch) are not visible in this listing; confirm details against the
 * complete source.
1606  */
1607 bool
1608 exec_replication_command(const char *cmd_string)
1609 {
1610  int parse_rc;
1611  Node *cmd_node;
1612  const char *cmdtag;
1613  MemoryContext cmd_context;
1614  MemoryContext old_context;
1615 
1616  /*
1617  * If WAL sender has been told that shutdown is getting close, switch its
1618  * status accordingly to handle the next replication commands correctly.
1619  */
1620  if (got_STOPPING)
1622 
1623  /*
1624  * Throw error if in stopping mode. We need to prevent commands that could
1625  * generate WAL while the shutdown checkpoint is being written. To be
1626  * safe, we just prohibit all new commands.
1627  */
1628  if (MyWalSnd->state == WALSNDSTATE_STOPPING)
1629  ereport(ERROR,
1630  (errmsg("cannot execute new commands while WAL sender is in stopping mode")));
1631 
1632  /*
1633  * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
1634  * command arrives. Clean up the old stuff if there's anything.
1635  */
1637 
1639 
1640  /*
1641  * Parse the command.
1642  */
1644  "Replication command context",
 /* remember the caller's context so it can be restored on every return path */
1646  old_context = MemoryContextSwitchTo(cmd_context);
1647 
1648  replication_scanner_init(cmd_string);
1649  parse_rc = replication_yyparse();
1650  if (parse_rc != 0)
1651  ereport(ERROR,
1652  (errcode(ERRCODE_SYNTAX_ERROR),
1653  errmsg_internal("replication command parser returned %d",
1654  parse_rc)));
1656 
 /* the parser hands its result back through this global */
1657  cmd_node = replication_parse_result;
1658 
1659  /*
1660  * If it's a SQL command, just clean up our mess and return false; the
1661  * caller will take care of executing it.
1662  */
1663  if (IsA(cmd_node, SQLCmd))
1664  {
1665  if (MyDatabaseId == InvalidOid)
1666  ereport(ERROR,
1667  (errmsg("cannot execute SQL commands in WAL sender for physical replication")));
1668 
1669  MemoryContextSwitchTo(old_context);
1670  MemoryContextDelete(cmd_context);
1671 
1672  /* Tell the caller that this wasn't a WalSender command. */
1673  return false;
1674  }
1675 
1676  /*
1677  * Report query to various monitoring facilities. For this purpose, we
1678  * report replication commands just like SQL commands.
1679  */
1680  debug_query_string = cmd_string;
1681 
1683 
1684  /*
1685  * Log replication command if log_replication_commands is enabled. Even
1686  * when it's disabled, log the command with DEBUG1 level for backward
1687  * compatibility.
1688  */
1690  (errmsg("received replication command: %s", cmd_string)));
1691 
1692  /*
1693  * Disallow replication commands in aborted transaction blocks.
1694  */
1696  ereport(ERROR,
1697  (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
1698  errmsg("current transaction is aborted, "
1699  "commands ignored until end of transaction block")));
1700 
1702 
1703  /*
1704  * Allocate buffers that will be used for each outgoing and incoming
1705  * message. We do this just once per command to reduce palloc overhead.
1706  */
1707  initStringInfo(&output_message);
1708  initStringInfo(&reply_message);
1709  initStringInfo(&tmpbuf);
1710 
 /*
  * Dispatch on the parsed node type.  Every visible case sets cmdtag,
  * shows it via set_ps_display(), and finishes with
  * EndReplicationCommand(cmdtag).
  */
1711  switch (cmd_node->type)
1712  {
1713  case T_IdentifySystemCmd:
1714  cmdtag = "IDENTIFY_SYSTEM";
1715  set_ps_display(cmdtag);
1716  IdentifySystem();
1717  EndReplicationCommand(cmdtag);
1718  break;
1719 
1721  cmdtag = "READ_REPLICATION_SLOT";
1722  set_ps_display(cmdtag);
1724  EndReplicationCommand(cmdtag);
1725  break;
1726 
1727  case T_BaseBackupCmd:
1728  cmdtag = "BASE_BACKUP";
1729  set_ps_display(cmdtag);
1730  PreventInTransactionBlock(true, cmdtag);
1731  SendBaseBackup((BaseBackupCmd *) cmd_node);
1732  EndReplicationCommand(cmdtag);
1733  break;
1734 
1736  cmdtag = "CREATE_REPLICATION_SLOT";
1737  set_ps_display(cmdtag);
1739  EndReplicationCommand(cmdtag);
1740  break;
1741 
1743  cmdtag = "DROP_REPLICATION_SLOT";
1744  set_ps_display(cmdtag);
1746  EndReplicationCommand(cmdtag);
1747  break;
1748 
1749  case T_StartReplicationCmd:
1750  {
1751  StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
1752 
1753  cmdtag = "START_REPLICATION";
1754  set_ps_display(cmdtag);
1755  PreventInTransactionBlock(true, cmdtag);
1756 
 /* physical streaming is handled here; the other kind goes to the else branch */
1757  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
1758  StartReplication(cmd);
1759  else
1761 
1762  /* dupe, but necessary per libpqrcv_endstreaming */
1763  EndReplicationCommand(cmdtag);
1764 
1765  Assert(xlogreader != NULL);
1766  break;
1767  }
1768 
1769  case T_TimeLineHistoryCmd:
1770  cmdtag = "TIMELINE_HISTORY";
1771  set_ps_display(cmdtag);
1772  PreventInTransactionBlock(true, cmdtag);
1774  EndReplicationCommand(cmdtag);
1775  break;
1776 
1777  case T_VariableShowStmt:
1778  {
1780  VariableShowStmt *n = (VariableShowStmt *) cmd_node;
1781 
1782  cmdtag = "SHOW";
1783  set_ps_display(cmdtag);
1784 
1785  /* syscache access needs a transaction environment */
1787  GetPGVariable(n->name, dest);
1789  EndReplicationCommand(cmdtag);
1790  }
1791  break;
1792 
1793  default:
1794  elog(ERROR, "unrecognized replication command node tag: %u",
1795  cmd_node->type);
1796  }
1797 
1798  /* done */
1799  MemoryContextSwitchTo(old_context);
1800  MemoryContextDelete(cmd_context);
1801 
1802  /*
1803  * We need not update ps display or pg_stat_activity, because PostgresMain
1804  * will reset those to "idle". But we must reset debug_query_string to
1805  * ensure it doesn't become a dangling pointer.
1806  */
1807  debug_query_string = NULL;
1808 
1809  return true;
1810 }
1811 
1812 /*
1813  * Process any incoming messages while streaming. Also checks if the remote
1814  * end has closed the connection.
 *
 * Messages are read in a loop that ends when no more data is available
 * without blocking or once a CopyDone ('c') has been received.  Accepted
 * message types are 'd', 'c' and 'X'; any other type raises FATAL.  An EOF
 * or read error, as well as an 'X' message, ends the process through
 * proc_exit(0).
 *
 * NOTE(review): the signature line and a few statements are not visible in
 * this listing; confirm details against the complete source.
1815  */
1816 static void
1818 {
1819  unsigned char firstchar;
1820  int maxmsglen;
1821  int r;
 /* set once any 'd' or 'c' message was handled in this call */
1822  bool received = false;
1823 
1825 
1826  /*
1827  * If we already received a CopyDone from the frontend, any subsequent
1828  * message is the beginning of a new command, and should be processed in
1829  * the main processing loop.
1830  */
1831  while (!streamingDoneReceiving)
1832  {
1833  pq_startmsgread();
 /* r < 0: error/EOF, r == 0: nothing to read right now, otherwise a type byte */
1834  r = pq_getbyte_if_available(&firstchar);
1835  if (r < 0)
1836  {
1837  /* unexpected error or EOF */
1839  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1840  errmsg("unexpected EOF on standby connection")));
1841  proc_exit(0);
1842  }
1843  if (r == 0)
1844  {
1845  /* no data available without blocking */
1846  pq_endmsgread();
1847  break;
1848  }
1849 
1850  /* Validate message type and set packet size limit */
1851  switch (firstchar)
1852  {
1853  case 'd':
1854  maxmsglen = PQ_LARGE_MESSAGE_LIMIT;
1855  break;
1856  case 'c':
1857  case 'X':
1858  maxmsglen = PQ_SMALL_MESSAGE_LIMIT;
1859  break;
1860  default:
1861  ereport(FATAL,
1862  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1863  errmsg("invalid standby message type \"%c\"",
1864  firstchar)));
1865  maxmsglen = 0; /* keep compiler quiet */
1866  break;
1867  }
1868 
1869  /* Read the message contents */
1870  resetStringInfo(&reply_message);
1871  if (pq_getmessage(&reply_message, maxmsglen))
1872  {
1874  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1875  errmsg("unexpected EOF on standby connection")));
1876  proc_exit(0);
1877  }
1878 
1879  /* ... and process it */
1880  switch (firstchar)
1881  {
1882  /*
1883  * 'd' means a standby reply wrapped in a CopyData packet.
1884  */
1885  case 'd':
1887  received = true;
1888  break;
1889 
1890  /*
1891  * CopyDone means the standby requested to finish streaming.
1892  * Reply with CopyDone, if we had not sent that already.
1893  */
1894  case 'c':
1895  if (!streamingDoneSending)
1896  {
1897  pq_putmessage_noblock('c', NULL, 0);
1898  streamingDoneSending = true;
1899  }
1900 
 /* this also terminates the enclosing while loop */
1901  streamingDoneReceiving = true;
1902  received = true;
1903  break;
1904 
1905  /*
1906  * 'X' means that the standby is closing down the socket.
1907  */
 /*
  * NOTE(review): no break follows; proc_exit() presumably does not
  * return, so control never falls into the default branch -- confirm.
  */
1908  case 'X':
1909  proc_exit(0);
1910 
1911  default:
1912  Assert(false); /* NOT REACHED */
1913  }
1914  }
1915 
1916  /*
1917  * Save the last reply timestamp if we've received at least one reply.
1918  */
1919  if (received)
1920  {
 /* a reply arrived, so we are no longer waiting on an outstanding ping */
1922  waiting_for_ping_response = false;
1923  }
1924 }
1925 
1926 /*
1927  * Process a status update message received from standby.
 *
 * Consumes the first byte of reply_message as the message type and
 * dispatches on it: 'r' and 'h' are accepted, anything else is reported as
 * a protocol violation and the process exits via proc_exit(0).
 *
 * NOTE(review): the handler calls for 'r' and 'h' are not visible in this
 * listing.  Presumably 'r' is the regular standby reply and 'h' the hot
 * standby feedback message -- confirm against the complete source.
1928  */
1929 static void
1931 {
1932  char msgtype;
1933 
1934  /*
1935  * Check message type from the first byte.
1936  */
1937  msgtype = pq_getmsgbyte(&reply_message);
1938 
1939  switch (msgtype)
1940  {
1941  case 'r':
1943  break;
1944 
1945  case 'h':
1947  break;
1948 
1949  default:
1951  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1952  errmsg("unexpected message type \"%c\"", msgtype)));
1953  proc_exit(0);
1954  }
1955 }
1956 
1957 /*
1958  * Remember that a walreceiver just confirmed receipt of lsn `lsn`.
 *
 * Stores 'lsn' as the slot's restart_lsn under the slot's spinlock, and
 * only when it differs from the stored value.  Follow-up work for a changed
 * value happens after the spinlock has been released.  'lsn' must not be
 * InvalidXLogRecPtr (asserted).
 *
 * NOTE(review): the signature line, the declaration of 'slot' and the body
 * of the "changed" branch are not visible in this listing.
1959  */
1960 static void
1962 {
1963  bool changed = false;
1965 
1966  Assert(lsn != InvalidXLogRecPtr);
 /* keep the critical section minimal: compare and store only */
1967  SpinLockAcquire(&slot->mutex);
1968  if (slot->data.restart_lsn != lsn)
1969  {
1970  changed = true;
1971  slot->data.restart_lsn = lsn;
1972  }
1973  SpinLockRelease(&slot->mutex);
1974 
1975  if (changed)
1976  {
1979  }
1980 
1981  /*
1982  * One could argue that the slot should be saved to disk now, but that'd
1983  * be energy wasted - the worst lost information can do here is give us
1984  * wrong information in a statistics view - we'll just potentially be more
1985  * conservative in removing files.
1986  */
1987 }
1988 
1989 /*
1990  * Regular reply from standby advising of WAL locations on standby server.
 *
 * The message body is read from reply_message in this order: write, flush
 * and apply positions (int64 each), the reply time (int64), and a one-byte
 * "reply requested" flag.  The positions, lag values and reply time are then
 * stored in MyWalSnd under its spinlock, and a keepalive is sent when the
 * standby asked for a reply.
 *
 * NOTE(review): the signature line and a few statements near the end are
 * not visible in this listing; confirm details against the complete source.
1991  */
1992 static void
1994 {
1995  XLogRecPtr writePtr,
1996  flushPtr,
1997  applyPtr;
1998  bool replyRequested;
1999  TimeOffset writeLag,
2000  flushLag,
2001  applyLag;
2002  bool clearLagTimes;
2003  TimestampTz now;
2004  TimestampTz replyTime;
2005 
 /* static: whether the previous reply also had applyPtr == sentPtr */
2006  static bool fullyAppliedLastTime = false;
2007 
2008  /* the caller already consumed the msgtype byte */
2009  writePtr = pq_getmsgint64(&reply_message);
2010  flushPtr = pq_getmsgint64(&reply_message);
2011  applyPtr = pq_getmsgint64(&reply_message);
2012  replyTime = pq_getmsgint64(&reply_message);
2013  replyRequested = pq_getmsgbyte(&reply_message);
2014 
2016  {
2017  char *replyTimeStr;
2018 
2019  /* Copy because timestamptz_to_str returns a static buffer */
2020  replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
2021 
2022  elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s reply_time %s",
2023  LSN_FORMAT_ARGS(writePtr),
2024  LSN_FORMAT_ARGS(flushPtr),
2025  LSN_FORMAT_ARGS(applyPtr),
2026  replyRequested ? " (reply requested)" : "",
2027  replyTimeStr);
2028 
2029  pfree(replyTimeStr);
2030  }
2031 
2032  /* See if we can compute the round-trip lag for these positions. */
2033  now = GetCurrentTimestamp();
2034  writeLag = LagTrackerRead(SYNC_REP_WAIT_WRITE, writePtr, now);
2035  flushLag = LagTrackerRead(SYNC_REP_WAIT_FLUSH, flushPtr, now);
2036  applyLag = LagTrackerRead(SYNC_REP_WAIT_APPLY, applyPtr, now);
2037 
2038  /*
2039  * If the standby reports that it has fully replayed the WAL in two
2040  * consecutive reply messages, then the second such message must result
2041  * from wal_receiver_status_interval expiring on the standby. This is a
2042  * convenient time to forget the lag times measured when it last
2043  * wrote/flushed/applied a WAL record, to avoid displaying stale lag data
2044  * until more WAL traffic arrives.
2045  */
2046  clearLagTimes = false;
2047  if (applyPtr == sentPtr)
2048  {
2049  if (fullyAppliedLastTime)
2050  clearLagTimes = true;
2051  fullyAppliedLastTime = true;
2052  }
2053  else
2054  fullyAppliedLastTime = false;
2055 
2056  /* Send a reply if the standby requested one. */
2057  if (replyRequested)
2058  WalSndKeepalive(false);
2059 
2060  /*
2061  * Update shared state for this WalSender process based on reply data from
2062  * standby.
2063  */
2064  {
2065  WalSnd *walsnd = MyWalSnd;
2066 
2067  SpinLockAcquire(&walsnd->mutex);
2068  walsnd->write = writePtr;
2069  walsnd->flush = flushPtr;
2070  walsnd->apply = applyPtr;
 /*
  * A lag of -1 is only stored when clearLagTimes is set; otherwise the
  * previously stored lag is kept.  NOTE(review): -1 presumably means
  * "no measurement available" from LagTrackerRead -- confirm.
  */
2071  if (writeLag != -1 || clearLagTimes)
2072  walsnd->writeLag = writeLag;
2073  if (flushLag != -1 || clearLagTimes)
2074  walsnd->flushLag = flushLag;
2075  if (applyLag != -1 || clearLagTimes)
2076  walsnd->applyLag = applyLag;
2077  walsnd->replyTime = replyTime;
2078  SpinLockRelease(&walsnd->mutex);
2079  }
2080 
2083 
2084  /*
2085  * Advance our local xmin horizon when the client confirmed a flush.
2086  */
2087  if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr)
2088  {
2091  else
2093  }
2094 }
2095 
2096 /* compute new replication slot xmin horizon if needed */
 /*
  * Under the slot's spinlock, xmin (with effective_xmin) and catalog_xmin
  * (with effective_catalog_xmin) are each overwritten with the feedback
  * value when the stored value is not a normal XID, the feedback value is
  * not a normal XID, or the stored value precedes the feedback value.
  * Follow-up work for a changed value happens after the lock is released.
  *
  * NOTE(review): the signature line, the declaration of 'slot' and the body
  * of the "changed" branch are not visible in this listing.
  */
2097 static void
2099 {
2100  bool changed = false;
2102 
2103  SpinLockAcquire(&slot->mutex);
2105 
2106  /*
2107  * For physical replication we don't need the interlock provided by xmin
2108  * and effective_xmin since the consequences of a missed increase are
2109  * limited to query cancellations, so set both at once.
2110  */
2111  if (!TransactionIdIsNormal(slot->data.xmin) ||
2112  !TransactionIdIsNormal(feedbackXmin) ||
2113  TransactionIdPrecedes(slot->data.xmin, feedbackXmin))
2114  {
2115  changed = true;
2116  slot->data.xmin = feedbackXmin;
2117  slot->effective_xmin = feedbackXmin;
2118  }
 /* same rule as above, applied to the catalog xmin pair */
2119  if (!TransactionIdIsNormal(slot->data.catalog_xmin) ||
2120  !TransactionIdIsNormal(feedbackCatalogXmin) ||
2121  TransactionIdPrecedes(slot->data.catalog_xmin, feedbackCatalogXmin))
2122  {
2123  changed = true;
2124  slot->data.catalog_xmin = feedbackCatalogXmin;
2125  slot->effective_catalog_xmin = feedbackCatalogXmin;
2126  }
2127  SpinLockRelease(&slot->mutex);
2128 
2129  if (changed)
2130  {
2133  }
2134 }
2135 
2136 /*
2137  * Check that the provided xmin/epoch are sane, that is, not in the future
2138  * and not so far back as to be already wrapped around.
2139  *
2140  * Epoch of nextXid should be same as standby, or if the counter has
2141  * wrapped, then one greater than standby.
2142  *
2143  * This check doesn't care about whether clog exists for these xids
2144  * at all.
 *
 * Returns true when 'xid'/'epoch' pass both checks below, false otherwise.
 * The reference point is the next full transaction id as returned by
 * ReadNextFullTransactionId(), split into its xid and epoch parts.
 *
 * NOTE(review): the signature line is not visible in this listing; the body
 * uses 'xid' and 'epoch' as its inputs.
2145  */
2146 static bool
2148 {
2149  FullTransactionId nextFullXid;
2150  TransactionId nextXid;
2151  uint32 nextEpoch;
2152 
2153  nextFullXid = ReadNextFullTransactionId();
2154  nextXid = XidFromFullTransactionId(nextFullXid);
2155  nextEpoch = EpochFromFullTransactionId(nextFullXid);
2156 
 /*
  * Plain integer comparison: an xid numerically at or below nextXid must
  * carry the current epoch, a numerically larger one the previous epoch.
  */
2157  if (xid <= nextXid)
2158  {
2159  if (epoch != nextEpoch)
2160  return false;
2161  }
2162  else
2163  {
2164  if (epoch + 1 != nextEpoch)
2165  return false;
2166  }
2167 
2168  if (!TransactionIdPrecedesOrEquals(xid, nextXid))
2169  return false; /* epoch OK, but it's wrapped around */
2170 
2171  return true;
2172 }
2173 
2174 /*
2175  * Hot Standby feedback
 *
 * The message body is read from reply_message in this order: reply time
 * (int64), feedback xmin and its epoch (4 bytes each), feedback catalog
 * xmin and its epoch (4 bytes each).  The reply time is stored in MyWalSnd
 * under its spinlock.
 *
 * If neither xmin is a normal XID, the values are passed on to the slot
 * (when there is one) and the function returns.  If a normal xmin fails the
 * TransactionIdInRecentPast() check, the message is ignored.  Otherwise the
 * xmins are applied either to the replication slot or, without a slot, to
 * MyProc->xmin.
 *
 * NOTE(review): the signature line and a few statements are not visible in
 * this listing; confirm details against the complete source.
2176  */
2177 static void
2179 {
2180  TransactionId feedbackXmin;
2181  uint32 feedbackEpoch;
2182  TransactionId feedbackCatalogXmin;
2183  uint32 feedbackCatalogEpoch;
2184  TimestampTz replyTime;
2185 
2186  /*
2187  * Decipher the reply message. The caller already consumed the msgtype
2188  * byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation
2189  * of this message.
2190  */
2191  replyTime = pq_getmsgint64(&reply_message);
2192  feedbackXmin = pq_getmsgint(&reply_message, 4);
2193  feedbackEpoch = pq_getmsgint(&reply_message, 4);
2194  feedbackCatalogXmin = pq_getmsgint(&reply_message, 4);
2195  feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4);
2196 
2198  {
2199  char *replyTimeStr;
2200 
2201  /* Copy because timestamptz_to_str returns a static buffer */
2202  replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
2203 
2204  elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u reply_time %s",
2205  feedbackXmin,
2206  feedbackEpoch,
2207  feedbackCatalogXmin,
2208  feedbackCatalogEpoch,
2209  replyTimeStr);
2210 
2211  pfree(replyTimeStr);
2212  }
2213 
2214  /*
2215  * Update shared state for this WalSender process based on reply data from
2216  * standby.
2217  */
2218  {
2219  WalSnd *walsnd = MyWalSnd;
2220 
2221  SpinLockAcquire(&walsnd->mutex);
2222  walsnd->replyTime = replyTime;
2223  SpinLockRelease(&walsnd->mutex);
2224  }
2225 
2226  /*
2227  * Unset WalSender's xmins if the feedback message values are invalid.
2228  * This happens when the downstream turned hot_standby_feedback off.
2229  */
2230  if (!TransactionIdIsNormal(feedbackXmin)
2231  && !TransactionIdIsNormal(feedbackCatalogXmin))
2232  {
2234  if (MyReplicationSlot != NULL)
2235  PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
2236  return;
2237  }
2238 
2239  /*
2240  * Check that the provided xmin/epoch are sane, that is, not in the future
2241  * and not so far back as to be already wrapped around. Ignore if not.
2242  */
 /* each xmin is only validated when it is a normal XID */
2243  if (TransactionIdIsNormal(feedbackXmin) &&
2244  !TransactionIdInRecentPast(feedbackXmin, feedbackEpoch))
2245  return;
2246 
2247  if (TransactionIdIsNormal(feedbackCatalogXmin) &&
2248  !TransactionIdInRecentPast(feedbackCatalogXmin, feedbackCatalogEpoch))
2249  return;
2250 
2251  /*
2252  * Set the WalSender's xmin equal to the standby's requested xmin, so that
2253  * the xmin will be taken into account by GetSnapshotData() /
2254  * ComputeXidHorizons(). This will hold back the removal of dead rows and
2255  * thereby prevent the generation of cleanup conflicts on the standby
2256  * server.
2257  *
2258  * There is a small window for a race condition here: although we just
2259  * checked that feedbackXmin precedes nextXid, the nextXid could have
2260  * gotten advanced between our fetching it and applying the xmin below,
2261  * perhaps far enough to make feedbackXmin wrap around. In that case the
2262  * xmin we set here would be "in the future" and have no effect. No point
2263  * in worrying about this since it's too late to save the desired data
2264  * anyway. Assuming that the standby sends us an increasing sequence of
2265  * xmins, this could only happen during the first reply cycle, else our
2266  * own xmin would prevent nextXid from advancing so far.
2267  *
2268  * We don't bother taking the ProcArrayLock here. Setting the xmin field
2269  * is assumed atomic, and there's no real need to prevent concurrent
2270  * horizon determinations. (If we're moving our xmin forward, this is
2271  * obviously safe, and if we're moving it backwards, well, the data is at
2272  * risk already since a VACUUM could already have determined the horizon.)
2273  *
2274  * If we're using a replication slot we reserve the xmin via that,
2275  * otherwise via the walsender's PGPROC entry. We can only track the
2276  * catalog xmin separately when using a slot, so we store the least of the
2277  * two provided when not using a slot.
2278  *
2279  * XXX: It might make sense to generalize the ephemeral slot concept and
2280  * always use the slot mechanism to handle the feedback xmin.
2281  */
2282  if (MyReplicationSlot != NULL) /* XXX: persistency configurable? */
2283  PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
2284  else
2285  {
 /* no slot: use the catalog xmin only if it is normal and older than xmin */
2286  if (TransactionIdIsNormal(feedbackCatalogXmin)
2287  && TransactionIdPrecedes(feedbackCatalogXmin, feedbackXmin))
2288  MyProc->xmin = feedbackCatalogXmin;
2289  else
2290  MyProc->xmin = feedbackXmin;
2291  }
2292 }
2293 
2294 /*
2295  * Compute how long send/receive loops should sleep.
2296  *
2297  * If wal_sender_timeout is enabled we want to wake up in time to send
2298  * keepalives and to abort the connection if wal_sender_timeout has been
2299  * reached.
      *
      * The result is a duration in milliseconds. If the timeout-driven block
      * below is not entered, the default of 10000 ms (10 s) is returned as is.
      *
      * NOTE(review): the condition guarding the inner block and the statements
      * that assign wakeup_time are not visible in this listing; confirm them
      * against the full source before relying on the details.
2300  */
2301 static long
2303 {
2304  long sleeptime = 10000; /* 10 s */
2305 
2307  {
2308  TimestampTz wakeup_time;
2309 
2310  /*
2311  * At the latest stop sleeping once wal_sender_timeout has been
2312  * reached.
2313  */
2316 
2317  /*
2318  * If no ping has been sent yet, wake up when it's time to do so.
2319  * WalSndKeepaliveIfNecessary() wants to send a keepalive once half of
2320  * the timeout passed without a response.
2321  */
2324  wal_sender_timeout / 2);
2325 
2326  /* Compute relative time until wakeup. */
2327  sleeptime = TimestampDifferenceMilliseconds(now, wakeup_time);
2328  }
2329 
2330  return sleeptime;
2331 }
2332 
2333 /*
2334  * Check whether there have been responses by the client within
2335  * wal_sender_timeout and shut down if not. Using last_processing as the
2336  * reference point avoids counting server-side stalls against the client.
2337  * However, a long server-side stall can make WalSndKeepaliveIfNecessary()
2338  * postdate last_processing by more than wal_sender_timeout. If that happens,
2339  * the client must reply almost immediately to avoid a timeout. This rarely
2340  * affects the default configuration, under which clients spontaneously send a
2341  * message every standby_message_timeout = wal_sender_timeout/6 = 10s. We
2342  * could eliminate that problem by recognizing timeout expiration at
2343  * wal_sender_timeout/2 after the keepalive.
      *
      * Does nothing when last_reply_timestamp is not positive, or when
      * wal_sender_timeout is not positive. On expiration, calls
      * WalSndShutdown().
2344  */
2345 static void
2347 {
2348  TimestampTz timeout;
2349 
2350  /* don't bail out if we're doing something that doesn't require timeouts */
2351  if (last_reply_timestamp <= 0)
2352  return;
2353 
      /*
       * NOTE(review): the statement computing "timeout" is not visible in this
       * listing; presumably it is last_reply_timestamp plus
       * wal_sender_timeout -- confirm against the full source.
       */
2356 
      /* The deadline is only enforced when wal_sender_timeout is enabled (> 0). */
2357  if (wal_sender_timeout > 0 && last_processing >= timeout)
2358  {
2359  /*
2360  * Since typically expiration of replication timeout means
2361  * communication problem, we don't send the error message to the
2362  * standby.
2363  */
2365  (errmsg("terminating walsender process due to replication timeout")));
2366 
2367  WalSndShutdown();
2368  }
2369 }
2370 
2371 /* Main loop of walsender process that streams the WAL over Copy messages. */
     /*
      * send_data is the callback used to queue more WAL for the client. It is
      * invoked only while the libpq output buffer is empty, and it is compared
      * against XLogSendLogical to decide whether this loop itself should block
      * once caught up.
      *
      * The loop ends via "break" once the CopyDone exchange has completed and
      * nothing is pending in the output buffer; WalSndShutdown() is called if
      * flushing to the client fails.
      *
      * NOTE(review): several statements of this function (for example the
      * latch reset, the interrupt check, and the calls that process replies and
      * check timeouts) are not visible in this listing; the comments that
      * introduce them are kept as found.
      */
2372 static void
2374 {
2375  /*
2376  * Initialize the last reply timestamp. That enables timeout processing
2377  * from here on.
2378  */
2380  waiting_for_ping_response = false;
2381 
2382  /*
2383  * Loop until we reach the end of this timeline or the client requests to
2384  * stop streaming.
2385  */
2386  for (;;)
2387  {
2388  /* Clear any already-pending wakeups */
2390 
2392 
2393  /* Process any requests or signals received recently */
2394  if (ConfigReloadPending)
2395  {
2396  ConfigReloadPending = false;
2399  }
2400 
2401  /* Check for input from the client */
2403 
2404  /*
2405  * If we have received CopyDone from the client, sent CopyDone
2406  * ourselves, and the output buffer is empty, it's time to exit
2407  * streaming.
2408  */
2410  !pq_is_send_pending())
2411  break;
2412 
2413  /*
2414  * If we don't have any pending data in the output buffer, try to send
2415  * some more. If there is some, we don't bother to call send_data
2416  * again until we've flushed it ... but we'd better assume we are not
2417  * caught up.
2418  */
2419  if (!pq_is_send_pending())
2420  send_data();
2421  else
2422  WalSndCaughtUp = false;
2423 
2424  /* Try to flush pending output to the client */
2425  if (pq_flush_if_writable() != 0)
2426  WalSndShutdown();
2427 
2428  /* If nothing remains to be sent right now ... */
2430  {
2431  /*
2432  * If we're in catchup state, move to streaming. This is an
2433  * important state change for users to know about, since before
2434  * this point data loss might occur if the primary dies and we
2435  * need to failover to the standby. The state change is also
2436  * important for synchronous replication, since commits that
2437  * started to wait at that point might wait for some time.
2438  */
2439  if (MyWalSnd->state == WALSNDSTATE_CATCHUP)
2440  {
2441  ereport(DEBUG1,
2442  (errmsg_internal("\"%s\" has now caught up with upstream server",
2443  application_name)));
2445  }
2446 
2447  /*
2448  * When SIGUSR2 arrives, we send any outstanding logs up to the
2449  * shutdown checkpoint record (i.e., the latest record), wait for
2450  * them to be replicated to the standby, and exit. This may be a
2451  * normal termination at shutdown, or a promotion, the walsender
2452  * is not sure which.
2453  */
2454  if (got_SIGUSR2)
2455  WalSndDone(send_data);
2456  }
2457 
2458  /* Check for replication timeout. */
2460 
2461  /* Send keepalive if the time has come */
2463 
2464  /*
2465  * Block if we have unsent data. XXX For logical replication, let
2466  * WalSndWaitForWal() handle any other blocking; idle receivers need
2467  * its additional actions. For physical replication, also block if
2468  * caught up; its send_data does not block.
2469  */
2470  if ((WalSndCaughtUp && send_data != XLogSendLogical &&
2473  {
2474  long sleeptime;
2475  int wakeEvents;
2476 
      /*
       * Start from "socket readable" or from no socket events at all; the
       * condition selecting between the two is not visible in this listing.
       */
2478  wakeEvents = WL_SOCKET_READABLE;
2479  else
2480  wakeEvents = 0;
2481 
2482  /*
2483  * Use fresh timestamp, not last_processing, to reduce the chance
2484  * of reaching wal_sender_timeout before sending a keepalive.
2485  */
2487 
      /* Also wake up when the socket can accept more of the pending output. */
2488  if (pq_is_send_pending())
2489  wakeEvents |= WL_SOCKET_WRITEABLE;
2490 
2491  /* Sleep until something happens or we time out */
2492  WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_MAIN);
2493  }
2494  }
2495 }
2496 
2497 /* Initialize a per-walsender data structure for this walsender process */
     /*
      * Scans WalSndCtl->walsnds[0 .. max_wal_senders-1] for an entry whose pid
      * is 0, claims it by storing MyProcPid, resets its fields to their initial
      * values, and remembers it in MyWalSnd. Each entry is examined and
      * modified only while holding that entry's spinlock.
      */
2498 static void
2500 {
2501  int i;
2502 
2503  /*
2504  * WalSndCtl should be set up already (we inherit this by fork() or
2505  * EXEC_BACKEND mechanism from the postmaster).
2506  */
2507  Assert(WalSndCtl != NULL);
2508  Assert(MyWalSnd == NULL);
2509 
2510  /*
2511  * Find a free walsender slot and reserve it. This must not fail due to
2512  * the prior check for free WAL senders in InitProcess().
2513  */
2514  for (i = 0; i < max_wal_senders; i++)
2515  {
2516  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2517 
2518  SpinLockAcquire(&walsnd->mutex);
2519 
      /* A nonzero pid marks the entry as owned by some walsender. */
2520  if (walsnd->pid != 0)
2521  {
2522  SpinLockRelease(&walsnd->mutex);
2523  continue;
2524  }
2525  else
2526  {
2527  /*
2528  * Found a free slot. Reserve it for us.
2529  */
2530  walsnd->pid = MyProcPid;
2531  walsnd->state = WALSNDSTATE_STARTUP;
2532  walsnd->sentPtr = InvalidXLogRecPtr;
2533  walsnd->needreload = false;
2534  walsnd->write = InvalidXLogRecPtr;
2535  walsnd->flush = InvalidXLogRecPtr;
2536  walsnd->apply = InvalidXLogRecPtr;
2537  walsnd->writeLag = -1;
2538  walsnd->flushLag = -1;
2539  walsnd->applyLag = -1;
2540  walsnd->sync_standby_priority = 0;
2541  walsnd->latch = &MyProc->procLatch;
2542  walsnd->replyTime = 0;
2543  SpinLockRelease(&walsnd->mutex);
2544  /* don't need the lock anymore */
2545  MyWalSnd = (WalSnd *) walsnd;
2546 
2547  break;
2548  }
2549  }
2550 
      /* The loop above must have claimed an entry (see the comment before it). */
2551  Assert(MyWalSnd != NULL);
2552 
2553  /* Arrange to clean up at walsender exit */
      /* NOTE(review): the statement registering the cleanup is not visible here. */
2555 }
2556 
2557 /* Destroy the per-walsender data structure for this walsender process */
     /*
      * Clears MyWalSnd first, then, under the entry's spinlock, resets its latch
      * pointer to NULL and its pid to 0. A pid of 0 is what the slot search in
      * this file treats as "free", so the entry becomes reusable afterwards.
      */
2558 static void
2560 {
2561  WalSnd *walsnd = MyWalSnd;
2562 
2563  Assert(walsnd != NULL);
2564 
      /* Drop our own reference before releasing the shared entry. */
2565  MyWalSnd = NULL;
2566 
2567  SpinLockAcquire(&walsnd->mutex);
2568  /* clear latch while holding the spinlock, so it can safely be read */
2569  walsnd->latch = NULL;
2570  /* Mark WalSnd struct as no longer being in use. */
2571  walsnd->pid = 0;
2572  SpinLockRelease(&walsnd->mutex);
2573 }
2574 
2575 /* XLogReaderRoutine->segment_open callback */
     /*
      * Opens the WAL segment file for nextSegNo read-only and stores the file
      * descriptor in state->seg.ws_file. *tli_p is set to the timeline whose
      * file was chosen: sendTimeLine by default, or sendTimeLineNextTLI when
      * the segment is the one in which the timeline switch happened (see the
      * long comment below).
      *
      * Returns normally only if the file could be opened; otherwise an ERROR is
      * raised, with a dedicated message when the file does not exist.
      */
2576 static void
2578  TimeLineID *tli_p)
2579 {
2580  char path[MAXPGPATH];
2581 
2582  /*-------
2583  * When reading from a historic timeline, and there is a timeline switch
2584  * within this segment, read from the WAL segment belonging to the new
2585  * timeline.
2586  *
2587  * For example, imagine that this server is currently on timeline 5, and
2588  * we're streaming timeline 4. The switch from timeline 4 to 5 happened at
2589  * 0/13002088. In pg_wal, we have these files:
2590  *
2591  * ...
2592  * 000000040000000000000012
2593  * 000000040000000000000013
2594  * 000000050000000000000013
2595  * 000000050000000000000014
2596  * ...
2597  *
2598  * In this situation, when requested to send the WAL from segment 0x13, on
2599  * timeline 4, we read the WAL from file 000000050000000000000013. Archive
2600  * recovery prefers files from newer timelines, so if the segment was
2601  * restored from the archive on this server, the file belonging to the old
2602  * timeline, 000000040000000000000013, might not exist. Their contents are
2603  * equal up to the switchpoint, because at a timeline switch, the used
2604  * portion of the old segment is copied to the new file. -------
2605  */
2606  *tli_p = sendTimeLine;
      /*
       * NOTE(review): the condition opening this block and the statement that
       * computes endSegNo are not visible in this listing.
       */
2608  {
2609  XLogSegNo endSegNo;
2610 
2612  if (nextSegNo == endSegNo)
2613  *tli_p = sendTimeLineNextTLI;
2614  }
2615 
2616  XLogFilePath(path, *tli_p, nextSegNo, state->segcxt.ws_segsize);
2617  state->seg.ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY);
2618  if (state->seg.ws_file >= 0)
2619  return;
2620 
2621  /*
2622  * If the file is not found, assume it's because the standby asked for a
2623  * too old WAL segment that has already been removed or recycled.
2624  */
2625  if (errno == ENOENT)
2626  {
2627  char xlogfname[MAXFNAMELEN];
2628  int save_errno = errno;
2629 
      /* Preserve errno across XLogFileName() so the report below still sees it. */
2630  XLogFileName(xlogfname, *tli_p, nextSegNo, wal_segment_size);
2631  errno = save_errno;
2632  ereport(ERROR,
2634  errmsg("requested WAL segment %s has already been removed",
2635  xlogfname)));
2636  }
2637  else
2638  ereport(ERROR,
2640  errmsg("could not open file \"%s\": %m",
2641  path)));
2642 }
2643 
2644 /*
2645  * Send out the WAL in its normal physical/stored form.
2646  *
2647  * Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk,
2648  * but not yet sent to the client, and buffer it in the libpq output
2649  * buffer.
2650  *
2651  * If there is no unsent WAL remaining, WalSndCaughtUp is set to true,
2652  * otherwise WalSndCaughtUp is set to false.
      *
      * Outline of the visible steps: determine the send limit (SendRqstPtr)
      * for the historic / cascading-standby / primary case, record a lag
      * tracking sample, handle reaching the end of a historic timeline by
      * queueing CopyDone, choose the slice [startptr, endptr), read it with
      * WALRead() straight into output_message, queue it as a 'd' message, and
      * finally advance sentPtr locally and in shared memory.
      *
      * NOTE(review): a number of statements and conditions of this function
      * are not visible in this listing (the listing numbers skip); comments
      * below point out the places where that matters for reading the code.
2653  */
2654 static void
2656 {
2657  XLogRecPtr SendRqstPtr;
2659  XLogRecPtr endptr;
2660  Size nbytes;
2661  XLogSegNo segno;
2662  WALReadError errinfo;
2663 
2664  /* If requested switch the WAL sender to the stopping state. */
2665  if (got_STOPPING)
2667 
      /* NOTE(review): the condition for this early exit is not visible here. */
2669  {
2670  WalSndCaughtUp = true;
2671  return;
2672  }
2673 
2674  /* Figure out how far we can safely send the WAL. */
2676  {
2677  /*
2678  * Streaming an old timeline that's in this server's history, but is
2679  * not the one we're currently inserting or replaying. It can be
2680  * streamed up to the point where we switched off that timeline.
2681  */
2682  SendRqstPtr = sendTimeLineValidUpto;
2683  }
2684  else if (am_cascading_walsender)
2685  {
2686  /*
2687  * Streaming the latest timeline on a standby.
2688  *
2689  * Attempt to send all WAL that has already been replayed, so that we
2690  * know it's valid. If we're receiving WAL through streaming
2691  * replication, it's also OK to send any WAL that has been received
2692  * but not replayed.
2693  *
2694  * The timeline we're recovering from can change, or we can be
2695  * promoted. In either case, the current timeline becomes historic. We
2696  * need to detect that so that we don't try to stream past the point
2697  * where we switched to another timeline. We check for promotion or
2698  * timeline switch after calculating FlushPtr, to avoid a race
2699  * condition: if the timeline becomes historic just after we checked
2700  * that it was still current, it's still OK to stream it up to the
2701  * FlushPtr that was calculated before it became historic.
2702  */
2703  bool becameHistoric = false;
2704 
2705  SendRqstPtr = GetStandbyFlushRecPtr();
2706 
2707  if (!RecoveryInProgress())
2708  {
2709  /*
2710  * We have been promoted. RecoveryInProgress() updated
2711  * ThisTimeLineID to the new current timeline.
2712  */
2713  am_cascading_walsender = false;
2714  becameHistoric = true;
2715  }
2716  else
2717  {
2718  /*
2719  * Still a cascading standby. But is the timeline we're sending
2720  * still the one recovery is recovering from? ThisTimeLineID was
2721  * updated by the GetStandbyFlushRecPtr() call above.
2722  */
2724  becameHistoric = true;
2725  }
2726 
2727  if (becameHistoric)
2728  {
2729  /*
2730  * The timeline we were sending has become historic. Read the
2731  * timeline history file of the new timeline to see where exactly
2732  * we forked off from the timeline we were sending.
2733  */
2734  List *history;
2735 
2738 
2740  list_free_deep(history);
2741 
2742  sendTimeLineIsHistoric = true;
2743 
      /* From now on the limit is the point where the old timeline ended. */
2744  SendRqstPtr = sendTimeLineValidUpto;
2745  }
2746  }
2747  else
2748  {
2749  /*
2750  * Streaming the current timeline on a primary.
2751  *
2752  * Attempt to send all data that's already been written out and
2753  * fsync'd to disk. We cannot go further than what's been written out
2754  * given the current implementation of WALRead(). And in any case
2755  * it's unsafe to send WAL that is not securely down to disk on the
2756  * primary: if the primary subsequently crashes and restarts, standbys
2757  * must not have applied any WAL that got lost on the primary.
2758  */
2759  SendRqstPtr = GetFlushRecPtr();
2760  }
2761 
2762  /*
2763  * Record the current system time as an approximation of the time at which
2764  * this WAL location was written for the purposes of lag tracking.
2765  *
2766  * In theory we could make XLogFlush() record a time in shmem whenever WAL
2767  * is flushed and we could get that time as well as the LSN when we call
2768  * GetFlushRecPtr() above (and likewise for the cascading standby
2769  * equivalent), but rather than putting any new code into the hot WAL path
2770  * it seems good enough to capture the time here. We should reach this
2771  * after XLogFlush() runs WalSndWakeupProcessRequests(), and although that
2772  * may take some time, we read the WAL flush pointer and take the time
2773  * very close together here so that we'll get a later position if it is
2774  * still moving.
2775  *
2776  * Because LagTrackerWrite ignores samples when the LSN hasn't advanced,
2777  * this gives us a cheap approximation for the WAL flush time for this
2778  * LSN.
2779  *
2780  * Note that the LSN is not necessarily the LSN for the data contained in
2781  * the present message; it's the end of the WAL, which might be further
2782  * ahead. All the lag tracking machinery cares about is finding out when
2783  * that arbitrary LSN is eventually reported as written, flushed and
2784  * applied, so that it can measure the elapsed time.
2785  */
2786  LagTrackerWrite(SendRqstPtr, GetCurrentTimestamp());
2787 
2788  /*
2789  * If this is a historic timeline and we've reached the point where we
2790  * forked to the next timeline, stop streaming.
2791  *
2792  * Note: We might already have sent WAL > sendTimeLineValidUpto. The
2793  * startup process will normally replay all WAL that has been received
2794  * from the primary, before promoting, but if the WAL streaming is
2795  * terminated at a WAL page boundary, the valid portion of the timeline
2796  * might end in the middle of a WAL record. We might've already sent the
2797  * first half of that partial WAL record to the cascading standby, so that
2798  * sentPtr > sendTimeLineValidUpto. That's OK; the cascading standby can't
2799  * replay the partial WAL record either, so it can still follow our
2800  * timeline switch.
2801  */
2803  {
2804  /* close the current file. */
2805  if (xlogreader->seg.ws_file >= 0)
2806  wal_segment_close(xlogreader);
2807 
2808  /* Send CopyDone */
2809  pq_putmessage_noblock('c', NULL, 0);
2810  streamingDoneSending = true;
2811 
2812  WalSndCaughtUp = true;
2813 
2814  elog(DEBUG1, "walsender reached end of timeline at %X/%X (sent up to %X/%X)",
2817  return;
2818  }
2819 
2820  /* Do we have any work to do? */
2821  Assert(sentPtr <= SendRqstPtr);
2822  if (SendRqstPtr <= sentPtr)
2823  {
2824  WalSndCaughtUp = true;
2825  return;
2826  }
2827 
2828  /*
2829  * Figure out how much to send in one message. If there's no more than
2830  * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
2831  * MAX_SEND_SIZE bytes, but round back to logfile or page boundary.
2832  *
2833  * The rounding is not only for performance reasons. Walreceiver relies on
2834  * the fact that we never split a WAL record across two messages. Since a
2835  * long WAL record is split at page boundary into continuation records,
2836  * page boundary is always a safe cut-off point. We also assume that
2837  * SendRqstPtr never points to the middle of a WAL record.
2838  */
2839  startptr = sentPtr;
2840  endptr = startptr;
2841  endptr += MAX_SEND_SIZE;
2842 
2843  /* if we went beyond SendRqstPtr, back off */
2844  if (SendRqstPtr <= endptr)
2845  {
2846  endptr = SendRqstPtr;
      /*
       * NOTE(review): the condition choosing between the two assignments
       * below is not visible in this listing.
       */
2848  WalSndCaughtUp = false;
2849  else
2850  WalSndCaughtUp = true;
2851  }
2852  else
2853  {
2854  /* round down to page boundary. */
2855  endptr -= (endptr % XLOG_BLCKSZ);
2856  WalSndCaughtUp = false;
2857  }
2858 
2859  nbytes = endptr - startptr;
2860  Assert(nbytes <= MAX_SEND_SIZE);
2861 
2862  /*
2863  * OK to read and send the slice.
2864  */
2865  resetStringInfo(&output_message);
2866  pq_sendbyte(&output_message, 'w');
2867 
2868  pq_sendint64(&output_message, startptr); /* dataStart */
2869  pq_sendint64(&output_message, SendRqstPtr); /* walEnd */
2870  pq_sendint64(&output_message, 0); /* sendtime, filled in last */
2871 
2872  /*
2873  * Read the log directly into the output buffer to avoid extra memcpy
2874  * calls.
2875  */
2876  enlargeStringInfo(&output_message, nbytes);
2877 
2878 retry:
2879  if (!WALRead(xlogreader,
2880  &output_message.data[output_message.len],
2881  startptr,
2882  nbytes,
2883  xlogreader->seg.ws_tli, /* Pass the current TLI because
2884  * only WalSndSegmentOpen controls
2885  * whether new TLI is needed. */
2886  &errinfo))
2887  WALReadRaiseError(&errinfo);
2888 
2889  /* See logical_read_xlog_page(). */
2890  XLByteToSeg(startptr, segno, xlogreader->segcxt.ws_segsize);
2891  CheckXLogRemoved(segno, xlogreader->seg.ws_tli);
2892 
2893  /*
2894  * During recovery, the currently-open WAL file might be replaced with the
2895  * file of the same name retrieved from archive. So we always need to
2896  * check what we read was valid after reading into the buffer. If it's
2897  * invalid, we try to open and read the file again.
2898  */
2900  {
2901  WalSnd *walsnd = MyWalSnd;
2902  bool reload;
2903 
      /* Fetch and clear the reload request atomically under the spinlock. */
2904  SpinLockAcquire(&walsnd->mutex);
2905  reload = walsnd->needreload;
2906  walsnd->needreload = false;
2907  SpinLockRelease(&walsnd->mutex);
2908 
2909  if (reload && xlogreader->seg.ws_file >= 0)
2910  {
2911  wal_segment_close(xlogreader);
2912 
2913  goto retry;
2914  }
2915  }
2916 
      /*
       * The WAL bytes were written into the buffer directly, so account for
       * them in the length and re-establish the terminating NUL by hand.
       */
2917  output_message.len += nbytes;
2918  output_message.data[output_message.len] = '\0';
2919 
2920  /*
2921  * Fill the send timestamp last, so that it is taken as late as possible.
      *
      * The destination offset skips the 'w' byte plus the dataStart and walEnd
      * fields, i.e. it addresses the sendtime placeholder written above.
2922  */
2923  resetStringInfo(&tmpbuf);
2924  pq_sendint64(&tmpbuf, GetCurrentTimestamp());
2925  memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64)],
2926  tmpbuf.data, sizeof(int64));
2927 
2928  pq_putmessage_noblock('d', output_message.data, output_message.len);
2929 
2930  sentPtr = endptr;
2931 
2932  /* Update shared memory status */
2933  {
2934  WalSnd *walsnd = MyWalSnd;
2935 
2936  SpinLockAcquire(&walsnd->mutex);
2937  walsnd->sentPtr = sentPtr;
2938  SpinLockRelease(&walsnd->mutex);
2939  }
2940 
2941  /* Report progress of XLOG streaming in PS display */
2943  {
2944  char activitymsg[50];
2945 
2946  snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
2948  set_ps_display(activitymsg);
2949  }
2950 }
2951 
2952 /*
2953  * Stream out logically decoded data.
      *
      * Reads one record through logical_decoding_ctx->reader. If a record was
      * returned, it is handed to LogicalDecodingProcessRecord() and sentPtr is
      * advanced to the reader's EndRecPtr. WalSndCaughtUp is set to true when
      * EndRecPtr has reached the (cached) flush pointer. A non-NULL error
      * message from XLogReadRecord() is raised as an ERROR.
2954  */
2955 static void
2957 {
2958  XLogRecord *record;
2959  char *errm;
2960 
2961  /*
2962  * We'll use the current flush point to determine whether we've caught up.
2963  * This variable is static in order to cache it across calls. Caching is
2964  * helpful because GetFlushRecPtr() needs to acquire a heavily-contended
2965  * spinlock.
2966  */
2967  static XLogRecPtr flushPtr = InvalidXLogRecPtr;
2968 
2969  /*
2970  * Don't know whether we've caught up yet. We'll set WalSndCaughtUp to
2971  * true in WalSndWaitForWal, if we're actually waiting. We also set to
2972  * true if XLogReadRecord() had to stop reading but WalSndWaitForWal
2973  * didn't wait - i.e. when we're shutting down.
2974  */
2975  WalSndCaughtUp = false;
2976 
2977  record = XLogReadRecord(logical_decoding_ctx->reader, &errm);
2978 
2979  /* xlog record was invalid */
2980  if (errm != NULL)
2981  elog(ERROR, "%s", errm);
2982 
2983  if (record != NULL)
2984  {
2985  /*
2986  * Note the lack of any call to LagTrackerWrite() which is handled by
2987  * WalSndUpdateProgress which is called by output plugin through
2988  * logical decoding write api.
2989  */
2990  LogicalDecodingProcessRecord(logical_decoding_ctx, logical_decoding_ctx->reader);
2991 
2992  sentPtr = logical_decoding_ctx->reader->EndRecPtr;
2993  }
2994 
2995  /*
2996  * If first time through in this session, initialize flushPtr. Otherwise,
2997  * we only need to update flushPtr if EndRecPtr is past it.
2998  */
2999  if (flushPtr == InvalidXLogRecPtr)
3000  flushPtr = GetFlushRecPtr();
3001  else if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
3002  flushPtr = GetFlushRecPtr();
3003 
3004  /* If EndRecPtr is still past our flushPtr, it means we caught up. */
3005  if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
3006  WalSndCaughtUp = true;
3007 
3008  /*
3009  * If we're caught up and have been requested to stop, have WalSndLoop()
3010  * terminate the connection in an orderly manner, after writing out all
3011  * the pending data.
3012  */
      /* NOTE(review): the condition guarding this assignment is not visible here. */
3014  got_SIGUSR2 = true;
3015 
3016  /* Update shared memory status */
3017  {
3018  WalSnd *walsnd = MyWalSnd;
3019 
3020  SpinLockAcquire(&walsnd->mutex);
3021  walsnd->sentPtr = sentPtr;
3022  SpinLockRelease(&walsnd->mutex);
3023  }
3024 }
3025 
3026 /*
3027  * Shutdown if the sender is caught up.
3028  *
3029  * NB: This should only be called when the shutdown signal has been received
3030  * from postmaster.
3031  *
3032  * Note that if we determine that there's still more data to send, this
3033  * function will return control to the caller.
      *
      * send_data is invoked once more before deciding. The process exits with
      * status 0 only when WalSndCaughtUp is set, sentPtr equals the position the
      * standby reported as replicated, and no output is pending.
3034  */
3035 static void
3037 {
3038  XLogRecPtr replicatedPtr;
3039 
3040  /* ... let's just be real sure we're caught up ... */
3041  send_data();
3042 
3043  /*
3044  * To figure out whether all WAL has successfully been replicated, check
3045  * flush location if valid, write otherwise. Tools like pg_receivewal will
3046  * usually (unless in synchronous mode) return an invalid flush location.
3047  */
3048  replicatedPtr = XLogRecPtrIsInvalid(MyWalSnd->flush) ?
3049  MyWalSnd->write : MyWalSnd->flush;
3050 
3051  if (WalSndCaughtUp && sentPtr == replicatedPtr &&
3052  !pq_is_send_pending())
3053  {
3054  QueryCompletion qc;
3055 
3056  /* Inform the standby that XLOG streaming is done */
3057  SetQueryCompletion(&qc, CMDTAG_COPY, 0);
3058  EndCommand(&qc, DestRemote, false);
3059  pq_flush();
3060 
3061  proc_exit(0);
3062  }
      /*
       * Not done yet: send a keepalive with the argument "true". The condition
       * guarding this call is not visible in this listing -- TODO confirm.
       */
3064  WalSndKeepalive(true);
3065 }
3066 
3067 /*
3068  * Returns the latest point in WAL that has been safely flushed to disk, and
3069  * can be sent to the standby. This should only be called when in recovery,
3070  * ie. we're streaming to a cascaded standby.
3071  *
3072  * As a side-effect, ThisTimeLineID is updated to the TLI of the last
3073  * replayed WAL record.
      *
      * The result is the replay position, unless the walreceiver's flush
      * position is on that same timeline and further ahead, in which case the
      * walreceiver's position is returned.
3074  */
3075 static XLogRecPtr
3077 {
3078  XLogRecPtr replayPtr;
3079  TimeLineID replayTLI;
3080  XLogRecPtr receivePtr;
      /* NOTE(review): the declaration of receiveTLI is not visible in this listing. */
3082  XLogRecPtr result;
3083 
3084  /*
3085  * We can safely send what's already been replayed. Also, if walreceiver
3086  * is streaming WAL from the same timeline, we can send anything that it
3087  * has streamed, but hasn't been replayed yet.
3088  */
3089 
3090  receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI);
3091  replayPtr = GetXLogReplayRecPtr(&replayTLI);
3092 
3093  ThisTimeLineID = replayTLI;
3094 
      /* Start from the replay position; extend only if the receiver is ahead on the same TLI. */
3095  result = replayPtr;
3096  if (receiveTLI == ThisTimeLineID && receivePtr > replayPtr)
3097  result = receivePtr;
3098 
3099  return result;
3100 }
3101 
3102 /*
3103  * Request walsenders to reload the currently-open WAL file
      *
      * Sets the needreload flag of every walsender entry that is in use
      * (pid != 0); unused entries are skipped. Each entry is inspected and
      * updated while holding its spinlock.
3104  */
3105 void
3107 {
3108  int i;
3109 
3110  for (i = 0; i < max_wal_senders; i++)
3111  {
3112  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3113 
3114  SpinLockAcquire(&walsnd->mutex);
      /* pid == 0 means no walsender owns this entry. */
3115  if (walsnd->pid == 0)
3116  {
3117  SpinLockRelease(&walsnd->mutex);
3118  continue;
3119  }
3120  walsnd->needreload = true;
3121  SpinLockRelease(&walsnd->mutex);
3122  }
3123 }
3124 
3125 /*
3126  * Handle PROCSIG_WALSND_INIT_STOPPING signal.
      *
      * Either sends SIGTERM to this process (replication not active) or sets
      * the got_STOPPING flag (replication active); see the comment below.
3127  */
3128 void
3130 {
      /* NOTE(review): one statement at the top of the body is not visible in this listing. */
3132 
3133  /*
3134  * If replication has not yet started, die like with SIGTERM. If
3135  * replication is active, only set a flag and wake up the main loop. It
3136  * will send any outstanding WAL, wait for it to be replicated to the
3137  * standby, and then exit gracefully.
3138  */
3139  if (!replication_active)
3140  kill(MyProcPid, SIGTERM);
3141  else
3142  got_STOPPING = true;
3143 }
3144 
3145 /*
3146  * SIGUSR2: set flag to do a last cycle and shut down afterwards. The WAL
3147  * sender should already have been switched to WALSNDSTATE_STOPPING at
3148  * this point.
      *
      * Besides setting got_SIGUSR2, this sets MyLatch so that a waiting main
      * loop wakes up and notices the flag.
3149  */
3150 static void
3152 {
      /* Save errno so the interrupted code does not observe a changed value. */
3153  int save_errno = errno;
3154 
3155  got_SIGUSR2 = true;
3156  SetLatch(MyLatch);
3157 
3158  errno = save_errno;
3159 }
3160 
3161 /* Set up signal handlers */
     /*
      * Installs the walsender's handlers: of the registrations visible here,
      * SIGINT cancels the current query, SIGTERM requests shutdown, and SIGUSR2
      * requests a last cycle followed by shutdown (WalSndLastCycleHandler).
      * InitializeTimeouts() establishes the SIGALRM handler.
      *
      * NOTE(review): several registrations in this function are not visible in
      * this listing (the listing numbers skip).
      */
3162 void
3164 {
3165  /* Set up signal handlers */
3167  pqsignal(SIGINT, StatementCancelHandler); /* query cancel */
3168  pqsignal(SIGTERM, die); /* request shutdown */
3169  /* SIGQUIT handler was already set up by InitPostmasterChild */
3170  InitializeTimeouts(); /* establishes SIGALRM handler */
3173  pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
3174  * shutdown */
3175 
3176  /* Reset some signals that are accepted by postmaster but not here */
3178 }
3179 
3180 /* Report shared-memory space needed by WalSndShmemInit */
     /*
      * The size is the fixed part of WalSndCtlData up to its walsnds array,
      * plus max_wal_senders entries of type WalSnd.
      */
3181 Size
3183 {
3184  Size size = 0;
3185 
      /* Fixed-size portion: everything that precedes the walsnds array. */
3186  size = offsetof(WalSndCtlData, walsnds);
      /* Variable portion: one WalSnd per configured walsender. */
3187  size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
3188 
3189  return size;
3190 }
3191 
3192 /* Allocate and initialize walsender-related shared memory */
     /*
      * Obtains the "Wal Sender Ctl" structure via ShmemInitStruct() and stores
      * it in WalSndCtl. Only when the structure did not exist before
      * (found == false) is it zeroed and are its sync-rep queues and the
      * per-walsender spinlocks initialized.
      */
3193 void
3195 {
3196  bool found;
3197  int i;
3198 
3199  WalSndCtl = (WalSndCtlData *)
3200  ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
3201 
3202  if (!found)
3203  {
3204  /* First time through, so initialize */
3205  MemSet(WalSndCtl, 0, WalSndShmemSize());
3206 
      /* One wait queue per synchronous-replication wait mode. */
3207  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
3208  SHMQueueInit(&(WalSndCtl->SyncRepQueue[i]));
3209 
      /* Each walsender entry is protected by its own spinlock. */
3210  for (i = 0; i < max_wal_senders; i++)
3211  {
3212  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3213 
3214  SpinLockInit(&walsnd->mutex);
3215  }
3216  }
3217 }
3218 
3219 /*
3220  * Wake up all walsenders
3221  *
3222  * This will be called inside critical sections, so throwing an error is not
3223  * advisable.
      *
      * For every walsender entry, the latch pointer is copied out under the
      * entry's spinlock and, if non-NULL, the latch is set after the spinlock
      * has been released.
3224  */
3225 void
3227 {
3228  int i;
3229 
3230  for (i = 0; i < max_wal_senders; i++)
3231  {
3232  Latch *latch;
3233  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3234 
3235  /*
3236  * Get latch pointer with spinlock held, for the unlikely case that
3237  * pointer reads aren't atomic (as they're 8 bytes).
3238  */
3239  SpinLockAcquire(&walsnd->mutex);
3240  latch = walsnd->latch;
3241  SpinLockRelease(&walsnd->mutex);
3242 
      /* A NULL latch means the entry currently has no latch registered. */
3243  if (latch != NULL)
3244  SetLatch(latch);
3245  }
3246 }
3247 
3248 /*
3249  * Wait for readiness on the FeBe socket, or a timeout. The mask should be
3250  * composed of optional WL_SOCKET_WRITEABLE and WL_SOCKET_READABLE flags. Exit
3251  * on postmaster death.
3252  */
3253 static void
3254 WalSndWait(uint32 socket_events, long timeout, uint32 wait_event)
3255 {
3256  WaitEvent event;
3257 
3258  ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, socket_events, NULL);
3259  if (WaitEventSetWait(FeBeWaitSet, timeout, &event, 1, wait_event) == 1 &&
3260  (event.events & WL_POSTMASTER_DEATH))
3261  proc_exit(1);
3262 }
3263 
3264 /*
3265  * Signal all walsenders to move to stopping state.
3266  *
3267  * This will trigger walsenders to move to a state where no further WAL can be
3268  * generated. See this file's header for details.
      *
      * Entries whose pid is 0 (not in use) are skipped. The pid is read under
      * the entry's spinlock and used after the spinlock has been released.
3269  */
3270 void
3272 {
3273  int i;
3274 
3275  for (i = 0; i < max_wal_senders; i++)
3276  {
3277  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3278  pid_t pid;
3279 
3280  SpinLockAcquire(&walsnd->mutex);
3281  pid = walsnd->pid;
3282  SpinLockRelease(&walsnd->mutex);
3283 
3284  if (pid == 0)
3285  continue;
3286 
      /*
       * NOTE(review): the statement that actually signals the process is not
       * visible in this listing.
       */
3288  }
3289 }
3290 
3291 /*
3292  * Wait until all the WAL senders have quit or reached the stopping state. This
3293  * is used by the checkpointer to control when the shutdown checkpoint can
3294  * safely be performed.
      *
      * Implemented as a polling loop: all entries are scanned, and if any
      * in-use entry is not yet in WALSNDSTATE_STOPPING the scan is repeated
      * after sleeping for 10 ms.
3295  */
3296 void
3298 {
3299  for (;;)
3300  {
3301  int i;
3302  bool all_stopped = true;
3303 
3304  for (i = 0; i < max_wal_senders; i++)
3305  {
3306  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3307 
3308  SpinLockAcquire(&walsnd->mutex);
3309 
      /* Unused entry (the walsender has quit or never existed): ignore. */
3310  if (walsnd->pid == 0)
3311  {
3312  SpinLockRelease(&walsnd->mutex);
3313  continue;
3314  }
3315 
      /* One walsender that is still active is enough to keep waiting. */
3316  if (walsnd->state != WALSNDSTATE_STOPPING)
3317  {
3318  all_stopped = false;
3319  SpinLockRelease(&walsnd->mutex);
3320  break;
3321  }
3322  SpinLockRelease(&walsnd->mutex);
3323  }
3324 
3325  /* safe to leave if confirmation is done for all WAL senders */
3326  if (all_stopped)
3327  return;
3328 
3329  pg_usleep(10000L); /* wait for 10 msec */
3330  }
3331 }
3332 
3333 /* Set state for current walsender (only called in walsender) */
     /*
      * Stores "state" into MyWalSnd->state under the entry's spinlock. Nothing
      * is written if the entry already holds that state; that comparison is
      * done without taking the spinlock.
      */
3334 void
3336 {
3337  WalSnd *walsnd = MyWalSnd;
3338 
      /* NOTE(review): one statement here is not visible in this listing. */
3340 
      /* Cheap exit when there is nothing to change. */
3341  if (walsnd->state == state)
3342  return;
3343 
3344  SpinLockAcquire(&walsnd->mutex);
3345  walsnd->state = state;
3346  SpinLockRelease(&walsnd->mutex);
3347 }
3348 
3349 /*
3350  * Return a string constant representing the state. This is used
3351  * in system views, and should *not* be translated.
      *
      * Each of the five WALSNDSTATE_* values handled below maps to its
      * lower-case name; any other value yields "UNKNOWN".
3352  */
3353 static const char *
3355 {
3356  switch (state)
3357  {
3358  case WALSNDSTATE_STARTUP:
3359  return "startup";
3360  case WALSNDSTATE_BACKUP:
3361  return "backup";
3362  case WALSNDSTATE_CATCHUP:
3363  return "catchup";
3364  case WALSNDSTATE_STREAMING:
3365  return "streaming";
3366  case WALSNDSTATE_STOPPING:
3367  return "stopping";
3368  }
      /* Reached only for a value not covered by the switch above. */
3369  return "UNKNOWN";
3370 }
3371 
3372 static Interval *
3374 {
3375  Interval *result = palloc(sizeof(Interval));
3376 
3377  result->month = 0;
3378  result->day = 0;
3379  result->time = offset;
3380 
3381  return result;
3382 }
3383 
3384 /*
3385  * Returns activity of walsenders, including pids and xlog locations sent to
3386  * standby servers.
3387  */
3388 Datum
3390 {
3391 #define PG_STAT_GET_WAL_SENDERS_COLS 12
3392  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
3393  TupleDesc tupdesc;
3394  Tuplestorestate *tupstore;
3395  MemoryContext per_query_ctx;
3396  MemoryContext oldcontext;
3397  SyncRepStandbyData *sync_standbys;
3398  int num_standbys;
3399  int i;
3400 
3401  /* check to see if caller supports us returning a tuplestore */
3402  if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
3403  ereport(ERROR,
3404  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3405  errmsg("set-valued function called in context that cannot accept a set")));
3406  if (!(rsinfo->allowedModes & SFRM_Materialize))
3407  ereport(ERROR,
3408  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3409  errmsg("materialize mode required, but it is not allowed in this context")));
3410 
3411  /* Build a tuple descriptor for our result type */
3412  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
3413  elog(ERROR, "return type must be a row type");
3414 
3415  per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
3416  oldcontext = MemoryContextSwitchTo(per_query_ctx);
3417 
3418  tupstore = tuplestore_begin_heap(true, false, work_mem);
3419  rsinfo->returnMode = SFRM_Materialize;
3420  rsinfo->setResult = tupstore;
3421  rsinfo->setDesc = tupdesc;
3422 
3423  MemoryContextSwitchTo(oldcontext);
3424 
3425  /*
3426  * Get the currently active synchronous standbys. This could be out of
3427  * date before we're done, but we'll use the data anyway.
3428  */
3429  num_standbys = SyncRepGetCandidateStandbys(&sync_standbys);
3430 
3431  for (i = 0; i < max_wal_senders; i++)
3432  {
3433  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3435  XLogRecPtr write;
3436  XLogRecPtr flush;
3437  XLogRecPtr apply;
3438  TimeOffset writeLag;
3439  TimeOffset flushLag;
3440  TimeOffset applyLag;
3441  int priority;
3442  int pid;
3444  TimestampTz replyTime;
3445  bool is_sync_standby;
3447  bool nulls[PG_STAT_GET_WAL_SENDERS_COLS];
3448  int j;
3449 
3450  /* Collect data from shared memory */
3451  SpinLockAcquire(&walsnd->mutex);
3452  if (walsnd->pid == 0)
3453  {
3454  SpinLockRelease(&walsnd->mutex);
3455  continue;
3456  }
3457  pid = walsnd->pid;
3458  sentPtr = walsnd->sentPtr;
3459  state = walsnd->state;
3460  write = walsnd->write;
3461  flush = walsnd->flush;
3462  apply = walsnd->apply;
3463  writeLag = walsnd->writeLag;
3464  flushLag = walsnd->flushLag;
3465  applyLag = walsnd->applyLag;
3466  priority = walsnd->sync_standby_priority;
3467  replyTime = walsnd->replyTime;
3468  SpinLockRelease(&walsnd->mutex);
3469 
3470  /*
3471  * Detect whether walsender is/was considered synchronous. We can
3472  * provide some protection against stale data by checking the PID
3473  * along with walsnd_index.
3474  */
3475  is_sync_standby = false;
3476  for (j = 0; j < num_standbys; j++)
3477  {
3478  if (sync_standbys[j].walsnd_index == i &&
3479  sync_standbys[j].pid == pid)
3480  {
3481  is_sync_standby = true;
3482  break;
3483  }
3484  }
3485 
3486  memset(nulls, 0, sizeof(nulls));
3487  values[0] = Int32GetDatum(pid);
3488 
3489  if (!is_member_of_role(GetUserId(), ROLE_PG_READ_ALL_STATS))
3490  {
3491  /*
3492  * Only superusers and members of pg_read_all_stats can see
3493  * details. Other users only get the pid value to know it's a
3494  * walsender, but no details.
3495  */
3496  MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1);
3497  }
3498  else
3499  {
3500  values[1] = CStringGetTextDatum(WalSndGetStateString(state));
3501 
3502  if (XLogRecPtrIsInvalid(sentPtr))
3503  nulls[2] = true;
3504  values[2] = LSNGetDatum(sentPtr);
3505 
3506  if (XLogRecPtrIsInvalid(write))
3507  nulls[3] = true;
3508  values[3] = LSNGetDatum(write);
3509 
3510  if (XLogRecPtrIsInvalid(flush))
3511  nulls[4] = true;
3512  values[4] = LSNGetDatum(flush);
3513 
3514  if (XLogRecPtrIsInvalid(apply))
3515  nulls[5] = true;
3516  values[5] = LSNGetDatum(apply);
3517 
3518  /*
3519  * Treat a standby such as a pg_basebackup background process
3520  * which always returns an invalid flush location, as an
3521  * asynchronous standby.
3522  */
3523  priority = XLogRecPtrIsInvalid(flush) ? 0 : priority;
3524 
3525  if (writeLag < 0)
3526  nulls[6] = true;
3527  else
3528  values[6] = IntervalPGetDatum(offset_to_interval(writeLag));
3529 
3530  if (flushLag < 0)
3531  nulls[7] = true;
3532  else
3533  values[7] = IntervalPGetDatum(offset_to_interval(flushLag));
3534 
3535  if (applyLag < 0)
3536  nulls[8] = true;
3537  else
3538  values[8] = IntervalPGetDatum(offset_to_interval(applyLag));
3539 
3540  values[9] = Int32GetDatum(priority);
3541 
3542  /*
3543  * More easily understood version of standby state. This is purely
3544  * informational.
3545  *
3546  * In quorum-based sync replication, the role of each standby
3547  * listed in synchronous_standby_names can be changing very
3548  * frequently. Any standbys considered as "sync" at one moment can
3549  * be switched to "potential" ones at the next moment. So, it's
3550  * basically useless to report "sync" or "potential" as their sync
3551  * states. We report just "quorum" for them.
3552  */
3553  if (priority == 0)
3554  values[10] = CStringGetTextDatum("async");
3555  else if (is_sync_standby)
3557  CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
3558  else
3559  values[10] = CStringGetTextDatum("potential");
3560 
3561  if (replyTime == 0)
3562  nulls[11] = true;
3563  else
3564  values[11] = TimestampTzGetDatum(replyTime);
3565  }
3566 
3567  tuplestore_putvalues(tupstore, tupdesc, values, nulls);
3568  }
3569 
3570  /* clean up and return the tuplestore */
3571  tuplestore_donestoring(tupstore);
3572 
3573  return (Datum) 0;
3574 }
3575 
3576 /*
3577  * Send a keepalive message to standby.
3578  *
3579  * If requestReply is set, the message requests the other party to send
3580  * a message back to us, for heartbeat purposes. We also set a flag to
3581  * let nearby code that we're waiting for that response, to avoid
3582  * repeated requests.
3583  */
3584 static void
3585 WalSndKeepalive(bool requestReply)
3586 {
3587  elog(DEBUG2, "sending replication keepalive");
3588 
3589  /* construct the message... */
3590  resetStringInfo(&output_message);
3591  pq_sendbyte(&output_message, 'k');
3592  pq_sendint64(&output_message, sentPtr);
3593  pq_sendint64(&output_message, GetCurrentTimestamp());
3594  pq_sendbyte(&output_message, requestReply ? 1 : 0);
3595 
3596  /* ... and send it wrapped in CopyData */
3597  pq_putmessage_noblock('d', output_message.data, output_message.len);
3598 
3599  /* Set local flag */
3600  if (requestReply)
3602 }
3603 
3604 /*
3605  * Send keepalive message if too much time has elapsed.
3606  */
3607 static void
3609 {
3610  TimestampTz ping_time;
3611 
3612  /*
3613  * Don't send keepalive messages if timeouts are globally disabled or
3614  * we're doing something not partaking in timeouts.
3615  */
3616  if (wal_sender_timeout <= 0 || last_reply_timestamp <= 0)
3617  return;
3618 
3620  return;
3621 
3622  /*
3623  * If half of wal_sender_timeout has lapsed without receiving any reply
3624  * from the standby, send a keep-alive message to the standby requesting
3625  * an immediate reply.
3626  */
3628  wal_sender_timeout / 2);
3629  if (last_processing >= ping_time)
3630  {
3631  WalSndKeepalive(true);
3632 
3633  /* Try to flush pending output to the client */
3634  if (pq_flush_if_writable() != 0)
3635  WalSndShutdown();
3636  }
3637 }
3638 
3639 /*
3640  * Record the end of the WAL and the time it was flushed locally, so that
3641  * LagTrackerRead can compute the elapsed time (lag) when this WAL location is
3642  * eventually reported to have been written, flushed and applied by the
3643  * standby in a reply message.
3644  */
3645 static void
3647 {
3648  bool buffer_full;
3649  int new_write_head;
3650  int i;
3651 
3652  if (!am_walsender)
3653  return;
3654 
3655  /*
3656  * If the lsn hasn't advanced since last time, then do nothing. This way
3657  * we only record a new sample when new WAL has been written.
3658  */
3659  if (lag_tracker->last_lsn == lsn)
3660  return;
3661  lag_tracker->last_lsn = lsn;
3662 
3663  /*
3664  * If advancing the write head of the circular buffer would crash into any
3665  * of the read heads, then the buffer is full. In other words, the
3666  * slowest reader (presumably apply) is the one that controls the release
3667  * of space.
3668  */
3669  new_write_head = (lag_tracker->write_head + 1) % LAG_TRACKER_BUFFER_SIZE;
3670  buffer_full = false;
3671  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; ++i)
3672  {
3673  if (new_write_head == lag_tracker->read_heads[i])
3674  buffer_full = true;
3675  }
3676 
3677  /*
3678  * If the buffer is full, for now we just rewind by one slot and overwrite
3679  * the last sample, as a simple (if somewhat uneven) way to lower the
3680  * sampling rate. There may be better adaptive compaction algorithms.
3681  */
3682  if (buffer_full)
3683  {
3684  new_write_head = lag_tracker->write_head;
3685  if (lag_tracker->write_head > 0)
3686  lag_tracker->write_head--;
3687  else
3688  lag_tracker->write_head = LAG_TRACKER_BUFFER_SIZE - 1;
3689  }
3690 
3691  /* Store a sample at the current write head position. */
3692  lag_tracker->buffer[lag_tracker->write_head].lsn = lsn;
3693  lag_tracker->buffer[lag_tracker->write_head].time = local_flush_time;
3694  lag_tracker->write_head = new_write_head;
3695 }
3696 
3697 /*
3698  * Find out how much time has elapsed between the moment WAL location 'lsn'
3699  * (or the highest known earlier LSN) was flushed locally and the time 'now'.
3700  * We have a separate read head for each of the reported LSN locations we
3701  * receive in replies from standby; 'head' controls which read head is
3702  * used. Whenever a read head crosses an LSN which was written into the
3703  * lag buffer with LagTrackerWrite, we can use the associated timestamp to
3704  * find out the time this LSN (or an earlier one) was flushed locally, and
3705  * therefore compute the lag.
3706  *
3707  * Return -1 if no new sample data is available, and otherwise the elapsed
3708  * time in microseconds.
3709  */
3710 static TimeOffset
3712 {
3713  TimestampTz time = 0;
3714 
3715  /* Read all unread samples up to this LSN or end of buffer. */
3716  while (lag_tracker->read_heads[head] != lag_tracker->write_head &&
3717  lag_tracker->buffer[lag_tracker->read_heads[head]].lsn <= lsn)
3718  {
3719  time = lag_tracker->buffer[lag_tracker->read_heads[head]].time;
3720  lag_tracker->last_read[head] =
3721  lag_tracker->buffer[lag_tracker->read_heads[head]];
3722  lag_tracker->read_heads[head] =
3723  (lag_tracker->read_heads[head] + 1) % LAG_TRACKER_BUFFER_SIZE;
3724  }
3725 
3726  /*
3727  * If the lag tracker is empty, that means the standby has processed
3728  * everything we've ever sent so we should now clear 'last_read'. If we
3729  * didn't do that, we'd risk using a stale and irrelevant sample for
3730  * interpolation at the beginning of the next burst of WAL after a period
3731  * of idleness.
3732  */
3733  if (lag_tracker->read_heads[head] == lag_tracker->write_head)
3734  lag_tracker->last_read[head].time = 0;
3735 
3736  if (time > now)
3737  {
3738  /* If the clock somehow went backwards, treat as not found. */
3739  return -1;
3740  }
3741  else if (time == 0)
3742  {
3743  /*
3744  * We didn't cross a time. If there is a future sample that we
3745  * haven't reached yet, and we've already reached at least one sample,
3746  * let's interpolate the local flushed time. This is mainly useful
3747  * for reporting a completely stuck apply position as having
3748  * increasing lag, since otherwise we'd have to wait for it to
3749  * eventually start moving again and cross one of our samples before
3750  * we can show the lag increasing.
3751  */
3752  if (lag_tracker->read_heads[head] == lag_tracker->write_head)
3753  {
3754  /* There are no future samples, so we can't interpolate. */
3755  return -1;
3756  }
3757  else if (lag_tracker->last_read[head].time != 0)
3758  {
3759  /* We can interpolate between last_read and the next sample. */
3760  double fraction;
3761  WalTimeSample prev = lag_tracker->last_read[head];
3762  WalTimeSample next = lag_tracker->buffer[lag_tracker->read_heads[head]];
3763 
3764  if (lsn < prev.lsn)
3765  {
3766  /*
3767  * Reported LSNs shouldn't normally go backwards, but it's
3768  * possible when there is a timeline change. Treat as not
3769  * found.
3770  */
3771  return -1;
3772  }
3773 
3774  Assert(prev.lsn < next.lsn);
3775 
3776  if (prev.time > next.time)
3777  {
3778  /* If the clock somehow went backwards, treat as not found. */
3779  return -1;
3780  }
3781 
3782  /* See how far we are between the previous and next samples. */
3783  fraction =
3784  (double) (lsn - prev.lsn) / (double) (next.lsn - prev.lsn);
3785 
3786  /* Scale the local flush time proportionally. */
3787  time = (TimestampTz)
3788  ((double) prev.time + (next.time - prev.time) * fraction);
3789  }
3790  else
3791  {
3792  /*
3793  * We have only a future sample, implying that we were entirely
3794  * caught up but and now there is a new burst of WAL and the
3795  * standby hasn't processed the first sample yet. Until the
3796  * standby reaches the future sample the best we can do is report
3797  * the hypothetical lag if that sample were to be replayed now.
3798  */
3799  time = lag_tracker->buffer[lag_tracker->read_heads[head]].time;
3800  }
3801  }
3802 
3803  /* Return the elapsed time since local flush time in microseconds. */
3804  Assert(time != 0);
3805  return now - time;
3806 }
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, Datum *values, bool *isnull)
Definition: tuplestore.c:750
static void InitWalSenderSlot(void)
Definition: walsender.c:2499
void InitializeTimeouts(void)
Definition: timeout.c:461
#define pq_is_send_pending()
Definition: libpq.h:48
#define NIL
Definition: pg_list.h:65
static void pq_sendint16(StringInfo buf, uint16 i)
Definition: pqformat.h:137
XLogRecPtr write
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS
WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]
Definition: walsender.c:213
static void ProcessStandbyMessage(void)
Definition: walsender.c:1930
ReplicationSlot * SearchNamedReplicationSlot(const char *name, bool need_lock)
Definition: slot.c:348
#define pg_attribute_noreturn()
Definition: c.h:179
#define FeBeWaitSetSocketPos
Definition: libpq.h:63
#define WL_SOCKET_WRITEABLE
Definition: latch.h:127
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define IsA(nodeptr, _type_)
Definition: nodes.h:588
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:218
#define AllocSetContextCreate
Definition: memutils.h:173
static void WalSndWait(uint32 socket_events, long timeout, uint32 wait_event)
Definition: walsender.c:3254
TimeLineID tliOfPointInHistory(XLogRecPtr ptr, List *history)
Definition: timeline.c:552
XLogRecPtr startpoint
Definition: replnodes.h:85
#define DEBUG1
Definition: elog.h:25
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:207
int MyProcPid
Definition: globals.c:43
int wal_sender_timeout
Definition: walsender.c:123
void wal_segment_close(XLogReaderState *state)
Definition: xlogutils.c:829
uint32 TimeLineID
Definition: xlogdefs.h:59
#define pq_flush()
Definition: libpq.h:46
static int32 next
Definition: blutils.c:219
static void pgstat_report_wait_end(void)
Definition: wait_event.h:274
void ProcessConfigFile(GucContext context)
int wal_segment_size
Definition: xlog.c:120
int write_head
Definition: walsender.c:214
uint32 TransactionId
Definition: c.h:587
TupleDesc CreateTemplateTupleDesc(int natts)
Definition: tupdesc.c:45
bool wake_wal_senders
Definition: walsender.c:130
#define SYNC_REP_PRIORITY
Definition: syncrep.h:36
bool exec_replication_command(const char *cmd_string)
Definition: walsender.c:1608
Oid GetUserId(void)
Definition: miscinit.c:495
#define READ_REPLICATION_SLOT_COLS
#define SIGUSR1
Definition: win32_port.h:179
bool update_process_title
Definition: ps_status.c:36
#define write(a, b, c)
Definition: win32.h:14
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1580
static void WalSndLastCycleHandler(SIGNAL_ARGS)
Definition: walsender.c:3151
static void SendTimeLineHistory(TimeLineHistoryCmd *cmd)
Definition: walsender.c:564
static void DropReplicationSlot(DropReplicationSlotCmd *cmd)
Definition: walsender.c:1227
#define SIGCHLD
Definition: win32_port.h:177
PGPROC * MyProc
Definition: proc.c:68
int64 TimestampTz
Definition: timestamp.h:39
#define PQ_SMALL_MESSAGE_LIMIT
Definition: libpq.h:30
WalTimeSample last_read[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:216
ResourceOwner CurrentResourceOwner
Definition: resowner.c:146
Size WalSndShmemSize(void)
Definition: walsender.c:3182
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:56
void MarkPostmasterChildWalSender(void)
Definition: pmsignal.c:307
char * pstrdup(const char *in)
Definition: mcxt.c:1299
void CommitTransactionCommand(void)
Definition: xact.c:2959
#define XACT_REPEATABLE_READ
Definition: xact.h:38
#define SpinLockInit(lock)
Definition: spin.h:60
void ReplicationSlotAcquire(const char *name, bool nowait)
Definition: slot.c:380
uint8 syncrep_method
Definition: syncrep.h:69
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:83
void WALReadRaiseError(WALReadError *errinfo)
Definition: xlogutils.c:975
List * readTimeLineHistory(TimeLineID targetTLI)
Definition: timeline.c:76
bool IsAbortedTransactionBlockState(void)
Definition: xact.c:392
static void XLogSendPhysical(void)
Definition: walsender.c:2655
void do_tup_output(TupOutputState *tstate, Datum *values, bool *isnull)
Definition: execTuples.c:2286
#define tuplestore_donestoring(state)
Definition: tuplestore.h:60
TimeOffset flushLag
void ResourceOwnerDelete(ResourceOwner owner)
Definition: resowner.c:737
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
Definition: nodes.h:537
static StringInfoData output_message
Definition: walsender.c:157
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
struct cursor * cur
Definition: ecpg.c:28
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:187
void proc_exit(int code)
Definition: ipc.c:104
int errcode(int sqlerrcode)
Definition: elog.c:698
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4721
#define MemSet(start, val, len)
Definition: c.h:1008
void ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch)
Definition: latch.c:948
static void WalSndKill(int code, Datum arg)
Definition: walsender.c:2559
#define kill(pid, sig)
Definition: win32_port.h:464
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, bool two_phase)
Definition: slot.c:226
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
static void WalSndDone(WalSndSendDataCallback send_data)
Definition: walsender.c:3036
#define WL_SOCKET_READABLE
Definition: latch.h:126
void ReplicationSlotSave(void)
Definition: slot.c:710
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8698
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:197
TimestampTz time
Definition: walsender.c:203
WaitEventSet * FeBeWaitSet
Definition: pqcomm.c:168
void RestoreTransactionSnapshot(Snapshot snapshot, void *source_pgproc)
Definition: snapmgr.c:2241
#define SIGPIPE
Definition: win32_port.h:172
ReplicationSlotPersistentData data
Definition: slot.h:147
TimeOffset writeLag
#define SIGUSR2
Definition: win32_port.h:180
#define LOG
Definition: elog.h:26
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
bool RecoveryInProgress(void)
Definition: xlog.c:8341
void SetLatch(Latch *latch)
Definition: latch.c:567
SyncRepConfigData * SyncRepConfig
Definition: syncrep.c:97
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1711
#define OidIsValid(objectId)
Definition: c.h:710
void list_free_deep(List *list)
Definition: list.c:1405
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1271
void GetPGVariable(const char *name, DestReceiver *dest)
Definition: guc.c:9352
void ResetLatch(Latch *latch)
Definition: latch.c:660
slock_t mutex
Node * replication_parse_result
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:87
void StatementCancelHandler(SIGNAL_ARGS)
Definition: postgres.c:2949
Latch procLatch
Definition: proc.h:130
void XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength)
Definition: xlogutils.c:707
XLogRecPtr confirmed_flush
Definition: slot.h:84
XLogRecPtr EndRecPtr
Definition: xlogreader.h:176
int32 day
Definition: timestamp.h:47
#define XidFromFullTransactionId(x)
Definition: transam.h:48
static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch)
Definition: walsender.c:2147
static LagTracker * lag_tracker
Definition: walsender.c:219
TimeLineID timeline
Definition: replnodes.h:108
ReplicationKind kind
Definition: replnodes.h:56
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1803
void set_ps_display(const char *activity)
Definition: ps_status.c:349
bool IsTransactionBlock(void)
Definition: xact.c:4703
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
WALOpenSegment seg
Definition: xlogreader.h:225
const char * SnapBuildExportSnapshot(SnapBuild *builder)
Definition: snapbuild.c:622
bool am_walsender
Definition: walsender.c:115
#define SpinLockAcquire(lock)
Definition: spin.h:62
void pg_usleep(long microsec)
Definition: signal.c:53
void HandleWalSndInitStopping(void)
Definition: walsender.c:3129
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
void ReplicationSlotReserveWal(void)
Definition: slot.c:1081
int SendProcSignal(pid_t pid, ProcSignalReason reason, BackendId backendId)
Definition: procsignal.c:261
XLogRecPtr flush
bool defGetBoolean(DefElem *def)
Definition: define.c:106
char data[BLCKSZ]
Definition: c.h:1141
void DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
Definition: logical.c:589
void ReplicationSlotsComputeRequiredLSN(void)
Definition: slot.c:817
ReplicationKind kind
Definition: replnodes.h:82
void WalSndRqstFileReload(void)
Definition: walsender.c:3106
void pfree(void *pointer)
Definition: mcxt.c:1169
XLogRecord * XLogReadRecord(XLogReaderState *state, char **errormsg)
Definition: xlogreader.c:271
#define PQ_LARGE_MESSAGE_LIMIT
Definition: libpq.h:31
void ConditionVariableCancelSleep(void)
bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
Definition: transam.c:319
static void WalSndKeepalive(bool requestReply)
Definition: walsender.c:3585
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:2344
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:657
#define ERROR
Definition: elog.h:46
void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
Definition: decode.c:106
static TimestampTz last_processing
Definition: walsender.c:162
void pq_startmsgread(void)
Definition: pqcomm.c:1152
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2510
static void PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: walsender.c:1961
#define IntervalPGetDatum(X)
Definition: timestamp.h:33
#define SYNC_REP_WAIT_APPLY
Definition: syncrep.h:26
void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli)
Definition: xlog.c:4000
bool FirstSnapshotSet
Definition: snapmgr.c:149
void EndCommand(const QueryCompletion *qc, CommandDest dest, bool force_undecorated_output)
Definition: dest.c:166
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:396
void WalSndSignals(void)
Definition: walsender.c:3163
#define TimestampTzGetDatum(X)
Definition: timestamp.h:32
#define FATAL
Definition: elog.h:49
char * defGetString(DefElem *def)
Definition: define.c:49
void SendBaseBackup(BaseBackupCmd *cmd)
Definition: basebackup.c:944
Latch * latch
void(* WalSndSendDataCallback)(void)
Definition: walsender.c:225
uint32 events
Definition: latch.h:145
#define MAXPGPATH
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:11942
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:2266
void ReplicationSlotPersist(void)
Definition: slot.c:745
TransactionId effective_xmin
Definition: slot.h:143
int pq_getbyte_if_available(unsigned char *c)
Definition: pqcomm.c:1034
static LogicalDecodingContext * logical_decoding_ctx
Definition: walsender.c:197
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:195
LogicalDecodingContext * CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, bool fast_forward, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:478
TransactionId xmin
Definition: proc.h:138
char * get_database_name(Oid dbid)
Definition: dbcommands.c:2113
#define DEBUG2
Definition: elog.h:24
bool XLogBackgroundFlush(void)
Definition: xlog.c:3077
Definition: latch.h:110
bool message_level_is_interesting(int elevel)
Definition: elog.c:270
Definition: dest.h:89
#define TLHistoryFileName(fname, tli)
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:361
WalSndState state
NodeTag type
Definition: nodes.h:539
static char * buf
Definition: pg_test_fsync.c:68
void WalSndWaitStopping(void)
Definition: walsender.c:3297
static bool streamingDoneSending
Definition: walsender.c:179
uint64 XLogSegNo
Definition: xlogdefs.h:48
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
int errdetail(const char *fmt,...)
Definition: elog.c:1042
#define COMMERROR
Definition: elog.h:30
int errcode_for_file_access(void)
Definition: elog.c:721
static void StartReplication(StartReplicationCmd *cmd)
Definition: walsender.c:670
#define SIGHUP
Definition: win32_port.h:167
TransactionId catalog_xmin
Definition: slot.h:70
#define pq_flush_if_writable()
Definition: libpq.h:47
TimeOffset time
Definition: timestamp.h:45
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
Definition: xact.c:3399
XLogRecPtr currTLIValidUntil
Definition: xlogreader.h:248
#define InvalidTransactionId
Definition: transam.h:31
void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
Definition: xlogreader.c:243
void enlargeStringInfo(StringInfo str, int needed)
Definition: stringinfo.c:283
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1364
unsigned int uint32
Definition: c.h:441
void ReplicationSlotRelease(void)
Definition: slot.c:469
MemoryContext CurrentMemoryContext
Definition: mcxt.c:42
XLogReaderState * XLogReaderAllocate(int wal_segment_size, const char *waldir, XLogReaderRoutine *routine, void *private_data)
Definition: xlogreader.c:78
TransactionId xmin
Definition: slot.h:62
SHM_QUEUE SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]
void InitWalSender(void)
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:186
#define SlotIsLogical(slot)
Definition: slot.h:169
Datum Int64GetDatum(int64 X)
Definition: fmgr.c:1697
static XLogReaderState * xlogreader
Definition: walsender.c:137
void SyncRepInitConfig(void)
Definition: syncrep.c:411
void EndReplicationCommand(const char *commandTag)
Definition: dest.c:221
static bool WalSndCaughtUp
Definition: walsender.c:183
#define SYNC_REP_WAIT_FLUSH
Definition: syncrep.h:25
MemoryContext TopMemoryContext
Definition: mcxt.c:48
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)
Definition: walsender.c:3646
XLogRecPtr lsn
Definition: walsender.c:202
int replication_yyparse(void)
FullTransactionId ReadNextFullTransactionId(void)
Definition: varsup.c:261
Definition: guc.h:72
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: wait_event.h:258
XLogRecPtr last_lsn
Definition: walsender.c:212
#define ERRCODE_DATA_CORRUPTED
Definition: pg_basebackup.c:47
int32 month
Definition: timestamp.h:48
#define XL_ROUTINE(...)
Definition: xlogreader.h:116
int max_wal_senders
Definition: walsender.c:121
static void WalSndLoop(WalSndSendDataCallback send_data)
Definition: walsender.c:2373
static void WalSndKeepaliveIfNecessary(void)
Definition: walsender.c:3608
int CloseTransientFile(int fd)
Definition: fd.c:2687
#define SIG_IGN
Definition: win32_port.h:164
static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1337
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
const char * debug_query_string
Definition: postgres.c:89
int pq_getmessage(StringInfo s, int maxlen)
Definition: pqcomm.c:1214
static StringInfoData reply_message
Definition: walsender.c:158
#define MAXFNAMELEN
bool in_use
Definition: slot.h:123
TimeLineID nextTLI
Definition: xlogreader.h:254
#define SpinLockRelease(lock)
Definition: spin.h:64
Tuplestorestate * tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
Definition: tuplestore.c:318
int64 TimeOffset
Definition: timestamp.h:40
static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc)
Definition: walsender.c:1474
static void WalSndCheckTimeOut(void)
Definition: walsender.c:2346
Size mul_size(Size s1, Size s2)
Definition: shmem.c:519
#define InvalidBackendId
Definition: backendid.h:23
#define WL_POSTMASTER_DEATH
Definition: latch.h:129
static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
Definition: walsender.c:3711
void WalSndErrorCleanup(void)
Definition: walsender.c:297
static volatile sig_atomic_t replication_active
Definition: walsender.c:195
uintptr_t Datum
Definition: postgres.h:411
static const char * WalSndGetStateString(WalSndState state)
Definition: walsender.c:3354
XLogRecPtr sentPtr
Size add_size(Size s1, Size s2)
Definition: shmem.c:502
bool am_db_walsender
Definition: walsender.c:118
TransactionId effective_catalog_xmin
Definition: slot.h:144
static void StartLogicalReplication(StartReplicationCmd *cmd)
Definition: walsender.c:1237
Oid MyDatabaseId
Definition: globals.c:88
static void WalSndShutdown(void)
Definition: walsender.c:229
static TimeLineID receiveTLI
Definition: xlog.c:201
int work_mem
Definition: globals.c:124
XLogRecPtr tliSwitchPoint(TimeLineID tli, List *history, TimeLineID *nextTLI)
Definition: timeline.c:580
void WalSndResourceCleanup(bool isCommit)
Definition: walsender.c:332
#define EpochFromFullTransactionId(x)
Definition: transam.h:47
static bool two_phase
WalSnd * MyWalSnd
Definition: walsender.c:112
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:906
static XLogRecPtr sentPtr
Definition: walsender.c:154
void pq_endmsgread(void)
Definition: pqcomm.c:1176
#define InvalidOid
Definition: postgres_ext.h:36
static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p)
Definition: walsender.c:2577
TimeLineID ThisTimeLineID
Definition: xlog.c:195
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
#define ereport(elevel,...)
Definition: elog.h:157
int allowedModes
Definition: execnodes.h:306
void WalSndInitStopping(void)
Definition: walsender.c:3271
TimeLineID currTLI
Definition: xlogreader.h:238
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:170
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
bool is_member_of_role(Oid member, Oid role)
Definition: acl.c:4869
TimeOffset applyLag
struct SnapBuild * snapshot_builder
Definition: logical.h:43
int errmsg_internal(const char *fmt,...)
Definition: elog.c:996
SetFunctionReturnMode returnMode
Definition: execnodes.h:308
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
static long WalSndComputeSleeptime(TimestampTz now)
Definition: walsender.c:2302
#define SIG_DFL
Definition: win32_port.h:162
#define SIGNAL_ARGS
Definition: c.h:1333
static TimeLineID sendTimeLine
Definition: walsender.c:145
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:804
#define lfirst(lc)
Definition: pg_list.h:169
void WalSndSetState(WalSndState state)
Definition: walsender.c:3335
int sync_standby_priority
Definition: regguts.h:317
void ResourceOwnerRelease(ResourceOwner owner, ResourceReleasePhase phase, bool isCommit, bool isTopLevel)
Definition: resowner.c:486
static void ReadReplicationSlot(ReadReplicationSlotCmd *cmd)
Definition: walsender.c:463
void FreeDecodingContext(LogicalDecodingContext *ctx)
Definition: logical.c:633
static void XLogSendLogical(void)
Definition: walsender.c:2956
XLogRecPtr restart_lsn
Definition: slot.h:73
static void SetQueryCompletion(QueryCompletion *qc, CommandTag commandTag, uint64 nprocessed)
Definition: cmdtag.h:36
int read_heads[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:215
void StartTransactionCommand(void)
Definition: xact.c:2858
bool needreload
static void ProcessStandbyReplyMessage(void)
Definition: walsender.c:1993
size_t Size
Definition: c.h:540
#define XLogFileName(fname, tli, logSegNo, wal_segsz_bytes)
char * dbname
Definition: streamutil.c:51
#define LAG_TRACKER_BUFFER_SIZE
Definition: walsender.c:207
int XactIsoLevel
Definition: xact.c:76
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1199
static TimeLineID sendTimeLineNextTLI
Definition: walsender.c:146
static XLogRecPtr sendTimeLineValidUpto
Definition: walsender.c:148
void LogicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: logical.c:1706
MemoryContext ecxt_per_query_memory
Definition: execnodes.h:234
XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
WalSndState
Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
Definition: walsender.c:3389
TimeLineID ws_tli
Definition: xlogreader.h:48
static void parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action, bool *two_phase)
Definition: walsender.c:959
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Definition: walsender.c:1027
static bool streamingDoneReceiving
Definition: walsender.c:180
Tuplestorestate * setResult
Definition: execnodes.h:311
static StringInfoData tmpbuf
Definition: walsender.c:159
int BasicOpenFile(const char *fileName, int fileFlags)
Definition: fd.c:1070
#define XLogFilePath(path, tli, logSegNo, wal_segsz_bytes)
int SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys)
Definition: syncrep.c:725
bool IsSubTransaction(void)
Definition: xact.c:4776
Snapshot SnapBuildInitialSnapshot(SnapBuild *builder)
Definition: snapbuild.c:530
static Datum values[MAXATTR]
Definition: bootstrap.c:156
#define PG_STAT_GET_WAL_SENDERS_COLS
ExprContext * econtext
Definition: execnodes.h:304
bool log_replication_commands
Definition: walsender.c:125
uint64 GetSystemIdentifier(void)
Definition: xlog.c:4999
#define Int32GetDatum(X)
Definition: postgres.h:523
void ReplicationSlotDrop(const char *name, bool nowait)
Definition: slot.c:563
char * application_name
Definition: guc.c:622
TupleDesc setDesc
Definition: execnodes.h:312
void pq_sendbytes(StringInfo buf, const char *data, int datalen)
Definition: pqformat.c:125
void * palloc(Size size)
Definition: mcxt.c:1062
int errmsg(const char *fmt,...)
Definition: elog.c:909
XLogReaderState * reader
Definition: logical.h:41
void pgstat_report_activity(BackendState state, const char *cmd_str)
void SHMQueueInit(SHM_QUEUE *queue)
Definition: shmqueue.c:36
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:298
#define pq_putmessage_noblock(msgtype, s, len)
Definition: libpq.h:51
CRSSnapshotAction
Definition: walsender.h:20
#define elog(elevel,...)
Definition: elog.h:232
StringInfo out
Definition: logical.h:70
static const unsigned __int64 epoch
Definition: gettimeofday.c:34
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:26
int i
void WalSndShmemInit(void)
Definition: walsender.c:3194
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
#define NameStr(name)
Definition: c.h:681
TimeLineID timeline
Definition: replnodes.h:84
#define CStringGetTextDatum(s)
Definition: builtins.h:86
void * arg
static void PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
Definition: walsender.c:2098
struct Latch * MyLatch
Definition: globals.c:57
bool WALRead(XLogReaderState *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli, WALReadError *errinfo)
Definition: xlogreader.c:1100
void ReplicationSlotCleanup(void)
Definition: slot.c:525
WALSegmentContext segcxt
Definition: xlogreader.h:224
#define PG_FUNCTION_ARGS
Definition: fmgr.h:193
char * defname
Definition: parsenodes.h:758
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:120
static XLogRecPtr GetStandbyFlushRecPtr(void)
Definition: walsender.c:3076
static int logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
Definition: walsender.c:905
void SendPostmasterSignal(PMSignalReason reason)
Definition: pmsignal.c:153
TimestampTz replyTime
void LWLockReleaseAll(void)
Definition: lwlock.c:1902
#define NUM_SYNC_REP_WAIT_MODE
Definition: syncrep.h:28
static Interval * offset_to_interval(TimeOffset offset)
Definition: walsender.c:3373
slock_t mutex
Definition: slot.h:120
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition: procsignal.c:642
CommandDest whereToSendOutput
Definition: postgres.c:92
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
static bool waiting_for_ping_response
Definition: walsender.c:171
XLogRecPtr apply
static void IdentifySystem(void)
Definition: walsender.c:377
Definition: pg_list.h:50
void CheckLogicalDecodingRequirements(void)
Definition: logical.c:103
#define snprintf
Definition: port.h:217
LogicalDecodingContext * CreateInitDecodingContext(const char *plugin, List *output_plugin_options, bool need_full_snapshot, XLogRecPtr restart_lsn, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:318
int16 AttrNumber
Definition: attnum.h:21
#define UINT64_FORMAT
Definition: c.h:484
void replication_scanner_finish(void)
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition: slot.c:767
void SyncRepReleaseWaiters(void)
Definition: syncrep.c:440
static TimestampTz last_reply_timestamp
Definition: walsender.c:168
static void ProcessRepliesIfAny(void)
Definition: walsender.c:1817
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1544
#define MAX_SEND_SIZE
Definition: walsender.c:106
#define die(msg)
Definition: pg_test_fsync.c:97
#define read(a, b, c)
Definition: win32.h:13
static bool sendTimeLineIsHistoric
Definition: walsender.c:147
#define offsetof(type, field)
Definition: c.h:727
void WalSndWakeup(void)
Definition: walsender.c:3226
void ReplicationSlotMarkDirty(void)
Definition: slot.c:728
bool am_cascading_walsender
Definition: walsender.c:116
static void ProcessStandbyHSFeedbackMessage(void)
Definition: walsender.c:2178
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition: timestamp.c:1693
#define SYNC_REP_WAIT_WRITE
Definition: syncrep.h:24
int WaitEventSetWait(WaitEventSet *set, long timeout, WaitEvent *occurred_events, int nevents, uint32 wait_event_info)
Definition: latch.c:1306
void SnapBuildClearExportedSnapshot(void)
Definition: snapbuild.c:683
static XLogRecPtr startptr
Definition: basebackup.c:116
static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
Definition: walsender.c:1448
void replication_scanner_init(const char *query_string)
#define TLHistoryFilePath(path, tli)
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1774
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)