PostgreSQL Source Code git master
launcher.c File Reference
#include "postgres.h"
#include "access/heapam.h"
#include "access/htup.h"
#include "access/htup_details.h"
#include "access/tableam.h"
#include "access/xact.h"
#include "catalog/pg_subscription.h"
#include "catalog/pg_subscription_rel.h"
#include "funcapi.h"
#include "lib/dshash.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/bgworker.h"
#include "postmaster/interrupt.h"
#include "replication/logicallauncher.h"
#include "replication/origin.h"
#include "replication/slot.h"
#include "replication/walreceiver.h"
#include "replication/worker_internal.h"
#include "storage/ipc.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/memutils.h"
#include "utils/pg_lsn.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"

Data Structures

struct  LogicalRepCtxStruct
 
struct  LauncherLastStartTimesEntry
 

Macros

#define DEFAULT_NAPTIME_PER_CYCLE   180000L
 
#define PG_STAT_GET_SUBSCRIPTION_COLS   10
 

Typedefs

typedef struct LogicalRepCtxStruct LogicalRepCtxStruct
 
typedef struct LauncherLastStartTimesEntry LauncherLastStartTimesEntry
 

Functions

static void logicalrep_launcher_onexit (int code, Datum arg)
 
static void logicalrep_worker_onexit (int code, Datum arg)
 
static void logicalrep_worker_detach (void)
 
static void logicalrep_worker_cleanup (LogicalRepWorker *worker)
 
static int logicalrep_pa_worker_count (Oid subid)
 
static void logicalrep_launcher_attach_dshmem (void)
 
static void ApplyLauncherSetWorkerStartTime (Oid subid, TimestampTz start_time)
 
static TimestampTz ApplyLauncherGetWorkerStartTime (Oid subid)
 
static void compute_min_nonremovable_xid (LogicalRepWorker *worker, TransactionId *xmin)
 
static bool acquire_conflict_slot_if_exists (void)
 
static void update_conflict_slot_xmin (TransactionId new_xmin)
 
static void init_conflict_slot_xmin (void)
 
static List * get_subscription_list (void)
 
static bool WaitForReplicationWorkerAttach (LogicalRepWorker *worker, uint16 generation, BackgroundWorkerHandle *handle)
 
LogicalRepWorker * logicalrep_worker_find (LogicalRepWorkerType wtype, Oid subid, Oid relid, bool only_running)
 
List * logicalrep_workers_find (Oid subid, bool only_running, bool acquire_lock)
 
bool logicalrep_worker_launch (LogicalRepWorkerType wtype, Oid dbid, Oid subid, const char *subname, Oid userid, Oid relid, dsm_handle subworker_dsm, bool retain_dead_tuples)
 
static void logicalrep_worker_stop_internal (LogicalRepWorker *worker, int signo)
 
void logicalrep_worker_stop (LogicalRepWorkerType wtype, Oid subid, Oid relid)
 
void logicalrep_pa_worker_stop (ParallelApplyWorkerInfo *winfo)
 
void logicalrep_worker_wakeup (LogicalRepWorkerType wtype, Oid subid, Oid relid)
 
void logicalrep_worker_wakeup_ptr (LogicalRepWorker *worker)
 
void logicalrep_worker_attach (int slot)
 
void logicalrep_reset_seqsync_start_time (void)
 
int logicalrep_sync_worker_count (Oid subid)
 
Size ApplyLauncherShmemSize (void)
 
void ApplyLauncherRegister (void)
 
void ApplyLauncherShmemInit (void)
 
void ApplyLauncherForgetWorkerStartTime (Oid subid)
 
void AtEOXact_ApplyLauncher (bool isCommit)
 
void ApplyLauncherWakeupAtCommit (void)
 
void ApplyLauncherWakeup (void)
 
void ApplyLauncherMain (Datum main_arg)
 
void CreateConflictDetectionSlot (void)
 
bool IsLogicalLauncher (void)
 
pid_t GetLeaderApplyWorkerPid (pid_t pid)
 
Datum pg_stat_get_subscription (PG_FUNCTION_ARGS)
 

Variables

int max_logical_replication_workers = 4
 
int max_sync_workers_per_subscription = 2
 
int max_parallel_apply_workers_per_subscription = 2
 
LogicalRepWorker * MyLogicalRepWorker = NULL
 
static LogicalRepCtxStruct * LogicalRepCtx
 
static const dshash_parameters dsh_params
 
static dsa_area * last_start_times_dsa = NULL
 
static dshash_table * last_start_times = NULL
 
static bool on_commit_launcher_wakeup = false
 

Macro Definition Documentation

◆ DEFAULT_NAPTIME_PER_CYCLE

#define DEFAULT_NAPTIME_PER_CYCLE   180000L

Definition at line 49 of file launcher.c.

◆ PG_STAT_GET_SUBSCRIPTION_COLS

#define PG_STAT_GET_SUBSCRIPTION_COLS   10

Typedef Documentation

◆ LauncherLastStartTimesEntry

◆ LogicalRepCtxStruct

Function Documentation

◆ acquire_conflict_slot_if_exists()

static bool acquire_conflict_slot_if_exists ( void  )
static

Definition at line 1485 of file launcher.c.

1486{
1487 if (!SearchNamedReplicationSlot(CONFLICT_DETECTION_SLOT, true))
1488 return false;
1489
1490 ReplicationSlotAcquire(CONFLICT_DETECTION_SLOT, true, false);
1491 return true;
1492}
void ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid)
Definition: slot.c:626
ReplicationSlot * SearchNamedReplicationSlot(const char *name, bool need_lock)
Definition: slot.c:546
#define CONFLICT_DETECTION_SLOT
Definition: slot.h:28

References CONFLICT_DETECTION_SLOT, ReplicationSlotAcquire(), and SearchNamedReplicationSlot().

Referenced by ApplyLauncherMain().

◆ ApplyLauncherForgetWorkerStartTime()

void ApplyLauncherForgetWorkerStartTime ( Oid  subid)

Definition at line 1154 of file launcher.c.

1155{
1156 logicalrep_launcher_attach_dshmem();
1157
1158 (void) dshash_delete_key(last_start_times, &subid);
1159}
bool dshash_delete_key(dshash_table *hash_table, const void *key)
Definition: dshash.c:505
static dshash_table * last_start_times
Definition: launcher.c:91
static void logicalrep_launcher_attach_dshmem(void)
Definition: launcher.c:1068

References dshash_delete_key(), last_start_times, and logicalrep_launcher_attach_dshmem().

Referenced by apply_worker_exit(), DisableSubscriptionAndExit(), DropSubscription(), InitializeLogRepWorker(), maybe_reread_subscription(), and ProcessSyncingTablesForApply().
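
A minimal usage sketch (not taken from launcher.c; the wrapper name is hypothetical): callers that change a subscription in a way that should restart its apply worker promptly clear the recorded start time and then request a launcher wakeup, so the restart is not throttled by wal_retrieve_retry_interval.

    #include "postgres.h"
    #include "replication/logicallauncher.h"

    /* Hypothetical helper illustrating the caller-side pattern. */
    static void
    request_prompt_worker_restart(Oid subid)
    {
        /* Drop the last-start entry so the launcher will not throttle the restart. */
        ApplyLauncherForgetWorkerStartTime(subid);

        /* Wake the launcher once the current transaction commits. */
        ApplyLauncherWakeupAtCommit();
    }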

◆ ApplyLauncherGetWorkerStartTime()

static TimestampTz ApplyLauncherGetWorkerStartTime ( Oid  subid)
static

Definition at line 1128 of file launcher.c.

1129{
1130 LauncherLastStartTimesEntry *entry;
1131 TimestampTz ret;
1132
1133 logicalrep_launcher_attach_dshmem();
1134
1135 entry = dshash_find(last_start_times, &subid, false);
1136 if (entry == NULL)
1137 return 0;
1138
1139 ret = entry->last_start_time;
1140 dshash_release_lock(last_start_times, entry);
1141
1142 return ret;
1143}
int64 TimestampTz
Definition: timestamp.h:39
void dshash_release_lock(dshash_table *hash_table, void *entry)
Definition: dshash.c:560
void * dshash_find(dshash_table *hash_table, const void *key, bool exclusive)
Definition: dshash.c:392
TimestampTz last_start_time
Definition: launcher.c:77

References dshash_find(), dshash_release_lock(), LauncherLastStartTimesEntry::last_start_time, last_start_times, and logicalrep_launcher_attach_dshmem().

Referenced by ApplyLauncherMain().

◆ ApplyLauncherMain()

void ApplyLauncherMain ( Datum  main_arg)

Definition at line 1204 of file launcher.c.

1205{
1207 (errmsg_internal("logical replication launcher started")));
1208
1210
1213
1214 /* Establish signal handlers. */
1216 pqsignal(SIGTERM, die);
1218
1219 /*
1220 * Establish connection to nailed catalogs (we only ever access
1221 * pg_subscription).
1222 */
1224
1225 /*
1226 * Acquire the conflict detection slot at startup to ensure it can be
1227 * dropped if no longer needed after a restart.
1228 */
1230
1231 /* Enter main loop */
1232 for (;;)
1233 {
1234 int rc;
1235 List *sublist;
1236 ListCell *lc;
1237 MemoryContext subctx;
1238 MemoryContext oldctx;
1239 long wait_time = DEFAULT_NAPTIME_PER_CYCLE;
1240 bool can_update_xmin = true;
1241 bool retain_dead_tuples = false;
1243
1245
1246 /* Use temporary context to avoid leaking memory across cycles. */
1248 "Logical Replication Launcher sublist",
1250 oldctx = MemoryContextSwitchTo(subctx);
1251
1252 /*
1253 * Start any missing workers for enabled subscriptions.
1254 *
1255 * Also, during the iteration through all subscriptions, we compute
1256 * the minimum XID required to protect deleted tuples for conflict
1257 * detection if one of the subscription enables retain_dead_tuples
1258 * option.
1259 */
1260 sublist = get_subscription_list();
1261 foreach(lc, sublist)
1262 {
1263 Subscription *sub = (Subscription *) lfirst(lc);
1265 TimestampTz last_start;
1267 long elapsed;
1268
1269 if (sub->retaindeadtuples)
1270 {
1271 retain_dead_tuples = true;
1272
1273 /*
1274 * Create a replication slot to retain information necessary
1275 * for conflict detection such as dead tuples, commit
1276 * timestamps, and origins.
1277 *
1278 * The slot is created before starting the apply worker to
1279 * prevent it from unnecessarily maintaining its
1280 * oldest_nonremovable_xid.
1281 *
1282 * The slot is created even for a disabled subscription to
1283 * ensure that conflict-related information is available when
1284 * applying remote changes that occurred before the
1285 * subscription was enabled.
1286 */
1288
1289 if (sub->retentionactive)
1290 {
1291 /*
1292 * Can't advance xmin of the slot unless all the
1293 * subscriptions actively retaining dead tuples are
1294 * enabled. This is required to ensure that we don't
1295 * advance the xmin of CONFLICT_DETECTION_SLOT if one of
1296 * the subscriptions is not enabled. Otherwise, we won't
1297 * be able to detect conflicts reliably for such a
1298 * subscription even though it has set the
1299 * retain_dead_tuples option.
1300 */
1301 can_update_xmin &= sub->enabled;
1302
1303 /*
1304 * Initialize the slot once the subscription activates
1305 * retention.
1306 */
1309 }
1310 }
1311
1312 if (!sub->enabled)
1313 continue;
1314
1315 LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
1317 false);
1318
1319 if (w != NULL)
1320 {
1321 /*
1322 * Compute the minimum xmin required to protect dead tuples
1323 * required for conflict detection among all running apply
1324 * workers. This computation is performed while holding
1325 * LogicalRepWorkerLock to prevent accessing invalid worker
1326 * data, in scenarios where a worker might exit and reset its
1327 * state concurrently.
1328 */
1329 if (sub->retaindeadtuples &&
1330 sub->retentionactive &&
1331 can_update_xmin)
1333
1334 LWLockRelease(LogicalRepWorkerLock);
1335
1336 /* worker is running already */
1337 continue;
1338 }
1339
1340 LWLockRelease(LogicalRepWorkerLock);
1341
1342 /*
1343 * Can't advance xmin of the slot unless all the workers
1344 * corresponding to subscriptions actively retaining dead tuples
1345 * are running, disabling the further computation of the minimum
1346 * nonremovable xid.
1347 */
1348 if (sub->retaindeadtuples && sub->retentionactive)
1349 can_update_xmin = false;
1350
1351 /*
1352 * If the worker is eligible to start now, launch it. Otherwise,
1353 * adjust wait_time so that we'll wake up as soon as it can be
1354 * started.
1355 *
1356 * Each subscription's apply worker can only be restarted once per
1357 * wal_retrieve_retry_interval, so that errors do not cause us to
1358 * repeatedly restart the worker as fast as possible. In cases
1359 * where a restart is expected (e.g., subscription parameter
1360 * changes), another process should remove the last-start entry
1361 * for the subscription so that the worker can be restarted
1362 * without waiting for wal_retrieve_retry_interval to elapse.
1363 */
1364 last_start = ApplyLauncherGetWorkerStartTime(sub->oid);
1366 if (last_start == 0 ||
1368 {
1371 sub->dbid, sub->oid, sub->name,
1372 sub->owner, InvalidOid,
1374 sub->retaindeadtuples &&
1375 sub->retentionactive))
1376 {
1377 /*
1378 * We get here either if we failed to launch a worker
1379 * (perhaps for resource-exhaustion reasons) or if we
1380 * launched one but it immediately quit. Either way, it
1381 * seems appropriate to try again after
1382 * wal_retrieve_retry_interval.
1383 */
1384 wait_time = Min(wait_time,
1386 }
1387 }
1388 else
1389 {
1390 wait_time = Min(wait_time,
1391 wal_retrieve_retry_interval - elapsed);
1392 }
1393 }
1394
1395 /*
1396 * Drop the CONFLICT_DETECTION_SLOT slot if there is no subscription
1397 * that requires us to retain dead tuples. Otherwise, if required,
1398 * advance the slot's xmin to protect dead tuples required for the
1399 * conflict detection.
1400 *
1401 * Additionally, if all apply workers for subscriptions with
1402 * retain_dead_tuples enabled have requested to stop retention, the
1403 * slot's xmin will be set to InvalidTransactionId allowing the
1404 * removal of dead tuples.
1405 */
1407 {
1408 if (!retain_dead_tuples)
1410 else if (can_update_xmin)
1412 }
1413
1414 /* Switch back to original memory context. */
1415 MemoryContextSwitchTo(oldctx);
1416 /* Clean the temporary memory. */
1417 MemoryContextDelete(subctx);
1418
1419 /* Wait for more work. */
1420 rc = WaitLatch(MyLatch,
1422 wait_time,
1423 WAIT_EVENT_LOGICAL_LAUNCHER_MAIN);
1424
1425 if (rc & WL_LATCH_SET)
1426 {
1429 }
1430
1432 {
1433 ConfigReloadPending = false;
1435 }
1436 }
1437
1438 /* Not reachable */
1439}
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition: timestamp.c:1757
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1645
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1609
void BackgroundWorkerInitializeConnection(const char *dbname, const char *username, uint32 flags)
Definition: bgworker.c:856
void BackgroundWorkerUnblockSignals(void)
Definition: bgworker.c:930
#define Min(x, y)
Definition: c.h:1006
uint32 TransactionId
Definition: c.h:660
#define DSM_HANDLE_INVALID
Definition: dsm_impl.h:58
int errmsg_internal(const char *fmt,...)
Definition: elog.c:1170
#define DEBUG1
Definition: elog.h:30
#define ereport(elevel,...)
Definition: elog.h:150
int MyProcPid
Definition: globals.c:47
struct Latch * MyLatch
Definition: globals.c:63
void ProcessConfigFile(GucContext context)
Definition: guc-file.l:120
@ PGC_SIGHUP
Definition: guc.h:75
Assert(PointerIsAligned(start, uint64))
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:27
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:61
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:337
void ResetLatch(Latch *latch)
Definition: latch.c:374
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:172
#define DEFAULT_NAPTIME_PER_CYCLE
Definition: launcher.c:49
bool logicalrep_worker_launch(LogicalRepWorkerType wtype, Oid dbid, Oid subid, const char *subname, Oid userid, Oid relid, dsm_handle subworker_dsm, bool retain_dead_tuples)
Definition: launcher.c:324
static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time)
Definition: launcher.c:1112
LogicalRepWorker * logicalrep_worker_find(LogicalRepWorkerType wtype, Oid subid, Oid relid, bool only_running)
Definition: launcher.c:258
static void update_conflict_slot_xmin(TransactionId new_xmin)
Definition: launcher.c:1499
static void compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin)
Definition: launcher.c:1447
static void logicalrep_launcher_onexit(int code, Datum arg)
Definition: launcher.c:859
void CreateConflictDetectionSlot(void)
Definition: launcher.c:1566
static void init_conflict_slot_xmin(void)
Definition: launcher.c:1535
static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid)
Definition: launcher.c:1128
static LogicalRepCtxStruct * LogicalRepCtx
Definition: launcher.c:71
static bool acquire_conflict_slot_if_exists(void)
Definition: launcher.c:1485
static List * get_subscription_list(void)
Definition: launcher.c:117
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1174
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1894
@ LW_SHARED
Definition: lwlock.h:113
MemoryContext TopMemoryContext
Definition: mcxt.c:166
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:469
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:160
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:123
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:124
#define lfirst(lc)
Definition: pg_list.h:172
#define die(msg)
#define pqsignal
Definition: port.h:552
uint64_t Datum
Definition: postgres.h:70
#define InvalidOid
Definition: postgres_ext.h:37
void ReplicationSlotDropAcquired(void)
Definition: slot.c:997
ReplicationSlot * MyReplicationSlot
Definition: slot.c:148
Definition: pg_list.h:54
TransactionId xmin
Definition: slot.h:114
ReplicationSlotPersistentData data
Definition: slot.h:210
#define InvalidTransactionId
Definition: transam.h:31
#define TransactionIdIsValid(xid)
Definition: transam.h:41
#define WL_TIMEOUT
Definition: waiteventset.h:37
#define WL_EXIT_ON_PM_DEATH
Definition: waiteventset.h:39
#define WL_LATCH_SET
Definition: waiteventset.h:34
#define SIGHUP
Definition: win32_port.h:158
@ WORKERTYPE_APPLY
int wal_retrieve_retry_interval
Definition: xlog.c:136

References acquire_conflict_slot_if_exists(), ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, ApplyLauncherGetWorkerStartTime(), ApplyLauncherSetWorkerStartTime(), Assert(), BackgroundWorkerInitializeConnection(), BackgroundWorkerUnblockSignals(), before_shmem_exit(), CHECK_FOR_INTERRUPTS, compute_min_nonremovable_xid(), ConfigReloadPending, CreateConflictDetectionSlot(), ReplicationSlot::data, Subscription::dbid, DEBUG1, DEFAULT_NAPTIME_PER_CYCLE, die, DSM_HANDLE_INVALID, Subscription::enabled, ereport, errmsg_internal(), get_subscription_list(), GetCurrentTimestamp(), init_conflict_slot_xmin(), InvalidOid, InvalidTransactionId, LogicalRepCtxStruct::launcher_pid, lfirst, logicalrep_launcher_onexit(), logicalrep_worker_find(), logicalrep_worker_launch(), LogicalRepCtx, LW_SHARED, LWLockAcquire(), LWLockRelease(), MemoryContextDelete(), MemoryContextSwitchTo(), Min, MyLatch, MyProcPid, MyReplicationSlot, Subscription::name, now(), Subscription::oid, Subscription::owner, PGC_SIGHUP, pqsignal, ProcessConfigFile(), ReplicationSlotDropAcquired(), ResetLatch(), Subscription::retaindeadtuples, Subscription::retentionactive, SIGHUP, SignalHandlerForConfigReload(), TimestampDifferenceMilliseconds(), TopMemoryContext, TransactionIdIsValid, update_conflict_slot_xmin(), WaitLatch(), wal_retrieve_retry_interval, WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_TIMEOUT, WORKERTYPE_APPLY, and ReplicationSlotPersistentData::xmin.
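
The comment at source lines 1351-1363 describes the per-subscription restart throttle. A standalone sketch of just that arithmetic (the helper name and the extern declaration are assumptions; launcher.c keeps this logic inline): a worker may start if it has never been started or if at least wal_retrieve_retry_interval milliseconds have elapsed since the last start; otherwise the launcher's naptime is shortened so it wakes up when the worker becomes eligible.

    #include "postgres.h"
    #include "utils/timestamp.h"

    extern int wal_retrieve_retry_interval;    /* GUC, in milliseconds */

    /* Hypothetical helper mirroring the launcher's throttling decision. */
    static bool
    worker_can_start(TimestampTz last_start, TimestampTz now, long *wait_time)
    {
        long elapsed;

        if (last_start == 0)
            return true;        /* never started, no throttling applies */

        elapsed = TimestampDifferenceMilliseconds(last_start, now);
        if (elapsed >= wal_retrieve_retry_interval)
            return true;

        /* Not eligible yet: shrink the naptime so we wake up exactly then. */
        *wait_time = Min(*wait_time, wal_retrieve_retry_interval - elapsed);
        return false;
    }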

◆ ApplyLauncherRegister()

void ApplyLauncherRegister ( void  )

Definition at line 997 of file launcher.c.

998{
999 BackgroundWorker bgw;
1000
1001 /*
1002 * The logical replication launcher is disabled during binary upgrades, to
1003 * prevent logical replication workers from running on the source cluster.
1004 * That could cause replication origins to move forward after having been
1005 * copied to the target cluster, potentially creating conflicts with the
1006 * copied data files.
1007 */
1008 if (max_logical_replication_workers == 0 || IsBinaryUpgrade)
1009 return;
1010
1011 memset(&bgw, 0, sizeof(bgw));
1012 bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
1013 BGWORKER_BACKEND_DATABASE_CONNECTION;
1014 bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
1015 snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
1016 snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyLauncherMain");
1017 snprintf(bgw.bgw_name, BGW_MAXLEN,
1018 "logical replication launcher");
1019 snprintf(bgw.bgw_type, BGW_MAXLEN,
1020 "logical replication launcher");
1021 bgw.bgw_restart_time = 5;
1022 bgw.bgw_notify_pid = 0;
1023 bgw.bgw_main_arg = (Datum) 0;
1024
1025 RegisterBackgroundWorker(&bgw);
1026}
void RegisterBackgroundWorker(BackgroundWorker *worker)
Definition: bgworker.c:943
@ BgWorkerStart_RecoveryFinished
Definition: bgworker.h:81
#define BGWORKER_BACKEND_DATABASE_CONNECTION
Definition: bgworker.h:60
#define BGWORKER_SHMEM_ACCESS
Definition: bgworker.h:53
#define BGW_MAXLEN
Definition: bgworker.h:86
bool IsBinaryUpgrade
Definition: globals.c:121
int max_logical_replication_workers
Definition: launcher.c:52
#define MAXPGPATH
#define snprintf
Definition: port.h:260
char bgw_function_name[BGW_MAXLEN]
Definition: bgworker.h:97
Datum bgw_main_arg
Definition: bgworker.h:98
char bgw_name[BGW_MAXLEN]
Definition: bgworker.h:91
int bgw_restart_time
Definition: bgworker.h:95
char bgw_type[BGW_MAXLEN]
Definition: bgworker.h:92
BgWorkerStartTime bgw_start_time
Definition: bgworker.h:94
pid_t bgw_notify_pid
Definition: bgworker.h:100
char bgw_library_name[MAXPGPATH]
Definition: bgworker.h:96

References BackgroundWorker::bgw_flags, BackgroundWorker::bgw_function_name, BackgroundWorker::bgw_library_name, BackgroundWorker::bgw_main_arg, BGW_MAXLEN, BackgroundWorker::bgw_name, BackgroundWorker::bgw_notify_pid, BackgroundWorker::bgw_restart_time, BackgroundWorker::bgw_start_time, BackgroundWorker::bgw_type, BGWORKER_BACKEND_DATABASE_CONNECTION, BGWORKER_SHMEM_ACCESS, BgWorkerStart_RecoveryFinished, IsBinaryUpgrade, max_logical_replication_workers, MAXPGPATH, RegisterBackgroundWorker(), and snprintf.

Referenced by PostmasterMain().
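
The same BackgroundWorker registration API is available to extensions. A minimal sketch, assuming a hypothetical module "my_ext" with an exported entry point my_worker_main(); it mirrors the field setup above but registers from _PG_init() while shared_preload_libraries is being processed.

    #include "postgres.h"
    #include "fmgr.h"
    #include "miscadmin.h"
    #include "postmaster/bgworker.h"

    PG_MODULE_MAGIC;

    void
    _PG_init(void)
    {
        BackgroundWorker bgw;

        /* Static workers can only be registered from shared_preload_libraries. */
        if (!process_shared_preload_libraries_in_progress)
            return;

        memset(&bgw, 0, sizeof(bgw));
        bgw.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
        bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
        snprintf(bgw.bgw_library_name, MAXPGPATH, "my_ext");          /* hypothetical */
        snprintf(bgw.bgw_function_name, BGW_MAXLEN, "my_worker_main"); /* hypothetical */
        snprintf(bgw.bgw_name, BGW_MAXLEN, "my_ext worker");
        snprintf(bgw.bgw_type, BGW_MAXLEN, "my_ext worker");
        bgw.bgw_restart_time = 5;      /* seconds between restarts after a crash */
        bgw.bgw_notify_pid = 0;
        bgw.bgw_main_arg = (Datum) 0;

        RegisterBackgroundWorker(&bgw);
    }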

◆ ApplyLauncherSetWorkerStartTime()

static void ApplyLauncherSetWorkerStartTime ( Oid  subid,
TimestampTz  start_time 
)
static

Definition at line 1112 of file launcher.c.

1113{
1114 LauncherLastStartTimesEntry *entry;
1115 bool found;
1116
1117 logicalrep_launcher_attach_dshmem();
1118
1119 entry = dshash_find_or_insert(last_start_times, &subid, &found);
1120 entry->last_start_time = start_time;
1121 dshash_release_lock(last_start_times, entry);
1122}
void * dshash_find_or_insert(dshash_table *hash_table, const void *key, bool *found)
Definition: dshash.c:435
static time_t start_time
Definition: pg_ctl.c:96

References dshash_find_or_insert(), dshash_release_lock(), LauncherLastStartTimesEntry::last_start_time, last_start_times, logicalrep_launcher_attach_dshmem(), and start_time.

Referenced by ApplyLauncherMain().

◆ ApplyLauncherShmemInit()

void ApplyLauncherShmemInit ( void  )

Definition at line 1033 of file launcher.c.

1034{
1035 bool found;
1036
1037 LogicalRepCtx = (LogicalRepCtxStruct *)
1038 ShmemInitStruct("Logical Replication Launcher Data",
1039 ApplyLauncherShmemSize(),
1040 &found);
1041
1042 if (!found)
1043 {
1044 int slot;
1045
1046 memset(LogicalRepCtx, 0, ApplyLauncherShmemSize());
1047
1048 LogicalRepCtx->last_start_dsa = DSA_HANDLE_INVALID;
1049 LogicalRepCtx->last_start_dsh = DSHASH_HANDLE_INVALID;
1050
1051 /* Initialize memory and spin locks for each worker slot. */
1052 for (slot = 0; slot < max_logical_replication_workers; slot++)
1053 {
1054 LogicalRepWorker *worker = &LogicalRepCtx->workers[slot];
1055
1056 memset(worker, 0, sizeof(LogicalRepWorker));
1057 SpinLockInit(&worker->relmutex);
1058 }
1059 }
1060}
#define DSA_HANDLE_INVALID
Definition: dsa.h:139
#define DSHASH_HANDLE_INVALID
Definition: dshash.h:27
Size ApplyLauncherShmemSize(void)
Definition: launcher.c:978
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:389
#define SpinLockInit(lock)
Definition: spin.h:57
dsa_handle last_start_dsa
Definition: launcher.c:64
dshash_table_handle last_start_dsh
Definition: launcher.c:65
LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER]
Definition: launcher.c:68

References ApplyLauncherShmemSize(), DSA_HANDLE_INVALID, DSHASH_HANDLE_INVALID, LogicalRepCtxStruct::last_start_dsa, LogicalRepCtxStruct::last_start_dsh, LogicalRepCtx, max_logical_replication_workers, LogicalRepWorker::relmutex, ShmemInitStruct(), SpinLockInit, and LogicalRepCtxStruct::workers.

Referenced by CreateOrAttachShmemStructs().

◆ ApplyLauncherShmemSize()

Size ApplyLauncherShmemSize ( void  )

Definition at line 978 of file launcher.c.

979{
980 Size size;
981
982 /*
983 * Need the fixed struct and the array of LogicalRepWorker.
984 */
985 size = sizeof(LogicalRepCtxStruct);
986 size = MAXALIGN(size);
987 size = add_size(size, mul_size(max_logical_replication_workers,
988 sizeof(LogicalRepWorker)));
989 return size;
990}
#define MAXALIGN(LEN)
Definition: c.h:813
size_t Size
Definition: c.h:613
struct LogicalRepCtxStruct LogicalRepCtxStruct
Size add_size(Size s1, Size s2)
Definition: shmem.c:495
Size mul_size(Size s1, Size s2)
Definition: shmem.c:510

References add_size(), max_logical_replication_workers, MAXALIGN, and mul_size().

Referenced by ApplyLauncherShmemInit(), and CalculateShmemSize().
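
The add_size()/mul_size() calls are the overflow-checked way to size a shared-memory request. A sketch of the same pattern for a hypothetical fixed header plus a per-slot array (MySharedState, MySlot, and the helper are assumed names, not part of launcher.c):

    #include "postgres.h"
    #include "storage/shmem.h"

    typedef struct MySlot
    {
        int         something;
    } MySlot;

    typedef struct MySharedState
    {
        int         nslots;
        MySlot      slots[FLEXIBLE_ARRAY_MEMBER];
    } MySharedState;

    /* Compute the shared-memory footprint: fixed struct plus nslots entries. */
    static Size
    MySharedStateSize(int nslots)
    {
        Size        size;

        size = MAXALIGN(sizeof(MySharedState));
        size = add_size(size, mul_size(nslots, sizeof(MySlot)));

        return size;
    }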

◆ ApplyLauncherWakeup()

void ApplyLauncherWakeup ( void  )

Definition at line 1194 of file launcher.c.

1195{
1196 if (LogicalRepCtx->launcher_pid != 0)
1197 kill(LogicalRepCtx->launcher_pid, SIGUSR1);
1198}
#define kill(pid, sig)
Definition: win32_port.h:493
#define SIGUSR1
Definition: win32_port.h:170

References kill, LogicalRepCtxStruct::launcher_pid, LogicalRepCtx, and SIGUSR1.

Referenced by AtEOXact_ApplyLauncher(), logicalrep_worker_onexit(), update_retention_status(), and wait_for_local_flush().

◆ ApplyLauncherWakeupAtCommit()

void ApplyLauncherWakeupAtCommit ( void  )

Definition at line 1184 of file launcher.c.

1185{
1186 if (!on_commit_launcher_wakeup)
1187 on_commit_launcher_wakeup = true;
1188}
static bool on_commit_launcher_wakeup
Definition: launcher.c:93

References on_commit_launcher_wakeup.

Referenced by AlterSubscription(), AlterSubscriptionOwner_internal(), and CreateSubscription().

◆ AtEOXact_ApplyLauncher()

void AtEOXact_ApplyLauncher ( bool  isCommit)

Definition at line 1165 of file launcher.c.

1166{
1167 if (isCommit)
1168 {
1169 if (on_commit_launcher_wakeup)
1170 ApplyLauncherWakeup();
1171 }
1172
1173 on_commit_launcher_wakeup = false;
1174}
void ApplyLauncherWakeup(void)
Definition: launcher.c:1194

References ApplyLauncherWakeup(), and on_commit_launcher_wakeup.

Referenced by AbortTransaction(), CommitTransaction(), and PrepareTransaction().

◆ compute_min_nonremovable_xid()

static void compute_min_nonremovable_xid ( LogicalRepWorker worker,
TransactionId xmin 
)
static

Definition at line 1447 of file launcher.c.

1448{
1449 TransactionId nonremovable_xid;
1450
1451 Assert(worker != NULL);
1452
1453 /*
1454 * The replication slot for conflict detection must be created before the
1455 * worker starts.
1456 */
1457 Assert(MyReplicationSlot);
1458
1459 SpinLockAcquire(&worker->relmutex);
1460 nonremovable_xid = worker->oldest_nonremovable_xid;
1461 SpinLockRelease(&worker->relmutex);
1462
1463 /*
1464 * Return if the apply worker has stopped retention concurrently.
1465 *
1466 * Although this function is invoked only when retentionactive is true,
1467 * the apply worker might stop retention after the launcher fetches the
1468 * retentionactive flag.
1469 */
1470 if (!TransactionIdIsValid(nonremovable_xid))
1471 return;
1472
1473 if (!TransactionIdIsValid(*xmin) ||
1474 TransactionIdPrecedes(nonremovable_xid, *xmin))
1475 *xmin = nonremovable_xid;
1476}
#define SpinLockRelease(lock)
Definition: spin.h:61
#define SpinLockAcquire(lock)
Definition: spin.h:59
TransactionId oldest_nonremovable_xid
static bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.h:263

References Assert(), MyReplicationSlot, LogicalRepWorker::oldest_nonremovable_xid, LogicalRepWorker::relmutex, SpinLockAcquire, SpinLockRelease, TransactionIdIsValid, and TransactionIdPrecedes().

Referenced by ApplyLauncherMain().

◆ CreateConflictDetectionSlot()

void CreateConflictDetectionSlot ( void  )

Definition at line 1566 of file launcher.c.

1567{
1568 /* Exit early, if the replication slot is already created and acquired */
1569 if (MyReplicationSlot)
1570 return;
1571
1572 ereport(LOG,
1573 errmsg("creating replication conflict detection slot"));
1574
1575 ReplicationSlotCreate(CONFLICT_DETECTION_SLOT, false, RS_PERSISTENT, false,
1576 false, false);
1577
1578 init_conflict_slot_xmin();
1579}
int errmsg(const char *fmt,...)
Definition: elog.c:1080
#define LOG
Definition: elog.h:31
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, bool two_phase, bool failover, bool synced)
Definition: slot.c:384
@ RS_PERSISTENT
Definition: slot.h:45

References CONFLICT_DETECTION_SLOT, ereport, errmsg(), init_conflict_slot_xmin(), LOG, MyReplicationSlot, ReplicationSlotCreate(), and RS_PERSISTENT.

Referenced by ApplyLauncherMain(), and binary_upgrade_create_conflict_detection_slot().

◆ get_subscription_list()

static List * get_subscription_list ( void  )
static

Definition at line 117 of file launcher.c.

118{
119 List *res = NIL;
120 Relation rel;
121 TableScanDesc scan;
122 HeapTuple tup;
123 MemoryContext resultcxt;
124
125 /* This is the context that we will allocate our output data in */
126 resultcxt = CurrentMemoryContext;
127
128 /*
129 * Start a transaction so we can access pg_subscription.
130 */
131 StartTransactionCommand();
132
133 rel = table_open(SubscriptionRelationId, AccessShareLock);
134 scan = table_beginscan_catalog(rel, 0, NULL);
135
136 while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
137 {
138 Form_pg_subscription subform = (Form_pg_subscription) GETSTRUCT(tup);
139 Subscription *sub;
140 MemoryContext oldcxt;
141
142 /*
143 * Allocate our results in the caller's context, not the
144 * transaction's. We do this inside the loop, and restore the original
145 * context at the end, so that leaky things like heap_getnext() are
146 * not called in a potentially long-lived context.
147 */
148 oldcxt = MemoryContextSwitchTo(resultcxt);
149
150 sub = (Subscription *) palloc0(sizeof(Subscription));
151 sub->oid = subform->oid;
152 sub->dbid = subform->subdbid;
153 sub->owner = subform->subowner;
154 sub->enabled = subform->subenabled;
155 sub->name = pstrdup(NameStr(subform->subname));
156 sub->retaindeadtuples = subform->subretaindeadtuples;
157 sub->retentionactive = subform->subretentionactive;
158 /* We don't fill fields we are not interested in. */
159
160 res = lappend(res, sub);
161 MemoryContextSwitchTo(oldcxt);
162 }
163
164 table_endscan(scan);
165 table_close(rel, AccessShareLock);
166
167 CommitTransactionCommand();
168
169 return res;
170}
#define NameStr(name)
Definition: c.h:754
HeapTuple heap_getnext(TableScanDesc sscan, ScanDirection direction)
Definition: heapam.c:1361
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
static void * GETSTRUCT(const HeapTupleData *tuple)
Definition: htup_details.h:728
List * lappend(List *list, void *datum)
Definition: list.c:339
#define AccessShareLock
Definition: lockdefs.h:36
char * pstrdup(const char *in)
Definition: mcxt.c:1759
void * palloc0(Size size)
Definition: mcxt.c:1395
MemoryContext CurrentMemoryContext
Definition: mcxt.c:160
#define NIL
Definition: pg_list.h:68
FormData_pg_subscription * Form_pg_subscription
@ ForwardScanDirection
Definition: sdir.h:28
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:40
TableScanDesc table_beginscan_catalog(Relation relation, int nkeys, ScanKeyData *key)
Definition: tableam.c:113
static void table_endscan(TableScanDesc scan)
Definition: tableam.h:985
void StartTransactionCommand(void)
Definition: xact.c:3077
void CommitTransactionCommand(void)
Definition: xact.c:3175

References AccessShareLock, CommitTransactionCommand(), CurrentMemoryContext, Subscription::dbid, Subscription::enabled, ForwardScanDirection, GETSTRUCT(), heap_getnext(), HeapTupleIsValid, lappend(), MemoryContextSwitchTo(), Subscription::name, NameStr, NIL, Subscription::oid, Subscription::owner, palloc0(), pstrdup(), Subscription::retaindeadtuples, Subscription::retentionactive, StartTransactionCommand(), table_beginscan_catalog(), table_close(), table_endscan(), and table_open().

Referenced by ApplyLauncherMain().
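
The function above is an instance of the generic catalog-scan pattern: open the catalog inside a transaction, scan it with table_beginscan_catalog(), and copy out what is needed. A reduced, hypothetical sketch that merely counts pg_subscription rows:

    #include "postgres.h"
    #include "access/heapam.h"
    #include "access/table.h"
    #include "access/tableam.h"
    #include "access/xact.h"
    #include "catalog/pg_subscription.h"
    #include "storage/lockdefs.h"

    /* Assumed helper, not part of launcher.c. */
    static int
    count_subscriptions(void)
    {
        int         count = 0;
        Relation    rel;
        TableScanDesc scan;
        HeapTuple   tup;

        /* Catalog access requires a transaction. */
        StartTransactionCommand();

        rel = table_open(SubscriptionRelationId, AccessShareLock);
        scan = table_beginscan_catalog(rel, 0, NULL);

        while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
            count++;

        table_endscan(scan);
        table_close(rel, AccessShareLock);

        CommitTransactionCommand();

        return count;
    }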

◆ GetLeaderApplyWorkerPid()

pid_t GetLeaderApplyWorkerPid ( pid_t  pid)

Definition at line 1595 of file launcher.c.

1596{
1597 int leader_pid = InvalidPid;
1598 int i;
1599
1600 LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
1601
1602 for (i = 0; i < max_logical_replication_workers; i++)
1603 {
1604 LogicalRepWorker *w = &LogicalRepCtx->workers[i];
1605
1606 if (isParallelApplyWorker(w) && w->proc && pid == w->proc->pid)
1607 {
1608 leader_pid = w->leader_pid;
1609 break;
1610 }
1611 }
1612
1613 LWLockRelease(LogicalRepWorkerLock);
1614
1615 return leader_pid;
1616}
int i
Definition: isn.c:77
#define InvalidPid
Definition: miscadmin.h:32
int pid
Definition: proc.h:199
#define isParallelApplyWorker(worker)

References i, InvalidPid, isParallelApplyWorker, LogicalRepWorker::leader_pid, LogicalRepCtx, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_logical_replication_workers, PGPROC::pid, LogicalRepWorker::proc, and LogicalRepCtxStruct::workers.

Referenced by pg_stat_get_activity().

◆ init_conflict_slot_xmin()

static void init_conflict_slot_xmin ( void  )
static

Definition at line 1535 of file launcher.c.

1536{
1537 TransactionId xmin_horizon;
1538
1539 /* Replication slot must exist but shouldn't be initialized. */
1540 Assert(MyReplicationSlot &&
1541 !TransactionIdIsValid(MyReplicationSlot->data.xmin));
1542
1543 LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
1544
1545 xmin_horizon = GetOldestSafeDecodingTransactionId(false);
1546
1547 SpinLockAcquire(&MyReplicationSlot->mutex);
1548 MyReplicationSlot->effective_xmin = xmin_horizon;
1549 MyReplicationSlot->data.xmin = xmin_horizon;
1550 SpinLockRelease(&MyReplicationSlot->mutex);
1551
1552 ReplicationSlotsComputeRequiredXmin(true);
1553
1554 LWLockRelease(ProcArrayLock);
1555
1556 /* Write this slot to disk */
1557 ReplicationSlotMarkDirty();
1558 ReplicationSlotSave();
1559}
@ LW_EXCLUSIVE
Definition: lwlock.h:112
TransactionId GetOldestSafeDecodingTransactionId(bool catalogOnly)
Definition: procarray.c:2907
void ReplicationSlotMarkDirty(void)
Definition: slot.c:1139
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition: slot.c:1178
void ReplicationSlotSave(void)
Definition: slot.c:1121
slock_t mutex
Definition: slot.h:183
TransactionId effective_xmin
Definition: slot.h:206

References Assert(), ReplicationSlot::data, ReplicationSlot::effective_xmin, GetOldestSafeDecodingTransactionId(), LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), ReplicationSlot::mutex, MyReplicationSlot, ReplicationSlotMarkDirty(), ReplicationSlotSave(), ReplicationSlotsComputeRequiredXmin(), SpinLockAcquire, SpinLockRelease, TransactionIdIsValid, and ReplicationSlotPersistentData::xmin.

Referenced by ApplyLauncherMain(), and CreateConflictDetectionSlot().

◆ IsLogicalLauncher()

bool IsLogicalLauncher ( void  )

◆ logicalrep_launcher_attach_dshmem()

static void logicalrep_launcher_attach_dshmem ( void  )
static

Definition at line 1068 of file launcher.c.

1069{
1070 MemoryContext oldcontext;
1071
1072 /* Quick exit if we already did this. */
1074 last_start_times != NULL)
1075 return;
1076
1077 /* Otherwise, use a lock to ensure only one process creates the table. */
1078 LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
1079
1080 /* Be sure any local memory allocated by DSA routines is persistent. */
1082
1084 {
1085 /* Initialize dynamic shared hash table for last-start times. */
1086 last_start_times_dsa = dsa_create(LWTRANCHE_LAUNCHER_DSA);
1090
1091 /* Store handles in shared memory for other backends to use. */
1094 }
1095 else if (!last_start_times)
1096 {
1097 /* Attach to existing dynamic shared hash table. */
1102 }
1103
1104 MemoryContextSwitchTo(oldcontext);
1105 LWLockRelease(LogicalRepWorkerLock);
1106}
dsa_area * dsa_attach(dsa_handle handle)
Definition: dsa.c:510
void dsa_pin_mapping(dsa_area *area)
Definition: dsa.c:650
dsa_handle dsa_get_handle(dsa_area *area)
Definition: dsa.c:498
void dsa_pin(dsa_area *area)
Definition: dsa.c:990
#define dsa_create(tranche_id)
Definition: dsa.h:117
dshash_table_handle dshash_get_hash_table_handle(dshash_table *hash_table)
Definition: dshash.c:369
dshash_table * dshash_attach(dsa_area *area, const dshash_parameters *params, dshash_table_handle handle, void *arg)
Definition: dshash.c:272
dshash_table * dshash_create(dsa_area *area, const dshash_parameters *params, void *arg)
Definition: dshash.c:208
static dsa_area * last_start_times_dsa
Definition: launcher.c:90
static const dshash_parameters dsh_params
Definition: launcher.c:81

References dsa_attach(), dsa_create, dsa_get_handle(), dsa_pin(), dsa_pin_mapping(), dsh_params, dshash_attach(), dshash_create(), dshash_get_hash_table_handle(), DSHASH_HANDLE_INVALID, LogicalRepCtxStruct::last_start_dsa, LogicalRepCtxStruct::last_start_dsh, last_start_times, last_start_times_dsa, LogicalRepCtx, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MemoryContextSwitchTo(), and TopMemoryContext.

Referenced by ApplyLauncherForgetWorkerStartTime(), ApplyLauncherGetWorkerStartTime(), and ApplyLauncherSetWorkerStartTime().
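
The create-or-attach dance used here is the usual way to share a dshash table across backends: the first backend creates a DSA area and the table and publishes their handles in fixed shared memory; later backends attach using those handles. A self-contained sketch under assumed names (MySharedHandles, attach_my_dshash, and the tranche id and parameters are all hypothetical); real code must also serialize this step with a lock, as the function above does with LogicalRepWorkerLock.

    #include "postgres.h"
    #include "lib/dshash.h"
    #include "utils/memutils.h"

    /* Handles published in fixed shared memory (hypothetical struct). */
    typedef struct MySharedHandles
    {
        dsa_handle  dsa;
        dshash_table_handle dsh;
    } MySharedHandles;

    static dsa_area *my_dsa = NULL;
    static dshash_table *my_table = NULL;

    static void
    attach_my_dshash(MySharedHandles *shared, const dshash_parameters *params,
                     int tranche_id)
    {
        MemoryContext oldcontext;

        if (my_table != NULL)
            return;             /* already attached in this backend */

        /* Allocations made by dsa/dshash must outlive the current context. */
        oldcontext = MemoryContextSwitchTo(TopMemoryContext);

        if (shared->dsh == DSHASH_HANDLE_INVALID)
        {
            /* First backend: create area and table, then publish the handles. */
            my_dsa = dsa_create(tranche_id);
            dsa_pin(my_dsa);            /* keep the area until shutdown */
            dsa_pin_mapping(my_dsa);    /* keep it mapped in this backend */
            my_table = dshash_create(my_dsa, params, NULL);

            shared->dsa = dsa_get_handle(my_dsa);
            shared->dsh = dshash_get_hash_table_handle(my_table);
        }
        else
        {
            /* Someone else created it: attach to the existing table. */
            my_dsa = dsa_attach(shared->dsa);
            dsa_pin_mapping(my_dsa);
            my_table = dshash_attach(my_dsa, params, shared->dsh, NULL);
        }

        MemoryContextSwitchTo(oldcontext);
    }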

◆ logicalrep_launcher_onexit()

static void logicalrep_launcher_onexit ( int  code,
Datum  arg 
)
static

Definition at line 859 of file launcher.c.

860{
861 LogicalRepCtx->launcher_pid = 0;
862}

References LogicalRepCtxStruct::launcher_pid, and LogicalRepCtx.

Referenced by ApplyLauncherMain().

◆ logicalrep_pa_worker_count()

static int logicalrep_pa_worker_count ( Oid  subid)
static

Definition at line 951 of file launcher.c.

952{
953 int i;
954 int res = 0;
955
956 Assert(LWLockHeldByMe(LogicalRepWorkerLock));
957
958 /*
959 * Scan all attached parallel apply workers, only counting those which
960 * have the given subscription id.
961 */
962 for (i = 0; i < max_logical_replication_workers; i++)
963 {
964 LogicalRepWorker *w = &LogicalRepCtx->workers[i];
965
966 if (isParallelApplyWorker(w) && w->subid == subid)
967 res++;
968 }
969
970 return res;
971}
bool LWLockHeldByMe(LWLock *lock)
Definition: lwlock.c:1977

References Assert(), i, isParallelApplyWorker, LogicalRepCtx, LWLockHeldByMe(), max_logical_replication_workers, LogicalRepWorker::subid, and LogicalRepCtxStruct::workers.

Referenced by logicalrep_worker_launch().

◆ logicalrep_pa_worker_stop()

void logicalrep_pa_worker_stop ( ParallelApplyWorkerInfo winfo)

Definition at line 679 of file launcher.c.

680{
681 int slot_no;
682 uint16 generation;
683 LogicalRepWorker *worker;
684
685 SpinLockAcquire(&winfo->shared->mutex);
686 generation = winfo->shared->logicalrep_worker_generation;
687 slot_no = winfo->shared->logicalrep_worker_slot_no;
688 SpinLockRelease(&winfo->shared->mutex);
689
690 Assert(slot_no >= 0 && slot_no < max_logical_replication_workers);
691
692 /*
693 * Detach from the error_mq_handle for the parallel apply worker before
694 * stopping it. This prevents the leader apply worker from trying to
695 * receive the message from the error queue that might already be detached
696 * by the parallel apply worker.
697 */
698 if (winfo->error_mq_handle)
699 {
700 shm_mq_detach(winfo->error_mq_handle);
701 winfo->error_mq_handle = NULL;
702 }
703
704 LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
705
706 worker = &LogicalRepCtx->workers[slot_no];
707 Assert(isParallelApplyWorker(worker));
708
709 /*
710 * Only stop the worker if the generation matches and the worker is alive.
711 */
712 if (worker->generation == generation && worker->proc)
713 logicalrep_worker_stop_internal(worker, SIGUSR2);
714
715 LWLockRelease(LogicalRepWorkerLock);
716}
uint16_t uint16
Definition: c.h:540
static void logicalrep_worker_stop_internal(LogicalRepWorker *worker, int signo)
Definition: launcher.c:569
void shm_mq_detach(shm_mq_handle *mqh)
Definition: shm_mq.c:843
shm_mq_handle * error_mq_handle
ParallelApplyWorkerShared * shared
#define SIGUSR2
Definition: win32_port.h:171

References Assert(), ParallelApplyWorkerInfo::error_mq_handle, LogicalRepWorker::generation, isParallelApplyWorker, ParallelApplyWorkerShared::logicalrep_worker_generation, ParallelApplyWorkerShared::logicalrep_worker_slot_no, logicalrep_worker_stop_internal(), LogicalRepCtx, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_logical_replication_workers, ParallelApplyWorkerShared::mutex, LogicalRepWorker::proc, ParallelApplyWorkerInfo::shared, shm_mq_detach(), SIGUSR2, SpinLockAcquire, SpinLockRelease, and LogicalRepCtxStruct::workers.

Referenced by pa_free_worker().

◆ logicalrep_reset_seqsync_start_time()

void logicalrep_reset_seqsync_start_time ( void  )

Definition at line 872 of file launcher.c.

873{
874 LogicalRepWorker *worker;
875
876 /*
877 * The apply worker can't access last_seqsync_start_time concurrently, so
878 * it is okay to use SHARED lock here. See ProcessSequencesForSync().
879 */
880 LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
881
882 worker = logicalrep_worker_find(WORKERTYPE_APPLY, MyLogicalRepWorker->subid,
883 InvalidOid,
884 true);
886 worker->last_seqsync_start_time = 0;
887
888 LWLockRelease(LogicalRepWorkerLock);
889}
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:56
TimestampTz last_seqsync_start_time

References InvalidOid, LogicalRepWorker::last_seqsync_start_time, logicalrep_worker_find(), LW_SHARED, LWLockAcquire(), LWLockRelease(), MyLogicalRepWorker, LogicalRepWorker::subid, and WORKERTYPE_APPLY.

Referenced by FinishSyncWorker().

◆ logicalrep_sync_worker_count()

int logicalrep_sync_worker_count ( Oid  subid)

Definition at line 927 of file launcher.c.

928{
929 int i;
930 int res = 0;
931
932 Assert(LWLockHeldByMe(LogicalRepWorkerLock));
933
934 /* Search for attached worker for a given subscription id. */
935 for (i = 0; i < max_logical_replication_workers; i++)
936 {
937 LogicalRepWorker *w = &LogicalRepCtx->workers[i];
938
939 if (w->subid == subid && (isTableSyncWorker(w) || isSequenceSyncWorker(w)))
940 res++;
941 }
942
943 return res;
944}
#define isSequenceSyncWorker(worker)
#define isTableSyncWorker(worker)

References Assert(), i, isSequenceSyncWorker, isTableSyncWorker, LogicalRepCtx, LWLockHeldByMe(), max_logical_replication_workers, LogicalRepWorker::subid, and LogicalRepCtxStruct::workers.

Referenced by logicalrep_worker_launch(), ProcessSequencesForSync(), and ProcessSyncingTablesForApply().

◆ logicalrep_worker_attach()

void logicalrep_worker_attach ( int  slot)

Definition at line 757 of file launcher.c.

758{
759 /* Block concurrent access. */
760 LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
761
762 Assert(slot >= 0 && slot < max_logical_replication_workers);
764
766 {
767 LWLockRelease(LogicalRepWorkerLock);
769 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
770 errmsg("logical replication worker slot %d is empty, cannot attach",
771 slot)));
772 }
773
775 {
776 LWLockRelease(LogicalRepWorkerLock);
778 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
779 errmsg("logical replication worker slot %d is already used by "
780 "another worker, cannot attach", slot)));
781 }
782
785
786 LWLockRelease(LogicalRepWorkerLock);
787}
int errcode(int sqlerrcode)
Definition: elog.c:863
#define ERROR
Definition: elog.h:39
static void logicalrep_worker_onexit(int code, Datum arg)
Definition: launcher.c:897
PGPROC * MyProc
Definition: proc.c:67

References Assert(), before_shmem_exit(), ereport, errcode(), errmsg(), ERROR, LogicalRepWorker::in_use, logicalrep_worker_onexit(), LogicalRepCtx, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_logical_replication_workers, MyLogicalRepWorker, MyProc, LogicalRepWorker::proc, and LogicalRepCtxStruct::workers.

Referenced by ParallelApplyWorkerMain(), and SetupApplyOrSyncWorker().

◆ logicalrep_worker_cleanup()

static void logicalrep_worker_cleanup ( LogicalRepWorker worker)
static

◆ logicalrep_worker_detach()

static void logicalrep_worker_detach ( void  )
static

Definition at line 794 of file launcher.c.

795{
796 /* Stop the parallel apply workers. */
798 {
799 List *workers;
800 ListCell *lc;
801
802 /*
803 * Detach from the error_mq_handle for all parallel apply workers
804 * before terminating them. This prevents the leader apply worker from
805 * receiving the worker termination message and sending it to logs
806 * when the same is already done by the parallel worker.
807 */
809
810 LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
811
812 workers = logicalrep_workers_find(MyLogicalRepWorker->subid, true, false);
813 foreach(lc, workers)
814 {
816
819 }
820
821 LWLockRelease(LogicalRepWorkerLock);
822
823 list_free(workers);
824 }
825
826 /* Block concurrent access. */
827 LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
828
830
831 LWLockRelease(LogicalRepWorkerLock);
832}
void pa_detach_all_error_mq(void)
List * logicalrep_workers_find(Oid subid, bool only_running, bool acquire_lock)
Definition: launcher.c:293
static void logicalrep_worker_cleanup(LogicalRepWorker *worker)
Definition: launcher.c:838
void list_free(List *list)
Definition: list.c:1546
static bool am_leader_apply_worker(void)

References am_leader_apply_worker(), isParallelApplyWorker, lfirst, list_free(), logicalrep_worker_cleanup(), logicalrep_worker_stop_internal(), logicalrep_workers_find(), LW_EXCLUSIVE, LW_SHARED, LWLockAcquire(), LWLockRelease(), MyLogicalRepWorker, pa_detach_all_error_mq(), and LogicalRepWorker::subid.

Referenced by logicalrep_worker_onexit().

◆ logicalrep_worker_find()

LogicalRepWorker * logicalrep_worker_find ( LogicalRepWorkerType  wtype,
Oid  subid,
Oid  relid,
bool  only_running 
)

Definition at line 258 of file launcher.c.

260{
261 int i;
262 LogicalRepWorker *res = NULL;
263
264 /* relid must be valid only for table sync workers */
265 Assert((wtype == WORKERTYPE_TABLESYNC) == OidIsValid(relid));
266 Assert(LWLockHeldByMe(LogicalRepWorkerLock));
267
268 /* Search for an attached worker that matches the specified criteria. */
269 for (i = 0; i < max_logical_replication_workers; i++)
270 {
271 LogicalRepWorker *w = &LogicalRepCtx->workers[i];
272
273 /* Skip parallel apply workers. */
274 if (isParallelApplyWorker(w))
275 continue;
276
277 if (w->in_use && w->subid == subid && w->relid == relid &&
278 w->type == wtype && (!only_running || w->proc))
279 {
280 res = w;
281 break;
282 }
283 }
284
285 return res;
286}
#define OidIsValid(objectId)
Definition: c.h:777
@ WORKERTYPE_TABLESYNC

References Assert(), i, LogicalRepWorker::in_use, isParallelApplyWorker, LogicalRepCtx, LWLockHeldByMe(), max_logical_replication_workers, OidIsValid, LogicalRepWorker::proc, LogicalRepWorker::relid, LogicalRepWorker::subid, LogicalRepWorker::type, LogicalRepCtxStruct::workers, and WORKERTYPE_TABLESYNC.

Referenced by ApplyLauncherMain(), FindDeletedTupleInLocalRel(), logicalrep_reset_seqsync_start_time(), logicalrep_worker_stop(), logicalrep_worker_wakeup(), ProcessSequencesForSync(), ProcessSyncingTablesForApply(), wait_for_table_state_change(), and wait_for_worker_state_change().

◆ logicalrep_worker_launch()

bool logicalrep_worker_launch ( LogicalRepWorkerType  wtype,
Oid  dbid,
Oid  subid,
const char *  subname,
Oid  userid,
Oid  relid,
dsm_handle  subworker_dsm,
bool  retain_dead_tuples 
)

Definition at line 324 of file launcher.c.

328{
330 BackgroundWorkerHandle *bgw_handle;
331 uint16 generation;
332 int i;
333 int slot = 0;
334 LogicalRepWorker *worker = NULL;
335 int nsyncworkers;
336 int nparallelapplyworkers;
338 bool is_tablesync_worker = (wtype == WORKERTYPE_TABLESYNC);
339 bool is_sequencesync_worker = (wtype == WORKERTYPE_SEQUENCESYNC);
340 bool is_parallel_apply_worker = (wtype == WORKERTYPE_PARALLEL_APPLY);
341
342 /*----------
343 * Sanity checks:
344 * - must be valid worker type
345 * - tablesync workers are only ones to have relid
346 * - parallel apply worker is the only kind of subworker
347 * - The replication slot used in conflict detection is created when
348 * retain_dead_tuples is enabled
349 */
350 Assert(wtype != WORKERTYPE_UNKNOWN);
351 Assert(is_tablesync_worker == OidIsValid(relid));
352 Assert(is_parallel_apply_worker == (subworker_dsm != DSM_HANDLE_INVALID));
353 Assert(!retain_dead_tuples || MyReplicationSlot);
354
356 (errmsg_internal("starting logical replication worker for subscription \"%s\"",
357 subname)));
358
359 /* Report this after the initial starting message for consistency. */
362 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
363 errmsg("cannot start logical replication workers when \"max_active_replication_origins\" is 0")));
364
365 /*
366 * We need to do the modification of the shared memory under lock so that
367 * we have consistent view.
368 */
369 LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
370
371retry:
372 /* Find unused worker slot. */
373 for (i = 0; i < max_logical_replication_workers; i++)
374 {
376
377 if (!w->in_use)
378 {
379 worker = w;
380 slot = i;
381 break;
382 }
383 }
384
385 nsyncworkers = logicalrep_sync_worker_count(subid);
386
388
389 /*
390 * If we didn't find a free slot, try to do garbage collection. The
391 * reason we do this is because if some worker failed to start up and its
392 * parent has crashed while waiting, the in_use state was never cleared.
393 */
394 if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription)
395 {
396 bool did_cleanup = false;
397
398 for (i = 0; i < max_logical_replication_workers; i++)
399 {
401
402 /*
403 * If the worker was marked in use but didn't manage to attach in
404 * time, clean it up.
405 */
406 if (w->in_use && !w->proc &&
409 {
411 "logical replication worker for subscription %u took too long to start; canceled",
412 w->subid);
413
415 did_cleanup = true;
416 }
417 }
418
419 if (did_cleanup)
420 goto retry;
421 }
422
423 /*
424 * We don't allow to invoke more sync workers once we have reached the
425 * sync worker limit per subscription. So, just return silently as we
426 * might get here because of an otherwise harmless race condition.
427 */
428 if ((is_tablesync_worker || is_sequencesync_worker) &&
429 nsyncworkers >= max_sync_workers_per_subscription)
430 {
431 LWLockRelease(LogicalRepWorkerLock);
432 return false;
433 }
434
435 nparallelapplyworkers = logicalrep_pa_worker_count(subid);
436
437 /*
438 * Return false if the number of parallel apply workers reached the limit
439 * per subscription.
440 */
441 if (is_parallel_apply_worker &&
442 nparallelapplyworkers >= max_parallel_apply_workers_per_subscription)
443 {
444 LWLockRelease(LogicalRepWorkerLock);
445 return false;
446 }
447
448 /*
449 * However if there are no more free worker slots, inform user about it
450 * before exiting.
451 */
452 if (worker == NULL)
453 {
454 LWLockRelease(LogicalRepWorkerLock);
456 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
457 errmsg("out of logical replication worker slots"),
458 errhint("You might need to increase \"%s\".", "max_logical_replication_workers")));
459 return false;
460 }
461
462 /* Prepare the worker slot. */
463 worker->type = wtype;
464 worker->launch_time = now;
465 worker->in_use = true;
466 worker->generation++;
467 worker->proc = NULL;
468 worker->dbid = dbid;
469 worker->userid = userid;
470 worker->subid = subid;
471 worker->relid = relid;
472 worker->relstate = SUBREL_STATE_UNKNOWN;
474 worker->stream_fileset = NULL;
475 worker->leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid;
476 worker->parallel_apply = is_parallel_apply_worker;
477 worker->oldest_nonremovable_xid = retain_dead_tuples
480 worker->last_lsn = InvalidXLogRecPtr;
485 worker->last_seqsync_start_time = 0;
486
487 /* Before releasing lock, remember generation for future identification. */
488 generation = worker->generation;
489
490 LWLockRelease(LogicalRepWorkerLock);
491
492 /* Register the new dynamic worker. */
493 memset(&bgw, 0, sizeof(bgw));
497 snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
498
499 switch (worker->type)
500 {
501 case WORKERTYPE_APPLY:
502 snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
504 "logical replication apply worker for subscription %u",
505 subid);
506 snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication apply worker");
507 break;
508
510 snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ParallelApplyWorkerMain");
512 "logical replication parallel apply worker for subscription %u",
513 subid);
514 snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");
515
516 memcpy(bgw.bgw_extra, &subworker_dsm, sizeof(dsm_handle));
517 break;
518
520 snprintf(bgw.bgw_function_name, BGW_MAXLEN, "SequenceSyncWorkerMain");
522 "logical replication sequencesync worker for subscription %u",
523 subid);
524 snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication sequencesync worker");
525 break;
526
528 snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TableSyncWorkerMain");
530 "logical replication tablesync worker for subscription %u sync %u",
531 subid,
532 relid);
533 snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication tablesync worker");
534 break;
535
537 /* Should never happen. */
538 elog(ERROR, "unknown worker type");
539 }
540
543 bgw.bgw_main_arg = Int32GetDatum(slot);
544
545 if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
546 {
547 /* Failed to start worker, so clean up the worker slot. */
548 LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
549 Assert(generation == worker->generation);
551 LWLockRelease(LogicalRepWorkerLock);
552
554 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
555 errmsg("out of background worker slots"),
556 errhint("You might need to increase \"%s\".", "max_worker_processes")));
557 return false;
558 }
559
560 /* Now wait until it attaches. */
561 return WaitForReplicationWorkerAttach(worker, generation, bgw_handle);
562}
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1781
bool RegisterDynamicBackgroundWorker(BackgroundWorker *worker, BackgroundWorkerHandle **handle)
Definition: bgworker.c:1049
#define BGW_NEVER_RESTART
Definition: bgworker.h:85
#define TIMESTAMP_NOBEGIN(j)
Definition: timestamp.h:159
uint32 dsm_handle
Definition: dsm_impl.h:55
int errhint(const char *fmt,...)
Definition: elog.c:1330
#define WARNING
Definition: elog.h:36
#define elog(elevel,...)
Definition: elog.h:226
static int logicalrep_pa_worker_count(Oid subid)
Definition: launcher.c:951
int max_sync_workers_per_subscription
Definition: launcher.c:53
static bool WaitForReplicationWorkerAttach(LogicalRepWorker *worker, uint16 generation, BackgroundWorkerHandle *handle)
Definition: launcher.c:181
int logicalrep_sync_worker_count(Oid subid)
Definition: launcher.c:927
int max_parallel_apply_workers_per_subscription
Definition: launcher.c:54
int max_active_replication_origins
Definition: origin.c:104
NameData subname
static Datum Int32GetDatum(int32 X)
Definition: postgres.h:222
char bgw_extra[BGW_EXTRALEN]
Definition: bgworker.h:99
XLogRecPtr relstate_lsn
TimestampTz last_recv_time
TimestampTz launch_time
TimestampTz reply_time
FileSet * stream_fileset
XLogRecPtr reply_lsn
TimestampTz last_send_time
int wal_receiver_timeout
Definition: walreceiver.c:89
@ WORKERTYPE_SEQUENCESYNC
@ WORKERTYPE_PARALLEL_APPLY
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28

References Assert(), BackgroundWorker::bgw_extra, BackgroundWorker::bgw_flags, BackgroundWorker::bgw_function_name, BackgroundWorker::bgw_library_name, BackgroundWorker::bgw_main_arg, BGW_MAXLEN, BackgroundWorker::bgw_name, BGW_NEVER_RESTART, BackgroundWorker::bgw_notify_pid, BackgroundWorker::bgw_restart_time, BackgroundWorker::bgw_start_time, BackgroundWorker::bgw_type, BGWORKER_BACKEND_DATABASE_CONNECTION, BGWORKER_SHMEM_ACCESS, BgWorkerStart_RecoveryFinished, ReplicationSlot::data, LogicalRepWorker::dbid, DEBUG1, DSM_HANDLE_INVALID, elog, ereport, errcode(), errhint(), errmsg(), errmsg_internal(), ERROR, LogicalRepWorker::generation, GetCurrentTimestamp(), i, LogicalRepWorker::in_use, Int32GetDatum(), InvalidPid, InvalidTransactionId, InvalidXLogRecPtr, LogicalRepWorker::last_lsn, LogicalRepWorker::last_recv_time, LogicalRepWorker::last_send_time, LogicalRepWorker::last_seqsync_start_time, LogicalRepWorker::launch_time, LogicalRepWorker::leader_pid, logicalrep_pa_worker_count(), logicalrep_sync_worker_count(), logicalrep_worker_cleanup(), LogicalRepCtx, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_active_replication_origins, max_logical_replication_workers, max_parallel_apply_workers_per_subscription, max_sync_workers_per_subscription, MAXPGPATH, MyProcPid, MyReplicationSlot, now(), OidIsValid, LogicalRepWorker::oldest_nonremovable_xid, LogicalRepWorker::parallel_apply, LogicalRepWorker::proc, RegisterDynamicBackgroundWorker(), LogicalRepWorker::relid, LogicalRepWorker::relstate, LogicalRepWorker::relstate_lsn, LogicalRepWorker::reply_lsn, LogicalRepWorker::reply_time, snprintf, LogicalRepWorker::stream_fileset, LogicalRepWorker::subid, subname, TIMESTAMP_NOBEGIN, TimestampDifferenceExceeds(), LogicalRepWorker::type, LogicalRepWorker::userid, WaitForReplicationWorkerAttach(), wal_receiver_timeout, WARNING, LogicalRepCtxStruct::workers, WORKERTYPE_APPLY, WORKERTYPE_PARALLEL_APPLY, WORKERTYPE_SEQUENCESYNC, WORKERTYPE_TABLESYNC, WORKERTYPE_UNKNOWN, and ReplicationSlotPersistentData::xmin.

Referenced by ApplyLauncherMain(), launch_sync_worker(), and pa_launch_parallel_worker().
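
An illustrative call only, based on the signature documented above (the wrapper is hypothetical; the real call sites are listed under "Referenced by"): starting a tablesync worker for one relation of the current subscription, passing DSM_HANDLE_INVALID because only parallel apply workers carry a DSM segment.

    #include "postgres.h"
    #include "replication/worker_internal.h"
    #include "storage/dsm_impl.h"

    /* Hypothetical helper; error handling and retry policy are the caller's job. */
    static void
    start_tablesync_worker_for(Oid relid)
    {
        if (!logicalrep_worker_launch(WORKERTYPE_TABLESYNC,
                                      MySubscription->dbid,
                                      MySubscription->oid,
                                      MySubscription->name,
                                      MySubscription->owner,
                                      relid,
                                      DSM_HANDLE_INVALID,   /* no parallel-apply DSM */
                                      false))               /* retain_dead_tuples */
            elog(LOG, "could not start tablesync worker for relation %u", relid);
    }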

◆ logicalrep_worker_onexit()

static void logicalrep_worker_onexit ( int  code,
Datum  arg 
)
static

Definition at line 897 of file launcher.c.

898{
899 /* Disconnect gracefully from the remote side. */
900 if (LogRepWorkerWalRcvConn != NULL)
901 walrcv_disconnect(LogRepWorkerWalRcvConn);
902
903 logicalrep_worker_detach();
904
905 /* Cleanup fileset used for streaming transactions. */
906 if (MyLogicalRepWorker->stream_fileset != NULL)
907 FileSetDeleteAll(MyLogicalRepWorker->stream_fileset);
908
909 /*
910 * Session level locks may be acquired outside of a transaction in
911 * parallel apply mode and will not be released when the worker
912 * terminates, so manually release all locks before the worker exits.
913 *
914 * The locks will be acquired once the worker is initialized.
915 */
916 if (!InitializingApplyWorker)
917 LockReleaseAll(DEFAULT_LOCKMETHOD, true);
918
919 ApplyLauncherWakeup();
920}
bool InitializingApplyWorker
Definition: worker.c:499
WalReceiverConn * LogRepWorkerWalRcvConn
Definition: worker.c:477
void FileSetDeleteAll(FileSet *fileset)
Definition: fileset.c:150
static void logicalrep_worker_detach(void)
Definition: launcher.c:794
void LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks)
Definition: lock.c:2307
#define DEFAULT_LOCKMETHOD
Definition: lock.h:127
#define walrcv_disconnect(conn)
Definition: walreceiver.h:467

References ApplyLauncherWakeup(), DEFAULT_LOCKMETHOD, FileSetDeleteAll(), InitializingApplyWorker, LockReleaseAll(), logicalrep_worker_detach(), LogRepWorkerWalRcvConn, MyLogicalRepWorker, LogicalRepWorker::stream_fileset, and walrcv_disconnect.

Referenced by logicalrep_worker_attach().

◆ logicalrep_worker_stop()

void logicalrep_worker_stop ( LogicalRepWorkerType  wtype,
Oid  subid,
Oid  relid 
)

Definition at line 652 of file launcher.c.

653{
654 LogicalRepWorker *worker;
655
656 /* relid must be valid only for table sync workers */
657 Assert((wtype == WORKERTYPE_TABLESYNC) == OidIsValid(relid));
658
659 LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
660
661 worker = logicalrep_worker_find(wtype, subid, relid, false);
662
663 if (worker)
664 {
666 logicalrep_worker_stop_internal(worker, SIGTERM);
667 }
668
669 LWLockRelease(LogicalRepWorkerLock);
670}

References Assert(), isParallelApplyWorker, logicalrep_worker_find(), logicalrep_worker_stop_internal(), LW_SHARED, LWLockAcquire(), LWLockRelease(), OidIsValid, and WORKERTYPE_TABLESYNC.

Referenced by AlterSubscription_refresh(), and DropSubscription().

◆ logicalrep_worker_stop_internal()

static void logicalrep_worker_stop_internal ( LogicalRepWorker worker,
int  signo 
)
static

Definition at line 569 of file launcher.c.

570{
571 uint16 generation;
572
573 Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_SHARED));
574
575 /*
576 * Remember which generation was our worker so we can check if what we see
577 * is still the same one.
578 */
579 generation = worker->generation;
580
581 /*
582 * If we found a worker but it does not have proc set then it is still
583 * starting up; wait for it to finish starting and then kill it.
584 */
585 while (worker->in_use && !worker->proc)
586 {
587 int rc;
588
589 LWLockRelease(LogicalRepWorkerLock);
590
591 /* Wait a bit --- we don't expect to have to wait long. */
592 rc = WaitLatch(MyLatch,
593 WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
594 10L, WAIT_EVENT_BGWORKER_STARTUP);
595
596 if (rc & WL_LATCH_SET)
597 {
598 ResetLatch(MyLatch);
599 CHECK_FOR_INTERRUPTS();
600 }
601
602 /* Recheck worker status. */
603 LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
604
605 /*
606 * Check whether the worker slot is no longer used, which would mean
607 * that the worker has exited, or whether the worker generation is
608 * different, meaning that a different worker has taken the slot.
609 */
610 if (!worker->in_use || worker->generation != generation)
611 return;
612
613 /* Worker has assigned proc, so it has started. */
614 if (worker->proc)
615 break;
616 }
617
618 /* Now terminate the worker ... */
619 kill(worker->proc->pid, signo);
620
621 /* ... and wait for it to die. */
622 for (;;)
623 {
624 int rc;
625
626 /* is it gone? */
627 if (!worker->proc || worker->generation != generation)
628 break;
629
630 LWLockRelease(LogicalRepWorkerLock);
631
632 /* Wait a bit --- we don't expect to have to wait long. */
633 rc = WaitLatch(MyLatch,
634 WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
635 10L, WAIT_EVENT_BGWORKER_SHUTDOWN);
636
637 if (rc & WL_LATCH_SET)
638 {
639 ResetLatch(MyLatch);
640 CHECK_FOR_INTERRUPTS();
641 }
642
643 LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
644 }
645}

References Assert(), CHECK_FOR_INTERRUPTS, LogicalRepWorker::generation, LogicalRepWorker::in_use, kill, LW_SHARED, LWLockAcquire(), LWLockHeldByMeInMode(), LWLockRelease(), MyLatch, PGPROC::pid, LogicalRepWorker::proc, ResetLatch(), WaitLatch(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, and WL_TIMEOUT.

Referenced by logicalrep_pa_worker_stop(), logicalrep_worker_detach(), and logicalrep_worker_stop().

◆ logicalrep_worker_wakeup()

void logicalrep_worker_wakeup ( LogicalRepWorkerType  wtype,
Oid  subid,
Oid  relid 
)

Definition at line 723 of file launcher.c.

724{
725 LogicalRepWorker *worker;
726
727 /* relid must be valid only for table sync workers */
728 Assert((wtype == WORKERTYPE_TABLESYNC) == OidIsValid(relid));
729
730 LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
731
732 worker = logicalrep_worker_find(wtype, subid, relid, true);
733
734 if (worker)
735 logicalrep_worker_wakeup_ptr(worker);
736
737 LWLockRelease(LogicalRepWorkerLock);
738}
void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
Definition: launcher.c:746

References Assert(), logicalrep_worker_find(), logicalrep_worker_wakeup_ptr(), LW_SHARED, LWLockAcquire(), LWLockRelease(), OidIsValid, and WORKERTYPE_TABLESYNC.

Referenced by apply_handle_stream_start(), and FinishSyncWorker().

◆ logicalrep_worker_wakeup_ptr()

void logicalrep_worker_wakeup_ptr ( LogicalRepWorker worker)

Definition at line 746 of file launcher.c.

747{
748 Assert(LWLockHeldByMe(LogicalRepWorkerLock));
749
750 SetLatch(&worker->proc->procLatch);
751}
void SetLatch(Latch *latch)
Definition: latch.c:290
Latch procLatch
Definition: proc.h:186

References Assert(), LWLockHeldByMe(), LogicalRepWorker::proc, PGPROC::procLatch, and SetLatch().

Referenced by AtEOXact_LogicalRepWorkers(), logicalrep_worker_wakeup(), ProcessSyncingTablesForApply(), and wait_for_worker_state_change().
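
On the receiving side, the worker being woken is normally blocked in WaitLatch() on its own process latch; logicalrep_worker_wakeup_ptr() setting that latch makes the wait return immediately with WL_LATCH_SET. A sketch of such a worker-side loop, where MY_NAPTIME_MS, the wait event, and the surrounding function are illustrative placeholders rather than launcher.c code:

#include "postgres.h"
#include "miscadmin.h"
#include "storage/latch.h"
#include "utils/wait_event.h"

#define MY_NAPTIME_MS 1000L		/* placeholder sleep time */

static void
my_worker_main_loop(void)
{
	for (;;)
	{
		int			rc;

		/* ... do one unit of work ... */

		/* Sleep until woken via SetLatch() or until the naptime elapses. */
		rc = WaitLatch(MyLatch,
					   WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
					   MY_NAPTIME_MS, PG_WAIT_EXTENSION);

		if (rc & WL_LATCH_SET)
		{
			ResetLatch(MyLatch);
			CHECK_FOR_INTERRUPTS();
		}
	}
}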

◆ logicalrep_workers_find()

List * logicalrep_workers_find ( Oid  subid,
bool  only_running,
bool  acquire_lock 
)

Definition at line 293 of file launcher.c.

294{
295 int i;
296 List *res = NIL;
297
298 if (acquire_lock)
299 LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
300
301 Assert(LWLockHeldByMe(LogicalRepWorkerLock));
302
303 /* Search for attached worker for a given subscription id. */
304 for (i = 0; i < max_logical_replication_workers; i++)
305 {
306 LogicalRepWorker *w = &LogicalRepCtx->workers[i];
307
308 if (w->in_use && w->subid == subid && (!only_running || w->proc))
309 res = lappend(res, w);
310 }
311
312 if (acquire_lock)
313 LWLockRelease(LogicalRepWorkerLock);
314
315 return res;
316}

References Assert(), i, LogicalRepWorker::in_use, lappend(), LogicalRepCtx, LW_SHARED, LWLockAcquire(), LWLockHeldByMe(), LWLockRelease(), max_logical_replication_workers, NIL, LogicalRepWorker::proc, LogicalRepWorker::subid, and LogicalRepCtxStruct::workers.

Referenced by AlterSubscription(), AtEOXact_LogicalRepWorkers(), DropSubscription(), and logicalrep_worker_detach().
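
A hedged sketch of the caller pattern used by the functions listed above: acquire LogicalRepWorkerLock once, collect the subscription's workers with acquire_lock = false, and act on each list entry before releasing the lock. The helper name and the chosen action (waking each worker) are illustrative:

#include "postgres.h"
#include "nodes/pg_list.h"
#include "replication/worker_internal.h"
#include "storage/lwlock.h"

static void
wake_all_workers_for_subscription(Oid subid)
{
	List	   *workers;
	ListCell   *lc;

	LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);

	/* acquire_lock = false: we already hold LogicalRepWorkerLock */
	workers = logicalrep_workers_find(subid, true, false);

	foreach(lc, workers)
	{
		LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);

		logicalrep_worker_wakeup_ptr(w);
	}

	LWLockRelease(LogicalRepWorkerLock);
	list_free(workers);
}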

◆ pg_stat_get_subscription()

Datum pg_stat_get_subscription ( PG_FUNCTION_ARGS  )

Definition at line 1622 of file launcher.c.

1623{
1624#define PG_STAT_GET_SUBSCRIPTION_COLS 10
1625 Oid subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
1626 int i;
1627 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1628
1629 InitMaterializedSRF(fcinfo, 0);
1630
1631 /* Make sure we get consistent view of the workers. */
1632 LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
1633
1634 for (i = 0; i < max_logical_replication_workers; i++)
1635 {
1636 /* for each row */
1637 Datum values[PG_STAT_GET_SUBSCRIPTION_COLS] = {0};
1638 bool nulls[PG_STAT_GET_SUBSCRIPTION_COLS] = {0};
1639 int worker_pid;
1640 LogicalRepWorker worker;
1641
1642 memcpy(&worker, &LogicalRepCtx->workers[i],
1643 sizeof(LogicalRepWorker));
1644 if (!worker.proc || !IsBackendPid(worker.proc->pid))
1645 continue;
1646
1647 if (OidIsValid(subid) && worker.subid != subid)
1648 continue;
1649
1650 worker_pid = worker.proc->pid;
1651
1652 values[0] = ObjectIdGetDatum(worker.subid);
1653 if (isTableSyncWorker(&worker))
1654 values[1] = ObjectIdGetDatum(worker.relid);
1655 else
1656 nulls[1] = true;
1657 values[2] = Int32GetDatum(worker_pid);
1658
1659 if (isParallelApplyWorker(&worker))
1660 values[3] = Int32GetDatum(worker.leader_pid);
1661 else
1662 nulls[3] = true;
1663
1664 if (!XLogRecPtrIsValid(worker.last_lsn))
1665 nulls[4] = true;
1666 else
1667 values[4] = LSNGetDatum(worker.last_lsn);
1668 if (worker.last_send_time == 0)
1669 nulls[5] = true;
1670 else
1671 values[5] = TimestampTzGetDatum(worker.last_send_time);
1672 if (worker.last_recv_time == 0)
1673 nulls[6] = true;
1674 else
1675 values[6] = TimestampTzGetDatum(worker.last_recv_time);
1676 if (!XLogRecPtrIsValid(worker.reply_lsn))
1677 nulls[7] = true;
1678 else
1679 values[7] = LSNGetDatum(worker.reply_lsn);
1680 if (worker.reply_time == 0)
1681 nulls[8] = true;
1682 else
1683 values[8] = TimestampTzGetDatum(worker.reply_time);
1684
1685 switch (worker.type)
1686 {
1687 case WORKERTYPE_APPLY:
1688 values[9] = CStringGetTextDatum("apply");
1689 break;
1690 case WORKERTYPE_PARALLEL_APPLY:
1691 values[9] = CStringGetTextDatum("parallel apply");
1692 break;
1693 case WORKERTYPE_SEQUENCESYNC:
1694 values[9] = CStringGetTextDatum("sequence synchronization");
1695 break;
1696 case WORKERTYPE_TABLESYNC:
1697 values[9] = CStringGetTextDatum("table synchronization");
1698 break;
1699 case WORKERTYPE_UNKNOWN:
1700 /* Should never happen. */
1701 elog(ERROR, "unknown worker type");
1702 }
1703
1704 tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
1705 values, nulls);
1706
1707 /*
1708 * If only a single subscription was requested, and we found it,
1709 * break.
1710 */
1711 if (OidIsValid(subid))
1712 break;
1713 }
1714
1715 LWLockRelease(LogicalRepWorkerLock);
1716
1717 return (Datum) 0;
1718}
static Datum values[MAXATTR]
Definition: bootstrap.c:153
#define CStringGetTextDatum(s)
Definition: builtins.h:97
#define PG_GETARG_OID(n)
Definition: fmgr.h:275
#define PG_ARGISNULL(n)
Definition: fmgr.h:209
void InitMaterializedSRF(FunctionCallInfo fcinfo, bits32 flags)
Definition: funcapi.c:76
#define PG_STAT_GET_SUBSCRIPTION_COLS
static Datum LSNGetDatum(XLogRecPtr X)
Definition: pg_lsn.h:31
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:262
unsigned int Oid
Definition: postgres_ext.h:32
bool IsBackendPid(int pid)
Definition: procarray.c:3253
TupleDesc setDesc
Definition: execnodes.h:364
Tuplestorestate * setResult
Definition: execnodes.h:363
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, const Datum *values, const bool *isnull)
Definition: tuplestore.c:784
static Datum TimestampTzGetDatum(TimestampTz X)
Definition: timestamp.h:52
#define XLogRecPtrIsValid(r)
Definition: xlogdefs.h:29

References CStringGetTextDatum, elog, ERROR, i, InitMaterializedSRF(), Int32GetDatum(), InvalidOid, IsBackendPid(), isParallelApplyWorker, isTableSyncWorker, LogicalRepWorker::last_lsn, LogicalRepWorker::last_recv_time, LogicalRepWorker::last_send_time, LogicalRepWorker::leader_pid, LogicalRepCtx, LSNGetDatum(), LW_SHARED, LWLockAcquire(), LWLockRelease(), max_logical_replication_workers, ObjectIdGetDatum(), OidIsValid, PG_ARGISNULL, PG_GETARG_OID, PG_STAT_GET_SUBSCRIPTION_COLS, PGPROC::pid, LogicalRepWorker::proc, LogicalRepWorker::relid, LogicalRepWorker::reply_lsn, LogicalRepWorker::reply_time, ReturnSetInfo::setDesc, ReturnSetInfo::setResult, LogicalRepWorker::subid, TimestampTzGetDatum(), tuplestore_putvalues(), LogicalRepWorker::type, values, LogicalRepCtxStruct::workers, WORKERTYPE_APPLY, WORKERTYPE_PARALLEL_APPLY, WORKERTYPE_SEQUENCESYNC, WORKERTYPE_TABLESYNC, WORKERTYPE_UNKNOWN, and XLogRecPtrIsValid.
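
pg_stat_get_subscription() follows the standard skeleton for a materialized set-returning function: InitMaterializedSRF() builds the tuplestore and tuple descriptor in rsinfo, and each row is appended with tuplestore_putvalues(). A stripped-down sketch of that skeleton, assuming a hypothetical one-column text-returning function (my_list_workers is illustrative, not part of launcher.c):

#include "postgres.h"
#include "fmgr.h"
#include "funcapi.h"
#include "utils/builtins.h"
#include "utils/tuplestore.h"

PG_FUNCTION_INFO_V1(my_list_workers);

Datum
my_list_workers(PG_FUNCTION_ARGS)
{
	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
	Datum		values[1];
	bool		nulls[1] = {0};

	/* Set up rsinfo->setResult / rsinfo->setDesc from the declared result type. */
	InitMaterializedSRF(fcinfo, 0);

	/* Emit one example row; a real function loops over shared state here. */
	values[0] = CStringGetTextDatum("apply");
	tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);

	return (Datum) 0;
}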

◆ update_conflict_slot_xmin()

static void update_conflict_slot_xmin ( TransactionId  new_xmin)
static

Definition at line 1499 of file launcher.c.

1500{
1501 Assert(MyReplicationSlot);
1502 Assert(!TransactionIdIsValid(new_xmin) ||
1503 TransactionIdPrecedesOrEquals(MyReplicationSlot->data.xmin, new_xmin));
1504
1505 /* Return if the xmin value of the slot cannot be updated */
1506 if (TransactionIdEquals(MyReplicationSlot->data.xmin, new_xmin))
1507 return;
1508
1509 SpinLockAcquire(&MyReplicationSlot->mutex);
1510 MyReplicationSlot->effective_xmin = new_xmin;
1511 MyReplicationSlot->data.xmin = new_xmin;
1512 SpinLockRelease(&MyReplicationSlot->mutex);
1513
1514 elog(DEBUG1, "updated xmin: %u", MyReplicationSlot->data.xmin);
1515
1516 ReplicationSlotMarkDirty();
1517 ReplicationSlotsComputeRequiredXmin(false);
1518
1519 /*
1520 * Like PhysicalConfirmReceivedLocation(), do not save slot information
1521 * each time. This is acceptable because all concurrent transactions on
1522 * the publisher that require the data preceding the slot's xmin should
1523 * have already been applied and flushed on the subscriber before the xmin
1524 * is advanced. So, even if the slot's xmin regresses after a restart, it
1525 * will be advanced again in the next cycle. Therefore, no data required
1526 * for conflict detection will be prematurely removed.
1527 */
1528 return;
1529}
static bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
Definition: transam.h:282
#define TransactionIdEquals(id1, id2)
Definition: transam.h:43

References Assert(), ReplicationSlot::data, DEBUG1, ReplicationSlot::effective_xmin, elog, ReplicationSlot::mutex, MyReplicationSlot, ReplicationSlotMarkDirty(), ReplicationSlotsComputeRequiredXmin(), SpinLockAcquire, SpinLockRelease, TransactionIdEquals, TransactionIdIsValid, TransactionIdPrecedesOrEquals(), and ReplicationSlotPersistentData::xmin.

Referenced by ApplyLauncherMain().

◆ WaitForReplicationWorkerAttach()

static bool WaitForReplicationWorkerAttach ( LogicalRepWorker worker,
uint16  generation,
BackgroundWorkerHandle handle 
)
static

Definition at line 181 of file launcher.c.

184{
185 bool result = false;
186 bool dropped_latch = false;
187
188 for (;;)
189 {
190 BgwHandleStatus status;
191 pid_t pid;
192 int rc;
193
194 CHECK_FOR_INTERRUPTS();
195
196 LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
197
198 /* Worker either died or has started. Return false if died. */
199 if (!worker->in_use || worker->proc)
200 {
201 result = worker->in_use;
202 LWLockRelease(LogicalRepWorkerLock);
203 break;
204 }
205
206 LWLockRelease(LogicalRepWorkerLock);
207
208 /* Check if worker has died before attaching, and clean up after it. */
209 status = GetBackgroundWorkerPid(handle, &pid);
210
211 if (status == BGWH_STOPPED)
212 {
213 LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
214 /* Ensure that this was indeed the worker we waited for. */
215 if (generation == worker->generation)
216 logicalrep_worker_cleanup(worker);
217 LWLockRelease(LogicalRepWorkerLock);
218 break; /* result is already false */
219 }
220
221 /*
222 * We need timeout because we generally don't get notified via latch
223 * about the worker attach. But we don't expect to have to wait long.
224 */
225 rc = WaitLatch(MyLatch,
226 WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
227 10L, WAIT_EVENT_BGWORKER_STARTUP);
228
229 if (rc & WL_LATCH_SET)
230 {
231 ResetLatch(MyLatch);
232 CHECK_FOR_INTERRUPTS();
233 dropped_latch = true;
234 }
235 }
236
237 /*
238 * If we had to clear a latch event in order to wait, be sure to restore
239 * it before exiting. Otherwise caller may miss events.
240 */
241 if (dropped_latch)
242 SetLatch(MyLatch);
243
244 return result;
245}
BgwHandleStatus GetBackgroundWorkerPid(BackgroundWorkerHandle *handle, pid_t *pidp)
Definition: bgworker.c:1161
BgwHandleStatus
Definition: bgworker.h:104
@ BGWH_STOPPED
Definition: bgworker.h:107

References BGWH_STOPPED, CHECK_FOR_INTERRUPTS, LogicalRepWorker::generation, GetBackgroundWorkerPid(), LogicalRepWorker::in_use, logicalrep_worker_cleanup(), LW_EXCLUSIVE, LW_SHARED, LWLockAcquire(), LWLockRelease(), MyLatch, LogicalRepWorker::proc, ResetLatch(), SetLatch(), WaitLatch(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, and WL_TIMEOUT.

Referenced by logicalrep_worker_launch().
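
WaitForReplicationWorkerAttach() is the second half of the launch protocol in logicalrep_worker_launch(): first a dynamic background worker is registered, then the launcher polls the shared worker slot and GetBackgroundWorkerPid() until the new process either attaches or dies. A hedged sketch of the registration half only, with the worker name, main-function choice, and slot argument being illustrative rather than the exact launcher.c code:

#include "postgres.h"
#include "miscadmin.h"
#include "postmaster/bgworker.h"

static bool
register_apply_worker(Oid subid, int slot)
{
	BackgroundWorker bgw = {0};
	BackgroundWorkerHandle *handle;

	bgw.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
	bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
	bgw.bgw_restart_time = BGW_NEVER_RESTART;
	snprintf(bgw.bgw_library_name, sizeof(bgw.bgw_library_name), "postgres");
	snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
	snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication apply worker");
	snprintf(bgw.bgw_name, BGW_MAXLEN,
			 "logical replication apply worker for subscription %u", subid);
	bgw.bgw_main_arg = Int32GetDatum(slot);	/* index of the shared worker slot */
	bgw.bgw_notify_pid = MyProcPid;

	/* Fails if no background worker slots are free. */
	return RegisterDynamicBackgroundWorker(&bgw, &handle);
}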

Variable Documentation

◆ dsh_params

const dshash_parameters dsh_params
static
Initial value:
= {
sizeof(Oid),
sizeof(LauncherLastStartTimesEntry),
dshash_memcmp,
dshash_memhash,
dshash_memcpy,
LWTRANCHE_LAUNCHER_HASH
}
void dshash_memcpy(void *dest, const void *src, size_t size, void *arg)
Definition: dshash.c:592
dshash_hash dshash_memhash(const void *v, size_t size, void *arg)
Definition: dshash.c:583
int dshash_memcmp(const void *a, const void *b, size_t size, void *arg)
Definition: dshash.c:574

Definition at line 81 of file launcher.c.

Referenced by logicalrep_launcher_attach_dshmem().
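
These parameters describe the dshash table, keyed by subscription OID, in which the launcher remembers each subscription's last worker start time. A hedged sketch of how a table with equivalent parameters would be created in a DSA area, roughly what logicalrep_launcher_attach_dshmem() does on first use (the entry struct and helper names are illustrative):

#include "postgres.h"
#include "lib/dshash.h"
#include "storage/lwlock.h"
#include "utils/dsa.h"
#include "utils/timestamp.h"

typedef struct MyLastStartEntry
{
	Oid			subid;			/* hash key */
	TimestampTz last_start_time;
} MyLastStartEntry;

static const dshash_parameters my_dsh_params = {
	sizeof(Oid),					/* key size */
	sizeof(MyLastStartEntry),		/* entry size */
	dshash_memcmp,					/* key comparison */
	dshash_memhash,					/* key hashing */
	dshash_memcpy,					/* key copy */
	LWTRANCHE_LAUNCHER_HASH			/* LWLock tranche used for the table */
};

static dshash_table *
create_my_table(dsa_area *area)
{
	/* The creating process builds the table; other backends dshash_attach() to it. */
	return dshash_create(area, &my_dsh_params, NULL);
}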

◆ last_start_times

◆ last_start_times_dsa

dsa_area* last_start_times_dsa = NULL
static

Definition at line 90 of file launcher.c.

Referenced by logicalrep_launcher_attach_dshmem().

◆ LogicalRepCtx

◆ max_logical_replication_workers

◆ max_parallel_apply_workers_per_subscription

int max_parallel_apply_workers_per_subscription = 2

Definition at line 54 of file launcher.c.

Referenced by logicalrep_worker_launch(), and pa_free_worker().

◆ max_sync_workers_per_subscription

int max_sync_workers_per_subscription = 2

Definition at line 53 of file launcher.c.

Referenced by launch_sync_worker(), and logicalrep_worker_launch().

◆ MyLogicalRepWorker

LogicalRepWorker* MyLogicalRepWorker = NULL

◆ on_commit_launcher_wakeup

bool on_commit_launcher_wakeup = false
static

Definition at line 93 of file launcher.c.

Referenced by ApplyLauncherWakeupAtCommit(), and AtEOXact_ApplyLauncher().