PostgreSQL Source Code  git master
origin.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * origin.c
4  * Logical replication progress tracking support.
5  *
6  * Copyright (c) 2013-2017, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  * src/backend/replication/logical/origin.c
10  *
11  * NOTES
12  *
13  * This file provides the following:
14  * * An infrastructure to name nodes in a replication setup
15  * * A facility to efficiently store and persist replication progress in an
16  * efficient and durable manner.
17  *
18  * Replication origins consist of a descriptive, user-defined, external
19  * name and a short, thus space-efficient, internal 2-byte one. This split
20  * exists because replication origins have to be stored in WAL and shared
21  * memory, and long descriptors would be inefficient. For now we only use 2 bytes
22  * for the internal id of a replication origin as it seems unlikely that there
23  * soon will be more than 65k nodes in one replication setup; and using only
24  * two bytes allows us to be more space efficient.
25  *
26  * Replication progress is tracked in a shared memory table
27  * (ReplicationState) that's dumped to disk every checkpoint. Entries
28  * ('slots') in this table are identified by the internal id. That's the case
29  * because it allows replication progress to be advanced during crash
30  * recovery. To allow doing so we store the original LSN (from the originating
31  * system) of a transaction in the commit record. That allows us to recover the
32  * precise replayed state after crash recovery, without requiring synchronous
33  * commits. Allowing logical replication to use asynchronous commit is
34  * generally good for performance, but especially important as it allows a
35  * single threaded replay process to keep up with a source that has multiple
36  * backends generating changes concurrently. For efficiency and simplicity
37  * reasons a backend can set up one replication origin that is from then on used as
38  * the source of changes produced by the backend, until reset again.
39  *
40  * This infrastructure is intended to be used in cooperation with logical
41  * decoding. When replaying from a remote system the configured origin is
42  * provided to output plugins, allowing prevention of replication loops and
43  * other filtering.
44  *
45  * There are several levels of locking at work:
46  *
47  * * To create and drop replication origins an exclusive lock on
48  * pg_replication_origin is required for the duration. That allows us to
49  * safely and conflict free assign new origins using a dirty snapshot.
50  *
51  * * When creating an in-memory replication progress slot the ReplicationOrigin
52  * LWLock has to be held exclusively; when iterating over the replication
53  * progress a shared lock has to be held, the same when advancing the
54  * replication progress of an origin that the backend has not set up as the
55  * session's replication origin.
56  *
57  * * When manipulating or looking at the remote_lsn and local_lsn fields of a
58  * replication progress slot that slot's lwlock has to be held. That's
59  * primarily because we do not assume 8 byte writes (the LSN) are atomic on
60  * all our platforms, but it also simplifies memory ordering concerns
61  * between the remote and local lsn. We use a lwlock instead of a spinlock
62  * so it's less harmful to hold the lock over a WAL write
63  * (cf. replorigin_advance).
64  *
65  * ---------------------------------------------------------------------------
66  */
67 
68 #include "postgres.h"
69 
70 #include <unistd.h>
71 #include <sys/stat.h>
72 
73 #include "funcapi.h"
74 #include "miscadmin.h"
75 
76 #include "access/genam.h"
77 #include "access/heapam.h"
78 #include "access/htup_details.h"
79 #include "access/xact.h"
80 
81 #include "catalog/indexing.h"
82 #include "nodes/execnodes.h"
83 
84 #include "replication/origin.h"
85 #include "replication/logical.h"
86 #include "pgstat.h"
87 #include "storage/fd.h"
88 #include "storage/ipc.h"
89 #include "storage/lmgr.h"
91 #include "storage/copydir.h"
92 
93 #include "utils/builtins.h"
94 #include "utils/fmgroids.h"
95 #include "utils/pg_lsn.h"
96 #include "utils/rel.h"
97 #include "utils/syscache.h"
98 #include "utils/tqual.h"
99 
100 /*
101  * Replay progress of a single remote node.
 *
 * Instances live in the shared-memory replication_states array; an entry
 * whose roident is InvalidRepOriginId is unused and may be claimed for a
 * new origin.
102  */
103 typedef struct ReplicationState
104 {
105  /*
106  * Local identifier for the remote node (roident).
107  */
109 
110  /*
111  * Location of the latest commit from the remote side (remote_lsn), i.e.
 * an LSN on the remote node.
112  */
114 
115  /*
116  * Remember the local lsn of the commit record so we can XLogFlush() to it
117  * during a checkpoint so we know the commit record actually is safe on
118  * disk (local_lsn).
119  */
121 
122  /*
123  * PID of backend that's acquired slot, or 0 if none (acquired_by).
124  */
126 
127  /*
128  * Condition variable that's signalled when acquired_by changes (origin_cv).
129  */
131 
132  /*
133  * Lock protecting remote_lsn and local_lsn (lock).
134  */
137 
138 /*
139  * On disk version of ReplicationState.
 *
 * CheckPointReplicationOrigin writes one of these per in-use slot, filling
 * only roident and remote_lsn; StartupReplicationOrigin copies those two
 * fields back into shared memory.  local_lsn is not persisted: it is only
 * used to XLogFlush() before the entry is written.
140  */
142 {
146 
147 
/*
 * Shared-memory header for replication progress tracking: the LWLock tranche
 * id used for the per-slot locks, followed by the array of ReplicationState
 * entries (max_replication_slots of them).
 */
148 typedef struct ReplicationStateCtl
149 {
151  ReplicationState states[FLEXIBLE_ARRAY_MEMBER];
153 
154 /* external variables */
158 
159 /*
160  * Base address into a shared memory array of replication states of size
161  * max_replication_slots.
162  *
163  * XXX: Should we use a separate variable to size this rather than
164  * max_replication_slots?
165  */
168 
169 /*
170  * Backend-local, cached element from ReplicationState for use in a backend
171  * replaying remote commits, so we don't have to search ReplicationState for
172  * the backend's current RepOriginId.
173  */
175 
176 /*
 * Magic number stored at the start of the replication origin checkpoint
 * file (pg_logical/replorigin_checkpoint); StartupReplicationOrigin PANICs
 * if the value read back does not match.
 */
177 #define REPLICATION_STATE_MAGIC ((uint32) 0x1257DADE)
178 
179 static void
180 replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
181 {
182  if (!superuser())
183  ereport(ERROR,
184  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
185  errmsg("only superusers can query or manipulate replication origins")));
186 
187  if (check_slots && max_replication_slots == 0)
188  ereport(ERROR,
189  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
190  errmsg("cannot query or manipulate replication origin when max_replication_slots = 0")));
191 
192  if (!recoveryOK && RecoveryInProgress())
193  ereport(ERROR,
194  (errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
195  errmsg("cannot manipulate replication origins during recovery")));
196 
197 }
198 
199 
200 /* ---------------------------------------------------------------------------
201  * Functions for working with replication origins themselves.
202  * ---------------------------------------------------------------------------
203  */
204 
205 /*
206  * Check for a persistent replication origin identified by name.
207  *
 * Looks roname up in the REPLORIGNAME syscache and returns the origin's
 * numeric id (roident).
 *
208  * Returns InvalidOid if the node isn't known yet and missing_ok is true;
 * with missing_ok false an unknown name raises ERROR instead.
209  */
211 replorigin_by_name(char *roname, bool missing_ok)
212 {
215  HeapTuple tuple;
216  Datum roname_d;
217 
218  roname_d = CStringGetTextDatum(roname);
219 
220  tuple = SearchSysCache1(REPLORIGNAME, roname_d);
221  if (HeapTupleIsValid(tuple))
222  {
223  ident = (Form_pg_replication_origin) GETSTRUCT(tuple);
 /* copy the id out of the cached tuple before releasing it */
224  roident = ident->roident;
225  ReleaseSysCache(tuple);
226  }
227  else if (!missing_ok)
228  ereport(ERROR,
229  (errcode(ERRCODE_UNDEFINED_OBJECT),
230  errmsg("replication origin \"%s\" does not exist",
231  roname)));
232 
233  return roident;
234 }
235 
236 /*
237  * Create a replication origin.
238  *
 * Assigns the lowest unused 16-bit id (starting at 1, below PG_UINT16_MAX),
 * inserts a pg_replication_origin row for roname with that id and returns
 * it.  Raises ERROR if every candidate id is already taken.
 *
239  * Needs to be called in a transaction.
240  */
242 replorigin_create(char *roname)
243 {
244  Oid roident;
245  HeapTuple tuple = NULL;
246  Relation rel;
247  Datum roname_d;
248  SnapshotData SnapshotDirty;
249  SysScanDesc scan;
250  ScanKeyData key;
251 
252  roname_d = CStringGetTextDatum(roname);
253 
255 
256  /*
257  * We need the numeric replication origin to be 16bit wide, so we cannot
258  * rely on the normal oid allocation. Instead we simply scan
259  * pg_replication_origin for the first unused id. That's not particularly
260  * efficient, but this should be a fairly infrequent operation - we can
261  * easily spend a bit more code on this when it turns out it needs to be
262  * faster.
263  *
264  * We handle concurrency by taking an exclusive lock (allowing reads!)
265  * over the table for the duration of the search. Because we use a "dirty
266  * snapshot" we can read rows that other in-progress sessions have
267  * written, even though they would be invisible with normal snapshots. Due
268  * to the exclusive lock there's no danger that new rows can appear while
269  * we're checking.
270  */
271  InitDirtySnapshot(SnapshotDirty);
272 
274 
275  for (roident = InvalidOid + 1; roident < PG_UINT16_MAX; roident++)
276  {
277  bool nulls[Natts_pg_replication_origin];
279  bool collides;
280 
282 
283  ScanKeyInit(&key,
285  BTEqualStrategyNumber, F_OIDEQ,
286  ObjectIdGetDatum(roident));
287 
289  true /* indexOK */ ,
290  &SnapshotDirty,
291  1, &key);
292 
293  collides = HeapTupleIsValid(systable_getnext(scan));
294 
295  systable_endscan(scan);
296 
297  if (!collides)
298  {
299  /*
300  * Ok, found an unused roident, insert the new row and do a CCI,
301  * so our callers can look it up if they want to.
302  */
303  memset(&nulls, 0, sizeof(nulls));
304 
306  values[Anum_pg_replication_origin_roname - 1] = roname_d;
307 
308  tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
309  CatalogTupleInsert(rel, tuple);
311  break;
312  }
313  }
314 
315  /* now release lock again */
317 
 /* tuple is still NULL only if every candidate id was already in use */
318  if (tuple == NULL)
319  ereport(ERROR,
320  (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
321  errmsg("could not find free replication origin OID")));
322 
323  heap_freetuple(tuple);
324  return roident;
325 }
326 
327 
328 /*
329  * Drop replication origin.
330  *
 * Resets the origin's in-memory progress slot, WAL-logging the drop first,
 * and then deletes its pg_replication_origin row.  If the slot is currently
 * acquired by some backend, raises ERROR when nowait is true.  Otherwise it
 * releases ReplicationOriginLock and rescans from the start, presumably
 * after sleeping on the slot's origin_cv until acquired_by changes (confirm).
 *
331  * Needs to be called in a transaction.
332  */
333 void
335 {
336  HeapTuple tuple;
337  Relation rel;
338  int i;
339 
341 
343 
344 restart:
345  tuple = NULL;
346  /* cleanup the slot state info */
347  LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
348 
349  for (i = 0; i < max_replication_slots; i++)
350  {
351  ReplicationState *state = &replication_states[i];
352 
353  /* found our slot */
354  if (state->roident == roident)
355  {
356  if (state->acquired_by != 0)
357  {
358  ConditionVariable *cv;
359 
360  if (nowait)
361  ereport(ERROR,
362  (errcode(ERRCODE_OBJECT_IN_USE),
363  errmsg("could not drop replication origin with OID %d, in use by PID %d",
364  state->roident,
365  state->acquired_by)));
366  cv = &state->origin_cv;
367 
368  LWLockRelease(ReplicationOriginLock);
372  goto restart;
373  }
374 
375  /* first WAL log */
376  {
377  xl_replorigin_drop xlrec;
378 
379  xlrec.node_id = roident;
380  XLogBeginInsert();
381  XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
382  XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_DROP);
383  }
384 
385  /* then reset the in-memory entry */
386  state->roident = InvalidRepOriginId;
387  state->remote_lsn = InvalidXLogRecPtr;
388  state->local_lsn = InvalidXLogRecPtr;
389  break;
390  }
391  }
392  LWLockRelease(ReplicationOriginLock);
393 
 /* in-memory state is gone (if there was any); now remove the catalog row */
395  if (!HeapTupleIsValid(tuple))
396  elog(ERROR, "cache lookup failed for replication origin with oid %u",
397  roident);
398 
399  CatalogTupleDelete(rel, &tuple->t_self);
400  ReleaseSysCache(tuple);
401 
403 
404  /* now release lock again */
406 }
407 
408 
409 /*
410  * Look up a replication origin by its numeric id and return its name.
411  *
412  * The external name is palloc'd in the calling context.
413  *
414  * Returns true if the origin is known.  Otherwise *roname is set to NULL
 * and false is returned when missing_ok is true; with missing_ok false an
 * unknown id raises ERROR instead.
415  */
416 bool
417 replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
418 {
419  HeapTuple tuple;
421 
422  Assert(OidIsValid((Oid) roident));
423  Assert(roident != InvalidRepOriginId);
424  Assert(roident != DoNotReplicateId);
425 
427  ObjectIdGetDatum((Oid) roident));
428 
429  if (HeapTupleIsValid(tuple))
430  {
431  ric = (Form_pg_replication_origin) GETSTRUCT(tuple);
 /* text_to_cstring copies, so the result outlives the cache entry */
432  *roname = text_to_cstring(&ric->roname);
433  ReleaseSysCache(tuple);
434 
435  return true;
436  }
437  else
438  {
439  *roname = NULL;
440 
441  if (!missing_ok)
442  ereport(ERROR,
443  (errcode(ERRCODE_UNDEFINED_OBJECT),
444  errmsg("replication origin with OID %u does not exist",
445  roident)));
446 
447  return false;
448  }
449 }
450 
451 
452 /* ---------------------------------------------------------------------------
453  * Functions for handling replication progress.
454  * ---------------------------------------------------------------------------
455  */
456 
/*
 * Shared memory needed for replication progress tracking: nothing when
 * max_replication_slots is 0, otherwise the ReplicationStateCtl header plus
 * the states array (presumably max_replication_slots ReplicationState
 * entries; confirm against the second add_size() argument).
 */
457 Size
459 {
460  Size size = 0;
461 
462  /*
463  * XXX: max_replication_slots is arguably the wrong thing to use, as here
464  * we keep the replay state of *remote* transactions. But for now it seems
465  * sufficient to reuse it, lest we introduce a separate GUC.
466  */
467  if (max_replication_slots == 0)
468  return size;
469 
470  size = add_size(size, offsetof(ReplicationStateCtl, states));
471 
472  size = add_size(size,
474  return size;
475 }
476 
/*
 * Set up the shared replication progress state; a no-op when
 * max_replication_slots is 0.  When ShmemInitStruct reports the area as newly
 * created (!found), set the tranche id, zero the slots and initialize each
 * slot's LWLock and condition variable.
 */
477 void
479 {
480  bool found;
481 
482  if (max_replication_slots == 0)
483  return;
484 
485  replication_states_ctl = (ReplicationStateCtl *)
486  ShmemInitStruct("ReplicationOriginState",
488  &found);
489  replication_states = replication_states_ctl->states;
490 
491  if (!found)
492  {
493  int i;
494 
495  replication_states_ctl->tranche_id = LWTRANCHE_REPLICATION_ORIGIN;
496 
 /*
  * NOTE(review): likely shared-memory overrun.  ReplicationOriginShmemSize()
  * includes offsetof(ReplicationStateCtl, states) for the header, but this
  * memset starts at replication_states (== replication_states_ctl->states).
  * It therefore zeroes that many bytes past the end of the struct, assuming
  * the struct was allocated with ReplicationOriginShmemSize().  Zeroing from
  * replication_states_ctl instead, and assigning tranche_id after the memset,
  * would stay in bounds.
  */
497  MemSet(replication_states, 0, ReplicationOriginShmemSize());
498 
499  for (i = 0; i < max_replication_slots; i++)
500  {
501  LWLockInitialize(&replication_states[i].lock,
502  replication_states_ctl->tranche_id);
503  ConditionVariableInit(&replication_states[i].origin_cv);
504  }
505  }
506 
 /* registered by every caller, not only the one that created the area */
507  LWLockRegisterTranche(replication_states_ctl->tranche_id,
508  "replication_origin");
509 }
510 
511 /* ---------------------------------------------------------------------------
512  * Perform a checkpoint of each replication origin's progress with respect to
513  * the replayed remote_lsn. Make sure that all transactions we refer to in the
514  * checkpoint (local_lsn) are actually on-disk. This might not yet be the case
515  * if the transactions were originally committed asynchronously.
516  *
517  * We store checkpoints in the following format:
518  * +-------+------------------------+------------------+-----+--------+
519  * | MAGIC | ReplicationStateOnDisk | struct Replic... | ... | CRC32C | EOF
520  * +-------+------------------------+------------------+-----+--------+
521  *
522  * So it's just the magic, followed by the statically sized
523  * ReplicationStateOnDisk structs (one per in-use slot), followed by a CRC32C
 * computed over everything before it. Note that the maximum number of
524  * ReplicationState is determined by max_replication_slots.
 *
 * The data is written to a temporary file which is then durably renamed
 * over the permanent one; any I/O failure is a PANIC.
525  * ---------------------------------------------------------------------------
526  */
527 void
529 {
530  const char *tmppath = "pg_logical/replorigin_checkpoint.tmp";
531  const char *path = "pg_logical/replorigin_checkpoint";
532  int tmpfd;
533  int i;
535  pg_crc32c crc;
536 
537  if (max_replication_slots == 0)
538  return;
539 
540  INIT_CRC32C(crc);
541 
542  /* make sure no old temp file is remaining */
543  if (unlink(tmppath) < 0 && errno != ENOENT)
544  ereport(PANIC,
546  errmsg("could not remove file \"%s\": %m",
547  tmppath)));
548 
549  /*
550  * no other backend can perform this at the same time, we're protected by
551  * CheckpointLock.
552  */
553  tmpfd = OpenTransientFile(tmppath,
554  O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
555  if (tmpfd < 0)
556  ereport(PANIC,
558  errmsg("could not create file \"%s\": %m",
559  tmppath)));
560 
561  /* write magic */
 /*
  * NOTE(review): a short write() need not set errno, and
  * CloseTransientFile() runs before ereport() here and below, so %m in
  * these write-failure messages may report an unrelated errno.  Saving
  * errno (defaulting to ENOSPC on a short write) before closing would fix
  * that.
  */
562  if ((write(tmpfd, &magic, sizeof(magic))) != sizeof(magic))
563  {
564  CloseTransientFile(tmpfd);
565  ereport(PANIC,
567  errmsg("could not write to file \"%s\": %m",
568  tmppath)));
569  }
570  COMP_CRC32C(crc, &magic, sizeof(magic));
571 
572  /* prevent concurrent creations/drops */
573  LWLockAcquire(ReplicationOriginLock, LW_SHARED);
574 
575  /* write actual data */
576  for (i = 0; i < max_replication_slots; i++)
577  {
578  ReplicationStateOnDisk disk_state;
579  ReplicationState *curstate = &replication_states[i];
581 
582  if (curstate->roident == InvalidRepOriginId)
583  continue;
584 
585  /* zero, to avoid uninitialized padding bytes */
586  memset(&disk_state, 0, sizeof(disk_state));
587 
588  LWLockAcquire(&curstate->lock, LW_SHARED);
589 
590  disk_state.roident = curstate->roident;
591 
592  disk_state.remote_lsn = curstate->remote_lsn;
593  local_lsn = curstate->local_lsn;
594 
595  LWLockRelease(&curstate->lock);
596 
597  /* make sure we only write out a commit that's persistent */
598  XLogFlush(local_lsn);
599 
600  if ((write(tmpfd, &disk_state, sizeof(disk_state))) !=
601  sizeof(disk_state))
602  {
603  CloseTransientFile(tmpfd);
604  ereport(PANIC,
606  errmsg("could not write to file \"%s\": %m",
607  tmppath)));
608  }
609 
610  COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
611  }
612 
613  LWLockRelease(ReplicationOriginLock);
614 
615  /* write out the CRC */
616  FIN_CRC32C(crc);
617  if ((write(tmpfd, &crc, sizeof(crc))) != sizeof(crc))
618  {
619  CloseTransientFile(tmpfd);
620  ereport(PANIC,
622  errmsg("could not write to file \"%s\": %m",
623  tmppath)));
624  }
625 
626  CloseTransientFile(tmpfd);
627 
628  /* fsync, rename to permanent file, fsync file and directory */
629  durable_rename(tmppath, path, PANIC);
630 }
631 
632 /*
633  * Recover replication replay status from checkpoint data saved earlier by
634  * CheckPointReplicationOrigin.
635  *
636  * This only needs to be called at startup and *not* during every checkpoint
637  * read during recovery (e.g. in HS or PITR from a base backup) afterwards. All
638  * state thereafter can be recovered by looking at commit records.
 *
 * A missing checkpoint file is not an error: there is simply nothing to
 * restore.  A read failure, wrong magic, more entries than
 * max_replication_slots, or a CRC mismatch is a PANIC.
639  */
640 void
642 {
643  const char *path = "pg_logical/replorigin_checkpoint";
644  int fd;
645  int readBytes;
647  int last_state = 0;
648  pg_crc32c file_crc;
649  pg_crc32c crc;
650 
651  /* don't want to overwrite already existing state */
652 #ifdef USE_ASSERT_CHECKING
653  static bool already_started = false;
654 
655  Assert(!already_started);
656  already_started = true;
657 #endif
658 
659  if (max_replication_slots == 0)
660  return;
661 
662  INIT_CRC32C(crc);
663 
664  elog(DEBUG2, "starting up replication origin progress state");
665 
666  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
667 
668  /*
669  * might have had max_replication_slots == 0 last run, or we just brought
670  * up a standby.
671  */
672  if (fd < 0 && errno == ENOENT)
673  return;
674  else if (fd < 0)
675  ereport(PANIC,
677  errmsg("could not open file \"%s\": %m",
678  path)));
679 
680  /* verify magic, that is written even if nothing was active */
 /* NOTE(review): a short read leaves errno unset, so %m below may mislead */
681  readBytes = read(fd, &magic, sizeof(magic));
682  if (readBytes != sizeof(magic))
683  ereport(PANIC,
684  (errmsg("could not read file \"%s\": %m",
685  path)));
686  COMP_CRC32C(crc, &magic, sizeof(magic));
687 
688  if (magic != REPLICATION_STATE_MAGIC)
689  ereport(PANIC,
690  (errmsg("replication checkpoint has wrong magic %u instead of %u",
691  magic, REPLICATION_STATE_MAGIC)));
692 
693  /* we can skip locking here, no other access is possible */
694 
695  /* recover individual states, until there are no more to be found */
696  while (true)
697  {
698  ReplicationStateOnDisk disk_state;
699 
700  readBytes = read(fd, &disk_state, sizeof(disk_state));
701 
702  /* no further data */
 /* (a read of exactly sizeof(crc) bytes means we reached the trailing CRC) */
703  if (readBytes == sizeof(crc))
704  {
705  /* not pretty, but simple ... */
706  file_crc = *(pg_crc32c *) &disk_state;
707  break;
708  }
709 
710  if (readBytes < 0)
711  {
712  ereport(PANIC,
714  errmsg("could not read file \"%s\": %m",
715  path)));
716  }
717 
718  if (readBytes != sizeof(disk_state))
719  {
720  ereport(PANIC,
722  errmsg("could not read file \"%s\": read %d of %zu",
723  path, readBytes, sizeof(disk_state))));
724  }
725 
726  COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
727 
728  if (last_state == max_replication_slots)
729  ereport(PANIC,
730  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
731  errmsg("could not find free replication state, increase max_replication_slots")));
732 
733  /* copy data to shared memory */
734  replication_states[last_state].roident = disk_state.roident;
735  replication_states[last_state].remote_lsn = disk_state.remote_lsn;
736  last_state++;
737 
738  elog(LOG, "recovered replication state of node %u to %X/%X",
739  disk_state.roident,
740  (uint32) (disk_state.remote_lsn >> 32),
741  (uint32) disk_state.remote_lsn);
742  }
743 
744  /* now check checksum */
 /*
  * NOTE(review): ERRCODE_CONFIGURATION_LIMIT_EXCEEDED looks copied from the
  * max_replication_slots check above; a checksum mismatch is data
  * corruption.  The message also says "replication slot checkpoint",
  * although this is the replication origin checkpoint file.
  */
745  FIN_CRC32C(crc);
746  if (file_crc != crc)
747  ereport(PANIC,
748  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
749  errmsg("replication slot checkpoint has wrong checksum %u, expected %u",
750  crc, file_crc)));
751 
752  CloseTransientFile(fd);
753 }
754 
/*
 * WAL redo routine for replication origin records.
 *
 * XLOG_REPLORIGIN_SET re-applies the recorded progress, presumably via
 * replorigin_advance().  It uses the record's end LSN as the local commit
 * position and xlrec->force as the go-backward flag, and does not WAL-log
 * again.  The drop record resets the in-memory slot of xlrec->node_id, if
 * it has one.  Unknown info codes PANIC.
 */
755 void
757 {
758  uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
759 
760  switch (info)
761  {
762  case XLOG_REPLORIGIN_SET:
763  {
764  xl_replorigin_set *xlrec =
765  (xl_replorigin_set *) XLogRecGetData(record);
766 
768  xlrec->remote_lsn, record->EndRecPtr,
769  xlrec->force /* backward */ ,
770  false /* WAL log */ );
771  break;
772  }
774  {
775  xl_replorigin_drop *xlrec;
776  int i;
777 
778  xlrec = (xl_replorigin_drop *) XLogRecGetData(record);
779 
780  for (i = 0; i < max_replication_slots; i++)
781  {
782  ReplicationState *state = &replication_states[i];
783 
784  /* found our slot */
785  if (state->roident == xlrec->node_id)
786  {
787  /* reset entry */
788  state->roident = InvalidRepOriginId;
789  state->remote_lsn = InvalidXLogRecPtr;
790  state->local_lsn = InvalidXLogRecPtr;
791  break;
792  }
793  }
794  break;
795  }
796  default:
797  elog(PANIC, "replorigin_redo: unknown op code %u", info);
798  }
799 }
800 
801 
802 /*
803  * Tell the replication origin progress machinery that a commit from 'node'
804  * that originated at the LSN remote_commit on the remote node was replayed
805  * successfully and that we don't need to do so again. In combination with
806  * setting up replorigin_session_origin_lsn and replorigin_session_origin
807  * that ensures we won't lose knowledge about that after a crash if the
808  * transaction had a persistent effect (think of asynchronous commits).
809  *
810  * local_commit needs to be a local LSN of the commit so that we can make sure
811  * upon a checkpoint that enough WAL has been persisted to disk.
 * Passing InvalidXLogRecPtr leaves the stored local LSN unchanged.
 *
 * Unless go_backward is true, the stored LSNs are only ever moved forward.
 * If wal_log is true the change is WAL-logged (XLOG_REPLORIGIN_SET) so it is
 * durable and reaches standbys.  A free slot is claimed if the origin has
 * none yet.  Raises ERROR if no slot is free, or if the origin's slot is
 * currently acquired by a backend.  DoNotReplicateId is silently ignored.
812  *
813  * Needs to be called with a RowExclusiveLock on pg_replication_origin,
814  * unless running in recovery.
815  */
816 void
818  XLogRecPtr remote_commit, XLogRecPtr local_commit,
819  bool go_backward, bool wal_log)
820 {
821  int i;
822  ReplicationState *replication_state = NULL;
823  ReplicationState *free_state = NULL;
824 
825  Assert(node != InvalidRepOriginId);
826 
827  /* we don't track DoNotReplicateId */
828  if (node == DoNotReplicateId)
829  return;
830 
831  /*
832  * XXX: For the case where this is called by WAL replay, it'd be more
833  * efficient to restore into a backend local hashtable and only dump into
834  * shmem after recovery is finished. Let's wait with implementing that
835  * till it's shown to be a measurable expense
836  */
837 
838  /* Lock exclusively, as we may have to create a new table entry. */
839  LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
840 
841  /*
842  * Search for either an existing slot for the origin, or a free one we can
843  * use.
844  */
845  for (i = 0; i < max_replication_slots; i++)
846  {
847  ReplicationState *curstate = &replication_states[i];
848 
849  /* remember where to insert if necessary */
850  if (curstate->roident == InvalidRepOriginId &&
851  free_state == NULL)
852  {
853  free_state = curstate;
854  continue;
855  }
856 
857  /* not our slot */
858  if (curstate->roident != node)
859  {
860  continue;
861  }
862 
863  /* ok, found slot */
864  replication_state = curstate;
865 
866  LWLockAcquire(&replication_state->lock, LW_EXCLUSIVE);
867 
868  /* Make sure it's not used by somebody else */
869  if (replication_state->acquired_by != 0)
870  {
871  ereport(ERROR,
872  (errcode(ERRCODE_OBJECT_IN_USE),
873  errmsg("replication origin with OID %d is already active for PID %d",
874  replication_state->roident,
875  replication_state->acquired_by)));
876  }
877 
878  break;
879  }
880 
881  if (replication_state == NULL && free_state == NULL)
882  ereport(ERROR,
883  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
884  errmsg("could not find free replication state slot for replication origin with OID %u",
885  node),
886  errhint("Increase max_replication_slots and try again.")));
887 
888  if (replication_state == NULL)
889  {
890  /* initialize new slot */
891  LWLockAcquire(&free_state->lock, LW_EXCLUSIVE);
892  replication_state = free_state;
893  Assert(replication_state->remote_lsn == InvalidXLogRecPtr);
894  Assert(replication_state->local_lsn == InvalidXLogRecPtr);
895  replication_state->roident = node;
896  }
897 
898  Assert(replication_state->roident != InvalidRepOriginId);
899 
900  /*
901  * If somebody "forcefully" sets this slot, WAL log it, so it's durable
902  * and the standby gets the message. Primarily this will be called during
903  * WAL replay (of commit records) where no WAL logging is necessary.
904  */
905  if (wal_log)
906  {
907  xl_replorigin_set xlrec;
908 
909  xlrec.remote_lsn = remote_commit;
910  xlrec.node_id = node;
911  xlrec.force = go_backward;
912 
913  XLogBeginInsert();
914  XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
915 
916  XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_SET);
917  }
918 
919  /*
920  * Due to - harmless - race conditions during a checkpoint we could see
921  * values here that are older than the ones we already have in memory.
922  * Don't overwrite those.
923  */
924  if (go_backward || replication_state->remote_lsn < remote_commit)
925  replication_state->remote_lsn = remote_commit;
926  if (local_commit != InvalidXLogRecPtr &&
927  (go_backward || replication_state->local_lsn < local_commit))
928  replication_state->local_lsn = local_commit;
929  LWLockRelease(&replication_state->lock);
930 
931  /*
932  * Release *after* changing the LSNs, slot isn't acquired and thus could
933  * otherwise be dropped anytime.
934  */
935  LWLockRelease(ReplicationOriginLock);
936 }
937 
938 
/*
 * Return the remote LSN up to which changes from origin 'node' have been
 * replayed, read from its shared progress slot under the slot's lock.  If the
 * origin has no slot, remote_lsn keeps its initial value (presumably
 * InvalidXLogRecPtr; confirm).  If flush is true and a local commit LSN is
 * known, WAL is first flushed up to it, so the returned position is backed
 * by durable local commits.
 */
941 {
942  int i;
945 
946  /* prevent slots from being concurrently dropped */
947  LWLockAcquire(ReplicationOriginLock, LW_SHARED);
948 
949  for (i = 0; i < max_replication_slots; i++)
950  {
952 
953  state = &replication_states[i];
954 
955  if (state->roident == node)
956  {
957  LWLockAcquire(&state->lock, LW_SHARED);
958 
959  remote_lsn = state->remote_lsn;
960  local_lsn = state->local_lsn;
961 
962  LWLockRelease(&state->lock);
963 
964  break;
965  }
966  }
967 
968  LWLockRelease(ReplicationOriginLock);
969 
970  if (flush && local_lsn != InvalidXLogRecPtr)
971  XLogFlush(local_lsn);
972 
973  return remote_lsn;
974 }
975 
976 /*
977  * Tear down a (possibly) configured session replication origin during process
978  * exit.
 *
 * The session's slot is released only if this process still holds it.
 * Waiters on the slot's origin_cv (for example a concurrent drop) are then
 * presumably woken by broadcasting cv; confirm.
979  */
980 static void
982 {
983  ConditionVariable *cv = NULL;
984 
985  LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
986 
987  if (session_replication_state != NULL &&
988  session_replication_state->acquired_by == MyProcPid)
989  {
990  cv = &session_replication_state->origin_cv;
991 
992  session_replication_state->acquired_by = 0;
993  session_replication_state = NULL;
994  }
995 
996  LWLockRelease(ReplicationOriginLock);
997 
998  if (cv)
1000 }
1001 
1002 /*
1003  * Setup a replication origin in the shared memory struct if it doesn't
1004  * already exist and cache access to the specific ReplicationState so the
1005  * array doesn't have to be searched when calling
1006  * replorigin_session_advance().
1007  *
1008  * Obviously only one such cached origin can exist per process and the current
1009  * cached value can only be set again after the previous value is torn down
1010  * with replorigin_session_reset().
 *
 * Raises ERROR in three cases: a session origin is already set up, the
 * origin's slot is already acquired by a backend, or no free slot is left.
 * The first call also registers a cleanup callback, guarded by
 * registered_cleanup (presumably ReplicationOriginExitCleanup on process
 * exit).
1011  */
1012 void
1014 {
1015  static bool registered_cleanup;
1016  int i;
1017  int free_slot = -1;
1018 
1019  if (!registered_cleanup)
1020  {
1022  registered_cleanup = true;
1023  }
1024 
1026 
1027  if (session_replication_state != NULL)
1028  ereport(ERROR,
1029  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1030  errmsg("cannot setup replication origin when one is already setup")));
1031 
1032  /* Lock exclusively, as we may have to create a new table entry. */
1033  LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1034 
1035  /*
1036  * Search for either an existing slot for the origin, or a free one we can
1037  * use.
1038  */
1039  for (i = 0; i < max_replication_slots; i++)
1040  {
1041  ReplicationState *curstate = &replication_states[i];
1042 
1043  /* remember where to insert if necessary */
1044  if (curstate->roident == InvalidRepOriginId &&
1045  free_slot == -1)
1046  {
1047  free_slot = i;
1048  continue;
1049  }
1050 
1051  /* not our slot */
1052  if (curstate->roident != node)
1053  continue;
1054 
1055  else if (curstate->acquired_by != 0)
1056  {
 /*
  * NOTE(review): this message says "replication identifier" while
  * replorigin_advance() reports the same condition as "replication origin
  * with OID %d"; consider aligning the wording.
  */
1057  ereport(ERROR,
1058  (errcode(ERRCODE_OBJECT_IN_USE),
1059  errmsg("replication identifier %d is already active for PID %d",
1060  curstate->roident, curstate->acquired_by)));
1061  }
1062 
1063  /* ok, found slot */
 /*
  * NOTE(review): unlike replorigin_advance() there is no break here, so the
  * scan continues.  That is harmless as long as each roident occupies at
  * most one slot.
  */
1064  session_replication_state = curstate;
1065  }
1066 
1067 
1068  if (session_replication_state == NULL && free_slot == -1)
1069  ereport(ERROR,
1070  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
1071  errmsg("could not find free replication state slot for replication origin with OID %u",
1072  node),
1073  errhint("Increase max_replication_slots and try again.")));
1074  else if (session_replication_state == NULL)
1075  {
1076  /* initialize new slot */
1077  session_replication_state = &replication_states[free_slot];
1078  Assert(session_replication_state->remote_lsn == InvalidXLogRecPtr);
1079  Assert(session_replication_state->local_lsn == InvalidXLogRecPtr);
1080  session_replication_state->roident = node;
1081  }
1082 
1083 
1084  Assert(session_replication_state->roident != InvalidRepOriginId);
1085 
1086  session_replication_state->acquired_by = MyProcPid;
1087 
1088  LWLockRelease(ReplicationOriginLock);
1089 
1090  /* probably this one is pointless */
1091  ConditionVariableBroadcast(&session_replication_state->origin_cv);
1092 }
1093 
1094 /*
1095  * Reset replay state previously setup in this session.
1096  *
1097  * This function may only be called if an origin was setup with
1098  * replorigin_session_setup().
1099  */
1100 void
1102 {
1103  ConditionVariable *cv;
1104 
1106 
1107  if (session_replication_state == NULL)
1108  ereport(ERROR,
1109  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1110  errmsg("no replication origin is configured")));
1111 
1112  LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1113 
1114  session_replication_state->acquired_by = 0;
1115  cv = &session_replication_state->origin_cv;
1116  session_replication_state = NULL;
1117 
1118  LWLockRelease(ReplicationOriginLock);
1119 
1121 }
1122 
1123 /*
1124  * Do the same work replorigin_advance() does, just on the session's
1125  * configured origin.
1126  *
1127  * This is noticeably cheaper than using replorigin_advance().
 *
 * Unlike replorigin_advance(), it:
 *   - never moves either LSN backwards;
 *   - does not WAL-log anything;
 *   - takes only the slot's own lock, because the slot is acquired by this
 *     session and so cannot be dropped underneath it.
1128  */
1129 void
1131 {
1132  Assert(session_replication_state != NULL);
1133  Assert(session_replication_state->roident != InvalidRepOriginId);
1134 
1135  LWLockAcquire(&session_replication_state->lock, LW_EXCLUSIVE);
1136  if (session_replication_state->local_lsn < local_commit)
1137  session_replication_state->local_lsn = local_commit;
1138  if (session_replication_state->remote_lsn < remote_commit)
1139  session_replication_state->remote_lsn = remote_commit;
1140  LWLockRelease(&session_replication_state->lock);
1141 }
1142 
1143 /*
1144  * Ask the machinery about the point up to which we successfully replayed
1145  * changes from an already setup replication origin.
 *
 * Returns the session origin's remote_lsn.  If flush is true and a local
 * commit LSN is recorded, WAL is first flushed up to it.  Must only be
 * called after replorigin_session_setup(); this is asserted.
1146  */
1147 XLogRecPtr
1149 {
1152 
1153  Assert(session_replication_state != NULL);
1154 
1155  LWLockAcquire(&session_replication_state->lock, LW_SHARED);
1156  remote_lsn = session_replication_state->remote_lsn;
1157  local_lsn = session_replication_state->local_lsn;
1158  LWLockRelease(&session_replication_state->lock);
1159 
1160  if (flush && local_lsn != InvalidXLogRecPtr)
1161  XLogFlush(local_lsn);
1162 
1163  return remote_lsn;
1164 }
1165 
1166 
1167 
1168 /* ---------------------------------------------------------------------------
1169  * SQL functions for working with replication origin.
1170  *
1171  * These mostly should be fairly short wrappers around more generic functions.
1172  * ---------------------------------------------------------------------------
1173  */
1174 
1175 /*
1176  * Create a replication origin for the passed-in name, and return the
1177  * assigned oid.
1178  */
1179 Datum
1181 {
1182  char *name;
1184 
1185  replorigin_check_prerequisites(false, false);
1186 
1188  roident = replorigin_create(name);
1189 
1190  pfree(name);
1191 
1192  PG_RETURN_OID(roident);
1193 }
1194 
1195 /*
1196  * Drop replication origin.
1197  */
1198 Datum
1200 {
1201  char *name;
1203 
1204  replorigin_check_prerequisites(false, false);
1205 
1207 
1208  roident = replorigin_by_name(name, false);
1209  Assert(OidIsValid(roident));
1210 
1211  replorigin_drop(roident, true);
1212 
1213  pfree(name);
1214 
1215  PG_RETURN_VOID();
1216 }
1217 
1218 /*
1219  * Return oid of a replication origin.
1220  */
1221 Datum
1223 {
1224  char *name;
1226 
1227  replorigin_check_prerequisites(false, false);
1228 
1230  roident = replorigin_by_name(name, true);
1231 
1232  pfree(name);
1233 
1234  if (OidIsValid(roident))
1235  PG_RETURN_OID(roident);
1236  PG_RETURN_NULL();
1237 }
1238 
1239 /*
1240  * Set up a replication origin for this session.
1241  */
1242 Datum
1244 {
1245  char *name;
1246  RepOriginId origin;
1247 
1248  replorigin_check_prerequisites(true, false);
1249 
1251  origin = replorigin_by_name(name, false);
1252  replorigin_session_setup(origin);
1253 
1254  replorigin_session_origin = origin;
1255 
1256  pfree(name);
1257 
1258  PG_RETURN_VOID();
1259 }
1260 
1261 /*
1262  * Reset the origin previously set up in this session.
1263  */
1264 Datum
1266 {
1267  replorigin_check_prerequisites(true, false);
1268 
1270 
1274 
1275  PG_RETURN_VOID();
1276 }
1277 
1278 /*
1279  * Return whether a replication origin has been set up for this session.
1280  */
1281 Datum
1283 {
1284  replorigin_check_prerequisites(false, false);
1285 
1287 }
1288 
1289 
1290 /*
1291  * Return the replication progress for origin setup in the current session.
1292  *
1293  * If 'flush' is set to true it is ensured that the returned value corresponds
1294  * to a local transaction that has been flushed. This is useful if asynchronous
1295  * commits are used when replaying replicated transactions.
1296  */
1297 Datum
1299 {
1301  bool flush = PG_GETARG_BOOL(0);
1302 
1303  replorigin_check_prerequisites(true, false);
1304 
1305  if (session_replication_state == NULL)
1306  ereport(ERROR,
1307  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1308  errmsg("no replication origin is configured")));
1309 
1310  remote_lsn = replorigin_session_get_progress(flush);
1311 
1312  if (remote_lsn == InvalidXLogRecPtr)
1313  PG_RETURN_NULL();
1314 
1315  PG_RETURN_LSN(remote_lsn);
1316 }
1317 
1318 Datum
1320 {
1321  XLogRecPtr location = PG_GETARG_LSN(0);
1322 
1323  replorigin_check_prerequisites(true, false);
1324 
1325  if (session_replication_state == NULL)
1326  ereport(ERROR,
1327  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1328  errmsg("no replication origin is configured")));
1329 
1330  replorigin_session_origin_lsn = location;
1332 
1333  PG_RETURN_VOID();
1334 }
1335 
1336 Datum
1338 {
1339  replorigin_check_prerequisites(true, false);
1340 
1343 
1344  PG_RETURN_VOID();
1345 }
1346 
1347 
1348 Datum
1350 {
1351  text *name = PG_GETARG_TEXT_PP(0);
1352  XLogRecPtr remote_commit = PG_GETARG_LSN(1);
1353  RepOriginId node;
1354 
1355  replorigin_check_prerequisites(true, false);
1356 
1357  /* lock to prevent the replication origin from vanishing */
1359 
1360  node = replorigin_by_name(text_to_cstring(name), false);
1361 
1362  /*
1363  * Can't sensibly pass a local commit to be flushed at checkpoint - this
1364  * xact hasn't committed yet. This is why this function should be used to
1365  * set up the initial replication state, but not for replay.
1366  */
1367  replorigin_advance(node, remote_commit, InvalidXLogRecPtr,
1368  true /* go backward */ , true /* WAL log */ );
1369 
1371 
1372  PG_RETURN_VOID();
1373 }
1374 
1375 
1376 /*
1377  * Return the replication progress for an individual replication origin.
1378  *
1379  * If 'flush' is set to true it is ensured that the returned value corresponds
1380  * to a local transaction that has been flushed. This is useful if asynchronous
1381  * commits are used when replaying replicated transactions.
1382  */
1383 Datum
1385 {
1386  char *name;
1387  bool flush;
1390 
1391  replorigin_check_prerequisites(true, true);
1392 
1394  flush = PG_GETARG_BOOL(1);
1395 
1396  roident = replorigin_by_name(name, false);
1397  Assert(OidIsValid(roident));
1398 
1399  remote_lsn = replorigin_get_progress(roident, flush);
1400 
1401  if (remote_lsn == InvalidXLogRecPtr)
1402  PG_RETURN_NULL();
1403 
1404  PG_RETURN_LSN(remote_lsn);
1405 }
1406 
1407 
1408 Datum
1410 {
1411  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1412  TupleDesc tupdesc;
1413  Tuplestorestate *tupstore;
1414  MemoryContext per_query_ctx;
1415  MemoryContext oldcontext;
1416  int i;
1418 
1419  /* we want to return 0 rows if max_replication_slots is set to zero */
1420  replorigin_check_prerequisites(false, true);
1421 
1422  if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
1423  ereport(ERROR,
1424  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1425  errmsg("set-valued function called in context that cannot accept a set")));
1426  if (!(rsinfo->allowedModes & SFRM_Materialize))
1427  ereport(ERROR,
1428  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1429  errmsg("materialize mode required, but it is not allowed in this context")));
1430  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
1431  elog(ERROR, "return type must be a row type");
1432 
1433  if (tupdesc->natts != REPLICATION_ORIGIN_PROGRESS_COLS)
1434  elog(ERROR, "wrong function definition");
1435 
1436  per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
1437  oldcontext = MemoryContextSwitchTo(per_query_ctx);
1438 
1439  tupstore = tuplestore_begin_heap(true, false, work_mem);
1440  rsinfo->returnMode = SFRM_Materialize;
1441  rsinfo->setResult = tupstore;
1442  rsinfo->setDesc = tupdesc;
1443 
1444  MemoryContextSwitchTo(oldcontext);
1445 
1446 
1447  /* prevent slots from being concurrently dropped */
1448  LWLockAcquire(ReplicationOriginLock, LW_SHARED);
1449 
1450  /*
1451  * Iterate through all possible replication_states, display if they are
1452  * filled. Each state's LSNs are read under that state's lock, but the
1453  * returned values may already be out of date by the time they are seen.
1454  */
1455  for (i = 0; i < max_replication_slots; i++)
1456  {
1460  char *roname;
1461 
1462  state = &replication_states[i];
1463 
1464  /* unused slot, nothing to display */
1465  if (state->roident == InvalidRepOriginId)
1466  continue;
1467 
1468  memset(values, 0, sizeof(values));
1469  memset(nulls, 1, sizeof(nulls));
1470 
1471  values[0] = ObjectIdGetDatum(state->roident);
1472  nulls[0] = false;
1473 
1474  /*
1475  * We're not preventing the origin to be dropped concurrently, so
1476  * silently accept that it might be gone.
1477  */
1478  if (replorigin_by_oid(state->roident, true,
1479  &roname))
1480  {
1481  values[1] = CStringGetTextDatum(roname);
1482  nulls[1] = false;
1483  }
1484 
1485  LWLockAcquire(&state->lock, LW_SHARED);
1486 
1487  values[2] = LSNGetDatum(state->remote_lsn);
1488  nulls[2] = false;
1489 
1490  values[3] = LSNGetDatum(state->local_lsn);
1491  nulls[3] = false;
1492 
1493  LWLockRelease(&state->lock);
1494 
1495  tuplestore_putvalues(tupstore, tupdesc, values, nulls);
1496  }
1497 
1498  tuplestore_donestoring(tupstore);
1499 
1500  LWLockRelease(ReplicationOriginLock);
1501 
1502 #undef REPLICATION_ORIGIN_PROGRESS_COLS
1503 
1504  return (Datum) 0;
1505 }
static ReplicationState * session_replication_state
Definition: origin.c:174
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, Datum *values, bool *isnull)
Definition: tuplestore.c:750
#define INIT_CRC32C(crc)
Definition: pg_crc32c.h:41
static void replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
Definition: origin.c:180
Definition: lwlock.h:32
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define IsA(nodeptr, _type_)
Definition: nodes.h:563
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:211
int MyProcPid
Definition: globals.c:39
int errhint(const char *fmt,...)
Definition: elog.c:987
XLogRecPtr local_lsn
Definition: origin.c:120
void systable_endscan(SysScanDesc sysscan)
Definition: genam.c:499
#define GETSTRUCT(TUP)
Definition: htup_details.h:661
#define Anum_pg_replication_origin_roident
#define RelationGetDescr(relation)
Definition: rel.h:437
#define DoNotReplicateId
Definition: origin.h:35
Datum pg_replication_origin_xact_setup(PG_FUNCTION_ARGS)
Definition: origin.c:1319
void UnlockRelationOid(Oid relid, LOCKMODE lockmode)
Definition: lmgr.c:182
#define write(a, b, c)
Definition: win32.h:14
Datum pg_replication_origin_drop(PG_FUNCTION_ARGS)
Definition: origin.c:1199
#define ExclusiveLock
Definition: lockdefs.h:44
int64 TimestampTz
Definition: timestamp.h:39
XLogRecPtr remote_lsn
Definition: origin.c:144
#define PG_GETARG_DATUM(n)
Definition: fmgr.h:233
uint32 pg_crc32c
Definition: pg_crc32c.h:38
static void ReplicationOriginExitCleanup(int code, Datum arg)
Definition: origin.c:981
RepOriginId roident
Definition: origin.c:143
int ConditionVariableBroadcast(ConditionVariable *cv)
void replorigin_drop(RepOriginId roident, bool nowait)
Definition: origin.c:334
#define tuplestore_donestoring(state)
Definition: tuplestore.h:60
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
unsigned char uint8
Definition: c.h:294
uint16 RepOriginId
Definition: xlogdefs.h:51
XLogRecPtr replorigin_session_get_progress(bool flush)
Definition: origin.c:1148
int errcode(int sqlerrcode)
Definition: elog.c:575
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
bool superuser(void)
Definition: superuser.c:47
#define MemSet(start, val, len)
Definition: c.h:853
void CatalogTupleDelete(Relation heapRel, ItemPointer tid)
Definition: indexing.c:255
void ReplicationOriginShmemInit(void)
Definition: origin.c:478
void replorigin_advance(RepOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)
Definition: origin.c:817
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:695
#define heap_close(r, l)
Definition: heapam.h:97
#define PG_GETARG_BOOL(n)
Definition: fmgr.h:239
void replorigin_session_setup(RepOriginId node)
Definition: origin.c:1013
bool replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
Definition: origin.c:417
#define LOG
Definition: elog.h:26
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1373
unsigned int Oid
Definition: postgres_ext.h:31
bool RecoveryInProgress(void)
Definition: xlog.c:7922
#define OidIsValid(objectId)
Definition: c.h:576
#define PANIC
Definition: elog.h:53
void XLogFlush(XLogRecPtr record)
Definition: xlog.c:2763
static int fd(const char *x, int i)
Definition: preproc-init.c:105
SysScanDesc systable_beginscan(Relation heapRelation, Oid indexId, bool indexOK, Snapshot snapshot, int nkeys, ScanKey key)
Definition: genam.c:328
#define PG_BINARY
Definition: c.h:1025
RepOriginId replorigin_by_name(char *roname, bool missing_ok)
Definition: origin.c:211
RepOriginId roident
Definition: origin.c:108
XLogRecPtr EndRecPtr
Definition: xlogreader.h:120
#define PG_GETARG_TEXT_PP(n)
Definition: fmgr.h:273
void LWLockRegisterTranche(int tranche_id, const char *tranche_name)
Definition: lwlock.c:599
Datum pg_replication_origin_advance(PG_FUNCTION_ARGS)
Definition: origin.c:1349
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1722
void ConditionVariablePrepareToSleep(ConditionVariable *cv)
#define PG_RETURN_LSN(x)
Definition: pg_lsn.h:25
void CheckPointReplicationOrigin(void)
Definition: origin.c:528
void ConditionVariableInit(ConditionVariable *cv)
void replorigin_redo(XLogReaderState *record)
Definition: origin.c:756
HeapTuple systable_getnext(SysScanDesc sysscan)
Definition: genam.c:416
void pfree(void *pointer)
Definition: mcxt.c:949
#define XLogRecGetData(decoder)
Definition: xlogreader.h:226
void ConditionVariableCancelSleep(void)
#define Anum_pg_replication_origin_roname
#define ReplicationOriginIdentIndex
Definition: indexing.h:333
#define ObjectIdGetDatum(X)
Definition: postgres.h:513
#define ERROR
Definition: elog.h:43
#define PG_UINT16_MAX
Definition: c.h:377
void replorigin_session_reset(void)
Definition: origin.c:1101
Oid CatalogTupleInsert(Relation heapRel, HeapTuple tup)
Definition: indexing.c:162
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2392
FormData_pg_replication_origin * Form_pg_replication_origin
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:372
#define InitDirtySnapshot(snapshotdata)
Definition: tqual.h:103
LWLock lock
Definition: origin.c:135
ItemPointerData t_self
Definition: htup.h:65
#define DEBUG2
Definition: elog.h:24
XLogRecPtr replorigin_session_origin_lsn
Definition: origin.c:156
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:348
TimestampTz replorigin_session_origin_timestamp
Definition: origin.c:157
#define XLOG_REPLORIGIN_SET
Definition: origin.h:31
void StartupReplicationOrigin(void)
Definition: origin.c:641
#define RowExclusiveLock
Definition: lockdefs.h:38
int errcode_for_file_access(void)
Definition: elog.c:598
struct ReplicationState ReplicationState
unsigned int uint32
Definition: c.h:296
Datum pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
Definition: origin.c:1243
Datum pg_replication_origin_session_reset(PG_FUNCTION_ARGS)
Definition: origin.c:1265
RepOriginId replorigin_create(char *roname)
Definition: origin.c:242
Datum pg_show_replication_origin_status(PG_FUNCTION_ARGS)
Definition: origin.c:1409
#define ereport(elevel, rest)
Definition: elog.h:122
#define XLogRecGetInfo(decoder)
Definition: xlogreader.h:222
int durable_rename(const char *oldfile, const char *newfile, int elevel)
Definition: fd.c:607
Datum pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS)
Definition: origin.c:1282
void LWLockInitialize(LWLock *lock, int tranche_id)
Definition: lwlock.c:674
int CloseTransientFile(int fd)
Definition: fd.c:2562
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:1112
void XLogRegisterData(char *data, int len)
Definition: xloginsert.c:323
Tuplestorestate * tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
Definition: tuplestore.c:318
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition: xloginsert.c:415
#define XLOG_REPLORIGIN_DROP
Definition: origin.h:32
Size mul_size(Size s1, Size s2)
Definition: shmem.c:492
#define PG_GETARG_LSN(n)
Definition: pg_lsn.h:24
struct ReplicationStateOnDisk ReplicationStateOnDisk
#define PG_RETURN_BOOL(x)
Definition: fmgr.h:319
uintptr_t Datum
Definition: postgres.h:372
void CommandCounterIncrement(void)
Definition: xact.c:915
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:1160
Size add_size(Size s1, Size s2)
Definition: shmem.c:475
Relation heap_open(Oid relationId, LOCKMODE lockmode)
Definition: heapam.c:1290
int work_mem
Definition: globals.c:113
Size ReplicationOriginShmemSize(void)
Definition: origin.c:458
#define REPLICATION_STATE_MAGIC
Definition: origin.c:177
#define InvalidOid
Definition: postgres_ext.h:36
static ReplicationStateCtl * replication_states_ctl
Definition: origin.c:167
Datum pg_replication_origin_xact_reset(PG_FUNCTION_ARGS)
Definition: origin.c:1337
int allowedModes
Definition: execnodes.h:269
#define PG_RETURN_VOID()
Definition: fmgr.h:309
struct ReplicationStateCtl ReplicationStateCtl
SetFunctionReturnMode returnMode
Definition: execnodes.h:271
int max_replication_slots
Definition: slot.c:99
void ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)
XLogRecPtr remote_lsn
Definition: origin.c:113
#define HeapTupleIsValid(tuple)
Definition: htup.h:77
Datum pg_replication_origin_progress(PG_FUNCTION_ARGS)
Definition: origin.c:1384
RepOriginId node_id
Definition: origin.h:28
uint64 XLogRecPtr
Definition: xlogdefs.h:21
ReplicationState states[FLEXIBLE_ARRAY_MEMBER]
Definition: origin.c:151
#define Assert(condition)
Definition: c.h:670
#define XLR_INFO_MASK
Definition: xlogrecord.h:62
RepOriginId replorigin_session_origin
Definition: origin.c:155
Definition: regguts.h:298
RepOriginId node_id
Definition: origin.h:22
ConditionVariable origin_cv
Definition: origin.c:130
size_t Size
Definition: c.h:404
void replorigin_session_advance(XLogRecPtr remote_commit, XLogRecPtr local_commit)
Definition: origin.c:1130
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1118
bool IsTransactionState(void)
Definition: xact.c:351
#define Natts_pg_replication_origin
XLogRecPtr remote_lsn
Definition: origin.h:21
MemoryContext ecxt_per_query_memory
Definition: execnodes.h:203
const char * name
Definition: encode.c:521
#define InvalidRepOriginId
Definition: origin.h:34
Tuplestorestate * setResult
Definition: execnodes.h:274
#define DatumGetPointer(X)
Definition: postgres.h:555
static Datum values[MAXATTR]
Definition: bootstrap.c:164
char * text_to_cstring(const text *t)
Definition: varlena.c:182
ExprContext * econtext
Definition: execnodes.h:267
TupleDesc setDesc
Definition: execnodes.h:275
#define ReplicationOriginRelationId
int errmsg(const char *fmt,...)
Definition: elog.c:797
Datum pg_replication_origin_oid(PG_FUNCTION_ARGS)
Definition: origin.c:1222
#define PG_GETARG_TIMESTAMPTZ(n)
Definition: timestamp.h:36
int i
Datum pg_replication_origin_session_progress(PG_FUNCTION_ARGS)
Definition: origin.c:1298
static ReplicationState * replication_states
Definition: origin.c:166
void ScanKeyInit(ScanKey entry, AttrNumber attributeNumber, StrategyNumber strategy, RegProcedure procedure, Datum argument)
Definition: scankey.c:76
#define CStringGetTextDatum(s)
Definition: builtins.h:91
void * arg
Definition: c.h:487
#define PG_FUNCTION_ARGS
Definition: fmgr.h:158
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:98
#define elog
Definition: elog.h:219
void LockRelationOid(Oid relid, LOCKMODE lockmode)
Definition: lmgr.c:105
#define COMP_CRC32C(crc, data, len)
Definition: pg_crc32c.h:73
Datum pg_replication_origin_create(PG_FUNCTION_ARGS)
Definition: origin.c:1180
#define REPLICATION_ORIGIN_PROGRESS_COLS
void XLogBeginInsert(void)
Definition: xloginsert.c:120
#define PG_RETURN_OID(x)
Definition: fmgr.h:320
#define FIN_CRC32C(crc)
Definition: pg_crc32c.h:78
XLogRecPtr replorigin_get_progress(RepOriginId node, bool flush)
Definition: origin.c:940
#define PG_RETURN_NULL()
Definition: fmgr.h:305
#define read(a, b, c)
Definition: win32.h:13
#define BTEqualStrategyNumber
Definition: stratnum.h:31
#define offsetof(type, field)
Definition: c.h:593