PostgreSQL Source Code
git master
|
#include "postgres.h"
#include "access/table.h"
#include "access/xact.h"
#include "catalog/indexing.h"
#include "catalog/pg_subscription_rel.h"
#include "catalog/pg_type.h"
#include "commands/copy.h"
#include "miscadmin.h"
#include "parser/parse_relation.h"
#include "pgstat.h"
#include "replication/logicallauncher.h"
#include "replication/logicalrelation.h"
#include "replication/walreceiver.h"
#include "replication/worker_internal.h"
#include "replication/slot.h"
#include "replication/origin.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "utils/acl.h"
#include "utils/array.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/rls.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"
Go to the source code of this file.
Functions | |
static bool | FetchTableStates (bool *started_tx) |
static void pg_attribute_noreturn () | finish_sync_worker (void) |
static bool | wait_for_relation_state_change (Oid relid, char expected_state) |
static bool | wait_for_worker_state_change (char expected_state) |
void | invalidate_syncing_table_states (Datum arg, int cacheid, uint32 hashvalue) |
static void | process_syncing_tables_for_sync (XLogRecPtr current_lsn) |
static void | process_syncing_tables_for_apply (XLogRecPtr current_lsn) |
void | process_syncing_tables (XLogRecPtr current_lsn) |
static List * | make_copy_attnamelist (LogicalRepRelMapEntry *rel) |
static int | copy_read_data (void *outbuf, int minread, int maxread) |
static void | fetch_remote_table_info (char *nspname, char *relname, LogicalRepRelation *lrel, List **qual) |
static void | copy_table (Relation rel) |
void | ReplicationSlotNameForTablesync (Oid suboid, Oid relid, char *syncslotname, Size szslot) |
char * | LogicalRepSyncTableStart (XLogRecPtr *origin_startpos) |
bool | AllTablesyncsReady (void) |
void | UpdateTwoPhaseState (Oid suboid, char new_state) |
Variables | |
static bool | table_states_valid = false |
static List * | table_states_not_ready = NIL |
static StringInfo | copybuf = NULL |
bool AllTablesyncsReady | ( | void | ) |
Definition at line 1558 of file tablesync.c.
References CommitTransactionCommand(), FetchTableStates(), NIL, pgstat_report_stat(), and table_states_not_ready.
Referenced by ApplyWorkerMain(), pa_can_start(), and process_syncing_tables_for_apply().
static int copy_read_data (void *outbuf, int minread, int maxread)
Definition at line 686 of file tablesync.c.
References buf, CHECK_FOR_INTERRUPTS, copybuf, StringInfoData::cursor, StringInfoData::data, fd(), StringInfoData::len, len, LogRepWorkerWalRcvConn, MyLatch, PGINVALID_SOCKET, ResetLatch(), WAIT_EVENT_LOGICAL_SYNC_DATA, WaitLatchOrSocket(), walrcv_receive, WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_SOCKET_READABLE, and WL_TIMEOUT.
Referenced by copy_table().
static void copy_table (Relation rel)
Definition at line 1083 of file tablesync.c.
References AccessShareLock, addRangeTableEntryForRelation(), appendStringInfo(), appendStringInfoString(), Assert(), LogicalRepRelation::attnames, BeginCopyFrom(), copy_read_data(), copybuf, CopyFrom(), StringInfoData::data, ereport, errcode(), errmsg(), ERROR, fetch_remote_table_info(), for_each_from, get_namespace_name(), i, initStringInfo(), lfirst, linitial, list_free_deep(), LogicalRepRelMapEntry::localrel, logicalrep_rel_close(), logicalrep_rel_open(), logicalrep_relmap_update(), LogRepWorkerWalRcvConn, make_copy_attnamelist(), make_parsestate(), makeStringInfo(), LogicalRepRelation::natts, NIL, NoLock, LogicalRepRelation::nspname, pfree(), quote_identifier(), quote_qualified_identifier(), RelationGetNamespace, RelationGetRelationName, LogicalRepRelation::relkind, LogicalRepRelation::relname, LogicalRepRelation::remoteid, res, strVal, walrcv_clear_result(), walrcv_exec, and WALRCV_OK_COPY_OUT.
Referenced by LogicalRepSyncTableStart().
static void fetch_remote_table_info (char *nspname, char *relname, LogicalRepRelation *lrel, List **qual)
Definition at line 762 of file tablesync.c.
References appendStringInfo(), appendStringInfoString(), ARR_DATA_PTR, ARR_DIMS, Assert(), LogicalRepRelation::attkeys, LogicalRepRelation::attnames, attnum, LogicalRepRelation::atttyps, bms_add_member(), bms_is_member(), StringInfoData::data, DatumGetArrayTypeP, DatumGetBool(), DatumGetChar(), DatumGetInt16(), DatumGetObjectId(), elog(), ereport, WalRcvExecResult::err, errcode(), errmsg(), ERROR, ExecClearTuple(), ExecDropSingleTupleTableSlot(), foreach_current_index, initStringInfo(), lappend(), lengthof, lfirst, list_free_deep(), LogRepWorkerWalRcvConn, MakeSingleTupleTableSlot(), makeString(), MaxTupleAttributeNumber, MySubscription, LogicalRepRelation::natts, NIL, LogicalRepRelation::nspname, palloc0(), pfree(), Subscription::publications, quote_literal_cstr(), LogicalRepRelation::relkind, relname, LogicalRepRelation::relname, LogicalRepRelation::remoteid, LogicalRepRelation::replident, res, resetStringInfo(), slot_getattr(), WalRcvExecResult::status, strVal, TextDatumGetCString, TTSOpsMinimalTuple, WalRcvExecResult::tupledesc, WalRcvExecResult::tuplestore, tuplestore_gettupleslot(), tuplestore_tuple_count(), walrcv_clear_result(), walrcv_exec, WALRCV_OK_TUPLES, and walrcv_server_version.
Referenced by copy_table().
static bool FetchTableStates (bool *started_tx)
Definition at line 1497 of file tablesync.c.
References CacheMemoryContext, GetSubscriptionRelations(), HasSubscriptionRelations(), IsTransactionState(), lappend(), lfirst, list_free_deep(), MemoryContextSwitchTo(), MySubscription, NIL, Subscription::oid, palloc(), StartTransactionCommand(), table_states_not_ready, and table_states_valid.
Referenced by AllTablesyncsReady(), and process_syncing_tables_for_apply().
void invalidate_syncing_table_states (Datum arg, int cacheid, uint32 hashvalue)
Definition at line 271 of file tablesync.c.
References table_states_valid.
Referenced by ApplyWorkerMain(), and ParallelApplyWorkerMain().
char* LogicalRepSyncTableStart | ( | XLogRecPtr * | origin_startpos | ) |
Definition at line 1229 of file tablesync.c.
References ACL_INSERT, aclcheck_error(), ACLCHECK_OK, Assert(), check_enable_rls(), CommandCounterIncrement(), CommitTransactionCommand(), Subscription::conninfo, copy_table(), CRS_USE_SNAPSHOT, DEBUG1, elog(), ereport, err(), errcode(), ERRCODE_DUPLICATE_OBJECT, errmsg(), ERROR, get_relkind_objtype(), GetSubscriptionRelState(), GetTransactionSnapshot(), GetUserId(), GetUserNameFromId(), InvalidOid, InvalidXLogRecPtr, LockRelationOid(), LogRepWorkerWalRcvConn, LSN_FORMAT_ARGS, MyLogicalRepWorker, MySubscription, NAMEDATALEN, NoLock, Subscription::oid, OidIsValid, palloc(), pg_class_aclcheck(), pgstat_report_stat(), PopActiveSnapshot(), PushActiveSnapshot(), RelationData::rd_rel, RelationGetRelationName, RelationGetRelid, LogicalRepWorker::relid, LogicalRepWorker::relmutex, LogicalRepWorker::relstate, LogicalRepWorker::relstate_lsn, ReplicationOriginNameForLogicalRep(), ReplicationSlotDropAtPubNode(), ReplicationSlotNameForTablesync(), replorigin_advance(), replorigin_by_name(), replorigin_create(), replorigin_session_get_progress(), replorigin_session_origin, replorigin_session_setup(), res, RLS_ENABLED, RowExclusiveLock, SpinLockAcquire, SpinLockRelease, StartTransactionCommand(), LogicalRepWorker::subid, table_close(), table_open(), UnlockRelationOid(), UpdateSubscriptionRelState(), wait_for_worker_state_change(), walrcv_clear_result(), walrcv_connect, walrcv_create_slot, walrcv_exec, and WALRCV_OK_COMMAND.
Referenced by start_table_sync().
static List * make_copy_attnamelist (LogicalRepRelMapEntry *rel)
Definition at line 666 of file tablesync.c.
References LogicalRepRelation::attnames, i, lappend(), makeString(), LogicalRepRelation::natts, NIL, and LogicalRepRelMapEntry::remoterel.
Referenced by copy_table().
static void pg_attribute_noreturn () finish_sync_worker (void)
Definition at line 133 of file tablesync.c.
References CommitTransactionCommand(), ereport, errmsg(), get_rel_name(), GetXLogWriteRecPtr(), InvalidOid, IsTransactionState(), LOG, logicalrep_worker_wakeup(), MyLogicalRepWorker, MySubscription, Subscription::name, pgstat_report_stat(), proc_exit(), LogicalRepWorker::relid, StartTransactionCommand(), LogicalRepWorker::subid, and XLogFlush().
void process_syncing_tables | ( | XLogRecPtr | current_lsn | ) |
Definition at line 646 of file tablesync.c.
References am_parallel_apply_worker(), am_tablesync_worker(), process_syncing_tables_for_apply(), and process_syncing_tables_for_sync().
Referenced by apply_handle_commit(), apply_handle_commit_prepared(), apply_handle_prepare(), apply_handle_rollback_prepared(), apply_handle_stream_commit(), apply_handle_stream_prepare(), and LogicalRepApplyLoop().
static void process_syncing_tables_for_apply (XLogRecPtr current_lsn)
Definition at line 408 of file tablesync.c.
References AllTablesyncsReady(), ApplyLauncherForgetWorkerStartTime(), Assert(), CommandCounterIncrement(), CommitTransactionCommand(), LogicalRepWorker::dbid, DSM_HANDLE_INVALID, HASHCTL::entrysize, ereport, errmsg(), FetchTableStates(), GetCurrentTimestamp(), HASH_BLOBS, hash_create(), hash_destroy(), HASH_ELEM, HASH_ENTER, hash_search(), IsTransactionState(), HASHCTL::keysize, last_start_times, lfirst, LOG, logicalrep_sync_worker_count(), LOGICALREP_TWOPHASE_STATE_PENDING, logicalrep_worker_find(), logicalrep_worker_launch(), logicalrep_worker_wakeup_ptr(), SubscriptionRelState::lsn, LW_SHARED, LWLockAcquire(), LWLockRelease(), Max, max_sync_workers_per_subscription, MyLogicalRepWorker, MySubscription, Subscription::name, NAMEDATALEN, NIL, now(), Subscription::oid, pgstat_report_stat(), LogicalRepWorker::proc, proc_exit(), SubscriptionRelState::relid, LogicalRepWorker::relmutex, LogicalRepWorker::relstate, LogicalRepWorker::relstate_lsn, ReplicationOriginNameForLogicalRep(), replorigin_drop_by_name(), SpinLockAcquire, SpinLockRelease, StartTransactionCommand(), SubscriptionRelState::state, LogicalRepWorker::subid, table_states_not_ready, TimestampDifferenceExceeds(), Subscription::twophasestate, UpdateSubscriptionRelState(), LogicalRepWorker::userid, wait_for_relation_state_change(), and wal_retrieve_retry_interval.
Referenced by process_syncing_tables().
static void process_syncing_tables_for_sync (XLogRecPtr current_lsn)
Definition at line 285 of file tablesync.c.
References CommitTransactionCommand(), InvalidRepOriginId, InvalidXLogRecPtr, IsTransactionState(), LogRepWorkerWalRcvConn, MyLogicalRepWorker, NAMEDATALEN, pgstat_report_stat(), LogicalRepWorker::relid, LogicalRepWorker::relmutex, LogicalRepWorker::relstate, LogicalRepWorker::relstate_lsn, ReplicationOriginNameForLogicalRep(), ReplicationSlotDropAtPubNode(), ReplicationSlotNameForTablesync(), replorigin_drop_by_name(), replorigin_session_origin, replorigin_session_origin_lsn, replorigin_session_origin_timestamp, replorigin_session_reset(), SpinLockAcquire, SpinLockRelease, StartTransactionCommand(), LogicalRepWorker::subid, UpdateSubscriptionRelState(), and walrcv_endstreaming.
Referenced by process_syncing_tables().
void ReplicationSlotNameForTablesync (Oid suboid, Oid relid, char *syncslotname, Size szslot)
Definition at line 1213 of file tablesync.c.
References GetSystemIdentifier(), snprintf, and UINT64_FORMAT.
Referenced by AlterSubscription_refresh(), DropSubscription(), LogicalRepSyncTableStart(), process_syncing_tables_for_sync(), and ReportSlotConnectionError().
void UpdateTwoPhaseState | ( | Oid | suboid, |
char | new_state | ||
) |
Definition at line 1583 of file tablesync.c.
References Assert(), CatalogTupleUpdate(), CharGetDatum(), elog(), ERROR, heap_freetuple(), heap_modify_tuple(), HeapTupleIsValid, LOGICALREP_TWOPHASE_STATE_DISABLED, LOGICALREP_TWOPHASE_STATE_ENABLED, LOGICALREP_TWOPHASE_STATE_PENDING, ObjectIdGetDatum(), RelationGetDescr, RowExclusiveLock, SearchSysCacheCopy1, SUBSCRIPTIONOID, HeapTupleData::t_self, table_close(), table_open(), and values.
Referenced by ApplyWorkerMain(), and CreateSubscription().
static bool wait_for_relation_state_change (Oid relid, char expected_state)
Definition at line 174 of file tablesync.c.
References CHECK_FOR_INTERRUPTS, GetSubscriptionRelState(), InvalidateCatalogSnapshot(), logicalrep_worker_find(), LW_SHARED, LWLockAcquire(), LWLockRelease(), MyLatch, MyLogicalRepWorker, ResetLatch(), LogicalRepWorker::subid, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE, WaitLatch(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, and WL_TIMEOUT.
Referenced by process_syncing_tables_for_apply().
static bool wait_for_worker_state_change (char expected_state)
Definition at line 222 of file tablesync.c.
References CHECK_FOR_INTERRUPTS, InvalidOid, logicalrep_worker_find(), logicalrep_worker_wakeup_ptr(), LW_SHARED, LWLockAcquire(), LWLockRelease(), MyLatch, MyLogicalRepWorker, LogicalRepWorker::proc, LogicalRepWorker::relstate, ResetLatch(), LogicalRepWorker::subid, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE, WaitLatch(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, and WL_TIMEOUT.
Referenced by LogicalRepSyncTableStart().
static StringInfo copybuf = NULL
Definition at line 127 of file tablesync.c.
Referenced by copy_read_data(), copy_table(), CopyStreamReceive(), dumpTableData_copy(), GetCopyDataByte(), GetCopyDataEnd(), GetCopyDataString(), GetCopyDataUInt64(), HandleCopyStream(), HandleEndOfCopyStream(), ProcessKeepaliveMsg(), ProcessXLogDataMsg(), ReceiveArchiveStreamChunk(), ReceiveBackupManifestChunk(), ReceiveBackupManifestInMemoryChunk(), ReceiveCopyData(), ReceiveTarCopyChunk(), ReportCopyDataParseError(), and StreamLogicalLog().
static List * table_states_not_ready = NIL
Definition at line 124 of file tablesync.c.
Referenced by AllTablesyncsReady(), FetchTableStates(), and process_syncing_tables_for_apply().
static bool table_states_valid = false
Definition at line 123 of file tablesync.c.
Referenced by FetchTableStates(), and invalidate_syncing_table_states().