PostgreSQL Source Code git master
syncutils.c File Reference
Include dependency graph for syncutils.c:

Go to the source code of this file.

Enumerations

enum  SyncingRelationsState { SYNC_RELATIONS_STATE_NEEDS_REBUILD , SYNC_RELATIONS_STATE_REBUILD_STARTED , SYNC_RELATIONS_STATE_VALID }
 

Functions

pg_noreturn void FinishSyncWorker (void)
 
void InvalidateSyncingRelStates (Datum arg, int cacheid, uint32 hashvalue)
 
void launch_sync_worker (LogicalRepWorkerType wtype, int nsyncworkers, Oid relid, TimestampTz *last_start_time)
 
void ProcessSyncingRelations (XLogRecPtr current_lsn)
 
void FetchRelationStates (bool *has_pending_subtables, bool *has_pending_subsequences, bool *started_tx)
 

Variables

static SyncingRelationsState relation_states_validity = SYNC_RELATIONS_STATE_NEEDS_REBUILD
 

Enumeration Type Documentation

◆ SyncingRelationsState

Enumerator
SYNC_RELATIONS_STATE_NEEDS_REBUILD 
SYNC_RELATIONS_STATE_REBUILD_STARTED 
SYNC_RELATIONS_STATE_VALID 

Definition at line 37 of file syncutils.c.

38{
SyncingRelationsState
Definition: syncutils.c:38
@ SYNC_RELATIONS_STATE_VALID
Definition: syncutils.c:41
@ SYNC_RELATIONS_STATE_NEEDS_REBUILD
Definition: syncutils.c:39
@ SYNC_RELATIONS_STATE_REBUILD_STARTED
Definition: syncutils.c:40

Function Documentation

◆ FetchRelationStates()

void FetchRelationStates ( bool *  has_pending_subtables,
bool *  has_pending_subsequences,
bool *  started_tx 
)

Definition at line 202 of file syncutils.c.

205{
206 /*
207 * has_subtables and has_subsequences_non_ready are declared as static,
208 * since the same value can be used until the system table is invalidated.
209 */
210 static bool has_subtables = false;
211 static bool has_subsequences_non_ready = false;
212
213 *started_tx = false;
214
216 {
217 MemoryContext oldctx;
218 List *rstates;
219 SubscriptionRelState *rstate;
220
222 has_subsequences_non_ready = false;
223
224 /* Clean the old lists. */
227
228 if (!IsTransactionState())
229 {
231 *started_tx = true;
232 }
233
234 /* Fetch tables and sequences that are in non-READY state. */
235 rstates = GetSubscriptionRelations(MySubscription->oid, true, true,
236 true);
237
238 /* Allocate the tracking info in a permanent memory context. */
240 foreach_ptr(SubscriptionRelState, subrel, rstates)
241 {
242 if (get_rel_relkind(subrel->relid) == RELKIND_SEQUENCE)
243 has_subsequences_non_ready = true;
244 else
245 {
246 rstate = palloc(sizeof(SubscriptionRelState));
247 memcpy(rstate, subrel, sizeof(SubscriptionRelState));
249 rstate);
250 }
251 }
252 MemoryContextSwitchTo(oldctx);
253
254 /*
255 * Does the subscription have tables?
256 *
257 * If there were not-READY tables found then we know it does. But if
258 * table_states_not_ready was empty we still need to check again to
259 * see if there are 0 tables.
260 */
261 has_subtables = (table_states_not_ready != NIL) ||
263
264 /*
265 * If the subscription relation cache has been invalidated since we
266 * entered this routine, we still use and return the relations we just
267 * finished constructing, to avoid infinite loops, but we leave the
268 * table states marked as stale so that we'll rebuild it again on next
269 * access. Otherwise, we mark the table states as valid.
270 */
273 }
274
275 if (has_pending_subtables)
276 *has_pending_subtables = has_subtables;
277
278 if (has_pending_subsequences)
279 *has_pending_subsequences = has_subsequences_non_ready;
280}
Subscription * MySubscription
Definition: worker.c:479
List * lappend(List *list, void *datum)
Definition: list.c:339
void list_free_deep(List *list)
Definition: list.c:1560
char get_rel_relkind(Oid relid)
Definition: lsyscache.c:2170
void * palloc(Size size)
Definition: mcxt.c:1365
MemoryContext CacheMemoryContext
Definition: mcxt.c:169
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:124
#define NIL
Definition: pg_list.h:68
#define foreach_ptr(type, var, lst)
Definition: pg_list.h:469
bool HasSubscriptionTables(Oid subid)
List * GetSubscriptionRelations(Oid subid, bool tables, bool sequences, bool not_ready)
Definition: pg_list.h:54
static SyncingRelationsState relation_states_validity
Definition: syncutils.c:44
List * table_states_not_ready
Definition: tablesync.c:125
bool IsTransactionState(void)
Definition: xact.c:388
void StartTransactionCommand(void)
Definition: xact.c:3077

References CacheMemoryContext, foreach_ptr, get_rel_relkind(), GetSubscriptionRelations(), HasSubscriptionTables(), IsTransactionState(), lappend(), list_free_deep(), MemoryContextSwitchTo(), MySubscription, NIL, Subscription::oid, palloc(), relation_states_validity, StartTransactionCommand(), SYNC_RELATIONS_STATE_REBUILD_STARTED, SYNC_RELATIONS_STATE_VALID, and table_states_not_ready.

Referenced by AllTablesyncsReady(), HasSubscriptionTablesCached(), ProcessSequencesForSync(), and ProcessSyncingTablesForApply().

◆ FinishSyncWorker()

pg_noreturn void FinishSyncWorker ( void  )

Definition at line 50 of file syncutils.c.

51{
53
54 /*
55 * Commit any outstanding transaction. This is the usual case, unless
56 * there was nothing to do for the table.
57 */
59 {
62 }
63
64 /* And flush all writes. */
66
68 {
70 errmsg("logical replication sequence synchronization worker for subscription \"%s\" has finished",
72
73 /*
74 * Reset last_seqsync_start_time, so that next time a sequencesync
75 * worker is needed it can be started promptly.
76 */
78 }
79 else
80 {
83 errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
87
88 /* Find the leader apply worker and signal it. */
91 }
92
93 /* Stop gracefully */
94 proc_exit(0);
95}
int errmsg(const char *fmt,...)
Definition: elog.c:1080
#define LOG
Definition: elog.h:31
#define ereport(elevel,...)
Definition: elog.h:150
Assert(PointerIsAligned(start, uint64))
void proc_exit(int code)
Definition: ipc.c:104
void logicalrep_worker_wakeup(LogicalRepWorkerType wtype, Oid subid, Oid relid)
Definition: launcher.c:723
void logicalrep_reset_seqsync_start_time(void)
Definition: launcher.c:872
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:56
char * get_rel_name(Oid relid)
Definition: lsyscache.c:2095
long pgstat_report_stat(bool force)
Definition: pgstat.c:694
#define InvalidOid
Definition: postgres_ext.h:37
@ WORKERTYPE_APPLY
static bool am_sequencesync_worker(void)
static bool am_tablesync_worker(void)
void CommitTransactionCommand(void)
Definition: xact.c:3175
XLogRecPtr GetXLogWriteRecPtr(void)
Definition: xlog.c:9515
void XLogFlush(XLogRecPtr record)
Definition: xlog.c:2783

References am_sequencesync_worker(), am_tablesync_worker(), Assert(), CommitTransactionCommand(), ereport, errmsg(), get_rel_name(), GetXLogWriteRecPtr(), InvalidOid, IsTransactionState(), LOG, logicalrep_reset_seqsync_start_time(), logicalrep_worker_wakeup(), MyLogicalRepWorker, MySubscription, Subscription::name, pgstat_report_stat(), proc_exit(), LogicalRepWorker::relid, StartTransactionCommand(), LogicalRepWorker::subid, WORKERTYPE_APPLY, and XLogFlush().

Referenced by LogicalRepSyncTableStart(), ProcessSyncingTablesForSync(), SequenceSyncWorkerMain(), and TableSyncWorkerMain().

◆ InvalidateSyncingRelStates()

void InvalidateSyncingRelStates ( Datum  arg,
int  cacheid,
uint32  hashvalue 
)

◆ launch_sync_worker()

void launch_sync_worker ( LogicalRepWorkerType  wtype,
int  nsyncworkers,
Oid  relid,
TimestampTz last_start_time 
)

Definition at line 117 of file syncutils.c.

119{
121
122 Assert((wtype == WORKERTYPE_TABLESYNC && OidIsValid(relid)) ||
123 (wtype == WORKERTYPE_SEQUENCESYNC && !OidIsValid(relid)));
124
125 /* If there is a free sync worker slot, start a new sync worker */
126 if (nsyncworkers >= max_sync_workers_per_subscription)
127 return;
128
130
131 if (!(*last_start_time) ||
132 TimestampDifferenceExceeds(*last_start_time, now,
134 {
135 /*
136 * Set the last_start_time even if we fail to start the worker, so
137 * that we won't retry until wal_retrieve_retry_interval has elapsed.
138 */
139 *last_start_time = now;
140 (void) logicalrep_worker_launch(wtype,
145 relid, DSM_HANDLE_INVALID, false);
146 }
147}
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1781
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1645
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1609
#define OidIsValid(objectId)
Definition: c.h:777
int64 TimestampTz
Definition: timestamp.h:39
#define DSM_HANDLE_INVALID
Definition: dsm_impl.h:58
bool logicalrep_worker_launch(LogicalRepWorkerType wtype, Oid dbid, Oid subid, const char *subname, Oid userid, Oid relid, dsm_handle subworker_dsm, bool retain_dead_tuples)
Definition: launcher.c:324
int max_sync_workers_per_subscription
Definition: launcher.c:53
@ WORKERTYPE_TABLESYNC
@ WORKERTYPE_SEQUENCESYNC
int wal_retrieve_retry_interval
Definition: xlog.c:136

References Assert(), LogicalRepWorker::dbid, DSM_HANDLE_INVALID, GetCurrentTimestamp(), logicalrep_worker_launch(), max_sync_workers_per_subscription, MyLogicalRepWorker, MySubscription, Subscription::name, now(), Subscription::oid, OidIsValid, TimestampDifferenceExceeds(), LogicalRepWorker::userid, wal_retrieve_retry_interval, WORKERTYPE_SEQUENCESYNC, and WORKERTYPE_TABLESYNC.

Referenced by ProcessSequencesForSync(), and ProcessSyncingTablesForApply().

◆ ProcessSyncingRelations()

void ProcessSyncingRelations ( XLogRecPtr  current_lsn)

Definition at line 155 of file syncutils.c.

156{
157 switch (MyLogicalRepWorker->type)
158 {
160
161 /*
162 * Skip for parallel apply workers because they only operate on
163 * tables that are in a READY state. See pa_can_start() and
164 * should_apply_changes_for_rel().
165 */
166 break;
167
169 ProcessSyncingTablesForSync(current_lsn);
170 break;
171
172 case WORKERTYPE_APPLY:
173 ProcessSyncingTablesForApply(current_lsn);
175 break;
176
178 /* Should never happen. */
179 elog(ERROR, "sequence synchronization worker is not expected to process relations");
180 break;
181
183 /* Should never happen. */
184 elog(ERROR, "Unknown worker type");
185 }
186}
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:226
void ProcessSequencesForSync(void)
Definition: sequencesync.c:94
LogicalRepWorkerType type
void ProcessSyncingTablesForSync(XLogRecPtr current_lsn)
Definition: tablesync.c:244
void ProcessSyncingTablesForApply(XLogRecPtr current_lsn)
Definition: tablesync.c:368
@ WORKERTYPE_UNKNOWN
@ WORKERTYPE_PARALLEL_APPLY

References elog, ERROR, MyLogicalRepWorker, ProcessSequencesForSync(), ProcessSyncingTablesForApply(), ProcessSyncingTablesForSync(), LogicalRepWorker::type, WORKERTYPE_APPLY, WORKERTYPE_PARALLEL_APPLY, WORKERTYPE_SEQUENCESYNC, WORKERTYPE_TABLESYNC, and WORKERTYPE_UNKNOWN.

Referenced by apply_handle_commit(), apply_handle_commit_prepared(), apply_handle_prepare(), apply_handle_rollback_prepared(), apply_handle_stream_commit(), apply_handle_stream_prepare(), and LogicalRepApplyLoop().

Variable Documentation

◆ relation_states_validity

SyncingRelationsState relation_states_validity = SYNC_RELATIONS_STATE_NEEDS_REBUILD
static

Definition at line 44 of file syncutils.c.

Referenced by FetchRelationStates(), and InvalidateSyncingRelStates().