PostgreSQL Source Code  git master
worker_internal.h File Reference
#include <signal.h>
#include "access/xlogdefs.h"
#include "catalog/pg_subscription.h"
#include "datatype/timestamp.h"
#include "storage/lock.h"
Include dependency graph for worker_internal.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Data Structures

struct  LogicalRepWorker
 

Typedefs

typedef struct LogicalRepWorker LogicalRepWorker
 

Functions

void logicalrep_worker_attach (int slot)
 
LogicalRepWorkerlogicalrep_worker_find (Oid subid, Oid relid, bool only_running)
 
Listlogicalrep_workers_find (Oid subid, bool only_running)
 
void logicalrep_worker_launch (Oid dbid, Oid subid, const char *subname, Oid userid, Oid relid)
 
void logicalrep_worker_stop (Oid subid, Oid relid)
 
void logicalrep_worker_stop_at_commit (Oid subid, Oid relid)
 
void logicalrep_worker_wakeup (Oid subid, Oid relid)
 
void logicalrep_worker_wakeup_ptr (LogicalRepWorker *worker)
 
int logicalrep_sync_worker_count (Oid subid)
 
char * LogicalRepSyncTableStart (XLogRecPtr *origin_startpos)
 
void process_syncing_tables (XLogRecPtr current_lsn)
 
void invalidate_syncing_table_states (Datum arg, int cacheid, uint32 hashvalue)
 
static bool am_tablesync_worker (void)
 

Variables

MemoryContext ApplyContext
 
struct WalReceiverConnwrconn
 
SubscriptionMySubscription
 
LogicalRepWorkerMyLogicalRepWorker
 
bool in_remote_transaction
 

Typedef Documentation

◆ LogicalRepWorker

Function Documentation

◆ am_tablesync_worker()

static bool am_tablesync_worker ( void  )
inlinestatic

Definition at line 90 of file worker_internal.h.

References OidIsValid, and LogicalRepWorker::relid.

Referenced by apply_handle_commit(), apply_handle_origin(), ApplyWorkerMain(), process_syncing_tables(), should_apply_changes_for_rel(), and wait_for_relation_state_change().

91 {
93 }
#define OidIsValid(objectId)
Definition: c.h:638
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:64

◆ invalidate_syncing_table_states()

void invalidate_syncing_table_states ( Datum  arg,
int  cacheid,
uint32  hashvalue 
)

Definition at line 263 of file tablesync.c.

References table_states_valid.

Referenced by ApplyWorkerMain().

264 {
265  table_states_valid = false;
266 }
static bool table_states_valid
Definition: tablesync.c:113

◆ logicalrep_sync_worker_count()

int logicalrep_sync_worker_count ( Oid  subid)

Definition at line 746 of file launcher.c.

References Assert, i, LWLockHeldByMe(), max_logical_replication_workers, OidIsValid, LogicalRepWorker::relid, LogicalRepWorker::subid, and LogicalRepCtxStruct::workers.

Referenced by logicalrep_worker_launch(), and process_syncing_tables_for_apply().

747 {
748  int i;
749  int res = 0;
750 
751  Assert(LWLockHeldByMe(LogicalRepWorkerLock));
752 
753  /* Search for attached worker for a given subscription id. */
754  for (i = 0; i < max_logical_replication_workers; i++)
755  {
757 
758  if (w->subid == subid && OidIsValid(w->relid))
759  res++;
760  }
761 
762  return res;
763 }
bool LWLockHeldByMe(LWLock *l)
Definition: lwlock.c:1842
LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER]
Definition: launcher.c:72
#define OidIsValid(objectId)
Definition: c.h:638
#define Assert(condition)
Definition: c.h:732
int max_logical_replication_workers
Definition: launcher.c:61
LogicalRepCtxStruct * LogicalRepCtx
Definition: launcher.c:75
int i

◆ logicalrep_worker_attach()

void logicalrep_worker_attach ( int  slot)

Definition at line 637 of file launcher.c.

References Assert, before_shmem_exit(), ereport, errcode(), errmsg(), ERROR, LogicalRepWorker::in_use, logicalrep_worker_onexit(), LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_logical_replication_workers, MyProc, LogicalRepWorker::proc, and LogicalRepCtxStruct::workers.

Referenced by ApplyWorkerMain().

638 {
639  /* Block concurrent access. */
640  LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
641 
642  Assert(slot >= 0 && slot < max_logical_replication_workers);
644 
646  {
647  LWLockRelease(LogicalRepWorkerLock);
648  ereport(ERROR,
649  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
650  errmsg("logical replication worker slot %d is empty, cannot attach",
651  slot)));
652  }
653 
655  {
656  LWLockRelease(LogicalRepWorkerLock);
657  ereport(ERROR,
658  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
659  errmsg("logical replication worker slot %d is already used by "
660  "another worker, cannot attach", slot)));
661  }
662 
665 
666  LWLockRelease(LogicalRepWorkerLock);
667 }
PGPROC * MyProc
Definition: proc.c:68
LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER]
Definition: launcher.c:72
int errcode(int sqlerrcode)
Definition: elog.c:570
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1726
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:64
#define ERROR
Definition: elog.h:43
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:333
#define ereport(elevel, rest)
Definition: elog.h:141
static void logicalrep_worker_onexit(int code, Datum arg)
Definition: launcher.c:716
uintptr_t Datum
Definition: postgres.h:367
#define Assert(condition)
Definition: c.h:732
int max_logical_replication_workers
Definition: launcher.c:61
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1122
int errmsg(const char *fmt,...)
Definition: elog.c:784
LogicalRepCtxStruct * LogicalRepCtx
Definition: launcher.c:75

◆ logicalrep_worker_find()

LogicalRepWorker* logicalrep_worker_find ( Oid  subid,
Oid  relid,
bool  only_running 
)

Definition at line 243 of file launcher.c.

References Assert, i, LogicalRepWorker::in_use, LWLockHeldByMe(), max_logical_replication_workers, LogicalRepWorker::proc, LogicalRepWorker::relid, LogicalRepWorker::subid, and LogicalRepCtxStruct::workers.

Referenced by ApplyLauncherMain(), logicalrep_worker_stop(), logicalrep_worker_wakeup(), process_syncing_tables_for_apply(), wait_for_relation_state_change(), and wait_for_worker_state_change().

244 {
245  int i;
246  LogicalRepWorker *res = NULL;
247 
248  Assert(LWLockHeldByMe(LogicalRepWorkerLock));
249 
250  /* Search for attached worker for a given subscription id. */
251  for (i = 0; i < max_logical_replication_workers; i++)
252  {
254 
255  if (w->in_use && w->subid == subid && w->relid == relid &&
256  (!only_running || w->proc))
257  {
258  res = w;
259  break;
260  }
261  }
262 
263  return res;
264 }
bool LWLockHeldByMe(LWLock *l)
Definition: lwlock.c:1842
LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER]
Definition: launcher.c:72
#define Assert(condition)
Definition: c.h:732
int max_logical_replication_workers
Definition: launcher.c:61
LogicalRepCtxStruct * LogicalRepCtx
Definition: launcher.c:75
int i

◆ logicalrep_worker_launch()

void logicalrep_worker_launch ( Oid  dbid,
Oid  subid,
const char *  subname,
Oid  userid,
Oid  relid 
)

Definition at line 294 of file launcher.c.

References Assert, BackgroundWorker::bgw_flags, BackgroundWorker::bgw_function_name, BackgroundWorker::bgw_library_name, BackgroundWorker::bgw_main_arg, BGW_MAXLEN, BackgroundWorker::bgw_name, BGW_NEVER_RESTART, BackgroundWorker::bgw_notify_pid, BackgroundWorker::bgw_restart_time, BackgroundWorker::bgw_start_time, BackgroundWorker::bgw_type, BGWORKER_BACKEND_DATABASE_CONNECTION, BGWORKER_SHMEM_ACCESS, BgWorkerStart_RecoveryFinished, LogicalRepWorker::dbid, DEBUG1, elog, ereport, errcode(), errhint(), errmsg(), ERROR, LogicalRepWorker::generation, GetCurrentTimestamp(), i, LogicalRepWorker::in_use, Int32GetDatum, InvalidXLogRecPtr, LogicalRepWorker::last_lsn, LogicalRepWorker::last_recv_time, LogicalRepWorker::last_send_time, LogicalRepWorker::launch_time, logicalrep_sync_worker_count(), logicalrep_worker_cleanup(), LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_logical_replication_workers, max_replication_slots, max_sync_workers_per_subscription, MyProcPid, now(), OidIsValid, LogicalRepWorker::proc, RegisterDynamicBackgroundWorker(), LogicalRepWorker::relid, LogicalRepWorker::relstate, LogicalRepWorker::relstate_lsn, LogicalRepWorker::reply_lsn, LogicalRepWorker::reply_time, snprintf, LogicalRepWorker::subid, TIMESTAMP_NOBEGIN, TimestampDifferenceExceeds(), LogicalRepWorker::userid, WaitForReplicationWorkerAttach(), wal_receiver_timeout, WARNING, and LogicalRepCtxStruct::workers.

Referenced by ApplyLauncherMain(), and process_syncing_tables_for_apply().

296 {
297  BackgroundWorker bgw;
298  BackgroundWorkerHandle *bgw_handle;
299  uint16 generation;
300  int i;
301  int slot = 0;
302  LogicalRepWorker *worker = NULL;
303  int nsyncworkers;
305 
306  ereport(DEBUG1,
307  (errmsg("starting logical replication worker for subscription \"%s\"",
308  subname)));
309 
310  /* Report this after the initial starting message for consistency. */
311  if (max_replication_slots == 0)
312  ereport(ERROR,
313  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
314  errmsg("cannot start logical replication workers when max_replication_slots = 0")));
315 
316  /*
317  * We need to do the modification of the shared memory under lock so that
318  * we have consistent view.
319  */
320  LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
321 
322 retry:
323  /* Find unused worker slot. */
324  for (i = 0; i < max_logical_replication_workers; i++)
325  {
327 
328  if (!w->in_use)
329  {
330  worker = w;
331  slot = i;
332  break;
333  }
334  }
335 
336  nsyncworkers = logicalrep_sync_worker_count(subid);
337 
338  now = GetCurrentTimestamp();
339 
340  /*
341  * If we didn't find a free slot, try to do garbage collection. The
342  * reason we do this is because if some worker failed to start up and its
343  * parent has crashed while waiting, the in_use state was never cleared.
344  */
345  if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription)
346  {
347  bool did_cleanup = false;
348 
349  for (i = 0; i < max_logical_replication_workers; i++)
350  {
352 
353  /*
354  * If the worker was marked in use but didn't manage to attach in
355  * time, clean it up.
356  */
357  if (w->in_use && !w->proc &&
360  {
361  elog(WARNING,
362  "logical replication worker for subscription %u took too long to start; canceled",
363  w->subid);
364 
366  did_cleanup = true;
367  }
368  }
369 
370  if (did_cleanup)
371  goto retry;
372  }
373 
374  /*
375  * If we reached the sync worker limit per subscription, just exit
376  * silently as we might get here because of an otherwise harmless race
377  * condition.
378  */
379  if (nsyncworkers >= max_sync_workers_per_subscription)
380  {
381  LWLockRelease(LogicalRepWorkerLock);
382  return;
383  }
384 
385  /*
386  * However if there are no more free worker slots, inform user about it
387  * before exiting.
388  */
389  if (worker == NULL)
390  {
391  LWLockRelease(LogicalRepWorkerLock);
393  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
394  errmsg("out of logical replication worker slots"),
395  errhint("You might need to increase max_logical_replication_workers.")));
396  return;
397  }
398 
399  /* Prepare the worker slot. */
400  worker->launch_time = now;
401  worker->in_use = true;
402  worker->generation++;
403  worker->proc = NULL;
404  worker->dbid = dbid;
405  worker->userid = userid;
406  worker->subid = subid;
407  worker->relid = relid;
408  worker->relstate = SUBREL_STATE_UNKNOWN;
410  worker->last_lsn = InvalidXLogRecPtr;
413  worker->reply_lsn = InvalidXLogRecPtr;
414  TIMESTAMP_NOBEGIN(worker->reply_time);
415 
416  /* Before releasing lock, remember generation for future identification. */
417  generation = worker->generation;
418 
419  LWLockRelease(LogicalRepWorkerLock);
420 
421  /* Register the new dynamic worker. */
422  memset(&bgw, 0, sizeof(bgw));
426  snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres");
427  snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
428  if (OidIsValid(relid))
430  "logical replication worker for subscription %u sync %u", subid, relid);
431  else
433  "logical replication worker for subscription %u", subid);
434  snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication worker");
435 
438  bgw.bgw_main_arg = Int32GetDatum(slot);
439 
440  if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
441  {
442  /* Failed to start worker, so clean up the worker slot. */
443  LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
444  Assert(generation == worker->generation);
446  LWLockRelease(LogicalRepWorkerLock);
447 
449  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
450  errmsg("out of background worker slots"),
451  errhint("You might need to increase max_worker_processes.")));
452  return;
453  }
454 
455  /* Now wait until it attaches. */
456  WaitForReplicationWorkerAttach(worker, generation, bgw_handle);
457 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define DEBUG1
Definition: elog.h:25
int MyProcPid
Definition: globals.c:40
int errhint(const char *fmt,...)
Definition: elog.c:974
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1583
int64 TimestampTz
Definition: timestamp.h:39
LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER]
Definition: launcher.c:72
TimestampTz last_send_time
XLogRecPtr last_lsn
int bgw_restart_time
Definition: bgworker.h:94
int errcode(int sqlerrcode)
Definition: elog.c:570
NameData subname
#define BGWORKER_SHMEM_ACCESS
Definition: bgworker.h:52
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1682
#define OidIsValid(objectId)
Definition: c.h:638
char bgw_function_name[BGW_MAXLEN]
Definition: bgworker.h:96
int wal_receiver_timeout
Definition: walreceiver.c:76
XLogRecPtr relstate_lsn
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1726
Datum bgw_main_arg
Definition: bgworker.h:97
unsigned short uint16
Definition: c.h:357
#define ERROR
Definition: elog.h:43
int max_sync_workers_per_subscription
Definition: launcher.c:62
XLogRecPtr reply_lsn
static void logicalrep_worker_cleanup(LogicalRepWorker *worker)
Definition: launcher.c:687
#define BGW_NEVER_RESTART
Definition: bgworker.h:84
#define TIMESTAMP_NOBEGIN(j)
Definition: timestamp.h:112
#define ereport(elevel, rest)
Definition: elog.h:141
#define WARNING
Definition: elog.h:40
int logicalrep_sync_worker_count(Oid subid)
Definition: launcher.c:746
static void WaitForReplicationWorkerAttach(LogicalRepWorker *worker, uint16 generation, BackgroundWorkerHandle *handle)
Definition: launcher.c:183
TimestampTz launch_time
int max_replication_slots
Definition: slot.c:99
TimestampTz last_recv_time
char bgw_name[BGW_MAXLEN]
Definition: bgworker.h:90
#define Assert(condition)
Definition: c.h:732
#define BGWORKER_BACKEND_DATABASE_CONNECTION
Definition: bgworker.h:59
int max_logical_replication_workers
Definition: launcher.c:61
#define BGW_MAXLEN
Definition: bgworker.h:85
BgWorkerStartTime bgw_start_time
Definition: bgworker.h:93
bool RegisterDynamicBackgroundWorker(BackgroundWorker *worker, BackgroundWorkerHandle **handle)
Definition: bgworker.c:932
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1122
#define Int32GetDatum(X)
Definition: postgres.h:479
char bgw_type[BGW_MAXLEN]
Definition: bgworker.h:91
int errmsg(const char *fmt,...)
Definition: elog.c:784
pid_t bgw_notify_pid
Definition: bgworker.h:99
#define elog(elevel,...)
Definition: elog.h:226
LogicalRepCtxStruct * LogicalRepCtx
Definition: launcher.c:75
int i
char bgw_library_name[BGW_MAXLEN]
Definition: bgworker.h:95
#define snprintf
Definition: port.h:192
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1547
TimestampTz reply_time

◆ logicalrep_worker_stop()

void logicalrep_worker_stop ( Oid  subid,
Oid  relid 
)

Definition at line 464 of file launcher.c.

References CHECK_FOR_INTERRUPTS, LogicalRepWorker::generation, LogicalRepWorker::in_use, kill, logicalrep_worker_find(), LW_SHARED, LWLockAcquire(), LWLockRelease(), MyLatch, PGPROC::pid, LogicalRepWorker::proc, ResetLatch(), WAIT_EVENT_BGWORKER_SHUTDOWN, WAIT_EVENT_BGWORKER_STARTUP, WaitLatch(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, and WL_TIMEOUT.

Referenced by AtEOXact_ApplyLauncher(), and DropSubscription().

465 {
466  LogicalRepWorker *worker;
467  uint16 generation;
468 
469  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
470 
471  worker = logicalrep_worker_find(subid, relid, false);
472 
473  /* No worker, nothing to do. */
474  if (!worker)
475  {
476  LWLockRelease(LogicalRepWorkerLock);
477  return;
478  }
479 
480  /*
481  * Remember which generation was our worker so we can check if what we see
482  * is still the same one.
483  */
484  generation = worker->generation;
485 
486  /*
487  * If we found a worker but it does not have proc set then it is still
488  * starting up; wait for it to finish starting and then kill it.
489  */
490  while (worker->in_use && !worker->proc)
491  {
492  int rc;
493 
494  LWLockRelease(LogicalRepWorkerLock);
495 
496  /* Wait a bit --- we don't expect to have to wait long. */
497  rc = WaitLatch(MyLatch,
500 
501  if (rc & WL_LATCH_SET)
502  {
505  }
506 
507  /* Recheck worker status. */
508  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
509 
510  /*
511  * Check whether the worker slot is no longer used, which would mean
512  * that the worker has exited, or whether the worker generation is
513  * different, meaning that a different worker has taken the slot.
514  */
515  if (!worker->in_use || worker->generation != generation)
516  {
517  LWLockRelease(LogicalRepWorkerLock);
518  return;
519  }
520 
521  /* Worker has assigned proc, so it has started. */
522  if (worker->proc)
523  break;
524  }
525 
526  /* Now terminate the worker ... */
527  kill(worker->proc->pid, SIGTERM);
528 
529  /* ... and wait for it to die. */
530  for (;;)
531  {
532  int rc;
533 
534  /* is it gone? */
535  if (!worker->proc || worker->generation != generation)
536  break;
537 
538  LWLockRelease(LogicalRepWorkerLock);
539 
540  /* Wait a bit --- we don't expect to have to wait long. */
541  rc = WaitLatch(MyLatch,
542  WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
544 
545  if (rc & WL_LATCH_SET)
546  {
549  }
550 
551  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
552  }
553 
554  LWLockRelease(LogicalRepWorkerLock);
555 }
#define WL_TIMEOUT
Definition: latch.h:127
#define kill(pid, sig)
Definition: win32_port.h:426
void ResetLatch(Latch *latch)
Definition: latch.c:519
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:344
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1726
unsigned short uint16
Definition: c.h:357
LogicalRepWorker * logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
Definition: launcher.c:243
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1122
struct Latch * MyLatch
Definition: globals.c:54
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:99
int pid
Definition: proc.h:109
#define WL_LATCH_SET
Definition: latch.h:124
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:129

◆ logicalrep_worker_stop_at_commit()

void logicalrep_worker_stop_at_commit ( Oid  subid,
Oid  relid 
)

Definition at line 561 of file launcher.c.

References Assert, GetCurrentTransactionNestLevel(), lappend(), MemoryContextSwitchTo(), StopWorkersData::nestDepth, NIL, on_commit_stop_workers, palloc(), StopWorkersData::parent, LogicalRepWorkerId::relid, LogicalRepWorkerId::subid, TopTransactionContext, and StopWorkersData::workers.

Referenced by AlterSubscription_refresh().

562 {
563  int nestDepth = GetCurrentTransactionNestLevel();
564  LogicalRepWorkerId *wid;
565  MemoryContext oldctx;
566 
567  /* Make sure we store the info in context that survives until commit. */
569 
570  /* Check that previous transactions were properly cleaned up. */
571  Assert(on_commit_stop_workers == NULL ||
572  nestDepth >= on_commit_stop_workers->nestDepth);
573 
574  /*
575  * Push a new stack element if we don't already have one for the current
576  * nestDepth.
577  */
578  if (on_commit_stop_workers == NULL ||
579  nestDepth > on_commit_stop_workers->nestDepth)
580  {
581  StopWorkersData *newdata = palloc(sizeof(StopWorkersData));
582 
583  newdata->nestDepth = nestDepth;
584  newdata->workers = NIL;
585  newdata->parent = on_commit_stop_workers;
586  on_commit_stop_workers = newdata;
587  }
588 
589  /*
590  * Finally add a new worker into the worker list of the current
591  * subtransaction.
592  */
593  wid = palloc(sizeof(LogicalRepWorkerId));
594  wid->subid = subid;
595  wid->relid = relid;
598 
599  MemoryContextSwitchTo(oldctx);
600 }
#define NIL
Definition: pg_list.h:65
MemoryContext TopTransactionContext
Definition: mcxt.c:49
List * workers
Definition: launcher.c:86
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
static StopWorkersData * on_commit_stop_workers
Definition: launcher.c:95
List * lappend(List *list, void *datum)
Definition: list.c:322
int GetCurrentTransactionNestLevel(void)
Definition: xact.c:842
#define Assert(condition)
Definition: c.h:732
struct StopWorkersData * parent
Definition: launcher.c:87
void * palloc(Size size)
Definition: mcxt.c:949

◆ logicalrep_worker_wakeup()

void logicalrep_worker_wakeup ( Oid  subid,
Oid  relid 
)

Definition at line 606 of file launcher.c.

References logicalrep_worker_find(), logicalrep_worker_wakeup_ptr(), LW_SHARED, LWLockAcquire(), and LWLockRelease().

Referenced by pg_attribute_noreturn().

607 {
608  LogicalRepWorker *worker;
609 
610  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
611 
612  worker = logicalrep_worker_find(subid, relid, true);
613 
614  if (worker)
616 
617  LWLockRelease(LogicalRepWorkerLock);
618 }
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1726
void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
Definition: launcher.c:626
LogicalRepWorker * logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
Definition: launcher.c:243
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1122

◆ logicalrep_worker_wakeup_ptr()

void logicalrep_worker_wakeup_ptr ( LogicalRepWorker worker)

Definition at line 626 of file launcher.c.

References Assert, LWLockHeldByMe(), LogicalRepWorker::proc, PGPROC::procLatch, and SetLatch().

Referenced by logicalrep_worker_wakeup(), process_syncing_tables_for_apply(), and wait_for_worker_state_change().

627 {
628  Assert(LWLockHeldByMe(LogicalRepWorkerLock));
629 
630  SetLatch(&worker->proc->procLatch);
631 }
bool LWLockHeldByMe(LWLock *l)
Definition: lwlock.c:1842
void SetLatch(Latch *latch)
Definition: latch.c:436
Latch procLatch
Definition: proc.h:104
#define Assert(condition)
Definition: c.h:732

◆ logicalrep_workers_find()

List* logicalrep_workers_find ( Oid  subid,
bool  only_running 
)

Definition at line 271 of file launcher.c.

References Assert, i, LogicalRepWorker::in_use, lappend(), LWLockHeldByMe(), max_logical_replication_workers, NIL, LogicalRepWorker::proc, LogicalRepWorker::subid, and LogicalRepCtxStruct::workers.

Referenced by DropSubscription().

272 {
273  int i;
274  List *res = NIL;
275 
276  Assert(LWLockHeldByMe(LogicalRepWorkerLock));
277 
278  /* Search for attached worker for a given subscription id. */
279  for (i = 0; i < max_logical_replication_workers; i++)
280  {
282 
283  if (w->in_use && w->subid == subid && (!only_running || w->proc))
284  res = lappend(res, w);
285  }
286 
287  return res;
288 }
#define NIL
Definition: pg_list.h:65
bool LWLockHeldByMe(LWLock *l)
Definition: lwlock.c:1842
LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER]
Definition: launcher.c:72
List * lappend(List *list, void *datum)
Definition: list.c:322
#define Assert(condition)
Definition: c.h:732
int max_logical_replication_workers
Definition: launcher.c:61
LogicalRepCtxStruct * LogicalRepCtx
Definition: launcher.c:75
int i
Definition: pg_list.h:50

◆ LogicalRepSyncTableStart()

char* LogicalRepSyncTableStart ( XLogRecPtr origin_startpos)

Definition at line 805 of file tablesync.c.

References CommandCounterIncrement(), CommitTransactionCommand(), Subscription::conninfo, copy_table(), CRS_USE_SNAPSHOT, elog, ereport, WalRcvExecResult::err, errdetail(), errmsg(), ERROR, GetSubscriptionRelState(), GetTransactionSnapshot(), InvalidXLogRecPtr, MyLogicalRepWorker, MySubscription, NAMEDATALEN, NoLock, Subscription::oid, pgstat_report_stat(), PopActiveSnapshot(), psprintf(), PushActiveSnapshot(), LogicalRepWorker::relid, LogicalRepWorker::relmutex, LogicalRepWorker::relstate, LogicalRepWorker::relstate_lsn, RowExclusiveLock, Subscription::slotname, SpinLockAcquire, SpinLockRelease, StartTransactionCommand(), StaticAssertStmt, WalRcvExecResult::status, LogicalRepWorker::subid, table_close(), table_open(), UpdateSubscriptionRelState(), wait_for_worker_state_change(), walrcv_clear_result(), walrcv_connect, walrcv_create_slot, walrcv_exec, WALRCV_OK_COMMAND, and wrconn.

Referenced by ApplyWorkerMain().

806 {
807  char *slotname;
808  char *err;
809  char relstate;
810  XLogRecPtr relstate_lsn;
811 
812  /* Check the state of the table synchronization. */
816  &relstate_lsn, true);
818 
820  MyLogicalRepWorker->relstate = relstate;
821  MyLogicalRepWorker->relstate_lsn = relstate_lsn;
823 
824  /*
825  * To build a slot name for the sync work, we are limited to NAMEDATALEN -
826  * 1 characters. We cut the original slot name to NAMEDATALEN - 28 chars
827  * and append _%u_sync_%u (1 + 10 + 6 + 10 + '\0'). (It's actually the
828  * NAMEDATALEN on the remote that matters, but this scheme will also work
829  * reasonably if that is different.)
830  */
831  StaticAssertStmt(NAMEDATALEN >= 32, "NAMEDATALEN too small"); /* for sanity */
832  slotname = psprintf("%.*s_%u_sync_%u",
833  NAMEDATALEN - 28,
837 
838  /*
839  * Here we use the slot name instead of the subscription name as the
840  * application_name, so that it is different from the main apply worker,
841  * so that synchronous replication can distinguish them.
842  */
843  wrconn = walrcv_connect(MySubscription->conninfo, true, slotname, &err);
844  if (wrconn == NULL)
845  ereport(ERROR,
846  (errmsg("could not connect to the publisher: %s", err)));
847 
848  switch (MyLogicalRepWorker->relstate)
849  {
850  case SUBREL_STATE_INIT:
851  case SUBREL_STATE_DATASYNC:
852  {
853  Relation rel;
854  WalRcvExecResult *res;
855 
857  MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC;
860 
861  /* Update the state and make it visible to others. */
868  pgstat_report_stat(false);
869 
870  /*
871  * We want to do the table data sync in a single transaction.
872  */
874 
875  /*
876  * Use a standard write lock here. It might be better to
877  * disallow access to the table while it's being synchronized.
878  * But we don't want to block the main apply process from
879  * working and it has to open the relation in RowExclusiveLock
880  * when remapping remote relation id to local one.
881  */
883 
884  /*
885  * Create a temporary slot for the sync process. We do this
886  * inside the transaction so that we can use the snapshot made
887  * by the slot to get existing data.
888  */
889  res = walrcv_exec(wrconn,
890  "BEGIN READ ONLY ISOLATION LEVEL "
891  "REPEATABLE READ", 0, NULL);
892  if (res->status != WALRCV_OK_COMMAND)
893  ereport(ERROR,
894  (errmsg("table copy could not start transaction on publisher"),
895  errdetail("The error was: %s", res->err)));
896  walrcv_clear_result(res);
897 
898  /*
899  * Create new temporary logical decoding slot.
900  *
901  * We'll use slot for data copy so make sure the snapshot is
902  * used for the transaction; that way the COPY will get data
903  * that is consistent with the lsn used by the slot to start
904  * decoding.
905  */
906  walrcv_create_slot(wrconn, slotname, true,
907  CRS_USE_SNAPSHOT, origin_startpos);
908 
910  copy_table(rel);
912 
913  res = walrcv_exec(wrconn, "COMMIT", 0, NULL);
914  if (res->status != WALRCV_OK_COMMAND)
915  ereport(ERROR,
916  (errmsg("table copy could not finish transaction on publisher"),
917  errdetail("The error was: %s", res->err)));
918  walrcv_clear_result(res);
919 
920  table_close(rel, NoLock);
921 
922  /* Make the copy visible. */
924 
925  /*
926  * We are done with the initial data synchronization, update
927  * the state.
928  */
930  MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCWAIT;
931  MyLogicalRepWorker->relstate_lsn = *origin_startpos;
933 
934  /* Wait for main apply worker to tell us to catchup. */
935  wait_for_worker_state_change(SUBREL_STATE_CATCHUP);
936 
937  /*----------
938  * There are now two possible states here:
939  * a) Sync is behind the apply. If that's the case we need to
940  * catch up with it by consuming the logical replication
941  * stream up to the relstate_lsn. For that, we exit this
942  * function and continue in ApplyWorkerMain().
943  * b) Sync is caught up with the apply. So it can just set
944  * the state to SYNCDONE and finish.
945  *----------
946  */
947  if (*origin_startpos >= MyLogicalRepWorker->relstate_lsn)
948  {
949  /*
950  * Update the new state in catalog. No need to bother
951  * with the shmem state as we are exiting for good.
952  */
955  SUBREL_STATE_SYNCDONE,
956  *origin_startpos);
957  finish_sync_worker();
958  }
959  break;
960  }
961  case SUBREL_STATE_SYNCDONE:
962  case SUBREL_STATE_READY:
963  case SUBREL_STATE_UNKNOWN:
964 
965  /*
966  * Nothing to do here but finish. (UNKNOWN means the relation was
967  * removed from pg_subscription_rel before the sync worker could
968  * start.)
969  */
970  finish_sync_worker();
971  break;
972  default:
973  elog(ERROR, "unknown relation state \"%c\"",
975  }
976 
977  return slotname;
978 }
Subscription * MySubscription
Definition: worker.c:102
WalReceiverConn * wrconn
Definition: worker.c:100
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:133
void CommitTransactionCommand(void)
Definition: xact.c:2895
char * psprintf(const char *fmt,...)
Definition: psprintf.c:46
static void copy_table(Relation rel)
Definition: tablesync.c:751
void PopActiveSnapshot(void)
Definition: snapmgr.c:814
#define walrcv_create_slot(conn, slotname, temporary, snapshot_action, lsn)
Definition: walreceiver.h:277
Snapshot GetTransactionSnapshot(void)
Definition: snapmgr.c:306
XLogRecPtr relstate_lsn
#define NAMEDATALEN
#define StaticAssertStmt(condition, errmessage)
Definition: c.h:842
#define SpinLockAcquire(lock)
Definition: spin.h:62
static void walrcv_clear_result(WalRcvExecResult *walres)
Definition: walreceiver.h:285
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:64
#define ERROR
Definition: elog.h:43
#define NoLock
Definition: lockdefs.h:34
void PushActiveSnapshot(Snapshot snap)
Definition: snapmgr.c:735
#define RowExclusiveLock
Definition: lockdefs.h:38
int errdetail(const char *fmt,...)
Definition: elog.c:860
#define ereport(elevel, rest)
Definition: elog.h:141
#define SpinLockRelease(lock)
Definition: spin.h:64
void CommandCounterIncrement(void)
Definition: xact.c:1003
void UpdateSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn)
uint64 XLogRecPtr
Definition: xlogdefs.h:21
WalRcvExecStatus status
Definition: walreceiver.h:195
void StartTransactionCommand(void)
Definition: xact.c:2794
char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn, bool missing_ok)
int errmsg(const char *fmt,...)
Definition: elog.c:784
#define elog(elevel,...)
Definition: elog.h:226
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39
static bool wait_for_worker_state_change(char expected_state)
Definition: tablesync.c:214
#define walrcv_exec(conn, exec, nRetTypes, retTypes)
Definition: walreceiver.h:279
void pgstat_report_stat(bool force)
Definition: pgstat.c:813
#define walrcv_connect(conninfo, logical, appname, err)
Definition: walreceiver.h:255

◆ process_syncing_tables()

void process_syncing_tables ( XLogRecPtr  current_lsn)

Definition at line 536 of file tablesync.c.

References am_tablesync_worker(), process_syncing_tables_for_apply(), and process_syncing_tables_for_sync().

Referenced by apply_handle_commit(), and LogicalRepApplyLoop().

537 {
538  if (am_tablesync_worker())
539  process_syncing_tables_for_sync(current_lsn);
540  else
542 }
static void process_syncing_tables_for_apply(XLogRecPtr current_lsn)
Definition: tablesync.c:327
static bool am_tablesync_worker(void)
static void process_syncing_tables_for_sync(XLogRecPtr current_lsn)
Definition: tablesync.c:277

Variable Documentation

◆ ApplyContext

MemoryContext ApplyContext

Definition at line 98 of file worker.c.

◆ in_remote_transaction

bool in_remote_transaction

◆ MyLogicalRepWorker

◆ MySubscription

◆ wrconn