PostgreSQL Source Code  git master
walsender.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * walsender.c
4  *
5  * The WAL sender process (walsender) is new as of Postgres 9.0. It takes
6  * care of sending XLOG from the primary server to a single recipient.
7  * (Note that there can be more than one walsender process concurrently.)
8  * It is started by the postmaster when the walreceiver of a standby server
9  * connects to the primary server and requests XLOG streaming replication.
10  *
11  * A walsender is similar to a regular backend, i.e. there is a one-to-one
12  * relationship between a connection and a walsender process, but instead
13  * of processing SQL queries, it understands a small set of special
14  * replication-mode commands. The START_REPLICATION command begins streaming
15  * WAL to the client. While streaming, the walsender keeps reading XLOG
16  * records from the disk and sends them to the standby server over the
17  * COPY protocol, until either side ends the replication by exiting COPY
18  * mode (or until the connection is closed).
19  *
20  * Normal termination is by SIGTERM, which instructs the walsender to
21  * close the connection and exit(0) at the next convenient moment. Emergency
22  * termination is by SIGQUIT; like any backend, the walsender will simply
23  * abort and exit on SIGQUIT. A close of the connection and a FATAL error
24  * are treated as not a crash but approximately normal termination;
25  * the walsender will exit quickly without sending any more XLOG records.
26  *
27  * If the server is shut down, checkpointer sends us
28  * PROCSIG_WALSND_INIT_STOPPING after all regular backends have exited. If
29  * the backend is idle or runs an SQL query this causes the backend to
30  * shut down; if logical replication is in progress, all existing WAL records
31  * are processed followed by a shutdown. Otherwise this causes the walsender
32  * to switch to the "stopping" state. In this state, the walsender will reject
33  * any further replication commands. The checkpointer begins the shutdown
34  * checkpoint once all walsenders are confirmed as stopping. When the shutdown
35  * checkpoint finishes, the postmaster sends us SIGUSR2. This instructs
36  * walsender to send any outstanding WAL, including the shutdown checkpoint
37  * record, wait for it to be replicated to the standby, and then exit.
38  *
39  *
40  * Portions Copyright (c) 2010-2020, PostgreSQL Global Development Group
41  *
42  * IDENTIFICATION
43  * src/backend/replication/walsender.c
44  *
45  *-------------------------------------------------------------------------
46  */
47 #include "postgres.h"
48 
49 #include <signal.h>
50 #include <unistd.h>
51 
52 #include "access/printtup.h"
53 #include "access/timeline.h"
54 #include "access/transam.h"
55 #include "access/xact.h"
56 #include "access/xlog_internal.h"
57 #include "access/xlogreader.h"
58 #include "access/xlogutils.h"
59 #include "catalog/pg_authid.h"
60 #include "catalog/pg_type.h"
61 #include "commands/dbcommands.h"
62 #include "commands/defrem.h"
63 #include "funcapi.h"
64 #include "libpq/libpq.h"
65 #include "libpq/pqformat.h"
66 #include "miscadmin.h"
67 #include "nodes/replnodes.h"
68 #include "pgstat.h"
69 #include "postmaster/interrupt.h"
70 #include "replication/basebackup.h"
71 #include "replication/decode.h"
72 #include "replication/logical.h"
73 #include "replication/slot.h"
74 #include "replication/snapbuild.h"
75 #include "replication/syncrep.h"
77 #include "replication/walsender.h"
80 #include "storage/fd.h"
81 #include "storage/ipc.h"
82 #include "storage/pmsignal.h"
83 #include "storage/proc.h"
84 #include "storage/procarray.h"
85 #include "tcop/dest.h"
86 #include "tcop/tcopprot.h"
87 #include "utils/acl.h"
88 #include "utils/builtins.h"
89 #include "utils/guc.h"
90 #include "utils/memutils.h"
91 #include "utils/pg_lsn.h"
92 #include "utils/portal.h"
93 #include "utils/ps_status.h"
94 #include "utils/timeout.h"
95 #include "utils/timestamp.h"
96 
97 /*
98  * Maximum data payload in a WAL data message. Must be >= XLOG_BLCKSZ.
99  *
100  * We don't have a good idea of what a good value would be; there's some
101  * overhead per message in both walsender and walreceiver, but on the other
102  * hand sending large batches makes walsender less responsive to signals
103  * because signals are checked only between messages. 128kB (with
104  * default 8k blocks) seems like a reasonable guess for now.
105  */
106 #define MAX_SEND_SIZE (XLOG_BLCKSZ * 16)
107 
108 /* Array of WalSnds in shared memory */
110 
111 /* My slot in the shared memory array */
112 WalSnd *MyWalSnd = NULL;
113 
114 /* Global state */
115 bool am_walsender = false; /* Am I a walsender process? */
116 bool am_cascading_walsender = false; /* Am I cascading WAL to another
117  * standby? */
118 bool am_db_walsender = false; /* Connected to a database? */
119 
120 /* User-settable parameters for walsender */
121 int max_wal_senders = 0; /* the maximum number of concurrent
122  * walsenders */
123 int wal_sender_timeout = 60 * 1000; /* maximum time to send one WAL
124  * data message */
126 
127 /*
128  * State for WalSndWakeupRequest
129  */
130 bool wake_wal_senders = false;
131 
132 /*
133  * xlogreader used for replication. Note that a WAL sender doing physical
134  * replication does not need xlogreader to read WAL, but it needs one to
135  * keep a state of its work.
136  */
138 
139 /*
140  * These variables keep track of the state of the timeline we're currently
141  * sending. sendTimeLine identifies the timeline. If sendTimeLineIsHistoric,
142  * the timeline is not the latest timeline on this server, and the server's
143  * history forked off from that timeline at sendTimeLineValidUpto.
144  */
147 static bool sendTimeLineIsHistoric = false;
149 
150 /*
151  * How far have we sent WAL already? This is also advertised in
152  * MyWalSnd->sentPtr. (Actually, this is the next WAL location to send.)
153  */
155 
156 /* Buffers for constructing outgoing messages and processing reply messages. */
160 
161 /* Timestamp of last ProcessRepliesIfAny(). */
163 
164 /*
165  * Timestamp of last ProcessRepliesIfAny() that saw a reply from the
166  * standby. Set to 0 if wal_sender_timeout doesn't need to be active.
167  */
169 
170 /* Have we sent a heartbeat message asking for reply, since last reply? */
171 static bool waiting_for_ping_response = false;
172 
173 /*
174  * While streaming WAL in Copy mode, streamingDoneSending is set to true
175  * after we have sent CopyDone. We should not send any more CopyData messages
176  * after that. streamingDoneReceiving is set to true when we receive CopyDone
177  * from the other end. When both become true, it's time to exit Copy mode.
178  */
181 
182 /* Are we there yet? */
183 static bool WalSndCaughtUp = false;
184 
185 /* Flags set by signal handlers for later service in main loop */
186 static volatile sig_atomic_t got_SIGUSR2 = false;
187 static volatile sig_atomic_t got_STOPPING = false;
188 
189 /*
190  * This is set while we are streaming. When not set
191  * PROCSIG_WALSND_INIT_STOPPING signal will be handled like SIGTERM. When set,
192  * the main loop is responsible for checking got_STOPPING and terminating when
193  * it's set (after streaming any remaining WAL).
194  */
195 static volatile sig_atomic_t replication_active = false;
196 
198 
199 /* A sample associating a WAL location with the time it was written. */
200 typedef struct
201 {
204 } WalTimeSample;
205 
206 /* The size of our buffer of time samples. */
207 #define LAG_TRACKER_BUFFER_SIZE 8192
208 
209 /* A mechanism for tracking replication lag. */
210 typedef struct
211 {
215  int read_heads[NUM_SYNC_REP_WAIT_MODE];
217 } LagTracker;
218 
220 
221 /* Signal handlers */
223 
224 /* Prototypes for private functions */
225 typedef void (*WalSndSendDataCallback) (void);
226 static void WalSndLoop(WalSndSendDataCallback send_data);
227 static void InitWalSenderSlot(void);
228 static void WalSndKill(int code, Datum arg);
230 static void XLogSendPhysical(void);
231 static void XLogSendLogical(void);
232 static void WalSndDone(WalSndSendDataCallback send_data);
233 static XLogRecPtr GetStandbyFlushRecPtr(void);
234 static void IdentifySystem(void);
237 static void StartReplication(StartReplicationCmd *cmd);
239 static void ProcessStandbyMessage(void);
240 static void ProcessStandbyReplyMessage(void);
241 static void ProcessStandbyHSFeedbackMessage(void);
242 static void ProcessRepliesIfAny(void);
243 static void WalSndKeepalive(bool requestReply);
244 static void WalSndKeepaliveIfNecessary(void);
245 static void WalSndCheckTimeOut(void);
247 static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
248 static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
251 static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
252 static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
254 
255 static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo,
256  TimeLineID *tli_p);
257 
258 
259 /* Initialize walsender process before entering the main command loop */
260 void
261 InitWalSender(void)
262 {
264 
265  /* Create a per-walsender data structure in shared memory */
267 
268  /*
269  * We don't currently need any ResourceOwner in a walsender process, but
270  * if we did, we could call CreateAuxProcessResourceOwner here.
271  */
272 
273  /*
274  * Let postmaster know that we're a WAL sender. Once we've declared ourselves as
275  * a WAL sender process, postmaster will let us outlive the bgwriter and
276  * kill us last in the shutdown sequence, so we get a chance to stream all
277  * remaining WAL at shutdown, including the shutdown checkpoint. Note that
278  * there's no going back, and we mustn't write any WAL records after this.
279  */
282 
283  /* Initialize empty timestamp buffer for lag tracking. */
284  lag_tracker = MemoryContextAllocZero(TopMemoryContext, sizeof(LagTracker));
285 }
286 
287 /*
288  * Clean up after an error.
289  *
290  * WAL sender processes don't use transactions like regular backends do.
291  * This function does any cleanup required after an error in a WAL sender
292  * process, similar to what transaction abort does in a regular backend.
293  */
294 void
296 {
300 
301  if (xlogreader != NULL && xlogreader->seg.ws_file >= 0)
302  wal_segment_close(xlogreader);
303 
304  if (MyReplicationSlot != NULL)
306 
308 
309  replication_active = false;
310 
311  /*
312  * If there is a transaction in progress, it will clean up our
313  * ResourceOwner, but if a replication command set up a resource owner
314  * without a transaction, we've got to clean that up now.
315  */
317  WalSndResourceCleanup(false);
318 
319  if (got_STOPPING || got_SIGUSR2)
320  proc_exit(0);
321 
322  /* Revert back to startup state */
324 }
325 
326 /*
327  * Clean up any ResourceOwner we created.
328  */
329 void
330 WalSndResourceCleanup(bool isCommit)
331 {
332  ResourceOwner resowner;
333 
334  if (CurrentResourceOwner == NULL)
335  return;
336 
337  /*
338  * Deleting CurrentResourceOwner is not allowed, so we must save a pointer
339  * in a local variable and clear it first.
340  */
341  resowner = CurrentResourceOwner;
342  CurrentResourceOwner = NULL;
343 
344  /* Now we can release resources and delete it. */
345  ResourceOwnerRelease(resowner,
346  RESOURCE_RELEASE_BEFORE_LOCKS, isCommit, true);
347  ResourceOwnerRelease(resowner,
348  RESOURCE_RELEASE_LOCKS, isCommit, true);
349  ResourceOwnerRelease(resowner,
350  RESOURCE_RELEASE_AFTER_LOCKS, isCommit, true);
351  ResourceOwnerDelete(resowner);
352 }
353 
354 /*
355  * Handle a client's connection abort in an orderly manner.
356  */
357 static void
358 WalSndShutdown(void)
359 {
360  /*
361  * Reset whereToSendOutput to prevent ereport from attempting to send any
362  * more messages to the standby.
363  */
366 
367  proc_exit(0);
368  abort(); /* keep the compiler quiet */
369 }
370 
371 /*
372  * Handle the IDENTIFY_SYSTEM command.
373  */
374 static void
376 {
377  char sysid[32];
378  char xloc[MAXFNAMELEN];
379  XLogRecPtr logptr;
380  char *dbname = NULL;
382  TupOutputState *tstate;
383  TupleDesc tupdesc;
384  Datum values[4];
385  bool nulls[4];
386 
387  /*
388  * Reply with a result set with one row, four columns. First col is system
389  * ID, second is timeline ID, third is current xlog location and the
390  * fourth contains the database name if we are connected to one.
391  */
392 
393  snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
395 
398  {
399  /* this also updates ThisTimeLineID */
400  logptr = GetStandbyFlushRecPtr();
401  }
402  else
403  logptr = GetFlushRecPtr();
404 
405  snprintf(xloc, sizeof(xloc), "%X/%X", (uint32) (logptr >> 32), (uint32) logptr);
406 
407  if (MyDatabaseId != InvalidOid)
408  {
410 
411  /* syscache access needs a transaction env. */
413  /* make dbname live outside TX context */
417  /* CommitTransactionCommand switches to TopMemoryContext */
419  }
420 
422  MemSet(nulls, false, sizeof(nulls));
423 
424  /* need a tuple descriptor representing four columns */
425  tupdesc = CreateTemplateTupleDesc(4);
426  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "systemid",
427  TEXTOID, -1, 0);
428  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "timeline",
429  INT4OID, -1, 0);
430  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "xlogpos",
431  TEXTOID, -1, 0);
432  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "dbname",
433  TEXTOID, -1, 0);
434 
435  /* prepare for projection of tuples */
436  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
437 
438  /* column 1: system identifier */
439  values[0] = CStringGetTextDatum(sysid);
440 
441  /* column 2: timeline */
442  values[1] = Int32GetDatum(ThisTimeLineID);
443 
444  /* column 3: wal location */
445  values[2] = CStringGetTextDatum(xloc);
446 
447  /* column 4: database name, or NULL if none */
448  if (dbname)
449  values[3] = CStringGetTextDatum(dbname);
450  else
451  nulls[3] = true;
452 
453  /* send it to dest */
454  do_tup_output(tstate, values, nulls);
455 
456  end_tup_output(tstate);
457 }
458 
459 
460 /*
461  * Handle TIMELINE_HISTORY command.
462  */
463 static void
465 {
467  char histfname[MAXFNAMELEN];
468  char path[MAXPGPATH];
469  int fd;
470  off_t histfilelen;
471  off_t bytesleft;
472  Size len;
473 
474  /*
475  * Reply with a result set with one row, and two columns. The first col is
476  * the name of the history file, 2nd is the contents.
477  */
478 
479  TLHistoryFileName(histfname, cmd->timeline);
480  TLHistoryFilePath(path, cmd->timeline);
481 
482  /* Send a RowDescription message */
483  pq_beginmessage(&buf, 'T');
484  pq_sendint16(&buf, 2); /* 2 fields */
485 
486  /* first field */
487  pq_sendstring(&buf, "filename"); /* col name */
488  pq_sendint32(&buf, 0); /* table oid */
489  pq_sendint16(&buf, 0); /* attnum */
490  pq_sendint32(&buf, TEXTOID); /* type oid */
491  pq_sendint16(&buf, -1); /* typlen */
492  pq_sendint32(&buf, 0); /* typmod */
493  pq_sendint16(&buf, 0); /* format code */
494 
495  /* second field */
496  pq_sendstring(&buf, "content"); /* col name */
497  pq_sendint32(&buf, 0); /* table oid */
498  pq_sendint16(&buf, 0); /* attnum */
499  pq_sendint32(&buf, BYTEAOID); /* type oid */
500  pq_sendint16(&buf, -1); /* typlen */
501  pq_sendint32(&buf, 0); /* typmod */
502  pq_sendint16(&buf, 0); /* format code */
503  pq_endmessage(&buf);
504 
505  /* Send a DataRow message */
506  pq_beginmessage(&buf, 'D');
507  pq_sendint16(&buf, 2); /* # of columns */
508  len = strlen(histfname);
509  pq_sendint32(&buf, len); /* col1 len */
510  pq_sendbytes(&buf, histfname, len);
511 
512  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
513  if (fd < 0)
514  ereport(ERROR,
516  errmsg("could not open file \"%s\": %m", path)));
517 
518  /* Determine file length and send it to client */
519  histfilelen = lseek(fd, 0, SEEK_END);
520  if (histfilelen < 0)
521  ereport(ERROR,
523  errmsg("could not seek to end of file \"%s\": %m", path)));
524  if (lseek(fd, 0, SEEK_SET) != 0)
525  ereport(ERROR,
527  errmsg("could not seek to beginning of file \"%s\": %m", path)));
528 
529  pq_sendint32(&buf, histfilelen); /* col2 len */
530 
531  bytesleft = histfilelen;
532  while (bytesleft > 0)
533  {
534  PGAlignedBlock rbuf;
535  int nread;
536 
538  nread = read(fd, rbuf.data, sizeof(rbuf));
540  if (nread < 0)
541  ereport(ERROR,
543  errmsg("could not read file \"%s\": %m",
544  path)));
545  else if (nread == 0)
546  ereport(ERROR,
548  errmsg("could not read file \"%s\": read %d of %zu",
549  path, nread, (Size) bytesleft)));
550 
551  pq_sendbytes(&buf, rbuf.data, nread);
552  bytesleft -= nread;
553  }
554 
555  if (CloseTransientFile(fd) != 0)
556  ereport(ERROR,
558  errmsg("could not close file \"%s\": %m", path)));
559 
560  pq_endmessage(&buf);
561 }
562 
563 /*
564  * Handle START_REPLICATION command.
565  *
566  * At the moment, this never returns, but an ereport(ERROR) will take us back
567  * to the main loop.
568  */
569 static void
571 {
573  XLogRecPtr FlushPtr;
574 
575  if (ThisTimeLineID == 0)
576  ereport(ERROR,
577  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
578  errmsg("IDENTIFY_SYSTEM has not been run before START_REPLICATION")));
579 
580  /* create xlogreader for physical replication */
581  xlogreader =
583  XL_ROUTINE(.segment_open = WalSndSegmentOpen,
584  .segment_close = wal_segment_close),
585  NULL);
586 
587  if (!xlogreader)
588  ereport(ERROR,
589  (errcode(ERRCODE_OUT_OF_MEMORY),
590  errmsg("out of memory")));
591 
592  /*
593  * We assume here that we're logging enough information in the WAL for
594  * log-shipping, since this is checked in PostmasterMain().
595  *
596  * NOTE: wal_level can only change at shutdown, so in most cases it is
597  * difficult for there to be WAL data that we can still see that was
598  * written at wal_level='minimal'.
599  */
600 
601  if (cmd->slotname)
602  {
605  ereport(ERROR,
606  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
607  errmsg("cannot use a logical replication slot for physical replication")));
608 
609  /*
610  * We don't need to verify the slot's restart_lsn here; instead we
611  * rely on the caller requesting the starting point to use. If the
612  * WAL segment doesn't exist, we'll fail later.
613  */
614  }
615 
616  /*
617  * Select the timeline. If it was given explicitly by the client, use
618  * that. Otherwise use the timeline of the last replayed record, which is
619  * kept in ThisTimeLineID.
620  */
622  {
623  /* this also updates ThisTimeLineID */
624  FlushPtr = GetStandbyFlushRecPtr();
625  }
626  else
627  FlushPtr = GetFlushRecPtr();
628 
629  if (cmd->timeline != 0)
630  {
631  XLogRecPtr switchpoint;
632 
633  sendTimeLine = cmd->timeline;
635  {
636  sendTimeLineIsHistoric = false;
638  }
639  else
640  {
641  List *timeLineHistory;
642 
643  sendTimeLineIsHistoric = true;
644 
645  /*
646  * Check that the timeline the client requested exists, and the
647  * requested start location is on that timeline.
648  */
649  timeLineHistory = readTimeLineHistory(ThisTimeLineID);
650  switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
652  list_free_deep(timeLineHistory);
653 
654  /*
655  * Found the requested timeline in the history. Check that
656  * requested startpoint is on that timeline in our history.
657  *
658  * This is quite loose on purpose. We only check that we didn't
659  * fork off the requested timeline before the switchpoint. We
660  * don't check that we switched *to* it before the requested
661  * starting point. This is because the client can legitimately
662  * request to start replication from the beginning of the WAL
663  * segment that contains switchpoint, but on the new timeline, so
664  * that it doesn't end up with a partial segment. If you ask for
665  * too old a starting point, you'll get an error later when we
666  * fail to find the requested WAL segment in pg_wal.
667  *
668  * XXX: we could be more strict here and only allow a startpoint
669  * that's older than the switchpoint, if it's still in the same
670  * WAL segment.
671  */
672  if (!XLogRecPtrIsInvalid(switchpoint) &&
673  switchpoint < cmd->startpoint)
674  {
675  ereport(ERROR,
676  (errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
677  (uint32) (cmd->startpoint >> 32),
678  (uint32) (cmd->startpoint),
679  cmd->timeline),
680  errdetail("This server's history forked from timeline %u at %X/%X.",
681  cmd->timeline,
682  (uint32) (switchpoint >> 32),
683  (uint32) (switchpoint))));
684  }
685  sendTimeLineValidUpto = switchpoint;
686  }
687  }
688  else
689  {
692  sendTimeLineIsHistoric = false;
693  }
694 
696 
697  /* If there is nothing to stream, don't even enter COPY mode */
699  {
700  /*
701  * When we first start replication the standby will be behind the
702  * primary. For some applications, for example synchronous
703  * replication, it is important to have a clear state for this initial
704  * catchup mode, so we can trigger actions when we change streaming
705  * state later. We may stay in this state for a long time, which is
706  * exactly why we want to be able to monitor whether or not we are
707  * still here.
708  */
710 
711  /* Send a CopyBothResponse message, and start streaming */
712  pq_beginmessage(&buf, 'W');
713  pq_sendbyte(&buf, 0);
714  pq_sendint16(&buf, 0);
715  pq_endmessage(&buf);
716  pq_flush();
717 
718  /*
719  * Don't allow a request to stream from a future point in WAL that
720  * hasn't been flushed to disk in this server yet.
721  */
722  if (FlushPtr < cmd->startpoint)
723  {
724  ereport(ERROR,
725  (errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
726  (uint32) (cmd->startpoint >> 32),
727  (uint32) (cmd->startpoint),
728  (uint32) (FlushPtr >> 32),
729  (uint32) (FlushPtr))));
730  }
731 
732  /* Start streaming from the requested point */
733  sentPtr = cmd->startpoint;
734 
735  /* Initialize shared memory status, too */
736  SpinLockAcquire(&MyWalSnd->mutex);
737  MyWalSnd->sentPtr = sentPtr;
738  SpinLockRelease(&MyWalSnd->mutex);
739 
741 
742  /* Main loop of walsender */
743  replication_active = true;
744 
746 
747  replication_active = false;
748  if (got_STOPPING)
749  proc_exit(0);
751 
753  }
754 
755  if (cmd->slotname)
757 
758  /*
759  * Copy is finished now. Send a single-row result set indicating the next
760  * timeline.
761  */
763  {
764  char startpos_str[8 + 1 + 8 + 1];
766  TupOutputState *tstate;
767  TupleDesc tupdesc;
768  Datum values[2];
769  bool nulls[2];
770 
771  snprintf(startpos_str, sizeof(startpos_str), "%X/%X",
772  (uint32) (sendTimeLineValidUpto >> 32),
774 
776  MemSet(nulls, false, sizeof(nulls));
777 
778  /*
779  * Need a tuple descriptor representing two columns. int8 may seem
780  * like a surprising data type for this, but in theory int4 would not
781  * be wide enough for this, as TimeLineID is unsigned.
782  */
783  tupdesc = CreateTemplateTupleDesc(2);
784  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli",
785  INT8OID, -1, 0);
786  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos",
787  TEXTOID, -1, 0);
788 
789  /* prepare for projection of tuple */
790  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
791 
792  values[0] = Int64GetDatum((int64) sendTimeLineNextTLI);
793  values[1] = CStringGetTextDatum(startpos_str);
794 
795  /* send it to dest */
796  do_tup_output(tstate, values, nulls);
797 
798  end_tup_output(tstate);
799  }
800 
801  /* Send CommandComplete message */
802  EndReplicationCommand("START_STREAMING");
803 }
804 
805 /*
806  * XLogReaderRoutine->page_read callback for logical decoding contexts, as a
807  * walsender process.
808  *
809  * Inside the walsender we can do better than read_local_xlog_page,
810  * which has to do a plain sleep/busy loop, because the walsender's latch gets
811  * set every time WAL is flushed.
812  */
813 static int
815  XLogRecPtr targetRecPtr, char *cur_page)
816 {
817  XLogRecPtr flushptr;
818  int count;
819  WALReadError errinfo;
820  XLogSegNo segno;
821 
822  XLogReadDetermineTimeline(state, targetPagePtr, reqLen);
824  sendTimeLine = state->currTLI;
826  sendTimeLineNextTLI = state->nextTLI;
827 
828  /* make sure we have enough WAL available */
829  flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
830 
831  /* fail if not (implies we are going to shut down) */
832  if (flushptr < targetPagePtr + reqLen)
833  return -1;
834 
835  if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
836  count = XLOG_BLCKSZ; /* more than one block available */
837  else
838  count = flushptr - targetPagePtr; /* part of the page available */
839 
840  /* now actually read the data, we know it's there */
841  if (!WALRead(state,
842  cur_page,
843  targetPagePtr,
844  XLOG_BLCKSZ,
845  state->seg.ws_tli, /* Pass the current TLI because only
846  * WalSndSegmentOpen controls whether new
847  * TLI is needed. */
848  &errinfo))
849  WALReadRaiseError(&errinfo);
850 
851  /*
852  * After reading into the buffer, check that what we read was valid. We do
853  * this after reading, because even though the segment was present when we
854  * opened it, it might get recycled or removed while we read it. The
855  * read() succeeds in that case, but the data we tried to read might
856  * already have been overwritten with new WAL records.
857  */
858  XLByteToSeg(targetPagePtr, segno, state->segcxt.ws_segsize);
859  CheckXLogRemoved(segno, state->seg.ws_tli);
860 
861  return count;
862 }
863 
864 /*
865  * Process extra options given to CREATE_REPLICATION_SLOT.
866  */
867 static void
869  bool *reserve_wal,
870  CRSSnapshotAction *snapshot_action)
871 {
872  ListCell *lc;
873  bool snapshot_action_given = false;
874  bool reserve_wal_given = false;
875 
876  /* Parse options */
877  foreach(lc, cmd->options)
878  {
879  DefElem *defel = (DefElem *) lfirst(lc);
880 
881  if (strcmp(defel->defname, "export_snapshot") == 0)
882  {
883  if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
884  ereport(ERROR,
885  (errcode(ERRCODE_SYNTAX_ERROR),
886  errmsg("conflicting or redundant options")));
887 
888  snapshot_action_given = true;
889  *snapshot_action = defGetBoolean(defel) ? CRS_EXPORT_SNAPSHOT :
891  }
892  else if (strcmp(defel->defname, "use_snapshot") == 0)
893  {
894  if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
895  ereport(ERROR,
896  (errcode(ERRCODE_SYNTAX_ERROR),
897  errmsg("conflicting or redundant options")));
898 
899  snapshot_action_given = true;
900  *snapshot_action = CRS_USE_SNAPSHOT;
901  }
902  else if (strcmp(defel->defname, "reserve_wal") == 0)
903  {
904  if (reserve_wal_given || cmd->kind != REPLICATION_KIND_PHYSICAL)
905  ereport(ERROR,
906  (errcode(ERRCODE_SYNTAX_ERROR),
907  errmsg("conflicting or redundant options")));
908 
909  reserve_wal_given = true;
910  *reserve_wal = true;
911  }
912  else
913  elog(ERROR, "unrecognized option: %s", defel->defname);
914  }
915 }
916 
917 /*
918  * Create a new replication slot.
919  */
920 static void
922 {
923  const char *snapshot_name = NULL;
924  char xloc[MAXFNAMELEN];
925  char *slot_name;
926  bool reserve_wal = false;
927  CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT;
929  TupOutputState *tstate;
930  TupleDesc tupdesc;
931  Datum values[4];
932  bool nulls[4];
933 
935 
936  parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action);
937 
938  /* setup state for WalSndSegmentOpen */
939  sendTimeLineIsHistoric = false;
941 
942  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
943  {
944  ReplicationSlotCreate(cmd->slotname, false,
946  }
947  else
948  {
950 
951  /*
952  * Initially create persistent slot as ephemeral - that allows us to
953  * nicely handle errors during initialization because it'll get
954  * dropped if this transaction fails. We'll make it persistent at the
955  * end. Temporary slots can be created as temporary from the beginning as
956  * they get dropped on error as well.
957  */
958  ReplicationSlotCreate(cmd->slotname, true,
960  }
961 
962  if (cmd->kind == REPLICATION_KIND_LOGICAL)
963  {
965  bool need_full_snapshot = false;
966 
967  /*
968  * Do options check early so that we can bail before calling the
969  * DecodingContextFindStartpoint which can take a long time.
970  */
971  if (snapshot_action == CRS_EXPORT_SNAPSHOT)
972  {
973  if (IsTransactionBlock())
974  ereport(ERROR,
975  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
976  (errmsg("%s must not be called inside a transaction",
977  "CREATE_REPLICATION_SLOT ... EXPORT_SNAPSHOT")));
978 
979  need_full_snapshot = true;
980  }
981  else if (snapshot_action == CRS_USE_SNAPSHOT)
982  {
983  if (!IsTransactionBlock())
984  ereport(ERROR,
985  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
986  (errmsg("%s must be called inside a transaction",
987  "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
988 
990  ereport(ERROR,
991  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
992  (errmsg("%s must be called in REPEATABLE READ isolation mode transaction",
993  "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
994 
995  if (FirstSnapshotSet)
996  ereport(ERROR,
997  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
998  (errmsg("%s must be called before any query",
999  "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
1000 
1001  if (IsSubTransaction())
1002  ereport(ERROR,
1003  /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1004  (errmsg("%s must not be called in a subtransaction",
1005  "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
1006 
1007  need_full_snapshot = true;
1008  }
1009 
1010  ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
1012  XL_ROUTINE(.page_read = logical_read_xlog_page,
1013  .segment_open = WalSndSegmentOpen,
1014  .segment_close = wal_segment_close),
1017 
1018  /*
1019  * Signal that we don't need the timeout mechanism. We're just
1020  * creating the replication slot and don't yet accept feedback
1021  * messages or send keepalives. As we possibly need to wait for
1022  * further WAL the walsender would otherwise possibly be killed too
1023  * soon.
1024  */
1026 
1027  /* build initial snapshot, might take a while */
1029 
1030  /*
1031  * Export or use the snapshot if we've been asked to do so.
1032  *
1033  * NB. We will convert the snapbuild.c kind of snapshot to normal
1034  * snapshot when doing this.
1035  */
1036  if (snapshot_action == CRS_EXPORT_SNAPSHOT)
1037  {
1038  snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder);
1039  }
1040  else if (snapshot_action == CRS_USE_SNAPSHOT)
1041  {
1042  Snapshot snap;
1043 
1046  }
1047 
1048  /* don't need the decoding context anymore */
1049  FreeDecodingContext(ctx);
1050 
1051  if (!cmd->temporary)
1053  }
1054  else if (cmd->kind == REPLICATION_KIND_PHYSICAL && reserve_wal)
1055  {
1057 
1059 
1060  /* Write this slot to disk if it's a permanent one. */
1061  if (!cmd->temporary)
1063  }
1064 
1065  snprintf(xloc, sizeof(xloc), "%X/%X",
1068 
1070  MemSet(nulls, false, sizeof(nulls));
1071 
1072  /*----------
1073  * Need a tuple descriptor representing four columns:
1074  * - first field: the slot name
1075  * - second field: LSN at which we became consistent
1076  * - third field: exported snapshot's name
1077  * - fourth field: output plugin
1078  *----------
1079  */
1080  tupdesc = CreateTemplateTupleDesc(4);
1081  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_name",
1082  TEXTOID, -1, 0);
1083  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "consistent_point",
1084  TEXTOID, -1, 0);
1085  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "snapshot_name",
1086  TEXTOID, -1, 0);
1087  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "output_plugin",
1088  TEXTOID, -1, 0);
1089 
1090  /* prepare for projection of tuples */
1091  tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
1092 
1093  /* slot_name */
1094  slot_name = NameStr(MyReplicationSlot->data.name);
1095  values[0] = CStringGetTextDatum(slot_name);
1096 
1097  /* consistent wal location */
1098  values[1] = CStringGetTextDatum(xloc);
1099 
1100  /* snapshot name, or NULL if none */
1101  if (snapshot_name != NULL)
1102  values[2] = CStringGetTextDatum(snapshot_name);
1103  else
1104  nulls[2] = true;
1105 
1106  /* plugin, or NULL if none */
1107  if (cmd->plugin != NULL)
1108  values[3] = CStringGetTextDatum(cmd->plugin);
1109  else
1110  nulls[3] = true;
1111 
1112  /* send it to dest */
1113  do_tup_output(tstate, values, nulls);
1114  end_tup_output(tstate);
1115 
1117 }
1118 
1119 /*
1120  * Get rid of a replication slot that is no longer wanted.
      *
      * Drops the slot named by cmd->slotname through ReplicationSlotDrop(),
      * passing the negation of cmd->wait as the second argument.
1121  */
1122 static void
1124 {
      /* presumably the second argument means "do not wait" -- TODO confirm against ReplicationSlotDrop() */
1125  ReplicationSlotDrop(cmd->slotname, !cmd->wait);
1126 }
1127 
1128 /*
1129  * Load previously initiated logical slot and prepare for sending data (via
1130  * WalSndLoop).
      *
      * On the paths visible here the function either raises an ERROR, exits
      * the process with proc_exit(0) when got_STOPPING is set once streaming
      * has ended, or returns after sending a COPY command completion.
1131  */
1132 static void
1134 {
1136  QueryCompletion qc;
1137 
1138  /* make sure that our requirements are still fulfilled */
1140 
1142 
1144 
      /* per the errdetail text, this is the "slot was invalidated" failure */
1146  ereport(ERROR,
1147  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1148  errmsg("cannot read from logical replication slot \"%s\"",
1149  cmd->slotname),
1150  errdetail("This slot has been invalidated because it exceeded the maximum reserved size.")));
1151 
1152  /*
1153  * Force a disconnect, so that the decoding code doesn't need to care
1154  * about an eventual switch from running in recovery, to running in a
1155  * normal environment. Client code is expected to handle reconnects.
1156  */
1158  {
1159  ereport(LOG,
1160  (errmsg("terminating walsender process after promotion")));
      /* the flag is tested again after the main loop, where it leads to proc_exit(0) */
1161  got_STOPPING = true;
1162  }
1163 
1164  /*
1165  * Create our decoding context, making it start at the previously ack'ed
1166  * position.
1167  *
1168  * Do this before sending a CopyBothResponse message, so that any errors
1169  * are reported early.
1170  */
1171  logical_decoding_ctx =
1172  CreateDecodingContext(cmd->startpoint, cmd->options, false,
1173  XL_ROUTINE(.page_read = logical_read_xlog_page,
1174  .segment_open = WalSndSegmentOpen,
1175  .segment_close = wal_segment_close),
      /* keep a file-level handle on the reader owned by the decoding context */
1178  xlogreader = logical_decoding_ctx->reader;
1179 
1181 
1182  /* Send a CopyBothResponse message, and start streaming */
1183  pq_beginmessage(&buf, 'W');
1184  pq_sendbyte(&buf, 0);
1185  pq_sendint16(&buf, 0);
1186  pq_endmessage(&buf);
1187  pq_flush();
1188 
1189  /* Start reading WAL from the oldest required WAL. */
1190  XLogBeginRead(logical_decoding_ctx->reader,
1192 
1193  /*
1194  * Report the location after which we'll send out further commits as the
1195  * current sentPtr.
1196  */
1198 
1199  /* Also update the sent position status in shared memory */
1200  SpinLockAcquire(&MyWalSnd->mutex);
1202  SpinLockRelease(&MyWalSnd->mutex);
1203 
1204  replication_active = true;
1205 
1207 
1208  /* Main loop of walsender */
1210 
      /* streaming is over: release the decoding context before deciding how to finish */
1211  FreeDecodingContext(logical_decoding_ctx);
1213 
1214  replication_active = false;
      /* a stop request ends the whole process instead of returning to command mode */
1215  if (got_STOPPING)
1216  proc_exit(0);
1218 
1219  /* Get out of COPY mode (CommandComplete). */
1220  SetQueryCompletion(&qc, CMDTAG_COPY, 0);
1221  EndCommand(&qc, DestRemote, false);
1222 }
1223 
1224 /*
1225  * LogicalDecodingContext 'prepare_write' callback.
1226  *
1227  * Prepare a write into a StringInfo.
1228  *
1229  * Don't do anything lasting in here, it's quite possible that nothing will be done
1230  * with the data.
      *
      * The header placed in ctx->out is the byte 'w' followed by three 64-bit
      * fields: dataStart, walEnd and sendtime. Both LSN fields carry the same
      * value (lsn), and sendtime is written as 0 purely to reserve its space.
1231  */
1232 static void
1234 {
1235  /* can't have sync rep confused by sending the same LSN several times */
1236  if (!last_write)
1237  lsn = InvalidXLogRecPtr;
1238 
      /* reset the output buffer before the header is written into it */
1239  resetStringInfo(ctx->out);
1240 
1241  pq_sendbyte(ctx->out, 'w');
1242  pq_sendint64(ctx->out, lsn); /* dataStart */
1243  pq_sendint64(ctx->out, lsn); /* walEnd */
1244 
1245  /*
1246  * Fill out the sendtime later, just as it's done in XLogSendPhysical, but
1247  * reserve space here.
1248  */
1249  pq_sendint64(ctx->out, 0); /* sendtime */
1250 }
1251 
1252 /*
1253  * LogicalDecodingContext 'write' callback.
1254  *
1255  * Actually write out data previously prepared by WalSndPrepareWrite out to
1256  * the network. Take as long as needed, but process replies from the other
1257  * side and check timeouts during that.
      *
      * Returns early (fast path) when nothing is left pending after the first
      * flush attempt and the timeout is not close; otherwise loops until the
      * send buffer has drained, then sets the process latch.
1258  */
1259 static void
1261  bool last_write)
1262 {
1263  TimestampTz now;
1264 
1265  /*
1266  * Fill the send timestamp last, so that it is taken as late as possible.
1267  * This is somewhat ugly, but the protocol is set as it's already used for
1268  * several releases by streaming physical replication.
1269  */
1270  resetStringInfo(&tmpbuf);
1271  now = GetCurrentTimestamp();
1272  pq_sendint64(&tmpbuf, now);
      /*
       * The destination offset skips one byte plus two int64 fields, i.e. the
       * message-type byte and the two LSN fields that precede the reserved
       * sendtime slot in the prepared header.
       */
1273  memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
1274  tmpbuf.data, sizeof(int64));
1275 
1276  /* output previously gathered data in a CopyData packet */
1277  pq_putmessage_noblock('d', ctx->out->data, ctx->out->len);
1278 
1280 
1281  /* Try to flush pending output to the client */
1282  if (pq_flush_if_writable() != 0)
1283  WalSndShutdown();
1284 
1285  /* Try taking fast path unless we get too close to walsender timeout. */
1287  wal_sender_timeout / 2) &&
1288  !pq_is_send_pending())
1289  {
1290  return;
1291  }
1292 
1293  /* If we have pending write here, go to slow path */
1294  for (;;)
1295  {
1296  int wakeEvents;
1297  long sleeptime;
1298 
1299  /* Check for input from the client */
1301 
1302  /* die if timeout was reached */
1304 
1305  /* Send keepalive if the time has come */
1307 
      /* the only exit from this loop: the output buffer has been fully sent */
1308  if (!pq_is_send_pending())
1309  break;
1310 
1312 
1313  wakeEvents = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH |
1315 
1316  /* Sleep until something happens or we time out */
1317  (void) WaitLatchOrSocket(MyLatch, wakeEvents,
1318  MyProcPort->sock, sleeptime,
1320 
1321  /* Clear any already-pending wakeups */
1323 
1325 
1326  /* Process any requests or signals received recently */
1327  if (ConfigReloadPending)
1328  {
1329  ConfigReloadPending = false;
1332  }
1333 
1334  /* Try to flush pending output to the client */
1335  if (pq_flush_if_writable() != 0)
1336  WalSndShutdown();
1337  }
1338 
1339  /* reactivate latch so WalSndLoop knows to continue */
1340  SetLatch(MyLatch);
1341 }
1342 
1343 /*
1344  * LogicalDecodingContext 'update_progress' callback.
1345  *
1346  * Write the current position to the lag tracker (see XLogSendPhysical).
      *
      * Writes are rate-limited: a sample is recorded only when the time since
      * the previously recorded one exceeds the interval defined below.
1347  */
1348 static void
1350 {
      /* static: time of the last recorded sample, kept across calls */
1351  static TimestampTz sendTime = 0;
1353 
1354  /*
1355  * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to
1356  * avoid flooding the lag tracker when we commit frequently.
1357  */
1358 #define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000
1359  if (!TimestampDifferenceExceeds(sendTime, now,
1361  return;
1362 
      /* record the sample and remember when it was taken */
1363  LagTrackerWrite(lsn, now);
1364  sendTime = now;
1365 }
1366 
1367 /*
1368  * Wait till WAL < loc is flushed to disk so it can be safely sent to client.
1369  *
1370  * Returns end LSN of flushed WAL. Normally this will be >= loc, but
1371  * if we detect a shutdown request (either from postmaster or client)
1372  * we will return early, so caller must always check.
      *
      * The returned value is RecentFlushPtr, which is refreshed from
      * GetFlushRecPtr() outside recovery and from GetXLogReplayRecPtr() while
      * in recovery.
1373  */
1374 static XLogRecPtr
1376 {
1377  int wakeEvents;
      /* static: the last known flush position survives between calls for the fast path */
1378  static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
1379 
1380  /*
1381  * Fast path to avoid acquiring the spinlock in case we already know we
1382  * have enough WAL available. This is particularly interesting if we're
1383  * far behind.
1384  */
1385  if (RecentFlushPtr != InvalidXLogRecPtr &&
1386  loc <= RecentFlushPtr)
1387  return RecentFlushPtr;
1388 
1389  /* Get a more recent flush pointer. */
1390  if (!RecoveryInProgress())
1391  RecentFlushPtr = GetFlushRecPtr();
1392  else
1393  RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1394 
1395  for (;;)
1396  {
1397  long sleeptime;
1398 
1399  /* Clear any already-pending wakeups */
1401 
1403 
1404  /* Process any requests or signals received recently */
1405  if (ConfigReloadPending)
1406  {
1407  ConfigReloadPending = false;
1410  }
1411 
1412  /* Check for input from the client */
1414 
1415  /*
1416  * If we're shutting down, trigger pending WAL to be written out,
1417  * otherwise we'd possibly end up waiting for WAL that never gets
1418  * written, because walwriter has shut down already.
1419  */
1420  if (got_STOPPING)
1422 
1423  /* Update our idea of the currently flushed position. */
1424  if (!RecoveryInProgress())
1425  RecentFlushPtr = GetFlushRecPtr();
1426  else
1427  RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1428 
1429  /*
1430  * If postmaster asked us to stop, don't wait anymore.
1431  *
1432  * It's important to do this check after the recomputation of
1433  * RecentFlushPtr, so we can send all remaining data before shutting
1434  * down.
1435  */
1436  if (got_STOPPING)
1437  break;
1438 
1439  /*
1440  * We only send regular messages to the client for full decoded
1441  * transactions, but synchronous replication and walsender shutdown
1442  * may be waiting for a later location. So, before sleeping, we
1443  * send a ping containing the flush location. If the receiver is
1444  * otherwise idle, this keepalive will trigger a reply. Processing the
1445  * reply will update these MyWalSnd locations.
1446  */
1447  if (MyWalSnd->flush < sentPtr &&
1448  MyWalSnd->write < sentPtr &&
1450  WalSndKeepalive(false);
1451 
1452  /* check whether we're done */
1453  if (loc <= RecentFlushPtr)
1454  break;
1455 
1456  /* Waiting for new WAL. Since we need to wait, we're now caught up. */
1457  WalSndCaughtUp = true;
1458 
1459  /*
1460  * Try to flush any pending output to the client.
1461  */
1462  if (pq_flush_if_writable() != 0)
1463  WalSndShutdown();
1464 
1465  /*
1466  * If we have received CopyDone from the client, sent CopyDone
1467  * ourselves, and the output buffer is empty, it's time to exit
1468  * streaming, so fail the current WAL fetch request.
1469  */
1471  !pq_is_send_pending())
1472  break;
1473 
1474  /* die if timeout was reached */
1476 
1477  /* Send keepalive if the time has come */
1479 
1480  /*
1481  * Sleep until something happens or we time out. Also wait for the
1482  * socket becoming writable, if there's still pending output.
1483  * Otherwise we might sit on sendable output data while waiting for
1484  * new WAL to be generated. (But if we have nothing to send, we don't
1485  * want to wake on socket-writable.)
1486  */
1488 
1489  wakeEvents = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH |
1491 
1492  if (pq_is_send_pending())
1493  wakeEvents |= WL_SOCKET_WRITEABLE;
1494 
1495  (void) WaitLatchOrSocket(MyLatch, wakeEvents,
1496  MyProcPort->sock, sleeptime,
1498  }
1499 
1500  /* reactivate latch so WalSndLoop knows to continue */
1501  SetLatch(MyLatch);
      /* may still be below loc if the loop was left because of a stop or CopyDone */
1502  return RecentFlushPtr;
1503 }
1504 
1505 /*
1506  * Execute an incoming replication command.
1507  *
1508  * Returns true if the cmd_string was recognized as WalSender command, false
1509  * if not.
      *
      * A false return happens only when the parsed node is an SQLCmd; in that
      * case the per-command memory context has already been deleted and the
      * caller is expected to execute the SQL itself. Parse failures, commands
      * issued in stopping mode and unknown node tags are reported with
      * ereport/elog at ERROR level.
1510  */
1511 bool
1512 exec_replication_command(const char *cmd_string)
1513 {
1514  int parse_rc;
1515  Node *cmd_node;
1516  const char *cmdtag;
1517  MemoryContext cmd_context;
1518  MemoryContext old_context;
1519 
1520  /*
1521  * If WAL sender has been told that shutdown is getting close, switch its
1522  * status accordingly to handle the next replication commands correctly.
1523  */
1524  if (got_STOPPING)
1526 
1527  /*
1528  * Throw error if in stopping mode. We need to prevent commands that could
1529  * generate WAL while the shutdown checkpoint is being written. To be
1530  * safe, we just prohibit all new commands.
1531  */
1532  if (MyWalSnd->state == WALSNDSTATE_STOPPING)
1533  ereport(ERROR,
1534  (errmsg("cannot execute new commands while WAL sender is in stopping mode")));
1535 
1536  /*
1537  * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
1538  * command arrives. Clean up the old stuff if there's anything.
1539  */
1541 
1543 
1544  /*
1545  * Parse the command.
1546  */
1548  "Replication command context",
      /* make the per-command context current; old_context is restored on every return path below */
1550  old_context = MemoryContextSwitchTo(cmd_context);
1551 
1552  replication_scanner_init(cmd_string);
1553  parse_rc = replication_yyparse();
1554  if (parse_rc != 0)
1555  ereport(ERROR,
1556  (errcode(ERRCODE_SYNTAX_ERROR),
1557  errmsg_internal("replication command parser returned %d",
1558  parse_rc)));
1560 
      /* the parser hands back its result through replication_parse_result */
1561  cmd_node = replication_parse_result;
1562 
1563  /*
1564  * If it's a SQL command, just clean up our mess and return false; the
1565  * caller will take care of executing it.
1566  */
1567  if (IsA(cmd_node, SQLCmd))
1568  {
1569  if (MyDatabaseId == InvalidOid)
1570  ereport(ERROR,
1571  (errmsg("cannot execute SQL commands in WAL sender for physical replication")));
1572 
1573  MemoryContextSwitchTo(old_context);
1574  MemoryContextDelete(cmd_context);
1575 
1576  /* Tell the caller that this wasn't a WalSender command. */
1577  return false;
1578  }
1579 
1580  /*
1581  * Report query to various monitoring facilities. For this purpose, we
1582  * report replication commands just like SQL commands.
1583  */
1584  debug_query_string = cmd_string;
1585 
1587 
1588  /*
1589  * Log replication command if log_replication_commands is enabled. Even
1590  * when it's disabled, log the command with DEBUG1 level for backward
1591  * compatibility.
1592  */
1594  (errmsg("received replication command: %s", cmd_string)));
1595 
1596  /*
1597  * Disallow replication commands in aborted transaction blocks.
1598  */
1600  ereport(ERROR,
1601  (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
1602  errmsg("current transaction is aborted, "
1603  "commands ignored until end of transaction block")));
1604 
1606 
1607  /*
1608  * Allocate buffers that will be used for each outgoing and incoming
1609  * message. We do this just once per command to reduce palloc overhead.
1610  */
1611  initStringInfo(&output_message);
1612  initStringInfo(&reply_message);
1613  initStringInfo(&tmpbuf);
1614 
      /*
       * Dispatch on the parse node type. Each arm sets cmdtag and the ps
       * display; every arm except START_REPLICATION ends with
       * EndReplicationCommand(cmdtag).
       */
1615  switch (cmd_node->type)
1616  {
1617  case T_IdentifySystemCmd:
1618  cmdtag = "IDENTIFY_SYSTEM";
1619  set_ps_display(cmdtag);
1620  IdentifySystem();
1621  EndReplicationCommand(cmdtag);
1622  break;
1623 
1624  case T_BaseBackupCmd:
1625  cmdtag = "BASE_BACKUP";
1626  set_ps_display(cmdtag);
1627  PreventInTransactionBlock(true, cmdtag);
1628  SendBaseBackup((BaseBackupCmd *) cmd_node);
1629  EndReplicationCommand(cmdtag);
1630  break;
1631 
1633  cmdtag = "CREATE_REPLICATION_SLOT";
1634  set_ps_display(cmdtag);
1636  EndReplicationCommand(cmdtag);
1637  break;
1638 
1640  cmdtag = "DROP_REPLICATION_SLOT";
1641  set_ps_display(cmdtag);
1643  EndReplicationCommand(cmdtag);
1644  break;
1645 
1646  case T_StartReplicationCmd:
1647  {
1648  StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
1649 
1650  cmdtag = "START_REPLICATION";
1651  set_ps_display(cmdtag);
1652  PreventInTransactionBlock(true, cmdtag);
1653 
      /* physical and logical streaming are handled by different routines */
1654  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
1655  StartReplication(cmd);
1656  else
1658 
1659  /* callees already sent their own completion message */
1660 
1661  Assert(xlogreader != NULL);
1662  break;
1663  }
1664 
1665  case T_TimeLineHistoryCmd:
1666  cmdtag = "TIMELINE_HISTORY";
1667  set_ps_display(cmdtag);
1668  PreventInTransactionBlock(true, cmdtag);
1670  EndReplicationCommand(cmdtag);
1671  break;
1672 
1673  case T_VariableShowStmt:
1674  {
1676  VariableShowStmt *n = (VariableShowStmt *) cmd_node;
1677 
1678  cmdtag = "SHOW";
1679  set_ps_display(cmdtag);
1680 
1681  /* syscache access needs a transaction environment */
1683  GetPGVariable(n->name, dest);
1685  EndReplicationCommand(cmdtag);
1686  }
1687  break;
1688 
1689  default:
1690  elog(ERROR, "unrecognized replication command node tag: %u",
1691  cmd_node->type);
1692  }
1693 
1694  /* done */
1695  MemoryContextSwitchTo(old_context);
1696  MemoryContextDelete(cmd_context);
1697 
1698  /*
1699  * We need not update ps display or pg_stat_activity, because PostgresMain
1700  * will reset those to "idle". But we must reset debug_query_string to
1701  * ensure it doesn't become a dangling pointer.
1702  */
1703  debug_query_string = NULL;
1704 
1705  return true;
1706 }
1707 
1708 /*
1709  * Process any incoming messages while streaming. Also checks if the remote
1710  * end has closed the connection.
      *
      * Reads messages until pq_getbyte_if_available() reports that nothing
      * more is available. If at least one 'd' or 'c' message was handled,
      * waiting_for_ping_response is cleared afterwards.
1711  */
1712 static void
1714 {
1715  unsigned char firstchar;
1716  int r;
      /* set once any 'd' (CopyData) or 'c' (CopyDone) message has been handled */
1717  bool received = false;
1718 
1720 
1721  for (;;)
1722  {
1723  pq_startmsgread();
1724  r = pq_getbyte_if_available(&firstchar);
1725  if (r < 0)
1726  {
1727  /* unexpected error or EOF */
1729  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1730  errmsg("unexpected EOF on standby connection")));
1731  proc_exit(0);
1732  }
1733  if (r == 0)
1734  {
1735  /* no data available without blocking */
1736  pq_endmsgread();
1737  break;
1738  }
1739 
1740  /* Read the message contents */
1741  resetStringInfo(&reply_message);
      /* a nonzero result is treated as EOF on the connection */
1742  if (pq_getmessage(&reply_message, 0))
1743  {
1745  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1746  errmsg("unexpected EOF on standby connection")));
1747  proc_exit(0);
1748  }
1749 
1750  /*
1751  * If we already received a CopyDone from the frontend, the frontend
1752  * should not send us anything until we've closed our end of the COPY.
1753  * XXX: In theory, the frontend could already send the next command
1754  * before receiving the CopyDone, but libpq doesn't currently allow
1755  * that.
1756  */
1757  if (streamingDoneReceiving && firstchar != 'X')
1758  ereport(FATAL,
1759  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1760  errmsg("unexpected standby message type \"%c\", after receiving CopyDone",
1761  firstchar)));
1762 
1763  /* Handle the very limited subset of commands expected in this phase */
1764  switch (firstchar)
1765  {
1766  /*
1767  * 'd' means a standby reply wrapped in a CopyData packet.
1768  */
1769  case 'd':
1771  received = true;
1772  break;
1773 
1774  /*
1775  * CopyDone means the standby requested to finish streaming.
1776  * Reply with CopyDone, if we had not sent that already.
1777  */
1778  case 'c':
1779  if (!streamingDoneSending)
1780  {
1781  pq_putmessage_noblock('c', NULL, 0);
1782  streamingDoneSending = true;
1783  }
1784 
1785  streamingDoneReceiving = true;
1786  received = true;
1787  break;
1788 
1789  /*
1790  * 'X' means that the standby is closing down the socket.
      * No break follows; presumably proc_exit() does not
      * return -- TODO confirm.
1791  */
1792  case 'X':
1793  proc_exit(0);
1794 
1795  default:
1796  ereport(FATAL,
1797  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1798  errmsg("invalid standby message type \"%c\"",
1799  firstchar)));
1800  }
1801  }
1802 
1803  /*
1804  * Save the last reply timestamp if we've received at least one reply.
1805  */
1806  if (received)
1807  {
1809  waiting_for_ping_response = false;
1810  }
1811 }
1812 
1813 /*
1814  * Process a status update message received from standby.
      *
      * Consumes the first byte of reply_message as the message type and
      * dispatches on it. Only 'r' and 'h' are accepted; any other type is
      * reported as a protocol violation and the process exits.
1815  */
1816 static void
1818 {
1819  char msgtype;
1820 
1821  /*
1822  * Check message type from the first byte.
1823  */
1824  msgtype = pq_getmsgbyte(&reply_message);
1825 
1826  switch (msgtype)
1827  {
      /* NOTE(review): presumably 'r' is the regular standby reply -- handler call not visible here */
1828  case 'r':
1830  break;
1831 
      /* NOTE(review): presumably 'h' is hot standby feedback -- handler call not visible here */
1832  case 'h':
1834  break;
1835 
1836  default:
1838  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1839  errmsg("unexpected message type \"%c\"", msgtype)));
1840  proc_exit(0);
1841  }
1842 }
1843 
1844 /*
1845  * Remember that a walreceiver just confirmed receipt of lsn `lsn`.
      *
      * Stores lsn as the slot's data.restart_lsn when it differs from the
      * stored value. lsn must not be InvalidXLogRecPtr (asserted).
1846  */
1847 static void
1849 {
      /* true when restart_lsn was actually modified under the lock */
1850  bool changed = false;
1852 
1853  Assert(lsn != InvalidXLogRecPtr);
      /* compare and update while holding the slot's spinlock */
1854  SpinLockAcquire(&slot->mutex);
1855  if (slot->data.restart_lsn != lsn)
1856  {
1857  changed = true;
1858  slot->data.restart_lsn = lsn;
1859  }
1860  SpinLockRelease(&slot->mutex);
1861 
      /* follow-up work for a changed value happens after the spinlock is released */
1862  if (changed)
1863  {
1866  }
1867 
1868  /*
1869  * One could argue that the slot should be saved to disk now, but that'd
1870  * be energy wasted - the worst that lost information can do here is give us
1871  * wrong information in a statistics view - we'll just potentially be more
1872  * conservative in removing files.
1873  */
1874 }
1875 
1876 /*
1877  * Regular reply from standby advising of WAL locations on standby server.
      *
      * Reads, in order, three 64-bit positions (write, flush, apply), a 64-bit
      * reply time and a one-byte "reply requested" flag from reply_message,
      * then publishes the positions, lag values and reply time in MyWalSnd.
1878  */
1879 static void
1881 {
1882  XLogRecPtr writePtr,
1883  flushPtr,
1884  applyPtr;
1885  bool replyRequested;
1886  TimeOffset writeLag,
1887  flushLag,
1888  applyLag;
1889  bool clearLagTimes;
1890  TimestampTz now;
1891  TimestampTz replyTime;
1892 
      /* static: whether the previous reply also had applyPtr == sentPtr */
1893  static bool fullyAppliedLastTime = false;
1894 
1895  /* the caller already consumed the msgtype byte */
1896  writePtr = pq_getmsgint64(&reply_message);
1897  flushPtr = pq_getmsgint64(&reply_message);
1898  applyPtr = pq_getmsgint64(&reply_message);
1899  replyTime = pq_getmsgint64(&reply_message);
1900  replyRequested = pq_getmsgbyte(&reply_message);
1901 
      /* skip building the debug string unless DEBUG2 output is enabled */
1902  if (log_min_messages <= DEBUG2)
1903  {
1904  char *replyTimeStr;
1905 
1906  /* Copy because timestamptz_to_str returns a static buffer */
1907  replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
1908 
1909  elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s reply_time %s",
1910  (uint32) (writePtr >> 32), (uint32) writePtr,
1911  (uint32) (flushPtr >> 32), (uint32) flushPtr,
1912  (uint32) (applyPtr >> 32), (uint32) applyPtr,
1913  replyRequested ? " (reply requested)" : "",
1914  replyTimeStr);
1915 
1916  pfree(replyTimeStr);
1917  }
1918 
1919  /* See if we can compute the round-trip lag for these positions. */
1920  now = GetCurrentTimestamp();
1921  writeLag = LagTrackerRead(SYNC_REP_WAIT_WRITE, writePtr, now);
1922  flushLag = LagTrackerRead(SYNC_REP_WAIT_FLUSH, flushPtr, now);
1923  applyLag = LagTrackerRead(SYNC_REP_WAIT_APPLY, applyPtr, now);
1924 
1925  /*
1926  * If the standby reports that it has fully replayed the WAL in two
1927  * consecutive reply messages, then the second such message must result
1928  * from wal_receiver_status_interval expiring on the standby. This is a
1929  * convenient time to forget the lag times measured when it last
1930  * wrote/flushed/applied a WAL record, to avoid displaying stale lag data
1931  * until more WAL traffic arrives.
1932  */
1933  clearLagTimes = false;
1934  if (applyPtr == sentPtr)
1935  {
1936  if (fullyAppliedLastTime)
1937  clearLagTimes = true;
1938  fullyAppliedLastTime = true;
1939  }
1940  else
1941  fullyAppliedLastTime = false;
1942 
1943  /* Send a reply if the standby requested one. */
1944  if (replyRequested)
1945  WalSndKeepalive(false);
1946 
1947  /*
1948  * Update shared state for this WalSender process based on reply data from
1949  * standby.
      *
      * A lag value of -1 is stored only when clearLagTimes is set;
      * presumably -1 means "no measurement available" -- TODO confirm
      * against LagTrackerRead().
1950  */
1951  {
1952  WalSnd *walsnd = MyWalSnd;
1953 
1954  SpinLockAcquire(&walsnd->mutex);
1955  walsnd->write = writePtr;
1956  walsnd->flush = flushPtr;
1957  walsnd->apply = applyPtr;
1958  if (writeLag != -1 || clearLagTimes)
1959  walsnd->writeLag = writeLag;
1960  if (flushLag != -1 || clearLagTimes)
1961  walsnd->flushLag = flushLag;
1962  if (applyLag != -1 || clearLagTimes)
1963  walsnd->applyLag = applyLag;
1964  walsnd->replyTime = replyTime;
1965  SpinLockRelease(&walsnd->mutex);
1966  }
1967 
1970 
1971  /*
1972  * Advance our local xmin horizon when the client confirmed a flush.
1973  */
1974  if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr)
1975  {
1978  else
1980  }
1981 }
1982 
1983 /* compute new replication slot xmin horizon if needed */
1984 static void
1986 {
      /* true when either xmin pair below was overwritten */
1987  bool changed = false;
1989 
1990  SpinLockAcquire(&slot->mutex);
1992 
1993  /*
1994  * For physical replication we don't need the interlock provided by xmin
1995  * and effective_xmin since the consequences of a missed increase are
1996  * limited to query cancellations, so set both at once.
      *
      * The stored value is replaced when it is not a normal xid, when the
      * feedback value is not a normal xid, or when the stored value
      * precedes the feedback value.
1997  */
1998  if (!TransactionIdIsNormal(slot->data.xmin) ||
1999  !TransactionIdIsNormal(feedbackXmin) ||
2000  TransactionIdPrecedes(slot->data.xmin, feedbackXmin))
2001  {
2002  changed = true;
2003  slot->data.xmin = feedbackXmin;
2004  slot->effective_xmin = feedbackXmin;
2005  }
      /* same rule, applied independently to the catalog xmin pair */
2006  if (!TransactionIdIsNormal(slot->data.catalog_xmin) ||
2007  !TransactionIdIsNormal(feedbackCatalogXmin) ||
2008  TransactionIdPrecedes(slot->data.catalog_xmin, feedbackCatalogXmin))
2009  {
2010  changed = true;
2011  slot->data.catalog_xmin = feedbackCatalogXmin;
2012  slot->effective_catalog_xmin = feedbackCatalogXmin;
2013  }
2014  SpinLockRelease(&slot->mutex);
2015 
      /* follow-up work for a changed value happens after the spinlock is released */
2016  if (changed)
2017  {
2020  }
2021 }
2022 
2023 /*
2024  * Check that the provided xmin/epoch are sane, that is, not in the future
2025  * and not so far back as to be already wrapped around.
2026  *
2027  * Epoch of nextXid should be same as standby, or if the counter has
2028  * wrapped, then one greater than standby.
2029  *
2030  * This check doesn't care about whether clog exists for these xids
2031  * at all.
      *
      * Returns true when the xid/epoch pair passes both checks below, false
      * otherwise.
2032  */
2033 static bool
2035 {
2036  FullTransactionId nextFullXid;
2037  TransactionId nextXid;
2038  uint32 nextEpoch;
2039 
      /* split the next full transaction id into its xid and epoch parts */
2040  nextFullXid = ReadNextFullTransactionId();
2041  nextXid = XidFromFullTransactionId(nextFullXid);
2042  nextEpoch = EpochFromFullTransactionId(nextFullXid);
2043 
      /*
       * An xid numerically at or below nextXid must carry the current epoch;
       * a numerically larger one is accepted only with the epoch immediately
       * before the current one.
       */
2044  if (xid <= nextXid)
2045  {
2046  if (epoch != nextEpoch)
2047  return false;
2048  }
2049  else
2050  {
2051  if (epoch + 1 != nextEpoch)
2052  return false;
2053  }
2054 
2055  if (!TransactionIdPrecedesOrEquals(xid, nextXid))
2056  return false; /* epoch OK, but it's wrapped around */
2057 
2058  return true;
2059 }
2060 
2061 /*
2062  * Hot Standby feedback
      *
      * Reads a 64-bit reply time followed by four 32-bit values (xmin, xmin
      * epoch, catalog xmin, catalog xmin epoch) from reply_message, records
      * the reply time in MyWalSnd, and then applies the xmins either to the
      * replication slot (if one is in use) or to MyProc->xmin. Feedback whose
      * xmin/epoch fails the sanity check is ignored.
2063  */
2064 static void
2066 {
2067  TransactionId feedbackXmin;
2068  uint32 feedbackEpoch;
2069  TransactionId feedbackCatalogXmin;
2070  uint32 feedbackCatalogEpoch;
2071  TimestampTz replyTime;
2072 
2073  /*
2074  * Decipher the reply message. The caller already consumed the msgtype
2075  * byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation
2076  * of this message.
2077  */
2078  replyTime = pq_getmsgint64(&reply_message);
2079  feedbackXmin = pq_getmsgint(&reply_message, 4);
2080  feedbackEpoch = pq_getmsgint(&reply_message, 4);
2081  feedbackCatalogXmin = pq_getmsgint(&reply_message, 4);
2082  feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4);
2083 
      /* skip building the debug string unless DEBUG2 output is enabled */
2084  if (log_min_messages <= DEBUG2)
2085  {
2086  char *replyTimeStr;
2087 
2088  /* Copy because timestamptz_to_str returns a static buffer */
2089  replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
2090 
2091  elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u reply_time %s",
2092  feedbackXmin,
2093  feedbackEpoch,
2094  feedbackCatalogXmin,
2095  feedbackCatalogEpoch,
2096  replyTimeStr);
2097 
2098  pfree(replyTimeStr);
2099  }
2100 
2101  /*
2102  * Update shared state for this WalSender process based on reply data from
2103  * standby.
2104  */
2105  {
2106  WalSnd *walsnd = MyWalSnd;
2107 
2108  SpinLockAcquire(&walsnd->mutex);
2109  walsnd->replyTime = replyTime;
2110  SpinLockRelease(&walsnd->mutex);
2111  }
2112 
2113  /*
2114  * Unset WalSender's xmins if the feedback message values are invalid.
2115  * This happens when the downstream turned hot_standby_feedback off.
2116  */
2117  if (!TransactionIdIsNormal(feedbackXmin)
2118  && !TransactionIdIsNormal(feedbackCatalogXmin))
2119  {
2121  if (MyReplicationSlot != NULL)
2122  PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
2123  return;
2124  }
2125 
2126  /*
2127  * Check that the provided xmin/epoch are sane, that is, not in the future
2128  * and not so far back as to be already wrapped around. Ignore if not.
2129  */
2130  if (TransactionIdIsNormal(feedbackXmin) &&
2131  !TransactionIdInRecentPast(feedbackXmin, feedbackEpoch))
2132  return;
2133 
2134  if (TransactionIdIsNormal(feedbackCatalogXmin) &&
2135  !TransactionIdInRecentPast(feedbackCatalogXmin, feedbackCatalogEpoch))
2136  return;
2137 
2138  /*
2139  * Set the WalSender's xmin equal to the standby's requested xmin, so that
2140  * the xmin will be taken into account by GetSnapshotData() /
2141  * ComputeXidHorizons(). This will hold back the removal of dead rows and
2142  * thereby prevent the generation of cleanup conflicts on the standby
2143  * server.
2144  *
2145  * There is a small window for a race condition here: although we just
2146  * checked that feedbackXmin precedes nextXid, the nextXid could have
2147  * gotten advanced between our fetching it and applying the xmin below,
2148  * perhaps far enough to make feedbackXmin wrap around. In that case the
2149  * xmin we set here would be "in the future" and have no effect. No point
2150  * in worrying about this since it's too late to save the desired data
2151  * anyway. Assuming that the standby sends us an increasing sequence of
2152  * xmins, this could only happen during the first reply cycle, else our
2153  * own xmin would prevent nextXid from advancing so far.
2154  *
2155  * We don't bother taking the ProcArrayLock here. Setting the xmin field
2156  * is assumed atomic, and there's no real need to prevent concurrent
2157  * horizon determinations. (If we're moving our xmin forward, this is
2158  * obviously safe, and if we're moving it backwards, well, the data is at
2159  * risk already since a VACUUM could already have determined the horizon.)
2160  *
2161  * If we're using a replication slot we reserve the xmin via that,
2162  * otherwise via the walsender's PGPROC entry. We can only track the
2163  * catalog xmin separately when using a slot, so we store the least of the
2164  * two provided when not using a slot.
2165  *
2166  * XXX: It might make sense to generalize the ephemeral slot concept and
2167  * always use the slot mechanism to handle the feedback xmin.
2168  */
2169  if (MyReplicationSlot != NULL) /* XXX: persistency configurable? */
2170  PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
2171  else
2172  {
      /* no slot: use the catalog xmin only when it is normal and precedes feedbackXmin */
2173  if (TransactionIdIsNormal(feedbackCatalogXmin)
2174  && TransactionIdPrecedes(feedbackCatalogXmin, feedbackXmin))
2175  MyProc->xmin = feedbackCatalogXmin;
2176  else
2177  MyProc->xmin = feedbackXmin;
2178  }
2179 }
2180 
2181 /*
2182  * Compute how long send/receive loops should sleep.
2183  *
2184  * If wal_sender_timeout is enabled we want to wake up in time to send
2185  * keepalives and to abort the connection if wal_sender_timeout has been
2186  * reached.
      *
      * The result is in milliseconds. It is 10000 (10 s) unless the
      * timeout-based branch below replaces it with the time remaining until
      * wakeup_time.
2187  */
2188 static long
2190 {
2191  long sleeptime = 10000; /* 10 s */
2192 
2194  {
2195  TimestampTz wakeup_time;
2196  long sec_to_timeout;
2197  int microsec_to_timeout;
2198 
2199  /*
2200  * At the latest stop sleeping once wal_sender_timeout has been
2201  * reached.
2202  */
2205 
2206  /*
2207  * If no ping has been sent yet, wakeup when it's time to do so.
2208  * WalSndKeepaliveIfNecessary() wants to send a keepalive once half of
2209  * the timeout passed without a response.
2210  */
2213  wal_sender_timeout / 2);
2214 
2215  /* Compute relative time until wakeup. */
2216  TimestampDifference(now, wakeup_time,
2217  &sec_to_timeout, &microsec_to_timeout);
2218 
      /* convert the seconds + microseconds pair into whole milliseconds */
2219  sleeptime = sec_to_timeout * 1000 +
2220  microsec_to_timeout / 1000;
2221  }
2222 
2223  return sleeptime;
2224 }
2225 
2226 /*
2227  * Check whether there have been responses by the client within
2228  * wal_sender_timeout and shutdown if not. Using last_processing as the
2229  * reference point avoids counting server-side stalls against the client.
2230  * However, a long server-side stall can make WalSndKeepaliveIfNecessary()
2231  * postdate last_processing by more than wal_sender_timeout. If that happens,
2232  * the client must reply almost immediately to avoid a timeout. This rarely
2233  * affects the default configuration, under which clients spontaneously send a
2234  * message every standby_message_timeout = wal_sender_timeout/6 = 10s. We
2235  * could eliminate that problem by recognizing timeout expiration at
2236  * wal_sender_timeout/2 after the keepalive.
      *
      * Does nothing when last_reply_timestamp is not positive or when
      * wal_sender_timeout is not positive.
2237  */
2238 static void
2240 {
2241  TimestampTz timeout;
2242 
2243  /* don't bail out if we're doing something that doesn't require timeouts */
2244  if (last_reply_timestamp <= 0)
2245  return;
2246 
2249 
      /* expired once last_processing has reached the computed deadline */
2250  if (wal_sender_timeout > 0 && last_processing >= timeout)
2251  {
2252  /*
2253  * Since typically expiration of replication timeout means
2254  * communication problem, we don't send the error message to the
2255  * standby.
2256  */
2258  (errmsg("terminating walsender process due to replication timeout")));
2259 
2260  WalSndShutdown();
2261  }
2262 }
2263 
2264 /* Main loop of walsender process that streams the WAL over Copy messages. */
2265 static void
2267 {
2268  /*
2269  * Initialize the last reply timestamp. That enables timeout processing
2270  * from hereon.
2271  */
2273  waiting_for_ping_response = false;
2274 
2275  /*
2276  * Loop until we reach the end of this timeline or the client requests to
2277  * stop streaming.
2278  */
2279  for (;;)
2280  {
2281  /* Clear any already-pending wakeups */
2283 
2285 
2286  /* Process any requests or signals received recently */
2287  if (ConfigReloadPending)
2288  {
2289  ConfigReloadPending = false;
2292  }
2293 
2294  /* Check for input from the client */
2296 
2297  /*
2298  * If we have received CopyDone from the client, sent CopyDone
2299  * ourselves, and the output buffer is empty, it's time to exit
2300  * streaming.
2301  */
2303  !pq_is_send_pending())
2304  break;
2305 
2306  /*
2307  * If we don't have any pending data in the output buffer, try to send
2308  * some more. If there is some, we don't bother to call send_data
2309  * again until we've flushed it ... but we'd better assume we are not
2310  * caught up.
2311  */
2312  if (!pq_is_send_pending())
2313  send_data();
2314  else
2315  WalSndCaughtUp = false;
2316 
2317  /* Try to flush pending output to the client */
2318  if (pq_flush_if_writable() != 0)
2319  WalSndShutdown();
2320 
2321  /* If nothing remains to be sent right now ... */
2323  {
2324  /*
2325  * If we're in catchup state, move to streaming. This is an
2326  * important state change for users to know about, since before
2327  * this point data loss might occur if the primary dies and we
2328  * need to failover to the standby. The state change is also
2329  * important for synchronous replication, since commits that
2330  * started to wait at that point might wait for some time.
2331  */
2332  if (MyWalSnd->state == WALSNDSTATE_CATCHUP)
2333  {
2334  ereport(DEBUG1,
2335  (errmsg("\"%s\" has now caught up with upstream server",
2336  application_name)));
2338  }
2339 
2340  /*
2341  * When SIGUSR2 arrives, we send any outstanding logs up to the
2342  * shutdown checkpoint record (i.e., the latest record), wait for
2343  * them to be replicated to the standby, and exit. This may be a
2344  * normal termination at shutdown, or a promotion; the walsender
2345  * is not sure which.
2346  */
2347  if (got_SIGUSR2)
2348  WalSndDone(send_data);
2349  }
2350 
2351  /* Check for replication timeout. */
2353 
2354  /* Send keepalive if the time has come */
2356 
2357  /*
2358  * Block if we have unsent data. XXX For logical replication, let
2359  * WalSndWaitForWal() handle any other blocking; idle receivers need
2360  * its additional actions. For physical replication, also block if
2361  * caught up; its send_data does not block.
2362  */
2363  if ((WalSndCaughtUp && send_data != XLogSendLogical &&
2366  {
2367  long sleeptime;
2368  int wakeEvents;
2369 
2370  wakeEvents = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH | WL_TIMEOUT |
2372 
2373  /*
2374  * Use fresh timestamp, not last_processing, to reduce the chance
2375  * of reaching wal_sender_timeout before sending a keepalive.
2376  */
2378 
2379  if (pq_is_send_pending())
2380  wakeEvents |= WL_SOCKET_WRITEABLE;
2381 
2382  /* Sleep until something happens or we time out */
2383  (void) WaitLatchOrSocket(MyLatch, wakeEvents,
2384  MyProcPort->sock, sleeptime,
2386  }
2387  }
2388 }
2389 
2390 /* Initialize a per-walsender data structure for this walsender process */
2391 static void
2393 {
2394  int i;
2395 
2396  /*
2397  * WalSndCtl should be set up already (we inherit this by fork() or
2398  * EXEC_BACKEND mechanism from the postmaster).
2399  */
2400  Assert(WalSndCtl != NULL);
2401  Assert(MyWalSnd == NULL);
2402 
2403  /*
2404  * Find a free walsender slot and reserve it. This must not fail due to
2405  * the prior check for free WAL senders in InitProcess().
2406  */
2407  for (i = 0; i < max_wal_senders; i++)
2408  {
2409  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2410 
2411  SpinLockAcquire(&walsnd->mutex);
2412 
2413  if (walsnd->pid != 0)
2414  {
2415  SpinLockRelease(&walsnd->mutex);
2416  continue;
2417  }
2418  else
2419  {
2420  /*
2421  * Found a free slot. Reserve it for us.
2422  */
2423  walsnd->pid = MyProcPid;
2424  walsnd->state = WALSNDSTATE_STARTUP;
2425  walsnd->sentPtr = InvalidXLogRecPtr;
2426  walsnd->needreload = false;
2427  walsnd->write = InvalidXLogRecPtr;
2428  walsnd->flush = InvalidXLogRecPtr;
2429  walsnd->apply = InvalidXLogRecPtr;
2430  walsnd->writeLag = -1;
2431  walsnd->flushLag = -1;
2432  walsnd->applyLag = -1;
2433  walsnd->sync_standby_priority = 0;
2434  walsnd->latch = &MyProc->procLatch;
2435  walsnd->replyTime = 0;
2436  SpinLockRelease(&walsnd->mutex);
2437  /* don't need the lock anymore */
2438  MyWalSnd = (WalSnd *) walsnd;
2439 
2440  break;
2441  }
2442  }
2443 
2444  Assert(MyWalSnd != NULL);
2445 
2446  /* Arrange to clean up at walsender exit */
2448 }
2449 
2450 /* Destroy the per-walsender data structure for this walsender process */
2451 static void
2453 {
2454  WalSnd *walsnd = MyWalSnd;
2455 
2456  Assert(walsnd != NULL);
2457 
2458  MyWalSnd = NULL;
2459 
2460  SpinLockAcquire(&walsnd->mutex);
2461  /* clear latch while holding the spinlock, so it can safely be read */
2462  walsnd->latch = NULL;
2463  /* Mark WalSnd struct as no longer being in use. */
2464  walsnd->pid = 0;
2465  SpinLockRelease(&walsnd->mutex);
2466 }
2467 
2468 /* XLogReaderRoutine->segment_open callback */
2469 static void
2471  TimeLineID *tli_p)
2472 {
2473  char path[MAXPGPATH];
2474 
2475  /*-------
2476  * When reading from a historic timeline, and there is a timeline switch
2477  * within this segment, read from the WAL segment belonging to the new
2478  * timeline.
2479  *
2480  * For example, imagine that this server is currently on timeline 5, and
2481  * we're streaming timeline 4. The switch from timeline 4 to 5 happened at
2482  * 0/13002088. In pg_wal, we have these files:
2483  *
2484  * ...
2485  * 000000040000000000000012
2486  * 000000040000000000000013
2487  * 000000050000000000000013
2488  * 000000050000000000000014
2489  * ...
2490  *
2491  * In this situation, when requested to send the WAL from segment 0x13, on
2492  * timeline 4, we read the WAL from file 000000050000000000000013. Archive
2493  * recovery prefers files from newer timelines, so if the segment was
2494  * restored from the archive on this server, the file belonging to the old
2495  * timeline, 000000040000000000000013, might not exist. Their contents are
2496  * equal up to the switchpoint, because at a timeline switch, the used
2497  * portion of the old segment is copied to the new file. -------
2498  */
2499  *tli_p = sendTimeLine;
2501  {
2502  XLogSegNo endSegNo;
2503 
2505  if (state->seg.ws_segno == endSegNo)
2506  *tli_p = sendTimeLineNextTLI;
2507  }
2508 
2509  XLogFilePath(path, *tli_p, nextSegNo, state->segcxt.ws_segsize);
2510  state->seg.ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY);
2511  if (state->seg.ws_file >= 0)
2512  return;
2513 
2514  /*
2515  * If the file is not found, assume it's because the standby asked for a
2516  * WAL segment that is too old and has already been removed or recycled.
2517  */
2518  if (errno == ENOENT)
2519  {
2520  char xlogfname[MAXFNAMELEN];
2521  int save_errno = errno;
2522 
2523  XLogFileName(xlogfname, *tli_p, nextSegNo, wal_segment_size);
2524  errno = save_errno;
2525  ereport(ERROR,
2527  errmsg("requested WAL segment %s has already been removed",
2528  xlogfname)));
2529  }
2530  else
2531  ereport(ERROR,
2533  errmsg("could not open file \"%s\": %m",
2534  path)));
2535 }
2536 
2537 /*
2538  * Send out the WAL in its normal physical/stored form.
2539  *
2540  * Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk,
2541  * but not yet sent to the client, and buffer it in the libpq output
2542  * buffer.
2543  *
2544  * If there is no unsent WAL remaining, WalSndCaughtUp is set to true,
2545  * otherwise WalSndCaughtUp is set to false.
2546  */
2547 static void
2549 {
2550  XLogRecPtr SendRqstPtr;
2552  XLogRecPtr endptr;
2553  Size nbytes;
2554  XLogSegNo segno;
2555  WALReadError errinfo;
2556 
2557  /* If requested switch the WAL sender to the stopping state. */
2558  if (got_STOPPING)
2560 
2562  {
2563  WalSndCaughtUp = true;
2564  return;
2565  }
2566 
2567  /* Figure out how far we can safely send the WAL. */
2569  {
2570  /*
2571  * Streaming an old timeline that's in this server's history, but is
2572  * not the one we're currently inserting or replaying. It can be
2573  * streamed up to the point where we switched off that timeline.
2574  */
2575  SendRqstPtr = sendTimeLineValidUpto;
2576  }
2577  else if (am_cascading_walsender)
2578  {
2579  /*
2580  * Streaming the latest timeline on a standby.
2581  *
2582  * Attempt to send all WAL that has already been replayed, so that we
2583  * know it's valid. If we're receiving WAL through streaming
2584  * replication, it's also OK to send any WAL that has been received
2585  * but not replayed.
2586  *
2587  * The timeline we're recovering from can change, or we can be
2588  * promoted. In either case, the current timeline becomes historic. We
2589  * need to detect that so that we don't try to stream past the point
2590  * where we switched to another timeline. We check for promotion or
2591  * timeline switch after calculating FlushPtr, to avoid a race
2592  * condition: if the timeline becomes historic just after we checked
2593  * that it was still current, it's still OK to stream it up to the
2594  * FlushPtr that was calculated before it became historic.
2595  */
2596  bool becameHistoric = false;
2597 
2598  SendRqstPtr = GetStandbyFlushRecPtr();
2599 
2600  if (!RecoveryInProgress())
2601  {
2602  /*
2603  * We have been promoted. RecoveryInProgress() updated
2604  * ThisTimeLineID to the new current timeline.
2605  */
2606  am_cascading_walsender = false;
2607  becameHistoric = true;
2608  }
2609  else
2610  {
2611  /*
2612  * Still a cascading standby. But is the timeline we're sending
2613  * still the one recovery is recovering from? ThisTimeLineID was
2614  * updated by the GetStandbyFlushRecPtr() call above.
2615  */
2617  becameHistoric = true;
2618  }
2619 
2620  if (becameHistoric)
2621  {
2622  /*
2623  * The timeline we were sending has become historic. Read the
2624  * timeline history file of the new timeline to see where exactly
2625  * we forked off from the timeline we were sending.
2626  */
2627  List *history;
2628 
2631 
2633  list_free_deep(history);
2634 
2635  sendTimeLineIsHistoric = true;
2636 
2637  SendRqstPtr = sendTimeLineValidUpto;
2638  }
2639  }
2640  else
2641  {
2642  /*
2643  * Streaming the current timeline on a primary.
2644  *
2645  * Attempt to send all data that's already been written out and
2646  * fsync'd to disk. We cannot go further than what's been written out
2647  * given the current implementation of WALRead(). And in any case
2648  * it's unsafe to send WAL that is not securely down to disk on the
2649  * primary: if the primary subsequently crashes and restarts, standbys
2650  * must not have applied any WAL that got lost on the primary.
2651  */
2652  SendRqstPtr = GetFlushRecPtr();
2653  }
2654 
2655  /*
2656  * Record the current system time as an approximation of the time at which
2657  * this WAL location was written for the purposes of lag tracking.
2658  *
2659  * In theory we could make XLogFlush() record a time in shmem whenever WAL
2660  * is flushed and we could get that time as well as the LSN when we call
2661  * GetFlushRecPtr() above (and likewise for the cascading standby
2662  * equivalent), but rather than putting any new code into the hot WAL path
2663  * it seems good enough to capture the time here. We should reach this
2664  * after XLogFlush() runs WalSndWakeupProcessRequests(), and although that
2665  * may take some time, we read the WAL flush pointer and take the time
2666  * very close together here so that we'll get a later position if it is
2667  * still moving.
2668  *
2669  * Because LagTrackerWrite ignores samples when the LSN hasn't advanced,
2670  * this gives us a cheap approximation for the WAL flush time for this
2671  * LSN.
2672  *
2673  * Note that the LSN is not necessarily the LSN for the data contained in
2674  * the present message; it's the end of the WAL, which might be further
2675  * ahead. All the lag tracking machinery cares about is finding out when
2676  * that arbitrary LSN is eventually reported as written, flushed and
2677  * applied, so that it can measure the elapsed time.
2678  */
2679  LagTrackerWrite(SendRqstPtr, GetCurrentTimestamp());
2680 
2681  /*
2682  * If this is a historic timeline and we've reached the point where we
2683  * forked to the next timeline, stop streaming.
2684  *
2685  * Note: We might already have sent WAL > sendTimeLineValidUpto. The
2686  * startup process will normally replay all WAL that has been received
2687  * from the primary, before promoting, but if the WAL streaming is
2688  * terminated at a WAL page boundary, the valid portion of the timeline
2689  * might end in the middle of a WAL record. We might've already sent the
2690  * first half of that partial WAL record to the cascading standby, so that
2691  * sentPtr > sendTimeLineValidUpto. That's OK; the cascading standby can't
2692  * replay the partial WAL record either, so it can still follow our
2693  * timeline switch.
2694  */
2696  {
2697  /* close the current file. */
2698  if (xlogreader->seg.ws_file >= 0)
2699  wal_segment_close(xlogreader);
2700 
2701  /* Send CopyDone */
2702  pq_putmessage_noblock('c', NULL, 0);
2703  streamingDoneSending = true;
2704 
2705  WalSndCaughtUp = true;
2706 
2707  elog(DEBUG1, "walsender reached end of timeline at %X/%X (sent up to %X/%X)",
2709  (uint32) (sentPtr >> 32), (uint32) sentPtr);
2710  return;
2711  }
2712 
2713  /* Do we have any work to do? */
2714  Assert(sentPtr <= SendRqstPtr);
2715  if (SendRqstPtr <= sentPtr)
2716  {
2717  WalSndCaughtUp = true;
2718  return;
2719  }
2720 
2721  /*
2722  * Figure out how much to send in one message. If there's no more than
2723  * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
2724  * MAX_SEND_SIZE bytes, but round back to logfile or page boundary.
2725  *
2726  * The rounding is not only for performance reasons. Walreceiver relies on
2727  * the fact that we never split a WAL record across two messages. Since a
2728  * long WAL record is split at page boundary into continuation records,
2729  * page boundary is always a safe cut-off point. We also assume that
2730  * SendRqstPtr never points to the middle of a WAL record.
2731  */
2732  startptr = sentPtr;
2733  endptr = startptr;
2734  endptr += MAX_SEND_SIZE;
2735 
2736  /* if we went beyond SendRqstPtr, back off */
2737  if (SendRqstPtr <= endptr)
2738  {
2739  endptr = SendRqstPtr;
2741  WalSndCaughtUp = false;
2742  else
2743  WalSndCaughtUp = true;
2744  }
2745  else
2746  {
2747  /* round down to page boundary. */
2748  endptr -= (endptr % XLOG_BLCKSZ);
2749  WalSndCaughtUp = false;
2750  }
2751 
2752  nbytes = endptr - startptr;
2753  Assert(nbytes <= MAX_SEND_SIZE);
2754 
2755  /*
2756  * OK to read and send the slice.
2757  */
2758  resetStringInfo(&output_message);
2759  pq_sendbyte(&output_message, 'w');
2760 
2761  pq_sendint64(&output_message, startptr); /* dataStart */
2762  pq_sendint64(&output_message, SendRqstPtr); /* walEnd */
2763  pq_sendint64(&output_message, 0); /* sendtime, filled in last */
2764 
2765  /*
2766  * Read the log directly into the output buffer to avoid extra memcpy
2767  * calls.
2768  */
2769  enlargeStringInfo(&output_message, nbytes);
2770 
2771 retry:
2772  if (!WALRead(xlogreader,
2773  &output_message.data[output_message.len],
2774  startptr,
2775  nbytes,
2776  xlogreader->seg.ws_tli, /* Pass the current TLI because
2777  * only WalSndSegmentOpen controls
2778  * whether new TLI is needed. */
2779  &errinfo))
2780  WALReadRaiseError(&errinfo);
2781 
2782  /* See logical_read_xlog_page(). */
2783  XLByteToSeg(startptr, segno, xlogreader->segcxt.ws_segsize);
2784  CheckXLogRemoved(segno, xlogreader->seg.ws_tli);
2785 
2786  /*
2787  * During recovery, the currently-open WAL file might be replaced with the
2788  * file of the same name retrieved from archive. So we always need to
2789  * check what we read was valid after reading into the buffer. If it's
2790  * invalid, we try to open and read the file again.
2791  */
2793  {
2794  WalSnd *walsnd = MyWalSnd;
2795  bool reload;
2796 
2797  SpinLockAcquire(&walsnd->mutex);
2798  reload = walsnd->needreload;
2799  walsnd->needreload = false;
2800  SpinLockRelease(&walsnd->mutex);
2801 
2802  if (reload && xlogreader->seg.ws_file >= 0)
2803  {
2804  wal_segment_close(xlogreader);
2805 
2806  goto retry;
2807  }
2808  }
2809 
2810  output_message.len += nbytes;
2811  output_message.data[output_message.len] = '\0';
2812 
2813  /*
2814  * Fill the send timestamp last, so that it is taken as late as possible.
2815  */
2816  resetStringInfo(&tmpbuf);
2817  pq_sendint64(&tmpbuf, GetCurrentTimestamp());
2818  memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64)],
2819  tmpbuf.data, sizeof(int64));
2820 
2821  pq_putmessage_noblock('d', output_message.data, output_message.len);
2822 
2823  sentPtr = endptr;
2824 
2825  /* Update shared memory status */
2826  {
2827  WalSnd *walsnd = MyWalSnd;
2828 
2829  SpinLockAcquire(&walsnd->mutex);
2830  walsnd->sentPtr = sentPtr;
2831  SpinLockRelease(&walsnd->mutex);
2832  }
2833 
2834  /* Report progress of XLOG streaming in PS display */
2836  {
2837  char activitymsg[50];
2838 
2839  snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
2840  (uint32) (sentPtr >> 32), (uint32) sentPtr);
2841  set_ps_display(activitymsg);
2842  }
2843 }
2844 
2845 /*
2846  * Stream out logically decoded data.
2847  */
2848 static void
2850 {
2851  XLogRecord *record;
2852  char *errm;
2853 
2854  /*
2855  * We'll use the current flush point to determine whether we've caught up.
2856  * This variable is static in order to cache it across calls. Caching is
2857  * helpful because GetFlushRecPtr() needs to acquire a heavily-contended
2858  * spinlock.
2859  */
2860  static XLogRecPtr flushPtr = InvalidXLogRecPtr;
2861 
2862  /*
2863  * Don't know whether we've caught up yet. We'll set WalSndCaughtUp to
2864  * true in WalSndWaitForWal, if we're actually waiting. We also set it to
2865  * true if XLogReadRecord() had to stop reading but WalSndWaitForWal
2866  * didn't wait - i.e. when we're shutting down.
2867  */
2868  WalSndCaughtUp = false;
2869 
2870  record = XLogReadRecord(logical_decoding_ctx->reader, &errm);
2871 
2872  /* xlog record was invalid */
2873  if (errm != NULL)
2874  elog(ERROR, "%s", errm);
2875 
2876  if (record != NULL)
2877  {
2878  /*
2879  * Note the lack of any call to LagTrackerWrite() which is handled by
2880  * WalSndUpdateProgress which is called by output plugin through
2881  * logical decoding write api.
2882  */
2883  LogicalDecodingProcessRecord(logical_decoding_ctx, logical_decoding_ctx->reader);
2884 
2885  sentPtr = logical_decoding_ctx->reader->EndRecPtr;
2886  }
2887 
2888  /*
2889  * If first time through in this session, initialize flushPtr. Otherwise,
2890  * we only need to update flushPtr if EndRecPtr is past it.
2891  */
2892  if (flushPtr == InvalidXLogRecPtr)
2893  flushPtr = GetFlushRecPtr();
2894  else if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
2895  flushPtr = GetFlushRecPtr();
2896 
2897  /* If EndRecPtr is still past our flushPtr, it means we caught up. */
2898  if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
2899  WalSndCaughtUp = true;
2900 
2901  /*
2902  * If we're caught up and have been requested to stop, have WalSndLoop()
2903  * terminate the connection in an orderly manner, after writing out all
2904  * the pending data.
2905  */
2907  got_SIGUSR2 = true;
2908 
2909  /* Update shared memory status */
2910  {
2911  WalSnd *walsnd = MyWalSnd;
2912 
2913  SpinLockAcquire(&walsnd->mutex);
2914  walsnd->sentPtr = sentPtr;
2915  SpinLockRelease(&walsnd->mutex);
2916  }
2917 }
2918 
2919 /*
2920  * Shutdown if the sender is caught up.
2921  *
2922  * NB: This should only be called when the shutdown signal has been received
2923  * from postmaster.
2924  *
2925  * Note that if we determine that there's still more data to send, this
2926  * function will return control to the caller.
2927  */
2928 static void
2930 {
2931  XLogRecPtr replicatedPtr;
2932 
2933  /* ... let's just be real sure we're caught up ... */
2934  send_data();
2935 
2936  /*
2937  * To figure out whether all WAL has successfully been replicated, check
2938  * flush location if valid, write otherwise. Tools like pg_receivewal will
2939  * usually (unless in synchronous mode) return an invalid flush location.
2940  */
2941  replicatedPtr = XLogRecPtrIsInvalid(MyWalSnd->flush) ?
2942  MyWalSnd->write : MyWalSnd->flush;
2943 
2944  if (WalSndCaughtUp && sentPtr == replicatedPtr &&
2945  !pq_is_send_pending())
2946  {
2947  QueryCompletion qc;
2948 
2949  /* Inform the standby that XLOG streaming is done */
2950  SetQueryCompletion(&qc, CMDTAG_COPY, 0);
2951  EndCommand(&qc, DestRemote, false);
2952  pq_flush();
2953 
2954  proc_exit(0);
2955  }
2957  WalSndKeepalive(true);
2958 }
2959 
2960 /*
2961  * Returns the latest point in WAL that has been safely flushed to disk, and
2962  * can be sent to the standby. This should only be called when in recovery,
2963  * ie. we're streaming to a cascaded standby.
2964  *
2965  * As a side-effect, ThisTimeLineID is updated to the TLI of the last
2966  * replayed WAL record.
2967  */
2968 static XLogRecPtr
2970 {
2971  XLogRecPtr replayPtr;
2972  TimeLineID replayTLI;
2973  XLogRecPtr receivePtr;
2975  XLogRecPtr result;
2976 
2977  /*
2978  * We can safely send what's already been replayed. Also, if walreceiver
2979  * is streaming WAL from the same timeline, we can send anything that it
2980  * has streamed, but hasn't been replayed yet.
2981  */
2982 
2983  receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI);
2984  replayPtr = GetXLogReplayRecPtr(&replayTLI);
2985 
2986  ThisTimeLineID = replayTLI;
2987 
2988  result = replayPtr;
2989  if (receiveTLI == ThisTimeLineID && receivePtr > replayPtr)
2990  result = receivePtr;
2991 
2992  return result;
2993 }
2994 
2995 /*
2996  * Request walsenders to reload the currently-open WAL file
2997  */
2998 void
3000 {
3001  int i;
3002 
3003  for (i = 0; i < max_wal_senders; i++)
3004  {
3005  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3006 
3007  SpinLockAcquire(&walsnd->mutex);
3008  if (walsnd->pid == 0)
3009  {
3010  SpinLockRelease(&walsnd->mutex);
3011  continue;
3012  }
3013  walsnd->needreload = true;
3014  SpinLockRelease(&walsnd->mutex);
3015  }
3016 }
3017 
3018 /*
3019  * Handle PROCSIG_WALSND_INIT_STOPPING signal.
3020  */
3021 void
3023 {
3025 
3026  /*
3027  * If replication has not yet started, die like with SIGTERM. If
3028  * replication is active, only set a flag and wake up the main loop. It
3029  * will send any outstanding WAL, wait for it to be replicated to the
3030  * standby, and then exit gracefully.
3031  */
3032  if (!replication_active)
3033  kill(MyProcPid, SIGTERM);
3034  else
3035  got_STOPPING = true;
3036 }
3037 
3038 /*
3039  * SIGUSR2: set flag to do a last cycle and shut down afterwards. The WAL
3040  * sender should already have been switched to WALSNDSTATE_STOPPING at
3041  * this point.
3042  */
3043 static void
3045 {
3046  int save_errno = errno;
3047 
3048  got_SIGUSR2 = true;
3049  SetLatch(MyLatch);
3050 
3051  errno = save_errno;
3052 }
3053 
3054 /* Set up signal handlers */
3055 void
3057 {
3058  /* Set up signal handlers */
3060  pqsignal(SIGINT, StatementCancelHandler); /* query cancel */
3061  pqsignal(SIGTERM, die); /* request shutdown */
3062  /* SIGQUIT handler was already set up by InitPostmasterChild */
3063  InitializeTimeouts(); /* establishes SIGALRM handler */
3066  pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
3067  * shutdown */
3068 
3069  /* Reset some signals that are accepted by postmaster but not here */
3071 }
3072 
3073 /* Report shared-memory space needed by WalSndShmemInit */
3074 Size
3076 {
3077  Size size = 0;
3078 
3079  size = offsetof(WalSndCtlData, walsnds);
3080  size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
3081 
3082  return size;
3083 }
3084 
3085 /* Allocate and initialize walsender-related shared memory */
3086 void
3088 {
3089  bool found;
3090  int i;
3091 
3092  WalSndCtl = (WalSndCtlData *)
3093  ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
3094 
3095  if (!found)
3096  {
3097  /* First time through, so initialize */
3098  MemSet(WalSndCtl, 0, WalSndShmemSize());
3099 
3100  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
3101  SHMQueueInit(&(WalSndCtl->SyncRepQueue[i]));
3102 
3103  for (i = 0; i < max_wal_senders; i++)
3104  {
3105  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3106 
3107  SpinLockInit(&walsnd->mutex);
3108  }
3109  }
3110 }
3111 
3112 /*
3113  * Wake up all walsenders
3114  *
3115  * This will be called inside critical sections, so throwing an error is not
3116  * advisable.
3117  */
3118 void
3120 {
3121  int i;
3122 
3123  for (i = 0; i < max_wal_senders; i++)
3124  {
3125  Latch *latch;
3126  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3127 
3128  /*
3129  * Get latch pointer with spinlock held, for the unlikely case that
3130  * pointer reads aren't atomic (as they're 8 bytes).
3131  */
3132  SpinLockAcquire(&walsnd->mutex);
3133  latch = walsnd->latch;
3134  SpinLockRelease(&walsnd->mutex);
3135 
3136  if (latch != NULL)
3137  SetLatch(latch);
3138  }
3139 }
3140 
3141 /*
3142  * Signal all walsenders to move to stopping state.
3143  *
3144  * This will trigger walsenders to move to a state where no further WAL can be
3145  * generated. See this file's header for details.
3146  */
3147 void
3149 {
3150  int i;
3151 
3152  for (i = 0; i < max_wal_senders; i++)
3153  {
3154  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3155  pid_t pid;
3156 
3157  SpinLockAcquire(&walsnd->mutex);
3158  pid = walsnd->pid;
3159  SpinLockRelease(&walsnd->mutex);
3160 
3161  if (pid == 0)
3162  continue;
3163 
3165  }
3166 }
3167 
3168 /*
3169  * Wait until all the WAL senders have quit or reached the stopping state. This
3170  * is used by the checkpointer to control when the shutdown checkpoint can
3171  * safely be performed.
3172  */
3173 void
3175 {
3176  for (;;)
3177  {
3178  int i;
3179  bool all_stopped = true;
3180 
3181  for (i = 0; i < max_wal_senders; i++)
3182  {
3183  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3184 
3185  SpinLockAcquire(&walsnd->mutex);
3186 
3187  if (walsnd->pid == 0)
3188  {
3189  SpinLockRelease(&walsnd->mutex);
3190  continue;
3191  }
3192 
3193  if (walsnd->state != WALSNDSTATE_STOPPING)
3194  {
3195  all_stopped = false;
3196  SpinLockRelease(&walsnd->mutex);
3197  break;
3198  }
3199  SpinLockRelease(&walsnd->mutex);
3200  }
3201 
3202  /* safe to leave if confirmation is done for all WAL senders */
3203  if (all_stopped)
3204  return;
3205 
3206  pg_usleep(10000L); /* wait for 10 msec */
3207  }
3208 }
3209 
3210 /* Set state for current walsender (only called in walsender) */
3211 void
3213 {
3214  WalSnd *walsnd = MyWalSnd;
3215 
3217 
3218  if (walsnd->state == state)
3219  return;
3220 
3221  SpinLockAcquire(&walsnd->mutex);
3222  walsnd->state = state;
3223  SpinLockRelease(&walsnd->mutex);
3224 }
3225 
3226 /*
3227  * Return a string constant representing the state. This is used
3228  * in system views, and should *not* be translated.
3229  */
3230 static const char *
3232 {
3233  switch (state)
3234  {
3235  case WALSNDSTATE_STARTUP:
3236  return "startup";
3237  case WALSNDSTATE_BACKUP:
3238  return "backup";
3239  case WALSNDSTATE_CATCHUP:
3240  return "catchup";
3241  case WALSNDSTATE_STREAMING:
3242  return "streaming";
3243  case WALSNDSTATE_STOPPING:
3244  return "stopping";
3245  }
3246  return "UNKNOWN";
3247 }
3248 
3249 static Interval *
3251 {
3252  Interval *result = palloc(sizeof(Interval));
3253 
3254  result->month = 0;
3255  result->day = 0;
3256  result->time = offset;
3257 
3258  return result;
3259 }
3260 
3261 /*
3262  * Returns activity of walsenders, including pids and xlog locations sent to
3263  * standby servers.
3264  */
3265 Datum
3267 {
3268 #define PG_STAT_GET_WAL_SENDERS_COLS 12
3269  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
3270  TupleDesc tupdesc;
3271  Tuplestorestate *tupstore;
3272  MemoryContext per_query_ctx;
3273  MemoryContext oldcontext;
3274  SyncRepStandbyData *sync_standbys;
3275  int num_standbys;
3276  int i;
3277 
3278  /* check to see if caller supports us returning a tuplestore */
3279  if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
3280  ereport(ERROR,
3281  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3282  errmsg("set-valued function called in context that cannot accept a set")));
3283  if (!(rsinfo->allowedModes & SFRM_Materialize))
3284  ereport(ERROR,
3285  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3286  errmsg("materialize mode required, but it is not allowed in this context")));
3287 
3288  /* Build a tuple descriptor for our result type */
3289  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
3290  elog(ERROR, "return type must be a row type");
3291 
3292  per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
3293  oldcontext = MemoryContextSwitchTo(per_query_ctx);
3294 
3295  tupstore = tuplestore_begin_heap(true, false, work_mem);
3296  rsinfo->returnMode = SFRM_Materialize;
3297  rsinfo->setResult = tupstore;
3298  rsinfo->setDesc = tupdesc;
3299 
3300  MemoryContextSwitchTo(oldcontext);
3301 
3302  /*
3303  * Get the currently active synchronous standbys. This could be out of
3304  * date before we're done, but we'll use the data anyway.
3305  */
3306  num_standbys = SyncRepGetCandidateStandbys(&sync_standbys);
3307 
3308  for (i = 0; i < max_wal_senders; i++)
3309  {
3310  WalSnd *walsnd = &WalSndCtl->walsnds[i];
3312  XLogRecPtr write;
3313  XLogRecPtr flush;
3314  XLogRecPtr apply;
3315  TimeOffset writeLag;
3316  TimeOffset flushLag;
3317  TimeOffset applyLag;
3318  int priority;
3319  int pid;
3321  TimestampTz replyTime;
3322  bool is_sync_standby;
3324  bool nulls[PG_STAT_GET_WAL_SENDERS_COLS];
3325  int j;
3326 
3327  /* Collect data from shared memory */
3328  SpinLockAcquire(&walsnd->mutex);
3329  if (walsnd->pid == 0)
3330  {
3331  SpinLockRelease(&walsnd->mutex);
3332  continue;
3333  }
3334  pid = walsnd->pid;
3335  sentPtr = walsnd->sentPtr;
3336  state = walsnd->state;
3337  write = walsnd->write;
3338  flush = walsnd->flush;
3339  apply = walsnd->apply;
3340  writeLag = walsnd->writeLag;
3341  flushLag = walsnd->flushLag;
3342  applyLag = walsnd->applyLag;
3343  priority = walsnd->sync_standby_priority;
3344  replyTime = walsnd->replyTime;
3345  SpinLockRelease(&walsnd->mutex);
3346 
3347  /*
3348  * Detect whether walsender is/was considered synchronous. We can
3349  * provide some protection against stale data by checking the PID
3350  * along with walsnd_index.
3351  */
3352  is_sync_standby = false;
3353  for (j = 0; j < num_standbys; j++)
3354  {
3355  if (sync_standbys[j].walsnd_index == i &&
3356  sync_standbys[j].pid == pid)
3357  {
3358  is_sync_standby = true;
3359  break;
3360  }
3361  }
3362 
3363  memset(nulls, 0, sizeof(nulls));
3364  values[0] = Int32GetDatum(pid);
3365 
3366  if (!is_member_of_role(GetUserId(), DEFAULT_ROLE_READ_ALL_STATS))
3367  {
3368  /*
3369  * Only superusers and members of pg_read_all_stats can see
3370  * details. Other users only get the pid value to know it's a
3371  * walsender, but no details.
3372  */
3373  MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1);
3374  }
3375  else
3376  {
3377  values[1] = CStringGetTextDatum(WalSndGetStateString(state));
3378 
3379  if (XLogRecPtrIsInvalid(sentPtr))
3380  nulls[2] = true;
3381  values[2] = LSNGetDatum(sentPtr);
3382 
3383  if (XLogRecPtrIsInvalid(write))
3384  nulls[3] = true;
3385  values[3] = LSNGetDatum(write);
3386 
3387  if (XLogRecPtrIsInvalid(flush))
3388  nulls[4] = true;
3389  values[4] = LSNGetDatum(flush);
3390 
3391  if (XLogRecPtrIsInvalid(apply))
3392  nulls[5] = true;
3393  values[5] = LSNGetDatum(apply);
3394 
3395  /*
3396  * Treat a standby such as a pg_basebackup background process
3397  * which always returns an invalid flush location, as an
3398  * asynchronous standby.
3399  */
3400  priority = XLogRecPtrIsInvalid(flush) ? 0 : priority;
3401 
3402  if (writeLag < 0)
3403  nulls[6] = true;
3404  else
3405  values[6] = IntervalPGetDatum(offset_to_interval(writeLag));
3406 
3407  if (flushLag < 0)
3408  nulls[7] = true;
3409  else
3410  values[7] = IntervalPGetDatum(offset_to_interval(flushLag));
3411 
3412  if (applyLag < 0)
3413  nulls[8] = true;
3414  else
3415  values[8] = IntervalPGetDatum(offset_to_interval(applyLag));
3416 
3417  values[9] = Int32GetDatum(priority);
3418 
3419  /*
3420  * More easily understood version of standby state. This is purely
3421  * informational.
3422  *
3423  * In quorum-based sync replication, the role of each standby
3424  * listed in synchronous_standby_names can be changing very
3425  * frequently. Any standbys considered as "sync" at one moment can
3426  * be switched to "potential" ones at the next moment. So, it's
3427  * basically useless to report "sync" or "potential" as their sync
3428  * states. We report just "quorum" for them.
3429  */
3430  if (priority == 0)
3431  values[10] = CStringGetTextDatum("async");
3432  else if (is_sync_standby)
3434  CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
3435  else
3436  values[10] = CStringGetTextDatum("potential");
3437 
3438  if (replyTime == 0)
3439  nulls[11] = true;
3440  else
3441  values[11] = TimestampTzGetDatum(replyTime);
3442  }
3443 
3444  tuplestore_putvalues(tupstore, tupdesc, values, nulls);
3445  }
3446 
3447  /* clean up and return the tuplestore */
3448  tuplestore_donestoring(tupstore);
3449 
3450  return (Datum) 0;
3451 }
3452 
3453 /*
3454  * Send a keepalive message to standby.
3455  *
3456  * If requestReply is set, the message requests the other party to send
3457  * a message back to us, for heartbeat purposes. We also set a flag to
3458  * let nearby code that we're waiting for that response, to avoid
3459  * repeated requests.
3460  */
3461 static void
3462 WalSndKeepalive(bool requestReply)
3463 {
3464  elog(DEBUG2, "sending replication keepalive");
3465 
3466  /* construct the message... */
3467  resetStringInfo(&output_message);
3468  pq_sendbyte(&output_message, 'k');
3469  pq_sendint64(&output_message, sentPtr);
3470  pq_sendint64(&output_message, GetCurrentTimestamp());
3471  pq_sendbyte(&output_message, requestReply ? 1 : 0);
3472 
3473  /* ... and send it wrapped in CopyData */
3474  pq_putmessage_noblock('d', output_message.data, output_message.len);
3475 
3476  /* Set local flag */
3477  if (requestReply)
3479 }
3480 
3481 /*
3482  * Send keepalive message if too much time has elapsed.
3483  */
3484 static void
3486 {
3487  TimestampTz ping_time;
3488 
3489  /*
3490  * Don't send keepalive messages if timeouts are globally disabled or
3491  * we're doing something not partaking in timeouts.
3492  */
3493  if (wal_sender_timeout <= 0 || last_reply_timestamp <= 0)
3494  return;
3495 
3497  return;
3498 
3499  /*
3500  * If half of wal_sender_timeout has lapsed without receiving any reply
3501  * from the standby, send a keep-alive message to the standby requesting
3502  * an immediate reply.
3503  */
3505  wal_sender_timeout / 2);
3506  if (last_processing >= ping_time)
3507  {
3508  WalSndKeepalive(true);
3509 
3510  /* Try to flush pending output to the client */
3511  if (pq_flush_if_writable() != 0)
3512  WalSndShutdown();
3513  }
3514 }
3515 
3516 /*
3517  * Record the end of the WAL and the time it was flushed locally, so that
3518  * LagTrackerRead can compute the elapsed time (lag) when this WAL location is
3519  * eventually reported to have been written, flushed and applied by the
3520  * standby in a reply message.
3521  */
3522 static void
3524 {
3525  bool buffer_full;
3526  int new_write_head;
3527  int i;
3528 
3529  if (!am_walsender)
3530  return;
3531 
3532  /*
3533  * If the lsn hasn't advanced since last time, then do nothing. This way
3534  * we only record a new sample when new WAL has been written.
3535  */
3536  if (lag_tracker->last_lsn == lsn)
3537  return;
3538  lag_tracker->last_lsn = lsn;
3539 
3540  /*
3541  * If advancing the write head of the circular buffer would crash into any
3542  * of the read heads, then the buffer is full. In other words, the
3543  * slowest reader (presumably apply) is the one that controls the release
3544  * of space.
3545  */
3546  new_write_head = (lag_tracker->write_head + 1) % LAG_TRACKER_BUFFER_SIZE;
3547  buffer_full = false;
3548  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; ++i)
3549  {
3550  if (new_write_head == lag_tracker->read_heads[i])
3551  buffer_full = true;
3552  }
3553 
3554  /*
3555  * If the buffer is full, for now we just rewind by one slot and overwrite
3556  * the last sample, as a simple (if somewhat uneven) way to lower the
3557  * sampling rate. There may be better adaptive compaction algorithms.
3558  */
3559  if (buffer_full)
3560  {
3561  new_write_head = lag_tracker->write_head;
3562  if (lag_tracker->write_head > 0)
3563  lag_tracker->write_head--;
3564  else
3565  lag_tracker->write_head = LAG_TRACKER_BUFFER_SIZE - 1;
3566  }
3567 
3568  /* Store a sample at the current write head position. */
3569  lag_tracker->buffer[lag_tracker->write_head].lsn = lsn;
3570  lag_tracker->buffer[lag_tracker->write_head].time = local_flush_time;
3571  lag_tracker->write_head = new_write_head;
3572 }
3573 
3574 /*
3575  * Find out how much time has elapsed between the moment WAL location 'lsn'
3576  * (or the highest known earlier LSN) was flushed locally and the time 'now'.
3577  * We have a separate read head for each of the reported LSN locations we
3578  * receive in replies from standby; 'head' controls which read head is
3579  * used. Whenever a read head crosses an LSN which was written into the
3580  * lag buffer with LagTrackerWrite, we can use the associated timestamp to
3581  * find out the time this LSN (or an earlier one) was flushed locally, and
3582  * therefore compute the lag.
3583  *
3584  * Return -1 if no new sample data is available, and otherwise the elapsed
3585  * time in microseconds.
3586  */
3587 static TimeOffset
3589 {
3590  TimestampTz time = 0;
3591 
3592  /* Read all unread samples up to this LSN or end of buffer. */
3593  while (lag_tracker->read_heads[head] != lag_tracker->write_head &&
3594  lag_tracker->buffer[lag_tracker->read_heads[head]].lsn <= lsn)
3595  {
3596  time = lag_tracker->buffer[lag_tracker->read_heads[head]].time;
3597  lag_tracker->last_read[head] =
3598  lag_tracker->buffer[lag_tracker->read_heads[head]];
3599  lag_tracker->read_heads[head] =
3600  (lag_tracker->read_heads[head] + 1) % LAG_TRACKER_BUFFER_SIZE;
3601  }
3602 
3603  /*
3604  * If the lag tracker is empty, that means the standby has processed
3605  * everything we've ever sent so we should now clear 'last_read'. If we
3606  * didn't do that, we'd risk using a stale and irrelevant sample for
3607  * interpolation at the beginning of the next burst of WAL after a period
3608  * of idleness.
3609  */
3610  if (lag_tracker->read_heads[head] == lag_tracker->write_head)
3611  lag_tracker->last_read[head].time = 0;
3612 
3613  if (time > now)
3614  {
3615  /* If the clock somehow went backwards, treat as not found. */
3616  return -1;
3617  }
3618  else if (time == 0)
3619  {
3620  /*
3621  * We didn't cross a time. If there is a future sample that we
3622  * haven't reached yet, and we've already reached at least one sample,
3623  * let's interpolate the local flushed time. This is mainly useful
3624  * for reporting a completely stuck apply position as having
3625  * increasing lag, since otherwise we'd have to wait for it to
3626  * eventually start moving again and cross one of our samples before
3627  * we can show the lag increasing.
3628  */
3629  if (lag_tracker->read_heads[head] == lag_tracker->write_head)
3630  {
3631  /* There are no future samples, so we can't interpolate. */
3632  return -1;
3633  }
3634  else if (lag_tracker->last_read[head].time != 0)
3635  {
3636  /* We can interpolate between last_read and the next sample. */
3637  double fraction;
3638  WalTimeSample prev = lag_tracker->last_read[head];
3639  WalTimeSample next = lag_tracker->buffer[lag_tracker->read_heads[head]];
3640 
3641  if (lsn < prev.lsn)
3642  {
3643  /*
3644  * Reported LSNs shouldn't normally go backwards, but it's
3645  * possible when there is a timeline change. Treat as not
3646  * found.
3647  */
3648  return -1;
3649  }
3650 
3651  Assert(prev.lsn < next.lsn);
3652 
3653  if (prev.time > next.time)
3654  {
3655  /* If the clock somehow went backwards, treat as not found. */
3656  return -1;
3657  }
3658 
3659  /* See how far we are between the previous and next samples. */
3660  fraction =
3661  (double) (lsn - prev.lsn) / (double) (next.lsn - prev.lsn);
3662 
3663  /* Scale the local flush time proportionally. */
3664  time = (TimestampTz)
3665  ((double) prev.time + (next.time - prev.time) * fraction);
3666  }
3667  else
3668  {
3669  /*
3670  * We have only a future sample, implying that we were entirely
3671  * caught up but and now there is a new burst of WAL and the
3672  * standby hasn't processed the first sample yet. Until the
3673  * standby reaches the future sample the best we can do is report
3674  * the hypothetical lag if that sample were to be replayed now.
3675  */
3676  time = lag_tracker->buffer[lag_tracker->read_heads[head]].time;
3677  }
3678  }
3679 
3680  /* Return the elapsed time since local flush time in microseconds. */
3681  Assert(time != 0);
3682  return now - time;
3683 }
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, Datum *values, bool *isnull)
Definition: tuplestore.c:750
static void InitWalSenderSlot(void)
Definition: walsender.c:2392
void InitializeTimeouts(void)
Definition: timeout.c:346
#define pq_is_send_pending()
Definition: libpq.h:41
#define NIL
Definition: pg_list.h:65
static void pq_sendint16(StringInfo buf, uint16 i)
Definition: pqformat.h:137
XLogRecPtr write
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS
WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]
Definition: walsender.c:213
static void ProcessStandbyMessage(void)
Definition: walsender.c:1817
#define pg_attribute_noreturn()
Definition: c.h:145
#define WL_SOCKET_WRITEABLE
Definition: latch.h:126
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define IsA(nodeptr, _type_)
Definition: nodes.h:579
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:212
#define AllocSetContextCreate
Definition: memutils.h:170
XLogRecPtr startpoint
Definition: replnodes.h:85
#define DEBUG1
Definition: elog.h:25
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:205
int MyProcPid
Definition: globals.c:40
int wal_sender_timeout
Definition: walsender.c:123
void wal_segment_close(XLogReaderState *state)
Definition: xlogutils.c:812
uint32 TimeLineID
Definition: xlogdefs.h:52
#define pq_flush()
Definition: libpq.h:39
static int32 next
Definition: blutils.c:219
struct Port * MyProcPort
Definition: globals.c:43
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
int wal_segment_size
Definition: xlog.c:117
int write_head
Definition: walsender.c:214
uint32 TransactionId
Definition: c.h:520
TupleDesc CreateTemplateTupleDesc(int natts)
Definition: tupdesc.c:44
bool wake_wal_senders
Definition: walsender.c:130
#define SYNC_REP_PRIORITY
Definition: syncrep.h:36
bool exec_replication_command(const char *cmd_string)
Definition: walsender.c:1512
Oid GetUserId(void)
Definition: miscinit.c:476
#define SIGUSR1
Definition: win32_port.h:165
bool update_process_title
Definition: ps_status.c:36
#define write(a, b, c)
Definition: win32.h:14
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1578
static void WalSndLastCycleHandler(SIGNAL_ARGS)
Definition: walsender.c:3044
void pgstat_report_activity(BackendState state, const char *cmd_str)
Definition: pgstat.c:3132
static void SendTimeLineHistory(TimeLineHistoryCmd *cmd)
Definition: walsender.c:464
static void parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action)
Definition: walsender.c:868
static void DropReplicationSlot(DropReplicationSlotCmd *cmd)
Definition: walsender.c:1123
#define SIGCHLD
Definition: win32_port.h:163
PGPROC * MyProc
Definition: proc.c:67
int64 TimestampTz
Definition: timestamp.h:39
WalTimeSample last_read[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:216
ResourceOwner CurrentResourceOwner
Definition: resowner.c:142
Size WalSndShmemSize(void)
Definition: walsender.c:3075
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:56
void MarkPostmasterChildWalSender(void)
Definition: pmsignal.c:272
char * pstrdup(const char *in)
Definition: mcxt.c:1187
void CommitTransactionCommand(void)
Definition: xact.c:2947
#define XACT_REPEATABLE_READ
Definition: xact.h:38
#define SpinLockInit(lock)
Definition: spin.h:60
uint8 syncrep_method
Definition: syncrep.h:69
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:83
void WALReadRaiseError(WALReadError *errinfo)
Definition: xlogutils.c:958
List * readTimeLineHistory(TimeLineID targetTLI)
Definition: timeline.c:76
bool IsAbortedTransactionBlockState(void)
Definition: xact.c:391
static void XLogSendPhysical(void)
Definition: walsender.c:2548
void do_tup_output(TupOutputState *tstate, Datum *values, bool *isnull)
Definition: execTuples.c:2256
#define tuplestore_donestoring(state)
Definition: tuplestore.h:60
TimeOffset flushLag
void ResourceOwnerDelete(ResourceOwner owner)
Definition: resowner.c:712
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency)
Definition: slot.c:222
Definition: nodes.h:528
static StringInfoData output_message
Definition: walsender.c:157
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
struct cursor * cur
Definition: ecpg.c:28
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:187
void proc_exit(int code)
Definition: ipc.c:104
int errcode(int sqlerrcode)
Definition: elog.c:610
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4702
#define MemSet(start, val, len)
Definition: c.h:949
static void WalSndKill(int code, Datum arg)
Definition: walsender.c:2452
#define kill(pid, sig)
Definition: win32_port.h:426
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
static void WalSndDone(WalSndSendDataCallback send_data)
Definition: walsender.c:2929
#define WL_SOCKET_READABLE
Definition: latch.h:125
void ReplicationSlotSave(void)
Definition: slot.c:698
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8426
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:197
TimestampTz time
Definition: walsender.c:203
void RestoreTransactionSnapshot(Snapshot snapshot, void *source_pgproc)
Definition: snapmgr.c:2222
int sleeptime
Definition: pg_standby.c:41
#define SIGPIPE
Definition: win32_port.h:158
ReplicationSlotPersistentData data
Definition: slot.h:143
TimeOffset writeLag
#define SIGUSR2
Definition: win32_port.h:166
#define LOG
Definition: elog.h:26
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:153
bool RecoveryInProgress(void)
Definition: xlog.c:8074
void SetLatch(Latch *latch)
Definition: latch.c:505
SyncRepConfigData * SyncRepConfig
Definition: syncrep.c:97
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1677
void list_free_deep(List *list)
Definition: list.c:1390
static int fd(const char *x, int i)
Definition: preproc-init.c:105
pgsocket sock
Definition: libpq-be.h:122
#define PG_BINARY
Definition: c.h:1211
void GetPGVariable(const char *name, DestReceiver *dest)
Definition: guc.c:8965
void ResetLatch(Latch *latch)
Definition: latch.c:588
slock_t mutex
Node * replication_parse_result
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:87
void StatementCancelHandler(SIGNAL_ARGS)
Definition: postgres.c:2857
Latch procLatch
Definition: proc.h:121
void XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength)
Definition: xlogutils.c:689
XLogRecPtr confirmed_flush
Definition: slot.h:91
XLogRecPtr EndRecPtr
Definition: xlogreader.h:176
int32 day
Definition: timestamp.h:47
#define XidFromFullTransactionId(x)
Definition: transam.h:48
static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch)
Definition: walsender.c:2034
static LagTracker * lag_tracker
Definition: walsender.c:219
TimeLineID timeline
Definition: replnodes.h:97
ReplicationKind kind
Definition: replnodes.h:56
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:161
void set_ps_display(const char *activity)
Definition: ps_status.c:349
bool IsTransactionBlock(void)
Definition: xact.c:4684
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:145
WALOpenSegment seg
Definition: xlogreader.h:215
const char * SnapBuildExportSnapshot(SnapBuild *builder)
Definition: snapbuild.c:632
bool am_walsender
Definition: walsender.c:115
#define SpinLockAcquire(lock)
Definition: spin.h:62
void pg_usleep(long microsec)
Definition: signal.c:53
void HandleWalSndInitStopping(void)
Definition: walsender.c:3022
void ReplicationSlotReserveWal(void)
Definition: slot.c:1057
int SendProcSignal(pid_t pid, ProcSignalReason reason, BackendId backendId)
Definition: procsignal.c:250
XLogRecPtr flush
bool defGetBoolean(DefElem *def)
Definition: define.c:111
char data[BLCKSZ]
Definition: c.h:1082
void DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
Definition: logical.c:526
void ReplicationSlotsComputeRequiredLSN(void)
Definition: slot.c:805
ReplicationKind kind
Definition: replnodes.h:82
void WalSndRqstFileReload(void)
Definition: walsender.c:2999
void pfree(void *pointer)
Definition: mcxt.c:1057
XLogRecord * XLogReadRecord(XLogReaderState *state, char **errormsg)
Definition: xlogreader.c:268
void ConditionVariableCancelSleep(void)
bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
Definition: transam.c:319
static void WalSndKeepalive(bool requestReply)
Definition: walsender.c:3462
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:2314
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:676
#define ERROR
Definition: elog.h:43
void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
Definition: decode.c:94
static TimestampTz last_processing
Definition: walsender.c:162
void pq_startmsgread(void)
Definition: pqcomm.c:1197
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2372
static void PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: walsender.c:1848
#define IntervalPGetDatum(X)
Definition: timestamp.h:33
#define SYNC_REP_WAIT_APPLY
Definition: syncrep.h:26
void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli)
Definition: xlog.c:3926
bool FirstSnapshotSet
Definition: snapmgr.c:149
void EndCommand(const QueryCompletion *qc, CommandDest dest, bool force_undecorated_output)
Definition: dest.c:166
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:392
void WalSndSignals(void)
Definition: walsender.c:3056
#define TimestampTzGetDatum(X)
Definition: timestamp.h:32
#define FATAL
Definition: elog.h:52
void SendBaseBackup(BaseBackupCmd *cmd)
Definition: basebackup.c:925
Latch * latch
void(* WalSndSendDataCallback)(void)
Definition: walsender.c:225
#define MAXPGPATH
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:11488
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:2236
void ReplicationSlotPersist(void)
Definition: slot.c:733
TransactionId effective_xmin
Definition: slot.h:139
int pq_getbyte_if_available(unsigned char *c)
Definition: pqcomm.c:1027
static LogicalDecodingContext * logical_decoding_ctx
Definition: walsender.c:197
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:192
LogicalDecodingContext * CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, bool fast_forward, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:434
TransactionId xmin
Definition: proc.h:129
char * get_database_name(Oid dbid)
Definition: dbcommands.c:2155
#define DEBUG2
Definition: elog.h:24
bool XLogBackgroundFlush(void)
Definition: xlog.c:3034
Definition: latch.h:110
Definition: dest.h:89
#define TLHistoryFileName(fname, tli)
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:361
WalSndState state
NodeTag type
Definition: nodes.h:530
static char * buf
Definition: pg_test_fsync.c:68
void WalSndWaitStopping(void)
Definition: walsender.c:3174
static bool streamingDoneSending
Definition: walsender.c:179
uint64 XLogSegNo
Definition: xlogdefs.h:41
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
int errdetail(const char *fmt,...)
Definition: elog.c:957
XLogSegNo ws_segno
Definition: xlogreader.h:47
#define COMMERROR
Definition: elog.h:30
int errcode_for_file_access(void)
Definition: elog.c:633
static void StartReplication(StartReplicationCmd *cmd)
Definition: walsender.c:570
#define SIGHUP
Definition: win32_port.h:153
TransactionId catalog_xmin
Definition: slot.h:77
#define pq_flush_if_writable()
Definition: libpq.h:40
TimeOffset time
Definition: timestamp.h:45
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
Definition: xact.c:3380
XLogRecPtr currTLIValidUntil
Definition: xlogreader.h:238
#define InvalidTransactionId
Definition: transam.h:31
void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
Definition: xlogreader.c:240
void enlargeStringInfo(StringInfo str, int needed)
Definition: stringinfo.c:283
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:438
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1260
unsigned int uint32
Definition: c.h:374
void ReplicationSlotRelease(void)
Definition: slot.c:476
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1386
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
XLogReaderState * XLogReaderAllocate(int wal_segment_size, const char *waldir, XLogReaderRoutine *routine, void *private_data)
Definition: xlogreader.c:75
TransactionId xmin
Definition: slot.h:69
SHM_QUEUE SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]
void InitWalSender(void)
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:186
#define SlotIsLogical(slot)
Definition: slot.h:165
Datum Int64GetDatum(int64 X)
Definition: fmgr.c:1701
static XLogReaderState * xlogreader
Definition: walsender.c:137
void SyncRepInitConfig(void)
Definition: syncrep.c:412
void EndReplicationCommand(const char *commandTag)
Definition: dest.c:221
static bool WalSndCaughtUp
Definition: walsender.c:183
#define SYNC_REP_WAIT_FLUSH
Definition: syncrep.h:25
MemoryContext TopMemoryContext
Definition: mcxt.c:44
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)
Definition: walsender.c:3523
XLogRecPtr lsn
Definition: walsender.c:202
int replication_yyparse(void)
FullTransactionId ReadNextFullTransactionId(void)
Definition: varsup.c:261
Definition: guc.h:72
XLogRecPtr last_lsn
Definition: walsender.c:212
#define ERRCODE_DATA_CORRUPTED
Definition: pg_basebackup.c:45
int32 month
Definition: timestamp.h:48
#define XL_ROUTINE(...)
Definition: xlogreader.h:116
int max_wal_senders
Definition: walsender.c:121
static void WalSndLoop(WalSndSendDataCallback send_data)
Definition: walsender.c:2266
static void WalSndKeepaliveIfNecessary(void)
Definition: walsender.c:3485
int CloseTransientFile(int fd)
Definition: fd.c:2549
#define SIG_IGN
Definition: win32_port.h:150
static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1233
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
const char * debug_query_string
Definition: postgres.c:88
int pq_getmessage(StringInfo s, int maxlen)
Definition: pqcomm.c:1259
static StringInfoData reply_message
Definition: walsender.c:158
#define MAXFNAMELEN
TimeLineID nextTLI
Definition: xlogreader.h:244
#define SpinLockRelease(lock)
Definition: spin.h:64
Tuplestorestate * tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
Definition: tuplestore.c:318
int64 TimeOffset
Definition: timestamp.h:40
static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc)
Definition: walsender.c:1375
static void WalSndCheckTimeOut(void)
Definition: walsender.c:2239
Size mul_size(Size s1, Size s2)
Definition: shmem.c:515
#define InvalidBackendId
Definition: backendid.h:23
static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
Definition: walsender.c:3588
void WalSndErrorCleanup(void)
Definition: walsender.c:295
static volatile sig_atomic_t replication_active
Definition: walsender.c:195
uintptr_t Datum
Definition: postgres.h:367
static const char * WalSndGetStateString(WalSndState state)
Definition: walsender.c:3231
XLogRecPtr sentPtr
Size add_size(Size s1, Size s2)
Definition: shmem.c:498
bool am_db_walsender
Definition: walsender.c:118
TransactionId effective_catalog_xmin
Definition: slot.h:140
static void StartLogicalReplication(StartReplicationCmd *cmd)
Definition: walsender.c:1133
Oid MyDatabaseId
Definition: globals.c:85
static void WalSndShutdown(void)
Definition: walsender.c:229
static TimeLineID receiveTLI
Definition: xlog.c:214
int work_mem
Definition: globals.c:121
XLogRecPtr tliSwitchPoint(TimeLineID tli, List *history, TimeLineID *nextTLI)
Definition: timeline.c:580
void WalSndResourceCleanup(bool isCommit)
Definition: walsender.c:330
#define EpochFromFullTransactionId(x)
Definition: transam.h:47
WalSnd * MyWalSnd
Definition: walsender.c:112
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:840
static XLogRecPtr sentPtr
Definition: walsender.c:154
int log_min_messages
Definition: guc.c:538
void pq_endmsgread(void)
Definition: pqcomm.c:1221
#define InvalidOid
Definition: postgres_ext.h:36
static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p)
Definition: walsender.c:2470
TimeLineID ThisTimeLineID
Definition: xlog.c:192
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:56
#define ereport(elevel,...)
Definition: elog.h:144
int allowedModes
Definition: execnodes.h:305
void WalSndInitStopping(void)
Definition: walsender.c:3148
TimeLineID currTLI
Definition: xlogreader.h:228
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:170
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:401
bool is_member_of_role(Oid member, Oid role)
Definition: acl.c:4916
TimeOffset applyLag
struct SnapBuild * snapshot_builder
Definition: logical.h:43
int errmsg_internal(const char *fmt,...)
Definition: elog.c:911
SetFunctionReturnMode returnMode
Definition: execnodes.h:307
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
static long WalSndComputeSleeptime(TimestampTz now)
Definition: walsender.c:2189
#define SIG_DFL
Definition: win32_port.h:148
#define SIGNAL_ARGS
Definition: c.h:1273
static TimeLineID sendTimeLine
Definition: walsender.c:145
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:745
#define lfirst(lc)
Definition: pg_list.h:169
void WalSndSetState(WalSndState state)
Definition: walsender.c:3212
int sync_standby_priority
Definition: regguts.h:298
void ResourceOwnerRelease(ResourceOwner owner, ResourceReleasePhase phase, bool isCommit, bool isTopLevel)
Definition: resowner.c:478
void FreeDecodingContext(LogicalDecodingContext *ctx)
Definition: logical.c:569
static void XLogSendLogical(void)
Definition: walsender.c:2849
XLogRecPtr restart_lsn
Definition: slot.h:80
static void SetQueryCompletion(QueryCompletion *qc, CommandTag commandTag, uint64 nprocessed)
Definition: cmdtag.h:36
int read_heads[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:215
void StartTransactionCommand(void)
Definition: xact.c:2846
bool needreload
static void ProcessStandbyReplyMessage(void)
Definition: walsender.c:1880
size_t Size
Definition: c.h:473
#define XLogFileName(fname, tli, logSegNo, wal_segsz_bytes)
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: pgstat.h:1362
char * dbname
Definition: streamutil.c:51
#define LAG_TRACKER_BUFFER_SIZE
Definition: walsender.c:207
int XactIsoLevel
Definition: xact.c:75
static TimeLineID sendTimeLineNextTLI
Definition: walsender.c:146
static XLogRecPtr sendTimeLineValidUpto
Definition: walsender.c:148
void LogicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: logical.c:1371
MemoryContext ecxt_per_query_memory
Definition: execnodes.h:233
XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
WalSndState
Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
Definition: walsender.c:3266
TimeLineID ws_tli
Definition: xlogreader.h:48
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Definition: walsender.c:921
static bool streamingDoneReceiving
Definition: walsender.c:180
Tuplestorestate * setResult
Definition: execnodes.h:310
static StringInfoData tmpbuf
Definition: walsender.c:159
int BasicOpenFile(const char *fileName, int fileFlags)
Definition: fd.c:986
#define XLogFilePath(path, tli, logSegNo, wal_segsz_bytes)
int SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys)
Definition: syncrep.c:726
bool IsSubTransaction(void)
Definition: xact.c:4757
Snapshot SnapBuildInitialSnapshot(SnapBuild *builder)
Definition: snapbuild.c:540
static Datum values[MAXATTR]
Definition: bootstrap.c:165
#define PG_STAT_GET_WAL_SENDERS_COLS
ExprContext * econtext
Definition: execnodes.h:303
bool log_replication_commands
Definition: walsender.c:125
uint64 GetSystemIdentifier(void)
Definition: xlog.c:4911
#define Int32GetDatum(X)
Definition: postgres.h:479
void ReplicationSlotDrop(const char *name, bool nowait)
Definition: slot.c:570
char * application_name
Definition: guc.c:563
TupleDesc setDesc
Definition: execnodes.h:311
void pq_sendbytes(StringInfo buf, const char *data, int datalen)
Definition: pqformat.c:125
void * palloc(Size size)
Definition: mcxt.c:950
int errmsg(const char *fmt,...)
Definition: elog.c:824
XLogReaderState * reader
Definition: logical.h:41
void SHMQueueInit(SHM_QUEUE *queue)
Definition: shmqueue.c:36
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:298
int ReplicationSlotAcquire(const char *name, SlotAcquireBehavior behavior)
Definition: slot.c:367
#define pq_putmessage_noblock(msgtype, s, len)
Definition: libpq.h:44
CRSSnapshotAction
Definition: walsender.h:20
#define elog(elevel,...)
Definition: elog.h:214
StringInfo out
Definition: logical.h:70
static const unsigned __int64 epoch
Definition: gettimeofday.c:34
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:26
int i
void WalSndShmemInit(void)
Definition: walsender.c:3087
Definition: slot.h:42
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:455
#define NameStr(name)
Definition: c.h:622
TimeLineID timeline
Definition: replnodes.h:84
#define CStringGetTextDatum(s)
Definition: builtins.h:86
void * arg
static void PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
Definition: walsender.c:1985
struct Latch * MyLatch
Definition: globals.c:54
bool WALRead(XLogReaderState *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli, WALReadError *errinfo)
Definition: xlogreader.c:1063
void ReplicationSlotCleanup(void)
Definition: slot.c:532
WALSegmentContext segcxt
Definition: xlogreader.h:214
#define PG_FUNCTION_ARGS
Definition: fmgr.h:193
char * defname
Definition: parsenodes.h:733
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:417
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:99
static XLogRecPtr GetStandbyFlushRecPtr(void)
Definition: walsender.c:2969
static int logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
Definition: walsender.c:814
void SendPostmasterSignal(PMSignalReason reason)
Definition: pmsignal.c:146
TimestampTz replyTime
void LWLockReleaseAll(void)
Definition: lwlock.c:1911
#define NUM_SYNC_REP_WAIT_MODE
Definition: syncrep.h:28
static Interval * offset_to_interval(TimeOffset offset)
Definition: walsender.c:3250
slock_t mutex
Definition: slot.h:116
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition: procsignal.c:551
CommandDest whereToSendOutput
Definition: postgres.c:91
void TimestampDifference(TimestampTz start_time, TimestampTz stop_time, long *secs, int *microsecs)
Definition: timestamp.c:1652
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
static bool waiting_for_ping_response
Definition: walsender.c:171
XLogRecPtr apply
static void IdentifySystem(void)
Definition: walsender.c:375
Definition: pg_list.h:50
void CheckLogicalDecodingRequirements(void)
Definition: logical.c:93
#define snprintf
Definition: port.h:193
LogicalDecodingContext * CreateInitDecodingContext(const char *plugin, List *output_plugin_options, bool need_full_snapshot, XLogRecPtr restart_lsn, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:282
#define WL_LATCH_SET
Definition: latch.h:124
int16 AttrNumber
Definition: attnum.h:21
#define UINT64_FORMAT
Definition: c.h:417
void replication_scanner_finish(void)
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition: slot.c:755
void SyncRepReleaseWaiters(void)
Definition: syncrep.c:441
static TimestampTz last_reply_timestamp
Definition: walsender.c:168
static void ProcessRepliesIfAny(void)
Definition: walsender.c:1713
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1542
#define MAX_SEND_SIZE
Definition: walsender.c:106
#define die(msg)
Definition: pg_test_fsync.c:97
#define read(a, b, c)
Definition: win32.h:13
static bool sendTimeLineIsHistoric
Definition: walsender.c:147
#define offsetof(type, field)
Definition: c.h:668
void WalSndWakeup(void)
Definition: walsender.c:3119
void ReplicationSlotMarkDirty(void)
Definition: slot.c:716
bool am_cascading_walsender
Definition: walsender.c:116
static void ProcessStandbyHSFeedbackMessage(void)
Definition: walsender.c:2065
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:129
#define SYNC_REP_WAIT_WRITE
Definition: syncrep.h:24
void SnapBuildClearExportedSnapshot(void)
Definition: snapbuild.c:693
static XLogRecPtr startptr
Definition: basebackup.c:116
static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
Definition: walsender.c:1349
void replication_scanner_init(const char *query_string)
#define TLHistoryFilePath(path, tli)
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1740
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)