137 finish_sync_worker(
void)
154 (
errmsg(
"logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
192 if (
state == SUBREL_STATE_UNKNOWN)
195 if (
state == expected_state)
208 1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
249 if (worker && worker->
proc)
261 1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
331 sizeof(syncslotname));
383 finish_sync_worker();
413 struct tablesync_start_time_mapping
420 bool started_tx =
false;
421 bool should_exit =
false;
438 ctl.
entrysize =
sizeof(
struct tablesync_start_time_mapping);
460 if (rstate->
state == SUBREL_STATE_SYNCDONE)
467 if (current_lsn >= rstate->
lsn)
471 rstate->
state = SUBREL_STATE_READY;
472 rstate->
lsn = current_lsn;
514 rstate->
relid,
false);
522 if (rstate->
state == SUBREL_STATE_SYNCWAIT)
528 syncworker->
relstate = SUBREL_STATE_CATCHUP;
535 if (rstate->
state == SUBREL_STATE_SYNCWAIT)
538 if (syncworker->
proc)
555 SUBREL_STATE_SYNCDONE);
580 struct tablesync_start_time_mapping *hentry;
597 hentry->last_start_time =
now;
624 (
errmsg(
"logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled",
688 attnamelist =
lappend(attnamelist,
718 while (maxread > 0 && bytesread < minread)
746 outbuf = (
void *) ((
char *) outbuf + avail);
752 if (maxread <= 0 || bytesread >= minread)
762 fd, 1000L, WAIT_EVENT_LOGICAL_SYNC_DATA);
783 Oid tableRow[] = {OIDOID, CHAROID, CHAROID};
784 Oid attrRow[] = {INT2OID, TEXTOID, OIDOID, BOOLOID};
785 Oid qualRow[] = {TEXTOID};
797 " FROM pg_catalog.pg_class c"
798 " INNER JOIN pg_catalog.pg_namespace n"
799 " ON (c.relnamespace = n.oid)"
800 " WHERE n.nspname = %s"
801 " AND c.relname = %s",
809 (
errcode(ERRCODE_CONNECTION_FAILURE),
810 errmsg(
"could not fetch table info for table \"%s.%s\" from publisher: %s",
816 (
errcode(ERRCODE_UNDEFINED_OBJECT),
817 errmsg(
"table \"%s.%s\" not found on publisher",
841 Oid attrsRow[] = {INT2VECTOROID};
859 " (CASE WHEN (array_length(gpt.attrs, 1) = c.relnatts)"
860 " THEN NULL ELSE gpt.attrs END)"
861 " FROM pg_publication p,"
862 " LATERAL pg_get_publication_tables(p.pubname) gpt,"
864 " WHERE gpt.relid = %u AND c.oid = gpt.relid"
865 " AND p.pubname IN ( %s )",
874 (
errcode(ERRCODE_CONNECTION_FAILURE),
875 errmsg(
"could not fetch column list info for table \"%s.%s\" from publisher: %s",
888 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
889 errmsg(
"cannot use different column lists for table \"%s.%s\" in different publications",
913 for (natt = 0; natt < nelems; natt++)
934 " a.attnum = ANY(i.indkey)"
935 " FROM pg_catalog.pg_attribute a"
936 " LEFT JOIN pg_catalog.pg_index i"
937 " ON (i.indexrelid = pg_get_replica_identity_index(%u))"
938 " WHERE a.attnum > 0::pg_catalog.int2"
939 " AND NOT a.attisdropped %s"
940 " AND a.attrelid = %u"
941 " ORDER BY a.attnum",
944 "AND a.attgenerated = ''" :
""),
951 (
errcode(ERRCODE_CONNECTION_FAILURE),
952 errmsg(
"could not fetch table info for table \"%s.%s\" from publisher: %s",
993 elog(
ERROR,
"too many columns in remote table \"%s.%s\"",
1042 "SELECT DISTINCT pg_get_expr(gpt.qual, gpt.relid)"
1043 " FROM pg_publication p,"
1044 " LATERAL pg_get_publication_tables(p.pubname) gpt"
1045 " WHERE gpt.relid = %u"
1046 " AND p.pubname IN ( %s )",
1054 (
errmsg(
"could not fetch table WHERE clause info for table \"%s.%s\" from publisher: %s",
1125 if (lrel.
relkind == RELKIND_RELATION && qual ==
NIL)
1134 for (
int i = 0;
i < lrel.
natts;
i++)
1153 for (
int i = 0;
i < lrel.
natts;
i++)
1166 if (lrel.
relkind == RELKIND_RELATION)
1204 (
errcode(ERRCODE_CONNECTION_FAILURE),
1205 errmsg(
"could not start initial contents copy for table \"%s.%s\": %s",
1213 NULL,
false,
false);
1243 char *syncslotname,
Size szslot)
1270 bool must_use_password;
1297 case SUBREL_STATE_SYNCDONE:
1298 case SUBREL_STATE_READY:
1299 case SUBREL_STATE_UNKNOWN:
1300 finish_sync_worker();
1321 (
errcode(ERRCODE_CONNECTION_FAILURE),
1322 errmsg(
"could not connect to the publisher: %s",
err)));
1332 sizeof(originname));
1368 goto copy_table_done;
1401 "BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ",
1405 (
errcode(ERRCODE_CONNECTION_FAILURE),
1406 errmsg(
"table copy could not start transaction on publisher: %s",
1416 slotname,
false ,
false ,
1448 errmsg(
"replication origin \"%s\" already exists",
1480 (
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1481 errmsg(
"user \"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
1493 (
errcode(ERRCODE_CONNECTION_FAILURE),
1494 errmsg(
"table copy could not finish transaction on publisher: %s",
1512 SUBREL_STATE_FINISHEDCOPY,
1520 "LogicalRepSyncTableStart: '%s' origin_startpos lsn %X/%X",
1550 static bool has_subrels =
false;
1552 *started_tx =
false;
1576 foreach(lc, rstates)
1611 char *sync_slotname = NULL;
1641 pfree(sync_slotname);
1655 char *slotname = NULL;
1663 sizeof(originname));
1685 finish_sync_worker();
1699 bool started_tx =
false;
1700 bool has_subrels =
false;
1726 bool nulls[Natts_pg_subscription];
1727 bool replaces[Natts_pg_subscription];
1738 "cache lookup failed for subscription oid %u",
1743 memset(nulls,
false,
sizeof(nulls));
1744 memset(replaces,
false,
sizeof(replaces));
1748 replaces[Anum_pg_subscription_subtwophasestate - 1] =
true;
1751 values, nulls, replaces);
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
AclResult pg_class_aclcheck(Oid table_oid, Oid roleid, AclMode mode)
#define DatumGetArrayTypeP(X)
void set_stream_options(WalRcvStreamOptions *options, char *slotname, XLogRecPtr *origin_startpos)
void start_apply(XLogRecPtr origin_startpos)
void DisableSubscriptionAndExit(void)
void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, char *originname, Size szoriginname)
void set_apply_error_context_origin(char *originname)
MemoryContext ApplyContext
void SetupApplyOrSyncWorker(int worker_slot)
WalReceiverConn * LogRepWorkerWalRcvConn
Subscription * MySubscription
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
TimestampTz GetCurrentTimestamp(void)
Datum now(PG_FUNCTION_ARGS)
bool bms_is_member(int x, const Bitmapset *a)
Bitmapset * bms_add_member(Bitmapset *a, int x)
static Datum values[MAXATTR]
#define TextDatumGetCString(d)
#define OidIsValid(objectId)
CopyFromState BeginCopyFrom(ParseState *pstate, Relation rel, Node *whereClause, const char *filename, bool is_program, copy_data_source_cb data_source_cb, List *attnamelist, List *options)
uint64 CopyFrom(CopyFromState cstate)
static void PGresult * res
elog(ERROR, "%s: %s", p2, msg)
#define DSM_HANDLE_INVALID
void hash_destroy(HTAB *hashp)
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
void err(int eval, const char *fmt,...)
void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
const TupleTableSlotOps TTSOpsMinimalTuple
TupleTableSlot * MakeSingleTupleTableSlot(TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, Datum *replValues, bool *replIsnull, bool *doReplace)
void heap_freetuple(HeapTuple htup)
#define HeapTupleIsValid(tuple)
#define MaxTupleAttributeNumber
void CatalogTupleUpdate(Relation heapRel, ItemPointer otid, HeapTuple tup)
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
void ResetLatch(Latch *latch)
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
#define WL_SOCKET_READABLE
#define WL_EXIT_ON_PM_DEATH
bool logicalrep_worker_launch(LogicalRepWorkerType wtype, Oid dbid, Oid subid, const char *subname, Oid userid, Oid relid, dsm_handle subworker_dsm)
LogicalRepWorker * logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
void logicalrep_worker_wakeup(Oid subid, Oid relid)
static dshash_table * last_start_times
LogicalRepWorker * MyLogicalRepWorker
int max_sync_workers_per_subscription
int logicalrep_sync_worker_count(Oid subid)
void ApplyLauncherForgetWorkerStartTime(Oid subid)
Assert(fmt[strlen(fmt) - 1] !='\n')
List * lappend(List *list, void *datum)
void list_free_deep(List *list)
void UnlockRelationOid(Oid relid, LOCKMODE lockmode)
void LockRelationOid(Oid relid, LOCKMODE lockmode)
char * get_namespace_name(Oid nspid)
char * get_rel_name(Oid relid)
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
void LWLockRelease(LWLock *lock)
DefElem * makeDefElem(char *name, Node *arg, int location)
void pfree(void *pointer)
void * palloc0(Size size)
char * MemoryContextStrdup(MemoryContext context, const char *string)
MemoryContext CacheMemoryContext
#define CHECK_FOR_INTERRUPTS()
char * GetUserNameFromId(Oid roleid, bool noerr)
ObjectType get_relkind_objtype(char relkind)
TimestampTz replorigin_session_origin_timestamp
RepOriginId replorigin_by_name(const char *roname, bool missing_ok)
RepOriginId replorigin_create(const char *roname)
void replorigin_session_reset(void)
void replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
RepOriginId replorigin_session_origin
void replorigin_advance(RepOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)
void replorigin_session_setup(RepOriginId node, int acquired_by)
XLogRecPtr replorigin_session_get_progress(bool flush)
XLogRecPtr replorigin_session_origin_lsn
#define InvalidRepOriginId
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
ParseState * make_parsestate(ParseState *parentParseState)
ParseNamespaceItem * addRangeTableEntryForRelation(ParseState *pstate, Relation rel, int lockmode, Alias *alias, bool inh, bool inFromCl)
#define for_each_from(cell, lst, N)
#define foreach_current_index(cell)
List * GetSubscriptionRelations(Oid subid, bool not_ready)
char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
void UpdateSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn)
bool HasSubscriptionRelations(Oid subid)
#define LOGICALREP_TWOPHASE_STATE_DISABLED
#define LOGICALREP_TWOPHASE_STATE_PENDING
#define LOGICALREP_TWOPHASE_STATE_ENABLED
long pgstat_report_stat(bool force)
void pgstat_report_subscription_error(Oid subid, bool is_apply_error)
static bool DatumGetBool(Datum X)
static Oid DatumGetObjectId(Datum X)
static Datum ObjectIdGetDatum(Oid X)
static char DatumGetChar(Datum X)
static int16 DatumGetInt16(Datum X)
static int32 DatumGetInt32(Datum X)
static Datum CharGetDatum(char X)
static int fd(const char *x, int i)
char * quote_literal_cstr(const char *rawstr)
#define RelationGetRelid(relation)
#define RelationGetDescr(relation)
#define RelationGetRelationName(relation)
#define RelationGetNamespace(relation)
int check_enable_rls(Oid relid, Oid checkAsUser, bool noError)
const char * quote_identifier(const char *ident)
char * quote_qualified_identifier(const char *qualifier, const char *ident)
Snapshot GetTransactionSnapshot(void)
void PushActiveSnapshot(Snapshot snapshot)
void PopActiveSnapshot(void)
void InvalidateCatalogSnapshot(void)
#define SpinLockRelease(lock)
#define SpinLockAcquire(lock)
void logicalrep_relmap_update(LogicalRepRelation *remoterel)
LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
#define ERRCODE_DUPLICATE_OBJECT
StringInfo makeStringInfo(void)
void resetStringInfo(StringInfo str)
void appendStringInfo(StringInfo str, const char *fmt,...)
void appendStringInfoString(StringInfo str, const char *s)
void initStringInfo(StringInfo str)
LogicalRepRelation remoterel
LogicalRepWorkerType type
Tuplestorestate * tuplestore
void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok)
bool superuser_arg(Oid roleid)
#define SearchSysCacheCopy1(cacheId, key1)
void table_close(Relation relation, LOCKMODE lockmode)
Relation table_open(Oid relationId, LOCKMODE lockmode)
static List * table_states_not_ready
bool AllTablesyncsReady(void)
static bool wait_for_worker_state_change(char expected_state)
static bool table_states_valid
static char * LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
void invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
static void pg_attribute_noreturn() finish_sync_worker(void)
static void process_syncing_tables_for_apply(XLogRecPtr current_lsn)
void TablesyncWorkerMain(Datum main_arg)
static void process_syncing_tables_for_sync(XLogRecPtr current_lsn)
void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, Size szslot)
static void run_tablesync_worker()
static int copy_read_data(void *outbuf, int minread, int maxread)
void process_syncing_tables(XLogRecPtr current_lsn)
static void copy_table(Relation rel)
static bool wait_for_relation_state_change(Oid relid, char expected_state)
static void start_table_sync(XLogRecPtr *origin_startpos, char **slotname)
static StringInfo copybuf
static bool FetchTableStates(bool *started_tx)
static void fetch_remote_table_info(char *nspname, char *relname, LogicalRepRelation *lrel, List **qual)
static List * make_copy_attnamelist(LogicalRepRelMapEntry *rel)
void UpdateTwoPhaseState(Oid suboid, char new_state)
bool tuplestore_gettupleslot(Tuplestorestate *state, bool forward, bool copy, TupleTableSlot *slot)
int64 tuplestore_tuple_count(Tuplestorestate *state)
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
static Datum slot_getattr(TupleTableSlot *slot, int attnum, bool *isnull)
void SwitchToUntrustedUser(Oid userid, UserContext *context)
void RestoreUserContext(UserContext *context)
String * makeString(char *str)
#define walrcv_create_slot(conn, slotname, temporary, two_phase, snapshot_action, lsn)
#define walrcv_startstreaming(conn, options)
static void walrcv_clear_result(WalRcvExecResult *walres)
#define walrcv_server_version(conn)
#define walrcv_endstreaming(conn, next_tli)
#define walrcv_connect(conninfo, logical, must_use_password, appname, err)
#define walrcv_exec(conn, exec, nRetTypes, retTypes)
#define walrcv_receive(conn, buffer, wait_fd)
@ WORKERTYPE_PARALLEL_APPLY
static bool am_tablesync_worker(void)
bool IsTransactionState(void)
void CommandCounterIncrement(void)
void StartTransactionCommand(void)
void CommitTransactionCommand(void)
void AbortOutOfAnyTransaction(void)
uint64 GetSystemIdentifier(void)
XLogRecPtr GetXLogWriteRecPtr(void)
int wal_retrieve_retry_interval
void XLogFlush(XLogRecPtr record)
#define LSN_FORMAT_ARGS(lsn)
#define InvalidXLogRecPtr