PostgreSQL Source Code  git master
walsender.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * walsender.c
4  *
5  * The WAL sender process (walsender) is new as of Postgres 9.0. It takes
6  * care of sending XLOG from the primary server to a single recipient.
7  * (Note that there can be more than one walsender process concurrently.)
8  * It is started by the postmaster when the walreceiver of a standby server
9  * connects to the primary server and requests XLOG streaming replication.
10  *
11  * A walsender is similar to a regular backend, ie. there is a one-to-one
12  * relationship between a connection and a walsender process, but instead
13  * of processing SQL queries, it understands a small set of special
14  * replication-mode commands. The START_REPLICATION command begins streaming
15  * WAL to the client. While streaming, the walsender keeps reading XLOG
16  * records from the disk and sends them to the standby server over the
17  * COPY protocol, until either side ends the replication by exiting COPY
18  * mode (or until the connection is closed).
19  *
20  * Normal termination is by SIGTERM, which instructs the walsender to
21  * close the connection and exit(0) at the next convenient moment. Emergency
22  * termination is by SIGQUIT; like any backend, the walsender will simply
23  * abort and exit on SIGQUIT. A close of the connection and a FATAL error
24  * are treated as not a crash but approximately normal termination;
25  * the walsender will exit quickly without sending any more XLOG records.
26  *
27  * If the server is shut down, checkpointer sends us
28  * PROCSIG_WALSND_INIT_STOPPING after all regular backends have exited. If
29  * the backend is idle or runs an SQL query, this causes the backend to
30  * shut down; if logical replication is in progress, all existing WAL records
31  * are processed, followed by a shutdown. Otherwise, this causes the walsender
32  * to switch to the "stopping" state. In this state, the walsender will reject
33  * any further replication commands. The checkpointer begins the shutdown
34  * checkpoint once all walsenders are confirmed as stopping. When the shutdown
35  * checkpoint finishes, the postmaster sends us SIGUSR2. This instructs
36  * walsender to send any outstanding WAL, including the shutdown checkpoint
37  * record, wait for it to be replicated to the standby, and then exit.
38  *
39  *
40  * Portions Copyright (c) 2010-2023, PostgreSQL Global Development Group
41  *
42  * IDENTIFICATION
43  * src/backend/replication/walsender.c
44  *
45  *-------------------------------------------------------------------------
46  */
47 #include "postgres.h"
48 
49 #include <signal.h>
50 #include <unistd.h>
51 
52 #include "access/printtup.h"
53 #include "access/timeline.h"
54 #include "access/transam.h"
55 #include "access/xact.h"
56 #include "access/xlog_internal.h"
57 #include "access/xlogreader.h"
58 #include "access/xlogrecovery.h"
59 #include "access/xlogutils.h"
60 #include "backup/basebackup.h"
61 #include "catalog/pg_authid.h"
62 #include "catalog/pg_type.h"
63 #include "commands/dbcommands.h"
64 #include "commands/defrem.h"
65 #include "funcapi.h"
66 #include "libpq/libpq.h"
67 #include "libpq/pqformat.h"
68 #include "miscadmin.h"
69 #include "nodes/replnodes.h"
70 #include "pgstat.h"
71 #include "postmaster/interrupt.h"
72 #include "replication/decode.h"
73 #include "replication/logical.h"
74 #include "replication/slot.h"
75 #include "replication/snapbuild.h"
76 #include "replication/syncrep.h"
78 #include "replication/walsender.h"
81 #include "storage/fd.h"
82 #include "storage/ipc.h"
83 #include "storage/pmsignal.h"
84 #include "storage/proc.h"
85 #include "storage/procarray.h"
86 #include "tcop/dest.h"
87 #include "tcop/tcopprot.h"
88 #include "utils/acl.h"
89 #include "utils/builtins.h"
90 #include "utils/guc.h"
91 #include "utils/memutils.h"
92 #include "utils/pg_lsn.h"
93 #include "utils/portal.h"
94 #include "utils/ps_status.h"
95 #include "utils/timeout.h"
96 #include "utils/timestamp.h"
97 
98 /*
99  * Maximum data payload in a WAL data message. Must be >= XLOG_BLCKSZ.
100  *
101  * We don't have a good idea of what a good value would be; there's some
102  * overhead per message in both walsender and walreceiver, but on the other
103  * hand sending large batches makes walsender less responsive to signals
104  * because signals are checked only between messages. 128kB (with
105  * default 8k blocks) seems like a reasonable guess for now.
106  */
107 #define MAX_SEND_SIZE (XLOG_BLCKSZ * 16)
108 
/*
 * NOTE(review): this copy appears to have been extracted from a rendered
 * source listing. The embedded line numbers skip 110, 126, 138, 146-147, 149,
 * 155, 158-160, 163, 169 and 180-181, so several comments in this section
 * describe declarations that are not present here (e.g. the shared WalSnd
 * array, the xlogreader, the sendTimeLine/sendTimeLineValidUpto variables,
 * sentPtr, the message buffers and the streamingDone flags). Restore them
 * from the upstream file before building.
 */
109 /* Array of WalSnds in shared memory */
111 
112 /* My slot in the shared memory array */
113 WalSnd *MyWalSnd = NULL;
114 
115 /* Global state */
116 bool am_walsender = false; /* Am I a walsender process? */
117 bool am_cascading_walsender = false; /* Am I cascading WAL to another
118  * standby? */
119 bool am_db_walsender = false; /* Connected to a database? */
120 
121 /* GUC variables */
122 int max_wal_senders = 10; /* the maximum number of concurrent
123  * walsenders */
124 int wal_sender_timeout = 60 * 1000; /* maximum time to send one WAL
125  * data message */
/* NOTE(review): 60 * 1000 suggests wal_sender_timeout is in milliseconds (60 s) -- confirm against the GUC definition. */
127 
128 /*
129  * State for WalSndWakeupRequest
130  */
131 bool wake_wal_senders = false;
132 
133 /*
134  * xlogreader used for replication. Note that a WAL sender doing physical
135  * replication does not need xlogreader to read WAL, but it needs one to
136  * keep a state of its work.
137  */
139 
140 /*
141  * These variables keep track of the state of the timeline we're currently
142  * sending. sendTimeLine identifies the timeline. If sendTimeLineIsHistoric,
143  * the timeline is not the latest timeline on this server, and the server's
144  * history forked off from that timeline at sendTimeLineValidUpto.
145  */
148 static bool sendTimeLineIsHistoric = false;
150 
151 /*
152  * How far have we sent WAL already? This is also advertised in
153  * MyWalSnd->sentPtr. (Actually, this is the next WAL location to send.)
154  */
156 
157 /* Buffers for constructing outgoing messages and processing reply messages. */
161 
162 /* Timestamp of last ProcessRepliesIfAny(). */
164 
165 /*
166  * Timestamp of last ProcessRepliesIfAny() that saw a reply from the
167  * standby. Set to 0 if wal_sender_timeout doesn't need to be active.
168  */
170 
171 /* Have we sent a heartbeat message asking for reply, since last reply? */
172 static bool waiting_for_ping_response = false;
173 
174 /*
175  * While streaming WAL in Copy mode, streamingDoneSending is set to true
176  * after we have sent CopyDone. We should not send any more CopyData messages
177  * after that. streamingDoneReceiving is set to true when we receive CopyDone
178  * from the other end. When both become true, it's time to exit Copy mode.
179  */
182 
183 /* Are we there yet? */
184 static bool WalSndCaughtUp = false;
185 
186 /* Flags set by signal handlers for later service in main loop */
187 static volatile sig_atomic_t got_SIGUSR2 = false;
188 static volatile sig_atomic_t got_STOPPING = false;
/* The error-cleanup path calls proc_exit(0) when either flag above is set. */
189 
190 /*
191  * This is set while we are streaming. When not set
192  * PROCSIG_WALSND_INIT_STOPPING signal will be handled like SIGTERM. When set,
193  * the main loop is responsible for checking got_STOPPING and terminating when
194  * it's set (after streaming any remaining WAL).
195  */
196 static volatile sig_atomic_t replication_active = false;
197 
199 
200 /* A sample associating a WAL location with the time it was written. */
/*
 * NOTE(review): the embedded line numbers in this section skip 203-204 and
 * 213-215, 217 (the struct fields), 220, 223 (signal-handler prototype) and
 * 230, 234, 236-238, 240, 249, 253, 255, 258 (private prototypes). Both
 * structs below are therefore incomplete as shown; restore from upstream.
 */
201 typedef struct
202 {
205 } WalTimeSample;
206 
207 /* The size of our buffer of time samples. */
208 #define LAG_TRACKER_BUFFER_SIZE 8192
209 
210 /* A mechanism for tracking replication lag. */
211 typedef struct
212 {
	/* One read position per synchronous-replication wait mode. */
216  int read_heads[NUM_SYNC_REP_WAIT_MODE];
218 } LagTracker;
219 
221 
222 /* Signal handlers */
224 
225 /* Prototypes for private functions */
/* Signature shared by XLogSendPhysical() and XLogSendLogical(); passed to WalSndLoop()/WalSndDone(). */
226 typedef void (*WalSndSendDataCallback) (void);
227 static void WalSndLoop(WalSndSendDataCallback send_data);
228 static void InitWalSenderSlot(void);
229 static void WalSndKill(int code, Datum arg);
231 static void XLogSendPhysical(void);
232 static void XLogSendLogical(void);
233 static void WalSndDone(WalSndSendDataCallback send_data);
235 static void IdentifySystem(void);
239 static void StartReplication(StartReplicationCmd *cmd);
241 static void ProcessStandbyMessage(void);
242 static void ProcessStandbyReplyMessage(void);
243 static void ProcessStandbyHSFeedbackMessage(void);
244 static void ProcessRepliesIfAny(void);
245 static void ProcessPendingWrites(void);
246 static void WalSndKeepalive(bool requestReply, XLogRecPtr writePtr);
247 static void WalSndKeepaliveIfNecessary(void);
248 static void WalSndCheckTimeOut(void);
250 static void WalSndWait(uint32 socket_events, long timeout, uint32 wait_event);
251 static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
252 static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
/* NOTE(review): the first line of the prototype continued below (embedded line 253) is missing from this copy. */
254  bool skipped_xact);
256 static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
257 static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
259 
260 static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo,
261  TimeLineID *tli_p);
262 
263 
264 /* Initialize walsender process before entering the main command loop */
/*
 * Per the comments in the body, this sets up the per-walsender shared-memory
 * slot, tells the postmaster this process is a WAL sender, marks
 * database-less connections so that their advertised xmin affects vacuum
 * horizons in all databases, and initializes the lag-tracking buffer.
 *
 * NOTE(review): the embedded line numbers skip 268, 271, 285-286, 296,
 * 298-299 and 304, so the statements those comments describe are missing
 * from this copy; only the ProcArrayLock acquire/release pair survives.
 * Restore from upstream before building.
 */
265 void
266 InitWalSender(void)
267 {
269 
270  /* Create a per-walsender data structure in shared memory */
272 
273  /*
274  * We don't currently need any ResourceOwner in a walsender process, but
275  * if we did, we could call CreateAuxProcessResourceOwner here.
276  */
277 
278  /*
279  * Let postmaster know that we're a WAL sender. Once we've declared us as
280  * a WAL sender process, postmaster will let us outlive the bgwriter and
281  * kill us last in the shutdown sequence, so we get a chance to stream all
282  * remaining WAL at shutdown, including the shutdown checkpoint. Note that
283  * there's no going back, and we mustn't write any WAL records after this.
284  */
287 
288  /*
289  * If the client didn't specify a database to connect to, show in PGPROC
290  * that our advertised xmin should affect vacuum horizons in all
291  * databases. This allows physical replication clients to send hot
292  * standby feedback that will delay vacuum cleanup in all databases.
293  */
294  if (MyDatabaseId == InvalidOid)
295  {
	/* The PGPROC update is made under exclusive ProcArrayLock (statements missing here). */
297  LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
300  LWLockRelease(ProcArrayLock);
301  }
302 
303  /* Initialize empty timestamp buffer for lag tracking. */
305 }
306 
307 /*
308  * Clean up after an error.
309  *
310  * WAL sender processes don't use transactions like regular backends do.
311  * This function does any cleanup required after an error in a WAL sender
312  * process, similar to what transaction abort does in a regular backend.
 *
 * Visible effects: clears replication_active, releases any leftover
 * ResourceOwner via WalSndResourceCleanup(false), and calls proc_exit(0)
 * if a stop was requested (got_STOPPING or got_SIGUSR2).
 *
 * NOTE(review): the function-name line and the statements at embedded
 * lines 317-319, 322, 325, 327, 336 and 343 are missing from this copy.
 * In particular, the bodies of the two "if" statements near the top are
 * gone, so as written each "if" governs the next surviving statement.
 * Restore from upstream before building.
313  */
314 void
316 {
320 
321  if (xlogreader != NULL && xlogreader->seg.ws_file >= 0)
323 
324  if (MyReplicationSlot != NULL)
326 
328 
329  replication_active = false;
330 
331  /*
332  * If there is a transaction in progress, it will clean up our
333  * ResourceOwner, but if a replication command set up a resource owner
334  * without a transaction, we've got to clean that up now.
335  */
337  WalSndResourceCleanup(false);
338 
339  if (got_STOPPING || got_SIGUSR2)
340  proc_exit(0);
341 
342  /* Revert back to startup state */
344 }
345 
346 /*
347  * Clean up any ResourceOwner we created.
348  */
349 void
350 WalSndResourceCleanup(bool isCommit)
351 {
352  ResourceOwner resowner;
353 
354  if (CurrentResourceOwner == NULL)
355  return;
356 
357  /*
358  * Deleting CurrentResourceOwner is not allowed, so we must save a pointer
359  * in a local variable and clear it first.
360  */
361  resowner = CurrentResourceOwner;
362  CurrentResourceOwner = NULL;
363 
364  /* Now we can release resources and delete it. */
365  ResourceOwnerRelease(resowner,
366  RESOURCE_RELEASE_BEFORE_LOCKS, isCommit, true);
367  ResourceOwnerRelease(resowner,
368  RESOURCE_RELEASE_LOCKS, isCommit, true);
369  ResourceOwnerRelease(resowner,
370  RESOURCE_RELEASE_AFTER_LOCKS, isCommit, true);
371  ResourceOwnerDelete(resowner);
372 }
373 
374 /*
375  * Handle a client's connection abort in an orderly manner.
 *
 * Does not return: the process exits via proc_exit(0).
 *
 * NOTE(review): the reset described in the body comment (embedded lines
 * 384-385) is missing from this copy. Without it, ereport() could still
 * try to send messages to the disconnected standby. Restore from upstream.
376  */
377 static void
378 WalSndShutdown(void)
379 {
380  /*
381  * Reset whereToSendOutput to prevent ereport from attempting to send any
382  * more messages to the standby.
383  */
386 
387  proc_exit(0);
388  abort(); /* keep the compiler quiet */
389 }
390 
391 /*
392  * Handle the IDENTIFY_SYSTEM command.
 *
 * Sends one row with four columns: systemid (text), timeline (int8),
 * xlogpos (text, formatted "%X/%X") and dbname (text, NULL when the session
 * is not connected to a database). The timeline and position come from
 * either GetStandbyFlushRecPtr() or GetFlushRecPtr(); the condition that
 * chooses between them is not visible in this copy.
 *
 * NOTE(review): the function-name line and embedded lines 401, 415,
 * 417-418, 427, 430, 432-434, 436, 439 and 466 are missing from this copy.
 * They include the declaration and creation of "dest", the sysid argument,
 * the branch condition and the dbname assignment. Restore from upstream
 * before building.
393  */
394 static void
396 {
397  char sysid[32];
398  char xloc[MAXFNAMELEN];
399  XLogRecPtr logptr;
400  char *dbname = NULL;
402  TupOutputState *tstate;
403  TupleDesc tupdesc;
404  Datum values[4];
405  bool nulls[4] = {0};
406  TimeLineID currTLI;
407 
408  /*
409  * Reply with a result set with one row, four columns. First col is system
410  * ID, second is timeline ID, third is current xlog location and the
411  * fourth contains the database name if we are connected to one.
412  */
413 
414  snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
416 
419  logptr = GetStandbyFlushRecPtr(&currTLI);
420  else
421  logptr = GetFlushRecPtr(&currTLI);
422 
423  snprintf(xloc, sizeof(xloc), "%X/%X", LSN_FORMAT_ARGS(logptr));
424 
425  if (MyDatabaseId != InvalidOid)
426  {
428 
429  /* syscache access needs a transaction env. */
431  /* make dbname live outside TX context */
435  /* CommitTransactionCommand switches to TopMemoryContext */
437  }
438 
440 
441  /* need a tuple descriptor representing four columns */
442  tupdesc = CreateTemplateTupleDesc(4);
443  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "systemid",
444  TEXTOID, -1, 0);
445  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "timeline",
446  INT8OID, -1, 0);
447  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "xlogpos",
448  TEXTOID, -1, 0);
449  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "dbname",
450  TEXTOID, -1, 0);
451 
452  /* prepare for projection of tuples */
453  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
454 
455  /* column 1: system identifier */
456  values[0] = CStringGetTextDatum(sysid);
457 
458  /* column 2: timeline (sent as int8 because TimeLineID is unsigned) */
459  values[1] = Int64GetDatum(currTLI);
460 
461  /* column 3: wal location */
462  values[2] = CStringGetTextDatum(xloc);
463 
464  /* column 4: database name, or NULL if none */
465  if (dbname)
467  else
468  nulls[3] = true;
469 
470  /* send it to dest */
471  do_tup_output(tstate, values, nulls);
472 
473  end_tup_output(tstate);
474 }
475 
476 /* Handle READ_REPLICATION_SLOT command */
/*
 * Replies with one row (slot_type, restart_lsn, restart_tli) describing the
 * slot named cmd->slotname. All three columns are NULL if no in-use slot has
 * that name. restart_lsn and restart_tli are also NULL when the slot's
 * restart_lsn is invalid. Raises ERROR if the slot is logical, i.e. its
 * data.database is a valid OID.
 *
 * NOTE(review): the function-name line and embedded lines 482, 485, 488,
 * 563 and 566 are missing from this copy. They include the declarations of
 * "dest" and "values" and the creation of "tupdesc". Restore from upstream.
 */
477 static void
479 {
480 #define READ_REPLICATION_SLOT_COLS 3
481  ReplicationSlot *slot;
483  TupOutputState *tstate;
484  TupleDesc tupdesc;
486  bool nulls[READ_REPLICATION_SLOT_COLS];
487 
489  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_type",
490  TEXTOID, -1, 0);
491  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "restart_lsn",
492  TEXTOID, -1, 0);
493  /* TimeLineID is unsigned, so int4 is not wide enough. */
494  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "restart_tli",
495  INT8OID, -1, 0);
496 
	/* Start with every column NULL; only known values are filled in below. */
497  memset(nulls, true, READ_REPLICATION_SLOT_COLS * sizeof(bool));
498 
499  LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
500  slot = SearchNamedReplicationSlot(cmd->slotname, false);
501  if (slot == NULL || !slot->in_use)
502  {
503  LWLockRelease(ReplicationSlotControlLock);
504  }
505  else
506  {
507  ReplicationSlot slot_contents;
508  int i = 0;
509 
		/* Take a by-value snapshot so both locks can be dropped before building the reply. */
510  /* Copy slot contents while holding spinlock */
511  SpinLockAcquire(&slot->mutex);
512  slot_contents = *slot;
513  SpinLockRelease(&slot->mutex);
514  LWLockRelease(ReplicationSlotControlLock);
515 
516  if (OidIsValid(slot_contents.data.database))
517  ereport(ERROR,
518  errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
519  errmsg("cannot use %s with a logical replication slot",
520  "READ_REPLICATION_SLOT"));
521 
522  /* slot type */
523  values[i] = CStringGetTextDatum("physical");
524  nulls[i] = false;
525  i++;
526 
527  /* start LSN */
528  if (!XLogRecPtrIsInvalid(slot_contents.data.restart_lsn))
529  {
530  char xloc[64];
531 
532  snprintf(xloc, sizeof(xloc), "%X/%X",
533  LSN_FORMAT_ARGS(slot_contents.data.restart_lsn));
534  values[i] = CStringGetTextDatum(xloc);
535  nulls[i] = false;
536  }
537  i++;
538 
539  /* timeline this WAL was produced on */
540  if (!XLogRecPtrIsInvalid(slot_contents.data.restart_lsn))
541  {
542  TimeLineID slots_position_timeline;
543  TimeLineID current_timeline;
544  List *timeline_history = NIL;
545 
546  /*
547  * While in recovery, use as timeline the currently-replaying one
548  * to get the LSN position's history.
549  */
550  if (RecoveryInProgress())
551  (void) GetXLogReplayRecPtr(&current_timeline);
552  else
553  current_timeline = GetWALInsertionTimeLine();
554 
			/* NOTE(review): timeline_history is not freed here; presumably reclaimed with the command's memory context -- confirm. */
555  timeline_history = readTimeLineHistory(current_timeline);
556  slots_position_timeline = tliOfPointInHistory(slot_contents.data.restart_lsn,
557  timeline_history);
558  values[i] = Int64GetDatum((int64) slots_position_timeline);
559  nulls[i] = false;
560  }
561  i++;
562 
564  }
565 
567  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
568  do_tup_output(tstate, values, nulls);
569  end_tup_output(tstate);
570 }
571 
572 
573 /*
574  * Handle TIMELINE_HISTORY command.
 *
 * Sends a RowDescription and one DataRow (filename, content) for the
 * history file of cmd->timeline. The DataRow is built by hand with pq_*
 * calls so the file can be copied into the message in PGAlignedBlock-sized
 * chunks. The content length is taken from lseek(SEEK_END) before reading,
 * and hitting end-of-file early is an error.
 *
 * NOTE(review): the function-name line and embedded lines 579, 581, 589,
 * 615, 622, 626, 637, 639, 642, 647 and 657 are missing from this copy.
 * They include the declarations of "buf" and "dest" and the first argument
 * line of each ereport(), which leaves those calls' parentheses unbalanced
 * here. Restore from upstream before building.
575  */
576 static void
578 {
580  TupleDesc tupdesc;
582  char histfname[MAXFNAMELEN];
583  char path[MAXPGPATH];
584  int fd;
585  off_t histfilelen;
586  off_t bytesleft;
587  Size len;
588 
590 
591  /*
592  * Reply with a result set with one row, and two columns. The first col is
593  * the name of the history file, 2nd is the contents.
594  */
595  tupdesc = CreateTemplateTupleDesc(2);
596  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "filename", TEXTOID, -1, 0);
597  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "content", TEXTOID, -1, 0);
598 
599  TLHistoryFileName(histfname, cmd->timeline);
600  TLHistoryFilePath(path, cmd->timeline);
601 
602  /* Send a RowDescription message */
603  dest->rStartup(dest, CMD_SELECT, tupdesc);
604 
605  /* Send a DataRow message */
606  pq_beginmessage(&buf, 'D');
607  pq_sendint16(&buf, 2); /* # of columns */
608  len = strlen(histfname);
609  pq_sendint32(&buf, len); /* col1 len */
610  pq_sendbytes(&buf, histfname, len);
611 
612  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
613  if (fd < 0)
614  ereport(ERROR,
616  errmsg("could not open file \"%s\": %m", path)));
617 
618  /* Determine file length and send it to client */
619  histfilelen = lseek(fd, 0, SEEK_END);
620  if (histfilelen < 0)
621  ereport(ERROR,
623  errmsg("could not seek to end of file \"%s\": %m", path)));
624  if (lseek(fd, 0, SEEK_SET) != 0)
625  ereport(ERROR,
627  errmsg("could not seek to beginning of file \"%s\": %m", path)));
628 
629  pq_sendint32(&buf, histfilelen); /* col2 len */
630 
631  bytesleft = histfilelen;
632  while (bytesleft > 0)
633  {
634  PGAlignedBlock rbuf;
635  int nread;
636 
		/*
		 * NOTE(review): each read asks for sizeof(rbuf) bytes rather than
		 * Min(sizeof(rbuf), bytesleft). If the file were longer than
		 * histfilelen by the time it is read, more bytes than the announced
		 * col2 length would be sent. History files are presumably not
		 * modified once written -- confirm.
		 */
638  nread = read(fd, rbuf.data, sizeof(rbuf));
640  if (nread < 0)
641  ereport(ERROR,
643  errmsg("could not read file \"%s\": %m",
644  path)));
645  else if (nread == 0)
646  ereport(ERROR,
648  errmsg("could not read file \"%s\": read %d of %zu",
649  path, nread, (Size) bytesleft)));
650 
651  pq_sendbytes(&buf, rbuf.data, nread);
652  bytesleft -= nread;
653  }
654 
655  if (CloseTransientFile(fd) != 0)
656  ereport(ERROR,
658  errmsg("could not close file \"%s\": %m", path)));
659 
660  pq_endmessage(&buf);
661 }
662 
663 /*
664  * Handle START_REPLICATION command.
665  *
666  * Streams WAL to the client in COPY mode, starting at cmd->startpoint on the
667  * requested (or current flush) timeline. This does return: after COPY mode
 * ends, it may send a single-row result set naming the next timeline, then
 * sends CommandComplete. An ereport(ERROR) takes us back to the main loop,
 * and if a stop was requested (got_STOPPING) the process exits instead.
 *
 * NOTE(review): the function-name line and embedded lines 672, 678, 701,
 * 717-718, 731, 745, 771, 783, 787, 790, 801, 818, 826-828, 830, 835, 840,
 * 842, 846, 852, 855, 862, 864 and 880 are missing from this copy. They
 * include the conditions of several if/else statements (e.g. the one
 * guarding the logical-slot error and the one guarding the COPY-mode
 * block) and, apparently, the call that runs the streaming loop. Restore
 * from upstream before building.
668  */
669 static void
671 {
673  XLogRecPtr FlushPtr;
674  TimeLineID FlushTLI;
675 
676  /* create xlogreader for physical replication */
677  xlogreader =
679  XL_ROUTINE(.segment_open = WalSndSegmentOpen,
680  .segment_close = wal_segment_close),
681  NULL);
682 
683  if (!xlogreader)
684  ereport(ERROR,
685  (errcode(ERRCODE_OUT_OF_MEMORY),
686  errmsg("out of memory"),
687  errdetail("Failed while allocating a WAL reading processor.")));
688 
689  /*
690  * We assume here that we're logging enough information in the WAL for
691  * log-shipping, since this is checked in PostmasterMain().
692  *
693  * NOTE: wal_level can only change at shutdown, so in most cases it is
694  * difficult for there to be WAL data that we can still see that was
695  * written at wal_level='minimal'.
696  */
697 
698  if (cmd->slotname)
699  {
700  ReplicationSlotAcquire(cmd->slotname, true);
702  ereport(ERROR,
703  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
704  errmsg("cannot use a logical replication slot for physical replication")));
705 
706  /*
707  * We don't need to verify the slot's restart_lsn here; instead we
708  * rely on the caller requesting the starting point to use. If the
709  * WAL segment doesn't exist, we'll fail later.
710  */
711  }
712 
713  /*
714  * Select the timeline. If it was given explicitly by the client, use
715  * that. Otherwise use the timeline of the last replayed record.
716  */
719  FlushPtr = GetStandbyFlushRecPtr(&FlushTLI);
720  else
721  FlushPtr = GetFlushRecPtr(&FlushTLI);
722 
723  if (cmd->timeline != 0)
724  {
725  XLogRecPtr switchpoint;
726 
727  sendTimeLine = cmd->timeline;
728  if (sendTimeLine == FlushTLI)
729  {
730  sendTimeLineIsHistoric = false;
732  }
733  else
734  {
735  List *timeLineHistory;
736 
737  sendTimeLineIsHistoric = true;
738 
739  /*
740  * Check that the timeline the client requested exists, and the
741  * requested start location is on that timeline.
742  */
743  timeLineHistory = readTimeLineHistory(FlushTLI);
744  switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
746  list_free_deep(timeLineHistory);
747 
748  /*
749  * Found the requested timeline in the history. Check that
750  * requested startpoint is on that timeline in our history.
751  *
752  * This is quite loose on purpose. We only check that we didn't
753  * fork off the requested timeline before the switchpoint. We
754  * don't check that we switched *to* it before the requested
755  * starting point. This is because the client can legitimately
756  * request to start replication from the beginning of the WAL
757  * segment that contains switchpoint, but on the new timeline, so
758  * that it doesn't end up with a partial segment. If you ask for
759  * too old a starting point, you'll get an error later when we
760  * fail to find the requested WAL segment in pg_wal.
761  *
762  * XXX: we could be more strict here and only allow a startpoint
763  * that's older than the switchpoint, if it's still in the same
764  * WAL segment.
765  */
766  if (!XLogRecPtrIsInvalid(switchpoint) &&
767  switchpoint < cmd->startpoint)
768  {
769  ereport(ERROR,
770  (errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
772  cmd->timeline),
773  errdetail("This server's history forked from timeline %u at %X/%X.",
774  cmd->timeline,
775  LSN_FORMAT_ARGS(switchpoint))));
776  }
777  sendTimeLineValidUpto = switchpoint;
778  }
779  }
780  else
781  {
782  sendTimeLine = FlushTLI;
784  sendTimeLineIsHistoric = false;
785  }
786 
788 
789  /* If there is nothing to stream, don't even enter COPY mode */
791  {
792  /*
793  * When we first start replication the standby will be behind the
794  * primary. For some applications, for example synchronous
795  * replication, it is important to have a clear state for this initial
796  * catchup mode, so we can trigger actions when we change streaming
797  * state later. We may stay in this state for a long time, which is
798  * exactly why we want to be able to monitor whether or not we are
799  * still here.
800  */
802 
803  /* Send a CopyBothResponse message, and start streaming */
		/* Layout: overall-format byte (0), then a column count (0). */
804  pq_beginmessage(&buf, 'W');
805  pq_sendbyte(&buf, 0);
806  pq_sendint16(&buf, 0);
807  pq_endmessage(&buf);
808  pq_flush();
809 
810  /*
811  * Don't allow a request to stream from a future point in WAL that
812  * hasn't been flushed to disk in this server yet.
813  */
814  if (FlushPtr < cmd->startpoint)
815  {
816  ereport(ERROR,
817  (errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
819  LSN_FORMAT_ARGS(FlushPtr))));
820  }
821 
822  /* Start streaming from the requested point */
823  sentPtr = cmd->startpoint;
824 
825  /* Initialize shared memory status, too */
829 
831 
832  /* Main loop of walsender */
833  replication_active = true;
834 
836 
837  replication_active = false;
838  if (got_STOPPING)
839  proc_exit(0);
841 
843  }
844 
845  if (cmd->slotname)
	/* NOTE(review): the statement this "if" should govern (embedded line 846) is missing; as written it governs the next-timeline block below. */
847 
848  /*
849  * Copy is finished now. Send a single-row result set indicating the next
850  * timeline.
851  */
853  {
854  char startpos_str[8 + 1 + 8 + 1];
856  TupOutputState *tstate;
857  TupleDesc tupdesc;
858  Datum values[2];
859  bool nulls[2] = {0};
860 
861  snprintf(startpos_str, sizeof(startpos_str), "%X/%X",
863 
865 
866  /*
867  * Need a tuple descriptor representing two columns. int8 may seem
868  * like a surprising data type for this, but in theory int4 would not
869  * be wide enough for this, as TimeLineID is unsigned.
870  */
871  tupdesc = CreateTemplateTupleDesc(2);
872  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli",
873  INT8OID, -1, 0);
874  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos",
875  TEXTOID, -1, 0);
876 
877  /* prepare for projection of tuple */
878  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
879 
881  values[1] = CStringGetTextDatum(startpos_str);
882 
883  /* send it to dest */
884  do_tup_output(tstate, values, nulls);
885 
886  end_tup_output(tstate);
887  }
888 
889  /* Send CommandComplete message */
890  EndReplicationCommand("START_STREAMING");
891 }
892 
893 /*
894  * XLogReaderRoutine->page_read callback for logical decoding contexts, as a
895  * walsender process.
896  *
897  * Inside the walsender we can do better than read_local_xlog_page,
898  * which has to do a plain sleep/busy loop, because the walsender's latch gets
899  * set every time WAL is flushed.
 *
 * Steps:
 * - Records the timeline chosen by XLogReadDetermineTimeline() in the
 *   sendTimeLine variables.
 * - Waits via WalSndWaitForWal() until targetPagePtr + reqLen is flushed.
 * - Reads the page into cur_page.
 *
 * Returns the number of valid bytes: XLOG_BLCKSZ, or fewer if only part of
 * the page has been flushed. Returns -1 if not enough WAL became available,
 * which implies we are shutting down.
 *
 * NOTE(review): the first line of the signature and the declaration of
 * currTLI (embedded lines 902 and 909) are missing from this copy.
900  */
901 static int
903  XLogRecPtr targetRecPtr, char *cur_page)
904 {
905  XLogRecPtr flushptr;
906  int count;
907  WALReadError errinfo;
908  XLogSegNo segno;
910 
911  /*
912  * Since logical decoding is only permitted on a primary server, we know
913  * that the current timeline ID can't be changing any more. If we did this
914  * on a standby, we'd have to worry about the values we compute here
915  * becoming invalid due to a promotion or timeline change.
916  */
917  XLogReadDetermineTimeline(state, targetPagePtr, reqLen, currTLI);
918  sendTimeLineIsHistoric = (state->currTLI != currTLI);
919  sendTimeLine = state->currTLI;
920  sendTimeLineValidUpto = state->currTLIValidUntil;
921  sendTimeLineNextTLI = state->nextTLI;
922 
923  /* make sure we have enough WAL available */
924  flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
925 
926  /* fail if not (implies we are going to shut down) */
927  if (flushptr < targetPagePtr + reqLen)
928  return -1;
929 
930  if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
931  count = XLOG_BLCKSZ; /* more than one block available */
932  else
933  count = flushptr - targetPagePtr; /* part of the page available */
934 
	/*
	 * NOTE(review): WALRead() is asked for a full XLOG_BLCKSZ page even when
	 * only "count" bytes are known to be flushed, so only the first "count"
	 * bytes of cur_page should be relied on. Consider passing count instead.
	 */
935  /* now actually read the data, we know it's there */
936  if (!WALRead(state,
937  cur_page,
938  targetPagePtr,
939  XLOG_BLCKSZ,
940  state->seg.ws_tli, /* Pass the current TLI because only
941  * WalSndSegmentOpen controls whether new
942  * TLI is needed. */
943  &errinfo))
944  WALReadRaiseError(&errinfo);
945 
946  /*
947  * After reading into the buffer, check that what we read was valid. We do
948  * this after reading, because even though the segment was present when we
949  * opened it, it might get recycled or removed while we read it. The
950  * read() succeeds in that case, but the data we tried to read might
951  * already have been overwritten with new WAL records.
952  */
953  XLByteToSeg(targetPagePtr, segno, state->segcxt.ws_segsize);
954  CheckXLogRemoved(segno, state->seg.ws_tli);
955 
956  return count;
957 }
958 
959 /*
960  * Process extra options given to CREATE_REPLICATION_SLOT.
 *
 * Walks cmd->options and stores recognized values through the output
 * pointers. An output is left unchanged if its option is absent.
 *   snapshot    - logical slots only; one of "export", "nothing" or "use"
 *   reserve_wal - physical slots only; boolean
 *   two_phase   - logical slots only; boolean
 * A repeated option, or one not valid for cmd->kind, raises "conflicting or
 * redundant options". An unrecognized option name or snapshot value also
 * raises an ERROR.
 *
 * NOTE(review): the line carrying the function name and the "cmd"
 * parameter (embedded line 963) is missing from this copy. The name is
 * known from its call site later in this file.
961  */
962 static void
964  bool *reserve_wal,
965  CRSSnapshotAction *snapshot_action,
966  bool *two_phase)
967 {
968  ListCell *lc;
969  bool snapshot_action_given = false;
970  bool reserve_wal_given = false;
971  bool two_phase_given = false;
972 
973  /* Parse options */
974  foreach(lc, cmd->options)
975  {
976  DefElem *defel = (DefElem *) lfirst(lc);
977 
978  if (strcmp(defel->defname, "snapshot") == 0)
979  {
980  char *action;
981 
982  if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
983  ereport(ERROR,
984  (errcode(ERRCODE_SYNTAX_ERROR),
985  errmsg("conflicting or redundant options")));
986 
987  action = defGetString(defel);
988  snapshot_action_given = true;
989 
990  if (strcmp(action, "export") == 0)
991  *snapshot_action = CRS_EXPORT_SNAPSHOT;
992  else if (strcmp(action, "nothing") == 0)
993  *snapshot_action = CRS_NOEXPORT_SNAPSHOT;
994  else if (strcmp(action, "use") == 0)
995  *snapshot_action = CRS_USE_SNAPSHOT;
996  else
997  ereport(ERROR,
998  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
999  errmsg("unrecognized value for CREATE_REPLICATION_SLOT option \"%s\": \"%s\"",
1000  defel->defname, action)));
1001  }
1002  else if (strcmp(defel->defname, "reserve_wal") == 0)
1003  {
1004  if (reserve_wal_given || cmd->kind != REPLICATION_KIND_PHYSICAL)
1005  ereport(ERROR,
1006  (errcode(ERRCODE_SYNTAX_ERROR),
1007  errmsg("conflicting or redundant options")));
1008 
1009  reserve_wal_given = true;
1010  *reserve_wal = defGetBoolean(defel);
1011  }
1012  else if (strcmp(defel->defname, "two_phase") == 0)
1013  {
1014  if (two_phase_given || cmd->kind != REPLICATION_KIND_LOGICAL)
1015  ereport(ERROR,
1016  (errcode(ERRCODE_SYNTAX_ERROR),
1017  errmsg("conflicting or redundant options")));
1018  two_phase_given = true;
1019  *two_phase = defGetBoolean(defel);
1020  }
1021  else
1022  elog(ERROR, "unrecognized option: %s", defel->defname);
1023  }
1024 }
1025 
1026 /*
1027  * Create a new replication slot.
 *
 * Visible flow: parse the command's options, create either a physical or a
 * logical slot, and for logical slots validate the requested snapshot
 * action and build a decoding context (exporting its snapshot when asked).
 * The result is sent as a single row with four text columns: slot_name,
 * consistent_point, snapshot_name and output_plugin. The last two are NULL
 * when no snapshot was exported or no plugin was given.
 *
 * NOTE(review): this rendered listing drops many source lines (see the gaps
 * in the numbering, e.g. 1030, where the function name and parameter list
 * presumably are; 1097, the isolation-level condition; 1165 and 1175, the
 * bodies of the "if (!cmd->temporary)" tests). As rendered, those ifs look
 * empty; that is a listing artifact. Comments here describe only the lines
 * that are visible.
1028  */
1029 static void
1031 {
1032  const char *snapshot_name = NULL;
1033  char xloc[MAXFNAMELEN];
1034  char *slot_name;
1035  bool reserve_wal = false;
1036  bool two_phase = false;
1037  CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT;
1038  DestReceiver *dest;
1039  TupOutputState *tstate;
1040  TupleDesc tupdesc;
1041  Datum values[4];
1042  bool nulls[4] = {0};
1043 
1045 
 /* The option parser may override the defaults above via the out-parameters. */
1046  parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action, &two_phase);
1047 
1048  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
1049  {
1050  ReplicationSlotCreate(cmd->slotname, false,
1052  false);
1053  }
1054  else
1055  {
1057 
1058  /*
1059  * Initially create persistent slot as ephemeral - that allows us to
1060  * nicely handle errors during initialization because it'll get
1061  * dropped if this transaction fails. We'll make it persistent at the
1062  * end. Temporary slots can be created as temporary from beginning as
1063  * they get dropped on error as well.
1064  */
1065  ReplicationSlotCreate(cmd->slotname, true,
1067  two_phase);
1068  }
1069 
1070  if (cmd->kind == REPLICATION_KIND_LOGICAL)
1071  {
1073  bool need_full_snapshot = false;
1074 
1075  /*
1076  * Do options check early so that we can bail before calling the
1077  * DecodingContextFindStartpoint which can take a long time.
1078  */
1079  if (snapshot_action == CRS_EXPORT_SNAPSHOT)
1080  {
1081  if (IsTransactionBlock())
1082  ereport(ERROR,
1083  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1084  (errmsg("%s must not be called inside a transaction",
1085  "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'export')")));
1086 
1087  need_full_snapshot = true;
1088  }
1089  else if (snapshot_action == CRS_USE_SNAPSHOT)
1090  {
1091  if (!IsTransactionBlock())
1092  ereport(ERROR,
1093  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1094  (errmsg("%s must be called inside a transaction",
1095  "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1096 
1098  ereport(ERROR,
1099  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1100  (errmsg("%s must be called in REPEATABLE READ isolation mode transaction",
1101  "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1102  if (!XactReadOnly)
1103  ereport(ERROR,
1104  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1105  (errmsg("%s must be called in a read only transaction",
1106  "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1107 
1108  if (FirstSnapshotSet)
1109  ereport(ERROR,
1110  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1111  (errmsg("%s must be called before any query",
1112  "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1113 
1114  if (IsSubTransaction())
1115  ereport(ERROR,
1116  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1117  (errmsg("%s must not be called in a subtransaction",
1118  "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1119 
1120  need_full_snapshot = true;
1121  }
1122 
1123  ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
1125  XL_ROUTINE(.page_read = logical_read_xlog_page,
1126  .segment_open = WalSndSegmentOpen,
1127  .segment_close = wal_segment_close),
1130 
1131  /*
1132  * Signal that we don't need the timeout mechanism. We're just
1133  * creating the replication slot and don't yet accept feedback
1134  * messages or send keepalives. As we possibly need to wait for
1135  * further WAL the walsender would otherwise possibly be killed too
1136  * soon.
1137  */
1139 
1140  /* build initial snapshot, might take a while */
1142 
1143  /*
1144  * Export or use the snapshot if we've been asked to do so.
1145  *
1146  * NB. We will convert the snapbuild.c kind of snapshot to normal
1147  * snapshot when doing this.
1148  */
1149  if (snapshot_action == CRS_EXPORT_SNAPSHOT)
1150  {
1151  snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder);
1152  }
1153  else if (snapshot_action == CRS_USE_SNAPSHOT)
1154  {
1155  Snapshot snap;
1156 
1159  }
1160 
1161  /* don't need the decoding context anymore */
1162  FreeDecodingContext(ctx);
1163 
1164  if (!cmd->temporary)
1166  }
1167  else if (cmd->kind == REPLICATION_KIND_PHYSICAL && reserve_wal)
1168  {
1170 
1172 
1173  /* Write this slot to disk if it's a permanent one. */
1174  if (!cmd->temporary)
1176  }
1177 
 /* Render the consistent point (an LSN) as "%X/%X" text for the result row. */
1178  snprintf(xloc, sizeof(xloc), "%X/%X",
1180 
1182 
1183  /*----------
1184  * Need a tuple descriptor representing four columns:
1185  * - first field: the slot name
1186  * - second field: LSN at which we became consistent
1187  * - third field: exported snapshot's name
1188  * - fourth field: output plugin
1189  *----------
1190  */
1191  tupdesc = CreateTemplateTupleDesc(4);
1192  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_name",
1193  TEXTOID, -1, 0);
1194  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "consistent_point",
1195  TEXTOID, -1, 0);
1196  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "snapshot_name",
1197  TEXTOID, -1, 0);
1198  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "output_plugin",
1199  TEXTOID, -1, 0);
1200 
1201  /* prepare for projection of tuples */
1202  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
1203 
1204  /* slot_name */
1205  slot_name = NameStr(MyReplicationSlot->data.name);
1206  values[0] = CStringGetTextDatum(slot_name);
1207 
1208  /* consistent wal location */
1209  values[1] = CStringGetTextDatum(xloc);
1210 
1211  /* snapshot name, or NULL if none */
1212  if (snapshot_name != NULL)
1213  values[2] = CStringGetTextDatum(snapshot_name);
1214  else
1215  nulls[2] = true;
1216 
1217  /* plugin, or NULL if none */
1218  if (cmd->plugin != NULL)
1219  values[3] = CStringGetTextDatum(cmd->plugin);
1220  else
1221  nulls[3] = true;
1222 
1223  /* send it to dest */
1224  do_tup_output(tstate, values, nulls);
1225  end_tup_output(tstate);
1226 
1228 }
1229 
1230 /*
1231  * Get rid of a replication slot that is no longer wanted.
 *
 * Thin wrapper that drops the slot named in the command. The second
 * argument passed to ReplicationSlotDrop() is the negation of cmd->wait.
 * NOTE(review): this is presumably a "nowait" flag, so without the WAIT
 * option the drop does not block on a slot that is in use. Confirm this
 * against ReplicationSlotDrop(). The signature line (1234) is missing from
 * this listing.
1232  */
1233 static void
1235 {
1236  ReplicationSlotDrop(cmd->slotname, !cmd->wait);
1237 }
1238 
1239 /*
1240  * Load previously initiated logical slot and prepare for sending data (via
1241  * WalSndLoop).
 *
 * Visible steps:
 *  - acquire the named slot and reject it if it has been invalidated;
 *  - create a decoding context starting at cmd->startpoint with cmd->options;
 *  - send a CopyBothResponse;
 *  - run the streaming loop with replication_active set;
 *  - then either exit the process (if got_STOPPING is set) or leave COPY
 *    mode with a CommandComplete.
 *
 * NOTE(review): several source lines are missing from this listing, e.g.
 * 1244 (the signature) and 1256 / 1268, presumably the conditions guarding
 * the "invalidated" ERROR and the promotion check. As rendered, the ERROR
 * at 1257 looks unconditional; that is a listing artifact. The main-loop
 * call itself (around 1320) is also not shown.
1242  */
1243 static void
1245 {
1247  QueryCompletion qc;
1248 
1249  /* make sure that our requirements are still fulfilled */
1251 
1253 
1254  ReplicationSlotAcquire(cmd->slotname, true);
1255 
1257  ereport(ERROR,
1258  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1259  errmsg("cannot read from logical replication slot \"%s\"",
1260  cmd->slotname),
1261  errdetail("This slot has been invalidated because it exceeded the maximum reserved size.")));
1262 
1263  /*
1264  * Force a disconnect, so that the decoding code doesn't need to care
1265  * about an eventual switch from running in recovery, to running in a
1266  * normal environment. Client code is expected to handle reconnects.
1267  */
1269  {
1270  ereport(LOG,
1271  (errmsg("terminating walsender process after promotion")));
1272  got_STOPPING = true;
1273  }
1274 
1275  /*
1276  * Create our decoding context, making it start at the previously ack'ed
1277  * position.
1278  *
1279  * Do this before sending a CopyBothResponse message, so that any errors
1280  * are reported early.
1281  */
1283  CreateDecodingContext(cmd->startpoint, cmd->options, false,
1284  XL_ROUTINE(.page_read = logical_read_xlog_page,
1285  .segment_open = WalSndSegmentOpen,
1286  .segment_close = wal_segment_close),
1290 
1292 
1293  /* Send a CopyBothResponse message, and start streaming */
 /*
  * Message 'W' with a byte 0 and an int16 0. NOTE(review): per the protocol's
  * CopyBothResponse layout these should be the overall format and the column
  * count; confirm.
  */
1294  pq_beginmessage(&buf, 'W');
1295  pq_sendbyte(&buf, 0);
1296  pq_sendint16(&buf, 0);
1297  pq_endmessage(&buf);
1298  pq_flush();
1299 
1300  /* Start reading WAL from the oldest required WAL. */
1303 
1304  /*
1305  * Report the location after which we'll send out further commits as the
1306  * current sentPtr.
1307  */
1309 
1310  /* Also update the sent position status in shared memory */
1314 
1315  replication_active = true;
1316 
1318 
1319  /* Main loop of walsender */
1321 
1324 
1325  replication_active = false;
 /* A stop was requested (possibly set above after promotion): exit instead of returning to command mode. */
1326  if (got_STOPPING)
1327  proc_exit(0);
1329 
1330  /* Get out of COPY mode (CommandComplete). */
1331  SetQueryCompletion(&qc, CMDTAG_COPY, 0);
1332  EndCommand(&qc, DestRemote, false);
1333 }
1334 
1335 /*
1336  * LogicalDecodingContext 'prepare_write' callback.
1337  *
1338  * Prepare a write into a StringInfo.
1339  *
1340  * Don't do anything lasting in here, it's quite possible that nothing will be done
1341  * with the data.
 *
 * Resets ctx->out and writes the message header:
 *  - the byte 'w';
 *  - int64 dataStart and int64 walEnd, both set to lsn, or to
 *    InvalidXLogRecPtr unless this is the last write;
 *  - a zero int64 sendtime placeholder.
 * The sendtime field therefore starts at byte offset 1 + 2 * sizeof(int64)
 * of ctx->out->data.
 *
 * NOTE(review): the signature line (1344) is missing from this listing.
1342  */
1343 static void
1345 {
1346  /* can't have sync rep confused by sending the same LSN several times */
1347  if (!last_write)
1348  lsn = InvalidXLogRecPtr;
1349 
1350  resetStringInfo(ctx->out);
1351 
1352  pq_sendbyte(ctx->out, 'w');
1353  pq_sendint64(ctx->out, lsn); /* dataStart */
1354  pq_sendint64(ctx->out, lsn); /* walEnd */
1355 
1356  /*
1357  * Fill out the sendtime later, just as it's done in XLogSendPhysical, but
1358  * reserve space here.
1359  */
1360  pq_sendint64(ctx->out, 0); /* sendtime */
1361 }
1362 
1363 /*
1364  * LogicalDecodingContext 'write' callback.
1365  *
1366  * Actually write out data previously prepared by WalSndPrepareWrite out to
1367  * the network. Take as long as needed, but process replies from the other
1368  * side and check timeouts during that.
 *
 * Visible steps:
 *  - patch the current timestamp into the reserved sendtime slot;
 *  - queue ctx->out as a CopyData ('d') message without blocking;
 *  - try to flush, shutting the walsender down if the flush fails;
 *  - return early when nothing is left pending and (per the partly visible
 *    condition) the wal_sender_timeout / 2 deadline is not close.
 *
 * NOTE(review): lines 1371, 1381-1382 (presumably computing "now" and
 * resetting tmpbuf), 1390, 1397 (the start of the fast-path condition) and
 * 1405 (the slow-path call) are missing from this listing.
1369  */
1370 static void
1372  bool last_write)
1373 {
1374  TimestampTz now;
1375 
1376  /*
1377  * Fill the send timestamp last, so that it is taken as late as possible.
1378  * This is somewhat ugly, but the protocol is set as it's already used for
1379  * several releases by streaming physical replication.
1380  */
1383  pq_sendint64(&tmpbuf, now);
 /* Skip the 'w' byte plus the dataStart and walEnd int64s to reach the sendtime slot. */
1384  memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
1385  tmpbuf.data, sizeof(int64));
1386 
1387  /* output previously gathered data in a CopyData packet */
1388  pq_putmessage_noblock('d', ctx->out->data, ctx->out->len);
1389 
1391 
1392  /* Try to flush pending output to the client */
1393  if (pq_flush_if_writable() != 0)
1394  WalSndShutdown();
1395 
1396  /* Try taking fast path unless we get too close to walsender timeout. */
1398  wal_sender_timeout / 2) &&
1399  !pq_is_send_pending())
1400  {
1401  return;
1402  }
1403 
1404  /* If we have pending write here, go to slow path */
1406 }
1407 
1408 /*
1409  * Wait until there is no pending write. Also process replies from the other
1410  * side and check timeouts during that.
 *
 * Loops until pq_is_send_pending() reports nothing left to send. Each
 * iteration also handles a pending configuration reload and retries the
 * flush, shutting the walsender down if the flush fails. On exit the
 * process latch is set again.
 *
 * NOTE(review): most statements of the loop body (e.g. 1420, 1423, 1426,
 * 1431, 1434-1435, 1438, 1440, 1446-1447) are missing from this listing.
 * The surviving comments indicate what they do. sleeptime is declared, but
 * its uses are on those missing lines.
1411  */
1412 static void
1414 {
1415  for (;;)
1416  {
1417  long sleeptime;
1418 
1419  /* Check for input from the client */
1421 
1422  /* die if timeout was reached */
1424 
1425  /* Send keepalive if the time has come */
1427 
1428  if (!pq_is_send_pending())
1429  break;
1430 
1432 
1433  /* Sleep until something happens or we time out */
1436 
1437  /* Clear any already-pending wakeups */
1439 
1441 
1442  /* Process any requests or signals received recently */
1443  if (ConfigReloadPending)
1444  {
1445  ConfigReloadPending = false;
1448  }
1449 
1450  /* Try to flush pending output to the client */
1451  if (pq_flush_if_writable() != 0)
1452  WalSndShutdown();
1453  }
1454 
1455  /* reactivate latch so WalSndLoop knows to continue */
1456  SetLatch(MyLatch);
1457 }
1458 
1459 /*
1460  * LogicalDecodingContext 'update_progress' callback.
1461  *
1462  * Write the current position to the lag tracker (see XLogSendPhysical).
1463  *
1464  * When skipping empty transactions, send a keepalive message if necessary.
 *
 * sendTime is function-static, so the lag-tracking throttle (at most one
 * LagTrackerWrite per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS, i.e. 1000 ms)
 * persists across calls. Lag is recorded only when ctx->end_xact is set.
 *
 * NOTE(review): lines 1467, 1471 (presumably the declaration and
 * initialization of "now"), 1485, 1522 and 1524 are missing from this
 * listing. 1524 includes the statement run by the final if.
1465  */
1466 static void
1468  bool skipped_xact)
1469 {
1470  static TimestampTz sendTime = 0;
1472  bool pending_writes = false;
1473  bool end_xact = ctx->end_xact;
1474 
1475  /*
1476  * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to
1477  * avoid flooding the lag tracker when we commit frequently.
1478  *
1479  * We don't have a mechanism to get the ack for any LSN other than end
1480  * xact LSN from the downstream. So, we track lag only for end of
1481  * transaction LSN.
1482  */
1483 #define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000
1484  if (end_xact && TimestampDifferenceExceeds(sendTime, now,
1486  {
1487  LagTrackerWrite(lsn, now);
1488  sendTime = now;
1489  }
1490 
1491  /*
1492  * When skipping empty transactions in synchronous replication, we send a
1493  * keepalive message to avoid delaying such transactions.
1494  *
1495  * It is okay to check sync_standbys_defined flag without lock here as in
1496  * the worst case we will just send an extra keepalive message when it is
1497  * really not required.
1498  */
1499  if (skipped_xact &&
1500  SyncRepRequested() &&
1501  ((volatile WalSndCtlData *) WalSndCtl)->sync_standbys_defined)
1502  {
1503  WalSndKeepalive(false, lsn);
1504 
1505  /* Try to flush pending output to the client */
1506  if (pq_flush_if_writable() != 0)
1507  WalSndShutdown();
1508 
1509  /* If we have pending write here, make sure it's actually flushed */
1510  if (pq_is_send_pending())
1511  pending_writes = true;
1512  }
1513 
1514  /*
1515  * Process pending writes if any or try to send a keepalive if required.
1516  * We don't need to try sending keep alive messages at the transaction end
1517  * as that will be done at a later point in time. This is required only
1518  * for large transactions where we don't send any changes to the
1519  * downstream and the receiver can timeout due to that.
1520  */
1521  if (pending_writes || (!end_xact &&
1523  wal_sender_timeout / 2)))
1525 }
1526 
1527 /*
1528  * Wait till WAL < loc is flushed to disk so it can be safely sent to client.
1529  *
1530  * Returns end LSN of flushed WAL. Normally this will be >= loc, but
1531  * if we detect a shutdown request (either from postmaster or client)
1532  * we will return early, so caller must always check.
 *
 * RecentFlushPtr is function-static and caches the last observed flush
 * position across calls. The fast path returns it without re-reading when
 * loc is already covered. Otherwise the position is taken from
 * GetFlushRecPtr() on a primary, or from GetXLogReplayRecPtr() while in
 * recovery. The loop also exits, returning the possibly smaller position,
 * when got_STOPPING is set or when streaming has been ended by CopyDone
 * and nothing is left to send.
 *
 * NOTE(review): several statements are missing from this listing. These
 * include the body of "if (got_STOPPING)" at 1580 (line 1581), the rest of
 * the keepalive condition and its body (1609-1610), and the first part of
 * the CopyDone condition (1630). As rendered, those ifs appear to govern
 * the following visible lines; that is a listing artifact, not the real
 * control flow.
1533  */
1534 static XLogRecPtr
1536 {
1537  int wakeEvents;
1538  static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
1539 
1540  /*
1541  * Fast path to avoid acquiring the spinlock in case we already know we
1542  * have enough WAL available. This is particularly interesting if we're
1543  * far behind.
1544  */
1545  if (RecentFlushPtr != InvalidXLogRecPtr &&
1546  loc <= RecentFlushPtr)
1547  return RecentFlushPtr;
1548 
1549  /* Get a more recent flush pointer. */
1550  if (!RecoveryInProgress())
1551  RecentFlushPtr = GetFlushRecPtr(NULL);
1552  else
1553  RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1554 
1555  for (;;)
1556  {
1557  long sleeptime;
1558 
1559  /* Clear any already-pending wakeups */
1561 
1563 
1564  /* Process any requests or signals received recently */
1565  if (ConfigReloadPending)
1566  {
1567  ConfigReloadPending = false;
1570  }
1571 
1572  /* Check for input from the client */
1574 
1575  /*
1576  * If we're shutting down, trigger pending WAL to be written out,
1577  * otherwise we'd possibly end up waiting for WAL that never gets
1578  * written, because walwriter has shut down already.
1579  */
1580  if (got_STOPPING)
1582 
1583  /* Update our idea of the currently flushed position. */
1584  if (!RecoveryInProgress())
1585  RecentFlushPtr = GetFlushRecPtr(NULL);
1586  else
1587  RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1588 
1589  /*
1590  * If postmaster asked us to stop, don't wait anymore.
1591  *
1592  * It's important to do this check after the recomputation of
1593  * RecentFlushPtr, so we can send all remaining data before shutting
1594  * down.
1595  */
1596  if (got_STOPPING)
1597  break;
1598 
1599  /*
1600  * We only send regular messages to the client for full decoded
1601  * transactions, but a synchronous replication and walsender shutdown
1602  * possibly are waiting for a later location. So, before sleeping, we
1603  * send a ping containing the flush location. If the receiver is
1604  * otherwise idle, this keepalive will trigger a reply. Processing the
1605  * reply will update these MyWalSnd locations.
1606  */
1607  if (MyWalSnd->flush < sentPtr &&
1608  MyWalSnd->write < sentPtr &&
1611 
1612  /* check whether we're done */
1613  if (loc <= RecentFlushPtr)
1614  break;
1615 
1616  /* Waiting for new WAL. Since we need to wait, we're now caught up. */
1617  WalSndCaughtUp = true;
1618 
1619  /*
1620  * Try to flush any pending output to the client.
1621  */
1622  if (pq_flush_if_writable() != 0)
1623  WalSndShutdown();
1624 
1625  /*
1626  * If we have received CopyDone from the client, sent CopyDone
1627  * ourselves, and the output buffer is empty, it's time to exit
1628  * streaming, so fail the current WAL fetch request.
1629  */
1631  !pq_is_send_pending())
1632  break;
1633 
1634  /* die if timeout was reached */
1636 
1637  /* Send keepalive if the time has come */
1639 
1640  /*
1641  * Sleep until something happens or we time out. Also wait for the
1642  * socket becoming writable, if there's still pending output.
1643  * Otherwise we might sit on sendable output data while waiting for
1644  * new WAL to be generated. (But if we have nothing to send, we don't
1645  * want to wake on socket-writable.)
1646  */
1648 
1649  wakeEvents = WL_SOCKET_READABLE;
1650 
1651  if (pq_is_send_pending())
1652  wakeEvents |= WL_SOCKET_WRITEABLE;
1653 
1654  WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_WAIT_WAL);
1655  }
1656 
1657  /* reactivate latch so WalSndLoop knows to continue */
1658  SetLatch(MyLatch);
1659  return RecentFlushPtr;
1660 }
1661 
1662 /*
1663  * Execute an incoming replication command.
1664  *
1665  * Returns true if the cmd_string was recognized as WalSender command, false
1666  * if not.
 *
 * Parsing and execution run in a dedicated "Replication command context"
 * memory context. It is deleted both on the not-a-WalSender-command path
 * and after the command has run. Each recognized command sets cmdtag,
 * updates the ps display, executes, and calls EndReplicationCommand(cmdtag).
 * BASE_BACKUP, START_REPLICATION and TIMELINE_HISTORY refuse to run inside
 * a transaction block (PreventInTransactionBlock).
 *
 * Raises ERROR:
 *  - in stopping mode;
 *  - on a parser failure;
 *  - inside an aborted transaction block;
 *  - when a non-WalSender command arrives on a connection with
 *    MyDatabaseId == InvalidOid;
 *  - on an unrecognized command node tag.
 *
 * NOTE(review): this listing omits several lines, including the body of
 * "if (got_STOPPING)" (1682) and the conditions guarding the stopping-mode
 * ERROR (1689) and the aborted-transaction ERROR (1765). As rendered, those
 * ereport calls look unconditional; that is a listing artifact. The
 * dispatch calls for several commands (e.g. 1793, 1808, 1815, 1830, 1843)
 * are also missing.
1667  */
1668 bool
1669 exec_replication_command(const char *cmd_string)
1670 {
1671  int parse_rc;
1672  Node *cmd_node;
1673  const char *cmdtag;
1674  MemoryContext cmd_context;
1675  MemoryContext old_context;
1676 
1677  /*
1678  * If WAL sender has been told that shutdown is getting close, switch its
1679  * status accordingly to handle the next replication commands correctly.
1680  */
1681  if (got_STOPPING)
1683 
1684  /*
1685  * Throw error if in stopping mode. We need to prevent commands that could
1686  * generate WAL while the shutdown checkpoint is being written. To be
1687  * safe, we just prohibit all new commands.
1688  */
1690  ereport(ERROR,
1691  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1692  errmsg("cannot execute new commands while WAL sender is in stopping mode")));
1693 
1694  /*
1695  * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
1696  * command arrives. Clean up the old stuff if there's anything.
1697  */
1699 
1701 
1702  /*
1703  * Prepare to parse and execute the command.
1704  */
1706  "Replication command context",
1708  old_context = MemoryContextSwitchTo(cmd_context);
1709 
1710  replication_scanner_init(cmd_string);
1711 
1712  /*
1713  * Is it a WalSender command?
1714  */
1716  {
1717  /* Nope; clean up and get out. */
1719 
1720  MemoryContextSwitchTo(old_context);
1721  MemoryContextDelete(cmd_context);
1722 
1723  /* XXX this is a pretty random place to make this check */
1724  if (MyDatabaseId == InvalidOid)
1725  ereport(ERROR,
1726  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1727  errmsg("cannot execute SQL commands in WAL sender for physical replication")));
1728 
1729  /* Tell the caller that this wasn't a WalSender command. */
1730  return false;
1731  }
1732 
1733  /*
1734  * Looks like a WalSender command, so parse it.
1735  */
1736  parse_rc = replication_yyparse();
1737  if (parse_rc != 0)
1738  ereport(ERROR,
1739  (errcode(ERRCODE_SYNTAX_ERROR),
1740  errmsg_internal("replication command parser returned %d",
1741  parse_rc)));
1743 
1744  cmd_node = replication_parse_result;
1745 
1746  /*
1747  * Report query to various monitoring facilities. For this purpose, we
1748  * report replication commands just like SQL commands.
1749  */
1750  debug_query_string = cmd_string;
1751 
1753 
1754  /*
1755  * Log replication command if log_replication_commands is enabled. Even
1756  * when it's disabled, log the command with DEBUG1 level for backward
1757  * compatibility.
1758  */
1760  (errmsg("received replication command: %s", cmd_string)));
1761 
1762  /*
1763  * Disallow replication commands in aborted transaction blocks.
1764  */
1766  ereport(ERROR,
1767  (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
1768  errmsg("current transaction is aborted, "
1769  "commands ignored until end of transaction block")));
1770 
1772 
1773  /*
1774  * Allocate buffers that will be used for each outgoing and incoming
1775  * message. We do this just once per command to reduce palloc overhead.
1776  */
1780 
1781  switch (cmd_node->type)
1782  {
1783  case T_IdentifySystemCmd:
1784  cmdtag = "IDENTIFY_SYSTEM";
1785  set_ps_display(cmdtag);
1786  IdentifySystem();
1787  EndReplicationCommand(cmdtag);
1788  break;
1789 
1790  case T_ReadReplicationSlotCmd:
1791  cmdtag = "READ_REPLICATION_SLOT";
1792  set_ps_display(cmdtag);
1794  EndReplicationCommand(cmdtag);
1795  break;
1796 
1797  case T_BaseBackupCmd:
1798  cmdtag = "BASE_BACKUP";
1799  set_ps_display(cmdtag);
1800  PreventInTransactionBlock(true, cmdtag);
1801  SendBaseBackup((BaseBackupCmd *) cmd_node);
1802  EndReplicationCommand(cmdtag);
1803  break;
1804 
1805  case T_CreateReplicationSlotCmd:
1806  cmdtag = "CREATE_REPLICATION_SLOT";
1807  set_ps_display(cmdtag);
1809  EndReplicationCommand(cmdtag);
1810  break;
1811 
1812  case T_DropReplicationSlotCmd:
1813  cmdtag = "DROP_REPLICATION_SLOT";
1814  set_ps_display(cmdtag);
1816  EndReplicationCommand(cmdtag);
1817  break;
1818 
1819  case T_StartReplicationCmd:
1820  {
1821  StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
1822 
1823  cmdtag = "START_REPLICATION";
1824  set_ps_display(cmdtag);
1825  PreventInTransactionBlock(true, cmdtag);
1826 
1827  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
1828  StartReplication(cmd);
1829  else
1831 
1832  /* dupe, but necessary per libpqrcv_endstreaming */
1833  EndReplicationCommand(cmdtag);
1834 
1835  Assert(xlogreader != NULL);
1836  break;
1837  }
1838 
1839  case T_TimeLineHistoryCmd:
1840  cmdtag = "TIMELINE_HISTORY";
1841  set_ps_display(cmdtag);
1842  PreventInTransactionBlock(true, cmdtag);
1844  EndReplicationCommand(cmdtag);
1845  break;
1846 
1847  case T_VariableShowStmt:
1848  {
1850  VariableShowStmt *n = (VariableShowStmt *) cmd_node;
1851 
1852  cmdtag = "SHOW";
1853  set_ps_display(cmdtag);
1854 
1855  /* syscache access needs a transaction environment */
1857  GetPGVariable(n->name, dest);
1859  EndReplicationCommand(cmdtag);
1860  }
1861  break;
1862 
1863  default:
1864  elog(ERROR, "unrecognized replication command node tag: %u",
1865  cmd_node->type);
1866  }
1867 
1868  /* done */
1869  MemoryContextSwitchTo(old_context);
1870  MemoryContextDelete(cmd_context);
1871 
1872  /*
1873  * We need not update ps display or pg_stat_activity, because PostgresMain
1874  * will reset those to "idle". But we must reset debug_query_string to
1875  * ensure it doesn't become a dangling pointer.
1876  */
1877  debug_query_string = NULL;
1878 
1879  return true;
1880 }
1881 
1882 /*
1883  * Process any incoming messages while streaming. Also checks if the remote
1884  * end has closed the connection.
 *
 * Reads messages without blocking until none is available or CopyDone has
 * been received.
 *  - 'd' (CopyData) is read with the large-message limit.
 *  - 'c' (CopyDone) and 'X' (terminate) are read with the small limit.
 *  - Any other type is FATAL.
 *  - On 'c', CopyDone is echoed unless already sent, and both streaming
 *    flags end up set.
 *  - On 'X', or on EOF or a read error, the process exits with status 0.
 * If at least one message was processed, waiting_for_ping_response is
 * cleared.
 *
 * NOTE(review): lines 1887 (signature), 1894, 1908 and 1943 (the ereport
 * openings, including their elevel), 1940, 1956 (the 'd' handler call) and
 * 1991 (presumably recording the reply time) are missing from this listing.
1885  */
1886 static void
1888 {
1889  unsigned char firstchar;
1890  int maxmsglen;
1891  int r;
1892  bool received = false;
1893 
1895 
1896  /*
1897  * If we already received a CopyDone from the frontend, any subsequent
1898  * message is the beginning of a new command, and should be processed in
1899  * the main processing loop.
1900  */
1901  while (!streamingDoneReceiving)
1902  {
1903  pq_startmsgread();
1904  r = pq_getbyte_if_available(&firstchar);
1905  if (r < 0)
1906  {
1907  /* unexpected error or EOF */
1909  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1910  errmsg("unexpected EOF on standby connection")));
1911  proc_exit(0);
1912  }
1913  if (r == 0)
1914  {
1915  /* no data available without blocking */
1916  pq_endmsgread();
1917  break;
1918  }
1919 
1920  /* Validate message type and set packet size limit */
1921  switch (firstchar)
1922  {
1923  case 'd':
1924  maxmsglen = PQ_LARGE_MESSAGE_LIMIT;
1925  break;
1926  case 'c':
1927  case 'X':
1928  maxmsglen = PQ_SMALL_MESSAGE_LIMIT;
1929  break;
1930  default:
1931  ereport(FATAL,
1932  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1933  errmsg("invalid standby message type \"%c\"",
1934  firstchar)));
1935  maxmsglen = 0; /* keep compiler quiet */
1936  break;
1937  }
1938 
1939  /* Read the message contents */
1941  if (pq_getmessage(&reply_message, maxmsglen))
1942  {
1944  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1945  errmsg("unexpected EOF on standby connection")));
1946  proc_exit(0);
1947  }
1948 
1949  /* ... and process it */
1950  switch (firstchar)
1951  {
1952  /*
1953  * 'd' means a standby reply wrapped in a CopyData packet.
1954  */
1955  case 'd':
1957  received = true;
1958  break;
1959 
1960  /*
1961  * CopyDone means the standby requested to finish streaming.
1962  * Reply with CopyDone, if we had not sent that already.
1963  */
1964  case 'c':
1965  if (!streamingDoneSending)
1966  {
1967  pq_putmessage_noblock('c', NULL, 0);
1968  streamingDoneSending = true;
1969  }
1970 
1971  streamingDoneReceiving = true;
1972  received = true;
1973  break;
1974 
1975  /*
1976  * 'X' means that the standby is closing down the socket.
1977  * No break follows: proc_exit() is presumably non-returning,
1978  * so the default case below is not reached from here.
1979  */
1978  case 'X':
1979  proc_exit(0);
1980 
1981  default:
1982  Assert(false); /* NOT REACHED */
1983  }
1984  }
1985 
1986  /*
1987  * Save the last reply timestamp if we've received at least one reply.
1988  */
1989  if (received)
1990  {
1992  waiting_for_ping_response = false;
1993  }
1994 }
1995 
1996 /*
1997  * Process a status update message received from standby.
 *
 * Consumes the sub-message type byte from reply_message and dispatches on
 * it. 'r' and 'h' are handled by calls on lines missing from this listing
 * (2012 and 2016). NOTE(review): these are presumably the standby-reply and
 * hot-standby-feedback handlers, which read the rest of reply_message after
 * this byte. Any other type is reported as a protocol violation, and the
 * process exits with status 0 (the ereport's elevel is on missing line
 * 2020). The signature (2000) is also missing.
1998  */
1999 static void
2001 {
2002  char msgtype;
2003 
2004  /*
2005  * Check message type from the first byte.
2006  */
2007  msgtype = pq_getmsgbyte(&reply_message);
2008 
2009  switch (msgtype)
2010  {
2011  case 'r':
2013  break;
2014 
2015  case 'h':
2017  break;
2018 
2019  default:
2021  (errcode(ERRCODE_PROTOCOL_VIOLATION),
2022  errmsg("unexpected message type \"%c\"", msgtype)));
2023  proc_exit(0);
2024  }
2025 }
2026 
2027 /*
2028  * Remember that a walreceiver just confirmed receipt of lsn `lsn`.
 *
 * Sets slot->data.restart_lsn to lsn under the slot's spinlock, only when
 * the value differs. lsn must be valid (asserted). The spinlock is released
 * before any follow-up work for a changed value is done.
 *
 * NOTE(review): the signature (2031), the declaration of slot (2034,
 * presumably the current replication slot) and the work done when the
 * value changed (2047-2048) are missing from this listing.
2029  */
2030 static void
2032 {
2033  bool changed = false;
2035 
2036  Assert(lsn != InvalidXLogRecPtr);
2037  SpinLockAcquire(&slot->mutex);
2038  if (slot->data.restart_lsn != lsn)
2039  {
2040  changed = true;
2041  slot->data.restart_lsn = lsn;
2042  }
2043  SpinLockRelease(&slot->mutex);
2044 
2045  if (changed)
2046  {
2049  }
2050 
2051  /*
2052  * One could argue that the slot should be saved to disk now, but that'd
2053  * be energy wasted - the worst thing lost information could cause here is
2054  * to give wrong information in a statistics view - we'll just potentially
2055  * be more conservative in removing files.
2056  */
2057 }
2058 
2059 /*
2060  * Regular reply from standby advising of WAL locations on standby server.
 *
 * Message body (after the already-consumed type byte):
 *  - int64 write position;
 *  - int64 flush position;
 *  - int64 apply position;
 *  - int64 reply time;
 *  - a one-byte "reply requested" flag.
 *
 * Per-position lag is computed with LagTrackerRead(). A lag of -1 leaves
 * the stored value untouched unless clearLagTimes is set. The positions,
 * lags and reply time are then published into MyWalSnd under its spinlock.
 * fullyAppliedLastTime is function-static, so the "two consecutive
 * fully-applied replies" check spans calls. When a replication slot is in
 * use and flushPtr is valid, the slot is advanced on lines missing here.
 *
 * NOTE(review): missing lines include 2063 (signature), 2085 (the condition
 * for the DEBUG2 logging block), 2103 (presumably setting "now"), 2128 (the
 * reply sent when requested), 2151-2152, 2159-2160 and 2162.
2061  */
2062 static void
2064 {
2065  XLogRecPtr writePtr,
2066  flushPtr,
2067  applyPtr;
2068  bool replyRequested;
2069  TimeOffset writeLag,
2070  flushLag,
2071  applyLag;
2072  bool clearLagTimes;
2073  TimestampTz now;
2074  TimestampTz replyTime;
2075 
2076  static bool fullyAppliedLastTime = false;
2077 
2078  /* the caller already consumed the msgtype byte */
2079  writePtr = pq_getmsgint64(&reply_message);
2080  flushPtr = pq_getmsgint64(&reply_message);
2081  applyPtr = pq_getmsgint64(&reply_message);
2082  replyTime = pq_getmsgint64(&reply_message);
2083  replyRequested = pq_getmsgbyte(&reply_message);
2084 
2086  {
2087  char *replyTimeStr;
2088 
2089  /* Copy because timestamptz_to_str returns a static buffer */
2090  replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
2091 
2092  elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s reply_time %s",
2093  LSN_FORMAT_ARGS(writePtr),
2094  LSN_FORMAT_ARGS(flushPtr),
2095  LSN_FORMAT_ARGS(applyPtr),
2096  replyRequested ? " (reply requested)" : "",
2097  replyTimeStr);
2098 
2099  pfree(replyTimeStr);
2100  }
2101 
2102  /* See if we can compute the round-trip lag for these positions. */
2104  writeLag = LagTrackerRead(SYNC_REP_WAIT_WRITE, writePtr, now);
2105  flushLag = LagTrackerRead(SYNC_REP_WAIT_FLUSH, flushPtr, now);
2106  applyLag = LagTrackerRead(SYNC_REP_WAIT_APPLY, applyPtr, now);
2107 
2108  /*
2109  * If the standby reports that it has fully replayed the WAL in two
2110  * consecutive reply messages, then the second such message must result
2111  * from wal_receiver_status_interval expiring on the standby. This is a
2112  * convenient time to forget the lag times measured when it last
2113  * wrote/flushed/applied a WAL record, to avoid displaying stale lag data
2114  * until more WAL traffic arrives.
2115  */
2116  clearLagTimes = false;
2117  if (applyPtr == sentPtr)
2118  {
2119  if (fullyAppliedLastTime)
2120  clearLagTimes = true;
2121  fullyAppliedLastTime = true;
2122  }
2123  else
2124  fullyAppliedLastTime = false;
2125 
2126  /* Send a reply if the standby requested one. */
2127  if (replyRequested)
2129 
2130  /*
2131  * Update shared state for this WalSender process based on reply data from
2132  * standby.
2133  */
2134  {
2135  WalSnd *walsnd = MyWalSnd;
2136 
2137  SpinLockAcquire(&walsnd->mutex);
2138  walsnd->write = writePtr;
2139  walsnd->flush = flushPtr;
2140  walsnd->apply = applyPtr;
2141  if (writeLag != -1 || clearLagTimes)
2142  walsnd->writeLag = writeLag;
2143  if (flushLag != -1 || clearLagTimes)
2144  walsnd->flushLag = flushLag;
2145  if (applyLag != -1 || clearLagTimes)
2146  walsnd->applyLag = applyLag;
2147  walsnd->replyTime = replyTime;
2148  SpinLockRelease(&walsnd->mutex);
2149  }
2150 
2153 
2154  /*
2155  * Advance our local xmin horizon when the client confirmed a flush.
2156  */
2157  if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr)
2158  {
2161  else
2163  }
2164 }
2165 
2166 /* compute new replication slot xmin horizon if needed */
/*
 * Under the slot's spinlock, updates two pairs of fields:
 *  - slot->data.xmin and effective_xmin from feedbackXmin;
 *  - slot->data.catalog_xmin and effective_catalog_xmin from
 *    feedbackCatalogXmin.
 * Each pair is overwritten when any of these holds:
 *  - the current value is not a normal xid;
 *  - the feedback value is not a normal xid, so non-normal feedback such
 *    as InvalidTransactionId resets the horizon;
 *  - the feedback value is newer.
 * Between two normal xids the horizon therefore only advances.
 *
 * NOTE(review): the signature (2168), the declaration of slot (2171), line
 * 2174, and the work done after a change (2201-2202) are missing from this
 * listing.
 */
2167 static void
2169 {
2170  bool changed = false;
2172 
2173  SpinLockAcquire(&slot->mutex);
2175 
2176  /*
2177  * For physical replication we don't need the interlock provided by xmin
2178  * and effective_xmin since the consequences of a missed increase are
2179  * limited to query cancellations, so set both at once.
2180  */
2181  if (!TransactionIdIsNormal(slot->data.xmin) ||
2182  !TransactionIdIsNormal(feedbackXmin) ||
2183  TransactionIdPrecedes(slot->data.xmin, feedbackXmin))
2184  {
2185  changed = true;
2186  slot->data.xmin = feedbackXmin;
2187  slot->effective_xmin = feedbackXmin;
2188  }
2189  if (!TransactionIdIsNormal(slot->data.catalog_xmin) ||
2190  !TransactionIdIsNormal(feedbackCatalogXmin) ||
2191  TransactionIdPrecedes(slot->data.catalog_xmin, feedbackCatalogXmin))
2192  {
2193  changed = true;
2194  slot->data.catalog_xmin = feedbackCatalogXmin;
2195  slot->effective_catalog_xmin = feedbackCatalogXmin;
2196  }
2197  SpinLockRelease(&slot->mutex);
2198 
2199  if (changed)
2200  {
2203  }
2204 }
2205 
2206 /*
2207  * Check that the provided xmin/epoch are sane, that is, not in the future
2208  * and not so far back as to be already wrapped around.
2209  *
2210  * Epoch of nextXid should be same as standby, or if the counter has
2211  * wrapped, then one greater than standby.
2212  *
2213  * This check doesn't care about whether clog exists for these xids
2214  * at all.
 *
 * Returns true only if both of these hold:
 *  - epoch matches nextXid's epoch (when xid <= nextXid numerically) or
 *    is exactly one less (when xid > nextXid, i.e. the counter wrapped);
 *  - xid precedes or equals nextXid under TransactionIdPrecedesOrEquals().
 * xid == nextXid is accepted.
 *
 * NOTE(review): the parameter list (line 2217) is missing from this
 * listing. The body uses parameters named xid and epoch.
2215  */
2216 static bool
2218 {
2219  FullTransactionId nextFullXid;
2220  TransactionId nextXid;
2221  uint32 nextEpoch;
2222 
2223  nextFullXid = ReadNextFullTransactionId();
2224  nextXid = XidFromFullTransactionId(nextFullXid);
2225  nextEpoch = EpochFromFullTransactionId(nextFullXid);
2226 
2227  if (xid <= nextXid)
2228  {
2229  if (epoch != nextEpoch)
2230  return false;
2231  }
2232  else
2233  {
2234  if (epoch + 1 != nextEpoch)
2235  return false;
2236  }
2237 
2238  if (!TransactionIdPrecedesOrEquals(xid, nextXid))
2239  return false; /* epoch OK, but it's wrapped around */
2240 
2241  return true;
2242 }
2243 
2244 /*
2245  * Hot Standby feedback
2246  */
/*
 * Process a hot standby feedback message read from reply_message: record the
 * standby's reply time in our WalSnd entry (under its spinlock) and, if the
 * reported xmin / catalog_xmin pass the sanity checks below, reserve them via
 * the replication slot (PhysicalReplicationSlotNewXmin) or MyProc->xmin.
 *
 * NOTE(review): doxygen lines 2248, 2267 and 2303 are missing from this
 * listing (see inline notes), so the code below is incomplete as shown.
 */
2247 static void
/* NOTE(review): the declarator line (2248) is missing here. */
2249 {
2250  TransactionId feedbackXmin;
2251  uint32 feedbackEpoch;
2252  TransactionId feedbackCatalogXmin;
2253  uint32 feedbackCatalogEpoch;
2254  TimestampTz replyTime;
2255 
2256  /*
2257  * Decipher the reply message. The caller already consumed the msgtype
2258  * byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation
2259  * of this message.
 *
 * Layout read here: an int64 reply time, then four 4-byte integers --
 * xmin, xmin epoch, catalog_xmin, catalog_xmin epoch.
2260  */
2261  replyTime = pq_getmsgint64(&reply_message);
2262  feedbackXmin = pq_getmsgint(&reply_message, 4);
2263  feedbackEpoch = pq_getmsgint(&reply_message, 4);
2264  feedbackCatalogXmin = pq_getmsgint(&reply_message, 4);
2265  feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4);
2266 
/*
 * NOTE(review): line 2267 is missing; whatever preceded this brace block
 * (presumably a condition limiting it to when DEBUG2 output is wanted) is not
 * visible. Confirm against upstream.
 */
2268  {
2269  char *replyTimeStr;
2270 
2271  /* Copy because timestamptz_to_str returns a static buffer */
2272  replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
2273 
2274  elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u reply_time %s",
2275  feedbackXmin,
2276  feedbackEpoch,
2277  feedbackCatalogXmin,
2278  feedbackCatalogEpoch,
2279  replyTimeStr);
2280 
2281  pfree(replyTimeStr);
2282  }
2283 
2284  /*
2285  * Update shared state for this WalSender process based on reply data from
2286  * standby.
2287  */
2288  {
2289  WalSnd *walsnd = MyWalSnd;
2290 
2291  SpinLockAcquire(&walsnd->mutex);
2292  walsnd->replyTime = replyTime;
2293  SpinLockRelease(&walsnd->mutex);
2294  }
2295 
2296  /*
2297  * Unset WalSender's xmins if the feedback message values are invalid.
2298  * This happens when the downstream turned hot_standby_feedback off.
2299  */
2300  if (!TransactionIdIsNormal(feedbackXmin)
2301  && !TransactionIdIsNormal(feedbackCatalogXmin))
2302  {
/*
 * NOTE(review): line 2303 is missing here; the visible code in this branch
 * only passes the (invalid) values on to the slot, if any.
 */
2304  if (MyReplicationSlot != NULL)
2305  PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
2306  return;
2307  }
2308 
2309  /*
2310  * Check that the provided xmin/epoch are sane, that is, not in the future
2311  * and not so far back as to be already wrapped around. Ignore if not.
2312  */
2313  if (TransactionIdIsNormal(feedbackXmin) &&
2314  !TransactionIdInRecentPast(feedbackXmin, feedbackEpoch))
2315  return;
2316 
2317  if (TransactionIdIsNormal(feedbackCatalogXmin) &&
2318  !TransactionIdInRecentPast(feedbackCatalogXmin, feedbackCatalogEpoch))
2319  return;
2320 
2321  /*
2322  * Set the WalSender's xmin equal to the standby's requested xmin, so that
2323  * the xmin will be taken into account by GetSnapshotData() /
2324  * ComputeXidHorizons(). This will hold back the removal of dead rows and
2325  * thereby prevent the generation of cleanup conflicts on the standby
2326  * server.
2327  *
2328  * There is a small window for a race condition here: although we just
2329  * checked that feedbackXmin precedes nextXid, the nextXid could have
2330  * gotten advanced between our fetching it and applying the xmin below,
2331  * perhaps far enough to make feedbackXmin wrap around. In that case the
2332  * xmin we set here would be "in the future" and have no effect. No point
2333  * in worrying about this since it's too late to save the desired data
2334  * anyway. Assuming that the standby sends us an increasing sequence of
2335  * xmins, this could only happen during the first reply cycle, else our
2336  * own xmin would prevent nextXid from advancing so far.
2337  *
2338  * We don't bother taking the ProcArrayLock here. Setting the xmin field
2339  * is assumed atomic, and there's no real need to prevent concurrent
2340  * horizon determinations. (If we're moving our xmin forward, this is
2341  * obviously safe, and if we're moving it backwards, well, the data is at
2342  * risk already since a VACUUM could already have determined the horizon.)
2343  *
2344  * If we're using a replication slot we reserve the xmin via that,
2345  * otherwise via the walsender's PGPROC entry. We can only track the
2346  * catalog xmin separately when using a slot, so we store the least of the
2347  * two provided when not using a slot.
2348  *
2349  * XXX: It might make sense to generalize the ephemeral slot concept and
2350  * always use the slot mechanism to handle the feedback xmin.
2351  */
2352  if (MyReplicationSlot != NULL) /* XXX: persistency configurable? */
2353  PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
2354  else
2355  {
2356  if (TransactionIdIsNormal(feedbackCatalogXmin)
2357  && TransactionIdPrecedes(feedbackCatalogXmin, feedbackXmin))
2358  MyProc->xmin = feedbackCatalogXmin;
2359  else
2360  MyProc->xmin = feedbackXmin;
2361  }
2362 }
2363 
2364 /*
2365  * Compute how long send/receive loops should sleep.
2366  *
2367  * If wal_sender_timeout is enabled we want to wake up in time to send
2368  * keepalives and to abort the connection if wal_sender_timeout has been
2369  * reached.
2370  */
/*
 * Returns the sleep time in milliseconds. This is 10000 (10 s) unless the
 * guarded block below runs, in which case it is the time from 'now' until
 * wakeup_time, as computed by TimestampDifferenceMilliseconds().
 *
 * NOTE(review): doxygen lines 2372, 2376, 2384-2385 and 2392-2393 are missing
 * from this listing. They include the declarator (which presumably declares
 * the 'now' used below) and the block's guard condition. Confirm upstream.
 */
2371 static long
/* NOTE(review): declarator line 2372 is missing here. */
2373 {
2374  long sleeptime = 10000; /* 10 s */
2375 
/* NOTE(review): line 2376, the condition guarding this block, is missing. */
2377  {
2378  TimestampTz wakeup_time;
2379 
2380  /*
2381  * At the latest stop sleeping once wal_sender_timeout has been
2382  * reached.
2383  */
/* NOTE(review): lines 2384-2385, presumably the wakeup_time assignment described above, are missing. */
2386 
2387  /*
2388  * If no ping has been sent yet, wakeup when it's time to do so.
2389  * WalSndKeepaliveIfNecessary() wants to send a keepalive once half of
2390  * the timeout passed without a response.
2391  */
/* NOTE(review): lines 2392-2393 are missing; only the tail of the statement survives below. */
2394  wal_sender_timeout / 2);
2395 
2396  /* Compute relative time until wakeup. */
2397  sleeptime = TimestampDifferenceMilliseconds(now, wakeup_time);
2398  }
2399 
2400  return sleeptime;
2401 }
2402 
2403 /*
2404  * Check whether there have been responses by the client within
2405  * wal_sender_timeout and shutdown if not. Using last_processing as the
2406  * reference point avoids counting server-side stalls against the client.
2407  * However, a long server-side stall can make WalSndKeepaliveIfNecessary()
2408  * postdate last_processing by more than wal_sender_timeout. If that happens,
2409  * the client must reply almost immediately to avoid a timeout. This rarely
2410  * affects the default configuration, under which clients spontaneously send a
2411  * message every standby_message_timeout = wal_sender_timeout/6 = 10s. We
2412  * could eliminate that problem by recognizing timeout expiration at
2413  * wal_sender_timeout/2 after the keepalive.
2414  */
/*
 * Does nothing while last_reply_timestamp <= 0. Otherwise, if
 * wal_sender_timeout is enabled and last_processing has reached 'timeout',
 * logs the replication-timeout message and calls WalSndShutdown().
 *
 * NOTE(review): doxygen lines 2416, 2424-2425 and 2434 are missing from this
 * listing (see inline notes).
 */
2415 static void
/* NOTE(review): declarator line 2416 is missing here. */
2417 {
2418  TimestampTz timeout;
2419 
2420  /* don't bail out if we're doing something that doesn't require timeouts */
2421  if (last_reply_timestamp <= 0)
2422  return;
2423 
/*
 * NOTE(review): lines 2424-2425 are missing; presumably they compute
 * 'timeout' from last_reply_timestamp and wal_sender_timeout. As shown,
 * 'timeout' is read below without having been assigned.
 */
2426 
2427  if (wal_sender_timeout > 0 && last_processing >= timeout)
2428  {
2429  /*
2430  * Since typically expiration of replication timeout means
2431  * communication problem, we don't send the error message to the
2432  * standby.
2433  */
/* NOTE(review): line 2434, the opening of the ereport() call, is missing. */
2435  (errmsg("terminating walsender process due to replication timeout")));
2436 
2437  WalSndShutdown();
2438  }
2439 }
2440 
2441 /* Main loop of walsender process that streams the WAL over Copy messages. */
/*
 * Each iteration does the following:
 *   - handles a pending config reload and client input;
 *   - calls send_data() when the output buffer is empty;
 *   - flushes output, calling WalSndShutdown() if the flush fails;
 *   - when nothing is pending, handles the catch-up transition and SIGUSR2;
 *   - runs the timeout and keepalive checks;
 *   - sleeps in WalSndWait() when blocked.
 * The loop exits via 'break' once the streaming-done condition described
 * below holds.
 *
 * NOTE(review): doxygen lines 2443, 2449, 2459, 2461, 2467-2468, 2472, 2479,
 * 2499, 2509, 2514, 2529, 2532, 2541-2542, 2547 and 2556 are missing from
 * this listing (see inline notes).
 */
2442 static void
/* NOTE(review): declarator line 2443 (presumably declaring send_data) is missing. */
2444 {
2445  /*
2446  * Initialize the last reply timestamp. That enables timeout processing
2447  * from hereon.
2448  */
/* NOTE(review): line 2449, presumably the initialization described above, is missing. */
2450  waiting_for_ping_response = false;
2451 
2452  /*
2453  * Loop until we reach the end of this timeline or the client requests to
2454  * stop streaming.
2455  */
2456  for (;;)
2457  {
2458  /* Clear any already-pending wakeups */
/* NOTE(review): line 2459 (the statement that does this) is missing. */
2460 
/* NOTE(review): line 2461 is missing. */
2462 
2463  /* Process any requests or signals received recently */
2464  if (ConfigReloadPending)
2465  {
2466  ConfigReloadPending = false;
/* NOTE(review): lines 2467-2468 (the reload actions themselves) are missing. */
2469  }
2470 
2471  /* Check for input from the client */
/* NOTE(review): line 2472 (the statement that does this) is missing. */
2473 
2474  /*
2475  * If we have received CopyDone from the client, sent CopyDone
2476  * ourselves, and the output buffer is empty, it's time to exit
2477  * streaming.
2478  */
/* NOTE(review): line 2479, the start of this if-condition, is missing. */
2480  !pq_is_send_pending())
2481  break;
2482 
2483  /*
2484  * If we don't have any pending data in the output buffer, try to send
2485  * some more. If there is some, we don't bother to call send_data
2486  * again until we've flushed it ... but we'd better assume we are not
2487  * caught up.
2488  */
2489  if (!pq_is_send_pending())
2490  send_data();
2491  else
2492  WalSndCaughtUp = false;
2493 
2494  /* Try to flush pending output to the client */
2495  if (pq_flush_if_writable() != 0)
2496  WalSndShutdown();
2497 
2498  /* If nothing remains to be sent right now ... */
/* NOTE(review): line 2499, the if-condition for this block, is missing. */
2500  {
2501  /*
2502  * If we're in catchup state, move to streaming. This is an
2503  * important state change for users to know about, since before
2504  * this point data loss might occur if the primary dies and we
2505  * need to failover to the standby. The state change is also
2506  * important for synchronous replication, since commits that
2507  * started to wait at that point might wait for some time.
2508  */
/* NOTE(review): line 2509, the catchup-state test, is missing. */
2510  {
2511  ereport(DEBUG1,
2512  (errmsg_internal("\"%s\" has now caught up with upstream server",
2513  application_name)));
/* NOTE(review): line 2514, presumably the state change described above, is missing. */
2515  }
2516 
2517  /*
2518  * When SIGUSR2 arrives, we send any outstanding logs up to the
2519  * shutdown checkpoint record (i.e., the latest record), wait for
2520  * them to be replicated to the standby, and exit. This may be a
2521  * normal termination at shutdown or a promotion; the walsender
2522  * cannot tell which.
2523  */
2524  if (got_SIGUSR2)
2525  WalSndDone(send_data);
2526  }
2527 
2528  /* Check for replication timeout. */
/* NOTE(review): line 2529 (the timeout check call) is missing. */
2530 
2531  /* Send keepalive if the time has come */
/* NOTE(review): line 2532 (the keepalive call) is missing. */
2533 
2534  /*
2535  * Block if we have unsent data. XXX For logical replication, let
2536  * WalSndWaitForWal() handle any other blocking; idle receivers need
2537  * its additional actions. For physical replication, also block if
2538  * caught up; its send_data does not block.
2539  */
/* NOTE(review): lines 2541-2542, the rest of this condition, are missing. */
2540  if ((WalSndCaughtUp && send_data != XLogSendLogical &&
2543  {
2544  long sleeptime;
2545  int wakeEvents;
2546 
/* NOTE(review): line 2547, the 'if' paired with the 'else' below, is missing. */
2548  wakeEvents = WL_SOCKET_READABLE;
2549  else
2550  wakeEvents = 0;
2551 
2552  /*
2553  * Use fresh timestamp, not last_processing, to reduce the chance
2554  * of reaching wal_sender_timeout before sending a keepalive.
2555  */
/* NOTE(review): line 2556, presumably the sleeptime assignment, is missing. */
2557 
2558  if (pq_is_send_pending())
2559  wakeEvents |= WL_SOCKET_WRITEABLE;
2560 
2561  /* Sleep until something happens or we time out */
2562  WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_MAIN);
2563  }
2564  }
2565 }
2566 
2567 /* Initialize a per-walsender data structure for this walsender process */
/*
 * Claims the first WalSnd entry whose pid is 0. The pid check and the field
 * initialization happen while holding that entry's spinlock; afterwards
 * MyWalSnd points at the entry. If no free entry is found, this is only
 * caught by the final Assert.
 *
 * NOTE(review): doxygen lines 2569 and 2624 are missing from this listing.
 */
2568 static void
/* NOTE(review): declarator line 2569 is missing here. */
2570 {
2571  int i;
2572 
2573  /*
2574  * WalSndCtl should be set up already (we inherit this by fork() or
2575  * EXEC_BACKEND mechanism from the postmaster).
2576  */
2577  Assert(WalSndCtl != NULL);
2578  Assert(MyWalSnd == NULL);
2579 
2580  /*
2581  * Find a free walsender slot and reserve it. This must not fail due to
2582  * the prior check for free WAL senders in InitProcess().
2583  */
2584  for (i = 0; i < max_wal_senders; i++)
2585  {
2586  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2587 
2588  SpinLockAcquire(&walsnd->mutex);
2589 
2590  if (walsnd->pid != 0)
2591  {
2592  SpinLockRelease(&walsnd->mutex);
2593  continue;
2594  }
2595  else
2596  {
2597  /*
2598  * Found a free slot. Reserve it for us.
2599  */
2600  walsnd->pid = MyProcPid;
2601  walsnd->state = WALSNDSTATE_STARTUP;
2602  walsnd->sentPtr = InvalidXLogRecPtr;
2603  walsnd->needreload = false;
2604  walsnd->write = InvalidXLogRecPtr;
2605  walsnd->flush = InvalidXLogRecPtr;
2606  walsnd->apply = InvalidXLogRecPtr;
2607  walsnd->writeLag = -1;
2608  walsnd->flushLag = -1;
2609  walsnd->applyLag = -1;
2610  walsnd->sync_standby_priority = 0;
2611  walsnd->latch = &MyProc->procLatch;
2612  walsnd->replyTime = 0;
2613  SpinLockRelease(&walsnd->mutex);
2614  /* don't need the lock anymore */
2615  MyWalSnd = (WalSnd *) walsnd;
2616 
2617  break;
2618  }
2619  }
2620 
2621  Assert(MyWalSnd != NULL);
2622 
2623  /* Arrange to clean up at walsender exit */
/* NOTE(review): line 2624, the statement that does this, is missing. */
2625 }
2626 
2627 /* Destroy the per-walsender data structure for this walsender process */
/*
 * Releases this process's WalSnd entry. MyWalSnd is cleared first. Then,
 * under the entry's spinlock, its latch pointer and pid are reset; a pid of 0
 * is what the slot-claiming code treats as a free entry.
 *
 * NOTE(review): declarator line 2629 is missing from this listing.
 */
2628 static void
2630 {
2631  WalSnd *walsnd = MyWalSnd;
2632 
2633  Assert(walsnd != NULL);
2634 
2635  MyWalSnd = NULL;
2636 
2637  SpinLockAcquire(&walsnd->mutex);
2638  /* clear latch while holding the spinlock, so it can safely be read */
2639  walsnd->latch = NULL;
2640  /* Mark WalSnd struct as no longer being in use. */
2641  walsnd->pid = 0;
2642  SpinLockRelease(&walsnd->mutex);
2643 }
2644 
2645 /* XLogReaderRoutine->segment_open callback */
/*
 * Opens WAL segment nextSegNo read-only into state->seg.ws_file and reports
 * through *tli_p which timeline's file was used. Raises ERROR if the open
 * fails; a missing file (ENOENT) gets a dedicated "already been removed"
 * message.
 *
 * NOTE(review): doxygen lines 2647 (start of the declarator), 2677, 2703 and
 * 2709 are missing from this listing (see inline notes).
 */
2646 static void
2648  TimeLineID *tli_p)
2649 {
2650  char path[MAXPGPATH];
2651 
2652  /*-------
2653  * When reading from a historic timeline, and there is a timeline switch
2654  * within this segment, read from the WAL segment belonging to the new
2655  * timeline.
2656  *
2657  * For example, imagine that this server is currently on timeline 5, and
2658  * we're streaming timeline 4. The switch from timeline 4 to 5 happened at
2659  * 0/13002088. In pg_wal, we have these files:
2660  *
2661  * ...
2662  * 000000040000000000000012
2663  * 000000040000000000000013
2664  * 000000050000000000000013
2665  * 000000050000000000000014
2666  * ...
2667  *
2668  * In this situation, when requested to send the WAL from segment 0x13, on
2669  * timeline 4, we read the WAL from file 000000050000000000000013. Archive
2670  * recovery prefers files from newer timelines, so if the segment was
2671  * restored from the archive on this server, the file belonging to the old
2672  * timeline, 000000040000000000000013, might not exist. Their contents are
2673  * equal up to the switchpoint, because at a timeline switch, the used
2674  * portion of the old segment is copied to the new file. -------
2675  */
2676  *tli_p = sendTimeLine;
/* NOTE(review): line 2677, the condition guarding the block below (presumably "historic timeline", per the comment above), is missing. */
2678  {
2679  XLogSegNo endSegNo;
2680 
2681  XLByteToSeg(sendTimeLineValidUpto, endSegNo, state->segcxt.ws_segsize);
2682  if (nextSegNo == endSegNo)
2683  *tli_p = sendTimeLineNextTLI;
2684  }
2685 
2686  XLogFilePath(path, *tli_p, nextSegNo, state->segcxt.ws_segsize);
2687  state->seg.ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY);
2688  if (state->seg.ws_file >= 0)
2689  return;
2690 
2691  /*
2692  * If the file is not found, assume it's because the standby asked for a
2693  * too old WAL segment that has already been removed or recycled.
2694  */
2695  if (errno == ENOENT)
2696  {
2697  char xlogfname[MAXFNAMELEN];
2698  int save_errno = errno;
2699 
/*
 * NOTE(review): XLogFilePath() above uses state->segcxt.ws_segsize, but this
 * call uses the global wal_segment_size. Presumably the two are equal in a
 * walsender, but using a single source for both would be more robust.
 */
2700  XLogFileName(xlogfname, *tli_p, nextSegNo, wal_segment_size);
/* Restore the open failure's errno before building the error report. */
2701  errno = save_errno;
2702  ereport(ERROR,
/* NOTE(review): line 2703, the start of the ereport auxiliary list, is missing. */
2704  errmsg("requested WAL segment %s has already been removed",
2705  xlogfname)));
2706  }
2707  else
2708  ereport(ERROR,
/* NOTE(review): line 2709, the start of the ereport auxiliary list, is missing. */
2710  errmsg("could not open file \"%s\": %m",
2711  path)));
2712 }
2713 
2714 /*
2715  * Send out the WAL in its normal physical/stored form.
2716  *
2717  * Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk,
2718  * but not yet sent to the client, and buffer it in the libpq output
2719  * buffer.
2720  *
2721  * If there is no unsent WAL remaining, WalSndCaughtUp is set to true,
2722  * otherwise WalSndCaughtUp is set to false.
2723  */
/*
 * Builds one 'w' message (dataStart, walEnd, send time, then up to
 * MAX_SEND_SIZE bytes of WAL read with WALRead()), advances sentPtr and
 * publishes it in our WalSnd entry. It also sends CopyDone when a historic
 * timeline has been fully sent.
 *
 * NOTE(review): doxygen lines 2725, 2736, 2738, 2745, 2806, 2808, 2871, 2875,
 * 2884-2885, 2916, 2934, 2945, 2949, 2960, 2968, 2980, 2987, 2992-2993, 2997,
 * 3011 and 3016 are missing from this listing (see inline notes).
 */
2724 static void
/* NOTE(review): declarator line 2725 is missing here. */
2726 {
2727  XLogRecPtr SendRqstPtr;
2728  XLogRecPtr startptr;
2729  XLogRecPtr endptr;
2730  Size nbytes;
2731  XLogSegNo segno;
2732  WALReadError errinfo;
2733 
2734  /* If requested switch the WAL sender to the stopping state. */
2735  if (got_STOPPING)
/* NOTE(review): line 2736, the body of this 'if', is missing. */
2737 
/* NOTE(review): line 2738, the condition for the early-return block below, is missing. */
2739  {
2740  WalSndCaughtUp = true;
2741  return;
2742  }
2743 
2744  /* Figure out how far we can safely send the WAL. */
/* NOTE(review): line 2745, the first branch's condition (presumably "timeline is historic"), is missing. */
2746  {
2747  /*
2748  * Streaming an old timeline that's in this server's history, but is
2749  * not the one we're currently inserting or replaying. It can be
2750  * streamed up to the point where we switched off that timeline.
2751  */
2752  SendRqstPtr = sendTimeLineValidUpto;
2753  }
2754  else if (am_cascading_walsender)
2755  {
2756  TimeLineID SendRqstTLI;
2757 
2758  /*
2759  * Streaming the latest timeline on a standby.
2760  *
2761  * Attempt to send all WAL that has already been replayed, so that we
2762  * know it's valid. If we're receiving WAL through streaming
2763  * replication, it's also OK to send any WAL that has been received
2764  * but not replayed.
2765  *
2766  * The timeline we're recovering from can change, or we can be
2767  * promoted. In either case, the current timeline becomes historic. We
2768  * need to detect that so that we don't try to stream past the point
2769  * where we switched to another timeline. We check for promotion or
2770  * timeline switch after calculating FlushPtr, to avoid a race
2771  * condition: if the timeline becomes historic just after we checked
2772  * that it was still current, it's still OK to stream it up to the
2773  * FlushPtr that was calculated before it became historic.
2774  */
2775  bool becameHistoric = false;
2776 
2777  SendRqstPtr = GetStandbyFlushRecPtr(&SendRqstTLI);
2778 
2779  if (!RecoveryInProgress())
2780  {
2781  /* We have been promoted. */
2782  SendRqstTLI = GetWALInsertionTimeLine();
2783  am_cascading_walsender = false;
2784  becameHistoric = true;
2785  }
2786  else
2787  {
2788  /*
2789  * Still a cascading standby. But is the timeline we're sending
2790  * still the one recovery is recovering from?
2791  */
2792  if (sendTimeLine != SendRqstTLI)
2793  becameHistoric = true;
2794  }
2795 
2796  if (becameHistoric)
2797  {
2798  /*
2799  * The timeline we were sending has become historic. Read the
2800  * timeline history file of the new timeline to see where exactly
2801  * we forked off from the timeline we were sending.
2802  */
2803  List *history;
2804 
2805  history = readTimeLineHistory(SendRqstTLI);
/* NOTE(review): lines 2806 and 2808 are missing; presumably they derive sendTimeLineValidUpto from 'history', as the comment above describes. */
2807 
2809  list_free_deep(history);
2810 
2811  sendTimeLineIsHistoric = true;
2812 
2813  SendRqstPtr = sendTimeLineValidUpto;
2814  }
2815  }
2816  else
2817  {
2818  /*
2819  * Streaming the current timeline on a primary.
2820  *
2821  * Attempt to send all data that's already been written out and
2822  * fsync'd to disk. We cannot go further than what's been written out
2823  * given the current implementation of WALRead(). And in any case
2824  * it's unsafe to send WAL that is not securely down to disk on the
2825  * primary: if the primary subsequently crashes and restarts, standbys
2826  * must not have applied any WAL that got lost on the primary.
2827  */
2828  SendRqstPtr = GetFlushRecPtr(NULL);
2829  }
2830 
2831  /*
2832  * Record the current system time as an approximation of the time at which
2833  * this WAL location was written for the purposes of lag tracking.
2834  *
2835  * In theory we could make XLogFlush() record a time in shmem whenever WAL
2836  * is flushed and we could get that time as well as the LSN when we call
2837  * GetFlushRecPtr() above (and likewise for the cascading standby
2838  * equivalent), but rather than putting any new code into the hot WAL path
2839  * it seems good enough to capture the time here. We should reach this
2840  * after XLogFlush() runs WalSndWakeupProcessRequests(), and although that
2841  * may take some time, we read the WAL flush pointer and take the time
2842  * very close together here so that we'll get a later position if it is
2843  * still moving.
2844  *
2845  * Because LagTrackerWrite ignores samples when the LSN hasn't advanced,
2846  * this gives us a cheap approximation for the WAL flush time for this
2847  * LSN.
2848  *
2849  * Note that the LSN is not necessarily the LSN for the data contained in
2850  * the present message; it's the end of the WAL, which might be further
2851  * ahead. All the lag tracking machinery cares about is finding out when
2852  * that arbitrary LSN is eventually reported as written, flushed and
2853  * applied, so that it can measure the elapsed time.
2854  */
2855  LagTrackerWrite(SendRqstPtr, GetCurrentTimestamp());
2856 
2857  /*
2858  * If this is a historic timeline and we've reached the point where we
2859  * forked to the next timeline, stop streaming.
2860  *
2861  * Note: We might already have sent WAL > sendTimeLineValidUpto. The
2862  * startup process will normally replay all WAL that has been received
2863  * from the primary, before promoting, but if the WAL streaming is
2864  * terminated at a WAL page boundary, the valid portion of the timeline
2865  * might end in the middle of a WAL record. We might've already sent the
2866  * first half of that partial WAL record to the cascading standby, so that
2867  * sentPtr > sendTimeLineValidUpto. That's OK; the cascading standby can't
2868  * replay the partial WAL record either, so it can still follow our
2869  * timeline switch.
2870  */
/* NOTE(review): line 2871, this block's condition, is missing. */
2872  {
2873  /* close the current file. */
2874  if (xlogreader->seg.ws_file >= 0)
/* NOTE(review): line 2875, the statement that closes the file, is missing. */
2876 
2877  /* Send CopyDone */
2878  pq_putmessage_noblock('c', NULL, 0);
2879  streamingDoneSending = true;
2880 
2881  WalSndCaughtUp = true;
2882 
2883  elog(DEBUG1, "walsender reached end of timeline at %X/%X (sent up to %X/%X)",
/* NOTE(review): lines 2884-2885, the elog() arguments, are missing. */
2886  return;
2887  }
2888 
2889  /* Do we have any work to do? */
2890  Assert(sentPtr <= SendRqstPtr);
2891  if (SendRqstPtr <= sentPtr)
2892  {
2893  WalSndCaughtUp = true;
2894  return;
2895  }
2896 
2897  /*
2898  * Figure out how much to send in one message. If there's no more than
2899  * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
2900  * MAX_SEND_SIZE bytes, but round back to logfile or page boundary.
2901  *
2902  * The rounding is not only for performance reasons. Walreceiver relies on
2903  * the fact that we never split a WAL record across two messages. Since a
2904  * long WAL record is split at page boundary into continuation records,
2905  * page boundary is always a safe cut-off point. We also assume that
2906  * SendRqstPtr never points to the middle of a WAL record.
2907  */
2908  startptr = sentPtr;
2909  endptr = startptr;
2910  endptr += MAX_SEND_SIZE;
2911 
2912  /* if we went beyond SendRqstPtr, back off */
2913  if (SendRqstPtr <= endptr)
2914  {
2915  endptr = SendRqstPtr;
/* NOTE(review): line 2916, the condition selecting between the two assignments below, is missing. */
2917  WalSndCaughtUp = false;
2918  else
2919  WalSndCaughtUp = true;
2920  }
2921  else
2922  {
2923  /* round down to page boundary. */
2924  endptr -= (endptr % XLOG_BLCKSZ);
2925  WalSndCaughtUp = false;
2926  }
2927 
2928  nbytes = endptr - startptr;
2929  Assert(nbytes <= MAX_SEND_SIZE);
2930 
2931  /*
2932  * OK to read and send the slice.
2933  */
/* NOTE(review): line 2934 is missing. */
2935  pq_sendbyte(&output_message, 'w');
2936 
2937  pq_sendint64(&output_message, startptr); /* dataStart */
2938  pq_sendint64(&output_message, SendRqstPtr); /* walEnd */
2939  pq_sendint64(&output_message, 0); /* sendtime, filled in last */
2940 
2941  /*
2942  * Read the log directly into the output buffer to avoid extra memcpy
2943  * calls.
2944  */
/* NOTE(review): line 2945 is missing (presumably it makes room for nbytes in output_message). */
2946 
2947 retry:
/* NOTE(review): line 2949, WALRead()'s destination-buffer argument, is missing. */
2948  if (!WALRead(xlogreader,
2950  startptr,
2951  nbytes,
2952  xlogreader->seg.ws_tli, /* Pass the current TLI because
2953  * only WalSndSegmentOpen controls
2954  * whether new TLI is needed. */
2955  &errinfo))
2956  WALReadRaiseError(&errinfo);
2957 
2958  /* See logical_read_xlog_page(). */
2959  XLByteToSeg(startptr, segno, xlogreader->segcxt.ws_segsize);
/* NOTE(review): line 2960 is missing; 'segno' computed above is not used in the visible lines. */
2961 
2962  /*
2963  * During recovery, the currently-open WAL file might be replaced with the
2964  * file of the same name retrieved from archive. So we always need to
2965  * check what we read was valid after reading into the buffer. If it's
2966  * invalid, we try to open and read the file again.
2967  */
/* NOTE(review): line 2968, this block's condition, is missing. */
2969  {
2970  WalSnd *walsnd = MyWalSnd;
2971  bool reload;
2972 
2973  SpinLockAcquire(&walsnd->mutex);
2974  reload = walsnd->needreload;
2975  walsnd->needreload = false;
2976  SpinLockRelease(&walsnd->mutex);
2977 
2978  if (reload && xlogreader->seg.ws_file >= 0)
2979  {
/* NOTE(review): line 2980 is missing (presumably closes the current file before retrying). */
2981 
2982  goto retry;
2983  }
2984  }
2985 
2986  output_message.len += nbytes;
/* NOTE(review): line 2987 is missing. */
2988 
2989  /*
2990  * Fill the send timestamp last, so that it is taken as late as possible.
2991  */
/* NOTE(review): lines 2992-2993 are missing; presumably they put the timestamp bytes into tmpbuf, which the memcpy below copies. */
/* Offset skips the 'w' byte plus the dataStart and walEnd int64 fields, landing on the sendtime placeholder. */
2994  memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64)],
2995  tmpbuf.data, sizeof(int64));
2996 
/* NOTE(review): line 2997 is missing (presumably queues output_message for sending). */
2998 
2999  sentPtr = endptr;
3000 
3001  /* Update shared memory status */
3002  {
3003  WalSnd *walsnd = MyWalSnd;
3004 
3005  SpinLockAcquire(&walsnd->mutex);
3006  walsnd->sentPtr = sentPtr;
3007  SpinLockRelease(&walsnd->mutex);
3008  }
3009 
3010  /* Report progress of XLOG streaming in PS display */
/* NOTE(review): line 3011, this block's condition, is missing. */
3012  {
3013  char activitymsg[50];
3014 
3015  snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
/* NOTE(review): line 3016, the snprintf() format arguments, is missing. */
3017  set_ps_display(activitymsg);
3018  }
3019 }
3020 
3021 /*
3022  * Stream out logically decoded data.
3023  */
/*
 * Reads one record with XLogReadRecord() (ERROR if the reader reports an
 * error message), then does three things:
 *   - sets WalSndCaughtUp when the reader's EndRecPtr has reached the cached
 *     flush pointer;
 *   - may set got_SIGUSR2 (condition line missing);
 *   - publishes sentPtr in our WalSnd entry.
 *
 * NOTE(review): doxygen lines 3025, 3060, 3062 and 3083 are missing from this
 * listing (see inline notes).
 */
3024 static void
/* NOTE(review): declarator line 3025 is missing here. */
3026 {
3027  XLogRecord *record;
3028  char *errm;
3029 
3030  /*
3031  * We'll use the current flush point to determine whether we've caught up.
3032  * This variable is static in order to cache it across calls. Caching is
3033  * helpful because GetFlushRecPtr() needs to acquire a heavily-contended
3034  * spinlock.
3035  */
3036  static XLogRecPtr flushPtr = InvalidXLogRecPtr;
3037 
3038  /*
3039  * Don't know whether we've caught up yet. We'll set WalSndCaughtUp to
3040  * true in WalSndWaitForWal, if we're actually waiting. We also set to
3041  * true if XLogReadRecord() had to stop reading but WalSndWaitForWal
3042  * didn't wait - i.e. when we're shutting down.
3043  */
3044  WalSndCaughtUp = false;
3045 
3046  record = XLogReadRecord(logical_decoding_ctx->reader, &errm);
3047 
3048  /* xlog record was invalid */
3049  if (errm != NULL)
3050  elog(ERROR, "could not find record while sending logically-decoded data: %s",
3051  errm);
3052 
3053  if (record != NULL)
3054  {
3055  /*
3056  * Note the lack of any call to LagTrackerWrite() which is handled by
3057  * WalSndUpdateProgress which is called by output plugin through
3058  * logical decoding write api.
3059  */
/* NOTE(review): line 3060 is missing (presumably it processes 'record'). */
3061 
/* NOTE(review): line 3062 is missing; sentPtr, published below, is not assigned in the visible lines. */
3063  }
3064 
3065  /*
3066  * If first time through in this session, initialize flushPtr. Otherwise,
3067  * we only need to update flushPtr if EndRecPtr is past it.
3068  */
3069  if (flushPtr == InvalidXLogRecPtr)
3070  flushPtr = GetFlushRecPtr(NULL);
3071  else if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
3072  flushPtr = GetFlushRecPtr(NULL);
3073 
3074  /* If EndRecPtr is still at or past our flushPtr, it means we caught up. */
3075  if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
3076  WalSndCaughtUp = true;
3077 
3078  /*
3079  * If we're caught up and have been requested to stop, have WalSndLoop()
3080  * terminate the connection in an orderly manner, after writing out all
3081  * the pending data.
3082  */
/* NOTE(review): line 3083, the condition for setting got_SIGUSR2, is missing. */
3084  got_SIGUSR2 = true;
3085 
3086  /* Update shared memory status */
3087  {
3088  WalSnd *walsnd = MyWalSnd;
3089 
3090  SpinLockAcquire(&walsnd->mutex);
3091  walsnd->sentPtr = sentPtr;
3092  SpinLockRelease(&walsnd->mutex);
3093  }
3094 }
3095 
3096 /*
3097  * Shutdown if the sender is caught up.
3098  *
3099  * NB: This should only be called when the shutdown signal has been received
3100  * from postmaster.
3101  *
3102  * Note that if we determine that there's still more data to send, this
3103  * function will return control to the caller.
3104  */
/*
 * Calls send_data() once more. Then, if WalSndCaughtUp is set, sentPtr equals
 * the replicated position and no output is pending, it sends the COPY
 * completion tag, flushes and exits with proc_exit(0).
 *
 * NOTE(review): doxygen lines 3106, 3119 and 3133-3134 are missing from this
 * listing (see inline notes).
 */
3105 static void
/* NOTE(review): declarator line 3106 (presumably declaring send_data) is missing. */
3107 {
3108  XLogRecPtr replicatedPtr;
3109 
3110  /* ... let's just be real sure we're caught up ... */
3111  send_data();
3112 
3113  /*
3114  * To figure out whether all WAL has successfully been replicated, check
3115  * flush location if valid, write otherwise. Tools like pg_receivewal will
3116  * usually (unless in synchronous mode) return an invalid flush location.
3117  */
3118  replicatedPtr = XLogRecPtrIsInvalid(MyWalSnd->flush) ?
/* NOTE(review): line 3119, the rest of this conditional expression, is missing. */
3120 
3121  if (WalSndCaughtUp && sentPtr == replicatedPtr &&
3122  !pq_is_send_pending())
3123  {
3124  QueryCompletion qc;
3125 
3126  /* Inform the standby that XLOG streaming is done */
3127  SetQueryCompletion(&qc, CMDTAG_COPY, 0);
3128  EndCommand(&qc, DestRemote, false);
3129  pq_flush();
3130 
3131  proc_exit(0);
3132  }
/*
 * NOTE(review): lines 3133-3134 are missing; it is not visible what happens
 * when the exit condition above is not met.
 */
3135 }
3136 
3137 /*
3138  * Returns the latest point in WAL that has been safely flushed to disk, and
3139  * can be sent to the standby. This should only be called when in recovery,
3140  * ie. we're streaming to a cascaded standby.
3141  *
3142  * As a side-effect, *tli is updated to the TLI of the last
3143  * replayed WAL record.
3144  */
/*
 * Returns the replay position, or the walreceiver's flush position when that
 * is on the same timeline and further ahead. *tli is set to the replay TLI.
 *
 * NOTE(review): doxygen lines 3146 and 3151 are missing from this listing.
 */
3145 static XLogRecPtr
/* NOTE(review): declarator line 3146 is missing here. */
3147 {
3148  XLogRecPtr replayPtr;
3149  TimeLineID replayTLI;
3150  XLogRecPtr receivePtr;
/* NOTE(review): line 3151 is missing; 'receiveTLI', used below, has no visible declaration. */
3152  XLogRecPtr result;
3153 
3154  /*
3155  * We can safely send what's already been replayed. Also, if walreceiver
3156  * is streaming WAL from the same timeline, we can send anything that it
3157  * has streamed, but hasn't been replayed yet.
3158  */
3159 
3160  receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI);
3161  replayPtr = GetXLogReplayRecPtr(&replayTLI);
3162 
3163  *tli = replayTLI;
3164 
3165  result = replayPtr;
3166  if (receiveTLI == replayTLI && receivePtr > replayPtr)
3167  result = receivePtr;
3168 
3169  return result;
3170 }
3171 
3172 /*
3173  * Request walsenders to reload the currently-open WAL file
3174  */
/*
 * Sets needreload, under each entry's spinlock, on every WalSnd entry whose
 * pid is nonzero.
 *
 * NOTE(review): declarator line 3176 is missing from this listing.
 */
3175 void
3177 {
3178  int i;
3179 
3180  for (i = 0; i < max_wal_senders; i++)
3181  {
3182  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3183 
3184  SpinLockAcquire(&walsnd->mutex);
3185  if (walsnd->pid == 0)
3186  {
3187  SpinLockRelease(&walsnd->mutex);
3188  continue;
3189  }
3190  walsnd->needreload = true;
3191  SpinLockRelease(&walsnd->mutex);
3192  }
3193 }
3194 
3195 /*
3196  * Handle PROCSIG_WALSND_INIT_STOPPING signal.
3197  */
/*
 * Sends SIGTERM to this process if replication is not active; otherwise sets
 * got_STOPPING.
 *
 * NOTE(review): doxygen lines 3199 (declarator) and 3201 are missing from
 * this listing.
 */
3198 void
3200 {
/* NOTE(review): line 3201 is missing. */
3202 
3203  /*
3204  * If replication has not yet started, die like with SIGTERM. If
3205  * replication is active, only set a flag and wake up the main loop. It
3206  * will send any outstanding WAL, wait for it to be replicated to the
3207  * standby, and then exit gracefully.
3208  */
/*
 * NOTE(review): the visible code only sets the flag; the wake-up mentioned
 * above is presumably done by the caller's signal handling. Confirm.
 */
3209  if (!replication_active)
3210  kill(MyProcPid, SIGTERM);
3211  else
3212  got_STOPPING = true;
3213 }
3214 
3215 /*
3216  * SIGUSR2: set flag to do a last cycle and shut down afterwards. The WAL
3217  * sender should already have been switched to WALSNDSTATE_STOPPING at
3218  * this point.
3219  */
/*
 * NOTE(review): declarator line 3221 is missing from this listing.
 */
3220 static void
3222 {
3223  int save_errno = errno;
3224 
3225  got_SIGUSR2 = true;
3226  SetLatch(MyLatch);
3227 
/* Restore errno so that the code interrupted by this signal handler is unaffected. */
3228  errno = save_errno;
3229 }
3230 
3231 /* Set up signal handlers */
/*
 * Installs the walsender's signal handlers. The visible ones are SIGINT
 * (query cancel), SIGTERM (die), SIGALRM (via InitializeTimeouts) and SIGUSR2
 * (last cycle).
 *
 * NOTE(review): doxygen lines 3233, 3236, 3241-3242 and 3247 are missing from
 * this listing, so some handler registrations are not visible.
 */
3232 void
/* NOTE(review): declarator line 3233 is missing here. */
3234 {
3235  /* Set up signal handlers */
/* NOTE(review): line 3236 is missing. */
3237  pqsignal(SIGINT, StatementCancelHandler); /* query cancel */
3238  pqsignal(SIGTERM, die); /* request shutdown */
3239  /* SIGQUIT handler was already set up by InitPostmasterChild */
3240  InitializeTimeouts(); /* establishes SIGALRM handler */
/* NOTE(review): lines 3241-3242 are missing. */
3243  pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
3244  * shutdown */
3245 
3246  /* Reset some signals that are accepted by postmaster but not here */
/* NOTE(review): line 3247, the statement that does this, is missing. */
3248 }
3249 
3250 /* Report shared-memory space needed by WalSndShmemInit */
/*
 * Size is the WalSndCtlData header up to 'walsnds' plus max_wal_senders
 * WalSnd entries, computed with the overflow-checked add_size/mul_size.
 *
 * NOTE(review): declarator line 3252 is missing from this listing.
 */
3251 Size
3253 {
3254  Size size = 0;
3255 
3256  size = offsetof(WalSndCtlData, walsnds);
3257  size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
3258 
3259  return size;
3260 }
3261 
3262 /* Allocate and initialize walsender-related shared memory */
/*
 * Attaches to (or creates) the "Wal Sender Ctl" shared struct. On first
 * creation it runs a per-NUM_SYNC_REP_WAIT_MODE initialization loop and
 * initializes each WalSnd entry's spinlock.
 *
 * NOTE(review): doxygen lines 3264, 3275 and 3278 are missing from this
 * listing (see inline notes).
 */
3263 void
/* NOTE(review): declarator line 3264 is missing here. */
3265 {
3266  bool found;
3267  int i;
3268 
3269  WalSndCtl = (WalSndCtlData *)
3270  ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
3271 
3272  if (!found)
3273  {
3274  /* First time through, so initialize */
/* NOTE(review): line 3275, the initialization statement, is missing. */
3276 
3277  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
/*
 * NOTE(review): line 3278, the body of this loop, is missing. As shown, the
 * following 'for' loop would become its body, which is a listing artifact.
 */
3279 
3280  for (i = 0; i < max_wal_senders; i++)
3281  {
3282  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3283 
3284  SpinLockInit(&walsnd->mutex);
3285  }
3286  }
3287 }
3288 
3289 /*
3290  * Wake up all walsenders
3291  *
3292  * This will be called inside critical sections, so throwing an error is not
3293  * advisable.
3294  */
/*
 * Calls SetLatch() on every WalSnd entry whose latch pointer is non-NULL;
 * each pointer is read under that entry's spinlock.
 *
 * NOTE(review): declarator line 3296 is missing from this listing.
 */
3295 void
3297 {
3298  int i;
3299 
3300  for (i = 0; i < max_wal_senders; i++)
3301  {
3302  Latch *latch;
3303  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3304 
3305  /*
3306  * Get latch pointer with spinlock held, for the unlikely case that
3307  * pointer reads aren't atomic (as they're 8 bytes).
3308  */
3309  SpinLockAcquire(&walsnd->mutex);
3310  latch = walsnd->latch;
3311  SpinLockRelease(&walsnd->mutex);
3312 
3313  if (latch != NULL)
3314  SetLatch(latch);
3315  }
3316 }
3317 
3318 /*
3319  * Wait for readiness on the FeBe socket, or a timeout. The mask should be
3320  * composed of optional WL_SOCKET_WRITEABLE and WL_SOCKET_READABLE flags. Exit
3321  * on postmaster death.
3322  */
3323 static void
3324 WalSndWait(uint32 socket_events, long timeout, uint32 wait_event)
3325 {
3326  WaitEvent event;
3327 
3328  ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, socket_events, NULL);
3329  if (WaitEventSetWait(FeBeWaitSet, timeout, &event, 1, wait_event) == 1 &&
3330  (event.events & WL_POSTMASTER_DEATH))
3331  proc_exit(1);
3332 }
3333 
3334 /*
3335  * Signal all walsenders to move to stopping state.
3336  *
3337  * This will trigger walsenders to move to a state where no further WAL can be
3338  * generated. See this file's header for details.
3339  */
/*
 * Visits every WalSnd entry, reading its pid under the spinlock and skipping
 * free entries (pid == 0).
 *
 * NOTE(review): doxygen lines 3341 (declarator) and 3357 are missing from
 * this listing (see inline note).
 */
3340 void
3342 {
3343  int i;
3344 
3345  for (i = 0; i < max_wal_senders; i++)
3346  {
3347  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3348  pid_t pid;
3349 
3350  SpinLockAcquire(&walsnd->mutex);
3351  pid = walsnd->pid;
3352  SpinLockRelease(&walsnd->mutex);
3353 
3354  if (pid == 0)
3355  continue;
3356 
/*
 * NOTE(review): line 3357, which presumably signals 'pid' as described in
 * the header comment, is missing; as shown the loop has no effect.
 */
3358  }
3359 }
3360 
3361 /*
3362  * Wait until all the WAL senders have quit or reached the stopping state. This
3363  * is used by the checkpointer to control when the shutdown checkpoint can
3364  * safely be performed.
3365  */
/*
 * Polls every 10 ms (pg_usleep(10000L)) and returns once every in-use WalSnd
 * entry (pid != 0) is in WALSNDSTATE_STOPPING. Each entry is checked under
 * its spinlock.
 *
 * NOTE(review): declarator line 3367 is missing from this listing.
 */
3366 void
3368 {
3369  for (;;)
3370  {
3371  int i;
3372  bool all_stopped = true;
3373 
3374  for (i = 0; i < max_wal_senders; i++)
3375  {
3376  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3377 
3378  SpinLockAcquire(&walsnd->mutex);
3379 
3380  if (walsnd->pid == 0)
3381  {
3382  SpinLockRelease(&walsnd->mutex);
3383  continue;
3384  }
3385 
3386  if (walsnd->state != WALSNDSTATE_STOPPING)
3387  {
3388  all_stopped = false;
3389  SpinLockRelease(&walsnd->mutex);
/* One sender not yet stopping is enough; stop scanning and retry after the sleep. */
3390  break;
3391  }
3392  SpinLockRelease(&walsnd->mutex);
3393  }
3394 
3395  /* safe to leave if confirmation is done for all WAL senders */
3396  if (all_stopped)
3397  return;
3398 
3399  pg_usleep(10000L); /* wait for 10 msec */
3400  }
3401 }
3402 
3403 /* Set state for current walsender (only called in walsender) */
3404 void
3406 {
3407  WalSnd *walsnd = MyWalSnd;
3408 
3410 
3411  if (walsnd->state == state)
3412  return;
3413 
3414  SpinLockAcquire(&walsnd->mutex);
3415  walsnd->state = state;
3416  SpinLockRelease(&walsnd->mutex);
3417 }
3418 
3419 /*
3420  * Return a string constant representing the state. This is used
3421  * in system views, and should *not* be translated.
3422  */
3423 static const char *
3425 {
3426  switch (state)
3427  {
3428  case WALSNDSTATE_STARTUP:
3429  return "startup";
3430  case WALSNDSTATE_BACKUP:
3431  return "backup";
3432  case WALSNDSTATE_CATCHUP:
3433  return "catchup";
3434  case WALSNDSTATE_STREAMING:
3435  return "streaming";
3436  case WALSNDSTATE_STOPPING:
3437  return "stopping";
3438  }
3439  return "UNKNOWN";
3440 }
3441 
3442 static Interval *
3444 {
3445  Interval *result = palloc(sizeof(Interval));
3446 
3447  result->month = 0;
3448  result->day = 0;
3449  result->time = offset;
3450 
3451  return result;
3452 }
3453 
3454 /*
3455  * Returns activity of walsenders, including pids and xlog locations sent to
3456  * standby servers.
3457  */
3458 Datum
3460 {
3461 #define PG_STAT_GET_WAL_SENDERS_COLS 12
3462  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
3463  SyncRepStandbyData *sync_standbys;
3464  int num_standbys;
3465  int i;
3466 
3467  InitMaterializedSRF(fcinfo, 0);
3468 
3469  /*
3470  * Get the currently active synchronous standbys. This could be out of
3471  * date before we're done, but we'll use the data anyway.
3472  */
3473  num_standbys = SyncRepGetCandidateStandbys(&sync_standbys);
3474 
3475  for (i = 0; i < max_wal_senders; i++)
3476  {
3477  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3479  XLogRecPtr write;
3480  XLogRecPtr flush;
3481  XLogRecPtr apply;
3482  TimeOffset writeLag;
3483  TimeOffset flushLag;
3484  TimeOffset applyLag;
3485  int priority;
3486  int pid;
3488  TimestampTz replyTime;
3489  bool is_sync_standby;
3491  bool nulls[PG_STAT_GET_WAL_SENDERS_COLS] = {0};
3492  int j;
3493 
3494  /* Collect data from shared memory */
3495  SpinLockAcquire(&walsnd->mutex);
3496  if (walsnd->pid == 0)
3497  {
3498  SpinLockRelease(&walsnd->mutex);
3499  continue;
3500  }
3501  pid = walsnd->pid;
3502  sentPtr = walsnd->sentPtr;
3503  state = walsnd->state;
3504  write = walsnd->write;
3505  flush = walsnd->flush;
3506  apply = walsnd->apply;
3507  writeLag = walsnd->writeLag;
3508  flushLag = walsnd->flushLag;
3509  applyLag = walsnd->applyLag;
3510  priority = walsnd->sync_standby_priority;
3511  replyTime = walsnd->replyTime;
3512  SpinLockRelease(&walsnd->mutex);
3513 
3514  /*
3515  * Detect whether walsender is/was considered synchronous. We can
3516  * provide some protection against stale data by checking the PID
3517  * along with walsnd_index.
3518  */
3519  is_sync_standby = false;
3520  for (j = 0; j < num_standbys; j++)
3521  {
3522  if (sync_standbys[j].walsnd_index == i &&
3523  sync_standbys[j].pid == pid)
3524  {
3525  is_sync_standby = true;
3526  break;
3527  }
3528  }
3529 
3530  values[0] = Int32GetDatum(pid);
3531 
3532  if (!has_privs_of_role(GetUserId(), ROLE_PG_READ_ALL_STATS))
3533  {
3534  /*
3535  * Only superusers and roles with privileges of pg_read_all_stats
3536  * can see details. Other users only get the pid value to know
3537  * it's a walsender, but no details.
3538  */
3539  MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1);
3540  }
3541  else
3542  {
3544 
3546  nulls[2] = true;
3547  values[2] = LSNGetDatum(sentPtr);
3548 
3550  nulls[3] = true;
3551  values[3] = LSNGetDatum(write);
3552 
3553  if (XLogRecPtrIsInvalid(flush))
3554  nulls[4] = true;
3555  values[4] = LSNGetDatum(flush);
3556 
3557  if (XLogRecPtrIsInvalid(apply))
3558  nulls[5] = true;
3559  values[5] = LSNGetDatum(apply);
3560 
3561  /*
3562  * Treat a standby such as a pg_basebackup background process
3563  * which always returns an invalid flush location, as an
3564  * asynchronous standby.
3565  */
3566  priority = XLogRecPtrIsInvalid(flush) ? 0 : priority;
3567 
3568  if (writeLag < 0)
3569  nulls[6] = true;
3570  else
3572 
3573  if (flushLag < 0)
3574  nulls[7] = true;
3575  else
3577 
3578  if (applyLag < 0)
3579  nulls[8] = true;
3580  else
3582 
3583  values[9] = Int32GetDatum(priority);
3584 
3585  /*
3586  * More easily understood version of standby state. This is purely
3587  * informational.
3588  *
3589  * In quorum-based sync replication, the role of each standby
3590  * listed in synchronous_standby_names can be changing very
3591  * frequently. Any standbys considered as "sync" at one moment can
3592  * be switched to "potential" ones at the next moment. So, it's
3593  * basically useless to report "sync" or "potential" as their sync
3594  * states. We report just "quorum" for them.
3595  */
3596  if (priority == 0)
3597  values[10] = CStringGetTextDatum("async");
3598  else if (is_sync_standby)
3600  CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
3601  else
3602  values[10] = CStringGetTextDatum("potential");
3603 
3604  if (replyTime == 0)
3605  nulls[11] = true;
3606  else
3607  values[11] = TimestampTzGetDatum(replyTime);
3608  }
3609 
3610  tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
3611  values, nulls);
3612  }
3613 
3614  return (Datum) 0;
3615 }
3616 
3617 /*
3618  * Send a keepalive message to standby.
3619  *
3620  * If requestReply is set, the message requests the other party to send
3621  * a message back to us, for heartbeat purposes. We also set a flag to
3622  * let nearby code know that we're waiting for that response, to avoid
3623  * repeated requests.
3624  *
3625  * writePtr is the location up to which the WAL is sent. It is essentially
3626  * the same as sentPtr but in some cases, we need to send keep alive before
3627  * sentPtr is updated like when skipping empty transactions.
3628  */
3629 static void
3630 WalSndKeepalive(bool requestReply, XLogRecPtr writePtr)
3631 {
3632  elog(DEBUG2, "sending replication keepalive");
3633 
3634  /* construct the message... */
3636  pq_sendbyte(&output_message, 'k');
3637  pq_sendint64(&output_message, XLogRecPtrIsInvalid(writePtr) ? sentPtr : writePtr);
3639  pq_sendbyte(&output_message, requestReply ? 1 : 0);
3640 
3641  /* ... and send it wrapped in CopyData */
3643 
3644  /* Set local flag */
3645  if (requestReply)
3647 }
3648 
3649 /*
3650  * Send keepalive message if too much time has elapsed.
3651  */
3652 static void
3654 {
3655  TimestampTz ping_time;
3656 
3657  /*
3658  * Don't send keepalive messages if timeouts are globally disabled or
3659  * we're doing something not partaking in timeouts.
3660  */
3661  if (wal_sender_timeout <= 0 || last_reply_timestamp <= 0)
3662  return;
3663 
3665  return;
3666 
3667  /*
3668  * If half of wal_sender_timeout has lapsed without receiving any reply
3669  * from the standby, send a keep-alive message to the standby requesting
3670  * an immediate reply.
3671  */
3673  wal_sender_timeout / 2);
3674  if (last_processing >= ping_time)
3675  {
3677 
3678  /* Try to flush pending output to the client */
3679  if (pq_flush_if_writable() != 0)
3680  WalSndShutdown();
3681  }
3682 }
3683 
3684 /*
3685  * Record the end of the WAL and the time it was flushed locally, so that
3686  * LagTrackerRead can compute the elapsed time (lag) when this WAL location is
3687  * eventually reported to have been written, flushed and applied by the
3688  * standby in a reply message.
3689  */
3690 static void
3692 {
3693  bool buffer_full;
3694  int new_write_head;
3695  int i;
3696 
3697  if (!am_walsender)
3698  return;
3699 
3700  /*
3701  * If the lsn hasn't advanced since last time, then do nothing. This way
3702  * we only record a new sample when new WAL has been written.
3703  */
3704  if (lag_tracker->last_lsn == lsn)
3705  return;
3706  lag_tracker->last_lsn = lsn;
3707 
3708  /*
3709  * If advancing the write head of the circular buffer would crash into any
3710  * of the read heads, then the buffer is full. In other words, the
3711  * slowest reader (presumably apply) is the one that controls the release
3712  * of space.
3713  */
3714  new_write_head = (lag_tracker->write_head + 1) % LAG_TRACKER_BUFFER_SIZE;
3715  buffer_full = false;
3716  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; ++i)
3717  {
3718  if (new_write_head == lag_tracker->read_heads[i])
3719  buffer_full = true;
3720  }
3721 
3722  /*
3723  * If the buffer is full, for now we just rewind by one slot and overwrite
3724  * the last sample, as a simple (if somewhat uneven) way to lower the
3725  * sampling rate. There may be better adaptive compaction algorithms.
3726  */
3727  if (buffer_full)
3728  {
3729  new_write_head = lag_tracker->write_head;
3730  if (lag_tracker->write_head > 0)
3732  else
3734  }
3735 
3736  /* Store a sample at the current write head position. */
3738  lag_tracker->buffer[lag_tracker->write_head].time = local_flush_time;
3739  lag_tracker->write_head = new_write_head;
3740 }
3741 
3742 /*
3743  * Find out how much time has elapsed between the moment WAL location 'lsn'
3744  * (or the highest known earlier LSN) was flushed locally and the time 'now'.
3745  * We have a separate read head for each of the reported LSN locations we
3746  * receive in replies from standby; 'head' controls which read head is
3747  * used. Whenever a read head crosses an LSN which was written into the
3748  * lag buffer with LagTrackerWrite, we can use the associated timestamp to
3749  * find out the time this LSN (or an earlier one) was flushed locally, and
3750  * therefore compute the lag.
3751  *
3752  * Return -1 if no new sample data is available, and otherwise the elapsed
3753  * time in microseconds.
3754  */
3755 static TimeOffset
3757 {
3758  TimestampTz time = 0;
3759 
3760  /* Read all unread samples up to this LSN or end of buffer. */
3761  while (lag_tracker->read_heads[head] != lag_tracker->write_head &&
3762  lag_tracker->buffer[lag_tracker->read_heads[head]].lsn <= lsn)
3763  {
3764  time = lag_tracker->buffer[lag_tracker->read_heads[head]].time;
3765  lag_tracker->last_read[head] =
3767  lag_tracker->read_heads[head] =
3769  }
3770 
3771  /*
3772  * If the lag tracker is empty, that means the standby has processed
3773  * everything we've ever sent so we should now clear 'last_read'. If we
3774  * didn't do that, we'd risk using a stale and irrelevant sample for
3775  * interpolation at the beginning of the next burst of WAL after a period
3776  * of idleness.
3777  */
3779  lag_tracker->last_read[head].time = 0;
3780 
3781  if (time > now)
3782  {
3783  /* If the clock somehow went backwards, treat as not found. */
3784  return -1;
3785  }
3786  else if (time == 0)
3787  {
3788  /*
3789  * We didn't cross a time. If there is a future sample that we
3790  * haven't reached yet, and we've already reached at least one sample,
3791  * let's interpolate the local flushed time. This is mainly useful
3792  * for reporting a completely stuck apply position as having
3793  * increasing lag, since otherwise we'd have to wait for it to
3794  * eventually start moving again and cross one of our samples before
3795  * we can show the lag increasing.
3796  */
3798  {
3799  /* There are no future samples, so we can't interpolate. */
3800  return -1;
3801  }
3802  else if (lag_tracker->last_read[head].time != 0)
3803  {
3804  /* We can interpolate between last_read and the next sample. */
3805  double fraction;
3806  WalTimeSample prev = lag_tracker->last_read[head];
3808 
3809  if (lsn < prev.lsn)
3810  {
3811  /*
3812  * Reported LSNs shouldn't normally go backwards, but it's
3813  * possible when there is a timeline change. Treat as not
3814  * found.
3815  */
3816  return -1;
3817  }
3818 
3819  Assert(prev.lsn < next.lsn);
3820 
3821  if (prev.time > next.time)
3822  {
3823  /* If the clock somehow went backwards, treat as not found. */
3824  return -1;
3825  }
3826 
3827  /* See how far we are between the previous and next samples. */
3828  fraction =
3829  (double) (lsn - prev.lsn) / (double) (next.lsn - prev.lsn);
3830 
3831  /* Scale the local flush time proportionally. */
3832  time = (TimestampTz)
3833  ((double) prev.time + (next.time - prev.time) * fraction);
3834  }
3835  else
3836  {
3837  /*
3838  * We have only a future sample, implying that we were entirely
3839  * caught up but and now there is a new burst of WAL and the
3840  * standby hasn't processed the first sample yet. Until the
3841  * standby reaches the future sample the best we can do is report
3842  * the hypothetical lag if that sample were to be replayed now.
3843  */
3844  time = lag_tracker->buffer[lag_tracker->read_heads[head]].time;
3845  }
3846  }
3847 
3848  /* Return the elapsed time since local flush time in microseconds. */
3849  Assert(time != 0);
3850  return now - time;
3851 }
bool has_privs_of_role(Oid member, Oid role)
Definition: acl.c:4969
int16 AttrNumber
Definition: attnum.h:21
TimeLineID tliOfPointInHistory(XLogRecPtr ptr, List *history)
Definition: timeline.c:544
XLogRecPtr tliSwitchPoint(TimeLineID tli, List *history, TimeLineID *nextTLI)
Definition: timeline.c:572
List * readTimeLineHistory(TimeLineID targetTLI)
Definition: timeline.c:76
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition: timestamp.c:1703
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1727
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1582
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1546
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1790
void pgstat_report_activity(BackendState state, const char *cmd_str)
@ STATE_RUNNING
#define InvalidBackendId
Definition: backendid.h:23
void SendBaseBackup(BaseBackupCmd *cmd)
Definition: basebackup.c:964
static int32 next
Definition: blutils.c:219
static Datum values[MAXATTR]
Definition: bootstrap.c:156
#define CStringGetTextDatum(s)
Definition: builtins.h:94
#define NameStr(name)
Definition: c.h:730
unsigned int uint32
Definition: c.h:490
#define SIGNAL_ARGS
Definition: c.h:1332
#define PG_BINARY
Definition: c.h:1260
#define pg_attribute_noreturn()
Definition: c.h:201
#define UINT64_FORMAT
Definition: c.h:533
#define MemSet(start, val, len)
Definition: c.h:1004
uint32 TransactionId
Definition: c.h:636
#define OidIsValid(objectId)
Definition: c.h:759
size_t Size
Definition: c.h:589
static void SetQueryCompletion(QueryCompletion *qc, CommandTag commandTag, uint64 nprocessed)
Definition: cmdtag.h:38
void ConditionVariableCancelSleep(void)
int64 TimestampTz
Definition: timestamp.h:39
int64 TimeOffset
Definition: timestamp.h:40
char * get_database_name(Oid dbid)
Definition: dbcommands.c:3028
void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
Definition: decode.c:91
bool defGetBoolean(DefElem *def)
Definition: define.c:108
char * defGetString(DefElem *def)
Definition: define.c:49
void EndCommand(const QueryCompletion *qc, CommandDest dest, bool force_undecorated_output)
Definition: dest.c:166
void EndReplicationCommand(const char *commandTag)
Definition: dest.c:201
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
@ DestRemote
Definition: dest.h:89
@ DestRemoteSimple
Definition: dest.h:91
@ DestNone
Definition: dest.h:87
struct cursor * cur
Definition: ecpg.c:28
int errmsg_internal(const char *fmt,...)
Definition: elog.c:1156
int errcode_for_file_access(void)
Definition: elog.c:881
int errdetail(const char *fmt,...)
Definition: elog.c:1202
bool message_level_is_interesting(int elevel)
Definition: elog.c:277
int errcode(int sqlerrcode)
Definition: elog.c:858
int errmsg(const char *fmt,...)
Definition: elog.c:1069
#define LOG
Definition: elog.h:31
#define COMMERROR
Definition: elog.h:33
#define FATAL
Definition: elog.h:41
#define DEBUG2
Definition: elog.h:29
#define DEBUG1
Definition: elog.h:30
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:149
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:83
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:2333
void do_tup_output(TupOutputState *tstate, Datum *values, bool *isnull)
Definition: execTuples.c:2275
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:2255
int CloseTransientFile(int fd)
Definition: fd.c:2610
int BasicOpenFile(const char *fileName, int fileFlags)
Definition: fd.c:993
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2434
Datum Int64GetDatum(int64 X)
Definition: fmgr.c:1794
#define PG_FUNCTION_ARGS
Definition: fmgr.h:193
void InitMaterializedSRF(FunctionCallInfo fcinfo, bits32 flags)
Definition: funcapi.c:76
int MyProcPid
Definition: globals.c:44
struct Latch * MyLatch
Definition: globals.c:58
Oid MyDatabaseId
Definition: globals.c:89
@ PGC_SIGHUP
Definition: guc.h:71
void ProcessConfigFile(GucContext context)
void GetPGVariable(const char *name, DestReceiver *dest)
Definition: guc_funcs.c:392
char * application_name
Definition: guc_tables.c:517
static void dlist_init(dlist_head *head)
Definition: ilist.h:314
#define write(a, b, c)
Definition: win32.h:14
#define read(a, b, c)
Definition: win32.h:13
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:27
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:61
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:361
void proc_exit(int code)
Definition: ipc.c:104
int j
Definition: isn.c:74
int i
Definition: isn.c:73
void ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch)
Definition: latch.c:1008
void SetLatch(Latch *latch)
Definition: latch.c:607
int WaitEventSetWait(WaitEventSet *set, long timeout, WaitEvent *occurred_events, int nevents, uint32 wait_event_info)
Definition: latch.c:1383
void ResetLatch(Latch *latch)
Definition: latch.c:699
#define WL_SOCKET_READABLE
Definition: latch.h:126
#define WL_POSTMASTER_DEATH
Definition: latch.h:129
#define WL_SOCKET_WRITEABLE
Definition: latch.h:127
#define pq_flush()
Definition: libpq.h:46
#define PQ_SMALL_MESSAGE_LIMIT
Definition: libpq.h:30
#define pq_flush_if_writable()
Definition: libpq.h:47
#define pq_is_send_pending()
Definition: libpq.h:48
#define PQ_LARGE_MESSAGE_LIMIT
Definition: libpq.h:31
#define pq_putmessage_noblock(msgtype, s, len)
Definition: libpq.h:51
#define FeBeWaitSetSocketPos
Definition: libpq.h:63
Assert(fmt[strlen(fmt) - 1] !='\n')
void list_free_deep(List *list)
Definition: list.c:1559
void LogicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: logical.c:1788
void FreeDecodingContext(LogicalDecodingContext *ctx)
Definition: logical.c:647
void DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
Definition: logical.c:603
void CheckLogicalDecodingRequirements(void)
Definition: logical.c:108
LogicalDecodingContext * CreateInitDecodingContext(const char *plugin, List *output_plugin_options, bool need_full_snapshot, XLogRecPtr restart_lsn, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:330
LogicalDecodingContext * CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, bool fast_forward, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:490
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1195
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1803
void LWLockReleaseAll(void)
Definition: lwlock.c:1902
@ LW_SHARED
Definition: lwlock.h:116
@ LW_EXCLUSIVE
Definition: lwlock.h:115
char * pstrdup(const char *in)
Definition: mcxt.c:1624
void pfree(void *pointer)
Definition: mcxt.c:1436
MemoryContext TopMemoryContext
Definition: mcxt.c:141
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:1048
MemoryContext CurrentMemoryContext
Definition: mcxt.c:135
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:387
void * palloc(Size size)
Definition: mcxt.c:1210
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:153
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:121
Oid GetUserId(void)
Definition: miscinit.c:510
@ CMD_SELECT
Definition: nodes.h:276
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:138
void * arg
#define ERRCODE_DATA_CORRUPTED
Definition: pg_basebackup.c:41
#define MAXPGPATH
const void size_t len
#define lfirst(lc)
Definition: pg_list.h:172
#define NIL
Definition: pg_list.h:68
static Datum LSNGetDatum(XLogRecPtr X)
Definition: pg_lsn.h:28
static bool two_phase
#define die(msg)
Definition: pg_test_fsync.c:95
static char * buf
Definition: pg_test_fsync.c:67
void SendPostmasterSignal(PMSignalReason reason)
Definition: pmsignal.c:181
void MarkPostmasterChildWalSender(void)
Definition: pmsignal.c:339
@ PMSIGNAL_ADVANCE_STATE_MACHINE
Definition: pmsignal.h:42
pqsigfunc pqsignal(int signo, pqsigfunc func)
#define snprintf
Definition: port.h:238
void StatementCancelHandler(SIGNAL_ARGS)
Definition: postgres.c:2969
CommandDest whereToSendOutput
Definition: postgres.c:84
const char * debug_query_string
Definition: postgres.c:81
uintptr_t Datum
Definition: postgres.h:64
static Datum Int32GetDatum(int32 X)
Definition: postgres.h:212
#define InvalidOid
Definition: postgres_ext.h:36
int pq_getbyte_if_available(unsigned char *c)
Definition: pqcomm.c:1016
int pq_getmessage(StringInfo s, int maxlen)
Definition: pqcomm.c:1208
WaitEventSet * FeBeWaitSet
Definition: pqcomm.c:164
void pq_endmsgread(void)
Definition: pqcomm.c:1170
void pq_startmsgread(void)
Definition: pqcomm.c:1146
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:418
void pq_sendbytes(StringInfo buf, const void *data, int datalen)
Definition: pqformat.c:126
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:299
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:402
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:88
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:456
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
static void pq_sendint16(StringInfo buf, uint16 i)
Definition: pqformat.h:137
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PROC_AFFECTS_ALL_HORIZONS
Definition: proc.h:61
int SendProcSignal(pid_t pid, ProcSignalReason reason, BackendId backendId)
Definition: procsignal.c:262
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition: procsignal.c:639
@ PROCSIG_WALSND_INIT_STOPPING
Definition: procsignal.h:35
bool update_process_title
Definition: ps_status.c:31
static void set_ps_display(const char *activity)
Definition: ps_status.h:40
@ REPLICATION_KIND_PHYSICAL
Definition: replnodes.h:22
@ REPLICATION_KIND_LOGICAL
Definition: replnodes.h:23
ResourceOwner CurrentResourceOwner
Definition: resowner.c:146
void ResourceOwnerRelease(ResourceOwner owner, ResourceReleasePhase phase, bool isCommit, bool isTopLevel)
Definition: resowner.c:486
void ResourceOwnerDelete(ResourceOwner owner)
Definition: resowner.c:742
@ RESOURCE_RELEASE_LOCKS
Definition: resowner.h:49
@ RESOURCE_RELEASE_BEFORE_LOCKS
Definition: resowner.h:48
@ RESOURCE_RELEASE_AFTER_LOCKS
Definition: resowner.h:50
Size add_size(Size s1, Size s2)
Definition: shmem.c:502
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:396
Size mul_size(Size s1, Size s2)
Definition: shmem.c:519
void pg_usleep(long microsec)
Definition: signal.c:53
ReplicationSlot * SearchNamedReplicationSlot(const char *name, bool need_lock)
Definition: slot.c:374
void ReplicationSlotCleanup(void)
Definition: slot.c:603
void ReplicationSlotMarkDirty(void)
Definition: slot.c:796
void ReplicationSlotReserveWal(void)
Definition: slot.c:1158
void ReplicationSlotAcquire(const char *name, bool nowait)
Definition: slot.c:450
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition: slot.c:835
void ReplicationSlotPersist(void)
Definition: slot.c:813
ReplicationSlot * MyReplicationSlot
Definition: slot.c:98
void ReplicationSlotDrop(const char *name, bool nowait)
Definition: slot.c:641
void ReplicationSlotSave(void)
Definition: slot.c:778
void ReplicationSlotRelease(void)
Definition: slot.c:547
void ReplicationSlotsComputeRequiredLSN(void)
Definition: slot.c:892
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, bool two_phase)
Definition: slot.c:252
@ RS_PERSISTENT
Definition: slot.h:35
@ RS_EPHEMERAL
Definition: slot.h:36
@ RS_TEMPORARY
Definition: slot.h:37
#define SlotIsLogical(slot)
Definition: slot.h:169
const char * SnapBuildExportSnapshot(SnapBuild *builder)
Definition: snapbuild.c:664
Snapshot SnapBuildInitialSnapshot(SnapBuild *builder)
Definition: snapbuild.c:565
void SnapBuildClearExportedSnapshot(void)
Definition: snapbuild.c:725
bool FirstSnapshotSet
Definition: snapmgr.c:150
void RestoreTransactionSnapshot(Snapshot snapshot, void *source_pgproc)
Definition: snapmgr.c:2271
#define SpinLockInit(lock)
Definition: spin.h:60
#define SpinLockRelease(lock)
Definition: spin.h:64
#define SpinLockAcquire(lock)
Definition: spin.h:62
PGPROC * MyProc
Definition: proc.c:66
PROC_HDR * ProcGlobal
Definition: proc.c:78
char * dbname
Definition: streamutil.c:51
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
void enlargeStringInfo(StringInfo str, int needed)
Definition: stringinfo.c:283
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
ReplicationKind kind
Definition: replnodes.h:56
char * defname
Definition: parsenodes.h:810
int32 day
Definition: timestamp.h:51
int32 month
Definition: timestamp.h:52
TimeOffset time
Definition: timestamp.h:49
WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]
Definition: walsender.c:214
int read_heads[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:216
WalTimeSample last_read[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:217
int write_head
Definition: walsender.c:215
XLogRecPtr last_lsn
Definition: walsender.c:213
Definition: latch.h:111
Definition: pg_list.h:54
XLogReaderState * reader
Definition: logical.h:42
struct SnapBuild * snapshot_builder
Definition: logical.h:44
StringInfo out
Definition: logical.h:71
Definition: nodes.h:129
NodeTag type
Definition: nodes.h:130
TransactionId xmin
Definition: proc.h:178
uint8 statusFlags
Definition: proc.h:233
int pgxactoff
Definition: proc.h:188
Latch procLatch
Definition: proc.h:170
uint8 * statusFlags
Definition: proc.h:377
TransactionId xmin
Definition: slot.h:62
TransactionId catalog_xmin
Definition: slot.h:70
XLogRecPtr restart_lsn
Definition: slot.h:73
XLogRecPtr confirmed_flush
Definition: slot.h:84
TransactionId effective_catalog_xmin
Definition: slot.h:144
slock_t mutex
Definition: slot.h:120
bool in_use
Definition: slot.h:123
TransactionId effective_xmin
Definition: slot.h:143
ReplicationSlotPersistentData data
Definition: slot.h:147
TupleDesc setDesc
Definition: execnodes.h:334
Tuplestorestate * setResult
Definition: execnodes.h:333
XLogRecPtr startpoint
Definition: replnodes.h:85
ReplicationKind kind
Definition: replnodes.h:82
TimeLineID timeline
Definition: replnodes.h:84
uint8 syncrep_method
Definition: syncrep.h:68
TimeLineID timeline
Definition: replnodes.h:108
TimeLineID ws_tli
Definition: xlogreader.h:49
uint32 events
Definition: latch.h:153
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
dlist_head SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]
TimeOffset writeLag
slock_t mutex
XLogRecPtr flush
XLogRecPtr sentPtr
TimeOffset flushLag
WalSndState state
XLogRecPtr write
TimeOffset applyLag
int sync_standby_priority
bool needreload
Latch * latch
TimestampTz replyTime
XLogRecPtr apply
TimestampTz time
Definition: walsender.c:204
XLogRecPtr lsn
Definition: walsender.c:203
WALSegmentContext segcxt
Definition: xlogreader.h:271
XLogRecPtr EndRecPtr
Definition: xlogreader.h:207
WALOpenSegment seg
Definition: xlogreader.h:272
Definition: regguts.h:318
void SyncRepInitConfig(void)
Definition: syncrep.c:403
SyncRepConfigData * SyncRepConfig
Definition: syncrep.c:98
int SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys)
Definition: syncrep.c:717
void SyncRepReleaseWaiters(void)
Definition: syncrep.c:432
#define SYNC_REP_PRIORITY
Definition: syncrep.h:35
#define NUM_SYNC_REP_WAIT_MODE
Definition: syncrep.h:27
#define SyncRepRequested()
Definition: syncrep.h:18
#define SYNC_REP_WAIT_WRITE
Definition: syncrep.h:23
#define SYNC_REP_WAIT_FLUSH
Definition: syncrep.h:24
#define SYNC_REP_WAIT_APPLY
Definition: syncrep.h:25
void InitializeTimeouts(void)
Definition: timeout.c:474
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:280
bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
Definition: transam.c:299
#define InvalidTransactionId
Definition: transam.h:31
#define EpochFromFullTransactionId(x)
Definition: transam.h:47
#define XidFromFullTransactionId(x)
Definition: transam.h:48
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
TupleDesc CreateTemplateTupleDesc(int natts)
Definition: tupdesc.c:45
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:657
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, Datum *values, bool *isnull)
Definition: tuplestore.c:750
char data[BLCKSZ]
Definition: c.h:1130
static Datum TimestampTzGetDatum(TimestampTz X)
Definition: timestamp.h:52
static Datum IntervalPGetDatum(const Interval *X)
Definition: timestamp.h:58
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:85
FullTransactionId ReadNextFullTransactionId(void)
Definition: varsup.c:261
@ WAIT_EVENT_WAL_SENDER_MAIN
Definition: wait_event.h:49
@ WAIT_EVENT_WALSENDER_TIMELINE_HISTORY_READ
Definition: wait_event.h:226
@ WAIT_EVENT_WAL_SENDER_WRITE_DATA
Definition: wait_event.h:70
@ WAIT_EVENT_WAL_SENDER_WAIT_WAL
Definition: wait_event.h:69
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: wait_event.h:271
static void pgstat_report_wait_end(void)
Definition: wait_event.h:287
XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
void WalSndWakeup(void)
Definition: walsender.c:3296
static void ProcessPendingWrites(void)
Definition: walsender.c:1413
static XLogRecPtr sentPtr
Definition: walsender.c:155
#define READ_REPLICATION_SLOT_COLS
static Interval * offset_to_interval(TimeOffset offset)
Definition: walsender.c:3443
static void WalSndWait(uint32 socket_events, long timeout, uint32 wait_event)
Definition: walsender.c:3324
static void WalSndLastCycleHandler(SIGNAL_ARGS)
Definition: walsender.c:3221
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:187
static void WalSndCheckTimeOut(void)
Definition: walsender.c:2416
static void XLogSendPhysical(void)
Definition: walsender.c:2725
static void ProcessRepliesIfAny(void)
Definition: walsender.c:1887
static bool waiting_for_ping_response
Definition: walsender.c:172
static void SendTimeLineHistory(TimeLineHistoryCmd *cmd)
Definition: walsender.c:577
void WalSndErrorCleanup(void)
Definition: walsender.c:315
static void InitWalSenderSlot(void)
Definition: walsender.c:2569
WalSnd * MyWalSnd
Definition: walsender.c:113
static void ProcessStandbyHSFeedbackMessage(void)
Definition: walsender.c:2248
static void ReadReplicationSlot(ReadReplicationSlotCmd *cmd)
Definition: walsender.c:478
static StringInfoData tmpbuf
Definition: walsender.c:160
static void PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
Definition: walsender.c:2168
static LagTracker * lag_tracker
Definition: walsender.c:220
static void PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: walsender.c:2031
static void IdentifySystem(void)
Definition: walsender.c:395
static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p)
Definition: walsender.c:2647
static StringInfoData reply_message
Definition: walsender.c:159
static void WalSndKeepaliveIfNecessary(void)
Definition: walsender.c:3653
static void parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action, bool *two_phase)
Definition: walsender.c:963
bool am_walsender
Definition: walsender.c:116
void WalSndSetState(WalSndState state)
Definition: walsender.c:3405
static StringInfoData output_message
Definition: walsender.c:158
static TimeLineID sendTimeLine
Definition: walsender.c:146
static void WalSndLoop(WalSndSendDataCallback send_data)
Definition: walsender.c:2443
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1371
static LogicalDecodingContext * logical_decoding_ctx
Definition: walsender.c:198
static void XLogSendLogical(void)
Definition: walsender.c:3025
void WalSndShmemInit(void)
Definition: walsender.c:3264
bool am_db_walsender
Definition: walsender.c:119
static volatile sig_atomic_t replication_active
Definition: walsender.c:196
bool wake_wal_senders
Definition: walsender.c:131
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:188
int max_wal_senders
Definition: walsender.c:122
static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch)
Definition: walsender.c:2217
static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool skipped_xact)
Definition: walsender.c:1467
bool exec_replication_command(const char *cmd_string)
Definition: walsender.c:1669
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS
#define PG_STAT_GET_WAL_SENDERS_COLS
void(* WalSndSendDataCallback)(void)
Definition: walsender.c:226
Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
Definition: walsender.c:3459
void WalSndInitStopping(void)
Definition: walsender.c:3341
void WalSndWaitStopping(void)
Definition: walsender.c:3367
static bool sendTimeLineIsHistoric
Definition: walsender.c:148
void WalSndResourceCleanup(bool isCommit)
Definition: walsender.c:350
void WalSndRqstFileReload(void)
Definition: walsender.c:3176
static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc)
Definition: walsender.c:1535
bool am_cascading_walsender
Definition: walsender.c:117
static TimestampTz last_processing
Definition: walsender.c:163
Size WalSndShmemSize(void)
Definition: walsender.c:3252
bool log_replication_commands
Definition: walsender.c:126
void HandleWalSndInitStopping(void)
Definition: walsender.c:3199
static TimeLineID sendTimeLineNextTLI
Definition: walsender.c:147
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Definition: walsender.c:1030
static int logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
Definition: walsender.c:902
static XLogRecPtr GetStandbyFlushRecPtr(TimeLineID *tli)
Definition: walsender.c:3146
static void ProcessStandbyReplyMessage(void)
Definition: walsender.c:2063
static void WalSndKeepalive(bool requestReply, XLogRecPtr writePtr)
Definition: walsender.c:3630
static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)
Definition: walsender.c:3691
void WalSndSignals(void)
Definition: walsender.c:3233
static bool streamingDoneSending
Definition: walsender.c:180
static void StartLogicalReplication(StartReplicationCmd *cmd)
Definition: walsender.c:1244
static void WalSndShutdown(void)
Definition: walsender.c:230
static void WalSndKill(int code, Datum arg)
Definition: walsender.c:2629
int wal_sender_timeout
Definition: walsender.c:124
#define MAX_SEND_SIZE
Definition: walsender.c:107
static bool WalSndCaughtUp
Definition: walsender.c:184
static XLogRecPtr sendTimeLineValidUpto
Definition: walsender.c:149
static void ProcessStandbyMessage(void)
Definition: walsender.c:2000
static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1344
static void DropReplicationSlot(DropReplicationSlotCmd *cmd)
Definition: walsender.c:1234
#define LAG_TRACKER_BUFFER_SIZE
Definition: walsender.c:208
static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
Definition: walsender.c:3756
static long WalSndComputeSleeptime(TimestampTz now)
Definition: walsender.c:2372
static bool streamingDoneReceiving
Definition: walsender.c:181
static void StartReplication(StartReplicationCmd *cmd)
Definition: walsender.c:670
static void WalSndDone(WalSndSendDataCallback send_data)
Definition: walsender.c:3106
static const char * WalSndGetStateString(WalSndState state)
Definition: walsender.c:3424
static XLogReaderState * xlogreader
Definition: walsender.c:138
static TimestampTz last_reply_timestamp
Definition: walsender.c:169
WalSndCtlData * WalSndCtl
Definition: walsender.c:110
void InitWalSender(void)
CRSSnapshotAction
Definition: walsender.h:21
@ CRS_USE_SNAPSHOT
Definition: walsender.h:24
@ CRS_NOEXPORT_SNAPSHOT
Definition: walsender.h:23
@ CRS_EXPORT_SNAPSHOT
Definition: walsender.h:22
PGDLLIMPORT Node * replication_parse_result
WalSndState
@ WALSNDSTATE_STREAMING
@ WALSNDSTATE_BACKUP
@ WALSNDSTATE_CATCHUP
@ WALSNDSTATE_STARTUP
@ WALSNDSTATE_STOPPING
void replication_scanner_finish(void)
int replication_yyparse(void)
void replication_scanner_init(const char *str)
bool replication_scanner_is_replication_command(void)
#define SIGCHLD
Definition: win32_port.h:186
#define SIGHUP
Definition: win32_port.h:176
#define SIG_DFL
Definition: win32_port.h:171
#define SIGPIPE
Definition: win32_port.h:181
#define kill(pid, sig)
Definition: win32_port.h:489
#define SIGUSR1
Definition: win32_port.h:188
#define SIGUSR2
Definition: win32_port.h:189
#define SIG_IGN
Definition: win32_port.h:173
static const unsigned __int64 epoch
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4841
bool XactReadOnly
Definition: xact.c:82
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
Definition: xact.c:3488
void StartTransactionCommand(void)
Definition: xact.c:2944
bool IsAbortedTransactionBlockState(void)
Definition: xact.c:398
int XactIsoLevel
Definition: xact.c:79
bool IsSubTransaction(void)
Definition: xact.c:4896
bool IsTransactionBlock(void)
Definition: xact.c:4823
void CommitTransactionCommand(void)
Definition: xact.c:3041
#define XACT_REPEATABLE_READ
Definition: xact.h:38
uint64 GetSystemIdentifier(void)
Definition: xlog.c:4177
bool RecoveryInProgress(void)
Definition: xlog.c:5908
TimeLineID GetWALInsertionTimeLine(void)
Definition: xlog.c:6096
void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli)
Definition: xlog.c:3435
int wal_segment_size
Definition: xlog.c:146
XLogRecPtr GetFlushRecPtr(TimeLineID *insertTLI)
Definition: xlog.c:6073
bool XLogBackgroundFlush(void)
Definition: xlog.c:2704
#define MAXFNAMELEN
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
static void XLogFilePath(char *path, TimeLineID tli, XLogSegNo logSegNo, int wal_segsz_bytes)
static void XLogFileName(char *fname, TimeLineID tli, XLogSegNo logSegNo, int wal_segsz_bytes)
static void TLHistoryFilePath(char *path, TimeLineID tli)
static void TLHistoryFileName(char *fname, TimeLineID tli)
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
uint32 TimeLineID
Definition: xlogdefs.h:59
uint64 XLogSegNo
Definition: xlogdefs.h:48
XLogRecord * XLogReadRecord(XLogReaderState *state, char **errormsg)
Definition: xlogreader.c:422
bool WALRead(XLogReaderState *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli, WALReadError *errinfo)
Definition: xlogreader.c:1493
XLogReaderState * XLogReaderAllocate(int wal_segment_size, const char *waldir, XLogReaderRoutine *routine, void *private_data)
Definition: xlogreader.c:108
void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
Definition: xlogreader.c:264
#define XL_ROUTINE(...)
Definition: xlogreader.h:117
static TimeLineID receiveTLI
Definition: xlogrecovery.c:263
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
void wal_segment_close(XLogReaderState *state)
Definition: xlogutils.c:859
void XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength, TimeLineID currTLI)
Definition: xlogutils.c:735
void WALReadRaiseError(WALReadError *errinfo)
Definition: xlogutils.c:1042