144finish_sync_worker(
void)
161 (
errmsg(
"logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
199 if (
state == SUBREL_STATE_UNKNOWN)
202 if (
state == expected_state)
215 1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
256 if (worker && worker->
proc)
268 1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
338 sizeof(syncslotname));
390 finish_sync_worker();
420 struct tablesync_start_time_mapping
427 bool started_tx =
false;
428 bool should_exit =
false;
444 ctl.keysize =
sizeof(
Oid);
445 ctl.entrysize =
sizeof(
struct tablesync_start_time_mapping);
467 if (rstate->
state == SUBREL_STATE_SYNCDONE)
474 if (current_lsn >= rstate->
lsn)
478 rstate->
state = SUBREL_STATE_READY;
479 rstate->
lsn = current_lsn;
521 rstate->
relid,
false);
529 if (rstate->
state == SUBREL_STATE_SYNCWAIT)
535 syncworker->
relstate = SUBREL_STATE_CATCHUP;
542 if (rstate->
state == SUBREL_STATE_SYNCWAIT)
545 if (syncworker->
proc)
572 SUBREL_STATE_SYNCDONE);
597 struct tablesync_start_time_mapping *hentry;
614 hentry->last_start_time =
now;
641 (
errmsg(
"logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled",
705 attnamelist =
lappend(attnamelist,
735 while (maxread > 0 && bytesread < minread)
763 outbuf = (
char *) outbuf + avail;
769 if (maxread <= 0 || bytesread >= minread)
779 fd, 1000L, WAIT_EVENT_LOGICAL_SYNC_DATA);
798 List **qual,
bool *gencol_published)
803 Oid tableRow[] = {OIDOID, CHAROID, CHAROID};
804 Oid attrRow[] = {INT2OID, TEXTOID, OIDOID, BOOLOID, BOOLOID};
805 Oid qualRow[] = {TEXTOID};
818 " FROM pg_catalog.pg_class c"
819 " INNER JOIN pg_catalog.pg_namespace n"
820 " ON (c.relnamespace = n.oid)"
821 " WHERE n.nspname = %s"
822 " AND c.relname = %s",
830 (
errcode(ERRCODE_CONNECTION_FAILURE),
831 errmsg(
"could not fetch table info for table \"%s.%s\" from publisher: %s",
837 (
errcode(ERRCODE_UNDEFINED_OBJECT),
838 errmsg(
"table \"%s.%s\" not found on publisher",
862 Oid attrsRow[] = {INT2VECTOROID};
875 " (CASE WHEN (array_length(gpt.attrs, 1) = c.relnatts)"
876 " THEN NULL ELSE gpt.attrs END)"
877 " FROM pg_publication p,"
878 " LATERAL pg_get_publication_tables(p.pubname) gpt,"
880 " WHERE gpt.relid = %u AND c.oid = gpt.relid"
881 " AND p.pubname IN ( %s )",
890 (
errcode(ERRCODE_CONNECTION_FAILURE),
891 errmsg(
"could not fetch column list info for table \"%s.%s\" from publisher: %s",
904 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
905 errmsg(
"cannot use different column lists for table \"%s.%s\" in different publications",
929 for (natt = 0; natt < nelems; natt++)
948 " a.attnum = ANY(i.indkey)");
955 " FROM pg_catalog.pg_attribute a"
956 " LEFT JOIN pg_catalog.pg_index i"
957 " ON (i.indexrelid = pg_get_replica_identity_index(%u))"
958 " WHERE a.attnum > 0::pg_catalog.int2"
959 " AND NOT a.attisdropped %s"
960 " AND a.attrelid = %u"
961 " ORDER BY a.attnum",
964 "AND a.attgenerated = ''" :
""),
971 (
errcode(ERRCODE_CONNECTION_FAILURE),
972 errmsg(
"could not fetch table info for table \"%s.%s\" from publisher: %s",
1004 lrel->
attnames[natt] = rel_colname;
1020 elog(
ERROR,
"too many columns in remote table \"%s.%s\"",
1053 Assert(pub_names != NULL);
1058 "SELECT DISTINCT pg_get_expr(gpt.qual, gpt.relid)"
1059 " FROM pg_publication p,"
1060 " LATERAL pg_get_publication_tables(p.pubname) gpt"
1061 " WHERE gpt.relid = %u"
1062 " AND p.pubname IN ( %s )",
1070 (
errmsg(
"could not fetch table WHERE clause info for table \"%s.%s\" from publisher: %s",
1126 bool gencol_published =
false;
1144 if (lrel.
relkind == RELKIND_RELATION && qual ==
NIL && !gencol_published)
1158 for (
int i = 0;
i < lrel.
natts;
i++)
1185 for (
int i = 0;
i < lrel.
natts;
i++)
1198 if (lrel.
relkind == RELKIND_RELATION)
1236 (
errcode(ERRCODE_CONNECTION_FAILURE),
1237 errmsg(
"could not start initial contents copy for table \"%s.%s\": %s",
1245 NULL,
false,
false);
1275 char *syncslotname,
Size szslot)
1302 bool must_use_password;
1327 case SUBREL_STATE_SYNCDONE:
1328 case SUBREL_STATE_READY:
1329 case SUBREL_STATE_UNKNOWN:
1330 finish_sync_worker();
1351 (
errcode(ERRCODE_CONNECTION_FAILURE),
1352 errmsg(
"table synchronization worker for subscription \"%s\" could not connect to the publisher: %s",
1363 sizeof(originname));
1399 goto copy_table_done;
1432 "BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ",
1436 (
errcode(ERRCODE_CONNECTION_FAILURE),
1437 errmsg(
"table copy could not start transaction on publisher: %s",
1447 slotname,
false ,
false ,
1480 errmsg(
"replication origin \"%s\" already exists",
1512 (
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1513 errmsg(
"user \"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
1525 (
errcode(ERRCODE_CONNECTION_FAILURE),
1526 errmsg(
"table copy could not finish transaction on publisher: %s",
1544 SUBREL_STATE_FINISHEDCOPY,
1552 "LogicalRepSyncTableStart: '%s' origin_startpos lsn %X/%X",
1582 static bool has_subrels =
false;
1584 *started_tx =
false;
1610 foreach(lc, rstates)
1653 char *sync_slotname = NULL;
1683 pfree(sync_slotname);
1697 char *slotname = NULL;
1705 sizeof(originname));
1727 finish_sync_worker();
1741 bool started_tx =
false;
1742 bool has_subrels =
false;
1768 bool nulls[Natts_pg_subscription];
1769 bool replaces[Natts_pg_subscription];
1772 Assert(new_state == LOGICALREP_TWOPHASE_STATE_DISABLED ||
1773 new_state == LOGICALREP_TWOPHASE_STATE_PENDING ||
1774 new_state == LOGICALREP_TWOPHASE_STATE_ENABLED);
1780 "cache lookup failed for subscription oid %u",
1785 memset(nulls,
false,
sizeof(nulls));
1786 memset(replaces,
false,
sizeof(replaces));
1790 replaces[Anum_pg_subscription_subtwophasestate - 1] =
true;
1793 values, nulls, replaces);
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
AclResult pg_class_aclcheck(Oid table_oid, Oid roleid, AclMode mode)
#define DatumGetArrayTypeP(X)
void set_stream_options(WalRcvStreamOptions *options, char *slotname, XLogRecPtr *origin_startpos)
void start_apply(XLogRecPtr origin_startpos)
void DisableSubscriptionAndExit(void)
void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, char *originname, Size szoriginname)
void set_apply_error_context_origin(char *originname)
MemoryContext ApplyContext
void SetupApplyOrSyncWorker(int worker_slot)
WalReceiverConn * LogRepWorkerWalRcvConn
Subscription * MySubscription
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
TimestampTz GetCurrentTimestamp(void)
Datum now(PG_FUNCTION_ARGS)
bool bms_is_member(int x, const Bitmapset *a)
Bitmapset * bms_add_member(Bitmapset *a, int x)
static Datum values[MAXATTR]
#define TextDatumGetCString(d)
#define Assert(condition)
#define OidIsValid(objectId)
CopyFromState BeginCopyFrom(ParseState *pstate, Relation rel, Node *whereClause, const char *filename, bool is_program, copy_data_source_cb data_source_cb, List *attnamelist, List *options)
uint64 CopyFrom(CopyFromState cstate)
static void PGresult * res
#define DSM_HANDLE_INVALID
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
void hash_destroy(HTAB *hashp)
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
void err(int eval, const char *fmt,...)
TupleTableSlot * MakeSingleTupleTableSlot(TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
const TupleTableSlotOps TTSOpsMinimalTuple
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, const Datum *replValues, const bool *replIsnull, const bool *doReplace)
void heap_freetuple(HeapTuple htup)
#define HeapTupleIsValid(tuple)
#define MaxTupleAttributeNumber
void CatalogTupleUpdate(Relation heapRel, ItemPointer otid, HeapTuple tup)
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
void ResetLatch(Latch *latch)
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
#define WL_SOCKET_READABLE
#define WL_EXIT_ON_PM_DEATH
bool logicalrep_worker_launch(LogicalRepWorkerType wtype, Oid dbid, Oid subid, const char *subname, Oid userid, Oid relid, dsm_handle subworker_dsm)
void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
LogicalRepWorker * logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
void logicalrep_worker_wakeup(Oid subid, Oid relid)
static dshash_table * last_start_times
LogicalRepWorker * MyLogicalRepWorker
int max_sync_workers_per_subscription
int logicalrep_sync_worker_count(Oid subid)
void ApplyLauncherForgetWorkerStartTime(Oid subid)
List * lappend(List *list, void *datum)
void list_free_deep(List *list)
void UnlockRelationOid(Oid relid, LOCKMODE lockmode)
void LockRelationOid(Oid relid, LOCKMODE lockmode)
char * get_rel_name(Oid relid)
char * get_namespace_name(Oid nspid)
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
void LWLockRelease(LWLock *lock)
DefElem * makeDefElem(char *name, Node *arg, int location)
char * MemoryContextStrdup(MemoryContext context, const char *string)
void pfree(void *pointer)
void * palloc0(Size size)
MemoryContext CacheMemoryContext
#define CHECK_FOR_INTERRUPTS()
char * GetUserNameFromId(Oid roleid, bool noerr)
ObjectType get_relkind_objtype(char relkind)
TimestampTz replorigin_session_origin_timestamp
RepOriginId replorigin_by_name(const char *roname, bool missing_ok)
RepOriginId replorigin_create(const char *roname)
void replorigin_session_reset(void)
void replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
RepOriginId replorigin_session_origin
void replorigin_advance(RepOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)
void replorigin_session_setup(RepOriginId node, int acquired_by)
XLogRecPtr replorigin_session_get_progress(bool flush)
XLogRecPtr replorigin_session_origin_lsn
#define InvalidRepOriginId
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
ParseState * make_parsestate(ParseState *parentParseState)
ParseNamespaceItem * addRangeTableEntryForRelation(ParseState *pstate, Relation rel, int lockmode, Alias *alias, bool inh, bool inFromCl)
static int server_version
#define for_each_from(cell, lst, N)
List * GetSubscriptionRelations(Oid subid, bool not_ready)
char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
void GetPublicationsStr(List *publications, StringInfo dest, bool quote_literal)
void UpdateSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn)
bool HasSubscriptionRelations(Oid subid)
long pgstat_report_stat(bool force)
void pgstat_report_subscription_error(Oid subid, bool is_apply_error)
static bool DatumGetBool(Datum X)
static Oid DatumGetObjectId(Datum X)
static Datum ObjectIdGetDatum(Oid X)
static char DatumGetChar(Datum X)
static int16 DatumGetInt16(Datum X)
static int32 DatumGetInt32(Datum X)
static Datum CharGetDatum(char X)
static int fd(const char *x, int i)
char * quote_literal_cstr(const char *rawstr)
#define RelationGetRelid(relation)
#define RelationGetDescr(relation)
#define RelationGetRelationName(relation)
#define RelationGetNamespace(relation)
int check_enable_rls(Oid relid, Oid checkAsUser, bool noError)
char * quote_qualified_identifier(const char *qualifier, const char *ident)
const char * quote_identifier(const char *ident)
Snapshot GetTransactionSnapshot(void)
void PushActiveSnapshot(Snapshot snapshot)
void PopActiveSnapshot(void)
void InvalidateCatalogSnapshot(void)
#define SpinLockRelease(lock)
#define SpinLockAcquire(lock)
void logicalrep_relmap_update(LogicalRepRelation *remoterel)
void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
#define ERRCODE_DUPLICATE_OBJECT
void destroyStringInfo(StringInfo str)
StringInfo makeStringInfo(void)
void resetStringInfo(StringInfo str)
void appendStringInfo(StringInfo str, const char *fmt,...)
void appendStringInfoString(StringInfo str, const char *s)
void appendStringInfoChar(StringInfo str, char ch)
void initStringInfo(StringInfo str)
LogicalRepRelation remoterel
LogicalRepWorkerType type
Tuplestorestate * tuplestore
void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok)
#define SearchSysCacheCopy1(cacheId, key1)
void table_close(Relation relation, LOCKMODE lockmode)
Relation table_open(Oid relationId, LOCKMODE lockmode)
static List * table_states_not_ready
bool AllTablesyncsReady(void)
static bool wait_for_worker_state_change(char expected_state)
void invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
@ SYNC_TABLE_STATE_REBUILD_STARTED
@ SYNC_TABLE_STATE_NEEDS_REBUILD
static void pg_attribute_noreturn() finish_sync_worker(void)
static void process_syncing_tables_for_apply(XLogRecPtr current_lsn)
static List * make_copy_attnamelist(LogicalRepRelMapEntry *rel)
void TablesyncWorkerMain(Datum main_arg)
static void process_syncing_tables_for_sync(XLogRecPtr current_lsn)
static void fetch_remote_table_info(char *nspname, char *relname, LogicalRepRelation *lrel, List **qual, bool *gencol_published)
void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, Size szslot)
static void run_tablesync_worker()
static int copy_read_data(void *outbuf, int minread, int maxread)
static SyncingTablesState table_states_validity
void process_syncing_tables(XLogRecPtr current_lsn)
static char * LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
static void copy_table(Relation rel)
static bool wait_for_relation_state_change(Oid relid, char expected_state)
static void start_table_sync(XLogRecPtr *origin_startpos, char **slotname)
static StringInfo copybuf
static bool FetchTableStates(bool *started_tx)
void UpdateTwoPhaseState(Oid suboid, char new_state)
bool tuplestore_gettupleslot(Tuplestorestate *state, bool forward, bool copy, TupleTableSlot *slot)
int64 tuplestore_tuple_count(Tuplestorestate *state)
static Datum slot_getattr(TupleTableSlot *slot, int attnum, bool *isnull)
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
void SwitchToUntrustedUser(Oid userid, UserContext *context)
void RestoreUserContext(UserContext *context)
String * makeString(char *str)
#define walrcv_startstreaming(conn, options)
#define walrcv_connect(conninfo, replication, logical, must_use_password, appname, err)
#define walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn)
static void walrcv_clear_result(WalRcvExecResult *walres)
#define walrcv_server_version(conn)
#define walrcv_endstreaming(conn, next_tli)
#define walrcv_exec(conn, exec, nRetTypes, retTypes)
#define walrcv_receive(conn, buffer, wait_fd)
@ WORKERTYPE_PARALLEL_APPLY
static bool am_tablesync_worker(void)
bool IsTransactionState(void)
void CommandCounterIncrement(void)
void StartTransactionCommand(void)
void CommitTransactionCommand(void)
void AbortOutOfAnyTransaction(void)
uint64 GetSystemIdentifier(void)
XLogRecPtr GetXLogWriteRecPtr(void)
int wal_retrieve_retry_interval
void XLogFlush(XLogRecPtr record)
#define LSN_FORMAT_ARGS(lsn)
#define InvalidXLogRecPtr