52 #define DEFAULT_NAPTIME_PER_CYCLE 180000L 151 sub->
oid = subform->oid;
152 sub->
dbid = subform->subdbid;
153 sub->
owner = subform->subowner;
154 sub->
enabled = subform->subenabled;
248 (!only_running || w->
proc))
299 (
errmsg(
"starting logical replication worker for subscription \"%s\"",
305 (
errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
306 errmsg(
"cannot start logical replication workers when max_replication_slots = 0")));
339 bool did_cleanup =
false;
354 "logical replication worker for subscription %u took too long to start; canceled",
385 (
errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
386 errmsg(
"out of logical replication worker slots"),
387 errhint(
"You might need to increase max_logical_replication_workers.")));
398 worker->
subid = subid;
399 worker->
relid = relid;
400 worker->
relstate = SUBREL_STATE_UNKNOWN;
414 memset(&bgw, 0,
sizeof(bgw));
422 "logical replication worker for subscription %u sync %u", subid, relid);
425 "logical replication worker for subscription %u", subid);
441 (
errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
442 errmsg(
"out of background worker slots"),
443 errhint(
"You might need to increase max_worker_processes.")));
563 Assert(on_commit_stop_workers == NULL ||
564 nestDepth >= on_commit_stop_workers->
nestDepth);
570 if (on_commit_stop_workers == NULL ||
571 nestDepth > on_commit_stop_workers->
nestDepth)
578 on_commit_stop_workers = newdata;
588 on_commit_stop_workers->
workers =
635 MyLogicalRepWorker = &LogicalRepCtx->
workers[slot];
637 if (!MyLogicalRepWorker->
in_use)
641 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
642 errmsg(
"logical replication worker slot %d is empty, cannot attach",
646 if (MyLogicalRepWorker->
proc)
650 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
651 errmsg(
"logical replication worker slot %d is already used by " 652 "another worker, cannot attach", slot)));
774 memset(&bgw, 0,
sizeof(bgw));
781 "logical replication launcher");
783 "logical replication launcher");
829 return (on_commit_stop_workers != NULL);
839 Assert(on_commit_stop_workers == NULL ||
840 (on_commit_stop_workers->
nestDepth == 1 &&
841 on_commit_stop_workers->
parent == NULL));
847 if (on_commit_stop_workers != NULL)
867 on_commit_stop_workers = NULL;
883 if (on_commit_stop_workers == NULL ||
884 on_commit_stop_workers->
nestDepth < nestDepth)
889 parent = on_commit_stop_workers->
parent;
899 if (!parent || parent->
nestDepth < nestDepth - 1)
921 pfree(on_commit_stop_workers);
922 on_commit_stop_workers =
parent;
955 (
errmsg(
"logical replication launcher started")));
994 "Logical Replication Launcher sublist",
1002 foreach(lc, sublist)
1016 last_start_time =
now;
1077 #define PG_STAT_GET_SUBSCRIPTION_COLS 8 1089 (
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1090 errmsg(
"set-valued function called in context that cannot accept a set")));
1093 (
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1094 errmsg(
"materialize mode required, but it is not allowed in this context")));
1098 elog(
ERROR,
"return type must be a row type");
1121 memcpy(&worker, &LogicalRepCtx->
workers[i],
1129 worker_pid = worker.
proc->
pid;
1131 MemSet(values, 0,
sizeof(values));
1132 MemSet(nulls, 0,
sizeof(nulls));
void AtEOXact_ApplyLauncher(bool isCommit)
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, Datum *values, bool *isnull)
#define InvalidXLogRecPtr
#define IsA(nodeptr, _type_)
void RegisterBackgroundWorker(BackgroundWorker *worker)
List * logicalrep_workers_find(Oid subid, bool only_running)
void MemoryContextDelete(MemoryContext context)
bool LWLockHeldByMeInMode(LWLock *l, LWLockMode mode)
#define AllocSetContextCreate
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
void table_close(Relation relation, LOCKMODE lockmode)
int errhint(const char *fmt,...)
MemoryContext TopTransactionContext
void ProcessConfigFile(GucContext context)
void ApplyLauncherMain(Datum main_arg)
TableScanDesc table_beginscan_catalog(Relation relation, int nkeys, struct ScanKeyData *key)
bool LWLockHeldByMe(LWLock *l)
TimestampTz GetCurrentTimestamp(void)
void logicalrep_worker_wakeup(Oid subid, Oid relid)
void SignalHandlerForConfigReload(SIGNAL_ARGS)
char * pstrdup(const char *in)
void CommitTransactionCommand(void)
#define SpinLockInit(lock)
#define tuplestore_donestoring(state)
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER]
TimestampTz last_send_time
#define FLEXIBLE_ARRAY_MEMBER
List * list_concat(List *list1, const List *list2)
int errcode(int sqlerrcode)
static StopWorkersData * on_commit_stop_workers
#define MemSet(start, val, len)
FormData_pg_subscription * Form_pg_subscription
void SetLatch(Latch *latch)
#define BGWORKER_SHMEM_ACCESS
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Snapshot GetTransactionSnapshot(void)
#define OidIsValid(objectId)
void list_free_deep(List *list)
char bgw_function_name[BGW_MAXLEN]
void ResetLatch(Latch *latch)
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
static void logicalrep_worker_detach(void)
void LWLockRelease(LWLock *lock)
void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
void pfree(void *pointer)
LogicalRepWorker * MyLogicalRepWorker
#define ObjectIdGetDatum(X)
int max_sync_workers_per_subscription
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
#define TimestampTzGetDatum(X)
#define ALLOCSET_DEFAULT_SIZES
static void logicalrep_worker_cleanup(LogicalRepWorker *worker)
void logicalrep_worker_attach(int slot)
HeapTuple heap_getnext(TableScanDesc sscan, ScanDirection direction)
void logicalrep_worker_stop(Oid subid, Oid relid)
void AtEOSubXact_ApplyLauncher(bool isCommit, int nestDepth)
Size ApplyLauncherShmemSize(void)
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
#define BGW_NEVER_RESTART
static void logicalrep_launcher_onexit(int code, Datum arg)
#define TIMESTAMP_NOBEGIN(j)
MemoryContext CurrentMemoryContext
static bool on_commit_launcher_wakeup
MemoryContext TopMemoryContext
List * lappend(List *list, void *datum)
#define XLogRecPtrIsInvalid(r)
int wal_retrieve_retry_interval
Tuplestorestate * tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
void BackgroundWorkerInitializeConnection(const char *dbname, const char *username, uint32 flags)
Size mul_size(Size s1, Size s2)
static void logicalrep_worker_onexit(int code, Datum arg)
void * palloc0(Size size)
#define DEFAULT_NAPTIME_PER_CYCLE
void ApplyLauncherRegister(void)
#define PG_STAT_GET_SUBSCRIPTION_COLS
Size add_size(Size s1, Size s2)
int logicalrep_sync_worker_count(Oid subid)
static void WaitForReplicationWorkerAttach(LogicalRepWorker *worker, uint16 generation, BackgroundWorkerHandle *handle)
#define ereport(elevel,...)
pqsigfunc pqsignal(int signum, pqsigfunc handler)
static List * get_subscription_list(void)
int GetCurrentTransactionNestLevel(void)
struct StopWorkersData StopWorkersData
SetFunctionReturnMode returnMode
int max_replication_slots
TimestampTz last_recv_time
#define HeapTupleIsValid(tuple)
void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid, Oid relid)
char bgw_name[BGW_MAXLEN]
#define Assert(condition)
#define BGWORKER_BACKEND_DATABASE_CONNECTION
LogicalRepWorker * logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
void StartTransactionCommand(void)
int max_logical_replication_workers
BgWorkerStartTime bgw_start_time
struct StopWorkersData * parent
bool RegisterDynamicBackgroundWorker(BackgroundWorker *worker, BackgroundWorkerHandle **handle)
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
void ApplyLauncherShmemInit(void)
#define walrcv_disconnect(conn)
MemoryContext ecxt_per_query_memory
struct LogicalRepCtxStruct LogicalRepCtxStruct
Tuplestorestate * setResult
static void table_endscan(TableScanDesc scan)
static Datum values[MAXATTR]
char bgw_type[BGW_MAXLEN]
int errmsg(const char *fmt,...)
bool IsLogicalLauncher(void)
LogicalRepCtxStruct * LogicalRepCtx
volatile sig_atomic_t ConfigReloadPending
bool IsBackendPid(int pid)
#define CHECK_FOR_INTERRUPTS()
Datum pg_stat_get_subscription(PG_FUNCTION_ARGS)
void logicalrep_worker_stop_at_commit(Oid subid, Oid relid)
static void static void status(const char *fmt,...) pg_attribute_printf(1
static void ApplyLauncherWakeup(void)
void ApplyLauncherWakeupAtCommit(void)
bool XactManipulatesLogicalReplicationWorkers(void)
Relation table_open(Oid relationId, LOCKMODE lockmode)
char bgw_library_name[BGW_MAXLEN]
Datum now(PG_FUNCTION_ARGS)
BgwHandleStatus GetBackgroundWorkerPid(BackgroundWorkerHandle *handle, pid_t *pidp)
#define WL_EXIT_ON_PM_DEATH
struct LogicalRepWorkerId LogicalRepWorkerId
void BackgroundWorkerUnblockSignals(void)