PostgreSQL Source Code  git master
worker_internal.h File Reference
#include <signal.h>
#include "access/xlogdefs.h"
#include "catalog/pg_subscription.h"
#include "datatype/timestamp.h"
#include "storage/lock.h"
#include "storage/spin.h"
Include dependency graph for worker_internal.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Data Structures

struct  LogicalRepWorker
 

Typedefs

typedef struct LogicalRepWorker LogicalRepWorker
 

Functions

void logicalrep_worker_attach (int slot)
 
LogicalRepWorker * logicalrep_worker_find (Oid subid, Oid relid, bool only_running)
 
List * logicalrep_workers_find (Oid subid, bool only_running)
 
void logicalrep_worker_launch (Oid dbid, Oid subid, const char *subname, Oid userid, Oid relid)
 
void logicalrep_worker_stop (Oid subid, Oid relid)
 
void logicalrep_worker_wakeup (Oid subid, Oid relid)
 
void logicalrep_worker_wakeup_ptr (LogicalRepWorker *worker)
 
int logicalrep_sync_worker_count (Oid subid)
 
void ReplicationOriginNameForTablesync (Oid suboid, Oid relid, char *originname, int szorgname)
 
char * LogicalRepSyncTableStart (XLogRecPtr *origin_startpos)
 
void process_syncing_tables (XLogRecPtr current_lsn)
 
void invalidate_syncing_table_states (Datum arg, int cacheid, uint32 hashvalue)
 
static bool am_tablesync_worker (void)
 

Variables

MemoryContext ApplyContext
 
struct WalReceiverConn * wrconn
 
Subscription * MySubscription
 
LogicalRepWorker * MyLogicalRepWorker
 
bool in_remote_transaction
 

Typedef Documentation

◆ LogicalRepWorker

Function Documentation

◆ am_tablesync_worker()

static bool am_tablesync_worker ( void  )
inline static

Definition at line 94 of file worker_internal.h.

References OidIsValid, and LogicalRepWorker::relid.

Referenced by apply_handle_origin(), ApplyWorkerMain(), process_syncing_tables(), and should_apply_changes_for_rel().

95 {
96  return OidIsValid(MyLogicalRepWorker->relid);
97 }
#define OidIsValid(objectId)
Definition: c.h:710
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:57

◆ invalidate_syncing_table_states()

void invalidate_syncing_table_states ( Datum  arg,
int  cacheid,
uint32  hashvalue 
)

Definition at line 264 of file tablesync.c.

References table_states_valid.

Referenced by ApplyWorkerMain().

265 {
266  table_states_valid = false;
267 }
static bool table_states_valid
Definition: tablesync.c:118

◆ logicalrep_sync_worker_count()

int logicalrep_sync_worker_count ( Oid  subid)

Definition at line 665 of file launcher.c.

References Assert, i, LWLockHeldByMe(), max_logical_replication_workers, OidIsValid, LogicalRepWorker::relid, LogicalRepWorker::subid, and LogicalRepCtxStruct::workers.

Referenced by logicalrep_worker_launch(), and process_syncing_tables_for_apply().

666 {
667  int i;
668  int res = 0;
669 
670  Assert(LWLockHeldByMe(LogicalRepWorkerLock));
671 
672  /* Search for attached worker for a given subscription id. */
673  for (i = 0; i < max_logical_replication_workers; i++)
674  {
675  LogicalRepWorker *w = &LogicalRepCtx->workers[i];
676 
677  if (w->subid == subid && OidIsValid(w->relid))
678  res++;
679  }
680 
681  return res;
682 }
bool LWLockHeldByMe(LWLock *l)
Definition: lwlock.c:1932
LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER]
Definition: launcher.c:65
#define OidIsValid(objectId)
Definition: c.h:710
#define Assert(condition)
Definition: c.h:804
int max_logical_replication_workers
Definition: launcher.c:54
LogicalRepCtxStruct * LogicalRepCtx
Definition: launcher.c:68
int i

◆ logicalrep_worker_attach()

void logicalrep_worker_attach ( int  slot)

Definition at line 570 of file launcher.c.

References Assert, before_shmem_exit(), ereport, errcode(), errmsg(), ERROR, LogicalRepWorker::in_use, logicalrep_worker_onexit(), LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_logical_replication_workers, MyProc, LogicalRepWorker::proc, and LogicalRepCtxStruct::workers.

Referenced by ApplyWorkerMain().

571 {
572  /* Block concurrent access. */
573  LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
574 
575  Assert(slot >= 0 && slot < max_logical_replication_workers);
576  MyLogicalRepWorker = &LogicalRepCtx->workers[slot];
577 
578  if (!MyLogicalRepWorker->in_use)
579  {
580  LWLockRelease(LogicalRepWorkerLock);
581  ereport(ERROR,
582  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
583  errmsg("logical replication worker slot %d is empty, cannot attach",
584  slot)));
585  }
586 
587  if (MyLogicalRepWorker->proc)
588  {
589  LWLockRelease(LogicalRepWorkerLock);
590  ereport(ERROR,
591  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
592  errmsg("logical replication worker slot %d is already used by "
593  "another worker, cannot attach", slot)));
594  }
595 
596  MyLogicalRepWorker->proc = MyProc;
597  before_shmem_exit(logicalrep_worker_onexit, (Datum) 0);
598 
599  LWLockRelease(LogicalRepWorkerLock);
600 }
PGPROC * MyProc
Definition: proc.c:68
LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER]
Definition: launcher.c:65
int errcode(int sqlerrcode)
Definition: elog.c:698
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1816
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:57
#define ERROR
Definition: elog.h:46
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:333
static void logicalrep_worker_onexit(int code, Datum arg)
Definition: launcher.c:649
uintptr_t Datum
Definition: postgres.h:411
#define ereport(elevel,...)
Definition: elog.h:157
#define Assert(condition)
Definition: c.h:804
int max_logical_replication_workers
Definition: launcher.c:54
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1203
int errmsg(const char *fmt,...)
Definition: elog.c:909
LogicalRepCtxStruct * LogicalRepCtx
Definition: launcher.c:68

◆ logicalrep_worker_find()

LogicalRepWorker* logicalrep_worker_find ( Oid  subid,
Oid  relid,
bool  only_running 
)

Definition at line 221 of file launcher.c.

References Assert, i, LogicalRepWorker::in_use, LWLockHeldByMe(), max_logical_replication_workers, LogicalRepWorker::proc, LogicalRepWorker::relid, LogicalRepWorker::subid, and LogicalRepCtxStruct::workers.

Referenced by ApplyLauncherMain(), logicalrep_worker_stop(), logicalrep_worker_wakeup(), process_syncing_tables_for_apply(), wait_for_relation_state_change(), and wait_for_worker_state_change().

222 {
223  int i;
224  LogicalRepWorker *res = NULL;
225 
226  Assert(LWLockHeldByMe(LogicalRepWorkerLock));
227 
228  /* Search for attached worker for a given subscription id. */
229  for (i = 0; i < max_logical_replication_workers; i++)
230  {
231  LogicalRepWorker *w = &LogicalRepCtx->workers[i];
232 
233  if (w->in_use && w->subid == subid && w->relid == relid &&
234  (!only_running || w->proc))
235  {
236  res = w;
237  break;
238  }
239  }
240 
241  return res;
242 }
bool LWLockHeldByMe(LWLock *l)
Definition: lwlock.c:1932
LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER]
Definition: launcher.c:65
#define Assert(condition)
Definition: c.h:804
int max_logical_replication_workers
Definition: launcher.c:54
LogicalRepCtxStruct * LogicalRepCtx
Definition: launcher.c:68
int i

◆ logicalrep_worker_launch()

void logicalrep_worker_launch ( Oid  dbid,
Oid  subid,
const char *  subname,
Oid  userid,
Oid  relid 
)

Definition at line 272 of file launcher.c.

References Assert, BackgroundWorker::bgw_flags, BackgroundWorker::bgw_function_name, BackgroundWorker::bgw_library_name, BackgroundWorker::bgw_main_arg, BGW_MAXLEN, BackgroundWorker::bgw_name, BGW_NEVER_RESTART, BackgroundWorker::bgw_notify_pid, BackgroundWorker::bgw_restart_time, BackgroundWorker::bgw_start_time, BackgroundWorker::bgw_type, BGWORKER_BACKEND_DATABASE_CONNECTION, BGWORKER_SHMEM_ACCESS, BgWorkerStart_RecoveryFinished, LogicalRepWorker::dbid, DEBUG1, elog, ereport, errcode(), errhint(), errmsg(), errmsg_internal(), ERROR, LogicalRepWorker::generation, GetCurrentTimestamp(), i, LogicalRepWorker::in_use, Int32GetDatum, InvalidXLogRecPtr, LogicalRepWorker::last_lsn, LogicalRepWorker::last_recv_time, LogicalRepWorker::last_send_time, LogicalRepWorker::launch_time, logicalrep_sync_worker_count(), logicalrep_worker_cleanup(), LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_logical_replication_workers, max_replication_slots, max_sync_workers_per_subscription, MyProcPid, now(), OidIsValid, LogicalRepWorker::proc, RegisterDynamicBackgroundWorker(), LogicalRepWorker::relid, LogicalRepWorker::relstate, LogicalRepWorker::relstate_lsn, LogicalRepWorker::reply_lsn, LogicalRepWorker::reply_time, snprintf, LogicalRepWorker::subid, TIMESTAMP_NOBEGIN, TimestampDifferenceExceeds(), LogicalRepWorker::userid, WaitForReplicationWorkerAttach(), wal_receiver_timeout, WARNING, and LogicalRepCtxStruct::workers.

Referenced by ApplyLauncherMain(), and process_syncing_tables_for_apply().

274 {
275  BackgroundWorker bgw;
276  BackgroundWorkerHandle *bgw_handle;
277  uint16 generation;
278  int i;
279  int slot = 0;
280  LogicalRepWorker *worker = NULL;
281  int nsyncworkers;
282  TimestampTz now;
283 
284  ereport(DEBUG1,
285  (errmsg_internal("starting logical replication worker for subscription \"%s\"",
286  subname)));
287 
288  /* Report this after the initial starting message for consistency. */
289  if (max_replication_slots == 0)
290  ereport(ERROR,
291  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
292  errmsg("cannot start logical replication workers when max_replication_slots = 0")));
293 
294  /*
295  * We need to do the modification of the shared memory under lock so that
296  * we have consistent view.
297  */
298  LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
299 
300 retry:
301  /* Find unused worker slot. */
302  for (i = 0; i < max_logical_replication_workers; i++)
303  {
304  LogicalRepWorker *w = &LogicalRepCtx->workers[i];
305 
306  if (!w->in_use)
307  {
308  worker = w;
309  slot = i;
310  break;
311  }
312  }
313 
314  nsyncworkers = logicalrep_sync_worker_count(subid);
315 
316  now = GetCurrentTimestamp();
317 
318  /*
319  * If we didn't find a free slot, try to do garbage collection. The
320  * reason we do this is because if some worker failed to start up and its
321  * parent has crashed while waiting, the in_use state was never cleared.
322  */
323  if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription)
324  {
325  bool did_cleanup = false;
326 
327  for (i = 0; i < max_logical_replication_workers; i++)
328  {
329  LogicalRepWorker *w = &LogicalRepCtx->workers[i];
330 
331  /*
332  * If the worker was marked in use but didn't manage to attach in
333  * time, clean it up.
334  */
335  if (w->in_use && !w->proc &&
336  TimestampDifferenceExceeds(w->launch_time, now,
337  wal_receiver_timeout))
338  {
339  elog(WARNING,
340  "logical replication worker for subscription %u took too long to start; canceled",
341  w->subid);
342 
343  logicalrep_worker_cleanup(w);
344  did_cleanup = true;
345  }
346  }
347 
348  if (did_cleanup)
349  goto retry;
350  }
351 
352  /*
353  * If we reached the sync worker limit per subscription, just exit
354  * silently as we might get here because of an otherwise harmless race
355  * condition.
356  */
357  if (nsyncworkers >= max_sync_workers_per_subscription)
358  {
359  LWLockRelease(LogicalRepWorkerLock);
360  return;
361  }
362 
363  /*
364  * However if there are no more free worker slots, inform user about it
365  * before exiting.
366  */
367  if (worker == NULL)
368  {
369  LWLockRelease(LogicalRepWorkerLock);
370  ereport(WARNING,
371  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
372  errmsg("out of logical replication worker slots"),
373  errhint("You might need to increase max_logical_replication_workers.")));
374  return;
375  }
376 
377  /* Prepare the worker slot. */
378  worker->launch_time = now;
379  worker->in_use = true;
380  worker->generation++;
381  worker->proc = NULL;
382  worker->dbid = dbid;
383  worker->userid = userid;
384  worker->subid = subid;
385  worker->relid = relid;
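386  worker->relstate = SUBREL_STATE_UNKNOWN;
387  worker->relstate_lsn = InvalidXLogRecPtr;
388  worker->last_lsn = InvalidXLogRecPtr;
389  TIMESTAMP_NOBEGIN(worker->last_send_time);
390  TIMESTAMP_NOBEGIN(worker->last_recv_time);
391  worker->reply_lsn = InvalidXLogRecPtr;
392  TIMESTAMP_NOBEGIN(worker->reply_time);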
393 
394  /* Before releasing lock, remember generation for future identification. */
395  generation = worker->generation;
396 
397  LWLockRelease(LogicalRepWorkerLock);
398 
399  /* Register the new dynamic worker. */
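400  memset(&bgw, 0, sizeof(bgw));
401  bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
402  BGWORKER_BACKEND_DATABASE_CONNECTION;
403  bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
404  snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres");
405  snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
406  if (OidIsValid(relid))
407  snprintf(bgw.bgw_name, BGW_MAXLEN,
408  "logical replication worker for subscription %u sync %u", subid, relid);
409  else
410  snprintf(bgw.bgw_name, BGW_MAXLEN,
411  "logical replication worker for subscription %u", subid);
412  snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication worker");
413 
414  bgw.bgw_restart_time = BGW_NEVER_RESTART;
415  bgw.bgw_notify_pid = MyProcPid;
416  bgw.bgw_main_arg = Int32GetDatum(slot);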
417 
418  if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
419  {
420  /* Failed to start worker, so clean up the worker slot. */
421  LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
422  Assert(generation == worker->generation);
423  logicalrep_worker_cleanup(worker);
424  LWLockRelease(LogicalRepWorkerLock);
425 
426  ereport(WARNING,
427  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
428  errmsg("out of background worker slots"),
429  errhint("You might need to increase max_worker_processes.")));
430  return;
431  }
432 
433  /* Now wait until it attaches. */
434  WaitForReplicationWorkerAttach(worker, generation, bgw_handle);
435 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define DEBUG1
Definition: elog.h:25
int MyProcPid
Definition: globals.c:43
int errhint(const char *fmt,...)
Definition: elog.c:1156
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1580
int64 TimestampTz
Definition: timestamp.h:39
LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER]
Definition: launcher.c:65
TimestampTz last_send_time
XLogRecPtr last_lsn
int bgw_restart_time
Definition: bgworker.h:94
int errcode(int sqlerrcode)
Definition: elog.c:698
NameData subname
#define BGWORKER_SHMEM_ACCESS
Definition: bgworker.h:52
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1711
#define OidIsValid(objectId)
Definition: c.h:710
char bgw_function_name[BGW_MAXLEN]
Definition: bgworker.h:96
int wal_receiver_timeout
Definition: walreceiver.c:90
XLogRecPtr relstate_lsn
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1816
Datum bgw_main_arg
Definition: bgworker.h:97
unsigned short uint16
Definition: c.h:440
#define ERROR
Definition: elog.h:46
int max_sync_workers_per_subscription
Definition: launcher.c:55
XLogRecPtr reply_lsn
static void logicalrep_worker_cleanup(LogicalRepWorker *worker)
Definition: launcher.c:620
#define BGW_NEVER_RESTART
Definition: bgworker.h:84
#define TIMESTAMP_NOBEGIN(j)
Definition: timestamp.h:112
#define WARNING
Definition: elog.h:40
int logicalrep_sync_worker_count(Oid subid)
Definition: launcher.c:665
static void WaitForReplicationWorkerAttach(LogicalRepWorker *worker, uint16 generation, BackgroundWorkerHandle *handle)
Definition: launcher.c:163
#define ereport(elevel,...)
Definition: elog.h:157
TimestampTz launch_time
int errmsg_internal(const char *fmt,...)
Definition: elog.c:996
int max_replication_slots
Definition: slot.c:99
TimestampTz last_recv_time
char bgw_name[BGW_MAXLEN]
Definition: bgworker.h:90
#define Assert(condition)
Definition: c.h:804
#define BGWORKER_BACKEND_DATABASE_CONNECTION
Definition: bgworker.h:59
int max_logical_replication_workers
Definition: launcher.c:54
#define BGW_MAXLEN
Definition: bgworker.h:85
BgWorkerStartTime bgw_start_time
Definition: bgworker.h:93
bool RegisterDynamicBackgroundWorker(BackgroundWorker *worker, BackgroundWorkerHandle **handle)
Definition: bgworker.c:962
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1203
#define Int32GetDatum(X)
Definition: postgres.h:523
char bgw_type[BGW_MAXLEN]
Definition: bgworker.h:91
int errmsg(const char *fmt,...)
Definition: elog.c:909
pid_t bgw_notify_pid
Definition: bgworker.h:99
#define elog(elevel,...)
Definition: elog.h:232
LogicalRepCtxStruct * LogicalRepCtx
Definition: launcher.c:68
int i
char bgw_library_name[BGW_MAXLEN]
Definition: bgworker.h:95
#define snprintf
Definition: port.h:216
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1544
TimestampTz reply_time

◆ logicalrep_worker_stop()

void logicalrep_worker_stop ( Oid  subid,
Oid  relid 
)

Definition at line 442 of file launcher.c.

References CHECK_FOR_INTERRUPTS, LogicalRepWorker::generation, LogicalRepWorker::in_use, kill, logicalrep_worker_find(), LW_SHARED, LWLockAcquire(), LWLockRelease(), MyLatch, PGPROC::pid, LogicalRepWorker::proc, ResetLatch(), WAIT_EVENT_BGWORKER_SHUTDOWN, WAIT_EVENT_BGWORKER_STARTUP, WaitLatch(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, and WL_TIMEOUT.

Referenced by AlterSubscription_refresh(), and DropSubscription().

443 {
444  LogicalRepWorker *worker;
445  uint16 generation;
446 
447  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
448 
449  worker = logicalrep_worker_find(subid, relid, false);
450 
451  /* No worker, nothing to do. */
452  if (!worker)
453  {
454  LWLockRelease(LogicalRepWorkerLock);
455  return;
456  }
457 
458  /*
459  * Remember which generation was our worker so we can check if what we see
460  * is still the same one.
461  */
462  generation = worker->generation;
463 
464  /*
465  * If we found a worker but it does not have proc set then it is still
466  * starting up; wait for it to finish starting and then kill it.
467  */
468  while (worker->in_use && !worker->proc)
469  {
470  int rc;
471 
472  LWLockRelease(LogicalRepWorkerLock);
473 
474  /* Wait a bit --- we don't expect to have to wait long. */
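475  rc = WaitLatch(MyLatch,
476  WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
477  10L, WAIT_EVENT_BGWORKER_STARTUP);
478 
479  if (rc & WL_LATCH_SET)
480  {
481  ResetLatch(MyLatch);
482  CHECK_FOR_INTERRUPTS();
483  }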
484 
485  /* Recheck worker status. */
486  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
487 
488  /*
489  * Check whether the worker slot is no longer used, which would mean
490  * that the worker has exited, or whether the worker generation is
491  * different, meaning that a different worker has taken the slot.
492  */
493  if (!worker->in_use || worker->generation != generation)
494  {
495  LWLockRelease(LogicalRepWorkerLock);
496  return;
497  }
498 
499  /* Worker has assigned proc, so it has started. */
500  if (worker->proc)
501  break;
502  }
503 
504  /* Now terminate the worker ... */
505  kill(worker->proc->pid, SIGTERM);
506 
507  /* ... and wait for it to die. */
508  for (;;)
509  {
510  int rc;
511 
512  /* is it gone? */
513  if (!worker->proc || worker->generation != generation)
514  break;
515 
516  LWLockRelease(LogicalRepWorkerLock);
517 
518  /* Wait a bit --- we don't expect to have to wait long. */
519  rc = WaitLatch(MyLatch,
520  WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
521  10L, WAIT_EVENT_BGWORKER_SHUTDOWN);
522 
523  if (rc & WL_LATCH_SET)
524  {
525  ResetLatch(MyLatch);
526  CHECK_FOR_INTERRUPTS();
527  }
528 
529  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
530  }
531 
532  LWLockRelease(LogicalRepWorkerLock);
533 }
#define WL_TIMEOUT
Definition: latch.h:128
#define kill(pid, sig)
Definition: win32_port.h:454
void ResetLatch(Latch *latch)
Definition: latch.c:660
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:452
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1816
unsigned short uint16
Definition: c.h:440
LogicalRepWorker * logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
Definition: launcher.c:221
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1203
struct Latch * MyLatch
Definition: globals.c:57
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:102
int pid
Definition: proc.h:146
#define WL_LATCH_SET
Definition: latch.h:125
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:130

◆ logicalrep_worker_wakeup()

void logicalrep_worker_wakeup ( Oid  subid,
Oid  relid 
)

Definition at line 539 of file launcher.c.

References logicalrep_worker_find(), logicalrep_worker_wakeup_ptr(), LW_SHARED, LWLockAcquire(), and LWLockRelease().

Referenced by pg_attribute_noreturn().

540 {
541  LogicalRepWorker *worker;
542 
543  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
544 
545  worker = logicalrep_worker_find(subid, relid, true);
546 
547  if (worker)
548  logicalrep_worker_wakeup_ptr(worker);
549 
550  LWLockRelease(LogicalRepWorkerLock);
551 }
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1816
void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
Definition: launcher.c:559
LogicalRepWorker * logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
Definition: launcher.c:221
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1203

◆ logicalrep_worker_wakeup_ptr()

void logicalrep_worker_wakeup_ptr ( LogicalRepWorker * worker)

Definition at line 559 of file launcher.c.

References Assert, LWLockHeldByMe(), LogicalRepWorker::proc, PGPROC::procLatch, and SetLatch().

Referenced by logicalrep_worker_wakeup(), process_syncing_tables_for_apply(), and wait_for_worker_state_change().

560 {
561  Assert(LWLockHeldByMe(LogicalRepWorkerLock));
562 
563  SetLatch(&worker->proc->procLatch);
564 }
bool LWLockHeldByMe(LWLock *l)
Definition: lwlock.c:1932
void SetLatch(Latch *latch)
Definition: latch.c:567
Latch procLatch
Definition: proc.h:130
#define Assert(condition)
Definition: c.h:804

◆ logicalrep_workers_find()

List* logicalrep_workers_find ( Oid  subid,
bool  only_running 
)

Definition at line 249 of file launcher.c.

References Assert, i, LogicalRepWorker::in_use, lappend(), LWLockHeldByMe(), max_logical_replication_workers, NIL, LogicalRepWorker::proc, LogicalRepWorker::subid, and LogicalRepCtxStruct::workers.

Referenced by DropSubscription().

250 {
251  int i;
252  List *res = NIL;
253 
254  Assert(LWLockHeldByMe(LogicalRepWorkerLock));
255 
256  /* Search for attached worker for a given subscription id. */
257  for (i = 0; i < max_logical_replication_workers; i++)
258  {
259  LogicalRepWorker *w = &LogicalRepCtx->workers[i];
260 
261  if (w->in_use && w->subid == subid && (!only_running || w->proc))
262  res = lappend(res, w);
263  }
264 
265  return res;
266 }
#define NIL
Definition: pg_list.h:65
bool LWLockHeldByMe(LWLock *l)
Definition: lwlock.c:1932
LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER]
Definition: launcher.c:65
List * lappend(List *list, void *datum)
Definition: list.c:336
#define Assert(condition)
Definition: c.h:804
int max_logical_replication_workers
Definition: launcher.c:54
LogicalRepCtxStruct * LogicalRepCtx
Definition: launcher.c:68
int i
Definition: pg_list.h:50

◆ LogicalRepSyncTableStart()

char* LogicalRepSyncTableStart ( XLogRecPtr * origin_startpos)

Definition at line 913 of file tablesync.c.

References Assert, CommandCounterIncrement(), CommitTransactionCommand(), Subscription::conninfo, copy_table(), CRS_USE_SNAPSHOT, DEBUG1, elog, ereport, WalRcvExecResult::err, errcode(), ERRCODE_DUPLICATE_OBJECT, errmsg(), ERROR, GetSubscriptionRelState(), GetTransactionSnapshot(), HOLD_INTERRUPTS, InvalidXLogRecPtr, LockRelationOid(), LSN_FORMAT_ARGS, MyLogicalRepWorker, MySubscription, NAMEDATALEN, NoLock, Subscription::oid, OidIsValid, palloc(), pgstat_report_stat(), PopActiveSnapshot(), PushActiveSnapshot(), LogicalRepWorker::relid, LogicalRepWorker::relmutex, LogicalRepWorker::relstate, LogicalRepWorker::relstate_lsn, ReplicationOriginNameForTablesync(), ReplicationSlotDropAtPubNode(), ReplicationSlotNameForTablesync(), replorigin_advance(), replorigin_by_name(), replorigin_create(), replorigin_session_get_progress(), replorigin_session_origin, replorigin_session_setup(), RESUME_INTERRUPTS, RowExclusiveLock, SpinLockAcquire, SpinLockRelease, StartTransactionCommand(), WalRcvExecResult::status, LogicalRepWorker::subid, table_close(), table_open(), UnlockRelationOid(), UpdateSubscriptionRelState(), wait_for_worker_state_change(), walrcv_clear_result(), walrcv_connect, walrcv_create_slot, walrcv_exec, WALRCV_OK_COMMAND, and wrconn.

Referenced by ApplyWorkerMain().

914 {
915  char *slotname;
916  char *err;
917  char relstate;
918  XLogRecPtr relstate_lsn;
919  Relation rel;
920  WalRcvExecResult *res;
921  char originname[NAMEDATALEN];
922  RepOriginId originid;
923 
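924  /* Check the state of the table synchronization. */
925  StartTransactionCommand();
926  relstate = GetSubscriptionRelState(MyLogicalRepWorker->subid,
927  MyLogicalRepWorker->relid,
928  &relstate_lsn);
929  CommitTransactionCommand();
930 
931  SpinLockAcquire(&MyLogicalRepWorker->relmutex);
932  MyLogicalRepWorker->relstate = relstate;
933  MyLogicalRepWorker->relstate_lsn = relstate_lsn;
934  SpinLockRelease(&MyLogicalRepWorker->relmutex);
935 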
936  /*
937  * If synchronization is already done or no longer necessary, exit now
938  * that we've updated shared memory state.
939  */
940  switch (relstate)
941  {
942  case SUBREL_STATE_SYNCDONE:
943  case SUBREL_STATE_READY:
944  case SUBREL_STATE_UNKNOWN:
945  finish_sync_worker(); /* doesn't return */
946  }
947 
948  /* Calculate the name of the tablesync slot. */
949  slotname = (char *) palloc(NAMEDATALEN);
950  ReplicationSlotNameForTablesync(MySubscription->oid,
951  MyLogicalRepWorker->relid,
952  slotname,
953  NAMEDATALEN);
954 
955  /*
956  * Here we use the slot name instead of the subscription name as the
957  * application_name, so that it is different from the main apply worker,
958  * so that synchronous replication can distinguish them.
959  */
960  wrconn = walrcv_connect(MySubscription->conninfo, true, slotname, &err);
961  if (wrconn == NULL)
962  ereport(ERROR,
963  (errmsg("could not connect to the publisher: %s", err)));
964 
965  Assert(MyLogicalRepWorker->relstate == SUBREL_STATE_INIT ||
966  MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC ||
967  MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY);
968 
969  /* Assign the origin tracking record name. */
970  ReplicationOriginNameForTablesync(MySubscription->oid,
971  MyLogicalRepWorker->relid,
972  originname,
973  sizeof(originname));
974 
975  if (MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC)
976  {
977  /*
978  * We have previously errored out before finishing the copy so the
979  * replication slot might exist. We want to remove the slot if it
980  * already exists and proceed.
981  *
982  * XXX We could also instead try to drop the slot, last time we failed
983  * but for that, we might need to clean up the copy state as it might
984  * be in the middle of fetching the rows. Also, if there is a network
985  * breakdown then it wouldn't have succeeded so trying it next time
986  * seems like a better bet.
987  */
988  ReplicationSlotDropAtPubNode(wrconn, slotname, true);
989  }
990  else if (MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY)
991  {
992  /*
993  * The COPY phase was previously done, but tablesync then crashed
994  * before it was able to finish normally.
995  */
996  StartTransactionCommand();
997 
998  /*
999  * The origin tracking name must already exist. It was created first
1000  * time this tablesync was launched.
1001  */
1002  originid = replorigin_by_name(originname, false);
1003  replorigin_session_setup(originid);
1004  replorigin_session_origin = originid;
1005  *origin_startpos = replorigin_session_get_progress(false);
1006 
1007  CommitTransactionCommand();
1008 
1009  goto copy_table_done;
1010  }
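1011 
1012  SpinLockAcquire(&MyLogicalRepWorker->relmutex);
1013  MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC;
1014  MyLogicalRepWorker->relstate_lsn = InvalidXLogRecPtr;
1015  SpinLockRelease(&MyLogicalRepWorker->relmutex);
1016 
1017  /* Update the state and make it visible to others. */
1018  StartTransactionCommand();
1019  UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
1020  MyLogicalRepWorker->relid,
1021  MyLogicalRepWorker->relstate,
1022  MyLogicalRepWorker->relstate_lsn);
1023  CommitTransactionCommand();
1024  pgstat_report_stat(false);
1025 
1026  StartTransactionCommand();
1027 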
1028  /*
1029  * Use a standard write lock here. It might be better to disallow access
1030  * to the table while it's being synchronized. But we don't want to block
1031  * the main apply process from working and it has to open the relation in
1032  * RowExclusiveLock when remapping remote relation id to local one.
1033  */
1034  rel = table_open(MyLogicalRepWorker->relid, RowExclusiveLock);
1035 
1036  /*
1037  * Start a transaction in the remote node in REPEATABLE READ mode. This
1038  * ensures that both the replication slot we create (see below) and the
1039  * COPY are consistent with each other.
1040  */
1041  res = walrcv_exec(wrconn,
1042  "BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ",
1043  0, NULL);
1044  if (res->status != WALRCV_OK_COMMAND)
1045  ereport(ERROR,
1046  (errmsg("table copy could not start transaction on publisher: %s",
1047  res->err)));
1048  walrcv_clear_result(res);
1049 
1050  /*
1051  * Create a new permanent logical decoding slot. This slot will be used
1052  * for the catchup phase after COPY is done, so tell it to use the
1053  * snapshot to make the final data consistent.
1054  *
1055  * Prevent cancel/die interrupts while creating slot here because it is
1056  * possible that before the server finishes this command, a concurrent
1057  * drop subscription happens which would complete without removing this
1058  * slot leading to a dangling slot on the server.
1059  */
1060  HOLD_INTERRUPTS();
1061  walrcv_create_slot(wrconn, slotname, false /* permanent */ ,
1062  CRS_USE_SNAPSHOT, origin_startpos);
1063  RESUME_INTERRUPTS();
1064 
1065  /*
1066  * Setup replication origin tracking. The purpose of doing this before the
1067  * copy is to avoid doing the copy again due to any error in setting up
1068  * origin tracking.
1069  */
1070  originid = replorigin_by_name(originname, true);
1071  if (!OidIsValid(originid))
1072  {
1073  /*
1074  * Origin tracking does not exist, so create it now.
1075  *
1076  * Then advance to the LSN got from walrcv_create_slot. This is WAL
1077  * logged for the purpose of recovery. Locks are to prevent the
1078  * replication origin from vanishing while advancing.
1079  */
1080  originid = replorigin_create(originname);
1081 
1082  LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1083  replorigin_advance(originid, *origin_startpos, InvalidXLogRecPtr,
1084  true /* go backward */ , true /* WAL log */ );
1085  UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1086 
1087  replorigin_session_setup(originid);
1088  replorigin_session_origin = originid;
1089  }
1090  else
1091  {
1092  ereport(ERROR,
1093  (errcode(ERRCODE_DUPLICATE_OBJECT),
1094  errmsg("replication origin \"%s\" already exists",
1095  originname)));
1096  }
1097 
1098  /* Now do the initial data copy */
1099  PushActiveSnapshot(GetTransactionSnapshot());
1100  copy_table(rel);
1101  PopActiveSnapshot();
1102 
1103  res = walrcv_exec(wrconn, "COMMIT", 0, NULL);
1104  if (res->status != WALRCV_OK_COMMAND)
1105  ereport(ERROR,
1106  (errmsg("table copy could not finish transaction on publisher: %s",
1107  res->err)));
1108  walrcv_clear_result(res);
1109 
1110  table_close(rel, NoLock);
1111 
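1112  /* Make the copy visible. */
1113  CommandCounterIncrement();
1114 
1115  /*
1116  * Update the persisted state to indicate the COPY phase is done; make it
1117  * visible to others.
1118  */
1119  UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
1120  MyLogicalRepWorker->relid,
1121  SUBREL_STATE_FINISHEDCOPY,
1122  *origin_startpos);
1123 
1124  CommitTransactionCommand();
1125 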
1126 copy_table_done:
1127 
1128  elog(DEBUG1,
1129  "LogicalRepSyncTableStart: '%s' origin_startpos lsn %X/%X",
1130  originname, LSN_FORMAT_ARGS(*origin_startpos));
1131 
1132  /*
1133  * We are done with the initial data synchronization, update the state.
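1134  */
1135  SpinLockAcquire(&MyLogicalRepWorker->relmutex);
1136  MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCWAIT;
1137  MyLogicalRepWorker->relstate_lsn = *origin_startpos;
1138  SpinLockRelease(&MyLogicalRepWorker->relmutex);
1139 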
1140  /*
1141  * Finally, wait until the main apply worker tells us to catch up and then
1142  * return to let LogicalRepApplyLoop do it.
1143  */
1144  wait_for_worker_state_change(SUBREL_STATE_CATCHUP);
1145  return slotname;
1146 }
Subscription * MySubscription
Definition: worker.c:161
WalReceiverConn * wrconn
Definition: worker.c:159
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define DEBUG1
Definition: elog.h:25
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:167
void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok)
void UnlockRelationOid(Oid relid, LOCKMODE lockmode)
Definition: lmgr.c:200
void CommitTransactionCommand(void)
Definition: xact.c:2939
uint16 RepOriginId
Definition: xlogdefs.h:65
static void copy_table(Relation rel)
Definition: tablesync.c:802
XLogRecPtr replorigin_session_get_progress(bool flush)
Definition: origin.c:1203
int errcode(int sqlerrcode)
Definition: elog.c:698
void PopActiveSnapshot(void)
Definition: snapmgr.c:759
void replorigin_advance(RepOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)
Definition: origin.c:872
void pgstat_report_stat(bool disconnect)
Definition: pgstat.c:846
void replorigin_session_setup(RepOriginId node)
Definition: origin.c:1068
#define walrcv_create_slot(conn, slotname, temporary, snapshot_action, lsn)
Definition: walreceiver.h:423
Snapshot GetTransactionSnapshot(void)
Definition: snapmgr.c:250
#define OidIsValid(objectId)
Definition: c.h:710
RepOriginId replorigin_by_name(char *roname, bool missing_ok)
Definition: origin.c:209
XLogRecPtr relstate_lsn
#define RESUME_INTERRUPTS()
Definition: miscadmin.h:121
#define NAMEDATALEN
#define SpinLockAcquire(lock)
Definition: spin.h:62
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
static void walrcv_clear_result(WalRcvExecResult *walres)
Definition: walreceiver.h:433
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:57
#define ERROR
Definition: elog.h:46
#define NoLock
Definition: lockdefs.h:34
void PushActiveSnapshot(Snapshot snap)
Definition: snapmgr.c:680
#define RowExclusiveLock
Definition: lockdefs.h:38
RepOriginId replorigin_create(char *roname)
Definition: origin.c:240
#define SpinLockRelease(lock)
Definition: spin.h:64
char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
void CommandCounterIncrement(void)
Definition: xact.c:1021
void UpdateSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn)
#define ereport(elevel,...)
Definition: elog.h:157
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:804
WalRcvExecStatus status
Definition: walreceiver.h:214
RepOriginId replorigin_session_origin
Definition: origin.c:154
void StartTransactionCommand(void)
Definition: xact.c:2838
void ReplicationOriginNameForTablesync(Oid suboid, Oid relid, char *originname, int szorgname)
Definition: tablesync.c:898
void * palloc(Size size)
Definition: mcxt.c:1062
int errmsg(const char *fmt,...)
Definition: elog.c:909
#define HOLD_INTERRUPTS()
Definition: miscadmin.h:119
#define elog(elevel,...)
Definition: elog.h:232
void LockRelationOid(Oid relid, LOCKMODE lockmode)
Definition: lmgr.c:109
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39
#define ERRCODE_DUPLICATE_OBJECT
Definition: streamutil.c:32
static bool wait_for_worker_state_change(char expected_state)
Definition: tablesync.c:215
#define walrcv_exec(conn, exec, nRetTypes, retTypes)
Definition: walreceiver.h:427
void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, int szslot)
Definition: tablesync.c:885
#define walrcv_connect(conninfo, logical, appname, err)
Definition: walreceiver.h:401

◆ process_syncing_tables()

void process_syncing_tables ( XLogRecPtr  current_lsn)

Definition at line 587 of file tablesync.c.

References am_tablesync_worker(), process_syncing_tables_for_apply(), and process_syncing_tables_for_sync().

Referenced by apply_handle_commit(), apply_handle_stream_commit(), and LogicalRepApplyLoop().

588 {
589  if (am_tablesync_worker())
590  process_syncing_tables_for_sync(current_lsn);
591  else
592  process_syncing_tables_for_apply(current_lsn);
593 }
static void process_syncing_tables_for_apply(XLogRecPtr current_lsn)
Definition: tablesync.c:355
static bool am_tablesync_worker(void)
static void process_syncing_tables_for_sync(XLogRecPtr current_lsn)
Definition: tablesync.c:278

◆ ReplicationOriginNameForTablesync()

void ReplicationOriginNameForTablesync ( Oid  suboid,
Oid  relid,
char *  originname,
int  szorgname 
)

Definition at line 898 of file tablesync.c.

References snprintf.

Referenced by AlterSubscription_refresh(), DropSubscription(), LogicalRepSyncTableStart(), and process_syncing_tables_for_apply().

900 {
901  snprintf(originname, szorgname, "pg_%u_%u", suboid, relid);
902 }
#define snprintf
Definition: port.h:216

Variable Documentation

◆ ApplyContext

MemoryContext ApplyContext

Definition at line 154 of file worker.c.

Referenced by apply_handle_stream_start().

◆ in_remote_transaction

◆ MyLogicalRepWorker

◆ MySubscription

◆ wrconn