PostgreSQL Source Code  git master
origin.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * origin.c
4  * Logical replication progress tracking support.
5  *
6  * Copyright (c) 2013-2022, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  * src/backend/replication/logical/origin.c
10  *
11  * NOTES
12  *
13  * This file provides the following:
14  * * An infrastructure to name nodes in a replication setup
15  * * A facility to store and persist replication progress in an efficient
16  * and durable manner.
17  *
18  * A replication origin consists of a descriptive, user-defined, external
19  * name and a short, thus space-efficient, internal 2-byte one. This split
20  * exists because replication origins have to be stored in WAL and shared
21  * memory, and long descriptors would be inefficient. For now we only use 2
22  * bytes for the internal id of a replication origin, as it seems unlikely
23  * that there will soon be more than 65k nodes in one replication setup; and
24  * using only two bytes allows us to be more space efficient.
25  *
26  * Replication progress is tracked in a shared memory table
27  * (ReplicationState) that's dumped to disk every checkpoint. Entries
28  * ('slots') in this table are identified by the internal id. That's the case
29  * because it allows us to advance replication progress during crash
30  * recovery. To allow doing so we store the original LSN (from the originating
31  * system) of a transaction in the commit record. That allows us to recover the
32  * precise replayed state after crash recovery, without requiring synchronous
33  * commits. Allowing logical replication to use asynchronous commit is
34  * generally good for performance, but especially important as it allows a
35  * single-threaded replay process to keep up with a source that has multiple
36  * backends generating changes concurrently. For efficiency and simplicity
37  * reasons a backend can set up one replication origin that is from then on
38  * used as the source of changes produced by the backend, until reset again.
39  *
40  * This infrastructure is intended to be used in cooperation with logical
41  * decoding. When replaying from a remote system the configured origin is
42  * provided to output plugins, allowing prevention of replication loops and
43  * other filtering.
44  *
45  * There are several levels of locking at work:
46  *
47  * * To create and drop replication origins an exclusive lock on
48  * pg_replication_origin is required for the duration. That allows us to
49  * safely and without conflicts assign new origins using a dirty snapshot.
50  *
51  * * When creating an in-memory replication progress slot the ReplicationOrigin
52  * LWLock has to be held exclusively; when iterating over the replication
53  * progress a shared lock has to be held, the same when advancing the
54  * replication progress of an individual backend that has not setup as the
55  * session's replication origin.
56  *
57  * * When manipulating or looking at the remote_lsn and local_lsn fields of a
58  * replication progress slot that slot's lwlock has to be held. That's
59  * primarily because we do not assume 8 byte writes (the LSN) are atomic on
60  * all our platforms, but it also simplifies memory ordering concerns
61  * between the remote and local lsn. We use a lwlock instead of a spinlock
62  * so it's less harmful to hold the lock over a WAL write
63  * (cf. replorigin_advance).
64  *
65  * ---------------------------------------------------------------------------
66  */
67 
68 #include "postgres.h"
69 
70 #include <unistd.h>
71 #include <sys/stat.h>
72 
73 #include "access/genam.h"
74 #include "access/htup_details.h"
75 #include "access/table.h"
76 #include "access/xact.h"
77 #include "access/xloginsert.h"
78 #include "catalog/catalog.h"
79 #include "catalog/indexing.h"
81 #include "funcapi.h"
82 #include "miscadmin.h"
83 #include "nodes/execnodes.h"
84 #include "pgstat.h"
85 #include "replication/logical.h"
86 #include "replication/origin.h"
88 #include "storage/copydir.h"
89 #include "storage/fd.h"
90 #include "storage/ipc.h"
91 #include "storage/lmgr.h"
92 #include "utils/builtins.h"
93 #include "utils/fmgroids.h"
94 #include "utils/pg_lsn.h"
95 #include "utils/rel.h"
96 #include "utils/snapmgr.h"
97 #include "utils/syscache.h"
98 
99 /*
100  * Replay progress of a single remote node.
101  */
102 typedef struct ReplicationState
103 {
104  /*
105  * Local identifier for the remote node.
106  */
108 
109  /*
110  * Location of the latest commit from the remote side.
111  */
113 
114  /*
115  * Remember the local lsn of the commit record so we can XLogFlush() to it
116  * during a checkpoint so we know the commit record actually is safe on
117  * disk.
118  */
120 
121  /*
122  * PID of backend that's acquired slot, or 0 if none.
123  */
125 
126  /*
127  * Condition variable that's signaled when acquired_by changes.
128  */
130 
131  /*
132  * Lock protecting remote_lsn and local_lsn.
133  */
136 
137 /*
138  * On disk version of ReplicationState.
139  */
141 {
145 
146 
147 typedef struct ReplicationStateCtl
148 {
149  /* Tranche to use for per-origin LWLocks */
151  /* Array of length max_replication_slots */
154 
155 /* external variables */
159 
160 /*
161  * Base address into a shared memory array of replication states of size
162  * max_replication_slots.
163  *
164  * XXX: Should we use a separate variable to size this rather than
165  * max_replication_slots?
166  */
168 
169 /*
170  * Actual shared memory block (replication_states[] is now part of this).
171  */
173 
174 /*
175  * Backend-local, cached element from ReplicationState for use in a backend
176  * replaying remote commits, so we don't have to search ReplicationState for
177  * the backend's current RepOriginId.
178  */
180 
181 /* Magic for on disk files. */
182 #define REPLICATION_STATE_MAGIC ((uint32) 0x1257DADE)
183 
/*
 * Verify that replication origin functionality may be used right now.
 *
 * check_slots: when true, report an ERROR if max_replication_slots is 0.
 * recoveryOK: when false, report an ERROR if RecoveryInProgress() says the
 * server is currently in recovery.
 *
 * Both checks are independent; a caller that passes check_slots = false and
 * recoveryOK = true gets no check at all.
 */
184 static void
185 replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
186 {
187  if (check_slots && max_replication_slots == 0)
188  ereport(ERROR,
189  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
190  errmsg("cannot query or manipulate replication origin when max_replication_slots = 0")));
191 
192  if (!recoveryOK && RecoveryInProgress())
193  ereport(ERROR,
194  (errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
195  errmsg("cannot manipulate replication origins during recovery")));
196 }
197 
198 
199 /*
200  * IsReservedOriginName
201  * True iff name is either "none" or "any".
202  */
203 static bool
205 {
206  return ((pg_strcasecmp(name, LOGICALREP_ORIGIN_NONE) == 0) ||
208 }
209 
210 /* ---------------------------------------------------------------------------
211  * Functions for working with replication origins themselves.
212  * ---------------------------------------------------------------------------
213  */
214 
215 /*
216  * Check for a persistent replication origin identified by name.
217  *
218  * Returns InvalidOid if the node isn't known yet and missing_ok is true.
219  */
221 replorigin_by_name(const char *roname, bool missing_ok)
222 {
224  Oid roident = InvalidOid;
225  HeapTuple tuple;
226  Datum roname_d;
227 
228  roname_d = CStringGetTextDatum(roname);
229 
230  tuple = SearchSysCache1(REPLORIGNAME, roname_d);
231  if (HeapTupleIsValid(tuple))
232  {
233  ident = (Form_pg_replication_origin) GETSTRUCT(tuple);
234  roident = ident->roident;
235  ReleaseSysCache(tuple);
236  }
237  else if (!missing_ok)
238  ereport(ERROR,
239  (errcode(ERRCODE_UNDEFINED_OBJECT),
240  errmsg("replication origin \"%s\" does not exist",
241  roname)));
242 
243  return roident;
244 }
245 
246 /*
247  * Create a replication origin.
248  *
249  * Needs to be called in a transaction.
250  */
252 replorigin_create(const char *roname)
253 {
254  Oid roident;
255  HeapTuple tuple = NULL;
256  Relation rel;
257  Datum roname_d;
258  SnapshotData SnapshotDirty;
259  SysScanDesc scan;
261 
262  roname_d = CStringGetTextDatum(roname);
263 
265 
266  /*
267  * We need the numeric replication origin to be 16bit wide, so we cannot
268  * rely on the normal oid allocation. Instead we simply scan
269  * pg_replication_origin for the first unused id. That's not particularly
270  * efficient, but this should be a fairly infrequent operation - we can
271  * easily spend a bit more code on this when it turns out it needs to be
272  * faster.
273  *
274  * We handle concurrency by taking an exclusive lock (allowing reads!)
275  * over the table for the duration of the search. Because we use a "dirty
276  * snapshot" we can read rows that other in-progress sessions have
277  * written, even though they would be invisible with normal snapshots. Due
278  * to the exclusive lock there's no danger that new rows can appear while
279  * we're checking.
280  */
281  InitDirtySnapshot(SnapshotDirty);
282 
283  rel = table_open(ReplicationOriginRelationId, ExclusiveLock);
284 
285  for (roident = InvalidOid + 1; roident < PG_UINT16_MAX; roident++)
286  {
287  bool nulls[Natts_pg_replication_origin];
288  Datum values[Natts_pg_replication_origin];
289  bool collides;
290 
292 
293  ScanKeyInit(&key,
294  Anum_pg_replication_origin_roident,
295  BTEqualStrategyNumber, F_OIDEQ,
296  ObjectIdGetDatum(roident));
297 
298  scan = systable_beginscan(rel, ReplicationOriginIdentIndex,
299  true /* indexOK */ ,
300  &SnapshotDirty,
301  1, &key);
302 
303  collides = HeapTupleIsValid(systable_getnext(scan));
304 
305  systable_endscan(scan);
306 
307  if (!collides)
308  {
309  /*
310  * Ok, found an unused roident, insert the new row and do a CCI,
311  * so our callers can look it up if they want to.
312  */
313  memset(&nulls, 0, sizeof(nulls));
314 
315  values[Anum_pg_replication_origin_roident - 1] = ObjectIdGetDatum(roident);
316  values[Anum_pg_replication_origin_roname - 1] = roname_d;
317 
318  tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
319  CatalogTupleInsert(rel, tuple);
321  break;
322  }
323  }
324 
325  /* now release lock again, */
327 
328  if (tuple == NULL)
329  ereport(ERROR,
330  (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
331  errmsg("could not find free replication origin ID")));
332 
333  heap_freetuple(tuple);
334  return roident;
335 }
336 
337 /*
338  * Helper function to drop a replication origin.
339  */
340 static void
341 replorigin_drop_guts(Relation rel, RepOriginId roident, bool nowait)
342 {
343  HeapTuple tuple;
344  int i;
345 
346  /*
347  * First, clean up the slot state info, if there is any matching slot.
348  */
349 restart:
350  tuple = NULL;
351  LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
352 
353  for (i = 0; i < max_replication_slots; i++)
354  {
356 
357  if (state->roident == roident)
358  {
359  /* found our slot, is it busy? */
360  if (state->acquired_by != 0)
361  {
362  ConditionVariable *cv;
363 
364  if (nowait)
365  ereport(ERROR,
366  (errcode(ERRCODE_OBJECT_IN_USE),
367  errmsg("could not drop replication origin with ID %d, in use by PID %d",
368  state->roident,
369  state->acquired_by)));
370 
371  /*
372  * We must wait and then retry. Since we don't know which CV
373  * to wait on until here, we can't readily use
374  * ConditionVariablePrepareToSleep (calling it here would be
375  * wrong, since we could miss the signal if we did so); just
376  * use ConditionVariableSleep directly.
377  */
378  cv = &state->origin_cv;
379 
380  LWLockRelease(ReplicationOriginLock);
381 
383  goto restart;
384  }
385 
386  /* first make a WAL log entry */
387  {
388  xl_replorigin_drop xlrec;
389 
390  xlrec.node_id = roident;
391  XLogBeginInsert();
392  XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
393  XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_DROP);
394  }
395 
396  /* then clear the in-memory slot */
397  state->roident = InvalidRepOriginId;
398  state->remote_lsn = InvalidXLogRecPtr;
399  state->local_lsn = InvalidXLogRecPtr;
400  break;
401  }
402  }
403  LWLockRelease(ReplicationOriginLock);
405 
406  /*
407  * Now, we can delete the catalog entry.
408  */
410  if (!HeapTupleIsValid(tuple))
411  elog(ERROR, "cache lookup failed for replication origin with ID %d",
412  roident);
413 
414  CatalogTupleDelete(rel, &tuple->t_self);
415  ReleaseSysCache(tuple);
416 
418 }
419 
420 /*
421  * Drop replication origin (by name).
422  *
423  * Needs to be called in a transaction.
 *
 * name: external name of the origin to drop.
 * missing_ok: passed to replorigin_by_name(); when true, an unknown name is
 * not an error and nothing is dropped.
 * nowait: passed to replorigin_drop_guts(); when true, an origin whose
 * in-memory slot is currently acquired by a backend is reported as an error
 * instead of being waited for.
424  */
425 void
426 replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
427 {
428  RepOriginId roident;
429  Relation rel;
430 
432 
433  /*
434  * To interlock against concurrent drops, we hold ExclusiveLock on
435  * pg_replication_origin till xact commit.
436  *
437  * XXX We can optimize this by acquiring the lock on a specific origin by
438  * using LockSharedObject if required. However, for that, we first need to
439  * acquire a lock on ReplicationOriginRelationId, get the origin_id, lock
440  * the specific origin and then re-check if the origin still exists.
441  */
442  rel = table_open(ReplicationOriginRelationId, ExclusiveLock);
443 
 /* Resolve the name only after the lock is held. */
444  roident = replorigin_by_name(name, missing_ok);
445 
 /* An invalid id here means the name was unknown and missing_ok was set. */
446  if (OidIsValid(roident))
447  replorigin_drop_guts(rel, roident, nowait);
448 
449  /* We keep the lock on pg_replication_origin until commit */
450  table_close(rel, NoLock);
451 }
452 
453 /*
454  * Look up replication origin via its oid and return the name.
455  *
456  * The external name is palloc'd in the calling context.
457  *
458  * Returns true if the origin is known, false otherwise.
459  */
460 bool
461 replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
462 {
463  HeapTuple tuple;
465 
466  Assert(OidIsValid((Oid) roident));
467  Assert(roident != InvalidRepOriginId);
468  Assert(roident != DoNotReplicateId);
469 
471  ObjectIdGetDatum((Oid) roident));
472 
473  if (HeapTupleIsValid(tuple))
474  {
475  ric = (Form_pg_replication_origin) GETSTRUCT(tuple);
476  *roname = text_to_cstring(&ric->roname);
477  ReleaseSysCache(tuple);
478 
479  return true;
480  }
481  else
482  {
483  *roname = NULL;
484 
485  if (!missing_ok)
486  ereport(ERROR,
487  (errcode(ERRCODE_UNDEFINED_OBJECT),
488  errmsg("replication origin with ID %d does not exist",
489  roident)));
490 
491  return false;
492  }
493 }
494 
495 
496 /* ---------------------------------------------------------------------------
497  * Functions for handling replication progress.
498  * ---------------------------------------------------------------------------
499  */
500 
501 Size
503 {
504  Size size = 0;
505 
506  /*
507  * XXX: max_replication_slots is arguably the wrong thing to use, as here
508  * we keep the replay state of *remote* transactions. But for now it seems
509  * sufficient to reuse it, rather than introduce a separate GUC.
510  */
511  if (max_replication_slots == 0)
512  return size;
513 
514  size = add_size(size, offsetof(ReplicationStateCtl, states));
515 
516  size = add_size(size,
518  return size;
519 }
520 
521 void
523 {
524  bool found;
525 
526  if (max_replication_slots == 0)
527  return;
528 
530  ShmemInitStruct("ReplicationOriginState",
532  &found);
534 
535  if (!found)
536  {
537  int i;
538 
540 
542 
543  for (i = 0; i < max_replication_slots; i++)
544  {
548  }
549  }
550 }
551 
552 /* ---------------------------------------------------------------------------
553  * Perform a checkpoint of each replication origin's progress with respect to
554  * the replayed remote_lsn. Make sure that all transactions we refer to in the
555  * checkpoint (local_lsn) are actually on-disk. This might not yet be the case
556  * if the transactions were originally committed asynchronously.
557  *
558  * We store checkpoints in the following format:
559  * +-------+------------------------+------------------+-----+--------+
560  * | MAGIC | ReplicationStateOnDisk | struct Replic... | ... | CRC32C | EOF
561  * +-------+------------------------+------------------+-----+--------+
562  *
563  * So it's just the magic, followed by the statically sized
564  * ReplicationStateOnDisk structs. Note that the maximum number of
565  * ReplicationState is determined by max_replication_slots.
566  * ---------------------------------------------------------------------------
567  */
568 void
570 {
571  const char *tmppath = "pg_logical/replorigin_checkpoint.tmp";
572  const char *path = "pg_logical/replorigin_checkpoint";
573  int tmpfd;
574  int i;
576  pg_crc32c crc;
577 
578  if (max_replication_slots == 0)
579  return;
580 
581  INIT_CRC32C(crc);
582 
583  /* make sure no old temp file is remaining */
584  if (unlink(tmppath) < 0 && errno != ENOENT)
585  ereport(PANIC,
587  errmsg("could not remove file \"%s\": %m",
588  tmppath)));
589 
590  /*
591  * no other backend can perform this at the same time; only one checkpoint
592  * can happen at a time.
593  */
594  tmpfd = OpenTransientFile(tmppath,
595  O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
596  if (tmpfd < 0)
597  ereport(PANIC,
599  errmsg("could not create file \"%s\": %m",
600  tmppath)));
601 
602  /* write magic */
603  errno = 0;
604  if ((write(tmpfd, &magic, sizeof(magic))) != sizeof(magic))
605  {
606  /* if write didn't set errno, assume problem is no disk space */
607  if (errno == 0)
608  errno = ENOSPC;
609  ereport(PANIC,
611  errmsg("could not write to file \"%s\": %m",
612  tmppath)));
613  }
614  COMP_CRC32C(crc, &magic, sizeof(magic));
615 
616  /* prevent concurrent creations/drops */
617  LWLockAcquire(ReplicationOriginLock, LW_SHARED);
618 
619  /* write actual data */
620  for (i = 0; i < max_replication_slots; i++)
621  {
622  ReplicationStateOnDisk disk_state;
623  ReplicationState *curstate = &replication_states[i];
624  XLogRecPtr local_lsn;
625 
626  if (curstate->roident == InvalidRepOriginId)
627  continue;
628 
629  /* zero, to avoid uninitialized padding bytes */
630  memset(&disk_state, 0, sizeof(disk_state));
631 
632  LWLockAcquire(&curstate->lock, LW_SHARED);
633 
634  disk_state.roident = curstate->roident;
635 
636  disk_state.remote_lsn = curstate->remote_lsn;
637  local_lsn = curstate->local_lsn;
638 
639  LWLockRelease(&curstate->lock);
640 
641  /* make sure we only write out a commit that's persistent */
642  XLogFlush(local_lsn);
643 
644  errno = 0;
645  if ((write(tmpfd, &disk_state, sizeof(disk_state))) !=
646  sizeof(disk_state))
647  {
648  /* if write didn't set errno, assume problem is no disk space */
649  if (errno == 0)
650  errno = ENOSPC;
651  ereport(PANIC,
653  errmsg("could not write to file \"%s\": %m",
654  tmppath)));
655  }
656 
657  COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
658  }
659 
660  LWLockRelease(ReplicationOriginLock);
661 
662  /* write out the CRC */
663  FIN_CRC32C(crc);
664  errno = 0;
665  if ((write(tmpfd, &crc, sizeof(crc))) != sizeof(crc))
666  {
667  /* if write didn't set errno, assume problem is no disk space */
668  if (errno == 0)
669  errno = ENOSPC;
670  ereport(PANIC,
672  errmsg("could not write to file \"%s\": %m",
673  tmppath)));
674  }
675 
676  if (CloseTransientFile(tmpfd) != 0)
677  ereport(PANIC,
679  errmsg("could not close file \"%s\": %m",
680  tmppath)));
681 
682  /* fsync, rename to permanent file, fsync file and directory */
683  durable_rename(tmppath, path, PANIC);
684 }
685 
686 /*
687  * Recover replication replay status from checkpoint data saved earlier by
688  * CheckPointReplicationOrigin.
689  *
690  * This only needs to be called at startup and *not* during every checkpoint
691  * read during recovery (e.g. in HS or PITR from a base backup) afterwards. All
692  * state thereafter can be recovered by looking at commit records.
693  */
694 void
696 {
697  const char *path = "pg_logical/replorigin_checkpoint";
698  int fd;
699  int readBytes;
701  int last_state = 0;
702  pg_crc32c file_crc;
703  pg_crc32c crc;
704 
705  /* don't want to overwrite already existing state */
706 #ifdef USE_ASSERT_CHECKING
707  static bool already_started = false;
708 
709  Assert(!already_started);
710  already_started = true;
711 #endif
712 
713  if (max_replication_slots == 0)
714  return;
715 
716  INIT_CRC32C(crc);
717 
718  elog(DEBUG2, "starting up replication origin progress state");
719 
720  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
721 
722  /*
723  * might have had max_replication_slots == 0 last run, or we just brought
724  * up a standby.
725  */
726  if (fd < 0 && errno == ENOENT)
727  return;
728  else if (fd < 0)
729  ereport(PANIC,
731  errmsg("could not open file \"%s\": %m",
732  path)));
733 
734  /* verify magic, that is written even if nothing was active */
735  readBytes = read(fd, &magic, sizeof(magic));
736  if (readBytes != sizeof(magic))
737  {
738  if (readBytes < 0)
739  ereport(PANIC,
741  errmsg("could not read file \"%s\": %m",
742  path)));
743  else
744  ereport(PANIC,
746  errmsg("could not read file \"%s\": read %d of %zu",
747  path, readBytes, sizeof(magic))));
748  }
749  COMP_CRC32C(crc, &magic, sizeof(magic));
750 
751  if (magic != REPLICATION_STATE_MAGIC)
752  ereport(PANIC,
753  (errmsg("replication checkpoint has wrong magic %u instead of %u",
754  magic, REPLICATION_STATE_MAGIC)));
755 
756  /* we can skip locking here, no other access is possible */
757 
758  /* recover individual states, until there are no more to be found */
759  while (true)
760  {
761  ReplicationStateOnDisk disk_state;
762 
763  readBytes = read(fd, &disk_state, sizeof(disk_state));
764 
765  /* no further data */
766  if (readBytes == sizeof(crc))
767  {
768  /* not pretty, but simple ... */
769  file_crc = *(pg_crc32c *) &disk_state;
770  break;
771  }
772 
773  if (readBytes < 0)
774  {
775  ereport(PANIC,
777  errmsg("could not read file \"%s\": %m",
778  path)));
779  }
780 
781  if (readBytes != sizeof(disk_state))
782  {
783  ereport(PANIC,
785  errmsg("could not read file \"%s\": read %d of %zu",
786  path, readBytes, sizeof(disk_state))));
787  }
788 
789  COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
790 
791  if (last_state == max_replication_slots)
792  ereport(PANIC,
793  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
794  errmsg("could not find free replication state, increase max_replication_slots")));
795 
796  /* copy data to shared memory */
797  replication_states[last_state].roident = disk_state.roident;
798  replication_states[last_state].remote_lsn = disk_state.remote_lsn;
799  last_state++;
800 
801  ereport(LOG,
802  (errmsg("recovered replication state of node %d to %X/%X",
803  disk_state.roident,
804  LSN_FORMAT_ARGS(disk_state.remote_lsn))));
805  }
806 
807  /* now check checksum */
808  FIN_CRC32C(crc);
809  if (file_crc != crc)
810  ereport(PANIC,
812  errmsg("replication slot checkpoint has wrong checksum %u, expected %u",
813  crc, file_crc)));
814 
815  if (CloseTransientFile(fd) != 0)
816  ereport(PANIC,
818  errmsg("could not close file \"%s\": %m",
819  path)));
820 }
821 
822 void
824 {
825  uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
826 
827  switch (info)
828  {
829  case XLOG_REPLORIGIN_SET:
830  {
831  xl_replorigin_set *xlrec =
832  (xl_replorigin_set *) XLogRecGetData(record);
833 
835  xlrec->remote_lsn, record->EndRecPtr,
836  xlrec->force /* backward */ ,
837  false /* WAL log */ );
838  break;
839  }
841  {
842  xl_replorigin_drop *xlrec;
843  int i;
844 
845  xlrec = (xl_replorigin_drop *) XLogRecGetData(record);
846 
847  for (i = 0; i < max_replication_slots; i++)
848  {
850 
851  /* found our slot */
852  if (state->roident == xlrec->node_id)
853  {
854  /* reset entry */
855  state->roident = InvalidRepOriginId;
856  state->remote_lsn = InvalidXLogRecPtr;
857  state->local_lsn = InvalidXLogRecPtr;
858  break;
859  }
860  }
861  break;
862  }
863  default:
864  elog(PANIC, "replorigin_redo: unknown op code %u", info);
865  }
866 }
867 
868 
869 /*
870  * Tell the replication origin progress machinery that a commit from 'node'
871  * that originated at the LSN remote_commit on the remote node was replayed
872  * successfully and that we don't need to do so again. In combination with
873  * setting up replorigin_session_origin_lsn and replorigin_session_origin
874  * that ensures we won't lose knowledge about that after a crash if the
875  * transaction had a persistent effect (think of asynchronous commits).
876  *
877  * local_commit needs to be a local LSN of the commit so that we can make sure
878  * upon a checkpoint that enough WAL has been persisted to disk.
879  *
880  * Needs to be called with a RowExclusiveLock on pg_replication_origin,
881  * unless running in recovery.
882  */
883 void
885  XLogRecPtr remote_commit, XLogRecPtr local_commit,
886  bool go_backward, bool wal_log)
887 {
888  int i;
889  ReplicationState *replication_state = NULL;
890  ReplicationState *free_state = NULL;
891 
892  Assert(node != InvalidRepOriginId);
893 
894  /* we don't track DoNotReplicateId */
895  if (node == DoNotReplicateId)
896  return;
897 
898  /*
899  * XXX: For the case where this is called by WAL replay, it'd be more
900  * efficient to restore into a backend local hashtable and only dump into
901  * shmem after recovery is finished. Let's wait with implementing that
902  * till it's shown to be a measurable expense
903  */
904 
905  /* Lock exclusively, as we may have to create a new table entry. */
906  LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
907 
908  /*
909  * Search for either an existing slot for the origin, or a free one we can
910  * use.
911  */
912  for (i = 0; i < max_replication_slots; i++)
913  {
914  ReplicationState *curstate = &replication_states[i];
915 
916  /* remember where to insert if necessary */
917  if (curstate->roident == InvalidRepOriginId &&
918  free_state == NULL)
919  {
920  free_state = curstate;
921  continue;
922  }
923 
924  /* not our slot */
925  if (curstate->roident != node)
926  {
927  continue;
928  }
929 
930  /* ok, found slot */
931  replication_state = curstate;
932 
933  LWLockAcquire(&replication_state->lock, LW_EXCLUSIVE);
934 
935  /* Make sure it's not used by somebody else */
936  if (replication_state->acquired_by != 0)
937  {
938  ereport(ERROR,
939  (errcode(ERRCODE_OBJECT_IN_USE),
940  errmsg("replication origin with ID %d is already active for PID %d",
941  replication_state->roident,
942  replication_state->acquired_by)));
943  }
944 
945  break;
946  }
947 
948  if (replication_state == NULL && free_state == NULL)
949  ereport(ERROR,
950  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
951  errmsg("could not find free replication state slot for replication origin with ID %d",
952  node),
953  errhint("Increase max_replication_slots and try again.")));
954 
955  if (replication_state == NULL)
956  {
957  /* initialize new slot */
958  LWLockAcquire(&free_state->lock, LW_EXCLUSIVE);
959  replication_state = free_state;
960  Assert(replication_state->remote_lsn == InvalidXLogRecPtr);
961  Assert(replication_state->local_lsn == InvalidXLogRecPtr);
962  replication_state->roident = node;
963  }
964 
965  Assert(replication_state->roident != InvalidRepOriginId);
966 
967  /*
968  * If somebody "forcefully" sets this slot, WAL log it, so it's durable
969  * and the standby gets the message. Primarily this will be called during
970  * WAL replay (of commit records) where no WAL logging is necessary.
971  */
972  if (wal_log)
973  {
974  xl_replorigin_set xlrec;
975 
976  xlrec.remote_lsn = remote_commit;
977  xlrec.node_id = node;
978  xlrec.force = go_backward;
979 
980  XLogBeginInsert();
981  XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
982 
983  XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_SET);
984  }
985 
986  /*
987  * Due to - harmless - race conditions during a checkpoint we could see
988  * values here that are older than the ones we already have in memory. We
989  * could also see older values for prepared transactions when the prepare
990  * is sent at a later point of time along with commit prepared and there
991  * are other transactions commits between prepare and commit prepared. See
992  * ReorderBufferFinishPrepared. Don't overwrite those.
993  */
994  if (go_backward || replication_state->remote_lsn < remote_commit)
995  replication_state->remote_lsn = remote_commit;
996  if (local_commit != InvalidXLogRecPtr &&
997  (go_backward || replication_state->local_lsn < local_commit))
998  replication_state->local_lsn = local_commit;
999  LWLockRelease(&replication_state->lock);
1000 
1001  /*
1002  * Release *after* changing the LSNs, slot isn't acquired and thus could
1003  * otherwise be dropped anytime.
1004  */
1005  LWLockRelease(ReplicationOriginLock);
1006 }
1007 
1008 
1009 XLogRecPtr
1011 {
1012  int i;
1013  XLogRecPtr local_lsn = InvalidXLogRecPtr;
1014  XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1015 
1016  /* prevent slots from being concurrently dropped */
1017  LWLockAcquire(ReplicationOriginLock, LW_SHARED);
1018 
1019  for (i = 0; i < max_replication_slots; i++)
1020  {
1022 
1024 
1025  if (state->roident == node)
1026  {
1027  LWLockAcquire(&state->lock, LW_SHARED);
1028 
1029  remote_lsn = state->remote_lsn;
1030  local_lsn = state->local_lsn;
1031 
1032  LWLockRelease(&state->lock);
1033 
1034  break;
1035  }
1036  }
1037 
1038  LWLockRelease(ReplicationOriginLock);
1039 
1040  if (flush && local_lsn != InvalidXLogRecPtr)
1041  XLogFlush(local_lsn);
1042 
1043  return remote_lsn;
1044 }
1045 
1046 /*
1047  * Tear down a (possibly) configured session replication origin during process
1048  * exit.
1049  */
1050 static void
1052 {
1053  ConditionVariable *cv = NULL;
1054 
1055  LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1056 
1057  if (session_replication_state != NULL &&
1059  {
1061 
1064  }
1065 
1066  LWLockRelease(ReplicationOriginLock);
1067 
1068  if (cv)
1070 }
1071 
1072 /*
1073  * Set up a replication origin in the shared memory struct if it doesn't
1074  * already exist and cache access to the specific ReplicationState so the
1075  * array doesn't have to be searched when calling
1076  * replorigin_session_advance().
1077  *
1078  * Obviously only one such cached origin can exist per process and the current
1079  * cached value can only be set again after the previous value is torn down
1080  * with replorigin_session_reset().
1081  */
/*
 * NOTE(review): the listing drops the line that carries this function's name
 * and parameter list (listing line 1083).  The cross-reference index at the
 * end of this file gives it as
 *     void replorigin_session_setup(RepOriginId node)
 * Several statement lines are missing from the body as well; each gap is
 * marked below.
 *
 * Visible behaviour: raises ERROR if this session already has an origin set
 * up, if the entry for 'node' is already active for some PID, or if no entry
 * for 'node' exists and no unused entry is left.  ReplicationOriginLock is
 * held exclusively while the table is searched.
 */
1082 void
1084 {
1085  static bool registered_cleanup;
1086  int i;
1087  int free_slot = -1;
1088 
 /*
  * registered_cleanup is static, so this branch is entered at most once per
  * process.  NOTE(review): listing line 1091 is missing; presumably it
  * registers ReplicationOriginExitCleanup() via on_shmem_exit() -- confirm
  * against the real source.
  */
1089  if (!registered_cleanup)
1090  {
1092  registered_cleanup = true;
1093  }
1094 
 /* NOTE(review): listing line 1095 is missing here. */
1096 
 /* A session can have only one origin set up at a time. */
1097  if (session_replication_state != NULL)
1098  ereport(ERROR,
1099  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1100  errmsg("cannot setup replication origin when one is already setup")));
1101 
1102  /* Lock exclusively, as we may have to create a new table entry. */
1103  LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1104 
1105  /*
1106  * Search for either an existing slot for the origin, or a free one we can
1107  * use.
1108  */
1109  for (i = 0; i < max_replication_slots; i++)
1110  {
1111  ReplicationState *curstate = &replication_states[i];
1112 
1113  /* remember where to insert if necessary */
 /*
  * Only the first unused entry is recorded; later unused entries fall
  * through to the roident comparison below.
  */
1114  if (curstate->roident == InvalidRepOriginId &&
1115  free_slot == -1)
1116  {
1117  free_slot = i;
1118  continue;
1119  }
1120 
1121  /* not our slot */
1122  if (curstate->roident != node)
1123  continue;
1124 
 /*
  * The entry belongs to 'node'; a non-zero acquired_by is reported below as
  * the PID for which the origin is already active.
  */
1125  else if (curstate->acquired_by != 0)
1126  {
1127  ereport(ERROR,
1128  (errcode(ERRCODE_OBJECT_IN_USE),
1129  errmsg("replication origin with ID %d is already active for PID %d",
1130  curstate->roident, curstate->acquired_by)));
1131  }
1132 
1133  /* ok, found slot */
1134  session_replication_state = curstate;
 /* No break: the scan continues over the remaining entries. */
1135  }
1136 
1137 
 /* Neither an existing entry nor an unused one was found: give up. */
1138  if (session_replication_state == NULL && free_slot == -1)
1139  ereport(ERROR,
1140  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
1141  errmsg("could not find free replication state slot for replication origin with ID %d",
1142  node),
1143  errhint("Increase max_replication_slots and try again.")));
1144  else if (session_replication_state == NULL)
1145  {
1146  /* initialize new slot */
 /*
  * NOTE(review): listing lines 1147-1150 are missing; presumably they point
  * session_replication_state at replication_states[free_slot] and fill it
  * in for 'node' -- confirm against the real source.
  */
1151  }
1152 
1153 
 /*
  * NOTE(review): listing lines 1154 and 1156 are missing; the code that
  * marks the entry as acquired by this process is presumably here -- confirm.
  */
1155 
1157 
1158  LWLockRelease(ReplicationOriginLock);
1159 
1160  /* probably this one is pointless */
 /* NOTE(review): the statement the comment above refers to (line 1161) is missing. */
1162 }
1163 
1164 /*
1165  * Reset replay state previously setup in this session.
1166  *
1167  * This function may only be called if an origin was setup with
1168  * replorigin_session_setup().
1169  */
/*
 * NOTE(review): the name/parameter line (listing line 1171) is missing; the
 * cross-reference index at the end of this file gives it as
 *     void replorigin_session_reset(void)
 *
 * Visible behaviour: raises ERROR when no origin is configured for this
 * session, then takes and releases ReplicationOriginLock in exclusive mode.
 * The statements executed under the lock are not visible in this listing.
 */
1170 void
1172 {
 /* Declared here, but every use of cv falls on lines missing from the listing. */
1173  ConditionVariable *cv;
1174 
 /* NOTE(review): listing line 1175 is missing here. */
1176 
1177  if (session_replication_state == NULL)
1178  ereport(ERROR,
1179  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1180  errmsg("no replication origin is configured")));
1181 
1182  LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1183 
 /*
  * NOTE(review): listing lines 1184-1186 are missing; presumably they release
  * this session's claim on the entry, remember its condition variable in cv
  * and clear session_replication_state -- confirm against the real source.
  */
1187 
1188  LWLockRelease(ReplicationOriginLock);
1189 
 /*
  * NOTE(review): listing line 1190 is missing; presumably cv is broadcast
  * here, after the lock has been released -- confirm.
  */
1191 }
1192 
1193 /*
1194  * Do the same work replorigin_advance() does, just on the session's
1195  * configured origin.
1196  *
1197  * This is noticeably cheaper than using replorigin_advance().
1198  */
/*
 * NOTE(review): the name/parameter line (listing line 1200) is missing; the
 * cross-reference index at the end of this file gives it as
 *     void replorigin_session_advance(XLogRecPtr remote_commit,
 *                                     XLogRecPtr local_commit)
 *
 * Visible behaviour: each of the session state's two LSNs is raised to the
 * supplied value only when the supplied value is larger, so neither ever
 * moves backwards here.  session_replication_state is dereferenced without a
 * visible NULL test.
 */
1199 void
1201 {
 /*
  * NOTE(review): listing lines 1202-1203 and 1205 are missing; presumably
  * sanity assertions and acquisition of the per-entry lock -- confirm.
  */
1204 
1206  if (session_replication_state->local_lsn < local_commit)
1207  session_replication_state->local_lsn = local_commit;
1208  if (session_replication_state->remote_lsn < remote_commit)
1209  session_replication_state->remote_lsn = remote_commit;
 /* NOTE(review): listing line 1210 is missing; presumably the matching lock release. */
1211 }
1212 
1213 /*
1214  * Ask the machinery about the point up to which we successfully replayed
1215  * changes from an already setup replication origin.
1216  */
/*
 * NOTE(review): the name/parameter line (listing line 1218) is missing; the
 * cross-reference index at the end of this file gives it as
 *     XLogRecPtr replorigin_session_get_progress(bool flush)
 *
 * Returns the session state's remote_lsn.  When 'flush' is true and the
 * state's local_lsn is not InvalidXLogRecPtr, XLogFlush() is called on that
 * local_lsn before returning.
 */
1217 XLogRecPtr
1219 {
1220  XLogRecPtr remote_lsn;
1221  XLogRecPtr local_lsn;
1222 
 /* NOTE(review): listing lines 1223 and 1225 are missing here. */
1224 
 /* Copy both values into locals; the flush below works on the copy. */
1226  remote_lsn = session_replication_state->remote_lsn;
1227  local_lsn = session_replication_state->local_lsn;
 /*
  * NOTE(review): listing line 1228 is missing; together with line 1225 it
  * presumably brackets the two reads with the per-entry lock -- confirm.
  */
1229 
1230  if (flush && local_lsn != InvalidXLogRecPtr)
1231  XLogFlush(local_lsn);
1232 
1233  return remote_lsn;
1234 }
1235 
1236 
1237 
1238 /* ---------------------------------------------------------------------------
1239  * SQL functions for working with replication origin.
1240  *
1241  * These mostly should be fairly short wrappers around more generic functions.
1242  * ---------------------------------------------------------------------------
1243  */
1244 
1245 /*
1246  * Create replication origin for the passed in name, and return the assigned
1247  * oid.
1248  */
/*
 * NOTE(review): the name/parameter line (listing line 1250) is missing; the
 * cross-reference index at the end of this file gives it as
 *     Datum pg_replication_origin_create(PG_FUNCTION_ARGS)
 *
 * Visible behaviour: rejects a reserved name with ERRCODE_RESERVED_NAME,
 * otherwise passes the name to replorigin_create(), frees the name buffer
 * and returns the new id via PG_RETURN_OID().
 */
1249 Datum
1251 {
1252  char *name;
1253  RepOriginId roident;
1254 
1255  replorigin_check_prerequisites(false, false);
1256 
 /*
  * NOTE(review): listing line 1257 is missing; 'name' must be assigned there
  * (presumably from SQL argument 0) -- confirm against the real source.
  */
1258 
1259  /*
1260  * Replication origins "any" and "none" are reserved for system options.
1261  * The origins "pg_xxx" are reserved for internal use.
1262  */
 /*
  * NOTE(review): listing line 1263, which holds the condition guarding this
  * ereport, is missing; presumably it tests IsReservedOriginName(name) and/or
  * IsReservedName(name) -- confirm.
  */
1264  ereport(ERROR,
1265  (errcode(ERRCODE_RESERVED_NAME),
1266  errmsg("replication origin name \"%s\" is reserved",
1267  name),
1268  errdetail("Origin names \"%s\", \"%s\", and names starting with \"pg_\" are reserved.",
 /* NOTE(review): listing line 1269 (the errdetail arguments and closing parens) is missing. */
1270 
1271  /*
1272  * If built with appropriate switch, whine when regression-testing
1273  * conventions for replication origin names are violated.
1274  */
1275 #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
1276  if (strncmp(name, "regress_", 8) != 0)
1277  elog(WARNING, "replication origins created by regression test cases should have names starting with \"regress_\"");
1278 #endif
1279 
1280  roident = replorigin_create(name);
1281 
1282  pfree(name);
1283 
1284  PG_RETURN_OID(roident);
1285 }
1286 
1287 /*
1288  * Drop replication origin.
1289  */
/*
 * NOTE(review): the name/parameter line (listing line 1291) is missing; the
 * cross-reference index at the end of this file gives it as
 *     Datum pg_replication_origin_drop(PG_FUNCTION_ARGS)
 *
 * Drops the named origin through replorigin_drop_by_name() and returns void.
 */
1290 Datum
1292 {
1293  char *name;
1294 
1295  replorigin_check_prerequisites(false, false);
1296 
 /*
  * NOTE(review): listing line 1297 is missing; 'name' must be assigned there
  * (presumably from SQL argument 0) -- confirm.
  */
1298 
 /*
  * Per the prototype in the cross-reference index the arguments are
  * (name, missing_ok, nowait): missing_ok = false, nowait = true.
  */
1299  replorigin_drop_by_name(name, false, true);
1300 
1301  pfree(name);
1302 
1303  PG_RETURN_VOID();
1304 }
1305 
1306 /*
1307  * Return oid of a replication origin.
1308  */
/*
 * NOTE(review): the name/parameter line (listing line 1310) is missing; the
 * cross-reference index at the end of this file gives it as
 *     Datum pg_replication_origin_oid(PG_FUNCTION_ARGS)
 *
 * Returns the id found by replorigin_by_name(), or SQL NULL when the result
 * does not satisfy OidIsValid().
 */
1309 Datum
1311 {
1312  char *name;
1313  RepOriginId roident;
1314 
1315  replorigin_check_prerequisites(false, false);
1316 
 /*
  * NOTE(review): listing line 1317 is missing; 'name' must be assigned there
  * (presumably from SQL argument 0) -- confirm.
  */
 /* Second argument is missing_ok = true (see the prototype in the index). */
1318  roident = replorigin_by_name(name, true);
1319 
1320  pfree(name);
1321 
1322  if (OidIsValid(roident))
1323  PG_RETURN_OID(roident);
1324  PG_RETURN_NULL();
1325 }
1326 
1327 /*
1328  * Setup a replication origin for this session.
1329  */
/*
 * NOTE(review): the name/parameter line (listing line 1331) is missing; the
 * cross-reference index at the end of this file gives it as
 *     Datum pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
 *
 * Resolves the origin name, calls replorigin_session_setup() on the result,
 * stores the id in replorigin_session_origin and returns void.
 */
1330 Datum
1332 {
1333  char *name;
1334  RepOriginId origin;
1335 
1336  replorigin_check_prerequisites(true, false);
1337 
 /*
  * NOTE(review): listing line 1338 is missing; 'name' must be assigned there
  * (presumably from SQL argument 0) -- confirm.
  */
 /* Second argument is missing_ok = false (see the prototype in the index). */
1339  origin = replorigin_by_name(name, false);
1340  replorigin_session_setup(origin);
1341 
 /* Written only after replorigin_session_setup() has returned. */
1342  replorigin_session_origin = origin;
1343 
1344  pfree(name);
1345 
1346  PG_RETURN_VOID();
1347 }
1348 
1349 /*
1350  * Reset previously setup origin in this session
1351  */
/*
 * NOTE(review): the name/parameter line (listing line 1353) is missing; the
 * cross-reference index at the end of this file gives it as
 *     Datum pg_replication_origin_session_reset(PG_FUNCTION_ARGS)
 *
 * Only the prerequisite check and the void return are visible in this
 * listing.
 */
1352 Datum
1354 {
1355  replorigin_check_prerequisites(true, false);
1356 
 /*
  * NOTE(review): listing lines 1357 and 1359-1361 are missing; presumably
  * they call replorigin_session_reset() and clear the replorigin_session_*
  * globals -- confirm against the real source.
  */
1358 
1362 
1363  PG_RETURN_VOID();
1364 }
1365 
1366 /*
1367  * Has a replication origin been setup for this session.
1368  */
/*
 * NOTE(review): the name/parameter line (listing line 1370) is missing; the
 * cross-reference index at the end of this file gives it as
 *     Datum pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS)
 */
1369 Datum
1371 {
1372  replorigin_check_prerequisites(false, false);
1373 
 /*
  * NOTE(review): listing line 1374, the return statement, is missing;
  * presumably it returns a boolean derived from replorigin_session_origin
  * -- confirm against the real source.
  */
1375 }
1376 
1377 
1378 /*
1379  * Return the replication progress for origin setup in the current session.
1380  *
1381  * If 'flush' is set to true it is ensured that the returned value corresponds
1382  * to a local transaction that has been flushed. This is useful if asynchronous
1383  * commits are used when replaying replicated transactions.
1384  */
/*
 * NOTE(review): the name/parameter line (listing line 1386) is missing; the
 * cross-reference index at the end of this file gives it as
 *     Datum pg_replication_origin_session_progress(PG_FUNCTION_ARGS)
 *
 * SQL argument 0 is the boolean 'flush'.  Raises ERROR when no origin is
 * configured for the session.  Returns SQL NULL when the reported LSN is
 * InvalidXLogRecPtr, otherwise the LSN.
 */
1385 Datum
1387 {
1388  XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1389  bool flush = PG_GETARG_BOOL(0);
1390 
1391  replorigin_check_prerequisites(true, false);
1392 
1393  if (session_replication_state == NULL)
1394  ereport(ERROR,
1395  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1396  errmsg("no replication origin is configured")));
1397 
1398  remote_lsn = replorigin_session_get_progress(flush);
1399 
 /* An invalid LSN is surfaced to SQL as NULL rather than as a value. */
1400  if (remote_lsn == InvalidXLogRecPtr)
1401  PG_RETURN_NULL();
1402 
1403  PG_RETURN_LSN(remote_lsn);
1404 }
1405 
/*
 * NOTE(review): the name/parameter line (listing line 1407) is missing; the
 * cross-reference index at the end of this file gives it as
 *     Datum pg_replication_origin_xact_setup(PG_FUNCTION_ARGS)
 *
 * Visible behaviour: SQL argument 0 (an LSN) is stored in
 * replorigin_session_origin_lsn.  Raises ERROR when no origin is configured
 * for the session.  Returns void.
 */
1406 Datum
1408 {
1409  XLogRecPtr location = PG_GETARG_LSN(0);
1410 
1411  replorigin_check_prerequisites(true, false);
1412 
1413  if (session_replication_state == NULL)
1414  ereport(ERROR,
1415  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1416  errmsg("no replication origin is configured")));
1417 
1418  replorigin_session_origin_lsn = location;
 /*
  * NOTE(review): listing line 1419 is missing; presumably it stores a
  * timestamp argument in replorigin_session_origin_timestamp -- confirm
  * against the real source.
  */
1420 
1421  PG_RETURN_VOID();
1422 }
1423 
/*
 * NOTE(review): the name/parameter line (listing line 1425) is missing; the
 * cross-reference index at the end of this file gives it as
 *     Datum pg_replication_origin_xact_reset(PG_FUNCTION_ARGS)
 *
 * Only the prerequisite check and the void return are visible in this
 * listing.
 */
1424 Datum
1426 {
1427  replorigin_check_prerequisites(true, false);
1428 
 /*
  * NOTE(review): listing lines 1429-1430 are missing; presumably they reset
  * replorigin_session_origin_lsn and replorigin_session_origin_timestamp
  * -- confirm against the real source.
  */
1431 
1432  PG_RETURN_VOID();
1433 }
1434 
1435 
/*
 * NOTE(review): the name/parameter line (listing line 1437) is missing; the
 * cross-reference index at the end of this file gives it as
 *     Datum pg_replication_origin_advance(PG_FUNCTION_ARGS)
 *
 * SQL argument 0 is the origin name (text), argument 1 the remote LSN.
 * Looks the origin up by name and calls replorigin_advance() for it with
 * go_backward = true and wal_log = true, holding RowExclusiveLock on
 * ReplicationOriginRelationId around the lookup and the advance.  Returns
 * void.
 */
1436 Datum
1438 {
1439  text *name = PG_GETARG_TEXT_PP(0);
1440  XLogRecPtr remote_commit = PG_GETARG_LSN(1);
1441  RepOriginId node;
1442 
1443  replorigin_check_prerequisites(true, false);
1444 
1445  /* lock to prevent the replication origin from vanishing */
1446  LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1447 
 /*
  * missing_ok = false (see the prototype in the index).  The C string made
  * by text_to_cstring() is not pfree'd in this function.
  */
1448  node = replorigin_by_name(text_to_cstring(name), false);
1449 
1450  /*
1451  * Can't sensibly pass a local commit to be flushed at checkpoint - this
1452  * xact hasn't committed yet. This is why this function should be used to
1453  * set up the initial replication state, but not for replay.
1454  */
1455  replorigin_advance(node, remote_commit, InvalidXLogRecPtr,
1456  true /* go backward */ , true /* WAL log */ );
1457 
1458  UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1459 
1460  PG_RETURN_VOID();
1461 }
1462 
1463 
1464 /*
1465  * Return the replication progress for an individual replication origin.
1466  *
1467  * If 'flush' is set to true it is ensured that the returned value corresponds
1468  * to a local transaction that has been flushed. This is useful if asynchronous
1469  * commits are used when replaying replicated transactions.
1470  */
/*
 * NOTE(review): the name/parameter line (listing line 1472) is missing; the
 * cross-reference index at the end of this file gives it as
 *     Datum pg_replication_origin_progress(PG_FUNCTION_ARGS)
 *
 * SQL argument 1 is the boolean 'flush'.  Returns SQL NULL when the reported
 * LSN is InvalidXLogRecPtr, otherwise the LSN.
 */
1471 Datum
1473 {
1474  char *name;
1475  bool flush;
1476  RepOriginId roident;
1477  XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1478 
 /* Unlike most callers in this section, recoveryOK is passed as true here. */
1479  replorigin_check_prerequisites(true, true);
1480 
 /*
  * NOTE(review): listing line 1481 is missing; 'name' must be assigned there
  * (presumably from SQL argument 0) -- confirm.
  */
1482  flush = PG_GETARG_BOOL(1);
1483 
 /* missing_ok = false, hence the assertion that the id is valid. */
1484  roident = replorigin_by_name(name, false);
1485  Assert(OidIsValid(roident));
1486 
1487  remote_lsn = replorigin_get_progress(roident, flush);
1488 
1489  if (remote_lsn == InvalidXLogRecPtr)
1490  PG_RETURN_NULL();
1491 
1492  PG_RETURN_LSN(remote_lsn);
1493 }
1494 
1495 
/*
 * NOTE(review): the name/parameter line (listing line 1497) is missing; the
 * cross-reference index at the end of this file gives it as
 *     Datum pg_show_replication_origin_status(PG_FUNCTION_ARGS)
 *
 * Set-returning function: for every in-use entry of the replication state
 * table it appends one row to rsinfo->setResult holding the origin id, the
 * origin name (NULL when the lookup fails), remote_lsn and local_lsn.
 * Returns (Datum) 0; the rows travel through the tuplestore.
 */
1496 Datum
1498 {
1499  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1500  int i;
 /*
  * NOTE(review): listing line 1501 is missing; presumably the #define of
  * REPLICATION_ORIGIN_PROGRESS_COLS that is #undef'd at the end -- confirm.
  */
1502 
1503  /* we want to return 0 rows if slot is set to zero */
1504  replorigin_check_prerequisites(false, true);
1505 
1506  SetSingleFuncCall(fcinfo, 0);
1507 
1508  /* prevent slots from being concurrently dropped */
1509  LWLockAcquire(ReplicationOriginLock, LW_SHARED);
1510 
1511  /*
1512  * Iterate through all possible replication_states, display if they are
1513  * filled. ReplicationOriginLock is held in shared mode for the whole
1514  * scan, and each entry's own lock is taken in shared mode while its two
1515  * LSNs are read.
1516  */
1516  for (i = 0; i < max_replication_slots; i++)
1517  {
 /*
  * NOTE(review): listing lines 1518-1520 are missing; the declarations of
  * 'state', 'values' and 'nulls' used below must be there.
  */
1521  char *roname;
1522 
 /*
  * NOTE(review): listing line 1523 is missing; presumably it points 'state'
  * at replication_states[i] -- confirm.
  */
1524 
1525  /* unused slot, nothing to display */
1526  if (state->roident == InvalidRepOriginId)
1527  continue;
1528 
 /* Every column starts out as NULL; columns are un-nulled as they are filled. */
1529  memset(values, 0, sizeof(values));
1530  memset(nulls, 1, sizeof(nulls));
1531 
1532  values[0] = ObjectIdGetDatum(state->roident);
1533  nulls[0] = false;
1534 
1535  /*
1536  * We're not preventing the origin to be dropped concurrently, so
1537  * silently accept that it might be gone.
1538  */
 /* missing_ok = true: on failure column 1 simply stays NULL. */
1539  if (replorigin_by_oid(state->roident, true,
1540  &roname))
1541  {
1542  values[1] = CStringGetTextDatum(roname);
1543  nulls[1] = false;
1544  }
1545 
 /* Read both LSNs under the entry's lock. */
1546  LWLockAcquire(&state->lock, LW_SHARED);
1547 
1548  values[2] = LSNGetDatum(state->remote_lsn);
1549  nulls[2] = false;
1550 
1551  values[3] = LSNGetDatum(state->local_lsn);
1552  nulls[3] = false;
1553 
1554  LWLockRelease(&state->lock);
1555 
1556  tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
1557  values, nulls);
1558  }
1559 
1560  LWLockRelease(ReplicationOriginLock);
1561 
1562 #undef REPLICATION_ORIGIN_PROGRESS_COLS
1563 
1564  return (Datum) 0;
1565 }
static Datum values[MAXATTR]
Definition: bootstrap.c:156
#define CStringGetTextDatum(s)
Definition: builtins.h:85
unsigned int uint32
Definition: c.h:442
#define PG_BINARY
Definition: c.h:1254
#define FLEXIBLE_ARRAY_MEMBER
Definition: c.h:362
#define PG_UINT16_MAX
Definition: c.h:523
unsigned char uint8
Definition: c.h:440
#define MemSet(start, val, len)
Definition: c.h:998
#define OidIsValid(objectId)
Definition: c.h:711
size_t Size
Definition: c.h:541
bool IsReservedName(const char *name)
Definition: catalog.c:219
void ConditionVariableBroadcast(ConditionVariable *cv)
void ConditionVariableInit(ConditionVariable *cv)
void ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)
void ConditionVariableCancelSleep(void)
int64 TimestampTz
Definition: timestamp.h:39
int errcode_for_file_access(void)
Definition: elog.c:718
int errdetail(const char *fmt,...)
Definition: elog.c:1039
int errhint(const char *fmt,...)
Definition: elog.c:1153
int errcode(int sqlerrcode)
Definition: elog.c:695
int errmsg(const char *fmt,...)
Definition: elog.c:906
#define LOG
Definition: elog.h:27
#define WARNING
Definition: elog.h:32
#define DEBUG2
Definition: elog.h:25
#define PANIC
Definition: elog.h:38
#define ERROR
Definition: elog.h:35
#define ereport(elevel,...)
Definition: elog.h:145
const char * name
Definition: encode.c:561
int durable_rename(const char *oldfile, const char *newfile, int elevel)
Definition: fd.c:689
int CloseTransientFile(int fd)
Definition: fd.c:2610
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2434
#define PG_RETURN_VOID()
Definition: fmgr.h:349
#define PG_GETARG_TEXT_PP(n)
Definition: fmgr.h:309
#define PG_GETARG_DATUM(n)
Definition: fmgr.h:268
#define PG_RETURN_NULL()
Definition: fmgr.h:345
#define PG_GETARG_BOOL(n)
Definition: fmgr.h:274
#define PG_RETURN_OID(x)
Definition: fmgr.h:360
#define PG_FUNCTION_ARGS
Definition: fmgr.h:193
#define PG_RETURN_BOOL(x)
Definition: fmgr.h:359
void SetSingleFuncCall(FunctionCallInfo fcinfo, bits32 flags)
Definition: funcapi.c:76
void systable_endscan(SysScanDesc sysscan)
Definition: genam.c:598
HeapTuple systable_getnext(SysScanDesc sysscan)
Definition: genam.c:505
SysScanDesc systable_beginscan(Relation heapRelation, Oid indexId, bool indexOK, Snapshot snapshot, int nkeys, ScanKey key)
Definition: genam.c:386
int MyProcPid
Definition: globals.c:44
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:1020
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
#define GETSTRUCT(TUP)
Definition: htup_details.h:649
void CatalogTupleInsert(Relation heapRel, HeapTuple tup)
Definition: indexing.c:221
void CatalogTupleDelete(Relation heapRel, ItemPointer tid)
Definition: indexing.c:350
#define write(a, b, c)
Definition: win32.h:14
#define read(a, b, c)
Definition: win32.h:13
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:361
int i
Definition: isn.c:73
Assert(fmt[strlen(fmt) - 1] !='\n')
void UnlockRelationOid(Oid relid, LOCKMODE lockmode)
Definition: lmgr.c:228
void LockRelationOid(Oid relid, LOCKMODE lockmode)
Definition: lmgr.c:109
#define NoLock
Definition: lockdefs.h:34
#define ExclusiveLock
Definition: lockdefs.h:42
#define RowExclusiveLock
Definition: lockdefs.h:38
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1196
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1800
void LWLockInitialize(LWLock *lock, int tranche_id)
Definition: lwlock.c:734
@ LWTRANCHE_REPLICATION_ORIGIN_STATE
Definition: lwlock.h:178
@ LW_SHARED
Definition: lwlock.h:105
@ LW_EXCLUSIVE
Definition: lwlock.h:104
void pfree(void *pointer)
Definition: mcxt.c:1252
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:121
TimestampTz replorigin_session_origin_timestamp
Definition: origin.c:158
static ReplicationStateCtl * replication_states_ctl
Definition: origin.c:172
RepOriginId replorigin_by_name(const char *roname, bool missing_ok)
Definition: origin.c:221
Size ReplicationOriginShmemSize(void)
Definition: origin.c:502
RepOriginId replorigin_create(const char *roname)
Definition: origin.c:252
Datum pg_replication_origin_progress(PG_FUNCTION_ARGS)
Definition: origin.c:1472
void replorigin_session_setup(RepOriginId node)
Definition: origin.c:1083
void replorigin_session_reset(void)
Definition: origin.c:1171
struct ReplicationState ReplicationState
static bool IsReservedOriginName(const char *name)
Definition: origin.c:204
void replorigin_session_advance(XLogRecPtr remote_commit, XLogRecPtr local_commit)
Definition: origin.c:1200
static void replorigin_drop_guts(Relation rel, RepOriginId roident, bool nowait)
Definition: origin.c:341
bool replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
Definition: origin.c:461
Datum pg_replication_origin_advance(PG_FUNCTION_ARGS)
Definition: origin.c:1437
XLogRecPtr replorigin_get_progress(RepOriginId node, bool flush)
Definition: origin.c:1010
Datum pg_replication_origin_session_progress(PG_FUNCTION_ARGS)
Definition: origin.c:1386
static ReplicationState * replication_states
Definition: origin.c:167
Datum pg_replication_origin_session_reset(PG_FUNCTION_ARGS)
Definition: origin.c:1353
Datum pg_replication_origin_xact_setup(PG_FUNCTION_ARGS)
Definition: origin.c:1407
Datum pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS)
Definition: origin.c:1370
static void replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
Definition: origin.c:185
Datum pg_replication_origin_oid(PG_FUNCTION_ARGS)
Definition: origin.c:1310
Datum pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
Definition: origin.c:1331
static void ReplicationOriginExitCleanup(int code, Datum arg)
Definition: origin.c:1051
void StartupReplicationOrigin(void)
Definition: origin.c:695
void replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
Definition: origin.c:426
RepOriginId replorigin_session_origin
Definition: origin.c:156
void replorigin_advance(RepOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)
Definition: origin.c:884
void CheckPointReplicationOrigin(void)
Definition: origin.c:569
static ReplicationState * session_replication_state
Definition: origin.c:179
Datum pg_replication_origin_drop(PG_FUNCTION_ARGS)
Definition: origin.c:1291
#define REPLICATION_ORIGIN_PROGRESS_COLS
XLogRecPtr replorigin_session_get_progress(bool flush)
Definition: origin.c:1218
void ReplicationOriginShmemInit(void)
Definition: origin.c:522
Datum pg_show_replication_origin_status(PG_FUNCTION_ARGS)
Definition: origin.c:1497
#define REPLICATION_STATE_MAGIC
Definition: origin.c:182
XLogRecPtr replorigin_session_origin_lsn
Definition: origin.c:157
Datum pg_replication_origin_create(PG_FUNCTION_ARGS)
Definition: origin.c:1250
Datum pg_replication_origin_xact_reset(PG_FUNCTION_ARGS)
Definition: origin.c:1425
void replorigin_redo(XLogReaderState *record)
Definition: origin.c:823
struct ReplicationStateCtl ReplicationStateCtl
struct ReplicationStateOnDisk ReplicationStateOnDisk
#define DoNotReplicateId
Definition: origin.h:34
#define InvalidRepOriginId
Definition: origin.h:33
#define XLOG_REPLORIGIN_DROP
Definition: origin.h:31
#define XLOG_REPLORIGIN_SET
Definition: origin.h:30
void * arg
#define ERRCODE_DATA_CORRUPTED
Definition: pg_basebackup.c:41
uint32 pg_crc32c
Definition: pg_crc32c.h:38
#define COMP_CRC32C(crc, data, len)
Definition: pg_crc32c.h:89
#define INIT_CRC32C(crc)
Definition: pg_crc32c.h:41
#define FIN_CRC32C(crc)
Definition: pg_crc32c.h:94
return crc
#define PG_GETARG_LSN(n)
Definition: pg_lsn.h:33
static Datum LSNGetDatum(XLogRecPtr X)
Definition: pg_lsn.h:28
#define PG_RETURN_LSN(x)
Definition: pg_lsn.h:34
FormData_pg_replication_origin * Form_pg_replication_origin
#define LOGICALREP_ORIGIN_NONE
#define LOGICALREP_ORIGIN_ANY
int pg_strcasecmp(const char *s1, const char *s2)
Definition: pgstrcasecmp.c:36
uintptr_t Datum
Definition: postgres.h:412
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:600
static Pointer DatumGetPointer(Datum X)
Definition: postgres.h:660
#define InvalidOid
Definition: postgres_ext.h:36
unsigned int Oid
Definition: postgres_ext.h:31
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define RelationGetDescr(relation)
Definition: rel.h:527
void ScanKeyInit(ScanKey entry, AttrNumber attributeNumber, StrategyNumber strategy, RegProcedure procedure, Datum argument)
Definition: scankey.c:76
Size add_size(Size s1, Size s2)
Definition: shmem.c:502
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:396
Size mul_size(Size s1, Size s2)
Definition: shmem.c:519
int max_replication_slots
Definition: slot.c:101
#define InitDirtySnapshot(snapshotdata)
Definition: snapmgr.h:74
#define BTEqualStrategyNumber
Definition: stratnum.h:31
ItemPointerData t_self
Definition: htup.h:65
Definition: lwlock.h:32
ReplicationState states[FLEXIBLE_ARRAY_MEMBER]
Definition: origin.c:152
XLogRecPtr remote_lsn
Definition: origin.c:143
RepOriginId roident
Definition: origin.c:142
XLogRecPtr remote_lsn
Definition: origin.c:112
XLogRecPtr local_lsn
Definition: origin.c:119
ConditionVariable origin_cv
Definition: origin.c:129
RepOriginId roident
Definition: origin.c:107
LWLock lock
Definition: origin.c:134
TupleDesc setDesc
Definition: execnodes.h:332
Tuplestorestate * setResult
Definition: execnodes.h:331
XLogRecPtr EndRecPtr
Definition: xlogreader.h:207
Definition: regguts.h:318
Definition: c.h:623
RepOriginId node_id
Definition: origin.h:27
RepOriginId node_id
Definition: origin.h:21
XLogRecPtr remote_lsn
Definition: origin.h:20
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:1221
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:1173
@ REPLORIGIDENT
Definition: syscache.h:90
@ REPLORIGNAME
Definition: syscache.h:91
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:40
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, Datum *values, bool *isnull)
Definition: tuplestore.c:750
#define PG_GETARG_TIMESTAMPTZ(n)
Definition: timestamp.h:64
char * text_to_cstring(const text *t)
Definition: varlena.c:222
@ WAIT_EVENT_REPLICATION_ORIGIN_DROP
Definition: wait_event.h:124
bool IsTransactionState(void)
Definition: xact.c:374
void CommandCounterIncrement(void)
Definition: xact.c:1074
bool RecoveryInProgress(void)
Definition: xlog.c:5939
void XLogFlush(XLogRecPtr record)
Definition: xlog.c:2516
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
uint16 RepOriginId
Definition: xlogdefs.h:65
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
void XLogRegisterData(char *data, uint32 len)
Definition: xloginsert.c:351
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition: xloginsert.c:451
void XLogBeginInsert(void)
Definition: xloginsert.c:150
#define XLogRecGetInfo(decoder)
Definition: xlogreader.h:411
#define XLogRecGetData(decoder)
Definition: xlogreader.h:416
#define XLR_INFO_MASK
Definition: xlogrecord.h:62