PostgreSQL Source Code git master
Loading...
Searching...
No Matches
origin.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * origin.c
4 * Logical replication progress tracking support.
5 *
6 * Copyright (c) 2013-2026, PostgreSQL Global Development Group
7 *
8 * IDENTIFICATION
9 * src/backend/replication/logical/origin.c
10 *
11 * NOTES
12 *
13 * This file provides the following:
14 * * An infrastructure to name nodes in a replication setup
15 * * A facility to efficiently store and persist replication progress in an
16 * efficient and durable manner.
17 *
18 * Replication origin consists of a descriptive, user defined, external
19 * name and a short, thus space efficient, internal 2 byte one. This split
20 * exists because replication origin have to be stored in WAL and shared
21 * memory and long descriptors would be inefficient. For now only use 2 bytes
22 * for the internal id of a replication origin as it seems unlikely that there
23 * soon will be more than 65k nodes in one replication setup; and using only
24 * two bytes allow us to be more space efficient.
25 *
26 * Replication progress is tracked in a shared memory table
27 * (ReplicationState) that's dumped to disk every checkpoint. Entries
28 * ('slots') in this table are identified by the internal id. That's the case
29 * because it allows to increase replication progress during crash
30 * recovery. To allow doing so we store the original LSN (from the originating
31 * system) of a transaction in the commit record. That allows to recover the
32 * precise replayed state after crash recovery; without requiring synchronous
33 * commits. Allowing logical replication to use asynchronous commit is
34 * generally good for performance, but especially important as it allows a
35 * single threaded replay process to keep up with a source that has multiple
36 * backends generating changes concurrently. For efficiency and simplicity
37 * reasons a backend can setup one replication origin that's from then used as
38 * the source of changes produced by the backend, until reset again.
39 *
40 * This infrastructure is intended to be used in cooperation with logical
41 * decoding. When replaying from a remote system the configured origin is
42 * provided to output plugins, allowing prevention of replication loops and
43 * other filtering.
44 *
45 * There are several levels of locking at work:
46 *
47 * * To create and drop replication origins an exclusive lock on
48 * pg_replication_slot is required for the duration. That allows us to
49 * safely and conflict free assign new origins using a dirty snapshot.
50 *
51 * * When creating an in-memory replication progress slot the ReplicationOrigin
52 * LWLock has to be held exclusively; when iterating over the replication
53 * progress a shared lock has to be held, the same when advancing the
54 * replication progress of an individual backend that has not setup as the
55 * session's replication origin.
56 *
57 * * When manipulating or looking at the remote_lsn and local_lsn fields of a
58 * replication progress slot that slot's lwlock has to be held. That's
59 * primarily because we do not assume 8 byte writes (the LSN) is atomic on
60 * all our platforms, but it also simplifies memory ordering concerns
61 * between the remote and local lsn. We use a lwlock instead of a spinlock
62 * so it's less harmful to hold the lock over a WAL write
63 * (cf. AdvanceReplicationProgress).
64 *
65 * ---------------------------------------------------------------------------
66 */
67
68#include "postgres.h"
69
70#include <unistd.h>
71#include <sys/stat.h>
72
73#include "access/genam.h"
74#include "access/htup_details.h"
75#include "access/table.h"
76#include "access/xact.h"
77#include "access/xloginsert.h"
78#include "catalog/catalog.h"
79#include "catalog/indexing.h"
81#include "funcapi.h"
82#include "miscadmin.h"
83#include "nodes/execnodes.h"
84#include "pgstat.h"
85#include "replication/origin.h"
86#include "replication/slot.h"
88#include "storage/fd.h"
89#include "storage/ipc.h"
90#include "storage/lmgr.h"
91#include "utils/builtins.h"
92#include "utils/fmgroids.h"
93#include "utils/guc.h"
94#include "utils/pg_lsn.h"
95#include "utils/rel.h"
96#include "utils/snapmgr.h"
97#include "utils/syscache.h"
98#include "utils/wait_event.h"
99
100/* paths for replication origin checkpoint files */
101#define PG_REPLORIGIN_CHECKPOINT_FILENAME PG_LOGICAL_DIR "/replorigin_checkpoint"
102#define PG_REPLORIGIN_CHECKPOINT_TMPFILE PG_REPLORIGIN_CHECKPOINT_FILENAME ".tmp"
103
104/* GUC variables */
106
107/*
108 * Replay progress of a single remote node.
109 */
110typedef struct ReplicationState
111{
112 /*
113 * Local identifier for the remote node.
114 */
116
117 /*
118 * Location of the latest commit from the remote side.
119 */
121
122 /*
123 * Remember the local lsn of the commit record so we can XLogFlush() to it
124 * during a checkpoint so we know the commit record actually is safe on
125 * disk.
126 */
128
129 /*
130 * PID of backend that's acquired slot, or 0 if none.
131 */
133
134 /* Count of processes that are currently using this origin. */
136
137 /*
138 * Condition variable that's signaled when acquired_by changes.
139 */
141
142 /*
143 * Lock protecting remote_lsn and local_lsn.
144 */
147
148/*
149 * On disk version of ReplicationState.
150 */
156
157
159{
160 /* Tranche to use for per-origin LWLocks */
162 /* Array of length max_active_replication_origins */
165
166/* Global variable for per-transaction replication origin state */
168 .origin = InvalidReplOriginId, /* assumed identity */
169 .origin_lsn = InvalidXLogRecPtr,
170 .origin_timestamp = 0
171};
172
173/*
174 * Base address into a shared memory array of replication states of size
175 * max_active_replication_origins.
176 */
178
179/*
180 * Actual shared memory block (replication_states[] is now part of this).
181 */
183
184/*
185 * We keep a pointer to this backend's ReplicationState to avoid having to
186 * search the replication_states array in replorigin_session_advance for each
187 * remote commit. (Ownership of a backend's own entry can only be changed by
188 * that backend.)
189 */
191
192/* Magic for on disk files. */
193#define REPLICATION_STATE_MAGIC ((uint32) 0x1257DADE)
194
195static void
197{
201 errmsg("cannot query or manipulate replication origin when \"max_active_replication_origins\" is 0")));
202
206 errmsg("cannot manipulate replication origins during recovery")));
207}
208
209
210/*
211 * IsReservedOriginName
212 * True iff name is either "none" or "any".
213 */
214static bool
216{
217 return ((pg_strcasecmp(name, LOGICALREP_ORIGIN_NONE) == 0) ||
219}
220
221/* ---------------------------------------------------------------------------
222 * Functions for working with replication origins themselves.
223 * ---------------------------------------------------------------------------
224 */
225
226/*
227 * Check for a persistent replication origin identified by name.
228 *
229 * Returns InvalidOid if the node isn't known yet and missing_ok is true.
230 */
232replorigin_by_name(const char *roname, bool missing_ok)
233{
235 Oid roident = InvalidOid;
236 HeapTuple tuple;
238
240
242 if (HeapTupleIsValid(tuple))
243 {
245 roident = ident->roident;
246 ReleaseSysCache(tuple);
247 }
248 else if (!missing_ok)
251 errmsg("replication origin \"%s\" does not exist",
252 roname)));
253
254 return roident;
255}
256
257/*
258 * Create a replication origin.
259 *
260 * Needs to be called in a transaction.
261 */
264{
265 Oid roident;
266 HeapTuple tuple = NULL;
267 Relation rel;
270 SysScanDesc scan;
271 ScanKeyData key;
272
273 /*
274 * To avoid needing a TOAST table for pg_replication_origin, we limit
275 * replication origin names to 512 bytes. This should be more than enough
276 * for all practical use.
277 */
281 errmsg("replication origin name is too long"),
282 errdetail("Replication origin names must be no longer than %d bytes.",
284
286
288
289 /*
290 * We need the numeric replication origin to be 16bit wide, so we cannot
291 * rely on the normal oid allocation. Instead we simply scan
292 * pg_replication_origin for the first unused id. That's not particularly
293 * efficient, but this should be a fairly infrequent operation - we can
294 * easily spend a bit more code on this when it turns out it needs to be
295 * faster.
296 *
297 * We handle concurrency by taking an exclusive lock (allowing reads!)
298 * over the table for the duration of the search. Because we use a "dirty
299 * snapshot" we can read rows that other in-progress sessions have
300 * written, even though they would be invisible with normal snapshots. Due
301 * to the exclusive lock there's no danger that new rows can appear while
302 * we're checking.
303 */
305
307
308 /*
309 * We want to be able to access pg_replication_origin without setting up a
310 * snapshot. To make that safe, it needs to not have a TOAST table, since
311 * TOASTed data cannot be fetched without a snapshot. As of this writing,
312 * its only varlena column is roname, which we limit to 512 bytes to avoid
313 * needing out-of-line storage. If you add a TOAST table to this catalog,
314 * be sure to set up a snapshot everywhere it might be needed. For more
315 * information, see https://postgr.es/m/ZvMSUPOqUU-VNADN%40nathan.
316 */
317 Assert(!OidIsValid(rel->rd_rel->reltoastrelid));
318
319 for (roident = InvalidOid + 1; roident < PG_UINT16_MAX; roident++)
320 {
321 bool nulls[Natts_pg_replication_origin];
323 bool collides;
324
326
327 ScanKeyInit(&key,
330 ObjectIdGetDatum(roident));
331
333 true /* indexOK */ ,
335 1, &key);
336
338
339 systable_endscan(scan);
340
341 if (!collides)
342 {
343 /*
344 * Ok, found an unused roident, insert the new row and do a CCI,
345 * so our callers can look it up if they want to.
346 */
347 memset(&nulls, 0, sizeof(nulls));
348
351
352 tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
353 CatalogTupleInsert(rel, tuple);
355 break;
356 }
357 }
358
359 /* now release lock again, */
361
362 if (tuple == NULL)
365 errmsg("could not find free replication origin ID")));
366
367 heap_freetuple(tuple);
368 return roident;
369}
370
371/*
372 * Helper function to drop a replication origin.
373 */
374static void
376{
377 int i;
378
379 /*
380 * Clean up the slot state info, if there is any matching slot.
381 */
382restart:
384
385 for (i = 0; i < max_active_replication_origins; i++)
386 {
388
389 if (state->roident == roident)
390 {
391 /* found our slot, is it busy? */
392 if (state->refcount > 0)
393 {
395
396 if (nowait)
399 (state->acquired_by != 0)
400 ? errmsg("could not drop replication origin with ID %d, in use by PID %d",
401 state->roident,
402 state->acquired_by)
403 : errmsg("could not drop replication origin with ID %d, in use by another process",
404 state->roident)));
405
406 /*
407 * We must wait and then retry. Since we don't know which CV
408 * to wait on until here, we can't readily use
409 * ConditionVariablePrepareToSleep (calling it here would be
410 * wrong, since we could miss the signal if we did so); just
411 * use ConditionVariableSleep directly.
412 */
413 cv = &state->origin_cv;
414
416
418 goto restart;
419 }
420
421 /* first make a WAL log entry */
422 {
424
425 xlrec.node_id = roident;
427 XLogRegisterData(&xlrec, sizeof(xlrec));
429 }
430
431 /* then clear the in-memory slot */
432 state->roident = InvalidReplOriginId;
433 state->remote_lsn = InvalidXLogRecPtr;
434 state->local_lsn = InvalidXLogRecPtr;
435 break;
436 }
437 }
440}
441
442/*
443 * Drop replication origin (by name).
444 *
445 * Needs to be called in a transaction.
446 */
447void
448replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
449{
450 ReplOriginId roident;
451 Relation rel;
452 HeapTuple tuple;
453
455
457
458 roident = replorigin_by_name(name, missing_ok);
459
460 /* Lock the origin to prevent concurrent drops. */
463
465 if (!HeapTupleIsValid(tuple))
466 {
467 if (!missing_ok)
468 elog(ERROR, "cache lookup failed for replication origin with ID %d",
469 roident);
470
471 /*
472 * We don't need to retain the locks if the origin is already dropped.
473 */
477 return;
478 }
479
480 replorigin_state_clear(roident, nowait);
481
482 /*
483 * Now, we can delete the catalog entry.
484 */
485 CatalogTupleDelete(rel, &tuple->t_self);
486 ReleaseSysCache(tuple);
487
489
490 /* We keep the lock on pg_replication_origin until commit */
491 table_close(rel, NoLock);
492}
493
494/*
495 * Lookup replication origin via its oid and return the name.
496 *
497 * The external name is palloc'd in the calling context.
498 *
499 * Returns true if the origin is known, false otherwise.
500 */
501bool
502replorigin_by_oid(ReplOriginId roident, bool missing_ok, char **roname)
503{
504 HeapTuple tuple;
506
507 Assert(OidIsValid((Oid) roident));
508 Assert(roident != InvalidReplOriginId);
509 Assert(roident != DoNotReplicateId);
510
512 ObjectIdGetDatum((Oid) roident));
513
514 if (HeapTupleIsValid(tuple))
515 {
517 *roname = text_to_cstring(&ric->roname);
518 ReleaseSysCache(tuple);
519
520 return true;
521 }
522 else
523 {
524 *roname = NULL;
525
526 if (!missing_ok)
529 errmsg("replication origin with ID %d does not exist",
530 roident)));
531
532 return false;
533 }
534}
535
536
537/* ---------------------------------------------------------------------------
538 * Functions for handling replication progress.
539 * ---------------------------------------------------------------------------
540 */
541
542Size
544{
545 Size size = 0;
546
548 return size;
549
550 size = add_size(size, offsetof(ReplicationStateCtl, states));
551
552 size = add_size(size,
554 return size;
555}
556
557void
559{
560 bool found;
561
563 return;
564
566 ShmemInitStruct("ReplicationOriginState",
568 &found);
570
571 if (!found)
572 {
573 int i;
574
576
578
579 for (i = 0; i < max_active_replication_origins; i++)
580 {
584 }
585 }
586}
587
588/* ---------------------------------------------------------------------------
589 * Perform a checkpoint of each replication origin's progress with respect to
590 * the replayed remote_lsn. Make sure that all transactions we refer to in the
591 * checkpoint (local_lsn) are actually on-disk. This might not yet be the case
592 * if the transactions were originally committed asynchronously.
593 *
594 * We store checkpoints in the following format:
595 * +-------+------------------------+------------------+-----+--------+
596 * | MAGIC | ReplicationStateOnDisk | struct Replic... | ... | CRC32C | EOF
597 * +-------+------------------------+------------------+-----+--------+
598 *
599 * So its just the magic, followed by the statically sized
600 * ReplicationStateOnDisk structs. Note that the maximum number of
601 * ReplicationState is determined by max_active_replication_origins.
602 * ---------------------------------------------------------------------------
603 */
604void
606{
608 const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME;
609 int tmpfd;
610 int i;
613
615 return;
616
618
619 /* make sure no old temp file is remaining */
620 if (unlink(tmppath) < 0 && errno != ENOENT)
623 errmsg("could not remove file \"%s\": %m",
624 tmppath)));
625
626 /*
627 * no other backend can perform this at the same time; only one checkpoint
628 * can happen at a time.
629 */
632 if (tmpfd < 0)
635 errmsg("could not create file \"%s\": %m",
636 tmppath)));
637
638 /* write magic */
639 errno = 0;
640 if ((write(tmpfd, &magic, sizeof(magic))) != sizeof(magic))
641 {
642 /* if write didn't set errno, assume problem is no disk space */
643 if (errno == 0)
644 errno = ENOSPC;
647 errmsg("could not write to file \"%s\": %m",
648 tmppath)));
649 }
650 COMP_CRC32C(crc, &magic, sizeof(magic));
651
652 /* prevent concurrent creations/drops */
654
655 /* write actual data */
656 for (i = 0; i < max_active_replication_origins; i++)
657 {
660 XLogRecPtr local_lsn;
661
662 if (curstate->roident == InvalidReplOriginId)
663 continue;
664
665 /* zero, to avoid uninitialized padding bytes */
666 memset(&disk_state, 0, sizeof(disk_state));
667
669
670 disk_state.roident = curstate->roident;
671
672 disk_state.remote_lsn = curstate->remote_lsn;
673 local_lsn = curstate->local_lsn;
674
675 LWLockRelease(&curstate->lock);
676
677 /* make sure we only write out a commit that's persistent */
678 XLogFlush(local_lsn);
679
680 errno = 0;
681 if ((write(tmpfd, &disk_state, sizeof(disk_state))) !=
682 sizeof(disk_state))
683 {
684 /* if write didn't set errno, assume problem is no disk space */
685 if (errno == 0)
686 errno = ENOSPC;
689 errmsg("could not write to file \"%s\": %m",
690 tmppath)));
691 }
692
694 }
695
697
698 /* write out the CRC */
700 errno = 0;
701 if ((write(tmpfd, &crc, sizeof(crc))) != sizeof(crc))
702 {
703 /* if write didn't set errno, assume problem is no disk space */
704 if (errno == 0)
705 errno = ENOSPC;
708 errmsg("could not write to file \"%s\": %m",
709 tmppath)));
710 }
711
712 if (CloseTransientFile(tmpfd) != 0)
715 errmsg("could not close file \"%s\": %m",
716 tmppath)));
717
718 /* fsync, rename to permanent file, fsync file and directory */
720}
721
722/*
723 * Recover replication replay status from checkpoint data saved earlier by
724 * CheckPointReplicationOrigin.
725 *
726 * This only needs to be called at startup and *not* during every checkpoint
727 * read during recovery (e.g. in HS or PITR from a base backup) afterwards. All
728 * state thereafter can be recovered by looking at commit records.
729 */
730void
732{
733 const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME;
734 int fd;
735 int readBytes;
737 int last_state = 0;
740
741 /* don't want to overwrite already existing state */
742#ifdef USE_ASSERT_CHECKING
743 static bool already_started = false;
744
746 already_started = true;
747#endif
748
750 return;
751
753
754 elog(DEBUG2, "starting up replication origin progress state");
755
757
758 /*
759 * might have had max_active_replication_origins == 0 last run, or we just
760 * brought up a standby.
761 */
762 if (fd < 0 && errno == ENOENT)
763 return;
764 else if (fd < 0)
767 errmsg("could not open file \"%s\": %m",
768 path)));
769
770 /* verify magic, that is written even if nothing was active */
771 readBytes = read(fd, &magic, sizeof(magic));
772 if (readBytes != sizeof(magic))
773 {
774 if (readBytes < 0)
777 errmsg("could not read file \"%s\": %m",
778 path)));
779 else
782 errmsg("could not read file \"%s\": read %d of %zu",
783 path, readBytes, sizeof(magic))));
784 }
785 COMP_CRC32C(crc, &magic, sizeof(magic));
786
787 if (magic != REPLICATION_STATE_MAGIC)
789 (errmsg("replication checkpoint has wrong magic %u instead of %u",
790 magic, REPLICATION_STATE_MAGIC)));
791
792 /* we can skip locking here, no other access is possible */
793
794 /* recover individual states, until there are no more to be found */
795 while (true)
796 {
798
799 readBytes = read(fd, &disk_state, sizeof(disk_state));
800
801 if (readBytes < 0)
802 {
805 errmsg("could not read file \"%s\": %m",
806 path)));
807 }
808
809 /* no further data */
810 if (readBytes == sizeof(crc))
811 {
812 memcpy(&file_crc, &disk_state, sizeof(file_crc));
813 break;
814 }
815
816 if (readBytes != sizeof(disk_state))
817 {
820 errmsg("could not read file \"%s\": read %d of %zu",
821 path, readBytes, sizeof(disk_state))));
822 }
823
825
829 errmsg("could not find free replication state, increase \"max_active_replication_origins\"")));
830
831 /* copy data to shared memory */
834 last_state++;
835
836 ereport(LOG,
837 errmsg("recovered replication state of node %d to %X/%08X",
838 disk_state.roident,
839 LSN_FORMAT_ARGS(disk_state.remote_lsn)));
840 }
841
842 /* now check checksum */
844 if (file_crc != crc)
847 errmsg("replication slot checkpoint has wrong checksum %u, expected %u",
848 crc, file_crc)));
849
850 if (CloseTransientFile(fd) != 0)
853 errmsg("could not close file \"%s\": %m",
854 path)));
855}
856
857void
859{
860 uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
861
862 switch (info)
863 {
865 {
868
869 replorigin_advance(xlrec->node_id,
870 xlrec->remote_lsn, record->EndRecPtr,
871 xlrec->force /* backward */ ,
872 false /* WAL log */ );
873 break;
874 }
876 {
878 int i;
879
881
882 for (i = 0; i < max_active_replication_origins; i++)
883 {
885
886 /* found our slot */
887 if (state->roident == xlrec->node_id)
888 {
889 /* reset entry */
890 state->roident = InvalidReplOriginId;
891 state->remote_lsn = InvalidXLogRecPtr;
892 state->local_lsn = InvalidXLogRecPtr;
893 break;
894 }
895 }
896 break;
897 }
898 default:
899 elog(PANIC, "replorigin_redo: unknown op code %u", info);
900 }
901}
902
903
904/*
905 * Tell the replication origin progress machinery that a commit from 'node'
906 * that originated at the LSN remote_commit on the remote node was replayed
907 * successfully and that we don't need to do so again. In combination with
908 * setting up replorigin_xact_state {.origin_lsn, .origin_timestamp}
909 * that ensures we won't lose knowledge about that after a crash if the
910 * transaction had a persistent effect (think of asynchronous commits).
911 *
912 * local_commit needs to be a local LSN of the commit so that we can make sure
913 * upon a checkpoint that enough WAL has been persisted to disk.
914 *
915 * Needs to be called with a RowExclusiveLock on pg_replication_origin,
916 * unless running in recovery.
917 */
918void
921 bool go_backward, bool wal_log)
922{
923 int i;
926
928
929 /* we don't track DoNotReplicateId */
930 if (node == DoNotReplicateId)
931 return;
932
933 /*
934 * XXX: For the case where this is called by WAL replay, it'd be more
935 * efficient to restore into a backend local hashtable and only dump into
936 * shmem after recovery is finished. Let's wait with implementing that
937 * till it's shown to be a measurable expense
938 */
939
940 /* Lock exclusively, as we may have to create a new table entry. */
942
943 /*
944 * Search for either an existing slot for the origin, or a free one we can
945 * use.
946 */
947 for (i = 0; i < max_active_replication_origins; i++)
948 {
950
951 /* remember where to insert if necessary */
952 if (curstate->roident == InvalidReplOriginId &&
953 free_state == NULL)
954 {
956 continue;
957 }
958
959 /* not our slot */
960 if (curstate->roident != node)
961 {
962 continue;
963 }
964
965 /* ok, found slot */
967
969
970 /* Make sure it's not used by somebody else */
971 if (replication_state->refcount > 0)
972 {
975 (replication_state->acquired_by != 0)
976 ? errmsg("replication origin with ID %d is already active for PID %d",
977 replication_state->roident,
978 replication_state->acquired_by)
979 : errmsg("replication origin with ID %d is already active in another process",
980 replication_state->roident)));
981 }
982
983 break;
984 }
985
989 errmsg("could not find free replication state slot for replication origin with ID %d",
990 node),
991 errhint("Increase \"max_active_replication_origins\" and try again.")));
992
993 if (replication_state == NULL)
994 {
995 /* initialize new slot */
1000 replication_state->roident = node;
1001 }
1002
1004
1005 /*
1006 * If somebody "forcefully" sets this slot, WAL log it, so it's durable
1007 * and the standby gets the message. Primarily this will be called during
1008 * WAL replay (of commit records) where no WAL logging is necessary.
1009 */
1010 if (wal_log)
1011 {
1013
1015 xlrec.node_id = node;
1016 xlrec.force = go_backward;
1017
1019 XLogRegisterData(&xlrec, sizeof(xlrec));
1020
1022 }
1023
1024 /*
1025 * Due to - harmless - race conditions during a checkpoint we could see
1026 * values here that are older than the ones we already have in memory. We
1027 * could also see older values for prepared transactions when the prepare
1028 * is sent at a later point of time along with commit prepared and there
1029 * are other transactions commits between prepare and commit prepared. See
1030 * ReorderBufferFinishPrepared. Don't overwrite those.
1031 */
1032 if (go_backward || replication_state->remote_lsn < remote_commit)
1033 replication_state->remote_lsn = remote_commit;
1035 (go_backward || replication_state->local_lsn < local_commit))
1036 replication_state->local_lsn = local_commit;
1038
1039 /*
1040 * Release *after* changing the LSNs, slot isn't acquired and thus could
1041 * otherwise be dropped anytime.
1042 */
1044}
1045
1046
1049{
1050 int i;
1051 XLogRecPtr local_lsn = InvalidXLogRecPtr;
1052 XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1053
1054 /* prevent slots from being concurrently dropped */
1056
1057 for (i = 0; i < max_active_replication_origins; i++)
1058 {
1060
1062
1063 if (state->roident == node)
1064 {
1065 LWLockAcquire(&state->lock, LW_SHARED);
1066
1067 remote_lsn = state->remote_lsn;
1068 local_lsn = state->local_lsn;
1069
1070 LWLockRelease(&state->lock);
1071
1072 break;
1073 }
1074 }
1075
1077
1078 if (flush && XLogRecPtrIsValid(local_lsn))
1079 XLogFlush(local_lsn);
1080
1081 return remote_lsn;
1082}
1083
1084/* Helper function to reset the session replication origin */
1085static void
1087{
1089
1091
1093
1094 /* The origin must be held by at least one process at this point. */
1096
1097 /*
1098 * Reset the PID only if the current session is the first to set up this
1099 * origin. This avoids clearing the first process's PID when any other
1100 * session releases the origin.
1101 */
1104
1106
1109
1111
1113}
1114
1115/*
1116 * Tear down a (possibly) configured session replication origin during process
1117 * exit.
1118 */
1119static void
1127
1128/*
1129 * Setup a replication origin in the shared memory struct if it doesn't
1130 * already exist and cache access to the specific ReplicationSlot so the
1131 * array doesn't have to be searched when calling
1132 * replorigin_session_advance().
1133 *
1134 * Normally only one such cached origin can exist per process so the cached
1135 * value can only be set again after the previous value is torn down with
1136 * replorigin_session_reset(). For this normal case pass acquired_by = 0
1137 * (meaning the slot is not allowed to be already acquired by another process).
1138 *
1139 * However, sometimes multiple processes can safely re-use the same origin slot
1140 * (for example, multiple parallel apply processes can safely use the same
1141 * origin, provided they maintain commit order by allowing only one process to
1142 * commit at a time). For this case the first process must pass acquired_by =
1143 * 0, and then the other processes sharing that same origin can pass
1144 * acquired_by = PID of the first process.
1145 */
1146void
1148{
1149 static bool registered_cleanup;
1150 int i;
1151 int free_slot = -1;
1152
1153 if (!registered_cleanup)
1154 {
1156 registered_cleanup = true;
1157 }
1158
1160
1162 ereport(ERROR,
1164 errmsg("cannot setup replication origin when one is already setup")));
1165
1166 /* Lock exclusively, as we may have to create a new table entry. */
1168
1169 /*
1170 * Search for either an existing slot for the origin, or a free one we can
1171 * use.
1172 */
1173 for (i = 0; i < max_active_replication_origins; i++)
1174 {
1176
1177 /* remember where to insert if necessary */
1178 if (curstate->roident == InvalidReplOriginId &&
1179 free_slot == -1)
1180 {
1181 free_slot = i;
1182 continue;
1183 }
1184
1185 /* not our slot */
1186 if (curstate->roident != node)
1187 continue;
1188
1189 else if (curstate->acquired_by != 0 && acquired_by == 0)
1190 {
1191 ereport(ERROR,
1193 errmsg("replication origin with ID %d is already active for PID %d",
1194 curstate->roident, curstate->acquired_by)));
1195 }
1196
1197 else if (curstate->acquired_by != acquired_by)
1198 {
1199 ereport(ERROR,
1201 errmsg("could not find replication state slot for replication origin with OID %u which was acquired by %d",
1202 node, acquired_by)));
1203 }
1204
1205 /*
1206 * The origin is in use, but PID is not recorded. This can happen if
1207 * the process that originally acquired the origin exited without
1208 * releasing it. To ensure correctness, other processes cannot acquire
1209 * the origin until all processes currently using it have released it.
1210 */
1211 else if (curstate->acquired_by == 0 && curstate->refcount > 0)
1212 ereport(ERROR,
1214 errmsg("replication origin with ID %d is already active in another process",
1215 curstate->roident)));
1216
1217 /* ok, found slot */
1219 break;
1220 }
1221
1222
1224 ereport(ERROR,
1226 errmsg("could not find free replication state slot for replication origin with ID %d",
1227 node),
1228 errhint("Increase \"max_active_replication_origins\" and try again.")));
1229 else if (session_replication_state == NULL)
1230 {
1231 if (acquired_by)
1232 ereport(ERROR,
1234 errmsg("cannot use PID %d for inactive replication origin with ID %d",
1235 acquired_by, node)));
1236
1237 /* initialize new slot */
1242 }
1243
1244
1246
1247 if (acquired_by == 0)
1248 {
1251 }
1252 else
1253 {
1254 /*
1255 * Sanity check: the origin must already be acquired by the process
1256 * passed as input, and at least one process must be using it.
1257 */
1260 }
1261
1263
1265
1266 /* probably this one is pointless */
1268}
1269
1270/*
1271 * Reset replay state previously setup in this session.
1272 *
1273 * This function may only be called if an origin was setup with
1274 * replorigin_session_setup().
1275 */
1276void
1278{
1280
1282 ereport(ERROR,
1284 errmsg("no replication origin is configured")));
1285
1286 /*
1287 * Restrict explicit resetting of the replication origin if it was first
1288 * acquired by this process and others are still using it. While the
1289 * system handles this safely (as happens if the first session exits
1290 * without calling reset), it is best to avoid doing so.
1291 */
1294 ereport(ERROR,
1296 errmsg("cannot reset replication origin with ID %d because it is still in use by other processes",
1298 errdetail("This session is the first process for this replication origin, and other processes are currently sharing it."),
1299 errhint("Reset the replication origin in all other processes before retrying.")));
1300
1302}
1303
1304/*
1305 * Do the same work replorigin_advance() does, just on the session's
1306 * configured origin.
1307 *
1308 * This is noticeably cheaper than using replorigin_advance().
1309 */
1310void
1323
1324/*
1325 * Ask the machinery about the point up to which we successfully replayed
1326 * changes from an already setup replication origin.
1327 */
1330{
1331 XLogRecPtr remote_lsn;
1332 XLogRecPtr local_lsn;
1333
1335
1340
1341 if (flush && XLogRecPtrIsValid(local_lsn))
1342 XLogFlush(local_lsn);
1343
1344 return remote_lsn;
1345}
1346
1347/*
1348 * Clear the per-transaction replication origin state.
1349 *
1350 * replorigin_session_origin is also cleared if clear_origin is set.
1351 */
1352void
1360
1361
1362/* ---------------------------------------------------------------------------
1363 * SQL functions for working with replication origin.
1364 *
1365 * These mostly should be fairly short wrappers around more generic functions.
1366 * ---------------------------------------------------------------------------
1367 */
1368
1369/*
1370 * Create replication origin for the passed in name, and return the assigned
1371 * oid.
1372 */
1373Datum
1375{
1376 char *name;
1377 ReplOriginId roident;
1378
1379 replorigin_check_prerequisites(false, false);
1380
1382
1383 /*
1384 * Replication origins "any and "none" are reserved for system options.
1385 * The origins "pg_xxx" are reserved for internal use.
1386 */
1388 ereport(ERROR,
1390 errmsg("replication origin name \"%s\" is reserved",
1391 name),
1392 errdetail("Origin names \"%s\", \"%s\", and names starting with \"pg_\" are reserved.",
1394
1395 /*
1396 * If built with appropriate switch, whine when regression-testing
1397 * conventions for replication origin names are violated.
1398 */
1399#ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
1400 if (strncmp(name, "regress_", 8) != 0)
1401 elog(WARNING, "replication origins created by regression test cases should have names starting with \"regress_\"");
1402#endif
1403
1404 roident = replorigin_create(name);
1405
1406 pfree(name);
1407
1408 PG_RETURN_OID(roident);
1409}
1410
1411/*
1412 * Drop replication origin.
1413 */
1414Datum
1416{
1417 char *name;
1418
1419 replorigin_check_prerequisites(false, false);
1420
1422
1423 replorigin_drop_by_name(name, false, true);
1424
1425 pfree(name);
1426
1428}
1429
1430/*
1431 * Return oid of a replication origin.
1432 */
1433Datum
1435{
1436 char *name;
1437 ReplOriginId roident;
1438
1439 replorigin_check_prerequisites(false, false);
1440
1442 roident = replorigin_by_name(name, true);
1443
1444 pfree(name);
1445
1446 if (OidIsValid(roident))
1447 PG_RETURN_OID(roident);
1449}
1450
1451/*
1452 * Setup a replication origin for this session.
1453 */
1454Datum
1456{
1457 char *name;
1458 ReplOriginId origin;
1459 int pid;
1460
1461 replorigin_check_prerequisites(true, false);
1462
1464 origin = replorigin_by_name(name, false);
1465 pid = PG_GETARG_INT32(1);
1466 replorigin_session_setup(origin, pid);
1467
1469
1470 pfree(name);
1471
1473}
1474
1475/*
1476 * Reset previously setup origin in this session
1477 */
1478Datum
1489
1490/*
1491 * Has a replication origin been setup for this session.
1492 */
1493Datum
1500
1501
1502/*
1503 * Return the replication progress for origin setup in the current session.
1504 *
1505 * If 'flush' is set to true it is ensured that the returned value corresponds
1506 * to a local transaction that has been flushed. This is useful if asynchronous
1507 * commits are used when replaying replicated transactions.
1508 */
1509Datum
1511{
1512 XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1513 bool flush = PG_GETARG_BOOL(0);
1514
1515 replorigin_check_prerequisites(true, false);
1516
1518 ereport(ERROR,
1520 errmsg("no replication origin is configured")));
1521
1522 remote_lsn = replorigin_session_get_progress(flush);
1523
1524 if (!XLogRecPtrIsValid(remote_lsn))
1526
1527 PG_RETURN_LSN(remote_lsn);
1528}
1529
1530Datum
1547
1548Datum
1550{
1551 replorigin_check_prerequisites(true, false);
1552
1553 /* Do not clear the session origin */
1554 replorigin_xact_clear(false);
1555
1557}
1558
1559
1560Datum
1562{
1565 ReplOriginId node;
1566
1567 replorigin_check_prerequisites(true, false);
1568
1569 /* lock to prevent the replication origin from vanishing */
1571
1572 node = replorigin_by_name(text_to_cstring(name), false);
1573
1574 /*
1575 * Can't sensibly pass a local commit to be flushed at checkpoint - this
1576 * xact hasn't committed yet. This is why this function should be used to
1577 * set up the initial replication state, but not for replay.
1578 */
1580 true /* go backward */ , true /* WAL log */ );
1581
1583
1585}
1586
1587
1588/*
1589 * Return the replication progress for an individual replication origin.
1590 *
1591 * If 'flush' is set to true it is ensured that the returned value corresponds
1592 * to a local transaction that has been flushed. This is useful if asynchronous
1593 * commits are used when replaying replicated transactions.
1594 */
1595Datum
1597{
1598 char *name;
1599 bool flush;
1600 ReplOriginId roident;
1601 XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1602
1604
1606 flush = PG_GETARG_BOOL(1);
1607
1608 roident = replorigin_by_name(name, false);
1609 Assert(OidIsValid(roident));
1610
1611 remote_lsn = replorigin_get_progress(roident, flush);
1612
1613 if (!XLogRecPtrIsValid(remote_lsn))
1615
1616 PG_RETURN_LSN(remote_lsn);
1617}
1618
1619
1620Datum
1622{
1623 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1624 int i;
1625#define REPLICATION_ORIGIN_PROGRESS_COLS 4
1626
1627 /* we want to return 0 rows if slot is set to zero */
1628 replorigin_check_prerequisites(false, true);
1629
1630 InitMaterializedSRF(fcinfo, 0);
1631
1632 /* prevent slots from being concurrently dropped */
1634
1635 /*
1636 * Iterate through all possible replication_states, display if they are
1637 * filled. Note that we do not take any locks, so slightly corrupted/out
1638 * of date values are a possibility.
1639 */
1640 for (i = 0; i < max_active_replication_origins; i++)
1641 {
1645 char *roname;
1646
1648
1649 /* unused slot, nothing to display */
1650 if (state->roident == InvalidReplOriginId)
1651 continue;
1652
1653 memset(values, 0, sizeof(values));
1654 memset(nulls, 1, sizeof(nulls));
1655
1656 values[0] = ObjectIdGetDatum(state->roident);
1657 nulls[0] = false;
1658
1659 /*
1660 * We're not preventing the origin to be dropped concurrently, so
1661 * silently accept that it might be gone.
1662 */
1663 if (replorigin_by_oid(state->roident, true,
1664 &roname))
1665 {
1667 nulls[1] = false;
1668 }
1669
1670 LWLockAcquire(&state->lock, LW_SHARED);
1671
1672 values[2] = LSNGetDatum(state->remote_lsn);
1673 nulls[2] = false;
1674
1675 values[3] = LSNGetDatum(state->local_lsn);
1676 nulls[3] = false;
1677
1678 LWLockRelease(&state->lock);
1679
1680 tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
1681 values, nulls);
1682 }
1683
1685
1686#undef REPLICATION_ORIGIN_PROGRESS_COLS
1687
1688 return (Datum) 0;
1689}
static Datum values[MAXATTR]
Definition bootstrap.c:188
#define CStringGetTextDatum(s)
Definition builtins.h:98
uint8_t uint8
Definition c.h:616
#define Assert(condition)
Definition c.h:945
#define PG_BINARY
Definition c.h:1376
#define FLEXIBLE_ARRAY_MEMBER
Definition c.h:552
uint32_t uint32
Definition c.h:618
#define PG_UINT16_MAX
Definition c.h:673
#define MemSet(start, val, len)
Definition c.h:1109
#define OidIsValid(objectId)
Definition c.h:860
size_t Size
Definition c.h:691
bool IsReservedName(const char *name)
Definition catalog.c:278
bool ConditionVariableCancelSleep(void)
void ConditionVariableBroadcast(ConditionVariable *cv)
void ConditionVariableInit(ConditionVariable *cv)
void ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)
Datum arg
Definition elog.c:1322
int errcode_for_file_access(void)
Definition elog.c:897
int errcode(int sqlerrcode)
Definition elog.c:874
#define LOG
Definition elog.h:31
int errhint(const char *fmt,...) pg_attribute_printf(1
int errdetail(const char *fmt,...) pg_attribute_printf(1
#define WARNING
Definition elog.h:36
#define DEBUG2
Definition elog.h:29
#define PANIC
Definition elog.h:42
#define ERROR
Definition elog.h:39
#define elog(elevel,...)
Definition elog.h:226
#define ereport(elevel,...)
Definition elog.h:150
int durable_rename(const char *oldfile, const char *newfile, int elevel)
Definition fd.c:783
int CloseTransientFile(int fd)
Definition fd.c:2855
int OpenTransientFile(const char *fileName, int fileFlags)
Definition fd.c:2678
#define PG_RETURN_VOID()
Definition fmgr.h:350
#define PG_GETARG_TEXT_PP(n)
Definition fmgr.h:310
#define PG_GETARG_DATUM(n)
Definition fmgr.h:268
#define PG_RETURN_NULL()
Definition fmgr.h:346
#define PG_GETARG_INT32(n)
Definition fmgr.h:269
#define PG_GETARG_BOOL(n)
Definition fmgr.h:274
#define PG_RETURN_OID(x)
Definition fmgr.h:361
#define PG_FUNCTION_ARGS
Definition fmgr.h:193
#define PG_RETURN_BOOL(x)
Definition fmgr.h:360
void InitMaterializedSRF(FunctionCallInfo fcinfo, bits32 flags)
Definition funcapi.c:76
void systable_endscan(SysScanDesc sysscan)
Definition genam.c:603
HeapTuple systable_getnext(SysScanDesc sysscan)
Definition genam.c:514
SysScanDesc systable_beginscan(Relation heapRelation, Oid indexId, bool indexOK, Snapshot snapshot, int nkeys, ScanKey key)
Definition genam.c:388
int MyProcPid
Definition globals.c:47
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
Definition heaptuple.c:1037
void heap_freetuple(HeapTuple htup)
Definition heaptuple.c:1384
#define HeapTupleIsValid(tuple)
Definition htup.h:78
static void * GETSTRUCT(const HeapTupleData *tuple)
#define ident
void CatalogTupleInsert(Relation heapRel, HeapTuple tup)
Definition indexing.c:233
void CatalogTupleDelete(Relation heapRel, const ItemPointerData *tid)
Definition indexing.c:365
#define write(a, b, c)
Definition win32.h:14
#define read(a, b, c)
Definition win32.h:13
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition ipc.c:372
int i
Definition isn.c:77
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition lmgr.c:1088
void UnlockRelationOid(Oid relid, LOCKMODE lockmode)
Definition lmgr.c:229
void LockRelationOid(Oid relid, LOCKMODE lockmode)
Definition lmgr.c:107
void UnlockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition lmgr.c:1148
#define NoLock
Definition lockdefs.h:34
#define AccessExclusiveLock
Definition lockdefs.h:43
#define ExclusiveLock
Definition lockdefs.h:42
#define RowExclusiveLock
Definition lockdefs.h:38
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1177
void LWLockRelease(LWLock *lock)
Definition lwlock.c:1794
void LWLockInitialize(LWLock *lock, int tranche_id)
Definition lwlock.c:699
@ LW_SHARED
Definition lwlock.h:113
@ LW_EXCLUSIVE
Definition lwlock.h:112
void pfree(void *pointer)
Definition mcxt.c:1616
#define CHECK_FOR_INTERRUPTS()
Definition miscadmin.h:123
static char * errmsg
ReplOriginId replorigin_create(const char *roname)
Definition origin.c:263
static ReplicationStateCtl * replication_states_ctl
Definition origin.c:182
ReplOriginXactState replorigin_xact_state
Definition origin.c:167
Size ReplicationOriginShmemSize(void)
Definition origin.c:543
ReplOriginId replorigin_by_name(const char *roname, bool missing_ok)
Definition origin.c:232
void replorigin_advance(ReplOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)
Definition origin.c:919
Datum pg_replication_origin_progress(PG_FUNCTION_ARGS)
Definition origin.c:1596
void replorigin_session_reset(void)
Definition origin.c:1277
bool replorigin_by_oid(ReplOriginId roident, bool missing_ok, char **roname)
Definition origin.c:502
static bool IsReservedOriginName(const char *name)
Definition origin.c:215
void replorigin_session_advance(XLogRecPtr remote_commit, XLogRecPtr local_commit)
Definition origin.c:1311
int max_active_replication_origins
Definition origin.c:105
Datum pg_replication_origin_advance(PG_FUNCTION_ARGS)
Definition origin.c:1561
static void replorigin_state_clear(ReplOriginId roident, bool nowait)
Definition origin.c:375
#define PG_REPLORIGIN_CHECKPOINT_TMPFILE
Definition origin.c:102
Datum pg_replication_origin_session_progress(PG_FUNCTION_ARGS)
Definition origin.c:1510
XLogRecPtr replorigin_get_progress(ReplOriginId node, bool flush)
Definition origin.c:1048
static ReplicationState * replication_states
Definition origin.c:177
#define PG_REPLORIGIN_CHECKPOINT_FILENAME
Definition origin.c:101
Datum pg_replication_origin_session_reset(PG_FUNCTION_ARGS)
Definition origin.c:1479
Datum pg_replication_origin_xact_setup(PG_FUNCTION_ARGS)
Definition origin.c:1531
Datum pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS)
Definition origin.c:1494
Datum pg_replication_origin_oid(PG_FUNCTION_ARGS)
Definition origin.c:1434
Datum pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
Definition origin.c:1455
static void ReplicationOriginExitCleanup(int code, Datum arg)
Definition origin.c:1120
void StartupReplicationOrigin(void)
Definition origin.c:731
void replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
Definition origin.c:448
void CheckPointReplicationOrigin(void)
Definition origin.c:605
static void replorigin_check_prerequisites(bool check_origins, bool recoveryOK)
Definition origin.c:196
static ReplicationState * session_replication_state
Definition origin.c:190
Datum pg_replication_origin_drop(PG_FUNCTION_ARGS)
Definition origin.c:1415
#define REPLICATION_ORIGIN_PROGRESS_COLS
XLogRecPtr replorigin_session_get_progress(bool flush)
Definition origin.c:1329
void ReplicationOriginShmemInit(void)
Definition origin.c:558
static void replorigin_session_reset_internal(void)
Definition origin.c:1086
void replorigin_xact_clear(bool clear_origin)
Definition origin.c:1353
Datum pg_show_replication_origin_status(PG_FUNCTION_ARGS)
Definition origin.c:1621
#define REPLICATION_STATE_MAGIC
Definition origin.c:193
Datum pg_replication_origin_create(PG_FUNCTION_ARGS)
Definition origin.c:1374
void replorigin_session_setup(ReplOriginId node, int acquired_by)
Definition origin.c:1147
Datum pg_replication_origin_xact_reset(PG_FUNCTION_ARGS)
Definition origin.c:1549
void replorigin_redo(XLogReaderState *record)
Definition origin.c:858
#define DoNotReplicateId
Definition origin.h:34
#define XLOG_REPLORIGIN_DROP
Definition origin.h:31
#define InvalidReplOriginId
Definition origin.h:33
#define MAX_RONAME_LEN
Definition origin.h:41
#define XLOG_REPLORIGIN_SET
Definition origin.h:30
#define ERRCODE_DATA_CORRUPTED
uint32 pg_crc32c
Definition pg_crc32c.h:38
#define COMP_CRC32C(crc, data, len)
Definition pg_crc32c.h:153
#define INIT_CRC32C(crc)
Definition pg_crc32c.h:41
#define FIN_CRC32C(crc)
Definition pg_crc32c.h:158
return crc
#define PG_GETARG_LSN(n)
Definition pg_lsn.h:36
static Datum LSNGetDatum(XLogRecPtr X)
Definition pg_lsn.h:31
#define PG_RETURN_LSN(x)
Definition pg_lsn.h:37
END_CATALOG_STRUCT typedef FormData_pg_replication_origin * Form_pg_replication_origin
int pg_strcasecmp(const char *s1, const char *s2)
static Datum ObjectIdGetDatum(Oid X)
Definition postgres.h:252
uint64_t Datum
Definition postgres.h:70
static Pointer DatumGetPointer(Datum X)
Definition postgres.h:332
#define InvalidOid
unsigned int Oid
static int fd(const char *x, int i)
static int fb(int x)
#define RelationGetDescr(relation)
Definition rel.h:540
void ScanKeyInit(ScanKey entry, AttrNumber attributeNumber, StrategyNumber strategy, RegProcedure procedure, Datum argument)
Definition scankey.c:76
Size add_size(Size s1, Size s2)
Definition shmem.c:485
Size mul_size(Size s1, Size s2)
Definition shmem.c:500
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition shmem.c:381
#define InitDirtySnapshot(snapshotdata)
Definition snapmgr.h:42
#define BTEqualStrategyNumber
Definition stratnum.h:31
ItemPointerData t_self
Definition htup.h:65
Form_pg_class rd_rel
Definition rel.h:111
ReplOriginId origin
Definition origin.h:45
XLogRecPtr origin_lsn
Definition origin.h:46
TimestampTz origin_timestamp
Definition origin.h:47
ReplicationState states[FLEXIBLE_ARRAY_MEMBER]
Definition origin.c:163
ReplOriginId roident
Definition origin.c:153
XLogRecPtr remote_lsn
Definition origin.c:154
XLogRecPtr remote_lsn
Definition origin.c:120
XLogRecPtr local_lsn
Definition origin.c:127
ConditionVariable origin_cv
Definition origin.c:140
ReplOriginId roident
Definition origin.c:115
XLogRecPtr EndRecPtr
Definition xlogreader.h:206
Definition c.h:778
ReplOriginId node_id
Definition origin.h:27
XLogRecPtr remote_lsn
Definition origin.h:20
void ReleaseSysCache(HeapTuple tuple)
Definition syscache.c:264
HeapTuple SearchSysCache1(SysCacheIdentifier cacheId, Datum key1)
Definition syscache.c:220
void table_close(Relation relation, LOCKMODE lockmode)
Definition table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition table.c:40
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, const Datum *values, const bool *isnull)
Definition tuplestore.c:785
#define PG_GETARG_TIMESTAMPTZ(n)
Definition timestamp.h:64
char * text_to_cstring(const text *t)
Definition varlena.c:217
const char * name
bool IsTransactionState(void)
Definition xact.c:389
void CommandCounterIncrement(void)
Definition xact.c:1102
bool RecoveryInProgress(void)
Definition xlog.c:6444
void XLogFlush(XLogRecPtr record)
Definition xlog.c:2767
#define XLogRecPtrIsValid(r)
Definition xlogdefs.h:29
#define LSN_FORMAT_ARGS(lsn)
Definition xlogdefs.h:47
uint16 ReplOriginId
Definition xlogdefs.h:69
uint64 XLogRecPtr
Definition xlogdefs.h:21
#define InvalidXLogRecPtr
Definition xlogdefs.h:28
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition xloginsert.c:479
void XLogRegisterData(const void *data, uint32 len)
Definition xloginsert.c:369
void XLogBeginInsert(void)
Definition xloginsert.c:153
#define XLogRecGetInfo(decoder)
Definition xlogreader.h:409
#define XLogRecGetData(decoder)
Definition xlogreader.h:414