53 #define DEFAULT_NAPTIME_PER_CYCLE 180000L
130 sub->
oid = subform->oid;
131 sub->
dbid = subform->subdbid;
132 sub->
owner = subform->subowner;
133 sub->
enabled = subform->subenabled;
227 (!only_running || w->
proc))
278 (
errmsg_internal(
"starting logical replication worker for subscription \"%s\"",
284 (
errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
285 errmsg(
"cannot start logical replication workers when max_replication_slots = 0")));
318 bool did_cleanup =
false;
333 "logical replication worker for subscription %u took too long to start; canceled",
364 (
errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
365 errmsg(
"out of logical replication worker slots"),
366 errhint(
"You might need to increase max_logical_replication_workers.")));
377 worker->
subid = subid;
378 worker->
relid = relid;
379 worker->
relstate = SUBREL_STATE_UNKNOWN;
394 memset(&bgw, 0,
sizeof(bgw));
402 "logical replication worker for subscription %u sync %u", subid, relid);
405 "logical replication worker for subscription %u", subid);
421 (
errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
422 errmsg(
"out of background worker slots"),
423 errhint(
"You might need to increase max_worker_processes.")));
576 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
577 errmsg(
"logical replication worker slot %d is empty, cannot attach",
585 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
586 errmsg(
"logical replication worker slot %d is already used by "
587 "another worker, cannot attach", slot)));
713 memset(&bgw, 0,
sizeof(bgw));
720 "logical replication launcher");
722 "logical replication launcher");
845 "Logical Replication Launcher sublist",
867 last_start_time =
now;
928 #define PG_STAT_GET_SUBSCRIPTION_COLS 8
957 MemSet(nulls, 0,
sizeof(nulls));
WalReceiverConn * LogRepWorkerWalRcvConn
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
TimestampTz GetCurrentTimestamp(void)
Datum now(PG_FUNCTION_ARGS)
void RegisterBackgroundWorker(BackgroundWorker *worker)
BgwHandleStatus GetBackgroundWorkerPid(BackgroundWorkerHandle *handle, pid_t *pidp)
bool RegisterDynamicBackgroundWorker(BackgroundWorker *worker, BackgroundWorkerHandle **handle)
#define BGW_NEVER_RESTART
@ BgWorkerStart_RecoveryFinished
#define BGWORKER_BACKEND_DATABASE_CONNECTION
#define BGWORKER_SHMEM_ACCESS
static Datum values[MAXATTR]
#define FLEXIBLE_ARRAY_MEMBER
#define MemSet(start, val, len)
#define OidIsValid(objectId)
#define TIMESTAMP_NOBEGIN(j)
static void PGresult * res
int errmsg_internal(const char *fmt,...)
int errhint(const char *fmt,...)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
void FileSetDeleteAll(FileSet *fileset)
void SetSingleFuncCall(FunctionCallInfo fcinfo, bits32 flags)
void ProcessConfigFile(GucContext context)
HeapTuple heap_getnext(TableScanDesc sscan, ScanDirection direction)
#define HeapTupleIsValid(tuple)
volatile sig_atomic_t ConfigReloadPending
void SignalHandlerForConfigReload(SIGNAL_ARGS)
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
void SetLatch(Latch *latch)
void ResetLatch(Latch *latch)
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
#define WL_EXIT_ON_PM_DEATH
Datum pg_stat_get_subscription(PG_FUNCTION_ARGS)
#define DEFAULT_NAPTIME_PER_CYCLE
LogicalRepWorker * logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
void AtEOXact_ApplyLauncher(bool isCommit)
void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
Size ApplyLauncherShmemSize(void)
bool IsLogicalLauncher(void)
static void WaitForReplicationWorkerAttach(LogicalRepWorker *worker, uint16 generation, BackgroundWorkerHandle *handle)
void logicalrep_worker_attach(int slot)
static List * get_subscription_list(void)
static void logicalrep_launcher_onexit(int code, Datum arg)
void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid, Oid relid)
void ApplyLauncherMain(Datum main_arg)
#define PG_STAT_GET_SUBSCRIPTION_COLS
int max_logical_replication_workers
List * logicalrep_workers_find(Oid subid, bool only_running)
static bool on_commit_launcher_wakeup
struct LogicalRepCtxStruct LogicalRepCtxStruct
void logicalrep_worker_wakeup(Oid subid, Oid relid)
void ApplyLauncherShmemInit(void)
void logicalrep_worker_stop(Oid subid, Oid relid)
LogicalRepWorker * MyLogicalRepWorker
void ApplyLauncherWakeupAtCommit(void)
static LogicalRepCtxStruct * LogicalRepCtx
static void logicalrep_worker_onexit(int code, Datum arg)
int max_sync_workers_per_subscription
static void logicalrep_worker_detach(void)
int logicalrep_sync_worker_count(Oid subid)
void ApplyLauncherRegister(void)
static void ApplyLauncherWakeup(void)
static void logicalrep_worker_cleanup(LogicalRepWorker *worker)
Assert(fmt[strlen(fmt) - 1] !='\n')
List * lappend(List *list, void *datum)
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
void LWLockRelease(LWLock *lock)
bool LWLockHeldByMeInMode(LWLock *l, LWLockMode mode)
bool LWLockHeldByMe(LWLock *l)
char * pstrdup(const char *in)
MemoryContext TopMemoryContext
void * palloc0(Size size)
MemoryContext CurrentMemoryContext
void MemoryContextDelete(MemoryContext context)
#define AllocSetContextCreate
#define ALLOCSET_DEFAULT_SIZES
#define CHECK_FOR_INTERRUPTS()
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
static void static void status(const char *fmt,...) pg_attribute_printf(1
FormData_pg_subscription * Form_pg_subscription
#define ObjectIdGetDatum(X)
void BackgroundWorkerInitializeConnection(const char *dbname, const char *username, uint32 flags)
void BackgroundWorkerUnblockSignals(void)
bool IsBackendPid(int pid)
Size add_size(Size s1, Size s2)
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Size mul_size(Size s1, Size s2)
pqsigfunc pqsignal(int signum, pqsigfunc handler)
int max_replication_slots
Snapshot GetTransactionSnapshot(void)
#define SpinLockInit(lock)
char bgw_function_name[BGW_MAXLEN]
char bgw_name[BGW_MAXLEN]
char bgw_type[BGW_MAXLEN]
BgWorkerStartTime bgw_start_time
char bgw_library_name[BGW_MAXLEN]
LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER]
TimestampTz last_recv_time
TimestampTz last_send_time
Tuplestorestate * setResult
void table_close(Relation relation, LOCKMODE lockmode)
Relation table_open(Oid relationId, LOCKMODE lockmode)
TableScanDesc table_beginscan_catalog(Relation relation, int nkeys, struct ScanKeyData *key)
static void table_endscan(TableScanDesc scan)
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, Datum *values, bool *isnull)
#define TimestampTzGetDatum(X)
@ WAIT_EVENT_LOGICAL_LAUNCHER_MAIN
@ WAIT_EVENT_BGWORKER_STARTUP
@ WAIT_EVENT_BGWORKER_SHUTDOWN
#define walrcv_disconnect(conn)
void StartTransactionCommand(void)
void CommitTransactionCommand(void)
int wal_retrieve_retry_interval
#define XLogRecPtrIsInvalid(r)
#define InvalidXLogRecPtr