65#include "utils/fmgroids.h"
74#define REMOTE_SEQ_COL_COUNT 10
189 errmsg_plural(
"mismatched or renamed sequence on subscriber (%s)",
190 "mismatched or renamed sequences on subscriber (%s)",
201 "insufficient privileges on sequences (%s)",
212 "missing sequences on publisher (%s)",
219 errmsg(
"logical replication sequence synchronization failed for subscription \"%s\"",
303 elog(
ERROR,
"cache lookup failed for sequence %u",
394#define MAX_SEQUENCES_SYNC_PER_BATCH 100
397 "logical replication sequence synchronization for subscription \"%s\" - total unsynchronized: %d",
471 "SELECT s.seqidx, ps.*, seq.seqtypid,\n"
472 " seq.seqstart, seq.seqincrement, seq.seqmin,\n"
473 " seq.seqmax, seq.seqcycle\n"
474 "FROM ( VALUES %s ) AS s (schname, seqname, seqidx)\n"
475 "JOIN pg_namespace n ON n.nspname = s.schname\n"
476 "JOIN pg_class c ON c.relnamespace = n.oid AND c.relname = s.seqname\n"
477 "JOIN pg_sequence seq ON seq.seqrelid = c.oid\n"
478 "JOIN LATERAL pg_get_sequence_data(seq.seqrelid) AS ps ON true\n",
485 errmsg(
"could not fetch sequence information from the publisher: %s",
513 "logical replication synchronization for subscription \"%s\", sequence \"%s.%s\" has finished",
555 errmsg(
"skip synchronization of sequence \"%s.%s\" because it has been dropped concurrently",
578 "logical replication sequence synchronization for subscription \"%s\" - batch #%d = %d attempted, %d succeeded, %d mismatched, %d insufficient permission, %d missing from publisher, %d skipped",
716 errmsg(
"sequencesync worker for subscription \"%s\" could not connect to the publisher: %s",
Datum idx(PG_FUNCTION_ARGS)
AclResult pg_class_aclcheck(Oid table_oid, Oid roleid, AclMode mode)
void DisableSubscriptionAndExit(void)
MemoryContext ApplyContext
void SetupApplyOrSyncWorker(int worker_slot)
WalReceiverConn * LogRepWorkerWalRcvConn
Subscription * MySubscription
#define Assert(condition)
void SetSequence(Oid relid, int64 next, bool iscalled)
int errmsg_plural(const char *fmt_singular, const char *fmt_plural, unsigned long n,...)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
void err(int eval, const char *fmt,...)
TupleTableSlot * MakeSingleTupleTableSlot(TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
const TupleTableSlotOps TTSOpsMinimalTuple
#define palloc0_object(type)
void systable_endscan(SysScanDesc sysscan)
HeapTuple systable_getnext(SysScanDesc sysscan)
SysScanDesc systable_beginscan(Relation heapRelation, Oid indexId, bool indexOK, Snapshot snapshot, int nkeys, ScanKey key)
void ProcessConfigFile(GucContext context)
#define HeapTupleIsValid(tuple)
static void * GETSTRUCT(const HeapTupleData *tuple)
volatile sig_atomic_t ConfigReloadPending
LogicalRepWorker * logicalrep_worker_find(LogicalRepWorkerType wtype, Oid subid, Oid relid, bool only_running)
LogicalRepWorker * MyLogicalRepWorker
int logicalrep_sync_worker_count(Oid subid)
List * lappend(List *list, void *datum)
List * lappend_int(List *list, int datum)
char * get_namespace_name(Oid nspid)
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
void LWLockRelease(LWLock *lock)
char * pstrdup(const char *in)
void pfree(void *pointer)
#define CHECK_FOR_INTERRUPTS()
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
static int list_length(const List *l)
static void * list_nth(const List *list, int n)
#define foreach_int(var, lst)
static XLogRecPtr DatumGetLSN(Datum X)
FormData_pg_sequence * Form_pg_sequence
void UpdateSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn, bool already_locked)
FormData_pg_subscription_rel * Form_pg_subscription_rel
static char buf[DEFAULT_XLOG_SEG_SIZE]
long pgstat_report_stat(bool force)
void pgstat_report_subscription_error(Oid subid, LogicalRepWorkerType wtype)
static bool DatumGetBool(Datum X)
static int64 DatumGetInt64(Datum X)
static Oid DatumGetObjectId(Datum X)
static Datum ObjectIdGetDatum(Oid X)
static int32 DatumGetInt32(Datum X)
static Datum CharGetDatum(char X)
char * quote_literal_cstr(const char *rawstr)
#define RelationGetRelationName(relation)
#define RelationGetNamespace(relation)
void ScanKeyInit(ScanKey entry, AttrNumber attributeNumber, StrategyNumber strategy, RegProcedure procedure, Datum argument)
#define REMOTE_SEQ_COL_COUNT
@ COPYSEQ_INSUFFICIENT_PERM
static CopySeqResult get_and_validate_seq_info(TupleTableSlot *slot, Relation *sequence_rel, LogicalRepSequenceInfo **seqinfo, int *seqidx)
#define MAX_SEQUENCES_SYNC_PER_BATCH
void SequenceSyncWorkerMain(Datum main_arg)
static void start_sequence_sync(void)
static void LogicalRepSyncSequences(void)
static void copy_sequences(WalReceiverConn *conn)
static void get_sequences_string(List *seqindexes, StringInfo buf)
static void report_sequence_errors(List *mismatched_seqs_idx, List *insuffperm_seqs_idx, List *missing_seqs_idx)
void ProcessSequencesForSync(void)
static CopySeqResult copy_sequence(LogicalRepSequenceInfo *seqinfo, Oid seqowner)
#define BTEqualStrategyNumber
StringInfo makeStringInfo(void)
void resetStringInfo(StringInfo str)
void appendStringInfo(StringInfo str, const char *fmt,...)
void appendStringInfoString(StringInfo str, const char *s)
void initStringInfo(StringInfo str)
TimestampTz last_seqsync_start_time
Tuplestorestate * tuplestore
void launch_sync_worker(LogicalRepWorkerType wtype, int nsyncworkers, Oid relid, TimestampTz *last_start_time)
pg_noreturn void FinishSyncWorker(void)
void FetchRelationStates(bool *has_pending_subtables, bool *has_pending_subsequences, bool *started_tx)
void ReleaseSysCache(HeapTuple tuple)
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Relation try_table_open(Oid relationId, LOCKMODE lockmode)
void table_close(Relation relation, LOCKMODE lockmode)
Relation table_open(Oid relationId, LOCKMODE lockmode)
bool tuplestore_gettupleslot(Tuplestorestate *state, bool forward, bool copy, TupleTableSlot *slot)
static Datum slot_getattr(TupleTableSlot *slot, int attnum, bool *isnull)
void SwitchToUntrustedUser(Oid userid, UserContext *context)
void RestoreUserContext(UserContext *context)
#define walrcv_connect(conninfo, replication, logical, must_use_password, appname, err)
static void walrcv_clear_result(WalRcvExecResult *walres)
#define walrcv_exec(conn, exec, nRetTypes, retTypes)
@ WORKERTYPE_SEQUENCESYNC
static bool am_sequencesync_worker(void)
void StartTransactionCommand(void)
void CommitTransactionCommand(void)
void AbortOutOfAnyTransaction(void)
uint64 GetSystemIdentifier(void)