PostgreSQL Source Code git master
Loading...
Searching...
No Matches
origin.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * origin.c
4 * Logical replication progress tracking support.
5 *
6 * Copyright (c) 2013-2026, PostgreSQL Global Development Group
7 *
8 * IDENTIFICATION
9 * src/backend/replication/logical/origin.c
10 *
11 * NOTES
12 *
13 * This file provides the following:
14 * * An infrastructure to name nodes in a replication setup
15 * * A facility to efficiently store and persist replication progress in an
16 * efficient and durable manner.
17 *
18 * Replication origin consists of a descriptive, user defined, external
19 * name and a short, thus space efficient, internal 2 byte one. This split
20 * exists because replication origin have to be stored in WAL and shared
21 * memory and long descriptors would be inefficient. For now only use 2 bytes
22 * for the internal id of a replication origin as it seems unlikely that there
23 * soon will be more than 65k nodes in one replication setup; and using only
24 * two bytes allow us to be more space efficient.
25 *
26 * Replication progress is tracked in a shared memory table
27 * (ReplicationState) that's dumped to disk every checkpoint. Entries
28 * ('slots') in this table are identified by the internal id. That's the case
29 * because it allows to increase replication progress during crash
30 * recovery. To allow doing so we store the original LSN (from the originating
31 * system) of a transaction in the commit record. That allows to recover the
32 * precise replayed state after crash recovery; without requiring synchronous
33 * commits. Allowing logical replication to use asynchronous commit is
34 * generally good for performance, but especially important as it allows a
35 * single threaded replay process to keep up with a source that has multiple
36 * backends generating changes concurrently. For efficiency and simplicity
37 * reasons a backend can setup one replication origin that's from then used as
38 * the source of changes produced by the backend, until reset again.
39 *
40 * This infrastructure is intended to be used in cooperation with logical
41 * decoding. When replaying from a remote system the configured origin is
42 * provided to output plugins, allowing prevention of replication loops and
43 * other filtering.
44 *
45 * There are several levels of locking at work:
46 *
47 * * To create and drop replication origins an exclusive lock on
48 * pg_replication_slot is required for the duration. That allows us to
49 * safely and conflict free assign new origins using a dirty snapshot.
50 *
51 * * When creating an in-memory replication progress slot the ReplicationOrigin
52 * LWLock has to be held exclusively; when iterating over the replication
53 * progress a shared lock has to be held, the same when advancing the
54 * replication progress of an individual backend that has not setup as the
55 * session's replication origin.
56 *
57 * * When manipulating or looking at the remote_lsn and local_lsn fields of a
58 * replication progress slot that slot's lwlock has to be held. That's
59 * primarily because we do not assume 8 byte writes (the LSN) is atomic on
60 * all our platforms, but it also simplifies memory ordering concerns
61 * between the remote and local lsn. We use a lwlock instead of a spinlock
62 * so it's less harmful to hold the lock over a WAL write
63 * (cf. AdvanceReplicationProgress).
64 *
65 * ---------------------------------------------------------------------------
66 */
67
68#include "postgres.h"
69
70#include <unistd.h>
71#include <sys/stat.h>
72
73#include "access/genam.h"
74#include "access/htup_details.h"
75#include "access/table.h"
76#include "access/xact.h"
77#include "access/xloginsert.h"
78#include "catalog/catalog.h"
79#include "catalog/indexing.h"
81#include "funcapi.h"
82#include "miscadmin.h"
83#include "nodes/execnodes.h"
84#include "pgstat.h"
85#include "replication/origin.h"
86#include "replication/slot.h"
88#include "storage/fd.h"
89#include "storage/ipc.h"
90#include "storage/lmgr.h"
91#include "utils/builtins.h"
92#include "utils/fmgroids.h"
93#include "utils/guc.h"
94#include "utils/pg_lsn.h"
95#include "utils/rel.h"
96#include "utils/snapmgr.h"
97#include "utils/syscache.h"
98
99/* paths for replication origin checkpoint files */
100#define PG_REPLORIGIN_CHECKPOINT_FILENAME PG_LOGICAL_DIR "/replorigin_checkpoint"
101#define PG_REPLORIGIN_CHECKPOINT_TMPFILE PG_REPLORIGIN_CHECKPOINT_FILENAME ".tmp"
102
103/* GUC variables */
105
106/*
107 * Replay progress of a single remote node.
108 */
109typedef struct ReplicationState
110{
111 /*
112 * Local identifier for the remote node.
113 */
115
116 /*
117 * Location of the latest commit from the remote side.
118 */
120
121 /*
122 * Remember the local lsn of the commit record so we can XLogFlush() to it
123 * during a checkpoint so we know the commit record actually is safe on
124 * disk.
125 */
127
128 /*
129 * PID of backend that's acquired slot, or 0 if none.
130 */
132
133 /* Count of processes that are currently using this origin. */
135
136 /*
137 * Condition variable that's signaled when acquired_by changes.
138 */
140
141 /*
142 * Lock protecting remote_lsn and local_lsn.
143 */
146
147/*
148 * On disk version of ReplicationState.
149 */
155
156
158{
159 /* Tranche to use for per-origin LWLocks */
161 /* Array of length max_active_replication_origins */
164
165/* Global variable for per-transaction replication origin state */
167 .origin = InvalidReplOriginId, /* assumed identity */
168 .origin_lsn = InvalidXLogRecPtr,
169 .origin_timestamp = 0
170};
171
172/*
173 * Base address into a shared memory array of replication states of size
174 * max_active_replication_origins.
175 */
177
178/*
179 * Actual shared memory block (replication_states[] is now part of this).
180 */
182
183/*
184 * We keep a pointer to this backend's ReplicationState to avoid having to
185 * search the replication_states array in replorigin_session_advance for each
186 * remote commit. (Ownership of a backend's own entry can only be changed by
187 * that backend.)
188 */
190
191/* Magic for on disk files. */
192#define REPLICATION_STATE_MAGIC ((uint32) 0x1257DADE)
193
194static void
196{
200 errmsg("cannot query or manipulate replication origin when \"max_active_replication_origins\" is 0")));
201
205 errmsg("cannot manipulate replication origins during recovery")));
206}
207
208
209/*
210 * IsReservedOriginName
211 * True iff name is either "none" or "any".
212 */
213static bool
215{
216 return ((pg_strcasecmp(name, LOGICALREP_ORIGIN_NONE) == 0) ||
218}
219
220/* ---------------------------------------------------------------------------
221 * Functions for working with replication origins themselves.
222 * ---------------------------------------------------------------------------
223 */
224
225/*
226 * Check for a persistent replication origin identified by name.
227 *
228 * Returns InvalidOid if the node isn't known yet and missing_ok is true.
229 */
231replorigin_by_name(const char *roname, bool missing_ok)
232{
234 Oid roident = InvalidOid;
235 HeapTuple tuple;
237
239
241 if (HeapTupleIsValid(tuple))
242 {
244 roident = ident->roident;
245 ReleaseSysCache(tuple);
246 }
247 else if (!missing_ok)
250 errmsg("replication origin \"%s\" does not exist",
251 roname)));
252
253 return roident;
254}
255
256/*
257 * Create a replication origin.
258 *
259 * Needs to be called in a transaction.
260 */
263{
264 Oid roident;
265 HeapTuple tuple = NULL;
266 Relation rel;
269 SysScanDesc scan;
270 ScanKeyData key;
271
272 /*
273 * To avoid needing a TOAST table for pg_replication_origin, we limit
274 * replication origin names to 512 bytes. This should be more than enough
275 * for all practical use.
276 */
280 errmsg("replication origin name is too long"),
281 errdetail("Replication origin names must be no longer than %d bytes.",
283
285
287
288 /*
289 * We need the numeric replication origin to be 16bit wide, so we cannot
290 * rely on the normal oid allocation. Instead we simply scan
291 * pg_replication_origin for the first unused id. That's not particularly
292 * efficient, but this should be a fairly infrequent operation - we can
293 * easily spend a bit more code on this when it turns out it needs to be
294 * faster.
295 *
296 * We handle concurrency by taking an exclusive lock (allowing reads!)
297 * over the table for the duration of the search. Because we use a "dirty
298 * snapshot" we can read rows that other in-progress sessions have
299 * written, even though they would be invisible with normal snapshots. Due
300 * to the exclusive lock there's no danger that new rows can appear while
301 * we're checking.
302 */
304
306
307 /*
308 * We want to be able to access pg_replication_origin without setting up a
309 * snapshot. To make that safe, it needs to not have a TOAST table, since
310 * TOASTed data cannot be fetched without a snapshot. As of this writing,
311 * its only varlena column is roname, which we limit to 512 bytes to avoid
312 * needing out-of-line storage. If you add a TOAST table to this catalog,
313 * be sure to set up a snapshot everywhere it might be needed. For more
314 * information, see https://postgr.es/m/ZvMSUPOqUU-VNADN%40nathan.
315 */
316 Assert(!OidIsValid(rel->rd_rel->reltoastrelid));
317
318 for (roident = InvalidOid + 1; roident < PG_UINT16_MAX; roident++)
319 {
320 bool nulls[Natts_pg_replication_origin];
322 bool collides;
323
325
326 ScanKeyInit(&key,
329 ObjectIdGetDatum(roident));
330
332 true /* indexOK */ ,
334 1, &key);
335
337
338 systable_endscan(scan);
339
340 if (!collides)
341 {
342 /*
343 * Ok, found an unused roident, insert the new row and do a CCI,
344 * so our callers can look it up if they want to.
345 */
346 memset(&nulls, 0, sizeof(nulls));
347
350
351 tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
352 CatalogTupleInsert(rel, tuple);
354 break;
355 }
356 }
357
358 /* now release lock again, */
360
361 if (tuple == NULL)
364 errmsg("could not find free replication origin ID")));
365
366 heap_freetuple(tuple);
367 return roident;
368}
369
370/*
371 * Helper function to drop a replication origin.
372 */
373static void
375{
376 int i;
377
378 /*
379 * Clean up the slot state info, if there is any matching slot.
380 */
381restart:
383
384 for (i = 0; i < max_active_replication_origins; i++)
385 {
387
388 if (state->roident == roident)
389 {
390 /* found our slot, is it busy? */
391 if (state->refcount > 0)
392 {
394
395 if (nowait)
398 (state->acquired_by != 0)
399 ? errmsg("could not drop replication origin with ID %d, in use by PID %d",
400 state->roident,
401 state->acquired_by)
402 : errmsg("could not drop replication origin with ID %d, in use by another process",
403 state->roident)));
404
405 /*
406 * We must wait and then retry. Since we don't know which CV
407 * to wait on until here, we can't readily use
408 * ConditionVariablePrepareToSleep (calling it here would be
409 * wrong, since we could miss the signal if we did so); just
410 * use ConditionVariableSleep directly.
411 */
412 cv = &state->origin_cv;
413
415
417 goto restart;
418 }
419
420 /* first make a WAL log entry */
421 {
423
424 xlrec.node_id = roident;
426 XLogRegisterData(&xlrec, sizeof(xlrec));
428 }
429
430 /* then clear the in-memory slot */
431 state->roident = InvalidReplOriginId;
432 state->remote_lsn = InvalidXLogRecPtr;
433 state->local_lsn = InvalidXLogRecPtr;
434 break;
435 }
436 }
439}
440
441/*
442 * Drop replication origin (by name).
443 *
444 * Needs to be called in a transaction.
445 */
446void
447replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
448{
449 ReplOriginId roident;
450 Relation rel;
451 HeapTuple tuple;
452
454
456
457 roident = replorigin_by_name(name, missing_ok);
458
459 /* Lock the origin to prevent concurrent drops. */
462
464 if (!HeapTupleIsValid(tuple))
465 {
466 if (!missing_ok)
467 elog(ERROR, "cache lookup failed for replication origin with ID %d",
468 roident);
469
470 /*
471 * We don't need to retain the locks if the origin is already dropped.
472 */
476 return;
477 }
478
479 replorigin_state_clear(roident, nowait);
480
481 /*
482 * Now, we can delete the catalog entry.
483 */
484 CatalogTupleDelete(rel, &tuple->t_self);
485 ReleaseSysCache(tuple);
486
488
489 /* We keep the lock on pg_replication_origin until commit */
490 table_close(rel, NoLock);
491}
492
493/*
494 * Lookup replication origin via its oid and return the name.
495 *
496 * The external name is palloc'd in the calling context.
497 *
498 * Returns true if the origin is known, false otherwise.
499 */
500bool
501replorigin_by_oid(ReplOriginId roident, bool missing_ok, char **roname)
502{
503 HeapTuple tuple;
505
506 Assert(OidIsValid((Oid) roident));
507 Assert(roident != InvalidReplOriginId);
508 Assert(roident != DoNotReplicateId);
509
511 ObjectIdGetDatum((Oid) roident));
512
513 if (HeapTupleIsValid(tuple))
514 {
516 *roname = text_to_cstring(&ric->roname);
517 ReleaseSysCache(tuple);
518
519 return true;
520 }
521 else
522 {
523 *roname = NULL;
524
525 if (!missing_ok)
528 errmsg("replication origin with ID %d does not exist",
529 roident)));
530
531 return false;
532 }
533}
534
535
536/* ---------------------------------------------------------------------------
537 * Functions for handling replication progress.
538 * ---------------------------------------------------------------------------
539 */
540
541Size
543{
544 Size size = 0;
545
547 return size;
548
549 size = add_size(size, offsetof(ReplicationStateCtl, states));
550
551 size = add_size(size,
553 return size;
554}
555
556void
558{
559 bool found;
560
562 return;
563
565 ShmemInitStruct("ReplicationOriginState",
567 &found);
569
570 if (!found)
571 {
572 int i;
573
575
577
578 for (i = 0; i < max_active_replication_origins; i++)
579 {
583 }
584 }
585}
586
587/* ---------------------------------------------------------------------------
588 * Perform a checkpoint of each replication origin's progress with respect to
589 * the replayed remote_lsn. Make sure that all transactions we refer to in the
590 * checkpoint (local_lsn) are actually on-disk. This might not yet be the case
591 * if the transactions were originally committed asynchronously.
592 *
593 * We store checkpoints in the following format:
594 * +-------+------------------------+------------------+-----+--------+
595 * | MAGIC | ReplicationStateOnDisk | struct Replic... | ... | CRC32C | EOF
596 * +-------+------------------------+------------------+-----+--------+
597 *
598 * So its just the magic, followed by the statically sized
599 * ReplicationStateOnDisk structs. Note that the maximum number of
600 * ReplicationState is determined by max_active_replication_origins.
601 * ---------------------------------------------------------------------------
602 */
603void
605{
607 const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME;
608 int tmpfd;
609 int i;
612
614 return;
615
617
618 /* make sure no old temp file is remaining */
619 if (unlink(tmppath) < 0 && errno != ENOENT)
622 errmsg("could not remove file \"%s\": %m",
623 tmppath)));
624
625 /*
626 * no other backend can perform this at the same time; only one checkpoint
627 * can happen at a time.
628 */
631 if (tmpfd < 0)
634 errmsg("could not create file \"%s\": %m",
635 tmppath)));
636
637 /* write magic */
638 errno = 0;
639 if ((write(tmpfd, &magic, sizeof(magic))) != sizeof(magic))
640 {
641 /* if write didn't set errno, assume problem is no disk space */
642 if (errno == 0)
643 errno = ENOSPC;
646 errmsg("could not write to file \"%s\": %m",
647 tmppath)));
648 }
649 COMP_CRC32C(crc, &magic, sizeof(magic));
650
651 /* prevent concurrent creations/drops */
653
654 /* write actual data */
655 for (i = 0; i < max_active_replication_origins; i++)
656 {
659 XLogRecPtr local_lsn;
660
661 if (curstate->roident == InvalidReplOriginId)
662 continue;
663
664 /* zero, to avoid uninitialized padding bytes */
665 memset(&disk_state, 0, sizeof(disk_state));
666
668
669 disk_state.roident = curstate->roident;
670
671 disk_state.remote_lsn = curstate->remote_lsn;
672 local_lsn = curstate->local_lsn;
673
674 LWLockRelease(&curstate->lock);
675
676 /* make sure we only write out a commit that's persistent */
677 XLogFlush(local_lsn);
678
679 errno = 0;
680 if ((write(tmpfd, &disk_state, sizeof(disk_state))) !=
681 sizeof(disk_state))
682 {
683 /* if write didn't set errno, assume problem is no disk space */
684 if (errno == 0)
685 errno = ENOSPC;
688 errmsg("could not write to file \"%s\": %m",
689 tmppath)));
690 }
691
693 }
694
696
697 /* write out the CRC */
699 errno = 0;
700 if ((write(tmpfd, &crc, sizeof(crc))) != sizeof(crc))
701 {
702 /* if write didn't set errno, assume problem is no disk space */
703 if (errno == 0)
704 errno = ENOSPC;
707 errmsg("could not write to file \"%s\": %m",
708 tmppath)));
709 }
710
711 if (CloseTransientFile(tmpfd) != 0)
714 errmsg("could not close file \"%s\": %m",
715 tmppath)));
716
717 /* fsync, rename to permanent file, fsync file and directory */
719}
720
721/*
722 * Recover replication replay status from checkpoint data saved earlier by
723 * CheckPointReplicationOrigin.
724 *
725 * This only needs to be called at startup and *not* during every checkpoint
726 * read during recovery (e.g. in HS or PITR from a base backup) afterwards. All
727 * state thereafter can be recovered by looking at commit records.
728 */
729void
731{
732 const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME;
733 int fd;
734 int readBytes;
736 int last_state = 0;
739
740 /* don't want to overwrite already existing state */
741#ifdef USE_ASSERT_CHECKING
742 static bool already_started = false;
743
745 already_started = true;
746#endif
747
749 return;
750
752
753 elog(DEBUG2, "starting up replication origin progress state");
754
756
757 /*
758 * might have had max_active_replication_origins == 0 last run, or we just
759 * brought up a standby.
760 */
761 if (fd < 0 && errno == ENOENT)
762 return;
763 else if (fd < 0)
766 errmsg("could not open file \"%s\": %m",
767 path)));
768
769 /* verify magic, that is written even if nothing was active */
770 readBytes = read(fd, &magic, sizeof(magic));
771 if (readBytes != sizeof(magic))
772 {
773 if (readBytes < 0)
776 errmsg("could not read file \"%s\": %m",
777 path)));
778 else
781 errmsg("could not read file \"%s\": read %d of %zu",
782 path, readBytes, sizeof(magic))));
783 }
784 COMP_CRC32C(crc, &magic, sizeof(magic));
785
786 if (magic != REPLICATION_STATE_MAGIC)
788 (errmsg("replication checkpoint has wrong magic %u instead of %u",
789 magic, REPLICATION_STATE_MAGIC)));
790
791 /* we can skip locking here, no other access is possible */
792
793 /* recover individual states, until there are no more to be found */
794 while (true)
795 {
797
798 readBytes = read(fd, &disk_state, sizeof(disk_state));
799
800 if (readBytes < 0)
801 {
804 errmsg("could not read file \"%s\": %m",
805 path)));
806 }
807
808 /* no further data */
809 if (readBytes == sizeof(crc))
810 {
811 memcpy(&file_crc, &disk_state, sizeof(file_crc));
812 break;
813 }
814
815 if (readBytes != sizeof(disk_state))
816 {
819 errmsg("could not read file \"%s\": read %d of %zu",
820 path, readBytes, sizeof(disk_state))));
821 }
822
824
828 errmsg("could not find free replication state, increase \"max_active_replication_origins\"")));
829
830 /* copy data to shared memory */
833 last_state++;
834
835 ereport(LOG,
836 errmsg("recovered replication state of node %d to %X/%08X",
837 disk_state.roident,
838 LSN_FORMAT_ARGS(disk_state.remote_lsn)));
839 }
840
841 /* now check checksum */
843 if (file_crc != crc)
846 errmsg("replication slot checkpoint has wrong checksum %u, expected %u",
847 crc, file_crc)));
848
849 if (CloseTransientFile(fd) != 0)
852 errmsg("could not close file \"%s\": %m",
853 path)));
854}
855
856void
858{
859 uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
860
861 switch (info)
862 {
864 {
867
868 replorigin_advance(xlrec->node_id,
869 xlrec->remote_lsn, record->EndRecPtr,
870 xlrec->force /* backward */ ,
871 false /* WAL log */ );
872 break;
873 }
875 {
877 int i;
878
880
881 for (i = 0; i < max_active_replication_origins; i++)
882 {
884
885 /* found our slot */
886 if (state->roident == xlrec->node_id)
887 {
888 /* reset entry */
889 state->roident = InvalidReplOriginId;
890 state->remote_lsn = InvalidXLogRecPtr;
891 state->local_lsn = InvalidXLogRecPtr;
892 break;
893 }
894 }
895 break;
896 }
897 default:
898 elog(PANIC, "replorigin_redo: unknown op code %u", info);
899 }
900}
901
902
903/*
904 * Tell the replication origin progress machinery that a commit from 'node'
905 * that originated at the LSN remote_commit on the remote node was replayed
906 * successfully and that we don't need to do so again. In combination with
907 * setting up replorigin_xact_state {.origin_lsn, .origin_timestamp}
908 * that ensures we won't lose knowledge about that after a crash if the
909 * transaction had a persistent effect (think of asynchronous commits).
910 *
911 * local_commit needs to be a local LSN of the commit so that we can make sure
912 * upon a checkpoint that enough WAL has been persisted to disk.
913 *
914 * Needs to be called with a RowExclusiveLock on pg_replication_origin,
915 * unless running in recovery.
916 */
917void
920 bool go_backward, bool wal_log)
921{
922 int i;
925
927
928 /* we don't track DoNotReplicateId */
929 if (node == DoNotReplicateId)
930 return;
931
932 /*
933 * XXX: For the case where this is called by WAL replay, it'd be more
934 * efficient to restore into a backend local hashtable and only dump into
935 * shmem after recovery is finished. Let's wait with implementing that
936 * till it's shown to be a measurable expense
937 */
938
939 /* Lock exclusively, as we may have to create a new table entry. */
941
942 /*
943 * Search for either an existing slot for the origin, or a free one we can
944 * use.
945 */
946 for (i = 0; i < max_active_replication_origins; i++)
947 {
949
950 /* remember where to insert if necessary */
951 if (curstate->roident == InvalidReplOriginId &&
952 free_state == NULL)
953 {
955 continue;
956 }
957
958 /* not our slot */
959 if (curstate->roident != node)
960 {
961 continue;
962 }
963
964 /* ok, found slot */
966
968
969 /* Make sure it's not used by somebody else */
970 if (replication_state->refcount > 0)
971 {
974 (replication_state->acquired_by != 0)
975 ? errmsg("replication origin with ID %d is already active for PID %d",
976 replication_state->roident,
977 replication_state->acquired_by)
978 : errmsg("replication origin with ID %d is already active in another process",
979 replication_state->roident)));
980 }
981
982 break;
983 }
984
988 errmsg("could not find free replication state slot for replication origin with ID %d",
989 node),
990 errhint("Increase \"max_active_replication_origins\" and try again.")));
991
992 if (replication_state == NULL)
993 {
994 /* initialize new slot */
999 replication_state->roident = node;
1000 }
1001
1003
1004 /*
1005 * If somebody "forcefully" sets this slot, WAL log it, so it's durable
1006 * and the standby gets the message. Primarily this will be called during
1007 * WAL replay (of commit records) where no WAL logging is necessary.
1008 */
1009 if (wal_log)
1010 {
1012
1014 xlrec.node_id = node;
1015 xlrec.force = go_backward;
1016
1018 XLogRegisterData(&xlrec, sizeof(xlrec));
1019
1021 }
1022
1023 /*
1024 * Due to - harmless - race conditions during a checkpoint we could see
1025 * values here that are older than the ones we already have in memory. We
1026 * could also see older values for prepared transactions when the prepare
1027 * is sent at a later point of time along with commit prepared and there
1028 * are other transactions commits between prepare and commit prepared. See
1029 * ReorderBufferFinishPrepared. Don't overwrite those.
1030 */
1031 if (go_backward || replication_state->remote_lsn < remote_commit)
1032 replication_state->remote_lsn = remote_commit;
1034 (go_backward || replication_state->local_lsn < local_commit))
1035 replication_state->local_lsn = local_commit;
1037
1038 /*
1039 * Release *after* changing the LSNs, slot isn't acquired and thus could
1040 * otherwise be dropped anytime.
1041 */
1043}
1044
1045
1048{
1049 int i;
1050 XLogRecPtr local_lsn = InvalidXLogRecPtr;
1051 XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1052
1053 /* prevent slots from being concurrently dropped */
1055
1056 for (i = 0; i < max_active_replication_origins; i++)
1057 {
1059
1061
1062 if (state->roident == node)
1063 {
1064 LWLockAcquire(&state->lock, LW_SHARED);
1065
1066 remote_lsn = state->remote_lsn;
1067 local_lsn = state->local_lsn;
1068
1069 LWLockRelease(&state->lock);
1070
1071 break;
1072 }
1073 }
1074
1076
1077 if (flush && XLogRecPtrIsValid(local_lsn))
1078 XLogFlush(local_lsn);
1079
1080 return remote_lsn;
1081}
1082
1083/* Helper function to reset the session replication origin */
1084static void
1086{
1088
1090
1092
1093 /* The origin must be held by at least one process at this point. */
1095
1096 /*
1097 * Reset the PID only if the current session is the first to set up this
1098 * origin. This avoids clearing the first process's PID when any other
1099 * session releases the origin.
1100 */
1103
1105
1108
1110
1112}
1113
1114/*
1115 * Tear down a (possibly) configured session replication origin during process
1116 * exit.
1117 */
1118static void
1126
1127/*
1128 * Setup a replication origin in the shared memory struct if it doesn't
1129 * already exist and cache access to the specific ReplicationSlot so the
1130 * array doesn't have to be searched when calling
1131 * replorigin_session_advance().
1132 *
1133 * Normally only one such cached origin can exist per process so the cached
1134 * value can only be set again after the previous value is torn down with
1135 * replorigin_session_reset(). For this normal case pass acquired_by = 0
1136 * (meaning the slot is not allowed to be already acquired by another process).
1137 *
1138 * However, sometimes multiple processes can safely re-use the same origin slot
1139 * (for example, multiple parallel apply processes can safely use the same
1140 * origin, provided they maintain commit order by allowing only one process to
1141 * commit at a time). For this case the first process must pass acquired_by =
1142 * 0, and then the other processes sharing that same origin can pass
1143 * acquired_by = PID of the first process.
1144 */
1145void
1147{
1148 static bool registered_cleanup;
1149 int i;
1150 int free_slot = -1;
1151
1152 if (!registered_cleanup)
1153 {
1155 registered_cleanup = true;
1156 }
1157
1159
1161 ereport(ERROR,
1163 errmsg("cannot setup replication origin when one is already setup")));
1164
1165 /* Lock exclusively, as we may have to create a new table entry. */
1167
1168 /*
1169 * Search for either an existing slot for the origin, or a free one we can
1170 * use.
1171 */
1172 for (i = 0; i < max_active_replication_origins; i++)
1173 {
1175
1176 /* remember where to insert if necessary */
1177 if (curstate->roident == InvalidReplOriginId &&
1178 free_slot == -1)
1179 {
1180 free_slot = i;
1181 continue;
1182 }
1183
1184 /* not our slot */
1185 if (curstate->roident != node)
1186 continue;
1187
1188 else if (curstate->acquired_by != 0 && acquired_by == 0)
1189 {
1190 ereport(ERROR,
1192 errmsg("replication origin with ID %d is already active for PID %d",
1193 curstate->roident, curstate->acquired_by)));
1194 }
1195
1196 else if (curstate->acquired_by != acquired_by)
1197 {
1198 ereport(ERROR,
1200 errmsg("could not find replication state slot for replication origin with OID %u which was acquired by %d",
1201 node, acquired_by)));
1202 }
1203
1204 /*
1205 * The origin is in use, but PID is not recorded. This can happen if
1206 * the process that originally acquired the origin exited without
1207 * releasing it. To ensure correctness, other processes cannot acquire
1208 * the origin until all processes currently using it have released it.
1209 */
1210 else if (curstate->acquired_by == 0 && curstate->refcount > 0)
1211 ereport(ERROR,
1213 errmsg("replication origin with ID %d is already active in another process",
1214 curstate->roident)));
1215
1216 /* ok, found slot */
1218 break;
1219 }
1220
1221
1223 ereport(ERROR,
1225 errmsg("could not find free replication state slot for replication origin with ID %d",
1226 node),
1227 errhint("Increase \"max_active_replication_origins\" and try again.")));
1228 else if (session_replication_state == NULL)
1229 {
1230 if (acquired_by)
1231 ereport(ERROR,
1233 errmsg("cannot use PID %d for inactive replication origin with ID %d",
1234 acquired_by, node)));
1235
1236 /* initialize new slot */
1241 }
1242
1243
1245
1246 if (acquired_by == 0)
1247 {
1250 }
1251 else
1252 {
1253 /*
1254 * Sanity check: the origin must already be acquired by the process
1255 * passed as input, and at least one process must be using it.
1256 */
1259 }
1260
1262
1264
1265 /* probably this one is pointless */
1267}
1268
1269/*
1270 * Reset replay state previously setup in this session.
1271 *
1272 * This function may only be called if an origin was setup with
1273 * replorigin_session_setup().
1274 */
1275void
1277{
1279
1281 ereport(ERROR,
1283 errmsg("no replication origin is configured")));
1284
1285 /*
1286 * Restrict explicit resetting of the replication origin if it was first
1287 * acquired by this process and others are still using it. While the
1288 * system handles this safely (as happens if the first session exits
1289 * without calling reset), it is best to avoid doing so.
1290 */
1293 ereport(ERROR,
1295 errmsg("cannot reset replication origin with ID %d because it is still in use by other processes",
1297 errdetail("This session is the first process for this replication origin, and other processes are currently sharing it."),
1298 errhint("Reset the replication origin in all other processes before retrying.")));
1299
1301}
1302
1303/*
1304 * Do the same work replorigin_advance() does, just on the session's
1305 * configured origin.
1306 *
1307 * This is noticeably cheaper than using replorigin_advance().
1308 */
1309void
1322
1323/*
1324 * Ask the machinery about the point up to which we successfully replayed
1325 * changes from an already setup replication origin.
1326 */
1329{
1330 XLogRecPtr remote_lsn;
1331 XLogRecPtr local_lsn;
1332
1334
1339
1340 if (flush && XLogRecPtrIsValid(local_lsn))
1341 XLogFlush(local_lsn);
1342
1343 return remote_lsn;
1344}
1345
1346/*
1347 * Clear the per-transaction replication origin state.
1348 *
1349 * replorigin_session_origin is also cleared if clear_origin is set.
1350 */
1351void
1359
1360
1361/* ---------------------------------------------------------------------------
1362 * SQL functions for working with replication origin.
1363 *
1364 * These mostly should be fairly short wrappers around more generic functions.
1365 * ---------------------------------------------------------------------------
1366 */
1367
1368/*
1369 * Create replication origin for the passed in name, and return the assigned
1370 * oid.
1371 */
1372Datum
1374{
1375 char *name;
1376 ReplOriginId roident;
1377
1378 replorigin_check_prerequisites(false, false);
1379
1381
1382 /*
1383 * Replication origins "any and "none" are reserved for system options.
1384 * The origins "pg_xxx" are reserved for internal use.
1385 */
1387 ereport(ERROR,
1389 errmsg("replication origin name \"%s\" is reserved",
1390 name),
1391 errdetail("Origin names \"%s\", \"%s\", and names starting with \"pg_\" are reserved.",
1393
1394 /*
1395 * If built with appropriate switch, whine when regression-testing
1396 * conventions for replication origin names are violated.
1397 */
1398#ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
1399 if (strncmp(name, "regress_", 8) != 0)
1400 elog(WARNING, "replication origins created by regression test cases should have names starting with \"regress_\"");
1401#endif
1402
1403 roident = replorigin_create(name);
1404
1405 pfree(name);
1406
1407 PG_RETURN_OID(roident);
1408}
1409
1410/*
1411 * Drop replication origin.
1412 */
1413Datum
1415{
1416 char *name;
1417
1418 replorigin_check_prerequisites(false, false);
1419
1421
1422 replorigin_drop_by_name(name, false, true);
1423
1424 pfree(name);
1425
1427}
1428
1429/*
1430 * Return oid of a replication origin.
1431 */
1432Datum
1434{
1435 char *name;
1436 ReplOriginId roident;
1437
1438 replorigin_check_prerequisites(false, false);
1439
1441 roident = replorigin_by_name(name, true);
1442
1443 pfree(name);
1444
1445 if (OidIsValid(roident))
1446 PG_RETURN_OID(roident);
1448}
1449
1450/*
1451 * Setup a replication origin for this session.
1452 */
1453Datum
1455{
1456 char *name;
1457 ReplOriginId origin;
1458 int pid;
1459
1460 replorigin_check_prerequisites(true, false);
1461
1463 origin = replorigin_by_name(name, false);
1464 pid = PG_GETARG_INT32(1);
1465 replorigin_session_setup(origin, pid);
1466
1468
1469 pfree(name);
1470
1472}
1473
1474/*
1475 * Reset previously setup origin in this session
1476 */
1477Datum
1488
1489/*
1490 * Has a replication origin been setup for this session.
1491 */
1492Datum
1499
1500
1501/*
1502 * Return the replication progress for origin setup in the current session.
1503 *
1504 * If 'flush' is set to true it is ensured that the returned value corresponds
1505 * to a local transaction that has been flushed. This is useful if asynchronous
1506 * commits are used when replaying replicated transactions.
1507 */
1508Datum
1510{
1511 XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1512 bool flush = PG_GETARG_BOOL(0);
1513
1514 replorigin_check_prerequisites(true, false);
1515
1517 ereport(ERROR,
1519 errmsg("no replication origin is configured")));
1520
1521 remote_lsn = replorigin_session_get_progress(flush);
1522
1523 if (!XLogRecPtrIsValid(remote_lsn))
1525
1526 PG_RETURN_LSN(remote_lsn);
1527}
1528
1529Datum
1546
1547Datum
1549{
1550 replorigin_check_prerequisites(true, false);
1551
1552 /* Do not clear the session origin */
1553 replorigin_xact_clear(false);
1554
1556}
1557
1558
1559Datum
1561{
1564 ReplOriginId node;
1565
1566 replorigin_check_prerequisites(true, false);
1567
1568 /* lock to prevent the replication origin from vanishing */
1570
1571 node = replorigin_by_name(text_to_cstring(name), false);
1572
1573 /*
1574 * Can't sensibly pass a local commit to be flushed at checkpoint - this
1575 * xact hasn't committed yet. This is why this function should be used to
1576 * set up the initial replication state, but not for replay.
1577 */
1579 true /* go backward */ , true /* WAL log */ );
1580
1582
1584}
1585
1586
1587/*
1588 * Return the replication progress for an individual replication origin.
1589 *
1590 * If 'flush' is set to true it is ensured that the returned value corresponds
1591 * to a local transaction that has been flushed. This is useful if asynchronous
1592 * commits are used when replaying replicated transactions.
1593 */
1594Datum
1596{
1597 char *name;
1598 bool flush;
1599 ReplOriginId roident;
1600 XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1601
1603
1605 flush = PG_GETARG_BOOL(1);
1606
1607 roident = replorigin_by_name(name, false);
1608 Assert(OidIsValid(roident));
1609
1610 remote_lsn = replorigin_get_progress(roident, flush);
1611
1612 if (!XLogRecPtrIsValid(remote_lsn))
1614
1615 PG_RETURN_LSN(remote_lsn);
1616}
1617
1618
1619Datum
1621{
1622 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1623 int i;
1624#define REPLICATION_ORIGIN_PROGRESS_COLS 4
1625
1626 /* we want to return 0 rows if slot is set to zero */
1627 replorigin_check_prerequisites(false, true);
1628
1629 InitMaterializedSRF(fcinfo, 0);
1630
1631 /* prevent slots from being concurrently dropped */
1633
1634 /*
1635 * Iterate through all possible replication_states, display if they are
1636 * filled. Note that we do not take any locks, so slightly corrupted/out
1637 * of date values are a possibility.
1638 */
1639 for (i = 0; i < max_active_replication_origins; i++)
1640 {
1644 char *roname;
1645
1647
1648 /* unused slot, nothing to display */
1649 if (state->roident == InvalidReplOriginId)
1650 continue;
1651
1652 memset(values, 0, sizeof(values));
1653 memset(nulls, 1, sizeof(nulls));
1654
1655 values[0] = ObjectIdGetDatum(state->roident);
1656 nulls[0] = false;
1657
1658 /*
1659 * We're not preventing the origin to be dropped concurrently, so
1660 * silently accept that it might be gone.
1661 */
1662 if (replorigin_by_oid(state->roident, true,
1663 &roname))
1664 {
1666 nulls[1] = false;
1667 }
1668
1669 LWLockAcquire(&state->lock, LW_SHARED);
1670
1671 values[2] = LSNGetDatum(state->remote_lsn);
1672 nulls[2] = false;
1673
1674 values[3] = LSNGetDatum(state->local_lsn);
1675 nulls[3] = false;
1676
1677 LWLockRelease(&state->lock);
1678
1679 tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
1680 values, nulls);
1681 }
1682
1684
1685#undef REPLICATION_ORIGIN_PROGRESS_COLS
1686
1687 return (Datum) 0;
1688}
static Datum values[MAXATTR]
Definition bootstrap.c:155
#define CStringGetTextDatum(s)
Definition builtins.h:97
uint8_t uint8
Definition c.h:544
#define Assert(condition)
Definition c.h:873
#define PG_BINARY
Definition c.h:1287
#define FLEXIBLE_ARRAY_MEMBER
Definition c.h:480
uint32_t uint32
Definition c.h:546
#define PG_UINT16_MAX
Definition c.h:601
#define MemSet(start, val, len)
Definition c.h:1013
#define OidIsValid(objectId)
Definition c.h:788
size_t Size
Definition c.h:619
bool IsReservedName(const char *name)
Definition catalog.c:278
bool ConditionVariableCancelSleep(void)
void ConditionVariableBroadcast(ConditionVariable *cv)
void ConditionVariableInit(ConditionVariable *cv)
void ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)
int errcode_for_file_access(void)
Definition elog.c:886
int errdetail(const char *fmt,...)
Definition elog.c:1216
int errhint(const char *fmt,...)
Definition elog.c:1330
int errcode(int sqlerrcode)
Definition elog.c:863
int errmsg(const char *fmt,...)
Definition elog.c:1080
#define LOG
Definition elog.h:31
#define WARNING
Definition elog.h:36
#define DEBUG2
Definition elog.h:29
#define PANIC
Definition elog.h:42
#define ERROR
Definition elog.h:39
#define elog(elevel,...)
Definition elog.h:226
#define ereport(elevel,...)
Definition elog.h:150
int durable_rename(const char *oldfile, const char *newfile, int elevel)
Definition fd.c:782
int CloseTransientFile(int fd)
Definition fd.c:2854
int OpenTransientFile(const char *fileName, int fileFlags)
Definition fd.c:2677
#define PG_RETURN_VOID()
Definition fmgr.h:350
#define PG_GETARG_TEXT_PP(n)
Definition fmgr.h:310
#define PG_GETARG_DATUM(n)
Definition fmgr.h:268
#define PG_RETURN_NULL()
Definition fmgr.h:346
#define PG_GETARG_INT32(n)
Definition fmgr.h:269
#define PG_GETARG_BOOL(n)
Definition fmgr.h:274
#define PG_RETURN_OID(x)
Definition fmgr.h:361
#define PG_FUNCTION_ARGS
Definition fmgr.h:193
#define PG_RETURN_BOOL(x)
Definition fmgr.h:360
void InitMaterializedSRF(FunctionCallInfo fcinfo, bits32 flags)
Definition funcapi.c:76
void systable_endscan(SysScanDesc sysscan)
Definition genam.c:603
HeapTuple systable_getnext(SysScanDesc sysscan)
Definition genam.c:514
SysScanDesc systable_beginscan(Relation heapRelation, Oid indexId, bool indexOK, Snapshot snapshot, int nkeys, ScanKey key)
Definition genam.c:388
int MyProcPid
Definition globals.c:47
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
Definition heaptuple.c:1117
void heap_freetuple(HeapTuple htup)
Definition heaptuple.c:1435
#define HeapTupleIsValid(tuple)
Definition htup.h:78
static void * GETSTRUCT(const HeapTupleData *tuple)
#define ident
void CatalogTupleInsert(Relation heapRel, HeapTuple tup)
Definition indexing.c:233
void CatalogTupleDelete(Relation heapRel, const ItemPointerData *tid)
Definition indexing.c:365
#define write(a, b, c)
Definition win32.h:14
#define read(a, b, c)
Definition win32.h:13
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition ipc.c:372
int i
Definition isn.c:77
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition lmgr.c:1088
void UnlockRelationOid(Oid relid, LOCKMODE lockmode)
Definition lmgr.c:229
void LockRelationOid(Oid relid, LOCKMODE lockmode)
Definition lmgr.c:107
void UnlockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition lmgr.c:1148
#define NoLock
Definition lockdefs.h:34
#define AccessExclusiveLock
Definition lockdefs.h:43
#define ExclusiveLock
Definition lockdefs.h:42
#define RowExclusiveLock
Definition lockdefs.h:38
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1176
void LWLockRelease(LWLock *lock)
Definition lwlock.c:1793
void LWLockInitialize(LWLock *lock, int tranche_id)
Definition lwlock.c:698
@ LW_SHARED
Definition lwlock.h:113
@ LW_EXCLUSIVE
Definition lwlock.h:112
void pfree(void *pointer)
Definition mcxt.c:1616
#define CHECK_FOR_INTERRUPTS()
Definition miscadmin.h:123
ReplOriginId replorigin_create(const char *roname)
Definition origin.c:262
static ReplicationStateCtl * replication_states_ctl
Definition origin.c:181
ReplOriginXactState replorigin_xact_state
Definition origin.c:166
Size ReplicationOriginShmemSize(void)
Definition origin.c:542
ReplOriginId replorigin_by_name(const char *roname, bool missing_ok)
Definition origin.c:231
void replorigin_advance(ReplOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)
Definition origin.c:918
Datum pg_replication_origin_progress(PG_FUNCTION_ARGS)
Definition origin.c:1595
void replorigin_session_reset(void)
Definition origin.c:1276
bool replorigin_by_oid(ReplOriginId roident, bool missing_ok, char **roname)
Definition origin.c:501
static bool IsReservedOriginName(const char *name)
Definition origin.c:214
void replorigin_session_advance(XLogRecPtr remote_commit, XLogRecPtr local_commit)
Definition origin.c:1310
int max_active_replication_origins
Definition origin.c:104
Datum pg_replication_origin_advance(PG_FUNCTION_ARGS)
Definition origin.c:1560
static void replorigin_state_clear(ReplOriginId roident, bool nowait)
Definition origin.c:374
#define PG_REPLORIGIN_CHECKPOINT_TMPFILE
Definition origin.c:101
Datum pg_replication_origin_session_progress(PG_FUNCTION_ARGS)
Definition origin.c:1509
XLogRecPtr replorigin_get_progress(ReplOriginId node, bool flush)
Definition origin.c:1047
static ReplicationState * replication_states
Definition origin.c:176
#define PG_REPLORIGIN_CHECKPOINT_FILENAME
Definition origin.c:100
Datum pg_replication_origin_session_reset(PG_FUNCTION_ARGS)
Definition origin.c:1478
Datum pg_replication_origin_xact_setup(PG_FUNCTION_ARGS)
Definition origin.c:1530
Datum pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS)
Definition origin.c:1493
Datum pg_replication_origin_oid(PG_FUNCTION_ARGS)
Definition origin.c:1433
Datum pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
Definition origin.c:1454
static void ReplicationOriginExitCleanup(int code, Datum arg)
Definition origin.c:1119
void StartupReplicationOrigin(void)
Definition origin.c:730
void replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
Definition origin.c:447
void CheckPointReplicationOrigin(void)
Definition origin.c:604
static void replorigin_check_prerequisites(bool check_origins, bool recoveryOK)
Definition origin.c:195
static ReplicationState * session_replication_state
Definition origin.c:189
Datum pg_replication_origin_drop(PG_FUNCTION_ARGS)
Definition origin.c:1414
#define REPLICATION_ORIGIN_PROGRESS_COLS
XLogRecPtr replorigin_session_get_progress(bool flush)
Definition origin.c:1328
void ReplicationOriginShmemInit(void)
Definition origin.c:557
static void replorigin_session_reset_internal(void)
Definition origin.c:1085
void replorigin_xact_clear(bool clear_origin)
Definition origin.c:1352
Datum pg_show_replication_origin_status(PG_FUNCTION_ARGS)
Definition origin.c:1620
#define REPLICATION_STATE_MAGIC
Definition origin.c:192
Datum pg_replication_origin_create(PG_FUNCTION_ARGS)
Definition origin.c:1373
void replorigin_session_setup(ReplOriginId node, int acquired_by)
Definition origin.c:1146
Datum pg_replication_origin_xact_reset(PG_FUNCTION_ARGS)
Definition origin.c:1548
void replorigin_redo(XLogReaderState *record)
Definition origin.c:857
#define DoNotReplicateId
Definition origin.h:34
#define XLOG_REPLORIGIN_DROP
Definition origin.h:31
#define InvalidReplOriginId
Definition origin.h:33
#define MAX_RONAME_LEN
Definition origin.h:41
#define XLOG_REPLORIGIN_SET
Definition origin.h:30
void * arg
#define ERRCODE_DATA_CORRUPTED
uint32 pg_crc32c
Definition pg_crc32c.h:38
#define COMP_CRC32C(crc, data, len)
Definition pg_crc32c.h:153
#define INIT_CRC32C(crc)
Definition pg_crc32c.h:41
#define FIN_CRC32C(crc)
Definition pg_crc32c.h:158
return crc
#define PG_GETARG_LSN(n)
Definition pg_lsn.h:36
static Datum LSNGetDatum(XLogRecPtr X)
Definition pg_lsn.h:31
#define PG_RETURN_LSN(x)
Definition pg_lsn.h:37
FormData_pg_replication_origin * Form_pg_replication_origin
int pg_strcasecmp(const char *s1, const char *s2)
static Datum ObjectIdGetDatum(Oid X)
Definition postgres.h:262
uint64_t Datum
Definition postgres.h:70
static Pointer DatumGetPointer(Datum X)
Definition postgres.h:342
#define InvalidOid
unsigned int Oid
static int fd(const char *x, int i)
static int fb(int x)
#define RelationGetDescr(relation)
Definition rel.h:540
void ScanKeyInit(ScanKey entry, AttrNumber attributeNumber, StrategyNumber strategy, RegProcedure procedure, Datum argument)
Definition scankey.c:76
Size add_size(Size s1, Size s2)
Definition shmem.c:482
Size mul_size(Size s1, Size s2)
Definition shmem.c:497
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition shmem.c:378
#define InitDirtySnapshot(snapshotdata)
Definition snapmgr.h:42
#define BTEqualStrategyNumber
Definition stratnum.h:31
ItemPointerData t_self
Definition htup.h:65
Form_pg_class rd_rel
Definition rel.h:111
ReplOriginId origin
Definition origin.h:45
XLogRecPtr origin_lsn
Definition origin.h:46
TimestampTz origin_timestamp
Definition origin.h:47
ReplicationState states[FLEXIBLE_ARRAY_MEMBER]
Definition origin.c:162
ReplOriginId roident
Definition origin.c:152
XLogRecPtr remote_lsn
Definition origin.c:153
XLogRecPtr remote_lsn
Definition origin.c:119
XLogRecPtr local_lsn
Definition origin.c:126
ConditionVariable origin_cv
Definition origin.c:139
ReplOriginId roident
Definition origin.c:114
XLogRecPtr EndRecPtr
Definition xlogreader.h:206
Definition c.h:706
ReplOriginId node_id
Definition origin.h:27
XLogRecPtr remote_lsn
Definition origin.h:20
void ReleaseSysCache(HeapTuple tuple)
Definition syscache.c:264
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition syscache.c:220
void table_close(Relation relation, LOCKMODE lockmode)
Definition table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition table.c:40
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, const Datum *values, const bool *isnull)
Definition tuplestore.c:784
#define PG_GETARG_TIMESTAMPTZ(n)
Definition timestamp.h:64
char * text_to_cstring(const text *t)
Definition varlena.c:214
const char * name
bool IsTransactionState(void)
Definition xact.c:388
void CommandCounterIncrement(void)
Definition xact.c:1101
bool RecoveryInProgress(void)
Definition xlog.c:6460
void XLogFlush(XLogRecPtr record)
Definition xlog.c:2783
#define XLogRecPtrIsValid(r)
Definition xlogdefs.h:29
#define LSN_FORMAT_ARGS(lsn)
Definition xlogdefs.h:47
uint16 ReplOriginId
Definition xlogdefs.h:69
uint64 XLogRecPtr
Definition xlogdefs.h:21
#define InvalidXLogRecPtr
Definition xlogdefs.h:28
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition xloginsert.c:478
void XLogRegisterData(const void *data, uint32 len)
Definition xloginsert.c:368
void XLogBeginInsert(void)
Definition xloginsert.c:152
#define XLogRecGetInfo(decoder)
Definition xlogreader.h:409
#define XLogRecGetData(decoder)
Definition xlogreader.h:414