PostgreSQL Source Code git master
origin.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * origin.c
4 * Logical replication progress tracking support.
5 *
6 * Copyright (c) 2013-2025, PostgreSQL Global Development Group
7 *
8 * IDENTIFICATION
9 * src/backend/replication/logical/origin.c
10 *
11 * NOTES
12 *
13 * This file provides the following:
14 * * An infrastructure to name nodes in a replication setup
 * * A facility to store and persist replication progress in an
 *	 efficient and durable manner.
17 *
 * Replication origins consist of a descriptive, user defined, external
 * name and a short, thus space efficient, internal 2 byte one. This split
 * exists because replication origins have to be stored in WAL and shared
 * memory and long descriptors would be inefficient. For now we only use 2
 * bytes for the internal id of a replication origin as it seems unlikely that
 * there soon will be more than 65k nodes in one replication setup; and using
 * only two bytes allows us to be more space efficient.
25 *
 * Replication progress is tracked in a shared memory table
 * (ReplicationState) that's dumped to disk every checkpoint. Entries
 * ('slots') in this table are identified by the internal id. That's the case
 * because it allows increasing replication progress during crash
 * recovery. To allow doing so we store the original LSN (from the originating
 * system) of a transaction in the commit record. That allows recovering the
 * precise replayed state after crash recovery, without requiring synchronous
 * commits. Allowing logical replication to use asynchronous commit is
 * generally good for performance, but especially important as it allows a
 * single threaded replay process to keep up with a source that has multiple
 * backends generating changes concurrently. For efficiency and simplicity
 * reasons a backend can set up one replication origin that is from then on
 * used as the source of changes produced by the backend, until reset again.
39 *
40 * This infrastructure is intended to be used in cooperation with logical
41 * decoding. When replaying from a remote system the configured origin is
42 * provided to output plugins, allowing prevention of replication loops and
43 * other filtering.
44 *
45 * There are several levels of locking at work:
46 *
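 * * To create a replication origin an exclusive lock on pg_replication_origin
 *	 is required for the duration. That allows us to safely and conflict free
 *	 assign new origins using a dirty snapshot. Dropping an origin takes a
 *	 row-exclusive lock on pg_replication_origin plus a lock on the origin
 *	 object itself, which prevents concurrent drops of the same origin.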
50 *
 * * When creating an in-memory replication progress slot the ReplicationOrigin
 *	 LWLock has to be held exclusively; when iterating over the replication
 *	 progress a shared lock has to be held. replorigin_advance(), which
 *	 advances the progress of an origin that has not been set up as the
 *	 session's replication origin, takes it exclusively because it may have
 *	 to create a new slot.
56 *
 * * When manipulating or looking at the remote_lsn and local_lsn fields of a
 *	 replication progress slot that slot's lwlock has to be held. That's
 *	 primarily because we do not assume 8 byte writes (the LSN) are atomic on
 *	 all our platforms, but it also simplifies memory ordering concerns
 *	 between the remote and local lsn. We use a lwlock instead of a spinlock
 *	 so it's less harmful to hold the lock over a WAL write
 *	 (cf. replorigin_advance).
64 *
65 * ---------------------------------------------------------------------------
66 */
67
#include "postgres.h"

#include <unistd.h>
#include <sys/stat.h>

#include "access/genam.h"
#include "access/htup_details.h"
#include "access/table.h"
#include "access/xact.h"
#include "access/xloginsert.h"
#include "catalog/catalog.h"
#include "catalog/indexing.h"
#include "catalog/pg_subscription.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "nodes/execnodes.h"
#include "pgstat.h"
#include "replication/origin.h"
#include "replication/slot.h"
#include "storage/condition_variable.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/guc.h"
#include "utils/pg_lsn.h"
#include "utils/rel.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"
98
/* paths for replication origin checkpoint files */
#define PG_REPLORIGIN_CHECKPOINT_FILENAME PG_LOGICAL_DIR "/replorigin_checkpoint"
#define PG_REPLORIGIN_CHECKPOINT_TMPFILE PG_REPLORIGIN_CHECKPOINT_FILENAME ".tmp"

/* GUC variables */

/*
 * Number of shared-memory replication progress slots.  When this is 0, the
 * shared memory setup, checkpoint and startup code below all return early,
 * and replorigin_check_prerequisites() rejects origin manipulation.
 */
int			max_active_replication_origins = 10;
105
106/*
107 * Replay progress of a single remote node.
108 */
109typedef struct ReplicationState
110{
111 /*
112 * Local identifier for the remote node.
113 */
115
116 /*
117 * Location of the latest commit from the remote side.
118 */
120
121 /*
122 * Remember the local lsn of the commit record so we can XLogFlush() to it
123 * during a checkpoint so we know the commit record actually is safe on
124 * disk.
125 */
127
128 /*
129 * PID of backend that's acquired slot, or 0 if none.
130 */
132
133 /*
134 * Condition variable that's signaled when acquired_by changes.
135 */
137
138 /*
139 * Lock protecting remote_lsn and local_lsn.
140 */
143
144/*
145 * On disk version of ReplicationState.
146 */
148{
152
153
155{
156 /* Tranche to use for per-origin LWLocks */
158 /* Array of length max_active_replication_origins */
161
162/* external variables */
166
167/*
168 * Base address into a shared memory array of replication states of size
169 * max_active_replication_origins.
170 */
172
173/*
174 * Actual shared memory block (replication_states[] is now part of this).
175 */
177
178/*
179 * We keep a pointer to this backend's ReplicationState to avoid having to
180 * search the replication_states array in replorigin_session_advance for each
181 * remote commit. (Ownership of a backend's own entry can only be changed by
182 * that backend.)
183 */
185
186/* Magic for on disk files. */
187#define REPLICATION_STATE_MAGIC ((uint32) 0x1257DADE)
188
189static void
190replorigin_check_prerequisites(bool check_origins, bool recoveryOK)
191{
192 if (check_origins && max_active_replication_origins == 0)
194 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
195 errmsg("cannot query or manipulate replication origin when \"max_active_replication_origins\" is 0")));
196
197 if (!recoveryOK && RecoveryInProgress())
199 (errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
200 errmsg("cannot manipulate replication origins during recovery")));
201}
202
203
204/*
205 * IsReservedOriginName
206 * True iff name is either "none" or "any".
207 */
208static bool
210{
211 return ((pg_strcasecmp(name, LOGICALREP_ORIGIN_NONE) == 0) ||
212 (pg_strcasecmp(name, LOGICALREP_ORIGIN_ANY) == 0));
213}
214
215/* ---------------------------------------------------------------------------
216 * Functions for working with replication origins themselves.
217 * ---------------------------------------------------------------------------
218 */
219
220/*
221 * Check for a persistent replication origin identified by name.
222 *
223 * Returns InvalidOid if the node isn't known yet and missing_ok is true.
224 */
226replorigin_by_name(const char *roname, bool missing_ok)
227{
229 Oid roident = InvalidOid;
230 HeapTuple tuple;
231 Datum roname_d;
232
233 roname_d = CStringGetTextDatum(roname);
234
235 tuple = SearchSysCache1(REPLORIGNAME, roname_d);
236 if (HeapTupleIsValid(tuple))
237 {
239 roident = ident->roident;
240 ReleaseSysCache(tuple);
241 }
242 else if (!missing_ok)
244 (errcode(ERRCODE_UNDEFINED_OBJECT),
245 errmsg("replication origin \"%s\" does not exist",
246 roname)));
247
248 return roident;
249}
250
251/*
252 * Create a replication origin.
253 *
254 * Needs to be called in a transaction.
255 */
257replorigin_create(const char *roname)
258{
259 Oid roident;
260 HeapTuple tuple = NULL;
261 Relation rel;
262 Datum roname_d;
263 SnapshotData SnapshotDirty;
264 SysScanDesc scan;
266
267 /*
268 * To avoid needing a TOAST table for pg_replication_origin, we limit
269 * replication origin names to 512 bytes. This should be more than enough
270 * for all practical use.
271 */
272 if (strlen(roname) > MAX_RONAME_LEN)
274 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
275 errmsg("replication origin name is too long"),
276 errdetail("Replication origin names must be no longer than %d bytes.",
278
279 roname_d = CStringGetTextDatum(roname);
280
282
283 /*
284 * We need the numeric replication origin to be 16bit wide, so we cannot
285 * rely on the normal oid allocation. Instead we simply scan
286 * pg_replication_origin for the first unused id. That's not particularly
287 * efficient, but this should be a fairly infrequent operation - we can
288 * easily spend a bit more code on this when it turns out it needs to be
289 * faster.
290 *
291 * We handle concurrency by taking an exclusive lock (allowing reads!)
292 * over the table for the duration of the search. Because we use a "dirty
293 * snapshot" we can read rows that other in-progress sessions have
294 * written, even though they would be invisible with normal snapshots. Due
295 * to the exclusive lock there's no danger that new rows can appear while
296 * we're checking.
297 */
298 InitDirtySnapshot(SnapshotDirty);
299
300 rel = table_open(ReplicationOriginRelationId, ExclusiveLock);
301
302 /*
303 * We want to be able to access pg_replication_origin without setting up a
304 * snapshot. To make that safe, it needs to not have a TOAST table, since
305 * TOASTed data cannot be fetched without a snapshot. As of this writing,
306 * its only varlena column is roname, which we limit to 512 bytes to avoid
307 * needing out-of-line storage. If you add a TOAST table to this catalog,
308 * be sure to set up a snapshot everywhere it might be needed. For more
309 * information, see https://postgr.es/m/ZvMSUPOqUU-VNADN%40nathan.
310 */
311 Assert(!OidIsValid(rel->rd_rel->reltoastrelid));
312
313 for (roident = InvalidOid + 1; roident < PG_UINT16_MAX; roident++)
314 {
315 bool nulls[Natts_pg_replication_origin];
316 Datum values[Natts_pg_replication_origin];
317 bool collides;
318
320
322 Anum_pg_replication_origin_roident,
323 BTEqualStrategyNumber, F_OIDEQ,
324 ObjectIdGetDatum(roident));
325
326 scan = systable_beginscan(rel, ReplicationOriginIdentIndex,
327 true /* indexOK */ ,
328 &SnapshotDirty,
329 1, &key);
330
331 collides = HeapTupleIsValid(systable_getnext(scan));
332
333 systable_endscan(scan);
334
335 if (!collides)
336 {
337 /*
338 * Ok, found an unused roident, insert the new row and do a CCI,
339 * so our callers can look it up if they want to.
340 */
341 memset(&nulls, 0, sizeof(nulls));
342
343 values[Anum_pg_replication_origin_roident - 1] = ObjectIdGetDatum(roident);
344 values[Anum_pg_replication_origin_roname - 1] = roname_d;
345
346 tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
347 CatalogTupleInsert(rel, tuple);
349 break;
350 }
351 }
352
353 /* now release lock again, */
355
356 if (tuple == NULL)
358 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
359 errmsg("could not find free replication origin ID")));
360
361 heap_freetuple(tuple);
362 return roident;
363}
364
365/*
366 * Helper function to drop a replication origin.
367 */
368static void
370{
371 int i;
372
373 /*
374 * Clean up the slot state info, if there is any matching slot.
375 */
376restart:
377 LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
378
379 for (i = 0; i < max_active_replication_origins; i++)
380 {
382
383 if (state->roident == roident)
384 {
385 /* found our slot, is it busy? */
386 if (state->acquired_by != 0)
387 {
389
390 if (nowait)
392 (errcode(ERRCODE_OBJECT_IN_USE),
393 errmsg("could not drop replication origin with ID %d, in use by PID %d",
394 state->roident,
395 state->acquired_by)));
396
397 /*
398 * We must wait and then retry. Since we don't know which CV
399 * to wait on until here, we can't readily use
400 * ConditionVariablePrepareToSleep (calling it here would be
401 * wrong, since we could miss the signal if we did so); just
402 * use ConditionVariableSleep directly.
403 */
404 cv = &state->origin_cv;
405
406 LWLockRelease(ReplicationOriginLock);
407
408 ConditionVariableSleep(cv, WAIT_EVENT_REPLICATION_ORIGIN_DROP);
409 goto restart;
410 }
411
412 /* first make a WAL log entry */
413 {
414 xl_replorigin_drop xlrec;
415
416 xlrec.node_id = roident;
418 XLogRegisterData(&xlrec, sizeof(xlrec));
419 XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_DROP);
420 }
421
422 /* then clear the in-memory slot */
423 state->roident = InvalidRepOriginId;
424 state->remote_lsn = InvalidXLogRecPtr;
425 state->local_lsn = InvalidXLogRecPtr;
426 break;
427 }
428 }
429 LWLockRelease(ReplicationOriginLock);
431}
432
433/*
434 * Drop replication origin (by name).
435 *
436 * Needs to be called in a transaction.
437 */
438void
439replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
440{
441 RepOriginId roident;
442 Relation rel;
443 HeapTuple tuple;
444
446
447 rel = table_open(ReplicationOriginRelationId, RowExclusiveLock);
448
449 roident = replorigin_by_name(name, missing_ok);
450
451 /* Lock the origin to prevent concurrent drops. */
452 LockSharedObject(ReplicationOriginRelationId, roident, 0,
454
455 tuple = SearchSysCache1(REPLORIGIDENT, ObjectIdGetDatum(roident));
456 if (!HeapTupleIsValid(tuple))
457 {
458 if (!missing_ok)
459 elog(ERROR, "cache lookup failed for replication origin with ID %d",
460 roident);
461
462 /*
463 * We don't need to retain the locks if the origin is already dropped.
464 */
465 UnlockSharedObject(ReplicationOriginRelationId, roident, 0,
468 return;
469 }
470
471 replorigin_state_clear(roident, nowait);
472
473 /*
474 * Now, we can delete the catalog entry.
475 */
476 CatalogTupleDelete(rel, &tuple->t_self);
477 ReleaseSysCache(tuple);
478
480
481 /* We keep the lock on pg_replication_origin until commit */
482 table_close(rel, NoLock);
483}
484
485/*
486 * Lookup replication origin via its oid and return the name.
487 *
488 * The external name is palloc'd in the calling context.
489 *
490 * Returns true if the origin is known, false otherwise.
491 */
492bool
493replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
494{
495 HeapTuple tuple;
497
498 Assert(OidIsValid((Oid) roident));
499 Assert(roident != InvalidRepOriginId);
500 Assert(roident != DoNotReplicateId);
501
502 tuple = SearchSysCache1(REPLORIGIDENT,
503 ObjectIdGetDatum((Oid) roident));
504
505 if (HeapTupleIsValid(tuple))
506 {
508 *roname = text_to_cstring(&ric->roname);
509 ReleaseSysCache(tuple);
510
511 return true;
512 }
513 else
514 {
515 *roname = NULL;
516
517 if (!missing_ok)
519 (errcode(ERRCODE_UNDEFINED_OBJECT),
520 errmsg("replication origin with ID %d does not exist",
521 roident)));
522
523 return false;
524 }
525}
526
527
528/* ---------------------------------------------------------------------------
529 * Functions for handling replication progress.
530 * ---------------------------------------------------------------------------
531 */
532
533Size
535{
536 Size size = 0;
537
539 return size;
540
541 size = add_size(size, offsetof(ReplicationStateCtl, states));
542
543 size = add_size(size,
545 return size;
546}
547
548void
550{
551 bool found;
552
554 return;
555
557 ShmemInitStruct("ReplicationOriginState",
559 &found);
561
562 if (!found)
563 {
564 int i;
565
567
568 replication_states_ctl->tranche_id = LWTRANCHE_REPLICATION_ORIGIN_STATE;
569
570 for (i = 0; i < max_active_replication_origins; i++)
571 {
575 }
576 }
577}
578
579/* ---------------------------------------------------------------------------
580 * Perform a checkpoint of each replication origin's progress with respect to
581 * the replayed remote_lsn. Make sure that all transactions we refer to in the
582 * checkpoint (local_lsn) are actually on-disk. This might not yet be the case
583 * if the transactions were originally committed asynchronously.
584 *
585 * We store checkpoints in the following format:
586 * +-------+------------------------+------------------+-----+--------+
587 * | MAGIC | ReplicationStateOnDisk | struct Replic... | ... | CRC32C | EOF
588 * +-------+------------------------+------------------+-----+--------+
589 *
590 * So its just the magic, followed by the statically sized
591 * ReplicationStateOnDisk structs. Note that the maximum number of
592 * ReplicationState is determined by max_active_replication_origins.
593 * ---------------------------------------------------------------------------
594 */
595void
597{
598 const char *tmppath = PG_REPLORIGIN_CHECKPOINT_TMPFILE;
599 const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME;
600 int tmpfd;
601 int i;
604
606 return;
607
609
610 /* make sure no old temp file is remaining */
611 if (unlink(tmppath) < 0 && errno != ENOENT)
614 errmsg("could not remove file \"%s\": %m",
615 tmppath)));
616
617 /*
618 * no other backend can perform this at the same time; only one checkpoint
619 * can happen at a time.
620 */
621 tmpfd = OpenTransientFile(tmppath,
622 O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
623 if (tmpfd < 0)
626 errmsg("could not create file \"%s\": %m",
627 tmppath)));
628
629 /* write magic */
630 errno = 0;
631 if ((write(tmpfd, &magic, sizeof(magic))) != sizeof(magic))
632 {
633 /* if write didn't set errno, assume problem is no disk space */
634 if (errno == 0)
635 errno = ENOSPC;
638 errmsg("could not write to file \"%s\": %m",
639 tmppath)));
640 }
641 COMP_CRC32C(crc, &magic, sizeof(magic));
642
643 /* prevent concurrent creations/drops */
644 LWLockAcquire(ReplicationOriginLock, LW_SHARED);
645
646 /* write actual data */
647 for (i = 0; i < max_active_replication_origins; i++)
648 {
649 ReplicationStateOnDisk disk_state;
651 XLogRecPtr local_lsn;
652
653 if (curstate->roident == InvalidRepOriginId)
654 continue;
655
656 /* zero, to avoid uninitialized padding bytes */
657 memset(&disk_state, 0, sizeof(disk_state));
658
659 LWLockAcquire(&curstate->lock, LW_SHARED);
660
661 disk_state.roident = curstate->roident;
662
663 disk_state.remote_lsn = curstate->remote_lsn;
664 local_lsn = curstate->local_lsn;
665
666 LWLockRelease(&curstate->lock);
667
668 /* make sure we only write out a commit that's persistent */
669 XLogFlush(local_lsn);
670
671 errno = 0;
672 if ((write(tmpfd, &disk_state, sizeof(disk_state))) !=
673 sizeof(disk_state))
674 {
675 /* if write didn't set errno, assume problem is no disk space */
676 if (errno == 0)
677 errno = ENOSPC;
680 errmsg("could not write to file \"%s\": %m",
681 tmppath)));
682 }
683
684 COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
685 }
686
687 LWLockRelease(ReplicationOriginLock);
688
689 /* write out the CRC */
691 errno = 0;
692 if ((write(tmpfd, &crc, sizeof(crc))) != sizeof(crc))
693 {
694 /* if write didn't set errno, assume problem is no disk space */
695 if (errno == 0)
696 errno = ENOSPC;
699 errmsg("could not write to file \"%s\": %m",
700 tmppath)));
701 }
702
703 if (CloseTransientFile(tmpfd) != 0)
706 errmsg("could not close file \"%s\": %m",
707 tmppath)));
708
709 /* fsync, rename to permanent file, fsync file and directory */
710 durable_rename(tmppath, path, PANIC);
711}
712
713/*
714 * Recover replication replay status from checkpoint data saved earlier by
715 * CheckPointReplicationOrigin.
716 *
717 * This only needs to be called at startup and *not* during every checkpoint
718 * read during recovery (e.g. in HS or PITR from a base backup) afterwards. All
719 * state thereafter can be recovered by looking at commit records.
720 */
721void
723{
724 const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME;
725 int fd;
726 int readBytes;
728 int last_state = 0;
729 pg_crc32c file_crc;
731
732 /* don't want to overwrite already existing state */
733#ifdef USE_ASSERT_CHECKING
734 static bool already_started = false;
735
736 Assert(!already_started);
737 already_started = true;
738#endif
739
741 return;
742
744
745 elog(DEBUG2, "starting up replication origin progress state");
746
747 fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
748
749 /*
750 * might have had max_active_replication_origins == 0 last run, or we just
751 * brought up a standby.
752 */
753 if (fd < 0 && errno == ENOENT)
754 return;
755 else if (fd < 0)
758 errmsg("could not open file \"%s\": %m",
759 path)));
760
761 /* verify magic, that is written even if nothing was active */
762 readBytes = read(fd, &magic, sizeof(magic));
763 if (readBytes != sizeof(magic))
764 {
765 if (readBytes < 0)
768 errmsg("could not read file \"%s\": %m",
769 path)));
770 else
773 errmsg("could not read file \"%s\": read %d of %zu",
774 path, readBytes, sizeof(magic))));
775 }
776 COMP_CRC32C(crc, &magic, sizeof(magic));
777
778 if (magic != REPLICATION_STATE_MAGIC)
780 (errmsg("replication checkpoint has wrong magic %u instead of %u",
781 magic, REPLICATION_STATE_MAGIC)));
782
783 /* we can skip locking here, no other access is possible */
784
785 /* recover individual states, until there are no more to be found */
786 while (true)
787 {
788 ReplicationStateOnDisk disk_state;
789
790 readBytes = read(fd, &disk_state, sizeof(disk_state));
791
792 if (readBytes < 0)
793 {
796 errmsg("could not read file \"%s\": %m",
797 path)));
798 }
799
800 /* no further data */
801 if (readBytes == sizeof(crc))
802 {
803 memcpy(&file_crc, &disk_state, sizeof(file_crc));
804 break;
805 }
806
807 if (readBytes != sizeof(disk_state))
808 {
811 errmsg("could not read file \"%s\": read %d of %zu",
812 path, readBytes, sizeof(disk_state))));
813 }
814
815 COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
816
817 if (last_state == max_active_replication_origins)
819 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
820 errmsg("could not find free replication state, increase \"max_active_replication_origins\"")));
821
822 /* copy data to shared memory */
823 replication_states[last_state].roident = disk_state.roident;
824 replication_states[last_state].remote_lsn = disk_state.remote_lsn;
825 last_state++;
826
827 ereport(LOG,
828 errmsg("recovered replication state of node %d to %X/%08X",
829 disk_state.roident,
830 LSN_FORMAT_ARGS(disk_state.remote_lsn)));
831 }
832
833 /* now check checksum */
835 if (file_crc != crc)
838 errmsg("replication slot checkpoint has wrong checksum %u, expected %u",
839 crc, file_crc)));
840
841 if (CloseTransientFile(fd) != 0)
844 errmsg("could not close file \"%s\": %m",
845 path)));
846}
847
848void
850{
851 uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
852
853 switch (info)
854 {
856 {
857 xl_replorigin_set *xlrec =
859
861 xlrec->remote_lsn, record->EndRecPtr,
862 xlrec->force /* backward */ ,
863 false /* WAL log */ );
864 break;
865 }
867 {
868 xl_replorigin_drop *xlrec;
869 int i;
870
871 xlrec = (xl_replorigin_drop *) XLogRecGetData(record);
872
873 for (i = 0; i < max_active_replication_origins; i++)
874 {
876
877 /* found our slot */
878 if (state->roident == xlrec->node_id)
879 {
880 /* reset entry */
881 state->roident = InvalidRepOriginId;
882 state->remote_lsn = InvalidXLogRecPtr;
883 state->local_lsn = InvalidXLogRecPtr;
884 break;
885 }
886 }
887 break;
888 }
889 default:
890 elog(PANIC, "replorigin_redo: unknown op code %u", info);
891 }
892}
893
894
895/*
896 * Tell the replication origin progress machinery that a commit from 'node'
897 * that originated at the LSN remote_commit on the remote node was replayed
898 * successfully and that we don't need to do so again. In combination with
899 * setting up replorigin_session_origin_lsn and replorigin_session_origin
900 * that ensures we won't lose knowledge about that after a crash if the
901 * transaction had a persistent effect (think of asynchronous commits).
902 *
903 * local_commit needs to be a local LSN of the commit so that we can make sure
904 * upon a checkpoint that enough WAL has been persisted to disk.
905 *
906 * Needs to be called with a RowExclusiveLock on pg_replication_origin,
907 * unless running in recovery.
908 */
909void
911 XLogRecPtr remote_commit, XLogRecPtr local_commit,
912 bool go_backward, bool wal_log)
913{
914 int i;
915 ReplicationState *replication_state = NULL;
916 ReplicationState *free_state = NULL;
917
918 Assert(node != InvalidRepOriginId);
919
920 /* we don't track DoNotReplicateId */
921 if (node == DoNotReplicateId)
922 return;
923
924 /*
925 * XXX: For the case where this is called by WAL replay, it'd be more
926 * efficient to restore into a backend local hashtable and only dump into
927 * shmem after recovery is finished. Let's wait with implementing that
928 * till it's shown to be a measurable expense
929 */
930
931 /* Lock exclusively, as we may have to create a new table entry. */
932 LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
933
934 /*
935 * Search for either an existing slot for the origin, or a free one we can
936 * use.
937 */
938 for (i = 0; i < max_active_replication_origins; i++)
939 {
941
942 /* remember where to insert if necessary */
943 if (curstate->roident == InvalidRepOriginId &&
944 free_state == NULL)
945 {
946 free_state = curstate;
947 continue;
948 }
949
950 /* not our slot */
951 if (curstate->roident != node)
952 {
953 continue;
954 }
955
956 /* ok, found slot */
957 replication_state = curstate;
958
959 LWLockAcquire(&replication_state->lock, LW_EXCLUSIVE);
960
961 /* Make sure it's not used by somebody else */
962 if (replication_state->acquired_by != 0)
963 {
965 (errcode(ERRCODE_OBJECT_IN_USE),
966 errmsg("replication origin with ID %d is already active for PID %d",
967 replication_state->roident,
968 replication_state->acquired_by)));
969 }
970
971 break;
972 }
973
974 if (replication_state == NULL && free_state == NULL)
976 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
977 errmsg("could not find free replication state slot for replication origin with ID %d",
978 node),
979 errhint("Increase \"max_active_replication_origins\" and try again.")));
980
981 if (replication_state == NULL)
982 {
983 /* initialize new slot */
984 LWLockAcquire(&free_state->lock, LW_EXCLUSIVE);
985 replication_state = free_state;
986 Assert(!XLogRecPtrIsValid(replication_state->remote_lsn));
987 Assert(!XLogRecPtrIsValid(replication_state->local_lsn));
988 replication_state->roident = node;
989 }
990
991 Assert(replication_state->roident != InvalidRepOriginId);
992
993 /*
994 * If somebody "forcefully" sets this slot, WAL log it, so it's durable
995 * and the standby gets the message. Primarily this will be called during
996 * WAL replay (of commit records) where no WAL logging is necessary.
997 */
998 if (wal_log)
999 {
1000 xl_replorigin_set xlrec;
1001
1002 xlrec.remote_lsn = remote_commit;
1003 xlrec.node_id = node;
1004 xlrec.force = go_backward;
1005
1007 XLogRegisterData(&xlrec, sizeof(xlrec));
1008
1009 XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_SET);
1010 }
1011
1012 /*
1013 * Due to - harmless - race conditions during a checkpoint we could see
1014 * values here that are older than the ones we already have in memory. We
1015 * could also see older values for prepared transactions when the prepare
1016 * is sent at a later point of time along with commit prepared and there
1017 * are other transactions commits between prepare and commit prepared. See
1018 * ReorderBufferFinishPrepared. Don't overwrite those.
1019 */
1020 if (go_backward || replication_state->remote_lsn < remote_commit)
1021 replication_state->remote_lsn = remote_commit;
1022 if (XLogRecPtrIsValid(local_commit) &&
1023 (go_backward || replication_state->local_lsn < local_commit))
1024 replication_state->local_lsn = local_commit;
1025 LWLockRelease(&replication_state->lock);
1026
1027 /*
1028 * Release *after* changing the LSNs, slot isn't acquired and thus could
1029 * otherwise be dropped anytime.
1030 */
1031 LWLockRelease(ReplicationOriginLock);
1032}
1033
1034
1037{
1038 int i;
1039 XLogRecPtr local_lsn = InvalidXLogRecPtr;
1040 XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1041
1042 /* prevent slots from being concurrently dropped */
1043 LWLockAcquire(ReplicationOriginLock, LW_SHARED);
1044
1045 for (i = 0; i < max_active_replication_origins; i++)
1046 {
1048
1050
1051 if (state->roident == node)
1052 {
1053 LWLockAcquire(&state->lock, LW_SHARED);
1054
1055 remote_lsn = state->remote_lsn;
1056 local_lsn = state->local_lsn;
1057
1058 LWLockRelease(&state->lock);
1059
1060 break;
1061 }
1062 }
1063
1064 LWLockRelease(ReplicationOriginLock);
1065
1066 if (flush && XLogRecPtrIsValid(local_lsn))
1067 XLogFlush(local_lsn);
1068
1069 return remote_lsn;
1070}
1071
1072/*
1073 * Tear down a (possibly) configured session replication origin during process
1074 * exit.
1075 */
1076static void
1078{
1079 ConditionVariable *cv = NULL;
1080
1081 if (session_replication_state == NULL)
1082 return;
1083
1084 LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1085
1087 {
1089
1092 }
1093
1094 LWLockRelease(ReplicationOriginLock);
1095
1096 if (cv)
1098}
1099
1100/*
1101 * Setup a replication origin in the shared memory struct if it doesn't
1102 * already exist and cache access to the specific ReplicationSlot so the
1103 * array doesn't have to be searched when calling
1104 * replorigin_session_advance().
1105 *
1106 * Normally only one such cached origin can exist per process so the cached
1107 * value can only be set again after the previous value is torn down with
1108 * replorigin_session_reset(). For this normal case pass acquired_by = 0
1109 * (meaning the slot is not allowed to be already acquired by another process).
1110 *
1111 * However, sometimes multiple processes can safely re-use the same origin slot
1112 * (for example, multiple parallel apply processes can safely use the same
1113 * origin, provided they maintain commit order by allowing only one process to
1114 * commit at a time). For this case the first process must pass acquired_by =
1115 * 0, and then the other processes sharing that same origin can pass
1116 * acquired_by = PID of the first process.
1117 */
1118void
1120{
1121 static bool registered_cleanup;
1122 int i;
1123 int free_slot = -1;
1124
1125 if (!registered_cleanup)
1126 {
1128 registered_cleanup = true;
1129 }
1130
1132
1133 if (session_replication_state != NULL)
1134 ereport(ERROR,
1135 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1136 errmsg("cannot setup replication origin when one is already setup")));
1137
1138 /* Lock exclusively, as we may have to create a new table entry. */
1139 LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1140
1141 /*
1142 * Search for either an existing slot for the origin, or a free one we can
1143 * use.
1144 */
1145 for (i = 0; i < max_active_replication_origins; i++)
1146 {
1148
1149 /* remember where to insert if necessary */
1150 if (curstate->roident == InvalidRepOriginId &&
1151 free_slot == -1)
1152 {
1153 free_slot = i;
1154 continue;
1155 }
1156
1157 /* not our slot */
1158 if (curstate->roident != node)
1159 continue;
1160
1161 else if (curstate->acquired_by != 0 && acquired_by == 0)
1162 {
1163 ereport(ERROR,
1164 (errcode(ERRCODE_OBJECT_IN_USE),
1165 errmsg("replication origin with ID %d is already active for PID %d",
1166 curstate->roident, curstate->acquired_by)));
1167 }
1168
1169 else if (curstate->acquired_by != acquired_by)
1170 {
1171 ereport(ERROR,
1172 (errcode(ERRCODE_OBJECT_IN_USE),
1173 errmsg("could not find replication state slot for replication origin with OID %u which was acquired by %d",
1174 node, acquired_by)));
1175 }
1176
1177 /* ok, found slot */
1178 session_replication_state = curstate;
1179 break;
1180 }
1181
1182
1183 if (session_replication_state == NULL && free_slot == -1)
1184 ereport(ERROR,
1185 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
1186 errmsg("could not find free replication state slot for replication origin with ID %d",
1187 node),
1188 errhint("Increase \"max_active_replication_origins\" and try again.")));
1189 else if (session_replication_state == NULL)
1190 {
1191 if (acquired_by)
1192 ereport(ERROR,
1193 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1194 errmsg("cannot use PID %d for inactive replication origin with ID %d",
1195 acquired_by, node)));
1196
1197 /* initialize new slot */
1202 }
1203
1204
1206
1207 if (acquired_by == 0)
1209 else
1211
1212 LWLockRelease(ReplicationOriginLock);
1213
1214 /* probably this one is pointless */
1216}
1217
1218/*
1219 * Reset replay state previously setup in this session.
1220 *
1221 * This function may only be called if an origin was setup with
1222 * replorigin_session_setup().
 *
 * Raises ERROR (ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE) if no replication
 * origin is currently configured for this session.
1223 */
1224void
1226{
1228
1230
1231 if (session_replication_state == NULL)
1232 ereport(ERROR,
1233 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1234 errmsg("no replication origin is configured")));
1235
 /*
  * Exclusive lock, matching replorigin_session_setup(); presumably because
  * releasing the session's slot modifies shared slot state -- confirm.
  */
1236 LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1237
1241
1242 LWLockRelease(ReplicationOriginLock);
1243
1245}
1246
1247/*
1248 * Do the same work replorigin_advance() does, just on the session's
1249 * configured origin.
1250 *
1251 * This is noticeably cheaper than using replorigin_advance().
 *
 * Both the local and the remote position only ever move forward here: each
 * is overwritten only when the new value is greater than the stored one.
1252 */
1253void
1255{
1258
 /* never move either position backwards */
1260 if (session_replication_state->local_lsn < local_commit)
1261 session_replication_state->local_lsn = local_commit;
1262 if (session_replication_state->remote_lsn < remote_commit)
1263 session_replication_state->remote_lsn = remote_commit;
1265}
1266
1267/*
1268 * Ask the machinery about the point up to which we successfully replayed
1269 * changes from an already setup replication origin.
 *
 * Returns the remote LSN recorded for the session's origin. If 'flush' is
 * true and a valid local LSN is recorded, WAL is flushed up to that local
 * LSN before returning, so the reported remote position is backed by a
 * local commit that is on disk.
1270 */
1273{
1274 XLogRecPtr remote_lsn;
1275 XLogRecPtr local_lsn;
1276
1278
1283
 /* make sure the local transaction matching remote_lsn is durable */
1284 if (flush && XLogRecPtrIsValid(local_lsn))
1285 XLogFlush(local_lsn);
1286
1287 return remote_lsn;
1288}
1289
1290
1291
1292/* ---------------------------------------------------------------------------
1293 * SQL functions for working with replication origin.
1294 *
1295 * These mostly should be fairly short wrappers around more generic functions.
1296 * ---------------------------------------------------------------------------
1297 */
1298
1299/*
1300 * Create replication origin for the passed in name, and return the assigned
1301 * oid.
 *
 * The new origin's RepOriginId is returned as an OID datum. Raises ERROR
 * (ERRCODE_RESERVED_NAME) if the name is reserved. The C-string copy of the
 * name is released with pfree() before returning.
1302 */
1303Datum
1305{
1306 char *name;
1307 RepOriginId roident;
1308
1309 replorigin_check_prerequisites(false, false);
1310
1312
1313 /*
1314 * Replication origins "any" and "none" are reserved for system options.
1315 * Origin names starting with "pg_" are reserved for internal use.
1316 */
1318 ereport(ERROR,
1319 (errcode(ERRCODE_RESERVED_NAME),
1320 errmsg("replication origin name \"%s\" is reserved",
1321 name),
1322 errdetail("Origin names \"%s\", \"%s\", and names starting with \"pg_\" are reserved.",
1323 LOGICALREP_ORIGIN_ANY, LOGICALREP_ORIGIN_NONE)));
1324
1325 /*
1326 * If built with appropriate switch, whine when regression-testing
1327 * conventions for replication origin names are violated.
1328 */
1329#ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
1330 if (strncmp(name, "regress_", 8) != 0)
1331 elog(WARNING, "replication origins created by regression test cases should have names starting with \"regress_\"");
1332#endif
1333
1334 roident = replorigin_create(name);
1335
1336 pfree(name);
1337
1338 PG_RETURN_OID(roident);
1339}
1340
1341/*
1342 * Drop replication origin.
 *
 * Calls replorigin_drop_by_name() with missing_ok = false and nowait = true.
 * The C-string copy of the name is released with pfree() afterwards.
1343 */
1344Datum
1346{
1347 char *name;
1348
1349 replorigin_check_prerequisites(false, false);
1350
1352
1353 replorigin_drop_by_name(name, false, true);
1354
1355 pfree(name);
1356
1358}
1359
1360/*
1361 * Return oid of a replication origin.
 *
 * The lookup uses missing_ok = true, so an unknown name does not raise an
 * error; the ID is returned only when it is valid.
1362 */
1363Datum
1365{
1366 char *name;
1367 RepOriginId roident;
1368
1369 replorigin_check_prerequisites(false, false);
1370
1372 roident = replorigin_by_name(name, true);
1373
1374 pfree(name);
1375
1376 if (OidIsValid(roident))
1377 PG_RETURN_OID(roident);
1379}
1380
1381/*
1382 * Setup a replication origin for this session.
 *
 * Argument 0 is the origin name, which must exist (looked up with
 * missing_ok = false). Argument 1 is a PID passed to
 * replorigin_session_setup() as 'acquired_by'. With 0, the origin is taken
 * for this session and an ERROR is raised if another process already has it
 * active. A non-zero PID must be the process that already has the origin
 * active; otherwise replorigin_session_setup() raises an ERROR.
1383 */
1384Datum
1386{
1387 char *name;
1388 RepOriginId origin;
1389 int pid;
1390
1391 replorigin_check_prerequisites(true, false);
1392
1394 origin = replorigin_by_name(name, false);
1395 pid = PG_GETARG_INT32(1);
1396 replorigin_session_setup(origin, pid);
1397
1399
1400 pfree(name);
1401
1403}
1404
1405/*
1406 * Reset previously setup origin in this session.
 *
 * SQL-callable counterpart of pg_replication_origin_session_setup().
1407 */
1408Datum
1410{
1411 replorigin_check_prerequisites(true, false);
1412
1414
1418
1420}
1421
1422/*
1423 * Has a replication origin been setup for this session?
 *
 * Unlike the other session functions, this passes check_origins = false to
 * replorigin_check_prerequisites(); presumably so the question can be asked
 * even when origins are not enabled -- confirm.
1424 */
1425Datum
1427{
1428 replorigin_check_prerequisites(false, false);
1429
1431}
1432
1433
1434/*
1435 * Return the replication progress for the origin set up in the current session.
1436 *
1437 * If 'flush' is set to true it is ensured that the returned value corresponds
1438 * to a local transaction that has been flushed. This is useful if asynchronous
1439 * commits are used when replaying replicated transactions.
 *
 * Raises ERROR (ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE) if no replication
 * origin is configured for this session.
1440 */
1441Datum
1443{
1444 XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1445 bool flush = PG_GETARG_BOOL(0);
1446
1447 replorigin_check_prerequisites(true, false);
1448
1449 if (session_replication_state == NULL)
1450 ereport(ERROR,
1451 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1452 errmsg("no replication origin is configured")));
1453
1454 remote_lsn = replorigin_session_get_progress(flush);
1455
1456 if (!XLogRecPtrIsValid(remote_lsn))
1458
1459 PG_RETURN_LSN(remote_lsn);
1460}
1461
/*
 * Set up origin information for the current transaction.
 *
 * Argument 0 is the remote LSN ('location'). It presumably becomes the
 * per-transaction origin LSN, alongside a commit timestamp argument -- confirm.
 * Requires that a replication origin is configured for this session;
 * otherwise raises ERROR (ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE).
 */
1462Datum
1464{
1465 XLogRecPtr location = PG_GETARG_LSN(0);
1466
1467 replorigin_check_prerequisites(true, false);
1468
1469 if (session_replication_state == NULL)
1470 ereport(ERROR,
1471 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1472 errmsg("no replication origin is configured")));
1473
1476
1478}
1479
/*
 * Undo pg_replication_origin_xact_setup() for the current transaction.
 *
 * Presumably clears the per-transaction origin LSN/timestamp -- confirm.
 */
1480Datum
1482{
1483 replorigin_check_prerequisites(true, false);
1484
1487
1489}
1490
1491
/*
 * Set the replication progress of the named origin to 'remote_commit'
 * (argument 1).
 *
 * The origin must exist (looked up with missing_ok = false). The update is
 * WAL-logged and is allowed to move progress backwards (go_backward = true).
 * No local commit LSN is recorded; see the comment in the body.
 */
1492Datum
1494{
1496 XLogRecPtr remote_commit = PG_GETARG_LSN(1);
1497 RepOriginId node;
1498
1499 replorigin_check_prerequisites(true, false);
1500
1501 /* lock to prevent the replication origin from vanishing */
1502 LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1503
 /*
  * NOTE(review): unlike the sibling SQL functions, the C string produced by
  * text_to_cstring() is not pfree'd here; presumably it is left to
  * memory-context cleanup.
  */
1504 node = replorigin_by_name(text_to_cstring(name), false);
1505
1506 /*
1507 * Can't sensibly pass a local commit to be flushed at checkpoint - this
1508 * xact hasn't committed yet. This is why this function should be used to
1509 * set up the initial replication state, but not for replay.
1510 */
1511 replorigin_advance(node, remote_commit, InvalidXLogRecPtr,
1512 true /* go backward */ , true /* WAL log */ );
1513
1514 UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1515
1517}
1518
1519
1520/*
1521 * Return the replication progress for an individual replication origin.
1522 *
1523 * If 'flush' is set to true it is ensured that the returned value corresponds
1524 * to a local transaction that has been flushed. This is useful if asynchronous
1525 * commits are used when replaying replicated transactions.
 *
 * Argument 0 is the origin name; argument 1 is 'flush'.
1526 */
1527Datum
1529{
1530 char *name;
1531 bool flush;
1532 RepOriginId roident;
1533 XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1534
1536
1538 flush = PG_GETARG_BOOL(1);
1539
 /*
  * missing_ok = false, so an unknown name is expected to raise an error
  * before the Assert below is reached.
  */
1540 roident = replorigin_by_name(name, false);
1541 Assert(OidIsValid(roident));
1542
1543 remote_lsn = replorigin_get_progress(roident, flush);
1544
1545 if (!XLogRecPtrIsValid(remote_lsn))
1547
1548 PG_RETURN_LSN(remote_lsn);
1549}
1550
1551
/*
 * Set-returning function listing every in-use replication state slot.
 *
 * Each row holds the origin ID, its name (NULL if the origin can no longer
 * be found), the remote LSN and the local LSN.
 */
1552Datum
1554{
1555 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1556 int i;
1558
1559 /* with max_active_replication_origins set to zero, return 0 rows */
1560 replorigin_check_prerequisites(false, true);
1561
1562 InitMaterializedSRF(fcinfo, 0);
1563
1564 /* prevent slots from being concurrently dropped */
1565 LWLockAcquire(ReplicationOriginLock, LW_SHARED);
1566
1567 /*
1568 * Iterate through all possible replication_states, display if they are
1569 * filled. ReplicationOriginLock is held in shared mode for the whole loop,
1570 * and each slot's own lock is held while its two LSNs are read. The values
 * may nevertheless be out of date by the time the caller sees them.
1571 */
1572 for (i = 0; i < max_active_replication_origins; i++)
1573 {
1577 char *roname;
1578
1580
1581 /* unused slot, nothing to display */
1582 if (state->roident == InvalidRepOriginId)
1583 continue;
1584
 /* start with every column NULL; fill in the ones we can determine */
1585 memset(values, 0, sizeof(values));
1586 memset(nulls, 1, sizeof(nulls));
1587
1588 values[0] = ObjectIdGetDatum(state->roident);
1589 nulls[0] = false;
1590
1591 /*
1592 * We're not preventing the origin from being dropped concurrently, so
1593 * silently accept that it might be gone (its name is then left NULL).
1594 */
1595 if (replorigin_by_oid(state->roident, true,
1596 &roname))
1597 {
1598 values[1] = CStringGetTextDatum(roname);
1599 nulls[1] = false;
1600 }
1601
1602 LWLockAcquire(&state->lock, LW_SHARED);
1603
1604 values[2] = LSNGetDatum(state->remote_lsn);
1605 nulls[2] = false;
1606
1607 values[3] = LSNGetDatum(state->local_lsn);
1608 nulls[3] = false;
1609
1610 LWLockRelease(&state->lock);
1611
1612 tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
1613 values, nulls);
1614 }
1615
1616 LWLockRelease(ReplicationOriginLock);
1617
1618#undef REPLICATION_ORIGIN_PROGRESS_COLS
1619
1620 return (Datum) 0;
1621}
static Datum values[MAXATTR]
Definition: bootstrap.c:153
#define CStringGetTextDatum(s)
Definition: builtins.h:97
uint8_t uint8
Definition: c.h:539
#define PG_BINARY
Definition: c.h:1261
#define FLEXIBLE_ARRAY_MEMBER
Definition: c.h:475
uint32_t uint32
Definition: c.h:541
#define PG_UINT16_MAX
Definition: c.h:595
#define MemSet(start, val, len)
Definition: c.h:1022
#define OidIsValid(objectId)
Definition: c.h:777
size_t Size
Definition: c.h:613
bool IsReservedName(const char *name)
Definition: catalog.c:278
bool ConditionVariableCancelSleep(void)
void ConditionVariableBroadcast(ConditionVariable *cv)
void ConditionVariableInit(ConditionVariable *cv)
void ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)
int64 TimestampTz
Definition: timestamp.h:39
int errcode_for_file_access(void)
Definition: elog.c:886
int errdetail(const char *fmt,...)
Definition: elog.c:1216
int errhint(const char *fmt,...)
Definition: elog.c:1330
int errcode(int sqlerrcode)
Definition: elog.c:863
int errmsg(const char *fmt,...)
Definition: elog.c:1080
#define LOG
Definition: elog.h:31
#define WARNING
Definition: elog.h:36
#define DEBUG2
Definition: elog.h:29
#define PANIC
Definition: elog.h:42
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:226
#define ereport(elevel,...)
Definition: elog.h:150
int durable_rename(const char *oldfile, const char *newfile, int elevel)
Definition: fd.c:779
int CloseTransientFile(int fd)
Definition: fd.c:2868
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2691
#define PG_RETURN_VOID()
Definition: fmgr.h:349
#define PG_GETARG_TEXT_PP(n)
Definition: fmgr.h:309
#define PG_GETARG_DATUM(n)
Definition: fmgr.h:268
#define PG_RETURN_NULL()
Definition: fmgr.h:345
#define PG_GETARG_INT32(n)
Definition: fmgr.h:269
#define PG_GETARG_BOOL(n)
Definition: fmgr.h:274
#define PG_RETURN_OID(x)
Definition: fmgr.h:360
#define PG_FUNCTION_ARGS
Definition: fmgr.h:193
#define PG_RETURN_BOOL(x)
Definition: fmgr.h:359
void InitMaterializedSRF(FunctionCallInfo fcinfo, bits32 flags)
Definition: funcapi.c:76
void systable_endscan(SysScanDesc sysscan)
Definition: genam.c:603
HeapTuple systable_getnext(SysScanDesc sysscan)
Definition: genam.c:514
SysScanDesc systable_beginscan(Relation heapRelation, Oid indexId, bool indexOK, Snapshot snapshot, int nkeys, ScanKey key)
Definition: genam.c:388
int MyProcPid
Definition: globals.c:47
Assert(PointerIsAligned(start, uint64))
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
Definition: heaptuple.c:1117
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1435
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
static void * GETSTRUCT(const HeapTupleData *tuple)
Definition: htup_details.h:728
#define ident
Definition: indent_codes.h:47
void CatalogTupleInsert(Relation heapRel, HeapTuple tup)
Definition: indexing.c:233
void CatalogTupleDelete(Relation heapRel, const ItemPointerData *tid)
Definition: indexing.c:365
#define write(a, b, c)
Definition: win32.h:14
#define read(a, b, c)
Definition: win32.h:13
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:365
int i
Definition: isn.c:77
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:1088
void UnlockRelationOid(Oid relid, LOCKMODE lockmode)
Definition: lmgr.c:229
void LockRelationOid(Oid relid, LOCKMODE lockmode)
Definition: lmgr.c:107
void UnlockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:1148
#define NoLock
Definition: lockdefs.h:34
#define AccessExclusiveLock
Definition: lockdefs.h:43
#define ExclusiveLock
Definition: lockdefs.h:42
#define RowExclusiveLock
Definition: lockdefs.h:38
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1174
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1894
void LWLockInitialize(LWLock *lock, int tranche_id)
Definition: lwlock.c:698
@ LW_SHARED
Definition: lwlock.h:113
@ LW_EXCLUSIVE
Definition: lwlock.h:112
void pfree(void *pointer)
Definition: mcxt.c:1594
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:123
TimestampTz replorigin_session_origin_timestamp
Definition: origin.c:165
static ReplicationStateCtl * replication_states_ctl
Definition: origin.c:176
RepOriginId replorigin_by_name(const char *roname, bool missing_ok)
Definition: origin.c:226
Size ReplicationOriginShmemSize(void)
Definition: origin.c:534
RepOriginId replorigin_create(const char *roname)
Definition: origin.c:257
Datum pg_replication_origin_progress(PG_FUNCTION_ARGS)
Definition: origin.c:1528
void replorigin_session_reset(void)
Definition: origin.c:1225
struct ReplicationState ReplicationState
static bool IsReservedOriginName(const char *name)
Definition: origin.c:209
void replorigin_session_advance(XLogRecPtr remote_commit, XLogRecPtr local_commit)
Definition: origin.c:1254
bool replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
Definition: origin.c:493
int max_active_replication_origins
Definition: origin.c:104
Datum pg_replication_origin_advance(PG_FUNCTION_ARGS)
Definition: origin.c:1493
XLogRecPtr replorigin_get_progress(RepOriginId node, bool flush)
Definition: origin.c:1036
#define PG_REPLORIGIN_CHECKPOINT_TMPFILE
Definition: origin.c:101
Datum pg_replication_origin_session_progress(PG_FUNCTION_ARGS)
Definition: origin.c:1442
static ReplicationState * replication_states
Definition: origin.c:171
#define PG_REPLORIGIN_CHECKPOINT_FILENAME
Definition: origin.c:100
Datum pg_replication_origin_session_reset(PG_FUNCTION_ARGS)
Definition: origin.c:1409
Datum pg_replication_origin_xact_setup(PG_FUNCTION_ARGS)
Definition: origin.c:1463
Datum pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS)
Definition: origin.c:1426
Datum pg_replication_origin_oid(PG_FUNCTION_ARGS)
Definition: origin.c:1364
Datum pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
Definition: origin.c:1385
static void ReplicationOriginExitCleanup(int code, Datum arg)
Definition: origin.c:1077
void StartupReplicationOrigin(void)
Definition: origin.c:722
void replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
Definition: origin.c:439
RepOriginId replorigin_session_origin
Definition: origin.c:163
void replorigin_advance(RepOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)
Definition: origin.c:910
static void replorigin_state_clear(RepOriginId roident, bool nowait)
Definition: origin.c:369
void replorigin_session_setup(RepOriginId node, int acquired_by)
Definition: origin.c:1119
void CheckPointReplicationOrigin(void)
Definition: origin.c:596
static void replorigin_check_prerequisites(bool check_origins, bool recoveryOK)
Definition: origin.c:190
static ReplicationState * session_replication_state
Definition: origin.c:184
Datum pg_replication_origin_drop(PG_FUNCTION_ARGS)
Definition: origin.c:1345
#define REPLICATION_ORIGIN_PROGRESS_COLS
XLogRecPtr replorigin_session_get_progress(bool flush)
Definition: origin.c:1272
void ReplicationOriginShmemInit(void)
Definition: origin.c:549
Datum pg_show_replication_origin_status(PG_FUNCTION_ARGS)
Definition: origin.c:1553
#define REPLICATION_STATE_MAGIC
Definition: origin.c:187
XLogRecPtr replorigin_session_origin_lsn
Definition: origin.c:164
Datum pg_replication_origin_create(PG_FUNCTION_ARGS)
Definition: origin.c:1304
Datum pg_replication_origin_xact_reset(PG_FUNCTION_ARGS)
Definition: origin.c:1481
void replorigin_redo(XLogReaderState *record)
Definition: origin.c:849
struct ReplicationStateCtl ReplicationStateCtl
struct ReplicationStateOnDisk ReplicationStateOnDisk
#define DoNotReplicateId
Definition: origin.h:34
#define InvalidRepOriginId
Definition: origin.h:33
#define XLOG_REPLORIGIN_DROP
Definition: origin.h:31
#define MAX_RONAME_LEN
Definition: origin.h:41
#define XLOG_REPLORIGIN_SET
Definition: origin.h:30
void * arg
#define ERRCODE_DATA_CORRUPTED
Definition: pg_basebackup.c:42
uint32 pg_crc32c
Definition: pg_crc32c.h:38
#define COMP_CRC32C(crc, data, len)
Definition: pg_crc32c.h:153
#define INIT_CRC32C(crc)
Definition: pg_crc32c.h:41
#define FIN_CRC32C(crc)
Definition: pg_crc32c.h:158
return crc
#define PG_GETARG_LSN(n)
Definition: pg_lsn.h:36
static Datum LSNGetDatum(XLogRecPtr X)
Definition: pg_lsn.h:31
#define PG_RETURN_LSN(x)
Definition: pg_lsn.h:37
FormData_pg_replication_origin * Form_pg_replication_origin
int pg_strcasecmp(const char *s1, const char *s2)
Definition: pgstrcasecmp.c:32
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:262
uint64_t Datum
Definition: postgres.h:70
static Pointer DatumGetPointer(Datum X)
Definition: postgres.h:322
#define InvalidOid
Definition: postgres_ext.h:37
unsigned int Oid
Definition: postgres_ext.h:32
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define RelationGetDescr(relation)
Definition: rel.h:541
void ScanKeyInit(ScanKey entry, AttrNumber attributeNumber, StrategyNumber strategy, RegProcedure procedure, Datum argument)
Definition: scankey.c:76
Size add_size(Size s1, Size s2)
Definition: shmem.c:495
Size mul_size(Size s1, Size s2)
Definition: shmem.c:510
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:389
#define InitDirtySnapshot(snapshotdata)
Definition: snapmgr.h:42
#define BTEqualStrategyNumber
Definition: stratnum.h:31
ItemPointerData t_self
Definition: htup.h:65
Definition: lwlock.h:42
Form_pg_class rd_rel
Definition: rel.h:111
ReplicationState states[FLEXIBLE_ARRAY_MEMBER]
Definition: origin.c:159
XLogRecPtr remote_lsn
Definition: origin.c:150
RepOriginId roident
Definition: origin.c:149
XLogRecPtr remote_lsn
Definition: origin.c:119
XLogRecPtr local_lsn
Definition: origin.c:126
ConditionVariable origin_cv
Definition: origin.c:136
RepOriginId roident
Definition: origin.c:114
LWLock lock
Definition: origin.c:141
TupleDesc setDesc
Definition: execnodes.h:364
Tuplestorestate * setResult
Definition: execnodes.h:363
XLogRecPtr EndRecPtr
Definition: xlogreader.h:206
Definition: regguts.h:323
Definition: c.h:695
RepOriginId node_id
Definition: origin.h:27
RepOriginId node_id
Definition: origin.h:21
XLogRecPtr remote_lsn
Definition: origin.h:20
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:264
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:220
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:40
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, const Datum *values, const bool *isnull)
Definition: tuplestore.c:784
#define PG_GETARG_TIMESTAMPTZ(n)
Definition: timestamp.h:64
char * text_to_cstring(const text *t)
Definition: varlena.c:214
const char * name
bool IsTransactionState(void)
Definition: xact.c:388
void CommandCounterIncrement(void)
Definition: xact.c:1101
bool RecoveryInProgress(void)
Definition: xlog.c:6406
void XLogFlush(XLogRecPtr record)
Definition: xlog.c:2783
#define XLogRecPtrIsValid(r)
Definition: xlogdefs.h:29
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:47
uint16 RepOriginId
Definition: xlogdefs.h:69
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition: xloginsert.c:478
void XLogRegisterData(const void *data, uint32 len)
Definition: xloginsert.c:368
void XLogBeginInsert(void)
Definition: xloginsert.c:152
#define XLogRecGetInfo(decoder)
Definition: xlogreader.h:409
#define XLogRecGetData(decoder)
Definition: xlogreader.h:414