PostgreSQL Source Code git master
|
#include "postgres.h"
#include "access/table.h"
#include "access/xact.h"
#include "catalog/indexing.h"
#include "catalog/pg_subscription_rel.h"
#include "catalog/pg_type.h"
#include "commands/copy.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "parser/parse_relation.h"
#include "pgstat.h"
#include "replication/logicallauncher.h"
#include "replication/logicalrelation.h"
#include "replication/logicalworker.h"
#include "replication/origin.h"
#include "replication/slot.h"
#include "replication/walreceiver.h"
#include "replication/worker_internal.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "utils/acl.h"
#include "utils/array.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/rls.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"
#include "utils/usercontext.h"
Go to the source code of this file.
Enumerations | |
enum | SyncingTablesState { SYNC_TABLE_STATE_NEEDS_REBUILD , SYNC_TABLE_STATE_REBUILD_STARTED , SYNC_TABLE_STATE_VALID } |
Functions | |
static bool | FetchTableStates (bool *started_tx) |
static void | pg_attribute_noreturn () finish_sync_worker(void) |
static bool | wait_for_relation_state_change (Oid relid, char expected_state) |
static bool | wait_for_worker_state_change (char expected_state) |
void | invalidate_syncing_table_states (Datum arg, int cacheid, uint32 hashvalue) |
static void | process_syncing_tables_for_sync (XLogRecPtr current_lsn) |
static void | process_syncing_tables_for_apply (XLogRecPtr current_lsn) |
void | process_syncing_tables (XLogRecPtr current_lsn) |
static List * | make_copy_attnamelist (LogicalRepRelMapEntry *rel) |
static int | copy_read_data (void *outbuf, int minread, int maxread) |
static void | fetch_remote_table_info (char *nspname, char *relname, LogicalRepRelation *lrel, List **qual, bool *gencol_published) |
static void | copy_table (Relation rel) |
void | ReplicationSlotNameForTablesync (Oid suboid, Oid relid, char *syncslotname, Size szslot) |
static char * | LogicalRepSyncTableStart (XLogRecPtr *origin_startpos) |
static void | start_table_sync (XLogRecPtr *origin_startpos, char **slotname) |
static void | run_tablesync_worker () |
void | TablesyncWorkerMain (Datum main_arg) |
bool | AllTablesyncsReady (void) |
void | UpdateTwoPhaseState (Oid suboid, char new_state) |
Variables | |
static SyncingTablesState | table_states_validity = SYNC_TABLE_STATE_NEEDS_REBUILD |
static List * | table_states_not_ready = NIL |
static StringInfo | copybuf = NULL |
enum SyncingTablesState |
Enumerator | |
---|---|
SYNC_TABLE_STATE_NEEDS_REBUILD | |
SYNC_TABLE_STATE_REBUILD_STARTED | |
SYNC_TABLE_STATE_VALID |
Definition at line 126 of file tablesync.c.
bool AllTablesyncsReady | ( | void | ) |
Definition at line 1739 of file tablesync.c.
References CommitTransactionCommand(), FetchTableStates(), NIL, pgstat_report_stat(), and table_states_not_ready.
Referenced by pa_can_start(), process_syncing_tables_for_apply(), and run_apply_worker().
|
static |
Definition at line 718 of file tablesync.c.
References buf, CHECK_FOR_INTERRUPTS, copybuf, StringInfoData::cursor, StringInfoData::data, fd(), StringInfoData::len, len, LogRepWorkerWalRcvConn, MyLatch, PGINVALID_SOCKET, ResetLatch(), WaitLatchOrSocket(), walrcv_receive, WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_SOCKET_READABLE, and WL_TIMEOUT.
Referenced by copy_table().
|
static |
Definition at line 1115 of file tablesync.c.
References AccessShareLock, addRangeTableEntryForRelation(), appendStringInfo(), appendStringInfoChar(), appendStringInfoString(), Assert, LogicalRepRelation::attnames, BeginCopyFrom(), Subscription::binary, copy_read_data(), copybuf, CopyFrom(), StringInfoData::data, ereport, errcode(), errmsg(), ERROR, fetch_remote_table_info(), for_each_from, get_namespace_name(), i, initStringInfo(), lfirst, linitial, list_free_deep(), list_make1, LogicalRepRelMapEntry::localrel, logicalrep_rel_close(), logicalrep_rel_open(), logicalrep_relmap_update(), LogRepWorkerWalRcvConn, make_copy_attnamelist(), make_parsestate(), makeDefElem(), makeString(), makeStringInfo(), MySubscription, LogicalRepRelation::natts, NIL, NoLock, LogicalRepRelation::nspname, pfree(), quote_identifier(), quote_qualified_identifier(), RelationGetNamespace, RelationGetRelationName, LogicalRepRelation::relkind, LogicalRepRelation::relname, LogicalRepRelation::remoteid, res, strVal, walrcv_clear_result(), walrcv_exec, WALRCV_OK_COPY_OUT, and walrcv_server_version.
Referenced by LogicalRepSyncTableStart().
|
static |
Definition at line 797 of file tablesync.c.
References appendStringInfo(), ARR_DATA_PTR, ARR_DIMS, Assert, LogicalRepRelation::attkeys, LogicalRepRelation::attnames, attnum, LogicalRepRelation::atttyps, bms_add_member(), bms_is_member(), StringInfoData::data, DatumGetArrayTypeP, DatumGetBool(), DatumGetChar(), DatumGetInt16(), DatumGetObjectId(), destroyStringInfo(), elog, ereport, WalRcvExecResult::err, errcode(), errmsg(), ERROR, ExecClearTuple(), ExecDropSingleTupleTableSlot(), GetPublicationsStr(), initStringInfo(), lappend(), lengthof, list_free_deep(), LogRepWorkerWalRcvConn, MakeSingleTupleTableSlot(), makeString(), makeStringInfo(), MaxTupleAttributeNumber, MySubscription, LogicalRepRelation::natts, NIL, LogicalRepRelation::nspname, palloc0(), pfree(), Subscription::publications, quote_literal_cstr(), LogicalRepRelation::relkind, relname, LogicalRepRelation::relname, LogicalRepRelation::remoteid, LogicalRepRelation::replident, res, resetStringInfo(), server_version, slot_getattr(), WalRcvExecResult::status, TextDatumGetCString, TTSOpsMinimalTuple, WalRcvExecResult::tupledesc, WalRcvExecResult::tuplestore, tuplestore_gettupleslot(), tuplestore_tuple_count(), walrcv_clear_result(), walrcv_exec, WALRCV_OK_TUPLES, and walrcv_server_version.
Referenced by copy_table().
|
static |
Definition at line 1580 of file tablesync.c.
References CacheMemoryContext, GetSubscriptionRelations(), HasSubscriptionRelations(), IsTransactionState(), lappend(), lfirst, list_free_deep(), MemoryContextSwitchTo(), MySubscription, NIL, Subscription::oid, palloc(), StartTransactionCommand(), SYNC_TABLE_STATE_REBUILD_STARTED, SYNC_TABLE_STATE_VALID, table_states_not_ready, and table_states_validity.
Referenced by AllTablesyncsReady(), and process_syncing_tables_for_apply().
Definition at line 281 of file tablesync.c.
References SYNC_TABLE_STATE_NEEDS_REBUILD, and table_states_validity.
Referenced by ParallelApplyWorkerMain(), and SetupApplyOrSyncWorker().
|
static |
Definition at line 1290 of file tablesync.c.
References ACL_INSERT, aclcheck_error(), ACLCHECK_OK, Assert, check_enable_rls(), CommandCounterIncrement(), CommitTransactionCommand(), Subscription::conninfo, copy_table(), CRS_USE_SNAPSHOT, DEBUG1, elog, ereport, err(), errcode(), ERRCODE_DUPLICATE_OBJECT, errmsg(), ERROR, Subscription::failover, get_relkind_objtype(), GetSubscriptionRelState(), GetTransactionSnapshot(), GetUserId(), GetUserNameFromId(), InvalidOid, InvalidXLogRecPtr, LockRelationOid(), LogRepWorkerWalRcvConn, LSN_FORMAT_ARGS, MyLogicalRepWorker, MySubscription, Subscription::name, NAMEDATALEN, NoLock, Subscription::oid, OidIsValid, Subscription::ownersuperuser, palloc(), Subscription::passwordrequired, pg_class_aclcheck(), pgstat_report_stat(), PopActiveSnapshot(), PushActiveSnapshot(), RelationData::rd_rel, RelationGetRelationName, RelationGetRelid, LogicalRepWorker::relid, LogicalRepWorker::relmutex, LogicalRepWorker::relstate, LogicalRepWorker::relstate_lsn, ReplicationOriginNameForLogicalRep(), ReplicationSlotDropAtPubNode(), ReplicationSlotNameForTablesync(), replorigin_advance(), replorigin_by_name(), replorigin_create(), replorigin_session_get_progress(), replorigin_session_origin, replorigin_session_setup(), res, RestoreUserContext(), RLS_ENABLED, RowExclusiveLock, Subscription::runasowner, SpinLockAcquire, SpinLockRelease, StartTransactionCommand(), LogicalRepWorker::subid, SwitchToUntrustedUser(), table_close(), table_open(), UnlockRelationOid(), UpdateSubscriptionRelState(), wait_for_worker_state_change(), walrcv_clear_result(), walrcv_connect, walrcv_create_slot, walrcv_exec, and WALRCV_OK_COMMAND.
Referenced by start_table_sync().
|
static |
Definition at line 698 of file tablesync.c.
References LogicalRepRelation::attnames, i, lappend(), makeString(), LogicalRepRelation::natts, NIL, and LogicalRepRelMapEntry::remoterel.
Referenced by copy_table().
|
static |
Definition at line 143 of file tablesync.c.
References CommitTransactionCommand(), ereport, errmsg(), get_rel_name(), GetXLogWriteRecPtr(), InvalidOid, IsTransactionState(), LOG, logicalrep_worker_wakeup(), MyLogicalRepWorker, MySubscription, Subscription::name, pgstat_report_stat(), proc_exit(), LogicalRepWorker::relid, StartTransactionCommand(), LogicalRepWorker::subid, and XLogFlush().
void process_syncing_tables | ( | XLogRecPtr | current_lsn | ) |
Definition at line 667 of file tablesync.c.
References elog, ERROR, MyLogicalRepWorker, process_syncing_tables_for_apply(), process_syncing_tables_for_sync(), LogicalRepWorker::type, WORKERTYPE_APPLY, WORKERTYPE_PARALLEL_APPLY, WORKERTYPE_TABLESYNC, and WORKERTYPE_UNKNOWN.
Referenced by apply_handle_commit(), apply_handle_commit_prepared(), apply_handle_prepare(), apply_handle_rollback_prepared(), apply_handle_stream_commit(), apply_handle_stream_prepare(), and LogicalRepApplyLoop().
|
static |
Definition at line 418 of file tablesync.c.
References AllTablesyncsReady(), ApplyLauncherForgetWorkerStartTime(), Assert, CommandCounterIncrement(), CommitTransactionCommand(), ctl, LogicalRepWorker::dbid, DSM_HANDLE_INVALID, ereport, errmsg(), FetchTableStates(), GetCurrentTimestamp(), HASH_BLOBS, hash_create(), hash_destroy(), HASH_ELEM, HASH_ENTER, hash_search(), IsTransactionState(), last_start_times, lfirst, LOG, logicalrep_sync_worker_count(), logicalrep_worker_find(), logicalrep_worker_launch(), logicalrep_worker_wakeup_ptr(), SubscriptionRelState::lsn, LW_SHARED, LWLockAcquire(), LWLockRelease(), Max, max_sync_workers_per_subscription, MyLogicalRepWorker, MySubscription, Subscription::name, NAMEDATALEN, NIL, now(), Subscription::oid, pgstat_report_stat(), LogicalRepWorker::proc, proc_exit(), SubscriptionRelState::relid, LogicalRepWorker::relmutex, LogicalRepWorker::relstate, LogicalRepWorker::relstate_lsn, ReplicationOriginNameForLogicalRep(), replorigin_drop_by_name(), SpinLockAcquire, SpinLockRelease, StartTransactionCommand(), SubscriptionRelState::state, LogicalRepWorker::subid, table_states_not_ready, TimestampDifferenceExceeds(), Subscription::twophasestate, UpdateSubscriptionRelState(), LogicalRepWorker::userid, wait_for_relation_state_change(), wal_retrieve_retry_interval, and WORKERTYPE_TABLESYNC.
Referenced by process_syncing_tables().
|
static |
Definition at line 295 of file tablesync.c.
References CommitTransactionCommand(), InvalidRepOriginId, InvalidXLogRecPtr, IsTransactionState(), LogRepWorkerWalRcvConn, MyLogicalRepWorker, NAMEDATALEN, pgstat_report_stat(), LogicalRepWorker::relid, LogicalRepWorker::relmutex, LogicalRepWorker::relstate, LogicalRepWorker::relstate_lsn, ReplicationOriginNameForLogicalRep(), ReplicationSlotDropAtPubNode(), ReplicationSlotNameForTablesync(), replorigin_drop_by_name(), replorigin_session_origin, replorigin_session_origin_lsn, replorigin_session_origin_timestamp, replorigin_session_reset(), SpinLockAcquire, SpinLockRelease, StartTransactionCommand(), LogicalRepWorker::subid, UpdateSubscriptionRelState(), and walrcv_endstreaming.
Referenced by process_syncing_tables().
Definition at line 1274 of file tablesync.c.
References GetSystemIdentifier(), snprintf, and UINT64_FORMAT.
Referenced by AlterSubscription_refresh(), DropSubscription(), LogicalRepSyncTableStart(), process_syncing_tables_for_sync(), and ReportSlotConnectionError().
|
static |
Definition at line 1693 of file tablesync.c.
References InvalidXLogRecPtr, LogRepWorkerWalRcvConn, MyLogicalRepWorker, MySubscription, NAMEDATALEN, Subscription::oid, options, LogicalRepWorker::relid, ReplicationOriginNameForLogicalRep(), set_apply_error_context_origin(), set_stream_options(), start_apply(), start_table_sync(), and walrcv_startstreaming.
Referenced by TablesyncWorkerMain().
|
static |
Definition at line 1651 of file tablesync.c.
References AbortOutOfAnyTransaction(), am_tablesync_worker(), ApplyContext, Assert, Subscription::disableonerr, DisableSubscriptionAndExit(), LogicalRepSyncTableStart(), MemoryContextStrdup(), MySubscription, Subscription::oid, pfree(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, and pgstat_report_subscription_error().
Referenced by run_tablesync_worker().
void TablesyncWorkerMain | ( | Datum | main_arg | ) |
Definition at line 1719 of file tablesync.c.
References DatumGetInt32(), run_tablesync_worker(), and SetupApplyOrSyncWorker().
void UpdateTwoPhaseState | ( | Oid | suboid, |
char | new_state | ||
) |
Definition at line 1764 of file tablesync.c.
References Assert, CatalogTupleUpdate(), CharGetDatum(), elog, ERROR, heap_freetuple(), heap_modify_tuple(), HeapTupleIsValid, ObjectIdGetDatum(), RelationGetDescr, RowExclusiveLock, SearchSysCacheCopy1, HeapTupleData::t_self, table_close(), table_open(), and values.
Referenced by CreateSubscription(), and run_apply_worker().
|
static |
Definition at line 184 of file tablesync.c.
References CHECK_FOR_INTERRUPTS, GetSubscriptionRelState(), InvalidateCatalogSnapshot(), logicalrep_worker_find(), LW_SHARED, LWLockAcquire(), LWLockRelease(), MyLatch, MyLogicalRepWorker, ResetLatch(), LogicalRepWorker::subid, WaitLatch(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, and WL_TIMEOUT.
Referenced by process_syncing_tables_for_apply().
|
static |
Definition at line 232 of file tablesync.c.
References CHECK_FOR_INTERRUPTS, InvalidOid, logicalrep_worker_find(), logicalrep_worker_wakeup_ptr(), LW_SHARED, LWLockAcquire(), LWLockRelease(), MyLatch, MyLogicalRepWorker, LogicalRepWorker::proc, LogicalRepWorker::relstate, ResetLatch(), LogicalRepWorker::subid, WaitLatch(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, and WL_TIMEOUT.
Referenced by LogicalRepSyncTableStart().
|
static |
Definition at line 137 of file tablesync.c.
Referenced by copy_read_data(), copy_table(), CopyStreamReceive(), dumpTableData_copy(), GetCopyDataByte(), GetCopyDataEnd(), GetCopyDataString(), GetCopyDataUInt64(), HandleCopyStream(), ProcessKeepaliveMsg(), ProcessXLogDataMsg(), ReceiveArchiveStreamChunk(), ReceiveBackupManifestChunk(), ReceiveBackupManifestInMemoryChunk(), ReceiveCopyData(), ReceiveTarCopyChunk(), ReportCopyDataParseError(), and StreamLogicalLog().
Definition at line 134 of file tablesync.c.
Referenced by AllTablesyncsReady(), FetchTableStates(), and process_syncing_tables_for_apply().
|
static |
Definition at line 133 of file tablesync.c.
Referenced by FetchTableStates(), and invalidate_syncing_table_states().