PostgreSQL Source Code git master
Loading...
Searching...
No Matches
sequencesync.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 * sequencesync.c
3 * PostgreSQL logical replication: sequence synchronization
4 *
5 * Copyright (c) 2025-2026, PostgreSQL Global Development Group
6 *
7 * IDENTIFICATION
8 * src/backend/replication/logical/sequencesync.c
9 *
10 * NOTES
11 * This file contains code for sequence synchronization for
12 * logical replication.
13 *
14 * Sequences requiring synchronization are tracked in the pg_subscription_rel
15 * catalog.
16 *
17 * Sequences to be synchronized will be added with state INIT when either of
18 * the following commands is executed:
19 * CREATE SUBSCRIPTION
20 * ALTER SUBSCRIPTION ... REFRESH PUBLICATION
21 *
22 * Executing the following command resets all sequences in the subscription to
23 * state INIT, triggering re-synchronization:
24 * ALTER SUBSCRIPTION ... REFRESH SEQUENCES
25 *
26 * The apply worker periodically scans pg_subscription_rel for sequences in
27 * INIT state. When such sequences are found, it spawns a sequencesync worker
28 * to handle synchronization.
29 *
30 * A single sequencesync worker is responsible for synchronizing all sequences.
31 * It begins by retrieving the list of sequences that are flagged for
32 * synchronization, i.e., those in the INIT state. These sequences are then
33 * processed in batches, allowing multiple entries to be synchronized within a
34 * single transaction. The worker fetches the current sequence values and page
35 * LSNs from the remote publisher, updates the corresponding sequences on the
36 * local subscriber, and finally marks each sequence as READY upon successful
37 * synchronization.
38 *
39 * Sequence state transitions follow this pattern:
40 * INIT -> READY
41 *
42 * To avoid creating too many transactions, up to MAX_SEQUENCES_SYNC_PER_BATCH
43 * sequences are synchronized per transaction. The locks on the sequence
44 * relation will be periodically released at each transaction commit.
45 *
46 * XXX: The launcher process was not chosen to start the sequencesync worker
47 * because it has no database connection, and so cannot read from the
48 * pg_subscription_rel system catalog the sequences that need synchronizing.
49 *-------------------------------------------------------------------------
50 */
51
52#include "postgres.h"
53
54#include "access/genam.h"
55#include "access/table.h"
56#include "catalog/pg_sequence.h"
58#include "commands/sequence.h"
59#include "pgstat.h"
63#include "utils/acl.h"
64#include "utils/builtins.h"
65#include "utils/fmgroids.h"
66#include "utils/guc.h"
67#include "utils/inval.h"
68#include "utils/lsyscache.h"
69#include "utils/memutils.h"
70#include "utils/pg_lsn.h"
71#include "utils/syscache.h"
72#include "utils/usercontext.h"
73
74#define REMOTE_SEQ_COL_COUNT 10
75
83
84static List *seqinfos = NIL;
85
86/*
87 * Apply worker determines if sequence synchronization is needed.
88 *
89 * Start a sequencesync worker if one is not already running. The active
90 * sequencesync worker will handle all pending sequence synchronization. If any
91 * sequences remain unsynchronized after it exits, a new worker can be started
92 * in the next iteration.
93 */
94void
96{
98 int nsyncworkers;
100 bool started_tx;
101
103
104 if (started_tx)
105 {
107 pgstat_report_stat(true);
108 }
109
111 return;
112
114
115 /* Check if there is a sequencesync worker already running? */
118 InvalidOid, true);
120 {
122 return;
123 }
124
125 /*
126 * Count running sync workers for this subscription, while we have the
127 * lock.
128 */
131
132 /*
133 * It is okay to read/update last_seqsync_start_time here in apply worker
134 * as we have already ensured that sync worker doesn't exist.
135 */
138}
139
140/*
141 * get_sequences_string
142 *
143 * Build a comma-separated string of schema-qualified sequence names
144 * for the given list of sequence indexes.
145 */
146static void
148{
151 {
154
155 if (buf->len > 0)
157
158 appendStringInfo(buf, "\"%s.%s\"", seqinfo->nspname, seqinfo->seqname);
159 }
160}
161
162/*
163 * report_sequence_errors
164 *
165 * Report discrepancies found during sequence synchronization between
166 * the publisher and subscriber. Emits warnings for:
167 * a) mismatched definitions or concurrent rename
168 * b) insufficient privileges
169 * c) missing sequences on the publisher
170 * Then raises an ERROR to indicate synchronization failure.
171 */
172static void
175{
177
178 /* Quick exit if there are no errors to report */
180 return;
181
183
185 {
189 errmsg_plural("mismatched or renamed sequence on subscriber (%s)",
190 "mismatched or renamed sequences on subscriber (%s)",
192 seqstr->data));
193 }
194
196 {
200 errmsg_plural("insufficient privileges on sequence (%s)",
201 "insufficient privileges on sequences (%s)",
203 seqstr->data));
204 }
205
207 {
211 errmsg_plural("missing sequence on publisher (%s)",
212 "missing sequences on publisher (%s)",
214 seqstr->data));
215 }
216
219 errmsg("logical replication sequence synchronization failed for subscription \"%s\"",
221}
222
223/*
224 * get_and_validate_seq_info
225 *
226 * Extracts remote sequence information from the tuple slot received from the
227 * publisher, and validates it against the corresponding local sequence
228 * definition.
229 */
230static CopySeqResult
233{
234 bool isnull;
235 int col = 0;
236 Datum datum;
242 bool remote_cycle;
247
248 *seqidx = DatumGetInt32(slot_getattr(slot, ++col, &isnull));
249 Assert(!isnull);
250
251 /* Identify the corresponding local sequence for the given index. */
254
255 /*
256 * last_value can be NULL if the sequence was dropped concurrently (see
257 * pg_get_sequence_data()).
258 */
259 datum = slot_getattr(slot, ++col, &isnull);
260 if (isnull)
261 return COPYSEQ_SKIPPED;
262 seqinfo_local->last_value = DatumGetInt64(datum);
263
264 seqinfo_local->is_called = DatumGetBool(slot_getattr(slot, ++col, &isnull));
265 Assert(!isnull);
266
267 seqinfo_local->page_lsn = DatumGetLSN(slot_getattr(slot, ++col, &isnull));
268 Assert(!isnull);
269
270 remote_typid = DatumGetObjectId(slot_getattr(slot, ++col, &isnull));
271 Assert(!isnull);
272
273 remote_start = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
274 Assert(!isnull);
275
276 remote_increment = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
277 Assert(!isnull);
278
279 remote_min = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
280 Assert(!isnull);
281
282 remote_max = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
283 Assert(!isnull);
284
285 remote_cycle = DatumGetBool(slot_getattr(slot, ++col, &isnull));
286 Assert(!isnull);
287
288 /* Sanity check */
290
291 seqinfo_local->found_on_pub = true;
292
294
295 /* Sequence was concurrently dropped? */
296 if (!*sequence_rel)
297 return COPYSEQ_SKIPPED;
298
300
301 /* Sequence was concurrently dropped? */
302 if (!HeapTupleIsValid(tup))
303 elog(ERROR, "cache lookup failed for sequence %u",
304 seqinfo_local->localrelid);
305
307
308 /* Sequence parameters for remote/local are the same? */
309 if (local_seq->seqtypid != remote_typid ||
310 local_seq->seqstart != remote_start ||
311 local_seq->seqincrement != remote_increment ||
312 local_seq->seqmin != remote_min ||
313 local_seq->seqmax != remote_max ||
314 local_seq->seqcycle != remote_cycle)
315 result = COPYSEQ_MISMATCH;
316
317 /* Sequence was concurrently renamed? */
318 if (strcmp(seqinfo_local->nspname,
321 result = COPYSEQ_MISMATCH;
322
324 return result;
325}
326
327/*
328 * Apply remote sequence state to local sequence and mark it as
329 * synchronized (READY).
330 */
331static CopySeqResult
333{
337 Oid seqoid = seqinfo->localrelid;
338
339 /*
340 * If the user did not opt to run as the owner of the subscription
341 * ('run_as_owner'), then copy the sequence as the owner of the sequence.
342 */
343 if (!run_as_owner)
345
347
348 if (aclresult != ACLCHECK_OK)
349 {
350 if (!run_as_owner)
352
354 }
355
356 /*
357 * The log counter (log_cnt) tracks how many sequence values are still
358 * unused locally. It is only relevant to the local node and managed
359 * internally by nextval() when allocating new ranges. Since log_cnt does
360 * not affect the visible sequence state (like last_value or is_called)
361 * and is only used for local caching, it need not be copied to the
362 * subscriber during synchronization.
363 */
364 SetSequence(seqoid, seqinfo->last_value, seqinfo->is_called);
365
366 if (!run_as_owner)
368
369 /*
370 * Record the remote sequence's LSN in pg_subscription_rel and mark the
371 * sequence as READY.
372 */
374 seqinfo->page_lsn, false);
375
376 return COPYSEQ_SUCCESS;
377}
378
379/*
380 * Copy existing data of sequences from the publisher.
381 */
382static void
384{
385 int cur_batch_base_index = 0;
393
394#define MAX_SEQUENCES_SYNC_PER_BATCH 100
395
396 elog(DEBUG1,
397 "logical replication sequence synchronization for subscription \"%s\" - total unsynchronized: %d",
399
401 {
404 int batch_size = 0;
405 int batch_succeeded_count = 0;
407 int batch_skipped_count = 0;
411
412 WalRcvExecResult *res;
413 TupleTableSlot *slot;
414
416
417 for (int idx = cur_batch_base_index; idx < n_seqinfos; idx++)
418 {
419 char *nspname_literal;
420 char *seqname_literal;
421
424
425 if (seqstr->len > 0)
427
430
431 appendStringInfo(seqstr, "(%s, %s, %d)",
433
434 if (++batch_size == MAX_SEQUENCES_SYNC_PER_BATCH)
435 break;
436 }
437
438 /*
439 * We deliberately avoid acquiring a local lock on the sequence before
440 * querying the publisher to prevent potential distributed deadlocks
441 * in bi-directional replication setups.
442 *
443 * Example scenario:
444 *
445 * - On each node, a background worker acquires a lock on a sequence
446 * as part of a sync operation.
447 *
448 * - Concurrently, a user transaction attempts to alter the same
449 * sequence, waiting on the background worker's lock.
450 *
451 * - Meanwhile, a query from the other node tries to access metadata
452 * that depends on the completion of the alter operation.
453 *
454 * - This creates a circular wait across nodes:
455 *
456 * Node-1: Query -> waits on Alter -> waits on Sync Worker
457 *
458 * Node-2: Query -> waits on Alter -> waits on Sync Worker
459 *
460 * Since each node only sees part of the wait graph, the deadlock may
461 * go undetected, leading to indefinite blocking.
462 *
463 * Note: Each entry in VALUES includes an index 'seqidx' that
464 * represents the sequence's position in the local 'seqinfos' list.
465 * This index is propagated to the query results and later used to
466 * directly map the fetched publisher sequence rows back to their
467 * corresponding local entries without relying on result order or name
468 * matching.
469 */
471 "SELECT s.seqidx, ps.*, seq.seqtypid,\n"
472 " seq.seqstart, seq.seqincrement, seq.seqmin,\n"
473 " seq.seqmax, seq.seqcycle\n"
474 "FROM ( VALUES %s ) AS s (schname, seqname, seqidx)\n"
475 "JOIN pg_namespace n ON n.nspname = s.schname\n"
476 "JOIN pg_class c ON c.relnamespace = n.oid AND c.relname = s.seqname\n"
477 "JOIN pg_sequence seq ON seq.seqrelid = c.oid\n"
478 "JOIN LATERAL pg_get_sequence_data(seq.seqrelid) AS ps ON true\n",
479 seqstr->data);
480
481 res = walrcv_exec(conn, cmd->data, lengthof(seqRow), seqRow);
482 if (res->status != WALRCV_OK_TUPLES)
485 errmsg("could not fetch sequence information from the publisher: %s",
486 res->err));
487
489 while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
490 {
493 int seqidx;
494
496
498 {
499 ConfigReloadPending = false;
501 }
502
504 &seqinfo, &seqidx);
507 sequence_rel->rd_rel->relowner);
508
509 switch (sync_status)
510 {
511 case COPYSEQ_SUCCESS:
512 elog(DEBUG1,
513 "logical replication synchronization for subscription \"%s\", sequence \"%s.%s\" has finished",
514 MySubscription->name, seqinfo->nspname,
515 seqinfo->seqname);
517 break;
518 case COPYSEQ_MISMATCH:
519
520 /*
521 * Remember mismatched sequences in a long-lived memory
522 * context since these will be used after the transaction
523 * is committed.
524 */
527 seqidx);
530 break;
532
533 /*
534 * Remember sequences with insufficient privileges in a
535 * long-lived memory context since these will be used
536 * after the transaction is committed.
537 */
540 seqidx);
543 break;
544 case COPYSEQ_SKIPPED:
545
546 /*
547 * Concurrent removal of a sequence on the subscriber is
548 * treated as success, since the only viable action is to
549 * skip the corresponding sequence data. Missing sequences
550 * on the publisher are treated as ERROR.
551 */
552 if (seqinfo->found_on_pub)
553 {
554 ereport(LOG,
555 errmsg("skip synchronization of sequence \"%s.%s\" because it has been dropped concurrently",
556 seqinfo->nspname,
557 seqinfo->seqname));
559 }
560 break;
561 }
562
563 if (sequence_rel)
565 }
566
570 resetStringInfo(cmd);
571
576
577 elog(DEBUG1,
578 "logical replication sequence synchronization for subscription \"%s\" - batch #%d = %d attempted, %d succeeded, %d mismatched, %d insufficient permission, %d missing from publisher, %d skipped",
583
584 /* Commit this batch, and prepare for next batch */
586
588 {
589 for (int idx = cur_batch_base_index; idx < cur_batch_base_index + batch_size; idx++)
590 {
593
594 /* If the sequence was not found on publisher, record it */
595 if (!seqinfo->found_on_pub)
597 }
598 }
599
600 /*
601 * cur_batch_base_index is not incremented sequentially because some
602 * sequences may be missing, and the number of fetched rows may not
603 * match the batch size.
604 */
605 cur_batch_base_index += batch_size;
606 }
607
608 /* Report mismatches, permission issues, or missing sequences */
611}
612
613/*
614 * Identifies sequences that require synchronization and initiates the
615 * synchronization process.
616 */
617static void
619{
620 char *err;
622 Relation rel;
624 ScanKeyData skey[2];
625 SysScanDesc scan;
628
630
632
633 ScanKeyInit(&skey[0],
636 ObjectIdGetDatum(subid));
637
638 ScanKeyInit(&skey[1],
642
643 scan = systable_beginscan(rel, InvalidOid, false,
644 NULL, 2, skey);
645 while (HeapTupleIsValid(tup = systable_getnext(scan)))
646 {
651
653
655
657
658 /* Skip if sequence was dropped concurrently */
659 if (!sequence_rel)
660 continue;
661
662 /* Skip if the relation is not a sequence */
663 if (sequence_rel->rd_rel->relkind != RELKIND_SEQUENCE)
664 {
666 continue;
667 }
668
669 /*
670 * Worker needs to process sequences across transaction boundary, so
671 * allocate them under long-lived context.
672 */
674
676 seq->localrelid = subrel->srrelid;
680
682
684 }
685
686 /* Cleanup */
687 systable_endscan(scan);
689
691
692 /*
693 * Exit early if no catalog entries found, likely due to concurrent drops.
694 */
695 if (!seqinfos)
696 return;
697
698 /* Is the use of a password mandatory? */
701
703 appendStringInfo(&app_name, "pg_%u_sequence_sync_" UINT64_FORMAT,
705
706 /*
707 * Establish the connection to the publisher for sequence synchronization.
708 */
712 app_name.data, &err);
716 errmsg("sequencesync worker for subscription \"%s\" could not connect to the publisher: %s",
718
719 pfree(app_name.data);
720
722}
723
724/*
725 * Execute the initial sync with error handling. Disable the subscription,
726 * if required.
727 *
728 * Note that we don't handle FATAL errors, which are probably caused by system
729 * resource problems and are not repeatable.
730 */
731static void
733{
735
736 PG_TRY();
737 {
738 /* Call initial sync. */
740 }
741 PG_CATCH();
742 {
745 else
746 {
747 /*
748 * Report the worker failed during sequence synchronization. Abort
749 * the current transaction so that the stats message is sent in an
750 * idle state.
751 */
755
756 PG_RE_THROW();
757 }
758 }
759 PG_END_TRY();
760}
761
762/* Logical Replication sequencesync worker entry point */
763void
Datum idx(PG_FUNCTION_ARGS)
Definition _int_op.c:262
AclResult
Definition acl.h:182
@ ACLCHECK_OK
Definition acl.h:183
AclResult pg_class_aclcheck(Oid table_oid, Oid roleid, AclMode mode)
Definition aclchk.c:4039
void DisableSubscriptionAndExit(void)
Definition worker.c:5947
MemoryContext ApplyContext
Definition worker.c:472
void SetupApplyOrSyncWorker(int worker_slot)
Definition worker.c:5886
WalReceiverConn * LogRepWorkerWalRcvConn
Definition worker.c:477
Subscription * MySubscription
Definition worker.c:479
#define Assert(condition)
Definition c.h:883
int64_t int64
Definition c.h:553
#define UINT64_FORMAT
Definition c.h:575
#define lengthof(array)
Definition c.h:813
void SetSequence(Oid relid, int64 next, bool iscalled)
Definition sequence.c:946
int errmsg_plural(const char *fmt_singular, const char *fmt_plural, unsigned long n,...)
Definition elog.c:1193
int errcode(int sqlerrcode)
Definition elog.c:863
int errmsg(const char *fmt,...)
Definition elog.c:1080
#define LOG
Definition elog.h:31
#define PG_RE_THROW()
Definition elog.h:405
#define PG_TRY(...)
Definition elog.h:372
#define WARNING
Definition elog.h:36
#define PG_END_TRY(...)
Definition elog.h:397
#define DEBUG1
Definition elog.h:30
#define ERROR
Definition elog.h:39
#define PG_CATCH(...)
Definition elog.h:382
#define elog(elevel,...)
Definition elog.h:226
#define ereport(elevel,...)
Definition elog.h:150
void err(int eval, const char *fmt,...)
Definition err.c:43
TupleTableSlot * MakeSingleTupleTableSlot(TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
const TupleTableSlotOps TTSOpsMinimalTuple
Definition execTuples.c:86
#define palloc0_object(type)
Definition fe_memutils.h:75
void systable_endscan(SysScanDesc sysscan)
Definition genam.c:603
HeapTuple systable_getnext(SysScanDesc sysscan)
Definition genam.c:514
SysScanDesc systable_beginscan(Relation heapRelation, Oid indexId, bool indexOK, Snapshot snapshot, int nkeys, ScanKey key)
Definition genam.c:388
void ProcessConfigFile(GucContext context)
Definition guc-file.l:120
@ PGC_SIGHUP
Definition guc.h:75
#define HeapTupleIsValid(tuple)
Definition htup.h:78
static void * GETSTRUCT(const HeapTupleData *tuple)
volatile sig_atomic_t ConfigReloadPending
Definition interrupt.c:27
LogicalRepWorker * logicalrep_worker_find(LogicalRepWorkerType wtype, Oid subid, Oid relid, bool only_running)
Definition launcher.c:258
LogicalRepWorker * MyLogicalRepWorker
Definition launcher.c:56
int logicalrep_sync_worker_count(Oid subid)
Definition launcher.c:927
List * lappend(List *list, void *datum)
Definition list.c:339
List * lappend_int(List *list, int datum)
Definition list.c:357
#define NoLock
Definition lockdefs.h:34
#define AccessShareLock
Definition lockdefs.h:36
#define RowExclusiveLock
Definition lockdefs.h:38
char * get_namespace_name(Oid nspid)
Definition lsyscache.c:3516
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1176
void LWLockRelease(LWLock *lock)
Definition lwlock.c:1793
@ LW_SHARED
Definition lwlock.h:113
char * pstrdup(const char *in)
Definition mcxt.c:1781
void pfree(void *pointer)
Definition mcxt.c:1616
#define CHECK_FOR_INTERRUPTS()
Definition miscadmin.h:123
Oid GetUserId(void)
Definition miscinit.c:469
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition palloc.h:124
#define ACL_UPDATE
Definition parsenodes.h:78
static int list_length(const List *l)
Definition pg_list.h:152
#define NIL
Definition pg_list.h:68
static void * list_nth(const List *list, int n)
Definition pg_list.h:299
#define foreach_int(var, lst)
Definition pg_list.h:470
static XLogRecPtr DatumGetLSN(Datum X)
Definition pg_lsn.h:25
FormData_pg_sequence * Form_pg_sequence
Definition pg_sequence.h:40
void UpdateSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn, bool already_locked)
FormData_pg_subscription_rel * Form_pg_subscription_rel
static char buf[DEFAULT_XLOG_SEG_SIZE]
long pgstat_report_stat(bool force)
Definition pgstat.c:704
void pgstat_report_subscription_error(Oid subid, LogicalRepWorkerType wtype)
static bool DatumGetBool(Datum X)
Definition postgres.h:100
static int64 DatumGetInt64(Datum X)
Definition postgres.h:413
static Oid DatumGetObjectId(Datum X)
Definition postgres.h:252
static Datum ObjectIdGetDatum(Oid X)
Definition postgres.h:262
uint64_t Datum
Definition postgres.h:70
static int32 DatumGetInt32(Datum X)
Definition postgres.h:212
static Datum CharGetDatum(char X)
Definition postgres.h:132
#define InvalidOid
unsigned int Oid
static int fb(int x)
char * quote_literal_cstr(const char *rawstr)
Definition quote.c:101
#define RelationGetRelationName(relation)
Definition rel.h:548
#define RelationGetNamespace(relation)
Definition rel.h:555
void ScanKeyInit(ScanKey entry, AttrNumber attributeNumber, StrategyNumber strategy, RegProcedure procedure, Datum argument)
Definition scankey.c:76
#define REMOTE_SEQ_COL_COUNT
CopySeqResult
@ COPYSEQ_INSUFFICIENT_PERM
@ COPYSEQ_MISMATCH
@ COPYSEQ_SUCCESS
@ COPYSEQ_SKIPPED
static CopySeqResult get_and_validate_seq_info(TupleTableSlot *slot, Relation *sequence_rel, LogicalRepSequenceInfo **seqinfo, int *seqidx)
static List * seqinfos
#define MAX_SEQUENCES_SYNC_PER_BATCH
void SequenceSyncWorkerMain(Datum main_arg)
static void start_sequence_sync(void)
static void LogicalRepSyncSequences(void)
static void copy_sequences(WalReceiverConn *conn)
static void get_sequences_string(List *seqindexes, StringInfo buf)
static void report_sequence_errors(List *mismatched_seqs_idx, List *insuffperm_seqs_idx, List *missing_seqs_idx)
void ProcessSequencesForSync(void)
static CopySeqResult copy_sequence(LogicalRepSequenceInfo *seqinfo, Oid seqowner)
#define BTEqualStrategyNumber
Definition stratnum.h:31
PGconn * conn
Definition streamutil.c:52
StringInfo makeStringInfo(void)
Definition stringinfo.c:72
void resetStringInfo(StringInfo str)
Definition stringinfo.c:126
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition stringinfo.c:145
void appendStringInfoString(StringInfo str, const char *s)
Definition stringinfo.c:230
void initStringInfo(StringInfo str)
Definition stringinfo.c:97
Definition pg_list.h:54
TimestampTz last_seqsync_start_time
Tuplestorestate * tuplestore
TupleDesc tupledesc
WalRcvExecStatus status
void launch_sync_worker(LogicalRepWorkerType wtype, int nsyncworkers, Oid relid, TimestampTz *last_start_time)
Definition syncutils.c:117
pg_noreturn void FinishSyncWorker(void)
Definition syncutils.c:50
void FetchRelationStates(bool *has_pending_subtables, bool *has_pending_subsequences, bool *started_tx)
Definition syncutils.c:202
void ReleaseSysCache(HeapTuple tuple)
Definition syscache.c:264
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition syscache.c:220
Relation try_table_open(Oid relationId, LOCKMODE lockmode)
Definition table.c:60
void table_close(Relation relation, LOCKMODE lockmode)
Definition table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition table.c:40
bool tuplestore_gettupleslot(Tuplestorestate *state, bool forward, bool copy, TupleTableSlot *slot)
static Datum slot_getattr(TupleTableSlot *slot, int attnum, bool *isnull)
Definition tuptable.h:398
void SwitchToUntrustedUser(Oid userid, UserContext *context)
Definition usercontext.c:33
void RestoreUserContext(UserContext *context)
Definition usercontext.c:87
#define walrcv_connect(conninfo, replication, logical, must_use_password, appname, err)
@ WALRCV_OK_TUPLES
static void walrcv_clear_result(WalRcvExecResult *walres)
#define walrcv_exec(conn, exec, nRetTypes, retTypes)
@ WORKERTYPE_SEQUENCESYNC
static bool am_sequencesync_worker(void)
void StartTransactionCommand(void)
Definition xact.c:3080
void CommitTransactionCommand(void)
Definition xact.c:3178
void AbortOutOfAnyTransaction(void)
Definition xact.c:4884
uint64 GetSystemIdentifier(void)
Definition xlog.c:4628