PostgreSQL Source Code git master
All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
logicallauncher.h File Reference
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Functions

void ApplyLauncherRegister (void)
 
void ApplyLauncherMain (Datum main_arg)
 
Size ApplyLauncherShmemSize (void)
 
void ApplyLauncherShmemInit (void)
 
void ApplyLauncherForgetWorkerStartTime (Oid subid)
 
void ApplyLauncherWakeupAtCommit (void)
 
void ApplyLauncherWakeup (void)
 
void AtEOXact_ApplyLauncher (bool isCommit)
 
void CreateConflictDetectionSlot (void)
 
bool IsLogicalLauncher (void)
 
pid_t GetLeaderApplyWorkerPid (pid_t pid)
 

Variables

PGDLLIMPORT int max_logical_replication_workers
 
PGDLLIMPORT int max_sync_workers_per_subscription
 
PGDLLIMPORT int max_parallel_apply_workers_per_subscription
 

Function Documentation

◆ ApplyLauncherForgetWorkerStartTime()

void ApplyLauncherForgetWorkerStartTime ( Oid  subid)

Definition at line 1101 of file launcher.c.

1102{
1104
1105 (void) dshash_delete_key(last_start_times, &subid);
1106}
bool dshash_delete_key(dshash_table *hash_table, const void *key)
Definition: dshash.c:503
static dshash_table * last_start_times
Definition: launcher.c:91
static void logicalrep_launcher_attach_dshmem(void)
Definition: launcher.c:1015

References dshash_delete_key(), last_start_times, and logicalrep_launcher_attach_dshmem().

Referenced by apply_worker_exit(), DisableSubscriptionAndExit(), DropSubscription(), InitializeLogRepWorker(), maybe_reread_subscription(), and process_syncing_tables_for_apply().

◆ ApplyLauncherMain()

void ApplyLauncherMain ( Datum  main_arg)

Definition at line 1151 of file launcher.c.

1152{
1154 (errmsg_internal("logical replication launcher started")));
1155
1157
1160
1161 /* Establish signal handlers. */
1163 pqsignal(SIGTERM, die);
1165
1166 /*
1167 * Establish connection to nailed catalogs (we only ever access
1168 * pg_subscription).
1169 */
1171
1172 /*
1173 * Acquire the conflict detection slot at startup to ensure it can be
1174 * dropped if no longer needed after a restart.
1175 */
1177
1178 /* Enter main loop */
1179 for (;;)
1180 {
1181 int rc;
1182 List *sublist;
1183 ListCell *lc;
1184 MemoryContext subctx;
1185 MemoryContext oldctx;
1186 long wait_time = DEFAULT_NAPTIME_PER_CYCLE;
1187 bool can_update_xmin = true;
1188 bool retain_dead_tuples = false;
1190
1192
1193 /* Use temporary context to avoid leaking memory across cycles. */
1195 "Logical Replication Launcher sublist",
1197 oldctx = MemoryContextSwitchTo(subctx);
1198
1199 /*
1200 * Start any missing workers for enabled subscriptions.
1201 *
1202 * Also, during the iteration through all subscriptions, we compute
1203 * the minimum XID required to protect deleted tuples for conflict
1204 * detection if one of the subscriptions enables the retain_dead_tuples
1205 * option.
1206 */
1207 sublist = get_subscription_list();
1208 foreach(lc, sublist)
1209 {
1210 Subscription *sub = (Subscription *) lfirst(lc);
1212 TimestampTz last_start;
1214 long elapsed;
1215
1216 if (sub->retaindeadtuples)
1217 {
1218 retain_dead_tuples = true;
1219
1220 /*
1221 * Create a replication slot to retain information necessary
1222 * for conflict detection such as dead tuples, commit
1223 * timestamps, and origins.
1224 *
1225 * The slot is created before starting the apply worker to
1226 * prevent it from unnecessarily maintaining its
1227 * oldest_nonremovable_xid.
1228 *
1229 * The slot is created even for a disabled subscription to
1230 * ensure that conflict-related information is available when
1231 * applying remote changes that occurred before the
1232 * subscription was enabled.
1233 */
1235
1236 if (sub->retentionactive)
1237 {
1238 /*
1239 * Can't advance xmin of the slot unless all the
1240 * subscriptions actively retaining dead tuples are
1241 * enabled. This is required to ensure that we don't
1242 * advance the xmin of CONFLICT_DETECTION_SLOT if one of
1243 * the subscriptions is not enabled. Otherwise, we won't
1244 * be able to detect conflicts reliably for such a
1245 * subscription even though it has set the
1246 * retain_dead_tuples option.
1247 */
1248 can_update_xmin &= sub->enabled;
1249
1250 /*
1251 * Initialize the slot once the subscription activates
1252 * retention.
1253 */
1256 }
1257 }
1258
1259 if (!sub->enabled)
1260 continue;
1261
1262 LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
1263 w = logicalrep_worker_find(sub->oid, InvalidOid, false);
1264
1265 if (w != NULL)
1266 {
1267 /*
1268 * Compute the minimum xmin required to protect dead tuples
1269 * required for conflict detection among all running apply
1270 * workers. This computation is performed while holding
1271 * LogicalRepWorkerLock to prevent accessing invalid worker
1272 * data, in scenarios where a worker might exit and reset its
1273 * state concurrently.
1274 */
1275 if (sub->retaindeadtuples &&
1276 sub->retentionactive &&
1277 can_update_xmin)
1279
1280 LWLockRelease(LogicalRepWorkerLock);
1281
1282 /* worker is running already */
1283 continue;
1284 }
1285
1286 LWLockRelease(LogicalRepWorkerLock);
1287
1288 /*
1289 * Can't advance xmin of the slot unless all the workers
1290 * corresponding to subscriptions actively retaining dead tuples
1291 * are running, disabling the further computation of the minimum
1292 * nonremovable xid.
1293 */
1294 if (sub->retaindeadtuples && sub->retentionactive)
1295 can_update_xmin = false;
1296
1297 /*
1298 * If the worker is eligible to start now, launch it. Otherwise,
1299 * adjust wait_time so that we'll wake up as soon as it can be
1300 * started.
1301 *
1302 * Each subscription's apply worker can only be restarted once per
1303 * wal_retrieve_retry_interval, so that errors do not cause us to
1304 * repeatedly restart the worker as fast as possible. In cases
1305 * where a restart is expected (e.g., subscription parameter
1306 * changes), another process should remove the last-start entry
1307 * for the subscription so that the worker can be restarted
1308 * without waiting for wal_retrieve_retry_interval to elapse.
1309 */
1310 last_start = ApplyLauncherGetWorkerStartTime(sub->oid);
1312 if (last_start == 0 ||
1314 {
1317 sub->dbid, sub->oid, sub->name,
1318 sub->owner, InvalidOid,
1320 sub->retaindeadtuples &&
1321 sub->retentionactive))
1322 {
1323 /*
1324 * We get here either if we failed to launch a worker
1325 * (perhaps for resource-exhaustion reasons) or if we
1326 * launched one but it immediately quit. Either way, it
1327 * seems appropriate to try again after
1328 * wal_retrieve_retry_interval.
1329 */
1330 wait_time = Min(wait_time,
1332 }
1333 }
1334 else
1335 {
1336 wait_time = Min(wait_time,
1337 wal_retrieve_retry_interval - elapsed);
1338 }
1339 }
1340
1341 /*
1342 * Drop the CONFLICT_DETECTION_SLOT slot if there is no subscription
1343 * that requires us to retain dead tuples. Otherwise, if required,
1344 * advance the slot's xmin to protect dead tuples required for the
1345 * conflict detection.
1346 *
1347 * Additionally, if all apply workers for subscriptions with
1348 * retain_dead_tuples enabled have requested to stop retention, the
1349 * slot's xmin will be set to InvalidTransactionId allowing the
1350 * removal of dead tuples.
1351 */
1353 {
1354 if (!retain_dead_tuples)
1356 else if (can_update_xmin)
1358 }
1359
1360 /* Switch back to original memory context. */
1361 MemoryContextSwitchTo(oldctx);
1362 /* Clean the temporary memory. */
1363 MemoryContextDelete(subctx);
1364
1365 /* Wait for more work. */
1366 rc = WaitLatch(MyLatch,
1368 wait_time,
1369 WAIT_EVENT_LOGICAL_LAUNCHER_MAIN);
1370
1371 if (rc & WL_LATCH_SET)
1372 {
1375 }
1376
1378 {
1379 ConfigReloadPending = false;
1381 }
1382 }
1383
1384 /* Not reachable */
1385}
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition: timestamp.c:1757
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1645
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1609
void BackgroundWorkerInitializeConnection(const char *dbname, const char *username, uint32 flags)
Definition: bgworker.c:853
void BackgroundWorkerUnblockSignals(void)
Definition: bgworker.c:927
#define Min(x, y)
Definition: c.h:1003
uint32 TransactionId
Definition: c.h:657
int64 TimestampTz
Definition: timestamp.h:39
#define DSM_HANDLE_INVALID
Definition: dsm_impl.h:58
int errmsg_internal(const char *fmt,...)
Definition: elog.c:1161
#define DEBUG1
Definition: elog.h:30
#define ereport(elevel,...)
Definition: elog.h:150
int MyProcPid
Definition: globals.c:47
struct Latch * MyLatch
Definition: globals.c:63
void ProcessConfigFile(GucContext context)
Definition: guc-file.l:120
@ PGC_SIGHUP
Definition: guc.h:75
Assert(PointerIsAligned(start, uint64))
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:27
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:61
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:337
void ResetLatch(Latch *latch)
Definition: latch.c:374
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:172
#define DEFAULT_NAPTIME_PER_CYCLE
Definition: launcher.c:49
bool logicalrep_worker_launch(LogicalRepWorkerType wtype, Oid dbid, Oid subid, const char *subname, Oid userid, Oid relid, dsm_handle subworker_dsm, bool retain_dead_tuples)
Definition: launcher.c:317
static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time)
Definition: launcher.c:1059
static void update_conflict_slot_xmin(TransactionId new_xmin)
Definition: launcher.c:1445
static void compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin)
Definition: launcher.c:1393
static void logicalrep_launcher_onexit(int code, Datum arg)
Definition: launcher.c:833
void CreateConflictDetectionSlot(void)
Definition: launcher.c:1512
static void init_conflict_slot_xmin(void)
Definition: launcher.c:1481
LogicalRepWorker * logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
Definition: launcher.c:254
static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid)
Definition: launcher.c:1075
static LogicalRepCtxStruct * LogicalRepCtx
Definition: launcher.c:71
static bool acquire_conflict_slot_if_exists(void)
Definition: launcher.c:1431
static List * get_subscription_list(void)
Definition: launcher.c:117
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1174
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1894
@ LW_SHARED
Definition: lwlock.h:113
MemoryContext TopMemoryContext
Definition: mcxt.c:166
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:469
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:160
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:122
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:124
#define lfirst(lc)
Definition: pg_list.h:172
#define die(msg)
#define pqsignal
Definition: port.h:531
uint64_t Datum
Definition: postgres.h:70
#define InvalidOid
Definition: postgres_ext.h:37
void ReplicationSlotDropAcquired(void)
Definition: slot.c:964
ReplicationSlot * MyReplicationSlot
Definition: slot.c:148
Definition: pg_list.h:54
TransactionId xmin
Definition: slot.h:96
ReplicationSlotPersistentData data
Definition: slot.h:192
#define InvalidTransactionId
Definition: transam.h:31
#define TransactionIdIsValid(xid)
Definition: transam.h:41
#define WL_TIMEOUT
Definition: waiteventset.h:37
#define WL_EXIT_ON_PM_DEATH
Definition: waiteventset.h:39
#define WL_LATCH_SET
Definition: waiteventset.h:34
#define SIGHUP
Definition: win32_port.h:158
@ WORKERTYPE_APPLY
int wal_retrieve_retry_interval
Definition: xlog.c:135

References acquire_conflict_slot_if_exists(), ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, ApplyLauncherGetWorkerStartTime(), ApplyLauncherSetWorkerStartTime(), Assert(), BackgroundWorkerInitializeConnection(), BackgroundWorkerUnblockSignals(), before_shmem_exit(), CHECK_FOR_INTERRUPTS, compute_min_nonremovable_xid(), ConfigReloadPending, CreateConflictDetectionSlot(), ReplicationSlot::data, Subscription::dbid, DEBUG1, DEFAULT_NAPTIME_PER_CYCLE, die, DSM_HANDLE_INVALID, Subscription::enabled, ereport, errmsg_internal(), get_subscription_list(), GetCurrentTimestamp(), init_conflict_slot_xmin(), InvalidOid, InvalidTransactionId, LogicalRepCtxStruct::launcher_pid, lfirst, logicalrep_launcher_onexit(), logicalrep_worker_find(), logicalrep_worker_launch(), LogicalRepCtx, LW_SHARED, LWLockAcquire(), LWLockRelease(), MemoryContextDelete(), MemoryContextSwitchTo(), Min, MyLatch, MyProcPid, MyReplicationSlot, Subscription::name, now(), Subscription::oid, Subscription::owner, PGC_SIGHUP, pqsignal, ProcessConfigFile(), ReplicationSlotDropAcquired(), ResetLatch(), Subscription::retaindeadtuples, Subscription::retentionactive, SIGHUP, SignalHandlerForConfigReload(), TimestampDifferenceMilliseconds(), TopMemoryContext, TransactionIdIsValid, update_conflict_slot_xmin(), WaitLatch(), wal_retrieve_retry_interval, WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_TIMEOUT, WORKERTYPE_APPLY, and ReplicationSlotPersistentData::xmin.

◆ ApplyLauncherRegister()

void ApplyLauncherRegister ( void  )

Definition at line 944 of file launcher.c.

945{
947
948 /*
949 * The logical replication launcher is disabled during binary upgrades, to
950 * prevent logical replication workers from running on the source cluster.
951 * That could cause replication origins to move forward after having been
952 * copied to the target cluster, potentially creating conflicts with the
953 * copied data files.
954 */
956 return;
957
958 memset(&bgw, 0, sizeof(bgw));
962 snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
963 snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyLauncherMain");
965 "logical replication launcher");
967 "logical replication launcher");
968 bgw.bgw_restart_time = 5;
969 bgw.bgw_notify_pid = 0;
970 bgw.bgw_main_arg = (Datum) 0;
971
973}
void RegisterBackgroundWorker(BackgroundWorker *worker)
Definition: bgworker.c:940
@ BgWorkerStart_RecoveryFinished
Definition: bgworker.h:81
#define BGWORKER_BACKEND_DATABASE_CONNECTION
Definition: bgworker.h:60
#define BGWORKER_SHMEM_ACCESS
Definition: bgworker.h:53
#define BGW_MAXLEN
Definition: bgworker.h:86
bool IsBinaryUpgrade
Definition: globals.c:121
int max_logical_replication_workers
Definition: launcher.c:52
#define MAXPGPATH
#define snprintf
Definition: port.h:239
char bgw_function_name[BGW_MAXLEN]
Definition: bgworker.h:97
Datum bgw_main_arg
Definition: bgworker.h:98
char bgw_name[BGW_MAXLEN]
Definition: bgworker.h:91
int bgw_restart_time
Definition: bgworker.h:95
char bgw_type[BGW_MAXLEN]
Definition: bgworker.h:92
BgWorkerStartTime bgw_start_time
Definition: bgworker.h:94
pid_t bgw_notify_pid
Definition: bgworker.h:100
char bgw_library_name[MAXPGPATH]
Definition: bgworker.h:96

References BackgroundWorker::bgw_flags, BackgroundWorker::bgw_function_name, BackgroundWorker::bgw_library_name, BackgroundWorker::bgw_main_arg, BGW_MAXLEN, BackgroundWorker::bgw_name, BackgroundWorker::bgw_notify_pid, BackgroundWorker::bgw_restart_time, BackgroundWorker::bgw_start_time, BackgroundWorker::bgw_type, BGWORKER_BACKEND_DATABASE_CONNECTION, BGWORKER_SHMEM_ACCESS, BgWorkerStart_RecoveryFinished, IsBinaryUpgrade, max_logical_replication_workers, MAXPGPATH, RegisterBackgroundWorker(), and snprintf.

Referenced by PostmasterMain().

◆ ApplyLauncherShmemInit()

void ApplyLauncherShmemInit ( void  )

Definition at line 980 of file launcher.c.

981{
982 bool found;
983
985 ShmemInitStruct("Logical Replication Launcher Data",
987 &found);
988
989 if (!found)
990 {
991 int slot;
992
994
997
998 /* Initialize memory and spin locks for each worker slot. */
999 for (slot = 0; slot < max_logical_replication_workers; slot++)
1000 {
1001 LogicalRepWorker *worker = &LogicalRepCtx->workers[slot];
1002
1003 memset(worker, 0, sizeof(LogicalRepWorker));
1004 SpinLockInit(&worker->relmutex);
1005 }
1006 }
1007}
#define DSA_HANDLE_INVALID
Definition: dsa.h:139
#define DSHASH_HANDLE_INVALID
Definition: dshash.h:27
Size ApplyLauncherShmemSize(void)
Definition: launcher.c:925
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:387
#define SpinLockInit(lock)
Definition: spin.h:57
dsa_handle last_start_dsa
Definition: launcher.c:64
dshash_table_handle last_start_dsh
Definition: launcher.c:65
LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER]
Definition: launcher.c:68

References ApplyLauncherShmemSize(), DSA_HANDLE_INVALID, DSHASH_HANDLE_INVALID, LogicalRepCtxStruct::last_start_dsa, LogicalRepCtxStruct::last_start_dsh, LogicalRepCtx, max_logical_replication_workers, LogicalRepWorker::relmutex, ShmemInitStruct(), SpinLockInit, and LogicalRepCtxStruct::workers.

Referenced by CreateOrAttachShmemStructs().

◆ ApplyLauncherShmemSize()

Size ApplyLauncherShmemSize ( void  )

Definition at line 925 of file launcher.c.

926{
927 Size size;
928
929 /*
930 * Need the fixed struct and the array of LogicalRepWorker.
931 */
932 size = sizeof(LogicalRepCtxStruct);
933 size = MAXALIGN(size);
935 sizeof(LogicalRepWorker)));
936 return size;
937}
#define MAXALIGN(LEN)
Definition: c.h:810
size_t Size
Definition: c.h:610
struct LogicalRepCtxStruct LogicalRepCtxStruct
Size add_size(Size s1, Size s2)
Definition: shmem.c:493
Size mul_size(Size s1, Size s2)
Definition: shmem.c:510

References add_size(), max_logical_replication_workers, MAXALIGN, and mul_size().

Referenced by ApplyLauncherShmemInit(), and CalculateShmemSize().

◆ ApplyLauncherWakeup()

void ApplyLauncherWakeup ( void  )

Definition at line 1141 of file launcher.c.

1142{
1143 if (LogicalRepCtx->launcher_pid != 0)
1145}
#define kill(pid, sig)
Definition: win32_port.h:493
#define SIGUSR1
Definition: win32_port.h:170

References kill, LogicalRepCtxStruct::launcher_pid, LogicalRepCtx, and SIGUSR1.

Referenced by AtEOXact_ApplyLauncher(), logicalrep_worker_onexit(), update_retention_status(), and wait_for_local_flush().

◆ ApplyLauncherWakeupAtCommit()

void ApplyLauncherWakeupAtCommit ( void  )

Definition at line 1131 of file launcher.c.

1132{
1135}
static bool on_commit_launcher_wakeup
Definition: launcher.c:93

References on_commit_launcher_wakeup.

Referenced by AlterSubscription(), AlterSubscriptionOwner_internal(), and CreateSubscription().

◆ AtEOXact_ApplyLauncher()

void AtEOXact_ApplyLauncher ( bool  isCommit)

Definition at line 1112 of file launcher.c.

1113{
1114 if (isCommit)
1115 {
1118 }
1119
1121}
void ApplyLauncherWakeup(void)
Definition: launcher.c:1141

References ApplyLauncherWakeup(), and on_commit_launcher_wakeup.

Referenced by AbortTransaction(), CommitTransaction(), and PrepareTransaction().

◆ CreateConflictDetectionSlot()

void CreateConflictDetectionSlot ( void  )

Definition at line 1512 of file launcher.c.

1513{
1514 /* Exit early if the replication slot is already created and acquired */
1516 return;
1517
1518 ereport(LOG,
1519 errmsg("creating replication conflict detection slot"));
1520
1522 false, false);
1523
1525}
int errmsg(const char *fmt,...)
Definition: elog.c:1071
#define LOG
Definition: elog.h:31
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, bool two_phase, bool failover, bool synced)
Definition: slot.c:352
#define CONFLICT_DETECTION_SLOT
Definition: slot.h:28
@ RS_PERSISTENT
Definition: slot.h:45

References CONFLICT_DETECTION_SLOT, ereport, errmsg(), init_conflict_slot_xmin(), LOG, MyReplicationSlot, ReplicationSlotCreate(), and RS_PERSISTENT.

Referenced by ApplyLauncherMain(), and binary_upgrade_create_conflict_detection_slot().

◆ GetLeaderApplyWorkerPid()

pid_t GetLeaderApplyWorkerPid ( pid_t  pid)

Definition at line 1541 of file launcher.c.

1542{
1543 int leader_pid = InvalidPid;
1544 int i;
1545
1546 LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
1547
1548 for (i = 0; i < max_logical_replication_workers; i++)
1549 {
1551
1552 if (isParallelApplyWorker(w) && w->proc && pid == w->proc->pid)
1553 {
1554 leader_pid = w->leader_pid;
1555 break;
1556 }
1557 }
1558
1559 LWLockRelease(LogicalRepWorkerLock);
1560
1561 return leader_pid;
1562}
int i
Definition: isn.c:77
#define InvalidPid
Definition: miscadmin.h:32
int pid
Definition: proc.h:199
#define isParallelApplyWorker(worker)

References i, InvalidPid, isParallelApplyWorker, LogicalRepWorker::leader_pid, LogicalRepCtx, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_logical_replication_workers, PGPROC::pid, LogicalRepWorker::proc, and LogicalRepCtxStruct::workers.

Referenced by pg_stat_get_activity().

◆ IsLogicalLauncher()

bool IsLogicalLauncher ( void  )

Variable Documentation

◆ max_logical_replication_workers

PGDLLIMPORT int max_logical_replication_workers
extern

Definition at line 52 of file launcher.c.

◆ max_parallel_apply_workers_per_subscription

PGDLLIMPORT int max_parallel_apply_workers_per_subscription
extern

Definition at line 54 of file launcher.c.

Referenced by logicalrep_worker_launch(), and pa_free_worker().

◆ max_sync_workers_per_subscription

PGDLLIMPORT int max_sync_workers_per_subscription
extern

Definition at line 53 of file launcher.c.

Referenced by logicalrep_worker_launch(), and process_syncing_tables_for_apply().