144 finish_sync_worker(
void)
161 (
errmsg(
"logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
199 if (
state == SUBREL_STATE_UNKNOWN)
202 if (
state == expected_state)
215 1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
256 if (worker && worker->
proc)
268 1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
338 sizeof(syncslotname));
390 finish_sync_worker();
420 struct tablesync_start_time_mapping
427 bool started_tx =
false;
428 bool should_exit =
false;
444 ctl.keysize =
sizeof(
Oid);
445 ctl.entrysize =
sizeof(
struct tablesync_start_time_mapping);
467 if (rstate->
state == SUBREL_STATE_SYNCDONE)
474 if (current_lsn >= rstate->
lsn)
478 rstate->
state = SUBREL_STATE_READY;
479 rstate->
lsn = current_lsn;
521 rstate->
relid,
false);
529 if (rstate->
state == SUBREL_STATE_SYNCWAIT)
535 syncworker->
relstate = SUBREL_STATE_CATCHUP;
542 if (rstate->
state == SUBREL_STATE_SYNCWAIT)
545 if (syncworker->
proc)
572 SUBREL_STATE_SYNCDONE);
597 struct tablesync_start_time_mapping *hentry;
614 hentry->last_start_time =
now;
641 (
errmsg(
"logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled",
705 attnamelist =
lappend(attnamelist,
735 while (maxread > 0 && bytesread < minread)
763 outbuf = (
void *) ((
char *) outbuf + avail);
769 if (maxread <= 0 || bytesread >= minread)
779 fd, 1000L, WAIT_EVENT_LOGICAL_SYNC_DATA);
800 Oid tableRow[] = {OIDOID, CHAROID, CHAROID};
801 Oid attrRow[] = {INT2OID, TEXTOID, OIDOID, BOOLOID};
802 Oid qualRow[] = {TEXTOID};
814 " FROM pg_catalog.pg_class c"
815 " INNER JOIN pg_catalog.pg_namespace n"
816 " ON (c.relnamespace = n.oid)"
817 " WHERE n.nspname = %s"
818 " AND c.relname = %s",
826 (
errcode(ERRCODE_CONNECTION_FAILURE),
827 errmsg(
"could not fetch table info for table \"%s.%s\" from publisher: %s",
833 (
errcode(ERRCODE_UNDEFINED_OBJECT),
834 errmsg(
"table \"%s.%s\" not found on publisher",
858 Oid attrsRow[] = {INT2VECTOROID};
876 " (CASE WHEN (array_length(gpt.attrs, 1) = c.relnatts)"
877 " THEN NULL ELSE gpt.attrs END)"
878 " FROM pg_publication p,"
879 " LATERAL pg_get_publication_tables(p.pubname) gpt,"
881 " WHERE gpt.relid = %u AND c.oid = gpt.relid"
882 " AND p.pubname IN ( %s )",
891 (
errcode(ERRCODE_CONNECTION_FAILURE),
892 errmsg(
"could not fetch column list info for table \"%s.%s\" from publisher: %s",
905 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
906 errmsg(
"cannot use different column lists for table \"%s.%s\" in different publications",
930 for (natt = 0; natt < nelems; natt++)
951 " a.attnum = ANY(i.indkey)"
952 " FROM pg_catalog.pg_attribute a"
953 " LEFT JOIN pg_catalog.pg_index i"
954 " ON (i.indexrelid = pg_get_replica_identity_index(%u))"
955 " WHERE a.attnum > 0::pg_catalog.int2"
956 " AND NOT a.attisdropped %s"
957 " AND a.attrelid = %u"
958 " ORDER BY a.attnum",
961 "AND a.attgenerated = ''" :
""),
968 (
errcode(ERRCODE_CONNECTION_FAILURE),
969 errmsg(
"could not fetch table info for table \"%s.%s\" from publisher: %s",
1001 lrel->
attnames[natt] = rel_colname;
1010 elog(
ERROR,
"too many columns in remote table \"%s.%s\"",
1048 char *pubname =
strVal(pubstr);
1059 "SELECT DISTINCT pg_get_expr(gpt.qual, gpt.relid)"
1060 " FROM pg_publication p,"
1061 " LATERAL pg_get_publication_tables(p.pubname) gpt"
1062 " WHERE gpt.relid = %u"
1063 " AND p.pubname IN ( %s )",
1071 (
errmsg(
"could not fetch table WHERE clause info for table \"%s.%s\" from publisher: %s",
1142 if (lrel.
relkind == RELKIND_RELATION && qual ==
NIL)
1156 for (
int i = 0;
i < lrel.
natts;
i++)
1178 for (
int i = 0;
i < lrel.
natts;
i++)
1191 if (lrel.
relkind == RELKIND_RELATION)
1229 (
errcode(ERRCODE_CONNECTION_FAILURE),
1230 errmsg(
"could not start initial contents copy for table \"%s.%s\": %s",
1238 NULL,
false,
false);
1268 char *syncslotname,
Size szslot)
1295 bool must_use_password;
1320 case SUBREL_STATE_SYNCDONE:
1321 case SUBREL_STATE_READY:
1322 case SUBREL_STATE_UNKNOWN:
1323 finish_sync_worker();
1344 (
errcode(ERRCODE_CONNECTION_FAILURE),
1345 errmsg(
"table synchronization worker for subscription \"%s\" could not connect to the publisher: %s",
1356 sizeof(originname));
1392 goto copy_table_done;
1425 "BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ",
1429 (
errcode(ERRCODE_CONNECTION_FAILURE),
1430 errmsg(
"table copy could not start transaction on publisher: %s",
1440 slotname,
false ,
false ,
1473 errmsg(
"replication origin \"%s\" already exists",
1505 (
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1506 errmsg(
"user \"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
1518 (
errcode(ERRCODE_CONNECTION_FAILURE),
1519 errmsg(
"table copy could not finish transaction on publisher: %s",
1537 SUBREL_STATE_FINISHEDCOPY,
1545 "LogicalRepSyncTableStart: '%s' origin_startpos lsn %X/%X",
1575 static bool has_subrels =
false;
1577 *started_tx =
false;
1603 foreach(lc, rstates)
1646 char *sync_slotname = NULL;
1676 pfree(sync_slotname);
1690 char *slotname = NULL;
1698 sizeof(originname));
1720 finish_sync_worker();
1734 bool started_tx =
false;
1735 bool has_subrels =
false;
1761 bool nulls[Natts_pg_subscription];
1762 bool replaces[Natts_pg_subscription];
1773 "cache lookup failed for subscription oid %u",
1778 memset(nulls,
false,
sizeof(nulls));
1779 memset(replaces,
false,
sizeof(replaces));
1783 replaces[Anum_pg_subscription_subtwophasestate - 1] =
true;
1786 values, nulls, replaces);
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
AclResult pg_class_aclcheck(Oid table_oid, Oid roleid, AclMode mode)
#define DatumGetArrayTypeP(X)
void set_stream_options(WalRcvStreamOptions *options, char *slotname, XLogRecPtr *origin_startpos)
void start_apply(XLogRecPtr origin_startpos)
void DisableSubscriptionAndExit(void)
void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, char *originname, Size szoriginname)
void set_apply_error_context_origin(char *originname)
MemoryContext ApplyContext
void SetupApplyOrSyncWorker(int worker_slot)
WalReceiverConn * LogRepWorkerWalRcvConn
Subscription * MySubscription
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
TimestampTz GetCurrentTimestamp(void)
Datum now(PG_FUNCTION_ARGS)
bool bms_is_member(int x, const Bitmapset *a)
Bitmapset * bms_add_member(Bitmapset *a, int x)
static Datum values[MAXATTR]
#define TextDatumGetCString(d)
#define Assert(condition)
#define OidIsValid(objectId)
CopyFromState BeginCopyFrom(ParseState *pstate, Relation rel, Node *whereClause, const char *filename, bool is_program, copy_data_source_cb data_source_cb, List *attnamelist, List *options)
uint64 CopyFrom(CopyFromState cstate)
static void PGresult * res
#define DSM_HANDLE_INVALID
void hash_destroy(HTAB *hashp)
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
void err(int eval, const char *fmt,...)
void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
const TupleTableSlotOps TTSOpsMinimalTuple
TupleTableSlot * MakeSingleTupleTableSlot(TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, const Datum *replValues, const bool *replIsnull, const bool *doReplace)
void heap_freetuple(HeapTuple htup)
#define HeapTupleIsValid(tuple)
#define MaxTupleAttributeNumber
void CatalogTupleUpdate(Relation heapRel, ItemPointer otid, HeapTuple tup)
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
void ResetLatch(Latch *latch)
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
#define WL_SOCKET_READABLE
#define WL_EXIT_ON_PM_DEATH
bool logicalrep_worker_launch(LogicalRepWorkerType wtype, Oid dbid, Oid subid, const char *subname, Oid userid, Oid relid, dsm_handle subworker_dsm)
LogicalRepWorker * logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
void logicalrep_worker_wakeup(Oid subid, Oid relid)
static dshash_table * last_start_times
LogicalRepWorker * MyLogicalRepWorker
int max_sync_workers_per_subscription
int logicalrep_sync_worker_count(Oid subid)
void ApplyLauncherForgetWorkerStartTime(Oid subid)
List * lappend(List *list, void *datum)
void list_free_deep(List *list)
void UnlockRelationOid(Oid relid, LOCKMODE lockmode)
void LockRelationOid(Oid relid, LOCKMODE lockmode)
char * get_namespace_name(Oid nspid)
char * get_rel_name(Oid relid)
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
void LWLockRelease(LWLock *lock)
DefElem * makeDefElem(char *name, Node *arg, int location)
void pfree(void *pointer)
void * palloc0(Size size)
char * MemoryContextStrdup(MemoryContext context, const char *string)
MemoryContext CacheMemoryContext
#define CHECK_FOR_INTERRUPTS()
char * GetUserNameFromId(Oid roleid, bool noerr)
ObjectType get_relkind_objtype(char relkind)
TimestampTz replorigin_session_origin_timestamp
RepOriginId replorigin_by_name(const char *roname, bool missing_ok)
RepOriginId replorigin_create(const char *roname)
void replorigin_session_reset(void)
void replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
RepOriginId replorigin_session_origin
void replorigin_advance(RepOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)
void replorigin_session_setup(RepOriginId node, int acquired_by)
XLogRecPtr replorigin_session_get_progress(bool flush)
XLogRecPtr replorigin_session_origin_lsn
#define InvalidRepOriginId
ParseState * make_parsestate(ParseState *parentParseState)
ParseNamespaceItem * addRangeTableEntryForRelation(ParseState *pstate, Relation rel, int lockmode, Alias *alias, bool inh, bool inFromCl)
#define foreach_current_index(var_or_cell)
#define for_each_from(cell, lst, N)
#define foreach_node(type, var, lst)
List * GetSubscriptionRelations(Oid subid, bool not_ready)
char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
void UpdateSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn)
bool HasSubscriptionRelations(Oid subid)
#define LOGICALREP_TWOPHASE_STATE_DISABLED
#define LOGICALREP_TWOPHASE_STATE_PENDING
#define LOGICALREP_TWOPHASE_STATE_ENABLED
long pgstat_report_stat(bool force)
void pgstat_report_subscription_error(Oid subid, bool is_apply_error)
static bool DatumGetBool(Datum X)
static Oid DatumGetObjectId(Datum X)
static Datum ObjectIdGetDatum(Oid X)
static char DatumGetChar(Datum X)
static int16 DatumGetInt16(Datum X)
static int32 DatumGetInt32(Datum X)
static Datum CharGetDatum(char X)
static int fd(const char *x, int i)
char * quote_literal_cstr(const char *rawstr)
MemoryContextSwitchTo(old_ctx)
#define RelationGetRelid(relation)
#define RelationGetDescr(relation)
#define RelationGetRelationName(relation)
#define RelationGetNamespace(relation)
int check_enable_rls(Oid relid, Oid checkAsUser, bool noError)
const char * quote_identifier(const char *ident)
char * quote_qualified_identifier(const char *qualifier, const char *ident)
Snapshot GetTransactionSnapshot(void)
void PushActiveSnapshot(Snapshot snapshot)
void PopActiveSnapshot(void)
void InvalidateCatalogSnapshot(void)
#define SpinLockRelease(lock)
#define SpinLockAcquire(lock)
void logicalrep_relmap_update(LogicalRepRelation *remoterel)
LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
#define ERRCODE_DUPLICATE_OBJECT
StringInfo makeStringInfo(void)
void resetStringInfo(StringInfo str)
void appendStringInfo(StringInfo str, const char *fmt,...)
void appendStringInfoString(StringInfo str, const char *s)
void appendStringInfoChar(StringInfo str, char ch)
void initStringInfo(StringInfo str)
LogicalRepRelation remoterel
LogicalRepWorkerType type
Tuplestorestate * tuplestore
void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok)
#define SearchSysCacheCopy1(cacheId, key1)
void table_close(Relation relation, LOCKMODE lockmode)
Relation table_open(Oid relationId, LOCKMODE lockmode)
static List * table_states_not_ready
bool AllTablesyncsReady(void)
static bool wait_for_worker_state_change(char expected_state)
static char * LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
void invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
@ SYNC_TABLE_STATE_REBUILD_STARTED
@ SYNC_TABLE_STATE_NEEDS_REBUILD
static void pg_attribute_noreturn() finish_sync_worker(void)
static void process_syncing_tables_for_apply(XLogRecPtr current_lsn)
void TablesyncWorkerMain(Datum main_arg)
static void process_syncing_tables_for_sync(XLogRecPtr current_lsn)
void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, Size szslot)
static void run_tablesync_worker()
static int copy_read_data(void *outbuf, int minread, int maxread)
static SyncingTablesState table_states_validity
void process_syncing_tables(XLogRecPtr current_lsn)
static void copy_table(Relation rel)
static bool wait_for_relation_state_change(Oid relid, char expected_state)
static void start_table_sync(XLogRecPtr *origin_startpos, char **slotname)
static StringInfo copybuf
static bool FetchTableStates(bool *started_tx)
static void fetch_remote_table_info(char *nspname, char *relname, LogicalRepRelation *lrel, List **qual)
static List * make_copy_attnamelist(LogicalRepRelMapEntry *rel)
void UpdateTwoPhaseState(Oid suboid, char new_state)
bool tuplestore_gettupleslot(Tuplestorestate *state, bool forward, bool copy, TupleTableSlot *slot)
int64 tuplestore_tuple_count(Tuplestorestate *state)
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
static Datum slot_getattr(TupleTableSlot *slot, int attnum, bool *isnull)
void SwitchToUntrustedUser(Oid userid, UserContext *context)
void RestoreUserContext(UserContext *context)
String * makeString(char *str)
#define walrcv_startstreaming(conn, options)
#define walrcv_connect(conninfo, replication, logical, must_use_password, appname, err)
#define walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn)
static void walrcv_clear_result(WalRcvExecResult *walres)
#define walrcv_server_version(conn)
#define walrcv_endstreaming(conn, next_tli)
#define walrcv_exec(conn, exec, nRetTypes, retTypes)
#define walrcv_receive(conn, buffer, wait_fd)
@ WORKERTYPE_PARALLEL_APPLY
static bool am_tablesync_worker(void)
bool IsTransactionState(void)
void CommandCounterIncrement(void)
void StartTransactionCommand(void)
void CommitTransactionCommand(void)
void AbortOutOfAnyTransaction(void)
uint64 GetSystemIdentifier(void)
XLogRecPtr GetXLogWriteRecPtr(void)
int wal_retrieve_retry_interval
void XLogFlush(XLogRecPtr record)
#define LSN_FORMAT_ARGS(lsn)
#define InvalidXLogRecPtr