PostgreSQL Source Code  git master
origin.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * origin.c
4  * Logical replication progress tracking support.
5  *
6  * Copyright (c) 2013-2020, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  * src/backend/replication/logical/origin.c
10  *
11  * NOTES
12  *
13  * This file provides the following:
14  * * An infrastructure to name nodes in a replication setup
15  * * A facility to store and persist replication progress in an
16  * efficient and durable manner.
17  *
18  * A replication origin consists of a descriptive, user defined, external
19  * name and a short, thus space efficient, internal 2 byte one. This split
20  * exists because replication origins have to be stored in WAL and shared
21  * memory and long descriptors would be inefficient. For now we only use 2
22  * bytes for the internal id of a replication origin as it seems unlikely that
23  * there soon will be more than 65k nodes in one replication setup; and using
24  * only two bytes allows us to be more space efficient.
25  *
26  * Replication progress is tracked in a shared memory table
27  * (ReplicationState) that's dumped to disk every checkpoint. Entries
28  * ('slots') in this table are identified by the internal id. That's the case
29  * because it allows us to increase replication progress during crash
30  * recovery. To allow doing so we store the original LSN (from the originating
31  * system) of a transaction in the commit record. That allows us to recover the
32  * precise replayed state after crash recovery; without requiring synchronous
33  * commits. Allowing logical replication to use asynchronous commit is
34  * generally good for performance, but especially important as it allows a
35  * single threaded replay process to keep up with a source that has multiple
36  * backends generating changes concurrently. For efficiency and simplicity
37  * reasons a backend can set up one replication origin that's from then on
38  * used as the source of changes produced by the backend, until reset again.
39  *
40  * This infrastructure is intended to be used in cooperation with logical
41  * decoding. When replaying from a remote system the configured origin is
42  * provided to output plugins, allowing prevention of replication loops and
43  * other filtering.
44  *
45  * There are several levels of locking at work:
46  *
47  * * To create and drop replication origins an exclusive lock on
48  * pg_replication_origin is required for the duration. That allows us to
49  * safely and conflict free assign new origins using a dirty snapshot.
50  *
51  * * When creating an in-memory replication progress slot the ReplicationOrigin
52  * LWLock has to be held exclusively; when iterating over the replication
53  * progress a shared lock has to be held, the same when advancing the
54  * replication progress of an individual backend that has not been set up as
55  * the session's replication origin.
56  *
57  * * When manipulating or looking at the remote_lsn and local_lsn fields of a
58  * replication progress slot that slot's lwlock has to be held. That's
59  * primarily because we do not assume 8 byte writes (the LSN) are atomic on
60  * all our platforms, but it also simplifies memory ordering concerns
61  * between the remote and local lsn. We use a lwlock instead of a spinlock
62  * so it's less harmful to hold the lock over a WAL write
63  * (cf. AdvanceReplicationProgress).
64  *
65  * ---------------------------------------------------------------------------
66  */
67 
68 #include "postgres.h"
69 
70 #include <unistd.h>
71 #include <sys/stat.h>
72 
73 #include "access/genam.h"
74 #include "access/htup_details.h"
75 #include "access/table.h"
76 #include "access/xact.h"
77 #include "catalog/catalog.h"
78 #include "catalog/indexing.h"
79 #include "funcapi.h"
80 #include "miscadmin.h"
81 #include "nodes/execnodes.h"
82 #include "pgstat.h"
83 #include "replication/logical.h"
84 #include "replication/origin.h"
86 #include "storage/copydir.h"
87 #include "storage/fd.h"
88 #include "storage/ipc.h"
89 #include "storage/lmgr.h"
90 #include "utils/builtins.h"
91 #include "utils/fmgroids.h"
92 #include "utils/pg_lsn.h"
93 #include "utils/rel.h"
94 #include "utils/snapmgr.h"
95 #include "utils/syscache.h"
96 
97 /*
98  * Replay progress of a single remote node.
99  */
100 typedef struct ReplicationState
101 {
102  /*
103  * Local identifier for the remote node.
104  */
106 
107  /*
108  * Location of the latest commit from the remote side.
109  */
111 
112  /*
113  * Remember the local lsn of the commit record so we can XLogFlush() to it
114  * during a checkpoint so we know the commit record actually is safe on
115  * disk.
116  */
118 
119  /*
120  * PID of backend that's acquired slot, or 0 if none.
121  */
123 
124  /*
125  * Condition variable that's signaled when acquired_by changes.
126  */
128 
129  /*
130  * Lock protecting remote_lsn and local_lsn.
131  */
134 
135 /*
136  * On disk version of ReplicationState.
137  */
139 {
143 
144 
145 typedef struct ReplicationStateCtl
146 {
147  /* Tranche to use for per-origin LWLocks */
149  /* Array of length max_replication_slots */
152 
153 /* external variables */
157 
158 /*
159  * Base address into a shared memory array of replication states of size
160  * max_replication_slots.
161  *
162  * XXX: Should we use a separate variable to size this rather than
163  * max_replication_slots?
164  */
166 
167 /*
168  * Actual shared memory block (replication_states[] is now part of this).
169  */
171 
172 /*
173  * Backend-local, cached element from ReplicationState for use in a backend
174  * replaying remote commits, so we don't have to search ReplicationState for
175  * the backend's current RepOriginId.
176  */
178 
179 /* Magic for on disk files. */
180 #define REPLICATION_STATE_MAGIC ((uint32) 0x1257DADE)
181 
182 static void
183 replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
184 {
185  if (check_slots && max_replication_slots == 0)
186  ereport(ERROR,
187  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
188  errmsg("cannot query or manipulate replication origin when max_replication_slots = 0")));
189 
190  if (!recoveryOK && RecoveryInProgress())
191  ereport(ERROR,
192  (errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
193  errmsg("cannot manipulate replication origins during recovery")));
194 
195 }
196 
197 
198 /* ---------------------------------------------------------------------------
199  * Functions for working with replication origins themselves.
200  * ---------------------------------------------------------------------------
201  */
202 
203 /*
204  * Check for a persistent replication origin identified by name.
205  *
206  * Returns InvalidOid if the node isn't known yet and missing_ok is true.
207  */
209 replorigin_by_name(char *roname, bool missing_ok)
210 {
213  HeapTuple tuple;
214  Datum roname_d;
215 
216  roname_d = CStringGetTextDatum(roname);
217 
218  tuple = SearchSysCache1(REPLORIGNAME, roname_d);
219  if (HeapTupleIsValid(tuple))
220  {
221  ident = (Form_pg_replication_origin) GETSTRUCT(tuple);
222  roident = ident->roident;
223  ReleaseSysCache(tuple);
224  }
225  else if (!missing_ok)
226  ereport(ERROR,
227  (errcode(ERRCODE_UNDEFINED_OBJECT),
228  errmsg("replication origin \"%s\" does not exist",
229  roname)));
230 
231  return roident;
232 }
233 
234 /*
235  * Create a replication origin.
236  *
237  * Needs to be called in a transaction.
238  */
240 replorigin_create(char *roname)
241 {
242  Oid roident;
243  HeapTuple tuple = NULL;
244  Relation rel;
245  Datum roname_d;
246  SnapshotData SnapshotDirty;
247  SysScanDesc scan;
249 
250  roname_d = CStringGetTextDatum(roname);
251 
253 
254  /*
255  * We need the numeric replication origin to be 16bit wide, so we cannot
256  * rely on the normal oid allocation. Instead we simply scan
257  * pg_replication_origin for the first unused id. That's not particularly
258  * efficient, but this should be a fairly infrequent operation - we can
259  * easily spend a bit more code on this when it turns out it needs to be
260  * faster.
261  *
262  * We handle concurrency by taking an exclusive lock (allowing reads!)
263  * over the table for the duration of the search. Because we use a "dirty
264  * snapshot" we can read rows that other in-progress sessions have
265  * written, even though they would be invisible with normal snapshots. Due
266  * to the exclusive lock there's no danger that new rows can appear while
267  * we're checking.
268  */
269  InitDirtySnapshot(SnapshotDirty);
270 
271  rel = table_open(ReplicationOriginRelationId, ExclusiveLock);
272 
273  for (roident = InvalidOid + 1; roident < PG_UINT16_MAX; roident++)
274  {
275  bool nulls[Natts_pg_replication_origin];
276  Datum values[Natts_pg_replication_origin];
277  bool collides;
278 
280 
281  ScanKeyInit(&key,
282  Anum_pg_replication_origin_roident,
283  BTEqualStrategyNumber, F_OIDEQ,
284  ObjectIdGetDatum(roident));
285 
287  true /* indexOK */ ,
288  &SnapshotDirty,
289  1, &key);
290 
291  collides = HeapTupleIsValid(systable_getnext(scan));
292 
293  systable_endscan(scan);
294 
295  if (!collides)
296  {
297  /*
298  * Ok, found an unused roident, insert the new row and do a CCI,
299  * so our callers can look it up if they want to.
300  */
301  memset(&nulls, 0, sizeof(nulls));
302 
303  values[Anum_pg_replication_origin_roident - 1] = ObjectIdGetDatum(roident);
304  values[Anum_pg_replication_origin_roname - 1] = roname_d;
305 
306  tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
307  CatalogTupleInsert(rel, tuple);
309  break;
310  }
311  }
312 
313  /* now release lock again, */
315 
316  if (tuple == NULL)
317  ereport(ERROR,
318  (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
319  errmsg("could not find free replication origin OID")));
320 
321  heap_freetuple(tuple);
322  return roident;
323 }
324 
325 
326 /*
327  * Drop replication origin.
328  *
329  * Needs to be called in a transaction.
330  */
331 void
333 {
334  HeapTuple tuple;
335  Relation rel;
336  int i;
337 
339 
340  /*
341  * To interlock against concurrent drops, we hold ExclusiveLock on
342  * pg_replication_origin throughout this function.
343  */
344  rel = table_open(ReplicationOriginRelationId, ExclusiveLock);
345 
346  /*
347  * First, clean up the slot state info, if there is any matching slot.
348  */
349 restart:
350  tuple = NULL;
351  LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
352 
353  for (i = 0; i < max_replication_slots; i++)
354  {
355  ReplicationState *state = &replication_states[i];
356 
357  if (state->roident == roident)
358  {
359  /* found our slot, is it busy? */
360  if (state->acquired_by != 0)
361  {
362  ConditionVariable *cv;
363 
364  if (nowait)
365  ereport(ERROR,
366  (errcode(ERRCODE_OBJECT_IN_USE),
367  errmsg("could not drop replication origin with OID %d, in use by PID %d",
368  state->roident,
369  state->acquired_by)));
370 
371  /*
372  * We must wait and then retry. Since we don't know which CV
373  * to wait on until here, we can't readily use
374  * ConditionVariablePrepareToSleep (calling it here would be
375  * wrong, since we could miss the signal if we did so); just
376  * use ConditionVariableSleep directly.
377  */
378  cv = &state->origin_cv;
379 
380  LWLockRelease(ReplicationOriginLock);
381 
383  goto restart;
384  }
385 
386  /* first make a WAL log entry */
387  {
388  xl_replorigin_drop xlrec;
389 
390  xlrec.node_id = roident;
391  XLogBeginInsert();
392  XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
393  XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_DROP);
394  }
395 
396  /* then clear the in-memory slot */
397  state->roident = InvalidRepOriginId;
398  state->remote_lsn = InvalidXLogRecPtr;
399  state->local_lsn = InvalidXLogRecPtr;
400  break;
401  }
402  }
403  LWLockRelease(ReplicationOriginLock);
405 
406  /*
407  * Now, we can delete the catalog entry.
408  */
410  if (!HeapTupleIsValid(tuple))
411  elog(ERROR, "cache lookup failed for replication origin with oid %u",
412  roident);
413 
414  CatalogTupleDelete(rel, &tuple->t_self);
415  ReleaseSysCache(tuple);
416 
418 
419  /* now release lock again */
421 }
422 
423 
424 /*
425  * Lookup replication origin via its oid and return the name.
426  *
427  * The external name is palloc'd in the calling context.
428  *
429  * Returns true if the origin is known, false otherwise.
430  */
431 bool
432 replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
433 {
434  HeapTuple tuple;
436 
437  Assert(OidIsValid((Oid) roident));
438  Assert(roident != InvalidRepOriginId);
439  Assert(roident != DoNotReplicateId);
440 
442  ObjectIdGetDatum((Oid) roident));
443 
444  if (HeapTupleIsValid(tuple))
445  {
446  ric = (Form_pg_replication_origin) GETSTRUCT(tuple);
447  *roname = text_to_cstring(&ric->roname);
448  ReleaseSysCache(tuple);
449 
450  return true;
451  }
452  else
453  {
454  *roname = NULL;
455 
456  if (!missing_ok)
457  ereport(ERROR,
458  (errcode(ERRCODE_UNDEFINED_OBJECT),
459  errmsg("replication origin with OID %u does not exist",
460  roident)));
461 
462  return false;
463  }
464 }
465 
466 
467 /* ---------------------------------------------------------------------------
468  * Functions for handling replication progress.
469  * ---------------------------------------------------------------------------
470  */
471 
472 Size
474 {
475  Size size = 0;
476 
477  /*
478  * XXX: max_replication_slots is arguably the wrong thing to use, as here
479  * we keep the replay state of *remote* transactions. But for now it seems
480  * sufficient to reuse it, rather than introduce a separate GUC.
481  */
482  if (max_replication_slots == 0)
483  return size;
484 
485  size = add_size(size, offsetof(ReplicationStateCtl, states));
486 
487  size = add_size(size,
489  return size;
490 }
491 
492 void
494 {
495  bool found;
496 
497  if (max_replication_slots == 0)
498  return;
499 
500  replication_states_ctl = (ReplicationStateCtl *)
501  ShmemInitStruct("ReplicationOriginState",
503  &found);
504  replication_states = replication_states_ctl->states;
505 
506  if (!found)
507  {
508  int i;
509 
510  MemSet(replication_states_ctl, 0, ReplicationOriginShmemSize());
511 
512  replication_states_ctl->tranche_id = LWTRANCHE_REPLICATION_ORIGIN_STATE;
513 
514  for (i = 0; i < max_replication_slots; i++)
515  {
516  LWLockInitialize(&replication_states[i].lock,
517  replication_states_ctl->tranche_id);
518  ConditionVariableInit(&replication_states[i].origin_cv);
519  }
520  }
521 }
522 
523 /* ---------------------------------------------------------------------------
524  * Perform a checkpoint of each replication origin's progress with respect to
525  * the replayed remote_lsn. Make sure that all transactions we refer to in the
526  * checkpoint (local_lsn) are actually on-disk. This might not yet be the case
527  * if the transactions were originally committed asynchronously.
528  *
529  * We store checkpoints in the following format:
530  * +-------+------------------------+------------------+-----+--------+
531  * | MAGIC | ReplicationStateOnDisk | struct Replic... | ... | CRC32C | EOF
532  * +-------+------------------------+------------------+-----+--------+
533  *
534  * So it's just the magic, followed by the statically sized
535  * ReplicationStateOnDisk structs. Note that the maximum number of
536  * ReplicationState is determined by max_replication_slots.
537  * ---------------------------------------------------------------------------
538  */
539 void
541 {
542  const char *tmppath = "pg_logical/replorigin_checkpoint.tmp";
543  const char *path = "pg_logical/replorigin_checkpoint";
544  int tmpfd;
545  int i;
547  pg_crc32c crc;
548 
549  if (max_replication_slots == 0)
550  return;
551 
552  INIT_CRC32C(crc);
553 
554  /* make sure no old temp file is remaining */
555  if (unlink(tmppath) < 0 && errno != ENOENT)
556  ereport(PANIC,
558  errmsg("could not remove file \"%s\": %m",
559  tmppath)));
560 
561  /*
562  * no other backend can perform this at the same time, we're protected by
563  * CheckpointLock.
564  */
565  tmpfd = OpenTransientFile(tmppath,
566  O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
567  if (tmpfd < 0)
568  ereport(PANIC,
570  errmsg("could not create file \"%s\": %m",
571  tmppath)));
572 
573  /* write magic */
574  errno = 0;
575  if ((write(tmpfd, &magic, sizeof(magic))) != sizeof(magic))
576  {
577  /* if write didn't set errno, assume problem is no disk space */
578  if (errno == 0)
579  errno = ENOSPC;
580  ereport(PANIC,
582  errmsg("could not write to file \"%s\": %m",
583  tmppath)));
584  }
585  COMP_CRC32C(crc, &magic, sizeof(magic));
586 
587  /* prevent concurrent creations/drops */
588  LWLockAcquire(ReplicationOriginLock, LW_SHARED);
589 
590  /* write actual data */
591  for (i = 0; i < max_replication_slots; i++)
592  {
593  ReplicationStateOnDisk disk_state;
594  ReplicationState *curstate = &replication_states[i];
596 
597  if (curstate->roident == InvalidRepOriginId)
598  continue;
599 
600  /* zero, to avoid uninitialized padding bytes */
601  memset(&disk_state, 0, sizeof(disk_state));
602 
603  LWLockAcquire(&curstate->lock, LW_SHARED);
604 
605  disk_state.roident = curstate->roident;
606 
607  disk_state.remote_lsn = curstate->remote_lsn;
608  local_lsn = curstate->local_lsn;
609 
610  LWLockRelease(&curstate->lock);
611 
612  /* make sure we only write out a commit that's persistent */
613  XLogFlush(local_lsn);
614 
615  errno = 0;
616  if ((write(tmpfd, &disk_state, sizeof(disk_state))) !=
617  sizeof(disk_state))
618  {
619  /* if write didn't set errno, assume problem is no disk space */
620  if (errno == 0)
621  errno = ENOSPC;
622  ereport(PANIC,
624  errmsg("could not write to file \"%s\": %m",
625  tmppath)));
626  }
627 
628  COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
629  }
630 
631  LWLockRelease(ReplicationOriginLock);
632 
633  /* write out the CRC */
634  FIN_CRC32C(crc);
635  errno = 0;
636  if ((write(tmpfd, &crc, sizeof(crc))) != sizeof(crc))
637  {
638  /* if write didn't set errno, assume problem is no disk space */
639  if (errno == 0)
640  errno = ENOSPC;
641  ereport(PANIC,
643  errmsg("could not write to file \"%s\": %m",
644  tmppath)));
645  }
646 
647  if (CloseTransientFile(tmpfd) != 0)
648  ereport(PANIC,
650  errmsg("could not close file \"%s\": %m",
651  tmppath)));
652 
653  /* fsync, rename to permanent file, fsync file and directory */
654  durable_rename(tmppath, path, PANIC);
655 }
656 
657 /*
658  * Recover replication replay status from checkpoint data saved earlier by
659  * CheckPointReplicationOrigin.
660  *
661  * This only needs to be called at startup and *not* during every checkpoint
662  * read during recovery (e.g. in HS or PITR from a base backup) afterwards. All
663  * state thereafter can be recovered by looking at commit records.
664  */
665 void
667 {
668  const char *path = "pg_logical/replorigin_checkpoint";
669  int fd;
670  int readBytes;
672  int last_state = 0;
673  pg_crc32c file_crc;
674  pg_crc32c crc;
675 
676  /* don't want to overwrite already existing state */
677 #ifdef USE_ASSERT_CHECKING
678  static bool already_started = false;
679 
680  Assert(!already_started);
681  already_started = true;
682 #endif
683 
684  if (max_replication_slots == 0)
685  return;
686 
687  INIT_CRC32C(crc);
688 
689  elog(DEBUG2, "starting up replication origin progress state");
690 
691  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
692 
693  /*
694  * might have had max_replication_slots == 0 last run, or we just brought
695  * up a standby.
696  */
697  if (fd < 0 && errno == ENOENT)
698  return;
699  else if (fd < 0)
700  ereport(PANIC,
702  errmsg("could not open file \"%s\": %m",
703  path)));
704 
705  /* verify magic, that is written even if nothing was active */
706  readBytes = read(fd, &magic, sizeof(magic));
707  if (readBytes != sizeof(magic))
708  {
709  if (readBytes < 0)
710  ereport(PANIC,
712  errmsg("could not read file \"%s\": %m",
713  path)));
714  else
715  ereport(PANIC,
717  errmsg("could not read file \"%s\": read %d of %zu",
718  path, readBytes, sizeof(magic))));
719  }
720  COMP_CRC32C(crc, &magic, sizeof(magic));
721 
722  if (magic != REPLICATION_STATE_MAGIC)
723  ereport(PANIC,
724  (errmsg("replication checkpoint has wrong magic %u instead of %u",
725  magic, REPLICATION_STATE_MAGIC)));
726 
727  /* we can skip locking here, no other access is possible */
728 
729  /* recover individual states, until there are no more to be found */
730  while (true)
731  {
732  ReplicationStateOnDisk disk_state;
733 
734  readBytes = read(fd, &disk_state, sizeof(disk_state));
735 
736  /* no further data */
737  if (readBytes == sizeof(crc))
738  {
739  /* not pretty, but simple ... */
740  file_crc = *(pg_crc32c *) &disk_state;
741  break;
742  }
743 
744  if (readBytes < 0)
745  {
746  ereport(PANIC,
748  errmsg("could not read file \"%s\": %m",
749  path)));
750  }
751 
752  if (readBytes != sizeof(disk_state))
753  {
754  ereport(PANIC,
756  errmsg("could not read file \"%s\": read %d of %zu",
757  path, readBytes, sizeof(disk_state))));
758  }
759 
760  COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
761 
762  if (last_state == max_replication_slots)
763  ereport(PANIC,
764  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
765  errmsg("could not find free replication state, increase max_replication_slots")));
766 
767  /* copy data to shared memory */
768  replication_states[last_state].roident = disk_state.roident;
769  replication_states[last_state].remote_lsn = disk_state.remote_lsn;
770  last_state++;
771 
772  elog(LOG, "recovered replication state of node %u to %X/%X",
773  disk_state.roident,
774  (uint32) (disk_state.remote_lsn >> 32),
775  (uint32) disk_state.remote_lsn);
776  }
777 
778  /* now check checksum */
779  FIN_CRC32C(crc);
780  if (file_crc != crc)
781  ereport(PANIC,
782  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
783  errmsg("replication slot checkpoint has wrong checksum %u, expected %u",
784  crc, file_crc)));
785 
786  if (CloseTransientFile(fd) != 0)
787  ereport(PANIC,
789  errmsg("could not close file \"%s\": %m",
790  path)));
791 }
792 
793 void
795 {
796  uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
797 
798  switch (info)
799  {
800  case XLOG_REPLORIGIN_SET:
801  {
802  xl_replorigin_set *xlrec =
803  (xl_replorigin_set *) XLogRecGetData(record);
804 
806  xlrec->remote_lsn, record->EndRecPtr,
807  xlrec->force /* backward */ ,
808  false /* WAL log */ );
809  break;
810  }
812  {
813  xl_replorigin_drop *xlrec;
814  int i;
815 
816  xlrec = (xl_replorigin_drop *) XLogRecGetData(record);
817 
818  for (i = 0; i < max_replication_slots; i++)
819  {
820  ReplicationState *state = &replication_states[i];
821 
822  /* found our slot */
823  if (state->roident == xlrec->node_id)
824  {
825  /* reset entry */
826  state->roident = InvalidRepOriginId;
827  state->remote_lsn = InvalidXLogRecPtr;
828  state->local_lsn = InvalidXLogRecPtr;
829  break;
830  }
831  }
832  break;
833  }
834  default:
835  elog(PANIC, "replorigin_redo: unknown op code %u", info);
836  }
837 }
838 
839 
840 /*
841  * Tell the replication origin progress machinery that a commit from 'node'
842  * that originated at the LSN remote_commit on the remote node was replayed
843  * successfully and that we don't need to do so again. In combination with
844  * setting up replorigin_session_origin_lsn and replorigin_session_origin
845  * that ensures we won't lose knowledge about that after a crash if the
846  * transaction had a persistent effect (think of asynchronous commits).
847  *
848  * local_commit needs to be a local LSN of the commit so that we can make sure
849  * upon a checkpoint that enough WAL has been persisted to disk.
850  *
851  * Needs to be called with a RowExclusiveLock on pg_replication_origin,
852  * unless running in recovery.
853  */
854 void
856  XLogRecPtr remote_commit, XLogRecPtr local_commit,
857  bool go_backward, bool wal_log)
858 {
859  int i;
860  ReplicationState *replication_state = NULL;
861  ReplicationState *free_state = NULL;
862 
863  Assert(node != InvalidRepOriginId);
864 
865  /* we don't track DoNotReplicateId */
866  if (node == DoNotReplicateId)
867  return;
868 
869  /*
870  * XXX: For the case where this is called by WAL replay, it'd be more
871  * efficient to restore into a backend local hashtable and only dump into
872  * shmem after recovery is finished. Let's wait with implementing that
873  * till it's shown to be a measurable expense
874  */
875 
876  /* Lock exclusively, as we may have to create a new table entry. */
877  LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
878 
879  /*
880  * Search for either an existing slot for the origin, or a free one we can
881  * use.
882  */
883  for (i = 0; i < max_replication_slots; i++)
884  {
885  ReplicationState *curstate = &replication_states[i];
886 
887  /* remember where to insert if necessary */
888  if (curstate->roident == InvalidRepOriginId &&
889  free_state == NULL)
890  {
891  free_state = curstate;
892  continue;
893  }
894 
895  /* not our slot */
896  if (curstate->roident != node)
897  {
898  continue;
899  }
900 
901  /* ok, found slot */
902  replication_state = curstate;
903 
904  LWLockAcquire(&replication_state->lock, LW_EXCLUSIVE);
905 
906  /* Make sure it's not used by somebody else */
907  if (replication_state->acquired_by != 0)
908  {
909  ereport(ERROR,
910  (errcode(ERRCODE_OBJECT_IN_USE),
911  errmsg("replication origin with OID %d is already active for PID %d",
912  replication_state->roident,
913  replication_state->acquired_by)));
914  }
915 
916  break;
917  }
918 
919  if (replication_state == NULL && free_state == NULL)
920  ereport(ERROR,
921  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
922  errmsg("could not find free replication state slot for replication origin with OID %u",
923  node),
924  errhint("Increase max_replication_slots and try again.")));
925 
926  if (replication_state == NULL)
927  {
928  /* initialize new slot */
929  LWLockAcquire(&free_state->lock, LW_EXCLUSIVE);
930  replication_state = free_state;
931  Assert(replication_state->remote_lsn == InvalidXLogRecPtr);
932  Assert(replication_state->local_lsn == InvalidXLogRecPtr);
933  replication_state->roident = node;
934  }
935 
936  Assert(replication_state->roident != InvalidRepOriginId);
937 
938  /*
939  * If somebody "forcefully" sets this slot, WAL log it, so it's durable
940  * and the standby gets the message. Primarily this will be called during
941  * WAL replay (of commit records) where no WAL logging is necessary.
942  */
943  if (wal_log)
944  {
945  xl_replorigin_set xlrec;
946 
947  xlrec.remote_lsn = remote_commit;
948  xlrec.node_id = node;
949  xlrec.force = go_backward;
950 
951  XLogBeginInsert();
952  XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
953 
954  XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_SET);
955  }
956 
957  /*
958  * Due to - harmless - race conditions during a checkpoint we could see
959  * values here that are older than the ones we already have in memory.
960  * Don't overwrite those.
961  */
962  if (go_backward || replication_state->remote_lsn < remote_commit)
963  replication_state->remote_lsn = remote_commit;
964  if (local_commit != InvalidXLogRecPtr &&
965  (go_backward || replication_state->local_lsn < local_commit))
966  replication_state->local_lsn = local_commit;
967  LWLockRelease(&replication_state->lock);
968 
969  /*
970  * Release *after* changing the LSNs, slot isn't acquired and thus could
971  * otherwise be dropped anytime.
972  */
973  LWLockRelease(ReplicationOriginLock);
974 }
975 
976 
979 {
980  int i;
983 
984  /* prevent slots from being concurrently dropped */
985  LWLockAcquire(ReplicationOriginLock, LW_SHARED);
986 
987  for (i = 0; i < max_replication_slots; i++)
988  {
990 
991  state = &replication_states[i];
992 
993  if (state->roident == node)
994  {
995  LWLockAcquire(&state->lock, LW_SHARED);
996 
997  remote_lsn = state->remote_lsn;
998  local_lsn = state->local_lsn;
999 
1000  LWLockRelease(&state->lock);
1001 
1002  break;
1003  }
1004  }
1005 
1006  LWLockRelease(ReplicationOriginLock);
1007 
1008  if (flush && local_lsn != InvalidXLogRecPtr)
1009  XLogFlush(local_lsn);
1010 
1011  return remote_lsn;
1012 }
1013 
1014 /*
1015  * Tear down a (possibly) configured session replication origin during process
1016  * exit.
1017  */
1018 static void
1020 {
1021  ConditionVariable *cv = NULL;
1022 
1023  LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1024 
1025  if (session_replication_state != NULL &&
1026  session_replication_state->acquired_by == MyProcPid)
1027  {
1028  cv = &session_replication_state->origin_cv;
1029 
1030  session_replication_state->acquired_by = 0;
1031  session_replication_state = NULL;
1032  }
1033 
1034  LWLockRelease(ReplicationOriginLock);
1035 
1036  if (cv)
1038 }
1039 
1040 /*
1041  * Set up a replication origin in the shared memory struct if it doesn't
1042  * already exist and cache access to the specific ReplicationState so the
1043  * array doesn't have to be searched when calling
1044  * replorigin_session_advance().
1045  *
1046  * Obviously only one such cached origin can exist per process and the current
1047  * cached value can only be set again after the previous value is torn down
1048  * with replorigin_session_reset().
1049  */
1050 void
1052 {
1053  static bool registered_cleanup;
1054  int i;
1055  int free_slot = -1;
1056 
1057  if (!registered_cleanup)
1058  {
1060  registered_cleanup = true;
1061  }
1062 
1064 
1065  if (session_replication_state != NULL)
1066  ereport(ERROR,
1067  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1068  errmsg("cannot setup replication origin when one is already setup")));
1069 
1070  /* Lock exclusively, as we may have to create a new table entry. */
1071  LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1072 
1073  /*
1074  * Search for either an existing slot for the origin, or a free one we can
1075  * use.
1076  */
1077  for (i = 0; i < max_replication_slots; i++)
1078  {
1079  ReplicationState *curstate = &replication_states[i];
1080 
1081  /* remember where to insert if necessary */
1082  if (curstate->roident == InvalidRepOriginId &&
1083  free_slot == -1)
1084  {
1085  free_slot = i;
1086  continue;
1087  }
1088 
1089  /* not our slot */
1090  if (curstate->roident != node)
1091  continue;
1092 
1093  else if (curstate->acquired_by != 0)
1094  {
1095  ereport(ERROR,
1096  (errcode(ERRCODE_OBJECT_IN_USE),
1097  errmsg("replication origin with OID %d is already active for PID %d",
1098  curstate->roident, curstate->acquired_by)));
1099  }
1100 
1101  /* ok, found slot */
1102  session_replication_state = curstate;
1103  }
1104 
1105 
1106  if (session_replication_state == NULL && free_slot == -1)
1107  ereport(ERROR,
1108  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
1109  errmsg("could not find free replication state slot for replication origin with OID %u",
1110  node),
1111  errhint("Increase max_replication_slots and try again.")));
1112  else if (session_replication_state == NULL)
1113  {
1114  /* initialize new slot */
1115  session_replication_state = &replication_states[free_slot];
1116  Assert(session_replication_state->remote_lsn == InvalidXLogRecPtr);
1117  Assert(session_replication_state->local_lsn == InvalidXLogRecPtr);
1118  session_replication_state->roident = node;
1119  }
1120 
1121 
1122  Assert(session_replication_state->roident != InvalidRepOriginId);
1123 
1124  session_replication_state->acquired_by = MyProcPid;
1125 
1126  LWLockRelease(ReplicationOriginLock);
1127 
1128  /* probably this one is pointless */
1129  ConditionVariableBroadcast(&session_replication_state->origin_cv);
1130 }
1131 
1132 /*
1133  * Reset replay state previously setup in this session.
1134  *
1135  * This function may only be called if an origin was setup with
1136  * replorigin_session_setup().
1137  */
1138 void
1140 {
1141  ConditionVariable *cv;
1142 
1144 
1145  if (session_replication_state == NULL)
1146  ereport(ERROR,
1147  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1148  errmsg("no replication origin is configured")));
1149 
1150  LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1151 
1152  session_replication_state->acquired_by = 0;
1153  cv = &session_replication_state->origin_cv;
1154  session_replication_state = NULL;
1155 
1156  LWLockRelease(ReplicationOriginLock);
1157 
1159 }
1160 
1161 /*
1162  * Do the same work replorigin_advance() does, just on the session's
1163  * configured origin.
1164  *
1165  * This is noticeably cheaper than using replorigin_advance().
1166  */
1167 void
1169 {
1170  Assert(session_replication_state != NULL);
1171  Assert(session_replication_state->roident != InvalidRepOriginId);
1172 
1173  LWLockAcquire(&session_replication_state->lock, LW_EXCLUSIVE);
1174  if (session_replication_state->local_lsn < local_commit)
1175  session_replication_state->local_lsn = local_commit;
1176  if (session_replication_state->remote_lsn < remote_commit)
1177  session_replication_state->remote_lsn = remote_commit;
1178  LWLockRelease(&session_replication_state->lock);
1179 }
1180 
1181 /*
1182  * Ask the machinery about the point up to which we successfully replayed
1183  * changes from an already setup replication origin.
1184  */
1185 XLogRecPtr
1187 {
1190 
1191  Assert(session_replication_state != NULL);
1192 
1193  LWLockAcquire(&session_replication_state->lock, LW_SHARED);
1194  remote_lsn = session_replication_state->remote_lsn;
1195  local_lsn = session_replication_state->local_lsn;
1196  LWLockRelease(&session_replication_state->lock);
1197 
1198  if (flush && local_lsn != InvalidXLogRecPtr)
1199  XLogFlush(local_lsn);
1200 
1201  return remote_lsn;
1202 }
1203 
1204 
1205 
1206 /* ---------------------------------------------------------------------------
1207  * SQL functions for working with replication origin.
1208  *
1209  * These mostly should be fairly short wrappers around more generic functions.
1210  * ---------------------------------------------------------------------------
1211  */
1212 
1213 /*
1214  * Create replication origin for the passed in name, and return the assigned
1215  * oid.
1216  */
1217 Datum
1219 {
1220  char *name;
1222 
1223  replorigin_check_prerequisites(false, false);
1224 
1226 
1227  /* Replication origins "pg_xxx" are reserved for internal use */
1228  if (IsReservedName(name))
1229  ereport(ERROR,
1230  (errcode(ERRCODE_RESERVED_NAME),
1231  errmsg("replication origin name \"%s\" is reserved",
1232  name),
1233  errdetail("Origin names starting with \"pg_\" are reserved.")));
1234 
1235  /*
1236  * If built with appropriate switch, whine when regression-testing
1237  * conventions for replication origin names are violated.
1238  */
1239 #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
1240  if (strncmp(name, "regress_", 8) != 0)
1241  elog(WARNING, "replication origins created by regression test cases should have names starting with \"regress_\"");
1242 #endif
1243 
1244  roident = replorigin_create(name);
1245 
1246  pfree(name);
1247 
1248  PG_RETURN_OID(roident);
1249 }
1250 
1251 /*
1252  * Drop replication origin.
1253  */
1254 Datum
1256 {
1257  char *name;
1259 
1260  replorigin_check_prerequisites(false, false);
1261 
1263 
1264  roident = replorigin_by_name(name, false);
1265  Assert(OidIsValid(roident));
1266 
1267  replorigin_drop(roident, true);
1268 
1269  pfree(name);
1270 
1271  PG_RETURN_VOID();
1272 }
1273 
1274 /*
1275  * Return oid of a replication origin.
1276  */
1277 Datum
1279 {
1280  char *name;
1282 
1283  replorigin_check_prerequisites(false, false);
1284 
1286  roident = replorigin_by_name(name, true);
1287 
1288  pfree(name);
1289 
1290  if (OidIsValid(roident))
1291  PG_RETURN_OID(roident);
1292  PG_RETURN_NULL();
1293 }
1294 
1295 /*
1296  * Setup a replication origin for this session.
1297  */
1298 Datum
1300 {
1301  char *name;
1302  RepOriginId origin;
1303 
1304  replorigin_check_prerequisites(true, false);
1305 
1307  origin = replorigin_by_name(name, false);
1308  replorigin_session_setup(origin);
1309 
1310  replorigin_session_origin = origin;
1311 
1312  pfree(name);
1313 
1314  PG_RETURN_VOID();
1315 }
1316 
1317 /*
1318  * Reset previously setup origin in this session
1319  */
1320 Datum
1322 {
1323  replorigin_check_prerequisites(true, false);
1324 
1326 
1330 
1331  PG_RETURN_VOID();
1332 }
1333 
1334 /*
1335  * Has a replication origin been setup for this session.
1336  */
1337 Datum
1339 {
1340  replorigin_check_prerequisites(false, false);
1341 
1343 }
1344 
1345 
1346 /*
1347  * Return the replication progress for origin setup in the current session.
1348  *
1349  * If 'flush' is set to true it is ensured that the returned value corresponds
1350  * to a local transaction that has been flushed. This is useful if asynchronous
1351  * commits are used when replaying replicated transactions.
1352  */
1353 Datum
1355 {
1357  bool flush = PG_GETARG_BOOL(0);
1358 
1359  replorigin_check_prerequisites(true, false);
1360 
1361  if (session_replication_state == NULL)
1362  ereport(ERROR,
1363  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1364  errmsg("no replication origin is configured")));
1365 
1366  remote_lsn = replorigin_session_get_progress(flush);
1367 
1368  if (remote_lsn == InvalidXLogRecPtr)
1369  PG_RETURN_NULL();
1370 
1371  PG_RETURN_LSN(remote_lsn);
1372 }
1373 
1374 Datum
1376 {
1377  XLogRecPtr location = PG_GETARG_LSN(0);
1378 
1379  replorigin_check_prerequisites(true, false);
1380 
1381  if (session_replication_state == NULL)
1382  ereport(ERROR,
1383  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1384  errmsg("no replication origin is configured")));
1385 
1386  replorigin_session_origin_lsn = location;
1388 
1389  PG_RETURN_VOID();
1390 }
1391 
1392 Datum
1394 {
1395  replorigin_check_prerequisites(true, false);
1396 
1399 
1400  PG_RETURN_VOID();
1401 }
1402 
1403 
1404 Datum
1406 {
1407  text *name = PG_GETARG_TEXT_PP(0);
1408  XLogRecPtr remote_commit = PG_GETARG_LSN(1);
1409  RepOriginId node;
1410 
1411  replorigin_check_prerequisites(true, false);
1412 
1413  /* lock to prevent the replication origin from vanishing */
1414  LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1415 
1416  node = replorigin_by_name(text_to_cstring(name), false);
1417 
1418  /*
1419  * Can't sensibly pass a local commit to be flushed at checkpoint - this
1420  * xact hasn't committed yet. This is why this function should be used to
1421  * set up the initial replication state, but not for replay.
1422  */
1423  replorigin_advance(node, remote_commit, InvalidXLogRecPtr,
1424  true /* go backward */ , true /* WAL log */ );
1425 
1426  UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1427 
1428  PG_RETURN_VOID();
1429 }
1430 
1431 
1432 /*
1433  * Return the replication progress for an individual replication origin.
1434  *
1435  * If 'flush' is set to true it is ensured that the returned value corresponds
1436  * to a local transaction that has been flushed. This is useful if asynchronous
1437  * commits are used when replaying replicated transactions.
1438  */
1439 Datum
1441 {
1442  char *name;
1443  bool flush;
1446 
1447  replorigin_check_prerequisites(true, true);
1448 
1450  flush = PG_GETARG_BOOL(1);
1451 
1452  roident = replorigin_by_name(name, false);
1453  Assert(OidIsValid(roident));
1454 
1455  remote_lsn = replorigin_get_progress(roident, flush);
1456 
1457  if (remote_lsn == InvalidXLogRecPtr)
1458  PG_RETURN_NULL();
1459 
1460  PG_RETURN_LSN(remote_lsn);
1461 }
1462 
1463 
1464 Datum
1466 {
1467  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1468  TupleDesc tupdesc;
1469  Tuplestorestate *tupstore;
1470  MemoryContext per_query_ctx;
1471  MemoryContext oldcontext;
1472  int i;
1474 
1475  /* we want to return 0 rows if slot is set to zero */
1476  replorigin_check_prerequisites(false, true);
1477 
1478  if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
1479  ereport(ERROR,
1480  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1481  errmsg("set-valued function called in context that cannot accept a set")));
1482  if (!(rsinfo->allowedModes & SFRM_Materialize))
1483  ereport(ERROR,
1484  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1485  errmsg("materialize mode required, but it is not allowed in this context")));
1486  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
1487  elog(ERROR, "return type must be a row type");
1488 
1489  if (tupdesc->natts != REPLICATION_ORIGIN_PROGRESS_COLS)
1490  elog(ERROR, "wrong function definition");
1491 
1492  per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
1493  oldcontext = MemoryContextSwitchTo(per_query_ctx);
1494 
1495  tupstore = tuplestore_begin_heap(true, false, work_mem);
1496  rsinfo->returnMode = SFRM_Materialize;
1497  rsinfo->setResult = tupstore;
1498  rsinfo->setDesc = tupdesc;
1499 
1500  MemoryContextSwitchTo(oldcontext);
1501 
1502 
1503  /* prevent slots from being concurrently dropped */
1504  LWLockAcquire(ReplicationOriginLock, LW_SHARED);
1505 
1506  /*
1507  * Iterate through all possible replication_states, display if they are
1508  * filled. Note that we do not take any locks, so slightly corrupted/out
1509  * of date values are a possibility.
1510  */
1511  for (i = 0; i < max_replication_slots; i++)
1512  {
1516  char *roname;
1517 
1518  state = &replication_states[i];
1519 
1520  /* unused slot, nothing to display */
1521  if (state->roident == InvalidRepOriginId)
1522  continue;
1523 
1524  memset(values, 0, sizeof(values));
1525  memset(nulls, 1, sizeof(nulls));
1526 
1527  values[0] = ObjectIdGetDatum(state->roident);
1528  nulls[0] = false;
1529 
1530  /*
1531  * We're not preventing the origin to be dropped concurrently, so
1532  * silently accept that it might be gone.
1533  */
1534  if (replorigin_by_oid(state->roident, true,
1535  &roname))
1536  {
1537  values[1] = CStringGetTextDatum(roname);
1538  nulls[1] = false;
1539  }
1540 
1541  LWLockAcquire(&state->lock, LW_SHARED);
1542 
1543  values[2] = LSNGetDatum(state->remote_lsn);
1544  nulls[2] = false;
1545 
1546  values[3] = LSNGetDatum(state->local_lsn);
1547  nulls[3] = false;
1548 
1549  LWLockRelease(&state->lock);
1550 
1551  tuplestore_putvalues(tupstore, tupdesc, values, nulls);
1552  }
1553 
1554  tuplestore_donestoring(tupstore);
1555 
1556  LWLockRelease(ReplicationOriginLock);
1557 
1558 #undef REPLICATION_ORIGIN_PROGRESS_COLS
1559 
1560  return (Datum) 0;
1561 }
static ReplicationState * session_replication_state
Definition: origin.c:177
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, Datum *values, bool *isnull)
Definition: tuplestore.c:750
#define INIT_CRC32C(crc)
Definition: pg_crc32c.h:41
static void replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
Definition: origin.c:183
Definition: lwlock.h:31
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define IsA(nodeptr, _type_)
Definition: nodes.h:578
#define InitDirtySnapshot(snapshotdata)
Definition: snapmgr.h:75
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:205
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:167
int MyProcPid
Definition: globals.c:40
int errhint(const char *fmt,...)
Definition: elog.c:1149
XLogRecPtr local_lsn
Definition: origin.c:117
void systable_endscan(SysScanDesc sysscan)
Definition: genam.c:569
#define GETSTRUCT(TUP)
Definition: htup_details.h:655
#define RelationGetDescr(relation)
Definition: rel.h:483
#define DoNotReplicateId
Definition: origin.h:34
Datum pg_replication_origin_xact_setup(PG_FUNCTION_ARGS)
Definition: origin.c:1375
void UnlockRelationOid(Oid relid, LOCKMODE lockmode)
Definition: lmgr.c:199
#define write(a, b, c)
Definition: win32.h:14
Datum pg_replication_origin_drop(PG_FUNCTION_ARGS)
Definition: origin.c:1255
#define ExclusiveLock
Definition: lockdefs.h:44
int64 TimestampTz
Definition: timestamp.h:39
XLogRecPtr remote_lsn
Definition: origin.c:141
#define PG_GETARG_DATUM(n)
Definition: fmgr.h:268
uint32 pg_crc32c
Definition: pg_crc32c.h:38
static void ReplicationOriginExitCleanup(int code, Datum arg)
Definition: origin.c:1019
RepOriginId roident
Definition: origin.c:140
void replorigin_drop(RepOriginId roident, bool nowait)
Definition: origin.c:332
#define tuplestore_donestoring(state)
Definition: tuplestore.h:60
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
unsigned char uint8
Definition: c.h:427
uint16 RepOriginId
Definition: xlogdefs.h:58
#define FLEXIBLE_ARRAY_MEMBER
Definition: c.h:338
void ConditionVariableBroadcast(ConditionVariable *cv)
XLogRecPtr replorigin_session_get_progress(bool flush)
Definition: origin.c:1186
int errcode(int sqlerrcode)
Definition: elog.c:691
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
#define MemSet(start, val, len)
Definition: c.h:1004
void CatalogTupleDelete(Relation heapRel, ItemPointer tid)
Definition: indexing.c:350
void ReplicationOriginShmemInit(void)
Definition: origin.c:493
void replorigin_advance(RepOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)
Definition: origin.c:855
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:1020
#define PG_GETARG_BOOL(n)
Definition: fmgr.h:274
void replorigin_session_setup(RepOriginId node)
Definition: origin.c:1051
bool replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
Definition: origin.c:432
#define ReplicationOriginIdentIndex
#define LOG
Definition: elog.h:26
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
unsigned int Oid
Definition: postgres_ext.h:31
bool RecoveryInProgress(void)
Definition: xlog.c:8070
bool IsReservedName(const char *name)
Definition: catalog.c:210
#define OidIsValid(objectId)
Definition: c.h:706
#define PANIC
Definition: elog.h:53
void XLogFlush(XLogRecPtr record)
Definition: xlog.c:2847
static int fd(const char *x, int i)
Definition: preproc-init.c:105
SysScanDesc systable_beginscan(Relation heapRelation, Oid indexId, bool indexOK, Snapshot snapshot, int nkeys, ScanKey key)
Definition: genam.c:357
#define PG_BINARY
Definition: c.h:1267
RepOriginId replorigin_by_name(char *roname, bool missing_ok)
Definition: origin.c:209
RepOriginId roident
Definition: origin.c:105
XLogRecPtr EndRecPtr
Definition: xlogreader.h:176
#define PG_GETARG_TEXT_PP(n)
Definition: fmgr.h:309
Datum pg_replication_origin_advance(PG_FUNCTION_ARGS)
Definition: origin.c:1405
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1811
#define PG_RETURN_LSN(x)
Definition: pg_lsn.h:25
void CheckPointReplicationOrigin(void)
Definition: origin.c:540
void ConditionVariableInit(ConditionVariable *cv)
void replorigin_redo(XLogReaderState *record)
Definition: origin.c:794
HeapTuple systable_getnext(SysScanDesc sysscan)
Definition: genam.c:476
void pfree(void *pointer)
Definition: mcxt.c:1057
#define XLogRecGetData(decoder)
Definition: xlogreader.h:310
void ConditionVariableCancelSleep(void)
#define ObjectIdGetDatum(X)
Definition: postgres.h:507
#define ERROR
Definition: elog.h:43
#define PG_UINT16_MAX
Definition: c.h:510
void replorigin_session_reset(void)
Definition: origin.c:1139
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2372
FormData_pg_replication_origin * Form_pg_replication_origin
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:392
LWLock lock
Definition: origin.c:132
ItemPointerData t_self
Definition: htup.h:65
#define DEBUG2
Definition: elog.h:24
XLogRecPtr replorigin_session_origin_lsn
Definition: origin.c:155
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:361
TimestampTz replorigin_session_origin_timestamp
Definition: origin.c:156
#define XLOG_REPLORIGIN_SET
Definition: origin.h:30
void StartupReplicationOrigin(void)
Definition: origin.c:666
#define RowExclusiveLock
Definition: lockdefs.h:38
int errdetail(const char *fmt,...)
Definition: elog.c:1035
int errcode_for_file_access(void)
Definition: elog.c:714
struct ReplicationState ReplicationState
unsigned int uint32
Definition: c.h:429
Datum pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
Definition: origin.c:1299
Datum pg_replication_origin_session_reset(PG_FUNCTION_ARGS)
Definition: origin.c:1321
RepOriginId replorigin_create(char *roname)
Definition: origin.c:240
Datum pg_show_replication_origin_status(PG_FUNCTION_ARGS)
Definition: origin.c:1465
#define XLogRecGetInfo(decoder)
Definition: xlogreader.h:305
int durable_rename(const char *oldfile, const char *newfile, int elevel)
Definition: fd.c:659
Datum pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS)
Definition: origin.c:1338
#define ERRCODE_DATA_CORRUPTED
Definition: pg_basebackup.c:45
void LWLockInitialize(LWLock *lock, int tranche_id)
Definition: lwlock.c:744
int CloseTransientFile(int fd)
Definition: fd.c:2549
#define WARNING
Definition: elog.h:40
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:1115
void XLogRegisterData(char *data, int len)
Definition: xloginsert.c:330
Tuplestorestate * tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
Definition: tuplestore.c:318
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition: xloginsert.c:422
#define XLOG_REPLORIGIN_DROP
Definition: origin.h:31
Size mul_size(Size s1, Size s2)
Definition: shmem.c:515
#define PG_GETARG_LSN(n)
Definition: pg_lsn.h:24
struct ReplicationStateOnDisk ReplicationStateOnDisk
#define PG_RETURN_BOOL(x)
Definition: fmgr.h:359
uintptr_t Datum
Definition: postgres.h:367
void CommandCounterIncrement(void)
Definition: xact.c:1021
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:1163
Size add_size(Size s1, Size s2)
Definition: shmem.c:498
int work_mem
Definition: globals.c:121
Size ReplicationOriginShmemSize(void)
Definition: origin.c:473
#define REPLICATION_STATE_MAGIC
Definition: origin.c:180
#define InvalidOid
Definition: postgres_ext.h:36
#define ereport(elevel,...)
Definition: elog.h:155
static ReplicationStateCtl * replication_states_ctl
Definition: origin.c:170
Datum pg_replication_origin_xact_reset(PG_FUNCTION_ARGS)
Definition: origin.c:1393
int allowedModes
Definition: execnodes.h:304
#define PG_RETURN_VOID()
Definition: fmgr.h:349
struct ReplicationStateCtl ReplicationStateCtl
SetFunctionReturnMode returnMode
Definition: execnodes.h:306
int max_replication_slots
Definition: slot.c:99
void ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)
XLogRecPtr remote_lsn
Definition: origin.c:110
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
Datum pg_replication_origin_progress(PG_FUNCTION_ARGS)
Definition: origin.c:1440
RepOriginId node_id
Definition: origin.h:27
uint64 XLogRecPtr
Definition: xlogdefs.h:21
ReplicationState states[FLEXIBLE_ARRAY_MEMBER]
Definition: origin.c:150
#define Assert(condition)
Definition: c.h:800
#define XLR_INFO_MASK
Definition: xlogrecord.h:62
RepOriginId replorigin_session_origin
Definition: origin.c:154
Definition: regguts.h:298
RepOriginId node_id
Definition: origin.h:21
ConditionVariable origin_cv
Definition: origin.c:127
size_t Size
Definition: c.h:528
void replorigin_session_advance(XLogRecPtr remote_commit, XLogRecPtr local_commit)
Definition: origin.c:1168
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1207
bool IsTransactionState(void)
Definition: xact.c:371
XLogRecPtr remote_lsn
Definition: origin.h:20
MemoryContext ecxt_per_query_memory
Definition: execnodes.h:232
const char * name
Definition: encode.c:561
#define InvalidRepOriginId
Definition: origin.h:33
Tuplestorestate * setResult
Definition: execnodes.h:309
#define DatumGetPointer(X)
Definition: postgres.h:549
static Datum values[MAXATTR]
Definition: bootstrap.c:165
char * text_to_cstring(const text *t)
Definition: varlena.c:221
ExprContext * econtext
Definition: execnodes.h:302
TupleDesc setDesc
Definition: execnodes.h:310
int errmsg(const char *fmt,...)
Definition: elog.c:902
Datum pg_replication_origin_oid(PG_FUNCTION_ARGS)
Definition: origin.c:1278
#define PG_GETARG_TIMESTAMPTZ(n)
Definition: timestamp.h:36
#define elog(elevel,...)
Definition: elog.h:228
int i
Datum pg_replication_origin_session_progress(PG_FUNCTION_ARGS)
Definition: origin.c:1354
static ReplicationState * replication_states
Definition: origin.c:165
void ScanKeyInit(ScanKey entry, AttrNumber attributeNumber, StrategyNumber strategy, RegProcedure procedure, Datum argument)
Definition: scankey.c:76
#define CStringGetTextDatum(s)
Definition: builtins.h:86
void * arg
Definition: c.h:617
#define PG_FUNCTION_ARGS
Definition: fmgr.h:193
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:99
void LockRelationOid(Oid relid, LOCKMODE lockmode)
Definition: lmgr.c:108
#define COMP_CRC32C(crc, data, len)
Definition: pg_crc32c.h:89
Datum pg_replication_origin_create(PG_FUNCTION_ARGS)
Definition: origin.c:1218
#define REPLICATION_ORIGIN_PROGRESS_COLS
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39
void XLogBeginInsert(void)
Definition: xloginsert.c:123
#define PG_RETURN_OID(x)
Definition: fmgr.h:360
#define FIN_CRC32C(crc)
Definition: pg_crc32c.h:94
XLogRecPtr replorigin_get_progress(RepOriginId node, bool flush)
Definition: origin.c:978
void CatalogTupleInsert(Relation heapRel, HeapTuple tup)
Definition: indexing.c:221
#define PG_RETURN_NULL()
Definition: fmgr.h:345
#define read(a, b, c)
Definition: win32.h:13
#define BTEqualStrategyNumber
Definition: stratnum.h:31
#define offsetof(type, field)
Definition: c.h:723