PostgreSQL Source Code git master
Loading...
Searching...
No Matches
syncutils.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 * syncutils.c
3 * PostgreSQL logical replication: common synchronization code
4 *
5 * Copyright (c) 2025-2026, PostgreSQL Global Development Group
6 *
7 * IDENTIFICATION
8 * src/backend/replication/logical/syncutils.c
9 *
10 * NOTES
11 * This file contains code common to synchronization workers.
12 *-------------------------------------------------------------------------
13 */
14
15#include "postgres.h"
16
18#include "pgstat.h"
21#include "storage/ipc.h"
22#include "utils/lsyscache.h"
23#include "utils/memutils.h"
24
25/*
26 * Enum for phases of the subscription relations state.
27 *
28 * SYNC_RELATIONS_STATE_NEEDS_REBUILD indicates that the subscription relations
29 * state is no longer valid, and the subscription relations should be rebuilt.
30 *
31 * SYNC_RELATIONS_STATE_REBUILD_STARTED indicates that the subscription
32 * relations state is being rebuilt.
33 *
34 * SYNC_RELATIONS_STATE_VALID indicates that the subscription relations state
35 * is up-to-date and valid.
36 */
43
45
46/*
47 * Exit routine for synchronization worker.
48 */
49pg_noreturn void
51{
53
54 /*
55 * Commit any outstanding transaction. This is the usual case, unless
56 * there was nothing to do for the table.
57 */
59 {
62 }
63
64 /* And flush all writes. */
66
68 {
70 errmsg("logical replication sequence synchronization worker for subscription \"%s\" has finished",
72
73 /*
74 * Reset last_seqsync_start_time, so that next time a sequencesync
75 * worker is needed it can be started promptly.
76 */
78 }
79 else
80 {
83 errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
87
88 /* Find the leader apply worker and signal it. */
91 }
92
93 /* Stop gracefully */
94 proc_exit(0);
95}
96
97/*
98 * Callback from syscache invalidation.
99 */
100void
106
107/*
108 * Attempt to launch a sync worker for one or more sequences or a table, if
109 * a worker slot is available and the retry interval has elapsed.
110 *
111 * wtype: sync worker type.
112 * nsyncworkers: Number of currently running sync workers for the subscription.
113 * relid: InvalidOid for sequencesync worker, actual relid for tablesync
114 * worker.
115 * last_start_time: Pointer to the last start time of the worker.
116 */
117void
119 TimestampTz *last_start_time)
120{
122
125
126 /* Start a new sync worker only if there is a free sync worker slot */
128 return;
129
131
132 if (!(*last_start_time) ||
133 TimestampDifferenceExceeds(*last_start_time, now,
135 {
136 /*
137 * Set the last_start_time even if we fail to start the worker, so
138 * that we won't retry until wal_retrieve_retry_interval has elapsed.
139 */
140 *last_start_time = now;
146 relid, DSM_HANDLE_INVALID, false);
147 }
148}
149
150/*
151 * Process possible state change(s) of relations that are being synchronized
152 * and start new tablesync workers for the newly added tables. Also, start a
153 * new sequencesync worker for the newly added sequences.
154 */
155void
157{
158 switch (MyLogicalRepWorker->type)
159 {
161
162 /*
163 * Skip for parallel apply workers because they only operate on
164 * tables that are in a READY state. See pa_can_start() and
165 * should_apply_changes_for_rel().
166 */
167 break;
168
171 break;
172
173 case WORKERTYPE_APPLY:
176 break;
177
179 /* Should never happen. */
180 elog(ERROR, "sequence synchronization worker is not expected to process relations");
181 break;
182
184 /* Should never happen. */
185 elog(ERROR, "Unknown worker type");
186 }
187}
188
189/*
190 * Common code to fetch the up-to-date sync state info for tables and sequences.
191 *
192 * The pg_subscription_rel catalog is shared by tables and sequences. Changes
193 * to either sequences or tables can affect the validity of relation states, so
194 * we identify non-READY tables and non-READY sequences together to ensure
195 * consistency.
196 *
197 * has_pending_subtables: true if the subscription has one or more tables that
198 * are not in READY state, otherwise false.
199 * has_pending_subsequences: true if the subscription has one or more sequences
200 * that are not in READY state, otherwise false.
201 */
202void
205 bool *started_tx)
206{
207 /*
208 * has_subtables and has_subsequences_non_ready are declared as static,
209 * since the same values can be used until the system table is invalidated.
210 */
211 static bool has_subtables = false;
212 static bool has_subsequences_non_ready = false;
213
214 *started_tx = false;
215
217 {
219 List *rstates;
220 SubscriptionRelState *rstate;
221
224
225 /* Clean the old lists. */
228
229 if (!IsTransactionState())
230 {
232 *started_tx = true;
233 }
234
235 /* Fetch tables and sequences that are in non-READY state. */
237 true);
238
239 /* Allocate the tracking info in a permanent memory context. */
242 {
245 else
246 {
248 memcpy(rstate, subrel, sizeof(SubscriptionRelState));
250 rstate);
251 }
252 }
254
255 /*
256 * Does the subscription have tables?
257 *
258 * If there were not-READY tables found then we know it does. But if
259 * table_states_not_ready was empty we still need to check again to
260 * see if there are 0 tables.
261 */
264
265 /*
266 * If the subscription relation cache has been invalidated since we
267 * entered this routine, we still use and return the relations we just
268 * finished constructing, to avoid infinite loops, but we leave the
269 * table states marked as stale so that we'll rebuild them again on next
270 * access. Otherwise, we mark the table states as valid.
271 */
274 }
275
278
281}
Subscription * MySubscription
Definition worker.c:480
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition timestamp.c:1781
TimestampTz GetCurrentTimestamp(void)
Definition timestamp.c:1645
Datum now(PG_FUNCTION_ARGS)
Definition timestamp.c:1609
#define pg_noreturn
Definition c.h:176
#define Assert(condition)
Definition c.h:885
uint32_t uint32
Definition c.h:558
#define OidIsValid(objectId)
Definition c.h:800
int64 TimestampTz
Definition timestamp.h:39
#define DSM_HANDLE_INVALID
Definition dsm_impl.h:58
Datum arg
Definition elog.c:1322
int errmsg(const char *fmt,...)
Definition elog.c:1093
#define LOG
Definition elog.h:31
#define ERROR
Definition elog.h:39
#define elog(elevel,...)
Definition elog.h:226
#define ereport(elevel,...)
Definition elog.h:150
#define palloc_object(type)
Definition fe_memutils.h:74
void proc_exit(int code)
Definition ipc.c:105
bool logicalrep_worker_launch(LogicalRepWorkerType wtype, Oid dbid, Oid subid, const char *subname, Oid userid, Oid relid, dsm_handle subworker_dsm, bool retain_dead_tuples)
Definition launcher.c:324
void logicalrep_worker_wakeup(LogicalRepWorkerType wtype, Oid subid, Oid relid)
Definition launcher.c:723
void logicalrep_reset_seqsync_start_time(void)
Definition launcher.c:872
LogicalRepWorker * MyLogicalRepWorker
Definition launcher.c:56
int max_sync_workers_per_subscription
Definition launcher.c:53
List * lappend(List *list, void *datum)
Definition list.c:339
void list_free_deep(List *list)
Definition list.c:1560
char * get_rel_name(Oid relid)
Definition lsyscache.c:2078
char get_rel_relkind(Oid relid)
Definition lsyscache.c:2153
MemoryContext CacheMemoryContext
Definition mcxt.c:169
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition palloc.h:124
#define NIL
Definition pg_list.h:68
#define foreach_ptr(type, var, lst)
Definition pg_list.h:469
bool HasSubscriptionTables(Oid subid)
List * GetSubscriptionRelations(Oid subid, bool tables, bool sequences, bool not_ready)
long pgstat_report_stat(bool force)
Definition pgstat.c:704
uint64_t Datum
Definition postgres.h:70
#define InvalidOid
unsigned int Oid
static int fb(int x)
void ProcessSequencesForSync(void)
Definition pg_list.h:54
LogicalRepWorkerType type
void launch_sync_worker(LogicalRepWorkerType wtype, int nsyncworkers, Oid relid, TimestampTz *last_start_time)
Definition syncutils.c:118
void ProcessSyncingRelations(XLogRecPtr current_lsn)
Definition syncutils.c:156
pg_noreturn void FinishSyncWorker(void)
Definition syncutils.c:50
SyncingRelationsState
Definition syncutils.c:38
@ SYNC_RELATIONS_STATE_VALID
Definition syncutils.c:41
@ SYNC_RELATIONS_STATE_NEEDS_REBUILD
Definition syncutils.c:39
@ SYNC_RELATIONS_STATE_REBUILD_STARTED
Definition syncutils.c:40
void FetchRelationStates(bool *has_pending_subtables, bool *has_pending_subsequences, bool *started_tx)
Definition syncutils.c:203
static SyncingRelationsState relation_states_validity
Definition syncutils.c:44
void InvalidateSyncingRelStates(Datum arg, SysCacheIdentifier cacheid, uint32 hashvalue)
Definition syncutils.c:101
List * table_states_not_ready
Definition tablesync.c:126
void ProcessSyncingTablesForSync(XLogRecPtr current_lsn)
Definition tablesync.c:245
void ProcessSyncingTablesForApply(XLogRecPtr current_lsn)
Definition tablesync.c:367
LogicalRepWorkerType
@ WORKERTYPE_TABLESYNC
@ WORKERTYPE_UNKNOWN
@ WORKERTYPE_SEQUENCESYNC
@ WORKERTYPE_PARALLEL_APPLY
@ WORKERTYPE_APPLY
static bool am_sequencesync_worker(void)
static bool am_tablesync_worker(void)
bool IsTransactionState(void)
Definition xact.c:388
void StartTransactionCommand(void)
Definition xact.c:3080
void CommitTransactionCommand(void)
Definition xact.c:3178
XLogRecPtr GetXLogWriteRecPtr(void)
Definition xlog.c:9624
int wal_retrieve_retry_interval
Definition xlog.c:137
void XLogFlush(XLogRecPtr record)
Definition xlog.c:2783
uint64 XLogRecPtr
Definition xlogdefs.h:21