PostgreSQL Source Code
git master
launcher.c File Reference
#include "postgres.h"
#include "access/heapam.h"
#include "access/htup.h"
#include "access/htup_details.h"
#include "access/tableam.h"
#include "access/xact.h"
#include "catalog/pg_subscription.h"
#include "catalog/pg_subscription_rel.h"
#include "funcapi.h"
#include "libpq/pqsignal.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/bgworker.h"
#include "postmaster/fork_process.h"
#include "postmaster/postmaster.h"
#include "replication/logicallauncher.h"
#include "replication/logicalworker.h"
#include "replication/slot.h"
#include "replication/walreceiver.h"
#include "replication/worker_internal.h"
#include "storage/ipc.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "storage/procsignal.h"
#include "tcop/tcopprot.h"
#include "utils/memutils.h"
#include "utils/pg_lsn.h"
#include "utils/ps_status.h"
#include "utils/snapmgr.h"
#include "utils/timeout.h"
Go to the source code of this file.
Data Structures | |
struct | LogicalRepCtxStruct |
struct | LogicalRepWorkerId |
struct | StopWorkersData |
Macros | |
#define | DEFAULT_NAPTIME_PER_CYCLE 180000L |
#define | PG_STAT_GET_SUBSCRIPTION_COLS 8 |
Typedefs | |
typedef struct LogicalRepCtxStruct | LogicalRepCtxStruct |
typedef struct LogicalRepWorkerId | LogicalRepWorkerId |
typedef struct StopWorkersData | StopWorkersData |
Variables | |
int | max_logical_replication_workers = 4 |
int | max_sync_workers_per_subscription = 2 |
LogicalRepWorker * | MyLogicalRepWorker = NULL |
LogicalRepCtxStruct * | LogicalRepCtx |
static StopWorkersData * | on_commit_stop_workers = NULL |
static volatile sig_atomic_t | got_SIGHUP = false |
static bool | on_commit_launcher_wakeup = false |
#define DEFAULT_NAPTIME_PER_CYCLE 180000L |
Definition at line 51 of file launcher.c.
Referenced by ApplyLauncherMain().
#define PG_STAT_GET_SUBSCRIPTION_COLS 8 |
Referenced by pg_stat_get_subscription().
typedef struct LogicalRepCtxStruct LogicalRepCtxStruct |
typedef struct LogicalRepWorkerId LogicalRepWorkerId |
typedef struct StopWorkersData StopWorkersData |
void ApplyLauncherMain | ( | Datum | main_arg | ) |
Definition at line 962 of file launcher.c.
References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, Assert, BackgroundWorkerInitializeConnection(), BackgroundWorkerUnblockSignals(), before_shmem_exit(), CHECK_FOR_INTERRUPTS, Subscription::dbid, DEBUG1, DEFAULT_NAPTIME_PER_CYCLE, die, Subscription::enabled, ereport, errmsg(), get_subscription_list(), GetCurrentTimestamp(), got_SIGHUP, InvalidOid, LogicalRepCtxStruct::launcher_pid, lfirst, logicalrep_launcher_onexit(), logicalrep_launcher_sighup(), logicalrep_worker_find(), logicalrep_worker_launch(), LW_SHARED, LWLockAcquire(), LWLockRelease(), MemoryContextDelete(), MemoryContextSwitchTo(), MyLatch, MyProcPid, Subscription::name, now(), Subscription::oid, Subscription::owner, PGC_SIGHUP, pqsignal(), ProcessConfigFile(), ResetLatch(), SIGHUP, TimestampDifferenceExceeds(), TopMemoryContext, WAIT_EVENT_LOGICAL_LAUNCHER_MAIN, WaitLatch(), wal_retrieve_retry_interval, WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, and WL_TIMEOUT.
void ApplyLauncherRegister | ( | void | ) |
Definition at line 779 of file launcher.c.
References BackgroundWorker::bgw_flags, BackgroundWorker::bgw_function_name, BackgroundWorker::bgw_library_name, BackgroundWorker::bgw_main_arg, BGW_MAXLEN, BackgroundWorker::bgw_name, BackgroundWorker::bgw_notify_pid, BackgroundWorker::bgw_restart_time, BackgroundWorker::bgw_start_time, BackgroundWorker::bgw_type, BGWORKER_BACKEND_DATABASE_CONNECTION, BGWORKER_SHMEM_ACCESS, BgWorkerStart_RecoveryFinished, max_logical_replication_workers, RegisterBackgroundWorker(), and snprintf.
Referenced by PostmasterMain().
void ApplyLauncherShmemInit | ( | void | ) |
Definition at line 808 of file launcher.c.
References ApplyLauncherShmemSize(), max_logical_replication_workers, LogicalRepWorker::relmutex, ShmemInitStruct(), SpinLockInit, and LogicalRepCtxStruct::workers.
Referenced by CreateSharedMemoryAndSemaphores().
Size ApplyLauncherShmemSize | ( | void | ) |
Definition at line 760 of file launcher.c.
References add_size(), max_logical_replication_workers, MAXALIGN, and mul_size().
Referenced by ApplyLauncherShmemInit(), and CreateSharedMemoryAndSemaphores().
ApplyLauncherWakeup() | static |
Definition at line 952 of file launcher.c.
References kill, LogicalRepCtxStruct::launcher_pid, and SIGUSR1.
Referenced by AtEOXact_ApplyLauncher(), and logicalrep_worker_onexit().
void ApplyLauncherWakeupAtCommit | ( | void | ) |
Definition at line 945 of file launcher.c.
References on_commit_launcher_wakeup.
Referenced by AlterSubscription(), and CreateSubscription().
void AtEOSubXact_ApplyLauncher | ( | bool | isCommit, |
int | nestDepth | ||
) |
Definition at line 890 of file launcher.c.
References Assert, list_concat(), list_free_deep(), StopWorkersData::nestDepth, StopWorkersData::parent, pfree(), and StopWorkersData::workers.
Referenced by AbortSubTransaction(), and CommitSubTransaction().
void AtEOXact_ApplyLauncher | ( | bool | isCommit | ) |
Definition at line 848 of file launcher.c.
References ApplyLauncherWakeup(), Assert, lfirst, logicalrep_worker_stop(), StopWorkersData::nestDepth, on_commit_launcher_wakeup, StopWorkersData::parent, LogicalRepWorkerId::relid, LogicalRepWorkerId::subid, LogicalRepCtxStruct::workers, and StopWorkersData::workers.
Referenced by AbortTransaction(), and CommitTransaction().
get_subscription_list() | static |
Definition at line 110 of file launcher.c.
References AccessShareLock, CommitTransactionCommand(), CurrentMemoryContext, Subscription::dbid, Subscription::enabled, ForwardScanDirection, GETSTRUCT, GetTransactionSnapshot(), heap_getnext(), HeapTupleIsValid, lappend(), MemoryContextSwitchTo(), Subscription::name, NameStr, NIL, Subscription::oid, Subscription::owner, palloc0(), pstrdup(), StartTransactionCommand(), table_beginscan_catalog(), table_close(), table_endscan(), and table_open().
Referenced by ApplyLauncherMain().
bool IsLogicalLauncher | ( | void | ) |
Definition at line 1078 of file launcher.c.
References LogicalRepCtxStruct::launcher_pid, and MyProcPid.
Referenced by ProcessInterrupts().
logicalrep_launcher_onexit() | static |
Definition at line 695 of file launcher.c.
References LogicalRepCtxStruct::launcher_pid.
Referenced by ApplyLauncherMain().
logicalrep_launcher_sighup() | static |
Definition at line 719 of file launcher.c.
References got_SIGHUP, MyLatch, and SetLatch().
Referenced by ApplyLauncherMain().
int logicalrep_sync_worker_count | ( | Oid | subid | ) |
Definition at line 736 of file launcher.c.
References Assert, i, LWLockHeldByMe(), max_logical_replication_workers, OidIsValid, LogicalRepWorker::relid, LogicalRepWorker::subid, and LogicalRepCtxStruct::workers.
Referenced by logicalrep_worker_launch(), and process_syncing_tables_for_apply().
void logicalrep_worker_attach | ( | int | slot | ) |
Definition at line 627 of file launcher.c.
References Assert, before_shmem_exit(), ereport, errcode(), errmsg(), ERROR, LogicalRepWorker::in_use, logicalrep_worker_onexit(), LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_logical_replication_workers, MyProc, LogicalRepWorker::proc, and LogicalRepCtxStruct::workers.
Referenced by ApplyWorkerMain().
logicalrep_worker_cleanup() | static |
Definition at line 677 of file launcher.c.
References Assert, LogicalRepWorker::dbid, LogicalRepWorker::in_use, InvalidOid, LW_EXCLUSIVE, LWLockHeldByMeInMode(), LogicalRepWorker::proc, LogicalRepWorker::relid, LogicalRepWorker::subid, and LogicalRepWorker::userid.
Referenced by logicalrep_worker_detach(), logicalrep_worker_launch(), and WaitForReplicationWorkerAttach().
logicalrep_worker_detach() | static |
Definition at line 663 of file launcher.c.
References logicalrep_worker_cleanup(), LW_EXCLUSIVE, LWLockAcquire(), and LWLockRelease().
Referenced by logicalrep_worker_onexit().
LogicalRepWorker* logicalrep_worker_find | ( | Oid | subid, |
Oid | relid, | ||
bool | only_running | ||
) |
Definition at line 233 of file launcher.c.
References Assert, i, LogicalRepWorker::in_use, LWLockHeldByMe(), max_logical_replication_workers, LogicalRepWorker::proc, LogicalRepWorker::relid, LogicalRepWorker::subid, and LogicalRepCtxStruct::workers.
Referenced by ApplyLauncherMain(), logicalrep_worker_stop(), logicalrep_worker_wakeup(), process_syncing_tables_for_apply(), wait_for_relation_state_change(), and wait_for_worker_state_change().
logicalrep_worker_launch()
Definition at line 284 of file launcher.c.
References Assert, BackgroundWorker::bgw_flags, BackgroundWorker::bgw_function_name, BackgroundWorker::bgw_library_name, BackgroundWorker::bgw_main_arg, BGW_MAXLEN, BackgroundWorker::bgw_name, BGW_NEVER_RESTART, BackgroundWorker::bgw_notify_pid, BackgroundWorker::bgw_restart_time, BackgroundWorker::bgw_start_time, BackgroundWorker::bgw_type, BGWORKER_BACKEND_DATABASE_CONNECTION, BGWORKER_SHMEM_ACCESS, BgWorkerStart_RecoveryFinished, LogicalRepWorker::dbid, DEBUG1, elog, ereport, errcode(), errhint(), errmsg(), ERROR, LogicalRepWorker::generation, GetCurrentTimestamp(), i, LogicalRepWorker::in_use, Int32GetDatum, InvalidXLogRecPtr, LogicalRepWorker::last_lsn, LogicalRepWorker::last_recv_time, LogicalRepWorker::last_send_time, LogicalRepWorker::launch_time, logicalrep_sync_worker_count(), logicalrep_worker_cleanup(), LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_logical_replication_workers, max_replication_slots, max_sync_workers_per_subscription, MyProcPid, now(), OidIsValid, LogicalRepWorker::proc, RegisterDynamicBackgroundWorker(), LogicalRepWorker::relid, LogicalRepWorker::relstate, LogicalRepWorker::relstate_lsn, LogicalRepWorker::reply_lsn, LogicalRepWorker::reply_time, snprintf, LogicalRepWorker::subid, TIMESTAMP_NOBEGIN, TimestampDifferenceExceeds(), LogicalRepWorker::userid, WaitForReplicationWorkerAttach(), wal_receiver_timeout, WARNING, and LogicalRepCtxStruct::workers.
Referenced by ApplyLauncherMain(), and process_syncing_tables_for_apply().
logicalrep_worker_onexit() | static |
Definition at line 706 of file launcher.c.
References ApplyLauncherWakeup(), logicalrep_worker_detach(), walrcv_disconnect, and wrconn.
Referenced by logicalrep_worker_attach().
logicalrep_worker_stop()
Definition at line 454 of file launcher.c.
References CHECK_FOR_INTERRUPTS, LogicalRepWorker::generation, LogicalRepWorker::in_use, kill, logicalrep_worker_find(), LW_SHARED, LWLockAcquire(), LWLockRelease(), MyLatch, PGPROC::pid, LogicalRepWorker::proc, ResetLatch(), WAIT_EVENT_BGWORKER_SHUTDOWN, WAIT_EVENT_BGWORKER_STARTUP, WaitLatch(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, and WL_TIMEOUT.
Referenced by AtEOXact_ApplyLauncher(), and DropSubscription().
logicalrep_worker_stop_at_commit()
Definition at line 551 of file launcher.c.
References Assert, GetCurrentTransactionNestLevel(), lappend(), MemoryContextSwitchTo(), StopWorkersData::nestDepth, NIL, on_commit_stop_workers, palloc(), StopWorkersData::parent, LogicalRepWorkerId::relid, LogicalRepWorkerId::subid, TopTransactionContext, and StopWorkersData::workers.
Referenced by AlterSubscription_refresh().
logicalrep_worker_wakeup()
Definition at line 596 of file launcher.c.
References logicalrep_worker_find(), logicalrep_worker_wakeup_ptr(), LW_SHARED, LWLockAcquire(), and LWLockRelease().
Referenced by pg_attribute_noreturn().
void logicalrep_worker_wakeup_ptr | ( | LogicalRepWorker * | worker | ) |
Definition at line 616 of file launcher.c.
References Assert, LWLockHeldByMe(), LogicalRepWorker::proc, PGPROC::procLatch, and SetLatch().
Referenced by logicalrep_worker_wakeup(), process_syncing_tables_for_apply(), and wait_for_worker_state_change().
logicalrep_workers_find()
Definition at line 261 of file launcher.c.
References Assert, i, LogicalRepWorker::in_use, lappend(), LWLockHeldByMe(), max_logical_replication_workers, NIL, LogicalRepWorker::proc, LogicalRepWorker::subid, and LogicalRepCtxStruct::workers.
Referenced by DropSubscription().
Datum pg_stat_get_subscription | ( | PG_FUNCTION_ARGS | ) |
Definition at line 1087 of file launcher.c.
References ReturnSetInfo::allowedModes, ReturnSetInfo::econtext, ExprContext::ecxt_per_query_memory, elog, ereport, errcode(), errmsg(), ERROR, get_call_result_type(), i, Int32GetDatum, InvalidOid, IsA, IsBackendPid(), LogicalRepWorker::last_lsn, LogicalRepWorker::last_recv_time, LogicalRepWorker::last_send_time, LSNGetDatum, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_logical_replication_workers, MemoryContextSwitchTo(), MemSet, ObjectIdGetDatum, OidIsValid, PG_ARGISNULL, PG_GETARG_OID, PG_STAT_GET_SUBSCRIPTION_COLS, PGPROC::pid, LogicalRepWorker::proc, LogicalRepWorker::relid, LogicalRepWorker::reply_lsn, LogicalRepWorker::reply_time, ReturnSetInfo::returnMode, ReturnSetInfo::setDesc, ReturnSetInfo::setResult, SFRM_Materialize, LogicalRepWorker::subid, TimestampTzGetDatum, tuplestore_begin_heap(), tuplestore_donestoring, tuplestore_putvalues(), TYPEFUNC_COMPOSITE, values, work_mem, LogicalRepCtxStruct::workers, and XLogRecPtrIsInvalid.
WaitForReplicationWorkerAttach() | static |
Definition at line 175 of file launcher.c.
References BGWH_STOPPED, CHECK_FOR_INTERRUPTS, LogicalRepWorker::generation, GetBackgroundWorkerPid(), LogicalRepWorker::in_use, logicalrep_worker_cleanup(), LW_EXCLUSIVE, LW_SHARED, LWLockAcquire(), LWLockRelease(), MyLatch, LogicalRepWorker::proc, ResetLatch(), status(), WAIT_EVENT_BGWORKER_STARTUP, WaitLatch(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, and WL_TIMEOUT.
Referenced by logicalrep_worker_launch().
bool XactManipulatesLogicalReplicationWorkers | ( | void | ) |
volatile sig_atomic_t got_SIGHUP = false | static |
Definition at line 96 of file launcher.c.
Referenced by ApplyLauncherMain(), and logicalrep_launcher_sighup().
LogicalRepCtxStruct* LogicalRepCtx |
Definition at line 67 of file launcher.c.
int max_logical_replication_workers = 4 |
Definition at line 53 of file launcher.c.
Referenced by ApplyLauncherRegister(), ApplyLauncherShmemInit(), ApplyLauncherShmemSize(), logicalrep_sync_worker_count(), logicalrep_worker_attach(), logicalrep_worker_find(), logicalrep_worker_launch(), logicalrep_workers_find(), and pg_stat_get_subscription().
int max_sync_workers_per_subscription = 2 |
Definition at line 54 of file launcher.c.
Referenced by logicalrep_worker_launch(), and process_syncing_tables_for_apply().
LogicalRepWorker* MyLogicalRepWorker = NULL |
Definition at line 56 of file launcher.c.
Referenced by ApplyWorkerMain(), IsLogicalWorker(), LogicalRepSyncTableStart(), maybe_reread_subscription(), pg_attribute_noreturn(), process_syncing_tables_for_apply(), process_syncing_tables_for_sync(), should_apply_changes_for_rel(), UpdateWorkerStats(), wait_for_relation_state_change(), and wait_for_worker_state_change().
bool on_commit_launcher_wakeup = false | static |
Definition at line 98 of file launcher.c.
Referenced by ApplyLauncherWakeupAtCommit(), and AtEOXact_ApplyLauncher().
StopWorkersData* on_commit_stop_workers = NULL | static |
Definition at line 87 of file launcher.c.
Referenced by logicalrep_worker_stop_at_commit().