PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
origin.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * origin.c
4  * Logical replication progress tracking support.
5  *
6  * Copyright (c) 2013-2017, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  * src/backend/replication/logical/origin.c
10  *
11  * NOTES
12  *
13  * This file provides the following:
14  * * An infrastructure to name nodes in a replication setup
15  * * A facility to efficiently store and persist replication progress in an
16  * efficient and durable manner.
17  *
18  * Replication origin consist out of a descriptive, user defined, external
19  * name and a short, thus space efficient, internal 2 byte one. This split
20  * exists because replication origin have to be stored in WAL and shared
21  * memory and long descriptors would be inefficient. For now only use 2 bytes
22  * for the internal id of a replication origin as it seems unlikely that there
23  * soon will be more than 65k nodes in one replication setup; and using only
24  * two bytes allow us to be more space efficient.
25  *
26  * Replication progress is tracked in a shared memory table
27  * (ReplicationStates) that's dumped to disk every checkpoint. Entries
28  * ('slots') in this table are identified by the internal id. That's the case
29  * because it allows to increase replication progress during crash
30  * recovery. To allow doing so we store the original LSN (from the originating
31  * system) of a transaction in the commit record. That allows to recover the
32  * precise replayed state after crash recovery; without requiring synchronous
33  * commits. Allowing logical replication to use asynchronous commit is
34  * generally good for performance, but especially important as it allows a
35  * single threaded replay process to keep up with a source that has multiple
36  * backends generating changes concurrently. For efficiency and simplicity
37  * reasons a backend can setup one replication origin that's from then used as
38  * the source of changes produced by the backend, until reset again.
39  *
40  * This infrastructure is intended to be used in cooperation with logical
41  * decoding. When replaying from a remote system the configured origin is
42  * provided to output plugins, allowing prevention of replication loops and
43  * other filtering.
44  *
45  * There are several levels of locking at work:
46  *
47  * * To create and drop replication origins an exclusive lock on
48  * pg_replication_slot is required for the duration. That allows us to
49  * safely and conflict free assign new origins using a dirty snapshot.
50  *
51  * * When creating an in-memory replication progress slot the ReplicationOirgin
52  * LWLock has to be held exclusively; when iterating over the replication
53  * progress a shared lock has to be held, the same when advancing the
54  * replication progress of an individual backend that has not setup as the
55  * session's replication origin.
56  *
57  * * When manipulating or looking at the remote_lsn and local_lsn fields of a
58  * replication progress slot that slot's lwlock has to be held. That's
59  * primarily because we do not assume 8 byte writes (the LSN) is atomic on
60  * all our platforms, but it also simplifies memory ordering concerns
61  * between the remote and local lsn. We use a lwlock instead of a spinlock
62  * so it's less harmful to hold the lock over a WAL write
63  * (c.f. AdvanceReplicationProgress).
64  *
65  * ---------------------------------------------------------------------------
66  */
67 
68 #include "postgres.h"
69 
70 #include <unistd.h>
71 #include <sys/stat.h>
72 
73 #include "funcapi.h"
74 #include "miscadmin.h"
75 
76 #include "access/genam.h"
77 #include "access/heapam.h"
78 #include "access/htup_details.h"
79 #include "access/xact.h"
80 
81 #include "catalog/indexing.h"
82 
83 #include "nodes/execnodes.h"
84 
85 #include "replication/origin.h"
86 #include "replication/logical.h"
87 
88 #include "storage/fd.h"
89 #include "storage/ipc.h"
90 #include "storage/lmgr.h"
91 #include "storage/copydir.h"
92 
93 #include "utils/builtins.h"
94 #include "utils/fmgroids.h"
95 #include "utils/pg_lsn.h"
96 #include "utils/rel.h"
97 #include "utils/syscache.h"
98 #include "utils/tqual.h"
99 
100 /*
101  * Replay progress of a single remote node.
102  */
103 typedef struct ReplicationState
104 {
105  /*
106  * Local identifier for the remote node.
107  */
109 
110  /*
111  * Location of the latest commit from the remote side.
112  */
114 
115  /*
116  * Remember the local lsn of the commit record so we can XLogFlush() to it
117  * during a checkpoint so we know the commit record actually is safe on
118  * disk.
119  */
121 
122  /*
123  * PID of backend that's acquired slot, or 0 if none.
124  */
126 
127  /*
128  * Lock protecting remote_lsn and local_lsn.
129  */
132 
133 /*
134  * On disk version of ReplicationState.
135  */
137 {
141 
142 
143 typedef struct ReplicationStateCtl
144 {
146  ReplicationState states[FLEXIBLE_ARRAY_MEMBER];
148 
149 /* external variables */
153 
154 /*
155  * Base address into a shared memory array of replication states of size
156  * max_replication_slots.
157  *
158  * XXX: Should we use a separate variable to size this rather than
159  * max_replication_slots?
160  */
163 
164 /*
165  * Backend-local, cached element from ReplicationStates for use in a backend
166  * replaying remote commits, so we don't have to search ReplicationStates for
167  * the backends current RepOriginId.
168  */
170 
171 /* Magic for on disk files. */
172 #define REPLICATION_STATE_MAGIC ((uint32) 0x1257DADE)
173 
174 static void
175 replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
176 {
177  if (!superuser())
178  ereport(ERROR,
179  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
180  errmsg("only superusers can query or manipulate replication origins")));
181 
182  if (check_slots && max_replication_slots == 0)
183  ereport(ERROR,
184  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
185  errmsg("cannot query or manipulate replication origin when max_replication_slots = 0")));
186 
187  if (!recoveryOK && RecoveryInProgress())
188  ereport(ERROR,
189  (errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
190  errmsg("cannot manipulate replication origins during recovery")));
191 
192 }
193 
194 
195 /* ---------------------------------------------------------------------------
196  * Functions for working with replication origins themselves.
197  * ---------------------------------------------------------------------------
198  */
199 
200 /*
201  * Check for a persistent replication origin identified by name.
202  *
203  * Returns InvalidOid if the node isn't known yet and missing_ok is true.
204  */
206 replorigin_by_name(char *roname, bool missing_ok)
207 {
209  Oid roident = InvalidOid;
210  HeapTuple tuple;
211  Datum roname_d;
212 
213  roname_d = CStringGetTextDatum(roname);
214 
215  tuple = SearchSysCache1(REPLORIGNAME, roname_d);
216  if (HeapTupleIsValid(tuple))
217  {
218  ident = (Form_pg_replication_origin) GETSTRUCT(tuple);
219  roident = ident->roident;
220  ReleaseSysCache(tuple);
221  }
222  else if (!missing_ok)
223  elog(ERROR, "cache lookup failed for replication origin '%s'",
224  roname);
225 
226  return roident;
227 }
228 
229 /*
230  * Create a replication origin.
231  *
232  * Needs to be called in a transaction.
233  */
235 replorigin_create(char *roname)
236 {
237  Oid roident;
238  HeapTuple tuple = NULL;
239  Relation rel;
240  Datum roname_d;
241  SnapshotData SnapshotDirty;
242  SysScanDesc scan;
243  ScanKeyData key;
244 
245  roname_d = CStringGetTextDatum(roname);
246 
248 
249  /*
250  * We need the numeric replication origin to be 16bit wide, so we cannot
251  * rely on the normal oid allocation. Instead we simply scan
252  * pg_replication_origin for the first unused id. That's not particularly
253  * efficient, but this should be a fairly infrequent operation - we can
254  * easily spend a bit more code on this when it turns out it needs to be
255  * faster.
256  *
257  * We handle concurrency by taking an exclusive lock (allowing reads!)
258  * over the table for the duration of the search. Because we use a "dirty
259  * snapshot" we can read rows that other in-progress sessions have
260  * written, even though they would be invisible with normal snapshots. Due
261  * to the exclusive lock there's no danger that new rows can appear while
262  * we're checking.
263  */
264  InitDirtySnapshot(SnapshotDirty);
265 
267 
268  for (roident = InvalidOid + 1; roident < PG_UINT16_MAX; roident++)
269  {
270  bool nulls[Natts_pg_replication_origin];
272  bool collides;
273 
275 
276  ScanKeyInit(&key,
278  BTEqualStrategyNumber, F_OIDEQ,
279  ObjectIdGetDatum(roident));
280 
282  true /* indexOK */ ,
283  &SnapshotDirty,
284  1, &key);
285 
286  collides = HeapTupleIsValid(systable_getnext(scan));
287 
288  systable_endscan(scan);
289 
290  if (!collides)
291  {
292  /*
293  * Ok, found an unused roident, insert the new row and do a CCI,
294  * so our callers can look it up if they want to.
295  */
296  memset(&nulls, 0, sizeof(nulls));
297 
299  values[Anum_pg_replication_origin_roname - 1] = roname_d;
300 
301  tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
302  CatalogTupleInsert(rel, tuple);
304  break;
305  }
306  }
307 
308  /* now release lock again, */
310 
311  if (tuple == NULL)
312  ereport(ERROR,
313  (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
314  errmsg("could not find free replication origin OID")));
315 
316  heap_freetuple(tuple);
317  return roident;
318 }
319 
320 
321 /*
322  * Drop replication origin.
323  *
324  * Needs to be called in a transaction.
325  */
326 void
328 {
329  HeapTuple tuple = NULL;
330  Relation rel;
331  int i;
332 
334 
336 
337  /* cleanup the slot state info */
338  LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
339 
340  for (i = 0; i < max_replication_slots; i++)
341  {
342  ReplicationState *state = &replication_states[i];
343 
344  /* found our slot */
345  if (state->roident == roident)
346  {
347  if (state->acquired_by != 0)
348  {
349  ereport(ERROR,
350  (errcode(ERRCODE_OBJECT_IN_USE),
351  errmsg("could not drop replication origin with OID %d, in use by PID %d",
352  state->roident,
353  state->acquired_by)));
354  }
355 
356  /* first WAL log */
357  {
358  xl_replorigin_drop xlrec;
359 
360  xlrec.node_id = roident;
361  XLogBeginInsert();
362  XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
363  XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_DROP);
364  }
365 
366  /* then reset the in-memory entry */
367  state->roident = InvalidRepOriginId;
368  state->remote_lsn = InvalidXLogRecPtr;
369  state->local_lsn = InvalidXLogRecPtr;
370  break;
371  }
372  }
373  LWLockRelease(ReplicationOriginLock);
374 
376  if (!HeapTupleIsValid(tuple))
377  elog(ERROR, "cache lookup failed for replication origin with oid %u",
378  roident);
379 
380  CatalogTupleDelete(rel, &tuple->t_self);
381  ReleaseSysCache(tuple);
382 
384 
385  /* now release lock again, */
387 }
388 
389 
390 /*
391  * Lookup replication origin via it's oid and return the name.
392  *
393  * The external name is palloc'd in the calling context.
394  *
395  * Returns true if the origin is known, false otherwise.
396  */
397 bool
398 replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
399 {
400  HeapTuple tuple;
402 
403  Assert(OidIsValid((Oid) roident));
404  Assert(roident != InvalidRepOriginId);
405  Assert(roident != DoNotReplicateId);
406 
408  ObjectIdGetDatum((Oid) roident));
409 
410  if (HeapTupleIsValid(tuple))
411  {
412  ric = (Form_pg_replication_origin) GETSTRUCT(tuple);
413  *roname = text_to_cstring(&ric->roname);
414  ReleaseSysCache(tuple);
415 
416  return true;
417  }
418  else
419  {
420  *roname = NULL;
421 
422  if (!missing_ok)
423  elog(ERROR, "cache lookup failed for replication origin with oid %u",
424  roident);
425 
426  return false;
427  }
428 }
429 
430 
431 /* ---------------------------------------------------------------------------
432  * Functions for handling replication progress.
433  * ---------------------------------------------------------------------------
434  */
435 
436 Size
438 {
439  Size size = 0;
440 
441  /*
442  * XXX: max_replication_slots is arguably the wrong thing to use, as here
443  * we keep the replay state of *remote* transactions. But for now it seems
444  * sufficient to reuse it, lest we introduce a separate guc.
445  */
446  if (max_replication_slots == 0)
447  return size;
448 
449  size = add_size(size, offsetof(ReplicationStateCtl, states));
450 
451  size = add_size(size,
453  return size;
454 }
455 
456 void
458 {
459  bool found;
460 
461  if (max_replication_slots == 0)
462  return;
463 
464  replication_states_ctl = (ReplicationStateCtl *)
465  ShmemInitStruct("ReplicationOriginState",
467  &found);
468  replication_states = replication_states_ctl->states;
469 
470  if (!found)
471  {
472  int i;
473 
474  replication_states_ctl->tranche_id = LWTRANCHE_REPLICATION_ORIGIN;
475 
476  MemSet(replication_states, 0, ReplicationOriginShmemSize());
477 
478  for (i = 0; i < max_replication_slots; i++)
479  LWLockInitialize(&replication_states[i].lock,
480  replication_states_ctl->tranche_id);
481  }
482 
483  LWLockRegisterTranche(replication_states_ctl->tranche_id,
484  "replication_origin");
485 }
486 
487 /* ---------------------------------------------------------------------------
488  * Perform a checkpoint of each replication origin's progress with respect to
489  * the replayed remote_lsn. Make sure that all transactions we refer to in the
490  * checkpoint (local_lsn) are actually on-disk. This might not yet be the case
491  * if the transactions were originally committed asynchronously.
492  *
493  * We store checkpoints in the following format:
494  * +-------+------------------------+------------------+-----+--------+
495  * | MAGIC | ReplicationStateOnDisk | struct Replic... | ... | CRC32C | EOF
496  * +-------+------------------------+------------------+-----+--------+
497  *
498  * So its just the magic, followed by the statically sized
499  * ReplicationStateOnDisk structs. Note that the maximum number of
500  * ReplicationStates is determined by max_replication_slots.
501  * ---------------------------------------------------------------------------
502  */
503 void
505 {
506  const char *tmppath = "pg_logical/replorigin_checkpoint.tmp";
507  const char *path = "pg_logical/replorigin_checkpoint";
508  int tmpfd;
509  int i;
511  pg_crc32c crc;
512 
513  if (max_replication_slots == 0)
514  return;
515 
516  INIT_CRC32C(crc);
517 
518  /* make sure no old temp file is remaining */
519  if (unlink(tmppath) < 0 && errno != ENOENT)
520  ereport(PANIC,
522  errmsg("could not remove file \"%s\": %m",
523  tmppath)));
524 
525  /*
526  * no other backend can perform this at the same time, we're protected by
527  * CheckpointLock.
528  */
529  tmpfd = OpenTransientFile((char *) tmppath,
530  O_CREAT | O_EXCL | O_WRONLY | PG_BINARY,
531  S_IRUSR | S_IWUSR);
532  if (tmpfd < 0)
533  ereport(PANIC,
535  errmsg("could not create file \"%s\": %m",
536  tmppath)));
537 
538  /* write magic */
539  if ((write(tmpfd, &magic, sizeof(magic))) != sizeof(magic))
540  {
541  CloseTransientFile(tmpfd);
542  ereport(PANIC,
544  errmsg("could not write to file \"%s\": %m",
545  tmppath)));
546  }
547  COMP_CRC32C(crc, &magic, sizeof(magic));
548 
549  /* prevent concurrent creations/drops */
550  LWLockAcquire(ReplicationOriginLock, LW_SHARED);
551 
552  /* write actual data */
553  for (i = 0; i < max_replication_slots; i++)
554  {
555  ReplicationStateOnDisk disk_state;
556  ReplicationState *curstate = &replication_states[i];
557  XLogRecPtr local_lsn;
558 
559  if (curstate->roident == InvalidRepOriginId)
560  continue;
561 
562  LWLockAcquire(&curstate->lock, LW_SHARED);
563 
564  disk_state.roident = curstate->roident;
565 
566  disk_state.remote_lsn = curstate->remote_lsn;
567  local_lsn = curstate->local_lsn;
568 
569  LWLockRelease(&curstate->lock);
570 
571  /* make sure we only write out a commit that's persistent */
572  XLogFlush(local_lsn);
573 
574  if ((write(tmpfd, &disk_state, sizeof(disk_state))) !=
575  sizeof(disk_state))
576  {
577  CloseTransientFile(tmpfd);
578  ereport(PANIC,
580  errmsg("could not write to file \"%s\": %m",
581  tmppath)));
582  }
583 
584  COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
585  }
586 
587  LWLockRelease(ReplicationOriginLock);
588 
589  /* write out the CRC */
590  FIN_CRC32C(crc);
591  if ((write(tmpfd, &crc, sizeof(crc))) != sizeof(crc))
592  {
593  CloseTransientFile(tmpfd);
594  ereport(PANIC,
596  errmsg("could not write to file \"%s\": %m",
597  tmppath)));
598  }
599 
600  CloseTransientFile(tmpfd);
601 
602  /* fsync, rename to permanent file, fsync file and directory */
603  durable_rename(tmppath, path, PANIC);
604 }
605 
606 /*
607  * Recover replication replay status from checkpoint data saved earlier by
608  * CheckPointReplicationOrigin.
609  *
610  * This only needs to be called at startup and *not* during every checkpoint
611  * read during recovery (e.g. in HS or PITR from a base backup) afterwards. All
612  * state thereafter can be recovered by looking at commit records.
613  */
614 void
616 {
617  const char *path = "pg_logical/replorigin_checkpoint";
618  int fd;
619  int readBytes;
621  int last_state = 0;
622  pg_crc32c file_crc;
623  pg_crc32c crc;
624 
625  /* don't want to overwrite already existing state */
626 #ifdef USE_ASSERT_CHECKING
627  static bool already_started = false;
628 
629  Assert(!already_started);
630  already_started = true;
631 #endif
632 
633  if (max_replication_slots == 0)
634  return;
635 
636  INIT_CRC32C(crc);
637 
638  elog(DEBUG2, "starting up replication origin progress state");
639 
640  fd = OpenTransientFile((char *) path, O_RDONLY | PG_BINARY, 0);
641 
642  /*
643  * might have had max_replication_slots == 0 last run, or we just brought
644  * up a standby.
645  */
646  if (fd < 0 && errno == ENOENT)
647  return;
648  else if (fd < 0)
649  ereport(PANIC,
651  errmsg("could not open file \"%s\": %m",
652  path)));
653 
654  /* verify magic, that is written even if nothing was active */
655  readBytes = read(fd, &magic, sizeof(magic));
656  if (readBytes != sizeof(magic))
657  ereport(PANIC,
658  (errmsg("could not read file \"%s\": %m",
659  path)));
660  COMP_CRC32C(crc, &magic, sizeof(magic));
661 
662  if (magic != REPLICATION_STATE_MAGIC)
663  ereport(PANIC,
664  (errmsg("replication checkpoint has wrong magic %u instead of %u",
665  magic, REPLICATION_STATE_MAGIC)));
666 
667  /* we can skip locking here, no other access is possible */
668 
669  /* recover individual states, until there are no more to be found */
670  while (true)
671  {
672  ReplicationStateOnDisk disk_state;
673 
674  readBytes = read(fd, &disk_state, sizeof(disk_state));
675 
676  /* no further data */
677  if (readBytes == sizeof(crc))
678  {
679  /* not pretty, but simple ... */
680  file_crc = *(pg_crc32c *) &disk_state;
681  break;
682  }
683 
684  if (readBytes < 0)
685  {
686  ereport(PANIC,
688  errmsg("could not read file \"%s\": %m",
689  path)));
690  }
691 
692  if (readBytes != sizeof(disk_state))
693  {
694  ereport(PANIC,
696  errmsg("could not read file \"%s\": read %d of %zu",
697  path, readBytes, sizeof(disk_state))));
698  }
699 
700  COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
701 
702  if (last_state == max_replication_slots)
703  ereport(PANIC,
704  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
705  errmsg("could not find free replication state, increase max_replication_slots")));
706 
707  /* copy data to shared memory */
708  replication_states[last_state].roident = disk_state.roident;
709  replication_states[last_state].remote_lsn = disk_state.remote_lsn;
710  last_state++;
711 
712  elog(LOG, "recovered replication state of node %u to %X/%X",
713  disk_state.roident,
714  (uint32) (disk_state.remote_lsn >> 32),
715  (uint32) disk_state.remote_lsn);
716  }
717 
718  /* now check checksum */
719  FIN_CRC32C(crc);
720  if (file_crc != crc)
721  ereport(PANIC,
722  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
723  errmsg("replication slot checkpoint has wrong checksum %u, expected %u",
724  crc, file_crc)));
725 
726  CloseTransientFile(fd);
727 }
728 
729 void
731 {
732  uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
733 
734  switch (info)
735  {
736  case XLOG_REPLORIGIN_SET:
737  {
738  xl_replorigin_set *xlrec =
739  (xl_replorigin_set *) XLogRecGetData(record);
740 
742  xlrec->remote_lsn, record->EndRecPtr,
743  xlrec->force /* backward */ ,
744  false /* WAL log */ );
745  break;
746  }
748  {
749  xl_replorigin_drop *xlrec;
750  int i;
751 
752  xlrec = (xl_replorigin_drop *) XLogRecGetData(record);
753 
754  for (i = 0; i < max_replication_slots; i++)
755  {
756  ReplicationState *state = &replication_states[i];
757 
758  /* found our slot */
759  if (state->roident == xlrec->node_id)
760  {
761  /* reset entry */
762  state->roident = InvalidRepOriginId;
763  state->remote_lsn = InvalidXLogRecPtr;
764  state->local_lsn = InvalidXLogRecPtr;
765  break;
766  }
767  }
768  break;
769  }
770  default:
771  elog(PANIC, "replorigin_redo: unknown op code %u", info);
772  }
773 }
774 
775 
776 /*
777  * Tell the replication origin progress machinery that a commit from 'node'
778  * that originated at the LSN remote_commit on the remote node was replayed
779  * successfully and that we don't need to do so again. In combination with
780  * setting up replorigin_session_origin_lsn and replorigin_session_origin
781  * that ensures we won't loose knowledge about that after a crash if the
782  * transaction had a persistent effect (think of asynchronous commits).
783  *
784  * local_commit needs to be a local LSN of the commit so that we can make sure
785  * upon a checkpoint that enough WAL has been persisted to disk.
786  *
787  * Needs to be called with a RowExclusiveLock on pg_replication_origin,
788  * unless running in recovery.
789  */
790 void
792  XLogRecPtr remote_commit, XLogRecPtr local_commit,
793  bool go_backward, bool wal_log)
794 {
795  int i;
796  ReplicationState *replication_state = NULL;
797  ReplicationState *free_state = NULL;
798 
799  Assert(node != InvalidRepOriginId);
800 
801  /* we don't track DoNotReplicateId */
802  if (node == DoNotReplicateId)
803  return;
804 
805  /*
806  * XXX: For the case where this is called by WAL replay, it'd be more
807  * efficient to restore into a backend local hashtable and only dump into
808  * shmem after recovery is finished. Let's wait with implementing that
809  * till it's shown to be a measurable expense
810  */
811 
812  /* Lock exclusively, as we may have to create a new table entry. */
813  LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
814 
815  /*
816  * Search for either an existing slot for the origin, or a free one we can
817  * use.
818  */
819  for (i = 0; i < max_replication_slots; i++)
820  {
821  ReplicationState *curstate = &replication_states[i];
822 
823  /* remember where to insert if necessary */
824  if (curstate->roident == InvalidRepOriginId &&
825  free_state == NULL)
826  {
827  free_state = curstate;
828  continue;
829  }
830 
831  /* not our slot */
832  if (curstate->roident != node)
833  {
834  continue;
835  }
836 
837  /* ok, found slot */
838  replication_state = curstate;
839 
840  LWLockAcquire(&replication_state->lock, LW_EXCLUSIVE);
841 
842  /* Make sure it's not used by somebody else */
843  if (replication_state->acquired_by != 0)
844  {
845  ereport(ERROR,
846  (errcode(ERRCODE_OBJECT_IN_USE),
847  errmsg("replication origin with OID %d is already active for PID %d",
848  replication_state->roident,
849  replication_state->acquired_by)));
850  }
851 
852  break;
853  }
854 
855  if (replication_state == NULL && free_state == NULL)
856  ereport(ERROR,
857  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
858  errmsg("could not find free replication state slot for replication origin with OID %u",
859  node),
860  errhint("Increase max_replication_slots and try again.")));
861 
862  if (replication_state == NULL)
863  {
864  /* initialize new slot */
865  LWLockAcquire(&free_state->lock, LW_EXCLUSIVE);
866  replication_state = free_state;
867  Assert(replication_state->remote_lsn == InvalidXLogRecPtr);
868  Assert(replication_state->local_lsn == InvalidXLogRecPtr);
869  replication_state->roident = node;
870  }
871 
872  Assert(replication_state->roident != InvalidRepOriginId);
873 
874  /*
875  * If somebody "forcefully" sets this slot, WAL log it, so it's durable
876  * and the standby gets the message. Primarily this will be called during
877  * WAL replay (of commit records) where no WAL logging is necessary.
878  */
879  if (wal_log)
880  {
881  xl_replorigin_set xlrec;
882 
883  xlrec.remote_lsn = remote_commit;
884  xlrec.node_id = node;
885  xlrec.force = go_backward;
886 
887  XLogBeginInsert();
888  XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
889 
890  XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_SET);
891  }
892 
893  /*
894  * Due to - harmless - race conditions during a checkpoint we could see
895  * values here that are older than the ones we already have in memory.
896  * Don't overwrite those.
897  */
898  if (go_backward || replication_state->remote_lsn < remote_commit)
899  replication_state->remote_lsn = remote_commit;
900  if (local_commit != InvalidXLogRecPtr &&
901  (go_backward || replication_state->local_lsn < local_commit))
902  replication_state->local_lsn = local_commit;
903  LWLockRelease(&replication_state->lock);
904 
905  /*
906  * Release *after* changing the LSNs, slot isn't acquired and thus could
907  * otherwise be dropped anytime.
908  */
909  LWLockRelease(ReplicationOriginLock);
910 }
911 
912 
915 {
916  int i;
917  XLogRecPtr local_lsn = InvalidXLogRecPtr;
918  XLogRecPtr remote_lsn = InvalidXLogRecPtr;
919 
920  /* prevent slots from being concurrently dropped */
921  LWLockAcquire(ReplicationOriginLock, LW_SHARED);
922 
923  for (i = 0; i < max_replication_slots; i++)
924  {
926 
927  state = &replication_states[i];
928 
929  if (state->roident == node)
930  {
931  LWLockAcquire(&state->lock, LW_SHARED);
932 
933  remote_lsn = state->remote_lsn;
934  local_lsn = state->local_lsn;
935 
936  LWLockRelease(&state->lock);
937 
938  break;
939  }
940  }
941 
942  LWLockRelease(ReplicationOriginLock);
943 
944  if (flush && local_lsn != InvalidXLogRecPtr)
945  XLogFlush(local_lsn);
946 
947  return remote_lsn;
948 }
949 
950 /*
951  * Tear down a (possibly) configured session replication origin during process
952  * exit.
953  */
954 static void
956 {
957  LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
958 
959  if (session_replication_state != NULL &&
960  session_replication_state->acquired_by == MyProcPid)
961  {
962  session_replication_state->acquired_by = 0;
963  session_replication_state = NULL;
964  }
965 
966  LWLockRelease(ReplicationOriginLock);
967 }
968 
969 /*
970  * Setup a replication origin in the shared memory struct if it doesn't
971  * already exists and cache access to the specific ReplicationSlot so the
972  * array doesn't have to be searched when calling
973  * replorigin_session_advance().
974  *
975  * Obviously only one such cached origin can exist per process and the current
976  * cached value can only be set again after the previous value is torn down
977  * with replorigin_session_reset().
978  */
979 void
981 {
982  static bool registered_cleanup;
983  int i;
984  int free_slot = -1;
985 
986  if (!registered_cleanup)
987  {
989  registered_cleanup = true;
990  }
991 
993 
994  if (session_replication_state != NULL)
995  ereport(ERROR,
996  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
997  errmsg("cannot setup replication origin when one is already setup")));
998 
999  /* Lock exclusively, as we may have to create a new table entry. */
1000  LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1001 
1002  /*
1003  * Search for either an existing slot for the origin, or a free one we can
1004  * use.
1005  */
1006  for (i = 0; i < max_replication_slots; i++)
1007  {
1008  ReplicationState *curstate = &replication_states[i];
1009 
1010  /* remember where to insert if necessary */
1011  if (curstate->roident == InvalidRepOriginId &&
1012  free_slot == -1)
1013  {
1014  free_slot = i;
1015  continue;
1016  }
1017 
1018  /* not our slot */
1019  if (curstate->roident != node)
1020  continue;
1021 
1022  else if (curstate->acquired_by != 0)
1023  {
1024  ereport(ERROR,
1025  (errcode(ERRCODE_OBJECT_IN_USE),
1026  errmsg("replication identifier %d is already active for PID %d",
1027  curstate->roident, curstate->acquired_by)));
1028  }
1029 
1030  /* ok, found slot */
1031  session_replication_state = curstate;
1032  }
1033 
1034 
1035  if (session_replication_state == NULL && free_slot == -1)
1036  ereport(ERROR,
1037  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
1038  errmsg("could not find free replication state slot for replication origin with OID %u",
1039  node),
1040  errhint("Increase max_replication_slots and try again.")));
1041  else if (session_replication_state == NULL)
1042  {
1043  /* initialize new slot */
1044  session_replication_state = &replication_states[free_slot];
1045  Assert(session_replication_state->remote_lsn == InvalidXLogRecPtr);
1046  Assert(session_replication_state->local_lsn == InvalidXLogRecPtr);
1047  session_replication_state->roident = node;
1048  }
1049 
1050 
1051  Assert(session_replication_state->roident != InvalidRepOriginId);
1052 
1053  session_replication_state->acquired_by = MyProcPid;
1054 
1055  LWLockRelease(ReplicationOriginLock);
1056 }
1057 
1058 /*
1059  * Reset replay state previously setup in this session.
1060  *
1061  * This function may only be called if an origin was setup with
1062  * replorigin_session_setup().
1063  */
1064 void
1066 {
1068 
1069  if (session_replication_state == NULL)
1070  ereport(ERROR,
1071  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1072  errmsg("no replication origin is configured")));
1073 
1074  LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1075 
1076  session_replication_state->acquired_by = 0;
1077  session_replication_state = NULL;
1078 
1079  LWLockRelease(ReplicationOriginLock);
1080 }
1081 
1082 /*
1083  * Do the same work replorigin_advance() does, just on the session's
1084  * configured origin.
1085  *
1086  * This is noticeably cheaper than using replorigin_advance().
1087  */
1088 void
1090 {
1091  Assert(session_replication_state != NULL);
1092  Assert(session_replication_state->roident != InvalidRepOriginId);
1093 
1094  LWLockAcquire(&session_replication_state->lock, LW_EXCLUSIVE);
1095  if (session_replication_state->local_lsn < local_commit)
1096  session_replication_state->local_lsn = local_commit;
1097  if (session_replication_state->remote_lsn < remote_commit)
1098  session_replication_state->remote_lsn = remote_commit;
1099  LWLockRelease(&session_replication_state->lock);
1100 }
1101 
1102 /*
1103  * Ask the machinery about the point up to which we successfully replayed
1104  * changes from an already setup replication origin.
1105  */
1106 XLogRecPtr
1108 {
1109  XLogRecPtr remote_lsn;
1110  XLogRecPtr local_lsn;
1111 
1112  Assert(session_replication_state != NULL);
1113 
1114  LWLockAcquire(&session_replication_state->lock, LW_SHARED);
1115  remote_lsn = session_replication_state->remote_lsn;
1116  local_lsn = session_replication_state->local_lsn;
1117  LWLockRelease(&session_replication_state->lock);
1118 
1119  if (flush && local_lsn != InvalidXLogRecPtr)
1120  XLogFlush(local_lsn);
1121 
1122  return remote_lsn;
1123 }
1124 
1125 
1126 
1127 /* ---------------------------------------------------------------------------
1128  * SQL functions for working with replication origin.
1129  *
1130  * These mostly should be fairly short wrappers around more generic functions.
1131  * ---------------------------------------------------------------------------
1132  */
1133 
1134 /*
1135  * Create replication origin for the passed in name, and return the assigned
1136  * oid.
1137  */
1138 Datum
1140 {
1141  char *name;
1142  RepOriginId roident;
1143 
1144  replorigin_check_prerequisites(false, false);
1145 
1147  roident = replorigin_create(name);
1148 
1149  pfree(name);
1150 
1151  PG_RETURN_OID(roident);
1152 }
1153 
1154 /*
1155  * Drop replication origin.
1156  */
1157 Datum
1159 {
1160  char *name;
1161  RepOriginId roident;
1162 
1163  replorigin_check_prerequisites(false, false);
1164 
1166 
1167  roident = replorigin_by_name(name, false);
1168  Assert(OidIsValid(roident));
1169 
1170  replorigin_drop(roident);
1171 
1172  pfree(name);
1173 
1174  PG_RETURN_VOID();
1175 }
1176 
1177 /*
1178  * Return oid of a replication origin.
1179  */
1180 Datum
1182 {
1183  char *name;
1184  RepOriginId roident;
1185 
1186  replorigin_check_prerequisites(false, false);
1187 
1189  roident = replorigin_by_name(name, true);
1190 
1191  pfree(name);
1192 
1193  if (OidIsValid(roident))
1194  PG_RETURN_OID(roident);
1195  PG_RETURN_NULL();
1196 }
1197 
1198 /*
1199  * Setup a replication origin for this session.
1200  */
1201 Datum
1203 {
1204  char *name;
1205  RepOriginId origin;
1206 
1207  replorigin_check_prerequisites(true, false);
1208 
1210  origin = replorigin_by_name(name, false);
1211  replorigin_session_setup(origin);
1212 
1213  replorigin_session_origin = origin;
1214 
1215  pfree(name);
1216 
1217  PG_RETURN_VOID();
1218 }
1219 
1220 /*
1221  * Reset previously setup origin in this session
1222  */
1223 Datum
1225 {
1226  replorigin_check_prerequisites(true, false);
1227 
1229 
1233 
1234  PG_RETURN_VOID();
1235 }
1236 
1237 /*
1238  * Has a replication origin been setup for this session.
1239  */
1240 Datum
1242 {
1243  replorigin_check_prerequisites(false, false);
1244 
1246 }
1247 
1248 
1249 /*
1250  * Return the replication progress for origin setup in the current session.
1251  *
1252  * If 'flush' is set to true it is ensured that the returned value corresponds
1253  * to a local transaction that has been flushed. this is useful if asynchronous
1254  * commits are used when replaying replicated transactions.
1255  */
1256 Datum
1258 {
1259  XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1260  bool flush = PG_GETARG_BOOL(0);
1261 
1262  replorigin_check_prerequisites(true, false);
1263 
1264  if (session_replication_state == NULL)
1265  ereport(ERROR,
1266  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1267  errmsg("no replication origin is configured")));
1268 
1269  remote_lsn = replorigin_session_get_progress(flush);
1270 
1271  if (remote_lsn == InvalidXLogRecPtr)
1272  PG_RETURN_NULL();
1273 
1274  PG_RETURN_LSN(remote_lsn);
1275 }
1276 
1277 Datum
1279 {
1280  XLogRecPtr location = PG_GETARG_LSN(0);
1281 
1282  replorigin_check_prerequisites(true, false);
1283 
1284  if (session_replication_state == NULL)
1285  ereport(ERROR,
1286  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1287  errmsg("no replication origin is configured")));
1288 
1289  replorigin_session_origin_lsn = location;
1291 
1292  PG_RETURN_VOID();
1293 }
1294 
1295 Datum
1297 {
1298  replorigin_check_prerequisites(true, false);
1299 
1302 
1303  PG_RETURN_VOID();
1304 }
1305 
1306 
1307 Datum
1309 {
1310  text *name = PG_GETARG_TEXT_PP(0);
1311  XLogRecPtr remote_commit = PG_GETARG_LSN(1);
1312  RepOriginId node;
1313 
1314  replorigin_check_prerequisites(true, false);
1315 
1316  /* lock to prevent the replication origin from vanishing */
1318 
1319  node = replorigin_by_name(text_to_cstring(name), false);
1320 
1321  /*
1322  * Can't sensibly pass a local commit to be flushed at checkpoint - this
1323  * xact hasn't committed yet. This is why this function should be used to
1324  * set up the initial replication state, but not for replay.
1325  */
1326  replorigin_advance(node, remote_commit, InvalidXLogRecPtr,
1327  true /* go backward */ , true /* wal log */ );
1328 
1330 
1331  PG_RETURN_VOID();
1332 }
1333 
1334 
1335 /*
1336  * Return the replication progress for an individual replication origin.
1337  *
1338  * If 'flush' is set to true it is ensured that the returned value corresponds
1339  * to a local transaction that has been flushed. this is useful if asynchronous
1340  * commits are used when replaying replicated transactions.
1341  */
1342 Datum
1344 {
1345  char *name;
1346  bool flush;
1347  RepOriginId roident;
1348  XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1349 
1350  replorigin_check_prerequisites(true, true);
1351 
1353  flush = PG_GETARG_BOOL(1);
1354 
1355  roident = replorigin_by_name(name, false);
1356  Assert(OidIsValid(roident));
1357 
1358  remote_lsn = replorigin_get_progress(roident, flush);
1359 
1360  if (remote_lsn == InvalidXLogRecPtr)
1361  PG_RETURN_NULL();
1362 
1363  PG_RETURN_LSN(remote_lsn);
1364 }
1365 
1366 
1367 Datum
1369 {
1370  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1371  TupleDesc tupdesc;
1372  Tuplestorestate *tupstore;
1373  MemoryContext per_query_ctx;
1374  MemoryContext oldcontext;
1375  int i;
1377 
1378  /* we we want to return 0 rows if slot is set to zero */
1379  replorigin_check_prerequisites(false, true);
1380 
1381  if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
1382  ereport(ERROR,
1383  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1384  errmsg("set-valued function called in context that cannot accept a set")));
1385  if (!(rsinfo->allowedModes & SFRM_Materialize))
1386  ereport(ERROR,
1387  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1388  errmsg("materialize mode required, but it is not allowed in this context")));
1389  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
1390  elog(ERROR, "return type must be a row type");
1391 
1392  if (tupdesc->natts != REPLICATION_ORIGIN_PROGRESS_COLS)
1393  elog(ERROR, "wrong function definition");
1394 
1395  per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
1396  oldcontext = MemoryContextSwitchTo(per_query_ctx);
1397 
1398  tupstore = tuplestore_begin_heap(true, false, work_mem);
1399  rsinfo->returnMode = SFRM_Materialize;
1400  rsinfo->setResult = tupstore;
1401  rsinfo->setDesc = tupdesc;
1402 
1403  MemoryContextSwitchTo(oldcontext);
1404 
1405 
1406  /* prevent slots from being concurrently dropped */
1407  LWLockAcquire(ReplicationOriginLock, LW_SHARED);
1408 
1409  /*
1410  * Iterate through all possible replication_states, display if they are
1411  * filled. Note that we do not take any locks, so slightly corrupted/out
1412  * of date values are a possibility.
1413  */
1414  for (i = 0; i < max_replication_slots; i++)
1415  {
1419  char *roname;
1420 
1421  state = &replication_states[i];
1422 
1423  /* unused slot, nothing to display */
1424  if (state->roident == InvalidRepOriginId)
1425  continue;
1426 
1427  memset(values, 0, sizeof(values));
1428  memset(nulls, 1, sizeof(nulls));
1429 
1430  values[0] = ObjectIdGetDatum(state->roident);
1431  nulls[0] = false;
1432 
1433  /*
1434  * We're not preventing the origin to be dropped concurrently, so
1435  * silently accept that it might be gone.
1436  */
1437  if (replorigin_by_oid(state->roident, true,
1438  &roname))
1439  {
1440  values[1] = CStringGetTextDatum(roname);
1441  nulls[1] = false;
1442  }
1443 
1444  LWLockAcquire(&state->lock, LW_SHARED);
1445 
1446  values[2] = LSNGetDatum(state->remote_lsn);
1447  nulls[2] = false;
1448 
1449  values[3] = LSNGetDatum(state->local_lsn);
1450  nulls[3] = false;
1451 
1452  LWLockRelease(&state->lock);
1453 
1454  tuplestore_putvalues(tupstore, tupdesc, values, nulls);
1455  }
1456 
1457  tuplestore_donestoring(tupstore);
1458 
1459  LWLockRelease(ReplicationOriginLock);
1460 
1461 #undef REPLICATION_ORIGIN_PROGRESS_COLS
1462 
1463  return (Datum) 0;
1464 }
static ReplicationState * session_replication_state
Definition: origin.c:169
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, Datum *values, bool *isnull)
Definition: tuplestore.c:735
#define INIT_CRC32C(crc)
Definition: pg_crc32c.h:41
static void replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
Definition: origin.c:175
Definition: lwlock.h:32
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define IsA(nodeptr, _type_)
Definition: nodes.h:571
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition: funcapi.c:211
int MyProcPid
Definition: globals.c:38
int errhint(const char *fmt,...)
Definition: elog.c:987
XLogRecPtr local_lsn
Definition: origin.c:120
void systable_endscan(SysScanDesc sysscan)
Definition: genam.c:499
#define GETSTRUCT(TUP)
Definition: htup_details.h:656
#define Anum_pg_replication_origin_roident
#define RelationGetDescr(relation)
Definition: rel.h:425
#define DoNotReplicateId
Definition: origin.h:35
Datum pg_replication_origin_xact_setup(PG_FUNCTION_ARGS)
Definition: origin.c:1278
void UnlockRelationOid(Oid relid, LOCKMODE lockmode)
Definition: lmgr.c:182
#define write(a, b, c)
Definition: win32.h:19
Datum pg_replication_origin_drop(PG_FUNCTION_ARGS)
Definition: origin.c:1158
#define ExclusiveLock
Definition: lockdefs.h:44
int64 TimestampTz
Definition: timestamp.h:39
XLogRecPtr remote_lsn
Definition: origin.c:139
#define PG_GETARG_DATUM(n)
Definition: fmgr.h:225
uint32 pg_crc32c
Definition: pg_crc32c.h:38
static void ReplicationOriginExitCleanup(int code, Datum arg)
Definition: origin.c:955
RepOriginId roident
Definition: origin.c:138
#define tuplestore_donestoring(state)
Definition: tuplestore.h:60
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
unsigned char uint8
Definition: c.h:266
uint16 RepOriginId
Definition: xlogdefs.h:51
XLogRecPtr replorigin_session_get_progress(bool flush)
Definition: origin.c:1107
int errcode(int sqlerrcode)
Definition: elog.c:575
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
bool superuser(void)
Definition: superuser.c:47
#define MemSet(start, val, len)
Definition: c.h:857
void CatalogTupleDelete(Relation heapRel, ItemPointer tid)
Definition: indexing.c:255
void ReplicationOriginShmemInit(void)
Definition: origin.c:457
void replorigin_advance(RepOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)
Definition: origin.c:791
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:692
#define heap_close(r, l)
Definition: heapam.h:97
#define PG_GETARG_BOOL(n)
Definition: fmgr.h:231
void replorigin_session_setup(RepOriginId node)
Definition: origin.c:980
bool replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
Definition: origin.c:398
#define LOG
Definition: elog.h:26
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1374
unsigned int Oid
Definition: postgres_ext.h:31
bool RecoveryInProgress(void)
Definition: xlog.c:7863
#define OidIsValid(objectId)
Definition: c.h:538
#define PANIC
Definition: elog.h:53
void XLogFlush(XLogRecPtr record)
Definition: xlog.c:2754
static int fd(const char *x, int i)
Definition: preproc-init.c:105
SysScanDesc systable_beginscan(Relation heapRelation, Oid indexId, bool indexOK, Snapshot snapshot, int nkeys, ScanKey key)
Definition: genam.c:328
#define PG_BINARY
Definition: c.h:1038
RepOriginId replorigin_by_name(char *roname, bool missing_ok)
Definition: origin.c:206
#define SearchSysCache1(cacheId, key1)
Definition: syscache.h:150
RepOriginId roident
Definition: origin.c:108
XLogRecPtr EndRecPtr
Definition: xlogreader.h:115
#define PG_GETARG_TEXT_PP(n)
Definition: fmgr.h:265
Datum pg_replication_origin_advance(PG_FUNCTION_ARGS)
Definition: origin.c:1308
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1715
#define PG_RETURN_LSN(x)
Definition: pg_lsn.h:25
void CheckPointReplicationOrigin(void)
Definition: origin.c:504
void replorigin_redo(XLogReaderState *record)
Definition: origin.c:730
HeapTuple systable_getnext(SysScanDesc sysscan)
Definition: genam.c:416
void pfree(void *pointer)
Definition: mcxt.c:950
#define XLogRecGetData(decoder)
Definition: xlogreader.h:218
#define Anum_pg_replication_origin_roname
#define ReplicationOriginIdentIndex
Definition: indexing.h:326
#define ObjectIdGetDatum(X)
Definition: postgres.h:513
#define ERROR
Definition: elog.h:43
#define PG_UINT16_MAX
Definition: c.h:338
void replorigin_session_reset(void)
Definition: origin.c:1065
Oid CatalogTupleInsert(Relation heapRel, HeapTuple tup)
Definition: indexing.c:162
FormData_pg_replication_origin * Form_pg_replication_origin
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:372
#define InitDirtySnapshot(snapshotdata)
Definition: tqual.h:100
LWLock lock
Definition: origin.c:130
ItemPointerData t_self
Definition: htup.h:65
#define DEBUG2
Definition: elog.h:24
XLogRecPtr replorigin_session_origin_lsn
Definition: origin.c:151
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:348
TimestampTz replorigin_session_origin_timestamp
Definition: origin.c:152
#define XLOG_REPLORIGIN_SET
Definition: origin.h:31
void StartupReplicationOrigin(void)
Definition: origin.c:615
#define RowExclusiveLock
Definition: lockdefs.h:38
int OpenTransientFile(FileName fileName, int fileFlags, int fileMode)
Definition: fd.c:2107
int errcode_for_file_access(void)
Definition: elog.c:598
struct ReplicationState ReplicationState
unsigned int uint32
Definition: c.h:268
Datum pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
Definition: origin.c:1202
Datum pg_replication_origin_session_reset(PG_FUNCTION_ARGS)
Definition: origin.c:1224
RepOriginId replorigin_create(char *roname)
Definition: origin.c:235
Datum pg_show_replication_origin_status(PG_FUNCTION_ARGS)
Definition: origin.c:1368
int unlink(const char *filename)
#define ereport(elevel, rest)
Definition: elog.h:122
#define XLogRecGetInfo(decoder)
Definition: xlogreader.h:214
int durable_rename(const char *oldfile, const char *newfile, int elevel)
Definition: fd.c:593
Datum pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS)
Definition: origin.c:1241
void LWLockInitialize(LWLock *lock, int tranche_id)
Definition: lwlock.c:667
int CloseTransientFile(int fd)
Definition: fd.c:2268
void XLogRegisterData(char *data, int len)
Definition: xloginsert.c:323
void replorigin_drop(RepOriginId roident)
Definition: origin.c:327
Tuplestorestate * tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
Definition: tuplestore.c:316
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition: xloginsert.c:415
#define XLOG_REPLORIGIN_DROP
Definition: origin.h:32
Size mul_size(Size s1, Size s2)
Definition: shmem.c:492
#define PG_GETARG_LSN(n)
Definition: pg_lsn.h:24
struct ReplicationStateOnDisk ReplicationStateOnDisk
#define PG_RETURN_BOOL(x)
Definition: fmgr.h:311
uintptr_t Datum
Definition: postgres.h:372
void CommandCounterIncrement(void)
Definition: xact.c:922
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:1093
Size add_size(Size s1, Size s2)
Definition: shmem.c:475
Relation heap_open(Oid relationId, LOCKMODE lockmode)
Definition: heapam.c:1287
int work_mem
Definition: globals.c:112
Size ReplicationOriginShmemSize(void)
Definition: origin.c:437
#define REPLICATION_STATE_MAGIC
Definition: origin.c:172
#define InvalidOid
Definition: postgres_ext.h:36
static ReplicationStateCtl * replication_states_ctl
Definition: origin.c:162
Datum pg_replication_origin_xact_reset(PG_FUNCTION_ARGS)
Definition: origin.c:1296
int allowedModes
Definition: execnodes.h:201
#define PG_RETURN_VOID()
Definition: fmgr.h:301
struct ReplicationStateCtl ReplicationStateCtl
SetFunctionReturnMode returnMode
Definition: execnodes.h:203
int max_replication_slots
Definition: slot.c:99
XLogRecPtr remote_lsn
Definition: origin.c:113
#define HeapTupleIsValid(tuple)
Definition: htup.h:77
Datum pg_replication_origin_progress(PG_FUNCTION_ARGS)
Definition: origin.c:1343
RepOriginId node_id
Definition: origin.h:28
#define NULL
Definition: c.h:229
uint64 XLogRecPtr
Definition: xlogdefs.h:21
ReplicationState states[FLEXIBLE_ARRAY_MEMBER]
Definition: origin.c:146
#define Assert(condition)
Definition: c.h:675
#define XLR_INFO_MASK
Definition: xlogrecord.h:62
RepOriginId replorigin_session_origin
Definition: origin.c:150
Definition: regguts.h:298
RepOriginId node_id
Definition: origin.h:22
size_t Size
Definition: c.h:356
void replorigin_session_advance(XLogRecPtr remote_commit, XLogRecPtr local_commit)
Definition: origin.c:1089
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1111
bool IsTransactionState(void)
Definition: xact.c:350
#define Natts_pg_replication_origin
void LWLockRegisterTranche(int tranche_id, char *tranche_name)
Definition: lwlock.c:592
XLogRecPtr remote_lsn
Definition: origin.h:21
MemoryContext ecxt_per_query_memory
Definition: execnodes.h:135
const char * name
Definition: encode.c:521
#define InvalidRepOriginId
Definition: origin.h:34
Tuplestorestate * setResult
Definition: execnodes.h:206
#define DatumGetPointer(X)
Definition: postgres.h:555
static Datum values[MAXATTR]
Definition: bootstrap.c:162
char * text_to_cstring(const text *t)
Definition: varlena.c:182
ExprContext * econtext
Definition: execnodes.h:199
TupleDesc setDesc
Definition: execnodes.h:207
#define ReplicationOriginRelationId
int errmsg(const char *fmt,...)
Definition: elog.c:797
Datum pg_replication_origin_oid(PG_FUNCTION_ARGS)
Definition: origin.c:1181
#define PG_GETARG_TIMESTAMPTZ(n)
Definition: timestamp.h:36
int i
Datum pg_replication_origin_session_progress(PG_FUNCTION_ARGS)
Definition: origin.c:1257
static ReplicationState * replication_states
Definition: origin.c:161
void ScanKeyInit(ScanKey entry, AttrNumber attributeNumber, StrategyNumber strategy, RegProcedure procedure, Datum argument)
Definition: scankey.c:76
#define CStringGetTextDatum(s)
Definition: builtins.h:91
void * arg
Definition: c.h:439
#define PG_FUNCTION_ARGS
Definition: fmgr.h:150
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:97
#define elog
Definition: elog.h:219
void LockRelationOid(Oid relid, LOCKMODE lockmode)
Definition: lmgr.c:105
#define COMP_CRC32C(crc, data, len)
Definition: pg_crc32c.h:73
Datum pg_replication_origin_create(PG_FUNCTION_ARGS)
Definition: origin.c:1139
#define REPLICATION_ORIGIN_PROGRESS_COLS
void XLogBeginInsert(void)
Definition: xloginsert.c:120
#define PG_RETURN_OID(x)
Definition: fmgr.h:312
#define FIN_CRC32C(crc)
Definition: pg_crc32c.h:78
XLogRecPtr replorigin_get_progress(RepOriginId node, bool flush)
Definition: origin.c:914
#define PG_RETURN_NULL()
Definition: fmgr.h:297
#define read(a, b, c)
Definition: win32.h:18
#define BTEqualStrategyNumber
Definition: stratnum.h:31
#define offsetof(type, field)
Definition: c.h:555