PostgreSQL Source Code git master
Loading...
Searching...
No Matches
origin.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * origin.c
4 * Logical replication progress tracking support.
5 *
6 * Copyright (c) 2013-2026, PostgreSQL Global Development Group
7 *
8 * IDENTIFICATION
9 * src/backend/replication/logical/origin.c
10 *
11 * NOTES
12 *
13 * This file provides the following:
14 * * An infrastructure to name nodes in a replication setup
15 * * A facility to efficiently store and persist replication progress in an
16 * efficient and durable manner.
17 *
18 * Replication origin consists of a descriptive, user defined, external
19 * name and a short, thus space efficient, internal 2 byte one. This split
20 * exists because replication origin have to be stored in WAL and shared
21 * memory and long descriptors would be inefficient. For now only use 2 bytes
22 * for the internal id of a replication origin as it seems unlikely that there
23 * soon will be more than 65k nodes in one replication setup; and using only
24 * two bytes allow us to be more space efficient.
25 *
26 * Replication progress is tracked in a shared memory table
27 * (ReplicationState) that's dumped to disk every checkpoint. Entries
28 * ('slots') in this table are identified by the internal id. That's the case
29 * because it allows to increase replication progress during crash
30 * recovery. To allow doing so we store the original LSN (from the originating
31 * system) of a transaction in the commit record. That allows to recover the
32 * precise replayed state after crash recovery; without requiring synchronous
33 * commits. Allowing logical replication to use asynchronous commit is
34 * generally good for performance, but especially important as it allows a
35 * single threaded replay process to keep up with a source that has multiple
36 * backends generating changes concurrently. For efficiency and simplicity
37 * reasons a backend can setup one replication origin that's from then used as
38 * the source of changes produced by the backend, until reset again.
39 *
40 * This infrastructure is intended to be used in cooperation with logical
41 * decoding. When replaying from a remote system the configured origin is
42 * provided to output plugins, allowing prevention of replication loops and
43 * other filtering.
44 *
45 * There are several levels of locking at work:
46 *
47 * * To create and drop replication origins an exclusive lock on
48 * pg_replication_slot is required for the duration. That allows us to
49 * safely and conflict free assign new origins using a dirty snapshot.
50 *
51 * * When creating an in-memory replication progress slot the ReplicationOrigin
52 * LWLock has to be held exclusively; when iterating over the replication
53 * progress a shared lock has to be held, the same when advancing the
54 * replication progress of an individual backend that has not setup as the
55 * session's replication origin.
56 *
57 * * When manipulating or looking at the remote_lsn and local_lsn fields of a
58 * replication progress slot that slot's lwlock has to be held. That's
59 * primarily because we do not assume 8 byte writes (the LSN) is atomic on
60 * all our platforms, but it also simplifies memory ordering concerns
61 * between the remote and local lsn. We use a lwlock instead of a spinlock
62 * so it's less harmful to hold the lock over a WAL write
63 * (cf. AdvanceReplicationProgress).
64 *
65 * ---------------------------------------------------------------------------
66 */
67
68#include "postgres.h"
69
70#include <unistd.h>
71#include <sys/stat.h>
72
73#include "access/genam.h"
74#include "access/htup_details.h"
75#include "access/table.h"
76#include "access/xact.h"
77#include "access/xloginsert.h"
78#include "catalog/catalog.h"
79#include "catalog/indexing.h"
81#include "funcapi.h"
82#include "miscadmin.h"
83#include "nodes/execnodes.h"
84#include "pgstat.h"
85#include "replication/origin.h"
86#include "replication/slot.h"
88#include "storage/fd.h"
89#include "storage/ipc.h"
90#include "storage/lmgr.h"
91#include "utils/builtins.h"
92#include "utils/fmgroids.h"
93#include "utils/guc.h"
94#include "utils/pg_lsn.h"
95#include "utils/rel.h"
96#include "utils/snapmgr.h"
97#include "utils/syscache.h"
98
99/* paths for replication origin checkpoint files */
100#define PG_REPLORIGIN_CHECKPOINT_FILENAME PG_LOGICAL_DIR "/replorigin_checkpoint"
101#define PG_REPLORIGIN_CHECKPOINT_TMPFILE PG_REPLORIGIN_CHECKPOINT_FILENAME ".tmp"
102
103/* GUC variables */
105
106/*
107 * Replay progress of a single remote node.
108 */
109typedef struct ReplicationState
110{
111 /*
112 * Local identifier for the remote node.
113 */
115
116 /*
117 * Location of the latest commit from the remote side.
118 */
120
121 /*
122 * Remember the local lsn of the commit record so we can XLogFlush() to it
123 * during a checkpoint so we know the commit record actually is safe on
124 * disk.
125 */
127
128 /*
129 * PID of backend that's acquired slot, or 0 if none.
130 */
132
133 /* Count of processes that are currently using this origin. */
135
136 /*
137 * Condition variable that's signaled when acquired_by changes.
138 */
140
141 /*
142 * Lock protecting remote_lsn and local_lsn.
143 */
146
147/*
148 * On disk version of ReplicationState.
149 */
155
156
158{
159 /* Tranche to use for per-origin LWLocks */
161 /* Array of length max_active_replication_origins */
164
165/* external variables */
169
170/*
171 * Base address into a shared memory array of replication states of size
172 * max_active_replication_origins.
173 */
175
176/*
177 * Actual shared memory block (replication_states[] is now part of this).
178 */
180
181/*
182 * We keep a pointer to this backend's ReplicationState to avoid having to
183 * search the replication_states array in replorigin_session_advance for each
184 * remote commit. (Ownership of a backend's own entry can only be changed by
185 * that backend.)
186 */
188
189/* Magic for on disk files. */
190#define REPLICATION_STATE_MAGIC ((uint32) 0x1257DADE)
191
192static void
194{
198 errmsg("cannot query or manipulate replication origin when \"max_active_replication_origins\" is 0")));
199
203 errmsg("cannot manipulate replication origins during recovery")));
204}
205
206
207/*
208 * IsReservedOriginName
209 * True iff name is either "none" or "any".
210 */
211static bool
213{
214 return ((pg_strcasecmp(name, LOGICALREP_ORIGIN_NONE) == 0) ||
216}
217
218/* ---------------------------------------------------------------------------
219 * Functions for working with replication origins themselves.
220 * ---------------------------------------------------------------------------
221 */
222
223/*
224 * Check for a persistent replication origin identified by name.
225 *
226 * Returns InvalidOid if the node isn't known yet and missing_ok is true.
227 */
229replorigin_by_name(const char *roname, bool missing_ok)
230{
232 Oid roident = InvalidOid;
233 HeapTuple tuple;
235
237
239 if (HeapTupleIsValid(tuple))
240 {
242 roident = ident->roident;
243 ReleaseSysCache(tuple);
244 }
245 else if (!missing_ok)
248 errmsg("replication origin \"%s\" does not exist",
249 roname)));
250
251 return roident;
252}
253
254/*
255 * Create a replication origin.
256 *
257 * Needs to be called in a transaction.
258 */
261{
262 Oid roident;
263 HeapTuple tuple = NULL;
264 Relation rel;
267 SysScanDesc scan;
268 ScanKeyData key;
269
270 /*
271 * To avoid needing a TOAST table for pg_replication_origin, we limit
272 * replication origin names to 512 bytes. This should be more than enough
273 * for all practical use.
274 */
278 errmsg("replication origin name is too long"),
279 errdetail("Replication origin names must be no longer than %d bytes.",
281
283
285
286 /*
287 * We need the numeric replication origin to be 16bit wide, so we cannot
288 * rely on the normal oid allocation. Instead we simply scan
289 * pg_replication_origin for the first unused id. That's not particularly
290 * efficient, but this should be a fairly infrequent operation - we can
291 * easily spend a bit more code on this when it turns out it needs to be
292 * faster.
293 *
294 * We handle concurrency by taking an exclusive lock (allowing reads!)
295 * over the table for the duration of the search. Because we use a "dirty
296 * snapshot" we can read rows that other in-progress sessions have
297 * written, even though they would be invisible with normal snapshots. Due
298 * to the exclusive lock there's no danger that new rows can appear while
299 * we're checking.
300 */
302
304
305 /*
306 * We want to be able to access pg_replication_origin without setting up a
307 * snapshot. To make that safe, it needs to not have a TOAST table, since
308 * TOASTed data cannot be fetched without a snapshot. As of this writing,
309 * its only varlena column is roname, which we limit to 512 bytes to avoid
310 * needing out-of-line storage. If you add a TOAST table to this catalog,
311 * be sure to set up a snapshot everywhere it might be needed. For more
312 * information, see https://postgr.es/m/ZvMSUPOqUU-VNADN%40nathan.
313 */
314 Assert(!OidIsValid(rel->rd_rel->reltoastrelid));
315
316 for (roident = InvalidOid + 1; roident < PG_UINT16_MAX; roident++)
317 {
318 bool nulls[Natts_pg_replication_origin];
320 bool collides;
321
323
324 ScanKeyInit(&key,
327 ObjectIdGetDatum(roident));
328
330 true /* indexOK */ ,
332 1, &key);
333
335
336 systable_endscan(scan);
337
338 if (!collides)
339 {
340 /*
341 * Ok, found an unused roident, insert the new row and do a CCI,
342 * so our callers can look it up if they want to.
343 */
344 memset(&nulls, 0, sizeof(nulls));
345
348
349 tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
350 CatalogTupleInsert(rel, tuple);
352 break;
353 }
354 }
355
356 /* now release lock again, */
358
359 if (tuple == NULL)
362 errmsg("could not find free replication origin ID")));
363
364 heap_freetuple(tuple);
365 return roident;
366}
367
368/*
369 * Helper function to drop a replication origin.
370 */
371static void
373{
374 int i;
375
376 /*
377 * Clean up the slot state info, if there is any matching slot.
378 */
379restart:
381
382 for (i = 0; i < max_active_replication_origins; i++)
383 {
385
386 if (state->roident == roident)
387 {
388 /* found our slot, is it busy? */
389 if (state->refcount > 0)
390 {
392
393 if (nowait)
396 (state->acquired_by != 0)
397 ? errmsg("could not drop replication origin with ID %d, in use by PID %d",
398 state->roident,
399 state->acquired_by)
400 : errmsg("could not drop replication origin with ID %d, in use by another process",
401 state->roident)));
402
403 /*
404 * We must wait and then retry. Since we don't know which CV
405 * to wait on until here, we can't readily use
406 * ConditionVariablePrepareToSleep (calling it here would be
407 * wrong, since we could miss the signal if we did so); just
408 * use ConditionVariableSleep directly.
409 */
410 cv = &state->origin_cv;
411
413
415 goto restart;
416 }
417
418 /* first make a WAL log entry */
419 {
421
422 xlrec.node_id = roident;
424 XLogRegisterData(&xlrec, sizeof(xlrec));
426 }
427
428 /* then clear the in-memory slot */
429 state->roident = InvalidRepOriginId;
430 state->remote_lsn = InvalidXLogRecPtr;
431 state->local_lsn = InvalidXLogRecPtr;
432 break;
433 }
434 }
437}
438
439/*
440 * Drop replication origin (by name).
441 *
442 * Needs to be called in a transaction.
443 */
444void
445replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
446{
447 RepOriginId roident;
448 Relation rel;
449 HeapTuple tuple;
450
452
454
455 roident = replorigin_by_name(name, missing_ok);
456
457 /* Lock the origin to prevent concurrent drops. */
460
462 if (!HeapTupleIsValid(tuple))
463 {
464 if (!missing_ok)
465 elog(ERROR, "cache lookup failed for replication origin with ID %d",
466 roident);
467
468 /*
469 * We don't need to retain the locks if the origin is already dropped.
470 */
474 return;
475 }
476
477 replorigin_state_clear(roident, nowait);
478
479 /*
480 * Now, we can delete the catalog entry.
481 */
482 CatalogTupleDelete(rel, &tuple->t_self);
483 ReleaseSysCache(tuple);
484
486
487 /* We keep the lock on pg_replication_origin until commit */
488 table_close(rel, NoLock);
489}
490
491/*
492 * Lookup replication origin via its oid and return the name.
493 *
494 * The external name is palloc'd in the calling context.
495 *
496 * Returns true if the origin is known, false otherwise.
497 */
498bool
499replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
500{
501 HeapTuple tuple;
503
504 Assert(OidIsValid((Oid) roident));
505 Assert(roident != InvalidRepOriginId);
506 Assert(roident != DoNotReplicateId);
507
509 ObjectIdGetDatum((Oid) roident));
510
511 if (HeapTupleIsValid(tuple))
512 {
514 *roname = text_to_cstring(&ric->roname);
515 ReleaseSysCache(tuple);
516
517 return true;
518 }
519 else
520 {
521 *roname = NULL;
522
523 if (!missing_ok)
526 errmsg("replication origin with ID %d does not exist",
527 roident)));
528
529 return false;
530 }
531}
532
533
534/* ---------------------------------------------------------------------------
535 * Functions for handling replication progress.
536 * ---------------------------------------------------------------------------
537 */
538
539Size
541{
542 Size size = 0;
543
545 return size;
546
547 size = add_size(size, offsetof(ReplicationStateCtl, states));
548
549 size = add_size(size,
551 return size;
552}
553
554void
556{
557 bool found;
558
560 return;
561
563 ShmemInitStruct("ReplicationOriginState",
565 &found);
567
568 if (!found)
569 {
570 int i;
571
573
575
576 for (i = 0; i < max_active_replication_origins; i++)
577 {
581 }
582 }
583}
584
585/* ---------------------------------------------------------------------------
586 * Perform a checkpoint of each replication origin's progress with respect to
587 * the replayed remote_lsn. Make sure that all transactions we refer to in the
588 * checkpoint (local_lsn) are actually on-disk. This might not yet be the case
589 * if the transactions were originally committed asynchronously.
590 *
591 * We store checkpoints in the following format:
592 * +-------+------------------------+------------------+-----+--------+
593 * | MAGIC | ReplicationStateOnDisk | struct Replic... | ... | CRC32C | EOF
594 * +-------+------------------------+------------------+-----+--------+
595 *
596 * So its just the magic, followed by the statically sized
597 * ReplicationStateOnDisk structs. Note that the maximum number of
598 * ReplicationState is determined by max_active_replication_origins.
599 * ---------------------------------------------------------------------------
600 */
601void
603{
605 const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME;
606 int tmpfd;
607 int i;
610
612 return;
613
615
616 /* make sure no old temp file is remaining */
617 if (unlink(tmppath) < 0 && errno != ENOENT)
620 errmsg("could not remove file \"%s\": %m",
621 tmppath)));
622
623 /*
624 * no other backend can perform this at the same time; only one checkpoint
625 * can happen at a time.
626 */
629 if (tmpfd < 0)
632 errmsg("could not create file \"%s\": %m",
633 tmppath)));
634
635 /* write magic */
636 errno = 0;
637 if ((write(tmpfd, &magic, sizeof(magic))) != sizeof(magic))
638 {
639 /* if write didn't set errno, assume problem is no disk space */
640 if (errno == 0)
641 errno = ENOSPC;
644 errmsg("could not write to file \"%s\": %m",
645 tmppath)));
646 }
647 COMP_CRC32C(crc, &magic, sizeof(magic));
648
649 /* prevent concurrent creations/drops */
651
652 /* write actual data */
653 for (i = 0; i < max_active_replication_origins; i++)
654 {
657 XLogRecPtr local_lsn;
658
659 if (curstate->roident == InvalidRepOriginId)
660 continue;
661
662 /* zero, to avoid uninitialized padding bytes */
663 memset(&disk_state, 0, sizeof(disk_state));
664
666
667 disk_state.roident = curstate->roident;
668
669 disk_state.remote_lsn = curstate->remote_lsn;
670 local_lsn = curstate->local_lsn;
671
672 LWLockRelease(&curstate->lock);
673
674 /* make sure we only write out a commit that's persistent */
675 XLogFlush(local_lsn);
676
677 errno = 0;
678 if ((write(tmpfd, &disk_state, sizeof(disk_state))) !=
679 sizeof(disk_state))
680 {
681 /* if write didn't set errno, assume problem is no disk space */
682 if (errno == 0)
683 errno = ENOSPC;
686 errmsg("could not write to file \"%s\": %m",
687 tmppath)));
688 }
689
691 }
692
694
695 /* write out the CRC */
697 errno = 0;
698 if ((write(tmpfd, &crc, sizeof(crc))) != sizeof(crc))
699 {
700 /* if write didn't set errno, assume problem is no disk space */
701 if (errno == 0)
702 errno = ENOSPC;
705 errmsg("could not write to file \"%s\": %m",
706 tmppath)));
707 }
708
709 if (CloseTransientFile(tmpfd) != 0)
712 errmsg("could not close file \"%s\": %m",
713 tmppath)));
714
715 /* fsync, rename to permanent file, fsync file and directory */
717}
718
719/*
720 * Recover replication replay status from checkpoint data saved earlier by
721 * CheckPointReplicationOrigin.
722 *
723 * This only needs to be called at startup and *not* during every checkpoint
724 * read during recovery (e.g. in HS or PITR from a base backup) afterwards. All
725 * state thereafter can be recovered by looking at commit records.
726 */
727void
729{
730 const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME;
731 int fd;
732 int readBytes;
734 int last_state = 0;
737
738 /* don't want to overwrite already existing state */
739#ifdef USE_ASSERT_CHECKING
740 static bool already_started = false;
741
743 already_started = true;
744#endif
745
747 return;
748
750
751 elog(DEBUG2, "starting up replication origin progress state");
752
754
755 /*
756 * might have had max_active_replication_origins == 0 last run, or we just
757 * brought up a standby.
758 */
759 if (fd < 0 && errno == ENOENT)
760 return;
761 else if (fd < 0)
764 errmsg("could not open file \"%s\": %m",
765 path)));
766
767 /* verify magic, that is written even if nothing was active */
768 readBytes = read(fd, &magic, sizeof(magic));
769 if (readBytes != sizeof(magic))
770 {
771 if (readBytes < 0)
774 errmsg("could not read file \"%s\": %m",
775 path)));
776 else
779 errmsg("could not read file \"%s\": read %d of %zu",
780 path, readBytes, sizeof(magic))));
781 }
782 COMP_CRC32C(crc, &magic, sizeof(magic));
783
784 if (magic != REPLICATION_STATE_MAGIC)
786 (errmsg("replication checkpoint has wrong magic %u instead of %u",
787 magic, REPLICATION_STATE_MAGIC)));
788
789 /* we can skip locking here, no other access is possible */
790
791 /* recover individual states, until there are no more to be found */
792 while (true)
793 {
795
796 readBytes = read(fd, &disk_state, sizeof(disk_state));
797
798 if (readBytes < 0)
799 {
802 errmsg("could not read file \"%s\": %m",
803 path)));
804 }
805
806 /* no further data */
807 if (readBytes == sizeof(crc))
808 {
809 memcpy(&file_crc, &disk_state, sizeof(file_crc));
810 break;
811 }
812
813 if (readBytes != sizeof(disk_state))
814 {
817 errmsg("could not read file \"%s\": read %d of %zu",
818 path, readBytes, sizeof(disk_state))));
819 }
820
822
826 errmsg("could not find free replication state, increase \"max_active_replication_origins\"")));
827
828 /* copy data to shared memory */
831 last_state++;
832
833 ereport(LOG,
834 errmsg("recovered replication state of node %d to %X/%08X",
835 disk_state.roident,
836 LSN_FORMAT_ARGS(disk_state.remote_lsn)));
837 }
838
839 /* now check checksum */
841 if (file_crc != crc)
844 errmsg("replication slot checkpoint has wrong checksum %u, expected %u",
845 crc, file_crc)));
846
847 if (CloseTransientFile(fd) != 0)
850 errmsg("could not close file \"%s\": %m",
851 path)));
852}
853
854void
856{
857 uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
858
859 switch (info)
860 {
862 {
865
866 replorigin_advance(xlrec->node_id,
867 xlrec->remote_lsn, record->EndRecPtr,
868 xlrec->force /* backward */ ,
869 false /* WAL log */ );
870 break;
871 }
873 {
875 int i;
876
878
879 for (i = 0; i < max_active_replication_origins; i++)
880 {
882
883 /* found our slot */
884 if (state->roident == xlrec->node_id)
885 {
886 /* reset entry */
887 state->roident = InvalidRepOriginId;
888 state->remote_lsn = InvalidXLogRecPtr;
889 state->local_lsn = InvalidXLogRecPtr;
890 break;
891 }
892 }
893 break;
894 }
895 default:
896 elog(PANIC, "replorigin_redo: unknown op code %u", info);
897 }
898}
899
900
901/*
902 * Tell the replication origin progress machinery that a commit from 'node'
903 * that originated at the LSN remote_commit on the remote node was replayed
904 * successfully and that we don't need to do so again. In combination with
905 * setting up replorigin_session_origin_lsn and replorigin_session_origin
906 * that ensures we won't lose knowledge about that after a crash if the
907 * transaction had a persistent effect (think of asynchronous commits).
908 *
909 * local_commit needs to be a local LSN of the commit so that we can make sure
910 * upon a checkpoint that enough WAL has been persisted to disk.
911 *
912 * Needs to be called with a RowExclusiveLock on pg_replication_origin,
913 * unless running in recovery.
914 */
915void
918 bool go_backward, bool wal_log)
919{
920 int i;
923
924 Assert(node != InvalidRepOriginId);
925
926 /* we don't track DoNotReplicateId */
927 if (node == DoNotReplicateId)
928 return;
929
930 /*
931 * XXX: For the case where this is called by WAL replay, it'd be more
932 * efficient to restore into a backend local hashtable and only dump into
933 * shmem after recovery is finished. Let's wait with implementing that
934 * till it's shown to be a measurable expense
935 */
936
937 /* Lock exclusively, as we may have to create a new table entry. */
939
940 /*
941 * Search for either an existing slot for the origin, or a free one we can
942 * use.
943 */
944 for (i = 0; i < max_active_replication_origins; i++)
945 {
947
948 /* remember where to insert if necessary */
949 if (curstate->roident == InvalidRepOriginId &&
950 free_state == NULL)
951 {
953 continue;
954 }
955
956 /* not our slot */
957 if (curstate->roident != node)
958 {
959 continue;
960 }
961
962 /* ok, found slot */
964
966
967 /* Make sure it's not used by somebody else */
968 if (replication_state->refcount > 0)
969 {
972 (replication_state->acquired_by != 0)
973 ? errmsg("replication origin with ID %d is already active for PID %d",
974 replication_state->roident,
975 replication_state->acquired_by)
976 : errmsg("replication origin with ID %d is already active in another process",
977 replication_state->roident)));
978 }
979
980 break;
981 }
982
986 errmsg("could not find free replication state slot for replication origin with ID %d",
987 node),
988 errhint("Increase \"max_active_replication_origins\" and try again.")));
989
990 if (replication_state == NULL)
991 {
992 /* initialize new slot */
997 replication_state->roident = node;
998 }
999
1001
1002 /*
1003 * If somebody "forcefully" sets this slot, WAL log it, so it's durable
1004 * and the standby gets the message. Primarily this will be called during
1005 * WAL replay (of commit records) where no WAL logging is necessary.
1006 */
1007 if (wal_log)
1008 {
1010
1012 xlrec.node_id = node;
1013 xlrec.force = go_backward;
1014
1016 XLogRegisterData(&xlrec, sizeof(xlrec));
1017
1019 }
1020
1021 /*
1022 * Due to - harmless - race conditions during a checkpoint we could see
1023 * values here that are older than the ones we already have in memory. We
1024 * could also see older values for prepared transactions when the prepare
1025 * is sent at a later point of time along with commit prepared and there
1026 * are other transactions commits between prepare and commit prepared. See
1027 * ReorderBufferFinishPrepared. Don't overwrite those.
1028 */
1029 if (go_backward || replication_state->remote_lsn < remote_commit)
1030 replication_state->remote_lsn = remote_commit;
1032 (go_backward || replication_state->local_lsn < local_commit))
1033 replication_state->local_lsn = local_commit;
1035
1036 /*
1037 * Release *after* changing the LSNs, slot isn't acquired and thus could
1038 * otherwise be dropped anytime.
1039 */
1041}
1042
1043
1046{
1047 int i;
1048 XLogRecPtr local_lsn = InvalidXLogRecPtr;
1049 XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1050
1051 /* prevent slots from being concurrently dropped */
1053
1054 for (i = 0; i < max_active_replication_origins; i++)
1055 {
1057
1059
1060 if (state->roident == node)
1061 {
1062 LWLockAcquire(&state->lock, LW_SHARED);
1063
1064 remote_lsn = state->remote_lsn;
1065 local_lsn = state->local_lsn;
1066
1067 LWLockRelease(&state->lock);
1068
1069 break;
1070 }
1071 }
1072
1074
1075 if (flush && XLogRecPtrIsValid(local_lsn))
1076 XLogFlush(local_lsn);
1077
1078 return remote_lsn;
1079}
1080
1081/* Helper function to reset the session replication origin */
1082static void
1084{
1086
1088
1090
1091 /* The origin must be held by at least one process at this point. */
1093
1094 /*
1095 * Reset the PID only if the current session is the first to set up this
1096 * origin. This avoids clearing the first process's PID when any other
1097 * session releases the origin.
1098 */
1101
1103
1106
1108
1110}
1111
1112/*
1113 * Tear down a (possibly) configured session replication origin during process
1114 * exit.
1115 */
1116static void
1124
1125/*
1126 * Setup a replication origin in the shared memory struct if it doesn't
1127 * already exist and cache access to the specific ReplicationSlot so the
1128 * array doesn't have to be searched when calling
1129 * replorigin_session_advance().
1130 *
1131 * Normally only one such cached origin can exist per process so the cached
1132 * value can only be set again after the previous value is torn down with
1133 * replorigin_session_reset(). For this normal case pass acquired_by = 0
1134 * (meaning the slot is not allowed to be already acquired by another process).
1135 *
1136 * However, sometimes multiple processes can safely re-use the same origin slot
1137 * (for example, multiple parallel apply processes can safely use the same
1138 * origin, provided they maintain commit order by allowing only one process to
1139 * commit at a time). For this case the first process must pass acquired_by =
1140 * 0, and then the other processes sharing that same origin can pass
1141 * acquired_by = PID of the first process.
1142 */
1143void
1145{
1146 static bool registered_cleanup;
1147 int i;
1148 int free_slot = -1;
1149
1150 if (!registered_cleanup)
1151 {
1153 registered_cleanup = true;
1154 }
1155
1157
1159 ereport(ERROR,
1161 errmsg("cannot setup replication origin when one is already setup")));
1162
1163 /* Lock exclusively, as we may have to create a new table entry. */
1165
1166 /*
1167 * Search for either an existing slot for the origin, or a free one we can
1168 * use.
1169 */
1170 for (i = 0; i < max_active_replication_origins; i++)
1171 {
1173
1174 /* remember where to insert if necessary */
1175 if (curstate->roident == InvalidRepOriginId &&
1176 free_slot == -1)
1177 {
1178 free_slot = i;
1179 continue;
1180 }
1181
1182 /* not our slot */
1183 if (curstate->roident != node)
1184 continue;
1185
1186 else if (curstate->acquired_by != 0 && acquired_by == 0)
1187 {
1188 ereport(ERROR,
1190 errmsg("replication origin with ID %d is already active for PID %d",
1191 curstate->roident, curstate->acquired_by)));
1192 }
1193
1194 else if (curstate->acquired_by != acquired_by)
1195 {
1196 ereport(ERROR,
1198 errmsg("could not find replication state slot for replication origin with OID %u which was acquired by %d",
1199 node, acquired_by)));
1200 }
1201
1202 /*
1203 * The origin is in use, but PID is not recorded. This can happen if
1204 * the process that originally acquired the origin exited without
1205 * releasing it. To ensure correctness, other processes cannot acquire
1206 * the origin until all processes currently using it have released it.
1207 */
1208 else if (curstate->acquired_by == 0 && curstate->refcount > 0)
1209 ereport(ERROR,
1211 errmsg("replication origin with ID %d is already active in another process",
1212 curstate->roident)));
1213
1214 /* ok, found slot */
1216 break;
1217 }
1218
1219
1221 ereport(ERROR,
1223 errmsg("could not find free replication state slot for replication origin with ID %d",
1224 node),
1225 errhint("Increase \"max_active_replication_origins\" and try again.")));
1226 else if (session_replication_state == NULL)
1227 {
1228 if (acquired_by)
1229 ereport(ERROR,
1231 errmsg("cannot use PID %d for inactive replication origin with ID %d",
1232 acquired_by, node)));
1233
1234 /* initialize new slot */
1239 }
1240
1241
1243
1244 if (acquired_by == 0)
1245 {
1248 }
1249 else
1250 {
1251 /*
1252 * Sanity check: the origin must already be acquired by the process
1253 * passed as input, and at least one process must be using it.
1254 */
1257 }
1258
1260
1262
1263 /* probably this one is pointless */
1265}
1266
1267/*
1268 * Reset replay state previously setup in this session.
1269 *
1270 * This function may only be called if an origin was setup with
1271 * replorigin_session_setup().
1272 */
1273void
1275{
1277
1279 ereport(ERROR,
1281 errmsg("no replication origin is configured")));
1282
1283 /*
1284 * Restrict explicit resetting of the replication origin if it was first
1285 * acquired by this process and others are still using it. While the
1286 * system handles this safely (as happens if the first session exits
1287 * without calling reset), it is best to avoid doing so.
1288 */
1291 ereport(ERROR,
1293 errmsg("cannot reset replication origin with ID %d because it is still in use by other processes",
1295 errdetail("This session is the first process for this replication origin, and other processes are currently sharing it."),
1296 errhint("Reset the replication origin in all other processes before retrying.")));
1297
1299}
1300
1301/*
1302 * Do the same work replorigin_advance() does, just on the session's
1303 * configured origin.
1304 *
1305 * This is noticeably cheaper than using replorigin_advance().
1306 */
1307void
1320
1321/*
1322 * Ask the machinery about the point up to which we successfully replayed
1323 * changes from an already setup replication origin.
1324 */
1327{
1328 XLogRecPtr remote_lsn;
1329 XLogRecPtr local_lsn;
1330
1332
1337
1338 if (flush && XLogRecPtrIsValid(local_lsn))
1339 XLogFlush(local_lsn);
1340
1341 return remote_lsn;
1342}
1343
1344
1345
1346/* ---------------------------------------------------------------------------
1347 * SQL functions for working with replication origin.
1348 *
1349 * These mostly should be fairly short wrappers around more generic functions.
1350 * ---------------------------------------------------------------------------
1351 */
1352
1353/*
1354 * Create replication origin for the passed in name, and return the assigned
1355 * oid.
1356 */
1357Datum
1359{
1360 char *name;
1361 RepOriginId roident;
1362
1363 replorigin_check_prerequisites(false, false);
1364
1366
1367 /*
1368 * Replication origins "any and "none" are reserved for system options.
1369 * The origins "pg_xxx" are reserved for internal use.
1370 */
1372 ereport(ERROR,
1374 errmsg("replication origin name \"%s\" is reserved",
1375 name),
1376 errdetail("Origin names \"%s\", \"%s\", and names starting with \"pg_\" are reserved.",
1378
1379 /*
1380 * If built with appropriate switch, whine when regression-testing
1381 * conventions for replication origin names are violated.
1382 */
1383#ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
1384 if (strncmp(name, "regress_", 8) != 0)
1385 elog(WARNING, "replication origins created by regression test cases should have names starting with \"regress_\"");
1386#endif
1387
1388 roident = replorigin_create(name);
1389
1390 pfree(name);
1391
1392 PG_RETURN_OID(roident);
1393}
1394
1395/*
1396 * Drop replication origin.
1397 */
1398Datum
1400{
1401 char *name;
1402
1403 replorigin_check_prerequisites(false, false);
1404
1406
1407 replorigin_drop_by_name(name, false, true);
1408
1409 pfree(name);
1410
1412}
1413
1414/*
1415 * Return oid of a replication origin.
1416 */
1417Datum
1419{
1420 char *name;
1421 RepOriginId roident;
1422
1423 replorigin_check_prerequisites(false, false);
1424
1426 roident = replorigin_by_name(name, true);
1427
1428 pfree(name);
1429
1430 if (OidIsValid(roident))
1431 PG_RETURN_OID(roident);
1433}
1434
1435/*
1436 * Setup a replication origin for this session.
1437 */
1438Datum
1440{
1441 char *name;
1442 RepOriginId origin;
1443 int pid;
1444
1445 replorigin_check_prerequisites(true, false);
1446
1448 origin = replorigin_by_name(name, false);
1449 pid = PG_GETARG_INT32(1);
1450 replorigin_session_setup(origin, pid);
1451
1453
1454 pfree(name);
1455
1457}
1458
1459/*
1460 * Reset previously setup origin in this session
1461 */
1462Datum
1475
1476/*
1477 * Has a replication origin been setup for this session.
1478 */
1479Datum
1486
1487
1488/*
1489 * Return the replication progress for origin setup in the current session.
1490 *
1491 * If 'flush' is set to true it is ensured that the returned value corresponds
1492 * to a local transaction that has been flushed. This is useful if asynchronous
1493 * commits are used when replaying replicated transactions.
1494 */
1495Datum
1497{
1498 XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1499 bool flush = PG_GETARG_BOOL(0);
1500
1501 replorigin_check_prerequisites(true, false);
1502
1504 ereport(ERROR,
1506 errmsg("no replication origin is configured")));
1507
1508 remote_lsn = replorigin_session_get_progress(flush);
1509
1510 if (!XLogRecPtrIsValid(remote_lsn))
1512
1513 PG_RETURN_LSN(remote_lsn);
1514}
1515
1516Datum
1533
1534Datum
1544
1545
1546Datum
1548{
1551 RepOriginId node;
1552
1553 replorigin_check_prerequisites(true, false);
1554
1555 /* lock to prevent the replication origin from vanishing */
1557
1558 node = replorigin_by_name(text_to_cstring(name), false);
1559
1560 /*
1561 * Can't sensibly pass a local commit to be flushed at checkpoint - this
1562 * xact hasn't committed yet. This is why this function should be used to
1563 * set up the initial replication state, but not for replay.
1564 */
1566 true /* go backward */ , true /* WAL log */ );
1567
1569
1571}
1572
1573
1574/*
1575 * Return the replication progress for an individual replication origin.
1576 *
1577 * If 'flush' is set to true it is ensured that the returned value corresponds
1578 * to a local transaction that has been flushed. This is useful if asynchronous
1579 * commits are used when replaying replicated transactions.
1580 */
1581Datum
1583{
1584 char *name;
1585 bool flush;
1586 RepOriginId roident;
1587 XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1588
1590
1592 flush = PG_GETARG_BOOL(1);
1593
1594 roident = replorigin_by_name(name, false);
1595 Assert(OidIsValid(roident));
1596
1597 remote_lsn = replorigin_get_progress(roident, flush);
1598
1599 if (!XLogRecPtrIsValid(remote_lsn))
1601
1602 PG_RETURN_LSN(remote_lsn);
1603}
1604
1605
1606Datum
1608{
1609 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1610 int i;
1611#define REPLICATION_ORIGIN_PROGRESS_COLS 4
1612
1613 /* we want to return 0 rows if slot is set to zero */
1614 replorigin_check_prerequisites(false, true);
1615
1616 InitMaterializedSRF(fcinfo, 0);
1617
1618 /* prevent slots from being concurrently dropped */
1620
1621 /*
1622 * Iterate through all possible replication_states, display if they are
1623 * filled. Note that we do not take any locks, so slightly corrupted/out
1624 * of date values are a possibility.
1625 */
1626 for (i = 0; i < max_active_replication_origins; i++)
1627 {
1631 char *roname;
1632
1634
1635 /* unused slot, nothing to display */
1636 if (state->roident == InvalidRepOriginId)
1637 continue;
1638
1639 memset(values, 0, sizeof(values));
1640 memset(nulls, 1, sizeof(nulls));
1641
1642 values[0] = ObjectIdGetDatum(state->roident);
1643 nulls[0] = false;
1644
1645 /*
1646 * We're not preventing the origin to be dropped concurrently, so
1647 * silently accept that it might be gone.
1648 */
1649 if (replorigin_by_oid(state->roident, true,
1650 &roname))
1651 {
1653 nulls[1] = false;
1654 }
1655
1656 LWLockAcquire(&state->lock, LW_SHARED);
1657
1658 values[2] = LSNGetDatum(state->remote_lsn);
1659 nulls[2] = false;
1660
1661 values[3] = LSNGetDatum(state->local_lsn);
1662 nulls[3] = false;
1663
1664 LWLockRelease(&state->lock);
1665
1666 tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
1667 values, nulls);
1668 }
1669
1671
1672#undef REPLICATION_ORIGIN_PROGRESS_COLS
1673
1674 return (Datum) 0;
1675}
static Datum values[MAXATTR]
Definition bootstrap.c:155
#define CStringGetTextDatum(s)
Definition builtins.h:97
uint8_t uint8
Definition c.h:554
#define Assert(condition)
Definition c.h:883
#define PG_BINARY
Definition c.h:1281
#define FLEXIBLE_ARRAY_MEMBER
Definition c.h:490
uint32_t uint32
Definition c.h:556
#define PG_UINT16_MAX
Definition c.h:611
#define MemSet(start, val, len)
Definition c.h:1023
#define OidIsValid(objectId)
Definition c.h:798
size_t Size
Definition c.h:629
bool IsReservedName(const char *name)
Definition catalog.c:278
bool ConditionVariableCancelSleep(void)
void ConditionVariableBroadcast(ConditionVariable *cv)
void ConditionVariableInit(ConditionVariable *cv)
void ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)
int64 TimestampTz
Definition timestamp.h:39
int errcode_for_file_access(void)
Definition elog.c:886
int errdetail(const char *fmt,...)
Definition elog.c:1216
int errhint(const char *fmt,...)
Definition elog.c:1330
int errcode(int sqlerrcode)
Definition elog.c:863
int errmsg(const char *fmt,...)
Definition elog.c:1080
#define LOG
Definition elog.h:31
#define WARNING
Definition elog.h:36
#define DEBUG2
Definition elog.h:29
#define PANIC
Definition elog.h:42
#define ERROR
Definition elog.h:39
#define elog(elevel,...)
Definition elog.h:226
#define ereport(elevel,...)
Definition elog.h:150
int durable_rename(const char *oldfile, const char *newfile, int elevel)
Definition fd.c:779
int CloseTransientFile(int fd)
Definition fd.c:2851
int OpenTransientFile(const char *fileName, int fileFlags)
Definition fd.c:2674
#define PG_RETURN_VOID()
Definition fmgr.h:350
#define PG_GETARG_TEXT_PP(n)
Definition fmgr.h:310
#define PG_GETARG_DATUM(n)
Definition fmgr.h:268
#define PG_RETURN_NULL()
Definition fmgr.h:346
#define PG_GETARG_INT32(n)
Definition fmgr.h:269
#define PG_GETARG_BOOL(n)
Definition fmgr.h:274
#define PG_RETURN_OID(x)
Definition fmgr.h:361
#define PG_FUNCTION_ARGS
Definition fmgr.h:193
#define PG_RETURN_BOOL(x)
Definition fmgr.h:360
void InitMaterializedSRF(FunctionCallInfo fcinfo, bits32 flags)
Definition funcapi.c:76
void systable_endscan(SysScanDesc sysscan)
Definition genam.c:603
HeapTuple systable_getnext(SysScanDesc sysscan)
Definition genam.c:514
SysScanDesc systable_beginscan(Relation heapRelation, Oid indexId, bool indexOK, Snapshot snapshot, int nkeys, ScanKey key)
Definition genam.c:388
int MyProcPid
Definition globals.c:47
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
Definition heaptuple.c:1117
void heap_freetuple(HeapTuple htup)
Definition heaptuple.c:1435
#define HeapTupleIsValid(tuple)
Definition htup.h:78
static void * GETSTRUCT(const HeapTupleData *tuple)
#define ident
void CatalogTupleInsert(Relation heapRel, HeapTuple tup)
Definition indexing.c:233
void CatalogTupleDelete(Relation heapRel, const ItemPointerData *tid)
Definition indexing.c:365
#define write(a, b, c)
Definition win32.h:14
#define read(a, b, c)
Definition win32.h:13
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition ipc.c:372
int i
Definition isn.c:77
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition lmgr.c:1088
void UnlockRelationOid(Oid relid, LOCKMODE lockmode)
Definition lmgr.c:229
void LockRelationOid(Oid relid, LOCKMODE lockmode)
Definition lmgr.c:107
void UnlockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition lmgr.c:1148
#define NoLock
Definition lockdefs.h:34
#define AccessExclusiveLock
Definition lockdefs.h:43
#define ExclusiveLock
Definition lockdefs.h:42
#define RowExclusiveLock
Definition lockdefs.h:38
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1176
void LWLockRelease(LWLock *lock)
Definition lwlock.c:1793
void LWLockInitialize(LWLock *lock, int tranche_id)
Definition lwlock.c:698
@ LW_SHARED
Definition lwlock.h:113
@ LW_EXCLUSIVE
Definition lwlock.h:112
void pfree(void *pointer)
Definition mcxt.c:1616
#define CHECK_FOR_INTERRUPTS()
Definition miscadmin.h:123
TimestampTz replorigin_session_origin_timestamp
Definition origin.c:168
static ReplicationStateCtl * replication_states_ctl
Definition origin.c:179
RepOriginId replorigin_by_name(const char *roname, bool missing_ok)
Definition origin.c:229
Size ReplicationOriginShmemSize(void)
Definition origin.c:540
RepOriginId replorigin_create(const char *roname)
Definition origin.c:260
Datum pg_replication_origin_progress(PG_FUNCTION_ARGS)
Definition origin.c:1582
void replorigin_session_reset(void)
Definition origin.c:1274
static bool IsReservedOriginName(const char *name)
Definition origin.c:212
void replorigin_session_advance(XLogRecPtr remote_commit, XLogRecPtr local_commit)
Definition origin.c:1308
bool replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
Definition origin.c:499
int max_active_replication_origins
Definition origin.c:104
Datum pg_replication_origin_advance(PG_FUNCTION_ARGS)
Definition origin.c:1547
XLogRecPtr replorigin_get_progress(RepOriginId node, bool flush)
Definition origin.c:1045
#define PG_REPLORIGIN_CHECKPOINT_TMPFILE
Definition origin.c:101
Datum pg_replication_origin_session_progress(PG_FUNCTION_ARGS)
Definition origin.c:1496
static ReplicationState * replication_states
Definition origin.c:174
#define PG_REPLORIGIN_CHECKPOINT_FILENAME
Definition origin.c:100
Datum pg_replication_origin_session_reset(PG_FUNCTION_ARGS)
Definition origin.c:1463
Datum pg_replication_origin_xact_setup(PG_FUNCTION_ARGS)
Definition origin.c:1517
Datum pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS)
Definition origin.c:1480
Datum pg_replication_origin_oid(PG_FUNCTION_ARGS)
Definition origin.c:1418
Datum pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
Definition origin.c:1439
static void ReplicationOriginExitCleanup(int code, Datum arg)
Definition origin.c:1117
void StartupReplicationOrigin(void)
Definition origin.c:728
void replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
Definition origin.c:445
RepOriginId replorigin_session_origin
Definition origin.c:166
void replorigin_advance(RepOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)
Definition origin.c:916
static void replorigin_state_clear(RepOriginId roident, bool nowait)
Definition origin.c:372
void replorigin_session_setup(RepOriginId node, int acquired_by)
Definition origin.c:1144
void CheckPointReplicationOrigin(void)
Definition origin.c:602
static void replorigin_check_prerequisites(bool check_origins, bool recoveryOK)
Definition origin.c:193
static ReplicationState * session_replication_state
Definition origin.c:187
Datum pg_replication_origin_drop(PG_FUNCTION_ARGS)
Definition origin.c:1399
#define REPLICATION_ORIGIN_PROGRESS_COLS
XLogRecPtr replorigin_session_get_progress(bool flush)
Definition origin.c:1326
void ReplicationOriginShmemInit(void)
Definition origin.c:555
static void replorigin_session_reset_internal(void)
Definition origin.c:1083
Datum pg_show_replication_origin_status(PG_FUNCTION_ARGS)
Definition origin.c:1607
#define REPLICATION_STATE_MAGIC
Definition origin.c:190
XLogRecPtr replorigin_session_origin_lsn
Definition origin.c:167
Datum pg_replication_origin_create(PG_FUNCTION_ARGS)
Definition origin.c:1358
Datum pg_replication_origin_xact_reset(PG_FUNCTION_ARGS)
Definition origin.c:1535
void replorigin_redo(XLogReaderState *record)
Definition origin.c:855
#define DoNotReplicateId
Definition origin.h:34
#define InvalidRepOriginId
Definition origin.h:33
#define XLOG_REPLORIGIN_DROP
Definition origin.h:31
#define MAX_RONAME_LEN
Definition origin.h:41
#define XLOG_REPLORIGIN_SET
Definition origin.h:30
void * arg
#define ERRCODE_DATA_CORRUPTED
uint32 pg_crc32c
Definition pg_crc32c.h:38
#define COMP_CRC32C(crc, data, len)
Definition pg_crc32c.h:153
#define INIT_CRC32C(crc)
Definition pg_crc32c.h:41
#define FIN_CRC32C(crc)
Definition pg_crc32c.h:158
return crc
#define PG_GETARG_LSN(n)
Definition pg_lsn.h:36
static Datum LSNGetDatum(XLogRecPtr X)
Definition pg_lsn.h:31
#define PG_RETURN_LSN(x)
Definition pg_lsn.h:37
FormData_pg_replication_origin * Form_pg_replication_origin
int pg_strcasecmp(const char *s1, const char *s2)
static Datum ObjectIdGetDatum(Oid X)
Definition postgres.h:262
uint64_t Datum
Definition postgres.h:70
static Pointer DatumGetPointer(Datum X)
Definition postgres.h:342
#define InvalidOid
unsigned int Oid
static int fd(const char *x, int i)
static int fb(int x)
#define RelationGetDescr(relation)
Definition rel.h:540
void ScanKeyInit(ScanKey entry, AttrNumber attributeNumber, StrategyNumber strategy, RegProcedure procedure, Datum argument)
Definition scankey.c:76
Size add_size(Size s1, Size s2)
Definition shmem.c:495
Size mul_size(Size s1, Size s2)
Definition shmem.c:510
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition shmem.c:389
#define InitDirtySnapshot(snapshotdata)
Definition snapmgr.h:42
#define BTEqualStrategyNumber
Definition stratnum.h:31
ItemPointerData t_self
Definition htup.h:65
Form_pg_class rd_rel
Definition rel.h:111
ReplicationState states[FLEXIBLE_ARRAY_MEMBER]
Definition origin.c:162
XLogRecPtr remote_lsn
Definition origin.c:153
RepOriginId roident
Definition origin.c:152
XLogRecPtr remote_lsn
Definition origin.c:119
XLogRecPtr local_lsn
Definition origin.c:126
ConditionVariable origin_cv
Definition origin.c:139
RepOriginId roident
Definition origin.c:114
XLogRecPtr EndRecPtr
Definition xlogreader.h:206
Definition c.h:716
RepOriginId node_id
Definition origin.h:27
XLogRecPtr remote_lsn
Definition origin.h:20
void ReleaseSysCache(HeapTuple tuple)
Definition syscache.c:264
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition syscache.c:220
void table_close(Relation relation, LOCKMODE lockmode)
Definition table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition table.c:40
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, const Datum *values, const bool *isnull)
Definition tuplestore.c:784
#define PG_GETARG_TIMESTAMPTZ(n)
Definition timestamp.h:64
char * text_to_cstring(const text *t)
Definition varlena.c:214
const char * name
bool IsTransactionState(void)
Definition xact.c:388
void CommandCounterIncrement(void)
Definition xact.c:1101
bool RecoveryInProgress(void)
Definition xlog.c:6461
void XLogFlush(XLogRecPtr record)
Definition xlog.c:2784
#define XLogRecPtrIsValid(r)
Definition xlogdefs.h:29
#define LSN_FORMAT_ARGS(lsn)
Definition xlogdefs.h:47
uint16 RepOriginId
Definition xlogdefs.h:69
uint64 XLogRecPtr
Definition xlogdefs.h:21
#define InvalidXLogRecPtr
Definition xlogdefs.h:28
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info)
Definition xloginsert.c:478
void XLogRegisterData(const void *data, uint32 len)
Definition xloginsert.c:368
void XLogBeginInsert(void)
Definition xloginsert.c:152
#define XLogRecGetInfo(decoder)
Definition xlogreader.h:409
#define XLogRecGetData(decoder)
Definition xlogreader.h:414