PostgreSQL Source Code git master
Loading...
Searching...
No Matches
origin.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * origin.c
4 * Logical replication progress tracking support.
5 *
6 * Copyright (c) 2013-2026, PostgreSQL Global Development Group
7 *
8 * IDENTIFICATION
9 * src/backend/replication/logical/origin.c
10 *
11 * NOTES
12 *
13 * This file provides the following:
14 * * An infrastructure to name nodes in a replication setup
15 * * A facility to store and persist replication progress in an
16 * efficient and durable manner.
17 *
18 * Replication origin consists of a descriptive, user defined, external
19 * name and a short, thus space efficient, internal 2 byte one. This split
20 * exists because replication origins have to be stored in WAL and shared
21 * memory and long descriptors would be inefficient. For now only use 2 bytes
22 * for the internal id of a replication origin as it seems unlikely that there
23 * soon will be more than 65k nodes in one replication setup; and using only
24 * two bytes allows us to be more space efficient.
25 *
26 * Replication progress is tracked in a shared memory table
27 * (ReplicationState) that's dumped to disk every checkpoint. Entries
28 * ('slots') in this table are identified by the internal id. That's the case
29 * because it allows to increase replication progress during crash
30 * recovery. To allow doing so we store the original LSN (from the originating
31 * system) of a transaction in the commit record. That allows to recover the
32 * precise replayed state after crash recovery; without requiring synchronous
33 * commits. Allowing logical replication to use asynchronous commit is
34 * generally good for performance, but especially important as it allows a
35 * single threaded replay process to keep up with a source that has multiple
36 * backends generating changes concurrently. For efficiency and simplicity
37 * reasons a backend can setup one replication origin that's from then used as
38 * the source of changes produced by the backend, until reset again.
39 *
40 * This infrastructure is intended to be used in cooperation with logical
41 * decoding. When replaying from a remote system the configured origin is
42 * provided to output plugins, allowing prevention of replication loops and
43 * other filtering.
44 *
45 * There are several levels of locking at work:
46 *
47 * * To create and drop replication origins an exclusive lock on
48 * pg_replication_origin is required for the duration. That allows us to
49 * safely and conflict free assign new origins using a dirty snapshot.
50 *
51 * * When creating an in-memory replication progress slot the ReplicationOrigin
52 * LWLock has to be held exclusively; when iterating over the replication
53 * progress a shared lock has to be held, the same when advancing the
54 * replication progress of an individual backend that has not setup as the
55 * session's replication origin.
56 *
57 * * When manipulating or looking at the remote_lsn and local_lsn fields of a
58 * replication progress slot that slot's lwlock has to be held. That's
59 * primarily because we do not assume 8 byte writes (the LSN) are atomic on
60 * all our platforms, but it also simplifies memory ordering concerns
61 * between the remote and local lsn. We use a lwlock instead of a spinlock
62 * so it's less harmful to hold the lock over a WAL write
63 * (cf. replorigin_advance).
64 *
65 * ---------------------------------------------------------------------------
66 */
67
68#include "postgres.h"
69
70#include <unistd.h>
71#include <sys/stat.h>
72
73#include "access/genam.h"
74#include "access/htup_details.h"
75#include "access/table.h"
76#include "access/xact.h"
77#include "access/xloginsert.h"
78#include "catalog/catalog.h"
79#include "catalog/indexing.h"
81#include "funcapi.h"
82#include "miscadmin.h"
83#include "nodes/execnodes.h"
84#include "pgstat.h"
85#include "replication/origin.h"
86#include "replication/slot.h"
88#include "storage/fd.h"
89#include "storage/ipc.h"
90#include "storage/lmgr.h"
91#include "storage/subsystems.h"
92#include "utils/builtins.h"
93#include "utils/fmgroids.h"
94#include "utils/guc.h"
95#include "utils/pg_lsn.h"
96#include "utils/rel.h"
97#include "utils/snapmgr.h"
98#include "utils/syscache.h"
99#include "utils/wait_event.h"
100
101/* paths for replication origin checkpoint files */
102#define PG_REPLORIGIN_CHECKPOINT_FILENAME PG_LOGICAL_DIR "/replorigin_checkpoint"
103#define PG_REPLORIGIN_CHECKPOINT_TMPFILE PG_REPLORIGIN_CHECKPOINT_FILENAME ".tmp"
104
105/* GUC variables */
107
108/*
109 * Replay progress of a single remote node.
110 */
111typedef struct ReplicationState
112{
113 /*
114 * Local identifier for the remote node.
115 */
117
118 /*
119 * Location of the latest commit from the remote side.
120 */
122
123 /*
124 * Remember the local lsn of the commit record so we can XLogFlush() to it
125 * during a checkpoint so we know the commit record actually is safe on
126 * disk.
127 */
129
130 /*
131 * PID of backend that's acquired slot, or 0 if none.
132 */
134
135 /* Count of processes that are currently using this origin. */
137
138 /*
139 * Condition variable that's signaled when acquired_by changes.
140 */
142
143 /*
144 * Lock protecting remote_lsn and local_lsn.
145 */
148
149/*
150 * On disk version of ReplicationState.
151 */
157
158
160{
161 /* Tranche to use for per-origin LWLocks */
163 /* Array of length max_active_replication_origins */
166
167/* Global variable for per-transaction replication origin state */
169 .origin = InvalidReplOriginId, /* assumed identity */
170 .origin_lsn = InvalidXLogRecPtr,
171 .origin_timestamp = 0
172};
173
174/*
175 * Base address into a shared memory array of replication states of size
176 * max_active_replication_origins.
177 */
179
180static void ReplicationOriginShmemRequest(void *arg);
181static void ReplicationOriginShmemInit(void *arg);
182static void ReplicationOriginShmemAttach(void *arg);
183
189
190/*
191 * Actual shared memory block (replication_states[] is now part of this).
192 */
194
195/*
196 * We keep a pointer to this backend's ReplicationState to avoid having to
197 * search the replication_states array in replorigin_session_advance for each
198 * remote commit. (Ownership of a backend's own entry can only be changed by
199 * that backend.)
200 */
202
203/* Magic for on disk files. */
204#define REPLICATION_STATE_MAGIC ((uint32) 0x1257DADE)
205
206static void
208{
212 errmsg("cannot query or manipulate replication origin when \"max_active_replication_origins\" is 0")));
213
217 errmsg("cannot manipulate replication origins during recovery")));
218}
219
220
221/*
222 * IsReservedOriginName
223 * True iff name is either "none" or "any".
224 */
225static bool
227{
228 return ((pg_strcasecmp(name, LOGICALREP_ORIGIN_NONE) == 0) ||
230}
231
232/* ---------------------------------------------------------------------------
233 * Functions for working with replication origins themselves.
234 * ---------------------------------------------------------------------------
235 */
236
237/*
238 * Check for a persistent replication origin identified by name.
239 *
240 * Returns InvalidOid if the node isn't known yet and missing_ok is true.
241 */
243replorigin_by_name(const char *roname, bool missing_ok)
244{
246 Oid roident = InvalidOid;
247 HeapTuple tuple;
249
251
253 if (HeapTupleIsValid(tuple))
254 {
256 roident = ident->roident;
257 ReleaseSysCache(tuple);
258 }
259 else if (!missing_ok)
262 errmsg("replication origin \"%s\" does not exist",
263 roname)));
264
265 return roident;
266}
267
268/*
269 * Create a replication origin.
270 *
271 * Needs to be called in a transaction.
272 */
275{
276 Oid roident;
277 HeapTuple tuple = NULL;
278 Relation rel;
281 SysScanDesc scan;
282 ScanKeyData key;
283
284 /*
285 * To avoid needing a TOAST table for pg_replication_origin, we limit
286 * replication origin names to 512 bytes. This should be more than enough
287 * for all practical use.
288 */
292 errmsg("replication origin name is too long"),
293 errdetail("Replication origin names must be no longer than %d bytes.",
295
297
299
300 /*
301 * We need the numeric replication origin to be 16bit wide, so we cannot
302 * rely on the normal oid allocation. Instead we simply scan
303 * pg_replication_origin for the first unused id. That's not particularly
304 * efficient, but this should be a fairly infrequent operation - we can
305 * easily spend a bit more code on this when it turns out it needs to be
306 * faster.
307 *
308 * We handle concurrency by taking an exclusive lock (allowing reads!)
309 * over the table for the duration of the search. Because we use a "dirty
310 * snapshot" we can read rows that other in-progress sessions have
311 * written, even though they would be invisible with normal snapshots. Due
312 * to the exclusive lock there's no danger that new rows can appear while
313 * we're checking.
314 */
316
318
319 /*
320 * We want to be able to access pg_replication_origin without setting up a
321 * snapshot. To make that safe, it needs to not have a TOAST table, since
322 * TOASTed data cannot be fetched without a snapshot. As of this writing,
323 * its only varlena column is roname, which we limit to 512 bytes to avoid
324 * needing out-of-line storage. If you add a TOAST table to this catalog,
325 * be sure to set up a snapshot everywhere it might be needed. For more
326 * information, see https://postgr.es/m/ZvMSUPOqUU-VNADN%40nathan.
327 */
328 Assert(!OidIsValid(rel->rd_rel->reltoastrelid));
329
330 for (roident = InvalidOid + 1; roident < PG_UINT16_MAX; roident++)
331 {
332 bool nulls[Natts_pg_replication_origin];
334 bool collides;
335
337
338 ScanKeyInit(&key,
341 ObjectIdGetDatum(roident));
342
344 true /* indexOK */ ,
346 1, &key);
347
349
350 systable_endscan(scan);
351
352 if (!collides)
353 {
354 /*
355 * Ok, found an unused roident, insert the new row and do a CCI,
356 * so our callers can look it up if they want to.
357 */
358 memset(&nulls, 0, sizeof(nulls));
359
362
363 tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
364 CatalogTupleInsert(rel, tuple);
366 break;
367 }
368 }
369
370 /* now release lock again, */
372
373 if (tuple == NULL)
376 errmsg("could not find free replication origin ID")));
377
378 heap_freetuple(tuple);
379 return roident;
380}
381
382/*
383 * Helper function to drop a replication origin.
384 */
385static void
387{
388 int i;
389
390 /*
391 * Clean up the slot state info, if there is any matching slot.
392 */
393restart:
395
396 for (i = 0; i < max_active_replication_origins; i++)
397 {
399
400 if (state->roident == roident)
401 {
402 /* found our slot, is it busy? */
403 if (state->refcount > 0)
404 {
406
407 if (nowait)
410 (state->acquired_by != 0)
411 ? errmsg("could not drop replication origin with ID %d, in use by PID %d",
412 state->roident,
413 state->acquired_by)
414 : errmsg("could not drop replication origin with ID %d, in use by another process",
415 state->roident)));
416
417 /*
418 * We must wait and then retry. Since we don't know which CV
419 * to wait on until here, we can't readily use
420 * ConditionVariablePrepareToSleep (calling it here would be
421 * wrong, since we could miss the signal if we did so); just
422 * use ConditionVariableSleep directly.
423 */
424 cv = &state->origin_cv;
425
427
429 goto restart;
430 }
431
432 /* first make a WAL log entry */
433 {
435
436 xlrec.node_id = roident;
438 XLogRegisterData(&xlrec, sizeof(xlrec));
440 }
441
442 /* then clear the in-memory slot */
443 state->roident = InvalidReplOriginId;
444 state->remote_lsn = InvalidXLogRecPtr;
445 state->local_lsn = InvalidXLogRecPtr;
446 break;
447 }
448 }
451}
452
453/*
454 * Drop replication origin (by name).
455 *
456 * Needs to be called in a transaction.
457 */
458void
459replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
460{
461 ReplOriginId roident;
462 Relation rel;
463 HeapTuple tuple;
464
466
468
469 roident = replorigin_by_name(name, missing_ok);
470
471 /* Lock the origin to prevent concurrent drops. */
474
476 if (!HeapTupleIsValid(tuple))
477 {
478 if (!missing_ok)
479 elog(ERROR, "cache lookup failed for replication origin with ID %d",
480 roident);
481
482 /*
483 * We don't need to retain the locks if the origin is already dropped.
484 */
488 return;
489 }
490
491 replorigin_state_clear(roident, nowait);
492
493 /*
494 * Now, we can delete the catalog entry.
495 */
496 CatalogTupleDelete(rel, &tuple->t_self);
497 ReleaseSysCache(tuple);
498
500
501 /* We keep the lock on pg_replication_origin until commit */
502 table_close(rel, NoLock);
503}
504
505/*
506 * Lookup replication origin via its oid and return the name.
507 *
508 * The external name is palloc'd in the calling context.
509 *
510 * Returns true if the origin is known, false otherwise.
511 */
512bool
513replorigin_by_oid(ReplOriginId roident, bool missing_ok, char **roname)
514{
515 HeapTuple tuple;
517
518 Assert(OidIsValid((Oid) roident));
519 Assert(roident != InvalidReplOriginId);
520 Assert(roident != DoNotReplicateId);
521
523 ObjectIdGetDatum((Oid) roident));
524
525 if (HeapTupleIsValid(tuple))
526 {
528 *roname = text_to_cstring(&ric->roname);
529 ReleaseSysCache(tuple);
530
531 return true;
532 }
533 else
534 {
535 *roname = NULL;
536
537 if (!missing_ok)
540 errmsg("replication origin with ID %d does not exist",
541 roident)));
542
543 return false;
544 }
545}
546
547
548/* ---------------------------------------------------------------------------
549 * Functions for handling replication progress.
550 * ---------------------------------------------------------------------------
551 */
552
553static void
555{
556 Size size = 0;
557
559 return;
560
561 size = add_size(size, offsetof(ReplicationStateCtl, states));
562 size = add_size(size,
564 ShmemRequestStruct(.name = "ReplicationOriginState",
565 .size = size,
566 .ptr = (void **) &replication_states_ctl,
567 );
568}
569
570static void
587
588static void
596
597/* ---------------------------------------------------------------------------
598 * Perform a checkpoint of each replication origin's progress with respect to
599 * the replayed remote_lsn. Make sure that all transactions we refer to in the
600 * checkpoint (local_lsn) are actually on-disk. This might not yet be the case
601 * if the transactions were originally committed asynchronously.
602 *
603 * We store checkpoints in the following format:
604 * +-------+------------------------+------------------+-----+--------+
605 * | MAGIC | ReplicationStateOnDisk | struct Replic... | ... | CRC32C | EOF
606 * +-------+------------------------+------------------+-----+--------+
607 *
608 * So it's just the magic, followed by the statically sized
609 * ReplicationStateOnDisk structs. Note that the maximum number of
610 * ReplicationState is determined by max_active_replication_origins.
611 * ---------------------------------------------------------------------------
612 */
613void
615{
617 const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME;
618 int tmpfd;
619 int i;
622
624 return;
625
627
628 /* make sure no old temp file is remaining */
629 if (unlink(tmppath) < 0 && errno != ENOENT)
632 errmsg("could not remove file \"%s\": %m",
633 tmppath)));
634
635 /*
636 * no other backend can perform this at the same time; only one checkpoint
637 * can happen at a time.
638 */
641 if (tmpfd < 0)
644 errmsg("could not create file \"%s\": %m",
645 tmppath)));
646
647 /* write magic */
648 errno = 0;
649 if ((write(tmpfd, &magic, sizeof(magic))) != sizeof(magic))
650 {
651 /* if write didn't set errno, assume problem is no disk space */
652 if (errno == 0)
653 errno = ENOSPC;
656 errmsg("could not write to file \"%s\": %m",
657 tmppath)));
658 }
659 COMP_CRC32C(crc, &magic, sizeof(magic));
660
661 /* prevent concurrent creations/drops */
663
664 /* write actual data */
665 for (i = 0; i < max_active_replication_origins; i++)
666 {
669 XLogRecPtr local_lsn;
670
671 if (curstate->roident == InvalidReplOriginId)
672 continue;
673
674 /* zero, to avoid uninitialized padding bytes */
675 memset(&disk_state, 0, sizeof(disk_state));
676
678
679 disk_state.roident = curstate->roident;
680
681 disk_state.remote_lsn = curstate->remote_lsn;
682 local_lsn = curstate->local_lsn;
683
684 LWLockRelease(&curstate->lock);
685
686 /* make sure we only write out a commit that's persistent */
687 XLogFlush(local_lsn);
688
689 errno = 0;
690 if ((write(tmpfd, &disk_state, sizeof(disk_state))) !=
691 sizeof(disk_state))
692 {
693 /* if write didn't set errno, assume problem is no disk space */
694 if (errno == 0)
695 errno = ENOSPC;
698 errmsg("could not write to file \"%s\": %m",
699 tmppath)));
700 }
701
703 }
704
706
707 /* write out the CRC */
709 errno = 0;
710 if ((write(tmpfd, &crc, sizeof(crc))) != sizeof(crc))
711 {
712 /* if write didn't set errno, assume problem is no disk space */
713 if (errno == 0)
714 errno = ENOSPC;
717 errmsg("could not write to file \"%s\": %m",
718 tmppath)));
719 }
720
721 if (CloseTransientFile(tmpfd) != 0)
724 errmsg("could not close file \"%s\": %m",
725 tmppath)));
726
727 /* fsync, rename to permanent file, fsync file and directory */
729}
730
731/*
732 * Recover replication replay status from checkpoint data saved earlier by
733 * CheckPointReplicationOrigin.
734 *
735 * This only needs to be called at startup and *not* during every checkpoint
736 * read during recovery (e.g. in HS or PITR from a base backup) afterwards. All
737 * state thereafter can be recovered by looking at commit records.
738 */
739void
741{
742 const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME;
743 int fd;
744 int readBytes;
746 int last_state = 0;
749
750 /* don't want to overwrite already existing state */
751#ifdef USE_ASSERT_CHECKING
752 static bool already_started = false;
753
755 already_started = true;
756#endif
757
759 return;
760
762
763 elog(DEBUG2, "starting up replication origin progress state");
764
766
767 /*
768 * might have had max_active_replication_origins == 0 last run, or we just
769 * brought up a standby.
770 */
771 if (fd < 0 && errno == ENOENT)
772 return;
773 else if (fd < 0)
776 errmsg("could not open file \"%s\": %m",
777 path)));
778
779 /* verify magic, that is written even if nothing was active */
780 readBytes = read(fd, &magic, sizeof(magic));
781 if (readBytes != sizeof(magic))
782 {
783 if (readBytes < 0)
786 errmsg("could not read file \"%s\": %m",
787 path)));
788 else
791 errmsg("could not read file \"%s\": read %d of %zu",
792 path, readBytes, sizeof(magic))));
793 }
794 COMP_CRC32C(crc, &magic, sizeof(magic));
795
796 if (magic != REPLICATION_STATE_MAGIC)
798 (errmsg("replication checkpoint has wrong magic %u instead of %u",
799 magic, REPLICATION_STATE_MAGIC)));
800
801 /* we can skip locking here, no other access is possible */
802
803 /* recover individual states, until there are no more to be found */
804 while (true)
805 {
807
808 readBytes = read(fd, &disk_state, sizeof(disk_state));
809
810 if (readBytes < 0)
811 {
814 errmsg("could not read file \"%s\": %m",
815 path)));
816 }
817
818 /* no further data */
819 if (readBytes == sizeof(crc))
820 {
821 memcpy(&file_crc, &disk_state, sizeof(file_crc));
822 break;
823 }
824
825 if (readBytes != sizeof(disk_state))
826 {
829 errmsg("could not read file \"%s\": read %d of %zu",
830 path, readBytes, sizeof(disk_state))));
831 }
832
834
838 errmsg("could not find free replication state, increase \"max_active_replication_origins\"")));
839
840 /* copy data to shared memory */
843 last_state++;
844
845 ereport(LOG,
846 errmsg("recovered replication state of node %d to %X/%08X",
847 disk_state.roident,
848 LSN_FORMAT_ARGS(disk_state.remote_lsn)));
849 }
850
851 /* now check checksum */
853 if (file_crc != crc)
856 errmsg("replication slot checkpoint has wrong checksum %u, expected %u",
857 crc, file_crc)));
858
859 if (CloseTransientFile(fd) != 0)
862 errmsg("could not close file \"%s\": %m",
863 path)));
864}
865
866void
868{
869 uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
870
871 switch (info)
872 {
874 {
877
878 replorigin_advance(xlrec->node_id,
879 xlrec->remote_lsn, record->EndRecPtr,
880 xlrec->force /* backward */ ,
881 false /* WAL log */ );
882 break;
883 }
885 {
887 int i;
888
890
891 for (i = 0; i < max_active_replication_origins; i++)
892 {
894
895 /* found our slot */
896 if (state->roident == xlrec->node_id)
897 {
898 /* reset entry */
899 state->roident = InvalidReplOriginId;
900 state->remote_lsn = InvalidXLogRecPtr;
901 state->local_lsn = InvalidXLogRecPtr;
902 break;
903 }
904 }
905 break;
906 }
907 default:
908 elog(PANIC, "replorigin_redo: unknown op code %u", info);
909 }
910}
911
912
913/*
914 * Tell the replication origin progress machinery that a commit from 'node'
915 * that originated at the LSN remote_commit on the remote node was replayed
916 * successfully and that we don't need to do so again. In combination with
917 * setting up replorigin_xact_state {.origin_lsn, .origin_timestamp}
918 * that ensures we won't lose knowledge about that after a crash if the
919 * transaction had a persistent effect (think of asynchronous commits).
920 *
921 * local_commit needs to be a local LSN of the commit so that we can make sure
922 * upon a checkpoint that enough WAL has been persisted to disk.
923 *
924 * Needs to be called with a RowExclusiveLock on pg_replication_origin,
925 * unless running in recovery.
926 */
927void
930 bool go_backward, bool wal_log)
931{
932 int i;
935
937
938 /* we don't track DoNotReplicateId */
939 if (node == DoNotReplicateId)
940 return;
941
942 /*
943 * XXX: For the case where this is called by WAL replay, it'd be more
944 * efficient to restore into a backend local hashtable and only dump into
945 * shmem after recovery is finished. Let's wait with implementing that
946 * till it's shown to be a measurable expense
947 */
948
949 /* Lock exclusively, as we may have to create a new table entry. */
951
952 /*
953 * Search for either an existing slot for the origin, or a free one we can
954 * use.
955 */
956 for (i = 0; i < max_active_replication_origins; i++)
957 {
959
960 /* remember where to insert if necessary */
961 if (curstate->roident == InvalidReplOriginId &&
962 free_state == NULL)
963 {
965 continue;
966 }
967
968 /* not our slot */
969 if (curstate->roident != node)
970 {
971 continue;
972 }
973
974 /* ok, found slot */
976
978
979 /* Make sure it's not used by somebody else */
980 if (replication_state->refcount > 0)
981 {
984 (replication_state->acquired_by != 0)
985 ? errmsg("replication origin with ID %d is already active for PID %d",
986 replication_state->roident,
987 replication_state->acquired_by)
988 : errmsg("replication origin with ID %d is already active in another process",
989 replication_state->roident)));
990 }
991
992 break;
993 }
994
998 errmsg("could not find free replication state slot for replication origin with ID %d",
999 node),
1000 errhint("Increase \"max_active_replication_origins\" and try again.")));
1001
1002 if (replication_state == NULL)
1003 {
1004 /* initialize new slot */
1009 replication_state->roident = node;
1010 }
1011
1013
1014 /*
1015 * If somebody "forcefully" sets this slot, WAL log it, so it's durable
1016 * and the standby gets the message. Primarily this will be called during
1017 * WAL replay (of commit records) where no WAL logging is necessary.
1018 */
1019 if (wal_log)
1020 {
1022
1024 xlrec.node_id = node;
1025 xlrec.force = go_backward;
1026
1028 XLogRegisterData(&xlrec, sizeof(xlrec));
1029
1031 }
1032
1033 /*
1034 * Due to - harmless - race conditions during a checkpoint we could see
1035 * values here that are older than the ones we already have in memory. We
1036 * could also see older values for prepared transactions when the prepare
1037 * is sent at a later point of time along with commit prepared and there
1038 * are other transactions commits between prepare and commit prepared. See
1039 * ReorderBufferFinishPrepared. Don't overwrite those.
1040 */
1041 if (go_backward || replication_state->remote_lsn < remote_commit)
1042 replication_state->remote_lsn = remote_commit;
1044 (go_backward || replication_state->local_lsn < local_commit))
1045 replication_state->local_lsn = local_commit;
1047
1048 /*
1049 * Release *after* changing the LSNs, slot isn't acquired and thus could
1050 * otherwise be dropped anytime.
1051 */
1053}
1054
1055
1058{
1059 int i;
1060 XLogRecPtr local_lsn = InvalidXLogRecPtr;
1061 XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1062
1063 /* prevent slots from being concurrently dropped */
1065
1066 for (i = 0; i < max_active_replication_origins; i++)
1067 {
1069
1071
1072 if (state->roident == node)
1073 {
1074 LWLockAcquire(&state->lock, LW_SHARED);
1075
1076 remote_lsn = state->remote_lsn;
1077 local_lsn = state->local_lsn;
1078
1079 LWLockRelease(&state->lock);
1080
1081 break;
1082 }
1083 }
1084
1086
1087 if (flush && XLogRecPtrIsValid(local_lsn))
1088 XLogFlush(local_lsn);
1089
1090 return remote_lsn;
1091}
1092
1093/* Helper function to reset the session replication origin */
1094static void
1096{
1098
1100
1102
1103 /* The origin must be held by at least one process at this point. */
1105
1106 /*
1107 * Reset the PID only if the current session is the first to set up this
1108 * origin. This avoids clearing the first process's PID when any other
1109 * session releases the origin.
1110 */
1113
1115
1118
1120
1122}
1123
1124/*
1125 * Tear down a (possibly) configured session replication origin during process
1126 * exit.
1127 */
1128static void
1136
1137/*
1138 * Setup a replication origin in the shared memory struct if it doesn't
1139 * already exist and cache access to the specific ReplicationState so the
1140 * array doesn't have to be searched when calling
1141 * replorigin_session_advance().
1142 *
1143 * Normally only one such cached origin can exist per process so the cached
1144 * value can only be set again after the previous value is torn down with
1145 * replorigin_session_reset(). For this normal case pass acquired_by = 0
1146 * (meaning the slot is not allowed to be already acquired by another process).
1147 *
1148 * However, sometimes multiple processes can safely re-use the same origin slot
1149 * (for example, multiple parallel apply processes can safely use the same
1150 * origin, provided they maintain commit order by allowing only one process to
1151 * commit at a time). For this case the first process must pass acquired_by =
1152 * 0, and then the other processes sharing that same origin can pass
1153 * acquired_by = PID of the first process.
1154 */
1155void
1157{
1158 static bool registered_cleanup;
1159 int i;
1160 int free_slot = -1;
1161
1162 if (!registered_cleanup)
1163 {
1165 registered_cleanup = true;
1166 }
1167
1169
1171 ereport(ERROR,
1173 errmsg("cannot setup replication origin when one is already setup")));
1174
1175 /* Lock exclusively, as we may have to create a new table entry. */
1177
1178 /*
1179 * Search for either an existing slot for the origin, or a free one we can
1180 * use.
1181 */
1182 for (i = 0; i < max_active_replication_origins; i++)
1183 {
1185
1186 /* remember where to insert if necessary */
1187 if (curstate->roident == InvalidReplOriginId &&
1188 free_slot == -1)
1189 {
1190 free_slot = i;
1191 continue;
1192 }
1193
1194 /* not our slot */
1195 if (curstate->roident != node)
1196 continue;
1197
1198 if (acquired_by == 0)
1199 {
1200 /* With acquired_by == 0, we need the origin to be free */
1201 if (curstate->acquired_by != 0)
1202 {
1203 ereport(ERROR,
1205 errmsg("replication origin with ID %d is already active for PID %d",
1206 curstate->roident, curstate->acquired_by)));
1207 }
1208 else if (curstate->refcount > 0)
1209 {
1210 /*
1211 * The origin is in use, but PID is not recorded. This can
1212 * happen if the process that originally acquired the origin
1213 * exited without releasing it. To ensure correctness, other
1214 * processes cannot acquire the origin until all processes
1215 * currently using it have released it.
1216 */
1217 ereport(ERROR,
1219 errmsg("replication origin with ID %d is already active in another process",
1220 curstate->roident)));
1221 }
1222 }
1223 else
1224 {
1225 /*
1226 * With acquired_by != 0, we need the origin to be active by the
1227 * given PID
1228 */
1229 if (curstate->acquired_by != acquired_by)
1230 ereport(ERROR,
1232 errmsg("replication origin with ID %d is not active for PID %d",
1233 curstate->roident, acquired_by)));
1234
1235 /*
1236 * Here, it is okay to have refcount > 0 as more than one process
1237 * can safely re-use the origin.
1238 */
1239 }
1240
1241 /* ok, found slot */
1243 break;
1244 }
1245
1247 {
1248 if (acquired_by != 0)
1249 ereport(ERROR,
1251 errmsg("cannot use PID %d for inactive replication origin with ID %d",
1252 acquired_by, node)));
1253
1254 /* initialize new slot */
1255 if (free_slot == -1)
1256 ereport(ERROR,
1258 errmsg("could not find free replication state slot for replication origin with ID %d",
1259 node),
1260 errhint("Increase \"max_active_replication_origins\" and try again.")));
1261
1266 }
1267
1268
1270
1271 if (acquired_by == 0)
1272 {
1275 }
1276 else
1277 {
1278 /*
1279 * Sanity check: the origin must already be acquired by the process
1280 * passed as input, and at least one process must be using it.
1281 */
1284 }
1285
1287
1289
1290 /* probably this one is pointless */
1292}
1293
1294/*
1295 * Reset replay state previously setup in this session.
1296 *
1297 * This function may only be called if an origin was setup with
1298 * replorigin_session_setup().
1299 */
1300void
1302{
1304
1306 ereport(ERROR,
1308 errmsg("no replication origin is configured")));
1309
1310 /*
1311 * Restrict explicit resetting of the replication origin if it was first
1312 * acquired by this process and others are still using it. While the
1313 * system handles this safely (as happens if the first session exits
1314 * without calling reset), it is best to avoid doing so.
1315 */
1318 ereport(ERROR,
1320 errmsg("cannot reset replication origin with ID %d because it is still in use by other processes",
1322 errdetail("This session is the first process for this replication origin, and other processes are currently sharing it."),
1323 errhint("Reset the replication origin in all other processes before retrying.")));
1324
1326}
1327
1328/*
1329 * Do the same work replorigin_advance() does, just on the session's
1330 * configured origin.
1331 *
1332 * This is noticeably cheaper than using replorigin_advance().
1333 */
1334void
1347
1348/*
1349 * Ask the machinery about the point up to which we successfully replayed
1350 * changes from an already setup replication origin.
1351 */
1354{
1355 XLogRecPtr remote_lsn;
1356 XLogRecPtr local_lsn;
1357
1359
1364
1365 if (flush && XLogRecPtrIsValid(local_lsn))
1366 XLogFlush(local_lsn);
1367
1368 return remote_lsn;
1369}
1370
1371/*
1372 * Clear the per-transaction replication origin state.
1373 *
1374 * replorigin_session_origin is also cleared if clear_origin is set.
1375 */
1376void
1384
1385
1386/* ---------------------------------------------------------------------------
1387 * SQL functions for working with replication origin.
1388 *
1389 * These mostly should be fairly short wrappers around more generic functions.
1390 * ---------------------------------------------------------------------------
1391 */
1392
1393/*
1394 * Create replication origin for the passed in name, and return the assigned
1395 * oid.
1396 */
1397Datum
1399{
1400 char *name;
1401 ReplOriginId roident;
1402
1403 replorigin_check_prerequisites(false, false);
1404
1406
1407 /*
1408 * Replication origins "any" and "none" are reserved for system options.
1409 * The origins "pg_xxx" are reserved for internal use.
1410 */
1412 ereport(ERROR,
1414 errmsg("replication origin name \"%s\" is reserved",
1415 name),
1416 errdetail("Origin names \"%s\", \"%s\", and names starting with \"pg_\" are reserved.",
1418
1419 /*
1420 * If built with appropriate switch, whine when regression-testing
1421 * conventions for replication origin names are violated.
1422 */
1423#ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
1424 if (strncmp(name, "regress_", 8) != 0)
1425 elog(WARNING, "replication origins created by regression test cases should have names starting with \"regress_\"");
1426#endif
1427
1428 roident = replorigin_create(name);
1429
1430 pfree(name);
1431
1432 PG_RETURN_OID(roident);
1433}
1434
1435/*
1436 * Drop replication origin.
 *
 * The origin is dropped by name with missing_ok = false and nowait = true.
1437 */
1438Datum
1440{
1441 char *name;
1442
 /* check_origins = false, recoveryOK = false */
1443 replorigin_check_prerequisites(false, false);
1444
1446
 /* missing_ok = false, nowait = true */
1447 replorigin_drop_by_name(name, false, true);
1448
1449 pfree(name);
1450
1452}
1453
1454/*
1455 * Return oid of a replication origin.
 *
 * The name is looked up with missing_ok = true; the oid is returned only
 * when the lookup produced a valid id.
1456 */
1457Datum
1459{
1460 char *name;
1461 ReplOriginId roident;
1462
 /* check_origins = false, recoveryOK = false */
1463 replorigin_check_prerequisites(false, false);
1464
 /* missing_ok = true, so the result must be validated below */
1466 roident = replorigin_by_name(name, true);
1467
1468 pfree(name);
1469
1470 if (OidIsValid(roident))
1471 PG_RETURN_OID(roident);
 /* NOTE(review): the fall-through statement is not visible here; presumably it returns NULL - confirm */
1473}
1474
1475/*
1476 * Set up a replication origin for this session.
 *
 * The origin is looked up by name with missing_ok = false.  The second SQL
 * argument (an int32) is handed to replorigin_session_setup() as its
 * acquired_by argument.
1477 */
1478Datum
1480{
1481 char *name;
1482 ReplOriginId origin;
1483 int pid;
1484
 /* check_origins = true, recoveryOK = false */
1485 replorigin_check_prerequisites(true, false);
1486
1488 origin = replorigin_by_name(name, false);
1489 pid = PG_GETARG_INT32(1);
1490 replorigin_session_setup(origin, pid);
1491
1493
1494 pfree(name);
1495
1497}
1498
1499/*
1500 * Reset the origin previously set up in this session.
1501 */
1502Datum
1513
1514/*
1515 * Has a replication origin been set up for this session?
1516 */
1517Datum
1524
1525
1526/*
1527 * Return the replication progress for origin setup in the current session.
1528 *
1529 * If 'flush' is set to true it is ensured that the returned value corresponds
1530 * to a local transaction that has been flushed. This is useful if asynchronous
1531 * commits are used when replaying replicated transactions.
 *
 * An ERROR is reported when no replication origin is configured.
1532 */
1533Datum
1535{
1536 XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1537 bool flush = PG_GETARG_BOOL(0);
1538
 /* check_origins = true, recoveryOK = false */
1539 replorigin_check_prerequisites(true, false);
1540
1542 ereport(ERROR,
1544 errmsg("no replication origin is configured")));
1545
1546 remote_lsn = replorigin_session_get_progress(flush);
1547
 /* an invalid remote LSN is handled separately from the normal return */
1548 if (!XLogRecPtrIsValid(remote_lsn))
1550
1551 PG_RETURN_LSN(remote_lsn);
1552}
1553
1554Datum
1571
/*
 * Clear the per-transaction replication origin state, leaving the session
 * origin in place (replorigin_xact_clear() is called with
 * clear_origin = false).
 */
1572Datum
1574{
 /* check_origins = true, recoveryOK = false */
1575 replorigin_check_prerequisites(true, false);
1576
1577 /* Do not clear the session origin */
1578 replorigin_xact_clear(false);
1579
1581}
1582
1583
/*
 * Advance the replication progress of the origin named by the caller.
 *
 * The origin is looked up by name with missing_ok = false.  The advance is
 * requested with "go backward" and "WAL log" both set to true.
 */
1584Datum
1586{
1589 ReplOriginId node;
1590
 /* check_origins = true, recoveryOK = false */
1591 replorigin_check_prerequisites(true, false);
1592
1593 /* lock to prevent the replication origin from vanishing */
1595
1596 node = replorigin_by_name(text_to_cstring(name), false);
1597
1598 /*
1599 * Can't sensibly pass a local commit to be flushed at checkpoint - this
1600 * xact hasn't committed yet. This is why this function should be used to
1601 * set up the initial replication state, but not for replay.
1602 */
1604 true /* go backward */ , true /* WAL log */ );
1605
1607
1609}
1610
1611
1612/*
1613 * Return the replication progress for an individual replication origin.
1614 *
1615 * If 'flush' is set to true it is ensured that the returned value corresponds
1616 * to a local transaction that has been flushed. This is useful if asynchronous
1617 * commits are used when replaying replicated transactions.
 *
 * The origin is looked up by name with missing_ok = false.
1618 */
1619Datum
1621{
1622 char *name;
1623 bool flush;
1624 ReplOriginId roident;
1625 XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1626
1628
1630 flush = PG_GETARG_BOOL(1);
1631
1632 roident = replorigin_by_name(name, false);
 /* the lookup was done with missing_ok = false, so a valid id is expected */
1633 Assert(OidIsValid(roident));
1634
1635 remote_lsn = replorigin_get_progress(roident, flush);
1636
 /* an invalid remote LSN is handled separately from the normal return */
1637 if (!XLogRecPtrIsValid(remote_lsn))
1639
1640 PG_RETURN_LSN(remote_lsn);
1641}
1642
1643
/*
 * Set-returning function that emits one row per in-use replication state
 * slot into a materialized tuplestore.
 *
 * Each row has REPLICATION_ORIGIN_PROGRESS_COLS (4) columns: the origin id,
 * a column filled from the replorigin_by_oid() lookup (left NULL when the
 * origin is not found), the remote LSN and the local LSN.
 */
1644Datum
1646{
1647 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1648 int i;
1649#define REPLICATION_ORIGIN_PROGRESS_COLS 4
1650
1651 /* we want to return 0 rows if slot is set to zero */
1652 replorigin_check_prerequisites(false, true);
1653
1654 InitMaterializedSRF(fcinfo, 0);
1655
1656 /* prevent slots from being concurrently dropped */
1658
1659 /*
1660 * Iterate through all possible replication_states, display if they are
1661 * filled. The roident check is done without the per-slot lock; the LSNs
1662 * are read while holding the slot's lock in shared mode.
1663 */
1664 for (i = 0; i < max_active_replication_origins; i++)
1665 {
1669 char *roname;
1670
1672
1673 /* unused slot, nothing to display */
1674 if (state->roident == InvalidReplOriginId)
1675 continue;
1676
 /* start with every column NULL; each is un-nulled as it is filled in */
1677 memset(values, 0, sizeof(values));
1678 memset(nulls, 1, sizeof(nulls));
1679
1680 values[0] = ObjectIdGetDatum(state->roident);
1681 nulls[0] = false;
1682
1683 /*
1684 * We're not preventing the origin from being dropped concurrently, so
1685 * silently accept that it might be gone.
1686 */
1687 if (replorigin_by_oid(state->roident, true,
1688 &roname))
1689 {
1691 nulls[1] = false;
1692 }
1693
1694 LWLockAcquire(&state->lock, LW_SHARED);
1695
1696 values[2] = LSNGetDatum(state->remote_lsn);
1697 nulls[2] = false;
1698
1699 values[3] = LSNGetDatum(state->local_lsn);
1700 nulls[3] = false;
1701
1702 LWLockRelease(&state->lock);
1703
1704 tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
1705 values, nulls);
1706 }
1707
1709
1710#undef REPLICATION_ORIGIN_PROGRESS_COLS
1711
1712 return (Datum) 0;
1713}
static Datum values[MAXATTR]
Definition bootstrap.c:190
#define CStringGetTextDatum(s)
Definition builtins.h:98
uint8_t uint8
Definition c.h:622
#define Assert(condition)
Definition c.h:943
#define PG_BINARY
Definition c.h:1374
#define FLEXIBLE_ARRAY_MEMBER
Definition c.h:558
uint32_t uint32
Definition c.h:624
#define PG_UINT16_MAX
Definition c.h:671
#define OidIsValid(objectId)
Definition c.h:858
size_t Size
Definition c.h:689
bool IsReservedName(const char *name)
Definition catalog.c:278
memcpy(sums, checksumBaseOffsets, sizeof(checksumBaseOffsets))
bool ConditionVariableCancelSleep(void)
void ConditionVariableBroadcast(ConditionVariable *cv)
void ConditionVariableInit(ConditionVariable *cv)
void ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)
Datum arg
Definition elog.c:1322
int errcode_for_file_access(void)
Definition elog.c:897
int errcode(int sqlerrcode)
Definition elog.c:874
#define LOG
Definition elog.h:32
int errhint(const char *fmt,...) pg_attribute_printf(1
int errdetail(const char *fmt,...) pg_attribute_printf(1
#define WARNING
Definition elog.h:37
#define DEBUG2
Definition elog.h:30
#define PANIC
Definition elog.h:44
#define ERROR
Definition elog.h:40
#define elog(elevel,...)
Definition elog.h:228
#define ereport(elevel,...)
Definition elog.h:152
int durable_rename(const char *oldfile, const char *newfile, int elevel)
Definition fd.c:783
int CloseTransientFile(int fd)
Definition fd.c:2855
int OpenTransientFile(const char *fileName, int fileFlags)
Definition fd.c:2678
#define PG_RETURN_VOID()
Definition fmgr.h:350
#define PG_GETARG_TEXT_PP(n)
Definition fmgr.h:310
#define PG_GETARG_DATUM(n)
Definition fmgr.h:268
#define PG_RETURN_NULL()
Definition fmgr.h:346
#define PG_GETARG_INT32(n)
Definition fmgr.h:269
#define PG_GETARG_BOOL(n)
Definition fmgr.h:274
#define PG_RETURN_OID(x)
Definition fmgr.h:361
#define PG_FUNCTION_ARGS
Definition fmgr.h:193
#define PG_RETURN_BOOL(x)
Definition fmgr.h:360
void InitMaterializedSRF(FunctionCallInfo fcinfo, uint32 flags)
Definition funcapi.c:76
void systable_endscan(SysScanDesc sysscan)
Definition genam.c:612
HeapTuple systable_getnext(SysScanDesc sysscan)
Definition genam.c:523
SysScanDesc systable_beginscan(Relation heapRelation, Oid indexId, bool indexOK, Snapshot snapshot, int nkeys, ScanKey key)
Definition genam.c:388
int MyProcPid
Definition globals.c:49
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
Definition heaptuple.c:1025
void heap_freetuple(HeapTuple htup)
Definition heaptuple.c:1372
#define HeapTupleIsValid(tuple)
Definition htup.h:78
static void * GETSTRUCT(const HeapTupleData *tuple)
#define ident
void CatalogTupleInsert(Relation heapRel, HeapTuple tup)
Definition indexing.c:233
void CatalogTupleDelete(Relation heapRel, const ItemPointerData *tid)
Definition indexing.c:365
#define write(a, b, c)
Definition win32.h:14
#define read(a, b, c)
Definition win32.h:13
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition ipc.c:372
int i
Definition isn.c:77
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition lmgr.c:1088
void UnlockRelationOid(Oid relid, LOCKMODE lockmode)
Definition lmgr.c:229
void LockRelationOid(Oid relid, LOCKMODE lockmode)
Definition lmgr.c:107
void UnlockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition lmgr.c:1148
#define NoLock
Definition lockdefs.h:34
#define AccessExclusiveLock
Definition lockdefs.h:43
#define ExclusiveLock
Definition lockdefs.h:42
#define RowExclusiveLock
Definition lockdefs.h:38
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1150
void LWLockRelease(LWLock *lock)
Definition lwlock.c:1767
void LWLockInitialize(LWLock *lock, int tranche_id)
Definition lwlock.c:670
@ LW_SHARED
Definition lwlock.h:105
@ LW_EXCLUSIVE
Definition lwlock.h:104
void pfree(void *pointer)
Definition mcxt.c:1616
#define CHECK_FOR_INTERRUPTS()
Definition miscadmin.h:125
static char * errmsg
ReplOriginId replorigin_create(const char *roname)
Definition origin.c:274
static ReplicationStateCtl * replication_states_ctl
Definition origin.c:193
static void ReplicationOriginShmemInit(void *arg)
Definition origin.c:571
ReplOriginXactState replorigin_xact_state
Definition origin.c:168
static void ReplicationOriginShmemRequest(void *arg)
Definition origin.c:554
ReplOriginId replorigin_by_name(const char *roname, bool missing_ok)
Definition origin.c:243
void replorigin_advance(ReplOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)
Definition origin.c:928
Datum pg_replication_origin_progress(PG_FUNCTION_ARGS)
Definition origin.c:1620
void replorigin_session_reset(void)
Definition origin.c:1301
bool replorigin_by_oid(ReplOriginId roident, bool missing_ok, char **roname)
Definition origin.c:513
static bool IsReservedOriginName(const char *name)
Definition origin.c:226
void replorigin_session_advance(XLogRecPtr remote_commit, XLogRecPtr local_commit)
Definition origin.c:1335
int max_active_replication_origins
Definition origin.c:106
const ShmemCallbacks ReplicationOriginShmemCallbacks
Definition origin.c:184
Datum pg_replication_origin_advance(PG_FUNCTION_ARGS)
Definition origin.c:1585
static void replorigin_state_clear(ReplOriginId roident, bool nowait)
Definition origin.c:386
#define PG_REPLORIGIN_CHECKPOINT_TMPFILE
Definition origin.c:103
Datum pg_replication_origin_session_progress(PG_FUNCTION_ARGS)
Definition origin.c:1534
XLogRecPtr replorigin_get_progress(ReplOriginId node, bool flush)
Definition origin.c:1057
static ReplicationState * replication_states
Definition origin.c:178
#define PG_REPLORIGIN_CHECKPOINT_FILENAME
Definition origin.c:102
Datum pg_replication_origin_session_reset(PG_FUNCTION_ARGS)
Definition origin.c:1503
Datum pg_replication_origin_xact_setup(PG_FUNCTION_ARGS)
Definition origin.c:1555
Datum pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS)
Definition origin.c:1518
Datum pg_replication_origin_oid(PG_FUNCTION_ARGS)
Definition origin.c:1458
Datum pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
Definition origin.c:1479
static void ReplicationOriginExitCleanup(int code, Datum arg)
Definition origin.c:1129
void StartupReplicationOrigin(void)
Definition origin.c:740
void replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
Definition origin.c:459
void CheckPointReplicationOrigin(void)
Definition origin.c:614
static void replorigin_check_prerequisites(bool check_origins, bool recoveryOK)
Definition origin.c:207
static ReplicationState * session_replication_state
Definition origin.c:201
Datum pg_replication_origin_drop(PG_FUNCTION_ARGS)
Definition origin.c:1439
#define REPLICATION_ORIGIN_PROGRESS_COLS
static void ReplicationOriginShmemAttach(void *arg)
Definition origin.c:589
XLogRecPtr replorigin_session_get_progress(bool flush)
Definition origin.c:1353
static void replorigin_session_reset_internal(void)
Definition origin.c:1095
void replorigin_xact_clear(bool clear_origin)
Definition origin.c:1377
Datum pg_show_replication_origin_status(PG_FUNCTION_ARGS)
Definition origin.c:1645
#define REPLICATION_STATE_MAGIC
Definition origin.c:204
Datum pg_replication_origin_create(PG_FUNCTION_ARGS)
Definition origin.c:1398
void replorigin_session_setup(ReplOriginId node, int acquired_by)
Definition origin.c:1156
Datum pg_replication_origin_xact_reset(PG_FUNCTION_ARGS)
Definition origin.c:1573
void replorigin_redo(XLogReaderState *record)
Definition origin.c:867
#define DoNotReplicateId
Definition origin.h:34
#define XLOG_REPLORIGIN_DROP
Definition origin.h:31
#define InvalidReplOriginId
Definition origin.h:33
#define MAX_RONAME_LEN
Definition origin.h:41
#define XLOG_REPLORIGIN_SET
Definition origin.h:30
#define ERRCODE_DATA_CORRUPTED
uint32 pg_crc32c
Definition pg_crc32c.h:38
#define COMP_CRC32C(crc, data, len)
Definition pg_crc32c.h:173
#define INIT_CRC32C(crc)
Definition pg_crc32c.h:41
#define FIN_CRC32C(crc)
Definition pg_crc32c.h:178
return crc
#define PG_GETARG_LSN(n)
Definition pg_lsn.h:36
static Datum LSNGetDatum(XLogRecPtr X)
Definition pg_lsn.h:31
#define PG_RETURN_LSN(x)
Definition pg_lsn.h:37
END_CATALOG_STRUCT typedef FormData_pg_replication_origin * Form_pg_replication_origin
int pg_strcasecmp(const char *s1, const char *s2)
static Datum ObjectIdGetDatum(Oid X)
Definition postgres.h:252
uint64_t Datum
Definition postgres.h:70
static Pointer DatumGetPointer(Datum X)
Definition postgres.h:332
#define InvalidOid
unsigned int Oid
static int fd(const char *x, int i)
static int fb(int x)
#define RelationGetDescr(relation)
Definition rel.h:542
void ScanKeyInit(ScanKey entry, AttrNumber attributeNumber, StrategyNumber strategy, RegProcedure procedure, Datum argument)
Definition scankey.c:76
Size add_size(Size s1, Size s2)
Definition shmem.c:1048
Size mul_size(Size s1, Size s2)
Definition shmem.c:1063
#define ShmemRequestStruct(...)
Definition shmem.h:176
#define InitDirtySnapshot(snapshotdata)
Definition snapmgr.h:42
#define BTEqualStrategyNumber
Definition stratnum.h:31
ItemPointerData t_self
Definition htup.h:65
Form_pg_class rd_rel
Definition rel.h:111
ReplOriginId origin
Definition origin.h:45
XLogRecPtr origin_lsn
Definition origin.h:46
TimestampTz origin_timestamp
Definition origin.h:47
ReplicationState states[FLEXIBLE_ARRAY_MEMBER]
Definition origin.c:164
ReplOriginId roident
Definition origin.c:154
XLogRecPtr remote_lsn
Definition origin.c:155
XLogRecPtr remote_lsn
Definition origin.c:121
XLogRecPtr local_lsn
Definition origin.c:128
ConditionVariable origin_cv
Definition origin.c:141
ReplOriginId roident
Definition origin.c:116
ShmemRequestCallback request_fn
Definition shmem.h:133
XLogRecPtr EndRecPtr
Definition xlogreader.h:206
Definition c.h:776
ReplOriginId node_id
Definition origin.h:27
XLogRecPtr remote_lsn
Definition origin.h:20
void ReleaseSysCache(HeapTuple tuple)
Definition syscache.c:265
HeapTuple SearchSysCache1(SysCacheIdentifier cacheId, Datum key1)
Definition syscache.c:221
void table_close(Relation relation, LOCKMODE lockmode)
Definition table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition table.c:40
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, const Datum *values, const bool *isnull)
Definition tuplestore.c:785
#define PG_GETARG_TIMESTAMPTZ(n)
Definition timestamp.h:64
char * text_to_cstring(const text *t)
Definition varlena.c:217
const char * name
bool IsTransactionState(void)
Definition xact.c:389
void CommandCounterIncrement(void)
Definition xact.c:1130
bool RecoveryInProgress(void)
Definition xlog.c:6830
void XLogFlush(XLogRecPtr record)
Definition xlog.c:2801
#define XLogRecPtrIsValid(r)
Definition xlogdefs.h:29
#define LSN_FORMAT_ARGS(lsn)
Definition xlogdefs.h:47
uint16 ReplOriginId
Definition xlogdefs.h:69
uint64 XLogRecPtr
Definition xlogdefs.h:21
#define InvalidXLogRecPtr
Definition xlogdefs.h:28
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition xloginsert.c:482
void XLogRegisterData(const void *data, uint32 len)
Definition xloginsert.c:372
void XLogBeginInsert(void)
Definition xloginsert.c:153
#define XLogRecGetInfo(decoder)
Definition xlogreader.h:410
#define XLogRecGetData(decoder)
Definition xlogreader.h:415