PostgreSQL Source Code  git master
origin.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * origin.c
4  * Logical replication progress tracking support.
5  *
6  * Copyright (c) 2013-2020, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  * src/backend/replication/logical/origin.c
10  *
11  * NOTES
12  *
13  * This file provides the following:
14  * * An infrastructure to name nodes in a replication setup
15  * * A facility to efficiently store and persist replication progress in an
16  * efficient and durable manner.
17  *
18  * Replication origins consist of a descriptive, user-defined, external
19  * name and a short, thus space-efficient, internal 2-byte id. This split
20  * exists because replication origins have to be stored in WAL and shared
21  * memory, where long descriptors would be inefficient. For now we only use
22  * 2 bytes for the internal id of a replication origin, as it seems unlikely
23  * that there will soon be more than 65k nodes in one replication setup, and
24  * using only two bytes allows us to be more space efficient.
25  *
26  * Replication progress is tracked in a shared memory table
27  * (ReplicationState) that's dumped to disk every checkpoint. Entries
28  * ('slots') in this table are identified by the internal id. That's the case
29  * because it allows replication progress to be advanced during crash
30  * recovery. To allow doing so we store the original LSN (from the originating
31  * system) of a transaction in the commit record. That allows recovering the
32  * precise replayed state after crash recovery, without requiring synchronous
33  * commits. Allowing logical replication to use asynchronous commit is
34  * generally good for performance, but especially important as it allows a
35  * single-threaded replay process to keep up with a source that has multiple
36  * backends generating changes concurrently. For efficiency and simplicity
37  * reasons a backend can set up one replication origin that is from then on
38  * used as the source of changes produced by the backend, until reset again.
39  *
40  * This infrastructure is intended to be used in cooperation with logical
41  * decoding. When replaying from a remote system the configured origin is
42  * provided to output plugins, allowing prevention of replication loops and
43  * other filtering.
44  *
45  * There are several levels of locking at work:
46  *
47  * * To create and drop replication origins an exclusive lock on
48  * pg_replication_origin is required for the duration. That allows us to
49  * safely and conflict-free assign new origins using a dirty snapshot.
50  *
51  * * When creating an in-memory replication progress slot the ReplicationOrigin
52  * LWLock has to be held exclusively; when iterating over the replication
53  * progress a shared lock has to be held, the same when advancing the
54  * replication progress of an individual origin that has not been set up as
55  * the session's replication origin.
56  *
57  * * When manipulating or looking at the remote_lsn and local_lsn fields of a
58  * replication progress slot that slot's lwlock has to be held. That's
59  * primarily because we do not assume 8-byte writes (of the LSN) are atomic on
60  * all our platforms, but it also simplifies memory ordering concerns
61  * between the remote and local lsn. We use a lwlock instead of a spinlock
62  * so it's less harmful to hold the lock over a WAL write
63  * (cf. AdvanceReplicationProgress).
64  *
65  * ---------------------------------------------------------------------------
66  */
67 
68 #include "postgres.h"
69 
70 #include <unistd.h>
71 #include <sys/stat.h>
72 
73 #include "access/genam.h"
74 #include "access/htup_details.h"
75 #include "access/table.h"
76 #include "access/xact.h"
77 #include "catalog/catalog.h"
78 #include "catalog/indexing.h"
79 #include "funcapi.h"
80 #include "miscadmin.h"
81 #include "nodes/execnodes.h"
82 #include "pgstat.h"
83 #include "replication/logical.h"
84 #include "replication/origin.h"
86 #include "storage/copydir.h"
87 #include "storage/fd.h"
88 #include "storage/ipc.h"
89 #include "storage/lmgr.h"
90 #include "utils/builtins.h"
91 #include "utils/fmgroids.h"
92 #include "utils/pg_lsn.h"
93 #include "utils/rel.h"
94 #include "utils/snapmgr.h"
95 #include "utils/syscache.h"
96 
97 /*
98  * Replay progress of a single remote node.
99  */
100 typedef struct ReplicationState
101 {
102  /*
103  * Local identifier for the remote node.
104  */
106 
107  /*
108  * Location of the latest commit from the remote side.
109  */
111 
112  /*
113  * Remember the local lsn of the commit record so we can XLogFlush() to it
114  * during a checkpoint so we know the commit record actually is safe on
115  * disk.
116  */
118 
119  /*
120  * PID of backend that's acquired slot, or 0 if none.
121  */
123 
124  /*
125  * Condition variable that's signalled when acquired_by changes.
126  */
128 
129  /*
130  * Lock protecting remote_lsn and local_lsn.
131  */
134 
135 /*
136  * On disk version of ReplicationState.
137  */
139 {
143 
144 
145 typedef struct ReplicationStateCtl
146 {
147  /* Tranche to use for per-origin LWLocks */
149  /* Array of length max_replication_slots */
152 
153 /* external variables */
157 
158 /*
159  * Base address into a shared memory array of replication states of size
160  * max_replication_slots.
161  *
162  * XXX: Should we use a separate variable to size this rather than
163  * max_replication_slots?
164  */
166 
167 /*
168  * Actual shared memory block (replication_states[] is now part of this).
169  */
171 
172 /*
173  * Backend-local, cached element from ReplicationState for use in a backend
174  * replaying remote commits, so we don't have to search ReplicationState for
175  * the backend's current RepOriginId.
176  */
178 
179 /* Magic number at the start of the replication origin checkpoint file. */
180 #define REPLICATION_STATE_MAGIC ((uint32) 0x1257DADE)
181 
182 static void
183 replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
184 {
185  if (!superuser())
186  ereport(ERROR,
187  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
188  errmsg("only superusers can query or manipulate replication origins")));
189 
190  if (check_slots && max_replication_slots == 0)
191  ereport(ERROR,
192  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
193  errmsg("cannot query or manipulate replication origin when max_replication_slots = 0")));
194 
195  if (!recoveryOK && RecoveryInProgress())
196  ereport(ERROR,
197  (errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
198  errmsg("cannot manipulate replication origins during recovery")));
199 
200 }
201 
202 
203 /* ---------------------------------------------------------------------------
204  * Functions for working with replication origins themselves.
205  * ---------------------------------------------------------------------------
206  */
207 
208 /*
209  * Check for a persistent replication origin identified by name.
210  *
211  * Returns InvalidOid if the node isn't known yet and missing_ok is true.
212  */
214 replorigin_by_name(char *roname, bool missing_ok)
215 {
218  HeapTuple tuple;
219  Datum roname_d;
220 
221  roname_d = CStringGetTextDatum(roname);
222 
223  tuple = SearchSysCache1(REPLORIGNAME, roname_d);
224  if (HeapTupleIsValid(tuple))
225  {
226  ident = (Form_pg_replication_origin) GETSTRUCT(tuple);
227  roident = ident->roident;
228  ReleaseSysCache(tuple);
229  }
230  else if (!missing_ok)
231  ereport(ERROR,
232  (errcode(ERRCODE_UNDEFINED_OBJECT),
233  errmsg("replication origin \"%s\" does not exist",
234  roname)));
235 
236  return roident;
237 }
238 
239 /*
240  * Create a replication origin.
241  *
242  * Needs to be called in a transaction.
243  */
245 replorigin_create(char *roname)
246 {
247  Oid roident;
248  HeapTuple tuple = NULL;
249  Relation rel;
250  Datum roname_d;
251  SnapshotData SnapshotDirty;
252  SysScanDesc scan;
254 
255  roname_d = CStringGetTextDatum(roname);
256 
258 
259  /*
260  * We need the numeric replication origin to be 16bit wide, so we cannot
261  * rely on the normal oid allocation. Instead we simply scan
262  * pg_replication_origin for the first unused id. That's not particularly
263  * efficient, but this should be a fairly infrequent operation - we can
264  * easily spend a bit more code on this when it turns out it needs to be
265  * faster.
266  *
267  * We handle concurrency by taking an exclusive lock (allowing reads!)
268  * over the table for the duration of the search. Because we use a "dirty
269  * snapshot" we can read rows that other in-progress sessions have
270  * written, even though they would be invisible with normal snapshots. Due
271  * to the exclusive lock there's no danger that new rows can appear while
272  * we're checking.
273  */
274  InitDirtySnapshot(SnapshotDirty);
275 
276  rel = table_open(ReplicationOriginRelationId, ExclusiveLock);
277 
278  for (roident = InvalidOid + 1; roident < PG_UINT16_MAX; roident++)
279  {
280  bool nulls[Natts_pg_replication_origin];
281  Datum values[Natts_pg_replication_origin];
282  bool collides;
283 
285 
286  ScanKeyInit(&key,
287  Anum_pg_replication_origin_roident,
288  BTEqualStrategyNumber, F_OIDEQ,
289  ObjectIdGetDatum(roident));
290 
292  true /* indexOK */ ,
293  &SnapshotDirty,
294  1, &key);
295 
296  collides = HeapTupleIsValid(systable_getnext(scan));
297 
298  systable_endscan(scan);
299 
300  if (!collides)
301  {
302  /*
303  * Ok, found an unused roident, insert the new row and do a CCI,
304  * so our callers can look it up if they want to.
305  */
306  memset(&nulls, 0, sizeof(nulls));
307 
308  values[Anum_pg_replication_origin_roident - 1] = ObjectIdGetDatum(roident);
309  values[Anum_pg_replication_origin_roname - 1] = roname_d;
310 
311  tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
312  CatalogTupleInsert(rel, tuple);
314  break;
315  }
316  }
317 
318  /* now release lock again, */
320 
321  if (tuple == NULL)
322  ereport(ERROR,
323  (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
324  errmsg("could not find free replication origin OID")));
325 
326  heap_freetuple(tuple);
327  return roident;
328 }
329 
330 
331 /*
332  * Drop replication origin.
333  *
334  * Needs to be called in a transaction.
335  */
336 void
338 {
339  HeapTuple tuple;
340  Relation rel;
341  int i;
342 
344 
345  /*
346  * To interlock against concurrent drops, we hold ExclusiveLock on
347  * pg_replication_origin throughout this function.
348  */
349  rel = table_open(ReplicationOriginRelationId, ExclusiveLock);
350 
351  /*
352  * First, clean up the slot state info, if there is any matching slot.
353  */
354 restart:
355  tuple = NULL;
356  LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
357 
358  for (i = 0; i < max_replication_slots; i++)
359  {
360  ReplicationState *state = &replication_states[i];
361 
362  if (state->roident == roident)
363  {
364  /* found our slot, is it busy? */
365  if (state->acquired_by != 0)
366  {
367  ConditionVariable *cv;
368 
369  if (nowait)
370  ereport(ERROR,
371  (errcode(ERRCODE_OBJECT_IN_USE),
372  errmsg("could not drop replication origin with OID %d, in use by PID %d",
373  state->roident,
374  state->acquired_by)));
375 
376  /*
377  * We must wait and then retry. Since we don't know which CV
378  * to wait on until here, we can't readily use
379  * ConditionVariablePrepareToSleep (calling it here would be
380  * wrong, since we could miss the signal if we did so); just
381  * use ConditionVariableSleep directly.
382  */
383  cv = &state->origin_cv;
384 
385  LWLockRelease(ReplicationOriginLock);
386 
388  goto restart;
389  }
390 
391  /* first make a WAL log entry */
392  {
393  xl_replorigin_drop xlrec;
394 
395  xlrec.node_id = roident;
396  XLogBeginInsert();
397  XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
398  XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_DROP);
399  }
400 
401  /* then clear the in-memory slot */
402  state->roident = InvalidRepOriginId;
403  state->remote_lsn = InvalidXLogRecPtr;
404  state->local_lsn = InvalidXLogRecPtr;
405  break;
406  }
407  }
408  LWLockRelease(ReplicationOriginLock);
410 
411  /*
412  * Now, we can delete the catalog entry.
413  */
415  if (!HeapTupleIsValid(tuple))
416  elog(ERROR, "cache lookup failed for replication origin with oid %u",
417  roident);
418 
419  CatalogTupleDelete(rel, &tuple->t_self);
420  ReleaseSysCache(tuple);
421 
423 
424  /* now release lock again */
426 }
427 
428 
429 /*
430  * Look up a replication origin via its OID and return the name.
431  *
432  * The external name is palloc'd in the calling context.
433  *
434  * Returns true if the origin is known, false otherwise.
435  */
436 bool
437 replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
438 {
439  HeapTuple tuple;
441 
442  Assert(OidIsValid((Oid) roident));
443  Assert(roident != InvalidRepOriginId);
444  Assert(roident != DoNotReplicateId);
445 
447  ObjectIdGetDatum((Oid) roident));
448 
449  if (HeapTupleIsValid(tuple))
450  {
451  ric = (Form_pg_replication_origin) GETSTRUCT(tuple);
452  *roname = text_to_cstring(&ric->roname);
453  ReleaseSysCache(tuple);
454 
455  return true;
456  }
457  else
458  {
459  *roname = NULL;
460 
461  if (!missing_ok)
462  ereport(ERROR,
463  (errcode(ERRCODE_UNDEFINED_OBJECT),
464  errmsg("replication origin with OID %u does not exist",
465  roident)));
466 
467  return false;
468  }
469 }
470 
471 
472 /* ---------------------------------------------------------------------------
473  * Functions for handling replication progress.
474  * ---------------------------------------------------------------------------
475  */
476 
477 Size
479 {
480  Size size = 0;
481 
482  /*
483  * XXX: max_replication_slots is arguably the wrong thing to use, as here
484  * we keep the replay state of *remote* transactions. But for now it seems
485  * sufficient to reuse it, rather than introduce a separate GUC.
486  */
487  if (max_replication_slots == 0)
488  return size;
489 
490  size = add_size(size, offsetof(ReplicationStateCtl, states));
491 
492  size = add_size(size,
494  return size;
495 }
496 
497 void
499 {
500  bool found;
501 
502  if (max_replication_slots == 0)
503  return;
504 
505  replication_states_ctl = (ReplicationStateCtl *)
506  ShmemInitStruct("ReplicationOriginState",
508  &found);
509  replication_states = replication_states_ctl->states;
510 
511  if (!found)
512  {
513  int i;
514 
515  MemSet(replication_states_ctl, 0, ReplicationOriginShmemSize());
516 
517  replication_states_ctl->tranche_id = LWTRANCHE_REPLICATION_ORIGIN_STATE;
518 
519  for (i = 0; i < max_replication_slots; i++)
520  {
521  LWLockInitialize(&replication_states[i].lock,
522  replication_states_ctl->tranche_id);
523  ConditionVariableInit(&replication_states[i].origin_cv);
524  }
525  }
526 }
527 
528 /* ---------------------------------------------------------------------------
529  * Perform a checkpoint of each replication origin's progress with respect to
530  * the replayed remote_lsn. Make sure that all transactions we refer to in the
531  * checkpoint (local_lsn) are actually on-disk. This might not yet be the case
532  * if the transactions were originally committed asynchronously.
533  *
534  * We store checkpoints in the following format:
535  * +-------+------------------------+------------------+-----+--------+
536  * | MAGIC | ReplicationStateOnDisk | struct Replic... | ... | CRC32C | EOF
537  * +-------+------------------------+------------------+-----+--------+
538  *
539  * So it's just the magic, followed by the statically sized
540  * ReplicationStateOnDisk structs and a CRC32C over all of that. Note that
541  * the maximum number of ReplicationStates is determined by max_replication_slots.
542  * ---------------------------------------------------------------------------
543  */
544 void
546 {
547  const char *tmppath = "pg_logical/replorigin_checkpoint.tmp";
548  const char *path = "pg_logical/replorigin_checkpoint";
549  int tmpfd;
550  int i;
552  pg_crc32c crc;
553 
554  if (max_replication_slots == 0)
555  return;
556 
557  INIT_CRC32C(crc);
558 
559  /* make sure no old temp file is remaining */
560  if (unlink(tmppath) < 0 && errno != ENOENT)
561  ereport(PANIC,
563  errmsg("could not remove file \"%s\": %m",
564  tmppath)));
565 
566  /*
567  * no other backend can perform this at the same time, we're protected by
568  * CheckpointLock.
569  */
570  tmpfd = OpenTransientFile(tmppath,
571  O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
572  if (tmpfd < 0)
573  ereport(PANIC,
575  errmsg("could not create file \"%s\": %m",
576  tmppath)));
577 
578  /* write magic */
579  errno = 0;
580  if ((write(tmpfd, &magic, sizeof(magic))) != sizeof(magic))
581  {
582  /* if write didn't set errno, assume problem is no disk space */
583  if (errno == 0)
584  errno = ENOSPC;
585  ereport(PANIC,
587  errmsg("could not write to file \"%s\": %m",
588  tmppath)));
589  }
590  COMP_CRC32C(crc, &magic, sizeof(magic));
591 
592  /* prevent concurrent creations/drops */
593  LWLockAcquire(ReplicationOriginLock, LW_SHARED);
594 
595  /* write actual data */
596  for (i = 0; i < max_replication_slots; i++)
597  {
598  ReplicationStateOnDisk disk_state;
599  ReplicationState *curstate = &replication_states[i];
601 
602  if (curstate->roident == InvalidRepOriginId)
603  continue;
604 
605  /* zero, to avoid uninitialized padding bytes */
606  memset(&disk_state, 0, sizeof(disk_state));
607 
608  LWLockAcquire(&curstate->lock, LW_SHARED);
609 
610  disk_state.roident = curstate->roident;
611 
612  disk_state.remote_lsn = curstate->remote_lsn;
613  local_lsn = curstate->local_lsn;
614 
615  LWLockRelease(&curstate->lock);
616 
617  /* make sure we only write out a commit that's persistent */
618  XLogFlush(local_lsn);
619 
620  errno = 0;
621  if ((write(tmpfd, &disk_state, sizeof(disk_state))) !=
622  sizeof(disk_state))
623  {
624  /* if write didn't set errno, assume problem is no disk space */
625  if (errno == 0)
626  errno = ENOSPC;
627  ereport(PANIC,
629  errmsg("could not write to file \"%s\": %m",
630  tmppath)));
631  }
632 
633  COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
634  }
635 
636  LWLockRelease(ReplicationOriginLock);
637 
638  /* write out the CRC */
639  FIN_CRC32C(crc);
640  errno = 0;
641  if ((write(tmpfd, &crc, sizeof(crc))) != sizeof(crc))
642  {
643  /* if write didn't set errno, assume problem is no disk space */
644  if (errno == 0)
645  errno = ENOSPC;
646  ereport(PANIC,
648  errmsg("could not write to file \"%s\": %m",
649  tmppath)));
650  }
651 
652  if (CloseTransientFile(tmpfd) != 0)
653  ereport(PANIC,
655  errmsg("could not close file \"%s\": %m",
656  tmppath)));
657 
658  /* fsync, rename to permanent file, fsync file and directory */
659  durable_rename(tmppath, path, PANIC);
660 }
661 
662 /*
663  * Recover replication replay status from checkpoint data saved earlier by
664  * CheckPointReplicationOrigin.
665  *
666  * This only needs to be called at startup and *not* during every checkpoint
667  * read during recovery (e.g. in HS or PITR from a base backup) afterwards. All
668  * state thereafter can be recovered by looking at commit records.
669  */
670 void
672 {
673  const char *path = "pg_logical/replorigin_checkpoint";
674  int fd;
675  int readBytes;
677  int last_state = 0;
678  pg_crc32c file_crc;
679  pg_crc32c crc;
680 
681  /* don't want to overwrite already existing state */
682 #ifdef USE_ASSERT_CHECKING
683  static bool already_started = false;
684 
685  Assert(!already_started);
686  already_started = true;
687 #endif
688 
689  if (max_replication_slots == 0)
690  return;
691 
692  INIT_CRC32C(crc);
693 
694  elog(DEBUG2, "starting up replication origin progress state");
695 
696  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
697 
698  /*
699  * might have had max_replication_slots == 0 last run, or we just brought
700  * up a standby.
701  */
702  if (fd < 0 && errno == ENOENT)
703  return;
704  else if (fd < 0)
705  ereport(PANIC,
707  errmsg("could not open file \"%s\": %m",
708  path)));
709 
710  /* verify magic, that is written even if nothing was active */
711  readBytes = read(fd, &magic, sizeof(magic));
712  if (readBytes != sizeof(magic))
713  {
714  if (readBytes < 0)
715  ereport(PANIC,
717  errmsg("could not read file \"%s\": %m",
718  path)));
719  else
720  ereport(PANIC,
722  errmsg("could not read file \"%s\": read %d of %zu",
723  path, readBytes, sizeof(magic))));
724  }
725  COMP_CRC32C(crc, &magic, sizeof(magic));
726 
727  if (magic != REPLICATION_STATE_MAGIC)
728  ereport(PANIC,
729  (errmsg("replication checkpoint has wrong magic %u instead of %u",
730  magic, REPLICATION_STATE_MAGIC)));
731 
732  /* we can skip locking here, no other access is possible */
733 
734  /* recover individual states, until there are no more to be found */
735  while (true)
736  {
737  ReplicationStateOnDisk disk_state;
738 
739  readBytes = read(fd, &disk_state, sizeof(disk_state));
740 
741  /* no further data */
742  if (readBytes == sizeof(crc))
743  {
744  /* not pretty, but simple ... */
745  file_crc = *(pg_crc32c *) &disk_state;
746  break;
747  }
748 
749  if (readBytes < 0)
750  {
751  ereport(PANIC,
753  errmsg("could not read file \"%s\": %m",
754  path)));
755  }
756 
757  if (readBytes != sizeof(disk_state))
758  {
759  ereport(PANIC,
761  errmsg("could not read file \"%s\": read %d of %zu",
762  path, readBytes, sizeof(disk_state))));
763  }
764 
765  COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
766 
767  if (last_state == max_replication_slots)
768  ereport(PANIC,
769  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
770  errmsg("could not find free replication state, increase max_replication_slots")));
771 
772  /* copy data to shared memory */
773  replication_states[last_state].roident = disk_state.roident;
774  replication_states[last_state].remote_lsn = disk_state.remote_lsn;
775  last_state++;
776 
777  elog(LOG, "recovered replication state of node %u to %X/%X",
778  disk_state.roident,
779  (uint32) (disk_state.remote_lsn >> 32),
780  (uint32) disk_state.remote_lsn);
781  }
782 
783  /* now check checksum */
784  FIN_CRC32C(crc);
785  if (file_crc != crc)
786  ereport(PANIC,
787  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
788  errmsg("replication slot checkpoint has wrong checksum %u, expected %u",
789  crc, file_crc)));
790 
791  if (CloseTransientFile(fd) != 0)
792  ereport(PANIC,
794  errmsg("could not close file \"%s\": %m",
795  path)));
796 }
797 
798 void
800 {
801  uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
802 
803  switch (info)
804  {
805  case XLOG_REPLORIGIN_SET:
806  {
807  xl_replorigin_set *xlrec =
808  (xl_replorigin_set *) XLogRecGetData(record);
809 
811  xlrec->remote_lsn, record->EndRecPtr,
812  xlrec->force /* backward */ ,
813  false /* WAL log */ );
814  break;
815  }
817  {
818  xl_replorigin_drop *xlrec;
819  int i;
820 
821  xlrec = (xl_replorigin_drop *) XLogRecGetData(record);
822 
823  for (i = 0; i < max_replication_slots; i++)
824  {
825  ReplicationState *state = &replication_states[i];
826 
827  /* found our slot */
828  if (state->roident == xlrec->node_id)
829  {
830  /* reset entry */
831  state->roident = InvalidRepOriginId;
832  state->remote_lsn = InvalidXLogRecPtr;
833  state->local_lsn = InvalidXLogRecPtr;
834  break;
835  }
836  }
837  break;
838  }
839  default:
840  elog(PANIC, "replorigin_redo: unknown op code %u", info);
841  }
842 }
843 
844 
845 /*
846  * Tell the replication origin progress machinery that a commit from 'node'
847  * that originated at the LSN remote_commit on the remote node was replayed
848  * successfully and that we don't need to do so again. In combination with
849  * setting up replorigin_session_origin_lsn and replorigin_session_origin
850  * that ensures we won't lose knowledge about that after a crash if the
851  * transaction had a persistent effect (think of asynchronous commits).
852  *
853  * local_commit needs to be a local LSN of the commit so that we can make sure
854  * upon a checkpoint that enough WAL has been persisted to disk.
855  *
856  * Needs to be called with a RowExclusiveLock on pg_replication_origin,
857  * unless running in recovery.
858  */
859 void
861  XLogRecPtr remote_commit, XLogRecPtr local_commit,
862  bool go_backward, bool wal_log)
863 {
864  int i;
865  ReplicationState *replication_state = NULL;
866  ReplicationState *free_state = NULL;
867 
868  Assert(node != InvalidRepOriginId);
869 
870  /* we don't track DoNotReplicateId */
871  if (node == DoNotReplicateId)
872  return;
873 
874  /*
875  * XXX: For the case where this is called by WAL replay, it'd be more
876  * efficient to restore into a backend local hashtable and only dump into
877  * shmem after recovery is finished. Let's wait with implementing that
878  * till it's shown to be a measurable expense
879  */
880 
881  /* Lock exclusively, as we may have to create a new table entry. */
882  LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
883 
884  /*
885  * Search for either an existing slot for the origin, or a free one we can
886  * use.
887  */
888  for (i = 0; i < max_replication_slots; i++)
889  {
890  ReplicationState *curstate = &replication_states[i];
891 
892  /* remember where to insert if necessary */
893  if (curstate->roident == InvalidRepOriginId &&
894  free_state == NULL)
895  {
896  free_state = curstate;
897  continue;
898  }
899 
900  /* not our slot */
901  if (curstate->roident != node)
902  {
903  continue;
904  }
905 
906  /* ok, found slot */
907  replication_state = curstate;
908 
909  LWLockAcquire(&replication_state->lock, LW_EXCLUSIVE);
910 
911  /* Make sure it's not used by somebody else */
912  if (replication_state->acquired_by != 0)
913  {
914  ereport(ERROR,
915  (errcode(ERRCODE_OBJECT_IN_USE),
916  errmsg("replication origin with OID %d is already active for PID %d",
917  replication_state->roident,
918  replication_state->acquired_by)));
919  }
920 
921  break;
922  }
923 
924  if (replication_state == NULL && free_state == NULL)
925  ereport(ERROR,
926  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
927  errmsg("could not find free replication state slot for replication origin with OID %u",
928  node),
929  errhint("Increase max_replication_slots and try again.")));
930 
931  if (replication_state == NULL)
932  {
933  /* initialize new slot */
934  LWLockAcquire(&free_state->lock, LW_EXCLUSIVE);
935  replication_state = free_state;
936  Assert(replication_state->remote_lsn == InvalidXLogRecPtr);
937  Assert(replication_state->local_lsn == InvalidXLogRecPtr);
938  replication_state->roident = node;
939  }
940 
941  Assert(replication_state->roident != InvalidRepOriginId);
942 
943  /*
944  * If somebody "forcefully" sets this slot, WAL log it, so it's durable
945  * and the standby gets the message. Primarily this will be called during
946  * WAL replay (of commit records) where no WAL logging is necessary.
947  */
948  if (wal_log)
949  {
950  xl_replorigin_set xlrec;
951 
952  xlrec.remote_lsn = remote_commit;
953  xlrec.node_id = node;
954  xlrec.force = go_backward;
955 
956  XLogBeginInsert();
957  XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
958 
959  XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_SET);
960  }
961 
962  /*
963  * Due to - harmless - race conditions during a checkpoint we could see
964  * values here that are older than the ones we already have in memory.
965  * Don't overwrite those.
966  */
967  if (go_backward || replication_state->remote_lsn < remote_commit)
968  replication_state->remote_lsn = remote_commit;
969  if (local_commit != InvalidXLogRecPtr &&
970  (go_backward || replication_state->local_lsn < local_commit))
971  replication_state->local_lsn = local_commit;
972  LWLockRelease(&replication_state->lock);
973 
974  /*
975  * Release *after* changing the LSNs, slot isn't acquired and thus could
976  * otherwise be dropped anytime.
977  */
978  LWLockRelease(ReplicationOriginLock);
979 }
980 
981 
984 {
985  int i;
988 
989  /* prevent slots from being concurrently dropped */
990  LWLockAcquire(ReplicationOriginLock, LW_SHARED);
991 
992  for (i = 0; i < max_replication_slots; i++)
993  {
995 
996  state = &replication_states[i];
997 
998  if (state->roident == node)
999  {
1000  LWLockAcquire(&state->lock, LW_SHARED);
1001 
1002  remote_lsn = state->remote_lsn;
1003  local_lsn = state->local_lsn;
1004 
1005  LWLockRelease(&state->lock);
1006 
1007  break;
1008  }
1009  }
1010 
1011  LWLockRelease(ReplicationOriginLock);
1012 
1013  if (flush && local_lsn != InvalidXLogRecPtr)
1014  XLogFlush(local_lsn);
1015 
1016  return remote_lsn;
1017 }
1018 
1019 /*
1020  * Tear down a (possibly) configured session replication origin during process
1021  * exit.
1022  */
1023 static void
1025 {
1026  ConditionVariable *cv = NULL;
1027 
1028  LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1029 
1030  if (session_replication_state != NULL &&
1031  session_replication_state->acquired_by == MyProcPid)
1032  {
1033  cv = &session_replication_state->origin_cv;
1034 
1035  session_replication_state->acquired_by = 0;
1036  session_replication_state = NULL;
1037  }
1038 
1039  LWLockRelease(ReplicationOriginLock);
1040 
1041  if (cv)
1043 }
1044 
1045 /*
1046  * Set up a replication origin in the shared memory struct if it doesn't
1047  * already exist and cache access to the specific ReplicationState so the
1048  * array doesn't have to be searched when calling
1049  * replorigin_session_advance().
1050  *
1051  * Obviously only one such cached origin can exist per process and the current
1052  * cached value can only be set again after the previous value is torn down
1053  * with replorigin_session_reset().
1054  */
1055 void
1057 {
1058  static bool registered_cleanup;
1059  int i;
1060  int free_slot = -1;
1061 
1062  if (!registered_cleanup)
1063  {
1065  registered_cleanup = true;
1066  }
1067 
1069 
1070  if (session_replication_state != NULL)
1071  ereport(ERROR,
1072  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1073  errmsg("cannot setup replication origin when one is already setup")));
1074 
1075  /* Lock exclusively, as we may have to create a new table entry. */
1076  LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1077 
1078  /*
1079  * Search for either an existing slot for the origin, or a free one we can
1080  * use.
1081  */
1082  for (i = 0; i < max_replication_slots; i++)
1083  {
1084  ReplicationState *curstate = &replication_states[i];
1085 
1086  /* remember where to insert if necessary */
1087  if (curstate->roident == InvalidRepOriginId &&
1088  free_slot == -1)
1089  {
1090  free_slot = i;
1091  continue;
1092  }
1093 
1094  /* not our slot */
1095  if (curstate->roident != node)
1096  continue;
1097 
1098  else if (curstate->acquired_by != 0)
1099  {
1100  ereport(ERROR,
1101  (errcode(ERRCODE_OBJECT_IN_USE),
1102  errmsg("replication origin with OID %d is already active for PID %d",
1103  curstate->roident, curstate->acquired_by)));
1104  }
1105 
1106  /* ok, found slot */
1107  session_replication_state = curstate;
1108  }
1109 
1110 
1111  if (session_replication_state == NULL && free_slot == -1)
1112  ereport(ERROR,
1113  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
1114  errmsg("could not find free replication state slot for replication origin with OID %u",
1115  node),
1116  errhint("Increase max_replication_slots and try again.")));
1117  else if (session_replication_state == NULL)
1118  {
1119  /* initialize new slot */
1120  session_replication_state = &replication_states[free_slot];
1121  Assert(session_replication_state->remote_lsn == InvalidXLogRecPtr);
1122  Assert(session_replication_state->local_lsn == InvalidXLogRecPtr);
1123  session_replication_state->roident = node;
1124  }
1125 
1126 
1127  Assert(session_replication_state->roident != InvalidRepOriginId);
1128 
1129  session_replication_state->acquired_by = MyProcPid;
1130 
1131  LWLockRelease(ReplicationOriginLock);
1132 
1133  /* probably this one is pointless */
1134  ConditionVariableBroadcast(&session_replication_state->origin_cv);
1135 }
1136 
1137 /*
1138  * Reset replay state previously setup in this session.
1139  *
1140  * This function may only be called if an origin was setup with
1141  * replorigin_session_setup().
 *
 * Raises ERROR ("no replication origin is configured") when no origin is
 * set up for this session.  Otherwise, while holding ReplicationOriginLock
 * in exclusive mode, it marks the shared slot as no longer acquired
 * (acquired_by = 0) and clears the session's pointer to that slot.
 *
 * NOTE(review): this rendering omits source lines 1144, 1148 and 1163.
 * The definition index gives the signature (line 1144) as
 * "void replorigin_session_reset(void)".  The captured 'cv' is not used by
 * any visible line.  Presumably line 1163 wakes waiters on it, in the same
 * way replorigin_session_setup() broadcasts on origin_cv; confirm against
 * origin.c.
1142  */
1143 void
1145 {
1146  ConditionVariable *cv;
1147 
1149 
1150  if (session_replication_state == NULL)
1151  ereport(ERROR,
1152  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1153  errmsg("no replication origin is configured")));
1154 
1155  LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1156 
1157  session_replication_state->acquired_by = 0;
 /* remember the slot's CV before we drop our only pointer to the slot */
1158  cv = &session_replication_state->origin_cv;
1159  session_replication_state = NULL;
1160 
1161  LWLockRelease(ReplicationOriginLock);
1162 
1164 }
1165 
1166 /*
1167  * Do the same work replorigin_advance() does, just on the session's
1168  * configured origin.
1169  *
1170  * This is noticeably cheaper than using replorigin_advance().
 *
 * Only the session slot's own lock is taken (exclusively), never
 * ReplicationOriginLock.  Both positions only move forward: local_lsn and
 * remote_lsn are each overwritten only when the passed-in value is larger
 * than the stored one.  The caller must already have set up a session
 * origin; this is checked only with Assert.
 *
 * NOTE(review): the signature line (source line 1173) is missing from this
 * rendering.  The definition index gives it as
 * "void replorigin_session_advance(XLogRecPtr remote_commit,
 * XLogRecPtr local_commit)".
1171  */
1172 void
1174 {
1175  Assert(session_replication_state != NULL);
1176  Assert(session_replication_state->roident != InvalidRepOriginId);
1177 
1178  LWLockAcquire(&session_replication_state->lock, LW_EXCLUSIVE);
 /* never move either position backwards */
1179  if (session_replication_state->local_lsn < local_commit)
1180  session_replication_state->local_lsn = local_commit;
1181  if (session_replication_state->remote_lsn < remote_commit)
1182  session_replication_state->remote_lsn = remote_commit;
1183  LWLockRelease(&session_replication_state->lock);
1184 }
1185 
1186 /*
1187  * Ask the machinery about the point up to which we successfully replayed
1188  * changes from an already setup replication origin.
 *
 * Reads remote_lsn and local_lsn together while holding the session slot's
 * lock in shared mode, then returns remote_lsn.  If 'flush' is true and
 * local_lsn is valid, local WAL is flushed up to local_lsn (via XLogFlush)
 * before returning.  The flush happens after the slot lock has been
 * released.
 *
 * NOTE(review): this rendering omits source lines 1191, 1193 and 1194.
 * The definition index gives the signature (line 1191) as
 * "XLogRecPtr replorigin_session_get_progress(bool flush)".  Lines 1193-1194
 * presumably declare the remote_lsn/local_lsn locals; confirm.
1189  */
1190 XLogRecPtr
1192 {
1195 
1196  Assert(session_replication_state != NULL);
1197 
 /* take a consistent snapshot of both positions */
1198  LWLockAcquire(&session_replication_state->lock, LW_SHARED);
1199  remote_lsn = session_replication_state->remote_lsn;
1200  local_lsn = session_replication_state->local_lsn;
1201  LWLockRelease(&session_replication_state->lock);
1202 
1203  if (flush && local_lsn != InvalidXLogRecPtr)
1204  XLogFlush(local_lsn);
1205 
1206  return remote_lsn;
1207 }
1208 
1209 
1210 
1211 /* ---------------------------------------------------------------------------
1212  * SQL functions for working with replication origins.
1213  *
1214  * These should mostly be fairly short wrappers around more generic functions.
1215  * ---------------------------------------------------------------------------
1216  */
1217 
1218 /*
1219  * Create replication origin for the passed in name, and return the assigned
1220  * oid.
 *
 * Raises ERROR (ERRCODE_RESERVED_NAME) for names that IsReservedName()
 * rejects; the error detail says that names starting with "pg_" are
 * reserved.  In builds with ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS,
 * names not starting with "regress_" only produce a WARNING.
 *
 * NOTE(review): this rendering omits source lines 1223, 1226 and 1230.
 * The definition index gives the signature (line 1223) as
 * "Datum pg_replication_origin_create(PG_FUNCTION_ARGS)".  Lines 1226 and
 * 1230 presumably declare 'roident' and fetch 'name' from argument 0;
 * confirm.
1221  */
1222 Datum
1224 {
1225  char *name;
1227 
1228  replorigin_check_prerequisites(false, false);
1229 
1231 
1232  /* Replication origins "pg_xxx" are reserved for internal use */
1233  if (IsReservedName(name))
1234  ereport(ERROR,
1235  (errcode(ERRCODE_RESERVED_NAME),
1236  errmsg("replication origin name \"%s\" is reserved",
1237  name),
1238  errdetail("Origin names starting with \"pg_\" are reserved.")));
1239 
1240  /*
1241  * If built with appropriate switch, whine when regression-testing
1242  * conventions for replication origin names are violated.
1243  */
1244 #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
1245  if (strncmp(name, "regress_", 8) != 0)
1246  elog(WARNING, "replication origins created by regression test cases should have names starting with \"regress_\"");
1247 #endif
1248 
1249  roident = replorigin_create(name);
1250 
1251  pfree(name);
1252 
1253  PG_RETURN_OID(roident);
1254 }
1255 
1256 /*
1257  * Drop replication origin.
 *
 * Looks the origin up by name with missing_ok = false, then calls
 * replorigin_drop() with nowait = true.
 *
 * NOTE(review): this rendering omits source lines 1260, 1263 and 1267.
 * The definition index gives the signature (line 1260) as
 * "Datum pg_replication_origin_drop(PG_FUNCTION_ARGS)".  Lines 1263 and
 * 1267 presumably declare 'roident' and fetch 'name' from argument 0;
 * confirm.
1258  */
1259 Datum
1261 {
1262  char *name;
1264 
1265  replorigin_check_prerequisites(false, false);
1266 
1268 
1269  roident = replorigin_by_name(name, false);
1270  Assert(OidIsValid(roident));
1271 
1272  replorigin_drop(roident, true);
1273 
1274  pfree(name);
1275 
1276  PG_RETURN_VOID();
1277 }
1278 
1279 /*
1280  * Return oid of a replication origin.
 *
 * Looks the name up with missing_ok = true.  Returns SQL NULL instead of an
 * OID when the lookup yields an invalid OID.
 *
 * NOTE(review): this rendering omits source lines 1284, 1286 and 1290.
 * The definition index gives the signature (line 1284) as
 * "Datum pg_replication_origin_oid(PG_FUNCTION_ARGS)".  Lines 1286 and
 * 1290 presumably declare 'roident' and fetch 'name' from argument 0;
 * confirm.
1281  */
1282 Datum
1284 {
1285  char *name;
1287 
1288  replorigin_check_prerequisites(false, false);
1289 
1291  roident = replorigin_by_name(name, true);
1292 
1293  pfree(name);
1294 
 /* unknown origin: return NULL rather than an invalid OID */
1295  if (OidIsValid(roident))
1296  PG_RETURN_OID(roident);
1297  PG_RETURN_NULL();
1298 }
1299 
1300 /*
1301  * Setup a replication origin for this session.
 *
 * Resolves the origin name (missing_ok = false) and attaches this backend to
 * it via replorigin_session_setup().  It then records the origin in the
 * replorigin_session_origin global.  Unlike the create/drop/oid wrappers,
 * this passes check_slots = true to replorigin_check_prerequisites().
 *
 * NOTE(review): this rendering omits source lines 1304 and 1311.  The
 * definition index gives the signature (line 1304) as
 * "Datum pg_replication_origin_session_setup(PG_FUNCTION_ARGS)".  Line 1311
 * presumably fetches 'name' from argument 0; confirm.
1302  */
1303 Datum
1305 {
1306  char *name;
1307  RepOriginId origin;
1308 
1309  replorigin_check_prerequisites(true, false);
1310 
1312  origin = replorigin_by_name(name, false);
1313  replorigin_session_setup(origin);
1314 
1315  replorigin_session_origin = origin;
1316 
1317  pfree(name);
1318 
1319  PG_RETURN_VOID();
1320 }
1321 
1322 /*
1323  * Reset previously setup origin in this session
 *
 * NOTE(review): this rendering omits the signature (source line 1326).
 * The definition index gives it as
 * "Datum pg_replication_origin_session_reset(PG_FUNCTION_ARGS)".  The whole
 * working body (lines 1330-1334) is also missing.  It presumably calls
 * replorigin_session_reset() and clears the replorigin_session_* globals;
 * confirm against origin.c before relying on this description.
1324  */
1325 Datum
1327 {
1328  replorigin_check_prerequisites(true, false);
1329 
1331 
1335 
1336  PG_RETURN_VOID();
1337 }
1338 
1339 /*
1340  * Has a replication origin been setup for this session.
 *
 * NOTE(review): this rendering omits the signature (source line 1343).
 * The definition index gives it as
 * "Datum pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS)".  The
 * return statement (line 1347) is also missing.  It presumably returns a
 * boolean via PG_RETURN_BOOL; confirm what exactly it tests.
1341  */
1342 Datum
1344 {
1345  replorigin_check_prerequisites(false, false);
1346 
1348 }
1349 
1350 
1351 /*
1352  * Return the replication progress for origin setup in the current session.
1353  *
1354  * If 'flush' is set to true it is ensured that the returned value corresponds
1355  * to a local transaction that has been flushed. This is useful if asynchronous
1356  * commits are used when replaying replicated transactions.
 *
 * Raises ERROR ("no replication origin is configured") when no session
 * origin is set up.  Returns SQL NULL when the stored remote LSN is
 * InvalidXLogRecPtr.
 *
 * NOTE(review): this rendering omits source lines 1359 and 1361.  The
 * definition index gives the signature (line 1359) as
 * "Datum pg_replication_origin_session_progress(PG_FUNCTION_ARGS)".
 * Line 1361 presumably declares 'remote_lsn'; confirm.
1357  */
1358 Datum
1360 {
1362  bool flush = PG_GETARG_BOOL(0);
1363 
1364  replorigin_check_prerequisites(true, false);
1365 
1366  if (session_replication_state == NULL)
1367  ereport(ERROR,
1368  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1369  errmsg("no replication origin is configured")));
1370 
1371  remote_lsn = replorigin_session_get_progress(flush);
1372 
 /* no progress recorded yet */
1373  if (remote_lsn == InvalidXLogRecPtr)
1374  PG_RETURN_NULL();
1375 
1376  PG_RETURN_LSN(remote_lsn);
1377 }
1378 
/*
 * Record the origin LSN (argument 0) for the current transaction in the
 * replorigin_session_origin_lsn global.
 *
 * Raises ERROR ("no replication origin is configured") when no session
 * origin is set up.
 *
 * NOTE(review): this rendering omits source lines 1380 and 1392.  The
 * definition index gives the signature (line 1380) as
 * "Datum pg_replication_origin_xact_setup(PG_FUNCTION_ARGS)".  Line 1392
 * presumably also sets replorigin_session_origin_timestamp from a second
 * argument; confirm.
 */
1379 Datum
1381 {
1382  XLogRecPtr location = PG_GETARG_LSN(0);
1383 
1384  replorigin_check_prerequisites(true, false);
1385 
1386  if (session_replication_state == NULL)
1387  ereport(ERROR,
1388  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1389  errmsg("no replication origin is configured")));
1390 
1391  replorigin_session_origin_lsn = location;
1393 
1394  PG_RETURN_VOID();
1395 }
1396 
/*
 * SQL-callable counterpart to pg_replication_origin_xact_setup().
 *
 * NOTE(review): this rendering omits the signature (source line 1398).
 * The definition index gives it as
 * "Datum pg_replication_origin_xact_reset(PG_FUNCTION_ARGS)".  The working
 * body (lines 1402-1403) is also missing.  It presumably clears the
 * per-transaction origin LSN/timestamp globals; confirm against origin.c.
 */
1397 Datum
1399 {
1400  replorigin_check_prerequisites(true, false);
1401 
1404 
1405  PG_RETURN_VOID();
1406 }
1407 
1408 
/*
 * Set the remote progress of the named replication origin (argument 0) to
 * the given LSN (argument 1).
 *
 * RowExclusiveLock on the replication origin catalog is held for the
 * duration so the origin cannot vanish concurrently.  The origin is resolved
 * with missing_ok = false.  replorigin_advance() is called with go_backward
 * and wal_log both true, and no local commit LSN.
 *
 * NOTE(review): the signature line (source line 1410) is missing from this
 * rendering.  The definition index gives it as
 * "Datum pg_replication_origin_advance(PG_FUNCTION_ARGS)".  Unlike the
 * sibling wrappers, the string returned by text_to_cstring() is not
 * pfree'd here.
 */
1409 Datum
1411 {
1412  text *name = PG_GETARG_TEXT_PP(0);
1413  XLogRecPtr remote_commit = PG_GETARG_LSN(1);
1414  RepOriginId node;
1415 
1416  replorigin_check_prerequisites(true, false);
1417 
1418  /* lock to prevent the replication origin from vanishing */
1419  LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1420 
1421  node = replorigin_by_name(text_to_cstring(name), false);
1422 
1423  /*
1424  * Can't sensibly pass a local commit to be flushed at checkpoint - this
1425  * xact hasn't committed yet. This is why this function should be used to
1426  * set up the initial replication state, but not for replay.
1427  */
1428  replorigin_advance(node, remote_commit, InvalidXLogRecPtr,
1429  true /* go backward */ , true /* WAL log */ );
1430 
1431  UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1432 
1433  PG_RETURN_VOID();
1434 }
1435 
1436 
1437 /*
1438  * Return the replication progress for an individual replication origin.
1439  *
1440  * If 'flush' is set to true it is ensured that the returned value corresponds
1441  * to a local transaction that has been flushed. This is useful if asynchronous
1442  * commits are used when replaying replicated transactions.
 *
 * Unlike the session-level functions, this passes recoveryOK = true to
 * replorigin_check_prerequisites().  The origin is resolved with
 * missing_ok = false.  Returns SQL NULL when the stored remote LSN is
 * InvalidXLogRecPtr.
 *
 * NOTE(review): this rendering omits source lines 1445, 1449, 1450 and
 * 1454.  The definition index gives the signature (line 1445) as
 * "Datum pg_replication_origin_progress(PG_FUNCTION_ARGS)".  The other
 * lines presumably declare 'roident' and 'remote_lsn' and fetch 'name' from
 * argument 0; confirm.
1443  */
1444 Datum
1446 {
1447  char *name;
1448  bool flush;
1451 
1452  replorigin_check_prerequisites(true, true);
1453 
1455  flush = PG_GETARG_BOOL(1);
1456 
1457  roident = replorigin_by_name(name, false);
1458  Assert(OidIsValid(roident));
1459 
1460  remote_lsn = replorigin_get_progress(roident, flush);
1461 
1462  if (remote_lsn == InvalidXLogRecPtr)
1463  PG_RETURN_NULL();
1464 
1465  PG_RETURN_LSN(remote_lsn);
1466 }
1467 
1468 
/*
 * Set-returning function: one row per in-use replication state slot, with
 * columns (origin id, origin name, remote_lsn, local_lsn).
 *
 * Results are materialized into a tuplestore allocated in the per-query
 * memory context.  The scan holds ReplicationOriginLock in shared mode
 * throughout, and each slot's own lock while its LSNs are read.  The name
 * column is NULL when replorigin_by_oid() cannot find the origin (for
 * example, when it was dropped concurrently).
 *
 * NOTE(review): this rendering omits source lines 1470, 1478 and
 * 1518-1520.  The definition index gives the signature (line 1470) as
 * "Datum pg_show_replication_origin_status(PG_FUNCTION_ARGS)".  Line 1478
 * presumably #defines REPLICATION_ORIGIN_PROGRESS_COLS, which is #undef'd at
 * the end.  Lines 1518-1520 presumably declare 'values', 'nulls' and
 * 'state'; confirm.
 */
1469 Datum
1471 {
1472  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1473  TupleDesc tupdesc;
1474  Tuplestorestate *tupstore;
1475  MemoryContext per_query_ctx;
1476  MemoryContext oldcontext;
1477  int i;
1479 
1480  /* we want to return 0 rows if slot is set to zero */
1481  replorigin_check_prerequisites(false, true);
1482 
1483  if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
1484  ereport(ERROR,
1485  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1486  errmsg("set-valued function called in context that cannot accept a set")));
1487  if (!(rsinfo->allowedModes & SFRM_Materialize))
1488  ereport(ERROR,
1489  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1490  errmsg("materialize mode required, but it is not allowed in this context")));
1491  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
1492  elog(ERROR, "return type must be a row type");
1493 
1494  if (tupdesc->natts != REPLICATION_ORIGIN_PROGRESS_COLS)
1495  elog(ERROR, "wrong function definition");
1496 
 /* the tuplestore must outlive this call, so build it in per-query memory */
1497  per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
1498  oldcontext = MemoryContextSwitchTo(per_query_ctx);
1499 
1500  tupstore = tuplestore_begin_heap(true, false, work_mem);
1501  rsinfo->returnMode = SFRM_Materialize;
1502  rsinfo->setResult = tupstore;
1503  rsinfo->setDesc = tupdesc;
1504 
1505  MemoryContextSwitchTo(oldcontext);
1506 
1507 
1508  /* prevent slots from being concurrently dropped */
1509  LWLockAcquire(ReplicationOriginLock, LW_SHARED);
1510 
1511  /*
1512  * Iterate through all possible replication_states, display if they are
1513  * filled. ReplicationOriginLock is held in shared mode for the whole
1514  * scan, and each slot's own lock is taken while its LSNs are read.
 * The origin name lookup is not protected against a concurrent drop of
 * the origin (see below), so the name column may come back NULL.
1515  */
1516  for (i = 0; i < max_replication_slots; i++)
1517  {
1521  char *roname;
1522 
1523  state = &replication_states[i];
1524 
1525  /* unused slot, nothing to display */
1526  if (state->roident == InvalidRepOriginId)
1527  continue;
1528 
 /* start with every column NULL; fill in what we can below */
1529  memset(values, 0, sizeof(values));
1530  memset(nulls, 1, sizeof(nulls));
1531 
1532  values[0] = ObjectIdGetDatum(state->roident);
1533  nulls[0] = false;
1534 
1535  /*
1536  * We're not preventing the origin to be dropped concurrently, so
1537  * silently accept that it might be gone.
1538  */
1539  if (replorigin_by_oid(state->roident, true,
1540  &roname))
1541  {
1542  values[1] = CStringGetTextDatum(roname);
1543  nulls[1] = false;
1544  }
1545 
 /* read both LSNs under the slot lock so they are mutually consistent */
1546  LWLockAcquire(&state->lock, LW_SHARED);
1547 
1548  values[2] = LSNGetDatum(state->remote_lsn);
1549  nulls[2] = false;
1550 
1551  values[3] = LSNGetDatum(state->local_lsn);
1552  nulls[3] = false;
1553 
1554  LWLockRelease(&state->lock);
1555 
1556  tuplestore_putvalues(tupstore, tupdesc, values, nulls);
1557  }
1558 
1559  tuplestore_donestoring(tupstore);
1560 
1561  LWLockRelease(ReplicationOriginLock);
1562 
1563 #undef REPLICATION_ORIGIN_PROGRESS_COLS
1564 
1565  return (Datum) 0;
1566 }
static ReplicationState * session_replication_state
Definition: origin.c:177
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, Datum *values, bool *isnull)
Definition: tuplestore.c:750
#define INIT_CRC32C(crc)
Definition: pg_crc32c.h:41
static void replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
Definition: origin.c:183
Definition: lwlock.h:32
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define IsA(nodeptr, _type_)
Definition: nodes.h:580
#define InitDirtySnapshot(snapshotdata)
Definition: snapmgr.h:76
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:205
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:133
int MyProcPid
Definition: globals.c:40
int errhint(const char *fmt,...)
Definition: elog.c:1071
XLogRecPtr local_lsn
Definition: origin.c:117
void systable_endscan(SysScanDesc sysscan)
Definition: genam.c:529
#define GETSTRUCT(TUP)
Definition: htup_details.h:655
#define RelationGetDescr(relation)
Definition: rel.h:482
#define DoNotReplicateId
Definition: origin.h:34
Datum pg_replication_origin_xact_setup(PG_FUNCTION_ARGS)
Definition: origin.c:1380
void UnlockRelationOid(Oid relid, LOCKMODE lockmode)
Definition: lmgr.c:199
#define write(a, b, c)
Definition: win32.h:14
Datum pg_replication_origin_drop(PG_FUNCTION_ARGS)
Definition: origin.c:1260
#define ExclusiveLock
Definition: lockdefs.h:44
int64 TimestampTz
Definition: timestamp.h:39
XLogRecPtr remote_lsn
Definition: origin.c:141
#define PG_GETARG_DATUM(n)
Definition: fmgr.h:268
uint32 pg_crc32c
Definition: pg_crc32c.h:38
static void ReplicationOriginExitCleanup(int code, Datum arg)
Definition: origin.c:1024
RepOriginId roident
Definition: origin.c:140
void replorigin_drop(RepOriginId roident, bool nowait)
Definition: origin.c:337
#define tuplestore_donestoring(state)
Definition: tuplestore.h:60
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
unsigned char uint8
Definition: c.h:365
uint16 RepOriginId
Definition: xlogdefs.h:58
#define FLEXIBLE_ARRAY_MEMBER
Definition: c.h:276
void ConditionVariableBroadcast(ConditionVariable *cv)
XLogRecPtr replorigin_session_get_progress(bool flush)
Definition: origin.c:1191
int errcode(int sqlerrcode)
Definition: elog.c:610
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
bool superuser(void)
Definition: superuser.c:46
#define MemSet(start, val, len)
Definition: c.h:971
void CatalogTupleDelete(Relation heapRel, ItemPointer tid)
Definition: indexing.c:269
void ReplicationOriginShmemInit(void)
Definition: origin.c:498
void replorigin_advance(RepOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)
Definition: origin.c:860
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:1020
#define PG_GETARG_BOOL(n)
Definition: fmgr.h:274
void replorigin_session_setup(RepOriginId node)
Definition: origin.c:1056
bool replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
Definition: origin.c:437
#define LOG
Definition: elog.h:26
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
unsigned int Oid
Definition: postgres_ext.h:31
bool RecoveryInProgress(void)
Definition: xlog.c:8069
bool IsReservedName(const char *name)
Definition: catalog.c:212
#define OidIsValid(objectId)
Definition: c.h:644
#define PANIC
Definition: elog.h:53
void XLogFlush(XLogRecPtr record)
Definition: xlog.c:2844
static int fd(const char *x, int i)
Definition: preproc-init.c:105
SysScanDesc systable_beginscan(Relation heapRelation, Oid indexId, bool indexOK, Snapshot snapshot, int nkeys, ScanKey key)
Definition: genam.c:356
#define PG_BINARY
Definition: c.h:1234
RepOriginId replorigin_by_name(char *roname, bool missing_ok)
Definition: origin.c:214
RepOriginId roident
Definition: origin.c:105
XLogRecPtr EndRecPtr
Definition: xlogreader.h:176
#define PG_GETARG_TEXT_PP(n)
Definition: fmgr.h:308
Datum pg_replication_origin_advance(PG_FUNCTION_ARGS)
Definition: origin.c:1410
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1812
#define PG_RETURN_LSN(x)
Definition: pg_lsn.h:25
void CheckPointReplicationOrigin(void)
Definition: origin.c:545
void ConditionVariableInit(ConditionVariable *cv)
void replorigin_redo(XLogReaderState *record)
Definition: origin.c:799
HeapTuple systable_getnext(SysScanDesc sysscan)
Definition: genam.c:448
void pfree(void *pointer)
Definition: mcxt.c:1056
#define XLogRecGetData(decoder)
Definition: xlogreader.h:311
void ConditionVariableCancelSleep(void)
#define ReplicationOriginIdentIndex
Definition: indexing.h:337
#define ObjectIdGetDatum(X)
Definition: postgres.h:507
#define ERROR
Definition: elog.h:43
#define PG_UINT16_MAX
Definition: c.h:448
void replorigin_session_reset(void)
Definition: origin.c:1144
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2370
FormData_pg_replication_origin * Form_pg_replication_origin
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:392
LWLock lock
Definition: origin.c:132
ItemPointerData t_self
Definition: htup.h:65
#define DEBUG2
Definition: elog.h:24
XLogRecPtr replorigin_session_origin_lsn
Definition: origin.c:155
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:361
TimestampTz replorigin_session_origin_timestamp
Definition: origin.c:156
#define XLOG_REPLORIGIN_SET
Definition: origin.h:30
void StartupReplicationOrigin(void)
Definition: origin.c:671
#define RowExclusiveLock
Definition: lockdefs.h:38
int errdetail(const char *fmt,...)
Definition: elog.c:957
int errcode_for_file_access(void)
Definition: elog.c:633
struct ReplicationState ReplicationState
unsigned int uint32
Definition: c.h:367
Datum pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
Definition: origin.c:1304
Datum pg_replication_origin_session_reset(PG_FUNCTION_ARGS)
Definition: origin.c:1326
RepOriginId replorigin_create(char *roname)
Definition: origin.c:245
Datum pg_show_replication_origin_status(PG_FUNCTION_ARGS)
Definition: origin.c:1470
#define XLogRecGetInfo(decoder)
Definition: xlogreader.h:307
int durable_rename(const char *oldfile, const char *newfile, int elevel)
Definition: fd.c:656
Datum pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS)
Definition: origin.c:1343
#define ERRCODE_DATA_CORRUPTED
Definition: pg_basebackup.c:45
void LWLockInitialize(LWLock *lock, int tranche_id)
Definition: lwlock.c:745
int CloseTransientFile(int fd)
Definition: fd.c:2547
#define WARNING
Definition: elog.h:40
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:1116
void XLogRegisterData(char *data, int len)
Definition: xloginsert.c:324
Tuplestorestate * tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
Definition: tuplestore.c:318
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition: xloginsert.c:416
#define XLOG_REPLORIGIN_DROP
Definition: origin.h:31
Size mul_size(Size s1, Size s2)
Definition: shmem.c:515
#define PG_GETARG_LSN(n)
Definition: pg_lsn.h:24
struct ReplicationStateOnDisk ReplicationStateOnDisk
#define PG_RETURN_BOOL(x)
Definition: fmgr.h:358
uintptr_t Datum
Definition: postgres.h:367
void CommandCounterIncrement(void)
Definition: xact.c:1006
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:1164
Size add_size(Size s1, Size s2)
Definition: shmem.c:498
int work_mem
Definition: globals.c:121
Size ReplicationOriginShmemSize(void)
Definition: origin.c:478
#define REPLICATION_STATE_MAGIC
Definition: origin.c:180
#define InvalidOid
Definition: postgres_ext.h:36
#define ereport(elevel,...)
Definition: elog.h:144
static ReplicationStateCtl * replication_states_ctl
Definition: origin.c:170
Datum pg_replication_origin_xact_reset(PG_FUNCTION_ARGS)
Definition: origin.c:1398
int allowedModes
Definition: execnodes.h:305
#define PG_RETURN_VOID()
Definition: fmgr.h:348
struct ReplicationStateCtl ReplicationStateCtl
SetFunctionReturnMode returnMode
Definition: execnodes.h:307
int max_replication_slots
Definition: slot.c:99
void ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)
XLogRecPtr remote_lsn
Definition: origin.c:110
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
Datum pg_replication_origin_progress(PG_FUNCTION_ARGS)
Definition: origin.c:1445
RepOriginId node_id
Definition: origin.h:27
uint64 XLogRecPtr
Definition: xlogdefs.h:21
ReplicationState states[FLEXIBLE_ARRAY_MEMBER]
Definition: origin.c:150
#define Assert(condition)
Definition: c.h:738
#define XLR_INFO_MASK
Definition: xlogrecord.h:62
RepOriginId replorigin_session_origin
Definition: origin.c:154
Definition: regguts.h:298
RepOriginId node_id
Definition: origin.h:21
ConditionVariable origin_cv
Definition: origin.c:127
size_t Size
Definition: c.h:466
void replorigin_session_advance(XLogRecPtr remote_commit, XLogRecPtr local_commit)
Definition: origin.c:1173
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1208
bool IsTransactionState(void)
Definition: xact.c:356
XLogRecPtr remote_lsn
Definition: origin.h:20
MemoryContext ecxt_per_query_memory
Definition: execnodes.h:233
const char * name
Definition: encode.c:555
#define InvalidRepOriginId
Definition: origin.h:33
Tuplestorestate * setResult
Definition: execnodes.h:310
#define DatumGetPointer(X)
Definition: postgres.h:549
static Datum values[MAXATTR]
Definition: bootstrap.c:167
char * text_to_cstring(const text *t)
Definition: varlena.c:205
ExprContext * econtext
Definition: execnodes.h:303
TupleDesc setDesc
Definition: execnodes.h:311
int errmsg(const char *fmt,...)
Definition: elog.c:824
Datum pg_replication_origin_oid(PG_FUNCTION_ARGS)
Definition: origin.c:1283
#define PG_GETARG_TIMESTAMPTZ(n)
Definition: timestamp.h:36
#define elog(elevel,...)
Definition: elog.h:214
int i
Datum pg_replication_origin_session_progress(PG_FUNCTION_ARGS)
Definition: origin.c:1359
static ReplicationState * replication_states
Definition: origin.c:165
void ScanKeyInit(ScanKey entry, AttrNumber attributeNumber, StrategyNumber strategy, RegProcedure procedure, Datum argument)
Definition: scankey.c:76
#define CStringGetTextDatum(s)
Definition: builtins.h:87
void * arg
Definition: c.h:555
#define PG_FUNCTION_ARGS
Definition: fmgr.h:193
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:99
void LockRelationOid(Oid relid, LOCKMODE lockmode)
Definition: lmgr.c:108
#define COMP_CRC32C(crc, data, len)
Definition: pg_crc32c.h:89
Datum pg_replication_origin_create(PG_FUNCTION_ARGS)
Definition: origin.c:1223
#define REPLICATION_ORIGIN_PROGRESS_COLS
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39
void XLogBeginInsert(void)
Definition: xloginsert.c:121
#define PG_RETURN_OID(x)
Definition: fmgr.h:359
#define FIN_CRC32C(crc)
Definition: pg_crc32c.h:94
XLogRecPtr replorigin_get_progress(RepOriginId node, bool flush)
Definition: origin.c:983
void CatalogTupleInsert(Relation heapRel, HeapTuple tup)
Definition: indexing.c:183
#define PG_RETURN_NULL()
Definition: fmgr.h:344
#define read(a, b, c)
Definition: win32.h:13
#define BTEqualStrategyNumber
Definition: stratnum.h:31
#define offsetof(type, field)
Definition: c.h:661