PostgreSQL Source Code git master
All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
logicallauncher.h File Reference
This graph shows which files directly or indirectly include this file (the include-dependency graph image is not reproduced in this text rendering).

Go to the source code of this file.

Functions

void ApplyLauncherRegister (void)
 
void ApplyLauncherMain (Datum main_arg)
 
Size ApplyLauncherShmemSize (void)
 
void ApplyLauncherShmemInit (void)
 
void ApplyLauncherForgetWorkerStartTime (Oid subid)
 
void ApplyLauncherWakeupAtCommit (void)
 
void AtEOXact_ApplyLauncher (bool isCommit)
 
bool IsLogicalLauncher (void)
 
pid_t GetLeaderApplyWorkerPid (pid_t pid)
 

Variables

PGDLLIMPORT int max_logical_replication_workers
 
PGDLLIMPORT int max_sync_workers_per_subscription
 
PGDLLIMPORT int max_parallel_apply_workers_per_subscription
 

Function Documentation

◆ ApplyLauncherForgetWorkerStartTime()

void ApplyLauncherForgetWorkerStartTime ( Oid  subid)

Definition at line 1072 of file launcher.c.

1073{
1075
1076 (void) dshash_delete_key(last_start_times, &subid);
1077}
bool dshash_delete_key(dshash_table *hash_table, const void *key)
Definition: dshash.c:503
static dshash_table * last_start_times
Definition: launcher.c:89
static void logicalrep_launcher_attach_dshmem(void)
Definition: launcher.c:986

References dshash_delete_key(), last_start_times, and logicalrep_launcher_attach_dshmem().

Referenced by apply_worker_exit(), DisableSubscriptionAndExit(), DropSubscription(), InitializeLogRepWorker(), maybe_reread_subscription(), and process_syncing_tables_for_apply().

◆ ApplyLauncherMain()

void ApplyLauncherMain ( Datum  main_arg)

Definition at line 1119 of file launcher.c.

1120{
1122 (errmsg_internal("logical replication launcher started")));
1123
1125
1128
1129 /* Establish signal handlers. */
1131 pqsignal(SIGTERM, die);
1133
1134 /*
1135 * Establish connection to nailed catalogs (we only ever access
1136 * pg_subscription).
1137 */
1139
1140 /* Enter main loop */
1141 for (;;)
1142 {
1143 int rc;
1144 List *sublist;
1145 ListCell *lc;
1146 MemoryContext subctx;
1147 MemoryContext oldctx;
1148 long wait_time = DEFAULT_NAPTIME_PER_CYCLE;
1149
1151
1152 /* Use temporary context to avoid leaking memory across cycles. */
1154 "Logical Replication Launcher sublist",
1156 oldctx = MemoryContextSwitchTo(subctx);
1157
1158 /* Start any missing workers for enabled subscriptions. */
1159 sublist = get_subscription_list();
1160 foreach(lc, sublist)
1161 {
1162 Subscription *sub = (Subscription *) lfirst(lc);
1164 TimestampTz last_start;
1166 long elapsed;
1167
1168 if (!sub->enabled)
1169 continue;
1170
1171 LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
1172 w = logicalrep_worker_find(sub->oid, InvalidOid, false);
1173 LWLockRelease(LogicalRepWorkerLock);
1174
1175 if (w != NULL)
1176 continue; /* worker is running already */
1177
1178 /*
1179 * If the worker is eligible to start now, launch it. Otherwise,
1180 * adjust wait_time so that we'll wake up as soon as it can be
1181 * started.
1182 *
1183 * Each subscription's apply worker can only be restarted once per
1184 * wal_retrieve_retry_interval, so that errors do not cause us to
1185 * repeatedly restart the worker as fast as possible. In cases
1186 * where a restart is expected (e.g., subscription parameter
1187 * changes), another process should remove the last-start entry
1188 * for the subscription so that the worker can be restarted
1189 * without waiting for wal_retrieve_retry_interval to elapse.
1190 */
1191 last_start = ApplyLauncherGetWorkerStartTime(sub->oid);
1193 if (last_start == 0 ||
1195 {
1198 sub->dbid, sub->oid, sub->name,
1199 sub->owner, InvalidOid,
1201 }
1202 else
1203 {
1204 wait_time = Min(wait_time,
1205 wal_retrieve_retry_interval - elapsed);
1206 }
1207 }
1208
1209 /* Switch back to original memory context. */
1210 MemoryContextSwitchTo(oldctx);
1211 /* Clean the temporary memory. */
1212 MemoryContextDelete(subctx);
1213
1214 /* Wait for more work. */
1215 rc = WaitLatch(MyLatch,
1217 wait_time,
1218 WAIT_EVENT_LOGICAL_LAUNCHER_MAIN);
1219
1220 if (rc & WL_LATCH_SET)
1221 {
1224 }
1225
1227 {
1228 ConfigReloadPending = false;
1230 }
1231 }
1232
1233 /* Not reachable */
1234}
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition: timestamp.c:1756
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1644
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1608
void BackgroundWorkerInitializeConnection(const char *dbname, const char *username, uint32 flags)
Definition: bgworker.c:852
void BackgroundWorkerUnblockSignals(void)
Definition: bgworker.c:926
#define Min(x, y)
Definition: c.h:958
#define Assert(condition)
Definition: c.h:812
int64 TimestampTz
Definition: timestamp.h:39
#define DSM_HANDLE_INVALID
Definition: dsm_impl.h:58
int errmsg_internal(const char *fmt,...)
Definition: elog.c:1157
#define DEBUG1
Definition: elog.h:30
#define ereport(elevel,...)
Definition: elog.h:149
int MyProcPid
Definition: globals.c:46
struct Latch * MyLatch
Definition: globals.c:62
void ProcessConfigFile(GucContext context)
Definition: guc-file.l:120
@ PGC_SIGHUP
Definition: guc.h:71
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:27
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:61
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:337
void ResetLatch(Latch *latch)
Definition: latch.c:724
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:517
#define WL_TIMEOUT
Definition: latch.h:130
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:132
#define WL_LATCH_SET
Definition: latch.h:127
bool logicalrep_worker_launch(LogicalRepWorkerType wtype, Oid dbid, Oid subid, const char *subname, Oid userid, Oid relid, dsm_handle subworker_dsm)
Definition: launcher.c:297
#define DEFAULT_NAPTIME_PER_CYCLE
Definition: launcher.c:47
static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time)
Definition: launcher.c:1030
static void logicalrep_launcher_onexit(int code, Datum arg)
Definition: launcher.c:804
LogicalRepWorker * logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
Definition: launcher.c:234
static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid)
Definition: launcher.c:1046
static LogicalRepCtxStruct * LogicalRepCtx
Definition: launcher.c:69
static List * get_subscription_list(void)
Definition: launcher.c:112
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1168
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1781
@ LW_SHARED
Definition: lwlock.h:115
MemoryContext TopMemoryContext
Definition: mcxt.c:149
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:454
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:160
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:122
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:124
#define lfirst(lc)
Definition: pg_list.h:172
#define die(msg)
pqsigfunc pqsignal(int signo, pqsigfunc func)
uintptr_t Datum
Definition: postgres.h:64
#define InvalidOid
Definition: postgres_ext.h:36
List
Definition: pg_list.h:54
#define SIGHUP
Definition: win32_port.h:168
@ WORKERTYPE_APPLY
int wal_retrieve_retry_interval
Definition: xlog.c:134

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, ApplyLauncherGetWorkerStartTime(), ApplyLauncherSetWorkerStartTime(), Assert, BackgroundWorkerInitializeConnection(), BackgroundWorkerUnblockSignals(), before_shmem_exit(), CHECK_FOR_INTERRUPTS, ConfigReloadPending, Subscription::dbid, DEBUG1, DEFAULT_NAPTIME_PER_CYCLE, die, DSM_HANDLE_INVALID, Subscription::enabled, ereport, errmsg_internal(), get_subscription_list(), GetCurrentTimestamp(), InvalidOid, LogicalRepCtxStruct::launcher_pid, lfirst, logicalrep_launcher_onexit(), logicalrep_worker_find(), logicalrep_worker_launch(), LogicalRepCtx, LW_SHARED, LWLockAcquire(), LWLockRelease(), MemoryContextDelete(), MemoryContextSwitchTo(), Min, MyLatch, MyProcPid, Subscription::name, now(), Subscription::oid, Subscription::owner, PGC_SIGHUP, pqsignal(), ProcessConfigFile(), ResetLatch(), SIGHUP, SignalHandlerForConfigReload(), TimestampDifferenceMilliseconds(), TopMemoryContext, WaitLatch(), wal_retrieve_retry_interval, WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, WL_TIMEOUT, and WORKERTYPE_APPLY.

◆ ApplyLauncherRegister()

void ApplyLauncherRegister ( void  )

Definition at line 915 of file launcher.c.

916{
918
919 /*
920 * The logical replication launcher is disabled during binary upgrades, to
921 * prevent logical replication workers from running on the source cluster.
922 * That could cause replication origins to move forward after having been
923 * copied to the target cluster, potentially creating conflicts with the
924 * copied data files.
925 */
927 return;
928
929 memset(&bgw, 0, sizeof(bgw));
933 snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
934 snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyLauncherMain");
936 "logical replication launcher");
938 "logical replication launcher");
939 bgw.bgw_restart_time = 5;
940 bgw.bgw_notify_pid = 0;
941 bgw.bgw_main_arg = (Datum) 0;
942
944}
void RegisterBackgroundWorker(BackgroundWorker *worker)
Definition: bgworker.c:939
@ BgWorkerStart_RecoveryFinished
Definition: bgworker.h:81
#define BGWORKER_BACKEND_DATABASE_CONNECTION
Definition: bgworker.h:60
#define BGWORKER_SHMEM_ACCESS
Definition: bgworker.h:53
#define BGW_MAXLEN
Definition: bgworker.h:86
bool IsBinaryUpgrade
Definition: globals.c:120
int max_logical_replication_workers
Definition: launcher.c:50
#define MAXPGPATH
#define snprintf
Definition: port.h:238
char bgw_function_name[BGW_MAXLEN]
Definition: bgworker.h:97
Datum bgw_main_arg
Definition: bgworker.h:98
char bgw_name[BGW_MAXLEN]
Definition: bgworker.h:91
int bgw_restart_time
Definition: bgworker.h:95
char bgw_type[BGW_MAXLEN]
Definition: bgworker.h:92
BgWorkerStartTime bgw_start_time
Definition: bgworker.h:94
pid_t bgw_notify_pid
Definition: bgworker.h:100
char bgw_library_name[MAXPGPATH]
Definition: bgworker.h:96

References BackgroundWorker::bgw_flags, BackgroundWorker::bgw_function_name, BackgroundWorker::bgw_library_name, BackgroundWorker::bgw_main_arg, BGW_MAXLEN, BackgroundWorker::bgw_name, BackgroundWorker::bgw_notify_pid, BackgroundWorker::bgw_restart_time, BackgroundWorker::bgw_start_time, BackgroundWorker::bgw_type, BGWORKER_BACKEND_DATABASE_CONNECTION, BGWORKER_SHMEM_ACCESS, BgWorkerStart_RecoveryFinished, IsBinaryUpgrade, max_logical_replication_workers, MAXPGPATH, RegisterBackgroundWorker(), and snprintf.

Referenced by PostmasterMain().

◆ ApplyLauncherShmemInit()

void ApplyLauncherShmemInit ( void  )

Definition at line 951 of file launcher.c.

952{
953 bool found;
954
956 ShmemInitStruct("Logical Replication Launcher Data",
958 &found);
959
960 if (!found)
961 {
962 int slot;
963
965
968
969 /* Initialize memory and spin locks for each worker slot. */
970 for (slot = 0; slot < max_logical_replication_workers; slot++)
971 {
972 LogicalRepWorker *worker = &LogicalRepCtx->workers[slot];
973
974 memset(worker, 0, sizeof(LogicalRepWorker));
975 SpinLockInit(&worker->relmutex);
976 }
977 }
978}
#define DSA_HANDLE_INVALID
Definition: dsa.h:139
#define DSHASH_HANDLE_INVALID
Definition: dshash.h:27
Size ApplyLauncherShmemSize(void)
Definition: launcher.c:896
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:382
#define SpinLockInit(lock)
Definition: spin.h:57
dsa_handle last_start_dsa
Definition: launcher.c:62
dshash_table_handle last_start_dsh
Definition: launcher.c:63
LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER]
Definition: launcher.c:66

References ApplyLauncherShmemSize(), DSA_HANDLE_INVALID, DSHASH_HANDLE_INVALID, LogicalRepCtxStruct::last_start_dsa, LogicalRepCtxStruct::last_start_dsh, LogicalRepCtx, max_logical_replication_workers, LogicalRepWorker::relmutex, ShmemInitStruct(), SpinLockInit, and LogicalRepCtxStruct::workers.

Referenced by CreateOrAttachShmemStructs().

◆ ApplyLauncherShmemSize()

Size ApplyLauncherShmemSize ( void  )

Definition at line 896 of file launcher.c.

897{
898 Size size;
899
900 /*
901 * Need the fixed struct and the array of LogicalRepWorker.
902 */
903 size = sizeof(LogicalRepCtxStruct);
904 size = MAXALIGN(size);
906 sizeof(LogicalRepWorker)));
907 return size;
908}
#define MAXALIGN(LEN)
Definition: c.h:765
size_t Size
Definition: c.h:559
struct LogicalRepCtxStruct LogicalRepCtxStruct
Size add_size(Size s1, Size s2)
Definition: shmem.c:488
Size mul_size(Size s1, Size s2)
Definition: shmem.c:505
static pg_noinline void Size size
Definition: slab.c:607

References add_size(), max_logical_replication_workers, MAXALIGN, mul_size(), and size.

Referenced by ApplyLauncherShmemInit(), and CalculateShmemSize().

◆ ApplyLauncherWakeupAtCommit()

void ApplyLauncherWakeupAtCommit ( void  )

Definition at line 1102 of file launcher.c.

1103{
1106}
static bool on_commit_launcher_wakeup
Definition: launcher.c:91

References on_commit_launcher_wakeup.

Referenced by AlterSubscription(), AlterSubscriptionOwner_internal(), and CreateSubscription().

◆ AtEOXact_ApplyLauncher()

void AtEOXact_ApplyLauncher ( bool  isCommit)

Definition at line 1083 of file launcher.c.

1084{
1085 if (isCommit)
1086 {
1089 }
1090
1092}
static void ApplyLauncherWakeup(void)
Definition: launcher.c:1109

References ApplyLauncherWakeup(), and on_commit_launcher_wakeup.

Referenced by AbortTransaction(), CommitTransaction(), and PrepareTransaction().

◆ GetLeaderApplyWorkerPid()

pid_t GetLeaderApplyWorkerPid ( pid_t  pid)

Definition at line 1250 of file launcher.c.

1251{
1252 int leader_pid = InvalidPid;
1253 int i;
1254
1255 LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
1256
1257 for (i = 0; i < max_logical_replication_workers; i++)
1258 {
1260
1261 if (isParallelApplyWorker(w) && w->proc && pid == w->proc->pid)
1262 {
1263 leader_pid = w->leader_pid;
1264 break;
1265 }
1266 }
1267
1268 LWLockRelease(LogicalRepWorkerLock);
1269
1270 return leader_pid;
1271}
int i
Definition: isn.c:72
#define InvalidPid
Definition: miscadmin.h:32
int pid
Definition: proc.h:182
#define isParallelApplyWorker(worker)

References i, InvalidPid, isParallelApplyWorker, LogicalRepWorker::leader_pid, LogicalRepCtx, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_logical_replication_workers, PGPROC::pid, LogicalRepWorker::proc, and LogicalRepCtxStruct::workers.

Referenced by pg_stat_get_activity().

◆ IsLogicalLauncher()

bool IsLogicalLauncher ( void  )

Definition at line 1240 of file launcher.c.

1241{
1243}

References LogicalRepCtxStruct::launcher_pid, LogicalRepCtx, and MyProcPid.

Referenced by ProcessInterrupts().

Variable Documentation

◆ max_logical_replication_workers

PGDLLIMPORT int max_logical_replication_workers
extern

Definition at line 50 of file launcher.c.

◆ max_parallel_apply_workers_per_subscription

PGDLLIMPORT int max_parallel_apply_workers_per_subscription
extern

Definition at line 52 of file launcher.c.

Referenced by logicalrep_worker_launch(), and pa_free_worker().

◆ max_sync_workers_per_subscription

PGDLLIMPORT int max_sync_workers_per_subscription
extern

Definition at line 51 of file launcher.c.

Referenced by logicalrep_worker_launch(), and process_syncing_tables_for_apply().