PostgreSQL Source Code git master
sequencesync.c File Reference
#include "postgres.h"
#include "access/table.h"
#include "catalog/pg_sequence.h"
#include "catalog/pg_subscription_rel.h"
#include "commands/sequence.h"
#include "pgstat.h"
#include "postmaster/interrupt.h"
#include "replication/logicalworker.h"
#include "replication/worker_internal.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/guc.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/pg_lsn.h"
#include "utils/syscache.h"
#include "utils/usercontext.h"
Include dependency graph for sequencesync.c:

Go to the source code of this file.

Macros

#define REMOTE_SEQ_COL_COUNT   10
 
#define MAX_SEQUENCES_SYNC_PER_BATCH   100
 

Typedefs

typedef enum CopySeqResult CopySeqResult
 

Enumerations

enum  CopySeqResult { COPYSEQ_SUCCESS , COPYSEQ_MISMATCH , COPYSEQ_INSUFFICIENT_PERM , COPYSEQ_SKIPPED }
 

Functions

void ProcessSequencesForSync (void)
 
static void get_sequences_string (List *seqindexes, StringInfo buf)
 
static void report_sequence_errors (List *mismatched_seqs_idx, List *insuffperm_seqs_idx, List *missing_seqs_idx)
 
static CopySeqResult get_and_validate_seq_info (TupleTableSlot *slot, Relation *sequence_rel, LogicalRepSequenceInfo **seqinfo, int *seqidx)
 
static CopySeqResult copy_sequence (LogicalRepSequenceInfo *seqinfo, Oid seqowner)
 
static void copy_sequences (WalReceiverConn *conn)
 
static void LogicalRepSyncSequences (void)
 
static void start_sequence_sync (void)
 
void SequenceSyncWorkerMain (Datum main_arg)
 

Variables

static List * seqinfos = NIL
 

Macro Definition Documentation

◆ MAX_SEQUENCES_SYNC_PER_BATCH

#define MAX_SEQUENCES_SYNC_PER_BATCH   100

◆ REMOTE_SEQ_COL_COUNT

#define REMOTE_SEQ_COL_COUNT   10

Definition at line 73 of file sequencesync.c.

Typedef Documentation

◆ CopySeqResult

typedef enum CopySeqResult CopySeqResult

Enumeration Type Documentation

◆ CopySeqResult

Enumerator
COPYSEQ_SUCCESS 
COPYSEQ_MISMATCH 
COPYSEQ_INSUFFICIENT_PERM 
COPYSEQ_SKIPPED 

Definition at line 75 of file sequencesync.c.

76{
CopySeqResult
Definition: sequencesync.c:76
@ COPYSEQ_INSUFFICIENT_PERM
Definition: sequencesync.c:79
@ COPYSEQ_MISMATCH
Definition: sequencesync.c:78
@ COPYSEQ_SUCCESS
Definition: sequencesync.c:77
@ COPYSEQ_SKIPPED
Definition: sequencesync.c:80

Function Documentation

◆ copy_sequence()

static CopySeqResult copy_sequence ( LogicalRepSequenceInfo * seqinfo,
Oid  seqowner 
)
static

Definition at line 324 of file sequencesync.c.

325{
326 UserContext ucxt;
327 AclResult aclresult;
328 bool run_as_owner = MySubscription->runasowner;
329 Oid seqoid = seqinfo->localrelid;
330
331 /*
332 * If the user did not opt to run as the owner of the subscription
333 * ('run_as_owner'), then copy the sequence as the owner of the sequence.
334 */
335 if (!run_as_owner)
336 SwitchToUntrustedUser(seqowner, &ucxt);
337
338 aclresult = pg_class_aclcheck(seqoid, GetUserId(), ACL_UPDATE);
339
340 if (aclresult != ACLCHECK_OK)
341 {
342 if (!run_as_owner)
343 RestoreUserContext(&ucxt);
344
345 return COPYSEQ_INSUFFICIENT_PERM;
346 }
347
348 /*
349 * The log counter (log_cnt) tracks how many sequence values are still
350 * unused locally. It is only relevant to the local node and managed
351 * internally by nextval() when allocating new ranges. Since log_cnt does
352 * not affect the visible sequence state (like last_value or is_called)
353 * and is only used for local caching, it need not be copied to the
354 * subscriber during synchronization.
355 */
356 SetSequence(seqoid, seqinfo->last_value, seqinfo->is_called);
357
358 if (!run_as_owner)
359 RestoreUserContext(&ucxt);
360
361 /*
362 * Record the remote sequence's LSN in pg_subscription_rel and mark the
363 * sequence as READY.
364 */
365 UpdateSubscriptionRelState(MySubscription->oid, seqoid, SUBREL_STATE_READY,
366 seqinfo->page_lsn, false);
367
368 return COPYSEQ_SUCCESS;
369}
AclResult
Definition: acl.h:182
@ ACLCHECK_OK
Definition: acl.h:183
AclResult pg_class_aclcheck(Oid table_oid, Oid roleid, AclMode mode)
Definition: aclchk.c:4037
Subscription * MySubscription
Definition: worker.c:479
void SetSequence(Oid relid, int64 next, bool iscalled)
Definition: sequence.c:946
Oid GetUserId(void)
Definition: miscinit.c:469
#define ACL_UPDATE
Definition: parsenodes.h:78
void UpdateSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn, bool already_locked)
unsigned int Oid
Definition: postgres_ext.h:32
void SwitchToUntrustedUser(Oid userid, UserContext *context)
Definition: usercontext.c:33
void RestoreUserContext(UserContext *context)
Definition: usercontext.c:87

References ACL_UPDATE, ACLCHECK_OK, COPYSEQ_INSUFFICIENT_PERM, COPYSEQ_SUCCESS, GetUserId(), LogicalRepSequenceInfo::is_called, LogicalRepSequenceInfo::last_value, LogicalRepSequenceInfo::localrelid, MySubscription, Subscription::oid, LogicalRepSequenceInfo::page_lsn, pg_class_aclcheck(), RestoreUserContext(), Subscription::runasowner, SetSequence(), SwitchToUntrustedUser(), and UpdateSubscriptionRelState().

Referenced by copy_sequences().

◆ copy_sequences()

static void copy_sequences ( WalReceiverConn * conn)
static

Definition at line 375 of file sequencesync.c.

376{
377 int cur_batch_base_index = 0;
378 int n_seqinfos = list_length(seqinfos);
379 List *mismatched_seqs_idx = NIL;
380 List *missing_seqs_idx = NIL;
381 List *insuffperm_seqs_idx = NIL;
382 StringInfo seqstr = makeStringInfo();
383 StringInfo cmd = makeStringInfo();
384 MemoryContext oldctx;
385
386#define MAX_SEQUENCES_SYNC_PER_BATCH 100
387
388 elog(DEBUG1,
389 "logical replication sequence synchronization for subscription \"%s\" - total unsynchronized: %d",
390 MySubscription->name, n_seqinfos);
391
392 while (cur_batch_base_index < n_seqinfos)
393 {
394 Oid seqRow[REMOTE_SEQ_COL_COUNT] = {INT8OID, INT8OID,
395 BOOLOID, LSNOID, OIDOID, INT8OID, INT8OID, INT8OID, INT8OID, BOOLOID};
396 int batch_size = 0;
397 int batch_succeeded_count = 0;
398 int batch_mismatched_count = 0;
399 int batch_skipped_count = 0;
400 int batch_insuffperm_count = 0;
401 int batch_missing_count;
402 Relation sequence_rel;
403
404 WalRcvExecResult *res;
405 TupleTableSlot *slot;
406
407 StartTransactionCommand();
408
409 for (int idx = cur_batch_base_index; idx < n_seqinfos; idx++)
410 {
411 char *nspname_literal;
412 char *seqname_literal;
413
414 LogicalRepSequenceInfo *seqinfo =
415 (LogicalRepSequenceInfo *) list_nth(seqinfos, idx);
416
417 if (seqstr->len > 0)
418 appendStringInfoString(seqstr, ", ");
419
420 nspname_literal = quote_literal_cstr(seqinfo->nspname);
421 seqname_literal = quote_literal_cstr(seqinfo->seqname);
422
423 appendStringInfo(seqstr, "(%s, %s, %d)",
424 nspname_literal, seqname_literal, idx);
425
426 if (++batch_size == MAX_SEQUENCES_SYNC_PER_BATCH)
427 break;
428 }
429
430 /*
431 * We deliberately avoid acquiring a local lock on the sequence before
432 * querying the publisher to prevent potential distributed deadlocks
433 * in bi-directional replication setups.
434 *
435 * Example scenario:
436 *
437 * - On each node, a background worker acquires a lock on a sequence
438 * as part of a sync operation.
439 *
440 * - Concurrently, a user transaction attempts to alter the same
441 * sequence, waiting on the background worker's lock.
442 *
443 * - Meanwhile, a query from the other node tries to access metadata
444 * that depends on the completion of the alter operation.
445 *
446 * - This creates a circular wait across nodes:
447 *
448 * Node-1: Query -> waits on Alter -> waits on Sync Worker
449 *
450 * Node-2: Query -> waits on Alter -> waits on Sync Worker
451 *
452 * Since each node only sees part of the wait graph, the deadlock may
453 * go undetected, leading to indefinite blocking.
454 *
455 * Note: Each entry in VALUES includes an index 'seqidx' that
456 * represents the sequence's position in the local 'seqinfos' list.
457 * This index is propagated to the query results and later used to
458 * directly map the fetched publisher sequence rows back to their
459 * corresponding local entries without relying on result order or name
460 * matching.
461 */
462 appendStringInfo(cmd,
463 "SELECT s.seqidx, ps.*, seq.seqtypid,\n"
464 " seq.seqstart, seq.seqincrement, seq.seqmin,\n"
465 " seq.seqmax, seq.seqcycle\n"
466 "FROM ( VALUES %s ) AS s (schname, seqname, seqidx)\n"
467 "JOIN pg_namespace n ON n.nspname = s.schname\n"
468 "JOIN pg_class c ON c.relnamespace = n.oid AND c.relname = s.seqname\n"
469 "JOIN pg_sequence seq ON seq.seqrelid = c.oid\n"
470 "JOIN LATERAL pg_get_sequence_data(seq.seqrelid) AS ps ON true\n",
471 seqstr->data);
472
473 res = walrcv_exec(conn, cmd->data, lengthof(seqRow), seqRow);
474 if (res->status != WALRCV_OK_TUPLES)
475 ereport(ERROR,
476 errcode(ERRCODE_CONNECTION_FAILURE),
477 errmsg("could not fetch sequence information from the publisher: %s",
478 res->err));
479
480 slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
481 while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
482 {
483 CopySeqResult sync_status;
484 LogicalRepSequenceInfo *seqinfo;
485 int seqidx;
486
487 CHECK_FOR_INTERRUPTS();
488
489 if (ConfigReloadPending)
490 {
491 ConfigReloadPending = false;
492 ProcessConfigFile(PGC_SIGHUP);
493 }
494
495 sync_status = get_and_validate_seq_info(slot, &sequence_rel,
496 &seqinfo, &seqidx);
497 if (sync_status == COPYSEQ_SUCCESS)
498 sync_status = copy_sequence(seqinfo,
499 sequence_rel->rd_rel->relowner);
500
501 switch (sync_status)
502 {
503 case COPYSEQ_SUCCESS:
504 elog(DEBUG1,
505 "logical replication synchronization for subscription \"%s\", sequence \"%s.%s\" has finished",
506 MySubscription->name, seqinfo->nspname,
507 seqinfo->seqname);
508 batch_succeeded_count++;
509 break;
510 case COPYSEQ_MISMATCH:
511
512 /*
513 * Remember mismatched sequences in a long-lived memory
514 * context since these will be used after the transaction
515 * is committed.
516 */
517 oldctx = MemoryContextSwitchTo(ApplyContext);
518 mismatched_seqs_idx = lappend_int(mismatched_seqs_idx,
519 seqidx);
520 MemoryContextSwitchTo(oldctx);
521 batch_mismatched_count++;
522 break;
523 case COPYSEQ_INSUFFICIENT_PERM:
524
525 /*
526 * Remember sequences with insufficient privileges in a
527 * long-lived memory context since these will be used
528 * after the transaction is committed.
529 */
530 oldctx = MemoryContextSwitchTo(ApplyContext);
531 insuffperm_seqs_idx = lappend_int(insuffperm_seqs_idx,
532 seqidx);
533 MemoryContextSwitchTo(oldctx);
534 batch_insuffperm_count++;
535 break;
536 case COPYSEQ_SKIPPED:
537 ereport(LOG,
538 errmsg("skip synchronization of sequence \"%s.%s\" because it has been dropped concurrently",
539 seqinfo->nspname,
540 seqinfo->seqname));
541 batch_skipped_count++;
542 break;
543 }
544
545 if (sequence_rel)
546 table_close(sequence_rel, NoLock);
547 }
548
549 ExecDropSingleTupleTableSlot(slot);
550 walrcv_clear_result(res);
551 resetStringInfo(seqstr);
552 resetStringInfo(cmd);
553
554 batch_missing_count = batch_size - (batch_succeeded_count +
555 batch_mismatched_count +
556 batch_insuffperm_count +
557 batch_skipped_count);
558
559 elog(DEBUG1,
560 "logical replication sequence synchronization for subscription \"%s\" - batch #%d = %d attempted, %d succeeded, %d mismatched, %d insufficient permission, %d missing from publisher, %d skipped",
561 MySubscription->name,
562 (cur_batch_base_index / MAX_SEQUENCES_SYNC_PER_BATCH) + 1,
563 batch_size, batch_succeeded_count, batch_mismatched_count,
564 batch_insuffperm_count, batch_missing_count, batch_skipped_count);
565
566 /* Commit this batch, and prepare for next batch */
567 CommitTransactionCommand();
568
569 if (batch_missing_count)
570 {
571 for (int idx = cur_batch_base_index; idx < cur_batch_base_index + batch_size; idx++)
572 {
573 LogicalRepSequenceInfo *seqinfo =
574 (LogicalRepSequenceInfo *) list_nth(seqinfos, idx);
575
576 /* If the sequence was not found on publisher, record it */
577 if (!seqinfo->found_on_pub)
578 missing_seqs_idx = lappend_int(missing_seqs_idx, idx);
579 }
580 }
581
582 /*
583 * cur_batch_base_index is not incremented sequentially because some
584 * sequences may be missing, and the number of fetched rows may not
585 * match the batch size.
586 */
587 cur_batch_base_index += batch_size;
588 }
589
590 /* Report mismatches, permission issues, or missing sequences */
591 report_sequence_errors(mismatched_seqs_idx, insuffperm_seqs_idx,
592 missing_seqs_idx);
593}
Datum idx(PG_FUNCTION_ARGS)
Definition: _int_op.c:262
MemoryContext ApplyContext
Definition: worker.c:472
#define lengthof(array)
Definition: c.h:790
int errcode(int sqlerrcode)
Definition: elog.c:863
int errmsg(const char *fmt,...)
Definition: elog.c:1080
#define LOG
Definition: elog.h:31
#define DEBUG1
Definition: elog.h:30
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:226
#define ereport(elevel,...)
Definition: elog.h:150
TupleTableSlot * MakeSingleTupleTableSlot(TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1427
void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
Definition: execTuples.c:1443
const TupleTableSlotOps TTSOpsMinimalTuple
Definition: execTuples.c:86
void ProcessConfigFile(GucContext context)
Definition: guc-file.l:120
@ PGC_SIGHUP
Definition: guc.h:75
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:27
List * lappend_int(List *list, int datum)
Definition: list.c:357
#define NoLock
Definition: lockdefs.h:34
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:123
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:124
static int list_length(const List *l)
Definition: pg_list.h:152
#define NIL
Definition: pg_list.h:68
static void * list_nth(const List *list, int n)
Definition: pg_list.h:299
char * quote_literal_cstr(const char *rawstr)
Definition: quote.c:103
#define REMOTE_SEQ_COL_COUNT
Definition: sequencesync.c:73
static CopySeqResult get_and_validate_seq_info(TupleTableSlot *slot, Relation *sequence_rel, LogicalRepSequenceInfo **seqinfo, int *seqidx)
Definition: sequencesync.c:230
static List * seqinfos
Definition: sequencesync.c:83
#define MAX_SEQUENCES_SYNC_PER_BATCH
static void report_sequence_errors(List *mismatched_seqs_idx, List *insuffperm_seqs_idx, List *missing_seqs_idx)
Definition: sequencesync.c:172
static CopySeqResult copy_sequence(LogicalRepSequenceInfo *seqinfo, Oid seqowner)
Definition: sequencesync.c:324
PGconn * conn
Definition: streamutil.c:52
StringInfo makeStringInfo(void)
Definition: stringinfo.c:72
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:126
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:145
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:230
Definition: pg_list.h:54
Form_pg_class rd_rel
Definition: rel.h:111
Tuplestorestate * tuplestore
Definition: walreceiver.h:223
TupleDesc tupledesc
Definition: walreceiver.h:224
WalRcvExecStatus status
Definition: walreceiver.h:220
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:126
bool tuplestore_gettupleslot(Tuplestorestate *state, bool forward, bool copy, TupleTableSlot *slot)
Definition: tuplestore.c:1130
@ WALRCV_OK_TUPLES
Definition: walreceiver.h:207
static void walrcv_clear_result(WalRcvExecResult *walres)
Definition: walreceiver.h:471
#define walrcv_exec(conn, exec, nRetTypes, retTypes)
Definition: walreceiver.h:465
void StartTransactionCommand(void)
Definition: xact.c:3077
void CommitTransactionCommand(void)
Definition: xact.c:3175

References appendStringInfo(), appendStringInfoString(), ApplyContext, CHECK_FOR_INTERRUPTS, CommitTransactionCommand(), ConfigReloadPending, conn, copy_sequence(), COPYSEQ_INSUFFICIENT_PERM, COPYSEQ_MISMATCH, COPYSEQ_SKIPPED, COPYSEQ_SUCCESS, StringInfoData::data, DEBUG1, elog, ereport, WalRcvExecResult::err, errcode(), errmsg(), ERROR, ExecDropSingleTupleTableSlot(), LogicalRepSequenceInfo::found_on_pub, get_and_validate_seq_info(), idx(), lappend_int(), StringInfoData::len, lengthof, list_length(), list_nth(), LOG, MakeSingleTupleTableSlot(), makeStringInfo(), MAX_SEQUENCES_SYNC_PER_BATCH, MemoryContextSwitchTo(), MySubscription, Subscription::name, NIL, NoLock, LogicalRepSequenceInfo::nspname, PGC_SIGHUP, ProcessConfigFile(), quote_literal_cstr(), RelationData::rd_rel, REMOTE_SEQ_COL_COUNT, report_sequence_errors(), resetStringInfo(), seqinfos, LogicalRepSequenceInfo::seqname, StartTransactionCommand(), WalRcvExecResult::status, table_close(), TTSOpsMinimalTuple, WalRcvExecResult::tupledesc, WalRcvExecResult::tuplestore, tuplestore_gettupleslot(), walrcv_clear_result(), walrcv_exec, and WALRCV_OK_TUPLES.

Referenced by LogicalRepSyncSequences().

◆ get_and_validate_seq_info()

static CopySeqResult get_and_validate_seq_info ( TupleTableSlot * slot,
Relation * sequence_rel,
LogicalRepSequenceInfo **  seqinfo,
int *  seqidx 
)
static

Definition at line 230 of file sequencesync.c.

232{
233 bool isnull;
234 int col = 0;
235 Oid remote_typid;
236 int64 remote_start;
237 int64 remote_increment;
238 int64 remote_min;
239 int64 remote_max;
240 bool remote_cycle;
241 CopySeqResult result = COPYSEQ_SUCCESS;
242 HeapTuple tup;
243 Form_pg_sequence local_seq;
244 LogicalRepSequenceInfo *seqinfo_local;
245
246 *seqidx = DatumGetInt32(slot_getattr(slot, ++col, &isnull));
247 Assert(!isnull);
248
249 /* Identify the corresponding local sequence for the given index. */
250 *seqinfo = seqinfo_local =
251 (LogicalRepSequenceInfo *) list_nth(seqinfos, *seqidx);
252
253 seqinfo_local->last_value = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
254 Assert(!isnull);
255
256 seqinfo_local->is_called = DatumGetBool(slot_getattr(slot, ++col, &isnull));
257 Assert(!isnull);
258
259 seqinfo_local->page_lsn = DatumGetLSN(slot_getattr(slot, ++col, &isnull));
260 Assert(!isnull);
261
262 remote_typid = DatumGetObjectId(slot_getattr(slot, ++col, &isnull));
263 Assert(!isnull);
264
265 remote_start = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
266 Assert(!isnull);
267
268 remote_increment = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
269 Assert(!isnull);
270
271 remote_min = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
272 Assert(!isnull);
273
274 remote_max = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
275 Assert(!isnull);
276
277 remote_cycle = DatumGetBool(slot_getattr(slot, ++col, &isnull));
278 Assert(!isnull);
279
280 /* Sanity check */
281 Assert(col == REMOTE_SEQ_COL_COUNT);
282
283 seqinfo_local->found_on_pub = true;
284
285 *sequence_rel = try_table_open(seqinfo_local->localrelid, RowExclusiveLock);
286
287 /* Sequence was concurrently dropped? */
288 if (!*sequence_rel)
289 return COPYSEQ_SKIPPED;
290
291 tup = SearchSysCache1(SEQRELID, ObjectIdGetDatum(seqinfo_local->localrelid));
292
293 /* Sequence was concurrently dropped? */
294 if (!HeapTupleIsValid(tup))
295 elog(ERROR, "cache lookup failed for sequence %u",
296 seqinfo_local->localrelid);
297
298 local_seq = (Form_pg_sequence) GETSTRUCT(tup);
299
300 /* Sequence parameters for remote/local are the same? */
301 if (local_seq->seqtypid != remote_typid ||
302 local_seq->seqstart != remote_start ||
303 local_seq->seqincrement != remote_increment ||
304 local_seq->seqmin != remote_min ||
305 local_seq->seqmax != remote_max ||
306 local_seq->seqcycle != remote_cycle)
307 result = COPYSEQ_MISMATCH;
308
309 /* Sequence was concurrently renamed? */
310 if (strcmp(seqinfo_local->nspname,
311 get_namespace_name(RelationGetNamespace(*sequence_rel))) ||
312 strcmp(seqinfo_local->seqname, RelationGetRelationName(*sequence_rel)))
313 result = COPYSEQ_MISMATCH;
314
315 ReleaseSysCache(tup);
316 return result;
317}
int64_t int64
Definition: c.h:538
Assert(PointerIsAligned(start, uint64))
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
static void * GETSTRUCT(const HeapTupleData *tuple)
Definition: htup_details.h:728
#define RowExclusiveLock
Definition: lockdefs.h:38
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3533
static XLogRecPtr DatumGetLSN(Datum X)
Definition: pg_lsn.h:25
FormData_pg_sequence * Form_pg_sequence
Definition: pg_sequence.h:40
static bool DatumGetBool(Datum X)
Definition: postgres.h:100
static int64 DatumGetInt64(Datum X)
Definition: postgres.h:393
static Oid DatumGetObjectId(Datum X)
Definition: postgres.h:252
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:262
static int32 DatumGetInt32(Datum X)
Definition: postgres.h:212
#define RelationGetRelationName(relation)
Definition: rel.h:549
#define RelationGetNamespace(relation)
Definition: rel.h:556
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:264
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:220
Relation try_table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:60
static Datum slot_getattr(TupleTableSlot *slot, int attnum, bool *isnull)
Definition: tuptable.h:398

References Assert(), COPYSEQ_MISMATCH, COPYSEQ_SKIPPED, COPYSEQ_SUCCESS, DatumGetBool(), DatumGetInt32(), DatumGetInt64(), DatumGetLSN(), DatumGetObjectId(), elog, ERROR, LogicalRepSequenceInfo::found_on_pub, get_namespace_name(), GETSTRUCT(), HeapTupleIsValid, LogicalRepSequenceInfo::is_called, LogicalRepSequenceInfo::last_value, list_nth(), LogicalRepSequenceInfo::localrelid, LogicalRepSequenceInfo::nspname, ObjectIdGetDatum(), LogicalRepSequenceInfo::page_lsn, RelationGetNamespace, RelationGetRelationName, ReleaseSysCache(), REMOTE_SEQ_COL_COUNT, RowExclusiveLock, SearchSysCache1(), seqinfos, LogicalRepSequenceInfo::seqname, slot_getattr(), and try_table_open().

Referenced by copy_sequences().

◆ get_sequences_string()

static void get_sequences_string ( List * seqindexes,
StringInfo  buf 
)
static

Definition at line 146 of file sequencesync.c.

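147{
148 resetStringInfo(buf);
149 foreach_int(seqidx, seqindexes)
150 {
151 LogicalRepSequenceInfo *seqinfo =
152 (LogicalRepSequenceInfo *) list_nth(seqinfos, seqidx);
153
154 if (buf->len > 0)
155 appendStringInfoString(buf, ", ");
156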
157 appendStringInfo(buf, "\"%s.%s\"", seqinfo->nspname, seqinfo->seqname);
158 }
159}
#define foreach_int(var, lst)
Definition: pg_list.h:470
static char * buf
Definition: pg_test_fsync.c:72

References appendStringInfo(), appendStringInfoString(), buf, foreach_int, list_nth(), LogicalRepSequenceInfo::nspname, resetStringInfo(), seqinfos, and LogicalRepSequenceInfo::seqname.

Referenced by report_sequence_errors().

◆ LogicalRepSyncSequences()

static void LogicalRepSyncSequences ( void  )
static

Definition at line 600 of file sequencesync.c.

601{
602 char *err;
603 bool must_use_password;
604 Relation rel;
605 HeapTuple tup;
606 ScanKeyData skey[2];
607 SysScanDesc scan;
608 Oid subid = MyLogicalRepWorker->subid;
609 StringInfoData app_name;
610
611 StartTransactionCommand();
612
613 rel = table_open(SubscriptionRelRelationId, AccessShareLock);
614
615 ScanKeyInit(&skey[0],
616 Anum_pg_subscription_rel_srsubid,
617 BTEqualStrategyNumber, F_OIDEQ,
618 ObjectIdGetDatum(subid));
619
620 ScanKeyInit(&skey[1],
621 Anum_pg_subscription_rel_srsubstate,
622 BTEqualStrategyNumber, F_CHAREQ,
623 CharGetDatum(SUBREL_STATE_INIT));
624
625 scan = systable_beginscan(rel, InvalidOid, false,
626 NULL, 2, skey);
627 while (HeapTupleIsValid(tup = systable_getnext(scan)))
628 {
629 Form_pg_subscription_rel subrel;
630 LogicalRepSequenceInfo *seq;
631 Relation sequence_rel;
632 MemoryContext oldctx;
633
634 CHECK_FOR_INTERRUPTS();
635
636 subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
637
638 sequence_rel = try_table_open(subrel->srrelid, RowExclusiveLock);
639
640 /* Skip if sequence was dropped concurrently */
641 if (!sequence_rel)
642 continue;
643
644 /* Skip if the relation is not a sequence */
645 if (sequence_rel->rd_rel->relkind != RELKIND_SEQUENCE)
646 {
647 table_close(sequence_rel, NoLock);
648 continue;
649 }
650
651 /*
652 * Worker needs to process sequences across transaction boundary, so
653 * allocate them under long-lived context.
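654 */
655 oldctx = MemoryContextSwitchTo(ApplyContext);
656
657 seq = palloc0_object(LogicalRepSequenceInfo);
658 seq->localrelid = subrel->srrelid;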
659 seq->nspname = get_namespace_name(RelationGetNamespace(sequence_rel));
660 seq->seqname = pstrdup(RelationGetRelationName(sequence_rel));
661 seqinfos = lappend(seqinfos, seq);
662
663 MemoryContextSwitchTo(oldctx);
664
665 table_close(sequence_rel, NoLock);
666 }
667
668 /* Cleanup */
669 systable_endscan(scan);
670 table_close(rel, AccessShareLock);
671
672 CommitTransactionCommand();
673
674 /*
675 * Exit early if no catalog entries found, likely due to concurrent drops.
676 */
677 if (!seqinfos)
678 return;
679
680 /* Is the use of a password mandatory? */
681 must_use_password = MySubscription->passwordrequired &&
682 !MySubscription->ownersuperuser;
683
684 initStringInfo(&app_name);
685 appendStringInfo(&app_name, "pg_%u_sequence_sync_" UINT64_FORMAT,
686 MySubscription->oid, GetSystemIdentifier());
687
688 /*
689 * Establish the connection to the publisher for sequence synchronization.
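690 */
691 LogRepWorkerWalRcvConn =
692 walrcv_connect(MySubscription->conninfo, true, true,
693 must_use_password,
694 app_name.data, &err);
695 if (LogRepWorkerWalRcvConn == NULL)
696 ereport(ERROR,
697 errcode(ERRCODE_CONNECTION_FAILURE),
698 errmsg("sequencesync worker for subscription \"%s\" could not connect to the publisher: %s",
699 MySubscription->name, err));
700
701 pfree(app_name.data);
702
703 copy_sequences(LogRepWorkerWalRcvConn);
704}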
WalReceiverConn * LogRepWorkerWalRcvConn
Definition: worker.c:477
#define UINT64_FORMAT
Definition: c.h:560
void err(int eval, const char *fmt,...)
Definition: err.c:43
#define palloc0_object(type)
Definition: fe_memutils.h:75
void systable_endscan(SysScanDesc sysscan)
Definition: genam.c:603
HeapTuple systable_getnext(SysScanDesc sysscan)
Definition: genam.c:514
SysScanDesc systable_beginscan(Relation heapRelation, Oid indexId, bool indexOK, Snapshot snapshot, int nkeys, ScanKey key)
Definition: genam.c:388
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:56
List * lappend(List *list, void *datum)
Definition: list.c:339
#define AccessShareLock
Definition: lockdefs.h:36
char * pstrdup(const char *in)
Definition: mcxt.c:1759
void pfree(void *pointer)
Definition: mcxt.c:1594
FormData_pg_subscription_rel * Form_pg_subscription_rel
static Datum CharGetDatum(char X)
Definition: postgres.h:132
#define InvalidOid
Definition: postgres_ext.h:37
void ScanKeyInit(ScanKey entry, AttrNumber attributeNumber, StrategyNumber strategy, RegProcedure procedure, Datum argument)
Definition: scankey.c:76
static void copy_sequences(WalReceiverConn *conn)
Definition: sequencesync.c:375
#define BTEqualStrategyNumber
Definition: stratnum.h:31
void initStringInfo(StringInfo str)
Definition: stringinfo.c:97
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:40
#define walrcv_connect(conninfo, replication, logical, must_use_password, appname, err)
Definition: walreceiver.h:435
uint64 GetSystemIdentifier(void)
Definition: xlog.c:4609

References AccessShareLock, appendStringInfo(), ApplyContext, BTEqualStrategyNumber, CharGetDatum(), CHECK_FOR_INTERRUPTS, CommitTransactionCommand(), Subscription::conninfo, copy_sequences(), StringInfoData::data, ereport, err(), errcode(), errmsg(), ERROR, get_namespace_name(), GETSTRUCT(), GetSystemIdentifier(), HeapTupleIsValid, initStringInfo(), InvalidOid, lappend(), LogicalRepSequenceInfo::localrelid, LogRepWorkerWalRcvConn, MemoryContextSwitchTo(), MyLogicalRepWorker, MySubscription, Subscription::name, NoLock, LogicalRepSequenceInfo::nspname, ObjectIdGetDatum(), Subscription::oid, Subscription::ownersuperuser, palloc0_object, Subscription::passwordrequired, pfree(), pstrdup(), RelationData::rd_rel, RelationGetNamespace, RelationGetRelationName, RowExclusiveLock, ScanKeyInit(), seqinfos, LogicalRepSequenceInfo::seqname, StartTransactionCommand(), LogicalRepWorker::subid, systable_beginscan(), systable_endscan(), systable_getnext(), table_close(), table_open(), try_table_open(), UINT64_FORMAT, and walrcv_connect.

Referenced by start_sequence_sync().

◆ ProcessSequencesForSync()

void ProcessSequencesForSync ( void  )

Definition at line 94 of file sequencesync.c.

95{
96 LogicalRepWorker *sequencesync_worker;
97 int nsyncworkers;
98 bool has_pending_sequences;
99 bool started_tx;
100
101 FetchRelationStates(NULL, &has_pending_sequences, &started_tx);
102
103 if (started_tx)
104 {
105 CommitTransactionCommand();
106 pgstat_report_stat(true);
107 }
108
109 if (!has_pending_sequences)
110 return;
111
112 LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
113
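114 /* Check if there is a sequencesync worker already running? */
115 sequencesync_worker = logicalrep_worker_find(WORKERTYPE_SEQUENCESYNC,
116 MyLogicalRepWorker->subid,
117 InvalidOid, true);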
118 if (sequencesync_worker)
119 {
120 LWLockRelease(LogicalRepWorkerLock);
121 return;
122 }
123
124 /*
125 * Count running sync workers for this subscription, while we have the
126 * lock.
127 */
128 nsyncworkers = logicalrep_sync_worker_count(MyLogicalRepWorker->subid);
129 LWLockRelease(LogicalRepWorkerLock);
130
131 /*
132 * It is okay to read/update last_seqsync_start_time here in apply worker
133 * as we have already ensured that sync worker doesn't exist.
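134 */
135 launch_sync_worker(WORKERTYPE_SEQUENCESYNC, nsyncworkers, InvalidOid,
136 &MyLogicalRepWorker->last_seqsync_start_time);
137}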
LogicalRepWorker * logicalrep_worker_find(LogicalRepWorkerType wtype, Oid subid, Oid relid, bool only_running)
Definition: launcher.c:258
int logicalrep_sync_worker_count(Oid subid)
Definition: launcher.c:927
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1174
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1894
@ LW_SHARED
Definition: lwlock.h:113
long pgstat_report_stat(bool force)
Definition: pgstat.c:694
TimestampTz last_seqsync_start_time
void launch_sync_worker(LogicalRepWorkerType wtype, int nsyncworkers, Oid relid, TimestampTz *last_start_time)
Definition: syncutils.c:117
void FetchRelationStates(bool *has_pending_subtables, bool *has_pending_subsequences, bool *started_tx)
Definition: syncutils.c:202
@ WORKERTYPE_SEQUENCESYNC

References CommitTransactionCommand(), FetchRelationStates(), InvalidOid, LogicalRepWorker::last_seqsync_start_time, launch_sync_worker(), logicalrep_sync_worker_count(), logicalrep_worker_find(), LW_SHARED, LWLockAcquire(), LWLockRelease(), MyLogicalRepWorker, pgstat_report_stat(), LogicalRepWorker::subid, and WORKERTYPE_SEQUENCESYNC.

Referenced by ProcessSyncingRelations().

◆ report_sequence_errors()

static void report_sequence_errors ( List * mismatched_seqs_idx,
List * insuffperm_seqs_idx,
List * missing_seqs_idx 
)
static

Definition at line 172 of file sequencesync.c.

174{
175 StringInfo seqstr;
176
177 /* Quick exit if there are no errors to report */
178 if (!mismatched_seqs_idx && !insuffperm_seqs_idx && !missing_seqs_idx)
179 return;
180
181 seqstr = makeStringInfo();
182
183 if (mismatched_seqs_idx)
184 {
185 get_sequences_string(mismatched_seqs_idx, seqstr);
186 ereport(WARNING,
187 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
188 errmsg_plural("mismatched or renamed sequence on subscriber (%s)",
189 "mismatched or renamed sequences on subscriber (%s)",
190 list_length(mismatched_seqs_idx),
191 seqstr->data));
192 }
193
194 if (insuffperm_seqs_idx)
195 {
196 get_sequences_string(insuffperm_seqs_idx, seqstr);
197 ereport(WARNING,
198 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
199 errmsg_plural("insufficient privileges on sequence (%s)",
200 "insufficient privileges on sequences (%s)",
201 list_length(insuffperm_seqs_idx),
202 seqstr->data));
203 }
204
205 if (missing_seqs_idx)
206 {
207 get_sequences_string(missing_seqs_idx, seqstr);
208 ereport(WARNING,
209 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
210 errmsg_plural("missing sequence on publisher (%s)",
211 "missing sequences on publisher (%s)",
212 list_length(missing_seqs_idx),
213 seqstr->data));
214 }
215
216 ereport(ERROR,
217 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
218 errmsg("logical replication sequence synchronization failed for subscription \"%s\"",
219 MySubscription->name));
220}
int errmsg_plural(const char *fmt_singular, const char *fmt_plural, unsigned long n,...)
Definition: elog.c:1193
#define WARNING
Definition: elog.h:36
static void get_sequences_string(List *seqindexes, StringInfo buf)
Definition: sequencesync.c:146

References StringInfoData::data, ereport, errcode(), errmsg(), errmsg_plural(), ERROR, get_sequences_string(), list_length(), makeStringInfo(), MySubscription, Subscription::name, and WARNING.

Referenced by copy_sequences().

◆ SequenceSyncWorkerMain()

void SequenceSyncWorkerMain ( Datum  main_arg)

Definition at line 746 of file sequencesync.c.

747{
748 int worker_slot = DatumGetInt32(main_arg);
749
750 SetupApplyOrSyncWorker(worker_slot);
751
752 start_sequence_sync();
753
754 FinishSyncWorker();
755}
void SetupApplyOrSyncWorker(int worker_slot)
Definition: worker.c:5869
static void start_sequence_sync(void)
Definition: sequencesync.c:714
pg_noreturn void FinishSyncWorker(void)
Definition: syncutils.c:50

References DatumGetInt32(), FinishSyncWorker(), SetupApplyOrSyncWorker(), and start_sequence_sync().

◆ start_sequence_sync()

static void start_sequence_sync ( void  )
static

Definition at line 714 of file sequencesync.c.

715{
716 Assert(am_sequencesync_worker());
717
718 PG_TRY();
719 {
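720 /* Call initial sync. */
721 LogicalRepSyncSequences();
722 }
723 PG_CATCH();
724 {
725 if (MySubscription->disableonerr)
726 DisableSubscriptionAndExit();
727 else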
728 {
729 /*
730 * Report the worker failed during sequence synchronization. Abort
731 * the current transaction so that the stats message is sent in an
732 * idle state.
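733 */
734 AbortOutOfAnyTransaction();
735 pgstat_report_subscription_error(MySubscription->oid,
736 WORKERTYPE_SEQUENCESYNC);
737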
738 PG_RE_THROW();
739 }
740 }
741 PG_END_TRY();
742}
void DisableSubscriptionAndExit(void)
Definition: worker.c:5943
#define PG_RE_THROW()
Definition: elog.h:405
#define PG_TRY(...)
Definition: elog.h:372
#define PG_END_TRY(...)
Definition: elog.h:397
#define PG_CATCH(...)
Definition: elog.h:382
void pgstat_report_subscription_error(Oid subid, LogicalRepWorkerType wtype)
static void LogicalRepSyncSequences(void)
Definition: sequencesync.c:600
static bool am_sequencesync_worker(void)
void AbortOutOfAnyTransaction(void)
Definition: xact.c:4880

References AbortOutOfAnyTransaction(), am_sequencesync_worker(), Assert(), Subscription::disableonerr, DisableSubscriptionAndExit(), LogicalRepSyncSequences(), MySubscription, Subscription::oid, PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, pgstat_report_subscription_error(), and WORKERTYPE_SEQUENCESYNC.

Referenced by SequenceSyncWorkerMain().

Variable Documentation

◆ seqinfos

List* seqinfos = NIL
static