47#define DEFAULT_NAPTIME_PER_CYCLE 180000L
146 sub->
oid = subform->oid;
147 sub->
dbid = subform->subdbid;
148 sub->
owner = subform->subowner;
149 sub->
enabled = subform->subenabled;
217 10L, WAIT_EVENT_BGWORKER_STARTUP);
251 (!only_running || w->
proc))
308 int nparallelapplyworkers;
324 (
errmsg_internal(
"starting logical replication worker for subscription \"%s\"",
330 (
errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
331 errmsg(
"cannot start logical replication workers when \"max_replication_slots\"=0")));
364 bool did_cleanup =
false;
379 "logical replication worker for subscription %u took too long to start; canceled",
408 if (is_parallel_apply_worker &&
423 (
errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
424 errmsg(
"out of logical replication worker slots"),
425 errhint(
"You might need to increase \"%s\".",
"max_logical_replication_workers")));
430 worker->
type = wtype;
437 worker->
subid = subid;
438 worker->
relid = relid;
439 worker->
relstate = SUBREL_STATE_UNKNOWN;
456 memset(&bgw, 0,
sizeof(bgw));
462 switch (worker->
type)
467 "logical replication apply worker for subscription %u",
475 "logical replication parallel apply worker for subscription %u",
485 "logical replication tablesync worker for subscription %u sync %u",
509 (
errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
510 errmsg(
"out of background worker slots"),
511 errhint(
"You might need to increase \"%s\".",
"max_worker_processes")));
549 10L, WAIT_EVENT_BGWORKER_STARTUP);
590 10L, WAIT_EVENT_BGWORKER_SHUTDOWN);
716 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
717 errmsg(
"logical replication worker slot %d is empty, cannot attach",
725 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
726 errmsg(
"logical replication worker slot %d is already used by "
727 "another worker, cannot attach", slot)));
929 memset(&bgw, 0,
sizeof(bgw));
936 "logical replication launcher");
938 "logical replication launcher");
1154 "Logical Replication Launcher sublist",
1160 foreach(lc, sublist)
1193 if (last_start == 0 ||
1204 wait_time =
Min(wait_time,
1218 WAIT_EVENT_LOGICAL_LAUNCHER_MAIN);
1279#define PG_STAT_GET_SUBSCRIPTION_COLS 10
1305 worker_pid = worker.
proc->
pid;
1340 switch (worker.
type)
void pa_detach_all_error_mq(void)
bool InitializingApplyWorker
WalReceiverConn * LogRepWorkerWalRcvConn
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
TimestampTz GetCurrentTimestamp(void)
Datum now(PG_FUNCTION_ARGS)
void RegisterBackgroundWorker(BackgroundWorker *worker)
void BackgroundWorkerInitializeConnection(const char *dbname, const char *username, uint32 flags)
void BackgroundWorkerUnblockSignals(void)
BgwHandleStatus GetBackgroundWorkerPid(BackgroundWorkerHandle *handle, pid_t *pidp)
bool RegisterDynamicBackgroundWorker(BackgroundWorker *worker, BackgroundWorkerHandle **handle)
#define BGW_NEVER_RESTART
@ BgWorkerStart_RecoveryFinished
#define BGWORKER_BACKEND_DATABASE_CONNECTION
#define BGWORKER_SHMEM_ACCESS
static Datum values[MAXATTR]
#define CStringGetTextDatum(s)
#define Assert(condition)
#define FLEXIBLE_ARRAY_MEMBER
#define OidIsValid(objectId)
#define TIMESTAMP_NOBEGIN(j)
static void PGresult * res
dsa_area * dsa_attach(dsa_handle handle)
void dsa_pin_mapping(dsa_area *area)
dsa_handle dsa_get_handle(dsa_area *area)
void dsa_pin(dsa_area *area)
#define DSA_HANDLE_INVALID
#define dsa_create(tranch_id)
bool dshash_delete_key(dshash_table *hash_table, const void *key)
void dshash_memcpy(void *dest, const void *src, size_t size, void *arg)
void dshash_release_lock(dshash_table *hash_table, void *entry)
void * dshash_find(dshash_table *hash_table, const void *key, bool exclusive)
dshash_table_handle dshash_get_hash_table_handle(dshash_table *hash_table)
dshash_table * dshash_attach(dsa_area *area, const dshash_parameters *params, dshash_table_handle handle, void *arg)
void * dshash_find_or_insert(dshash_table *hash_table, const void *key, bool *found)
dshash_hash dshash_memhash(const void *v, size_t size, void *arg)
dshash_table * dshash_create(dsa_area *area, const dshash_parameters *params, void *arg)
int dshash_memcmp(const void *a, const void *b, size_t size, void *arg)
#define DSHASH_HANDLE_INVALID
dsa_pointer dshash_table_handle
#define DSM_HANDLE_INVALID
int errmsg_internal(const char *fmt,...)
int errhint(const char *fmt,...)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
void FileSetDeleteAll(FileSet *fileset)
void InitMaterializedSRF(FunctionCallInfo fcinfo, bits32 flags)
void ProcessConfigFile(GucContext context)
HeapTuple heap_getnext(TableScanDesc sscan, ScanDirection direction)
#define HeapTupleIsValid(tuple)
volatile sig_atomic_t ConfigReloadPending
void SignalHandlerForConfigReload(SIGNAL_ARGS)
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
void SetLatch(Latch *latch)
void ResetLatch(Latch *latch)
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
#define WL_EXIT_ON_PM_DEATH
Datum pg_stat_get_subscription(PG_FUNCTION_ARGS)
bool logicalrep_worker_launch(LogicalRepWorkerType wtype, Oid dbid, Oid subid, const char *subname, Oid userid, Oid relid, dsm_handle subworker_dsm)
#define DEFAULT_NAPTIME_PER_CYCLE
List * logicalrep_workers_find(Oid subid, bool only_running, bool acquire_lock)
void AtEOXact_ApplyLauncher(bool isCommit)
void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
Size ApplyLauncherShmemSize(void)
bool IsLogicalLauncher(void)
void logicalrep_worker_attach(int slot)
static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time)
static void logicalrep_launcher_onexit(int code, Datum arg)
static dsa_area * last_start_times_dsa
void ApplyLauncherMain(Datum main_arg)
#define PG_STAT_GET_SUBSCRIPTION_COLS
int max_logical_replication_workers
void logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo)
static int logicalrep_pa_worker_count(Oid subid)
LogicalRepWorker * logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
static bool on_commit_launcher_wakeup
struct LogicalRepCtxStruct LogicalRepCtxStruct
static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid)
void logicalrep_worker_wakeup(Oid subid, Oid relid)
void ApplyLauncherShmemInit(void)
static void logicalrep_worker_stop_internal(LogicalRepWorker *worker, int signo)
static dshash_table * last_start_times
void logicalrep_worker_stop(Oid subid, Oid relid)
LogicalRepWorker * MyLogicalRepWorker
void ApplyLauncherWakeupAtCommit(void)
static const dshash_parameters dsh_params
static LogicalRepCtxStruct * LogicalRepCtx
static void logicalrep_worker_onexit(int code, Datum arg)
pid_t GetLeaderApplyWorkerPid(pid_t pid)
int max_sync_workers_per_subscription
static void logicalrep_worker_detach(void)
static bool WaitForReplicationWorkerAttach(LogicalRepWorker *worker, uint16 generation, BackgroundWorkerHandle *handle)
int logicalrep_sync_worker_count(Oid subid)
void ApplyLauncherForgetWorkerStartTime(Oid subid)
void ApplyLauncherRegister(void)
struct LauncherLastStartTimesEntry LauncherLastStartTimesEntry
static void ApplyLauncherWakeup(void)
static void logicalrep_launcher_attach_dshmem(void)
static List * get_subscription_list(void)
int max_parallel_apply_workers_per_subscription
static void logicalrep_worker_cleanup(LogicalRepWorker *worker)
List * lappend(List *list, void *datum)
void LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks)
#define DEFAULT_LOCKMETHOD
bool LWLockHeldByMe(LWLock *lock)
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
bool LWLockHeldByMeInMode(LWLock *lock, LWLockMode mode)
void LWLockRelease(LWLock *lock)
@ LWTRANCHE_LAUNCHER_HASH
char * pstrdup(const char *in)
void * palloc0(Size size)
MemoryContext TopMemoryContext
MemoryContext CurrentMemoryContext
void MemoryContextDelete(MemoryContext context)
#define AllocSetContextCreate
#define ALLOCSET_DEFAULT_SIZES
#define CHECK_FOR_INTERRUPTS()
static Datum LSNGetDatum(XLogRecPtr X)
FormData_pg_subscription * Form_pg_subscription
pqsigfunc pqsignal(int signo, pqsigfunc func)
static Datum ObjectIdGetDatum(Oid X)
static Datum Int32GetDatum(int32 X)
bool IsBackendPid(int pid)
MemoryContextSwitchTo(old_ctx)
void shm_mq_detach(shm_mq_handle *mqh)
Size add_size(Size s1, Size s2)
Size mul_size(Size s1, Size s2)
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
static pg_noinline void Size size
int max_replication_slots
#define SpinLockInit(lock)
#define SpinLockRelease(lock)
#define SpinLockAcquire(lock)
char bgw_function_name[BGW_MAXLEN]
char bgw_name[BGW_MAXLEN]
char bgw_type[BGW_MAXLEN]
BgWorkerStartTime bgw_start_time
char bgw_extra[BGW_EXTRALEN]
char bgw_library_name[MAXPGPATH]
TimestampTz last_start_time
dsa_handle last_start_dsa
dshash_table_handle last_start_dsh
LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER]
TimestampTz last_recv_time
LogicalRepWorkerType type
TimestampTz last_send_time
shm_mq_handle * error_mq_handle
ParallelApplyWorkerShared * shared
int logicalrep_worker_slot_no
uint16 logicalrep_worker_generation
Tuplestorestate * setResult
void table_close(Relation relation, LOCKMODE lockmode)
Relation table_open(Oid relationId, LOCKMODE lockmode)
TableScanDesc table_beginscan_catalog(Relation relation, int nkeys, struct ScanKeyData *key)
static void table_endscan(TableScanDesc scan)
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, const Datum *values, const bool *isnull)
static Datum TimestampTzGetDatum(TimestampTz X)
#define walrcv_disconnect(conn)
#define isParallelApplyWorker(worker)
@ WORKERTYPE_PARALLEL_APPLY
#define isTablesyncWorker(worker)
static bool am_leader_apply_worker(void)
void StartTransactionCommand(void)
void CommitTransactionCommand(void)
int wal_retrieve_retry_interval
#define XLogRecPtrIsInvalid(r)
#define InvalidXLogRecPtr