PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
walsender.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * walsender.c
4  *
5  * The WAL sender process (walsender) is new as of Postgres 9.0. It takes
6  * care of sending XLOG from the primary server to a single recipient.
7  * (Note that there can be more than one walsender process concurrently.)
8  * It is started by the postmaster when the walreceiver of a standby server
9  * connects to the primary server and requests XLOG streaming replication.
10  *
11  * A walsender is similar to a regular backend, ie. there is a one-to-one
12  * relationship between a connection and a walsender process, but instead
13  * of processing SQL queries, it understands a small set of special
14  * replication-mode commands. The START_REPLICATION command begins streaming
15  * WAL to the client. While streaming, the walsender keeps reading XLOG
16  * records from the disk and sends them to the standby server over the
17  * COPY protocol, until either side ends the replication by exiting COPY
18  * mode (or until the connection is closed).
19  *
20  * Normal termination is by SIGTERM, which instructs the walsender to
21  * close the connection and exit(0) at the next convenient moment. Emergency
22  * termination is by SIGQUIT; like any backend, the walsender will simply
23  * abort and exit on SIGQUIT. A close of the connection and a FATAL error
24  * are treated as not a crash but approximately normal termination;
25  * the walsender will exit quickly without sending any more XLOG records.
26  *
27  * If the server is shut down, postmaster sends us SIGUSR2 after all
28  * regular backends have exited and the shutdown checkpoint has been written.
29  * This instructs walsender to send any outstanding WAL, including the
30  * shutdown checkpoint record, wait for it to be replicated to the standby,
31  * and then exit.
32  *
33  *
34  * Portions Copyright (c) 2010-2017, PostgreSQL Global Development Group
35  *
36  * IDENTIFICATION
37  * src/backend/replication/walsender.c
38  *
39  *-------------------------------------------------------------------------
40  */
41 #include "postgres.h"
42 
43 #include <signal.h>
44 #include <unistd.h>
45 
46 #include "access/printtup.h"
47 #include "access/timeline.h"
48 #include "access/transam.h"
49 #include "access/xact.h"
50 #include "access/xlog_internal.h"
51 
52 #include "catalog/pg_type.h"
53 #include "commands/dbcommands.h"
54 #include "funcapi.h"
55 #include "libpq/libpq.h"
56 #include "libpq/pqformat.h"
57 #include "miscadmin.h"
58 #include "nodes/replnodes.h"
59 #include "pgstat.h"
60 #include "replication/basebackup.h"
61 #include "replication/decode.h"
62 #include "replication/logical.h"
64 #include "replication/slot.h"
65 #include "replication/snapbuild.h"
66 #include "replication/syncrep.h"
68 #include "replication/walsender.h"
71 #include "storage/fd.h"
72 #include "storage/ipc.h"
73 #include "storage/pmsignal.h"
74 #include "storage/proc.h"
75 #include "storage/procarray.h"
76 #include "tcop/dest.h"
77 #include "tcop/tcopprot.h"
78 #include "utils/builtins.h"
79 #include "utils/guc.h"
80 #include "utils/memutils.h"
81 #include "utils/pg_lsn.h"
82 #include "utils/portal.h"
83 #include "utils/ps_status.h"
84 #include "utils/resowner.h"
85 #include "utils/timeout.h"
86 #include "utils/timestamp.h"
87 
88 /*
89  * Maximum data payload in a WAL data message. Must be >= XLOG_BLCKSZ.
90  *
91  * We don't have a good idea of what a good value would be; there's some
92  * overhead per message in both walsender and walreceiver, but on the other
93  * hand sending large batches makes walsender less responsive to signals
94  * because signals are checked only between messages. 128kB (with
95  * default 8k blocks) seems like a reasonable guess for now.
 *
 * Expressed as a multiple of XLOG_BLCKSZ (16 pages) so that the
 * ">= XLOG_BLCKSZ" requirement above holds for any configured block size.
96  */
97 #define MAX_SEND_SIZE (XLOG_BLCKSZ * 16)
98 
99 /* Array of WalSnds in shared memory */
/*
 * NOTE(review): the declarations described by several comments in this
 * section (after listing lines 99, 102, 130, 138, 150, 158 and 169) are
 * absent from this copy; the listing numbers skip over them. Confirm
 * against upstream walsender.c before relying on this copy.
 */
101 
102 /* My slot in the shared memory array */
104 
105 /* Global state */
106 bool am_walsender = false; /* Am I a walsender process? */
107 bool am_cascading_walsender = false; /* Am I cascading WAL to
108  * another standby? */
109 bool am_db_walsender = false; /* Connected to a database? */
110 
111 /* User-settable parameters for walsender */
112 int max_wal_senders = 0; /* the maximum number of concurrent walsenders */
113 int wal_sender_timeout = 60 * 1000; /* maximum time to send one
114  * WAL data message; presumably milliseconds (default 60 * 1000) */
116 
117 /*
118  * State for WalSndWakeupRequest
119  */
120 bool wake_wal_senders = false;
121 
122 /*
123  * These variables are used similarly to openLogFile/SegNo/Off,
124  * but for walsender to read the XLOG. sendFile is -1 when no segment
 * file is open.
125  */
126 static int sendFile = -1;
127 static XLogSegNo sendSegNo = 0;
128 static uint32 sendOff = 0;
129 
130 /* Timeline ID of the currently open file */
132 
133 /*
134  * These variables keep track of the state of the timeline we're currently
135  * sending. sendTimeLine identifies the timeline. If sendTimeLineIsHistoric,
136  * the timeline is not the latest timeline on this server, and the server's
137  * history forked off from that timeline at sendTimeLineValidUpto.
138  */
141 static bool sendTimeLineIsHistoric = false;
143 
144 /*
145  * How far have we sent WAL already? This is also advertised in
146  * MyWalSnd->sentPtr. (Actually, this is the next WAL location to send.)
147  */
148 static XLogRecPtr sentPtr = 0;
149 
150 /* Buffers for constructing outgoing messages and processing reply messages. */
154 
155 /*
156  * Timestamp of the last receipt of the reply from the standby. Set to 0 if
157  * wal_sender_timeout doesn't need to be active.
158  */
160 
161 /* Have we sent a heartbeat message asking for reply, since last reply? */
162 static bool waiting_for_ping_response = false;
163 
164 /*
165  * While streaming WAL in Copy mode, streamingDoneSending is set to true
166  * after we have sent CopyDone. We should not send any more CopyData messages
167  * after that. streamingDoneReceiving is set to true when we receive CopyDone
168  * from the other end. When both become true, it's time to exit Copy mode.
169  */
172 
173 /* Are we there yet? Set once we have caught up with flushed WAL and must wait for more (see WalSndWaitForWal). */
174 static bool WalSndCaughtUp = false;
175 
176 /* Flags set by signal handlers for later service in main loop */
177 static volatile sig_atomic_t got_SIGHUP = false;
178 static volatile sig_atomic_t walsender_ready_to_stop = false;
179 
180 /*
181  * This is set while we are streaming. When not set, SIGUSR2 signal will be
182  * handled like SIGTERM. When set, the main loop is responsible for checking
183  * walsender_ready_to_stop and terminating when it's set (after streaming any
184  * remaining WAL).
185  */
186 static volatile sig_atomic_t replication_active = false;
187 
190 
191 /* Signal handlers */
192 static void WalSndSigHupHandler(SIGNAL_ARGS);
195 
196 /* Prototypes for private functions */
197 typedef void (*WalSndSendDataCallback) (void);
/*
 * WalSndSendDataCallback is the type of the send_data argument taken by
 * WalSndLoop() and WalSndDone(). XLogSendPhysical() and XLogSendLogical()
 * have this signature and are presumably the two implementations passed in.
 */
198 static void WalSndLoop(WalSndSendDataCallback send_data);
199 static void InitWalSenderSlot(void);
200 static void WalSndKill(int code, Datum arg);
202 static void XLogSendPhysical(void);
203 static void XLogSendLogical(void);
204 static void WalSndDone(WalSndSendDataCallback send_data);
205 static XLogRecPtr GetStandbyFlushRecPtr(void);
206 static void IdentifySystem(void);
209 static void StartReplication(StartReplicationCmd *cmd);
211 static void ProcessStandbyMessage(void);
212 static void ProcessStandbyReplyMessage(void);
213 static void ProcessStandbyHSFeedbackMessage(void);
214 static void ProcessRepliesIfAny(void);
215 static void WalSndKeepalive(bool requestReply);
217 static void WalSndCheckTimeOut(TimestampTz now);
218 static long WalSndComputeSleeptime(TimestampTz now);
219 static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
220 static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
222 
223 static void XLogRead(char *buf, XLogRecPtr startptr, Size count);
224 
225 
226 /* Initialize walsender process before entering the main command loop */
/*
 * Sets up a top-level resource owner for the walsender and (per the
 * comments below) creates the per-walsender shared-memory slot and marks
 * this process as a WAL sender to the postmaster.
 *
 * NOTE(review): the statements at listing lines 230, 233 and 245-246 are
 * absent from this copy, so only the resource-owner setup is visible.
 */
227 void
228 InitWalSender(void)
229 {
231 
232  /* Create a per-walsender data structure in shared memory */
234 
235  /* Set up resource owner */
236  CurrentResourceOwner = ResourceOwnerCreate(NULL, "walsender top-level resource owner");
237 
238  /*
239  * Let postmaster know that we're a WAL sender. Once we've declared ourselves as
240  * a WAL sender process, postmaster will let us outlive the bgwriter and
241  * kill us last in the shutdown sequence, so we get a chance to stream all
242  * remaining WAL at shutdown, including the shutdown checkpoint. Note that
243  * there's no going back, and we mustn't write any WAL records after this.
244  */
247 }
248 
249 /*
250  * Clean up after an error.
251  *
252  * WAL sender processes don't use transactions like regular backends do.
253  * This function does any cleanup required after an error in a WAL sender
254  * process, similar to what transaction abort does in a regular backend.
 *
 * Visible cleanup: closes any WAL segment file left open for reading and
 * clears replication_active.
 *
 * NOTE(review): the function-name line (257) and lines 259-261, 270, 272,
 * 275 and 279 are absent from this listing.
255  */
256 void
258 {
262 
    /* Close the WAL segment file we were reading from, if any. */
263  if (sendFile >= 0)
264  {
265  close(sendFile);
266  sendFile = -1;
267  }
268 
    /* NOTE(review): the body of this if (listing line 270) is missing here. */
269  if (MyReplicationSlot != NULL)
271 
273 
274  replication_active = false;
    /*
     * NOTE(review): listing line 275 is missing. As rendered, proc_exit(0)
     * runs unconditionally and the rest of the function is unreachable;
     * upstream presumably guards it with a condition (likely
     * walsender_ready_to_stop) -- confirm.
     */
276  proc_exit(0);
277 
278  /* Revert back to startup state */
280 }
281 
282 /*
283  * Handle a client's connection abort in an orderly manner.
 *
 * Never returns: the process exits through proc_exit(0).
 *
 * NOTE(review): the statements that reset whereToSendOutput (listing
 * lines 292-293) are absent from this copy.
284  */
285 static void
286 WalSndShutdown(void)
287 {
288  /*
289  * Reset whereToSendOutput to prevent ereport from attempting to send any
290  * more messages to the standby.
291  */
294 
295  proc_exit(0);
296  abort(); /* not reached; keeps the compiler quiet */
297 }
298 
299 /*
300  * Handle the IDENTIFY_SYSTEM command.
 *
 * Sends the client a one-row result set with columns systemid (text),
 * timeline (int4), xlogpos (text, formatted "%X/%X") and dbname (text,
 * NULL when not connected to a database).
 *
 * NOTE(review): listing lines 303, 322, 324-325, 337, 340, 342-344, 346 and
 * 349 are absent from this copy. In particular, the assignment of `dest`
 * (used by begin_tup_output_tupdesc below) and the condition that selects
 * the GetStandbyFlushRecPtr() branch are not visible -- confirm upstream.
301  */
302 static void
304 {
305  char sysid[32];
306  char xpos[MAXFNAMELEN];
307  XLogRecPtr logptr;
308  char *dbname = NULL;
309  DestReceiver *dest;
310  TupOutputState *tstate;
311  TupleDesc tupdesc;
312  Datum values[4];
313  bool nulls[4];
314 
315  /*
316  * Reply with a result set with one row, four columns. First col is system
317  * ID, second is timeline ID, third is current xlog location and the
318  * fourth contains the database name if we are connected to one.
319  */
320 
321  snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
323 
326  {
327  /* this also updates ThisTimeLineID */
328  logptr = GetStandbyFlushRecPtr();
329  }
330  else
331  logptr = GetFlushRecPtr();
332 
    /* Render the 64-bit LSN as two 32-bit hex halves: "hi/lo". */
333  snprintf(xpos, sizeof(xpos), "%X/%X", (uint32) (logptr >> 32), (uint32) logptr);
334 
335  if (MyDatabaseId != InvalidOid)
336  {
338 
339  /* syscache access needs a transaction env. */
341  /* make dbname live outside TX context */
345  /* CommitTransactionCommand switches to TopMemoryContext */
347  }
348 
350  MemSet(nulls, false, sizeof(nulls));
351 
352  /* need a tuple descriptor representing four columns */
353  tupdesc = CreateTemplateTupleDesc(4, false);
354  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "systemid",
355  TEXTOID, -1, 0);
356  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "timeline",
357  INT4OID, -1, 0);
358  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "xlogpos",
359  TEXTOID, -1, 0);
360  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "dbname",
361  TEXTOID, -1, 0);
362 
363  /* prepare for projection of tuples */
364  tstate = begin_tup_output_tupdesc(dest, tupdesc);
365 
366  /* column 1: system identifier */
367  values[0] = CStringGetTextDatum(sysid);
368 
369  /* column 2: timeline */
    /*
     * NOTE(review): sent as int4, whereas START_REPLICATION reports the next
     * timeline as int8 because TimeLineID is unsigned; a timeline ID above
     * INT32_MAX would presumably appear negative here.
     */
370  values[1] = Int32GetDatum(ThisTimeLineID);
371 
372  /* column 3: xlog position */
373  values[2] = CStringGetTextDatum(xpos);
374 
375  /* column 4: database name, or NULL if none */
376  if (dbname)
377  values[3] = CStringGetTextDatum(dbname);
378  else
379  nulls[3] = true;
380 
381  /* send it to dest */
382  do_tup_output(tstate, values, nulls);
383 
384  end_tup_output(tstate);
385 }
386 
387 
388 /*
389  * Handle TIMELINE_HISTORY command.
 *
 * Replies with a RowDescription and a single DataRow: the history file's
 * name (text) and its full contents (bytea) for timeline cmd->timeline.
 * The file is opened with OpenTransientFile() and closed before the
 * DataRow is finished; any open/seek/read failure raises ereport(ERROR).
 *
 * NOTE(review): the signature (listing line 392), the declaration of `buf`
 * (394) and the errcode lines of each ereport (443, 450, 454, 468) are
 * absent from this copy.
390  */
391 static void
393 {
395  char histfname[MAXFNAMELEN];
396  char path[MAXPGPATH];
397  int fd;
398  off_t histfilelen;
399  off_t bytesleft;
400  Size len;
401 
402  /*
403  * Reply with a result set with one row, and two columns. The first col is
404  * the name of the history file, 2nd is the contents.
405  */
406 
407  TLHistoryFileName(histfname, cmd->timeline);
408  TLHistoryFilePath(path, cmd->timeline);
409 
410  /* Send a RowDescription message */
411  pq_beginmessage(&buf, 'T');
412  pq_sendint(&buf, 2, 2); /* 2 fields */
413 
414  /* first field */
415  pq_sendstring(&buf, "filename"); /* col name */
416  pq_sendint(&buf, 0, 4); /* table oid */
417  pq_sendint(&buf, 0, 2); /* attnum */
418  pq_sendint(&buf, TEXTOID, 4); /* type oid */
419  pq_sendint(&buf, -1, 2); /* typlen */
420  pq_sendint(&buf, 0, 4); /* typmod */
421  pq_sendint(&buf, 0, 2); /* format code */
422 
423  /* second field */
424  pq_sendstring(&buf, "content"); /* col name */
425  pq_sendint(&buf, 0, 4); /* table oid */
426  pq_sendint(&buf, 0, 2); /* attnum */
427  pq_sendint(&buf, BYTEAOID, 4); /* type oid */
428  pq_sendint(&buf, -1, 2); /* typlen */
429  pq_sendint(&buf, 0, 4); /* typmod */
430  pq_sendint(&buf, 0, 2); /* format code */
431  pq_endmessage(&buf);
432 
433  /* Send a DataRow message */
434  pq_beginmessage(&buf, 'D');
435  pq_sendint(&buf, 2, 2); /* # of columns */
436  len = strlen(histfname);
437  pq_sendint(&buf, len, 4); /* col1 len */
438  pq_sendbytes(&buf, histfname, len);
439 
440  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0666);
441  if (fd < 0)
442  ereport(ERROR,
444  errmsg("could not open file \"%s\": %m", path)));
445 
446  /* Determine file length and send it to client */
447  histfilelen = lseek(fd, 0, SEEK_END);
448  if (histfilelen < 0)
449  ereport(ERROR,
451  errmsg("could not seek to end of file \"%s\": %m", path)));
452  if (lseek(fd, 0, SEEK_SET) != 0)
453  ereport(ERROR,
455  errmsg("could not seek to beginning of file \"%s\": %m", path)));
456 
457  pq_sendint(&buf, histfilelen, 4); /* col2 len */
458 
    /*
     * Copy the file body into the DataRow.
     *
     * NOTE(review): each read() requests sizeof(rbuf) bytes rather than
     * Min(bytesleft, sizeof(rbuf)). If the file grew after the lseek()
     * above, more bytes than the advertised col2 length could be appended,
     * corrupting the DataRow. Also, a read() that returns 0 (file shrank)
     * is reported with "%m", although read() did not set errno in that case.
     */
459  bytesleft = histfilelen;
460  while (bytesleft > 0)
461  {
462  char rbuf[BLCKSZ];
463  int nread;
464 
465  nread = read(fd, rbuf, sizeof(rbuf));
466  if (nread <= 0)
467  ereport(ERROR,
469  errmsg("could not read file \"%s\": %m",
470  path)));
471  pq_sendbytes(&buf, rbuf, nread);
472  bytesleft -= nread;
473  }
474  CloseTransientFile(fd);
475 
476  pq_endmessage(&buf);
477 }
478 
479 /*
480  * Handle START_REPLICATION command.
481  *
482  * At the moment, this never returns, but an ereport(ERROR) will take us back
483  * to the main loop.
 *
 * NOTE(review): the code after the streaming block below sends a
 * next-timeline result set and a START_STREAMING CommandComplete. That
 * suggests this function can return normally once streaming of a historic
 * timeline has finished, so the sentence above may be stale.
 *
 * NOTE(review): many listing lines are absent from this copy (e.g. the
 * signature at 486, and 488, 502-503, 514, 527, 530, 544, 583-584, 588,
 * 591, 602, 637, 642, 645, 647, 649, 653, 659, 670, 672). Several
 * conditions and statements that the comments refer to are not visible.
484  */
485 static void
487 {
489  XLogRecPtr FlushPtr;
490 
491  /*
492  * We assume here that we're logging enough information in the WAL for
493  * log-shipping, since this is checked in PostmasterMain().
494  *
495  * NOTE: wal_level can only change at shutdown, so in most cases it is
496  * difficult for there to be WAL data that we can still see that was
497  * written at wal_level='minimal'.
498  */
499 
500  if (cmd->slotname)
501  {
504  ereport(ERROR,
505  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
506  (errmsg("cannot use a logical replication slot for physical replication"))));
507  }
508 
509  /*
510  * Select the timeline. If it was given explicitly by the client, use
511  * that. Otherwise use the timeline of the last replayed record, which is
512  * kept in ThisTimeLineID.
513  */
515  {
516  /* this also updates ThisTimeLineID */
517  FlushPtr = GetStandbyFlushRecPtr();
518  }
519  else
520  FlushPtr = GetFlushRecPtr();
521 
522  if (cmd->timeline != 0)
523  {
524  XLogRecPtr switchpoint;
525 
526  sendTimeLine = cmd->timeline;
528  {
529  sendTimeLineIsHistoric = false;
531  }
532  else
533  {
534  List *timeLineHistory;
535 
536  sendTimeLineIsHistoric = true;
537 
538  /*
539  * Check that the timeline the client requested exists, and
540  * the requested start location is on that timeline.
541  */
542  timeLineHistory = readTimeLineHistory(ThisTimeLineID);
543  switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
545  list_free_deep(timeLineHistory);
546 
547  /*
548  * Found the requested timeline in the history. Check that
549  * requested startpoint is on that timeline in our history.
550  *
551  * This is quite loose on purpose. We only check that we didn't
552  * fork off the requested timeline before the switchpoint. We
553  * don't check that we switched *to* it before the requested
554  * starting point. This is because the client can legitimately
555  * request to start replication from the beginning of the WAL
556  * segment that contains switchpoint, but on the new timeline, so
557  * that it doesn't end up with a partial segment. If you ask for a
558  * too old starting point, you'll get an error later when we fail
559  * to find the requested WAL segment in pg_wal.
560  *
561  * XXX: we could be more strict here and only allow a startpoint
562  * that's older than the switchpoint, if it's still in the same
563  * WAL segment.
564  */
565  if (!XLogRecPtrIsInvalid(switchpoint) &&
566  switchpoint < cmd->startpoint)
567  {
568  ereport(ERROR,
569  (errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
570  (uint32) (cmd->startpoint >> 32),
571  (uint32) (cmd->startpoint),
572  cmd->timeline),
573  errdetail("This server's history forked from timeline %u at %X/%X.",
574  cmd->timeline,
575  (uint32) (switchpoint >> 32),
576  (uint32) (switchpoint))));
577  }
578  sendTimeLineValidUpto = switchpoint;
579  }
580  }
581  else
582  {
585  sendTimeLineIsHistoric = false;
586  }
587 
589 
590  /* If there is nothing to stream, don't even enter COPY mode */
592  {
593  /*
594  * When we first start replication the standby will be behind the
595  * primary. For some applications, for example, synchronous
596  * replication, it is important to have a clear state for this initial
597  * catchup mode, so we can trigger actions when we change streaming
598  * state later. We may stay in this state for a long time, which is
599  * exactly why we want to be able to monitor whether or not we are
600  * still here.
601  */
603 
604  /* Send a CopyBothResponse message, and start streaming */
605  pq_beginmessage(&buf, 'W');
606  pq_sendbyte(&buf, 0);
607  pq_sendint(&buf, 0, 2);
608  pq_endmessage(&buf);
609  pq_flush();
610 
611  /*
612  * Don't allow a request to stream from a future point in WAL that
613  * hasn't been flushed to disk in this server yet.
614  */
615  if (FlushPtr < cmd->startpoint)
616  {
617  ereport(ERROR,
618  (errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
619  (uint32) (cmd->startpoint >> 32),
620  (uint32) (cmd->startpoint),
621  (uint32) (FlushPtr >> 32),
622  (uint32) (FlushPtr))));
623  }
624 
625  /* Start streaming from the requested point */
626  sentPtr = cmd->startpoint;
627 
628  /* Initialize shared memory status, too */
629  {
630  WalSnd *walsnd = MyWalSnd;
631 
632  SpinLockAcquire(&walsnd->mutex);
633  walsnd->sentPtr = sentPtr;
634  SpinLockRelease(&walsnd->mutex);
635  }
636 
638 
639  /* Main loop of walsender */
640  replication_active = true;
641 
643 
644  replication_active = false;
        /*
         * NOTE(review): listing line 645 is missing; as rendered this
         * proc_exit(0) is unconditional. Upstream presumably guards it
         * (likely with walsender_ready_to_stop) -- confirm.
         */
646  proc_exit(0);
648 
650  }
651 
652  if (cmd->slotname)
654 
655  /*
656  * Copy is finished now. Send a single-row result set indicating the next
657  * timeline.
658  */
660  {
        /* Up to 8 hex digits, '/', up to 8 hex digits, NUL: fits any "%X/%X" LSN. */
661  char startpos_str[8 + 1 + 8 + 1];
662  DestReceiver *dest;
663  TupOutputState *tstate;
664  TupleDesc tupdesc;
665  Datum values[2];
666  bool nulls[2];
667 
668  snprintf(startpos_str, sizeof(startpos_str), "%X/%X",
669  (uint32) (sendTimeLineValidUpto >> 32),
671 
673  MemSet(nulls, false, sizeof(nulls));
674 
675  /*
676  * Need a tuple descriptor representing two columns.
677  * int8 may seem like a surprising data type for this, but in theory
678  * int4 would not be wide enough for this, as TimeLineID is unsigned.
679  */
680  tupdesc = CreateTemplateTupleDesc(2, false);
681  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli",
682  INT8OID, -1, 0);
683  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos",
684  TEXTOID, -1, 0);
685 
686  /* prepare for projection of tuple */
687  tstate = begin_tup_output_tupdesc(dest, tupdesc);
688 
689  values[0] = Int64GetDatum((int64) sendTimeLineNextTLI);
690  values[1] = CStringGetTextDatum(startpos_str);
691 
692  /* send it to dest */
693  do_tup_output(tstate, values, nulls);
694 
695  end_tup_output(tstate);
696  }
697 
698  /* Send CommandComplete message */
699  pq_puttextmessage('C', "START_STREAMING");
700 }
701 
702 /*
703  * read_page callback for logical decoding contexts, as a walsender process.
704  *
705  * Inside the walsender we can do better than logical_read_local_xlog_page,
706  * which has to do a plain sleep/busy loop, because the walsender's latch gets
707  * set every time WAL is flushed.
 *
 * Waits (via WalSndWaitForWal) for WAL up to targetPagePtr + reqLen, then
 * reads the page starting at targetPagePtr into cur_page. Returns the
 * number of valid bytes on that page: XLOG_BLCKSZ if the whole page has
 * been flushed, otherwise the flushed prefix (flushptr - targetPagePtr).
 * Returns -1 if not even reqLen bytes are available, which can happen when
 * WalSndWaitForWal stops waiting early during shutdown.
 *
 * NOTE(review): the first signature line (listing line 710) is absent from
 * this copy, and pageTLI is not assigned in the visible body.
708  */
709 static int
711  XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
712 {
713  XLogRecPtr flushptr;
714  int count;
715 
716  /* make sure we have enough WAL available */
717  flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
718 
719  /* at least one full block is available */
720  if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
721  count = XLOG_BLCKSZ;
722  /* not enough WAL synced, that can happen during shutdown */
723  else if (targetPagePtr + reqLen > flushptr)
724  return -1;
725  /* part of the page available */
726  else
727  count = flushptr - targetPagePtr;
728 
729  /* now actually read the data, we know it's there (a full page is read; only the first count bytes are known to be flushed) */
730  XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ);
731 
732  return count;
733 }
734 
735 /*
736  * Create a new replication slot.
 *
 * Creates a physical or logical slot named cmd->slotname and replies with a
 * one-row result set: slot_name, consistent_point ("%X/%X"), snapshot_name
 * (NULL unless a snapshot was exported, which only the logical branch does)
 * and output_plugin (NULL when cmd->plugin is NULL). Per the comment below,
 * non-temporary logical slots start out ephemeral and are made persistent
 * at the end.
 *
 * NOTE(review): many listing lines are absent from this copy (e.g. the
 * signature at 739, and 750, 754, 759, 763, 773, 778, 780-782, 791, 794,
 * 806, 810, 812, 816, 820-821, 823, 869).
737  */
738 static void
740 {
741  const char *snapshot_name = NULL;
742  char xpos[MAXFNAMELEN];
743  char *slot_name;
744  DestReceiver *dest;
745  TupOutputState *tstate;
746  TupleDesc tupdesc;
747  Datum values[4];
748  bool nulls[4];
749 
751 
752  /* setup state for XLogReadPage */
753  sendTimeLineIsHistoric = false;
755 
756  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
757  {
758  ReplicationSlotCreate(cmd->slotname, false,
760  }
761  else
762  {
764 
765  /*
766  * Initially create persistent slot as ephemeral - that allows us to
767  * nicely handle errors during initialization because it'll get
768  * dropped if this transaction fails. We'll make it persistent at the
769  * end. Temporary slots can be created as temporary from the beginning as
770  * they get dropped on error as well.
771  */
772  ReplicationSlotCreate(cmd->slotname, true,
774  }
775 
776  if (cmd->kind == REPLICATION_KIND_LOGICAL)
777  {
779 
783 
784  /*
785  * Signal that we don't need the timeout mechanism. We're just
786  * creating the replication slot and don't yet accept feedback
787  * messages or send keepalives. As we possibly need to wait for
788  * further WAL the walsender would otherwise possibly be killed too
789  * soon.
790  */
792 
793  /* build initial snapshot, might take a while */
795 
796  /*
797  * Export a plain (not of the snapbuild.c type) snapshot to the user
798  * that can be imported into another session.
799  */
800  snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder);
801 
802  /* don't need the decoding context anymore */
803  FreeDecodingContext(ctx);
804 
805  if (!cmd->temporary)
807  }
808  else if (cmd->kind == REPLICATION_KIND_PHYSICAL && cmd->reserve_wal)
809  {
811 
813 
814  /* Write this slot to disk if it's a permanent one. */
815  if (!cmd->temporary)
817  }
818 
819  snprintf(xpos, sizeof(xpos), "%X/%X",
822 
824  MemSet(nulls, false, sizeof(nulls));
825 
826  /*
827  * Need a tuple descriptor representing four columns:
828  * - first field: the slot name
829  * - second field: LSN at which we became consistent
830  * - third field: exported snapshot's name
831  * - fourth field: output plugin
832  */
833  tupdesc = CreateTemplateTupleDesc(4, false);
834  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_name",
835  TEXTOID, -1, 0);
836  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "consistent_point",
837  TEXTOID, -1, 0);
838  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "snapshot_name",
839  TEXTOID, -1, 0);
840  TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "output_plugin",
841  TEXTOID, -1, 0);
842 
843  /* prepare for projection of tuples */
844  tstate = begin_tup_output_tupdesc(dest, tupdesc);
845 
846  /* slot_name */
847  slot_name = NameStr(MyReplicationSlot->data.name);
848  values[0] = CStringGetTextDatum(slot_name);
849 
850  /* consistent wal location */
851  values[1] = CStringGetTextDatum(xpos);
852 
853  /* snapshot name, or NULL if none */
854  if (snapshot_name != NULL)
855  values[2] = CStringGetTextDatum(snapshot_name);
856  else
857  nulls[2] = true;
858 
859  /* plugin, or NULL if none */
860  if (cmd->plugin != NULL)
861  values[3] = CStringGetTextDatum(cmd->plugin);
862  else
863  nulls[3] = true;
864 
865  /* send it to dest */
866  do_tup_output(tstate, values, nulls);
867  end_tup_output(tstate);
868 
870 }
871 
872 /*
873  * Get rid of a replication slot that is no longer wanted.
 *
 * Finishes by sending a DROP_REPLICATION_SLOT CommandComplete to the client.
 *
 * NOTE(review): the signature (listing line 876) and line 878, presumably
 * the call that actually drops the slot, are absent from this copy.
874  */
875 static void
877 {
879  EndCommand("DROP_REPLICATION_SLOT", DestRemote);
880 }
881 
882 /*
883  * Load previously initiated logical slot and prepare for sending data (via
884  * WalSndLoop).
 *
 * Visible flow:
 * - sends a CopyBothResponse;
 * - creates the logical decoding context at cmd->startpoint with
 *   cmd->options;
 * - updates the sent position in shared memory under MyWalSnd's spinlock;
 * - runs the walsender main loop (per the comment below);
 * - frees the decoding context;
 * - ends COPY mode with a "COPY 0" CommandComplete.
 *
 * NOTE(review): numerous listing lines are absent from this copy (e.g. the
 * signature at 887, and 889, 892, 894, 896, 903, 907, 910, 921, 929-930,
 * 933, 939, 946, 952, 955, 958, 961, 963). As rendered, proc_exit(0) below
 * is unconditional.
885  */
886 static void
888 {
890 
891  /* make sure that our requirements are still fulfilled */
893 
895 
897 
898  /*
899  * Force a disconnect, so that the decoding code doesn't need to care
900  * about a possible switch from running in recovery, to running in a
901  * normal environment. Client code is expected to handle reconnects.
902  */
904  {
905  ereport(LOG,
906  (errmsg("terminating walsender process after promotion")));
908  }
909 
911 
912  /* Send a CopyBothResponse message, and start streaming */
913  pq_beginmessage(&buf, 'W');
914  pq_sendbyte(&buf, 0);
915  pq_sendint(&buf, 0, 2);
916  pq_endmessage(&buf);
917  pq_flush();
918 
919  /* setup state for XLogReadPage */
920  sendTimeLineIsHistoric = false;
922 
923  /*
924  * Initialize position to the last ack'ed one, then the xlog records begin
925  * to be shipped from that position.
926  */
927  logical_decoding_ctx = CreateDecodingContext(
928  cmd->startpoint, cmd->options,
931 
932  /* Start reading WAL from the oldest required WAL. */
934 
935  /*
936  * Report the location after which we'll send out further commits as the
937  * current sentPtr.
938  */
940 
941  /* Also update the sent position status in shared memory */
942  {
943  WalSnd *walsnd = MyWalSnd;
944 
945  SpinLockAcquire(&walsnd->mutex);
947  SpinLockRelease(&walsnd->mutex);
948  }
949 
950  replication_active = true;
951 
953 
954  /* Main loop of walsender */
956 
957  FreeDecodingContext(logical_decoding_ctx);
959 
960  replication_active = false;
962  proc_exit(0);
964 
965  /* Get out of COPY mode (CommandComplete). */
966  EndCommand("COPY 0", DestRemote);
967 }
968 
969 /*
970  * LogicalDecodingContext 'prepare_write' callback.
971  *
972  * Prepare a write into a StringInfo.
973  *
974  * Don't do anything lasting in here, it's quite possible that nothing will be done
975  * with the data.
 *
 * Resets ctx->out and writes the header of a 'w' (WAL data) message:
 * - the type byte;
 * - dataStart and walEnd, both equal to lsn, or InvalidXLogRecPtr unless
 *   last_write;
 * - an 8-byte sendtime placeholder at offset 1 + 2 * sizeof(int64), which
 *   the 'write' callback fills in later.
 *
 * NOTE(review): the signature line (listing line 978) is absent from this
 * copy.
976  */
977 static void
979 {
980  /* can't have sync rep confused by sending the same LSN several times */
981  if (!last_write)
982  lsn = InvalidXLogRecPtr;
983 
984  resetStringInfo(ctx->out);
985 
986  pq_sendbyte(ctx->out, 'w');
987  pq_sendint64(ctx->out, lsn); /* dataStart */
988  pq_sendint64(ctx->out, lsn); /* walEnd */
989 
990  /*
991  * Fill out the sendtime later, just as it's done in XLogSendPhysical, but
992  * reserve space here.
993  */
994  pq_sendint64(ctx->out, 0); /* sendtime */
995 }
996 
997 /*
998  * LogicalDecodingContext 'write' callback.
999  *
1000  * Actually write out data previously prepared by WalSndPrepareWrite out to
1001  * the network. Take as long as needed, but process replies from the other
1002  * side and check timeouts during that.
 *
 * The sendtime placeholder reserved by WalSndPrepareWrite (8 bytes at
 * offset 1 + 2 * sizeof(int64) in ctx->out) is patched BEFORE the message
 * is handed to pq_putmessage_noblock(). Previously the patch happened
 * after the message had already been queued into the output buffer, so
 * the client always received the zero placeholder instead of the send time.
 *
 * If the output cannot be flushed immediately, this loops until it is,
 * while servicing SIGHUP, client input, timeouts and keepalives. It exits
 * the process through WalSndShutdown() if flushing fails, and via exit(1)
 * if the postmaster dies.
 *
 * NOTE(review): listing lines 1005, 1017, 1043, 1045, 1051-1052, 1056,
 * 1072, 1077 and 1082 are absent from this copy. Line 1017 presumably
 * writes the current timestamp into tmpbuf.
1003  */
1004 static void
1006  bool last_write)
1007 {
1011  /*
1012  * Fill the send timestamp last, so that it is taken as late as possible.
1013  * This is somewhat ugly, but the protocol's set as it's already used for
1014  * several releases by streaming physical replication.
1015  */
1016  resetStringInfo(&tmpbuf);
1018  memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
1019  tmpbuf.data, sizeof(int64));
1010 
    /*
     * Queue the message only after the timestamp has been patched in:
     * pq_putmessage_noblock() copies the bytes into the output buffer, so
     * later changes to ctx->out would never reach the client.
     */
1008  /* output previously gathered data in a CopyData packet */
1009  pq_putmessage_noblock('d', ctx->out->data, ctx->out->len);
1020 
1021  /* fast path */
1022  /* Try to flush pending output to the client */
1023  if (pq_flush_if_writable() != 0)
1024  WalSndShutdown();
1025 
1026  if (!pq_is_send_pending())
1027  return;
1028 
1029  for (;;)
1030  {
1031  int wakeEvents;
1032  long sleeptime;
1033  TimestampTz now;
1034 
1035  /*
1036  * Emergency bailout if postmaster has died. This is to avoid the
1037  * necessity for manual cleanup of all postmaster children.
1038  */
1039  if (!PostmasterIsAlive())
1040  exit(1);
1041 
1042  /* Clear any already-pending wakeups */
1044 
1046 
1047  /* Process any requests or signals received recently */
1048  if (got_SIGHUP)
1049  {
1050  got_SIGHUP = false;
1053  }
1054 
1055  /* Check for input from the client */
1057 
1058  /* Try to flush pending output to the client */
1059  if (pq_flush_if_writable() != 0)
1060  WalSndShutdown();
1061 
1062  /* If we finished clearing the buffered data, we're done here. */
1063  if (!pq_is_send_pending())
1064  break;
1065 
1066  now = GetCurrentTimestamp();
1067 
1068  /* die if timeout was reached */
1069  WalSndCheckTimeOut(now);
1070 
1071  /* Send keepalive if the time has come */
1073 
1074  sleeptime = WalSndComputeSleeptime(now);
1075 
1076  wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
1078 
1079  /* Sleep until something happens or we time out */
1080  WaitLatchOrSocket(MyLatch, wakeEvents,
1081  MyProcPort->sock, sleeptime,
1083  }
1084 
1085  /* reactivate latch so WalSndLoop knows to continue */
1086  SetLatch(MyLatch);
1087 }
1088 
1089 /*
1090  * Wait till WAL < loc is flushed to disk so it can be safely read.
 *
 * Returns the most recently observed flush position. That is
 * GetFlushRecPtr() when not in recovery and GetXLogReplayRecPtr() during
 * recovery; it is cached across calls in a function-level static. The
 * result can be < loc when the loop stops early because the postmaster
 * asked us to stop, so callers must check it. Sets WalSndCaughtUp once it
 * actually has to wait for new WAL.
 *
 * NOTE(review): listing lines 1093 (signature), 1127, 1129, 1135-1136,
 * 1140, 1157, 1168, 1171, 1196, 1201 and 1209 are absent from this copy,
 * including the condition guarding the early "break" below.
1091  */
1092 static XLogRecPtr
1094 {
1095  int wakeEvents;
1096  static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
1097 
1098 
1099  /*
1100  * Fast path to avoid acquiring the spinlock if we already know we
1101  * have enough WAL available. This is particularly interesting if we're
1102  * far behind.
1103  */
1104  if (RecentFlushPtr != InvalidXLogRecPtr &&
1105  loc <= RecentFlushPtr)
1106  return RecentFlushPtr;
1107 
1108  /* Get a more recent flush pointer. */
1109  if (!RecoveryInProgress())
1110  RecentFlushPtr = GetFlushRecPtr();
1111  else
1112  RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1113 
1114  for (;;)
1115  {
1116  long sleeptime;
1117  TimestampTz now;
1118 
1119  /*
1120  * Emergency bailout if postmaster has died. This is to avoid the
1121  * necessity for manual cleanup of all postmaster children.
1122  */
1123  if (!PostmasterIsAlive())
1124  exit(1);
1125 
1126  /* Clear any already-pending wakeups */
1128 
1130 
1131  /* Process any requests or signals received recently */
1132  if (got_SIGHUP)
1133  {
1134  got_SIGHUP = false;
1137  }
1138 
1139  /* Check for input from the client */
1141 
1142  /* Update our idea of the currently flushed position. */
1143  if (!RecoveryInProgress())
1144  RecentFlushPtr = GetFlushRecPtr();
1145  else
1146  RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1147 
1148  /*
1149  * If postmaster asked us to stop, don't wait here anymore. This will
1150  * cause the xlogreader to return without reading a full record, which
1151  * is the fastest way to reach the mainloop which then can quit.
1152  *
1153  * It's important to do this check after the recomputation of
1154  * RecentFlushPtr, so we can send all remaining data before shutting
1155  * down.
1156  */
1158  break;
1159 
1160  /*
1161  * We only send regular messages to the client for fully decoded
1162  * transactions, but a synchronous replication and walsender shutdown
1163  * possibly are waiting for a later location. So we send pings
1164  * containing the flush location every now and then.
1165  */
1166  if (MyWalSnd->flush < sentPtr &&
1167  MyWalSnd->write < sentPtr &&
1169  {
1170  WalSndKeepalive(false);
1172  }
1173 
1174  /* check whether we're done */
1175  if (loc <= RecentFlushPtr)
1176  break;
1177 
1178  /* Waiting for new WAL. Since we need to wait, we're now caught up. */
1179  WalSndCaughtUp = true;
1180 
1181  /*
1182  * Try to flush pending output to the client. Also wait for the socket
1183  * becoming writable, if there's still pending output after an attempt
1184  * to flush. Otherwise we might just sit on output data while waiting
1185  * for new WAL to be generated.
1186  */
1187  if (pq_flush_if_writable() != 0)
1188  WalSndShutdown();
1189 
1190  now = GetCurrentTimestamp();
1191 
1192  /* die if timeout was reached */
1193  WalSndCheckTimeOut(now);
1194 
1195  /* Send keepalive if the time has come */
1197 
1198  sleeptime = WalSndComputeSleeptime(now);
1199 
1200  wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
1202 
1203  if (pq_is_send_pending())
1204  wakeEvents |= WL_SOCKET_WRITEABLE;
1205 
1206  /* Sleep until something happens or we time out */
1207  WaitLatchOrSocket(MyLatch, wakeEvents,
1208  MyProcPort->sock, sleeptime,
1210  }
1211 
1212  /* reactivate latch so WalSndLoop knows to continue */
1213  SetLatch(MyLatch);
1214  return RecentFlushPtr;
1215 }
1216 
1217 /*
1218  * Execute an incoming replication command.
 *
 * Processing order:
 *  - cmd_string is logged.
 *  - It is parsed with the replication grammar (replication_scanner_init
 *    / replication_yyparse) while a per-command memory context is current.
 *    The context's creation is not shown; its name string is
 *    "Replication command context".
 *  - The output_message, reply_message and tmpbuf buffers are initialized.
 *  - The command is dispatched on the parse tree's node type.
 *  - The context is switched back and deleted, and a CommandComplete
 *    message ("SELECT") is sent.
 * A nonzero parser result, or an unrecognized node tag, raises ERROR.
 *
 * NOTE(review): lines are missing from this listing, e.g. 1233 (start of
 * the logging call), 1240, 1242, 1244, 1246 (most of the context
 * creation), 1277-1278, 1281-1282, 1292, 1297 and 1302. Several case
 * labels and handler calls are therefore not shown.
1219  */
1220 void
1221 exec_replication_command(const char *cmd_string)
1222 {
1223  int parse_rc;
1224  Node *cmd_node;
1225  MemoryContext cmd_context;
1226  MemoryContext old_context;
1227 
1228  /*
1229  * Log replication command if log_replication_commands is enabled. Even
1230  * when it's disabled, log the command with DEBUG1 level for backward
1231  * compatibility.
1232  */
1234  (errmsg("received replication command: %s", cmd_string)));
1235 
1236  /*
1237  * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
1238  * command arrives. Clean up the old stuff if there's anything.
1239  */
1241 
1243 
1245  "Replication command context",
1247  old_context = MemoryContextSwitchTo(cmd_context);
1248 
1249  replication_scanner_init(cmd_string);
1250  parse_rc = replication_yyparse();
1251  if (parse_rc != 0)
1252  ereport(ERROR,
1253  (errcode(ERRCODE_SYNTAX_ERROR),
1254  (errmsg_internal("replication command parser returned %d",
1255  parse_rc))));
1256 
1257  cmd_node = replication_parse_result;
1258 
1259  /*
1260  * Allocate buffers that will be used for each outgoing and incoming
1261  * message. We do this just once per command to reduce palloc overhead.
1262  */
1263  initStringInfo(&output_message);
1264  initStringInfo(&reply_message);
1265  initStringInfo(&tmpbuf);
1266 
1267  switch (cmd_node->type)
1268  {
1269  case T_IdentifySystemCmd:
1270  IdentifySystem();
1271  break;
1272 
1273  case T_BaseBackupCmd:
1274  SendBaseBackup((BaseBackupCmd *) cmd_node);
1275  break;
1276 
1279  break;
1280 
1283  break;
1284 
1285  case T_StartReplicationCmd:
1286  {
1287  StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
1288 
1289  if (cmd->kind == REPLICATION_KIND_PHYSICAL)
1290  StartReplication(cmd);
1291  else
1293  break;
1294  }
1295 
1296  case T_TimeLineHistoryCmd:
1298  break;
1299 
1300  case T_VariableShowStmt:
1301  {
1303  VariableShowStmt *n = (VariableShowStmt *) cmd_node;
1304 
1305  GetPGVariable(n->name, dest);
1306  }
1307  break;
1308 
1309  default:
1310  elog(ERROR, "unrecognized replication command node tag: %u",
1311  cmd_node->type);
1312  }
1313 
1314  /* done */
1315  MemoryContextSwitchTo(old_context);
1316  MemoryContextDelete(cmd_context);
1317 
1318  /* Send CommandComplete message */
1319  EndCommand("SELECT", DestRemote);
1320 }
1321 
1322 /*
1323  * Process any incoming messages while streaming. Also checks if the remote
1324  * end has closed the connection.
 *
 * Messages are read without blocking until pq_getbyte_if_available()
 * reports that no data is available. Handling by message type:
 *  - 'd' (CopyData) and 'c' (CopyDone) count as a reply.
 *  - On 'c' we also send our own CopyDone, if not already sent, and set
 *    streamingDoneReceiving.
 *  - 'X' ends the process with proc_exit(0).
 *  - EOF or a read error is logged and also ends with proc_exit(0).
 *  - Any other type is reported with FATAL, as is any type other than 'X'
 *    once CopyDone has been received.
 * If at least one reply arrived, waiting_for_ping_response is cleared.
 *
 * NOTE(review): lines missing from this listing include:
 *  - 1327, the function name;
 *  - 1340 and 1356, the start of the two EOF ereport calls;
 *  - 1382, presumably the call that handles the CopyData payload;
 *  - 1420, presumably the timestamp update the final comment describes.
1325  */
1326 static void
1328 {
1329  unsigned char firstchar;
1330  int r;
1331  bool received = false;
1332 
1333  for (;;)
1334  {
1335  pq_startmsgread();
1336  r = pq_getbyte_if_available(&firstchar);
1337  if (r < 0)
1338  {
1339  /* unexpected error or EOF */
1341  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1342  errmsg("unexpected EOF on standby connection")));
1343  proc_exit(0);
1344  }
1345  if (r == 0)
1346  {
1347  /* no data available without blocking */
1348  pq_endmsgread();
1349  break;
1350  }
1351 
1352  /* Read the message contents */
1353  resetStringInfo(&reply_message);
1354  if (pq_getmessage(&reply_message, 0))
1355  {
1357  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1358  errmsg("unexpected EOF on standby connection")));
1359  proc_exit(0);
1360  }
1361 
1362  /*
1363  * If we already received a CopyDone from the frontend, the frontend
1364  * should not send us anything until we've closed our end of the COPY.
1365  * XXX: In theory, the frontend could already send the next command
1366  * before receiving the CopyDone, but libpq doesn't currently allow
1367  * that.
1368  */
1369  if (streamingDoneReceiving && firstchar != 'X')
1370  ereport(FATAL,
1371  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1372  errmsg("unexpected standby message type \"%c\", after receiving CopyDone",
1373  firstchar)));
1374 
1375  /* Handle the very limited subset of commands expected in this phase */
1376  switch (firstchar)
1377  {
1378  /*
1379  * 'd' means a standby reply wrapped in a CopyData packet.
1380  */
1381  case 'd':
1383  received = true;
1384  break;
1385 
1386  /*
1387  * CopyDone means the standby requested to finish streaming.
1388  * Reply with CopyDone, if we had not sent that already.
1389  */
1390  case 'c':
1391  if (!streamingDoneSending)
1392  {
1393  pq_putmessage_noblock('c', NULL, 0);
1394  streamingDoneSending = true;
1395  }
1396 
1397  streamingDoneReceiving = true;
1398  received = true;
1399  break;
1400 
1401  /*
1402  * 'X' means that the standby is closing down the socket.
1403  */
1404  case 'X':
1405  proc_exit(0);
1406 
1407  default:
1408  ereport(FATAL,
1409  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1410  errmsg("invalid standby message type \"%c\"",
1411  firstchar)));
1412  }
1413  }
1414 
1415  /*
1416  * Save the last reply timestamp if we've received at least one reply.
1417  */
1418  if (received)
1419  {
1421  waiting_for_ping_response = false;
1422  }
1423 }
1424 
1425 /*
1426  * Process a status update message received from standby.
 *
 * The first byte of reply_message selects the message type. Only 'r' and
 * 'h' are accepted. Their handler calls are on lines 1441 and 1445, which
 * are not shown here; presumably they are the standby-reply and
 * hot-standby-feedback handlers defined later in this file (confirm). Any
 * other type is logged as a protocol violation and the process exits via
 * proc_exit(0).
 *
 * NOTE(review): lines 1429 (the function name) and 1449 (the start of the
 * ereport call) are also missing from this listing.
1427  */
1428 static void
1430 {
1431  char msgtype;
1432 
1433  /*
1434  * Check message type from the first byte.
1435  */
1436  msgtype = pq_getmsgbyte(&reply_message);
1437 
1438  switch (msgtype)
1439  {
1440  case 'r':
1442  break;
1443 
1444  case 'h':
1446  break;
1447 
1448  default:
1450  (errcode(ERRCODE_PROTOCOL_VIOLATION),
1451  errmsg("unexpected message type \"%c\"", msgtype)));
1452  proc_exit(0);
1453  }
1454 }
1455 
1456 /*
1457  * Remember that a walreceiver just confirmed receipt of lsn `lsn`.
 *
 * Sets slot->data.restart_lsn to lsn while holding the slot's spinlock,
 * but only if the value differs. lsn must be valid (asserted). The slot is
 * deliberately not saved to disk here; see the comment at the end of the
 * function.
 *
 * NOTE(review): the following lines are missing from this listing:
 *  - 1460, the function name/parameters;
 *  - 1463, presumably the declaration of `slot`;
 *  - 1476-1477, the action taken when the value changed.
1458  */
1459 static void
1461 {
1462  bool changed = false;
1464 
1465  Assert(lsn != InvalidXLogRecPtr);
1466  SpinLockAcquire(&slot->mutex);
1467  if (slot->data.restart_lsn != lsn)
1468  {
1469  changed = true;
1470  slot->data.restart_lsn = lsn;
1471  }
1472  SpinLockRelease(&slot->mutex);
1473 
1474  if (changed)
1475  {
1478  }
1479 
1480  /*
1481  * One could argue that the slot should be saved to disk now, but that'd
1482  * be energy wasted - the worst lost information can do here is give us
1483  * wrong information in a statistics view - we'll just potentially be more
1484  * conservative in removing files.
1485  */
1486 }
1487 
1488 /*
1489  * Regular reply from standby advising of WAL positions on standby server.
 *
 * After the message-type byte, the message carries:
 *  - three int64 positions: write, flush and apply;
 *  - an int64 send time, currently ignored;
 *  - one byte requesting an immediate reply.
 * The positions are logged at DEBUG2 and copied into this walsender's
 * shared WalSnd entry under its spinlock. A requested reply is answered
 * with WalSndKeepalive(false).
 *
 * NOTE(review): lines 1492, 1530-1531, 1538-1539 and 1541 are missing from
 * this listing, including both branches of the slot update at the end.
1490  */
1491 static void
1493 {
1494  XLogRecPtr writePtr,
1495  flushPtr,
1496  applyPtr;
1497  bool replyRequested;
1498 
1499  /* the caller already consumed the msgtype byte */
1500  writePtr = pq_getmsgint64(&reply_message);
1501  flushPtr = pq_getmsgint64(&reply_message);
1502  applyPtr = pq_getmsgint64(&reply_message);
1503  (void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */
1504  replyRequested = pq_getmsgbyte(&reply_message);
1505 
1506  elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s",
1507  (uint32) (writePtr >> 32), (uint32) writePtr,
1508  (uint32) (flushPtr >> 32), (uint32) flushPtr,
1509  (uint32) (applyPtr >> 32), (uint32) applyPtr,
1510  replyRequested ? " (reply requested)" : "");
1511 
1512  /* Send a reply if the standby requested one. */
1513  if (replyRequested)
1514  WalSndKeepalive(false);
1515 
1516  /*
1517  * Update shared state for this WalSender process based on reply data from
1518  * standby.
1519  */
1520  {
1521  WalSnd *walsnd = MyWalSnd;
1522 
1523  SpinLockAcquire(&walsnd->mutex);
1524  walsnd->write = writePtr;
1525  walsnd->flush = flushPtr;
1526  walsnd->apply = applyPtr;
1527  SpinLockRelease(&walsnd->mutex);
1528  }
1529 
1532 
1533  /*
1534  * Advance our local xmin horizon when the client confirmed a flush.
1535  */
1536  if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr)
1537  {
1540  else
1542  }
1543 }
1544 
1545 /*
 * Compute new replication slot xmin horizon if needed.
 *
 * Under the slot's spinlock, both slot->data.xmin and slot->effective_xmin
 * are overwritten with feedbackXmin when any of these holds:
 *  - the current xmin is not a normal XID;
 *  - feedbackXmin itself is not normal (this is how the hot-standby
 *    feedback handler in this file unsets the horizon);
 *  - the current xmin precedes feedbackXmin.
 *
 * NOTE(review): the following lines are missing from this listing:
 *  - 1547, the function name/parameters;
 *  - 1550 and 1553, presumably where `slot` is obtained;
 *  - 1572-1573, the action taken when the value changed.
 */
1546 static void
1548 {
1549  bool changed = false;
1551 
1552  SpinLockAcquire(&slot->mutex);
1554 
1555  /*
1556  * For physical replication we don't need the interlock provided by xmin
1557  * and effective_xmin since the consequences of a missed increase are
1558  * limited to query cancellations, so set both at once.
1559  */
1560  if (!TransactionIdIsNormal(slot->data.xmin) ||
1561  !TransactionIdIsNormal(feedbackXmin) ||
1562  TransactionIdPrecedes(slot->data.xmin, feedbackXmin))
1563  {
1564  changed = true;
1565  slot->data.xmin = feedbackXmin;
1566  slot->effective_xmin = feedbackXmin;
1567  }
1568  SpinLockRelease(&slot->mutex);
1569 
1570  if (changed)
1571  {
1574  }
1575 }
1576 
1577 /*
1578  * Hot Standby feedback
 *
 * After the message-type byte, the message carries an int64 send time
 * (ignored), a 4-byte xmin and a 4-byte epoch.
 *  - A non-normal xmin unsets the walsender's xmin.
 *  - Otherwise the xmin/epoch pair is checked against
 *    GetNextXidAndEpoch(). A pair that is in the future or has wrapped
 *    around is silently ignored.
 *  - An accepted xmin is applied through the replication slot if one is in
 *    use; otherwise it is stored in MyPgXact->xmin.
 *
 * NOTE(review): lines 1581 (the function name/parameters) and 1603 are
 * missing from this listing. Line 1603 presumably resets the non-slot
 * xmin; confirm.
1579  */
1580 static void
1582 {
1583  TransactionId nextXid;
1584  uint32 nextEpoch;
1585  TransactionId feedbackXmin;
1586  uint32 feedbackEpoch;
1587 
1588  /*
1589  * Decipher the reply message. The caller already consumed the msgtype
1590  * byte.
1591  */
1592  (void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */
1593  feedbackXmin = pq_getmsgint(&reply_message, 4);
1594  feedbackEpoch = pq_getmsgint(&reply_message, 4);
1595 
1596  elog(DEBUG2, "hot standby feedback xmin %u epoch %u",
1597  feedbackXmin,
1598  feedbackEpoch);
1599 
1600  /* Unset WalSender's xmin if the feedback message value is invalid */
1601  if (!TransactionIdIsNormal(feedbackXmin))
1602  {
1604  if (MyReplicationSlot != NULL)
1605  PhysicalReplicationSlotNewXmin(feedbackXmin);
1606  return;
1607  }
1608 
1609  /*
1610  * Check that the provided xmin/epoch are sane, that is, not in the future
1611  * and not so far back as to be already wrapped around. Ignore if not.
1612  *
1613  * Epoch of nextXid should be same as standby, or if the counter has
1614  * wrapped, then one greater than standby.
1615  */
1616  GetNextXidAndEpoch(&nextXid, &nextEpoch);
1617 
1618  if (feedbackXmin <= nextXid)
1619  {
1620  if (feedbackEpoch != nextEpoch)
1621  return;
1622  }
1623  else
1624  {
1625  if (feedbackEpoch + 1 != nextEpoch)
1626  return;
1627  }
1628 
1629  if (!TransactionIdPrecedesOrEquals(feedbackXmin, nextXid))
1630  return; /* epoch OK, but it's wrapped around */
1631 
1632  /*
1633  * Set the WalSender's xmin equal to the standby's requested xmin, so that
1634  * the xmin will be taken into account by GetOldestXmin. This will hold
1635  * back the removal of dead rows and thereby prevent the generation of
1636  * cleanup conflicts on the standby server.
1637  *
1638  * There is a small window for a race condition here: although we just
1639  * checked that feedbackXmin precedes nextXid, the nextXid could have
1640  * gotten advanced between our fetching it and applying the xmin below,
1641  * perhaps far enough to make feedbackXmin wrap around. In that case the
1642  * xmin we set here would be "in the future" and have no effect. No point
1643  * in worrying about this since it's too late to save the desired data
1644  * anyway. Assuming that the standby sends us an increasing sequence of
1645  * xmins, this could only happen during the first reply cycle, else our
1646  * own xmin would prevent nextXid from advancing so far.
1647  *
1648  * We don't bother taking the ProcArrayLock here. Setting the xmin field
1649  * is assumed atomic, and there's no real need to prevent a concurrent
1650  * GetOldestXmin. (If we're moving our xmin forward, this is obviously
1651  * safe, and if we're moving it backwards, well, the data is at risk
1652  * already since a VACUUM could have just finished calling GetOldestXmin.)
1653  *
1654  * If we're using a replication slot we reserve the xmin via that,
1655  * otherwise via the walsender's PGXACT entry.
1656  *
1657  * XXX: It might make sense to generalize the ephemeral slot concept and
1658  * always use the slot mechanism to handle the feedback xmin.
1659  */
1660  if (MyReplicationSlot != NULL) /* XXX: persistency configurable? */
1661  PhysicalReplicationSlotNewXmin(feedbackXmin);
1662  else
1663  MyPgXact->xmin = feedbackXmin;
1664 }
1665 
1666 /*
1667  * Compute how long send/receive loops should sleep.
1668  *
1669  * If wal_sender_timeout is enabled we want to wake up in time to send
1670  * keepalives and to abort the connection if wal_sender_timeout has been
1671  * reached.
 *
 * The result is in milliseconds. It is 10000 by default; otherwise it is
 * the time from `now` until the computed wakeup_time, obtained with
 * TimestampDifference() and converted as seconds * 1000 + microseconds /
 * 1000.
 *
 * NOTE(review): the following lines are missing from this listing:
 *  - 1674, the function name/parameters;
 *  - 1678, the condition guarding the timeout computation;
 *  - 1688-1689 and 1696-1697, the assignments of wakeup_time.
1672  */
1673 static long
1675 {
1676  long sleeptime = 10000; /* 10 s */
1677 
1679  {
1680  TimestampTz wakeup_time;
1681  long sec_to_timeout;
1682  int microsec_to_timeout;
1683 
1684  /*
1685  * At the latest stop sleeping once wal_sender_timeout has been
1686  * reached.
1687  */
1690 
1691  /*
1692  * If no ping has been sent yet, wakeup when it's time to do so.
1693  * WalSndKeepaliveIfNecessary() wants to send a keepalive once half of
1694  * the timeout passed without a response.
1695  */
1698  wal_sender_timeout / 2);
1699 
1700  /* Compute relative time until wakeup. */
1701  TimestampDifference(now, wakeup_time,
1702  &sec_to_timeout, &microsec_to_timeout);
1703 
1704  sleeptime = sec_to_timeout * 1000 +
1705  microsec_to_timeout / 1000;
1706  }
1707 
1708  return sleeptime;
1709 }
1710 
1711 /*
1712  * Check whether there have been responses by the client within
1713  * wal_sender_timeout and shutdown if not.
 *
 * Does nothing while last_reply_timestamp <= 0. When the timeout is
 * enabled (wal_sender_timeout > 0) and `now` has reached the deadline, the
 * condition is logged locally (not sent to the standby) and
 * WalSndShutdown() is called.
 *
 * NOTE(review): the following lines are missing from this listing:
 *  - 1716, the function name/parameters;
 *  - 1724-1725, the computation of `timeout`;
 *  - 1734, the start of the ereport call.
1714  */
1715 static void
1717 {
1718  TimestampTz timeout;
1719 
1720  /* don't bail out if we're doing something that doesn't require timeouts */
1721  if (last_reply_timestamp <= 0)
1722  return;
1723 
1726 
1727  if (wal_sender_timeout > 0 && now >= timeout)
1728  {
1729  /*
1730  * Since typically expiration of replication timeout means
1731  * communication problem, we don't send the error message to the
1732  * standby.
1733  */
1735  (errmsg("terminating walsender process due to replication timeout")));
1736 
1737  WalSndShutdown();
1738  }
1739 }
1740 
1741 /*
 * Main loop of walsender process that streams the WAL over Copy messages.
 *
 * Each iteration:
 *  - handles postmaster death and SIGHUP;
 *  - checks for client input;
 *  - calls send_data when no output is pending, then flushes output;
 *  - logs (DEBUG1) when a standby in WALSNDSTATE_CATCHUP has caught up;
 *  - calls WalSndDone(send_data) when, per the comment, SIGUSR2 has
 *    arrived (the guard itself, line 1835, is not shown);
 *  - enforces the replication timeout;
 *  - sleeps on the latch/socket when appropriate.
 * The loop ends on the condition at the missing line 1791. Per its
 * comment, that is once CopyDone has been both received and sent and no
 * output remains.
 *
 * NOTE(review): send_data is used as a callback (see the comment before
 * the sleep). The signature line that declares it, 1743, is not shown.
 * Other missing lines include 1749, 1771, 1773, 1779-1780, 1784, 1791,
 * 1810, 1825, 1835, 1845, 1855, 1861 and 1871.
 */
1742 static void
1744 {
1745  /*
1746  * Initialize the last reply timestamp. That enables timeout processing
1747  * from hereon.
1748  */
1750  waiting_for_ping_response = false;
1751 
1752  /* Report to pgstat that this process is a WAL sender */
1753  pgstat_report_activity(STATE_RUNNING, "walsender");
1754 
1755  /*
1756  * Loop until we reach the end of this timeline or the client requests to
1757  * stop streaming.
1758  */
1759  for (;;)
1760  {
1761  TimestampTz now;
1762 
1763  /*
1764  * Emergency bailout if postmaster has died. This is to avoid the
1765  * necessity for manual cleanup of all postmaster children.
1766  */
1767  if (!PostmasterIsAlive())
1768  exit(1);
1769 
1770  /* Clear any already-pending wakeups */
1772 
1774 
1775  /* Process any requests or signals received recently */
1776  if (got_SIGHUP)
1777  {
1778  got_SIGHUP = false;
1781  }
1782 
1783  /* Check for input from the client */
1785 
1786  /*
1787  * If we have received CopyDone from the client, sent CopyDone
1788  * ourselves, and the output buffer is empty, it's time to exit
1789  * streaming.
1790  */
1792  break;
1793 
1794  /*
1795  * If we don't have any pending data in the output buffer, try to send
1796  * some more. If there is some, we don't bother to call send_data
1797  * again until we've flushed it ... but we'd better assume we are not
1798  * caught up.
1799  */
1800  if (!pq_is_send_pending())
1801  send_data();
1802  else
1803  WalSndCaughtUp = false;
1804 
1805  /* Try to flush pending output to the client */
1806  if (pq_flush_if_writable() != 0)
1807  WalSndShutdown();
1808 
1809  /* If nothing remains to be sent right now ... */
1811  {
1812  /*
1813  * If we're in catchup state, move to streaming. This is an
1814  * important state change for users to know about, since before
1815  * this point data loss might occur if the primary dies and we
1816  * need to failover to the standby. The state change is also
1817  * important for synchronous replication, since commits that
1818  * started to wait at that point might wait for some time.
1819  */
1820  if (MyWalSnd->state == WALSNDSTATE_CATCHUP)
1821  {
1822  ereport(DEBUG1,
1823  (errmsg("standby \"%s\" has now caught up with primary",
1824  application_name)));
1826  }
1827 
1828  /*
1829  * When SIGUSR2 arrives, we send any outstanding logs up to the
1830  * shutdown checkpoint record (i.e., the latest record), wait for
1831  * them to be replicated to the standby, and exit. This may be a
1832  * normal termination at shutdown, or a promotion, the walsender
1833  * is not sure which.
1834  */
1836  WalSndDone(send_data);
1837  }
1838 
1839  now = GetCurrentTimestamp();
1840 
1841  /* Check for replication timeout. */
1842  WalSndCheckTimeOut(now);
1843 
1844  /* Send keepalive if the time has come */
1846 
1847  /*
1848  * We don't block if not caught up, unless there is unsent data
1849  * pending in which case we'd better block until the socket is
1850  * write-ready. This test is only needed for the case where the
1851  * send_data callback handled a subset of the available data but then
1852  * pq_flush_if_writable flushed it all --- we should immediately try
1853  * to send more.
1854  */
1856  {
1857  long sleeptime;
1858  int wakeEvents;
1859 
1860  wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH | WL_TIMEOUT |
1862 
1863  sleeptime = WalSndComputeSleeptime(now);
1864 
1865  if (pq_is_send_pending())
1866  wakeEvents |= WL_SOCKET_WRITEABLE;
1867 
1868  /* Sleep until something happens or we time out */
1869  WaitLatchOrSocket(MyLatch, wakeEvents,
1870  MyProcPort->sock, sleeptime,
1872  }
1873  }
1874  return;
1875 }
1876 
1877 /*
 * Initialize a per-walsender data structure for this walsender process.
 *
 * Scans WalSndCtl->walsnds[0 .. max_wal_senders - 1] for an entry with
 * pid == 0. The first such entry is claimed under its spinlock:
 *  - pid is set to ours;
 *  - sentPtr, write, flush and apply are reset to InvalidXLogRecPtr;
 *  - state becomes WALSNDSTATE_STARTUP;
 *  - latch points at our proc latch.
 * The entry is then published in MyWalSnd. Raises FATAL if no free entry
 * exists.
 *
 * NOTE(review): lines 1879 (the function name/parameters) and 1932 (the
 * exit-time cleanup registration the last comment refers to) are missing
 * from this listing.
 */
1878 static void
1880 {
1881  int i;
1882 
1883  /*
1884  * WalSndCtl should be set up already (we inherit this by fork() or
1885  * EXEC_BACKEND mechanism from the postmaster).
1886  */
1887  Assert(WalSndCtl != NULL);
1888  Assert(MyWalSnd == NULL);
1889 
1890  /*
1891  * Find a free walsender slot and reserve it. If this fails, we must be
1892  * out of WalSnd structures.
1893  */
1894  for (i = 0; i < max_wal_senders; i++)
1895  {
1896  WalSnd *walsnd = &WalSndCtl->walsnds[i];
1897 
1898  SpinLockAcquire(&walsnd->mutex);
1899 
1900  if (walsnd->pid != 0)
1901  {
1902  SpinLockRelease(&walsnd->mutex);
1903  continue;
1904  }
1905  else
1906  {
1907  /*
1908  * Found a free slot. Reserve it for us.
1909  */
1910  walsnd->pid = MyProcPid;
1911  walsnd->sentPtr = InvalidXLogRecPtr;
1912  walsnd->write = InvalidXLogRecPtr;
1913  walsnd->flush = InvalidXLogRecPtr;
1914  walsnd->apply = InvalidXLogRecPtr;
1915  walsnd->state = WALSNDSTATE_STARTUP;
1916  walsnd->latch = &MyProc->procLatch;
1917  SpinLockRelease(&walsnd->mutex);
1918  /* don't need the lock anymore */
1919  MyWalSnd = (WalSnd *) walsnd;
1920 
1921  break;
1922  }
1923  }
1924  if (MyWalSnd == NULL)
1925  ereport(FATAL,
1926  (errcode(ERRCODE_TOO_MANY_CONNECTIONS),
1927  errmsg("number of requested standby connections "
1928  "exceeds max_wal_senders (currently %d)",
1929  max_wal_senders)));
1930 
1931  /* Arrange to clean up at walsender exit */
1933 }
1934 
1935 /*
 * Destroy the per-walsender data structure for this walsender process.
 *
 * Clears MyWalSnd. Then, under the entry's spinlock, it drops the latch
 * pointer and sets pid to 0. The slot-reservation loop in this file treats
 * pid == 0 as a free entry.
 *
 * NOTE(review): line 1937 (the function name/parameters) is missing from
 * this listing.
 */
1936 static void
1938 {
1939  WalSnd *walsnd = MyWalSnd;
1940 
1941  Assert(walsnd != NULL);
1942 
1943  MyWalSnd = NULL;
1944 
1945  SpinLockAcquire(&walsnd->mutex);
1946  /* clear latch while holding the spinlock, so it can safely be read */
1947  walsnd->latch = NULL;
1948  /* Mark WalSnd struct as no longer being in use. */
1949  walsnd->pid = 0;
1950  SpinLockRelease(&walsnd->mutex);
1951 }
1952 
1953 /*
1954  * Read 'count' bytes from WAL into 'buf', starting at location 'startptr'
1955  *
1956  * XXX probably this should be improved to suck data directly from the
1957  * WAL buffers when possible.
1958  *
1959  * Will open, and keep open, one WAL segment stored in the global file
1960  * descriptor sendFile. This means if XLogRead is used once, there will
1961  * always be one descriptor left open until the process ends, but never
1962  * more than one.
 *
 * The read is chunked so that it never crosses a segment boundary. The
 * globals sendSegNo and sendOff track the open segment and the current
 * file offset, so an lseek() is only issued when the offset differs.
 *
 * ereport(ERROR) is raised when:
 *  - the segment is missing (ENOENT, assumed already removed or recycled);
 *  - the segment cannot be opened or seeked;
 *  - read() returns <= 0, so hitting EOF early is an error too.
 * After the loop, if this walsender's needreload flag was set and a file
 * is open, the file is closed and the whole read restarts from startptr.
 *
 * NOTE(review): lines 2021-2022, 2026, 2028, 2031, 2043, 2045, 2048, 2060,
 * 2062, 2077, 2079, 2099 and 2107 are missing from this listing. They
 * include the historic-timeline condition, the path construction and,
 * presumably, the post-read validity check described below.
1963  */
1964 static void
1965 XLogRead(char *buf, XLogRecPtr startptr, Size count)
1966 {
1967  char *p;
1968  XLogRecPtr recptr;
1969  Size nbytes;
1970  XLogSegNo segno;
1971 
1972 retry:
1973  p = buf;
1974  recptr = startptr;
1975  nbytes = count;
1976 
1977  while (nbytes > 0)
1978  {
1979  uint32 startoff;
1980  int segbytes;
1981  int readbytes;
1982 
1983  startoff = recptr % XLogSegSize;
1984 
1985  if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo))
1986  {
1987  char path[MAXPGPATH];
1988 
1989  /* Switch to another logfile segment */
1990  if (sendFile >= 0)
1991  close(sendFile);
1992 
1993  XLByteToSeg(recptr, sendSegNo);
1994 
1995  /*-------
1996  * When reading from a historic timeline, and there is a timeline
1997  * switch within this segment, read from the WAL segment belonging
1998  * to the new timeline.
1999  *
2000  * For example, imagine that this server is currently on timeline
2001  * 5, and we're streaming timeline 4. The switch from timeline 4
2002  * to 5 happened at 0/13002088. In pg_wal, we have these files:
2003  *
2004  * ...
2005  * 000000040000000000000012
2006  * 000000040000000000000013
2007  * 000000050000000000000013
2008  * 000000050000000000000014
2009  * ...
2010  *
2011  * In this situation, when requested to send the WAL from
2012  * segment 0x13, on timeline 4, we read the WAL from file
2013  * 000000050000000000000013. Archive recovery prefers files from
2014  * newer timelines, so if the segment was restored from the
2015  * archive on this server, the file belonging to the old timeline,
2016  * 000000040000000000000013, might not exist. Their contents are
2017  * equal up to the switchpoint, because at a timeline switch, the
2018  * used portion of the old segment is copied to the new file.
2019  *-------
2020  */
2023  {
2024  XLogSegNo endSegNo;
2025 
2027  if (sendSegNo == endSegNo)
2029  }
2030 
2032 
2033  sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
2034  if (sendFile < 0)
2035  {
2036  /*
2037  * If the file is not found, assume it's because the standby
2038  * asked for a too old WAL segment that has already been
2039  * removed or recycled.
2040  */
2041  if (errno == ENOENT)
2042  ereport(ERROR,
2044  errmsg("requested WAL segment %s has already been removed",
2046  else
2047  ereport(ERROR,
2049  errmsg("could not open file \"%s\": %m",
2050  path)));
2051  }
2052  sendOff = 0;
2053  }
2054 
2055  /* Need to seek in the file? */
2056  if (sendOff != startoff)
2057  {
2058  if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
2059  ereport(ERROR,
2061  errmsg("could not seek in log segment %s to offset %u: %m",
2063  startoff)));
2064  sendOff = startoff;
2065  }
2066 
2067  /* How many bytes are within this segment? */
2068  if (nbytes > (XLogSegSize - startoff))
2069  segbytes = XLogSegSize - startoff;
2070  else
2071  segbytes = nbytes;
2072 
2073  readbytes = read(sendFile, p, segbytes);
2074  if (readbytes <= 0)
2075  {
2076  ereport(ERROR,
2078  errmsg("could not read from log segment %s, offset %u, length %lu: %m",
2080  sendOff, (unsigned long) segbytes)));
2081  }
2082 
2083  /* Update state for read */
2084  recptr += readbytes;
2085 
2086  sendOff += readbytes;
2087  nbytes -= readbytes;
2088  p += readbytes;
2089  }
2090 
2091  /*
2092  * After reading into the buffer, check that what we read was valid. We do
2093  * this after reading, because even though the segment was present when we
2094  * opened it, it might get recycled or removed while we read it. The
2095  * read() succeeds in that case, but the data we tried to read might
2096  * already have been overwritten with new WAL records.
2097  */
2098  XLByteToSeg(startptr, segno);
2100 
2101  /*
2102  * During recovery, the currently-open WAL file might be replaced with the
2103  * file of the same name retrieved from archive. So we always need to
2104  * check what we read was valid after reading into the buffer. If it's
2105  * invalid, we try to open and read the file again.
2106  */
2108  {
2109  WalSnd *walsnd = MyWalSnd;
2110  bool reload;
2111 
2112  SpinLockAcquire(&walsnd->mutex);
2113  reload = walsnd->needreload;
2114  walsnd->needreload = false;
2115  SpinLockRelease(&walsnd->mutex);
2116 
2117  if (reload && sendFile >= 0)
2118  {
2119  close(sendFile);
2120  sendFile = -1;
2121 
2122  goto retry;
2123  }
2124  }
2125 }
2126 
2127 /*
2128  * Send out the WAL in its normal physical/stored form.
2129  *
2130  * Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk,
2131  * but not yet sent to the client, and buffer it in the libpq output
2132  * buffer.
2133  *
2134  * If there is no unsent WAL remaining, WalSndCaughtUp is set to true,
2135  * otherwise WalSndCaughtUp is set to false.
 *
 * Each call emits at most one CopyData ('d') message containing:
 *  - a 'w' byte;
 *  - int64 dataStart (startptr);
 *  - int64 walEnd (SendRqstPtr);
 *  - an int64 send time, patched in after the WAL has been read;
 *  - the WAL bytes themselves.
 * sentPtr and MyWalSnd->sentPtr are then advanced to endptr. When a
 * historic timeline has been fully sent, CopyDone is sent instead, the
 * open segment is closed, and streamingDoneSending is set.
 *
 * NOTE(review): lines missing from this listing include 2138 (the function
 * name/parameters), 2145, 2152, 2200, 2213-2214, 2216, 2253, 2267, 2299,
 * 2337 and 2355, so several branch conditions are not shown.
2136  */
2137 static void
2139 {
2140  XLogRecPtr SendRqstPtr;
2141  XLogRecPtr startptr;
2142  XLogRecPtr endptr;
2143  Size nbytes;
2144 
2146  {
2147  WalSndCaughtUp = true;
2148  return;
2149  }
2150 
2151  /* Figure out how far we can safely send the WAL. */
2153  {
2154  /*
2155  * Streaming an old timeline that's in this server's history, but is
2156  * not the one we're currently inserting or replaying. It can be
2157  * streamed up to the point where we switched off that timeline.
2158  */
2159  SendRqstPtr = sendTimeLineValidUpto;
2160  }
2161  else if (am_cascading_walsender)
2162  {
2163  /*
2164  * Streaming the latest timeline on a standby.
2165  *
2166  * Attempt to send all WAL that has already been replayed, so that we
2167  * know it's valid. If we're receiving WAL through streaming
2168  * replication, it's also OK to send any WAL that has been received
2169  * but not replayed.
2170  *
2171  * The timeline we're recovering from can change, or we can be
2172  * promoted. In either case, the current timeline becomes historic. We
2173  * need to detect that so that we don't try to stream past the point
2174  * where we switched to another timeline. We check for promotion or
2175  * timeline switch after calculating FlushPtr, to avoid a race
2176  * condition: if the timeline becomes historic just after we checked
2177  * that it was still current, it's still OK to stream it up to the
2178  * FlushPtr that was calculated before it became historic.
2179  */
2180  bool becameHistoric = false;
2181 
2182  SendRqstPtr = GetStandbyFlushRecPtr();
2183 
2184  if (!RecoveryInProgress())
2185  {
2186  /*
2187  * We have been promoted. RecoveryInProgress() updated
2188  * ThisTimeLineID to the new current timeline.
2189  */
2190  am_cascading_walsender = false;
2191  becameHistoric = true;
2192  }
2193  else
2194  {
2195  /*
2196  * Still a cascading standby. But is the timeline we're sending
2197  * still the one recovery is recovering from? ThisTimeLineID was
2198  * updated by the GetStandbyFlushRecPtr() call above.
2199  */
2201  becameHistoric = true;
2202  }
2203 
2204  if (becameHistoric)
2205  {
2206  /*
2207  * The timeline we were sending has become historic. Read the
2208  * timeline history file of the new timeline to see where exactly
2209  * we forked off from the timeline we were sending.
2210  */
2211  List *history;
2212 
2215 
2217  list_free_deep(history);
2218 
2219  sendTimeLineIsHistoric = true;
2220 
2221  SendRqstPtr = sendTimeLineValidUpto;
2222  }
2223  }
2224  else
2225  {
2226  /*
2227  * Streaming the current timeline on a master.
2228  *
2229  * Attempt to send all data that's already been written out and
2230  * fsync'd to disk. We cannot go further than what's been written out
2231  * given the current implementation of XLogRead(). And in any case
2232  * it's unsafe to send WAL that is not securely down to disk on the
2233  * master: if the master subsequently crashes and restarts, slaves
2234  * must not have applied any WAL that gets lost on the master.
2235  */
2236  SendRqstPtr = GetFlushRecPtr();
2237  }
2238 
2239  /*
2240  * If this is a historic timeline and we've reached the point where we
2241  * forked to the next timeline, stop streaming.
2242  *
2243  * Note: We might already have sent WAL > sendTimeLineValidUpto. The
2244  * startup process will normally replay all WAL that has been received
2245  * from the master, before promoting, but if the WAL streaming is
2246  * terminated at a WAL page boundary, the valid portion of the timeline
2247  * might end in the middle of a WAL record. We might've already sent the
2248  * first half of that partial WAL record to the cascading standby, so that
2249  * sentPtr > sendTimeLineValidUpto. That's OK; the cascading standby can't
2250  * replay the partial WAL record either, so it can still follow our
2251  * timeline switch.
2252  */
2254  {
2255  /* close the current file. */
2256  if (sendFile >= 0)
2257  close(sendFile);
2258  sendFile = -1;
2259 
2260  /* Send CopyDone */
2261  pq_putmessage_noblock('c', NULL, 0);
2262  streamingDoneSending = true;
2263 
2264  WalSndCaughtUp = true;
2265 
2266  elog(DEBUG1, "walsender reached end of timeline at %X/%X (sent up to %X/%X)",
2268  (uint32) (sentPtr >> 32), (uint32) sentPtr);
2269  return;
2270  }
2271 
2272  /* Do we have any work to do? */
2273  Assert(sentPtr <= SendRqstPtr);
2274  if (SendRqstPtr <= sentPtr)
2275  {
2276  WalSndCaughtUp = true;
2277  return;
2278  }
2279 
2280  /*
2281  * Figure out how much to send in one message. If there's no more than
2282  * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
2283  * MAX_SEND_SIZE bytes, but round back to logfile or page boundary.
2284  *
2285  * The rounding is not only for performance reasons. Walreceiver relies on
2286  * the fact that we never split a WAL record across two messages. Since a
2287  * long WAL record is split at page boundary into continuation records,
2288  * page boundary is always a safe cut-off point. We also assume that
2289  * SendRqstPtr never points to the middle of a WAL record.
2290  */
2291  startptr = sentPtr;
2292  endptr = startptr;
2293  endptr += MAX_SEND_SIZE;
2294 
2295  /* if we went beyond SendRqstPtr, back off */
2296  if (SendRqstPtr <= endptr)
2297  {
2298  endptr = SendRqstPtr;
2300  WalSndCaughtUp = false;
2301  else
2302  WalSndCaughtUp = true;
2303  }
2304  else
2305  {
2306  /* round down to page boundary. */
2307  endptr -= (endptr % XLOG_BLCKSZ);
2308  WalSndCaughtUp = false;
2309  }
2310 
2311  nbytes = endptr - startptr;
2312  Assert(nbytes <= MAX_SEND_SIZE);
2313 
2314  /*
2315  * OK to read and send the slice.
2316  */
2317  resetStringInfo(&output_message);
2318  pq_sendbyte(&output_message, 'w');
2319 
2320  pq_sendint64(&output_message, startptr); /* dataStart */
2321  pq_sendint64(&output_message, SendRqstPtr); /* walEnd */
2322  pq_sendint64(&output_message, 0); /* sendtime, filled in last */
2323 
2324  /*
2325  * Read the log directly into the output buffer to avoid extra memcpy
2326  * calls.
2327  */
2328  enlargeStringInfo(&output_message, nbytes);
2329  XLogRead(&output_message.data[output_message.len], startptr, nbytes);
2330  output_message.len += nbytes;
2331  output_message.data[output_message.len] = '\0';
2332 
2333  /*
2334  * Fill the send timestamp last, so that it is taken as late as possible.
2335  */
2336  resetStringInfo(&tmpbuf);
 /* the sendtime slot follows the 'w' byte, dataStart and walEnd */
2338  memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64)],
2339  tmpbuf.data, sizeof(int64));
2340 
2341  pq_putmessage_noblock('d', output_message.data, output_message.len);
2342 
2343  sentPtr = endptr;
2344 
2345  /* Update shared memory status */
2346  {
2347  WalSnd *walsnd = MyWalSnd;
2348 
2349  SpinLockAcquire(&walsnd->mutex);
2350  walsnd->sentPtr = sentPtr;
2351  SpinLockRelease(&walsnd->mutex);
2352  }
2353 
2354  /* Report progress of XLOG streaming in PS display */
2356  {
2357  char activitymsg[50];
2358 
2359  snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
2360  (uint32) (sentPtr >> 32), (uint32) sentPtr);
2361  set_ps_display(activitymsg, false);
2362  }
2363 
2364  return;
2365 }
2366 
2367 /*
2368  * Stream out logically decoded data.
2369  */
2370 static void
2372 {
2373  XLogRecord *record;
2374  char *errm;
2375 
2376  /*
2377  * Don't know whether we've caught up yet. We'll set it to true in
2378  * WalSndWaitForWal, if we're actually waiting. We also set to true if
2379  * XLogReadRecord() had to stop reading but WalSndWaitForWal didn't wait -
2380  * i.e. when we're shutting down.
2381  */
2382  WalSndCaughtUp = false;
2383 
2384  record = XLogReadRecord(logical_decoding_ctx->reader, logical_startptr, &errm);
2386 
2387  /* xlog record was invalid */
2388  if (errm != NULL)
2389  elog(ERROR, "%s", errm);
2390 
2391  if (record != NULL)
2392  {
2393  LogicalDecodingProcessRecord(logical_decoding_ctx, logical_decoding_ctx->reader);
2394 
2395  sentPtr = logical_decoding_ctx->reader->EndRecPtr;
2396  }
2397  else
2398  {
2399  /*
2400  * If the record we just wanted read is at or beyond the flushed
2401  * point, then we're caught up.
2402  */
2403  if (logical_decoding_ctx->reader->EndRecPtr >= GetFlushRecPtr())
2404  WalSndCaughtUp = true;
2405  }
2406 
2407  /* Update shared memory status */
2408  {
2409  WalSnd *walsnd = MyWalSnd;
2410 
2411  SpinLockAcquire(&walsnd->mutex);
2412  walsnd->sentPtr = sentPtr;
2413  SpinLockRelease(&walsnd->mutex);
2414  }
2415 }
2416 
2417 /*
2418  * Shut down if the sender is caught up.
2419  *
2420  * NB: This should only be called when the shutdown signal has been received
2421  * from postmaster.
2422  *
2423  * Note that if we determine that there's still more data to send, this
2424  * function will return control to the caller.
2425  */
2426 static void
2428 {
2429  XLogRecPtr replicatedPtr;
2430 
2431  /* ... let's just be real sure we're caught up ... */
2432  send_data();
2433 
2434  /*
2435  * To figure out whether all WAL has successfully been replicated, check
2436  * flush location if valid, write otherwise. Tools like pg_receivewal
2437  * will usually (unless in synchronous mode) return an invalid flush
2438  * location.
2439  */
2440  replicatedPtr = XLogRecPtrIsInvalid(MyWalSnd->flush) ?
2441  MyWalSnd->write : MyWalSnd->flush;
2442 
2443  if (WalSndCaughtUp && sentPtr == replicatedPtr &&
2444  !pq_is_send_pending())
2445  {
2446  /* Inform the standby that XLOG streaming is done */
2447  EndCommand("COPY 0", DestRemote);
2448  pq_flush();
2449 
2450  proc_exit(0);
2451  }
2453  {
2454  WalSndKeepalive(true);
2456  }
2457 }
2458 
2459 /*
2460  * Returns the latest point in WAL that has been safely flushed to disk, and
2461  * can be sent to the standby. This should only be called when in recovery,
2462  * ie. we're streaming to a cascaded standby.
2463  *
2464  * As a side-effect, ThisTimeLineID is updated to the TLI of the last
2465  * replayed WAL record.
2466  */
2467 static XLogRecPtr
2469 {
2470  XLogRecPtr replayPtr;
2471  TimeLineID replayTLI;
2472  XLogRecPtr receivePtr;
2474  XLogRecPtr result;
2475 
2476  /*
2477  * We can safely send what's already been replayed. Also, if walreceiver
2478  * is streaming WAL from the same timeline, we can send anything that it
2479  * has streamed, but hasn't been replayed yet.
2480  */
2481 
2482  receivePtr = GetWalRcvWriteRecPtr(NULL, &receiveTLI);
2483  replayPtr = GetXLogReplayRecPtr(&replayTLI);
2484 
2485  ThisTimeLineID = replayTLI;
2486 
2487  result = replayPtr;
2488  if (receiveTLI == ThisTimeLineID && receivePtr > replayPtr)
2489  result = receivePtr;
2490 
2491  return result;
2492 }
2493 
2494 /*
2495  * Request walsenders to reload the currently-open WAL file
2496  */
2497 void
2499 {
2500  int i;
2501 
2502  for (i = 0; i < max_wal_senders; i++)
2503  {
2504  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2505 
2506  if (walsnd->pid == 0)
2507  continue;
2508 
2509  SpinLockAcquire(&walsnd->mutex);
2510  walsnd->needreload = true;
2511  SpinLockRelease(&walsnd->mutex);
2512  }
2513 }
2514 
2515 /* SIGHUP: set flag to re-read config file at next convenient time */
2516 static void
2518 {
2519  int save_errno = errno;
2520 
2521  got_SIGHUP = true;
2522 
2523  SetLatch(MyLatch);
2524 
2525  errno = save_errno;
2526 }
2527 
2528 /* SIGUSR1: set flag to send WAL records */
2529 static void
2531 {
2532  int save_errno = errno;
2533 
2535 
2536  errno = save_errno;
2537 }
2538 
2539 /* SIGUSR2: set flag to do a last cycle and shut down afterwards */
2540 static void
2542 {
2543  int save_errno = errno;
2544 
2545  /*
2546  * If replication has not yet started, die like with SIGTERM. If
2547  * replication is active, only set a flag and wake up the main loop. It
2548  * will send any outstanding WAL, wait for it to be replicated to the
2549  * standby, and then exit gracefully.
2550  */
2551  if (!replication_active)
2552  kill(MyProcPid, SIGTERM);
2553 
2554  walsender_ready_to_stop = true;
2555  SetLatch(MyLatch);
2556 
2557  errno = save_errno;
2558 }
2559 
2560 /* Set up signal handlers */
2561 void
2563 {
2564  /* Set up signal handlers */
2565  pqsignal(SIGHUP, WalSndSigHupHandler); /* set flag to read config
2566  * file */
2567  pqsignal(SIGINT, SIG_IGN); /* not used */
2568  pqsignal(SIGTERM, die); /* request shutdown */
2569  pqsignal(SIGQUIT, quickdie); /* hard crash time */
2570  InitializeTimeouts(); /* establishes SIGALRM handler */
2572  pqsignal(SIGUSR1, WalSndXLogSendHandler); /* request WAL sending */
2573  pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
2574  * shutdown */
2575 
2576  /* Reset some signals that are accepted by postmaster but not here */
2582 }
2583 
2584 /* Report shared-memory space needed by WalSndShmemInit */
2585 Size
2587 {
2588  Size size = 0;
2589 
2590  size = offsetof(WalSndCtlData, walsnds);
2591  size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
2592 
2593  return size;
2594 }
2595 
2596 /* Allocate and initialize walsender-related shared memory */
2597 void
2599 {
2600  bool found;
2601  int i;
2602 
2603  WalSndCtl = (WalSndCtlData *)
2604  ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
2605 
2606  if (!found)
2607  {
2608  /* First time through, so initialize */
2609  MemSet(WalSndCtl, 0, WalSndShmemSize());
2610 
2611  for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
2612  SHMQueueInit(&(WalSndCtl->SyncRepQueue[i]));
2613 
2614  for (i = 0; i < max_wal_senders; i++)
2615  {
2616  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2617 
2618  SpinLockInit(&walsnd->mutex);
2619  }
2620  }
2621 }
2622 
2623 /*
2624  * Wake up all walsenders
2625  *
2626  * This will be called inside critical sections, so throwing an error is not
2627  * advisable.
2628  */
2629 void
2631 {
2632  int i;
2633 
2634  for (i = 0; i < max_wal_senders; i++)
2635  {
2636  Latch *latch;
2637  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2638 
2639  /*
2640  * Get latch pointer with spinlock held, for the unlikely case that
2641  * pointer reads aren't atomic (as they're 8 bytes).
2642  */
2643  SpinLockAcquire(&walsnd->mutex);
2644  latch = walsnd->latch;
2645  SpinLockRelease(&walsnd->mutex);
2646 
2647  if (latch != NULL)
2648  SetLatch(latch);
2649  }
2650 }
2651 
2652 /* Set state for current walsender (only called in walsender) */
2653 void
2655 {
2656  WalSnd *walsnd = MyWalSnd;
2657 
2659 
2660  if (walsnd->state == state)
2661  return;
2662 
2663  SpinLockAcquire(&walsnd->mutex);
2664  walsnd->state = state;
2665  SpinLockRelease(&walsnd->mutex);
2666 }
2667 
2668 /*
2669  * Return a string constant representing the state. This is used
2670  * in system views, and should *not* be translated.
2671  */
2672 static const char *
2674 {
2675  switch (state)
2676  {
2677  case WALSNDSTATE_STARTUP:
2678  return "startup";
2679  case WALSNDSTATE_BACKUP:
2680  return "backup";
2681  case WALSNDSTATE_CATCHUP:
2682  return "catchup";
2683  case WALSNDSTATE_STREAMING:
2684  return "streaming";
2685  }
2686  return "UNKNOWN";
2687 }
2688 
2689 
2690 /*
2691  * Returns activity of walsenders, including pids and xlog locations sent to
2692  * standby servers.
2693  */
2694 Datum
2696 {
2697 #define PG_STAT_GET_WAL_SENDERS_COLS 8
2698  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
2699  TupleDesc tupdesc;
2700  Tuplestorestate *tupstore;
2701  MemoryContext per_query_ctx;
2702  MemoryContext oldcontext;
2703  List *sync_standbys;
2704  int i;
2705 
2706  /* check to see if caller supports us returning a tuplestore */
2707  if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
2708  ereport(ERROR,
2709  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2710  errmsg("set-valued function called in context that cannot accept a set")));
2711  if (!(rsinfo->allowedModes & SFRM_Materialize))
2712  ereport(ERROR,
2713  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2714  errmsg("materialize mode required, but it is not " \
2715  "allowed in this context")));
2716 
2717  /* Build a tuple descriptor for our result type */
2718  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
2719  elog(ERROR, "return type must be a row type");
2720 
2721  per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
2722  oldcontext = MemoryContextSwitchTo(per_query_ctx);
2723 
2724  tupstore = tuplestore_begin_heap(true, false, work_mem);
2725  rsinfo->returnMode = SFRM_Materialize;
2726  rsinfo->setResult = tupstore;
2727  rsinfo->setDesc = tupdesc;
2728 
2729  MemoryContextSwitchTo(oldcontext);
2730 
2731  /*
2732  * Get the currently active synchronous standbys.
2733  */
2734  LWLockAcquire(SyncRepLock, LW_SHARED);
2735  sync_standbys = SyncRepGetSyncStandbys(NULL);
2736  LWLockRelease(SyncRepLock);
2737 
2738  for (i = 0; i < max_wal_senders; i++)
2739  {
2740  WalSnd *walsnd = &WalSndCtl->walsnds[i];
2742  XLogRecPtr write;
2743  XLogRecPtr flush;
2744  XLogRecPtr apply;
2745  int priority;
2748  bool nulls[PG_STAT_GET_WAL_SENDERS_COLS];
2749 
2750  if (walsnd->pid == 0)
2751  continue;
2752 
2753  SpinLockAcquire(&walsnd->mutex);
2754  sentPtr = walsnd->sentPtr;
2755  state = walsnd->state;
2756  write = walsnd->write;
2757  flush = walsnd->flush;
2758  apply = walsnd->apply;
2759  priority = walsnd->sync_standby_priority;
2760  SpinLockRelease(&walsnd->mutex);
2761 
2762  memset(nulls, 0, sizeof(nulls));
2763  values[0] = Int32GetDatum(walsnd->pid);
2764 
2765  if (!superuser())
2766  {
2767  /*
2768  * Only superusers can see details. Other users only get the pid
2769  * value to know it's a walsender, but no details.
2770  */
2771  MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1);
2772  }
2773  else
2774  {
2775  values[1] = CStringGetTextDatum(WalSndGetStateString(state));
2776 
2777  if (XLogRecPtrIsInvalid(sentPtr))
2778  nulls[2] = true;
2779  values[2] = LSNGetDatum(sentPtr);
2780 
2781  if (XLogRecPtrIsInvalid(write))
2782  nulls[3] = true;
2783  values[3] = LSNGetDatum(write);
2784 
2785  if (XLogRecPtrIsInvalid(flush))
2786  nulls[4] = true;
2787  values[4] = LSNGetDatum(flush);
2788 
2789  if (XLogRecPtrIsInvalid(apply))
2790  nulls[5] = true;
2791  values[5] = LSNGetDatum(apply);
2792 
2793  /*
2794  * Treat a standby such as a pg_basebackup background process
2795  * which always returns an invalid flush location, as an
2796  * asynchronous standby.
2797  */
2798  priority = XLogRecPtrIsInvalid(walsnd->flush) ? 0 : priority;
2799 
2800  values[6] = Int32GetDatum(priority);
2801 
2802  /*
2803  * More easily understood version of standby state. This is purely
2804  * informational.
2805  *
2806  * In quorum-based sync replication, the role of each standby
2807  * listed in synchronous_standby_names can be changing very
2808  * frequently. Any standbys considered as "sync" at one moment can
2809  * be switched to "potential" ones at the next moment. So, it's
2810  * basically useless to report "sync" or "potential" as their sync
2811  * states. We report just "quorum" for them.
2812  */
2813  if (priority == 0)
2814  values[7] = CStringGetTextDatum("async");
2815  else if (list_member_int(sync_standbys, i))
2817  CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
2818  else
2819  values[7] = CStringGetTextDatum("potential");
2820  }
2821 
2822  tuplestore_putvalues(tupstore, tupdesc, values, nulls);
2823  }
2824 
2825  /* clean up and return the tuplestore */
2826  tuplestore_donestoring(tupstore);
2827 
2828  return (Datum) 0;
2829 }
2830 
2831 /*
2832  * This function is used to send keepalive message to standby.
2833  * If requestReply is set, sets a flag in the message requesting the standby
2834  * to send a message back to us, for heartbeat purposes.
2835  */
2836 static void
2837 WalSndKeepalive(bool requestReply)
2838 {
2839  elog(DEBUG2, "sending replication keepalive");
2840 
2841  /* construct the message... */
2842  resetStringInfo(&output_message);
2843  pq_sendbyte(&output_message, 'k');
2844  pq_sendint64(&output_message, sentPtr);
2845  pq_sendint64(&output_message, GetCurrentIntegerTimestamp());
2846  pq_sendbyte(&output_message, requestReply ? 1 : 0);
2847 
2848  /* ... and send it wrapped in CopyData */
2849  pq_putmessage_noblock('d', output_message.data, output_message.len);
2850 }
2851 
2852 /*
2853  * Send keepalive message if too much time has elapsed.
2854  */
2855 static void
2857 {
2858  TimestampTz ping_time;
2859 
2860  /*
2861  * Don't send keepalive messages if timeouts are globally disabled or
2862  * we're doing something not partaking in timeouts.
2863  */
2864  if (wal_sender_timeout <= 0 || last_reply_timestamp <= 0)
2865  return;
2866 
2868  return;
2869 
2870  /*
2871  * If half of wal_sender_timeout has lapsed without receiving any reply
2872  * from the standby, send a keep-alive message to the standby requesting
2873  * an immediate reply.
2874  */
2876  wal_sender_timeout / 2);
2877  if (now >= ping_time)
2878  {
2879  WalSndKeepalive(true);
2881 
2882  /* Try to flush pending output to the client */
2883  if (pq_flush_if_writable() != 0)
2884  WalSndShutdown();
2885  }
2886 }
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, Datum *values, bool *isnull)
Definition: tuplestore.c:735
static void WalSndXLogSendHandler(SIGNAL_ARGS)
Definition: walsender.c:2530
static void InitWalSenderSlot(void)
Definition: walsender.c:1879
void InitializeTimeouts(void)
Definition: timeout.c:340
#define pq_is_send_pending()
Definition: libpq.h:42
#define NIL
Definition: pg_list.h:69
#define XLogSegSize
Definition: xlog_internal.h:92
XLogRecPtr write
static void ProcessStandbyMessage(void)
Definition: walsender.c:1429
#define SIGUSR1
Definition: win32.h:211
#define WL_SOCKET_WRITEABLE
Definition: latch.h:126
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define IsA(nodeptr, _type_)
Definition: nodes.h:559
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:200
XLogRecPtr startpoint
Definition: replnodes.h:84
#define DEBUG1
Definition: elog.h:25
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:211
int MyProcPid
Definition: globals.c:38
int wal_sender_timeout
Definition: walsender.c:113
#define SIGCONT
Definition: win32.h:205
uint32 TimeLineID
Definition: xlogdefs.h:45
#define pq_flush()
Definition: libpq.h:40
XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
struct Port * MyProcPort
Definition: globals.c:40
void pq_sendbyte(StringInfo buf, int byt)
Definition: pqformat.c:105
#define WL_TIMEOUT
Definition: latch.h:127
void ProcessConfigFile(GucContext context)
uint32 TransactionId
Definition: c.h:394
bool wake_wal_senders
Definition: walsender.c:120
#define SYNC_REP_PRIORITY
Definition: syncrep.h:36
bool update_process_title
Definition: ps_status.c:35
#define write(a, b, c)
Definition: win32.h:19
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1687
TransactionId xmin
Definition: proc.h:203
static void WalSndLastCycleHandler(SIGNAL_ARGS)
Definition: walsender.c:2541
void pgstat_report_activity(BackendState state, const char *cmd_str)
Definition: pgstat.c:2805
static void SendTimeLineHistory(TimeLineHistoryCmd *cmd)
Definition: walsender.c:392
#define TEXTOID
Definition: pg_type.h:324
static void DropReplicationSlot(DropReplicationSlotCmd *cmd)
Definition: walsender.c:876
PGPROC * MyProc
Definition: proc.c:67
#define SIGWINCH
Definition: win32.h:209
ResourceOwner CurrentResourceOwner
Definition: resowner.c:138
Size WalSndShmemSize(void)
Definition: walsender.c:2586
void MarkPostmasterChildWalSender(void)
Definition: pmsignal.c:239
void CommitTransactionCommand(void)
Definition: xact.c:2745
#define SpinLockInit(lock)
Definition: spin.h:60
uint8 syncrep_method
Definition: syncrep.h:51
List * readTimeLineHistory(TimeLineID targetTLI)
Definition: timeline.c:75
static int sendFile
Definition: walsender.c:126
static void XLogSendPhysical(void)
Definition: walsender.c:2138
void do_tup_output(TupOutputState *tstate, Datum *values, bool *isnull)
Definition: execTuples.c:1250
#define tuplestore_donestoring(state)
Definition: tuplestore.h:60
#define SIGTTIN
Definition: win32.h:207
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency)
Definition: slot.c:219
void set_ps_display(const char *activity, bool force)
Definition: ps_status.c:326
#define INT4OID
Definition: pg_type.h:316
Definition: nodes.h:508
static StringInfoData output_message
Definition: walsender.c:151
WalSndCtlData * WalSndCtl
Definition: walsender.c:100
struct cursor * cur
Definition: ecpg.c:28
void proc_exit(int code)
Definition: ipc.c:99
int errcode(int sqlerrcode)
Definition: elog.c:575
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
static void PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin)
Definition: walsender.c:1547
bool superuser(void)
Definition: superuser.c:47
#define MemSet(start, val, len)
Definition: c.h:853
static void WalSndKill(int code, Datum arg)
Definition: walsender.c:1937
void ReplicationSlotCleanup()
Definition: slot.c:412
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
static void WalSndDone(WalSndSendDataCallback send_data)
Definition: walsender.c:2427
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
#define WL_SOCKET_READABLE
Definition: latch.h:125
void ReplicationSlotSave(void)
Definition: slot.c:562
XLogRecPtr GetFlushRecPtr(void)
Definition: xlog.c:8153
List * SyncRepGetSyncStandbys(bool *am_sync)
Definition: syncrep.c:672
void pq_sendstring(StringInfo buf, const char *str)
Definition: pqformat.c:186
int sleeptime
Definition: pg_standby.c:40
#define pg_attribute_noreturn()
Definition: c.h:649
ReplicationSlotPersistentData data
Definition: slot.h:115
void ResetLatch(volatile Latch *latch)
Definition: latch.c:462
#define LOG
Definition: elog.h:26
bool RecoveryInProgress(void)
Definition: xlog.c:7804
SyncRepConfigData * SyncRepConfig
Definition: syncrep.c:93
void list_free_deep(List *list)
Definition: list.c:1147
static int fd(const char *x, int i)
Definition: preproc-init.c:105
pgsocket sock
Definition: libpq-be.h:120
#define PG_BINARY
Definition: c.h:1038
#define SIGQUIT
Definition: win32.h:197
void GetPGVariable(const char *name, DestReceiver *dest)
Definition: guc.c:7852
slock_t mutex
Node * replication_parse_result
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:88
Latch procLatch
Definition: proc.h:93
XLogRecord * XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
Definition: xlogreader.c:193
XLogRecPtr confirmed_flush
Definition: slot.h:76
PGXACT * MyPgXact
Definition: proc.c:68
XLogRecPtr EndRecPtr
Definition: xlogreader.h:115
TimeLineID timeline
Definition: replnodes.h:96
ReplicationKind kind
Definition: replnodes.h:56
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1714
static int logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
Definition: walsender.c:710
static void WalSndSigHupHandler(SIGNAL_ARGS)
Definition: walsender.c:2517
const char * SnapBuildExportSnapshot(SnapBuild *builder)
Definition: snapbuild.c:512
bool am_walsender
Definition: walsender.c:106
#define SpinLockAcquire(lock)
Definition: spin.h:62
void ReplicationSlotReserveWal(void)
Definition: slot.c:824
XLogRecPtr flush
void DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
Definition: logical.c:414
void ReplicationSlotsComputeRequiredLSN(void)
Definition: slot.c:664
void GetNextXidAndEpoch(TransactionId *xid, uint32 *epoch)
Definition: xlog.c:8222
double TimestampTz
Definition: timestamp.h:51
ReplicationKind kind
Definition: replnodes.h:81
void WalSndRqstFileReload(void)
Definition: walsender.c:2498
#define SIG_IGN
Definition: win32.h:193
void ConditionVariableCancelSleep(void)
static volatile sig_atomic_t got_SIGHUP
Definition: walsender.c:177
bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
Definition: transam.c:319
static void WalSndKeepalive(bool requestReply)
Definition: walsender.c:2837
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:1308
static volatile sig_atomic_t walsender_ready_to_stop
Definition: walsender.c:178
void EndCommand(const char *commandTag, CommandDest dest)
Definition: dest.c:157
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:563
#define ERROR
Definition: elog.h:43
void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
Definition: decode.c:93
void pq_startmsgread(void)
Definition: pqcomm.c:1157
static void PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: walsender.c:1460
void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli)
Definition: xlog.c:3742
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:372
void WalSndSignals(void)
Definition: walsender.c:2562
#define FATAL
Definition: elog.h:52
void SendBaseBackup(BaseBackupCmd *cmd)
Definition: basebackup.c:683
Latch * latch
#define MAXPGPATH
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
Definition: xlog.c:10983
void ReplicationSlotPersist(void)
Definition: slot.c:597
TransactionId effective_xmin
Definition: slot.h:111
int pq_getbyte_if_available(unsigned char *c)
Definition: pqcomm.c:987
static LogicalDecodingContext * logical_decoding_ctx
Definition: walsender.c:188
static TimeLineID curFileTimeLine
Definition: walsender.c:131
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:145
#define XLogFilePath(path, tli, logSegNo)
char * get_database_name(Oid dbid)
Definition: dbcommands.c:2049
#define DEBUG2
Definition: elog.h:24
bool list_member_int(const List *list, int datum)
Definition: list.c:485
Definition: latch.h:110
Definition: dest.h:88
#define TLHistoryFileName(fname, tli)
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:348
LogicalDecodingContext * CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write)
Definition: logical.c:327
WalSndState state
NodeTag type
Definition: nodes.h:510
char * XLogFileNameP(TimeLineID tli, XLogSegNo segno)
Definition: xlog.c:10039
static char * buf
Definition: pg_test_fsync.c:65
static bool streamingDoneSending
Definition: walsender.c:170
uint64 XLogSegNo
Definition: xlogdefs.h:34
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:109
bool PostmasterIsAlive(void)
Definition: pmsignal.c:272
int OpenTransientFile(FileName fileName, int fileFlags, int fileMode)
Definition: fd.c:2093
int errdetail(const char *fmt,...)
Definition: elog.c:873
#define COMMERROR
Definition: elog.h:30
int errcode_for_file_access(void)
Definition: elog.c:598
static void StartReplication(StartReplicationCmd *cmd)
Definition: walsender.c:486
#define pq_flush_if_writable()
Definition: libpq.h:41
#define InvalidTransactionId
Definition: transam.h:31
void enlargeStringInfo(StringInfo str, int needed)
Definition: stringinfo.c:277
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:94
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1005
static XLogRecPtr logical_startptr
Definition: walsender.c:189
unsigned int uint32
Definition: c.h:265
void ReplicationSlotRelease(void)
Definition: slot.c:374
static void XLogRead(char *buf, XLogRecPtr startptr, Size count)
Definition: walsender.c:1965
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1124
MemoryContext CurrentMemoryContext
Definition: mcxt.c:37
TransactionId xmin
Definition: slot.h:57
SHM_QUEUE SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]
void InitWalSender(void)
#define SlotIsLogical(slot)
Definition: slot.h:134
Datum Int64GetDatum(int64 X)
Definition: fmgr.c:2102
void SyncRepInitConfig(void)
Definition: syncrep.c:377
#define ereport(elevel, rest)
Definition: elog.h:122
void exec_replication_command(const char *cmd_string)
Definition: walsender.c:1221
static bool WalSndCaughtUp
Definition: walsender.c:174
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
int replication_yyparse(void)
Definition: guc.h:72
int max_wal_senders
Definition: walsender.c:112
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc)
Definition: execTuples.c:1232
static void WalSndLoop(WalSndSendDataCallback send_data)
Definition: walsender.c:1743
int CloseTransientFile(int fd)
Definition: fd.c:2254
static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:978
void initStringInfo(StringInfo str)
Definition: stringinfo.c:65
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
int pq_getmessage(StringInfo s, int maxlen)
Definition: pqcomm.c:1219
static StringInfoData reply_message
Definition: walsender.c:152
#define MAXFNAMELEN
#define SpinLockRelease(lock)
Definition: spin.h:64
Tuplestorestate * tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
Definition: tuplestore.c:316
static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc)
Definition: walsender.c:1093
Size mul_size(Size s1, Size s2)
Definition: shmem.c:492
#define WL_POSTMASTER_DEATH
Definition: latch.h:128
void WalSndErrorCleanup(void)
Definition: walsender.c:257
MemoryContext AllocSetContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Definition: aset.c:440
void(* WalSndSendDataCallback)(void)
Definition: walsender.c:197
static volatile sig_atomic_t replication_active
Definition: walsender.c:186
uintptr_t Datum
Definition: postgres.h:374
static const char * WalSndGetStateString(WalSndState state)
Definition: walsender.c:2673
XLogRecPtr sentPtr
Size add_size(Size s1, Size s2)
Definition: shmem.c:475
bool am_db_walsender
Definition: walsender.c:109
static void StartLogicalReplication(StartReplicationCmd *cmd)
Definition: walsender.c:887
Oid MyDatabaseId
Definition: globals.c:76
static void WalSndShutdown(void)
Definition: walsender.c:201
static TimeLineID receiveTLI
Definition: xlog.c:199
int work_mem
Definition: globals.c:112
XLogRecPtr tliSwitchPoint(TimeLineID tli, List *history, TimeLineID *nextTLI)
Definition: timeline.c:553
WalSnd * MyWalSnd
Definition: walsender.c:103
static XLogRecPtr sentPtr
Definition: walsender.c:148
#define SIGPIPE
Definition: win32.h:201
#define SIGHUP
Definition: win32.h:196
void ReplicationSlotDrop(const char *name)
Definition: slot.c:439
void pq_endmsgread(void)
Definition: pqcomm.c:1181
#define InvalidOid
Definition: postgres_ext.h:36
TimeLineID ThisTimeLineID
Definition: xlog.c:177
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:80
#define SIG_DFL
Definition: win32.h:191
int allowedModes
Definition: execnodes.h:199
#define INT8OID
Definition: pg_type.h:304
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:168
static void WalSndKeepaliveIfNecessary(TimestampTz now)
Definition: walsender.c:2856
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:432
struct SnapBuild * snapshot_builder
Definition: logical.h:38
int errmsg_internal(const char *fmt,...)
Definition: elog.c:827
SetFunctionReturnMode returnMode
Definition: execnodes.h:201
ReplicationSlot * MyReplicationSlot
Definition: slot.c:95
#define XLByteToSeg(xlrp, logSegNo)
static long WalSndComputeSleeptime(TimestampTz now)
Definition: walsender.c:1674
void SetLatch(volatile Latch *latch)
Definition: latch.c:380
#define SIGNAL_ARGS
Definition: c.h:1079
#define NULL
Definition: c.h:226
static TimeLineID sendTimeLine
Definition: walsender.c:139
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:671
static void WalSndCheckTimeOut(TimestampTz now)
Definition: walsender.c:1716
void WalSndSetState(WalSndState state)
Definition: walsender.c:2654
int WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:321
int sync_standby_priority
Definition: regguts.h:298
void FreeDecodingContext(LogicalDecodingContext *ctx)
Definition: logical.c:457
static void XLogSendLogical(void)
Definition: walsender.c:2371
XLogRecPtr restart_lsn
Definition: slot.h:68
void ReplicationSlotAcquire(const char *name)
Definition: slot.c:325
void StartTransactionCommand(void)
Definition: xact.c:2675
bool needreload
static void ProcessStandbyReplyMessage(void)
Definition: walsender.c:1492
size_t Size
Definition: c.h:353
char * dbname
Definition: streamutil.c:41
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1110
static TimeLineID sendTimeLineNextTLI
Definition: walsender.c:140
static XLogRecPtr sendTimeLineValidUpto
Definition: walsender.c:142
void LogicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: logical.c:884
MemoryContext ecxt_per_query_memory
Definition: execnodes.h:133
TupleDesc CreateTemplateTupleDesc(int natts, bool hasoid)
Definition: tupdesc.c:41
WalSndState
static XLogSegNo sendSegNo
Definition: walsender.c:127
Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
Definition: walsender.c:2695
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Definition: walsender.c:739
static bool streamingDoneReceiving
Definition: walsender.c:171
#define BYTEAOID
Definition: pg_type.h:292
Tuplestorestate * setResult
Definition: execnodes.h:204
static StringInfoData tmpbuf
Definition: walsender.c:153
#define SIGTTOU
Definition: win32.h:208
static Datum values[MAXATTR]
Definition: bootstrap.c:162
#define PG_STAT_GET_WAL_SENDERS_COLS
ExprContext * econtext
Definition: execnodes.h:197
bool log_replication_commands
Definition: walsender.c:115
uint64 GetSystemIdentifier(void)
Definition: xlog.c:4660
#define Int32GetDatum(X)
Definition: postgres.h:487
char * application_name
Definition: guc.c:471
TupleDesc setDesc
Definition: execnodes.h:205
void pq_sendbytes(StringInfo buf, const char *data, int datalen)
Definition: pqformat.c:115
int errmsg(const char *fmt,...)
Definition: elog.c:797
XLogReaderState * reader
Definition: logical.h:35
void SHMQueueInit(SHM_QUEUE *queue)
Definition: shmqueue.c:36
void pq_sendint(StringInfo buf, int i, int b)
Definition: pqformat.c:236
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:344
#define pq_putmessage_noblock(msgtype, s, len)
Definition: libpq.h:45
void die(SIGNAL_ARGS)
Definition: postgres.c:2617
StringInfo out
Definition: logical.h:57
int i
void WalSndShmemInit(void)
Definition: walsender.c:2598
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:486
LogicalDecodingContext * CreateInitDecodingContext(char *plugin, List *output_plugin_options, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write)
Definition: logical.c:211
#define NameStr(name)
Definition: c.h:495
TimeLineID timeline
Definition: replnodes.h:83
#define CStringGetTextDatum(s)
Definition: builtins.h:90
void * arg
struct Latch * MyLatch
Definition: globals.c:51
#define PG_FUNCTION_ARGS
Definition: fmgr.h:150
static uint32 sendOff
Definition: walsender.c:128
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:448
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:97
static XLogRecPtr GetStandbyFlushRecPtr(void)
Definition: walsender.c:2468
void SendPostmasterSignal(PMSignalReason reason)
Definition: pmsignal.c:113
#define elog
Definition: elog.h:219
void LWLockReleaseAll(void)
Definition: lwlock.c:1813
#define NUM_SYNC_REP_WAIT_MODE
Definition: syncrep.h:28
slock_t mutex
Definition: slot.h:88
#define close(a)
Definition: win32.h:17
void latch_sigusr1_handler(void)
Definition: latch.c:1541
CommandDest whereToSendOutput
Definition: postgres.c:86
#define SIGCHLD
Definition: win32.h:206
void TimestampDifference(TimestampTz start_time, TimestampTz stop_time, long *secs, int *microsecs)
Definition: timestamp.c:1790
#define TransactionIdIsNormal(xid)
Definition: transam.h:42
#define XLByteInSeg(xlrp, logSegNo)
static bool waiting_for_ping_response
Definition: walsender.c:162
void pq_puttextmessage(char msgtype, const char *str)
Definition: pqformat.c:400
XLogRecPtr apply
static void IdentifySystem(void)
Definition: walsender.c:303
Definition: pg_list.h:45
void CheckLogicalDecodingRequirements(void)
Definition: logical.c:76
void pq_sendint64(StringInfo buf, int64 i)
Definition: pqformat.c:271
#define WL_LATCH_SET
Definition: latch.h:124
int16 AttrNumber
Definition: attnum.h:21
void quickdie(SIGNAL_ARGS)
Definition: postgres.c:2556
#define UINT64_FORMAT
Definition: c.h:313
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition: slot.c:616
void SyncRepReleaseWaiters(void)
Definition: syncrep.c:405
static TimestampTz last_reply_timestamp
Definition: walsender.c:159
static void ProcessRepliesIfAny(void)
Definition: walsender.c:1327
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1651
#define MAX_SEND_SIZE
Definition: walsender.c:97
#define read(a, b, c)
Definition: win32.h:18
#define SIGUSR2
Definition: win32.h:212
static bool sendTimeLineIsHistoric
Definition: walsender.c:141
#define offsetof(type, field)
Definition: c.h:551
void WalSndWakeup(void)
Definition: walsender.c:2630
void ReplicationSlotMarkDirty(void)
Definition: slot.c:580
int64 GetCurrentIntegerTimestamp(void)
bool am_cascading_walsender
Definition: walsender.c:107
static void ProcessStandbyHSFeedbackMessage(void)
Definition: walsender.c:1581
void SnapBuildClearExportedSnapshot(void)
Definition: snapbuild.c:630
int BasicOpenFile(FileName fileName, int fileFlags, int fileMode)
Definition: fd.c:899
void replication_scanner_init(const char *query_string)
ResourceOwner ResourceOwnerCreate(ResourceOwner parent, const char *name)
Definition: resowner.c:416
#define TLHistoryFilePath(path, tli)