PostgreSQL Source Code  git master
origin.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * origin.c
4  * Logical replication progress tracking support.
5  *
6  * Copyright (c) 2013-2023, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  * src/backend/replication/logical/origin.c
10  *
11  * NOTES
12  *
13  * This file provides the following:
14  * * An infrastructure to name nodes in a replication setup
15  * * A facility to store and persist replication progress in an efficient
16  * and durable manner.
17  *
18  * A replication origin consists of a descriptive, user defined, external
19  * name and a short, thus space efficient, internal 2 byte one. This split
20  * exists because replication origins have to be stored in WAL and shared
21  * memory and long descriptors would be inefficient. For now only use 2 bytes
22  * for the internal id of a replication origin as it seems unlikely that there
23  * soon will be more than 65k nodes in one replication setup; and using only
24  * two bytes allows us to be more space efficient.
25  *
26  * Replication progress is tracked in a shared memory table
27  * (ReplicationState) that's dumped to disk every checkpoint. Entries
28  * ('slots') in this table are identified by the internal id. That's the case
29  * because it allows replication progress to be advanced during crash
30  * recovery. To allow doing so we store the original LSN (from the originating
31  * system) of a transaction in the commit record. That allows recovering the
32  * precise replayed state after crash recovery, without requiring synchronous
33  * commits. Allowing logical replication to use asynchronous commit is
34  * generally good for performance, but especially important as it allows a
35  * single threaded replay process to keep up with a source that has multiple
36  * backends generating changes concurrently. For efficiency and simplicity
37  * reasons a backend can set up one replication origin that's from then on
38  * used as the source of changes produced by the backend, until reset again.
39  *
40  * This infrastructure is intended to be used in cooperation with logical
41  * decoding. When replaying from a remote system the configured origin is
42  * provided to output plugins, allowing prevention of replication loops and
43  * other filtering.
44  *
45  * There are several levels of locking at work:
46  *
47  * * To create and drop replication origins an exclusive lock on
48  * pg_replication_origin is required for the duration. That allows us to
49  * safely and conflict free assign new origins using a dirty snapshot.
50  *
51  * * When creating an in-memory replication progress slot the ReplicationOrigin
52  * LWLock has to be held exclusively; when iterating over the replication
53  * progress a shared lock has to be held, the same when advancing the
54  * replication progress of an individual backend that has not setup as the
55  * session's replication origin.
56  *
57  * * When manipulating or looking at the remote_lsn and local_lsn fields of a
58  * replication progress slot that slot's lwlock has to be held. That's
59  * primarily because we do not assume 8 byte writes (the LSN) are atomic on
60  * all our platforms, but it also simplifies memory ordering concerns
61  * between the remote and local lsn. We use a lwlock instead of a spinlock
62  * so it's less harmful to hold the lock over a WAL write
63  * (cf. replorigin_advance).
64  *
65  * ---------------------------------------------------------------------------
66  */
67 
68 #include "postgres.h"
69 
70 #include <unistd.h>
71 #include <sys/stat.h>
72 
73 #include "access/genam.h"
74 #include "access/htup_details.h"
75 #include "access/table.h"
76 #include "access/xact.h"
77 #include "access/xloginsert.h"
78 #include "catalog/catalog.h"
79 #include "catalog/indexing.h"
81 #include "funcapi.h"
82 #include "miscadmin.h"
83 #include "nodes/execnodes.h"
84 #include "pgstat.h"
85 #include "replication/logical.h"
86 #include "replication/origin.h"
88 #include "storage/copydir.h"
89 #include "storage/fd.h"
90 #include "storage/ipc.h"
91 #include "storage/lmgr.h"
92 #include "utils/builtins.h"
93 #include "utils/fmgroids.h"
94 #include "utils/pg_lsn.h"
95 #include "utils/rel.h"
96 #include "utils/snapmgr.h"
97 #include "utils/syscache.h"
98 
99 /*
100  * Replay progress of a single remote node.
101  */
102 typedef struct ReplicationState
103 {
104  /*
105  * Local identifier for the remote node.
106  */
108 
109  /*
110  * Location of the latest commit from the remote side.
111  */
113 
114  /*
115  * Remember the local lsn of the commit record so we can XLogFlush() to it
116  * during a checkpoint so we know the commit record actually is safe on
117  * disk.
118  */
120 
121  /*
122  * PID of backend that's acquired slot, or 0 if none.
123  */
125 
126  /*
127  * Condition variable that's signaled when acquired_by changes.
128  */
130 
131  /*
132  * Lock protecting remote_lsn and local_lsn.
133  */
136 
137 /*
138  * On disk version of ReplicationState.
139  */
141 {
145 
146 
147 typedef struct ReplicationStateCtl
148 {
149  /* Tranche to use for per-origin LWLocks */
151  /* Array of length max_replication_slots */
154 
155 /* external variables */
159 
160 /*
161  * Base address into a shared memory array of replication states of size
162  * max_replication_slots.
163  *
164  * XXX: Should we use a separate variable to size this rather than
165  * max_replication_slots?
166  */
168 
169 /*
170  * Actual shared memory block (replication_states[] is now part of this).
171  */
173 
174 /*
175  * Backend-local, cached element from ReplicationState for use in a backend
176  * replaying remote commits, so we don't have to search ReplicationState for
177  * the backend's current RepOriginId.
178  */
180 
181 /* Magic for on disk files. */
182 #define REPLICATION_STATE_MAGIC ((uint32) 0x1257DADE)
183 
184 static void
185 replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
186 {
187  if (check_slots && max_replication_slots == 0)
188  ereport(ERROR,
189  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
190  errmsg("cannot query or manipulate replication origin when max_replication_slots = 0")));
191 
192  if (!recoveryOK && RecoveryInProgress())
193  ereport(ERROR,
194  (errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
195  errmsg("cannot manipulate replication origins during recovery")));
196 }
197 
198 
199 /*
200  * IsReservedOriginName
201  * True iff name is either "none" or "any".
202  */
203 static bool
205 {
206  return ((pg_strcasecmp(name, LOGICALREP_ORIGIN_NONE) == 0) ||
208 }
209 
210 /* ---------------------------------------------------------------------------
211  * Functions for working with replication origins themselves.
212  * ---------------------------------------------------------------------------
213  */
214 
215 /*
216  * Check for a persistent replication origin identified by name.
217  *
218  * Returns InvalidOid if the node isn't known yet and missing_ok is true.
219  */
221 replorigin_by_name(const char *roname, bool missing_ok)
222 {
224  Oid roident = InvalidOid;
225  HeapTuple tuple;
226  Datum roname_d;
227 
228  roname_d = CStringGetTextDatum(roname);
229 
230  tuple = SearchSysCache1(REPLORIGNAME, roname_d);
231  if (HeapTupleIsValid(tuple))
232  {
233  ident = (Form_pg_replication_origin) GETSTRUCT(tuple);
234  roident = ident->roident;
235  ReleaseSysCache(tuple);
236  }
237  else if (!missing_ok)
238  ereport(ERROR,
239  (errcode(ERRCODE_UNDEFINED_OBJECT),
240  errmsg("replication origin \"%s\" does not exist",
241  roname)));
242 
243  return roident;
244 }
245 
246 /*
247  * Create a replication origin.
248  *
249  * Needs to be called in a transaction.
250  */
252 replorigin_create(const char *roname)
253 {
254  Oid roident;
255  HeapTuple tuple = NULL;
256  Relation rel;
257  Datum roname_d;
258  SnapshotData SnapshotDirty;
259  SysScanDesc scan;
261 
262  roname_d = CStringGetTextDatum(roname);
263 
265 
266  /*
267  * We need the numeric replication origin to be 16bit wide, so we cannot
268  * rely on the normal oid allocation. Instead we simply scan
269  * pg_replication_origin for the first unused id. That's not particularly
270  * efficient, but this should be a fairly infrequent operation - we can
271  * easily spend a bit more code on this when it turns out it needs to be
272  * faster.
273  *
274  * We handle concurrency by taking an exclusive lock (allowing reads!)
275  * over the table for the duration of the search. Because we use a "dirty
276  * snapshot" we can read rows that other in-progress sessions have
277  * written, even though they would be invisible with normal snapshots. Due
278  * to the exclusive lock there's no danger that new rows can appear while
279  * we're checking.
280  */
281  InitDirtySnapshot(SnapshotDirty);
282 
283  rel = table_open(ReplicationOriginRelationId, ExclusiveLock);
284 
285  for (roident = InvalidOid + 1; roident < PG_UINT16_MAX; roident++)
286  {
287  bool nulls[Natts_pg_replication_origin];
288  Datum values[Natts_pg_replication_origin];
289  bool collides;
290 
292 
293  ScanKeyInit(&key,
294  Anum_pg_replication_origin_roident,
295  BTEqualStrategyNumber, F_OIDEQ,
296  ObjectIdGetDatum(roident));
297 
298  scan = systable_beginscan(rel, ReplicationOriginIdentIndex,
299  true /* indexOK */ ,
300  &SnapshotDirty,
301  1, &key);
302 
303  collides = HeapTupleIsValid(systable_getnext(scan));
304 
305  systable_endscan(scan);
306 
307  if (!collides)
308  {
309  /*
310  * Ok, found an unused roident, insert the new row and do a CCI,
311  * so our callers can look it up if they want to.
312  */
313  memset(&nulls, 0, sizeof(nulls));
314 
315  values[Anum_pg_replication_origin_roident - 1] = ObjectIdGetDatum(roident);
316  values[Anum_pg_replication_origin_roname - 1] = roname_d;
317 
318  tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
319  CatalogTupleInsert(rel, tuple);
321  break;
322  }
323  }
324 
325  /* now release lock again, */
327 
328  if (tuple == NULL)
329  ereport(ERROR,
330  (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
331  errmsg("could not find free replication origin ID")));
332 
333  heap_freetuple(tuple);
334  return roident;
335 }
336 
337 /*
338  * Helper function to drop a replication origin.
339  */
340 static void
341 replorigin_state_clear(RepOriginId roident, bool nowait)
342 {
343  int i;
344 
345  /*
346  * Clean up the slot state info, if there is any matching slot.
347  */
348 restart:
349  LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
350 
351  for (i = 0; i < max_replication_slots; i++)
352  {
354 
355  if (state->roident == roident)
356  {
357  /* found our slot, is it busy? */
358  if (state->acquired_by != 0)
359  {
360  ConditionVariable *cv;
361 
362  if (nowait)
363  ereport(ERROR,
364  (errcode(ERRCODE_OBJECT_IN_USE),
365  errmsg("could not drop replication origin with ID %d, in use by PID %d",
366  state->roident,
367  state->acquired_by)));
368 
369  /*
370  * We must wait and then retry. Since we don't know which CV
371  * to wait on until here, we can't readily use
372  * ConditionVariablePrepareToSleep (calling it here would be
373  * wrong, since we could miss the signal if we did so); just
374  * use ConditionVariableSleep directly.
375  */
376  cv = &state->origin_cv;
377 
378  LWLockRelease(ReplicationOriginLock);
379 
381  goto restart;
382  }
383 
384  /* first make a WAL log entry */
385  {
386  xl_replorigin_drop xlrec;
387 
388  xlrec.node_id = roident;
389  XLogBeginInsert();
390  XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
391  XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_DROP);
392  }
393 
394  /* then clear the in-memory slot */
395  state->roident = InvalidRepOriginId;
396  state->remote_lsn = InvalidXLogRecPtr;
397  state->local_lsn = InvalidXLogRecPtr;
398  break;
399  }
400  }
401  LWLockRelease(ReplicationOriginLock);
403 }
404 
405 /*
406  * Drop replication origin (by name).
407  *
408  * Needs to be called in a transaction.
409  */
410 void
411 replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
412 {
413  RepOriginId roident;
414  Relation rel;
415  HeapTuple tuple;
416 
418 
419  rel = table_open(ReplicationOriginRelationId, RowExclusiveLock);
420 
421  roident = replorigin_by_name(name, missing_ok);
422 
423  /* Lock the origin to prevent concurrent drops. */
424  LockSharedObject(ReplicationOriginRelationId, roident, 0,
426 
428  if (!HeapTupleIsValid(tuple))
429  {
430  if (!missing_ok)
431  elog(ERROR, "cache lookup failed for replication origin with ID %d",
432  roident);
433 
434  /*
435  * We don't need to retain the locks if the origin is already dropped.
436  */
437  UnlockSharedObject(ReplicationOriginRelationId, roident, 0,
440  return;
441  }
442 
443  replorigin_state_clear(roident, nowait);
444 
445  /*
446  * Now, we can delete the catalog entry.
447  */
448  CatalogTupleDelete(rel, &tuple->t_self);
449  ReleaseSysCache(tuple);
450 
452 
453  /* We keep the lock on pg_replication_origin until commit */
454  table_close(rel, NoLock);
455 }
456 
457 /*
458  * Lookup replication origin via its oid and return the name.
459  *
460  * The external name is palloc'd in the calling context.
461  *
462  * Returns true if the origin is known, false otherwise.
463  */
464 bool
465 replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
466 {
467  HeapTuple tuple;
469 
470  Assert(OidIsValid((Oid) roident));
471  Assert(roident != InvalidRepOriginId);
472  Assert(roident != DoNotReplicateId);
473 
475  ObjectIdGetDatum((Oid) roident));
476 
477  if (HeapTupleIsValid(tuple))
478  {
479  ric = (Form_pg_replication_origin) GETSTRUCT(tuple);
480  *roname = text_to_cstring(&ric->roname);
481  ReleaseSysCache(tuple);
482 
483  return true;
484  }
485  else
486  {
487  *roname = NULL;
488 
489  if (!missing_ok)
490  ereport(ERROR,
491  (errcode(ERRCODE_UNDEFINED_OBJECT),
492  errmsg("replication origin with ID %d does not exist",
493  roident)));
494 
495  return false;
496  }
497 }
498 
499 
500 /* ---------------------------------------------------------------------------
501  * Functions for handling replication progress.
502  * ---------------------------------------------------------------------------
503  */
504 
505 Size
507 {
508  Size size = 0;
509 
510  /*
511  * XXX: max_replication_slots is arguably the wrong thing to use, as here
512  * we keep the replay state of *remote* transactions. But for now it seems
513  * sufficient to reuse it, rather than introduce a separate GUC.
514  */
515  if (max_replication_slots == 0)
516  return size;
517 
518  size = add_size(size, offsetof(ReplicationStateCtl, states));
519 
520  size = add_size(size,
522  return size;
523 }
524 
525 void
527 {
528  bool found;
529 
530  if (max_replication_slots == 0)
531  return;
532 
534  ShmemInitStruct("ReplicationOriginState",
536  &found);
538 
539  if (!found)
540  {
541  int i;
542 
544 
546 
547  for (i = 0; i < max_replication_slots; i++)
548  {
552  }
553  }
554 }
555 
556 /* ---------------------------------------------------------------------------
557  * Perform a checkpoint of each replication origin's progress with respect to
558  * the replayed remote_lsn. Make sure that all transactions we refer to in the
559  * checkpoint (local_lsn) are actually on-disk. This might not yet be the case
560  * if the transactions were originally committed asynchronously.
561  *
562  * We store checkpoints in the following format:
563  * +-------+------------------------+------------------+-----+--------+
564  * | MAGIC | ReplicationStateOnDisk | struct Replic... | ... | CRC32C | EOF
565  * +-------+------------------------+------------------+-----+--------+
566  *
567  * So it's just the magic, followed by the statically sized
568  * ReplicationStateOnDisk structs. Note that the maximum number of
569  * ReplicationState is determined by max_replication_slots.
570  * ---------------------------------------------------------------------------
571  */
572 void
574 {
575  const char *tmppath = "pg_logical/replorigin_checkpoint.tmp";
576  const char *path = "pg_logical/replorigin_checkpoint";
577  int tmpfd;
578  int i;
580  pg_crc32c crc;
581 
582  if (max_replication_slots == 0)
583  return;
584 
585  INIT_CRC32C(crc);
586 
587  /* make sure no old temp file is remaining */
588  if (unlink(tmppath) < 0 && errno != ENOENT)
589  ereport(PANIC,
591  errmsg("could not remove file \"%s\": %m",
592  tmppath)));
593 
594  /*
595  * no other backend can perform this at the same time; only one checkpoint
596  * can happen at a time.
597  */
598  tmpfd = OpenTransientFile(tmppath,
599  O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
600  if (tmpfd < 0)
601  ereport(PANIC,
603  errmsg("could not create file \"%s\": %m",
604  tmppath)));
605 
606  /* write magic */
607  errno = 0;
608  if ((write(tmpfd, &magic, sizeof(magic))) != sizeof(magic))
609  {
610  /* if write didn't set errno, assume problem is no disk space */
611  if (errno == 0)
612  errno = ENOSPC;
613  ereport(PANIC,
615  errmsg("could not write to file \"%s\": %m",
616  tmppath)));
617  }
618  COMP_CRC32C(crc, &magic, sizeof(magic));
619 
620  /* prevent concurrent creations/drops */
621  LWLockAcquire(ReplicationOriginLock, LW_SHARED);
622 
623  /* write actual data */
624  for (i = 0; i < max_replication_slots; i++)
625  {
626  ReplicationStateOnDisk disk_state;
627  ReplicationState *curstate = &replication_states[i];
628  XLogRecPtr local_lsn;
629 
630  if (curstate->roident == InvalidRepOriginId)
631  continue;
632 
633  /* zero, to avoid uninitialized padding bytes */
634  memset(&disk_state, 0, sizeof(disk_state));
635 
636  LWLockAcquire(&curstate->lock, LW_SHARED);
637 
638  disk_state.roident = curstate->roident;
639 
640  disk_state.remote_lsn = curstate->remote_lsn;
641  local_lsn = curstate->local_lsn;
642 
643  LWLockRelease(&curstate->lock);
644 
645  /* make sure we only write out a commit that's persistent */
646  XLogFlush(local_lsn);
647 
648  errno = 0;
649  if ((write(tmpfd, &disk_state, sizeof(disk_state))) !=
650  sizeof(disk_state))
651  {
652  /* if write didn't set errno, assume problem is no disk space */
653  if (errno == 0)
654  errno = ENOSPC;
655  ereport(PANIC,
657  errmsg("could not write to file \"%s\": %m",
658  tmppath)));
659  }
660 
661  COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
662  }
663 
664  LWLockRelease(ReplicationOriginLock);
665 
666  /* write out the CRC */
667  FIN_CRC32C(crc);
668  errno = 0;
669  if ((write(tmpfd, &crc, sizeof(crc))) != sizeof(crc))
670  {
671  /* if write didn't set errno, assume problem is no disk space */
672  if (errno == 0)
673  errno = ENOSPC;
674  ereport(PANIC,
676  errmsg("could not write to file \"%s\": %m",
677  tmppath)));
678  }
679 
680  if (CloseTransientFile(tmpfd) != 0)
681  ereport(PANIC,
683  errmsg("could not close file \"%s\": %m",
684  tmppath)));
685 
686  /* fsync, rename to permanent file, fsync file and directory */
687  durable_rename(tmppath, path, PANIC);
688 }
689 
690 /*
691  * Recover replication replay status from checkpoint data saved earlier by
692  * CheckPointReplicationOrigin.
693  *
694  * This only needs to be called at startup and *not* during every checkpoint
695  * read during recovery (e.g. in HS or PITR from a base backup) afterwards. All
696  * state thereafter can be recovered by looking at commit records.
697  */
698 void
700 {
701  const char *path = "pg_logical/replorigin_checkpoint";
702  int fd;
703  int readBytes;
705  int last_state = 0;
706  pg_crc32c file_crc;
707  pg_crc32c crc;
708 
709  /* don't want to overwrite already existing state */
710 #ifdef USE_ASSERT_CHECKING
711  static bool already_started = false;
712 
713  Assert(!already_started);
714  already_started = true;
715 #endif
716 
717  if (max_replication_slots == 0)
718  return;
719 
720  INIT_CRC32C(crc);
721 
722  elog(DEBUG2, "starting up replication origin progress state");
723 
724  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
725 
726  /*
727  * might have had max_replication_slots == 0 last run, or we just brought
728  * up a standby.
729  */
730  if (fd < 0 && errno == ENOENT)
731  return;
732  else if (fd < 0)
733  ereport(PANIC,
735  errmsg("could not open file \"%s\": %m",
736  path)));
737 
738  /* verify magic, that is written even if nothing was active */
739  readBytes = read(fd, &magic, sizeof(magic));
740  if (readBytes != sizeof(magic))
741  {
742  if (readBytes < 0)
743  ereport(PANIC,
745  errmsg("could not read file \"%s\": %m",
746  path)));
747  else
748  ereport(PANIC,
750  errmsg("could not read file \"%s\": read %d of %zu",
751  path, readBytes, sizeof(magic))));
752  }
753  COMP_CRC32C(crc, &magic, sizeof(magic));
754 
755  if (magic != REPLICATION_STATE_MAGIC)
756  ereport(PANIC,
757  (errmsg("replication checkpoint has wrong magic %u instead of %u",
758  magic, REPLICATION_STATE_MAGIC)));
759 
760  /* we can skip locking here, no other access is possible */
761 
762  /* recover individual states, until there are no more to be found */
763  while (true)
764  {
765  ReplicationStateOnDisk disk_state;
766 
767  readBytes = read(fd, &disk_state, sizeof(disk_state));
768 
769  /* no further data */
770  if (readBytes == sizeof(crc))
771  {
772  /* not pretty, but simple ... */
773  file_crc = *(pg_crc32c *) &disk_state;
774  break;
775  }
776 
777  if (readBytes < 0)
778  {
779  ereport(PANIC,
781  errmsg("could not read file \"%s\": %m",
782  path)));
783  }
784 
785  if (readBytes != sizeof(disk_state))
786  {
787  ereport(PANIC,
789  errmsg("could not read file \"%s\": read %d of %zu",
790  path, readBytes, sizeof(disk_state))));
791  }
792 
793  COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
794 
795  if (last_state == max_replication_slots)
796  ereport(PANIC,
797  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
798  errmsg("could not find free replication state, increase max_replication_slots")));
799 
800  /* copy data to shared memory */
801  replication_states[last_state].roident = disk_state.roident;
802  replication_states[last_state].remote_lsn = disk_state.remote_lsn;
803  last_state++;
804 
805  ereport(LOG,
806  (errmsg("recovered replication state of node %d to %X/%X",
807  disk_state.roident,
808  LSN_FORMAT_ARGS(disk_state.remote_lsn))));
809  }
810 
811  /* now check checksum */
812  FIN_CRC32C(crc);
813  if (file_crc != crc)
814  ereport(PANIC,
816  errmsg("replication slot checkpoint has wrong checksum %u, expected %u",
817  crc, file_crc)));
818 
819  if (CloseTransientFile(fd) != 0)
820  ereport(PANIC,
822  errmsg("could not close file \"%s\": %m",
823  path)));
824 }
825 
826 void
828 {
829  uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
830 
831  switch (info)
832  {
833  case XLOG_REPLORIGIN_SET:
834  {
835  xl_replorigin_set *xlrec =
836  (xl_replorigin_set *) XLogRecGetData(record);
837 
839  xlrec->remote_lsn, record->EndRecPtr,
840  xlrec->force /* backward */ ,
841  false /* WAL log */ );
842  break;
843  }
845  {
846  xl_replorigin_drop *xlrec;
847  int i;
848 
849  xlrec = (xl_replorigin_drop *) XLogRecGetData(record);
850 
851  for (i = 0; i < max_replication_slots; i++)
852  {
854 
855  /* found our slot */
856  if (state->roident == xlrec->node_id)
857  {
858  /* reset entry */
859  state->roident = InvalidRepOriginId;
860  state->remote_lsn = InvalidXLogRecPtr;
861  state->local_lsn = InvalidXLogRecPtr;
862  break;
863  }
864  }
865  break;
866  }
867  default:
868  elog(PANIC, "replorigin_redo: unknown op code %u", info);
869  }
870 }
871 
872 
873 /*
874  * Tell the replication origin progress machinery that a commit from 'node'
875  * that originated at the LSN remote_commit on the remote node was replayed
876  * successfully and that we don't need to do so again. In combination with
877  * setting up replorigin_session_origin_lsn and replorigin_session_origin
878  * that ensures we won't lose knowledge about that after a crash if the
879  * transaction had a persistent effect (think of asynchronous commits).
880  *
881  * local_commit needs to be a local LSN of the commit so that we can make sure
882  * upon a checkpoint that enough WAL has been persisted to disk.
883  *
884  * Needs to be called with a RowExclusiveLock on pg_replication_origin,
885  * unless running in recovery.
886  */
887 void
889  XLogRecPtr remote_commit, XLogRecPtr local_commit,
890  bool go_backward, bool wal_log)
891 {
892  int i;
893  ReplicationState *replication_state = NULL;
894  ReplicationState *free_state = NULL;
895 
896  Assert(node != InvalidRepOriginId);
897 
898  /* we don't track DoNotReplicateId */
899  if (node == DoNotReplicateId)
900  return;
901 
902  /*
903  * XXX: For the case where this is called by WAL replay, it'd be more
904  * efficient to restore into a backend local hashtable and only dump into
905  * shmem after recovery is finished. Let's wait with implementing that
906  * till it's shown to be a measurable expense
907  */
908 
909  /* Lock exclusively, as we may have to create a new table entry. */
910  LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
911 
912  /*
913  * Search for either an existing slot for the origin, or a free one we can
914  * use.
915  */
916  for (i = 0; i < max_replication_slots; i++)
917  {
918  ReplicationState *curstate = &replication_states[i];
919 
920  /* remember where to insert if necessary */
921  if (curstate->roident == InvalidRepOriginId &&
922  free_state == NULL)
923  {
924  free_state = curstate;
925  continue;
926  }
927 
928  /* not our slot */
929  if (curstate->roident != node)
930  {
931  continue;
932  }
933 
934  /* ok, found slot */
935  replication_state = curstate;
936 
937  LWLockAcquire(&replication_state->lock, LW_EXCLUSIVE);
938 
939  /* Make sure it's not used by somebody else */
940  if (replication_state->acquired_by != 0)
941  {
942  ereport(ERROR,
943  (errcode(ERRCODE_OBJECT_IN_USE),
944  errmsg("replication origin with ID %d is already active for PID %d",
945  replication_state->roident,
946  replication_state->acquired_by)));
947  }
948 
949  break;
950  }
951 
952  if (replication_state == NULL && free_state == NULL)
953  ereport(ERROR,
954  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
955  errmsg("could not find free replication state slot for replication origin with ID %d",
956  node),
957  errhint("Increase max_replication_slots and try again.")));
958 
959  if (replication_state == NULL)
960  {
961  /* initialize new slot */
962  LWLockAcquire(&free_state->lock, LW_EXCLUSIVE);
963  replication_state = free_state;
964  Assert(replication_state->remote_lsn == InvalidXLogRecPtr);
965  Assert(replication_state->local_lsn == InvalidXLogRecPtr);
966  replication_state->roident = node;
967  }
968 
969  Assert(replication_state->roident != InvalidRepOriginId);
970 
971  /*
972  * If somebody "forcefully" sets this slot, WAL log it, so it's durable
973  * and the standby gets the message. Primarily this will be called during
974  * WAL replay (of commit records) where no WAL logging is necessary.
975  */
976  if (wal_log)
977  {
978  xl_replorigin_set xlrec;
979 
980  xlrec.remote_lsn = remote_commit;
981  xlrec.node_id = node;
982  xlrec.force = go_backward;
983 
984  XLogBeginInsert();
985  XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
986 
987  XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_SET);
988  }
989 
990  /*
991  * Due to - harmless - race conditions during a checkpoint we could see
992  * values here that are older than the ones we already have in memory. We
993  * could also see older values for prepared transactions when the prepare
994  * is sent at a later point of time along with commit prepared and there
995  * are other transactions commits between prepare and commit prepared. See
996  * ReorderBufferFinishPrepared. Don't overwrite those.
997  */
998  if (go_backward || replication_state->remote_lsn < remote_commit)
999  replication_state->remote_lsn = remote_commit;
1000  if (local_commit != InvalidXLogRecPtr &&
1001  (go_backward || replication_state->local_lsn < local_commit))
1002  replication_state->local_lsn = local_commit;
1003  LWLockRelease(&replication_state->lock);
1004 
1005  /*
1006  * Release *after* changing the LSNs, slot isn't acquired and thus could
1007  * otherwise be dropped anytime.
1008  */
1009  LWLockRelease(ReplicationOriginLock);
1010 }
1011 
1012 
1013 XLogRecPtr
1015 {
1016  int i;
1017  XLogRecPtr local_lsn = InvalidXLogRecPtr;
1018  XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1019 
1020  /* prevent slots from being concurrently dropped */
1021  LWLockAcquire(ReplicationOriginLock, LW_SHARED);
1022 
1023  for (i = 0; i < max_replication_slots; i++)
1024  {
1026 
1028 
1029  if (state->roident == node)
1030  {
1031  LWLockAcquire(&state->lock, LW_SHARED);
1032 
1033  remote_lsn = state->remote_lsn;
1034  local_lsn = state->local_lsn;
1035 
1036  LWLockRelease(&state->lock);
1037 
1038  break;
1039  }
1040  }
1041 
1042  LWLockRelease(ReplicationOriginLock);
1043 
1044  if (flush && local_lsn != InvalidXLogRecPtr)
1045  XLogFlush(local_lsn);
1046 
1047  return remote_lsn;
1048 }
1049 
1050 /*
1051  * Tear down a (possibly) configured session replication origin during process
1052  * exit.
1053  */
1054 static void
1056 {
1057  ConditionVariable *cv = NULL;
1058 
1059  LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1060 
1061  if (session_replication_state != NULL &&
1063  {
1065 
1068  }
1069 
1070  LWLockRelease(ReplicationOriginLock);
1071 
1072  if (cv)
1074 }
1075 
1076 /*
1077  * Set up a replication origin in the shared memory struct if it doesn't
1078  * already exist and cache access to the specific ReplicationState so the
1079  * array doesn't have to be searched when calling
1080  * replorigin_session_advance().
1081  *
1082  * Normally only one such cached origin can exist per process so the cached
1083  * value can only be set again after the previous value is torn down with
1084  * replorigin_session_reset(). For this normal case pass acquired_by = 0
1085  * (meaning the slot is not allowed to be already acquired by another process).
1086  *
1087  * However, sometimes multiple processes can safely re-use the same origin slot
1088  * (for example, multiple parallel apply processes can safely use the same
1089  * origin, provided they maintain commit order by allowing only one process to
1090  * commit at a time). For this case the first process must pass acquired_by =
1091  * 0, and then the other processes sharing that same origin can pass
1092  * acquired_by = PID of the first process.
1093  */
1094 void
1096 {
      /*
       * NOTE(review): this listing omits original lines 1095 (the declarator),
       * 1103, 1107, 1159-1162, 1166, 1169 and 1177. The cross-reference index
       * at the end of the listing gives the declarator as
       * replorigin_session_setup(RepOriginId node, int acquired_by).
       */
1097  static bool registered_cleanup;
1098  int i;
1099  int free_slot = -1;
1100 
      /* one-time work guarded by the static flag (the call on line 1103 is missing) */
1101  if (!registered_cleanup)
1102  {
1104  registered_cleanup = true;
1105  }
1106 
1108 
      /* a session can have at most one origin set up at a time */
1109  if (session_replication_state != NULL)
1110  ereport(ERROR,
1111  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1112  errmsg("cannot setup replication origin when one is already setup")));
1113 
1114  /* Lock exclusively, as we may have to create a new table entry. */
1115  LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1116 
1117  /*
1118  * Search for either an existing slot for the origin, or a free one we can
1119  * use.
1120  */
1121  for (i = 0; i < max_replication_slots; i++)
1122  {
1123  ReplicationState *curstate = &replication_states[i];
1124 
1125  /* remember where to insert if necessary; only the first unused entry is recorded */
1126  if (curstate->roident == InvalidRepOriginId &&
1127  free_slot == -1)
1128  {
1129  free_slot = i;
1130  continue;
1131  }
1132 
1133  /* not our slot */
1134  if (curstate->roident != node)
1135  continue;
1136 
      /*
       * The entry for this origin is already held by some process, and the
       * caller passed acquired_by = 0, i.e. did not ask to share it.
       */
1137  else if (curstate->acquired_by != 0 && acquired_by == 0)
1138  {
1139  ereport(ERROR,
1140  (errcode(ERRCODE_OBJECT_IN_USE),
1141  errmsg("replication origin with ID %d is already active for PID %d",
1142  curstate->roident, curstate->acquired_by)));
1143  }
1144 
1145  /* ok, found slot; there is no break, so the scan runs to the end of the table */
1146  session_replication_state = curstate;
1147  }
1148 
1149 
      /* neither an existing entry for the origin nor an unused entry was found */
1150  if (session_replication_state == NULL && free_slot == -1)
1151  ereport(ERROR,
1152  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
1153  errmsg("could not find free replication state slot for replication origin with ID %d",
1154  node),
1155  errhint("Increase max_replication_slots and try again.")));
1156  else if (session_replication_state == NULL)
1157  {
1158  /* initialize new slot (the statements on lines 1159-1162 are missing here) */
1163  }
1164 
1165 
1167 
      /*
       * With acquired_by = 0 the statement on the missing line 1169 runs;
       * otherwise the slot must already be held by the PID the caller named.
       */
1168  if (acquired_by == 0)
1170  else if (session_replication_state->acquired_by != acquired_by)
1171  elog(ERROR, "could not find replication state slot for replication origin with OID %u which was acquired by %d",
1172  node, acquired_by);
1173 
1174  LWLockRelease(ReplicationOriginLock);
1175 
1176  /* probably this one is pointless */
1178 }
1179 
1180 /*
1181  * Reset replay state previously setup in this session.
1182  *
1183  * This function may only be called if an origin was setup with
1184  * replorigin_session_setup().
      *
      * Raises ERROR when session_replication_state is NULL. The remaining work
      * is done while holding ReplicationOriginLock in exclusive mode.
      *
      * NOTE(review): this listing omits original lines 1187 (the declarator),
      * 1191, 1200-1202 and 1206, so the statements that actually clear the
      * state and use 'cv' are not visible here. The cross-reference index at
      * the end of the listing gives the declarator as
      * replorigin_session_reset(void).
1185  */
1186 void
1188 {
1189  ConditionVariable *cv;
1190 
1192 
1193  if (session_replication_state == NULL)
1194  ereport(ERROR,
1195  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1196  errmsg("no replication origin is configured")));
1197 
1198  LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1199 
1203 
1204  LWLockRelease(ReplicationOriginLock);
1205 
1207 }
1208 
1209 /*
1210  * Do the same work replorigin_advance() does, just on the session's
1211  * configured origin.
1212  *
1213  * This is noticeably cheaper than using replorigin_advance().
      *
      * Both LSNs only ever move forward here: a new value is stored only when
      * it is greater than the one currently recorded.
      *
      * NOTE(review): this listing omits original lines 1216 (the declarator),
      * 1218-1219, 1221 and 1226; presumably those include sanity checks and
      * locking around the updates -- confirm against the real file. The
      * cross-reference index at the end of the listing gives the declarator as
      * replorigin_session_advance(XLogRecPtr remote_commit,
      * XLogRecPtr local_commit).
1214  */
1215 void
1217 {
1220 
1222  if (session_replication_state->local_lsn < local_commit)
1223  session_replication_state->local_lsn = local_commit;
1224  if (session_replication_state->remote_lsn < remote_commit)
1225  session_replication_state->remote_lsn = remote_commit;
1227 }
1228 
1229 /*
1230  * Ask the machinery about the point up to which we successfully replayed
1231  * changes from an already setup replication origin.
      *
      * Returns the session origin's remote_lsn. When 'flush' is true and the
      * recorded local_lsn is valid, WAL is flushed up to local_lsn before
      * returning.
      *
      * NOTE(review): this listing omits original lines 1234 (the declarator),
      * 1239, 1241 and 1244; presumably those include locking around the two
      * reads -- confirm against the real file. The cross-reference index at
      * the end of the listing gives the declarator as
      * replorigin_session_get_progress(bool flush).
1232  */
1233 XLogRecPtr
1235 {
1236  XLogRecPtr remote_lsn;
1237  XLogRecPtr local_lsn;
1238 
1240 
      /* copy both values into locals before acting on them */
1242  remote_lsn = session_replication_state->remote_lsn;
1243  local_lsn = session_replication_state->local_lsn;
1245 
1246  if (flush && local_lsn != InvalidXLogRecPtr)
1247  XLogFlush(local_lsn);
1248 
1249  return remote_lsn;
1250 }
1251 
1252 
1253 
1254 /* ---------------------------------------------------------------------------
1255  * SQL functions for working with replication origin.
1256  *
1257  * These mostly should be fairly short wrappers around more generic functions.
1258  * ---------------------------------------------------------------------------
1259  */
1260 
1261 /*
1262  * Create replication origin for the passed in name, and return the assigned
1263  * oid.
      *
      * NOTE(review): this listing omits original lines 1266 (the declarator),
      * 1273 (where 'name' is obtained), 1279 (the condition guarding the
      * reserved-name error) and 1285 (the errdetail arguments). The
      * cross-reference index at the end of the listing gives the declarator as
      * pg_replication_origin_create(PG_FUNCTION_ARGS).
1264  */
1265 Datum
1267 {
1268  char *name;
1269  RepOriginId roident;
1270 
      /* arguments are (check_slots, recoveryOK) */
1271  replorigin_check_prerequisites(false, false);
1272 
1274 
1275  /*
1276  * Replication origins "any" and "none" are reserved for system options.
1277  * The origins "pg_xxx" are reserved for internal use.
1278  */
1280  ereport(ERROR,
1281  (errcode(ERRCODE_RESERVED_NAME),
1282  errmsg("replication origin name \"%s\" is reserved",
1283  name),
1284  errdetail("Origin names \"%s\", \"%s\", and names starting with \"pg_\" are reserved.",
1286 
1287  /*
1288  * If built with appropriate switch, whine when regression-testing
1289  * conventions for replication origin names are violated.
1290  */
1291 #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
1292  if (strncmp(name, "regress_", 8) != 0)
1293  elog(WARNING, "replication origins created by regression test cases should have names starting with \"regress_\"");
1294 #endif
1295 
1296  roident = replorigin_create(name);
1297 
1298  pfree(name);
1299 
1300  PG_RETURN_OID(roident);
1301 }
1302 
1303 /*
1304  * Drop replication origin.
      *
      * NOTE(review): this listing omits original lines 1307 (the declarator)
      * and 1313 (where 'name' is obtained). The cross-reference index at the
      * end of the listing gives the declarator as
      * pg_replication_origin_drop(PG_FUNCTION_ARGS).
1305  */
1306 Datum
1308 {
1309  char *name;
1310 
      /* arguments are (check_slots, recoveryOK) */
1311  replorigin_check_prerequisites(false, false);
1312 
1314 
      /* arguments are (name, missing_ok, nowait) */
1315  replorigin_drop_by_name(name, false, true);
1316 
1317  pfree(name);
1318 
1319  PG_RETURN_VOID();
1320 }
1321 
1322 /*
1323  * Return oid of a replication origin.
      *
      * The lookup is done with missing_ok = true; when the looked-up id is not
      * valid, SQL NULL is returned instead.
      *
      * NOTE(review): this listing omits original lines 1326 (the declarator)
      * and 1333 (where 'name' is obtained). The cross-reference index at the
      * end of the listing gives the declarator as
      * pg_replication_origin_oid(PG_FUNCTION_ARGS).
1324  */
1325 Datum
1327 {
1328  char *name;
1329  RepOriginId roident;
1330 
      /* arguments are (check_slots, recoveryOK) */
1331  replorigin_check_prerequisites(false, false);
1332 
1334  roident = replorigin_by_name(name, true);
1335 
1336  pfree(name);
1337 
1338  if (OidIsValid(roident))
1339  PG_RETURN_OID(roident);
1340  PG_RETURN_NULL();
1341 }
1342 
1343 /*
1344  * Setup a replication origin for this session.
      *
      * Looks the origin up by name (missing_ok = false), sets it up for the
      * session with acquired_by = 0, and records its id in
      * replorigin_session_origin.
      *
      * NOTE(review): this listing omits original lines 1347 (the declarator)
      * and 1354 (where 'name' is obtained). The cross-reference index at the
      * end of the listing gives the declarator as
      * pg_replication_origin_session_setup(PG_FUNCTION_ARGS).
1345  */
1346 Datum
1348 {
1349  char *name;
1350  RepOriginId origin;
1351 
      /* arguments are (check_slots, recoveryOK) */
1352  replorigin_check_prerequisites(true, false);
1353 
1355  origin = replorigin_by_name(name, false);
1356  replorigin_session_setup(origin, 0);
1357 
1358  replorigin_session_origin = origin;
1359 
1360  pfree(name);
1361 
1362  PG_RETURN_VOID();
1363 }
1364 
1365 /*
1366  * Reset previously setup origin in this session
      *
      * NOTE(review): this listing omits original lines 1369 (the declarator),
      * 1373 and 1375-1377, i.e. every statement that performs the reset; only
      * the prerequisite check and the return are visible. The cross-reference
      * index at the end of the listing gives the declarator as
      * pg_replication_origin_session_reset(PG_FUNCTION_ARGS).
1367  */
1368 Datum
1370 {
      /* arguments are (check_slots, recoveryOK) */
1371  replorigin_check_prerequisites(true, false);
1372 
1374 
1378 
1379  PG_RETURN_VOID();
1380 }
1381 
1382 /*
1383  * Has a replication origin been setup for this session.
      *
      * NOTE(review): this listing omits original lines 1386 (the declarator)
      * and 1390 (the statement that produces the result). The cross-reference
      * index at the end of the listing gives the declarator as
      * pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS).
1384  */
1385 Datum
1387 {
      /* arguments are (check_slots, recoveryOK) */
1388  replorigin_check_prerequisites(false, false);
1389 
1391 }
1392 
1393 
1394 /*
1395  * Return the replication progress for origin setup in the current session.
1396  *
1397  * If 'flush' is set to true it is ensured that the returned value corresponds
1398  * to a local transaction that has been flushed. This is useful if asynchronous
1399  * commits are used when replaying replicated transactions.
      *
      * Raises ERROR when no origin is set up in this session, and returns SQL
      * NULL when the recorded remote LSN is InvalidXLogRecPtr.
      *
      * NOTE(review): this listing omits original line 1402 (the declarator).
      * The cross-reference index at the end of the listing gives it as
      * pg_replication_origin_session_progress(PG_FUNCTION_ARGS).
1400  */
1401 Datum
1403 {
1404  XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1405  bool flush = PG_GETARG_BOOL(0);
1406 
      /* arguments are (check_slots, recoveryOK) */
1407  replorigin_check_prerequisites(true, false);
1408 
1409  if (session_replication_state == NULL)
1410  ereport(ERROR,
1411  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1412  errmsg("no replication origin is configured")));
1413 
1414  remote_lsn = replorigin_session_get_progress(flush);
1415 
1416  if (remote_lsn == InvalidXLogRecPtr)
1417  PG_RETURN_NULL();
1418 
1419  PG_RETURN_LSN(remote_lsn);
1420 }
1421 
/*
 * Record the origin LSN passed as argument 0 in replorigin_session_origin_lsn.
 * Raises ERROR when no origin is set up in this session.
 *
 * NOTE(review): this listing omits original lines 1423 (the declarator) and
 * 1435; presumably line 1435 stores a second argument (an origin timestamp)
 * -- confirm against the real file. The cross-reference index at the end of
 * the listing gives the declarator as
 * pg_replication_origin_xact_setup(PG_FUNCTION_ARGS).
 */
1422 Datum
1424 {
1425  XLogRecPtr location = PG_GETARG_LSN(0);
1426 
      /* arguments are (check_slots, recoveryOK) */
1427  replorigin_check_prerequisites(true, false);
1428 
1429  if (session_replication_state == NULL)
1430  ereport(ERROR,
1431  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1432  errmsg("no replication origin is configured")));
1433 
1434  replorigin_session_origin_lsn = location;
1436 
1437  PG_RETURN_VOID();
1438 }
1439 
/*
 * NOTE(review): this listing omits original lines 1441 (the declarator) and
 * 1445-1446, i.e. the statements that perform the reset; only the
 * prerequisite check and the return are visible. The cross-reference index
 * at the end of the listing gives the declarator as
 * pg_replication_origin_xact_reset(PG_FUNCTION_ARGS).
 */
1440 Datum
1442 {
      /* arguments are (check_slots, recoveryOK) */
1443  replorigin_check_prerequisites(true, false);
1444 
1447 
1448  PG_RETURN_VOID();
1449 }
1450 
1451 
/*
 * Set the remote LSN of the origin named by argument 0 to the LSN given as
 * argument 1. The origin is looked up with missing_ok = false, and
 * replorigin_advance() is called with go_backward and wal_log both true and
 * no local commit LSN.
 *
 * NOTE(review): this listing omits original line 1453 (the declarator). The
 * cross-reference index at the end of the listing gives it as
 * pg_replication_origin_advance(PG_FUNCTION_ARGS).
 */
1452 Datum
1454 {
1455  text *name = PG_GETARG_TEXT_PP(0);
1456  XLogRecPtr remote_commit = PG_GETARG_LSN(1);
1457  RepOriginId node;
1458 
      /* arguments are (check_slots, recoveryOK) */
1459  replorigin_check_prerequisites(true, false);
1460 
1461  /* lock to prevent the replication origin from vanishing */
1462  LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1463 
1464  node = replorigin_by_name(text_to_cstring(name), false);
1465 
1466  /*
1467  * Can't sensibly pass a local commit to be flushed at checkpoint - this
1468  * xact hasn't committed yet. This is why this function should be used to
1469  * set up the initial replication state, but not for replay.
1470  */
1471  replorigin_advance(node, remote_commit, InvalidXLogRecPtr,
1472  true /* go backward */ , true /* WAL log */ );
1473 
1474  UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1475 
1476  PG_RETURN_VOID();
1477 }
1478 
1479 
1480 /*
1481  * Return the replication progress for an individual replication origin.
1482  *
1483  * If 'flush' is set to true it is ensured that the returned value corresponds
1484  * to a local transaction that has been flushed. This is useful if asynchronous
1485  * commits are used when replaying replicated transactions.
      *
      * Returns SQL NULL when the recorded remote LSN is InvalidXLogRecPtr.
      *
      * NOTE(review): this listing omits original lines 1488 (the declarator)
      * and 1497 (where 'name' is obtained). The cross-reference index at the
      * end of the listing gives the declarator as
      * pg_replication_origin_progress(PG_FUNCTION_ARGS).
1486  */
1487 Datum
1489 {
1490  char *name;
1491  bool flush;
1492  RepOriginId roident;
1493  XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1494 
      /* arguments are (check_slots, recoveryOK) */
1495  replorigin_check_prerequisites(true, true);
1496 
1498  flush = PG_GETARG_BOOL(1);
1499 
      /* looked up with missing_ok = false */
1500  roident = replorigin_by_name(name, false);
1501  Assert(OidIsValid(roident));
1502 
1503  remote_lsn = replorigin_get_progress(roident, flush);
1504 
1505  if (remote_lsn == InvalidXLogRecPtr)
1506  PG_RETURN_NULL();
1507 
1508  PG_RETURN_LSN(remote_lsn);
1509 }
1510 
1511 
/*
 * Set-returning function: emits one row per in-use entry of
 * replication_states, with the origin id, its name (NULL if the name lookup
 * fails), the remote LSN and the local LSN. Rows are appended to
 * rsinfo->setResult; the function's own return value is a dummy Datum.
 *
 * NOTE(review): this listing omits original lines 1513 (the declarator),
 * 1517, 1534-1536 and 1539, so the declarations of 'state', 'values' and
 * 'nulls' (and presumably the #define matching the #undef at the end) are
 * not visible. The cross-reference index at the end of the listing gives the
 * declarator as pg_show_replication_origin_status(PG_FUNCTION_ARGS).
 */
1512 Datum
1514 {
1515  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1516  int i;
1518 
1519  /* we want to return 0 rows if slot is set to zero */
1520  replorigin_check_prerequisites(false, true);
1521 
1522  InitMaterializedSRF(fcinfo, 0);
1523 
1524  /* prevent slots from being concurrently dropped */
1525  LWLockAcquire(ReplicationOriginLock, LW_SHARED);
1526 
1527  /*
1528  * Iterate through all possible replication_states, display if they are
1529  * filled. ReplicationOriginLock is held in shared mode across the loop,
1530  * and each entry's own lock is taken while its LSNs are copied out.
1531  */
1532  for (i = 0; i < max_replication_slots; i++)
1533  {
1537  char *roname;
1538 
1540 
1541  /* unused slot, nothing to display */
1542  if (state->roident == InvalidRepOriginId)
1543  continue;
1544 
      /* start with every column NULL; columns are un-nulled as they are filled */
1545  memset(values, 0, sizeof(values));
1546  memset(nulls, 1, sizeof(nulls));
1547 
1548  values[0] = ObjectIdGetDatum(state->roident);
1549  nulls[0] = false;
1550 
1551  /*
1552  * We're not preventing the origin to be dropped concurrently, so
1553  * silently accept that it might be gone.
1554  */
1555  if (replorigin_by_oid(state->roident, true,
1556  &roname))
1557  {
1558  values[1] = CStringGetTextDatum(roname);
1559  nulls[1] = false;
1560  }
1561 
      /* read both LSNs under the entry's lock */
1562  LWLockAcquire(&state->lock, LW_SHARED);
1563 
1564  values[2] = LSNGetDatum(state->remote_lsn);
1565  nulls[2] = false;
1566 
1567  values[3] = LSNGetDatum(state->local_lsn);
1568  nulls[3] = false;
1569 
1570  LWLockRelease(&state->lock);
1571 
1572  tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
1573  values, nulls);
1574  }
1575 
1576  LWLockRelease(ReplicationOriginLock);
1577 
1578 #undef REPLICATION_ORIGIN_PROGRESS_COLS
1579 
1580  return (Datum) 0;
1581 }
static Datum values[MAXATTR]
Definition: bootstrap.c:156
#define CStringGetTextDatum(s)
Definition: builtins.h:94
unsigned int uint32
Definition: c.h:490
#define PG_BINARY
Definition: c.h:1260
#define FLEXIBLE_ARRAY_MEMBER
Definition: c.h:382
#define PG_UINT16_MAX
Definition: c.h:571
unsigned char uint8
Definition: c.h:488
#define MemSet(start, val, len)
Definition: c.h:1004
#define OidIsValid(objectId)
Definition: c.h:759
size_t Size
Definition: c.h:589
bool IsReservedName(const char *name)
Definition: catalog.c:219
void ConditionVariableBroadcast(ConditionVariable *cv)
void ConditionVariableInit(ConditionVariable *cv)
void ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)
void ConditionVariableCancelSleep(void)
int64 TimestampTz
Definition: timestamp.h:39
int errcode_for_file_access(void)
Definition: elog.c:881
int errdetail(const char *fmt,...)
Definition: elog.c:1202
int errhint(const char *fmt,...)
Definition: elog.c:1316
int errcode(int sqlerrcode)
Definition: elog.c:858
int errmsg(const char *fmt,...)
Definition: elog.c:1069
#define LOG
Definition: elog.h:31
#define WARNING
Definition: elog.h:36
#define DEBUG2
Definition: elog.h:29
#define PANIC
Definition: elog.h:42
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:149
const char * name
Definition: encode.c:571
int durable_rename(const char *oldfile, const char *newfile, int elevel)
Definition: fd.c:688
int CloseTransientFile(int fd)
Definition: fd.c:2609
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2433
#define PG_RETURN_VOID()
Definition: fmgr.h:349
#define PG_GETARG_TEXT_PP(n)
Definition: fmgr.h:309
#define PG_GETARG_DATUM(n)
Definition: fmgr.h:268
#define PG_RETURN_NULL()
Definition: fmgr.h:345
#define PG_GETARG_BOOL(n)
Definition: fmgr.h:274
#define PG_RETURN_OID(x)
Definition: fmgr.h:360
#define PG_FUNCTION_ARGS
Definition: fmgr.h:193
#define PG_RETURN_BOOL(x)
Definition: fmgr.h:359
void InitMaterializedSRF(FunctionCallInfo fcinfo, bits32 flags)
Definition: funcapi.c:76
void systable_endscan(SysScanDesc sysscan)
Definition: genam.c:599
HeapTuple systable_getnext(SysScanDesc sysscan)
Definition: genam.c:506
SysScanDesc systable_beginscan(Relation heapRelation, Oid indexId, bool indexOK, Snapshot snapshot, int nkeys, ScanKey key)
Definition: genam.c:387
int MyProcPid
Definition: globals.c:44
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:1020
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
#define GETSTRUCT(TUP)
Definition: htup_details.h:653
void CatalogTupleInsert(Relation heapRel, HeapTuple tup)
Definition: indexing.c:221
void CatalogTupleDelete(Relation heapRel, ItemPointer tid)
Definition: indexing.c:350
#define write(a, b, c)
Definition: win32.h:14
#define read(a, b, c)
Definition: win32.h:13
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:361
int i
Definition: isn.c:73
Assert(fmt[strlen(fmt) - 1] !='\n')
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:1046
void UnlockRelationOid(Oid relid, LOCKMODE lockmode)
Definition: lmgr.c:228
void LockRelationOid(Oid relid, LOCKMODE lockmode)
Definition: lmgr.c:109
void UnlockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:1067
#define NoLock
Definition: lockdefs.h:34
#define AccessExclusiveLock
Definition: lockdefs.h:43
#define ExclusiveLock
Definition: lockdefs.h:42
#define RowExclusiveLock
Definition: lockdefs.h:38
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1195
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1803
void LWLockInitialize(LWLock *lock, int tranche_id)
Definition: lwlock.c:730
@ LWTRANCHE_REPLICATION_ORIGIN_STATE
Definition: lwlock.h:189
@ LW_SHARED
Definition: lwlock.h:116
@ LW_EXCLUSIVE
Definition: lwlock.h:115
void pfree(void *pointer)
Definition: mcxt.c:1436
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:121
TimestampTz replorigin_session_origin_timestamp
Definition: origin.c:158
static ReplicationStateCtl * replication_states_ctl
Definition: origin.c:172
RepOriginId replorigin_by_name(const char *roname, bool missing_ok)
Definition: origin.c:221
Size ReplicationOriginShmemSize(void)
Definition: origin.c:506
RepOriginId replorigin_create(const char *roname)
Definition: origin.c:252
Datum pg_replication_origin_progress(PG_FUNCTION_ARGS)
Definition: origin.c:1488
void replorigin_session_reset(void)
Definition: origin.c:1187
struct ReplicationState ReplicationState
static bool IsReservedOriginName(const char *name)
Definition: origin.c:204
void replorigin_session_advance(XLogRecPtr remote_commit, XLogRecPtr local_commit)
Definition: origin.c:1216
bool replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
Definition: origin.c:465
Datum pg_replication_origin_advance(PG_FUNCTION_ARGS)
Definition: origin.c:1453
XLogRecPtr replorigin_get_progress(RepOriginId node, bool flush)
Definition: origin.c:1014
Datum pg_replication_origin_session_progress(PG_FUNCTION_ARGS)
Definition: origin.c:1402
static ReplicationState * replication_states
Definition: origin.c:167
Datum pg_replication_origin_session_reset(PG_FUNCTION_ARGS)
Definition: origin.c:1369
Datum pg_replication_origin_xact_setup(PG_FUNCTION_ARGS)
Definition: origin.c:1423
Datum pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS)
Definition: origin.c:1386
static void replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
Definition: origin.c:185
Datum pg_replication_origin_oid(PG_FUNCTION_ARGS)
Definition: origin.c:1326
Datum pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
Definition: origin.c:1347
static void ReplicationOriginExitCleanup(int code, Datum arg)
Definition: origin.c:1055
void StartupReplicationOrigin(void)
Definition: origin.c:699
void replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
Definition: origin.c:411
RepOriginId replorigin_session_origin
Definition: origin.c:156
void replorigin_advance(RepOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)
Definition: origin.c:888
static void replorigin_state_clear(RepOriginId roident, bool nowait)
Definition: origin.c:341
void replorigin_session_setup(RepOriginId node, int acquired_by)
Definition: origin.c:1095
void CheckPointReplicationOrigin(void)
Definition: origin.c:573
static ReplicationState * session_replication_state
Definition: origin.c:179
Datum pg_replication_origin_drop(PG_FUNCTION_ARGS)
Definition: origin.c:1307
#define REPLICATION_ORIGIN_PROGRESS_COLS
XLogRecPtr replorigin_session_get_progress(bool flush)
Definition: origin.c:1234
void ReplicationOriginShmemInit(void)
Definition: origin.c:526
Datum pg_show_replication_origin_status(PG_FUNCTION_ARGS)
Definition: origin.c:1513
#define REPLICATION_STATE_MAGIC
Definition: origin.c:182
XLogRecPtr replorigin_session_origin_lsn
Definition: origin.c:157
Datum pg_replication_origin_create(PG_FUNCTION_ARGS)
Definition: origin.c:1266
Datum pg_replication_origin_xact_reset(PG_FUNCTION_ARGS)
Definition: origin.c:1441
void replorigin_redo(XLogReaderState *record)
Definition: origin.c:827
struct ReplicationStateCtl ReplicationStateCtl
struct ReplicationStateOnDisk ReplicationStateOnDisk
#define DoNotReplicateId
Definition: origin.h:34
#define InvalidRepOriginId
Definition: origin.h:33
#define XLOG_REPLORIGIN_DROP
Definition: origin.h:31
#define XLOG_REPLORIGIN_SET
Definition: origin.h:30
void * arg
#define ERRCODE_DATA_CORRUPTED
Definition: pg_basebackup.c:41
uint32 pg_crc32c
Definition: pg_crc32c.h:38
#define COMP_CRC32C(crc, data, len)
Definition: pg_crc32c.h:89
#define INIT_CRC32C(crc)
Definition: pg_crc32c.h:41
#define FIN_CRC32C(crc)
Definition: pg_crc32c.h:94
return crc
#define PG_GETARG_LSN(n)
Definition: pg_lsn.h:33
static Datum LSNGetDatum(XLogRecPtr X)
Definition: pg_lsn.h:28
#define PG_RETURN_LSN(x)
Definition: pg_lsn.h:34
FormData_pg_replication_origin * Form_pg_replication_origin
#define LOGICALREP_ORIGIN_NONE
#define LOGICALREP_ORIGIN_ANY
int pg_strcasecmp(const char *s1, const char *s2)
Definition: pgstrcasecmp.c:36
uintptr_t Datum
Definition: postgres.h:64
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:252
static Pointer DatumGetPointer(Datum X)
Definition: postgres.h:312
#define InvalidOid
Definition: postgres_ext.h:36
unsigned int Oid
Definition: postgres_ext.h:31
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define RelationGetDescr(relation)
Definition: rel.h:527
void ScanKeyInit(ScanKey entry, AttrNumber attributeNumber, StrategyNumber strategy, RegProcedure procedure, Datum argument)
Definition: scankey.c:76
Size add_size(Size s1, Size s2)
Definition: shmem.c:502
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:396
Size mul_size(Size s1, Size s2)
Definition: shmem.c:519
int max_replication_slots
Definition: slot.c:101
#define InitDirtySnapshot(snapshotdata)
Definition: snapmgr.h:74
#define BTEqualStrategyNumber
Definition: stratnum.h:31
ItemPointerData t_self
Definition: htup.h:65
Definition: lwlock.h:40
ReplicationState states[FLEXIBLE_ARRAY_MEMBER]
Definition: origin.c:152
XLogRecPtr remote_lsn
Definition: origin.c:143
RepOriginId roident
Definition: origin.c:142
XLogRecPtr remote_lsn
Definition: origin.c:112
XLogRecPtr local_lsn
Definition: origin.c:119
ConditionVariable origin_cv
Definition: origin.c:129
RepOriginId roident
Definition: origin.c:107
LWLock lock
Definition: origin.c:134
TupleDesc setDesc
Definition: execnodes.h:332
Tuplestorestate * setResult
Definition: execnodes.h:331
XLogRecPtr EndRecPtr
Definition: xlogreader.h:207
Definition: regguts.h:318
Definition: c.h:671
RepOriginId node_id
Definition: origin.h:27
RepOriginId node_id
Definition: origin.h:21
XLogRecPtr remote_lsn
Definition: origin.h:20
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:865
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:817
@ REPLORIGIDENT
Definition: syscache.h:90
@ REPLORIGNAME
Definition: syscache.h:91
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:40
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, Datum *values, bool *isnull)
Definition: tuplestore.c:750
#define PG_GETARG_TIMESTAMPTZ(n)
Definition: timestamp.h:64
char * text_to_cstring(const text *t)
Definition: varlena.c:215
@ WAIT_EVENT_REPLICATION_ORIGIN_DROP
Definition: wait_event.h:126
bool IsTransactionState(void)
Definition: xact.c:378
void CommandCounterIncrement(void)
Definition: xact.c:1078
bool RecoveryInProgress(void)
Definition: xlog.c:5905
void XLogFlush(XLogRecPtr record)
Definition: xlog.c:2514
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
uint16 RepOriginId
Definition: xlogdefs.h:65
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
void XLogRegisterData(char *data, uint32 len)
Definition: xloginsert.c:351
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition: xloginsert.c:451
void XLogBeginInsert(void)
Definition: xloginsert.c:150
#define XLogRecGetInfo(decoder)
Definition: xlogreader.h:409
#define XLogRecGetData(decoder)
Definition: xlogreader.h:414
#define XLR_INFO_MASK
Definition: xlogrecord.h:62