PostgreSQL Source Code  git master
worker_internal.h File Reference
#include <signal.h>
#include "access/xlogdefs.h"
#include "catalog/pg_subscription.h"
#include "datatype/timestamp.h"
#include "storage/fileset.h"
#include "storage/lock.h"
#include "storage/spin.h"
Include dependency graph for worker_internal.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Data Structures

struct  LogicalRepWorker
 

Typedefs

typedef struct LogicalRepWorker LogicalRepWorker
 

Functions

void logicalrep_worker_attach (int slot)
 
LogicalRepWorker * logicalrep_worker_find (Oid subid, Oid relid, bool only_running)
 
List * logicalrep_workers_find (Oid subid, bool only_running)
 
void logicalrep_worker_launch (Oid dbid, Oid subid, const char *subname, Oid userid, Oid relid)
 
void logicalrep_worker_stop (Oid subid, Oid relid)
 
void logicalrep_worker_wakeup (Oid subid, Oid relid)
 
void logicalrep_worker_wakeup_ptr (LogicalRepWorker *worker)
 
int logicalrep_sync_worker_count (Oid subid)
 
void ReplicationOriginNameForTablesync (Oid suboid, Oid relid, char *originname, int szorgname)
 
char * LogicalRepSyncTableStart (XLogRecPtr *origin_startpos)
 
bool AllTablesyncsReady (void)
 
void UpdateTwoPhaseState (Oid suboid, char new_state)
 
void process_syncing_tables (XLogRecPtr current_lsn)
 
void invalidate_syncing_table_states (Datum arg, int cacheid, uint32 hashvalue)
 
static bool am_tablesync_worker (void)
 

Variables

PGDLLIMPORT MemoryContext ApplyContext
 
PGDLLIMPORT struct WalReceiverConn * LogRepWorkerWalRcvConn
 
PGDLLIMPORT Subscription * MySubscription
 
PGDLLIMPORT LogicalRepWorker * MyLogicalRepWorker
 
PGDLLIMPORT bool in_remote_transaction
 

Typedef Documentation

◆ LogicalRepWorker

Function Documentation

◆ AllTablesyncsReady()

bool AllTablesyncsReady ( void  )

Definition at line 1515 of file tablesync.c.

1516 {
1517  bool started_tx = false;
1518  bool has_subrels = false;
1519 
1520  /* We need up-to-date sync state info for subscription tables here. */
1521  has_subrels = FetchTableStates(&started_tx);
1522 
1523  if (started_tx)
1524  {
1525  CommitTransactionCommand();
1526  pgstat_report_stat(true);
1527  }
1528 
1529  /*
1530  * Return false when there are no tables in subscription or not all tables
1531  * are in ready state; true otherwise.
1532  */
1533  return has_subrels && list_length(table_states_not_ready) == 0;
1534 }
static int list_length(const List *l)
Definition: pg_list.h:149
long pgstat_report_stat(bool force)
Definition: pgstat.c:564
static List * table_states_not_ready
Definition: tablesync.c:124
static bool FetchTableStates(bool *started_tx)
Definition: tablesync.c:1454
void CommitTransactionCommand(void)
Definition: xact.c:3022

References CommitTransactionCommand(), FetchTableStates(), list_length(), pgstat_report_stat(), and table_states_not_ready.

Referenced by ApplyWorkerMain(), and process_syncing_tables_for_apply().

◆ am_tablesync_worker()

static bool am_tablesync_worker ( void  )
inline static

◆ invalidate_syncing_table_states()

void invalidate_syncing_table_states ( Datum  arg,
int  cacheid,
uint32  hashvalue 
)

Definition at line 271 of file tablesync.c.

272 {
273  table_states_valid = false; /* mark the cached table sync states as invalid */
274 }
static bool table_states_valid
Definition: tablesync.c:123

References table_states_valid.

Referenced by ApplyWorkerMain().

◆ logicalrep_sync_worker_count()

int logicalrep_sync_worker_count ( Oid  subid)

Definition at line 663 of file launcher.c.

664 {
665  int i;
666  int res = 0;
667 
668  Assert(LWLockHeldByMe(LogicalRepWorkerLock));
669 
670  /* Search for attached worker for a given subscription id. */
671  for (i = 0; i < max_logical_replication_workers; i++)
672  {
673  LogicalRepWorker *w = &LogicalRepCtx->workers[i];
674 
675  if (w->subid == subid && OidIsValid(w->relid))
676  res++;
677  }
678 
679  return res;
680 }
int i
Definition: isn.c:73
int max_logical_replication_workers
Definition: launcher.c:55
static LogicalRepCtxStruct * LogicalRepCtx
Definition: launcher.c:69
Assert(fmt[strlen(fmt) - 1] !='\n')
bool LWLockHeldByMe(LWLock *l)
Definition: lwlock.c:1916
LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER]
Definition: launcher.c:66

References Assert(), i, LogicalRepCtx, LWLockHeldByMe(), max_logical_replication_workers, OidIsValid, LogicalRepWorker::relid, res, LogicalRepWorker::subid, and LogicalRepCtxStruct::workers.

Referenced by logicalrep_worker_launch(), and process_syncing_tables_for_apply().

◆ logicalrep_worker_attach()

void logicalrep_worker_attach ( int  slot)

Definition at line 564 of file launcher.c.

565 {
566  /* Block concurrent access. */
567  LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
568 
569  Assert(slot >= 0 && slot < max_logical_replication_workers);
570  MyLogicalRepWorker = &LogicalRepCtx->workers[slot];
571 
572  if (!MyLogicalRepWorker->in_use)
573  {
574  LWLockRelease(LogicalRepWorkerLock);
575  ereport(ERROR,
576  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
577  errmsg("logical replication worker slot %d is empty, cannot attach",
578  slot)));
579  }
580 
581  if (MyLogicalRepWorker->proc)
582  {
583  LWLockRelease(LogicalRepWorkerLock);
584  ereport(ERROR,
585  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
586  errmsg("logical replication worker slot %d is already used by "
587  "another worker, cannot attach", slot)));
588  }
589 
590  MyLogicalRepWorker->proc = MyProc;
591  before_shmem_exit(logicalrep_worker_onexit, (Datum) 0);
592 
593  LWLockRelease(LogicalRepWorkerLock);
594 }
int errcode(int sqlerrcode)
Definition: elog.c:693
int errmsg(const char *fmt,...)
Definition: elog.c:904
#define ERROR
Definition: elog.h:33
#define ereport(elevel,...)
Definition: elog.h:143
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:333
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:58
static void logicalrep_worker_onexit(int code, Datum arg)
Definition: launcher.c:643
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1196
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1800
@ LW_EXCLUSIVE
Definition: lwlock.h:104
uintptr_t Datum
Definition: postgres.h:411
PGPROC * MyProc
Definition: proc.c:68

References Assert(), before_shmem_exit(), ereport, errcode(), errmsg(), ERROR, LogicalRepWorker::in_use, logicalrep_worker_onexit(), LogicalRepCtx, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_logical_replication_workers, MyLogicalRepWorker, MyProc, LogicalRepWorker::proc, and LogicalRepCtxStruct::workers.

Referenced by ApplyWorkerMain().

◆ logicalrep_worker_find()

LogicalRepWorker* logicalrep_worker_find ( Oid  subid,
Oid  relid,
bool  only_running 
)

Definition at line 214 of file launcher.c.

215 {
216  int i;
217  LogicalRepWorker *res = NULL;
218 
219  Assert(LWLockHeldByMe(LogicalRepWorkerLock));
220 
221  /* Search for attached worker for a given subscription id. */
222  for (i = 0; i < max_logical_replication_workers; i++)
223  {
224  LogicalRepWorker *w = &LogicalRepCtx->workers[i];
225 
226  if (w->in_use && w->subid == subid && w->relid == relid &&
227  (!only_running || w->proc))
228  {
229  res = w;
230  break;
231  }
232  }
233 
234  return res;
235 }

References Assert(), i, LogicalRepWorker::in_use, LogicalRepCtx, LWLockHeldByMe(), max_logical_replication_workers, LogicalRepWorker::proc, LogicalRepWorker::relid, res, LogicalRepWorker::subid, and LogicalRepCtxStruct::workers.

Referenced by ApplyLauncherMain(), logicalrep_worker_stop(), logicalrep_worker_wakeup(), process_syncing_tables_for_apply(), wait_for_relation_state_change(), and wait_for_worker_state_change().

◆ logicalrep_worker_launch()

void logicalrep_worker_launch ( Oid  dbid,
Oid  subid,
const char *  subname,
Oid  userid,
Oid  relid 
)

Definition at line 265 of file launcher.c.

267 {
268  BackgroundWorker bgw;
269  BackgroundWorkerHandle *bgw_handle;
270  uint16 generation;
271  int i;
272  int slot = 0;
273  LogicalRepWorker *worker = NULL;
274  int nsyncworkers;
275  TimestampTz now;
276 
277  ereport(DEBUG1,
278  (errmsg_internal("starting logical replication worker for subscription \"%s\"",
279  subname)));
280 
281  /* Report this after the initial starting message for consistency. */
282  if (max_replication_slots == 0)
283  ereport(ERROR,
284  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
285  errmsg("cannot start logical replication workers when max_replication_slots = 0")));
286 
287  /*
288  * We need to do the modification of the shared memory under lock so that
289  * we have consistent view.
290  */
291  LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
292 
293 retry:
294  /* Find unused worker slot. */
295  for (i = 0; i < max_logical_replication_workers; i++)
296  {
297  LogicalRepWorker *w = &LogicalRepCtx->workers[i];
298 
299  if (!w->in_use)
300  {
301  worker = w;
302  slot = i;
303  break;
304  }
305  }
306 
307  nsyncworkers = logicalrep_sync_worker_count(subid);
308 
309  now = GetCurrentTimestamp();
310 
311  /*
312  * If we didn't find a free slot, try to do garbage collection. The
313  * reason we do this is because if some worker failed to start up and its
314  * parent has crashed while waiting, the in_use state was never cleared.
315  */
316  if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription)
317  {
318  bool did_cleanup = false;
319 
320  for (i = 0; i < max_logical_replication_workers; i++)
321  {
322  LogicalRepWorker *w = &LogicalRepCtx->workers[i];
323 
324  /*
325  * If the worker was marked in use but didn't manage to attach in
326  * time, clean it up.
327  */
328  if (w->in_use && !w->proc &&
329  TimestampDifferenceExceeds(w->launch_time, now,
330  wal_receiver_timeout))
331  {
332  elog(WARNING,
333  "logical replication worker for subscription %u took too long to start; canceled",
334  w->subid);
335 
336  logicalrep_worker_cleanup(w);
337  did_cleanup = true;
338  }
339  }
340 
341  if (did_cleanup)
342  goto retry;
343  }
344 
345  /*
346  * We don't allow to invoke more sync workers once we have reached the
347  * sync worker limit per subscription. So, just return silently as we
348  * might get here because of an otherwise harmless race condition.
349  */
350  if (OidIsValid(relid) && nsyncworkers >= max_sync_workers_per_subscription)
351  {
352  LWLockRelease(LogicalRepWorkerLock);
353  return;
354  }
355 
356  /*
357  * However if there are no more free worker slots, inform user about it
358  * before exiting.
359  */
360  if (worker == NULL)
361  {
362  LWLockRelease(LogicalRepWorkerLock);
363  ereport(WARNING,
364  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
365  errmsg("out of logical replication worker slots"),
366  errhint("You might need to increase max_logical_replication_workers.")));
367  return;
368  }
369 
370  /* Prepare the worker slot. */
371  worker->launch_time = now;
372  worker->in_use = true;
373  worker->generation++;
374  worker->proc = NULL;
375  worker->dbid = dbid;
376  worker->userid = userid;
377  worker->subid = subid;
378  worker->relid = relid;
379  worker->relstate = SUBREL_STATE_UNKNOWN;
380  worker->relstate_lsn = InvalidXLogRecPtr;
381  worker->stream_fileset = NULL;
382  worker->last_lsn = InvalidXLogRecPtr;
383  TIMESTAMP_NOBEGIN(worker->last_send_time);
384  TIMESTAMP_NOBEGIN(worker->last_recv_time);
385  worker->reply_lsn = InvalidXLogRecPtr;
386  TIMESTAMP_NOBEGIN(worker->reply_time);
387 
388  /* Before releasing lock, remember generation for future identification. */
389  generation = worker->generation;
390 
391  LWLockRelease(LogicalRepWorkerLock);
392 
393  /* Register the new dynamic worker. */
394  memset(&bgw, 0, sizeof(bgw));
395  bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
396  BGWORKER_BACKEND_DATABASE_CONNECTION;
397  bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
398  snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres");
399  snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
400  if (OidIsValid(relid))
401  snprintf(bgw.bgw_name, BGW_MAXLEN,
402  "logical replication worker for subscription %u sync %u", subid, relid);
403  else
404  snprintf(bgw.bgw_name, BGW_MAXLEN,
405  "logical replication worker for subscription %u", subid);
406  snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication worker");
407 
408  bgw.bgw_restart_time = BGW_NEVER_RESTART;
409  bgw.bgw_notify_pid = MyProcPid;
410  bgw.bgw_main_arg = Int32GetDatum(slot);
411 
412  if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
413  {
414  /* Failed to start worker, so clean up the worker slot. */
415  LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
416  Assert(generation == worker->generation);
417  logicalrep_worker_cleanup(worker);
418  LWLockRelease(LogicalRepWorkerLock);
419 
420  ereport(WARNING,
421  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
422  errmsg("out of background worker slots"),
423  errhint("You might need to increase max_worker_processes.")));
424  return;
425  }
426 
427  /* Now wait until it attaches. */
428  WaitForReplicationWorkerAttach(worker, generation, bgw_handle);
429 }
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1705
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1574
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1538
bool RegisterDynamicBackgroundWorker(BackgroundWorker *worker, BackgroundWorkerHandle **handle)
Definition: bgworker.c:956
#define BGW_NEVER_RESTART
Definition: bgworker.h:85
@ BgWorkerStart_RecoveryFinished
Definition: bgworker.h:81
#define BGWORKER_BACKEND_DATABASE_CONNECTION
Definition: bgworker.h:60
#define BGWORKER_SHMEM_ACCESS
Definition: bgworker.h:53
#define BGW_MAXLEN
Definition: bgworker.h:86
unsigned short uint16
Definition: c.h:440
int64 TimestampTz
Definition: timestamp.h:39
#define TIMESTAMP_NOBEGIN(j)
Definition: timestamp.h:151
int errmsg_internal(const char *fmt,...)
Definition: elog.c:991
int errhint(const char *fmt,...)
Definition: elog.c:1151
#define WARNING
Definition: elog.h:30
#define DEBUG1
Definition: elog.h:24
#define elog(elevel,...)
Definition: elog.h:218
int MyProcPid
Definition: globals.c:44
static void WaitForReplicationWorkerAttach(LogicalRepWorker *worker, uint16 generation, BackgroundWorkerHandle *handle)
Definition: launcher.c:156
int max_sync_workers_per_subscription
Definition: launcher.c:56
int logicalrep_sync_worker_count(Oid subid)
Definition: launcher.c:663
static void logicalrep_worker_cleanup(LogicalRepWorker *worker)
Definition: launcher.c:614
NameData subname
#define snprintf
Definition: port.h:225
#define Int32GetDatum(X)
Definition: postgres.h:523
int max_replication_slots
Definition: slot.c:100
char bgw_function_name[BGW_MAXLEN]
Definition: bgworker.h:97
Datum bgw_main_arg
Definition: bgworker.h:98
char bgw_name[BGW_MAXLEN]
Definition: bgworker.h:91
int bgw_restart_time
Definition: bgworker.h:95
char bgw_type[BGW_MAXLEN]
Definition: bgworker.h:92
BgWorkerStartTime bgw_start_time
Definition: bgworker.h:94
pid_t bgw_notify_pid
Definition: bgworker.h:100
char bgw_library_name[BGW_MAXLEN]
Definition: bgworker.h:96
XLogRecPtr relstate_lsn
TimestampTz last_recv_time
TimestampTz launch_time
TimestampTz reply_time
FileSet * stream_fileset
XLogRecPtr reply_lsn
XLogRecPtr last_lsn
TimestampTz last_send_time
int wal_receiver_timeout
Definition: walreceiver.c:91
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28

References Assert(), BackgroundWorker::bgw_flags, BackgroundWorker::bgw_function_name, BackgroundWorker::bgw_library_name, BackgroundWorker::bgw_main_arg, BGW_MAXLEN, BackgroundWorker::bgw_name, BGW_NEVER_RESTART, BackgroundWorker::bgw_notify_pid, BackgroundWorker::bgw_restart_time, BackgroundWorker::bgw_start_time, BackgroundWorker::bgw_type, BGWORKER_BACKEND_DATABASE_CONNECTION, BGWORKER_SHMEM_ACCESS, BgWorkerStart_RecoveryFinished, LogicalRepWorker::dbid, DEBUG1, elog, ereport, errcode(), errhint(), errmsg(), errmsg_internal(), ERROR, LogicalRepWorker::generation, GetCurrentTimestamp(), i, LogicalRepWorker::in_use, Int32GetDatum, InvalidXLogRecPtr, LogicalRepWorker::last_lsn, LogicalRepWorker::last_recv_time, LogicalRepWorker::last_send_time, LogicalRepWorker::launch_time, logicalrep_sync_worker_count(), logicalrep_worker_cleanup(), LogicalRepCtx, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_logical_replication_workers, max_replication_slots, max_sync_workers_per_subscription, MyProcPid, now(), OidIsValid, LogicalRepWorker::proc, RegisterDynamicBackgroundWorker(), LogicalRepWorker::relid, LogicalRepWorker::relstate, LogicalRepWorker::relstate_lsn, LogicalRepWorker::reply_lsn, LogicalRepWorker::reply_time, snprintf, LogicalRepWorker::stream_fileset, LogicalRepWorker::subid, subname, TIMESTAMP_NOBEGIN, TimestampDifferenceExceeds(), LogicalRepWorker::userid, WaitForReplicationWorkerAttach(), wal_receiver_timeout, WARNING, and LogicalRepCtxStruct::workers.

Referenced by ApplyLauncherMain(), and process_syncing_tables_for_apply().

◆ logicalrep_worker_stop()

void logicalrep_worker_stop ( Oid  subid,
Oid  relid 
)

Definition at line 436 of file launcher.c.

437 {
438  LogicalRepWorker *worker;
439  uint16 generation;
440 
441  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
442 
443  worker = logicalrep_worker_find(subid, relid, false);
444 
445  /* No worker, nothing to do. */
446  if (!worker)
447  {
448  LWLockRelease(LogicalRepWorkerLock);
449  return;
450  }
451 
452  /*
453  * Remember which generation was our worker so we can check if what we see
454  * is still the same one.
455  */
456  generation = worker->generation;
457 
458  /*
459  * If we found a worker but it does not have proc set then it is still
460  * starting up; wait for it to finish starting and then kill it.
461  */
462  while (worker->in_use && !worker->proc)
463  {
464  int rc;
465 
466  LWLockRelease(LogicalRepWorkerLock);
467 
468  /* Wait a bit --- we don't expect to have to wait long. */
469  rc = WaitLatch(MyLatch,
470  WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
471  10L, WAIT_EVENT_BGWORKER_STARTUP);
472 
473  if (rc & WL_LATCH_SET)
474  {
475  ResetLatch(MyLatch);
476  CHECK_FOR_INTERRUPTS();
477  }
478 
479  /* Recheck worker status. */
480  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
481 
482  /*
483  * Check whether the worker slot is no longer used, which would mean
484  * that the worker has exited, or whether the worker generation is
485  * different, meaning that a different worker has taken the slot.
486  */
487  if (!worker->in_use || worker->generation != generation)
488  {
489  LWLockRelease(LogicalRepWorkerLock);
490  return;
491  }
492 
493  /* Worker has assigned proc, so it has started. */
494  if (worker->proc)
495  break;
496  }
497 
498  /* Now terminate the worker ... */
499  kill(worker->proc->pid, SIGTERM);
500 
501  /* ... and wait for it to die. */
502  for (;;)
503  {
504  int rc;
505 
506  /* is it gone? */
507  if (!worker->proc || worker->generation != generation)
508  break;
509 
510  LWLockRelease(LogicalRepWorkerLock);
511 
512  /* Wait a bit --- we don't expect to have to wait long. */
513  rc = WaitLatch(MyLatch,
514  WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
515  10L, WAIT_EVENT_BGWORKER_SHUTDOWN);
516 
517  if (rc & WL_LATCH_SET)
518  {
519  ResetLatch(MyLatch);
520  CHECK_FOR_INTERRUPTS();
521  }
522 
523  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
524  }
525 
526  LWLockRelease(LogicalRepWorkerLock);
527 }
struct Latch * MyLatch
Definition: globals.c:58
void ResetLatch(Latch *latch)
Definition: latch.c:658
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:451
#define WL_TIMEOUT
Definition: latch.h:128
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:130
#define WL_LATCH_SET
Definition: latch.h:125
LogicalRepWorker * logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
Definition: launcher.c:214
@ LW_SHARED
Definition: lwlock.h:105
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:121
int pid
Definition: proc.h:184
@ WAIT_EVENT_BGWORKER_STARTUP
Definition: wait_event.h:87
@ WAIT_EVENT_BGWORKER_SHUTDOWN
Definition: wait_event.h:86
#define kill(pid, sig)
Definition: win32_port.h:464

References CHECK_FOR_INTERRUPTS, LogicalRepWorker::generation, LogicalRepWorker::in_use, kill, logicalrep_worker_find(), LW_SHARED, LWLockAcquire(), LWLockRelease(), MyLatch, PGPROC::pid, LogicalRepWorker::proc, ResetLatch(), WAIT_EVENT_BGWORKER_SHUTDOWN, WAIT_EVENT_BGWORKER_STARTUP, WaitLatch(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, and WL_TIMEOUT.

Referenced by AlterSubscription_refresh(), and DropSubscription().

◆ logicalrep_worker_wakeup()

void logicalrep_worker_wakeup ( Oid  subid,
Oid  relid 
)

Definition at line 533 of file launcher.c.

534 {
535  LogicalRepWorker *worker;
536 
537  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
538 
539  worker = logicalrep_worker_find(subid, relid, true);
540 
541  if (worker)
542  logicalrep_worker_wakeup_ptr(worker);
543 
544  LWLockRelease(LogicalRepWorkerLock);
545 }
void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
Definition: launcher.c:553

References logicalrep_worker_find(), logicalrep_worker_wakeup_ptr(), LW_SHARED, LWLockAcquire(), and LWLockRelease().

Referenced by finish_sync_worker().

◆ logicalrep_worker_wakeup_ptr()

void logicalrep_worker_wakeup_ptr ( LogicalRepWorker *  worker)

Definition at line 553 of file launcher.c.

554 {
555  Assert(LWLockHeldByMe(LogicalRepWorkerLock)); /* caller must hold LogicalRepWorkerLock */
556 
557  SetLatch(&worker->proc->procLatch); /* dereferences worker->proc: caller must pass a worker whose proc is set */
558 }
void SetLatch(Latch *latch)
Definition: latch.c:566
Latch procLatch
Definition: proc.h:168

References Assert(), LWLockHeldByMe(), LogicalRepWorker::proc, PGPROC::procLatch, and SetLatch().

Referenced by logicalrep_worker_wakeup(), process_syncing_tables_for_apply(), and wait_for_worker_state_change().

◆ logicalrep_workers_find()

List* logicalrep_workers_find ( Oid  subid,
bool  only_running 
)

Definition at line 242 of file launcher.c.

243 {
244  int i;
245  List *res = NIL;
246 
247  Assert(LWLockHeldByMe(LogicalRepWorkerLock));
248 
249  /* Search for attached worker for a given subscription id. */
250  for (i = 0; i < max_logical_replication_workers; i++)
251  {
252  LogicalRepWorker *w = &LogicalRepCtx->workers[i];
253 
254  if (w->in_use && w->subid == subid && (!only_running || w->proc))
255  res = lappend(res, w);
256  }
257 
258  return res;
259 }
List * lappend(List *list, void *datum)
Definition: list.c:336
#define NIL
Definition: pg_list.h:65
Definition: pg_list.h:51

References Assert(), i, LogicalRepWorker::in_use, lappend(), LogicalRepCtx, LWLockHeldByMe(), max_logical_replication_workers, NIL, LogicalRepWorker::proc, res, LogicalRepWorker::subid, and LogicalRepCtxStruct::workers.

Referenced by DropSubscription().

◆ LogicalRepSyncTableStart()

char* LogicalRepSyncTableStart ( XLogRecPtr *  origin_startpos)

Definition at line 1179 of file tablesync.c.

1180 {
1181  char *slotname;
1182  char *err;
1183  char relstate;
1184  XLogRecPtr relstate_lsn;
1185  Relation rel;
1186  AclResult aclresult;
1187  WalRcvExecResult *res;
1188  char originname[NAMEDATALEN];
1189  RepOriginId originid;
1190 
1191  /* Check the state of the table synchronization. */
1192  StartTransactionCommand();
1193  relstate = GetSubscriptionRelState(MyLogicalRepWorker->subid,
1194  MyLogicalRepWorker->relid,
1195  &relstate_lsn);
1196  CommitTransactionCommand();
1197 
1198  SpinLockAcquire(&MyLogicalRepWorker->relmutex);
1199  MyLogicalRepWorker->relstate = relstate;
1200  MyLogicalRepWorker->relstate_lsn = relstate_lsn;
1201  SpinLockRelease(&MyLogicalRepWorker->relmutex);
1202 
1203  /*
1204  * If synchronization is already done or no longer necessary, exit now
1205  * that we've updated shared memory state.
1206  */
1207  switch (relstate)
1208  {
1209  case SUBREL_STATE_SYNCDONE:
1210  case SUBREL_STATE_READY:
1211  case SUBREL_STATE_UNKNOWN:
1212  finish_sync_worker(); /* doesn't return */
1213  }
1214 
1215  /* Calculate the name of the tablesync slot. */
1216  slotname = (char *) palloc(NAMEDATALEN);
1217  ReplicationSlotNameForTablesync(MySubscription->oid,
1218  MyLogicalRepWorker->relid,
1219  slotname,
1220  NAMEDATALEN);
1221 
1222  /*
1223  * Here we use the slot name instead of the subscription name as the
1224  * application_name, so that it is different from the main apply worker,
1225  * so that synchronous replication can distinguish them.
1226  */
1227  LogRepWorkerWalRcvConn =
1228  walrcv_connect(MySubscription->conninfo, true, slotname, &err);
1229  if (LogRepWorkerWalRcvConn == NULL)
1230  ereport(ERROR,
1231  (errcode(ERRCODE_CONNECTION_FAILURE),
1232  errmsg("could not connect to the publisher: %s", err)));
1233 
1234  Assert(MyLogicalRepWorker->relstate == SUBREL_STATE_INIT ||
1235  MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC ||
1236  MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY);
1237 
1238  /* Assign the origin tracking record name. */
1239  ReplicationOriginNameForTablesync(MySubscription->oid,
1240  MyLogicalRepWorker->relid,
1241  originname,
1242  sizeof(originname));
1243 
1244  if (MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC)
1245  {
1246  /*
1247  * We have previously errored out before finishing the copy so the
1248  * replication slot might exist. We want to remove the slot if it
1249  * already exists and proceed.
1250  *
1251  * XXX We could also instead try to drop the slot, last time we failed
1252  * but for that, we might need to clean up the copy state as it might
1253  * be in the middle of fetching the rows. Also, if there is a network
1254  * breakdown then it wouldn't have succeeded so trying it next time
1255  * seems like a better bet.
1256  */
1257  ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, slotname, true);
1258  }
1259  else if (MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY)
1260  {
1261  /*
1262  * The COPY phase was previously done, but tablesync then crashed
1263  * before it was able to finish normally.
1264  */
1265  StartTransactionCommand();
1266 
1267  /*
1268  * The origin tracking name must already exist. It was created first
1269  * time this tablesync was launched.
1270  */
1271  originid = replorigin_by_name(originname, false);
1272  replorigin_session_setup(originid);
1273  replorigin_session_origin = originid;
1274  *origin_startpos = replorigin_session_get_progress(false);
1275 
1276  CommitTransactionCommand();
1277 
1278  goto copy_table_done;
1279  }
1280 
1281  SpinLockAcquire(&MyLogicalRepWorker->relmutex);
1282  MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC;
1283  MyLogicalRepWorker->relstate_lsn = InvalidXLogRecPtr;
1284  SpinLockRelease(&MyLogicalRepWorker->relmutex);
1285 
1286  /* Update the state and make it visible to others. */
1287  StartTransactionCommand();
1288  UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
1289  MyLogicalRepWorker->relid,
1290  MyLogicalRepWorker->relstate,
1291  MyLogicalRepWorker->relstate_lsn);
1292  CommitTransactionCommand();
1293  pgstat_report_stat(true);
1294 
1295  StartTransactionCommand();
1296 
1297  /*
1298  * Use a standard write lock here. It might be better to disallow access
1299  * to the table while it's being synchronized. But we don't want to block
1300  * the main apply process from working and it has to open the relation in
1301  * RowExclusiveLock when remapping remote relation id to local one.
1302  */
1303  rel = table_open(MyLogicalRepWorker->relid, RowExclusiveLock);
1304 
1305  /*
1306  * Check that our table sync worker has permission to insert into the
1307  * target table.
1308  */
1309  aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(),
1310  ACL_INSERT);
1311  if (aclresult != ACLCHECK_OK)
1312  aclcheck_error(aclresult,
1313  get_relkind_objtype(rel->rd_rel->relkind),
1314  RelationGetRelationName(rel));
1315 
1316  /*
1317  * COPY FROM does not honor RLS policies. That is not a problem for
1318  * subscriptions owned by roles with BYPASSRLS privilege (or superuser,
1319  * who has it implicitly), but other roles should not be able to
1320  * circumvent RLS. Disallow logical replication into RLS enabled
1321  * relations for such roles.
1322  */
1323  if (check_enable_rls(RelationGetRelid(rel), InvalidOid, false) == RLS_ENABLED)
1324  ereport(ERROR,
1325  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1326  errmsg("\"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
1327  GetUserNameFromId(GetUserId(), true),
1328  RelationGetRelationName(rel))));
1329 
1330  /*
1331  * Start a transaction in the remote node in REPEATABLE READ mode. This
1332  * ensures that both the replication slot we create (see below) and the
1333  * COPY are consistent with each other.
1334  */
1335  res = walrcv_exec(LogRepWorkerWalRcvConn,
1336  "BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ",
1337  0, NULL);
1338  if (res->status != WALRCV_OK_COMMAND)
1339  ereport(ERROR,
1340  (errcode(ERRCODE_CONNECTION_FAILURE),
1341  errmsg("table copy could not start transaction on publisher: %s",
1342  res->err)));
1343  walrcv_clear_result(res);
1344 
1345  /*
1346  * Create a new permanent logical decoding slot. This slot will be used
1347  * for the catchup phase after COPY is done, so tell it to use the
1348  * snapshot to make the final data consistent.
1349  *
1350  * Prevent cancel/die interrupts while creating slot here because it is
1351  * possible that before the server finishes this command, a concurrent
1352  * drop subscription happens which would complete without removing this
1353  * slot leading to a dangling slot on the server.
1354  */
1355  HOLD_INTERRUPTS();
1356  walrcv_create_slot(LogRepWorkerWalRcvConn,
1357  slotname, false /* permanent */ , false /* two_phase */ ,
1358  CRS_USE_SNAPSHOT, origin_startpos);
1359  RESUME_INTERRUPTS();
1360 
1361  /*
1362  * Setup replication origin tracking. The purpose of doing this before the
1363  * copy is to avoid doing the copy again due to any error in setting up
1364  * origin tracking.
1365  */
1366  originid = replorigin_by_name(originname, true);
1367  if (!OidIsValid(originid))
1368  {
1369  /*
1370  * Origin tracking does not exist, so create it now.
1371  *
1372  * Then advance to the LSN got from walrcv_create_slot. This is WAL
1373  * logged for the purpose of recovery. Locks are to prevent the
1374  * replication origin from vanishing while advancing.
1375  */
1376  originid = replorigin_create(originname);
1377 
1378  LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1379  replorigin_advance(originid, *origin_startpos, InvalidXLogRecPtr,
1380  true /* go backward */ , true /* WAL log */ );
1381  UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1382 
1383  replorigin_session_setup(originid);
1384  replorigin_session_origin = originid;
1385  }
1386  else
1387  {
1388  ereport(ERROR,
1389  (errcode(ERRCODE_DUPLICATE_OBJECT),
1390  errmsg("replication origin \"%s\" already exists",
1391  originname)));
1392  }
1393 
1394  /* Now do the initial data copy */
1395  PushActiveSnapshot(GetTransactionSnapshot());
1396  copy_table(rel);
1397  PopActiveSnapshot();
1398 
1399  res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL);
1400  if (res->status != WALRCV_OK_COMMAND)
1401  ereport(ERROR,
1402  (errcode(ERRCODE_CONNECTION_FAILURE),
1403  errmsg("table copy could not finish transaction on publisher: %s",
1404  res->err)));
1405  walrcv_clear_result(res);
1406 
1407  table_close(rel, NoLock);
1408 
1409  /* Make the copy visible. */
1410  CommandCounterIncrement();
1411 
1412  /*
1413  * Update the persisted state to indicate the COPY phase is done; make it
1414  * visible to others.
1415  */
1416  UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
1417  MyLogicalRepWorker->relid,
1418  SUBREL_STATE_FINISHEDCOPY,
1419  MyLogicalRepWorker->relstate_lsn);
1420 
1421  CommitTransactionCommand();
1422 
1423 copy_table_done:
1424 
1425  elog(DEBUG1,
1426  "LogicalRepSyncTableStart: '%s' origin_startpos lsn %X/%X",
1427  originname, LSN_FORMAT_ARGS(*origin_startpos));
1428 
1429  /*
1430  * We are done with the initial data synchronization, update the state.
1431  */
1432  SpinLockAcquire(&MyLogicalRepWorker->relmutex);
1433  MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCWAIT;
1434  MyLogicalRepWorker->relstate_lsn = *origin_startpos;
1435  SpinLockRelease(&MyLogicalRepWorker->relmutex);
1436 
1437  /*
1438  * Finally, wait until the main apply worker tells us to catch up and then
1439  * return to let LogicalRepApplyLoop do it.
1440  */
1441  wait_for_worker_state_change(SUBREL_STATE_CATCHUP);
1442  return slotname;
1443 }
AclResult
Definition: acl.h:181
@ ACLCHECK_OK
Definition: acl.h:182
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
Definition: aclchk.c:3512
AclResult pg_class_aclcheck(Oid table_oid, Oid roleid, AclMode mode)
Definition: aclchk.c:5007
WalReceiverConn * LogRepWorkerWalRcvConn
Definition: worker.c:251
Subscription * MySubscription
Definition: worker.c:253
void UnlockRelationOid(Oid relid, LOCKMODE lockmode)
Definition: lmgr.c:228
void LockRelationOid(Oid relid, LOCKMODE lockmode)
Definition: lmgr.c:109
#define NoLock
Definition: lockdefs.h:34
#define RowExclusiveLock
Definition: lockdefs.h:38
void * palloc(Size size)
Definition: mcxt.c:1068
#define RESUME_INTERRUPTS()
Definition: miscadmin.h:134
#define HOLD_INTERRUPTS()
Definition: miscadmin.h:132
char * GetUserNameFromId(Oid roleid, bool noerr)
Definition: miscinit.c:913
Oid GetUserId(void)
Definition: miscinit.c:492
ObjectType get_relkind_objtype(char relkind)
RepOriginId replorigin_by_name(const char *roname, bool missing_ok)
Definition: origin.c:209
RepOriginId replorigin_create(const char *roname)
Definition: origin.c:240
void replorigin_session_setup(RepOriginId node)
Definition: origin.c:1071
RepOriginId replorigin_session_origin
Definition: origin.c:155
void replorigin_advance(RepOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)
Definition: origin.c:872
XLogRecPtr replorigin_session_get_progress(bool flush)
Definition: origin.c:1206
#define ACL_INSERT
Definition: parsenodes.h:82
#define NAMEDATALEN
char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
void UpdateSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn)
#define InvalidOid
Definition: postgres_ext.h:36
#define RelationGetRelid(relation)
Definition: rel.h:489
#define RelationGetRelationName(relation)
Definition: rel.h:523
int check_enable_rls(Oid relid, Oid checkAsUser, bool noError)
Definition: rls.c:52
@ RLS_ENABLED
Definition: rls.h:45
Snapshot GetTransactionSnapshot(void)
Definition: snapmgr.c:250
void PopActiveSnapshot(void)
Definition: snapmgr.c:776
void PushActiveSnapshot(Snapshot snap)
Definition: snapmgr.c:682
#define SpinLockRelease(lock)
Definition: spin.h:64
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define ERRCODE_DUPLICATE_OBJECT
Definition: streamutil.c:32
Form_pg_class rd_rel
Definition: rel.h:109
void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok)
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:167
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39
static bool wait_for_worker_state_change(char expected_state)
Definition: tablesync.c:222
void ReplicationOriginNameForTablesync(Oid suboid, Oid relid, char *originname, int szorgname)
Definition: tablesync.c:1164
static void copy_table(Relation rel)
Definition: tablesync.c:1021
void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, int szslot)
Definition: tablesync.c:1151
#define walrcv_create_slot(conn, slotname, temporary, two_phase, snapshot_action, lsn)
Definition: walreceiver.h:426
#define walrcv_connect(conninfo, logical, appname, err)
Definition: walreceiver.h:404
@ WALRCV_OK_COMMAND
Definition: walreceiver.h:201
static void walrcv_clear_result(WalRcvExecResult *walres)
Definition: walreceiver.h:436
#define walrcv_exec(conn, exec, nRetTypes, retTypes)
Definition: walreceiver.h:430
@ CRS_USE_SNAPSHOT
Definition: walsender.h:24
void CommandCounterIncrement(void)
Definition: xact.c:1074
void StartTransactionCommand(void)
Definition: xact.c:2925
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
uint16 RepOriginId
Definition: xlogdefs.h:65
uint64 XLogRecPtr
Definition: xlogdefs.h:21

References ACL_INSERT, aclcheck_error(), ACLCHECK_OK, Assert(), check_enable_rls(), CommandCounterIncrement(), CommitTransactionCommand(), Subscription::conninfo, copy_table(), CRS_USE_SNAPSHOT, DEBUG1, elog, ereport, errcode(), ERRCODE_DUPLICATE_OBJECT, errmsg(), ERROR, get_relkind_objtype(), GetSubscriptionRelState(), GetTransactionSnapshot(), GetUserId(), GetUserNameFromId(), HOLD_INTERRUPTS, InvalidOid, InvalidXLogRecPtr, LockRelationOid(), LogRepWorkerWalRcvConn, LSN_FORMAT_ARGS, MyLogicalRepWorker, MySubscription, NAMEDATALEN, NoLock, Subscription::oid, OidIsValid, palloc(), pg_class_aclcheck(), pgstat_report_stat(), PopActiveSnapshot(), PushActiveSnapshot(), RelationData::rd_rel, RelationGetRelationName, RelationGetRelid, LogicalRepWorker::relid, LogicalRepWorker::relmutex, LogicalRepWorker::relstate, LogicalRepWorker::relstate_lsn, ReplicationOriginNameForTablesync(), ReplicationSlotDropAtPubNode(), ReplicationSlotNameForTablesync(), replorigin_advance(), replorigin_by_name(), replorigin_create(), replorigin_session_get_progress(), replorigin_session_origin, replorigin_session_setup(), res, RESUME_INTERRUPTS, RLS_ENABLED, RowExclusiveLock, SpinLockAcquire, SpinLockRelease, StartTransactionCommand(), LogicalRepWorker::subid, table_close(), table_open(), UnlockRelationOid(), UpdateSubscriptionRelState(), wait_for_worker_state_change(), walrcv_clear_result(), walrcv_connect, walrcv_create_slot, walrcv_exec, and WALRCV_OK_COMMAND.

Referenced by start_table_sync().

◆ process_syncing_tables()

void process_syncing_tables ( XLogRecPtr  current_lsn)

Definition at line 590 of file tablesync.c.

591 {
592  if (am_tablesync_worker())
593  process_syncing_tables_for_sync(current_lsn);
594  else
595  process_syncing_tables_for_apply(current_lsn);
596 }
static void process_syncing_tables_for_apply(XLogRecPtr current_lsn)
Definition: tablesync.c:365
static void process_syncing_tables_for_sync(XLogRecPtr current_lsn)
Definition: tablesync.c:285
static bool am_tablesync_worker(void)

References am_tablesync_worker(), process_syncing_tables_for_apply(), and process_syncing_tables_for_sync().

Referenced by apply_handle_commit(), apply_handle_commit_prepared(), apply_handle_prepare(), apply_handle_rollback_prepared(), apply_handle_stream_commit(), apply_handle_stream_prepare(), and LogicalRepApplyLoop().

◆ ReplicationOriginNameForTablesync()

void ReplicationOriginNameForTablesync ( Oid  suboid,
Oid  relid,
char *  originname,
int  szorgname 
)

Definition at line 1164 of file tablesync.c.

1166 {
1167  snprintf(originname, szorgname, "pg_%u_%u", suboid, relid);
1168 }

References snprintf.

Referenced by AlterSubscription_refresh(), ApplyWorkerMain(), DropSubscription(), LogicalRepSyncTableStart(), and process_syncing_tables_for_apply().

◆ UpdateTwoPhaseState()

void UpdateTwoPhaseState ( Oid  suboid,
char  new_state 
)

Definition at line 1540 of file tablesync.c.

1541 {
1542  Relation rel;
1543  HeapTuple tup;
1544  bool nulls[Natts_pg_subscription];
1545  bool replaces[Natts_pg_subscription];
1546  Datum values[Natts_pg_subscription];
1547 
1548  Assert(new_state == LOGICALREP_TWOPHASE_STATE_DISABLED ||
1549  new_state == LOGICALREP_TWOPHASE_STATE_PENDING ||
1550  new_state == LOGICALREP_TWOPHASE_STATE_ENABLED);
1551 
1552  rel = table_open(SubscriptionRelationId, RowExclusiveLock);
1553  tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(suboid));
1554  if (!HeapTupleIsValid(tup))
1555  elog(ERROR,
1556  "cache lookup failed for subscription oid %u",
1557  suboid);
1558 
1559  /* Form a new tuple. */
1560  memset(values, 0, sizeof(values));
1561  memset(nulls, false, sizeof(nulls));
1562  memset(replaces, false, sizeof(replaces));
1563 
1564  /* And update/set two_phase state */
1565  values[Anum_pg_subscription_subtwophasestate - 1] = CharGetDatum(new_state);
1566  replaces[Anum_pg_subscription_subtwophasestate - 1] = true;
1567 
1568  tup = heap_modify_tuple(tup, RelationGetDescr(rel),
1569  values, nulls, replaces);
1570  CatalogTupleUpdate(rel, &tup->t_self, tup);
1571 
1572  heap_freetuple(tup);
1573  table_close(rel, RowExclusiveLock);
1574 }
static Datum values[MAXATTR]
Definition: bootstrap.c:156
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, Datum *replValues, bool *replIsnull, bool *doReplace)
Definition: heaptuple.c:1113
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
void CatalogTupleUpdate(Relation heapRel, ItemPointer otid, HeapTuple tup)
Definition: indexing.c:301
#define LOGICALREP_TWOPHASE_STATE_DISABLED
#define LOGICALREP_TWOPHASE_STATE_PENDING
#define LOGICALREP_TWOPHASE_STATE_ENABLED
#define ObjectIdGetDatum(X)
Definition: postgres.h:551
#define CharGetDatum(X)
Definition: postgres.h:460
#define RelationGetDescr(relation)
Definition: rel.h:515
ItemPointerData t_self
Definition: htup.h:65
#define SearchSysCacheCopy1(cacheId, key1)
Definition: syscache.h:179
@ SUBSCRIPTIONOID
Definition: syscache.h:99

References Assert(), CatalogTupleUpdate(), CharGetDatum, elog, ERROR, heap_freetuple(), heap_modify_tuple(), HeapTupleIsValid, LOGICALREP_TWOPHASE_STATE_DISABLED, LOGICALREP_TWOPHASE_STATE_ENABLED, LOGICALREP_TWOPHASE_STATE_PENDING, ObjectIdGetDatum, RelationGetDescr, RowExclusiveLock, SearchSysCacheCopy1, SUBSCRIPTIONOID, HeapTupleData::t_self, table_close(), table_open(), and values.

Referenced by ApplyWorkerMain(), and CreateSubscription().

Variable Documentation

◆ ApplyContext

◆ in_remote_transaction

◆ LogRepWorkerWalRcvConn

◆ MyLogicalRepWorker

◆ MySubscription