PostgreSQL Source Code git master
Loading...
Searching...
No Matches
sequencesync.c File Reference
#include "postgres.h"
#include "access/genam.h"
#include "access/table.h"
#include "catalog/pg_sequence.h"
#include "catalog/pg_subscription_rel.h"
#include "commands/sequence.h"
#include "pgstat.h"
#include "postmaster/interrupt.h"
#include "replication/logicalworker.h"
#include "replication/worker_internal.h"
#include "storage/lwlock.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/guc.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/pg_lsn.h"
#include "utils/syscache.h"
#include "utils/usercontext.h"
Include dependency graph for sequencesync.c:

Go to the source code of this file.

Macros

#define REMOTE_SEQ_COL_COUNT   11
 
#define MAX_SEQUENCES_SYNC_PER_BATCH   100
 

Typedefs

typedef enum CopySeqResult CopySeqResult
 

Enumerations

enum  CopySeqResult {
  COPYSEQ_SUCCESS , COPYSEQ_MISMATCH , COPYSEQ_SUBSCRIBER_INSUFFICIENT_PERM , COPYSEQ_PUBLISHER_INSUFFICIENT_PERM ,
  COPYSEQ_SKIPPED
}
 

Functions

void ProcessSequencesForSync (void)
 
static void get_sequences_string (List *seqindexes, StringInfo buf)
 
static void report_sequence_errors (List *mismatched_seqs_idx, List *sub_insuffperm_seqs_idx, List *pub_insuffperm_seqs_idx, List *missing_seqs_idx)
 
static CopySeqResult get_and_validate_seq_info (TupleTableSlot *slot, Relation *sequence_rel, LogicalRepSequenceInfo **seqinfo, int *seqidx)
 
static CopySeqResult copy_sequence (LogicalRepSequenceInfo *seqinfo, Oid seqowner)
 
static void copy_sequences (WalReceiverConn *conn)
 
static void LogicalRepSyncSequences (void)
 
static void start_sequence_sync (void)
 
void SequenceSyncWorkerMain (Datum main_arg)
 

Variables

static List * seqinfos = NIL
 

Macro Definition Documentation

◆ MAX_SEQUENCES_SYNC_PER_BATCH

#define MAX_SEQUENCES_SYNC_PER_BATCH   100

◆ REMOTE_SEQ_COL_COUNT

#define REMOTE_SEQ_COL_COUNT   11

Definition at line 75 of file sequencesync.c.

Typedef Documentation

◆ CopySeqResult

Enumeration Type Documentation

◆ CopySeqResult

Enumerator
COPYSEQ_SUCCESS 
COPYSEQ_MISMATCH 
COPYSEQ_SUBSCRIBER_INSUFFICIENT_PERM 
COPYSEQ_PUBLISHER_INSUFFICIENT_PERM 
COPYSEQ_SKIPPED 

Definition at line 77 of file sequencesync.c.

78{
CopySeqResult
@ COPYSEQ_SUBSCRIBER_INSUFFICIENT_PERM
@ COPYSEQ_MISMATCH
@ COPYSEQ_SUCCESS
@ COPYSEQ_SKIPPED
@ COPYSEQ_PUBLISHER_INSUFFICIENT_PERM

Function Documentation

◆ copy_sequence()

static CopySeqResult copy_sequence ( LogicalRepSequenceInfo * seqinfo,
Oid  seqowner 
)
static

Definition at line 356 of file sequencesync.c.

357{
361 Oid seqoid = seqinfo->localrelid;
362
363 /*
364 * If the user did not opt to run as the owner of the subscription
365 * ('run_as_owner'), then copy the sequence as the owner of the sequence.
366 */
367 if (!run_as_owner)
369
371
372 if (aclresult != ACLCHECK_OK)
373 {
374 if (!run_as_owner)
376
378 }
379
380 /*
381 * The log counter (log_cnt) tracks how many sequence values are still
382 * unused locally. It is only relevant to the local node and managed
383 * internally by nextval() when allocating new ranges. Since log_cnt does
384 * not affect the visible sequence state (like last_value or is_called)
385 * and is only used for local caching, it need not be copied to the
386 * subscriber during synchronization.
387 */
388 SetSequence(seqoid, seqinfo->last_value, seqinfo->is_called);
389
390 if (!run_as_owner)
392
393 /*
394 * Record the remote sequence's LSN in pg_subscription_rel and mark the
395 * sequence as READY.
396 */
398 seqinfo->page_lsn, false);
399
400 return COPYSEQ_SUCCESS;
401}
AclResult
Definition acl.h:183
@ ACLCHECK_OK
Definition acl.h:184
AclResult pg_class_aclcheck(Oid table_oid, Oid roleid, AclMode mode)
Definition aclchk.c:4083
Subscription * MySubscription
Definition worker.c:484
void SetSequence(Oid relid, int64 next, bool iscalled)
Definition sequence.c:946
Oid GetUserId(void)
Definition miscinit.c:470
#define ACL_UPDATE
Definition parsenodes.h:78
void UpdateSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn, bool already_locked)
unsigned int Oid
static int fb(int x)
void SwitchToUntrustedUser(Oid userid, UserContext *context)
Definition usercontext.c:33
void RestoreUserContext(UserContext *context)
Definition usercontext.c:87

References ACL_UPDATE, ACLCHECK_OK, COPYSEQ_SUBSCRIBER_INSUFFICIENT_PERM, COPYSEQ_SUCCESS, fb(), GetUserId(), MySubscription, Subscription::oid, pg_class_aclcheck(), RestoreUserContext(), Subscription::runasowner, SetSequence(), SwitchToUntrustedUser(), and UpdateSubscriptionRelState().

Referenced by copy_sequences().

◆ copy_sequences()

static void copy_sequences ( WalReceiverConn * conn)
static

Definition at line 407 of file sequencesync.c.

408{
409 int cur_batch_base_index = 0;
416 StringInfoData cmd;
418
420 initStringInfo(&cmd);
421
422#define MAX_SEQUENCES_SYNC_PER_BATCH 100
423
424 elog(DEBUG1,
425 "logical replication sequence synchronization for subscription \"%s\" - total unsynchronized: %d",
427
429 {
432 int batch_size = 0;
433 int batch_succeeded_count = 0;
435 int batch_skipped_count = 0;
439
440 WalRcvExecResult *res;
441 TupleTableSlot *slot;
442
444
445 for (int idx = cur_batch_base_index; idx < n_seqinfos; idx++)
446 {
447 char *nspname_literal;
448 char *seqname_literal;
449
452
453 if (seqstr.len > 0)
455
458
459 appendStringInfo(&seqstr, "(%s, %s, %d)",
461
462 if (++batch_size == MAX_SEQUENCES_SYNC_PER_BATCH)
463 break;
464 }
465
466 /*
467 * We deliberately avoid acquiring a local lock on the sequence before
468 * querying the publisher to prevent potential distributed deadlocks
469 * in bi-directional replication setups.
470 *
471 * Example scenario:
472 *
473 * - On each node, a background worker acquires a lock on a sequence
474 * as part of a sync operation.
475 *
476 * - Concurrently, a user transaction attempts to alter the same
477 * sequence, waiting on the background worker's lock.
478 *
479 * - Meanwhile, a query from the other node tries to access metadata
480 * that depends on the completion of the alter operation.
481 *
482 * - This creates a circular wait across nodes:
483 *
484 * Node-1: Query -> waits on Alter -> waits on Sync Worker
485 *
486 * Node-2: Query -> waits on Alter -> waits on Sync Worker
487 *
488 * Since each node only sees part of the wait graph, the deadlock may
489 * go undetected, leading to indefinite blocking.
490 *
491 * Note: Each entry in VALUES includes an index 'seqidx' that
492 * represents the sequence's position in the local 'seqinfos' list.
493 * This index is propagated to the query results and later used to
494 * directly map the fetched publisher sequence rows back to their
495 * corresponding local entries without relying on result order or name
496 * matching.
497 */
498 appendStringInfo(&cmd,
499 "SELECT s.seqidx, has_sequence_privilege(c.oid, 'SELECT'),\n"
500 " ps.*, seq.seqtypid,\n"
501 " seq.seqstart, seq.seqincrement, seq.seqmin,\n"
502 " seq.seqmax, seq.seqcycle\n"
503 "FROM ( VALUES %s ) AS s (schname, seqname, seqidx)\n"
504 "JOIN pg_namespace n ON n.nspname = s.schname\n"
505 "JOIN pg_class c ON c.relnamespace = n.oid AND c.relname = s.seqname\n"
506 "JOIN pg_sequence seq ON seq.seqrelid = c.oid\n"
507 "JOIN LATERAL pg_get_sequence_data(seq.seqrelid) AS ps ON true\n",
508 seqstr.data);
509
510 res = walrcv_exec(conn, cmd.data, lengthof(seqRow), seqRow);
511 if (res->status != WALRCV_OK_TUPLES)
514 errmsg("could not fetch sequence information from the publisher: %s",
515 res->err));
516
518 while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
519 {
523 int seqidx;
524
526
528 {
529 ConfigReloadPending = false;
531 }
532
534 &seqinfo, &seqidx);
537 sequence_rel->rd_rel->relowner);
538
539 switch (sync_status)
540 {
541 case COPYSEQ_SUCCESS:
542 elog(DEBUG1,
543 "logical replication synchronization for subscription \"%s\", sequence \"%s.%s\" has finished",
544 MySubscription->name, seqinfo->nspname,
545 seqinfo->seqname);
547 break;
548 case COPYSEQ_MISMATCH:
549
550 /*
551 * Remember mismatched sequences in a long-lived memory
552 * context since these will be used after the transaction
553 * is committed.
554 */
557 seqidx);
560 break;
562
563 /*
564 * Remember sequences with insufficient privileges in a
565 * long-lived memory context since these will be used
566 * after the transaction is committed.
567 */
570 seqidx);
573 break;
575
576 /*
577 * Remember sequences for which the publisher lacks the
578 * privileges required by pg_get_sequence_data().
579 */
582 seqidx);
585 break;
586 case COPYSEQ_SKIPPED:
587
588 /*
589 * Concurrent removal of a sequence on the subscriber is
590 * treated as success, since the only viable action is to
591 * skip the corresponding sequence data. Missing sequences
592 * on the publisher are treated as ERROR.
593 */
594 if (seqinfo->found_on_pub)
595 {
596 ereport(LOG,
597 errmsg("skip synchronization of sequence \"%s.%s\" because it has been dropped concurrently",
598 seqinfo->nspname,
599 seqinfo->seqname));
601 }
602 break;
603 }
604
605 if (sequence_rel)
607 }
608
612 resetStringInfo(&cmd);
613
619
620 elog(DEBUG1,
621 "logical replication sequence synchronization for subscription \"%s\" - batch #%d = %d attempted, %d succeeded, %d mismatched, %d subscriber insufficient permission, %d publisher insufficient permission, %d missing from publisher, %d skipped",
626
627 /* Commit this batch, and prepare for next batch */
629
631 {
632 for (int idx = cur_batch_base_index; idx < cur_batch_base_index + batch_size; idx++)
633 {
636
637 /* If the sequence was not found on publisher, record it */
638 if (!seqinfo->found_on_pub)
640 }
641 }
642
643 /*
644 * cur_batch_base_index is not incremented sequentially because some
645 * sequences may be missing, and the number of fetched rows may not
646 * match the batch size.
647 */
648 cur_batch_base_index += batch_size;
649 }
650
651 /* Report mismatches, permission issues, or missing sequences */
654}
Datum idx(PG_FUNCTION_ARGS)
Definition _int_op.c:263
MemoryContext ApplyContext
Definition worker.c:477
#define lengthof(array)
Definition c.h:873
int errcode(int sqlerrcode)
Definition elog.c:875
#define LOG
Definition elog.h:32
#define DEBUG1
Definition elog.h:31
#define ERROR
Definition elog.h:40
#define elog(elevel,...)
Definition elog.h:228
#define ereport(elevel,...)
Definition elog.h:152
TupleTableSlot * MakeSingleTupleTableSlot(TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
const TupleTableSlotOps TTSOpsMinimalTuple
Definition execTuples.c:86
void ProcessConfigFile(GucContext context)
Definition guc-file.l:120
@ PGC_SIGHUP
Definition guc.h:75
volatile sig_atomic_t ConfigReloadPending
Definition interrupt.c:27
List * lappend_int(List *list, int datum)
Definition list.c:357
#define NoLock
Definition lockdefs.h:34
#define CHECK_FOR_INTERRUPTS()
Definition miscadmin.h:125
static char * errmsg
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition palloc.h:138
static int list_length(const List *l)
Definition pg_list.h:152
#define NIL
Definition pg_list.h:68
static void * list_nth(const List *list, int n)
Definition pg_list.h:331
char * quote_literal_cstr(const char *rawstr)
Definition quote.c:101
#define REMOTE_SEQ_COL_COUNT
static CopySeqResult get_and_validate_seq_info(TupleTableSlot *slot, Relation *sequence_rel, LogicalRepSequenceInfo **seqinfo, int *seqidx)
static List * seqinfos
#define MAX_SEQUENCES_SYNC_PER_BATCH
static void report_sequence_errors(List *mismatched_seqs_idx, List *sub_insuffperm_seqs_idx, List *pub_insuffperm_seqs_idx, List *missing_seqs_idx)
static CopySeqResult copy_sequence(LogicalRepSequenceInfo *seqinfo, Oid seqowner)
PGconn * conn
Definition streamutil.c:52
void resetStringInfo(StringInfo str)
Definition stringinfo.c:126
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition stringinfo.c:145
void appendStringInfoString(StringInfo str, const char *s)
Definition stringinfo.c:230
void initStringInfo(StringInfo str)
Definition stringinfo.c:97
Definition pg_list.h:54
Tuplestorestate * tuplestore
TupleDesc tupledesc
WalRcvExecStatus status
void table_close(Relation relation, LOCKMODE lockmode)
Definition table.c:126
bool tuplestore_gettupleslot(Tuplestorestate *state, bool forward, bool copy, TupleTableSlot *slot)
@ WALRCV_OK_TUPLES
static void walrcv_clear_result(WalRcvExecResult *walres)
#define walrcv_exec(conn, exec, nRetTypes, retTypes)
void StartTransactionCommand(void)
Definition xact.c:3112
void CommitTransactionCommand(void)
Definition xact.c:3210

References appendStringInfo(), appendStringInfoString(), ApplyContext, CHECK_FOR_INTERRUPTS, CommitTransactionCommand(), ConfigReloadPending, conn, copy_sequence(), COPYSEQ_MISMATCH, COPYSEQ_PUBLISHER_INSUFFICIENT_PERM, COPYSEQ_SKIPPED, COPYSEQ_SUBSCRIBER_INSUFFICIENT_PERM, COPYSEQ_SUCCESS, StringInfoData::data, DEBUG1, elog, ereport, WalRcvExecResult::err, errcode(), errmsg, ERROR, ExecDropSingleTupleTableSlot(), fb(), get_and_validate_seq_info(), idx(), initStringInfo(), lappend_int(), lengthof, list_length(), list_nth(), LOG, MakeSingleTupleTableSlot(), MAX_SEQUENCES_SYNC_PER_BATCH, MemoryContextSwitchTo(), MySubscription, Subscription::name, NIL, NoLock, PGC_SIGHUP, ProcessConfigFile(), quote_literal_cstr(), REMOTE_SEQ_COL_COUNT, report_sequence_errors(), resetStringInfo(), seqinfos, StartTransactionCommand(), WalRcvExecResult::status, table_close(), TTSOpsMinimalTuple, WalRcvExecResult::tupledesc, WalRcvExecResult::tuplestore, tuplestore_gettupleslot(), walrcv_clear_result(), walrcv_exec, and WALRCV_OK_TUPLES.

Referenced by LogicalRepSyncSequences().

◆ get_and_validate_seq_info()

static CopySeqResult get_and_validate_seq_info ( TupleTableSlot * slot,
Relation * sequence_rel,
LogicalRepSequenceInfo **  seqinfo,
int * seqidx 
)
static

Definition at line 248 of file sequencesync.c.

250{
251 bool isnull;
252 int col = 0;
253 Datum datum;
260 bool remote_cycle;
265
266 *seqidx = DatumGetInt32(slot_getattr(slot, ++col, &isnull));
267 Assert(!isnull);
268
269 /* Identify the corresponding local sequence for the given index. */
272
273 /*
274 * The remote sequence state can be NULL if the publisher lacks the
275 * required privileges or if the sequence was dropped concurrently after
276 * it was identified in the catalog snapshot (see pg_get_sequence_data()).
277 */
279 Assert(!isnull);
280
281 datum = slot_getattr(slot, ++col, &isnull);
282 if (isnull)
285
286 seqinfo_local->last_value = DatumGetInt64(datum);
287
288 seqinfo_local->is_called = DatumGetBool(slot_getattr(slot, ++col, &isnull));
289 Assert(!isnull);
290
291 seqinfo_local->page_lsn = DatumGetLSN(slot_getattr(slot, ++col, &isnull));
292 Assert(!isnull);
293
294 remote_typid = DatumGetObjectId(slot_getattr(slot, ++col, &isnull));
295 Assert(!isnull);
296
297 remote_start = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
298 Assert(!isnull);
299
300 remote_increment = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
301 Assert(!isnull);
302
303 remote_min = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
304 Assert(!isnull);
305
306 remote_max = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
307 Assert(!isnull);
308
309 remote_cycle = DatumGetBool(slot_getattr(slot, ++col, &isnull));
310 Assert(!isnull);
311
312 /* Sanity check */
314
315 seqinfo_local->found_on_pub = true;
316
318
319 /* Sequence was concurrently dropped? */
320 if (!*sequence_rel)
321 return COPYSEQ_SKIPPED;
322
324
325 /* Sequence was concurrently dropped? */
326 if (!HeapTupleIsValid(tup))
327 elog(ERROR, "cache lookup failed for sequence %u",
328 seqinfo_local->localrelid);
329
331
332 /* Sequence parameters for remote/local are the same? */
333 if (local_seq->seqtypid != remote_typid ||
334 local_seq->seqstart != remote_start ||
335 local_seq->seqincrement != remote_increment ||
336 local_seq->seqmin != remote_min ||
337 local_seq->seqmax != remote_max ||
338 local_seq->seqcycle != remote_cycle)
340
341 /* Sequence was concurrently renamed? */
342 if (strcmp(seqinfo_local->nspname,
346
348 return result;
349}
#define Assert(condition)
Definition c.h:943
int64_t int64
Definition c.h:621
uint32 result
#define HeapTupleIsValid(tuple)
Definition htup.h:78
static void * GETSTRUCT(const HeapTupleData *tuple)
#define RowExclusiveLock
Definition lockdefs.h:38
char * get_namespace_name(Oid nspid)
Definition lsyscache.c:3674
static XLogRecPtr DatumGetLSN(Datum X)
Definition pg_lsn.h:25
END_CATALOG_STRUCT typedef FormData_pg_sequence * Form_pg_sequence
Definition pg_sequence.h:44
static bool DatumGetBool(Datum X)
Definition postgres.h:100
static int64 DatumGetInt64(Datum X)
Definition postgres.h:416
static Oid DatumGetObjectId(Datum X)
Definition postgres.h:242
static Datum ObjectIdGetDatum(Oid X)
Definition postgres.h:252
uint64_t Datum
Definition postgres.h:70
static int32 DatumGetInt32(Datum X)
Definition postgres.h:202
#define RelationGetRelationName(relation)
Definition rel.h:550
#define RelationGetNamespace(relation)
Definition rel.h:557
void ReleaseSysCache(HeapTuple tuple)
Definition syscache.c:265
HeapTuple SearchSysCache1(SysCacheIdentifier cacheId, Datum key1)
Definition syscache.c:221
Relation try_table_open(Oid relationId, LOCKMODE lockmode)
Definition table.c:60
static Datum slot_getattr(TupleTableSlot *slot, int attnum, bool *isnull)
Definition tuptable.h:417

References Assert, COPYSEQ_MISMATCH, COPYSEQ_PUBLISHER_INSUFFICIENT_PERM, COPYSEQ_SKIPPED, COPYSEQ_SUCCESS, DatumGetBool(), DatumGetInt32(), DatumGetInt64(), DatumGetLSN(), DatumGetObjectId(), elog, ERROR, fb(), Form_pg_sequence, get_namespace_name(), GETSTRUCT(), HeapTupleIsValid, list_nth(), ObjectIdGetDatum(), RelationGetNamespace, RelationGetRelationName, ReleaseSysCache(), REMOTE_SEQ_COL_COUNT, result, RowExclusiveLock, SearchSysCache1(), seqinfos, slot_getattr(), and try_table_open().

Referenced by copy_sequences().

◆ get_sequences_string()

static void get_sequences_string ( List * seqindexes,
StringInfo  buf 
)
static

Definition at line 149 of file sequencesync.c.

150{
153 {
156
157 if (buf->len > 0)
159
160 appendStringInfo(buf, "\"%s.%s\"", seqinfo->nspname, seqinfo->seqname);
161 }
162}
#define foreach_int(var, lst)
Definition pg_list.h:502
static char buf[DEFAULT_XLOG_SEG_SIZE]

References appendStringInfo(), appendStringInfoString(), buf, fb(), foreach_int, list_nth(), resetStringInfo(), and seqinfos.

Referenced by report_sequence_errors().

◆ LogicalRepSyncSequences()

static void LogicalRepSyncSequences ( void  )
static

Definition at line 661 of file sequencesync.c.

662{
663 char *err;
665 Relation rel;
667 ScanKeyData skey[2];
668 SysScanDesc scan;
671
673
675
676 ScanKeyInit(&skey[0],
679 ObjectIdGetDatum(subid));
680
681 ScanKeyInit(&skey[1],
685
686 scan = systable_beginscan(rel, InvalidOid, false,
687 NULL, 2, skey);
688 while (HeapTupleIsValid(tup = systable_getnext(scan)))
689 {
694
696
698
700
701 /* Skip if sequence was dropped concurrently */
702 if (!sequence_rel)
703 continue;
704
705 /* Skip if the relation is not a sequence */
706 if (sequence_rel->rd_rel->relkind != RELKIND_SEQUENCE)
707 {
709 continue;
710 }
711
712 /*
713 * Worker needs to process sequences across transaction boundary, so
714 * allocate them under long-lived context.
715 */
717
719 seq->localrelid = subrel->srrelid;
723
725
727 }
728
729 /* Cleanup */
730 systable_endscan(scan);
732
734
735 /*
736 * Exit early if no catalog entries found, likely due to concurrent drops.
737 */
738 if (!seqinfos)
739 return;
740
741 /* Is the use of a password mandatory? */
744
746 appendStringInfo(&app_name, "pg_%u_sequence_sync_" UINT64_FORMAT,
748
749 /*
750 * Establish the connection to the publisher for sequence synchronization.
751 */
755 app_name.data, &err);
759 errmsg("sequencesync worker for subscription \"%s\" could not connect to the publisher: %s",
761
762 pfree(app_name.data);
763
765}
WalReceiverConn * LogRepWorkerWalRcvConn
Definition worker.c:482
#define UINT64_FORMAT
Definition c.h:635
void err(int eval, const char *fmt,...)
Definition err.c:43
#define palloc0_object(type)
Definition fe_memutils.h:90
void systable_endscan(SysScanDesc sysscan)
Definition genam.c:604
HeapTuple systable_getnext(SysScanDesc sysscan)
Definition genam.c:515
SysScanDesc systable_beginscan(Relation heapRelation, Oid indexId, bool indexOK, Snapshot snapshot, int nkeys, ScanKey key)
Definition genam.c:388
LogicalRepWorker * MyLogicalRepWorker
Definition launcher.c:58
List * lappend(List *list, void *datum)
Definition list.c:339
#define AccessShareLock
Definition lockdefs.h:36
char * pstrdup(const char *in)
Definition mcxt.c:1910
void pfree(void *pointer)
Definition mcxt.c:1619
END_CATALOG_STRUCT typedef FormData_pg_subscription_rel * Form_pg_subscription_rel
static Datum CharGetDatum(char X)
Definition postgres.h:132
#define InvalidOid
void ScanKeyInit(ScanKey entry, AttrNumber attributeNumber, StrategyNumber strategy, RegProcedure procedure, Datum argument)
Definition scankey.c:76
static void copy_sequences(WalReceiverConn *conn)
#define BTEqualStrategyNumber
Definition stratnum.h:31
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition table.c:40
#define walrcv_connect(conninfo, replication, logical, must_use_password, appname, err)
uint64 GetSystemIdentifier(void)
Definition xlog.c:4643

References AccessShareLock, appendStringInfo(), ApplyContext, BTEqualStrategyNumber, CharGetDatum(), CHECK_FOR_INTERRUPTS, CommitTransactionCommand(), Subscription::conninfo, copy_sequences(), ereport, err(), errcode(), errmsg, ERROR, fb(), Form_pg_subscription_rel, get_namespace_name(), GETSTRUCT(), GetSystemIdentifier(), HeapTupleIsValid, initStringInfo(), InvalidOid, lappend(), LogRepWorkerWalRcvConn, MemoryContextSwitchTo(), MyLogicalRepWorker, MySubscription, Subscription::name, NoLock, ObjectIdGetDatum(), Subscription::oid, Subscription::ownersuperuser, palloc0_object, Subscription::passwordrequired, pfree(), pstrdup(), RelationGetNamespace, RelationGetRelationName, RowExclusiveLock, ScanKeyInit(), seqinfos, StartTransactionCommand(), LogicalRepWorker::subid, systable_beginscan(), systable_endscan(), systable_getnext(), table_close(), table_open(), try_table_open(), UINT64_FORMAT, and walrcv_connect.

Referenced by start_sequence_sync().

◆ ProcessSequencesForSync()

void ProcessSequencesForSync ( void  )

Definition at line 97 of file sequencesync.c.

98{
100 int nsyncworkers;
102 bool started_tx;
103
105
106 if (started_tx)
107 {
109 pgstat_report_stat(true);
110 }
111
113 return;
114
116
117 /* Check if there is a sequencesync worker already running? */
120 InvalidOid, true);
122 {
124 return;
125 }
126
127 /*
128 * Count running sync workers for this subscription, while we have the
129 * lock.
130 */
133
134 /*
135 * It is okay to read/update last_seqsync_start_time here in apply worker
136 * as we have already ensured that sync worker doesn't exist.
137 */
140}
LogicalRepWorker * logicalrep_worker_find(LogicalRepWorkerType wtype, Oid subid, Oid relid, bool only_running)
Definition launcher.c:268
int logicalrep_sync_worker_count(Oid subid)
Definition launcher.c:937
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1150
void LWLockRelease(LWLock *lock)
Definition lwlock.c:1767
@ LW_SHARED
Definition lwlock.h:105
long pgstat_report_stat(bool force)
Definition pgstat.c:722
TimestampTz last_seqsync_start_time
void launch_sync_worker(LogicalRepWorkerType wtype, int nsyncworkers, Oid relid, TimestampTz *last_start_time)
Definition syncutils.c:118
void FetchRelationStates(bool *has_pending_subtables, bool *has_pending_subsequences, bool *started_tx)
Definition syncutils.c:203
@ WORKERTYPE_SEQUENCESYNC

References CommitTransactionCommand(), fb(), FetchRelationStates(), InvalidOid, LogicalRepWorker::last_seqsync_start_time, launch_sync_worker(), logicalrep_sync_worker_count(), logicalrep_worker_find(), LW_SHARED, LWLockAcquire(), LWLockRelease(), MyLogicalRepWorker, pgstat_report_stat(), LogicalRepWorker::subid, and WORKERTYPE_SEQUENCESYNC.

Referenced by ProcessSyncingRelations().

◆ report_sequence_errors()

static void report_sequence_errors ( List * mismatched_seqs_idx,
List * sub_insuffperm_seqs_idx,
List * pub_insuffperm_seqs_idx,
List * missing_seqs_idx 
)
static

Definition at line 176 of file sequencesync.c.

180{
182
183 /* Quick exit if there are no errors to report */
186 return;
187
189
191 {
195 errmsg_plural("mismatched or renamed sequence on subscriber (%s)",
196 "mismatched or renamed sequences on subscriber (%s)",
198 seqstr.data));
199 }
200
202 {
206 errmsg_plural("insufficient privileges on subscriber sequence (%s)",
207 "insufficient privileges on subscriber sequences (%s)",
209 seqstr.data));
210 }
211
213 {
217 errmsg_plural("insufficient privileges on publisher sequence (%s)",
218 "insufficient privileges on publisher sequences (%s)",
220 seqstr.data));
221 }
222
224 {
228 errmsg_plural("missing sequence on publisher (%s)",
229 "missing sequences on publisher (%s)",
231 seqstr.data));
232 }
233
236 errmsg("logical replication sequence synchronization failed for subscription \"%s\"",
238}
#define WARNING
Definition elog.h:37
int int int errmsg_plural(const char *fmt_singular, const char *fmt_plural, unsigned long n,...) pg_attribute_printf(1
static void get_sequences_string(List *seqindexes, StringInfo buf)

References ereport, errcode(), errmsg, errmsg_plural(), ERROR, fb(), get_sequences_string(), initStringInfo(), list_length(), MySubscription, Subscription::name, and WARNING.

Referenced by copy_sequences().

◆ SequenceSyncWorkerMain()

void SequenceSyncWorkerMain ( Datum  main_arg)

Definition at line 806 of file sequencesync.c.

807{
809
811
813
815}
void SetupApplyOrSyncWorker(int worker_slot)
Definition worker.c:5963
static void start_sequence_sync(void)
pg_noreturn void FinishSyncWorker(void)
Definition syncutils.c:50

References DatumGetInt32(), fb(), FinishSyncWorker(), SetupApplyOrSyncWorker(), and start_sequence_sync().

◆ start_sequence_sync()

static void start_sequence_sync ( void  )
static

Definition at line 775 of file sequencesync.c.

776{
778
779 PG_TRY();
780 {
781 /* Call initial sync. */
783 }
784 PG_CATCH();
785 {
788 else
789 {
790 /*
791 * Report the worker failed during sequence synchronization. Abort
792 * the current transaction so that the stats message is sent in an
793 * idle state.
794 */
797
798 PG_RE_THROW();
799 }
800 }
801 PG_END_TRY();
802}
void DisableSubscriptionAndExit(void)
Definition worker.c:6023
#define PG_RE_THROW()
Definition elog.h:407
#define PG_TRY(...)
Definition elog.h:374
#define PG_END_TRY(...)
Definition elog.h:399
#define PG_CATCH(...)
Definition elog.h:384
void pgstat_report_subscription_error(Oid subid)
static void LogicalRepSyncSequences(void)
static bool am_sequencesync_worker(void)
void AbortOutOfAnyTransaction(void)
Definition xact.c:4916

References AbortOutOfAnyTransaction(), am_sequencesync_worker(), Assert, Subscription::disableonerr, DisableSubscriptionAndExit(), LogicalRepSyncSequences(), MySubscription, Subscription::oid, PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, and pgstat_report_subscription_error().

Referenced by SequenceSyncWorkerMain().

Variable Documentation

◆ seqinfos

List* seqinfos = NIL
static