PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
worker_internal.h File Reference
#include <signal.h>
#include "access/xlogdefs.h"
#include "catalog/pg_subscription.h"
#include "datatype/timestamp.h"
#include "storage/lock.h"
Include dependency graph for worker_internal.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Data Structures

struct  LogicalRepWorker
 

Typedefs

typedef struct LogicalRepWorker LogicalRepWorker
 

Functions

void logicalrep_worker_attach (int slot)
 
LogicalRepWorkerlogicalrep_worker_find (Oid subid, Oid relid, bool only_running)
 
void logicalrep_worker_launch (Oid dbid, Oid subid, const char *subname, Oid userid, Oid relid)
 
void logicalrep_worker_stop (Oid subid, Oid relid)
 
void logicalrep_worker_wakeup (Oid subid, Oid relid)
 
void logicalrep_worker_wakeup_ptr (LogicalRepWorker *worker)
 
int logicalrep_sync_worker_count (Oid subid)
 
char * LogicalRepSyncTableStart (XLogRecPtr *origin_startpos)
 
void process_syncing_tables (XLogRecPtr current_lsn)
 
void invalidate_syncing_table_states (Datum arg, int cacheid, uint32 hashvalue)
 
static bool am_tablesync_worker (void)
 

Variables

MemoryContext ApplyContext
 
struct WalReceiverConnwrconn
 
SubscriptionMySubscription
 
LogicalRepWorkerMyLogicalRepWorker
 
bool in_remote_transaction
 

Typedef Documentation

Function Documentation

static bool am_tablesync_worker ( void  )
inlinestatic

Definition at line 88 of file worker_internal.h.

References OidIsValid, and LogicalRepWorker::relid.

Referenced by apply_handle_commit(), apply_handle_origin(), ApplyWorkerMain(), process_syncing_tables(), should_apply_changes_for_rel(), and wait_for_relation_state_change().

89 {
91 }
#define OidIsValid(objectId)
Definition: c.h:538
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:63
void invalidate_syncing_table_states ( Datum  arg,
int  cacheid,
uint32  hashvalue 
)

Definition at line 258 of file tablesync.c.

References table_states_valid.

Referenced by ApplyWorkerMain().

259 {
260  table_states_valid = false;
261 }
static bool table_states_valid
Definition: tablesync.c:112
int logicalrep_sync_worker_count ( Oid  subid)

Definition at line 655 of file launcher.c.

References Assert, i, LWLockHeldByMe(), max_logical_replication_workers, OidIsValid, LogicalRepWorker::relid, LogicalRepWorker::subid, and LogicalRepCtxStruct::workers.

Referenced by logicalrep_worker_launch(), and process_syncing_tables_for_apply().

656 {
657  int i;
658  int res = 0;
659 
660  Assert(LWLockHeldByMe(LogicalRepWorkerLock));
661 
662  /* Search for attached worker for a given subscription id. */
663  for (i = 0; i < max_logical_replication_workers; i++)
664  {
666 
667  if (w->subid == subid && OidIsValid(w->relid))
668  res++;
669  }
670 
671  return res;
672 }
bool LWLockHeldByMe(LWLock *l)
Definition: lwlock.c:1831
LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER]
Definition: launcher.c:71
#define OidIsValid(objectId)
Definition: c.h:538
#define Assert(condition)
Definition: c.h:675
int max_logical_replication_workers
Definition: launcher.c:60
LogicalRepCtxStruct * LogicalRepCtx
Definition: launcher.c:74
int i
void logicalrep_worker_attach ( int  slot)

Definition at line 546 of file launcher.c.

References Assert, before_shmem_exit(), ereport, errcode(), errmsg(), ERROR, LogicalRepWorker::in_use, logicalrep_worker_onexit(), LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_logical_replication_workers, MyProc, LogicalRepWorker::proc, and LogicalRepCtxStruct::workers.

Referenced by ApplyWorkerMain().

547 {
548  /* Block concurrent access. */
549  LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
550 
551  Assert(slot >= 0 && slot < max_logical_replication_workers);
553 
555  {
556  LWLockRelease(LogicalRepWorkerLock);
557  ereport(ERROR,
558  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
559  errmsg("logical replication worker slot %d is empty, cannot attach",
560  slot)));
561  }
562 
564  {
565  LWLockRelease(LogicalRepWorkerLock);
566  ereport(ERROR,
567  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
568  errmsg("logical replication worker slot %d is already used by "
569  "another worker, cannot attach", slot)));
570  }
571 
574 
575  LWLockRelease(LogicalRepWorkerLock);
576 }
PGPROC * MyProc
Definition: proc.c:67
LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER]
Definition: launcher.c:71
int errcode(int sqlerrcode)
Definition: elog.c:575
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1715
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:63
#define ERROR
Definition: elog.h:43
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:320
#define ereport(elevel, rest)
Definition: elog.h:122
static void logicalrep_worker_onexit(int code, Datum arg)
Definition: launcher.c:625
uintptr_t Datum
Definition: postgres.h:372
#define Assert(condition)
Definition: c.h:675
int max_logical_replication_workers
Definition: launcher.c:60
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1111
int errmsg(const char *fmt,...)
Definition: elog.c:797
LogicalRepCtxStruct * LogicalRepCtx
Definition: launcher.c:74
LogicalRepWorker* logicalrep_worker_find ( Oid  subid,
Oid  relid,
bool  only_running 
)

Definition at line 229 of file launcher.c.

References Assert, i, LogicalRepWorker::in_use, LWLockHeldByMe(), max_logical_replication_workers, NULL, LogicalRepWorker::proc, LogicalRepWorker::relid, LogicalRepWorker::subid, and LogicalRepCtxStruct::workers.

Referenced by ApplyLauncherMain(), logicalrep_worker_stop(), logicalrep_worker_wakeup(), process_syncing_tables_for_apply(), wait_for_relation_state_change(), and wait_for_worker_state_change().

230 {
231  int i;
232  LogicalRepWorker *res = NULL;
233 
234  Assert(LWLockHeldByMe(LogicalRepWorkerLock));
235 
236  /* Search for attached worker for a given subscription id. */
237  for (i = 0; i < max_logical_replication_workers; i++)
238  {
240 
241  if (w->in_use && w->subid == subid && w->relid == relid &&
242  (!only_running || w->proc))
243  {
244  res = w;
245  break;
246  }
247  }
248 
249  return res;
250 }
bool LWLockHeldByMe(LWLock *l)
Definition: lwlock.c:1831
LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER]
Definition: launcher.c:71
#define NULL
Definition: c.h:229
#define Assert(condition)
Definition: c.h:675
int max_logical_replication_workers
Definition: launcher.c:60
LogicalRepCtxStruct * LogicalRepCtx
Definition: launcher.c:74
int i
void logicalrep_worker_launch ( Oid  dbid,
Oid  subid,
const char *  subname,
Oid  userid,
Oid  relid 
)

Definition at line 256 of file launcher.c.

References BackgroundWorker::bgw_flags, BackgroundWorker::bgw_function_name, BackgroundWorker::bgw_library_name, BackgroundWorker::bgw_main_arg, BGW_MAXLEN, BackgroundWorker::bgw_name, BGW_NEVER_RESTART, BackgroundWorker::bgw_notify_pid, BackgroundWorker::bgw_restart_time, BackgroundWorker::bgw_start_time, BGWORKER_BACKEND_DATABASE_CONNECTION, BGWORKER_SHMEM_ACCESS, BgWorkerStart_RecoveryFinished, LogicalRepWorker::dbid, DEBUG1, elog, ereport, errcode(), errhint(), errmsg(), ERROR, LogicalRepWorker::generation, GetCurrentTimestamp(), i, LogicalRepWorker::in_use, Int32GetDatum, InvalidXLogRecPtr, LogicalRepWorker::last_lsn, LogicalRepWorker::last_recv_time, LogicalRepWorker::last_send_time, LogicalRepWorker::launch_time, logicalrep_sync_worker_count(), logicalrep_worker_cleanup(), LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), max_logical_replication_workers, max_replication_slots, max_sync_workers_per_subscription, MyProcPid, now(), NULL, OidIsValid, LogicalRepWorker::proc, RegisterDynamicBackgroundWorker(), LogicalRepWorker::relid, LogicalRepWorker::relstate, LogicalRepWorker::relstate_lsn, LogicalRepWorker::reply_lsn, LogicalRepWorker::reply_time, snprintf(), LogicalRepWorker::subid, SUBREL_STATE_UNKNOWN, TIMESTAMP_NOBEGIN, TimestampDifferenceExceeds(), LogicalRepWorker::userid, WaitForReplicationWorkerAttach(), wal_receiver_timeout, WARNING, and LogicalRepCtxStruct::workers.

Referenced by ApplyLauncherMain(), and process_syncing_tables_for_apply().

258 {
259  BackgroundWorker bgw;
260  BackgroundWorkerHandle *bgw_handle;
261  int i;
262  int slot = 0;
263  LogicalRepWorker *worker = NULL;
264  int nsyncworkers;
266 
267  ereport(DEBUG1,
268  (errmsg("starting logical replication worker for subscription \"%s\"",
269  subname)));
270 
271  /* Report this after the initial starting message for consistency. */
272  if (max_replication_slots == 0)
273  ereport(ERROR,
274  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
275  errmsg("cannot start logical replication workers when max_replication_slots = 0")));
276 
277  /*
278  * We need to do the modification of the shared memory under lock so that
279  * we have consistent view.
280  */
281  LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
282 
283 retry:
284  /* Find unused worker slot. */
285  for (i = 0; i < max_logical_replication_workers; i++)
286  {
288 
289  if (!w->in_use)
290  {
291  worker = w;
292  slot = i;
293  break;
294  }
295  }
296 
297  nsyncworkers = logicalrep_sync_worker_count(subid);
298 
299  now = GetCurrentTimestamp();
300 
301  /*
302  * If we didn't find a free slot, try to do garbage collection. The
303  * reason we do this is because if some worker failed to start up and its
304  * parent has crashed while waiting, the in_use state was never cleared.
305  */
306  if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription)
307  {
308  bool did_cleanup = false;
309 
310  for (i = 0; i < max_logical_replication_workers; i++)
311  {
313 
314  /*
315  * If the worker was marked in use but didn't manage to attach in
316  * time, clean it up.
317  */
318  if (w->in_use && !w->proc &&
321  {
322  elog(WARNING,
323  "logical replication worker for subscription %u took too long to start; canceled",
324  w->subid);
325 
327  did_cleanup = true;
328  }
329  }
330 
331  if (did_cleanup)
332  goto retry;
333  }
334 
335  /*
336  * If we reached the sync worker limit per subscription, just exit
337  * silently as we might get here because of an otherwise harmless race
338  * condition.
339  */
340  if (nsyncworkers >= max_sync_workers_per_subscription)
341  {
342  LWLockRelease(LogicalRepWorkerLock);
343  return;
344  }
345 
346  /*
347  * However if there are no more free worker slots, inform user about it
348  * before exiting.
349  */
350  if (worker == NULL)
351  {
352  LWLockRelease(LogicalRepWorkerLock);
354  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
355  errmsg("out of logical replication worker slots"),
356  errhint("You might need to increase max_logical_replication_workers.")));
357  return;
358  }
359 
360  /* Prepare the worker slot. */
361  worker->launch_time = now;
362  worker->in_use = true;
363  worker->generation++;
364  worker->proc = NULL;
365  worker->dbid = dbid;
366  worker->userid = userid;
367  worker->subid = subid;
368  worker->relid = relid;
369  worker->relstate = SUBREL_STATE_UNKNOWN;
371  worker->last_lsn = InvalidXLogRecPtr;
374  worker->reply_lsn = InvalidXLogRecPtr;
375  TIMESTAMP_NOBEGIN(worker->reply_time);
376 
377  LWLockRelease(LogicalRepWorkerLock);
378 
379  /* Register the new dynamic worker. */
380  memset(&bgw, 0, sizeof(bgw));
384  snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres");
385  snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
386  if (OidIsValid(relid))
388  "logical replication worker for subscription %u sync %u", subid, relid);
389  else
391  "logical replication worker for subscription %u", subid);
392 
395  bgw.bgw_main_arg = Int32GetDatum(slot);
396 
397  if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
398  {
400  (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
401  errmsg("out of background worker slots"),
402  errhint("You might need to increase max_worker_processes.")));
403  return;
404  }
405 
406  /* Now wait until it attaches. */
407  WaitForReplicationWorkerAttach(worker, bgw_handle);
408 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define DEBUG1
Definition: elog.h:25
int MyProcPid
Definition: globals.c:39
int errhint(const char *fmt,...)
Definition: elog.c:987
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1570
int64 TimestampTz
Definition: timestamp.h:39
LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER]
Definition: launcher.c:71
TimestampTz last_send_time
XLogRecPtr last_lsn
int bgw_restart_time
Definition: bgworker.h:93
int errcode(int sqlerrcode)
Definition: elog.c:575
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
#define BGWORKER_SHMEM_ACCESS
Definition: bgworker.h:52
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1649
#define OidIsValid(objectId)
Definition: c.h:538
char bgw_function_name[BGW_MAXLEN]
Definition: bgworker.h:95
int wal_receiver_timeout
Definition: walreceiver.c:75
XLogRecPtr relstate_lsn
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1715
Datum bgw_main_arg
Definition: bgworker.h:96
#define ERROR
Definition: elog.h:43
int max_sync_workers_per_subscription
Definition: launcher.c:61
XLogRecPtr reply_lsn
static void logicalrep_worker_cleanup(LogicalRepWorker *worker)
Definition: launcher.c:596
#define SUBREL_STATE_UNKNOWN
#define BGW_NEVER_RESTART
Definition: bgworker.h:84
#define TIMESTAMP_NOBEGIN(j)
Definition: timestamp.h:112
#define ereport(elevel, rest)
Definition: elog.h:122
#define WARNING
Definition: elog.h:40
int logicalrep_sync_worker_count(Oid subid)
Definition: launcher.c:655
TimestampTz launch_time
int max_replication_slots
Definition: slot.c:99
TimestampTz last_recv_time
#define NULL
Definition: c.h:229
char bgw_name[BGW_MAXLEN]
Definition: bgworker.h:90
#define BGWORKER_BACKEND_DATABASE_CONNECTION
Definition: bgworker.h:59
int max_logical_replication_workers
Definition: launcher.c:60
#define BGW_MAXLEN
Definition: bgworker.h:85
BgWorkerStartTime bgw_start_time
Definition: bgworker.h:92
bool RegisterDynamicBackgroundWorker(BackgroundWorker *worker, BackgroundWorkerHandle **handle)
Definition: bgworker.c:933
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1111
static void WaitForReplicationWorkerAttach(LogicalRepWorker *worker, BackgroundWorkerHandle *handle)
Definition: launcher.c:162
#define Int32GetDatum(X)
Definition: postgres.h:485
int errmsg(const char *fmt,...)
Definition: elog.c:797
pid_t bgw_notify_pid
Definition: bgworker.h:98
LogicalRepCtxStruct * LogicalRepCtx
Definition: launcher.c:74
int i
#define elog
Definition: elog.h:219
char bgw_library_name[BGW_MAXLEN]
Definition: bgworker.h:94
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1534
TimestampTz reply_time
void logicalrep_worker_stop ( Oid  subid,
Oid  relid 
)

Definition at line 415 of file launcher.c.

References CHECK_FOR_INTERRUPTS, LogicalRepWorker::generation, LogicalRepWorker::in_use, logicalrep_worker_find(), LW_SHARED, LWLockAcquire(), LWLockRelease(), MyLatch, PGPROC::pid, LogicalRepWorker::proc, proc_exit(), ResetLatch(), WAIT_EVENT_BGWORKER_SHUTDOWN, WAIT_EVENT_BGWORKER_STARTUP, WaitLatch(), WL_LATCH_SET, WL_POSTMASTER_DEATH, and WL_TIMEOUT.

Referenced by AlterSubscription_refresh(), and DropSubscription().

416 {
417  LogicalRepWorker *worker;
418  uint16 generation;
419 
420  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
421 
422  worker = logicalrep_worker_find(subid, relid, false);
423 
424  /* No worker, nothing to do. */
425  if (!worker)
426  {
427  LWLockRelease(LogicalRepWorkerLock);
428  return;
429  }
430 
431  /*
432  * Remember which generation was our worker so we can check if what we see
433  * is still the same one.
434  */
435  generation = worker->generation;
436 
437  /*
438  * If we found worker but it does not have proc set it is starting up,
439  * wait for it to finish and then kill it.
440  */
441  while (worker->in_use && !worker->proc)
442  {
443  int rc;
444 
445  LWLockRelease(LogicalRepWorkerLock);
446 
447  /* Wait for signal. */
448  rc = WaitLatch(MyLatch,
451 
452  /* emergency bailout if postmaster has died */
453  if (rc & WL_POSTMASTER_DEATH)
454  proc_exit(1);
455 
456  if (rc & WL_LATCH_SET)
457  {
460  }
461 
462  /* Check worker status. */
463  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
464 
465  /*
466  * Check whether the worker slot is no longer used, which would mean
467  * that the worker has exited, or whether the worker generation is
468  * different, meaning that a different worker has taken the slot.
469  */
470  if (!worker->in_use || worker->generation != generation)
471  {
472  LWLockRelease(LogicalRepWorkerLock);
473  return;
474  }
475 
476  /* Worker has assigned proc, so it has started. */
477  if (worker->proc)
478  break;
479  }
480 
481  /* Now terminate the worker ... */
482  kill(worker->proc->pid, SIGTERM);
483  LWLockRelease(LogicalRepWorkerLock);
484 
485  /* ... and wait for it to die. */
486  for (;;)
487  {
488  int rc;
489 
490  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
491  if (!worker->proc || worker->generation != generation)
492  {
493  LWLockRelease(LogicalRepWorkerLock);
494  break;
495  }
496  LWLockRelease(LogicalRepWorkerLock);
497 
499 
500  /* Wait for more work. */
501  rc = WaitLatch(MyLatch,
502  WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
504 
505  /* emergency bailout if postmaster has died */
506  if (rc & WL_POSTMASTER_DEATH)
507  proc_exit(1);
508 
509  if (rc & WL_LATCH_SET)
510  {
513  }
514  }
515 }
#define WL_TIMEOUT
Definition: latch.h:127
void proc_exit(int code)
Definition: ipc.c:99
void ResetLatch(volatile Latch *latch)
Definition: latch.c:498
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1715
int WaitLatch(volatile Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:336
unsigned short uint16
Definition: c.h:267
#define WL_POSTMASTER_DEATH
Definition: latch.h:128
LogicalRepWorker * logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
Definition: launcher.c:229
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1111
struct Latch * MyLatch
Definition: globals.c:52
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:100
int pid
Definition: proc.h:108
#define WL_LATCH_SET
Definition: latch.h:124
void logicalrep_worker_wakeup ( Oid  subid,
Oid  relid 
)

Definition at line 521 of file launcher.c.

References logicalrep_worker_find(), logicalrep_worker_wakeup_ptr(), LW_SHARED, LWLockAcquire(), and LWLockRelease().

Referenced by pg_attribute_noreturn().

522 {
523  LogicalRepWorker *worker;
524 
525  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
526  worker = logicalrep_worker_find(subid, relid, true);
527  LWLockRelease(LogicalRepWorkerLock);
528 
529  if (worker)
531 }
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1715
void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
Definition: launcher.c:537
LogicalRepWorker * logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
Definition: launcher.c:229
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1111
void logicalrep_worker_wakeup_ptr ( LogicalRepWorker worker)

Definition at line 537 of file launcher.c.

References LogicalRepWorker::proc, PGPROC::procLatch, and SetLatch().

Referenced by logicalrep_worker_wakeup(), and process_syncing_tables_for_apply().

538 {
539  SetLatch(&worker->proc->procLatch);
540 }
Latch procLatch
Definition: proc.h:103
void SetLatch(volatile Latch *latch)
Definition: latch.c:415
char* LogicalRepSyncTableStart ( XLogRecPtr origin_startpos)

Definition at line 789 of file tablesync.c.

References CommandCounterIncrement(), CommitTransactionCommand(), Subscription::conninfo, copy_table(), CRS_USE_SNAPSHOT, elog, ereport, WalRcvExecResult::err, errdetail(), errmsg(), ERROR, GetSubscriptionRelState(), heap_close, heap_open(), InvalidXLogRecPtr, MyLogicalRepWorker, MySubscription, NAMEDATALEN, NoLock, NULL, Subscription::oid, pgstat_report_stat(), psprintf(), LogicalRepWorker::relid, LogicalRepWorker::relmutex, LogicalRepWorker::relstate, LogicalRepWorker::relstate_lsn, RowExclusiveLock, SetSubscriptionRelState(), Subscription::slotname, SpinLockAcquire, SpinLockRelease, StartTransactionCommand(), StaticAssertStmt, WalRcvExecResult::status, LogicalRepWorker::subid, SUBREL_STATE_CATCHUP, SUBREL_STATE_DATASYNC, SUBREL_STATE_INIT, SUBREL_STATE_READY, SUBREL_STATE_SYNCDONE, SUBREL_STATE_SYNCWAIT, SUBREL_STATE_UNKNOWN, wait_for_worker_state_change(), walrcv_clear_result(), walrcv_connect, walrcv_create_slot, walrcv_exec, WALRCV_OK_COMMAND, and wrconn.

Referenced by ApplyWorkerMain().

790 {
791  char *slotname;
792  char *err;
793  char relstate;
794  XLogRecPtr relstate_lsn;
795 
796  /* Check the state of the table synchronization. */
800  &relstate_lsn, true);
802 
804  MyLogicalRepWorker->relstate = relstate;
805  MyLogicalRepWorker->relstate_lsn = relstate_lsn;
807 
808  /*
809  * To build a slot name for the sync work, we are limited to NAMEDATALEN -
810  * 1 characters. We cut the original slot name to NAMEDATALEN - 28 chars
811  * and append _%u_sync_%u (1 + 10 + 6 + 10 + '\0'). (It's actually the
812  * NAMEDATALEN on the remote that matters, but this scheme will also work
813  * reasonably if that is different.)
814  */
815  StaticAssertStmt(NAMEDATALEN >= 32, "NAMEDATALEN too small"); /* for sanity */
816  slotname = psprintf("%.*s_%u_sync_%u",
817  NAMEDATALEN - 28,
821 
822  /*
823  * Here we use the slot name instead of the subscription name as the
824  * application_name, so that it is different from the main apply worker,
825  * so that synchronous replication can distinguish them.
826  */
827  wrconn = walrcv_connect(MySubscription->conninfo, true, slotname, &err);
828  if (wrconn == NULL)
829  ereport(ERROR,
830  (errmsg("could not connect to the publisher: %s", err)));
831 
832  switch (MyLogicalRepWorker->relstate)
833  {
834  case SUBREL_STATE_INIT:
836  {
837  Relation rel;
838  WalRcvExecResult *res;
839 
844 
845  /* Update the state and make it visible to others. */
851  true);
853  pgstat_report_stat(false);
854 
855  /*
856  * We want to do the table data sync in a single transaction.
857  */
859 
860  /*
861  * Use a standard write lock here. It might be better to
862  * disallow access to the table while it's being synchronized.
863  * But we don't want to block the main apply process from
864  * working and it has to open the relation in RowExclusiveLock
865  * when remapping remote relation id to local one.
866  */
868 
869  /*
870  * Create a temporary slot for the sync process. We do this
871  * inside the transaction so that we can use the snapshot made
872  * by the slot to get existing data.
873  */
874  res = walrcv_exec(wrconn,
875  "BEGIN READ ONLY ISOLATION LEVEL "
876  "REPEATABLE READ", 0, NULL);
877  if (res->status != WALRCV_OK_COMMAND)
878  ereport(ERROR,
879  (errmsg("table copy could not start transaction on publisher"),
880  errdetail("The error was: %s", res->err)));
881  walrcv_clear_result(res);
882 
883  /*
884  * Create new temporary logical decoding slot.
885  *
886  * We'll use slot for data copy so make sure the snapshot is
887  * used for the transaction; that way the COPY will get data
888  * that is consistent with the lsn used by the slot to start
889  * decoding.
890  */
891  walrcv_create_slot(wrconn, slotname, true,
892  CRS_USE_SNAPSHOT, origin_startpos);
893 
894  copy_table(rel);
895 
896  res = walrcv_exec(wrconn, "COMMIT", 0, NULL);
897  if (res->status != WALRCV_OK_COMMAND)
898  ereport(ERROR,
899  (errmsg("table copy could not finish transaction on publisher"),
900  errdetail("The error was: %s", res->err)));
901  walrcv_clear_result(res);
902 
903  heap_close(rel, NoLock);
904 
905  /* Make the copy visible. */
907 
908  /*
909  * We are done with the initial data synchronization, update
910  * the state.
911  */
914  MyLogicalRepWorker->relstate_lsn = *origin_startpos;
916 
917  /* Wait for main apply worker to tell us to catchup. */
919 
920  /*----------
921  * There are now two possible states here:
922  * a) Sync is behind the apply. If that's the case we need to
923  * catch up with it by consuming the logical replication
924  * stream up to the relstate_lsn. For that, we exit this
925  * function and continue in ApplyWorkerMain().
926  * b) Sync is caught up with the apply. So it can just set
927  * the state to SYNCDONE and finish.
928  *----------
929  */
930  if (*origin_startpos >= MyLogicalRepWorker->relstate_lsn)
931  {
932  /*
933  * Update the new state in catalog. No need to bother
934  * with the shmem state as we are exiting for good.
935  */
939  *origin_startpos,
940  true);
941  finish_sync_worker();
942  }
943  break;
944  }
946  case SUBREL_STATE_READY:
948 
949  /*
950  * Nothing to do here but finish. (UNKNOWN means the relation was
951  * removed from pg_subscription_rel before the sync worker could
952  * start.)
953  */
954  finish_sync_worker();
955  break;
956  default:
957  elog(ERROR, "unknown relation state \"%c\"",
959  }
960 
961  return slotname;
962 }
Subscription * MySubscription
Definition: worker.c:111
#define SUBREL_STATE_INIT
WalReceiverConn * wrconn
Definition: worker.c:109
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define SUBREL_STATE_DATASYNC
void CommitTransactionCommand(void)
Definition: xact.c:2748
char * psprintf(const char *fmt,...)
Definition: psprintf.c:46
static void copy_table(Relation rel)
Definition: tablesync.c:736
#define heap_close(r, l)
Definition: heapam.h:97
#define walrcv_create_slot(conn, slotname, temporary, snapshot_action, lsn)
Definition: walreceiver.h:260
#define SUBREL_STATE_SYNCWAIT
XLogRecPtr relstate_lsn
#define NAMEDATALEN
#define StaticAssertStmt(condition, errmessage)
Definition: c.h:757
#define SpinLockAcquire(lock)
Definition: spin.h:62
static void walrcv_clear_result(WalRcvExecResult *walres)
Definition: walreceiver.h:268
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:63
#define ERROR
Definition: elog.h:43
#define SUBREL_STATE_UNKNOWN
#define NoLock
Definition: lockdefs.h:34
#define RowExclusiveLock
Definition: lockdefs.h:38
int errdetail(const char *fmt,...)
Definition: elog.c:873
#define ereport(elevel, rest)
Definition: elog.h:122
#define SpinLockRelease(lock)
Definition: spin.h:64
Oid SetSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn, bool update_only)
#define SUBREL_STATE_SYNCDONE
void CommandCounterIncrement(void)
Definition: xact.c:922
Relation heap_open(Oid relationId, LOCKMODE lockmode)
Definition: heapam.c:1284
#define NULL
Definition: c.h:229
uint64 XLogRecPtr
Definition: xlogdefs.h:21
WalRcvExecStatus status
Definition: walreceiver.h:187
void StartTransactionCommand(void)
Definition: xact.c:2678
char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn, bool missing_ok)
int errmsg(const char *fmt,...)
Definition: elog.c:797
#define elog
Definition: elog.h:219
static bool wait_for_worker_state_change(char expected_state)
Definition: tablesync.c:219
#define SUBREL_STATE_READY
#define SUBREL_STATE_CATCHUP
#define walrcv_exec(conn, exec, nRetTypes, retTypes)
Definition: walreceiver.h:262
void pgstat_report_stat(bool force)
Definition: pgstat.c:812
#define walrcv_connect(conninfo, logical, appname, err)
Definition: walreceiver.h:242
void process_syncing_tables ( XLogRecPtr  current_lsn)

Definition at line 518 of file tablesync.c.

References am_tablesync_worker(), process_syncing_tables_for_apply(), and process_syncing_tables_for_sync().

Referenced by apply_handle_commit(), and LogicalRepApplyLoop().

519 {
520  if (am_tablesync_worker())
521  process_syncing_tables_for_sync(current_lsn);
522  else
524 }
static void process_syncing_tables_for_apply(XLogRecPtr current_lsn)
Definition: tablesync.c:323
static bool am_tablesync_worker(void)
static void process_syncing_tables_for_sync(XLogRecPtr current_lsn)
Definition: tablesync.c:272

Variable Documentation

MemoryContext ApplyContext

Definition at line 107 of file worker.c.

bool in_remote_transaction