PostgreSQL Source Code  git master
walsender.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * walsender.c
4  *
5  * The WAL sender process (walsender) is new as of Postgres 9.0. It takes
6  * care of sending XLOG from the primary server to a single recipient.
7  * (Note that there can be more than one walsender process concurrently.)
8  * It is started by the postmaster when the walreceiver of a standby server
9  * connects to the primary server and requests XLOG streaming replication.
10  *
11  * A walsender is similar to a regular backend, i.e., there is a one-to-one
12  * relationship between a connection and a walsender process, but instead
13  * of processing SQL queries, it understands a small set of special
14  * replication-mode commands. The START_REPLICATION command begins streaming
15  * WAL to the client. While streaming, the walsender keeps reading XLOG
16  * records from the disk and sends them to the standby server over the
17  * COPY protocol, until either side ends the replication by exiting COPY
18  * mode (or until the connection is closed).
19  *
20  * Normal termination is by SIGTERM, which instructs the walsender to
21  * close the connection and exit(0) at the next convenient moment. Emergency
22  * termination is by SIGQUIT; like any backend, the walsender will simply
23  * abort and exit on SIGQUIT. A close of the connection and a FATAL error
24  * are treated as not a crash but approximately normal termination;
25  * the walsender will exit quickly without sending any more XLOG records.
26  *
27  * If the server is shut down, checkpointer sends us
28  * PROCSIG_WALSND_INIT_STOPPING after all regular backends have exited. If
29  * the backend is idle or runs an SQL query, this causes the backend to
30  * shut down; if logical replication is in progress, all existing WAL records
31  * are processed, followed by a shutdown. Otherwise this causes the walsender
32  * to switch to the "stopping" state. In this state, the walsender will reject
33  * any further replication commands. The checkpointer begins the shutdown
34  * checkpoint once all walsenders are confirmed as stopping. When the shutdown
35  * checkpoint finishes, the postmaster sends us SIGUSR2. This instructs
36  * walsender to send any outstanding WAL, including the shutdown checkpoint
37  * record, wait for it to be replicated to the standby, and then exit.
38  *
39  *
40  * Portions Copyright (c) 2010-2022, PostgreSQL Global Development Group
41  *
42  * IDENTIFICATION
43  * src/backend/replication/walsender.c
44  *
45  *-------------------------------------------------------------------------
46  */
47 #include "postgres.h"
48 
49 #include <signal.h>
50 #include <unistd.h>
51 
52 #include "access/printtup.h"
53 #include "access/timeline.h"
54 #include "access/transam.h"
55 #include "access/xact.h"
56 #include "access/xlog_internal.h"
57 #include "access/xlogreader.h"
58 #include "access/xlogrecovery.h"
59 #include "access/xlogutils.h"
60 #include "catalog/pg_authid.h"
61 #include "catalog/pg_type.h"
62 #include "commands/dbcommands.h"
63 #include "commands/defrem.h"
64 #include "funcapi.h"
65 #include "libpq/libpq.h"
66 #include "libpq/pqformat.h"
67 #include "miscadmin.h"
68 #include "nodes/replnodes.h"
69 #include "pgstat.h"
70 #include "postmaster/interrupt.h"
71 #include "replication/basebackup.h"
72 #include "replication/decode.h"
73 #include "replication/logical.h"
74 #include "replication/slot.h"
75 #include "replication/snapbuild.h"
76 #include "replication/syncrep.h"
78 #include "replication/walsender.h"
81 #include "storage/fd.h"
82 #include "storage/ipc.h"
83 #include "storage/pmsignal.h"
84 #include "storage/proc.h"
85 #include "storage/procarray.h"
86 #include "tcop/dest.h"
87 #include "tcop/tcopprot.h"
88 #include "utils/acl.h"
89 #include "utils/builtins.h"
90 #include "utils/guc.h"
91 #include "utils/memutils.h"
92 #include "utils/pg_lsn.h"
93 #include "utils/portal.h"
94 #include "utils/ps_status.h"
95 #include "utils/timeout.h"
96 #include "utils/timestamp.h"
97 
/*
 * Upper limit on the data payload carried by a single WAL data message.  It
 * must be at least XLOG_BLCKSZ.
 *
 * There is no well-established best value.  Each message costs some overhead
 * in both walsender and walreceiver, which argues for large batches; but
 * signals are only checked between messages, so large batches also make the
 * walsender slower to respond.  Sixteen blocks (128kB with the default 8kB
 * XLOG_BLCKSZ) is the current guess.
 */
#define MAX_SEND_SIZE (16 * XLOG_BLCKSZ)
108 
109 /* Array of WalSnds in shared memory */
111 
112 /*
 * My slot in the shared memory array.  Starts out NULL; presumably assigned
 * by InitWalSenderSlot() during InitWalSender() — confirm.
 */
113 WalSnd *MyWalSnd = NULL;
114 
115 /* Global state */
116 bool am_walsender = false; /* Am I a walsender process? */
117 bool am_cascading_walsender = false; /* Am I cascading WAL to another
118  * standby? */
119 bool am_db_walsender = false; /* Connected to a database? */
120 
121 /* User-settable parameters for walsender */
122 int max_wal_senders = 0; /* the maximum number of concurrent
123  * walsenders */
124 int wal_sender_timeout = 60 * 1000; /* maximum time to send one WAL
125  * data message; presumably in milliseconds
 * (default 60 * 1000, i.e. 60s) — confirm
 * against the GUC definition */
127 
128 /*
129  * State for WalSndWakeupRequest
130  */
131 bool wake_wal_senders = false;
132 
133 /*
134  * xlogreader used for replication. Note that a WAL sender doing physical
135  * replication does not need xlogreader to read WAL, but it needs one to
136  * keep track of the state of its work.
137  */
139 
140 /*
141  * These variables keep track of the state of the timeline we're currently
142  * sending. sendTimeLine identifies the timeline. If sendTimeLineIsHistoric,
143  * the timeline is not the latest timeline on this server, and the server's
144  * history forked off from that timeline at sendTimeLineValidUpto.
145  */
148 static bool sendTimeLineIsHistoric = false;
150 
151 /*
152  * How far have we sent WAL already? This is also advertised in
153  * MyWalSnd->sentPtr. (Actually, this is the next WAL location to send.)
154  */
156 
157 /* Buffers for constructing outgoing messages and processing reply messages. */
161 
162 /* Timestamp of last ProcessRepliesIfAny(). */
164 
165 /*
166  * Timestamp of last ProcessRepliesIfAny() that saw a reply from the
167  * standby. Set to 0 if wal_sender_timeout doesn't need to be active.
168  */
170 
171 /* Have we sent a heartbeat message asking for reply, since last reply? */
172 static bool waiting_for_ping_response = false;
173 
174 /*
175  * While streaming WAL in Copy mode, streamingDoneSending is set to true
176  * after we have sent CopyDone. We should not send any more CopyData messages
177  * after that. streamingDoneReceiving is set to true when we receive CopyDone
178  * from the other end. When both become true, it's time to exit Copy mode.
179  */
182 
183 /*
 * Are we there yet?  Presumably: have we caught up with all WAL currently
 * available to send — confirm where the send callbacks set it.
 */
184 static bool WalSndCaughtUp = false;
185 
186 /*
 * Flags set by signal handlers for later service in main loop.  Presumably
 * got_STOPPING corresponds to PROCSIG_WALSND_INIT_STOPPING and got_SIGUSR2
 * to the postmaster's final SIGUSR2 described in the file header — confirm
 * in the handlers.
 */
187 static volatile sig_atomic_t got_SIGUSR2 = false;
188 static volatile sig_atomic_t got_STOPPING = false;
189 
190 /*
191  * This is set while we are streaming. When not set
192  * PROCSIG_WALSND_INIT_STOPPING signal will be handled like SIGTERM. When set,
193  * the main loop is responsible for checking got_STOPPING and terminating when
194  * it's set (after streaming any remaining WAL).
195  */
196 static volatile sig_atomic_t replication_active = false;
197 
199 
200 /* A sample associating a WAL location with the time it was written. */
201 typedef struct
202 {
205 } WalTimeSample;
206 
207 /* The size of our buffer of time samples. */
208 #define LAG_TRACKER_BUFFER_SIZE 8192
209 
210 /* A mechanism for tracking replication lag. */
211 typedef struct
212 {
216  int read_heads[NUM_SYNC_REP_WAIT_MODE]; /* one read position per
 * synchronous-replication wait mode */
218 } LagTracker;
219 
221 
222 /* Signal handlers */
224 
225 /* Prototypes for private functions */
/*
 * Type of the send_data callback taken by WalSndLoop() and WalSndDone();
 * XLogSendPhysical() and XLogSendLogical() both have this signature.
 */
226 typedef void (*WalSndSendDataCallback) (void);
227 static void WalSndLoop(WalSndSendDataCallback send_data);
228 static void InitWalSenderSlot(void);
229 static void WalSndKill(int code, Datum arg);
231 static void XLogSendPhysical(void);
232 static void XLogSendLogical(void);
233 static void WalSndDone(WalSndSendDataCallback send_data);
235 static void IdentifySystem(void);
239 static void StartReplication(StartReplicationCmd *cmd);
241 static void ProcessStandbyMessage(void);
242 static void ProcessStandbyReplyMessage(void);
243 static void ProcessStandbyHSFeedbackMessage(void);
244 static void ProcessRepliesIfAny(void);
245 static void ProcessPendingWrites(void);
246 static void WalSndKeepalive(bool requestReply, XLogRecPtr writePtr);
247 static void WalSndKeepaliveIfNecessary(void);
248 static void WalSndCheckTimeOut(void);
250 static void WalSndWait(uint32 socket_events, long timeout, uint32 wait_event);
251 static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
252 static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
254  bool skipped_xact);
256 static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
257 static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
259 
260 static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo,
261  TimeLineID *tli_p);
262 
263 
264 /*
 * Initialize walsender process before entering the main command loop.
 *
 * NOTE(review): the statements that implement several of the steps
 * commented below (source lines 268, 271, 285-286, 296, 298-299, 304) are
 * missing from this listing; only their comments and the ProcArrayLock
 * acquire/release survive.
 */
265 void
266 InitWalSender(void)
267 {
269 
270  /* Create a per-walsender data structure in shared memory */
272 
273  /*
274  * We don't currently need any ResourceOwner in a walsender process, but
275  * if we did, we could call CreateAuxProcessResourceOwner here.
276  */
277 
278  /*
279  * Let postmaster know that we're a WAL sender. Once we've declared ourselves
280  * a WAL sender process, postmaster will let us outlive the bgwriter and
281  * kill us last in the shutdown sequence, so we get a chance to stream all
282  * remaining WAL at shutdown, including the shutdown checkpoint. Note that
283  * there's no going back, and we mustn't write any WAL records after this.
284  */
287 
288  /*
289  * If the client didn't specify a database to connect to, show in PGPROC
290  * that our advertised xmin should affect vacuum horizons in all
291  * databases. This allows physical replication clients to send hot
292  * standby feedback that will delay vacuum cleanup in all databases.
293  */
294  if (MyDatabaseId == InvalidOid)
295  {
	/* ProcArrayLock is held exclusively around the (elided) PGPROC update. */
297  LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
300  LWLockRelease(ProcArrayLock);
301  }
302 
303  /* Initialize empty timestamp buffer for lag tracking. */
305 }
306 
307 /*
308  * Clean up after an error.
309  *
310  * WAL sender processes don't use transactions like regular backends do.
311  * This function does any cleanup required after an error in a WAL sender
312  * process, similar to what transaction abort does in a regular backend.
 *
 * Visible steps: clear replication_active, release any ResourceOwner that a
 * replication command set up outside a transaction (WalSndResourceCleanup),
 * and exit via proc_exit(0) if got_STOPPING or got_SIGUSR2 is already set.
 * Otherwise it reverts to the startup state and returns.
 *
 * NOTE(review): this listing omits several lines:
 *  - the function-name line (source line 315), presumably
 *    WalSndErrorCleanup(void);
 *  - the bodies of the xlogreader and MyReplicationSlot checks (source lines
 *    322, 325); as printed, each of those `if`s would govern the following
 *    statement instead;
 *  - the condition guarding WalSndResourceCleanup(false) (source line 336);
 *  - the startup-state revert (source line 343).
313  */
314 void
316 {
320 
321  if (xlogreader != NULL && xlogreader->seg.ws_file >= 0)
323 
324  if (MyReplicationSlot != NULL)
326 
328 
329  replication_active = false;
330 
331  /*
332  * If there is a transaction in progress, it will clean up our
333  * ResourceOwner, but if a replication command set up a resource owner
334  * without a transaction, we've got to clean that up now.
335  */
337  WalSndResourceCleanup(false);
338 
	/*
	 * Shutdown is already under way: exit now instead of going back to
	 * accept more replication commands.
	 */
339  if (got_STOPPING || got_SIGUSR2)
340  proc_exit(0);
341 
342  /* Revert back to startup state */
344 }
345 
346 /*
347  * Clean up any ResourceOwner we created.
348  */
349 void
350 WalSndResourceCleanup(bool isCommit)
351 {
352  ResourceOwner resowner;
353 
354  if (CurrentResourceOwner == NULL)
355  return;
356 
357  /*
358  * Deleting CurrentResourceOwner is not allowed, so we must save a pointer
359  * in a local variable and clear it first.
360  */
361  resowner = CurrentResourceOwner;
362  CurrentResourceOwner = NULL;
363 
364  /* Now we can release resources and delete it. */
365  ResourceOwnerRelease(resowner,
366  RESOURCE_RELEASE_BEFORE_LOCKS, isCommit, true);
367  ResourceOwnerRelease(resowner,
368  RESOURCE_RELEASE_LOCKS, isCommit, true);
369  ResourceOwnerRelease(resowner,
370  RESOURCE_RELEASE_AFTER_LOCKS, isCommit, true);
371  ResourceOwnerDelete(resowner);
372 }
373 
374 /*
375  * Handle a client's connection abort in an orderly manner.
 *
 * Never returns: the process exits via proc_exit(0), and abort() backs that
 * up in case control ever comes back.
376  */
377 static void
378 WalSndShutdown(void)
379 {
380  /*
381  * Reset whereToSendOutput to prevent ereport from attempting to send any
382  * more messages to the standby.
 *
 * NOTE(review): the statement performing this reset (source lines 384-385)
 * is missing from this listing.
383  */
386 
387  proc_exit(0);
388  abort(); /* not reached; keeps the compiler quiet */
389 }
390 
391 /*
392  * Handle the IDENTIFY_SYSTEM command.
 *
 * Sends one row with four columns:
 *  - systemid (text);
 *  - timeline (int4);
 *  - xlogpos (text, "%X/%X" of the current flush position);
 *  - dbname (text), which is NULL unless this walsender is connected to a
 *    database.
 *
 * NOTE(review): the name line (source line 395, presumably
 * IdentifySystem(void) per its prototype) is missing from this listing.
393  */
394 static void
396 {
397  char sysid[32];
398  char xloc[MAXFNAMELEN];
399  XLogRecPtr logptr;
400  char *dbname = NULL;
402  TupOutputState *tstate;
403  TupleDesc tupdesc;
404  Datum values[4];
405  bool nulls[4];
406  TimeLineID currTLI;
407 
408  /*
409  * Reply with a result set with one row, four columns. First col is system
410  * ID, second is timeline ID, third is current xlog location and the
411  * fourth contains the database name if we are connected to one.
412  */
413 
414  snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
416 
	/*
	 * Choose between the standby and local flush position.  The selecting
	 * condition (source lines 417-418) is not in this listing; presumably it
	 * tests whether we are a cascading walsender — confirm.
	 */
419  logptr = GetStandbyFlushRecPtr(&currTLI);
420  else
421  logptr = GetFlushRecPtr(&currTLI);
422 
423  snprintf(xloc, sizeof(xloc), "%X/%X", LSN_FORMAT_ARGS(logptr));
424 
425  if (MyDatabaseId != InvalidOid)
426  {
428 
429  /* syscache access needs a transaction env. */
431  /* make dbname live outside TX context */
435  /* CommitTransactionCommand switches to TopMemoryContext */
437  }
438 
440  MemSet(nulls, false, sizeof(nulls));
441 
442  /* need a tuple descriptor representing four columns */
443  tupdesc = CreateTemplateTupleDesc(4);
444  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "systemid",
445  TEXTOID, -1, 0);
	/*
	 * NOTE(review): TimeLineID is unsigned, yet this column is int4.  The
	 * READ_REPLICATION_SLOT and START_REPLICATION result sets use int8 for
	 * exactly that reason, so a timeline above INT32_MAX would be reported
	 * as negative here.  Changing it would alter the column type clients
	 * see, so it is left as is.
	 */
446  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "timeline",
447  INT4OID, -1, 0);
448  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "xlogpos",
449  TEXTOID, -1, 0);
450  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "dbname",
451  TEXTOID, -1, 0);
452 
453  /* prepare for projection of tuples */
454  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
455 
456  /* column 1: system identifier */
457  values[0] = CStringGetTextDatum(sysid);
458 
459  /* column 2: timeline */
460  values[1] = Int32GetDatum(currTLI);
461 
462  /* column 3: wal location */
463  values[2] = CStringGetTextDatum(xloc);
464 
465  /* column 4: database name, or NULL if none */
466  if (dbname)
468  else
469  nulls[3] = true;
470 
471  /* send it to dest */
472  do_tup_output(tstate, values, nulls);
473 
474  end_tup_output(tstate);
475 }
476 
477 /*
 * Handle READ_REPLICATION_SLOT command.
 *
 * Sends one row with columns slot_type (text, always "physical"),
 * restart_lsn (text) and restart_tli (int8).
 *  - If the named slot does not exist or is not in use, every column is NULL.
 *  - restart_lsn and restart_tli are NULL when the slot's restart_lsn is
 *    invalid.
 *  - A logical slot (one with a valid database OID) raises an ERROR.
 */
478 static void
480 {
481 #define READ_REPLICATION_SLOT_COLS 3
482  ReplicationSlot *slot;
484  TupOutputState *tstate;
485  TupleDesc tupdesc;
487  bool nulls[READ_REPLICATION_SLOT_COLS];
488 
490  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_type",
491  TEXTOID, -1, 0);
492  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "restart_lsn",
493  TEXTOID, -1, 0);
494  /* TimeLineID is unsigned, so int4 is not wide enough. */
495  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "restart_tli",
496  INT8OID, -1, 0);
497 
	/* Start with every column NULL; filled-in columns clear their flag. */
499  MemSet(nulls, true, READ_REPLICATION_SLOT_COLS * sizeof(bool));
500 
501  LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
502  slot = SearchNamedReplicationSlot(cmd->slotname, false);
503  if (slot == NULL || !slot->in_use)
504  {
505  LWLockRelease(ReplicationSlotControlLock);
506  }
507  else
508  {
509  ReplicationSlot slot_contents;
510  int i = 0;
511 
512  /*
 * Copy slot contents while holding spinlock, so the code below can read
 * the copy after both locks have been released.
 */
513  SpinLockAcquire(&slot->mutex);
514  slot_contents = *slot;
515  SpinLockRelease(&slot->mutex);
516  LWLockRelease(ReplicationSlotControlLock);
517 
518  if (OidIsValid(slot_contents.data.database))
519  ereport(ERROR,
520  errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
521  errmsg("cannot use \"%s\" with logical replication slot \"%s\"",
522  "READ_REPLICATION_SLOT",
523  NameStr(slot_contents.data.name)));
524 
525  /* slot type */
526  values[i] = CStringGetTextDatum("physical");
527  nulls[i] = false;
528  i++;
529 
530  /* start LSN */
531  if (!XLogRecPtrIsInvalid(slot_contents.data.restart_lsn))
532  {
533  char xloc[64];
534 
535  snprintf(xloc, sizeof(xloc), "%X/%X",
536  LSN_FORMAT_ARGS(slot_contents.data.restart_lsn));
537  values[i] = CStringGetTextDatum(xloc);
538  nulls[i] = false;
539  }
540  i++;
541 
542  /* timeline this WAL was produced on */
543  if (!XLogRecPtrIsInvalid(slot_contents.data.restart_lsn))
544  {
545  TimeLineID slots_position_timeline;
546  TimeLineID current_timeline;
547  List *timeline_history = NIL;
548 
549  /*
550  * While in recovery, use as timeline the currently-replaying one
551  * to get the LSN position's history.
552  */
553  if (RecoveryInProgress())
554  (void) GetXLogReplayRecPtr(&current_timeline);
555  else
556  current_timeline = GetWALInsertionTimeLine();
557 
			/*
			 * NOTE(review): timeline_history is never freed here, whereas
			 * StartReplication() calls list_free_deep() on its copy.
			 */
558  timeline_history = readTimeLineHistory(current_timeline);
559  slots_position_timeline = tliOfPointInHistory(slot_contents.data.restart_lsn,
560  timeline_history);
561  values[i] = Int64GetDatum((int64) slots_position_timeline);
562  nulls[i] = false;
563  }
564  i++;
565 
567  }
568 
570  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
571  do_tup_output(tstate, values, nulls);
572  end_tup_output(tstate);
573 }
574 
575 
576 /*
577  * Handle TIMELINE_HISTORY command.
 *
 * Builds the RowDescription ('T') and DataRow ('D') messages by hand rather
 * than through the tuple-output helpers used elsewhere in this file.  The
 * row has two text columns: filename (the history file name for
 * cmd->timeline) and content (the file's bytes).  Failing to open, seek,
 * read or close the file raises an ERROR.  The RowDescription has already
 * been sent by the time the file is opened.
578  */
579 static void
581 {
583  char histfname[MAXFNAMELEN];
584  char path[MAXPGPATH];
585  int fd;
586  off_t histfilelen;
587  off_t bytesleft;
588  Size len;
589 
590  /*
591  * Reply with a result set with one row, and two columns. The first col is
592  * the name of the history file, 2nd is the contents.
593  */
594 
595  TLHistoryFileName(histfname, cmd->timeline);
596  TLHistoryFilePath(path, cmd->timeline);
597 
598  /* Send a RowDescription message */
599  pq_beginmessage(&buf, 'T');
600  pq_sendint16(&buf, 2); /* 2 fields */
601 
602  /* first field */
603  pq_sendstring(&buf, "filename"); /* col name */
604  pq_sendint32(&buf, 0); /* table oid */
605  pq_sendint16(&buf, 0); /* attnum */
606  pq_sendint32(&buf, TEXTOID); /* type oid */
607  pq_sendint16(&buf, -1); /* typlen */
608  pq_sendint32(&buf, 0); /* typmod */
609  pq_sendint16(&buf, 0); /* format code */
610 
611  /* second field */
612  pq_sendstring(&buf, "content"); /* col name */
613  pq_sendint32(&buf, 0); /* table oid */
614  pq_sendint16(&buf, 0); /* attnum */
615  pq_sendint32(&buf, TEXTOID); /* type oid */
616  pq_sendint16(&buf, -1); /* typlen */
617  pq_sendint32(&buf, 0); /* typmod */
618  pq_sendint16(&buf, 0); /* format code */
619  pq_endmessage(&buf);
620 
621  /* Send a DataRow message */
622  pq_beginmessage(&buf, 'D');
623  pq_sendint16(&buf, 2); /* # of columns */
624  len = strlen(histfname);
625  pq_sendint32(&buf, len); /* col1 len */
626  pq_sendbytes(&buf, histfname, len);
627 
628  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
629  if (fd < 0)
630  ereport(ERROR,
632  errmsg("could not open file \"%s\": %m", path)));
633 
634  /* Determine file length and send it to client */
635  histfilelen = lseek(fd, 0, SEEK_END);
636  if (histfilelen < 0)
637  ereport(ERROR,
639  errmsg("could not seek to end of file \"%s\": %m", path)));
640  if (lseek(fd, 0, SEEK_SET) != 0)
641  ereport(ERROR,
643  errmsg("could not seek to beginning of file \"%s\": %m", path)));
644 
645  pq_sendint32(&buf, histfilelen); /* col2 len */
646 
647  bytesleft = histfilelen;
648  while (bytesleft > 0)
649  {
650  PGAlignedBlock rbuf;
651  int nread;
652 
		/*
		 * NOTE(review): this asks for sizeof(rbuf) bytes even when bytesleft
		 * is smaller.  If the file grew after the lseek() above, more bytes
		 * than the advertised col2 length would be sent.  Presumably history
		 * files are never modified once written — confirm.
		 */
654  nread = read(fd, rbuf.data, sizeof(rbuf));
656  if (nread < 0)
657  ereport(ERROR,
659  errmsg("could not read file \"%s\": %m",
660  path)));
661  else if (nread == 0)
662  ereport(ERROR,
664  errmsg("could not read file \"%s\": read %d of %zu",
665  path, nread, (Size) bytesleft)));
666 
667  pq_sendbytes(&buf, rbuf.data, nread);
668  bytesleft -= nread;
669  }
670 
671  if (CloseTransientFile(fd) != 0)
672  ereport(ERROR,
674  errmsg("could not close file \"%s\": %m", path)));
675 
676  pq_endmessage(&buf);
677 }
678 
679 /*
680  * Handle START_REPLICATION command.
681  *
682  * If there is WAL to stream, this enters COPY mode and streams until the
683  * walsender main loop ends; it exits via proc_exit(0) if a stop was
 * requested meanwhile.  After streaming, or immediately when there is
 * nothing to stream, it may send a single-row result set naming the next
 * timeline.  The condition for that (source line 868) is not in this
 * listing.  It always finishes with CommandComplete.  An ereport(ERROR)
 * anywhere takes us back to the main loop.
684  */
685 static void
687 {
689  XLogRecPtr FlushPtr;
690  TimeLineID FlushTLI;
691 
692  /* create xlogreader for physical replication */
693  xlogreader =
695  XL_ROUTINE(.segment_open = WalSndSegmentOpen,
696  .segment_close = wal_segment_close),
697  NULL);
698 
699  if (!xlogreader)
700  ereport(ERROR,
701  (errcode(ERRCODE_OUT_OF_MEMORY),
702  errmsg("out of memory"),
703  errdetail("Failed while allocating a WAL reading processor.")));
704 
705  /*
706  * We assume here that we're logging enough information in the WAL for
707  * log-shipping, since this is checked in PostmasterMain().
708  *
709  * NOTE: wal_level can only change at shutdown, so in most cases it is
710  * difficult for there to be WAL data that we can still see that was
711  * written at wal_level='minimal'.
712  */
713 
714  if (cmd->slotname)
715  {
716  ReplicationSlotAcquire(cmd->slotname, true);
718  ereport(ERROR,
719  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
720  errmsg("cannot use a logical replication slot for physical replication")));
721 
722  /*
723  * We don't need to verify the slot's restart_lsn here; instead we
724  * rely on the caller requesting the starting point to use. If the
725  * WAL segment doesn't exist, we'll fail later.
726  */
727  }
728 
729  /*
730  * Select the timeline. If it was given explicitly by the client, use
731  * that. Otherwise use FlushTLI, the timeline of the current flush position
 * as reported by GetStandbyFlushRecPtr() or GetFlushRecPtr() below.
732  */
735  FlushPtr = GetStandbyFlushRecPtr(&FlushTLI);
736  else
737  FlushPtr = GetFlushRecPtr(&FlushTLI);
738 
739  if (cmd->timeline != 0)
740  {
741  XLogRecPtr switchpoint;
742 
743  sendTimeLine = cmd->timeline;
744  if (sendTimeLine == FlushTLI)
745  {
746  sendTimeLineIsHistoric = false;
748  }
749  else
750  {
751  List *timeLineHistory;
752 
753  sendTimeLineIsHistoric = true;
754 
755  /*
756  * Check that the timeline the client requested exists, and the
757  * requested start location is on that timeline.
758  */
759  timeLineHistory = readTimeLineHistory(FlushTLI);
760  switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
762  list_free_deep(timeLineHistory);
763 
764  /*
765  * Found the requested timeline in the history. Check that
766  * requested startpoint is on that timeline in our history.
767  *
768  * This is quite loose on purpose. We only check that we didn't
769  * fork off the requested timeline before the switchpoint. We
770  * don't check that we switched *to* it before the requested
771  * starting point. This is because the client can legitimately
772  * request to start replication from the beginning of the WAL
773  * segment that contains switchpoint, but on the new timeline, so
774  * that it doesn't end up with a partial segment. If you ask for
775  * too old a starting point, you'll get an error later when we
776  * fail to find the requested WAL segment in pg_wal.
777  *
778  * XXX: we could be more strict here and only allow a startpoint
779  * that's older than the switchpoint, if it's still in the same
780  * WAL segment.
781  */
782  if (!XLogRecPtrIsInvalid(switchpoint) &&
783  switchpoint < cmd->startpoint)
784  {
785  ereport(ERROR,
786  (errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
788  cmd->timeline),
789  errdetail("This server's history forked from timeline %u at %X/%X.",
790  cmd->timeline,
791  LSN_FORMAT_ARGS(switchpoint))));
792  }
793  sendTimeLineValidUpto = switchpoint;
794  }
795  }
796  else
797  {
798  sendTimeLine = FlushTLI;
800  sendTimeLineIsHistoric = false;
801  }
802 
804 
805  /* If there is nothing to stream, don't even enter COPY mode */
807  {
808  /*
809  * When we first start replication the standby will be behind the
810  * primary. For some applications, for example synchronous
811  * replication, it is important to have a clear state for this initial
812  * catchup mode, so we can trigger actions when we change streaming
813  * state later. We may stay in this state for a long time, which is
814  * exactly why we want to be able to monitor whether or not we are
815  * still here.
816  */
818 
819  /* Send a CopyBothResponse message, and start streaming */
820  pq_beginmessage(&buf, 'W');
821  pq_sendbyte(&buf, 0);
822  pq_sendint16(&buf, 0);
823  pq_endmessage(&buf);
824  pq_flush();
825 
826  /*
827  * Don't allow a request to stream from a future point in WAL that
828  * hasn't been flushed to disk in this server yet.
829  */
830  if (FlushPtr < cmd->startpoint)
831  {
832  ereport(ERROR,
833  (errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
835  LSN_FORMAT_ARGS(FlushPtr))));
836  }
837 
838  /* Start streaming from the requested point */
839  sentPtr = cmd->startpoint;
840 
841  /* Initialize shared memory status, too */
845 
847 
848  /* Main loop of walsender */
849  replication_active = true;
850 
852 
853  replication_active = false;
		/* A stop was requested while streaming: exit instead of returning. */
854  if (got_STOPPING)
855  proc_exit(0);
857 
859  }
860 
861  if (cmd->slotname)
863 
864  /*
865  * Copy is finished now. Send a single-row result set indicating the next
866  * timeline.
867  */
869  {
870  char startpos_str[8 + 1 + 8 + 1]; /* "%X/%X": two 32-bit hex
 * halves, '/', and NUL */
872  TupOutputState *tstate;
873  TupleDesc tupdesc;
874  Datum values[2];
875  bool nulls[2];
876 
877  snprintf(startpos_str, sizeof(startpos_str), "%X/%X",
879 
881  MemSet(nulls, false, sizeof(nulls));
882 
883  /*
884  * Need a tuple descriptor representing two columns. int8 may seem
885  * like a surprising data type for this, but in theory int4 would not
886  * be wide enough for this, as TimeLineID is unsigned.
887  */
888  tupdesc = CreateTemplateTupleDesc(2);
889  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli",
890  INT8OID, -1, 0);
891  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos",
892  TEXTOID, -1, 0);
893 
894  /* prepare for projection of tuple */
895  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
896 
898  values[1] = CStringGetTextDatum(startpos_str);
899 
900  /* send it to dest */
901  do_tup_output(tstate, values, nulls);
902 
903  end_tup_output(tstate);
904  }
905 
906  /* Send CommandComplete message */
907  EndReplicationCommand("START_STREAMING");
908 }
909 
910 /*
911  * XLogReaderRoutine->page_read callback for logical decoding contexts, as a
912  * walsender process.
913  *
914  * Inside the walsender we can do better than read_local_xlog_page,
915  * which has to do a plain sleep/busy loop, because the walsender's latch gets
916  * set every time WAL is flushed.
 *
 * Returns the number of valid bytes at targetPagePtr, or -1 if
 * WalSndWaitForWal() returned before targetPagePtr + reqLen had been
 * flushed.  The count is XLOG_BLCKSZ when the whole page has been flushed,
 * and flushptr - targetPagePtr otherwise.  A full XLOG_BLCKSZ is always read
 * into cur_page; only the first count bytes are reported as valid.  As a
 * side effect, the file-level sendTimeLine* variables are refreshed from
 * the reader's timeline state.
917  */
918 static int
920  XLogRecPtr targetRecPtr, char *cur_page)
921 {
922  XLogRecPtr flushptr;
923  int count;
924  WALReadError errinfo;
925  XLogSegNo segno;
927 
928  /*
929  * Since logical decoding is only permitted on a primary server, we know
930  * that the current timeline ID can't be changing any more. If we did this
931  * on a standby, we'd have to worry about the values we compute here
932  * becoming invalid due to a promotion or timeline change.
933  */
934  XLogReadDetermineTimeline(state, targetPagePtr, reqLen, currTLI);
935  sendTimeLineIsHistoric = (state->currTLI != currTLI);
936  sendTimeLine = state->currTLI;
937  sendTimeLineValidUpto = state->currTLIValidUntil;
938  sendTimeLineNextTLI = state->nextTLI;
939 
940  /* make sure we have enough WAL available */
941  flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
942 
943  /* fail if not (implies we are going to shut down) */
944  if (flushptr < targetPagePtr + reqLen)
945  return -1;
946 
947  if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
948  count = XLOG_BLCKSZ; /* the whole block is available */
949  else
950  count = flushptr - targetPagePtr; /* part of the page available */
951 
952  /* now actually read the data, we know it's there */
953  if (!WALRead(state,
954  cur_page,
955  targetPagePtr,
956  XLOG_BLCKSZ,
957  state->seg.ws_tli, /* Pass the current TLI because only
958  * WalSndSegmentOpen controls whether new
959  * TLI is needed. */
960  &errinfo))
961  WALReadRaiseError(&errinfo);
962 
963  /*
964  * After reading into the buffer, check that what we read was valid. We do
965  * this after reading, because even though the segment was present when we
966  * opened it, it might get recycled or removed while we read it. The
967  * read() succeeds in that case, but the data we tried to read might
968  * already have been overwritten with new WAL records.
969  */
970  XLByteToSeg(targetPagePtr, segno, state->segcxt.ws_segsize);
971  CheckXLogRemoved(segno, state->seg.ws_tli);
972 
973  return count;
974 }
975 
976 /*
977  * Process extra options given to CREATE_REPLICATION_SLOT.
 *
 * Recognized options:
 *  - "snapshot": 'export', 'nothing' or 'use'; logical slots only.  Sets
 *    *snapshot_action.
 *  - "reserve_wal": physical slots only.  Sets *reserve_wal.
 *  - "two_phase": logical slots only.  Sets *two_phase.
 * Output arguments for options that are not given are left untouched, so
 * the caller's initial values act as defaults.
 *
 * Errors:
 *  - repeating an option, or using one with the wrong slot kind, raises
 *    "conflicting or redundant options";
 *  - an unknown option or an unknown snapshot value also raises an ERROR.
 *
 * NOTE(review): the first parameter line (source line 980) is missing from
 * this listing; cmd is presumably a CreateReplicationSlotCmd * — confirm.
978  */
979 static void
981  bool *reserve_wal,
982  CRSSnapshotAction *snapshot_action,
983  bool *two_phase)
984 {
985  ListCell *lc;
986  bool snapshot_action_given = false;
987  bool reserve_wal_given = false;
988  bool two_phase_given = false;
989 
990  /* Parse options */
991  foreach(lc, cmd->options)
992  {
993  DefElem *defel = (DefElem *) lfirst(lc);
994 
995  if (strcmp(defel->defname, "snapshot") == 0)
996  {
997  char *action;
998 
999  if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
1000  ereport(ERROR,
1001  (errcode(ERRCODE_SYNTAX_ERROR),
1002  errmsg("conflicting or redundant options")));
1003 
1004  action = defGetString(defel);
1005  snapshot_action_given = true;
1006 
1007  if (strcmp(action, "export") == 0)
1008  *snapshot_action = CRS_EXPORT_SNAPSHOT;
1009  else if (strcmp(action, "nothing") == 0)
1010  *snapshot_action = CRS_NOEXPORT_SNAPSHOT;
1011  else if (strcmp(action, "use") == 0)
1012  *snapshot_action = CRS_USE_SNAPSHOT;
1013  else
1014  ereport(ERROR,
1015  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1016  errmsg("unrecognized value for CREATE_REPLICATION_SLOT option \"%s\": \"%s\"",
1017  defel->defname, action)));
1018  }
1019  else if (strcmp(defel->defname, "reserve_wal") == 0)
1020  {
1021  if (reserve_wal_given || cmd->kind != REPLICATION_KIND_PHYSICAL)
1022  ereport(ERROR,
1023  (errcode(ERRCODE_SYNTAX_ERROR),
1024  errmsg("conflicting or redundant options")));
1025 
1026  reserve_wal_given = true;
1027  *reserve_wal = defGetBoolean(defel);
1028  }
1029  else if (strcmp(defel->defname, "two_phase") == 0)
1030  {
1031  if (two_phase_given || cmd->kind != REPLICATION_KIND_LOGICAL)
1032  ereport(ERROR,
1033  (errcode(ERRCODE_SYNTAX_ERROR),
1034  errmsg("conflicting or redundant options")));
1035  two_phase_given = true;
1036  *two_phase = defGetBoolean(defel);
1037  }
1038  else
1039  elog(ERROR, "unrecognized option: %s", defel->defname);
1040  }
1041 }
1042 
1043 /*
1044  * Create a new replication slot.
 *
 * The slot is physical or logical according to cmd->kind; options are
 * parsed by parseCreateReplSlotOptions(). On success a single row is sent
 * with four text columns: slot_name, consistent_point (formatted "%X/%X"),
 * snapshot_name and output_plugin. The last two are NULL when no snapshot
 * name was produced or cmd->plugin is NULL.
 *
 * NOTE(review): this copy was extracted from a rendered listing and several
 * numbered lines are missing, among them 1047 (the signature). `dest` is
 * declared but never visibly assigned before use, and the snprintf()
 * arguments (1191) are absent. Restore from upstream before compiling.
1045  */
1046 static void
1048 {
1049  const char *snapshot_name = NULL;
1050  char xloc[MAXFNAMELEN];
1051  char *slot_name;
1052  bool reserve_wal = false;
1053  bool two_phase = false;
1054  CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT;
1055  DestReceiver *dest;
1056  TupOutputState *tstate;
1057  TupleDesc tupdesc;
1058  Datum values[4];
1059  bool nulls[4];
1060 
1062 
/* Fills reserve_wal, snapshot_action and two_phase from cmd's options. */
1063  parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action, &two_phase);
1064 
1065  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
1066  {
1067  ReplicationSlotCreate(cmd->slotname, false,
1069  false);
1070  }
1071  else
1072  {
1074 
1075  /*
1076  * Initially create persistent slot as ephemeral - that allows us to
1077  * nicely handle errors during initialization because it'll get
1078  * dropped if this transaction fails. We'll make it persistent at the
1079  * end. Temporary slots can be created as temporary from beginning as
1080  * they get dropped on error as well.
1081  */
1082  ReplicationSlotCreate(cmd->slotname, true,
1084  two_phase);
1085  }
1086 
1087  if (cmd->kind == REPLICATION_KIND_LOGICAL)
1088  {
1090  bool need_full_snapshot = false;
1091 
1092  /*
1093  * Do options check early so that we can bail before calling the
1094  * DecodingContextFindStartpoint which can take long time.
1095  */
1096  if (snapshot_action == CRS_EXPORT_SNAPSHOT)
1097  {
1098  if (IsTransactionBlock())
1099  ereport(ERROR,
1100  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1101  (errmsg("%s must not be called inside a transaction",
1102  "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'export')")));
1103 
1104  need_full_snapshot = true;
1105  }
1106  else if (snapshot_action == CRS_USE_SNAPSHOT)
1107  {
/*
 * SNAPSHOT 'use' requires a top-level transaction block in REPEATABLE
 * READ mode in which no snapshot has been taken yet (FirstSnapshotSet).
 * NOTE(review): the isolation-level condition (1114) is missing here.
 */
1108  if (!IsTransactionBlock())
1109  ereport(ERROR,
1110  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1111  (errmsg("%s must be called inside a transaction",
1112  "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1113 
1115  ereport(ERROR,
1116  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1117  (errmsg("%s must be called in REPEATABLE READ isolation mode transaction",
1118  "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1119 
1120  if (FirstSnapshotSet)
1121  ereport(ERROR,
1122  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1123  (errmsg("%s must be called before any query",
1124  "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1125 
1126  if (IsSubTransaction())
1127  ereport(ERROR,
1128  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1129  (errmsg("%s must not be called in a subtransaction",
1130  "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1131 
1132  need_full_snapshot = true;
1133  }
1134 
/* NOTE(review): argument lines 1136 and 1140-1141 are missing. */
1135  ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
1137  XL_ROUTINE(.page_read = logical_read_xlog_page,
1138  .segment_open = WalSndSegmentOpen,
1139  .segment_close = wal_segment_close),
1142 
1143  /*
1144  * Signal that we don't need the timeout mechanism. We're just
1145  * creating the replication slot and don't yet accept feedback
1146  * messages or send keepalives. As we possibly need to wait for
1147  * further WAL the walsender would otherwise possibly be killed too
1148  * soon.
1149  */
1151 
1152  /* build initial snapshot, might take a while */
1154 
1155  /*
1156  * Export or use the snapshot if we've been asked to do so.
1157  *
1158  * NB. We will convert the snapbuild.c kind of snapshot to normal
1159  * snapshot when doing this.
1160  */
1161  if (snapshot_action == CRS_EXPORT_SNAPSHOT)
1162  {
1163  snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder);
1164  }
1165  else if (snapshot_action == CRS_USE_SNAPSHOT)
1166  {
1167  Snapshot snap;
1168 
1171  }
1172 
1173  /* don't need the decoding context anymore */
1174  FreeDecodingContext(ctx);
1175 
1176  if (!cmd->temporary)
1178  }
1179  else if (cmd->kind == REPLICATION_KIND_PHYSICAL && reserve_wal)
1180  {
1182 
1184 
1185  /* Write this slot to disk if it's a permanent one. */
1186  if (!cmd->temporary)
1188  }
1189 
/* consistent_point column text; argument line 1191 is missing here. */
1190  snprintf(xloc, sizeof(xloc), "%X/%X",
1192 
1194  MemSet(nulls, false, sizeof(nulls));
1195 
1196  /*----------
1197  * Need a tuple descriptor representing four columns:
1198  * - first field: the slot name
1199  * - second field: LSN at which we became consistent
1200  * - third field: exported snapshot's name
1201  * - fourth field: output plugin
1202  *----------
1203  */
1204  tupdesc = CreateTemplateTupleDesc(4);
1205  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_name",
1206  TEXTOID, -1, 0);
1207  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "consistent_point",
1208  TEXTOID, -1, 0);
1209  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "snapshot_name",
1210  TEXTOID, -1, 0);
1211  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "output_plugin",
1212  TEXTOID, -1, 0);
1213 
1214  /* prepare for projection of tuples */
1215  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
1216 
1217  /* slot_name */
1218  slot_name = NameStr(MyReplicationSlot->data.name);
1219  values[0] = CStringGetTextDatum(slot_name);
1220 
1221  /* consistent wal location */
1222  values[1] = CStringGetTextDatum(xloc);
1223 
1224  /* snapshot name, or NULL if none */
1225  if (snapshot_name != NULL)
1226  values[2] = CStringGetTextDatum(snapshot_name);
1227  else
1228  nulls[2] = true;
1229 
1230  /* plugin, or NULL if none */
1231  if (cmd->plugin != NULL)
1232  values[3] = CStringGetTextDatum(cmd->plugin);
1233  else
1234  nulls[3] = true;
1235 
1236  /* send it to dest */
1237  do_tup_output(tstate, values, nulls);
1238  end_tup_output(tstate);
1239 
1241 }
1242 
1243 /*
1244  * Get rid of a replication slot that is no longer wanted.
 *
 * Passes cmd->slotname and !cmd->wait to ReplicationSlotDrop(). Presumably
 * the second argument means "do not wait if the slot is in use" — confirm
 * against ReplicationSlotDrop().
 *
 * NOTE(review): line 1247 (the signature) is missing from this copy.
1245  */
1246 static void
1248 {
1249  ReplicationSlotDrop(cmd->slotname, !cmd->wait);
1250 }
1251 
1252 /*
1253  * Load previously initiated logical slot and prepare for sending data (via
1254  * WalSndLoop).
 *
 * Acquires the slot named by cmd->slotname and raises an ERROR for a slot
 * that has been invalidated (per the errdetail text). It then sends a
 * CopyBothResponse and streams. When streaming ends it calls proc_exit(0)
 * if got_STOPPING is set; otherwise it sends CommandComplete to leave COPY
 * mode.
 *
 * NOTE(review): numbered lines are missing from this copy, including 1257
 * (the signature) and the conditions of the `if` statements at 1269 and
 * 1281. Restore from upstream before compiling.
1255  */
1256 static void
1258 {
1260  QueryCompletion qc;
1261 
1262  /* make sure that our requirements are still fulfilled */
1264 
1266 
1267  ReplicationSlotAcquire(cmd->slotname, true);
1268 
1270  ereport(ERROR,
1271  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1272  errmsg("cannot read from logical replication slot \"%s\"",
1273  cmd->slotname),
1274  errdetail("This slot has been invalidated because it exceeded the maximum reserved size.")));
1275 
1276  /*
1277  * Force a disconnect, so that the decoding code doesn't need to care
1278  * about an eventual switch from running in recovery, to running in a
1279  * normal environment. Client code is expected to handle reconnects.
1280  */
1282  {
1283  ereport(LOG,
1284  (errmsg("terminating walsender process after promotion")));
1285  got_STOPPING = true;
1286  }
1287 
1288  /*
1289  * Create our decoding context, making it start at the previously ack'ed
1290  * position.
1291  *
1292  * Do this before sending a CopyBothResponse message, so that any errors
1293  * are reported early.
1294  */
1296  CreateDecodingContext(cmd->startpoint, cmd->options, false,
1297  XL_ROUTINE(.page_read = logical_read_xlog_page,
1298  .segment_open = WalSndSegmentOpen,
1299  .segment_close = wal_segment_close),
1303 
1305 
1306  /* Send a CopyBothResponse message, and start streaming */
1307  pq_beginmessage(&buf, 'W');
/* overall COPY format: 0 = textual */
1308  pq_sendbyte(&buf, 0);
/* number of columns: 0 */
1309  pq_sendint16(&buf, 0);
1310  pq_endmessage(&buf);
1311  pq_flush();
1312 
1313  /* Start reading WAL from the oldest required WAL. */
1316 
1317  /*
1318  * Report the location after which we'll send out further commits as the
1319  * current sentPtr.
1320  */
1322 
1323  /* Also update the sent position status in shared memory */
1327 
1328  replication_active = true;
1329 
1331 
1332  /* Main loop of walsender */
1334 
1337 
1338  replication_active = false;
/* A stop was requested (e.g. after promotion above): exit the process. */
1339  if (got_STOPPING)
1340  proc_exit(0);
1342 
1343  /* Get out of COPY mode (CommandComplete). */
1344  SetQueryCompletion(&qc, CMDTAG_COPY, 0);
1345  EndCommand(&qc, DestRemote, false);
1346 }
1347 
1348 /*
1349  * LogicalDecodingContext 'prepare_write' callback.
1350  *
1351  * Prepare a write into a StringInfo.
1352  *
1353  * Don't do anything lasting in here, it's quite possible that nothing will be done
1354  * with the data.
 *
 * Resets ctx->out and writes the message header:
 *   byte 'w', int64 dataStart, int64 walEnd, int64 sendtime placeholder.
 * dataStart and walEnd are both set to lsn, or to InvalidXLogRecPtr when
 * last_write is false. The sendtime slot is written as 0 here and filled
 * in later, just before the message is sent.
 *
 * NOTE(review): line 1357 (the signature) is missing from this copy.
1355  */
1356 static void
1358 {
1359  /* can't have sync rep confused by sending the same LSN several times */
1360  if (!last_write)
1361  lsn = InvalidXLogRecPtr;
1362 
1363  resetStringInfo(ctx->out);
1364 
1365  pq_sendbyte(ctx->out, 'w');
1366  pq_sendint64(ctx->out, lsn); /* dataStart */
1367  pq_sendint64(ctx->out, lsn); /* walEnd */
1368 
1369  /*
1370  * Fill out the sendtime later, just as it's done in XLogSendPhysical, but
1371  * reserve space here.
1372  */
1373  pq_sendint64(ctx->out, 0); /* sendtime */
1374 }
1375 
1376 /*
1377  * LogicalDecodingContext 'write' callback.
1378  *
1379  * Actually write out data previously prepared by WalSndPrepareWrite out to
1380  * the network. Take as long as needed, but process replies from the other
1381  * side and check timeouts during that.
 *
 * Overwrites the int64 at byte offset 1 + 2 * sizeof(int64) of ctx->out
 * (the sendtime slot reserved by the prepare step) with `now`. It then
 * queues the whole buffer as one CopyData ('d') message and tries a
 * non-blocking flush, calling WalSndShutdown() if the flush fails. It
 * returns immediately unless output is still pending or the walsender
 * timeout is close; per the comments, the remaining work goes to a slow
 * path.
 *
 * NOTE(review): lines 1384 (start of the signature), 1394-1395, 1403,
 * 1410 and 1418 are missing from this copy. `now` is used but never
 * visibly assigned here.
1382  */
1383 static void
1385  bool last_write)
1386 {
1387  TimestampTz now;
1388 
1389  /*
1390  * Fill the send timestamp last, so that it is taken as late as possible.
1391  * This is somewhat ugly, but the protocol is set as it's already used for
1392  * several releases by streaming physical replication.
1393  */
1396  pq_sendint64(&tmpbuf, now);
/* skip the 'w' byte, dataStart and walEnd to reach the sendtime slot */
1397  memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
1398  tmpbuf.data, sizeof(int64));
1399 
1400  /* output previously gathered data in a CopyData packet */
1401  pq_putmessage_noblock('d', ctx->out->data, ctx->out->len);
1402 
1404 
1405  /* Try to flush pending output to the client */
1406  if (pq_flush_if_writable() != 0)
1407  WalSndShutdown();
1408 
1409  /* Try taking fast path unless we get too close to walsender timeout. */
1411  wal_sender_timeout / 2) &&
1412  !pq_is_send_pending())
1413  {
1414  return;
1415  }
1416 
1417  /* If we have pending write here, go to slow path */
1419 }
1420 
1421 /*
1422  * Wait until there is no pending write. Also process replies from the other
1423  * side and check timeouts during that.
 *
 * Loops until pq_is_send_pending() is false. Between iterations it
 * reloads configuration when ConfigReloadPending is set and retries the
 * flush, calling WalSndShutdown() on failure. MyLatch is set on exit.
 *
 * NOTE(review): the signature (1426) and most calls inside the loop
 * (1433, 1436, 1439, 1444, 1447-1448, 1451, 1453, 1459-1460) are missing
 * from this copy; `sleeptime` is declared but never visibly assigned.
1424  */
1425 static void
1427 {
1428  for (;;)
1429  {
1430  long sleeptime;
1431 
1432  /* Check for input from the client */
1434 
1435  /* die if timeout was reached */
1437 
1438  /* Send keepalive if the time has come */
1440 
/* nothing left to send: done */
1441  if (!pq_is_send_pending())
1442  break;
1443 
1445 
1446  /* Sleep until something happens or we time out */
1449 
1450  /* Clear any already-pending wakeups */
1452 
1454 
1455  /* Process any requests or signals received recently */
1456  if (ConfigReloadPending)
1457  {
1458  ConfigReloadPending = false;
1461  }
1462 
1463  /* Try to flush pending output to the client */
1464  if (pq_flush_if_writable() != 0)
1465  WalSndShutdown();
1466  }
1467 
1468  /* reactivate latch so WalSndLoop knows to continue */
1469  SetLatch(MyLatch);
1470 }
1471 
1472 /*
1473  * LogicalDecodingContext 'update_progress' callback.
1474  *
1475  * Write the current position to the lag tracker (see XLogSendPhysical).
1476  *
1477  * When skipping empty transactions, send a keepalive message if necessary.
 *
 * `sendTime` is function-static, so the lag-tracking rate limit holds
 * across calls. Note that the WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS macro
 * defined below stays defined for the rest of the file, not only this
 * function.
 *
 * NOTE(review): lines 1480 (start of the signature), 1484 (declaration of
 * `now`), 1498, 1535 and 1537 are missing from this copy.
1478  */
1479 static void
1481  bool skipped_xact)
1482 {
1483  static TimestampTz sendTime = 0;
1485  bool pending_writes = false;
1486  bool end_xact = ctx->end_xact;
1487 
1488  /*
1489  * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to
1490  * avoid flooding the lag tracker when we commit frequently.
1491  *
1492  * We don't have a mechanism to get the ack for any LSN other than end
1493  * xact LSN from the downstream. So, we track lag only for end of
1494  * transaction LSN.
1495  */
1496 #define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000
1497  if (end_xact && TimestampDifferenceExceeds(sendTime, now,
1499  {
1500  LagTrackerWrite(lsn, now);
1501  sendTime = now;
1502  }
1503 
1504  /*
1505  * When skipping empty transactions in synchronous replication, we send a
1506  * keepalive message to avoid delaying such transactions.
1507  *
1508  * It is okay to check sync_standbys_defined flag without lock here as in
1509  * the worst case we will just send an extra keepalive message when it is
1510  * really not required.
1511  */
1512  if (skipped_xact &&
1513  SyncRepRequested() &&
1514  ((volatile WalSndCtlData *) WalSndCtl)->sync_standbys_defined)
1515  {
1516  WalSndKeepalive(false, lsn);
1517 
1518  /* Try to flush pending output to the client */
1519  if (pq_flush_if_writable() != 0)
1520  WalSndShutdown();
1521 
1522  /* If we have pending write here, make sure it's actually flushed */
1523  if (pq_is_send_pending())
1524  pending_writes = true;
1525  }
1526 
1527  /*
1528  * Process pending writes if any or try to send a keepalive if required.
1529  * We don't need to try sending keep alive messages at the transaction end
1530  * as that will be done at a later point in time. This is required only
1531  * for large transactions where we don't send any changes to the
1532  * downstream and the receiver can timeout due to that.
1533  */
1534  if (pending_writes || (!end_xact &&
1536  wal_sender_timeout / 2)))
1538 }
1539 
1540 /*
1541  * Wait till WAL < loc is flushed to disk so it can be safely sent to client.
1542  *
1543  * Returns end LSN of flushed WAL. Normally this will be >= loc, but
1544  * if we detect a shutdown request (either from postmaster or client)
1545  * we will return early, so caller must always check.
 *
 * RecentFlushPtr is function-static. It caches the last flush (or, during
 * recovery, replay) position across calls, so a request already covered
 * by that position returns immediately.
 *
 * NOTE(review): the signature (1548) and several calls inside the loop
 * (1573, 1575, 1581-1582, 1586, 1594, 1622-1623, 1643, 1648, 1651, 1660)
 * are missing from this copy; `sleeptime` is never visibly assigned.
1546  */
1547 static XLogRecPtr
1549 {
1550  int wakeEvents;
1551  static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
1552 
1553  /*
1554  * Fast path to avoid acquiring the spinlock in case we already know we
1555  * have enough WAL available. This is particularly interesting if we're
1556  * far behind.
1557  */
1558  if (RecentFlushPtr != InvalidXLogRecPtr &&
1559  loc <= RecentFlushPtr)
1560  return RecentFlushPtr;
1561 
1562  /* Get a more recent flush pointer. */
/* On a standby the relevant position is the replay pointer. */
1563  if (!RecoveryInProgress())
1564  RecentFlushPtr = GetFlushRecPtr(NULL);
1565  else
1566  RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1567 
1568  for (;;)
1569  {
1570  long sleeptime;
1571 
1572  /* Clear any already-pending wakeups */
1574 
1576 
1577  /* Process any requests or signals received recently */
1578  if (ConfigReloadPending)
1579  {
1580  ConfigReloadPending = false;
1583  }
1584 
1585  /* Check for input from the client */
1587 
1588  /*
1589  * If we're shutting down, trigger pending WAL to be written out,
1590  * otherwise we'd possibly end up waiting for WAL that never gets
1591  * written, because walwriter has shut down already.
1592  */
1593  if (got_STOPPING)
1595 
1596  /* Update our idea of the currently flushed position. */
1597  if (!RecoveryInProgress())
1598  RecentFlushPtr = GetFlushRecPtr(NULL);
1599  else
1600  RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1601 
1602  /*
1603  * If postmaster asked us to stop, don't wait anymore.
1604  *
1605  * It's important to do this check after the recomputation of
1606  * RecentFlushPtr, so we can send all remaining data before shutting
1607  * down.
1608  */
1609  if (got_STOPPING)
1610  break;
1611 
1612  /*
1613  * We only send regular messages to the client for full decoded
1614  * transactions, but a synchronous replication and walsender shutdown
1615  * possibly are waiting for a later location. So, before sleeping, we
1616  * send a ping containing the flush location. If the receiver is
1617  * otherwise idle, this keepalive will trigger a reply. Processing the
1618  * reply will update these MyWalSnd locations.
1619  */
1620  if (MyWalSnd->flush < sentPtr &&
1621  MyWalSnd->write < sentPtr &&
1624 
1625  /* check whether we're done */
1626  if (loc <= RecentFlushPtr)
1627  break;
1628 
1629  /* Waiting for new WAL. Since we need to wait, we're now caught up. */
1630  WalSndCaughtUp = true;
1631 
1632  /*
1633  * Try to flush any pending output to the client.
1634  */
1635  if (pq_flush_if_writable() != 0)
1636  WalSndShutdown();
1637 
1638  /*
1639  * If we have received CopyDone from the client, sent CopyDone
1640  * ourselves, and the output buffer is empty, it's time to exit
1641  * streaming, so fail the current WAL fetch request.
1642  */
1644  !pq_is_send_pending())
1645  break;
1646 
1647  /* die if timeout was reached */
1649 
1650  /* Send keepalive if the time has come */
1652 
1653  /*
1654  * Sleep until something happens or we time out. Also wait for the
1655  * socket becoming writable, if there's still pending output.
1656  * Otherwise we might sit on sendable output data while waiting for
1657  * new WAL to be generated. (But if we have nothing to send, we don't
1658  * want to wake on socket-writable.)
1659  */
1661 
1662  wakeEvents = WL_SOCKET_READABLE;
1663 
1664  if (pq_is_send_pending())
1665  wakeEvents |= WL_SOCKET_WRITEABLE;
1666 
1667  WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_WAIT_WAL);
1668  }
1669 
1670  /* reactivate latch so WalSndLoop knows to continue */
1671  SetLatch(MyLatch);
1672  return RecentFlushPtr;
1673 }
1674 
1675 /*
1676  * Execute an incoming replication command.
1677  *
1678  * Returns true if the cmd_string was recognized as WalSender command, false
1679  * if not.
 *
 * Parsing and execution happen inside a dedicated "Replication command
 * context" memory context, which is deleted before returning on both the
 * recognized and the not-recognized paths. For a string that is not a
 * replication command, an ERROR is raised when MyDatabaseId is InvalidOid
 * (per the message, a physical-replication walsender cannot run SQL).
 * debug_query_string points at cmd_string while a command runs and is
 * reset to NULL at the end.
 *
 * NOTE(review): many numbered lines are missing from this copy. They
 * include several `if` conditions (1702, 1728, 1778), the memory-context
 * creation (1718, 1720) and most of the per-command handler calls (e.g.
 * 1806, 1821, 1828, 1843, 1856).
1680  */
1681 bool
1682 exec_replication_command(const char *cmd_string)
1683 {
1684  int parse_rc;
1685  Node *cmd_node;
1686  const char *cmdtag;
1687  MemoryContext cmd_context;
1688  MemoryContext old_context;
1689 
1690  /*
1691  * If WAL sender has been told that shutdown is getting close, switch its
1692  * status accordingly to handle the next replication commands correctly.
1693  */
1694  if (got_STOPPING)
1696 
1697  /*
1698  * Throw error if in stopping mode. We need prevent commands that could
1699  * generate WAL while the shutdown checkpoint is being written. To be
1700  * safe, we just prohibit all new commands.
1701  */
1703  ereport(ERROR,
1704  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1705  errmsg("cannot execute new commands while WAL sender is in stopping mode")));
1706 
1707  /*
1708  * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
1709  * command arrives. Clean up the old stuff if there's anything.
1710  */
1712 
1714 
1715  /*
1716  * Prepare to parse and execute the command.
1717  */
1719  "Replication command context",
1721  old_context = MemoryContextSwitchTo(cmd_context);
1722 
1723  replication_scanner_init(cmd_string);
1724 
1725  /*
1726  * Is it a WalSender command?
1727  */
1729  {
1730  /* Nope; clean up and get out. */
1732 
1733  MemoryContextSwitchTo(old_context);
1734  MemoryContextDelete(cmd_context);
1735 
1736  /* XXX this is a pretty random place to make this check */
1737  if (MyDatabaseId == InvalidOid)
1738  ereport(ERROR,
1739  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1740  errmsg("cannot execute SQL commands in WAL sender for physical replication")));
1741 
1742  /* Tell the caller that this wasn't a WalSender command. */
1743  return false;
1744  }
1745 
1746  /*
1747  * Looks like a WalSender command, so parse it.
1748  */
1749  parse_rc = replication_yyparse();
1750  if (parse_rc != 0)
1751  ereport(ERROR,
1752  (errcode(ERRCODE_SYNTAX_ERROR),
1753  errmsg_internal("replication command parser returned %d",
1754  parse_rc)));
1756 
1757  cmd_node = replication_parse_result;
1758 
1759  /*
1760  * Report query to various monitoring facilities. For this purpose, we
1761  * report replication commands just like SQL commands.
1762  */
1763  debug_query_string = cmd_string;
1764 
1766 
1767  /*
1768  * Log replication command if log_replication_commands is enabled. Even
1769  * when it's disabled, log the command with DEBUG1 level for backward
1770  * compatibility.
1771  */
1773  (errmsg("received replication command: %s", cmd_string)));
1774 
1775  /*
1776  * Disallow replication commands in aborted transaction blocks.
1777  */
1779  ereport(ERROR,
1780  (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
1781  errmsg("current transaction is aborted, "
1782  "commands ignored until end of transaction block")));
1783 
1785 
1786  /*
1787  * Allocate buffers that will be used for each outgoing and incoming
1788  * message. We do this just once per command to reduce palloc overhead.
1789  */
1793 
/*
 * Dispatch on the parsed node type. Each case sets the ps display to its
 * command tag and finishes with EndReplicationCommand(). BASE_BACKUP,
 * START_REPLICATION and TIMELINE_HISTORY also refuse to run inside a
 * transaction block.
 */
1794  switch (cmd_node->type)
1795  {
1796  case T_IdentifySystemCmd:
1797  cmdtag = "IDENTIFY_SYSTEM";
1798  set_ps_display(cmdtag);
1799  IdentifySystem();
1800  EndReplicationCommand(cmdtag);
1801  break;
1802 
1804  cmdtag = "READ_REPLICATION_SLOT";
1805  set_ps_display(cmdtag);
1807  EndReplicationCommand(cmdtag);
1808  break;
1809 
1810  case T_BaseBackupCmd:
1811  cmdtag = "BASE_BACKUP";
1812  set_ps_display(cmdtag);
1813  PreventInTransactionBlock(true, cmdtag);
1814  SendBaseBackup((BaseBackupCmd *) cmd_node);
1815  EndReplicationCommand(cmdtag);
1816  break;
1817 
1819  cmdtag = "CREATE_REPLICATION_SLOT";
1820  set_ps_display(cmdtag);
1822  EndReplicationCommand(cmdtag);
1823  break;
1824 
1826  cmdtag = "DROP_REPLICATION_SLOT";
1827  set_ps_display(cmdtag);
1829  EndReplicationCommand(cmdtag);
1830  break;
1831 
1832  case T_StartReplicationCmd:
1833  {
1834  StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
1835 
1836  cmdtag = "START_REPLICATION";
1837  set_ps_display(cmdtag);
1838  PreventInTransactionBlock(true, cmdtag);
1839 
1840  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
1841  StartReplication(cmd);
1842  else
1844 
1845  /* dupe, but necessary per libpqrcv_endstreaming */
1846  EndReplicationCommand(cmdtag);
1847 
1848  Assert(xlogreader != NULL);
1849  break;
1850  }
1851 
1852  case T_TimeLineHistoryCmd:
1853  cmdtag = "TIMELINE_HISTORY";
1854  set_ps_display(cmdtag);
1855  PreventInTransactionBlock(true, cmdtag);
1857  EndReplicationCommand(cmdtag);
1858  break;
1859 
1860  case T_VariableShowStmt:
1861  {
1863  VariableShowStmt *n = (VariableShowStmt *) cmd_node;
1864 
1865  cmdtag = "SHOW";
1866  set_ps_display(cmdtag);
1867 
1868  /* syscache access needs a transaction environment */
1870  GetPGVariable(n->name, dest);
1872  EndReplicationCommand(cmdtag);
1873  }
1874  break;
1875 
1876  default:
1877  elog(ERROR, "unrecognized replication command node tag: %u",
1878  cmd_node->type);
1879  }
1880 
1881  /* done */
1882  MemoryContextSwitchTo(old_context);
1883  MemoryContextDelete(cmd_context);
1884 
1885  /*
1886  * We need not update ps display or pg_stat_activity, because PostgresMain
1887  * will reset those to "idle". But we must reset debug_query_string to
1888  * ensure it doesn't become a dangling pointer.
1889  */
1890  debug_query_string = NULL;
1891 
1892  return true;
1893 }
1894 
1895 /*
1896  * Process any incoming messages while streaming. Also checks if the remote
1897  * end has closed the connection.
 *
 * Reads messages without blocking until none are available or CopyDone
 * has been received. Accepted types are 'd' (CopyData, up to
 * PQ_LARGE_MESSAGE_LIMIT), 'c' (CopyDone) and 'X' (Terminate), the last
 * two up to PQ_SMALL_MESSAGE_LIMIT. Any other type raises FATAL. 'X' and
 * EOF end the process via proc_exit(0). If at least one message was
 * processed, waiting_for_ping_response is cleared.
 *
 * NOTE(review): lines 1900 (the signature), 1907, 1921 and 1956 (the
 * ereport levels), 1953, 1969 and 2004 are missing from this copy.
1898  */
1899 static void
1901 {
1902  unsigned char firstchar;
1903  int maxmsglen;
1904  int r;
1905  bool received = false;
1906 
1908 
1909  /*
1910  * If we already received a CopyDone from the frontend, any subsequent
1911  * message is the beginning of a new command, and should be processed in
1912  * the main processing loop.
1913  */
1914  while (!streamingDoneReceiving)
1915  {
1916  pq_startmsgread();
1917  r = pq_getbyte_if_available(&firstchar);
1918  if (r < 0)
1919  {
1920  /* unexpected error or EOF */
1922  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1923  errmsg("unexpected EOF on standby connection")));
1924  proc_exit(0);
1925  }
1926  if (r == 0)
1927  {
1928  /* no data available without blocking */
1929  pq_endmsgread();
1930  break;
1931  }
1932 
1933  /* Validate message type and set packet size limit */
1934  switch (firstchar)
1935  {
1936  case 'd':
1937  maxmsglen = PQ_LARGE_MESSAGE_LIMIT;
1938  break;
1939  case 'c':
1940  case 'X':
1941  maxmsglen = PQ_SMALL_MESSAGE_LIMIT;
1942  break;
1943  default:
1944  ereport(FATAL,
1945  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1946  errmsg("invalid standby message type \"%c\"",
1947  firstchar)));
1948  maxmsglen = 0; /* keep compiler quiet */
1949  break;
1950  }
1951 
1952  /* Read the message contents */
1954  if (pq_getmessage(&reply_message, maxmsglen))
1955  {
1957  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1958  errmsg("unexpected EOF on standby connection")));
1959  proc_exit(0);
1960  }
1961 
1962  /* ... and process it */
1963  switch (firstchar)
1964  {
1965  /*
1966  * 'd' means a standby reply wrapped in a CopyData packet.
1967  */
1968  case 'd':
1970  received = true;
1971  break;
1972 
1973  /*
1974  * CopyDone means the standby requested to finish streaming.
1975  * Reply with CopyDone, if we had not sent that already.
1976  */
1977  case 'c':
1978  if (!streamingDoneSending)
1979  {
1980  pq_putmessage_noblock('c', NULL, 0);
1981  streamingDoneSending = true;
1982  }
1983 
1984  streamingDoneReceiving = true;
1985  received = true;
1986  break;
1987 
1988  /*
1989  * 'X' means that the standby is closing down the socket.
1990  */
1991  case 'X':
1992  proc_exit(0);
1993 
1994  default:
1995  Assert(false); /* NOT REACHED */
1996  }
1997  }
1998 
1999  /*
2000  * Save the last reply timestamp if we've received at least one reply.
2001  */
2002  if (received)
2003  {
2005  waiting_for_ping_response = false;
2006  }
2007 }
2008 
2009 /*
2010  * Process a status update message received from standby.
 *
 * Reads the sub-type byte from reply_message and dispatches on 'r' and
 * 'h'. Any other byte is reported as a protocol violation and the process
 * exits via proc_exit(0).
 *
 * NOTE(review): lines 2013 (the signature), 2025 and 2029 (the handler
 * calls for 'r' and 'h') and 2033 (the ereport level) are missing from
 * this copy.
2011  */
2012 static void
2014 {
2015  char msgtype;
2016 
2017  /*
2018  * Check message type from the first byte.
2019  */
2020  msgtype = pq_getmsgbyte(&reply_message);
2021 
2022  switch (msgtype)
2023  {
2024  case 'r':
2026  break;
2027 
2028  case 'h':
2030  break;
2031 
2032  default:
2034  (errcode(ERRCODE_PROTOCOL_VIOLATION),
2035  errmsg("unexpected message type \"%c\"", msgtype)));
2036  proc_exit(0);
2037  }
2038 }
2039 
2040 /*
2041  * Remember that a walreceiver just confirmed receipt of lsn `lsn`.
 *
 * Under slot->mutex, stores lsn as slot->data.restart_lsn if it differs
 * from the current value. Follow-up work runs outside the spinlock, and
 * only when the value changed. The slot is deliberately not saved to
 * disk here (see the trailing comment).
 *
 * NOTE(review): lines 2044 (the signature), 2047 (the declaration of
 * `slot`) and 2060-2061 (the follow-up calls) are missing from this copy.
2042  */
2043 static void
2045 {
2046  bool changed = false;
2048 
2049  Assert(lsn != InvalidXLogRecPtr);
2050  SpinLockAcquire(&slot->mutex);
2051  if (slot->data.restart_lsn != lsn)
2052  {
2053  changed = true;
2054  slot->data.restart_lsn = lsn;
2055  }
2056  SpinLockRelease(&slot->mutex);
2057 
2058  if (changed)
2059  {
2062  }
2063 
2064  /*
2065  * One could argue that the slot should be saved to disk now, but that'd
2066  * be energy wasted - the worst lost information can do here is give us
2067  * wrong information in a statistics view - we'll just potentially be more
2068  * conservative in removing files.
2069  */
2070 }
2071 
2072 /*
2073  * Regular reply from standby advising of WAL locations on standby server.
 *
 * Message body, read in this order after the type byte:
 *   int64 write LSN, int64 flush LSN, int64 apply LSN,
 *   int64 reply timestamp, byte reply-requested flag.
 * Lag for each position is computed with LagTrackerRead(). The positions,
 * lags and reply time are stored in MyWalSnd under its spinlock. A lag of
 * -1 leaves the stored value untouched unless lag times are being
 * cleared. When a replication slot is in use and flushPtr is valid, the
 * slot is advanced.
 *
 * NOTE(review): lines 2076 (the signature), 2098, 2116, 2141, 2164-2165,
 * 2172-2173 and 2175 are missing from this copy; `now` is used but never
 * visibly assigned.
2074  */
2075 static void
2077 {
2078  XLogRecPtr writePtr,
2079  flushPtr,
2080  applyPtr;
2081  bool replyRequested;
2082  TimeOffset writeLag,
2083  flushLag,
2084  applyLag;
2085  bool clearLagTimes;
2086  TimestampTz now;
2087  TimestampTz replyTime;
2088 
/* Persists across calls: was applyPtr == sentPtr in the previous reply? */
2089  static bool fullyAppliedLastTime = false;
2090 
2091  /* the caller already consumed the msgtype byte */
2092  writePtr = pq_getmsgint64(&reply_message);
2093  flushPtr = pq_getmsgint64(&reply_message);
2094  applyPtr = pq_getmsgint64(&reply_message);
2095  replyTime = pq_getmsgint64(&reply_message);
2096  replyRequested = pq_getmsgbyte(&reply_message);
2097 
2099  {
2100  char *replyTimeStr;
2101 
2102  /* Copy because timestamptz_to_str returns a static buffer */
2103  replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
2104 
2105  elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s reply_time %s",
2106  LSN_FORMAT_ARGS(writePtr),
2107  LSN_FORMAT_ARGS(flushPtr),
2108  LSN_FORMAT_ARGS(applyPtr),
2109  replyRequested ? " (reply requested)" : "",
2110  replyTimeStr);
2111 
2112  pfree(replyTimeStr);
2113  }
2114 
2115  /* See if we can compute the round-trip lag for these positions. */
2117  writeLag = LagTrackerRead(SYNC_REP_WAIT_WRITE, writePtr, now);
2118  flushLag = LagTrackerRead(SYNC_REP_WAIT_FLUSH, flushPtr, now);
2119  applyLag = LagTrackerRead(SYNC_REP_WAIT_APPLY, applyPtr, now);
2120 
2121  /*
2122  * If the standby reports that it has fully replayed the WAL in two
2123  * consecutive reply messages, then the second such message must result
2124  * from wal_receiver_status_interval expiring on the standby. This is a
2125  * convenient time to forget the lag times measured when it last
2126  * wrote/flushed/applied a WAL record, to avoid displaying stale lag data
2127  * until more WAL traffic arrives.
2128  */
2129  clearLagTimes = false;
2130  if (applyPtr == sentPtr)
2131  {
2132  if (fullyAppliedLastTime)
2133  clearLagTimes = true;
2134  fullyAppliedLastTime = true;
2135  }
2136  else
2137  fullyAppliedLastTime = false;
2138 
2139  /* Send a reply if the standby requested one. */
2140  if (replyRequested)
2142 
2143  /*
2144  * Update shared state for this WalSender process based on reply data from
2145  * standby.
2146  */
2147  {
2148  WalSnd *walsnd = MyWalSnd;
2149 
2150  SpinLockAcquire(&walsnd->mutex);
2151  walsnd->write = writePtr;
2152  walsnd->flush = flushPtr;
2153  walsnd->apply = applyPtr;
/* -1 from LagTrackerRead() is treated as "no new sample" here. */
2154  if (writeLag != -1 || clearLagTimes)
2155  walsnd->writeLag = writeLag;
2156  if (flushLag != -1 || clearLagTimes)
2157  walsnd->flushLag = flushLag;
2158  if (applyLag != -1 || clearLagTimes)
2159  walsnd->applyLag = applyLag;
2160  walsnd->replyTime = replyTime;
2161  SpinLockRelease(&walsnd->mutex);
2162  }
2163 
2166 
2167  /*
2168  * Advance our local xmin horizon when the client confirmed a flush.
2169  */
2170  if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr)
2171  {
2174  else
2176  }
2177 }
2178 
2179 /* compute new replication slot xmin horizon if needed */
/*
 * Under slot->mutex, replaces both data.xmin and effective_xmin with
 * feedbackXmin when either value is not a normal xid, or when the current
 * value precedes the feedback value. data.catalog_xmin and
 * effective_catalog_xmin are handled the same way with
 * feedbackCatalogXmin. A non-normal feedback value (e.g. invalid)
 * therefore always overwrites the stored horizon. Follow-up work runs
 * outside the spinlock, and only when something changed.
 *
 * NOTE(review): lines 2181 (the signature), 2184 (the declaration of
 * `slot`), 2187 and 2214-2215 are missing from this copy.
 */
2180 static void
2182 {
2183  bool changed = false;
2185 
2186  SpinLockAcquire(&slot->mutex);
2188 
2189  /*
2190  * For physical replication we don't need the interlock provided by xmin
2191  * and effective_xmin since the consequences of a missed increase are
2192  * limited to query cancellations, so set both at once.
2193  */
2194  if (!TransactionIdIsNormal(slot->data.xmin) ||
2195  !TransactionIdIsNormal(feedbackXmin) ||
2196  TransactionIdPrecedes(slot->data.xmin, feedbackXmin))
2197  {
2198  changed = true;
2199  slot->data.xmin = feedbackXmin;
2200  slot->effective_xmin = feedbackXmin;
2201  }
2202  if (!TransactionIdIsNormal(slot->data.catalog_xmin) ||
2203  !TransactionIdIsNormal(feedbackCatalogXmin) ||
2204  TransactionIdPrecedes(slot->data.catalog_xmin, feedbackCatalogXmin))
2205  {
2206  changed = true;
2207  slot->data.catalog_xmin = feedbackCatalogXmin;
2208  slot->effective_catalog_xmin = feedbackCatalogXmin;
2209  }
2210  SpinLockRelease(&slot->mutex);
2211 
2212  if (changed)
2213  {
2216  }
2217 }
2218 
2219 /*
2220  * Check that the provided xmin/epoch are sane, that is, not in the future
2221  * and not so far back as to be already wrapped around.
2222  *
2223  * Epoch of nextXid should be same as standby, or if the counter has
2224  * wrapped, then one greater than standby.
2225  *
2226  * This check doesn't care about whether clog exists for these xids
2227  * at all.
 *
 * Returns true only if both the epoch check and the wraparound check
 * pass.
 *
 * NOTE(review): line 2230 (the signature) is missing from this copy. From
 * the body it presumably takes `xid` (TransactionId) and `epoch` (uint32).
 * Confirm against upstream.
2228  */
2229 static bool
2231 {
2232  FullTransactionId nextFullXid;
2233  TransactionId nextXid;
2234  uint32 nextEpoch;
2235 
2236  nextFullXid = ReadNextFullTransactionId();
2237  nextXid = XidFromFullTransactionId(nextFullXid);
2238  nextEpoch = EpochFromFullTransactionId(nextFullXid);
2239 
/*
 * Plain numeric comparison: an xid numerically above nextXid can only be
 * valid if it belongs to the epoch before nextXid's.
 */
2240  if (xid <= nextXid)
2241  {
2242  if (epoch != nextEpoch)
2243  return false;
2244  }
2245  else
2246  {
2247  if (epoch + 1 != nextEpoch)
2248  return false;
2249  }
2250 
/* Circular (modulo-2^32) comparison against nextXid. */
2251  if (!TransactionIdPrecedesOrEquals(xid, nextXid))
2252  return false; /* epoch OK, but it's wrapped around */
2253 
2254  return true;
2255 }
2256 
/*
 * Hot Standby feedback
 *
 * Parses a hot standby feedback message from reply_message: reply time,
 * xmin/epoch and catalog_xmin/epoch.  Records the reply time in this
 * walsender's shared WalSnd entry.
 *
 * If neither xmin is a normal XID, the walsender's xmins are unset.
 * Otherwise the message is ignored unless each normal xmin passes
 * TransactionIdInRecentPast().  Accepted xmins are installed through the
 * replication slot when one is in use.  Without a slot, the older of the
 * two values is stored in MyProc->xmin.
 */
static void
/* NOTE(review): the declarator line is missing from this copy; presumably
 * "ProcessStandbyHSFeedbackMessage(void)" -- confirm against upstream. */
{
	TransactionId feedbackXmin;
	uint32		feedbackEpoch;
	TransactionId feedbackCatalogXmin;
	uint32		feedbackCatalogEpoch;
	TimestampTz replyTime;

	/*
	 * Decipher the reply message. The caller already consumed the msgtype
	 * byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation
	 * of this message.
	 */
	replyTime = pq_getmsgint64(&reply_message);
	feedbackXmin = pq_getmsgint(&reply_message, 4);
	feedbackEpoch = pq_getmsgint(&reply_message, 4);
	feedbackCatalogXmin = pq_getmsgint(&reply_message, 4);
	feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4);

	/* NOTE(review): a line is missing here; presumably a guard such as
	 * "if (message_level_is_interesting(DEBUG2))" so the pstrdup() below
	 * only happens when DEBUG2 output is wanted -- confirm. */
	{
		char	   *replyTimeStr;

		/* Copy because timestamptz_to_str returns a static buffer */
		replyTimeStr = pstrdup(timestamptz_to_str(replyTime));

		elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u reply_time %s",
			 feedbackXmin,
			 feedbackEpoch,
			 feedbackCatalogXmin,
			 feedbackCatalogEpoch,
			 replyTimeStr);

		pfree(replyTimeStr);
	}

	/*
	 * Update shared state for this WalSender process based on reply data from
	 * standby.
	 */
	{
		WalSnd	   *walsnd = MyWalSnd;

		SpinLockAcquire(&walsnd->mutex);
		walsnd->replyTime = replyTime;
		SpinLockRelease(&walsnd->mutex);
	}

	/*
	 * Unset WalSender's xmins if the feedback message values are invalid.
	 * This happens when the downstream turned hot_standby_feedback off.
	 */
	if (!TransactionIdIsNormal(feedbackXmin)
		&& !TransactionIdIsNormal(feedbackCatalogXmin))
	{
		/* NOTE(review): a line is missing here; presumably
		 * "MyProc->xmin = InvalidTransactionId;" -- confirm. */
		if (MyReplicationSlot != NULL)
			PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
		return;
	}

	/*
	 * Check that the provided xmin/epoch are sane, that is, not in the future
	 * and not so far back as to be already wrapped around. Ignore if not.
	 */
	if (TransactionIdIsNormal(feedbackXmin) &&
		!TransactionIdInRecentPast(feedbackXmin, feedbackEpoch))
		return;

	if (TransactionIdIsNormal(feedbackCatalogXmin) &&
		!TransactionIdInRecentPast(feedbackCatalogXmin, feedbackCatalogEpoch))
		return;

	/*
	 * Set the WalSender's xmin equal to the standby's requested xmin, so that
	 * the xmin will be taken into account by GetSnapshotData() /
	 * ComputeXidHorizons(). This will hold back the removal of dead rows and
	 * thereby prevent the generation of cleanup conflicts on the standby
	 * server.
	 *
	 * There is a small window for a race condition here: although we just
	 * checked that feedbackXmin precedes nextXid, the nextXid could have
	 * gotten advanced between our fetching it and applying the xmin below,
	 * perhaps far enough to make feedbackXmin wrap around. In that case the
	 * xmin we set here would be "in the future" and have no effect. No point
	 * in worrying about this since it's too late to save the desired data
	 * anyway. Assuming that the standby sends us an increasing sequence of
	 * xmins, this could only happen during the first reply cycle, else our
	 * own xmin would prevent nextXid from advancing so far.
	 *
	 * We don't bother taking the ProcArrayLock here. Setting the xmin field
	 * is assumed atomic, and there's no real need to prevent concurrent
	 * horizon determinations. (If we're moving our xmin forward, this is
	 * obviously safe, and if we're moving it backwards, well, the data is at
	 * risk already since a VACUUM could already have determined the horizon.)
	 *
	 * If we're using a replication slot we reserve the xmin via that,
	 * otherwise via the walsender's PGPROC entry. We can only track the
	 * catalog xmin separately when using a slot, so we store the least of the
	 * two provided when not using a slot.
	 *
	 * XXX: It might make sense to generalize the ephemeral slot concept and
	 * always use the slot mechanism to handle the feedback xmin.
	 */
	if (MyReplicationSlot != NULL)	/* XXX: persistency configurable? */
		PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
	else
	{
		/* No slot: keep whichever of the two xmins is older. */
		if (TransactionIdIsNormal(feedbackCatalogXmin)
			&& TransactionIdPrecedes(feedbackCatalogXmin, feedbackXmin))
			MyProc->xmin = feedbackCatalogXmin;
		else
			MyProc->xmin = feedbackXmin;
	}
}
2376 
/*
 * Compute how long send/receive loops should sleep.
 *
 * If wal_sender_timeout is enabled we want to wake up in time to send
 * keepalives and to abort the connection if wal_sender_timeout has been
 * reached.
 *
 * Returns a sleep time in milliseconds.  It defaults to 10000 (10 s) when no
 * earlier wakeup time is computed.
 */
static long
/* NOTE(review): the declarator line is missing from this copy; presumably
 * "WalSndComputeSleeptime(TimestampTz now)" -- `now` is used below. */
{
	long		sleeptime = 10000;	/* 10 s */

	/* NOTE(review): a line is missing here; presumably
	 * "if (wal_sender_timeout > 0 && last_reply_timestamp > 0)" -- confirm. */
	{
		TimestampTz wakeup_time;

		/*
		 * At the latest stop sleeping once wal_sender_timeout has been
		 * reached.
		 */
		/* NOTE(review): two lines are missing here; presumably
		 * "wakeup_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
		 * wal_sender_timeout);" -- confirm. */

		/*
		 * If no ping has been sent yet, wakeup when it's time to do so.
		 * WalSndKeepaliveIfNecessary() wants to send a keepalive once half of
		 * the timeout passed without a response.
		 */
		/* NOTE(review): two lines are missing here; presumably
		 * "if (!waiting_for_ping_response)" followed by
		 * "wakeup_time = TimestampTzPlusMilliseconds(last_reply_timestamp,"
		 * which the next line completes -- confirm. */
			wal_sender_timeout / 2);

		/* Compute relative time until wakeup. */
		sleeptime = TimestampDifferenceMilliseconds(now, wakeup_time);
	}

	return sleeptime;
}
2415 
/*
 * Check whether there have been responses by the client within
 * wal_sender_timeout and shutdown if not. Using last_processing as the
 * reference point avoids counting server-side stalls against the client.
 * However, a long server-side stall can make WalSndKeepaliveIfNecessary()
 * postdate last_processing by more than wal_sender_timeout. If that happens,
 * the client must reply almost immediately to avoid a timeout. This rarely
 * affects the default configuration, under which clients spontaneously send a
 * message every standby_message_timeout = wal_sender_timeout/6 = 10s. We
 * could eliminate that problem by recognizing timeout expiration at
 * wal_sender_timeout/2 after the keepalive.
 *
 * Does nothing while last_reply_timestamp <= 0 (timeouts not armed).  On
 * timeout, it logs the reason and calls WalSndShutdown().
 */
static void
/* NOTE(review): the declarator line is missing from this copy; presumably
 * "WalSndCheckTimeOut(void)" -- confirm against upstream. */
{
	TimestampTz timeout;

	/* don't bail out if we're doing something that doesn't require timeouts */
	if (last_reply_timestamp <= 0)
		return;

	/* NOTE(review): two lines are missing here; presumably
	 * "timeout = TimestampTzPlusMilliseconds(last_reply_timestamp,
	 * wal_sender_timeout);" -- `timeout` is otherwise never assigned. */

	if (wal_sender_timeout > 0 && last_processing >= timeout)
	{
		/*
		 * Since typically expiration of replication timeout means
		 * communication problem, we don't send the error message to the
		 * standby.
		 */
		/* NOTE(review): a line is missing here; presumably
		 * "ereport(COMMERROR," which the next line completes -- confirm. */
				(errmsg("terminating walsender process due to replication timeout")));

		WalSndShutdown();
	}
}
2453 
/*
 * Main loop of walsender process that streams the WAL over Copy messages.
 *
 * Each iteration does the following:
 *  - handles a pending config reload and input from the client;
 *  - calls send_data when the output buffer is empty;
 *  - flushes output;
 *  - checks the replication timeout and keepalives;
 *  - sleeps in WalSndWait() when appropriate.
 * The loop ends when CopyDone has been received and sent and no output is
 * pending.
 */
static void
/* NOTE(review): the declarator line is missing from this copy; presumably
 * "WalSndLoop(WalSndSendDataCallback send_data)" -- send_data is used below. */
{
	/*
	 * Initialize the last reply timestamp. That enables timeout processing
	 * from hereon.
	 */
	/* NOTE(review): a line is missing here; presumably
	 * "last_reply_timestamp = GetCurrentTimestamp();" -- confirm. */
	waiting_for_ping_response = false;

	/*
	 * Loop until we reach the end of this timeline or the client requests to
	 * stop streaming.
	 */
	for (;;)
	{
		/* Clear any already-pending wakeups */
		/* NOTE(review): a line is missing here; presumably
		 * "ResetLatch(MyLatch);" -- confirm. */

		/* NOTE(review): a line is missing here; presumably
		 * "CHECK_FOR_INTERRUPTS();" -- confirm. */

		/* Process any requests or signals received recently */
		if (ConfigReloadPending)
		{
			ConfigReloadPending = false;
			/* NOTE(review): two lines are missing here; presumably
			 * "ProcessConfigFile(PGC_SIGHUP);" and "SyncRepInitConfig();". */
		}

		/* Check for input from the client */
		/* NOTE(review): a line is missing here; presumably
		 * "ProcessRepliesIfAny();" -- confirm. */

		/*
		 * If we have received CopyDone from the client, sent CopyDone
		 * ourselves, and the output buffer is empty, it's time to exit
		 * streaming.
		 */
		/* NOTE(review): a line is missing here; presumably
		 * "if (streamingDoneReceiving && streamingDoneSending &&" which the
		 * next line completes -- confirm. */
			!pq_is_send_pending())
			break;

		/*
		 * If we don't have any pending data in the output buffer, try to send
		 * some more. If there is some, we don't bother to call send_data
		 * again until we've flushed it ... but we'd better assume we are not
		 * caught up.
		 */
		if (!pq_is_send_pending())
			send_data();
		else
			WalSndCaughtUp = false;

		/* Try to flush pending output to the client */
		if (pq_flush_if_writable() != 0)
			WalSndShutdown();

		/* If nothing remains to be sent right now ... */
		/* NOTE(review): a line is missing here; presumably
		 * "if (WalSndCaughtUp && !pq_is_send_pending())" -- confirm. */
		{
			/*
			 * If we're in catchup state, move to streaming. This is an
			 * important state change for users to know about, since before
			 * this point data loss might occur if the primary dies and we
			 * need to failover to the standby. The state change is also
			 * important for synchronous replication, since commits that
			 * started to wait at that point might wait for some time.
			 */
			/* NOTE(review): a line is missing here; presumably
			 * "if (MyWalSnd->state == WALSNDSTATE_CATCHUP)" -- confirm. */
			{
				ereport(DEBUG1,
						(errmsg_internal("\"%s\" has now caught up with upstream server",
										 application_name)));
				/* NOTE(review): a line is missing here; presumably
				 * "WalSndSetState(WALSNDSTATE_STREAMING);" -- confirm. */
			}

			/*
			 * When SIGUSR2 arrives, we send any outstanding logs up to the
			 * shutdown checkpoint record (i.e., the latest record), wait for
			 * them to be replicated to the standby, and exit. This may be a
			 * normal termination at shutdown, or a promotion, the walsender
			 * is not sure which.
			 */
			if (got_SIGUSR2)
				WalSndDone(send_data);
		}

		/* Check for replication timeout. */
		/* NOTE(review): a line is missing here; presumably
		 * "WalSndCheckTimeOut();" -- confirm. */

		/* Send keepalive if the time has come */
		/* NOTE(review): a line is missing here; presumably
		 * "WalSndKeepaliveIfNecessary();" -- confirm. */

		/*
		 * Block if we have unsent data. XXX For logical replication, let
		 * WalSndWaitForWal() handle any other blocking; idle receivers need
		 * its additional actions. For physical replication, also block if
		 * caught up; its send_data does not block.
		 */
		if ((WalSndCaughtUp && send_data != XLogSendLogical &&
		/* NOTE(review): two lines are missing here; presumably
		 * "!streamingDoneSending) ||" and "pq_is_send_pending())" closing
		 * this condition -- confirm. */
		{
			long		sleeptime;
			int			wakeEvents;

			/* NOTE(review): a line is missing here; presumably
			 * "if (!streamingDoneReceiving)" -- confirm. */
				wakeEvents = WL_SOCKET_READABLE;
			else
				wakeEvents = 0;

			/*
			 * Use fresh timestamp, not last_processing, to reduce the chance
			 * of reaching wal_sender_timeout before sending a keepalive.
			 */
			/* NOTE(review): a line is missing here; presumably
			 * "sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());". */

			if (pq_is_send_pending())
				wakeEvents |= WL_SOCKET_WRITEABLE;

			/* Sleep until something happens or we time out */
			WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_MAIN);
		}
	}
}
2579 
/*
 * Initialize a per-walsender data structure for this walsender process
 *
 * Scans WalSndCtl->walsnds[] for the first entry whose pid is 0 and claims it
 * under its spinlock.  It then resets the entry's progress, lag and state
 * fields and points MyWalSnd at it.
 */
static void
/* NOTE(review): the declarator line is missing from this copy; presumably
 * "InitWalSenderSlot(void)" -- confirm against upstream. */
{
	int			i;

	/*
	 * WalSndCtl should be set up already (we inherit this by fork() or
	 * EXEC_BACKEND mechanism from the postmaster).
	 */
	Assert(WalSndCtl != NULL);
	Assert(MyWalSnd == NULL);

	/*
	 * Find a free walsender slot and reserve it. This must not fail due to
	 * the prior check for free WAL senders in InitProcess().
	 */
	for (i = 0; i < max_wal_senders; i++)
	{
		WalSnd	   *walsnd = &WalSndCtl->walsnds[i];

		SpinLockAcquire(&walsnd->mutex);

		if (walsnd->pid != 0)
		{
			/* Slot owned by another walsender; keep looking. */
			SpinLockRelease(&walsnd->mutex);
			continue;
		}
		else
		{
			/*
			 * Found a free slot. Reserve it for us.
			 */
			walsnd->pid = MyProcPid;
			walsnd->state = WALSNDSTATE_STARTUP;
			walsnd->sentPtr = InvalidXLogRecPtr;
			walsnd->needreload = false;
			walsnd->write = InvalidXLogRecPtr;
			walsnd->flush = InvalidXLogRecPtr;
			walsnd->apply = InvalidXLogRecPtr;
			walsnd->writeLag = -1;
			walsnd->flushLag = -1;
			walsnd->applyLag = -1;
			walsnd->sync_standby_priority = 0;
			walsnd->latch = &MyProc->procLatch;
			walsnd->replyTime = 0;
			SpinLockRelease(&walsnd->mutex);
			/* don't need the lock anymore */
			MyWalSnd = (WalSnd *) walsnd;

			break;
		}
	}

	Assert(MyWalSnd != NULL);

	/* Arrange to clean up at walsender exit */
	/* NOTE(review): a line is missing here; presumably
	 * "on_shmem_exit(WalSndKill, 0);" -- confirm. */
}
2639 
/*
 * Destroy the per-walsender data structure for this walsender process
 *
 * Clears MyWalSnd.  Then, under the slot's spinlock, it clears the latch
 * pointer and sets pid back to 0.  A pid of 0 is what InitWalSenderSlot()
 * treats as a free slot.
 */
static void
/* NOTE(review): the declarator line is missing from this copy; presumably
 * "WalSndKill(int code, Datum arg)" (an exit callback) -- confirm. */
{
	WalSnd	   *walsnd = MyWalSnd;

	Assert(walsnd != NULL);

	MyWalSnd = NULL;

	SpinLockAcquire(&walsnd->mutex);
	/* clear latch while holding the spinlock, so it can safely be read */
	walsnd->latch = NULL;
	/* Mark WalSnd struct as no longer being in use. */
	walsnd->pid = 0;
	SpinLockRelease(&walsnd->mutex);
}
2657 
/*
 * XLogReaderRoutine->segment_open callback
 *
 * Opens WAL segment nextSegNo and stores the descriptor in
 * state->seg.ws_file.  The timeline actually used (see below) is stored in
 * *tli_p.  On failure it raises ERROR instead of returning:
 *  - ENOENT is reported as an already-removed WAL segment;
 *  - any other errno is reported as a generic open failure.
 */
static void
/* NOTE(review): the first declarator line is missing from this copy;
 * presumably "WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo,"
 * which the next line completes -- confirm. */
				   TimeLineID *tli_p)
{
	char		path[MAXPGPATH];

	/*-------
	 * When reading from a historic timeline, and there is a timeline switch
	 * within this segment, read from the WAL segment belonging to the new
	 * timeline.
	 *
	 * For example, imagine that this server is currently on timeline 5, and
	 * we're streaming timeline 4. The switch from timeline 4 to 5 happened at
	 * 0/13002088. In pg_wal, we have these files:
	 *
	 * ...
	 * 000000040000000000000012
	 * 000000040000000000000013
	 * 000000050000000000000013
	 * 000000050000000000000014
	 * ...
	 *
	 * In this situation, when requested to send the WAL from segment 0x13, on
	 * timeline 4, we read the WAL from file 000000050000000000000013. Archive
	 * recovery prefers files from newer timelines, so if the segment was
	 * restored from the archive on this server, the file belonging to the old
	 * timeline, 000000040000000000000013, might not exist. Their contents are
	 * equal up to the switchpoint, because at a timeline switch, the used
	 * portion of the old segment is copied to the new file. -------
	 */
	*tli_p = sendTimeLine;
	/* NOTE(review): a line is missing here; presumably
	 * "if (sendTimeLineIsHistoric)" -- confirm. */
	{
		XLogSegNo	endSegNo;

		/* The segment containing the switch point is read from the next TLI. */
		XLByteToSeg(sendTimeLineValidUpto, endSegNo, state->segcxt.ws_segsize);
		if (nextSegNo == endSegNo)
			*tli_p = sendTimeLineNextTLI;
	}

	XLogFilePath(path, *tli_p, nextSegNo, state->segcxt.ws_segsize);
	state->seg.ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY);
	if (state->seg.ws_file >= 0)
		return;

	/*
	 * If the file is not found, assume it's because the standby asked for a
	 * too old WAL segment that has already been removed or recycled.
	 */
	if (errno == ENOENT)
	{
		char		xlogfname[MAXFNAMELEN];
		int			save_errno = errno;

		/*
		 * Keep errno intact across XLogFileName() for the error report below.
		 *
		 * NOTE(review): this uses wal_segment_size, while the code above uses
		 * state->segcxt.ws_segsize.  They are presumably equal, but using the
		 * latter here would be more consistent -- confirm.
		 */
		XLogFileName(xlogfname, *tli_p, nextSegNo, wal_segment_size);
		errno = save_errno;
		ereport(ERROR,
				/* NOTE(review): a line is missing here; presumably
				 * "(errcode_for_file_access()," -- confirm. */
				 errmsg("requested WAL segment %s has already been removed",
						xlogfname)));
	}
	else
		ereport(ERROR,
				/* NOTE(review): a line is missing here; presumably
				 * "(errcode_for_file_access()," -- confirm. */
				 errmsg("could not open file \"%s\": %m",
						path)));
}
2726 
/*
 * Send out the WAL in its normal physical/stored form.
 *
 * Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk,
 * but not yet sent to the client, and buffer it in the libpq output
 * buffer.
 *
 * If there is no unsent WAL remaining, WalSndCaughtUp is set to true,
 * otherwise WalSndCaughtUp is set to false.
 *
 * On success, sentPtr advances to the end of the slice and is published in
 * MyWalSnd->sentPtr.  On a historic timeline whose switch point has been
 * reached, CopyDone is queued and streamingDoneSending is set instead.
 */
static void
/* NOTE(review): the declarator line is missing from this copy; presumably
 * "XLogSendPhysical(void)" -- confirm against upstream. */
{
	XLogRecPtr	SendRqstPtr;
	XLogRecPtr	startptr;
	XLogRecPtr	endptr;
	Size		nbytes;
	XLogSegNo	segno;
	WALReadError errinfo;

	/* If requested switch the WAL sender to the stopping state. */
	if (got_STOPPING)
		/* NOTE(review): a line is missing here; presumably
		 * "WalSndSetState(WALSNDSTATE_STOPPING);" -- confirm. */

	/* NOTE(review): a line is missing here; presumably
	 * "if (streamingDoneSending)" -- confirm. */
	{
		WalSndCaughtUp = true;
		return;
	}

	/* Figure out how far we can safely send the WAL. */
	/* NOTE(review): a line is missing here; presumably
	 * "if (sendTimeLineIsHistoric)" -- confirm. */
	{
		/*
		 * Streaming an old timeline that's in this server's history, but is
		 * not the one we're currently inserting or replaying. It can be
		 * streamed up to the point where we switched off that timeline.
		 */
		SendRqstPtr = sendTimeLineValidUpto;
	}
	else if (am_cascading_walsender)
	{
		TimeLineID	SendRqstTLI;

		/*
		 * Streaming the latest timeline on a standby.
		 *
		 * Attempt to send all WAL that has already been replayed, so that we
		 * know it's valid. If we're receiving WAL through streaming
		 * replication, it's also OK to send any WAL that has been received
		 * but not replayed.
		 *
		 * The timeline we're recovering from can change, or we can be
		 * promoted. In either case, the current timeline becomes historic. We
		 * need to detect that so that we don't try to stream past the point
		 * where we switched to another timeline. We check for promotion or
		 * timeline switch after calculating FlushPtr, to avoid a race
		 * condition: if the timeline becomes historic just after we checked
		 * that it was still current, it's still OK to stream it up to the
		 * FlushPtr that was calculated before it became historic.
		 */
		bool		becameHistoric = false;

		SendRqstPtr = GetStandbyFlushRecPtr(&SendRqstTLI);

		if (!RecoveryInProgress())
		{
			/* We have been promoted. */
			SendRqstTLI = GetWALInsertionTimeLine();
			am_cascading_walsender = false;
			becameHistoric = true;
		}
		else
		{
			/*
			 * Still a cascading standby. But is the timeline we're sending
			 * still the one recovery is recovering from?
			 */
			if (sendTimeLine != SendRqstTLI)
				becameHistoric = true;
		}

		if (becameHistoric)
		{
			/*
			 * The timeline we were sending has become historic. Read the
			 * timeline history file of the new timeline to see where exactly
			 * we forked off from the timeline we were sending.
			 */
			List	   *history;

			history = readTimeLineHistory(SendRqstTLI);
			/* NOTE(review): a line is missing here; presumably
			 * "sendTimeLineValidUpto = tliSwitchPoint(sendTimeLine, history,
			 * &sendTimeLineNextTLI);" -- confirm. */

			/* NOTE(review): a line is missing here; presumably
			 * "Assert(sendTimeLine < sendTimeLineNextTLI);" -- confirm. */
			list_free_deep(history);

			sendTimeLineIsHistoric = true;

			SendRqstPtr = sendTimeLineValidUpto;
		}
	}
	else
	{
		/*
		 * Streaming the current timeline on a primary.
		 *
		 * Attempt to send all data that's already been written out and
		 * fsync'd to disk. We cannot go further than what's been written out
		 * given the current implementation of WALRead(). And in any case
		 * it's unsafe to send WAL that is not securely down to disk on the
		 * primary: if the primary subsequently crashes and restarts, standbys
		 * must not have applied any WAL that got lost on the primary.
		 */
		SendRqstPtr = GetFlushRecPtr(NULL);
	}

	/*
	 * Record the current system time as an approximation of the time at which
	 * this WAL location was written for the purposes of lag tracking.
	 *
	 * In theory we could make XLogFlush() record a time in shmem whenever WAL
	 * is flushed and we could get that time as well as the LSN when we call
	 * GetFlushRecPtr() above (and likewise for the cascading standby
	 * equivalent), but rather than putting any new code into the hot WAL path
	 * it seems good enough to capture the time here. We should reach this
	 * after XLogFlush() runs WalSndWakeupProcessRequests(), and although that
	 * may take some time, we read the WAL flush pointer and take the time
	 * very close together here so that we'll get a later position if it is
	 * still moving.
	 *
	 * Because LagTrackerWrite ignores samples when the LSN hasn't advanced,
	 * this gives us a cheap approximation for the WAL flush time for this
	 * LSN.
	 *
	 * Note that the LSN is not necessarily the LSN for the data contained in
	 * the present message; it's the end of the WAL, which might be further
	 * ahead. All the lag tracking machinery cares about is finding out when
	 * that arbitrary LSN is eventually reported as written, flushed and
	 * applied, so that it can measure the elapsed time.
	 */
	LagTrackerWrite(SendRqstPtr, GetCurrentTimestamp());

	/*
	 * If this is a historic timeline and we've reached the point where we
	 * forked to the next timeline, stop streaming.
	 *
	 * Note: We might already have sent WAL > sendTimeLineValidUpto. The
	 * startup process will normally replay all WAL that has been received
	 * from the primary, before promoting, but if the WAL streaming is
	 * terminated at a WAL page boundary, the valid portion of the timeline
	 * might end in the middle of a WAL record. We might've already sent the
	 * first half of that partial WAL record to the cascading standby, so that
	 * sentPtr > sendTimeLineValidUpto. That's OK; the cascading standby can't
	 * replay the partial WAL record either, so it can still follow our
	 * timeline switch.
	 */
	/* NOTE(review): a line is missing here; presumably
	 * "if (sendTimeLineIsHistoric && sendTimeLineValidUpto <= sentPtr)". */
	{
		/* close the current file. */
		if (xlogreader->seg.ws_file >= 0)
			/* NOTE(review): a line is missing here; presumably
			 * "wal_segment_close(xlogreader);" -- confirm. */

		/* Send CopyDone */
		pq_putmessage_noblock('c', NULL, 0);
		streamingDoneSending = true;

		WalSndCaughtUp = true;

		elog(DEBUG1, "walsender reached end of timeline at %X/%X (sent up to %X/%X)",
		/* NOTE(review): two lines are missing here; presumably the
		 * LSN_FORMAT_ARGS(sendTimeLineValidUpto) and LSN_FORMAT_ARGS(sentPtr)
		 * arguments that close this elog() call -- confirm. */
		return;
	}

	/* Do we have any work to do? */
	Assert(sentPtr <= SendRqstPtr);
	if (SendRqstPtr <= sentPtr)
	{
		WalSndCaughtUp = true;
		return;
	}

	/*
	 * Figure out how much to send in one message. If there's no more than
	 * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
	 * MAX_SEND_SIZE bytes, but round back to logfile or page boundary.
	 *
	 * The rounding is not only for performance reasons. Walreceiver relies on
	 * the fact that we never split a WAL record across two messages. Since a
	 * long WAL record is split at page boundary into continuation records,
	 * page boundary is always a safe cut-off point. We also assume that
	 * SendRqstPtr never points to the middle of a WAL record.
	 */
	startptr = sentPtr;
	endptr = startptr;
	endptr += MAX_SEND_SIZE;

	/* if we went beyond SendRqstPtr, back off */
	if (SendRqstPtr <= endptr)
	{
		endptr = SendRqstPtr;
		/* NOTE(review): a line is missing here; presumably
		 * "if (sendTimeLineIsHistoric)" -- confirm. */
			WalSndCaughtUp = false;
		else
			WalSndCaughtUp = true;
	}
	else
	{
		/* round down to page boundary. */
		endptr -= (endptr % XLOG_BLCKSZ);
		WalSndCaughtUp = false;
	}

	nbytes = endptr - startptr;
	Assert(nbytes <= MAX_SEND_SIZE);

	/*
	 * OK to read and send the slice.
	 */
	/* NOTE(review): a line is missing here; presumably
	 * "resetStringInfo(&output_message);" -- confirm. */
	pq_sendbyte(&output_message, 'w');

	pq_sendint64(&output_message, startptr);	/* dataStart */
	pq_sendint64(&output_message, SendRqstPtr); /* walEnd */
	pq_sendint64(&output_message, 0);	/* sendtime, filled in last */

	/*
	 * Read the log directly into the output buffer to avoid extra memcpy
	 * calls.
	 */
	/* NOTE(review): a line is missing here; presumably
	 * "enlargeStringInfo(&output_message, nbytes);" -- confirm. */

retry:
	if (!WALRead(xlogreader,
				 /* NOTE(review): an argument line is missing here; presumably
				  * "&output_message.data[output_message.len]," -- confirm. */
				 startptr,
				 nbytes,
				 xlogreader->seg.ws_tli,	/* Pass the current TLI because
											 * only WalSndSegmentOpen controls
											 * whether new TLI is needed. */
				 &errinfo))
		WALReadRaiseError(&errinfo);

	/* See logical_read_xlog_page(). */
	XLByteToSeg(startptr, segno, xlogreader->segcxt.ws_segsize);
	/* NOTE(review): a line is missing here; presumably
	 * "CheckXLogRemoved(segno, xlogreader->seg.ws_tli);" -- confirm. */

	/*
	 * During recovery, the currently-open WAL file might be replaced with the
	 * file of the same name retrieved from archive. So we always need to
	 * check what we read was valid after reading into the buffer. If it's
	 * invalid, we try to open and read the file again.
	 */
	/* NOTE(review): a line is missing here; presumably
	 * "if (am_cascading_walsender)" -- confirm. */
	{
		WalSnd	   *walsnd = MyWalSnd;
		bool		reload;

		/* Consume the reload request set by WalSndRqstFileReload(). */
		SpinLockAcquire(&walsnd->mutex);
		reload = walsnd->needreload;
		walsnd->needreload = false;
		SpinLockRelease(&walsnd->mutex);

		if (reload && xlogreader->seg.ws_file >= 0)
		{
			/* NOTE(review): a line is missing here; presumably
			 * "wal_segment_close(xlogreader);" -- confirm. */

			goto retry;
		}
	}

	output_message.len += nbytes;
	/* NOTE(review): a line is missing here; presumably
	 * "output_message.data[output_message.len] = '\0';" -- confirm. */

	/*
	 * Fill the send timestamp last, so that it is taken as late as possible.
	 * The offset skips the 'w' byte and the dataStart and walEnd fields.
	 */
	/* NOTE(review): two lines are missing here; presumably
	 * "resetStringInfo(&tmpbuf);" and
	 * "pq_sendint64(&tmpbuf, GetCurrentTimestamp());" -- confirm. */
	memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64)],
		   tmpbuf.data, sizeof(int64));

	/* NOTE(review): a line is missing here; presumably
	 * "pq_putmessage_noblock('d', output_message.data, output_message.len);". */

	sentPtr = endptr;

	/* Update shared memory status */
	{
		WalSnd	   *walsnd = MyWalSnd;

		SpinLockAcquire(&walsnd->mutex);
		walsnd->sentPtr = sentPtr;
		SpinLockRelease(&walsnd->mutex);
	}

	/* Report progress of XLOG streaming in PS display */
	/* NOTE(review): a line is missing here; presumably
	 * "if (update_process_title)" -- confirm. */
	{
		char		activitymsg[50];

		snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
		/* NOTE(review): a line is missing here; presumably
		 * "LSN_FORMAT_ARGS(sentPtr));" closing this call -- confirm. */
		set_ps_display(activitymsg);
	}
}
3033 
/*
 * Stream out logically decoded data.
 *
 * Reads the next WAL record through logical_decoding_ctx->reader.
 * WalSndCaughtUp is set once the reader's EndRecPtr has reached the cached
 * flush pointer.  The current sentPtr is published in MyWalSnd->sentPtr.
 * Raises ERROR if the reader reports an invalid record.
 */
static void
/* NOTE(review): the declarator line is missing from this copy; presumably
 * "XLogSendLogical(void)" -- confirm against upstream. */
{
	XLogRecord *record;
	char	   *errm;

	/*
	 * We'll use the current flush point to determine whether we've caught up.
	 * This variable is static in order to cache it across calls. Caching is
	 * helpful because GetFlushRecPtr() needs to acquire a heavily-contended
	 * spinlock.
	 */
	static XLogRecPtr flushPtr = InvalidXLogRecPtr;

	/*
	 * Don't know whether we've caught up yet. We'll set WalSndCaughtUp to
	 * true in WalSndWaitForWal, if we're actually waiting. We also set to
	 * true if XLogReadRecord() had to stop reading but WalSndWaitForWal
	 * didn't wait - i.e. when we're shutting down.
	 */
	WalSndCaughtUp = false;

	record = XLogReadRecord(logical_decoding_ctx->reader, &errm);

	/* xlog record was invalid */
	if (errm != NULL)
		elog(ERROR, "could not find record while sending logically-decoded data: %s",
			 errm);

	if (record != NULL)
	{
		/*
		 * Note the lack of any call to LagTrackerWrite() which is handled by
		 * WalSndUpdateProgress which is called by output plugin through
		 * logical decoding write api.
		 */
		/* NOTE(review): a line is missing here; presumably
		 * "LogicalDecodingProcessRecord(logical_decoding_ctx,
		 * logical_decoding_ctx->reader);" -- confirm. */

		/* NOTE(review): a line is missing here; presumably
		 * "sentPtr = logical_decoding_ctx->reader->EndRecPtr;" -- confirm. */
	}

	/*
	 * If first time through in this session, initialize flushPtr. Otherwise,
	 * we only need to update flushPtr if EndRecPtr is past it.
	 */
	if (flushPtr == InvalidXLogRecPtr)
		flushPtr = GetFlushRecPtr(NULL);
	else if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
		flushPtr = GetFlushRecPtr(NULL);

	/* If EndRecPtr is still past our flushPtr, it means we caught up. */
	if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
		WalSndCaughtUp = true;

	/*
	 * If we're caught up and have been requested to stop, have WalSndLoop()
	 * terminate the connection in an orderly manner, after writing out all
	 * the pending data.
	 */
	/* NOTE(review): a line is missing here; presumably
	 * "if (WalSndCaughtUp && got_STOPPING)" -- confirm. */
		got_SIGUSR2 = true;

	/* Update shared memory status */
	{
		WalSnd	   *walsnd = MyWalSnd;

		SpinLockAcquire(&walsnd->mutex);
		walsnd->sentPtr = sentPtr;
		SpinLockRelease(&walsnd->mutex);
	}
}
3108 
/*
 * Shutdown if the sender is caught up.
 *
 * NB: This should only be called when the shutdown signal has been received
 * from postmaster.
 *
 * Note that if we determine that there's still more data to send, this
 * function will return control to the caller.
 *
 * The process exits with proc_exit(0) only when all three hold:
 *  - WalSndCaughtUp is set;
 *  - sentPtr equals the standby's replicated position;
 *  - no output is pending.
 * Before exiting it sends the COPY completion tag and flushes it to the
 * client.
 */
static void
/* NOTE(review): the declarator line is missing from this copy; presumably
 * "WalSndDone(WalSndSendDataCallback send_data)" -- send_data is used below. */
{
	XLogRecPtr	replicatedPtr;

	/* ... let's just be real sure we're caught up ... */
	send_data();

	/*
	 * To figure out whether all WAL has successfully been replicated, check
	 * flush location if valid, write otherwise. Tools like pg_receivewal will
	 * usually (unless in synchronous mode) return an invalid flush location.
	 */
	replicatedPtr = XLogRecPtrIsInvalid(MyWalSnd->flush) ?
	/* NOTE(review): a line is missing here; presumably
	 * "MyWalSnd->write : MyWalSnd->flush;" completing the ternary. */

	if (WalSndCaughtUp && sentPtr == replicatedPtr &&
		!pq_is_send_pending())
	{
		QueryCompletion qc;

		/* Inform the standby that XLOG streaming is done */
		SetQueryCompletion(&qc, CMDTAG_COPY, 0);
		EndCommand(&qc, DestRemote, false);
		pq_flush();

		proc_exit(0);
	}
	/* NOTE(review): two lines are missing here; presumably a keepalive that
	 * requests a reply, sent when !waiting_for_ping_response (a
	 * WalSndKeepalive() call) -- confirm against upstream. */
}
3149 
/*
 * Returns the latest point in WAL that has been safely flushed to disk, and
 * can be sent to the standby. This should only be called when in recovery,
 * ie. we're streaming to a cascaded standby.
 *
 * As a side-effect, *tli is updated to the TLI of the last
 * replayed WAL record.
 */
static XLogRecPtr
/* NOTE(review): the declarator line is missing from this copy; presumably
 * "GetStandbyFlushRecPtr(TimeLineID *tli)" -- tli is written below. */
{
	XLogRecPtr	replayPtr;
	TimeLineID	replayTLI;
	XLogRecPtr	receivePtr;
	/* NOTE(review): a line is missing here; presumably
	 * "TimeLineID receiveTLI;" -- receiveTLI is used below. */
	XLogRecPtr	result;

	/*
	 * We can safely send what's already been replayed. Also, if walreceiver
	 * is streaming WAL from the same timeline, we can send anything that it
	 * has streamed, but hasn't been replayed yet.
	 */

	receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI);
	replayPtr = GetXLogReplayRecPtr(&replayTLI);

	*tli = replayTLI;

	/* Use the received position only if it is on the replay TLI and ahead. */
	result = replayPtr;
	if (receiveTLI == replayTLI && receivePtr > replayPtr)
		result = receivePtr;

	return result;
}
3184 
/*
 * Request walsenders to reload the currently-open WAL file
 *
 * Sets needreload, under the slot's spinlock, on every WalSnd slot that is
 * in use (pid != 0).
 */
void
/* NOTE(review): the declarator line is missing from this copy; presumably
 * "WalSndRqstFileReload(void)" -- confirm against upstream. */
{
	int			i;

	for (i = 0; i < max_wal_senders; i++)
	{
		WalSnd	   *walsnd = &WalSndCtl->walsnds[i];

		SpinLockAcquire(&walsnd->mutex);
		if (walsnd->pid == 0)
		{
			/* Unused slot; nothing to flag. */
			SpinLockRelease(&walsnd->mutex);
			continue;
		}
		walsnd->needreload = true;
		SpinLockRelease(&walsnd->mutex);
	}
}
3207 
/*
 * Handle PROCSIG_WALSND_INIT_STOPPING signal.
 *
 * Either sends SIGTERM to this same process (replication not active) or
 * sets got_STOPPING.
 */
void
/* NOTE(review): the declarator line is missing from this copy; presumably
 * "HandleWalSndInitStopping(void)" -- confirm against upstream. */
{
	/* NOTE(review): a line is missing here; presumably
	 * "Assert(am_walsender);" -- confirm. */

	/*
	 * If replication has not yet started, die like with SIGTERM. If
	 * replication is active, only set a flag and wake up the main loop. It
	 * will send any outstanding WAL, wait for it to be replicated to the
	 * standby, and then exit gracefully.
	 */
	if (!replication_active)
		kill(MyProcPid, SIGTERM);
	else
		got_STOPPING = true;
}
3227 
/*
 * SIGUSR2: set flag to do a last cycle and shut down afterwards. The WAL
 * sender should already have been switched to WALSNDSTATE_STOPPING at
 * this point.
 *
 * Installed as the SIGUSR2 handler by WalSndSignals().  It saves and
 * restores errno so the interrupted code does not see it changed by
 * SetLatch().
 */
static void
/* NOTE(review): the declarator line is missing from this copy; presumably
 * "WalSndLastCycleHandler(SIGNAL_ARGS)" -- confirm against upstream. */
{
	int			save_errno = errno;

	got_SIGUSR2 = true;
	SetLatch(MyLatch);

	errno = save_errno;
}
3243 
/*
 * Set up signal handlers
 *
 * Installs the walsender's handlers: query cancel on SIGINT, die on SIGTERM,
 * and a last-cycle request on SIGUSR2.  It also calls InitializeTimeouts()
 * for SIGALRM.  SIGQUIT is left to InitPostmasterChild.
 */
void
/* NOTE(review): the declarator line is missing from this copy; presumably
 * "WalSndSignals(void)" -- confirm against upstream. */
{
	/* Set up signal handlers */
	/* NOTE(review): a line is missing here; presumably
	 * "pqsignal(SIGHUP, SignalHandlerForConfigReload);" -- confirm. */
	pqsignal(SIGINT, StatementCancelHandler);	/* query cancel */
	pqsignal(SIGTERM, die);		/* request shutdown */
	/* SIGQUIT handler was already set up by InitPostmasterChild */
	InitializeTimeouts();		/* establishes SIGALRM handler */
	/* NOTE(review): two lines are missing here; presumably
	 * "pqsignal(SIGPIPE, SIG_IGN);" and
	 * "pqsignal(SIGUSR1, procsignal_sigusr1_handler);" -- confirm. */
	pqsignal(SIGUSR2, WalSndLastCycleHandler);	/* request a last cycle and
												 * shutdown */

	/* Reset some signals that are accepted by postmaster but not here */
	/* NOTE(review): a line is missing here; presumably
	 * "pqsignal(SIGCHLD, SIG_DFL);" -- confirm. */
}
3262 
/*
 * Report shared-memory space needed by WalSndShmemInit
 *
 * The size is the WalSndCtlData header up to its walsnds array, plus one
 * WalSnd per max_wal_senders.  The sum uses add_size()/mul_size(),
 * presumably for overflow-checked arithmetic -- confirm.
 */
Size
/* NOTE(review): the declarator line is missing from this copy; presumably
 * "WalSndShmemSize(void)" -- it is called with no arguments elsewhere. */
{
	Size		size = 0;

	size = offsetof(WalSndCtlData, walsnds);
	size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));

	return size;
}
3274 
/*
 * Allocate and initialize walsender-related shared memory
 *
 * Attaches WalSndCtl to the "Wal Sender Ctl" shared-memory struct.  Only when
 * ShmemInitStruct() reports that the struct did not already exist does it:
 *  - initialize one entry per NUM_SYNC_REP_WAIT_MODE;
 *  - initialize the spinlock of each of the max_wal_senders slots.
 */
void
/* NOTE(review): the declarator line is missing from this copy; presumably
 * "WalSndShmemInit(void)" -- confirm against upstream. */
{
	bool		found;
	int			i;

	WalSndCtl = (WalSndCtlData *)
		ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);

	if (!found)
	{
		/* First time through, so initialize */
		/* NOTE(review): a line is missing here; presumably
		 * "MemSet(WalSndCtl, 0, WalSndShmemSize());" -- confirm. */

		for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
			/* NOTE(review): the loop body is missing here; presumably
			 * "SHMQueueInit(&(WalSndCtl->SyncRepQueue[i]));" -- confirm. */

		for (i = 0; i < max_wal_senders; i++)
		{
			WalSnd	   *walsnd = &WalSndCtl->walsnds[i];

			SpinLockInit(&walsnd->mutex);
		}
	}
}
3301 
/*
 * Wake up all walsenders
 *
 * This will be called inside critical sections, so throwing an error is not
 * advisable.
 *
 * Sets the latch of every slot whose latch pointer is non-NULL.
 * WalSndKill() clears the pointer when a walsender exits.
 */
void
/* NOTE(review): the declarator line is missing from this copy; presumably
 * "WalSndWakeup(void)" -- confirm against upstream. */
{
	int			i;

	for (i = 0; i < max_wal_senders; i++)
	{
		Latch	   *latch;
		WalSnd	   *walsnd = &WalSndCtl->walsnds[i];

		/*
		 * Get latch pointer with spinlock held, for the unlikely case that
		 * pointer reads aren't atomic (as they're 8 bytes).
		 */
		SpinLockAcquire(&walsnd->mutex);
		latch = walsnd->latch;
		SpinLockRelease(&walsnd->mutex);

		if (latch != NULL)
			SetLatch(latch);
	}
}
3330 
3331 /*
3332  * Wait for readiness on the FeBe socket, or a timeout. The mask should be
3333  * composed of optional WL_SOCKET_WRITEABLE and WL_SOCKET_READABLE flags. Exit
3334  * on postmaster death.
3335  */
3336 static void
3337 WalSndWait(uint32 socket_events, long timeout, uint32 wait_event)
3338 {
3339  WaitEvent event;
3340 
3341  ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, socket_events, NULL);
3342  if (WaitEventSetWait(FeBeWaitSet, timeout, &event, 1, wait_event) == 1 &&
3343  (event.events & WL_POSTMASTER_DEATH))
3344  proc_exit(1);
3345 }
3346 
/*
 * Signal all walsenders to move to stopping state.
 *
 * This will trigger walsenders to move to a state where no further WAL can be
 * generated. See this file's header for details.
 *
 * The pid of each slot is read under its spinlock.  Slots with pid 0 (not in
 * use) are skipped.
 */
void
/* NOTE(review): the declarator line is missing from this copy; presumably
 * "WalSndInitStopping(void)" -- confirm against upstream. */
{
	int			i;

	for (i = 0; i < max_wal_senders; i++)
	{
		WalSnd	   *walsnd = &WalSndCtl->walsnds[i];
		pid_t		pid;

		SpinLockAcquire(&walsnd->mutex);
		pid = walsnd->pid;
		SpinLockRelease(&walsnd->mutex);

		if (pid == 0)
			continue;

		/* NOTE(review): a line is missing here; presumably
		 * "SendProcSignal(pid, PROCSIG_WALSND_INIT_STOPPING, InvalidBackendId);". */
	}
}
3373 
/*
 * Wait until all the WAL senders have quit or reached the stopping state. This
 * is used by the checkpointer to control when the shutdown checkpoint can
 * safely be performed.
 *
 * Polls the slots every 10 ms.  Unused slots (pid == 0) count as stopped.
 * Each pass stops at the first in-use slot that is not in
 * WALSNDSTATE_STOPPING.
 */
void
/* NOTE(review): the declarator line is missing from this copy; presumably
 * "WalSndWaitStopping(void)" -- confirm against upstream. */
{
	for (;;)
	{
		int			i;
		bool		all_stopped = true;

		for (i = 0; i < max_wal_senders; i++)
		{
			WalSnd	   *walsnd = &WalSndCtl->walsnds[i];

			SpinLockAcquire(&walsnd->mutex);

			if (walsnd->pid == 0)
			{
				SpinLockRelease(&walsnd->mutex);
				continue;
			}

			if (walsnd->state != WALSNDSTATE_STOPPING)
			{
				all_stopped = false;
				SpinLockRelease(&walsnd->mutex);
				break;
			}
			SpinLockRelease(&walsnd->mutex);
		}

		/* safe to leave if confirmation is done for all WAL senders */
		if (all_stopped)
			return;

		pg_usleep(10000L);		/* wait for 10 msec */
	}
}
3415 
3416 /* Set state for current walsender (only called in walsender) */
3417 void
3419 {
3420  WalSnd *walsnd = MyWalSnd;
3421 
3423 
3424  if (walsnd->state == state)
3425  return;
3426 
3427  SpinLockAcquire(&walsnd->mutex);
3428  walsnd->state = state;
3429  SpinLockRelease(&walsnd->mutex);
3430 }
3431 
3432 /*
3433  * Return a string constant representing the state. This is used
3434  * in system views, and should *not* be translated.
3435  */
3436 static const char *
3438 {
3439  switch (state)
3440  {
3441  case WALSNDSTATE_STARTUP:
3442  return "startup";
3443  case WALSNDSTATE_BACKUP:
3444  return "backup";
3445  case WALSNDSTATE_CATCHUP:
3446  return "catchup";
3447  case WALSNDSTATE_STREAMING:
3448  return "streaming";
3449  case WALSNDSTATE_STOPPING:
3450  return "stopping";
3451  }
3452  return "UNKNOWN";
3453 }
3454 
3455 static Interval *
3457 {
3458  Interval *result = palloc(sizeof(Interval));
3459 
3460  result->month = 0;
3461  result->day = 0;
3462  result->time = offset;
3463 
3464  return result;
3465 }
3466 
3467 /*
3468  * Returns activity of walsenders, including pids and xlog locations sent to
3469  * standby servers.
3470  */
3471 Datum
3473 {
3474 #define PG_STAT_GET_WAL_SENDERS_COLS 12
3475  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
3476  SyncRepStandbyData *sync_standbys;
3477  int num_standbys;
3478  int i;
3479 
3480  SetSingleFuncCall(fcinfo, 0);
3481 
3482  /*
3483  * Get the currently active synchronous standbys. This could be out of
3484  * date before we're done, but we'll use the data anyway.
3485  */
3486  num_standbys = SyncRepGetCandidateStandbys(&sync_standbys);
3487 
3488  for (i = 0; i < max_wal_senders; i++)
3489  {
3490  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3492  XLogRecPtr write;
3493  XLogRecPtr flush;
3494  XLogRecPtr apply;
3495  TimeOffset writeLag;
3496  TimeOffset flushLag;
3497  TimeOffset applyLag;
3498  int priority;
3499  int pid;
3501  TimestampTz replyTime;
3502  bool is_sync_standby;
3504  bool nulls[PG_STAT_GET_WAL_SENDERS_COLS];
3505  int j;
3506 
3507  /* Collect data from shared memory */
3508  SpinLockAcquire(&walsnd->mutex);
3509  if (walsnd->pid == 0)
3510  {
3511  SpinLockRelease(&walsnd->mutex);
3512  continue;
3513  }
3514  pid = walsnd->pid;
3515  sentPtr = walsnd->sentPtr;
3516  state = walsnd->state;
3517  write = walsnd->write;
3518  flush = walsnd->flush;
3519  apply = walsnd->apply;
3520  writeLag = walsnd->writeLag;
3521  flushLag = walsnd->flushLag;
3522  applyLag = walsnd->applyLag;
3523  priority = walsnd->sync_standby_priority;
3524  replyTime = walsnd->replyTime;
3525  SpinLockRelease(&walsnd->mutex);
3526 
3527  /*
3528  * Detect whether walsender is/was considered synchronous. We can
3529  * provide some protection against stale data by checking the PID
3530  * along with walsnd_index.
3531  */
3532  is_sync_standby = false;
3533  for (j = 0; j < num_standbys; j++)
3534  {
3535  if (sync_standbys[j].walsnd_index == i &&
3536  sync_standbys[j].pid == pid)
3537  {
3538  is_sync_standby = true;
3539  break;
3540  }
3541  }
3542 
3543  memset(nulls, 0, sizeof(nulls));
3544  values[0] = Int32GetDatum(pid);
3545 
3546  if (!has_privs_of_role(GetUserId(), ROLE_PG_READ_ALL_STATS))
3547  {
3548  /*
3549  * Only superusers and roles with privileges of pg_read_all_stats
3550  * can see details. Other users only get the pid value to know
3551  * it's a walsender, but no details.
3552  */
3553  MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1);
3554  }
3555  else
3556  {
3558 
3560  nulls[2] = true;
3561  values[2] = LSNGetDatum(sentPtr);
3562 
3564  nulls[3] = true;
3565  values[3] = LSNGetDatum(write);
3566 
3567  if (XLogRecPtrIsInvalid(flush))
3568  nulls[4] = true;
3569  values[4] = LSNGetDatum(flush);
3570 
3571  if (XLogRecPtrIsInvalid(apply))
3572  nulls[5] = true;
3573  values[5] = LSNGetDatum(apply);
3574 
3575  /*
3576  * Treat a standby such as a pg_basebackup background process
3577  * which always returns an invalid flush location, as an
3578  * asynchronous standby.
3579  */
3580  priority = XLogRecPtrIsInvalid(flush) ? 0 : priority;
3581 
3582  if (writeLag < 0)
3583  nulls[6] = true;
3584  else
3586 
3587  if (flushLag < 0)
3588  nulls[7] = true;
3589  else
3591 
3592  if (applyLag < 0)
3593  nulls[8] = true;
3594  else
3596 
3597  values[9] = Int32GetDatum(priority);
3598 
3599  /*
3600  * More easily understood version of standby state. This is purely
3601  * informational.
3602  *
3603  * In quorum-based sync replication, the role of each standby
3604  * listed in synchronous_standby_names can be changing very
3605  * frequently. Any standbys considered as "sync" at one moment can
3606  * be switched to "potential" ones at the next moment. So, it's
3607  * basically useless to report "sync" or "potential" as their sync
3608  * states. We report just "quorum" for them.
3609  */
3610  if (priority == 0)
3611  values[10] = CStringGetTextDatum("async");
3612  else if (is_sync_standby)
3614  CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
3615  else
3616  values[10] = CStringGetTextDatum("potential");
3617 
3618  if (replyTime == 0)
3619  nulls[11] = true;
3620  else
3621  values[11] = TimestampTzGetDatum(replyTime);
3622  }
3623 
3624  tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
3625  values, nulls);
3626  }
3627 
3628  return (Datum) 0;
3629 }
3630 
3631 /*
3632  * Send a keepalive message to standby.
3633  *
3634  * If requestReply is set, the message requests the other party to send
3635  * a message back to us, for heartbeat purposes. We also set a flag to
3636  * let nearby code know that we're waiting for that response, to avoid
3637  * repeated requests.
3638  *
3639  * writePtr is the location up to which the WAL is sent. It is essentially
3640  * the same as sentPtr but in some cases, we need to send keep alive before
3641  * sentPtr is updated like when skipping empty transactions.
3642  */
3643 static void
3644 WalSndKeepalive(bool requestReply, XLogRecPtr writePtr)
3645 {
3646  elog(DEBUG2, "sending replication keepalive");
3647 
3648  /* construct the message... */
3650  pq_sendbyte(&output_message, 'k');
3651  pq_sendint64(&output_message, XLogRecPtrIsInvalid(writePtr) ? sentPtr : writePtr);
3653  pq_sendbyte(&output_message, requestReply ? 1 : 0);
3654 
3655  /* ... and send it wrapped in CopyData */
3657 
3658  /* Set local flag */
3659  if (requestReply)
3661 }
3662 
3663 /*
3664  * Send keepalive message if too much time has elapsed.
3665  */
3666 static void
3668 {
3669  TimestampTz ping_time;
3670 
3671  /*
3672  * Don't send keepalive messages if timeouts are globally disabled or
3673  * we're doing something not partaking in timeouts.
3674  */
3675  if (wal_sender_timeout <= 0 || last_reply_timestamp <= 0)
3676  return;
3677 
3679  return;
3680 
3681  /*
3682  * If half of wal_sender_timeout has lapsed without receiving any reply
3683  * from the standby, send a keep-alive message to the standby requesting
3684  * an immediate reply.
3685  */
3687  wal_sender_timeout / 2);
3688  if (last_processing >= ping_time)
3689  {
3691 
3692  /* Try to flush pending output to the client */
3693  if (pq_flush_if_writable() != 0)
3694  WalSndShutdown();
3695  }
3696 }
3697 
3698 /*
3699  * Record the end of the WAL and the time it was flushed locally, so that
3700  * LagTrackerRead can compute the elapsed time (lag) when this WAL location is
3701  * eventually reported to have been written, flushed and applied by the
3702  * standby in a reply message.
3703  */
3704 static void
3706 {
3707  bool buffer_full;
3708  int new_write_head;
3709  int i;
3710 
3711  if (!am_walsender)
3712  return;
3713 
3714  /*
3715  * If the lsn hasn't advanced since last time, then do nothing. This way
3716  * we only record a new sample when new WAL has been written.
3717  */
3718  if (lag_tracker->last_lsn == lsn)
3719  return;
3720  lag_tracker->last_lsn = lsn;
3721 
3722  /*
3723  * If advancing the write head of the circular buffer would crash into any
3724  * of the read heads, then the buffer is full. In other words, the
3725  * slowest reader (presumably apply) is the one that controls the release
3726  * of space.
3727  */
3728  new_write_head = (lag_tracker->write_head + 1) % LAG_TRACKER_BUFFER_SIZE;
3729  buffer_full = false;
3730  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; ++i)
3731  {
3732  if (new_write_head == lag_tracker->read_heads[i])
3733  buffer_full = true;
3734  }
3735 
3736  /*
3737  * If the buffer is full, for now we just rewind by one slot and overwrite
3738  * the last sample, as a simple (if somewhat uneven) way to lower the
3739  * sampling rate. There may be better adaptive compaction algorithms.
3740  */
3741  if (buffer_full)
3742  {
3743  new_write_head = lag_tracker->write_head;
3744  if (lag_tracker->write_head > 0)
3746  else
3748  }
3749 
3750  /* Store a sample at the current write head position. */
3752  lag_tracker->buffer[lag_tracker->write_head].time = local_flush_time;
3753  lag_tracker->write_head = new_write_head;
3754 }
3755 
3756 /*
3757  * Find out how much time has elapsed between the moment WAL location 'lsn'
3758  * (or the highest known earlier LSN) was flushed locally and the time 'now'.
3759  * We have a separate read head for each of the reported LSN locations we
3760  * receive in replies from standby; 'head' controls which read head is
3761  * used. Whenever a read head crosses an LSN which was written into the
3762  * lag buffer with LagTrackerWrite, we can use the associated timestamp to
3763  * find out the time this LSN (or an earlier one) was flushed locally, and
3764  * therefore compute the lag.
3765  *
3766  * Return -1 if no new sample data is available, and otherwise the elapsed
3767  * time in microseconds.
3768  */
3769 static TimeOffset
3771 {
3772  TimestampTz time = 0;
3773 
3774  /* Read all unread samples up to this LSN or end of buffer. */
3775  while (lag_tracker->read_heads[head] != lag_tracker->write_head &&
3776  lag_tracker->buffer[lag_tracker->read_heads[head]].lsn <= lsn)
3777  {
3778  time = lag_tracker->buffer[lag_tracker->read_heads[head]].time;
3779  lag_tracker->last_read[head] =
3781  lag_tracker->read_heads[head] =
3783  }
3784 
3785  /*
3786  * If the lag tracker is empty, that means the standby has processed
3787  * everything we've ever sent so we should now clear 'last_read'. If we
3788  * didn't do that, we'd risk using a stale and irrelevant sample for
3789  * interpolation at the beginning of the next burst of WAL after a period
3790  * of idleness.
3791  */
3793  lag_tracker->last_read[head].time = 0;
3794 
3795  if (time > now)
3796  {
3797  /* If the clock somehow went backwards, treat as not found. */
3798  return -1;
3799  }
3800  else if (time == 0)
3801  {
3802  /*
3803  * We didn't cross a time. If there is a future sample that we
3804  * haven't reached yet, and we've already reached at least one sample,
3805  * let's interpolate the local flushed time. This is mainly useful
3806  * for reporting a completely stuck apply position as having
3807  * increasing lag, since otherwise we'd have to wait for it to
3808  * eventually start moving again and cross one of our samples before
3809  * we can show the lag increasing.
3810  */
3812  {
3813  /* There are no future samples, so we can't interpolate. */
3814  return -1;
3815  }
3816  else if (lag_tracker->last_read[head].time != 0)
3817  {
3818  /* We can interpolate between last_read and the next sample. */
3819  double fraction;
3820  WalTimeSample prev = lag_tracker->last_read[head];
3822 
3823  if (lsn < prev.lsn)
3824  {
3825  /*
3826  * Reported LSNs shouldn't normally go backwards, but it's
3827  * possible when there is a timeline change. Treat as not
3828  * found.
3829  */
3830  return -1;
3831  }
3832 
3833  Assert(prev.lsn < next.lsn);
3834 
3835  if (prev.time > next.time)
3836  {
3837  /* If the clock somehow went backwards, treat as not found. */
3838  return -1;
3839  }
3840 
3841  /* See how far we are between the previous and next samples. */
3842  fraction =
3843  (double) (lsn - prev.lsn) / (double) (next.lsn - prev.lsn);
3844 
3845  /* Scale the local flush time proportionally. */
3846  time = (TimestampTz)
3847  ((double) prev.time + (next.time - prev.time) * fraction);
3848  }
3849  else
3850  {
3851  /*
3852  * We have only a future sample, implying that we were entirely
3853  * caught up but and now there is a new burst of WAL and the
3854  * standby hasn't processed the first sample yet. Until the
3855  * standby reaches the future sample the best we can do is report
3856  * the hypothetical lag if that sample were to be replayed now.
3857  */
3858  time = lag_tracker->buffer[lag_tracker->read_heads[head]].time;
3859  }
3860  }
3861 
3862  /* Return the elapsed time since local flush time in microseconds. */
3863  Assert(time != 0);
3864  return now - time;
3865 }
bool has_privs_of_role(Oid member, Oid role)
Definition: acl.c:4951
int16 AttrNumber
Definition: attnum.h:21
TimeLineID tliOfPointInHistory(XLogRecPtr ptr, List *history)
Definition: timeline.c:552
XLogRecPtr tliSwitchPoint(TimeLineID tli, List *history, TimeLineID *nextTLI)
Definition: timeline.c:580
List * readTimeLineHistory(TimeLineID targetTLI)
Definition: timeline.c:76
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition: timestamp.c:1687
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1705
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1574
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1538
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1768
void pgstat_report_activity(BackendState state, const char *cmd_str)
@ STATE_RUNNING
#define InvalidBackendId
Definition: backendid.h:23
void SendBaseBackup(BaseBackupCmd *cmd)
Definition: basebackup.c:948
static int32 next
Definition: blutils.c:219
static Datum values[MAXATTR]
Definition: bootstrap.c:156
#define CStringGetTextDatum(s)
Definition: builtins.h:85
#define NameStr(name)
Definition: c.h:681
unsigned int uint32
Definition: c.h:441
#define offsetof(type, field)
Definition: c.h:727
#define SIGNAL_ARGS
Definition: c.h:1355
#define pg_attribute_noreturn()
Definition: c.h:179
#define PG_BINARY
Definition: c.h:1268
#define UINT64_FORMAT
Definition: c.h:484
#define MemSet(start, val, len)
Definition: c.h:1008
uint32 TransactionId
Definition: c.h:587
#define OidIsValid(objectId)
Definition: c.h:710
size_t Size
Definition: c.h:540
static void SetQueryCompletion(QueryCompletion *qc, CommandTag commandTag, uint64 nprocessed)
Definition: cmdtag.h:36
void ConditionVariableCancelSleep(void)
int64 TimestampTz
Definition: timestamp.h:39
int64 TimeOffset
Definition: timestamp.h:40
char * get_database_name(Oid dbid)
Definition: dbcommands.c:2994
void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
Definition: decode.c:91
bool defGetBoolean(DefElem *def)
Definition: define.c:108
char * defGetString(DefElem *def)
Definition: define.c:49
void EndCommand(const QueryCompletion *qc, CommandDest dest, bool force_undecorated_output)
Definition: dest.c:166
void EndReplicationCommand(const char *commandTag)
Definition: dest.c:221
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
@ DestRemote
Definition: dest.h:91
@ DestRemoteSimple
Definition: dest.h:93
@ DestNone
Definition: dest.h:89
struct cursor * cur
Definition: ecpg.c:28
int errmsg_internal(const char *fmt,...)
Definition: elog.c:991
int errcode_for_file_access(void)
Definition: elog.c:716
int errdetail(const char *fmt,...)
Definition: elog.c:1037
bool message_level_is_interesting(int elevel)
Definition: elog.c:265
int errcode(int sqlerrcode)
Definition: elog.c:693
int errmsg(const char *fmt,...)
Definition: elog.c:904
#define LOG
Definition: elog.h:25
#define COMMERROR
Definition: elog.h:27
#define FATAL
Definition: elog.h:35
#define DEBUG2
Definition: elog.h:23
#define DEBUG1
Definition: elog.h:24
#define ERROR
Definition: elog.h:33
#define elog(elevel,...)
Definition: elog.h:218
#define ereport(elevel,...)
Definition: elog.h:143
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:83
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:2333
void do_tup_output(TupOutputState *tstate, Datum *values, bool *isnull)
Definition: execTuples.c:2275
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:2255
int CloseTransientFile(int fd)
Definition: fd.c:2688
int BasicOpenFile(const char *fileName, int fileFlags)
Definition: fd.c:1071
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2511
Datum Int64GetDatum(int64 X)
Definition: fmgr.c:1683
#define PG_FUNCTION_ARGS
Definition: fmgr.h:193
void SetSingleFuncCall(FunctionCallInfo fcinfo, bits32 flags)
Definition: funcapi.c:76
static const unsigned __int64 epoch
Definition: gettimeofday.c:34
int MyProcPid
Definition: globals.c:44
struct Latch * MyLatch
Definition: globals.c:58
Oid MyDatabaseId
Definition: globals.c:89
void GetPGVariable(const char *name, DestReceiver *dest)
Definition: guc.c:9666
char * application_name
Definition: guc.c:661
@ PGC_SIGHUP
Definition: guc.h:72
void ProcessConfigFile(GucContext context)
#define write(a, b, c)
Definition: win32.h:14
#define read(a, b, c)
Definition: win32.h:13
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:27
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:61
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:361
void proc_exit(int code)
Definition: ipc.c:104
int j
Definition: isn.c:74
int i
Definition: isn.c:73
void ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch)
Definition: latch.c:947
void SetLatch(Latch *latch)
Definition: latch.c:566
int WaitEventSetWait(WaitEventSet *set, long timeout, WaitEvent *occurred_events, int nevents, uint32 wait_event_info)
Definition: latch.c:1320
void ResetLatch(Latch *latch)
Definition: latch.c:658
#define WL_SOCKET_READABLE
Definition: latch.h:126
#define WL_POSTMASTER_DEATH
Definition: latch.h:129
#define WL_SOCKET_WRITEABLE
Definition: latch.h:127
#define pq_flush()
Definition: libpq.h:46
#define PQ_SMALL_MESSAGE_LIMIT
Definition: libpq.h:30
#define pq_flush_if_writable()
Definition: libpq.h:47
#define pq_is_send_pending()
Definition: libpq.h:48
#define PQ_LARGE_MESSAGE_LIMIT
Definition: libpq.h:31
#define pq_putmessage_noblock(msgtype, s, len)
Definition: libpq.h:51
#define FeBeWaitSetSocketPos
Definition: libpq.h:63
Assert(fmt[strlen(fmt) - 1] !='\n')
void list_free_deep(List *list)
Definition: list.c:1519
void LogicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: logical.c:1736
void FreeDecodingContext(LogicalDecodingContext *ctx)
Definition: logical.c:634
void DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
Definition: logical.c:590
void CheckLogicalDecodingRequirements(void)
Definition: logical.c:103
LogicalDecodingContext * CreateInitDecodingContext(const char *plugin, List *output_plugin_options, bool need_full_snapshot, XLogRecPtr restart_lsn, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:319
LogicalDecodingContext * CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, bool fast_forward, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:479
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1196
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1800
void LWLockReleaseAll(void)
Definition: lwlock.c:1899
@ LW_SHARED
Definition: lwlock.h:105
@ LW_EXCLUSIVE
Definition: lwlock.h:104
char * pstrdup(const char *in)
Definition: mcxt.c:1305
void pfree(void *pointer)
Definition: mcxt.c:1175
MemoryContext TopMemoryContext
Definition: mcxt.c:48
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:906
MemoryContext CurrentMemoryContext
Definition: mcxt.c:42
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:218
void * palloc(Size size)
Definition: mcxt.c:1068
#define AllocSetContextCreate
Definition: memutils.h:173
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:197
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:121
Oid GetUserId(void)
Definition: miscinit.c:492
@ T_IdentifySystemCmd
Definition: nodes.h:531
@ T_BaseBackupCmd
Definition: nodes.h:532
@ T_VariableShowStmt
Definition: nodes.h:376
@ T_ReadReplicationSlotCmd
Definition: nodes.h:535
@ T_DropReplicationSlotCmd
Definition: nodes.h:534
@ T_TimeLineHistoryCmd
Definition: nodes.h:537
@ T_StartReplicationCmd
Definition: nodes.h:536
@ T_CreateReplicationSlotCmd
Definition: nodes.h:533
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void * arg
#define ERRCODE_DATA_CORRUPTED
Definition: pg_basebackup.c:43
#define MAXPGPATH
const void size_t len
#define lfirst(lc)
Definition: pg_list.h:169
#define NIL
Definition: pg_list.h:65
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
static bool two_phase
#define die(msg)
Definition: pg_test_fsync.c:95
static char * buf
Definition: pg_test_fsync.c:67
void SendPostmasterSignal(PMSignalReason reason)
Definition: pmsignal.c:153
void MarkPostmasterChildWalSender(void)
Definition: pmsignal.c:307
@ PMSIGNAL_ADVANCE_STATE_MACHINE
Definition: pmsignal.h:42
#define snprintf
Definition: port.h:225
void StatementCancelHandler(SIGNAL_ARGS)
Definition: postgres.c:2962
CommandDest whereToSendOutput
Definition: postgres.c:92
const char * debug_query_string
Definition: postgres.c:89
uintptr_t Datum
Definition: postgres.h:411
#define Int32GetDatum(X)
Definition: postgres.h:523
#define InvalidOid
Definition: postgres_ext.h:36
int pq_getbyte_if_available(unsigned char *c)
Definition: pqcomm.c:1034
int pq_getmessage(StringInfo s, int maxlen)
Definition: pqcomm.c:1226
WaitEventSet * FeBeWaitSet
Definition: pqcomm.c:168
void pq_endmsgread(void)
Definition: pqcomm.c:1188
void pq_startmsgread(void)
Definition: pqcomm.c:1164
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:197
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:298
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:87
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
void pq_sendbytes(StringInfo buf, const char *data, int datalen)
Definition: pqformat.c:125
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
static void pq_sendint16(StringInfo buf, uint16 i)
Definition: pqformat.h:137
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PROC_AFFECTS_ALL_HORIZONS
Definition: proc.h:59
int SendProcSignal(pid_t pid, ProcSignalReason reason, BackendId backendId)
Definition: procsignal.c:261
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition: procsignal.c:638
@ PROCSIG_WALSND_INIT_STOPPING
Definition: procsignal.h:35
bool update_process_title
Definition: ps_status.c:36
void set_ps_display(const char *activity)
Definition: ps_status.c:349
@ REPLICATION_KIND_PHYSICAL
Definition: replnodes.h:22
@ REPLICATION_KIND_LOGICAL
Definition: replnodes.h:23
ResourceOwner CurrentResourceOwner
Definition: resowner.c:146
void ResourceOwnerRelease(ResourceOwner owner, ResourceReleasePhase phase, bool isCommit, bool isTopLevel)
Definition: resowner.c:486
void ResourceOwnerDelete(ResourceOwner owner)
Definition: resowner.c:737
@ RESOURCE_RELEASE_LOCKS
Definition: resowner.h:49
@ RESOURCE_RELEASE_BEFORE_LOCKS
Definition: resowner.h:48
@ RESOURCE_RELEASE_AFTER_LOCKS
Definition: resowner.h:50
Size add_size(Size s1, Size s2)
Definition: shmem.c:502
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:396
Size mul_size(Size s1, Size s2)
Definition: shmem.c:519
void SHMQueueInit(SHM_QUEUE *queue)
Definition: shmqueue.c:36
void pg_usleep(long microsec)
Definition: signal.c:53
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:180
ReplicationSlot * SearchNamedReplicationSlot(const char *name, bool need_lock)
Definition: slot.c:377
void ReplicationSlotCleanup(void)
Definition: slot.c:578
void ReplicationSlotMarkDirty(void)
Definition: slot.c:798
void ReplicationSlotReserveWal(void)
Definition: slot.c:1151
void ReplicationSlotAcquire(const char *name, bool nowait)
Definition: slot.c:425
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition: slot.c:837
void ReplicationSlotPersist(void)
Definition: slot.c:815
ReplicationSlot * MyReplicationSlot
Definition: slot.c:97
void ReplicationSlotDrop(const char *name, bool nowait)
Definition: slot.c:625
void ReplicationSlotSave(void)
Definition: slot.c:780
void ReplicationSlotRelease(void)
Definition: slot.c:522
void ReplicationSlotsComputeRequiredLSN(void)
Definition: slot.c:887
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, bool two_phase)
Definition: slot.c:255
@ RS_PERSISTENT
Definition: slot.h:35
@ RS_EPHEMERAL
Definition: slot.h:36
@ RS_TEMPORARY
Definition: slot.h:37
#define SlotIsLogical(slot)
Definition: slot.h:169
const char * SnapBuildExportSnapshot(SnapBuild *builder)
Definition: snapbuild.c:622
Snapshot SnapBuildInitialSnapshot(SnapBuild *builder)
Definition: snapbuild.c:530
void SnapBuildClearExportedSnapshot(void)
Definition: snapbuild.c:683
bool FirstSnapshotSet
Definition: snapmgr.c:149
void RestoreTransactionSnapshot(Snapshot snapshot, void *source_pgproc)
Definition: snapmgr.c:2269
#define SpinLockInit(lock)
Definition: spin.h:60
#define SpinLockRelease(lock)
Definition: spin.h:64
#define SpinLockAcquire(lock)
Definition: spin.h:62
PGPROC * MyProc
Definition: proc.c:68
PROC_HDR * ProcGlobal
Definition: proc.c:80
char * dbname
Definition: streamutil.c:51
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
void enlargeStringInfo(StringInfo str, int needed)
Definition: stringinfo.c:283
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
ReplicationKind kind
Definition: replnodes.h:56
char * defname
Definition: parsenodes.h:765
int32 day
Definition: timestamp.h:51
int32 month
Definition: timestamp.h:52
TimeOffset time
Definition: timestamp.h:49
WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]
Definition: walsender.c:214
int read_heads[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:216
WalTimeSample last_read[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:217
int write_head
Definition: walsender.c:215
XLogRecPtr last_lsn
Definition: walsender.c:213
Definition: latch.h:111
Definition: pg_list.h:51
XLogReaderState * reader
Definition: logical.h:42
struct SnapBuild * snapshot_builder
Definition: logical.h:44
StringInfo out
Definition: logical.h:71
Definition: nodes.h:574
NodeTag type
Definition: nodes.h:575
TransactionId xmin
Definition: proc.h:176
uint8 statusFlags
Definition: proc.h:227
int pgxactoff
Definition: proc.h:186
Latch procLatch
Definition: proc.h:168
uint8 * statusFlags
Definition: proc.h:371
TransactionId xmin
Definition: slot.h:62
TransactionId catalog_xmin
Definition: slot.h:70
XLogRecPtr restart_lsn
Definition: slot.h:73
XLogRecPtr confirmed_flush
Definition: slot.h:84
TransactionId effective_catalog_xmin
Definition: slot.h:144
slock_t mutex
Definition: slot.h:120
bool in_use
Definition: slot.h:123
TransactionId effective_xmin
Definition: slot.h:143
ReplicationSlotPersistentData data
Definition: slot.h:147
TupleDesc setDesc
Definition: execnodes.h:317
Tuplestorestate * setResult
Definition: execnodes.h:316
XLogRecPtr startpoint
Definition: replnodes.h:85
ReplicationKind kind
Definition: replnodes.h:82
TimeLineID timeline
Definition: replnodes.h:84
uint8 syncrep_method
Definition: syncrep.h:69
TimeLineID timeline
Definition: replnodes.h:108
TimeLineID ws_tli
Definition: xlogreader.h:49
uint32 events
Definition: latch.h:146
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
SHM_QUEUE SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]
TimeOffset writeLag
slock_t mutex
XLogRecPtr flush
XLogRecPtr sentPtr
TimeOffset flushLag
WalSndState state
XLogRecPtr write
TimeOffset applyLag
int sync_standby_priority
bool needreload
Latch * latch
TimestampTz replyTime
XLogRecPtr apply
TimestampTz time
Definition: walsender.c:204
XLogRecPtr lsn
Definition: walsender.c:203
WALSegmentContext segcxt
Definition: xlogreader.h:271
XLogRecPtr EndRecPtr
Definition: xlogreader.h:207
WALOpenSegment seg
Definition: xlogreader.h:272
Definition: regguts.h:318
void SyncRepInitConfig(void)
Definition: syncrep.c:411
SyncRepConfigData * SyncRepConfig
Definition: syncrep.c:97
int SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys)
Definition: syncrep.c:725
void SyncRepReleaseWaiters(void)
Definition: syncrep.c:440
#define SYNC_REP_PRIORITY
Definition: syncrep.h:36
#define NUM_SYNC_REP_WAIT_MODE
Definition: syncrep.h:28
#define SyncRepRequested()
Definition: syncrep.h:19
#define SYNC_REP_WAIT_WRITE
Definition: syncrep.h:24
#define SYNC_REP_WAIT_FLUSH
Definition: syncrep.h:25
#define SYNC_REP_WAIT_APPLY
Definition: syncrep.h:26
void InitializeTimeouts(void)
Definition: timeout.c:474
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
Definition: transam.c:319
#define InvalidTransactionId
Definition: transam.h:31
#define EpochFromFullTransactionId(x)
Definition: transam.h:47
#define XidFromFullTransactionId(x)
Definition: transam.h:48
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
TupleDesc CreateTemplateTupleDesc(int natts)
Definition: tupdesc.c:45
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:657
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, Datum *values, bool *isnull)
Definition: tuplestore.c:750
char data[BLCKSZ]
Definition: c.h:1138
#define TimestampTzGetDatum(X)
Definition: timestamp.h:32
#define IntervalPGetDatum(X)
Definition: timestamp.h:33
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
FullTransactionId ReadNextFullTransactionId(void)
Definition: varsup.c:261
@ WAIT_EVENT_WAL_SENDER_MAIN
Definition: wait_event.h:48
@ WAIT_EVENT_WALSENDER_TIMELINE_HISTORY_READ
Definition: wait_event.h:221
@ WAIT_EVENT_WAL_SENDER_WRITE_DATA
Definition: wait_event.h:69
@ WAIT_EVENT_WAL_SENDER_WAIT_WAL
Definition: wait_event.h:68
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: wait_event.h:266
static void pgstat_report_wait_end(void)
Definition: wait_event.h:282
XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
void WalSndWakeup(void)
Definition: walsender.c:3309
static void ProcessPendingWrites(void)
Definition: walsender.c:1426
static XLogRecPtr sentPtr
Definition: walsender.c:155
#define READ_REPLICATION_SLOT_COLS
static Interval * offset_to_interval(TimeOffset offset)
Definition: walsender.c:3456
static void WalSndWait(uint32 socket_events, long timeout, uint32 wait_event)
Definition: walsender.c:3337
static void WalSndLastCycleHandler(SIGNAL_ARGS)
Definition: walsender.c:3234
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:187
static void WalSndCheckTimeOut(void)
Definition: walsender.c:2429
static void XLogSendPhysical(void)
Definition: walsender.c:2738
static void ProcessRepliesIfAny(void)
Definition: walsender.c:1900
static bool waiting_for_ping_response
Definition: walsender.c:172
static void SendTimeLineHistory(TimeLineHistoryCmd *cmd)
Definition: walsender.c:580
void WalSndErrorCleanup(void)
Definition: walsender.c:315
static void InitWalSenderSlot(void)
Definition: walsender.c:2582
WalSnd * MyWalSnd
Definition: walsender.c:113
static void ProcessStandbyHSFeedbackMessage(void)
Definition: walsender.c:2261
static void ReadReplicationSlot(ReadReplicationSlotCmd *cmd)
Definition: walsender.c:479
static StringInfoData tmpbuf
Definition: walsender.c:160
static void PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
Definition: walsender.c:2181
static LagTracker * lag_tracker
Definition: walsender.c:220
static void PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: walsender.c:2044
static void IdentifySystem(void)
Definition: walsender.c:395
static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p)
Definition: walsender.c:2660
static StringInfoData reply_message
Definition: walsender.c:159
static void WalSndKeepaliveIfNecessary(void)
Definition: walsender.c:3667
static void parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action, bool *two_phase)
Definition: walsender.c:980
bool am_walsender
Definition: walsender.c:116
void WalSndSetState(WalSndState state)
Definition: walsender.c:3418
static StringInfoData output_message
Definition: walsender.c:158
static TimeLineID sendTimeLine
Definition: walsender.c:146
static void WalSndLoop(WalSndSendDataCallback send_data)
Definition: walsender.c:2456
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1384
static LogicalDecodingContext * logical_decoding_ctx
Definition: walsender.c:198
static void XLogSendLogical(void)
Definition: walsender.c:3038
void WalSndShmemInit(void)
Definition: walsender.c:3277
bool am_db_walsender
Definition: walsender.c:119
static volatile sig_atomic_t replication_active
Definition: walsender.c:196
bool wake_wal_senders
Definition: walsender.c:131
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:188
int max_wal_senders
Definition: walsender.c:122
static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch)
Definition: walsender.c:2230
static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool skipped_xact)
Definition: walsender.c:1480
bool exec_replication_command(const char *cmd_string)
Definition: walsender.c:1682
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS
#define PG_STAT_GET_WAL_SENDERS_COLS
void(* WalSndSendDataCallback)(void)
Definition: walsender.c:226
Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
Definition: walsender.c:3472
void WalSndInitStopping(void)
Definition: walsender.c:3354
void WalSndWaitStopping(void)
Definition: walsender.c:3380
static bool sendTimeLineIsHistoric
Definition: walsender.c:148
void WalSndResourceCleanup(bool isCommit)
Definition: walsender.c:350
void WalSndRqstFileReload(void)
Definition: walsender.c:3189
static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc)
Definition: walsender.c:1548
bool am_cascading_walsender
Definition: walsender.c:117
static TimestampTz last_processing
Definition: walsender.c:163
Size WalSndShmemSize(void)
Definition: walsender.c:3265
bool log_replication_commands
Definition: walsender.c:126
void HandleWalSndInitStopping(void)
Definition: walsender.c:3212
static TimeLineID sendTimeLineNextTLI
Definition: walsender.c:147
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Definition: walsender.c:1047
static int logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
Definition: walsender.c:919
static XLogRecPtr GetStandbyFlushRecPtr(TimeLineID *tli)
Definition: walsender.c:3159
static void ProcessStandbyReplyMessage(void)
Definition: walsender.c:2076
static void WalSndKeepalive(bool requestReply, XLogRecPtr writePtr)
Definition: walsender.c:3644
static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)
Definition: walsender.c:3705
void WalSndSignals(void)
Definition: walsender.c:3246
static bool streamingDoneSending
Definition: walsender.c:180
static void StartLogicalReplication(StartReplicationCmd *cmd)
Definition: walsender.c:1257
static void WalSndShutdown(void)
Definition: walsender.c:230
static void WalSndKill(int code, Datum arg)
Definition: walsender.c:2642
int wal_sender_timeout
Definition: walsender.c:124
#define MAX_SEND_SIZE
Definition: walsender.c:107
static bool WalSndCaughtUp
Definition: walsender.c:184
static XLogRecPtr sendTimeLineValidUpto
Definition: walsender.c:149
static void ProcessStandbyMessage(void)
Definition: walsender.c:2013
static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1357
static void DropReplicationSlot(DropReplicationSlotCmd *cmd)
Definition: walsender.c:1247
#define LAG_TRACKER_BUFFER_SIZE
Definition: walsender.c:208
static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
Definition: walsender.c:3770
static long WalSndComputeSleeptime(TimestampTz now)
Definition: walsender.c:2385
static bool streamingDoneReceiving
Definition: walsender.c:181
static void StartReplication(StartReplicationCmd *cmd)
Definition: walsender.c:686
static void WalSndDone(WalSndSendDataCallback send_data)
Definition: walsender.c:3119
static const char * WalSndGetStateString(WalSndState state)
Definition: walsender.c:3437
static XLogReaderState * xlogreader
Definition: walsender.c:138
static TimestampTz last_reply_timestamp
Definition: walsender.c:169
WalSndCtlData * WalSndCtl
Definition: walsender.c:110
void InitWalSender(void)
CRSSnapshotAction
Definition: walsender.h:21
@ CRS_USE_SNAPSHOT
Definition: walsender.h:24
@ CRS_NOEXPORT_SNAPSHOT
Definition: walsender.h:23
@ CRS_EXPORT_SNAPSHOT
Definition: walsender.h:22
PGDLLIMPORT Node * replication_parse_result
void replication_scanner_init(const char *query_string)
WalSndState
@ WALSNDSTATE_STREAMING
@ WALSNDSTATE_BACKUP
@ WALSNDSTATE_CATCHUP
@ WALSNDSTATE_STARTUP
@ WALSNDSTATE_STOPPING
void replication_scanner_finish(void)
int replication_yyparse(void)
bool replication_scanner_is_replication_command(void)
#define SIGCHLD
Definition: win32_port.h:177
#define SIGHUP
Definition: win32_port.h:167
#define SIG_DFL
Definition: win32_port.h:162
#define SIGPIPE
Definition: win32_port.h:172
#define kill(pid, sig)
Definition: win32_port.h:464
#define SIGUSR1
Definition: win32_port.h:179
#define SIGUSR2
Definition: win32_port.h:180
#define SIG_IGN
Definition: win32_port.h:164
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4784
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
Definition: xact.c:3462
void StartTransactionCommand(void)
Definition: xact.c:2925
bool IsAbortedTransactionBlockState(void)
Definition: xact.c:394
int XactIsoLevel
Definition: xact.c:78
bool IsSubTransaction(void)
Definition: xact.c:4839
bool IsTransactionBlock(void)
Definition: xact.c:4766
void CommitTransactionCommand(void)
Definition: xact.c:3022
#define XACT_REPEATABLE_READ
Definition: xact.h:38
uint64 GetSystemIdentifier(void)
Definition: xlog.c:4206
bool RecoveryInProgress(void)
Definition: xlog.c:5753
TimeLineID GetWALInsertionTimeLine(void)
Definition: xlog.c:5941
void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli)
Definition: xlog.c:3454
int wal_segment_size
Definition: xlog.c:144
XLogRecPtr GetFlushRecPtr(TimeLineID *insertTLI)
Definition: xlog.c:5918
bool XLogBackgroundFlush(void)
Definition: xlog.c:2699
#define TLHistoryFileName(fname, tli)
#define XLogFileName(fname, tli, logSegNo, wal_segsz_bytes)
#define MAXFNAMELEN
#define XLogFilePath(path, tli, logSegNo, wal_segsz_bytes)
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
#define TLHistoryFilePath(path, tli)
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
uint32 TimeLineID
Definition: xlogdefs.h:59
uint64 XLogSegNo
Definition: xlogdefs.h:48
XLogRecord * XLogReadRecord(XLogReaderState *state, char **errormsg)
Definition: xlogreader.c:418
bool WALRead(XLogReaderState *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli, WALReadError *errinfo)
Definition: xlogreader.c:1460
XLogReaderState * XLogReaderAllocate(int wal_segment_size, const char *waldir, XLogReaderRoutine *routine, void *private_data)
Definition: xlogreader.c:108
void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
Definition: xlogreader.c:264
#define XL_ROUTINE(...)
Definition: xlogreader.h:117
static TimeLineID receiveTLI
Definition: xlogrecovery.c:260
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
void wal_segment_close(XLogReaderState *state)
Definition: xlogutils.c:856
void XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength, TimeLineID currTLI)
Definition: xlogutils.c:732
void WALReadRaiseError(WALReadError *errinfo)
Definition: xlogutils.c:1039