160 (
errmsg(
"logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
198 if (
state == SUBREL_STATE_UNKNOWN)
201 if (
state == expected_state)
214 1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
255 if (worker && worker->
proc)
267 1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
337 sizeof(syncslotname));
419 struct tablesync_start_time_mapping
426 bool started_tx =
false;
427 bool should_exit =
false;
443 ctl.keysize =
sizeof(
Oid);
444 ctl.entrysize =
sizeof(
struct tablesync_start_time_mapping);
466 if (rstate->
state == SUBREL_STATE_SYNCDONE)
473 if (current_lsn >= rstate->
lsn)
477 rstate->
state = SUBREL_STATE_READY;
478 rstate->
lsn = current_lsn;
520 rstate->
relid,
false);
528 if (rstate->
state == SUBREL_STATE_SYNCWAIT)
534 syncworker->
relstate = SUBREL_STATE_CATCHUP;
541 if (rstate->
state == SUBREL_STATE_SYNCWAIT)
544 if (syncworker->
proc)
571 SUBREL_STATE_SYNCDONE);
596 struct tablesync_start_time_mapping *hentry;
613 hentry->last_start_time =
now;
640 (
errmsg(
"logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled",
704 attnamelist =
lappend(attnamelist,
734 while (maxread > 0 && bytesread < minread)
762 outbuf = (
char *) outbuf + avail;
768 if (maxread <= 0 || bytesread >= minread)
778 fd, 1000L, WAIT_EVENT_LOGICAL_SYNC_DATA);
797 List **qual,
bool *gencol_published)
802 Oid tableRow[] = {OIDOID, CHAROID, CHAROID};
803 Oid attrRow[] = {INT2OID, TEXTOID, OIDOID, BOOLOID, BOOLOID};
804 Oid qualRow[] = {TEXTOID};
817 " FROM pg_catalog.pg_class c"
818 " INNER JOIN pg_catalog.pg_namespace n"
819 " ON (c.relnamespace = n.oid)"
820 " WHERE n.nspname = %s"
821 " AND c.relname = %s",
829 (
errcode(ERRCODE_CONNECTION_FAILURE),
830 errmsg(
"could not fetch table info for table \"%s.%s\" from publisher: %s",
836 (
errcode(ERRCODE_UNDEFINED_OBJECT),
837 errmsg(
"table \"%s.%s\" not found on publisher",
861 Oid attrsRow[] = {INT2VECTOROID};
874 " (CASE WHEN (array_length(gpt.attrs, 1) = c.relnatts)"
875 " THEN NULL ELSE gpt.attrs END)"
876 " FROM pg_publication p,"
877 " LATERAL pg_get_publication_tables(p.pubname) gpt,"
879 " WHERE gpt.relid = %u AND c.oid = gpt.relid"
880 " AND p.pubname IN ( %s )",
889 (
errcode(ERRCODE_CONNECTION_FAILURE),
890 errmsg(
"could not fetch column list info for table \"%s.%s\" from publisher: %s",
903 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
904 errmsg(
"cannot use different column lists for table \"%s.%s\" in different publications",
928 for (natt = 0; natt < nelems; natt++)
947 " a.attnum = ANY(i.indkey)");
954 " FROM pg_catalog.pg_attribute a"
955 " LEFT JOIN pg_catalog.pg_index i"
956 " ON (i.indexrelid = pg_get_replica_identity_index(%u))"
957 " WHERE a.attnum > 0::pg_catalog.int2"
958 " AND NOT a.attisdropped %s"
959 " AND a.attrelid = %u"
960 " ORDER BY a.attnum",
963 "AND a.attgenerated = ''" :
""),
970 (
errcode(ERRCODE_CONNECTION_FAILURE),
971 errmsg(
"could not fetch table info for table \"%s.%s\" from publisher: %s",
1003 lrel->
attnames[natt] = rel_colname;
1019 elog(
ERROR,
"too many columns in remote table \"%s.%s\"",
1052 Assert(pub_names != NULL);
1057 "SELECT DISTINCT pg_get_expr(gpt.qual, gpt.relid)"
1058 " FROM pg_publication p,"
1059 " LATERAL pg_get_publication_tables(p.pubname) gpt"
1060 " WHERE gpt.relid = %u"
1061 " AND p.pubname IN ( %s )",
1069 (
errmsg(
"could not fetch table WHERE clause info for table \"%s.%s\" from publisher: %s",
1125 bool gencol_published =
false;
1143 if (lrel.
relkind == RELKIND_RELATION && qual ==
NIL && !gencol_published)
1157 for (
int i = 0;
i < lrel.
natts;
i++)
1184 for (
int i = 0;
i < lrel.
natts;
i++)
1197 if (lrel.
relkind == RELKIND_RELATION)
1235 (
errcode(ERRCODE_CONNECTION_FAILURE),
1236 errmsg(
"could not start initial contents copy for table \"%s.%s\": %s",
1244 NULL,
false,
false);
1274 char *syncslotname,
Size szslot)
1301 bool must_use_password;
1326 case SUBREL_STATE_SYNCDONE:
1327 case SUBREL_STATE_READY:
1328 case SUBREL_STATE_UNKNOWN:
1350 (
errcode(ERRCODE_CONNECTION_FAILURE),
1351 errmsg(
"table synchronization worker for subscription \"%s\" could not connect to the publisher: %s",
1362 sizeof(originname));
1398 goto copy_table_done;
1431 "BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ",
1435 (
errcode(ERRCODE_CONNECTION_FAILURE),
1436 errmsg(
"table copy could not start transaction on publisher: %s",
1446 slotname,
false ,
false ,
1479 errmsg(
"replication origin \"%s\" already exists",
1511 (
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1512 errmsg(
"user \"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
1524 (
errcode(ERRCODE_CONNECTION_FAILURE),
1525 errmsg(
"table copy could not finish transaction on publisher: %s",
1543 SUBREL_STATE_FINISHEDCOPY,
1551 "LogicalRepSyncTableStart: '%s' origin_startpos lsn %X/%X",
1581 static bool has_subrels =
false;
1583 *started_tx =
false;
1609 foreach(lc, rstates)
1652 char *sync_slotname = NULL;
1682 pfree(sync_slotname);
1696 char *slotname = NULL;
1704 sizeof(originname));
1740 bool started_tx =
false;
1741 bool has_subrels =
false;
1767 bool nulls[Natts_pg_subscription];
1768 bool replaces[Natts_pg_subscription];
1771 Assert(new_state == LOGICALREP_TWOPHASE_STATE_DISABLED ||
1772 new_state == LOGICALREP_TWOPHASE_STATE_PENDING ||
1773 new_state == LOGICALREP_TWOPHASE_STATE_ENABLED);
1779 "cache lookup failed for subscription oid %u",
1784 memset(nulls,
false,
sizeof(nulls));
1785 memset(replaces,
false,
sizeof(replaces));
1789 replaces[Anum_pg_subscription_subtwophasestate - 1] =
true;
1792 values, nulls, replaces);
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
AclResult pg_class_aclcheck(Oid table_oid, Oid roleid, AclMode mode)
#define DatumGetArrayTypeP(X)
void set_stream_options(WalRcvStreamOptions *options, char *slotname, XLogRecPtr *origin_startpos)
void start_apply(XLogRecPtr origin_startpos)
void DisableSubscriptionAndExit(void)
void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, char *originname, Size szoriginname)
void set_apply_error_context_origin(char *originname)
MemoryContext ApplyContext
void SetupApplyOrSyncWorker(int worker_slot)
WalReceiverConn * LogRepWorkerWalRcvConn
Subscription * MySubscription
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
TimestampTz GetCurrentTimestamp(void)
Datum now(PG_FUNCTION_ARGS)
bool bms_is_member(int x, const Bitmapset *a)
Bitmapset * bms_add_member(Bitmapset *a, int x)
static Datum values[MAXATTR]
#define TextDatumGetCString(d)
#define OidIsValid(objectId)
CopyFromState BeginCopyFrom(ParseState *pstate, Relation rel, Node *whereClause, const char *filename, bool is_program, copy_data_source_cb data_source_cb, List *attnamelist, List *options)
uint64 CopyFrom(CopyFromState cstate)
#define DSM_HANDLE_INVALID
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
void hash_destroy(HTAB *hashp)
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
void err(int eval, const char *fmt,...)
TupleTableSlot * MakeSingleTupleTableSlot(TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
const TupleTableSlotOps TTSOpsMinimalTuple
Assert(PointerIsAligned(start, uint64))
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, const Datum *replValues, const bool *replIsnull, const bool *doReplace)
void heap_freetuple(HeapTuple htup)
#define HeapTupleIsValid(tuple)
#define MaxTupleAttributeNumber
void CatalogTupleUpdate(Relation heapRel, ItemPointer otid, HeapTuple tup)
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
void ResetLatch(Latch *latch)
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
bool logicalrep_worker_launch(LogicalRepWorkerType wtype, Oid dbid, Oid subid, const char *subname, Oid userid, Oid relid, dsm_handle subworker_dsm)
void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
LogicalRepWorker * logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
void logicalrep_worker_wakeup(Oid subid, Oid relid)
static dshash_table * last_start_times
LogicalRepWorker * MyLogicalRepWorker
int max_sync_workers_per_subscription
int logicalrep_sync_worker_count(Oid subid)
void ApplyLauncherForgetWorkerStartTime(Oid subid)
List * lappend(List *list, void *datum)
void list_free_deep(List *list)
void UnlockRelationOid(Oid relid, LOCKMODE lockmode)
void LockRelationOid(Oid relid, LOCKMODE lockmode)
char * get_rel_name(Oid relid)
char * get_namespace_name(Oid nspid)
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
void LWLockRelease(LWLock *lock)
DefElem * makeDefElem(char *name, Node *arg, int location)
char * MemoryContextStrdup(MemoryContext context, const char *string)
void pfree(void *pointer)
void * palloc0(Size size)
MemoryContext CacheMemoryContext
#define CHECK_FOR_INTERRUPTS()
char * GetUserNameFromId(Oid roleid, bool noerr)
ObjectType get_relkind_objtype(char relkind)
TimestampTz replorigin_session_origin_timestamp
RepOriginId replorigin_by_name(const char *roname, bool missing_ok)
RepOriginId replorigin_create(const char *roname)
void replorigin_session_reset(void)
void replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
RepOriginId replorigin_session_origin
void replorigin_advance(RepOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)
void replorigin_session_setup(RepOriginId node, int acquired_by)
XLogRecPtr replorigin_session_get_progress(bool flush)
XLogRecPtr replorigin_session_origin_lsn
#define InvalidRepOriginId
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
ParseState * make_parsestate(ParseState *parentParseState)
ParseNamespaceItem * addRangeTableEntryForRelation(ParseState *pstate, Relation rel, int lockmode, Alias *alias, bool inh, bool inFromCl)
static int server_version
#define for_each_from(cell, lst, N)
List * GetSubscriptionRelations(Oid subid, bool not_ready)
char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
void GetPublicationsStr(List *publications, StringInfo dest, bool quote_literal)
void UpdateSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn)
bool HasSubscriptionRelations(Oid subid)
long pgstat_report_stat(bool force)
void pgstat_report_subscription_error(Oid subid, bool is_apply_error)
static bool DatumGetBool(Datum X)
static Oid DatumGetObjectId(Datum X)
static Datum ObjectIdGetDatum(Oid X)
static char DatumGetChar(Datum X)
static int16 DatumGetInt16(Datum X)
static int32 DatumGetInt32(Datum X)
static Datum CharGetDatum(char X)
static int fd(const char *x, int i)
char * quote_literal_cstr(const char *rawstr)
#define RelationGetRelid(relation)
#define RelationGetDescr(relation)
#define RelationGetRelationName(relation)
#define RelationGetNamespace(relation)
int check_enable_rls(Oid relid, Oid checkAsUser, bool noError)
char * quote_qualified_identifier(const char *qualifier, const char *ident)
const char * quote_identifier(const char *ident)
Snapshot GetTransactionSnapshot(void)
void PushActiveSnapshot(Snapshot snapshot)
void PopActiveSnapshot(void)
void InvalidateCatalogSnapshot(void)
#define SpinLockRelease(lock)
#define SpinLockAcquire(lock)
void logicalrep_relmap_update(LogicalRepRelation *remoterel)
void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
#define ERRCODE_DUPLICATE_OBJECT
void destroyStringInfo(StringInfo str)
StringInfo makeStringInfo(void)
void resetStringInfo(StringInfo str)
void appendStringInfo(StringInfo str, const char *fmt,...)
void appendStringInfoString(StringInfo str, const char *s)
void appendStringInfoChar(StringInfo str, char ch)
void initStringInfo(StringInfo str)
LogicalRepRelation remoterel
LogicalRepWorkerType type
Tuplestorestate * tuplestore
void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok)
#define SearchSysCacheCopy1(cacheId, key1)
void table_close(Relation relation, LOCKMODE lockmode)
Relation table_open(Oid relationId, LOCKMODE lockmode)
static List * table_states_not_ready
bool AllTablesyncsReady(void)
static bool wait_for_worker_state_change(char expected_state)
void invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
@ SYNC_TABLE_STATE_REBUILD_STARTED
@ SYNC_TABLE_STATE_NEEDS_REBUILD
static void process_syncing_tables_for_apply(XLogRecPtr current_lsn)
static List * make_copy_attnamelist(LogicalRepRelMapEntry *rel)
void TablesyncWorkerMain(Datum main_arg)
static pg_noreturn void finish_sync_worker(void)
static void process_syncing_tables_for_sync(XLogRecPtr current_lsn)
static void fetch_remote_table_info(char *nspname, char *relname, LogicalRepRelation *lrel, List **qual, bool *gencol_published)
void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, Size szslot)
static void run_tablesync_worker()
static int copy_read_data(void *outbuf, int minread, int maxread)
static SyncingTablesState table_states_validity
void process_syncing_tables(XLogRecPtr current_lsn)
static char * LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
static void copy_table(Relation rel)
static bool wait_for_relation_state_change(Oid relid, char expected_state)
static void start_table_sync(XLogRecPtr *origin_startpos, char **slotname)
static StringInfo copybuf
static bool FetchTableStates(bool *started_tx)
void UpdateTwoPhaseState(Oid suboid, char new_state)
bool tuplestore_gettupleslot(Tuplestorestate *state, bool forward, bool copy, TupleTableSlot *slot)
int64 tuplestore_tuple_count(Tuplestorestate *state)
static Datum slot_getattr(TupleTableSlot *slot, int attnum, bool *isnull)
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
void SwitchToUntrustedUser(Oid userid, UserContext *context)
void RestoreUserContext(UserContext *context)
String * makeString(char *str)
#define WL_SOCKET_READABLE
#define WL_EXIT_ON_PM_DEATH
#define walrcv_startstreaming(conn, options)
#define walrcv_connect(conninfo, replication, logical, must_use_password, appname, err)
#define walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn)
static void walrcv_clear_result(WalRcvExecResult *walres)
#define walrcv_server_version(conn)
#define walrcv_endstreaming(conn, next_tli)
#define walrcv_exec(conn, exec, nRetTypes, retTypes)
#define walrcv_receive(conn, buffer, wait_fd)
@ WORKERTYPE_PARALLEL_APPLY
static bool am_tablesync_worker(void)
bool IsTransactionState(void)
void CommandCounterIncrement(void)
void StartTransactionCommand(void)
void CommitTransactionCommand(void)
void AbortOutOfAnyTransaction(void)
uint64 GetSystemIdentifier(void)
XLogRecPtr GetXLogWriteRecPtr(void)
int wal_retrieve_retry_interval
void XLogFlush(XLogRecPtr record)
#define LSN_FORMAT_ARGS(lsn)
#define InvalidXLogRecPtr