PostgreSQL Source Code  git master
origin.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * origin.c
4  * Logical replication progress tracking support.
5  *
6  * Copyright (c) 2013-2024, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  * src/backend/replication/logical/origin.c
10  *
11  * NOTES
12  *
13  * This file provides the following:
14  * * An infrastructure to name nodes in a replication setup
15  * * A facility to efficiently store and persist replication progress in an
16  * efficient and durable manner.
17  *
18  * Replication origin consist out of a descriptive, user defined, external
19  * name and a short, thus space efficient, internal 2 byte one. This split
20  * exists because replication origin have to be stored in WAL and shared
21  * memory and long descriptors would be inefficient. For now only use 2 bytes
22  * for the internal id of a replication origin as it seems unlikely that there
23  * soon will be more than 65k nodes in one replication setup; and using only
24  * two bytes allow us to be more space efficient.
25  *
26  * Replication progress is tracked in a shared memory table
27  * (ReplicationState) that's dumped to disk every checkpoint. Entries
28  * ('slots') in this table are identified by the internal id. That's the case
29  * because it allows to increase replication progress during crash
30  * recovery. To allow doing so we store the original LSN (from the originating
31  * system) of a transaction in the commit record. That allows to recover the
32  * precise replayed state after crash recovery; without requiring synchronous
33  * commits. Allowing logical replication to use asynchronous commit is
34  * generally good for performance, but especially important as it allows a
35  * single threaded replay process to keep up with a source that has multiple
36  * backends generating changes concurrently. For efficiency and simplicity
37  * reasons a backend can setup one replication origin that's from then used as
38  * the source of changes produced by the backend, until reset again.
39  *
40  * This infrastructure is intended to be used in cooperation with logical
41  * decoding. When replaying from a remote system the configured origin is
42  * provided to output plugins, allowing prevention of replication loops and
43  * other filtering.
44  *
45  * There are several levels of locking at work:
46  *
47  * * To create and drop replication origins an exclusive lock on
48  * pg_replication_slot is required for the duration. That allows us to
49  * safely and conflict free assign new origins using a dirty snapshot.
50  *
51  * * When creating an in-memory replication progress slot the ReplicationOrigin
52  * LWLock has to be held exclusively; when iterating over the replication
53  * progress a shared lock has to be held, the same when advancing the
54  * replication progress of an individual backend that has not setup as the
55  * session's replication origin.
56  *
57  * * When manipulating or looking at the remote_lsn and local_lsn fields of a
58  * replication progress slot that slot's lwlock has to be held. That's
59  * primarily because we do not assume 8 byte writes (the LSN) is atomic on
60  * all our platforms, but it also simplifies memory ordering concerns
61  * between the remote and local lsn. We use a lwlock instead of a spinlock
62  * so it's less harmful to hold the lock over a WAL write
63  * (cf. AdvanceReplicationProgress).
64  *
65  * ---------------------------------------------------------------------------
66  */
67 
68 #include "postgres.h"
69 
70 #include <unistd.h>
71 #include <sys/stat.h>
72 
73 #include "access/genam.h"
74 #include "access/htup_details.h"
75 #include "access/table.h"
76 #include "access/xact.h"
77 #include "access/xloginsert.h"
78 #include "catalog/catalog.h"
79 #include "catalog/indexing.h"
81 #include "funcapi.h"
82 #include "miscadmin.h"
83 #include "nodes/execnodes.h"
84 #include "pgstat.h"
85 #include "replication/origin.h"
86 #include "replication/slot.h"
88 #include "storage/fd.h"
89 #include "storage/ipc.h"
90 #include "storage/lmgr.h"
91 #include "utils/builtins.h"
92 #include "utils/fmgroids.h"
93 #include "utils/pg_lsn.h"
94 #include "utils/rel.h"
95 #include "utils/snapmgr.h"
96 #include "utils/syscache.h"
97 
98 /* paths for replication origin checkpoint files */
99 #define PG_REPLORIGIN_CHECKPOINT_FILENAME PG_LOGICAL_DIR "/replorigin_checkpoint"
100 #define PG_REPLORIGIN_CHECKPOINT_TMPFILE PG_REPLORIGIN_CHECKPOINT_FILENAME ".tmp"
101 
102 /*
103  * Replay progress of a single remote node.
104  */
105 typedef struct ReplicationState
106 {
107  /*
108  * Local identifier for the remote node.
109  */
111 
112  /*
113  * Location of the latest commit from the remote side.
114  */
116 
117  /*
118  * Remember the local lsn of the commit record so we can XLogFlush() to it
119  * during a checkpoint so we know the commit record actually is safe on
120  * disk.
121  */
123 
124  /*
125  * PID of backend that's acquired slot, or 0 if none.
126  */
128 
129  /*
130  * Condition variable that's signaled when acquired_by changes.
131  */
133 
134  /*
135  * Lock protecting remote_lsn and local_lsn.
136  */
139 
140 /*
141  * On disk version of ReplicationState.
142  */
144 {
148 
149 
150 typedef struct ReplicationStateCtl
151 {
152  /* Tranche to use for per-origin LWLocks */
154  /* Array of length max_replication_slots */
157 
158 /* external variables */
162 
163 /*
164  * Base address into a shared memory array of replication states of size
165  * max_replication_slots.
166  *
167  * XXX: Should we use a separate variable to size this rather than
168  * max_replication_slots?
169  */
171 
172 /*
173  * Actual shared memory block (replication_states[] is now part of this).
174  */
176 
177 /*
178  * We keep a pointer to this backend's ReplicationState to avoid having to
179  * search the replication_states array in replorigin_session_advance for each
180  * remote commit. (Ownership of a backend's own entry can only be changed by
181  * that backend.)
182  */
184 
185 /* Magic for on disk files. */
186 #define REPLICATION_STATE_MAGIC ((uint32) 0x1257DADE)
187 
188 static void
189 replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
190 {
191  if (check_slots && max_replication_slots == 0)
192  ereport(ERROR,
193  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
194  errmsg("cannot query or manipulate replication origin when \"max_replication_slots\" is 0")));
195 
196  if (!recoveryOK && RecoveryInProgress())
197  ereport(ERROR,
198  (errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
199  errmsg("cannot manipulate replication origins during recovery")));
200 }
201 
202 
203 /*
204  * IsReservedOriginName
205  * True iff name is either "none" or "any".
206  */
207 static bool
209 {
210  return ((pg_strcasecmp(name, LOGICALREP_ORIGIN_NONE) == 0) ||
211  (pg_strcasecmp(name, LOGICALREP_ORIGIN_ANY) == 0));
212 }
213 
214 /* ---------------------------------------------------------------------------
215  * Functions for working with replication origins themselves.
216  * ---------------------------------------------------------------------------
217  */
218 
219 /*
220  * Check for a persistent replication origin identified by name.
221  *
222  * Returns InvalidOid if the node isn't known yet and missing_ok is true.
223  */
225 replorigin_by_name(const char *roname, bool missing_ok)
226 {
228  Oid roident = InvalidOid;
229  HeapTuple tuple;
230  Datum roname_d;
231 
232  roname_d = CStringGetTextDatum(roname);
233 
234  tuple = SearchSysCache1(REPLORIGNAME, roname_d);
235  if (HeapTupleIsValid(tuple))
236  {
238  roident = ident->roident;
239  ReleaseSysCache(tuple);
240  }
241  else if (!missing_ok)
242  ereport(ERROR,
243  (errcode(ERRCODE_UNDEFINED_OBJECT),
244  errmsg("replication origin \"%s\" does not exist",
245  roname)));
246 
247  return roident;
248 }
249 
250 /*
251  * Create a replication origin.
252  *
253  * Needs to be called in a transaction.
254  */
256 replorigin_create(const char *roname)
257 {
258  Oid roident;
259  HeapTuple tuple = NULL;
260  Relation rel;
261  Datum roname_d;
262  SnapshotData SnapshotDirty;
263  SysScanDesc scan;
265 
266  roname_d = CStringGetTextDatum(roname);
267 
269 
270  /*
271  * We need the numeric replication origin to be 16bit wide, so we cannot
272  * rely on the normal oid allocation. Instead we simply scan
273  * pg_replication_origin for the first unused id. That's not particularly
274  * efficient, but this should be a fairly infrequent operation - we can
275  * easily spend a bit more code on this when it turns out it needs to be
276  * faster.
277  *
278  * We handle concurrency by taking an exclusive lock (allowing reads!)
279  * over the table for the duration of the search. Because we use a "dirty
280  * snapshot" we can read rows that other in-progress sessions have
281  * written, even though they would be invisible with normal snapshots. Due
282  * to the exclusive lock there's no danger that new rows can appear while
283  * we're checking.
284  */
285  InitDirtySnapshot(SnapshotDirty);
286 
287  rel = table_open(ReplicationOriginRelationId, ExclusiveLock);
288 
289  for (roident = InvalidOid + 1; roident < PG_UINT16_MAX; roident++)
290  {
291  bool nulls[Natts_pg_replication_origin];
292  Datum values[Natts_pg_replication_origin];
293  bool collides;
294 
296 
297  ScanKeyInit(&key,
298  Anum_pg_replication_origin_roident,
299  BTEqualStrategyNumber, F_OIDEQ,
300  ObjectIdGetDatum(roident));
301 
302  scan = systable_beginscan(rel, ReplicationOriginIdentIndex,
303  true /* indexOK */ ,
304  &SnapshotDirty,
305  1, &key);
306 
307  collides = HeapTupleIsValid(systable_getnext(scan));
308 
309  systable_endscan(scan);
310 
311  if (!collides)
312  {
313  /*
314  * Ok, found an unused roident, insert the new row and do a CCI,
315  * so our callers can look it up if they want to.
316  */
317  memset(&nulls, 0, sizeof(nulls));
318 
319  values[Anum_pg_replication_origin_roident - 1] = ObjectIdGetDatum(roident);
320  values[Anum_pg_replication_origin_roname - 1] = roname_d;
321 
322  tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
323  CatalogTupleInsert(rel, tuple);
325  break;
326  }
327  }
328 
329  /* now release lock again, */
331 
332  if (tuple == NULL)
333  ereport(ERROR,
334  (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
335  errmsg("could not find free replication origin ID")));
336 
337  heap_freetuple(tuple);
338  return roident;
339 }
340 
341 /*
342  * Helper function to drop a replication origin.
343  */
344 static void
345 replorigin_state_clear(RepOriginId roident, bool nowait)
346 {
347  int i;
348 
349  /*
350  * Clean up the slot state info, if there is any matching slot.
351  */
352 restart:
353  LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
354 
355  for (i = 0; i < max_replication_slots; i++)
356  {
358 
359  if (state->roident == roident)
360  {
361  /* found our slot, is it busy? */
362  if (state->acquired_by != 0)
363  {
364  ConditionVariable *cv;
365 
366  if (nowait)
367  ereport(ERROR,
368  (errcode(ERRCODE_OBJECT_IN_USE),
369  errmsg("could not drop replication origin with ID %d, in use by PID %d",
370  state->roident,
371  state->acquired_by)));
372 
373  /*
374  * We must wait and then retry. Since we don't know which CV
375  * to wait on until here, we can't readily use
376  * ConditionVariablePrepareToSleep (calling it here would be
377  * wrong, since we could miss the signal if we did so); just
378  * use ConditionVariableSleep directly.
379  */
380  cv = &state->origin_cv;
381 
382  LWLockRelease(ReplicationOriginLock);
383 
384  ConditionVariableSleep(cv, WAIT_EVENT_REPLICATION_ORIGIN_DROP);
385  goto restart;
386  }
387 
388  /* first make a WAL log entry */
389  {
390  xl_replorigin_drop xlrec;
391 
392  xlrec.node_id = roident;
393  XLogBeginInsert();
394  XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
395  XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_DROP);
396  }
397 
398  /* then clear the in-memory slot */
399  state->roident = InvalidRepOriginId;
400  state->remote_lsn = InvalidXLogRecPtr;
401  state->local_lsn = InvalidXLogRecPtr;
402  break;
403  }
404  }
405  LWLockRelease(ReplicationOriginLock);
407 }
408 
409 /*
410  * Drop replication origin (by name).
411  *
412  * Needs to be called in a transaction.
413  */
414 void
415 replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
416 {
417  RepOriginId roident;
418  Relation rel;
419  HeapTuple tuple;
420 
422 
423  rel = table_open(ReplicationOriginRelationId, RowExclusiveLock);
424 
425  roident = replorigin_by_name(name, missing_ok);
426 
427  /* Lock the origin to prevent concurrent drops. */
428  LockSharedObject(ReplicationOriginRelationId, roident, 0,
430 
431  tuple = SearchSysCache1(REPLORIGIDENT, ObjectIdGetDatum(roident));
432  if (!HeapTupleIsValid(tuple))
433  {
434  if (!missing_ok)
435  elog(ERROR, "cache lookup failed for replication origin with ID %d",
436  roident);
437 
438  /*
439  * We don't need to retain the locks if the origin is already dropped.
440  */
441  UnlockSharedObject(ReplicationOriginRelationId, roident, 0,
444  return;
445  }
446 
447  replorigin_state_clear(roident, nowait);
448 
449  /*
450  * Now, we can delete the catalog entry.
451  */
452  CatalogTupleDelete(rel, &tuple->t_self);
453  ReleaseSysCache(tuple);
454 
456 
457  /* We keep the lock on pg_replication_origin until commit */
458  table_close(rel, NoLock);
459 }
460 
461 /*
462  * Lookup replication origin via its oid and return the name.
463  *
464  * The external name is palloc'd in the calling context.
465  *
466  * Returns true if the origin is known, false otherwise.
467  */
468 bool
469 replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
470 {
471  HeapTuple tuple;
473 
474  Assert(OidIsValid((Oid) roident));
475  Assert(roident != InvalidRepOriginId);
476  Assert(roident != DoNotReplicateId);
477 
478  tuple = SearchSysCache1(REPLORIGIDENT,
479  ObjectIdGetDatum((Oid) roident));
480 
481  if (HeapTupleIsValid(tuple))
482  {
483  ric = (Form_pg_replication_origin) GETSTRUCT(tuple);
484  *roname = text_to_cstring(&ric->roname);
485  ReleaseSysCache(tuple);
486 
487  return true;
488  }
489  else
490  {
491  *roname = NULL;
492 
493  if (!missing_ok)
494  ereport(ERROR,
495  (errcode(ERRCODE_UNDEFINED_OBJECT),
496  errmsg("replication origin with ID %d does not exist",
497  roident)));
498 
499  return false;
500  }
501 }
502 
503 
504 /* ---------------------------------------------------------------------------
505  * Functions for handling replication progress.
506  * ---------------------------------------------------------------------------
507  */
508 
509 Size
511 {
512  Size size = 0;
513 
514  /*
515  * XXX: max_replication_slots is arguably the wrong thing to use, as here
516  * we keep the replay state of *remote* transactions. But for now it seems
517  * sufficient to reuse it, rather than introduce a separate GUC.
518  */
519  if (max_replication_slots == 0)
520  return size;
521 
522  size = add_size(size, offsetof(ReplicationStateCtl, states));
523 
524  size = add_size(size,
526  return size;
527 }
528 
529 void
531 {
532  bool found;
533 
534  if (max_replication_slots == 0)
535  return;
536 
538  ShmemInitStruct("ReplicationOriginState",
540  &found);
542 
543  if (!found)
544  {
545  int i;
546 
548 
550 
551  for (i = 0; i < max_replication_slots; i++)
552  {
556  }
557  }
558 }
559 
560 /* ---------------------------------------------------------------------------
561  * Perform a checkpoint of each replication origin's progress with respect to
562  * the replayed remote_lsn. Make sure that all transactions we refer to in the
563  * checkpoint (local_lsn) are actually on-disk. This might not yet be the case
564  * if the transactions were originally committed asynchronously.
565  *
566  * We store checkpoints in the following format:
567  * +-------+------------------------+------------------+-----+--------+
568  * | MAGIC | ReplicationStateOnDisk | struct Replic... | ... | CRC32C | EOF
569  * +-------+------------------------+------------------+-----+--------+
570  *
571  * So its just the magic, followed by the statically sized
572  * ReplicationStateOnDisk structs. Note that the maximum number of
573  * ReplicationState is determined by max_replication_slots.
574  * ---------------------------------------------------------------------------
575  */
576 void
578 {
579  const char *tmppath = PG_REPLORIGIN_CHECKPOINT_TMPFILE;
580  const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME;
581  int tmpfd;
582  int i;
584  pg_crc32c crc;
585 
586  if (max_replication_slots == 0)
587  return;
588 
589  INIT_CRC32C(crc);
590 
591  /* make sure no old temp file is remaining */
592  if (unlink(tmppath) < 0 && errno != ENOENT)
593  ereport(PANIC,
595  errmsg("could not remove file \"%s\": %m",
596  tmppath)));
597 
598  /*
599  * no other backend can perform this at the same time; only one checkpoint
600  * can happen at a time.
601  */
602  tmpfd = OpenTransientFile(tmppath,
603  O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
604  if (tmpfd < 0)
605  ereport(PANIC,
607  errmsg("could not create file \"%s\": %m",
608  tmppath)));
609 
610  /* write magic */
611  errno = 0;
612  if ((write(tmpfd, &magic, sizeof(magic))) != sizeof(magic))
613  {
614  /* if write didn't set errno, assume problem is no disk space */
615  if (errno == 0)
616  errno = ENOSPC;
617  ereport(PANIC,
619  errmsg("could not write to file \"%s\": %m",
620  tmppath)));
621  }
622  COMP_CRC32C(crc, &magic, sizeof(magic));
623 
624  /* prevent concurrent creations/drops */
625  LWLockAcquire(ReplicationOriginLock, LW_SHARED);
626 
627  /* write actual data */
628  for (i = 0; i < max_replication_slots; i++)
629  {
630  ReplicationStateOnDisk disk_state;
631  ReplicationState *curstate = &replication_states[i];
632  XLogRecPtr local_lsn;
633 
634  if (curstate->roident == InvalidRepOriginId)
635  continue;
636 
637  /* zero, to avoid uninitialized padding bytes */
638  memset(&disk_state, 0, sizeof(disk_state));
639 
640  LWLockAcquire(&curstate->lock, LW_SHARED);
641 
642  disk_state.roident = curstate->roident;
643 
644  disk_state.remote_lsn = curstate->remote_lsn;
645  local_lsn = curstate->local_lsn;
646 
647  LWLockRelease(&curstate->lock);
648 
649  /* make sure we only write out a commit that's persistent */
650  XLogFlush(local_lsn);
651 
652  errno = 0;
653  if ((write(tmpfd, &disk_state, sizeof(disk_state))) !=
654  sizeof(disk_state))
655  {
656  /* if write didn't set errno, assume problem is no disk space */
657  if (errno == 0)
658  errno = ENOSPC;
659  ereport(PANIC,
661  errmsg("could not write to file \"%s\": %m",
662  tmppath)));
663  }
664 
665  COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
666  }
667 
668  LWLockRelease(ReplicationOriginLock);
669 
670  /* write out the CRC */
671  FIN_CRC32C(crc);
672  errno = 0;
673  if ((write(tmpfd, &crc, sizeof(crc))) != sizeof(crc))
674  {
675  /* if write didn't set errno, assume problem is no disk space */
676  if (errno == 0)
677  errno = ENOSPC;
678  ereport(PANIC,
680  errmsg("could not write to file \"%s\": %m",
681  tmppath)));
682  }
683 
684  if (CloseTransientFile(tmpfd) != 0)
685  ereport(PANIC,
687  errmsg("could not close file \"%s\": %m",
688  tmppath)));
689 
690  /* fsync, rename to permanent file, fsync file and directory */
691  durable_rename(tmppath, path, PANIC);
692 }
693 
694 /*
695  * Recover replication replay status from checkpoint data saved earlier by
696  * CheckPointReplicationOrigin.
697  *
698  * This only needs to be called at startup and *not* during every checkpoint
699  * read during recovery (e.g. in HS or PITR from a base backup) afterwards. All
700  * state thereafter can be recovered by looking at commit records.
701  */
702 void
704 {
705  const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME;
706  int fd;
707  int readBytes;
709  int last_state = 0;
710  pg_crc32c file_crc;
711  pg_crc32c crc;
712 
713  /* don't want to overwrite already existing state */
714 #ifdef USE_ASSERT_CHECKING
715  static bool already_started = false;
716 
717  Assert(!already_started);
718  already_started = true;
719 #endif
720 
721  if (max_replication_slots == 0)
722  return;
723 
724  INIT_CRC32C(crc);
725 
726  elog(DEBUG2, "starting up replication origin progress state");
727 
728  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
729 
730  /*
731  * might have had max_replication_slots == 0 last run, or we just brought
732  * up a standby.
733  */
734  if (fd < 0 && errno == ENOENT)
735  return;
736  else if (fd < 0)
737  ereport(PANIC,
739  errmsg("could not open file \"%s\": %m",
740  path)));
741 
742  /* verify magic, that is written even if nothing was active */
743  readBytes = read(fd, &magic, sizeof(magic));
744  if (readBytes != sizeof(magic))
745  {
746  if (readBytes < 0)
747  ereport(PANIC,
749  errmsg("could not read file \"%s\": %m",
750  path)));
751  else
752  ereport(PANIC,
754  errmsg("could not read file \"%s\": read %d of %zu",
755  path, readBytes, sizeof(magic))));
756  }
757  COMP_CRC32C(crc, &magic, sizeof(magic));
758 
759  if (magic != REPLICATION_STATE_MAGIC)
760  ereport(PANIC,
761  (errmsg("replication checkpoint has wrong magic %u instead of %u",
762  magic, REPLICATION_STATE_MAGIC)));
763 
764  /* we can skip locking here, no other access is possible */
765 
766  /* recover individual states, until there are no more to be found */
767  while (true)
768  {
769  ReplicationStateOnDisk disk_state;
770 
771  readBytes = read(fd, &disk_state, sizeof(disk_state));
772 
773  /* no further data */
774  if (readBytes == sizeof(crc))
775  {
776  /* not pretty, but simple ... */
777  file_crc = *(pg_crc32c *) &disk_state;
778  break;
779  }
780 
781  if (readBytes < 0)
782  {
783  ereport(PANIC,
785  errmsg("could not read file \"%s\": %m",
786  path)));
787  }
788 
789  if (readBytes != sizeof(disk_state))
790  {
791  ereport(PANIC,
793  errmsg("could not read file \"%s\": read %d of %zu",
794  path, readBytes, sizeof(disk_state))));
795  }
796 
797  COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
798 
799  if (last_state == max_replication_slots)
800  ereport(PANIC,
801  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
802  errmsg("could not find free replication state, increase \"max_replication_slots\"")));
803 
804  /* copy data to shared memory */
805  replication_states[last_state].roident = disk_state.roident;
806  replication_states[last_state].remote_lsn = disk_state.remote_lsn;
807  last_state++;
808 
809  ereport(LOG,
810  (errmsg("recovered replication state of node %d to %X/%X",
811  disk_state.roident,
812  LSN_FORMAT_ARGS(disk_state.remote_lsn))));
813  }
814 
815  /* now check checksum */
816  FIN_CRC32C(crc);
817  if (file_crc != crc)
818  ereport(PANIC,
820  errmsg("replication slot checkpoint has wrong checksum %u, expected %u",
821  crc, file_crc)));
822 
823  if (CloseTransientFile(fd) != 0)
824  ereport(PANIC,
826  errmsg("could not close file \"%s\": %m",
827  path)));
828 }
829 
830 void
832 {
833  uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
834 
835  switch (info)
836  {
837  case XLOG_REPLORIGIN_SET:
838  {
839  xl_replorigin_set *xlrec =
840  (xl_replorigin_set *) XLogRecGetData(record);
841 
843  xlrec->remote_lsn, record->EndRecPtr,
844  xlrec->force /* backward */ ,
845  false /* WAL log */ );
846  break;
847  }
849  {
850  xl_replorigin_drop *xlrec;
851  int i;
852 
853  xlrec = (xl_replorigin_drop *) XLogRecGetData(record);
854 
855  for (i = 0; i < max_replication_slots; i++)
856  {
858 
859  /* found our slot */
860  if (state->roident == xlrec->node_id)
861  {
862  /* reset entry */
863  state->roident = InvalidRepOriginId;
864  state->remote_lsn = InvalidXLogRecPtr;
865  state->local_lsn = InvalidXLogRecPtr;
866  break;
867  }
868  }
869  break;
870  }
871  default:
872  elog(PANIC, "replorigin_redo: unknown op code %u", info);
873  }
874 }
875 
876 
877 /*
878  * Tell the replication origin progress machinery that a commit from 'node'
879  * that originated at the LSN remote_commit on the remote node was replayed
880  * successfully and that we don't need to do so again. In combination with
881  * setting up replorigin_session_origin_lsn and replorigin_session_origin
882  * that ensures we won't lose knowledge about that after a crash if the
883  * transaction had a persistent effect (think of asynchronous commits).
884  *
885  * local_commit needs to be a local LSN of the commit so that we can make sure
886  * upon a checkpoint that enough WAL has been persisted to disk.
887  *
888  * Needs to be called with a RowExclusiveLock on pg_replication_origin,
889  * unless running in recovery.
890  */
891 void
893  XLogRecPtr remote_commit, XLogRecPtr local_commit,
894  bool go_backward, bool wal_log)
895 {
896  int i;
897  ReplicationState *replication_state = NULL;
898  ReplicationState *free_state = NULL;
899 
900  Assert(node != InvalidRepOriginId);
901 
902  /* we don't track DoNotReplicateId */
903  if (node == DoNotReplicateId)
904  return;
905 
906  /*
907  * XXX: For the case where this is called by WAL replay, it'd be more
908  * efficient to restore into a backend local hashtable and only dump into
909  * shmem after recovery is finished. Let's wait with implementing that
910  * till it's shown to be a measurable expense
911  */
912 
913  /* Lock exclusively, as we may have to create a new table entry. */
914  LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
915 
916  /*
917  * Search for either an existing slot for the origin, or a free one we can
918  * use.
919  */
920  for (i = 0; i < max_replication_slots; i++)
921  {
922  ReplicationState *curstate = &replication_states[i];
923 
924  /* remember where to insert if necessary */
925  if (curstate->roident == InvalidRepOriginId &&
926  free_state == NULL)
927  {
928  free_state = curstate;
929  continue;
930  }
931 
932  /* not our slot */
933  if (curstate->roident != node)
934  {
935  continue;
936  }
937 
938  /* ok, found slot */
939  replication_state = curstate;
940 
941  LWLockAcquire(&replication_state->lock, LW_EXCLUSIVE);
942 
943  /* Make sure it's not used by somebody else */
944  if (replication_state->acquired_by != 0)
945  {
946  ereport(ERROR,
947  (errcode(ERRCODE_OBJECT_IN_USE),
948  errmsg("replication origin with ID %d is already active for PID %d",
949  replication_state->roident,
950  replication_state->acquired_by)));
951  }
952 
953  break;
954  }
955 
956  if (replication_state == NULL && free_state == NULL)
957  ereport(ERROR,
958  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
959  errmsg("could not find free replication state slot for replication origin with ID %d",
960  node),
961  errhint("Increase \"max_replication_slots\" and try again.")));
962 
963  if (replication_state == NULL)
964  {
965  /* initialize new slot */
966  LWLockAcquire(&free_state->lock, LW_EXCLUSIVE);
967  replication_state = free_state;
968  Assert(replication_state->remote_lsn == InvalidXLogRecPtr);
969  Assert(replication_state->local_lsn == InvalidXLogRecPtr);
970  replication_state->roident = node;
971  }
972 
973  Assert(replication_state->roident != InvalidRepOriginId);
974 
975  /*
976  * If somebody "forcefully" sets this slot, WAL log it, so it's durable
977  * and the standby gets the message. Primarily this will be called during
978  * WAL replay (of commit records) where no WAL logging is necessary.
979  */
980  if (wal_log)
981  {
982  xl_replorigin_set xlrec;
983 
984  xlrec.remote_lsn = remote_commit;
985  xlrec.node_id = node;
986  xlrec.force = go_backward;
987 
988  XLogBeginInsert();
989  XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
990 
991  XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_SET);
992  }
993 
994  /*
995  * Due to - harmless - race conditions during a checkpoint we could see
996  * values here that are older than the ones we already have in memory. We
997  * could also see older values for prepared transactions when the prepare
998  * is sent at a later point of time along with commit prepared and there
999  * are other transactions commits between prepare and commit prepared. See
1000  * ReorderBufferFinishPrepared. Don't overwrite those.
1001  */
1002  if (go_backward || replication_state->remote_lsn < remote_commit)
1003  replication_state->remote_lsn = remote_commit;
1004  if (local_commit != InvalidXLogRecPtr &&
1005  (go_backward || replication_state->local_lsn < local_commit))
1006  replication_state->local_lsn = local_commit;
1007  LWLockRelease(&replication_state->lock);
1008 
1009  /*
1010  * Release *after* changing the LSNs, slot isn't acquired and thus could
1011  * otherwise be dropped anytime.
1012  */
1013  LWLockRelease(ReplicationOriginLock);
1014 }
1015 
1016 
1017 XLogRecPtr
1019 {
1020  int i;
1021  XLogRecPtr local_lsn = InvalidXLogRecPtr;
1022  XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1023 
1024  /* prevent slots from being concurrently dropped */
1025  LWLockAcquire(ReplicationOriginLock, LW_SHARED);
1026 
1027  for (i = 0; i < max_replication_slots; i++)
1028  {
1030 
1032 
1033  if (state->roident == node)
1034  {
1035  LWLockAcquire(&state->lock, LW_SHARED);
1036 
1037  remote_lsn = state->remote_lsn;
1038  local_lsn = state->local_lsn;
1039 
1040  LWLockRelease(&state->lock);
1041 
1042  break;
1043  }
1044  }
1045 
1046  LWLockRelease(ReplicationOriginLock);
1047 
1048  if (flush && local_lsn != InvalidXLogRecPtr)
1049  XLogFlush(local_lsn);
1050 
1051  return remote_lsn;
1052 }
1053 
1054 /*
1055  * Tear down a (possibly) configured session replication origin during process
1056  * exit.
1057  */
1058 static void
1060 {
1061  ConditionVariable *cv = NULL;
1062 
1063  if (session_replication_state == NULL)
1064  return;
1065 
1066  LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1067 
1069  {
1071 
1074  }
1075 
1076  LWLockRelease(ReplicationOriginLock);
1077 
1078  if (cv)
1080 }
1081 
1082 /*
1083  * Setup a replication origin in the shared memory struct if it doesn't
1084  * already exist and cache access to the specific ReplicationSlot so the
1085  * array doesn't have to be searched when calling
1086  * replorigin_session_advance().
1087  *
1088  * Normally only one such cached origin can exist per process so the cached
1089  * value can only be set again after the previous value is torn down with
1090  * replorigin_session_reset(). For this normal case pass acquired_by = 0
1091  * (meaning the slot is not allowed to be already acquired by another process).
1092  *
1093  * However, sometimes multiple processes can safely re-use the same origin slot
1094  * (for example, multiple parallel apply processes can safely use the same
1095  * origin, provided they maintain commit order by allowing only one process to
1096  * commit at a time). For this case the first process must pass acquired_by =
1097  * 0, and then the other processes sharing that same origin can pass
1098  * acquired_by = PID of the first process.
1099  */
1100 void
1102 {
1103  static bool registered_cleanup;
1104  int i;
1105  int free_slot = -1;
1106 
1107  if (!registered_cleanup)
1108  {
1110  registered_cleanup = true;
1111  }
1112 
1114 
1115  if (session_replication_state != NULL)
1116  ereport(ERROR,
1117  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1118  errmsg("cannot setup replication origin when one is already setup")));
1119 
1120  /* Lock exclusively, as we may have to create a new table entry. */
1121  LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1122 
1123  /*
1124  * Search for either an existing slot for the origin, or a free one we can
1125  * use.
1126  */
1127  for (i = 0; i < max_replication_slots; i++)
1128  {
1129  ReplicationState *curstate = &replication_states[i];
1130 
1131  /* remember where to insert if necessary */
1132  if (curstate->roident == InvalidRepOriginId &&
1133  free_slot == -1)
1134  {
1135  free_slot = i;
1136  continue;
1137  }
1138 
1139  /* not our slot */
1140  if (curstate->roident != node)
1141  continue;
1142 
1143  else if (curstate->acquired_by != 0 && acquired_by == 0)
1144  {
1145  ereport(ERROR,
1146  (errcode(ERRCODE_OBJECT_IN_USE),
1147  errmsg("replication origin with ID %d is already active for PID %d",
1148  curstate->roident, curstate->acquired_by)));
1149  }
1150 
1151  /* ok, found slot */
1152  session_replication_state = curstate;
1153  break;
1154  }
1155 
1156 
1157  if (session_replication_state == NULL && free_slot == -1)
1158  ereport(ERROR,
1159  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
1160  errmsg("could not find free replication state slot for replication origin with ID %d",
1161  node),
1162  errhint("Increase \"max_replication_slots\" and try again.")));
1163  else if (session_replication_state == NULL)
1164  {
1165  /* initialize new slot */
1170  }
1171 
1172 
1174 
1175  if (acquired_by == 0)
1177  else if (session_replication_state->acquired_by != acquired_by)
1178  elog(ERROR, "could not find replication state slot for replication origin with OID %u which was acquired by %d",
1179  node, acquired_by);
1180 
1181  LWLockRelease(ReplicationOriginLock);
1182 
1183  /* probably this one is pointless */
1185 }
1186 
1187 /*
1188  * Reset replay state previously setup in this session.
1189  *
1190  * This function may only be called if an origin was setup with
1191  * replorigin_session_setup().
1192  */
1193 void
1195 {
1196  ConditionVariable *cv;
1197 
1199 
1200  if (session_replication_state == NULL)
1201  ereport(ERROR,
1202  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1203  errmsg("no replication origin is configured")));
1204 
1205  LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1206 
1210 
1211  LWLockRelease(ReplicationOriginLock);
1212 
1214 }
1215 
1216 /*
1217  * Do the same work replorigin_advance() does, just on the session's
1218  * configured origin.
1219  *
1220  * This is noticeably cheaper than using replorigin_advance().
1221  */
1222 void
1224 {
1227 
1229  if (session_replication_state->local_lsn < local_commit)
1230  session_replication_state->local_lsn = local_commit;
1231  if (session_replication_state->remote_lsn < remote_commit)
1232  session_replication_state->remote_lsn = remote_commit;
1234 }
1235 
1236 /*
1237  * Ask the machinery about the point up to which we successfully replayed
1238  * changes from an already setup replication origin.
1239  */
1240 XLogRecPtr
1242 {
1243  XLogRecPtr remote_lsn;
1244  XLogRecPtr local_lsn;
1245 
1247 
1249  remote_lsn = session_replication_state->remote_lsn;
1250  local_lsn = session_replication_state->local_lsn;
1252 
1253  if (flush && local_lsn != InvalidXLogRecPtr)
1254  XLogFlush(local_lsn);
1255 
1256  return remote_lsn;
1257 }
1258 
1259 
1260 
1261 /* ---------------------------------------------------------------------------
1262  * SQL functions for working with replication origin.
1263  *
1264  * These mostly should be fairly short wrappers around more generic functions.
1265  * ---------------------------------------------------------------------------
1266  */
1267 
1268 /*
1269  * Create replication origin for the passed in name, and return the assigned
1270  * oid.
1271  */
1272 Datum
1274 {
1275  char *name;
1276  RepOriginId roident;
1277 
1278  replorigin_check_prerequisites(false, false);
1279 
1281 
1282  /*
1283  * Replication origins "any and "none" are reserved for system options.
1284  * The origins "pg_xxx" are reserved for internal use.
1285  */
1287  ereport(ERROR,
1288  (errcode(ERRCODE_RESERVED_NAME),
1289  errmsg("replication origin name \"%s\" is reserved",
1290  name),
1291  errdetail("Origin names \"%s\", \"%s\", and names starting with \"pg_\" are reserved.",
1292  LOGICALREP_ORIGIN_ANY, LOGICALREP_ORIGIN_NONE)));
1293 
1294  /*
1295  * If built with appropriate switch, whine when regression-testing
1296  * conventions for replication origin names are violated.
1297  */
1298 #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
1299  if (strncmp(name, "regress_", 8) != 0)
1300  elog(WARNING, "replication origins created by regression test cases should have names starting with \"regress_\"");
1301 #endif
1302 
1303  roident = replorigin_create(name);
1304 
1305  pfree(name);
1306 
1307  PG_RETURN_OID(roident);
1308 }
1309 
1310 /*
1311  * Drop replication origin.
1312  */
1313 Datum
1315 {
1316  char *name;
1317 
1318  replorigin_check_prerequisites(false, false);
1319 
1321 
1322  replorigin_drop_by_name(name, false, true);
1323 
1324  pfree(name);
1325 
1326  PG_RETURN_VOID();
1327 }
1328 
1329 /*
1330  * Return oid of a replication origin.
1331  */
1332 Datum
1334 {
1335  char *name;
1336  RepOriginId roident;
1337 
1338  replorigin_check_prerequisites(false, false);
1339 
1341  roident = replorigin_by_name(name, true);
1342 
1343  pfree(name);
1344 
1345  if (OidIsValid(roident))
1346  PG_RETURN_OID(roident);
1347  PG_RETURN_NULL();
1348 }
1349 
1350 /*
1351  * Setup a replication origin for this session.
1352  */
1353 Datum
1355 {
1356  char *name;
1357  RepOriginId origin;
1358 
1359  replorigin_check_prerequisites(true, false);
1360 
1362  origin = replorigin_by_name(name, false);
1363  replorigin_session_setup(origin, 0);
1364 
1365  replorigin_session_origin = origin;
1366 
1367  pfree(name);
1368 
1369  PG_RETURN_VOID();
1370 }
1371 
1372 /*
1373  * Reset previously setup origin in this session
1374  */
1375 Datum
1377 {
1378  replorigin_check_prerequisites(true, false);
1379 
1381 
1385 
1386  PG_RETURN_VOID();
1387 }
1388 
1389 /*
1390  * Has a replication origin been setup for this session.
1391  */
1392 Datum
1394 {
1395  replorigin_check_prerequisites(false, false);
1396 
1398 }
1399 
1400 
1401 /*
1402  * Return the replication progress for origin setup in the current session.
1403  *
1404  * If 'flush' is set to true it is ensured that the returned value corresponds
1405  * to a local transaction that has been flushed. This is useful if asynchronous
1406  * commits are used when replaying replicated transactions.
1407  */
1408 Datum
1410 {
1411  XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1412  bool flush = PG_GETARG_BOOL(0);
1413 
1414  replorigin_check_prerequisites(true, false);
1415 
1416  if (session_replication_state == NULL)
1417  ereport(ERROR,
1418  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1419  errmsg("no replication origin is configured")));
1420 
1421  remote_lsn = replorigin_session_get_progress(flush);
1422 
1423  if (remote_lsn == InvalidXLogRecPtr)
1424  PG_RETURN_NULL();
1425 
1426  PG_RETURN_LSN(remote_lsn);
1427 }
1428 
1429 Datum
1431 {
1432  XLogRecPtr location = PG_GETARG_LSN(0);
1433 
1434  replorigin_check_prerequisites(true, false);
1435 
1436  if (session_replication_state == NULL)
1437  ereport(ERROR,
1438  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1439  errmsg("no replication origin is configured")));
1440 
1441  replorigin_session_origin_lsn = location;
1443 
1444  PG_RETURN_VOID();
1445 }
1446 
1447 Datum
1449 {
1450  replorigin_check_prerequisites(true, false);
1451 
1454 
1455  PG_RETURN_VOID();
1456 }
1457 
1458 
1459 Datum
1461 {
1462  text *name = PG_GETARG_TEXT_PP(0);
1463  XLogRecPtr remote_commit = PG_GETARG_LSN(1);
1464  RepOriginId node;
1465 
1466  replorigin_check_prerequisites(true, false);
1467 
1468  /* lock to prevent the replication origin from vanishing */
1469  LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1470 
1471  node = replorigin_by_name(text_to_cstring(name), false);
1472 
1473  /*
1474  * Can't sensibly pass a local commit to be flushed at checkpoint - this
1475  * xact hasn't committed yet. This is why this function should be used to
1476  * set up the initial replication state, but not for replay.
1477  */
1478  replorigin_advance(node, remote_commit, InvalidXLogRecPtr,
1479  true /* go backward */ , true /* WAL log */ );
1480 
1481  UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1482 
1483  PG_RETURN_VOID();
1484 }
1485 
1486 
1487 /*
1488  * Return the replication progress for an individual replication origin.
1489  *
1490  * If 'flush' is set to true it is ensured that the returned value corresponds
1491  * to a local transaction that has been flushed. This is useful if asynchronous
1492  * commits are used when replaying replicated transactions.
1493  */
1494 Datum
1496 {
1497  char *name;
1498  bool flush;
1499  RepOriginId roident;
1500  XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1501 
1502  replorigin_check_prerequisites(true, true);
1503 
1505  flush = PG_GETARG_BOOL(1);
1506 
1507  roident = replorigin_by_name(name, false);
1508  Assert(OidIsValid(roident));
1509 
1510  remote_lsn = replorigin_get_progress(roident, flush);
1511 
1512  if (remote_lsn == InvalidXLogRecPtr)
1513  PG_RETURN_NULL();
1514 
1515  PG_RETURN_LSN(remote_lsn);
1516 }
1517 
1518 
1519 Datum
1521 {
1522  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1523  int i;
1525 
1526  /* we want to return 0 rows if slot is set to zero */
1527  replorigin_check_prerequisites(false, true);
1528 
1529  InitMaterializedSRF(fcinfo, 0);
1530 
1531  /* prevent slots from being concurrently dropped */
1532  LWLockAcquire(ReplicationOriginLock, LW_SHARED);
1533 
1534  /*
1535  * Iterate through all possible replication_states, display if they are
1536  * filled. Note that we do not take any locks, so slightly corrupted/out
1537  * of date values are a possibility.
1538  */
1539  for (i = 0; i < max_replication_slots; i++)
1540  {
1544  char *roname;
1545 
1547 
1548  /* unused slot, nothing to display */
1549  if (state->roident == InvalidRepOriginId)
1550  continue;
1551 
1552  memset(values, 0, sizeof(values));
1553  memset(nulls, 1, sizeof(nulls));
1554 
1555  values[0] = ObjectIdGetDatum(state->roident);
1556  nulls[0] = false;
1557 
1558  /*
1559  * We're not preventing the origin to be dropped concurrently, so
1560  * silently accept that it might be gone.
1561  */
1562  if (replorigin_by_oid(state->roident, true,
1563  &roname))
1564  {
1565  values[1] = CStringGetTextDatum(roname);
1566  nulls[1] = false;
1567  }
1568 
1569  LWLockAcquire(&state->lock, LW_SHARED);
1570 
1571  values[2] = LSNGetDatum(state->remote_lsn);
1572  nulls[2] = false;
1573 
1574  values[3] = LSNGetDatum(state->local_lsn);
1575  nulls[3] = false;
1576 
1577  LWLockRelease(&state->lock);
1578 
1579  tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
1580  values, nulls);
1581  }
1582 
1583  LWLockRelease(ReplicationOriginLock);
1584 
1585 #undef REPLICATION_ORIGIN_PROGRESS_COLS
1586 
1587  return (Datum) 0;
1588 }
static Datum values[MAXATTR]
Definition: bootstrap.c:151
#define CStringGetTextDatum(s)
Definition: builtins.h:97
uint8_t uint8
Definition: c.h:483
#define Assert(condition)
Definition: c.h:812
#define PG_BINARY
Definition: c.h:1227
#define FLEXIBLE_ARRAY_MEMBER
Definition: c.h:417
uint32_t uint32
Definition: c.h:485
#define PG_UINT16_MAX
Definition: c.h:541
#define MemSet(start, val, len)
Definition: c.h:974
#define OidIsValid(objectId)
Definition: c.h:729
size_t Size
Definition: c.h:559
bool IsReservedName(const char *name)
Definition: catalog.c:247
bool ConditionVariableCancelSleep(void)
void ConditionVariableBroadcast(ConditionVariable *cv)
void ConditionVariableInit(ConditionVariable *cv)
void ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)
int64 TimestampTz
Definition: timestamp.h:39
int errcode_for_file_access(void)
Definition: elog.c:876
int errdetail(const char *fmt,...)
Definition: elog.c:1203
int errhint(const char *fmt,...)
Definition: elog.c:1317
int errcode(int sqlerrcode)
Definition: elog.c:853
int errmsg(const char *fmt,...)
Definition: elog.c:1070
#define LOG
Definition: elog.h:31
#define WARNING
Definition: elog.h:36
#define DEBUG2
Definition: elog.h:29
#define PANIC
Definition: elog.h:42
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:225
#define ereport(elevel,...)
Definition: elog.h:149
int durable_rename(const char *oldfile, const char *newfile, int elevel)
Definition: fd.c:781
int CloseTransientFile(int fd)
Definition: fd.c:2831
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2655
#define PG_RETURN_VOID()
Definition: fmgr.h:349
#define PG_GETARG_TEXT_PP(n)
Definition: fmgr.h:309
#define PG_GETARG_DATUM(n)
Definition: fmgr.h:268
#define PG_RETURN_NULL()
Definition: fmgr.h:345
#define PG_GETARG_BOOL(n)
Definition: fmgr.h:274
#define PG_RETURN_OID(x)
Definition: fmgr.h:360
#define PG_FUNCTION_ARGS
Definition: fmgr.h:193
#define PG_RETURN_BOOL(x)
Definition: fmgr.h:359
void InitMaterializedSRF(FunctionCallInfo fcinfo, bits32 flags)
Definition: funcapi.c:76
void systable_endscan(SysScanDesc sysscan)
Definition: genam.c:606
HeapTuple systable_getnext(SysScanDesc sysscan)
Definition: genam.c:513
SysScanDesc systable_beginscan(Relation heapRelation, Oid indexId, bool indexOK, Snapshot snapshot, int nkeys, ScanKey key)
Definition: genam.c:387
int MyProcPid
Definition: globals.c:46
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
Definition: heaptuple.c:1116
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1434
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
#define GETSTRUCT(TUP)
Definition: htup_details.h:653
#define ident
Definition: indent_codes.h:47
void CatalogTupleInsert(Relation heapRel, HeapTuple tup)
Definition: indexing.c:233
void CatalogTupleDelete(Relation heapRel, ItemPointer tid)
Definition: indexing.c:365
#define write(a, b, c)
Definition: win32.h:14
#define read(a, b, c)
Definition: win32.h:13
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:365
int i
Definition: isn.c:72
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:1072
void UnlockRelationOid(Oid relid, LOCKMODE lockmode)
Definition: lmgr.c:226
void LockRelationOid(Oid relid, LOCKMODE lockmode)
Definition: lmgr.c:107
void UnlockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:1131
#define NoLock
Definition: lockdefs.h:34
#define AccessExclusiveLock
Definition: lockdefs.h:43
#define ExclusiveLock
Definition: lockdefs.h:42
#define RowExclusiveLock
Definition: lockdefs.h:38
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1168
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1781
void LWLockInitialize(LWLock *lock, int tranche_id)
Definition: lwlock.c:707
@ LWTRANCHE_REPLICATION_ORIGIN_STATE
Definition: lwlock.h:188
@ LW_SHARED
Definition: lwlock.h:115
@ LW_EXCLUSIVE
Definition: lwlock.h:114
void pfree(void *pointer)
Definition: mcxt.c:1521
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:122
TimestampTz replorigin_session_origin_timestamp
Definition: origin.c:161
static ReplicationStateCtl * replication_states_ctl
Definition: origin.c:175
RepOriginId replorigin_by_name(const char *roname, bool missing_ok)
Definition: origin.c:225
Size ReplicationOriginShmemSize(void)
Definition: origin.c:510
RepOriginId replorigin_create(const char *roname)
Definition: origin.c:256
Datum pg_replication_origin_progress(PG_FUNCTION_ARGS)
Definition: origin.c:1495
void replorigin_session_reset(void)
Definition: origin.c:1194
struct ReplicationState ReplicationState
static bool IsReservedOriginName(const char *name)
Definition: origin.c:208
void replorigin_session_advance(XLogRecPtr remote_commit, XLogRecPtr local_commit)
Definition: origin.c:1223
bool replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
Definition: origin.c:469
Datum pg_replication_origin_advance(PG_FUNCTION_ARGS)
Definition: origin.c:1460
XLogRecPtr replorigin_get_progress(RepOriginId node, bool flush)
Definition: origin.c:1018
#define PG_REPLORIGIN_CHECKPOINT_TMPFILE
Definition: origin.c:100
Datum pg_replication_origin_session_progress(PG_FUNCTION_ARGS)
Definition: origin.c:1409
static ReplicationState * replication_states
Definition: origin.c:170
#define PG_REPLORIGIN_CHECKPOINT_FILENAME
Definition: origin.c:99
Datum pg_replication_origin_session_reset(PG_FUNCTION_ARGS)
Definition: origin.c:1376
Datum pg_replication_origin_xact_setup(PG_FUNCTION_ARGS)
Definition: origin.c:1430
Datum pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS)
Definition: origin.c:1393
static void replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
Definition: origin.c:189
Datum pg_replication_origin_oid(PG_FUNCTION_ARGS)
Definition: origin.c:1333
Datum pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
Definition: origin.c:1354
static void ReplicationOriginExitCleanup(int code, Datum arg)
Definition: origin.c:1059
void StartupReplicationOrigin(void)
Definition: origin.c:703
void replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
Definition: origin.c:415
RepOriginId replorigin_session_origin
Definition: origin.c:159
void replorigin_advance(RepOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)
Definition: origin.c:892
static void replorigin_state_clear(RepOriginId roident, bool nowait)
Definition: origin.c:345
void replorigin_session_setup(RepOriginId node, int acquired_by)
Definition: origin.c:1101
void CheckPointReplicationOrigin(void)
Definition: origin.c:577
static ReplicationState * session_replication_state
Definition: origin.c:183
Datum pg_replication_origin_drop(PG_FUNCTION_ARGS)
Definition: origin.c:1314
#define REPLICATION_ORIGIN_PROGRESS_COLS
XLogRecPtr replorigin_session_get_progress(bool flush)
Definition: origin.c:1241
void ReplicationOriginShmemInit(void)
Definition: origin.c:530
Datum pg_show_replication_origin_status(PG_FUNCTION_ARGS)
Definition: origin.c:1520
#define REPLICATION_STATE_MAGIC
Definition: origin.c:186
XLogRecPtr replorigin_session_origin_lsn
Definition: origin.c:160
Datum pg_replication_origin_create(PG_FUNCTION_ARGS)
Definition: origin.c:1273
Datum pg_replication_origin_xact_reset(PG_FUNCTION_ARGS)
Definition: origin.c:1448
void replorigin_redo(XLogReaderState *record)
Definition: origin.c:831
struct ReplicationStateCtl ReplicationStateCtl
struct ReplicationStateOnDisk ReplicationStateOnDisk
#define DoNotReplicateId
Definition: origin.h:34
#define InvalidRepOriginId
Definition: origin.h:33
#define XLOG_REPLORIGIN_DROP
Definition: origin.h:31
#define XLOG_REPLORIGIN_SET
Definition: origin.h:30
void * arg
#define ERRCODE_DATA_CORRUPTED
Definition: pg_basebackup.c:41
uint32 pg_crc32c
Definition: pg_crc32c.h:38
#define COMP_CRC32C(crc, data, len)
Definition: pg_crc32c.h:98
#define INIT_CRC32C(crc)
Definition: pg_crc32c.h:41
#define FIN_CRC32C(crc)
Definition: pg_crc32c.h:103
return crc
#define PG_GETARG_LSN(n)
Definition: pg_lsn.h:33
static Datum LSNGetDatum(XLogRecPtr X)
Definition: pg_lsn.h:28
#define PG_RETURN_LSN(x)
Definition: pg_lsn.h:34
FormData_pg_replication_origin * Form_pg_replication_origin
int pg_strcasecmp(const char *s1, const char *s2)
Definition: pgstrcasecmp.c:36
uintptr_t Datum
Definition: postgres.h:64
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:252
static Pointer DatumGetPointer(Datum X)
Definition: postgres.h:312
#define InvalidOid
Definition: postgres_ext.h:36
unsigned int Oid
Definition: postgres_ext.h:31
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define RelationGetDescr(relation)
Definition: rel.h:531
void ScanKeyInit(ScanKey entry, AttrNumber attributeNumber, StrategyNumber strategy, RegProcedure procedure, Datum argument)
Definition: scankey.c:76
Size add_size(Size s1, Size s2)
Definition: shmem.c:488
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:382
Size mul_size(Size s1, Size s2)
Definition: shmem.c:505
static pg_noinline void Size size
Definition: slab.c:607
int max_replication_slots
Definition: slot.c:141
#define InitDirtySnapshot(snapshotdata)
Definition: snapmgr.h:40
#define BTEqualStrategyNumber
Definition: stratnum.h:31
ItemPointerData t_self
Definition: htup.h:65
Definition: lwlock.h:42
ReplicationState states[FLEXIBLE_ARRAY_MEMBER]
Definition: origin.c:155
XLogRecPtr remote_lsn
Definition: origin.c:146
RepOriginId roident
Definition: origin.c:145
XLogRecPtr remote_lsn
Definition: origin.c:115
XLogRecPtr local_lsn
Definition: origin.c:122
ConditionVariable origin_cv
Definition: origin.c:132
RepOriginId roident
Definition: origin.c:110
LWLock lock
Definition: origin.c:137
TupleDesc setDesc
Definition: execnodes.h:343
Tuplestorestate * setResult
Definition: execnodes.h:342
XLogRecPtr EndRecPtr
Definition: xlogreader.h:207
Definition: regguts.h:323
Definition: c.h:641
RepOriginId node_id
Definition: origin.h:27
RepOriginId node_id
Definition: origin.h:21
XLogRecPtr remote_lsn
Definition: origin.h:20
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:269
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:221
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:40
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, const Datum *values, const bool *isnull)
Definition: tuplestore.c:784
#define PG_GETARG_TIMESTAMPTZ(n)
Definition: timestamp.h:64
char * text_to_cstring(const text *t)
Definition: varlena.c:217
const char * name
bool IsTransactionState(void)
Definition: xact.c:386
void CommandCounterIncrement(void)
Definition: xact.c:1099
bool RecoveryInProgress(void)
Definition: xlog.c:6334
void XLogFlush(XLogRecPtr record)
Definition: xlog.c:2802
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
uint16 RepOriginId
Definition: xlogdefs.h:65
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition: xloginsert.c:474
void XLogRegisterData(const char *data, uint32 len)
Definition: xloginsert.c:364
void XLogBeginInsert(void)
Definition: xloginsert.c:149
#define XLogRecGetInfo(decoder)
Definition: xlogreader.h:410
#define XLogRecGetData(decoder)
Definition: xlogreader.h:415
#define XLR_INFO_MASK
Definition: xlogrecord.h:62