PostgreSQL Source Code  git master
tablesync.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  * tablesync.c
3  * PostgreSQL logical replication: initial table data synchronization
4  *
5  * Copyright (c) 2012-2022, PostgreSQL Global Development Group
6  *
7  * IDENTIFICATION
8  * src/backend/replication/logical/tablesync.c
9  *
10  * NOTES
11  * This file contains code for initial table data synchronization for
12  * logical replication.
13  *
14  * The initial data synchronization is done separately for each table,
15  * in a separate apply worker that only fetches the initial snapshot data
16  * from the publisher and then synchronizes the position in the stream with
17  * the main apply worker.
18  *
19  * There are several reasons for doing the synchronization this way:
20  * - It allows us to parallelize the initial data synchronization
21  * which lowers the time needed for it to happen.
22  * - The initial synchronization does not have to hold the xid and LSN
23  * for the time it takes to copy data of all tables, causing less
24  * bloat and lower disk consumption compared to doing the
25  * synchronization in a single process for the whole database.
26  * - It allows us to synchronize any tables added after the initial
27  * synchronization has finished.
28  *
29  * The stream position synchronization works in multiple steps:
30  * - Apply worker requests a tablesync worker to start, setting the new
31  * table state to INIT.
32  * - Tablesync worker starts; changes table state from INIT to DATASYNC while
33  * copying.
34  * - Tablesync worker does initial table copy; there is a FINISHEDCOPY (sync
35  * worker specific) state to indicate when the copy phase has completed, so
36  * if the worker crashes with this (non-memory) state then the copy will not
37  * be re-attempted.
38  * - Tablesync worker then sets table state to SYNCWAIT; waits for state change.
39  * - Apply worker periodically checks for tables in SYNCWAIT state. When
40  * any appear, it sets the table state to CATCHUP and starts loop-waiting
41  * until either the table state is set to SYNCDONE or the sync worker
42  * exits.
43  * - After the sync worker has seen the state change to CATCHUP, it will
44  * read the stream and apply changes (acting like an apply worker) until
45  * it catches up to the specified stream position. Then it sets the
46  * state to SYNCDONE. There might be zero changes applied between
47  * CATCHUP and SYNCDONE, because the sync worker might be ahead of the
48  * apply worker.
49  * - Once the state is set to SYNCDONE, the apply will continue tracking
50  * the table until it reaches the SYNCDONE stream position, at which
51  * point it sets state to READY and stops tracking. Again, there might
52  * be zero changes in between.
53  *
54  * So the state progression is always: INIT -> DATASYNC -> FINISHEDCOPY
55  * -> SYNCWAIT -> CATCHUP -> SYNCDONE -> READY.
56  *
57  * The catalog pg_subscription_rel is used to keep information about
58  * subscribed tables and their state. The catalog holds all states
59  * except SYNCWAIT and CATCHUP which are only in shared memory.
60  *
61  * Example flows look like this:
62  * - Apply is in front:
63  * sync:8
64  * -> set in catalog FINISHEDCOPY
65  * -> set in memory SYNCWAIT
66  * apply:10
67  * -> set in memory CATCHUP
68  * -> enter wait-loop
69  * sync:10
70  * -> set in catalog SYNCDONE
71  * -> exit
72  * apply:10
73  * -> exit wait-loop
74  * -> continue rep
75  * apply:11
76  * -> set in catalog READY
77  *
78  * - Sync is in front:
79  * sync:10
80  * -> set in catalog FINISHEDCOPY
81  * -> set in memory SYNCWAIT
82  * apply:8
83  * -> set in memory CATCHUP
84  * -> continue per-table filtering
85  * sync:10
86  * -> set in catalog SYNCDONE
87  * -> exit
88  * apply:10
89  * -> set in catalog READY
90  * -> stop per-table filtering
91  * -> continue rep
92  *-------------------------------------------------------------------------
93  */
94 
#include "postgres.h"

#include "access/table.h"
#include "access/xact.h"
#include "catalog/indexing.h"
#include "catalog/pg_subscription_rel.h"
#include "catalog/pg_type.h"
#include "commands/copy.h"
#include "miscadmin.h"
#include "parser/parse_relation.h"
#include "pgstat.h"
#include "replication/logicallauncher.h"
#include "replication/logicalrelation.h"
#include "replication/origin.h"
#include "replication/slot.h"
#include "replication/walreceiver.h"
#include "replication/worker_internal.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/rls.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"
121 
122 static bool table_states_valid = false;
124 static bool FetchTableStates(bool *started_tx);
125 
127 
128 /*
129  * Exit routine for synchronization worker.
130  */
131 static void
133 finish_sync_worker(void)
134 {
135  /*
136  * Commit any outstanding transaction. This is the usual case, unless
137  * there was nothing to do for the table.
138  */
139  if (IsTransactionState())
140  {
142  pgstat_report_stat(false);
143  }
144 
145  /* And flush all writes. */
147 
149  ereport(LOG,
150  (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
154 
155  /* Find the main apply worker and signal it. */
157 
158  /* Stop gracefully */
159  proc_exit(0);
160 }
161 
162 /*
163  * Wait until the relation sync state is set in the catalog to the expected
164  * one; return true when it happens.
165  *
166  * Returns false if the table sync worker or the table itself have
167  * disappeared, or the table state has been reset.
168  *
169  * Currently, this is used in the apply worker when transitioning from
170  * CATCHUP state to SYNCDONE.
171  */
172 static bool
173 wait_for_relation_state_change(Oid relid, char expected_state)
174 {
175  char state;
176 
177  for (;;)
178  {
179  LogicalRepWorker *worker;
180  XLogRecPtr statelsn;
181 
183 
186  relid, &statelsn);
187 
188  if (state == SUBREL_STATE_UNKNOWN)
189  break;
190 
191  if (state == expected_state)
192  return true;
193 
194  /* Check if the sync worker is still running and bail if not. */
195  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
197  false);
198  LWLockRelease(LogicalRepWorkerLock);
199  if (!worker)
200  break;
201 
202  (void) WaitLatch(MyLatch,
205 
207  }
208 
209  return false;
210 }
211 
212 /*
213  * Wait until the apply worker changes the state of our synchronization
214  * worker to the expected one.
215  *
216  * Used when transitioning from SYNCWAIT state to CATCHUP.
217  *
218  * Returns false if the apply worker has disappeared.
219  */
220 static bool
221 wait_for_worker_state_change(char expected_state)
222 {
223  int rc;
224 
225  for (;;)
226  {
227  LogicalRepWorker *worker;
228 
230 
231  /*
232  * Done if already in correct state. (We assume this fetch is atomic
233  * enough to not give a misleading answer if we do it with no lock.)
234  */
235  if (MyLogicalRepWorker->relstate == expected_state)
236  return true;
237 
238  /*
239  * Bail out if the apply worker has died, else signal it we're
240  * waiting.
241  */
242  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
244  InvalidOid, false);
245  if (worker && worker->proc)
247  LWLockRelease(LogicalRepWorkerLock);
248  if (!worker)
249  break;
250 
251  /*
252  * Wait. We expect to get a latch signal back from the apply worker,
253  * but use a timeout in case it dies without sending one.
254  */
255  rc = WaitLatch(MyLatch,
258 
259  if (rc & WL_LATCH_SET)
261  }
262 
263  return false;
264 }
265 
266 /*
267  * Callback from syscache invalidation.
268  */
269 void
271 {
272  table_states_valid = false;
273 }
274 
275 /*
276  * Handle table synchronization cooperation from the synchronization
277  * worker.
278  *
279  * If the sync worker is in CATCHUP state and reached (or passed) the
280  * predetermined synchronization point in the WAL stream, mark the table as
281  * SYNCDONE and finish.
282  */
283 static void
285 {
287 
288  if (MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP &&
289  current_lsn >= MyLogicalRepWorker->relstate_lsn)
290  {
291  TimeLineID tli;
292  char syncslotname[NAMEDATALEN] = {0};
293 
294  MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCDONE;
295  MyLogicalRepWorker->relstate_lsn = current_lsn;
296 
298 
299  /*
300  * UpdateSubscriptionRelState must be called within a transaction.
301  * That transaction will be ended within the finish_sync_worker().
302  */
303  if (!IsTransactionState())
305 
310 
311  /*
312  * End streaming so that LogRepWorkerWalRcvConn can be used to drop
313  * the slot.
314  */
316 
317  /*
318  * Cleanup the tablesync slot.
319  *
320  * This has to be done after updating the state because otherwise if
321  * there is an error while doing the database operations we won't be
322  * able to rollback dropped slot.
323  */
326  syncslotname,
327  sizeof(syncslotname));
328 
329  /*
330  * It is important to give an error if we are unable to drop the slot,
331  * otherwise, it won't be dropped till the corresponding subscription
332  * is dropped. So passing missing_ok = false.
333  */
335 
336  finish_sync_worker();
337  }
338  else
340 }
341 
342 /*
343  * Handle table synchronization cooperation from the apply worker.
344  *
345  * Walk over all subscription tables that are individually tracked by the
346  * apply process (currently, all that have state other than
347  * SUBREL_STATE_READY) and manage synchronization for them.
348  *
349  * If there are tables that need synchronizing and are not being synchronized
350  * yet, start sync workers for them (if there are free slots for sync
351  * workers). To prevent starting the sync worker for the same relation at a
352  * high frequency after a failure, we store its last start time with each sync
353  * state info. We start the sync worker for the same relation after waiting
354  * at least wal_retrieve_retry_interval.
355  *
356  * For tables that are being synchronized already, check if sync workers
357  * either need action from the apply worker or have finished. This is the
358  * SYNCWAIT to CATCHUP transition.
359  *
360  * If the synchronization position is reached (SYNCDONE), then the table can
361  * be marked as READY and is no longer tracked.
362  */
363 static void
365 {
366  struct tablesync_start_time_mapping
367  {
368  Oid relid;
369  TimestampTz last_start_time;
370  };
371  static HTAB *last_start_times = NULL;
372  ListCell *lc;
373  bool started_tx = false;
374 
376 
377  /* We need up-to-date sync state info for subscription tables here. */
378  FetchTableStates(&started_tx);
379 
380  /*
381  * Prepare a hash table for tracking last start times of workers, to avoid
382  * immediate restarts. We don't need it if there are no tables that need
383  * syncing.
384  */
385  if (table_states_not_ready && !last_start_times)
386  {
387  HASHCTL ctl;
388 
389  ctl.keysize = sizeof(Oid);
390  ctl.entrysize = sizeof(struct tablesync_start_time_mapping);
391  last_start_times = hash_create("Logical replication table sync worker start times",
392  256, &ctl, HASH_ELEM | HASH_BLOBS);
393  }
394 
395  /*
396  * Clean up the hash table when we're done with all tables (just to
397  * release the bit of memory).
398  */
399  else if (!table_states_not_ready && last_start_times)
400  {
401  hash_destroy(last_start_times);
402  last_start_times = NULL;
403  }
404 
405  /*
406  * Even when the two_phase mode is requested by the user, it remains as
407  * 'pending' until all tablesyncs have reached READY state.
408  *
409  * When this happens, we restart the apply worker and (if the conditions
410  * are still ok) then the two_phase tri-state will become 'enabled' at
411  * that time.
412  *
413  * Note: If the subscription has no tables then leave the state as
414  * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
415  * work.
416  */
419  {
420  ereport(LOG,
421  (errmsg("logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled",
422  MySubscription->name)));
423 
424  proc_exit(0);
425  }
426 
427  /*
428  * Process all tables that are being synchronized.
429  */
430  foreach(lc, table_states_not_ready)
431  {
433 
434  if (rstate->state == SUBREL_STATE_SYNCDONE)
435  {
436  /*
437  * Apply has caught up to the position where the table sync has
438  * finished. Mark the table as ready so that the apply will just
439  * continue to replicate it normally.
440  */
441  if (current_lsn >= rstate->lsn)
442  {
443  char originname[NAMEDATALEN];
444 
445  rstate->state = SUBREL_STATE_READY;
446  rstate->lsn = current_lsn;
447  if (!started_tx)
448  {
450  started_tx = true;
451  }
452 
453  /*
454  * Remove the tablesync origin tracking if exists.
455  *
456  * The normal case origin drop is done here instead of in the
457  * process_syncing_tables_for_sync function because we don't
458  * allow to drop the origin till the process owning the origin
459  * is alive.
460  *
461  * There is a chance that the user is concurrently performing
462  * refresh for the subscription where we remove the table
463  * state and its origin and by this time the origin might be
464  * already removed. So passing missing_ok = true.
465  */
467  rstate->relid,
468  originname,
469  sizeof(originname));
470  replorigin_drop_by_name(originname, true, false);
471 
472  /*
473  * Update the state to READY only after the origin cleanup.
474  */
476  rstate->relid, rstate->state,
477  rstate->lsn);
478  }
479  }
480  else
481  {
482  LogicalRepWorker *syncworker;
483 
484  /*
485  * Look for a sync worker for this relation.
486  */
487  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
488 
490  rstate->relid, false);
491 
492  if (syncworker)
493  {
494  /* Found one, update our copy of its state */
495  SpinLockAcquire(&syncworker->relmutex);
496  rstate->state = syncworker->relstate;
497  rstate->lsn = syncworker->relstate_lsn;
498  if (rstate->state == SUBREL_STATE_SYNCWAIT)
499  {
500  /*
501  * Sync worker is waiting for apply. Tell sync worker it
502  * can catchup now.
503  */
504  syncworker->relstate = SUBREL_STATE_CATCHUP;
505  syncworker->relstate_lsn =
506  Max(syncworker->relstate_lsn, current_lsn);
507  }
508  SpinLockRelease(&syncworker->relmutex);
509 
510  /* If we told worker to catch up, wait for it. */
511  if (rstate->state == SUBREL_STATE_SYNCWAIT)
512  {
513  /* Signal the sync worker, as it may be waiting for us. */
514  if (syncworker->proc)
515  logicalrep_worker_wakeup_ptr(syncworker);
516 
517  /* Now safe to release the LWLock */
518  LWLockRelease(LogicalRepWorkerLock);
519 
520  /*
521  * Enter busy loop and wait for synchronization worker to
522  * reach expected state (or die trying).
523  */
524  if (!started_tx)
525  {
527  started_tx = true;
528  }
529 
531  SUBREL_STATE_SYNCDONE);
532  }
533  else
534  LWLockRelease(LogicalRepWorkerLock);
535  }
536  else
537  {
538  /*
539  * If there is no sync worker for this table yet, count
540  * running sync workers for this subscription, while we have
541  * the lock.
542  */
543  int nsyncworkers =
545 
546  /* Now safe to release the LWLock */
547  LWLockRelease(LogicalRepWorkerLock);
548 
549  /*
550  * If there are free sync worker slot(s), start a new sync
551  * worker for the table.
552  */
553  if (nsyncworkers < max_sync_workers_per_subscription)
554  {
556  struct tablesync_start_time_mapping *hentry;
557  bool found;
558 
559  hentry = hash_search(last_start_times, &rstate->relid,
560  HASH_ENTER, &found);
561 
562  if (!found ||
563  TimestampDifferenceExceeds(hentry->last_start_time, now,
565  {
570  rstate->relid);
571  hentry->last_start_time = now;
572  }
573  }
574  }
575  }
576  }
577 
578  if (started_tx)
579  {
581  pgstat_report_stat(false);
582  }
583 }
584 
585 /*
586  * Process possible state change(s) of tables that are being synchronized.
587  */
588 void
590 {
591  if (am_tablesync_worker())
592  process_syncing_tables_for_sync(current_lsn);
593  else
595 }
596 
597 /*
598  * Create list of columns for COPY based on logical relation mapping.
599  */
600 static List *
602 {
603  List *attnamelist = NIL;
604  int i;
605 
606  for (i = 0; i < rel->remoterel.natts; i++)
607  {
608  attnamelist = lappend(attnamelist,
609  makeString(rel->remoterel.attnames[i]));
610  }
611 
612 
613  return attnamelist;
614 }
615 
616 /*
617  * Data source callback for the COPY FROM, which reads from the remote
618  * connection and passes the data back to our local COPY.
619  */
620 static int
621 copy_read_data(void *outbuf, int minread, int maxread)
622 {
623  int bytesread = 0;
624  int avail;
625 
626  /* If there are some leftover data from previous read, use it. */
627  avail = copybuf->len - copybuf->cursor;
628  if (avail)
629  {
630  if (avail > maxread)
631  avail = maxread;
632  memcpy(outbuf, &copybuf->data[copybuf->cursor], avail);
633  copybuf->cursor += avail;
634  maxread -= avail;
635  bytesread += avail;
636  }
637 
638  while (maxread > 0 && bytesread < minread)
639  {
641  int len;
642  char *buf = NULL;
643 
644  for (;;)
645  {
646  /* Try read the data. */
648 
650 
651  if (len == 0)
652  break;
653  else if (len < 0)
654  return bytesread;
655  else
656  {
657  /* Process the data */
658  copybuf->data = buf;
659  copybuf->len = len;
660  copybuf->cursor = 0;
661 
662  avail = copybuf->len - copybuf->cursor;
663  if (avail > maxread)
664  avail = maxread;
665  memcpy(outbuf, &copybuf->data[copybuf->cursor], avail);
666  outbuf = (void *) ((char *) outbuf + avail);
667  copybuf->cursor += avail;
668  maxread -= avail;
669  bytesread += avail;
670  }
671 
672  if (maxread <= 0 || bytesread >= minread)
673  return bytesread;
674  }
675 
676  /*
677  * Wait for more data or latch.
678  */
679  (void) WaitLatchOrSocket(MyLatch,
683 
685  }
686 
687  return bytesread;
688 }
689 
690 
691 /*
692  * Get information about remote relation in similar fashion the RELATION
693  * message provides during replication.
694  */
695 static void
696 fetch_remote_table_info(char *nspname, char *relname,
697  LogicalRepRelation *lrel)
698 {
700  StringInfoData cmd;
701  TupleTableSlot *slot;
702  Oid tableRow[] = {OIDOID, CHAROID, CHAROID};
703  Oid attrRow[] = {TEXTOID, OIDOID, BOOLOID};
704  bool isnull;
705  int natt;
706 
707  lrel->nspname = nspname;
708  lrel->relname = relname;
709 
710  /* First fetch Oid and replica identity. */
711  initStringInfo(&cmd);
712  appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind"
713  " FROM pg_catalog.pg_class c"
714  " INNER JOIN pg_catalog.pg_namespace n"
715  " ON (c.relnamespace = n.oid)"
716  " WHERE n.nspname = %s"
717  " AND c.relname = %s",
718  quote_literal_cstr(nspname),
721  lengthof(tableRow), tableRow);
722 
723  if (res->status != WALRCV_OK_TUPLES)
724  ereport(ERROR,
725  (errcode(ERRCODE_CONNECTION_FAILURE),
726  errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
727  nspname, relname, res->err)));
728 
729  slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
730  if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
731  ereport(ERROR,
732  (errcode(ERRCODE_UNDEFINED_OBJECT),
733  errmsg("table \"%s.%s\" not found on publisher",
734  nspname, relname)));
735 
736  lrel->remoteid = DatumGetObjectId(slot_getattr(slot, 1, &isnull));
737  Assert(!isnull);
738  lrel->replident = DatumGetChar(slot_getattr(slot, 2, &isnull));
739  Assert(!isnull);
740  lrel->relkind = DatumGetChar(slot_getattr(slot, 3, &isnull));
741  Assert(!isnull);
742 
745 
746  /* Now fetch columns. */
747  resetStringInfo(&cmd);
748  appendStringInfo(&cmd,
749  "SELECT a.attname,"
750  " a.atttypid,"
751  " a.attnum = ANY(i.indkey)"
752  " FROM pg_catalog.pg_attribute a"
753  " LEFT JOIN pg_catalog.pg_index i"
754  " ON (i.indexrelid = pg_get_replica_identity_index(%u))"
755  " WHERE a.attnum > 0::pg_catalog.int2"
756  " AND NOT a.attisdropped %s"
757  " AND a.attrelid = %u"
758  " ORDER BY a.attnum",
759  lrel->remoteid,
761  "AND a.attgenerated = ''" : ""),
762  lrel->remoteid);
764  lengthof(attrRow), attrRow);
765 
766  if (res->status != WALRCV_OK_TUPLES)
767  ereport(ERROR,
768  (errcode(ERRCODE_CONNECTION_FAILURE),
769  errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
770  nspname, relname, res->err)));
771 
772  /* We don't know the number of rows coming, so allocate enough space. */
773  lrel->attnames = palloc0(MaxTupleAttributeNumber * sizeof(char *));
774  lrel->atttyps = palloc0(MaxTupleAttributeNumber * sizeof(Oid));
775  lrel->attkeys = NULL;
776 
777  natt = 0;
778  slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
779  while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
780  {
781  lrel->attnames[natt] =
782  TextDatumGetCString(slot_getattr(slot, 1, &isnull));
783  Assert(!isnull);
784  lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 2, &isnull));
785  Assert(!isnull);
786  if (DatumGetBool(slot_getattr(slot, 3, &isnull)))
787  lrel->attkeys = bms_add_member(lrel->attkeys, natt);
788 
789  /* Should never happen. */
790  if (++natt >= MaxTupleAttributeNumber)
791  elog(ERROR, "too many columns in remote table \"%s.%s\"",
792  nspname, relname);
793 
794  ExecClearTuple(slot);
795  }
797 
798  lrel->natts = natt;
799 
801  pfree(cmd.data);
802 }
803 
804 /*
805  * Copy existing data of a table from publisher.
806  *
807  * Caller is responsible for locking the local relation.
808  */
809 static void
811 {
812  LogicalRepRelMapEntry *relmapentry;
813  LogicalRepRelation lrel;
815  StringInfoData cmd;
816  CopyFromState cstate;
817  List *attnamelist;
818  ParseState *pstate;
819 
820  /* Get the publisher relation info. */
822  RelationGetRelationName(rel), &lrel);
823 
824  /* Put the relation into relmap. */
826 
827  /* Map the publisher relation to local one. */
828  relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock);
829  Assert(rel == relmapentry->localrel);
830 
831  /* Start copy on the publisher. */
832  initStringInfo(&cmd);
833  if (lrel.relkind == RELKIND_RELATION)
834  appendStringInfo(&cmd, "COPY %s TO STDOUT",
836  else
837  {
838  /*
839  * For non-tables, we need to do COPY (SELECT ...), but we can't just
840  * do SELECT * because we need to not copy generated columns.
841  */
842  appendStringInfoString(&cmd, "COPY (SELECT ");
843  for (int i = 0; i < lrel.natts; i++)
844  {
846  if (i < lrel.natts - 1)
847  appendStringInfoString(&cmd, ", ");
848  }
849  appendStringInfo(&cmd, " FROM %s) TO STDOUT",
851  }
852  res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL);
853  pfree(cmd.data);
854  if (res->status != WALRCV_OK_COPY_OUT)
855  ereport(ERROR,
856  (errcode(ERRCODE_CONNECTION_FAILURE),
857  errmsg("could not start initial contents copy for table \"%s.%s\": %s",
858  lrel.nspname, lrel.relname, res->err)));
860 
862 
863  pstate = make_parsestate(NULL);
865  NULL, false, false);
866 
867  attnamelist = make_copy_attnamelist(relmapentry);
868  cstate = BeginCopyFrom(pstate, rel, NULL, NULL, false, copy_read_data, attnamelist, NIL);
869 
870  /* Do the copy */
871  (void) CopyFrom(cstate);
872 
873  logicalrep_rel_close(relmapentry, NoLock);
874 }
875 
876 /*
877  * Determine the tablesync slot name.
878  *
879  * The name must not exceed NAMEDATALEN - 1 because of remote node constraints
880  * on slot name length. We append system_identifier to avoid slot_name
881  * collision with subscriptions in other clusters. With the current scheme
882  * pg_%u_sync_%u_UINT64_FORMAT (3 + 10 + 6 + 10 + 20 + '\0'), the maximum
883  * length of slot_name will be 50.
884  *
885  * The returned slot name is stored in the supplied buffer (syncslotname) with
886  * the given size.
887  *
888  * Note: We don't use the subscription slot name as part of tablesync slot name
889  * because we are responsible for cleaning up these slots and it could become
890  * impossible to recalculate what name to cleanup if the subscription slot name
891  * had changed.
892  */
893 void
895  char *syncslotname, int szslot)
896 {
897  snprintf(syncslotname, szslot, "pg_%u_sync_%u_" UINT64_FORMAT, suboid,
898  relid, GetSystemIdentifier());
899 }
900 
901 /*
902  * Form the origin name for tablesync.
903  *
904  * Return the name in the supplied buffer.
905  */
906 void
908  char *originname, int szorgname)
909 {
910  snprintf(originname, szorgname, "pg_%u_%u", suboid, relid);
911 }
912 
913 /*
914  * Start syncing the table in the sync worker.
915  *
916  * If nothing needs to be done to sync the table, we exit the worker without
917  * any further action.
918  *
919  * The returned slot name is palloc'ed in current memory context.
920  */
921 char *
923 {
924  char *slotname;
925  char *err;
926  char relstate;
927  XLogRecPtr relstate_lsn;
928  Relation rel;
929  AclResult aclresult;
931  char originname[NAMEDATALEN];
932  RepOriginId originid;
933 
934  /* Check the state of the table synchronization. */
938  &relstate_lsn);
940 
942  MyLogicalRepWorker->relstate = relstate;
943  MyLogicalRepWorker->relstate_lsn = relstate_lsn;
945 
946  /*
947  * If synchronization is already done or no longer necessary, exit now
948  * that we've updated shared memory state.
949  */
950  switch (relstate)
951  {
952  case SUBREL_STATE_SYNCDONE:
953  case SUBREL_STATE_READY:
954  case SUBREL_STATE_UNKNOWN:
955  finish_sync_worker(); /* doesn't return */
956  }
957 
958  /* Calculate the name of the tablesync slot. */
959  slotname = (char *) palloc(NAMEDATALEN);
962  slotname,
963  NAMEDATALEN);
964 
965  /*
966  * Here we use the slot name instead of the subscription name as the
967  * application_name, so that it is different from the main apply worker,
968  * so that synchronous replication can distinguish them.
969  */
971  walrcv_connect(MySubscription->conninfo, true, slotname, &err);
972  if (LogRepWorkerWalRcvConn == NULL)
973  ereport(ERROR,
974  (errcode(ERRCODE_CONNECTION_FAILURE),
975  errmsg("could not connect to the publisher: %s", err)));
976 
977  Assert(MyLogicalRepWorker->relstate == SUBREL_STATE_INIT ||
978  MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC ||
979  MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY);
980 
981  /* Assign the origin tracking record name. */
984  originname,
985  sizeof(originname));
986 
987  if (MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC)
988  {
989  /*
990  * We have previously errored out before finishing the copy so the
991  * replication slot might exist. We want to remove the slot if it
992  * already exists and proceed.
993  *
994  * XXX We could also instead try to drop the slot, last time we failed
995  * but for that, we might need to clean up the copy state as it might
996  * be in the middle of fetching the rows. Also, if there is a network
997  * breakdown then it wouldn't have succeeded so trying it next time
998  * seems like a better bet.
999  */
1001  }
1002  else if (MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY)
1003  {
1004  /*
1005  * The COPY phase was previously done, but tablesync then crashed
1006  * before it was able to finish normally.
1007  */
1009 
1010  /*
1011  * The origin tracking name must already exist. It was created first
1012  * time this tablesync was launched.
1013  */
1014  originid = replorigin_by_name(originname, false);
1015  replorigin_session_setup(originid);
1016  replorigin_session_origin = originid;
1017  *origin_startpos = replorigin_session_get_progress(false);
1018 
1020 
1021  goto copy_table_done;
1022  }
1023 
1025  MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC;
1028 
1029  /* Update the state and make it visible to others. */
1036  pgstat_report_stat(false);
1037 
1039 
1040  /*
1041  * Use a standard write lock here. It might be better to disallow access
1042  * to the table while it's being synchronized. But we don't want to block
1043  * the main apply process from working and it has to open the relation in
1044  * RowExclusiveLock when remapping remote relation id to local one.
1045  */
1047 
1048  /*
1049  * Check that our table sync worker has permission to insert into the
1050  * target table.
1051  */
1052  aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(),
1053  ACL_INSERT);
1054  if (aclresult != ACLCHECK_OK)
1055  aclcheck_error(aclresult,
1056  get_relkind_objtype(rel->rd_rel->relkind),
1058 
1059  /*
1060  * COPY FROM does not honor RLS policies. That is not a problem for
1061  * subscriptions owned by roles with BYPASSRLS privilege (or superuser, who
1062  * has it implicitly), but other roles should not be able to circumvent
1063  * RLS. Disallow logical replication into RLS enabled relations for such
1064  * roles.
1065  */
1067  ereport(ERROR,
1068  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1069  errmsg("\"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
1070  GetUserNameFromId(GetUserId(), true),
1071  RelationGetRelationName(rel))));
1072 
1073  /*
1074  * Start a transaction in the remote node in REPEATABLE READ mode. This
1075  * ensures that both the replication slot we create (see below) and the
1076  * COPY are consistent with each other.
1077  */
1079  "BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ",
1080  0, NULL);
1081  if (res->status != WALRCV_OK_COMMAND)
1082  ereport(ERROR,
1083  (errcode(ERRCODE_CONNECTION_FAILURE),
1084  errmsg("table copy could not start transaction on publisher: %s",
1085  res->err)));
1087 
1088  /*
1089  * Create a new permanent logical decoding slot. This slot will be used
1090  * for the catchup phase after COPY is done, so tell it to use the
1091  * snapshot to make the final data consistent.
1092  *
1093  * Prevent cancel/die interrupts while creating slot here because it is
1094  * possible that before the server finishes this command, a concurrent
1095  * drop subscription happens which would complete without removing this
1096  * slot leading to a dangling slot on the server.
1097  */
1098  HOLD_INTERRUPTS();
1100  slotname, false /* permanent */ , false /* two_phase */ ,
1101  CRS_USE_SNAPSHOT, origin_startpos);
1103 
1104  /*
1105  * Setup replication origin tracking. The purpose of doing this before the
1106  * copy is to avoid doing the copy again due to any error in setting up
1107  * origin tracking.
1108  */
1109  originid = replorigin_by_name(originname, true);
1110  if (!OidIsValid(originid))
1111  {
1112  /*
1113  * Origin tracking does not exist, so create it now.
1114  *
1115  * Then advance to the LSN got from walrcv_create_slot. This is WAL
1116  * logged for the purpose of recovery. Locks are to prevent the
1117  * replication origin from vanishing while advancing.
1118  */
1119  originid = replorigin_create(originname);
1120 
1121  LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1122  replorigin_advance(originid, *origin_startpos, InvalidXLogRecPtr,
1123  true /* go backward */ , true /* WAL log */ );
1124  UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1125 
1126  replorigin_session_setup(originid);
1127  replorigin_session_origin = originid;
1128  }
1129  else
1130  {
1131  ereport(ERROR,
1133  errmsg("replication origin \"%s\" already exists",
1134  originname)));
1135  }
1136 
1137  /* Now do the initial data copy */
1139  copy_table(rel);
1141 
1142  res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL);
1143  if (res->status != WALRCV_OK_COMMAND)
1144  ereport(ERROR,
1145  (errcode(ERRCODE_CONNECTION_FAILURE),
1146  errmsg("table copy could not finish transaction on publisher: %s",
1147  res->err)));
1149 
1150  table_close(rel, NoLock);
1151 
1152  /* Make the copy visible. */
1154 
1155  /*
1156  * Update the persisted state to indicate the COPY phase is done; make it
1157  * visible to others.
1158  */
1161  SUBREL_STATE_FINISHEDCOPY,
1163 
1165 
1166 copy_table_done:
1167 
1168  elog(DEBUG1,
1169  "LogicalRepSyncTableStart: '%s' origin_startpos lsn %X/%X",
1170  originname, LSN_FORMAT_ARGS(*origin_startpos));
1171 
1172  /*
1173  * We are done with the initial data synchronization, update the state.
1174  */
1176  MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCWAIT;
1177  MyLogicalRepWorker->relstate_lsn = *origin_startpos;
1179 
1180  /*
1181  * Finally, wait until the main apply worker tells us to catch up and then
1182  * return to let LogicalRepApplyLoop do it.
1183  */
1184  wait_for_worker_state_change(SUBREL_STATE_CATCHUP);
1185  return slotname;
1186 }
1187 
1188 /*
1189  * Common code to fetch the up-to-date sync state info into the static lists.
1190  *
1191  * Returns true if subscription has 1 or more tables, else false.
1192  *
1193  * Note: If this function started the transaction (indicated by the parameter)
1194  * then it is the caller's responsibility to commit it.
1195  */
1196 static bool
1197 FetchTableStates(bool *started_tx)
1198 {
1199     static bool has_subrels = false;
1200
1201     *started_tx = false;
1202
1203     if (!table_states_valid)
1204     {
1205         MemoryContext oldctx;
1206         List       *rstates;
1207         ListCell   *lc;
1208         SubscriptionRelState *rstate;
1209
1210         /* Clean the old lists. */
1211         list_free_deep(table_states_not_ready);
1212         table_states_not_ready = NIL;
1213
1214         if (!IsTransactionState())
1215         {
1216             StartTransactionCommand();
1217             *started_tx = true;
1218         }
1219
1220         /* Fetch all non-ready tables. */
1221         rstates = GetSubscriptionNotReadyRelations(MySubscription->oid);
1222
1223         /* Allocate the tracking info in a permanent memory context. */
1224         oldctx = MemoryContextSwitchTo(CacheMemoryContext);
1225         foreach(lc, rstates)
1226         {
1227             rstate = palloc(sizeof(SubscriptionRelState));
1228             memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState));
1229             table_states_not_ready = lappend(table_states_not_ready, rstate);
1230         }
1231         MemoryContextSwitchTo(oldctx);
1232
1233         /*
1234          * Does the subscription have tables?
1235          *
1236          * If there were not-READY relations found then we know it does. But
1237          * if table_states_not_ready was empty we still need to check again to
1238          * see if there are 0 tables.
1239          */
1240         has_subrels = (list_length(table_states_not_ready) > 0) ||
1241             HasSubscriptionRelations(MySubscription->oid);
1242
1243         table_states_valid = true;
1244     }
1245
1246     return has_subrels;
1247 }
1248 
1249 /*
1250  * If the subscription has no tables then return false.
1251  *
1252  * Otherwise, are all tablesyncs READY?
1253  *
1254  * Note: This function is not suitable to be called from outside of apply or
1255  * tablesync workers because MySubscription needs to be already initialized.
1256  */
1257 bool
1258 AllTablesyncsReady(void)
1259 {
1260     bool        started_tx = false;
1261     bool        has_subrels = false;
1262
1263     /* We need up-to-date sync state info for subscription tables here. */
1264     has_subrels = FetchTableStates(&started_tx);
1265
1266     if (started_tx)
1267     {
1268         CommitTransactionCommand();
1269         pgstat_report_stat(false);
1270     }
1271
1272     /*
1273      * Return false when there are no tables in subscription or not all tables
1274      * are in ready state; true otherwise.
1275      */
1276     return has_subrels && list_length(table_states_not_ready) == 0;
1277 }
1278 
1279 /*
1280  * Update the two_phase state of the specified subscription in pg_subscription.
1281  */
1282 void
1283 UpdateTwoPhaseState(Oid suboid, char new_state)
1284 {
1285     Relation    rel;
1286     HeapTuple   tup;
1287     bool        nulls[Natts_pg_subscription];
1288     bool        replaces[Natts_pg_subscription];
1289     Datum       values[Natts_pg_subscription];
1290
1291     Assert(new_state == LOGICALREP_TWOPHASE_STATE_DISABLED ||
1292            new_state == LOGICALREP_TWOPHASE_STATE_PENDING ||
1293            new_state == LOGICALREP_TWOPHASE_STATE_ENABLED);
1294
1295     rel = table_open(SubscriptionRelationId, RowExclusiveLock);
1296     tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(suboid));
1297     if (!HeapTupleIsValid(tup))
1298         elog(ERROR,
1299              "cache lookup failed for subscription oid %u",
1300              suboid);
1301
1302     /* Form a new tuple. */
1303     memset(values, 0, sizeof(values));
1304     memset(nulls, false, sizeof(nulls));
1305     memset(replaces, false, sizeof(replaces));
1306
1307     /* And update/set two_phase state */
1308     values[Anum_pg_subscription_subtwophasestate - 1] = CharGetDatum(new_state);
1309     replaces[Anum_pg_subscription_subtwophasestate - 1] = true;
1310
1311     tup = heap_modify_tuple(tup, RelationGetDescr(rel),
1312                             values, nulls, replaces);
1313     CatalogTupleUpdate(rel, &tup->t_self, tup);
1314
1315     heap_freetuple(tup);
1316     table_close(rel, RowExclusiveLock);
1317 }
AclResult
Definition: acl.h:178
@ ACLCHECK_OK
Definition: acl.h:179
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
Definition: aclchk.c:3308
AclResult pg_class_aclcheck(Oid table_oid, Oid roleid, AclMode mode)
Definition: aclchk.c:4682
WalReceiverConn * LogRepWorkerWalRcvConn
Definition: worker.c:247
Subscription * MySubscription
Definition: worker.c:249
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1711
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1580
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1544
Bitmapset * bms_add_member(Bitmapset *a, int x)
Definition: bitmapset.c:738
static Datum values[MAXATTR]
Definition: bootstrap.c:156
#define TextDatumGetCString(d)
Definition: builtins.h:86
unsigned int uint32
Definition: c.h:441
#define Max(x, y)
Definition: c.h:980
#define UINT64_FORMAT
Definition: c.h:484
#define lengthof(array)
Definition: c.h:734
#define OidIsValid(objectId)
Definition: c.h:710
CopyFromState BeginCopyFrom(ParseState *pstate, Relation rel, Node *whereClause, const char *filename, bool is_program, copy_data_source_cb data_source_cb, List *attnamelist, List *options)
Definition: copyfrom.c:1186
uint64 CopyFrom(CopyFromState cstate)
Definition: copyfrom.c:525
int64 TimestampTz
Definition: timestamp.h:39
void hash_destroy(HTAB *hashp)
Definition: dynahash.c:862
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:954
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:349
int errcode(int sqlerrcode)
Definition: elog.c:693
int errmsg(const char *fmt,...)
Definition: elog.c:904
#define LOG
Definition: elog.h:25
#define DEBUG1
Definition: elog.h:24
#define ERROR
Definition: elog.h:33
#define elog(elevel,...)
Definition: elog.h:218
#define ereport(elevel,...)
Definition: elog.h:143
void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
Definition: execTuples.c:1254
const TupleTableSlotOps TTSOpsMinimalTuple
Definition: execTuples.c:85
TupleTableSlot * MakeSingleTupleTableSlot(TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1238
struct Latch * MyLatch
Definition: globals.c:57
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, Datum *replValues, bool *replIsnull, bool *doReplace)
Definition: heaptuple.c:1113
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
@ HASH_ENTER
Definition: hsearch.h:114
#define HASH_ELEM
Definition: hsearch.h:95
#define HASH_BLOBS
Definition: hsearch.h:97
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
#define MaxTupleAttributeNumber
Definition: htup_details.h:33
void CatalogTupleUpdate(Relation heapRel, ItemPointer otid, HeapTuple tup)
Definition: indexing.c:301
void proc_exit(int code)
Definition: ipc.c:104
int i
Definition: isn.c:73
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:500
void ResetLatch(Latch *latch)
Definition: latch.c:660
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:452
#define WL_SOCKET_READABLE
Definition: latch.h:126
#define WL_TIMEOUT
Definition: latch.h:128
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:130
#define WL_LATCH_SET
Definition: latch.h:125
LogicalRepWorker * logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
Definition: launcher.c:215
void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
Definition: launcher.c:554
void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid, Oid relid)
Definition: launcher.c:266
void logicalrep_worker_wakeup(Oid subid, Oid relid)
Definition: launcher.c:534
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:57
int max_sync_workers_per_subscription
Definition: launcher.c:55
int logicalrep_sync_worker_count(Oid subid)
Definition: launcher.c:664
Assert(fmt[strlen(fmt) - 1] !='\n')
List * lappend(List *list, void *datum)
Definition: list.c:336
void list_free_deep(List *list)
Definition: list.c:1519
void UnlockRelationOid(Oid relid, LOCKMODE lockmode)
Definition: lmgr.c:200
void LockRelationOid(Oid relid, LOCKMODE lockmode)
Definition: lmgr.c:109
#define NoLock
Definition: lockdefs.h:34
#define AccessShareLock
Definition: lockdefs.h:36
#define RowExclusiveLock
Definition: lockdefs.h:38
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3316
char * get_rel_name(Oid relid)
Definition: lsyscache.c:1899
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1199
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1803
@ LW_SHARED
Definition: lwlock.h:105
void pfree(void *pointer)
Definition: mcxt.c:1169
void * palloc0(Size size)
Definition: mcxt.c:1093
MemoryContext CacheMemoryContext
Definition: mcxt.c:51
void * palloc(Size size)
Definition: mcxt.c:1062
#define RESUME_INTERRUPTS()
Definition: miscadmin.h:133
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:120
#define HOLD_INTERRUPTS()
Definition: miscadmin.h:131
char * GetUserNameFromId(Oid roleid, bool noerr)
Definition: miscinit.c:910
Oid GetUserId(void)
Definition: miscinit.c:495
ObjectType get_relkind_objtype(char relkind)
RepOriginId replorigin_by_name(const char *roname, bool missing_ok)
Definition: origin.c:209
RepOriginId replorigin_create(const char *roname)
Definition: origin.c:240
void replorigin_session_setup(RepOriginId node)
Definition: origin.c:1071
void replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
Definition: origin.c:414
RepOriginId replorigin_session_origin
Definition: origin.c:154
void replorigin_advance(RepOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)
Definition: origin.c:872
XLogRecPtr replorigin_session_get_progress(bool flush)
Definition: origin.c:1206
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
ParseState * make_parsestate(ParseState *parentParseState)
Definition: parse_node.c:44
ParseNamespaceItem * addRangeTableEntryForRelation(ParseState *pstate, Relation rel, int lockmode, Alias *alias, bool inh, bool inFromCl)
#define ACL_INSERT
Definition: parsenodes.h:82
void * arg
NameData relname
Definition: pg_class.h:38
#define NAMEDATALEN
const void size_t len
#define lfirst(lc)
Definition: pg_list.h:169
static int list_length(const List *l)
Definition: pg_list.h:149
#define NIL
Definition: pg_list.h:65
List * GetSubscriptionNotReadyRelations(Oid subid)
char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
void UpdateSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn)
bool HasSubscriptionRelations(Oid subid)
#define LOGICALREP_TWOPHASE_STATE_DISABLED
#define LOGICALREP_TWOPHASE_STATE_PENDING
#define LOGICALREP_TWOPHASE_STATE_ENABLED
static char * buf
Definition: pg_test_fsync.c:70
void pgstat_report_stat(bool disconnect)
Definition: pgstat.c:861
int pgsocket
Definition: port.h:29
#define snprintf
Definition: port.h:225
#define PGINVALID_SOCKET
Definition: port.h:31
#define DatumGetObjectId(X)
Definition: postgres.h:544
uintptr_t Datum
Definition: postgres.h:411
#define DatumGetBool(X)
Definition: postgres.h:437
#define ObjectIdGetDatum(X)
Definition: postgres.h:551
#define DatumGetChar(X)
Definition: postgres.h:453
#define CharGetDatum(X)
Definition: postgres.h:460
#define InvalidOid
Definition: postgres_ext.h:36
unsigned int Oid
Definition: postgres_ext.h:31
static int fd(const char *x, int i)
Definition: preproc-init.c:105
char * quote_literal_cstr(const char *rawstr)
Definition: quote.c:102
#define RelationGetRelid(relation)
Definition: rel.h:478
#define RelationGetDescr(relation)
Definition: rel.h:504
#define RelationGetRelationName(relation)
Definition: rel.h:512
#define RelationGetNamespace(relation)
Definition: rel.h:519
int check_enable_rls(Oid relid, Oid checkAsUser, bool noError)
Definition: rls.c:52
@ RLS_ENABLED
Definition: rls.h:45
const char * quote_identifier(const char *ident)
Definition: ruleutils.c:11440
char * quote_qualified_identifier(const char *qualifier, const char *ident)
Definition: ruleutils.c:11524
Snapshot GetTransactionSnapshot(void)
Definition: snapmgr.c:250
void PopActiveSnapshot(void)
Definition: snapmgr.c:774
void PushActiveSnapshot(Snapshot snap)
Definition: snapmgr.c:680
void InvalidateCatalogSnapshot(void)
Definition: snapmgr.c:456
#define SpinLockRelease(lock)
Definition: spin.h:64
#define SpinLockAcquire(lock)
Definition: spin.h:62
void logicalrep_relmap_update(LogicalRepRelation *remoterel)
Definition: relation.c:157
LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
Definition: relation.c:258
void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
Definition: relation.c:433
#define ERRCODE_DUPLICATE_OBJECT
Definition: streamutil.c:32
StringInfo makeStringInfo(void)
Definition: stringinfo.c:41
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:91
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:176
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
Size keysize
Definition: hsearch.h:75
Size entrysize
Definition: hsearch.h:76
Definition: dynahash.c:220
ItemPointerData t_self
Definition: htup.h:65
Definition: pg_list.h:51
LogicalRepRelation remoterel
LogicalRepRelId remoteid
Definition: logicalproto.h:101
Bitmapset * attkeys
Definition: logicalproto.h:109
XLogRecPtr relstate_lsn
Form_pg_class rd_rel
Definition: rel.h:109
Definition: regguts.h:318
void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok)
#define SearchSysCacheCopy1(cacheId, key1)
Definition: syscache.h:177
@ SUBSCRIPTIONOID
Definition: syscache.h:97
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:167
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39
static List * table_states_not_ready
Definition: tablesync.c:123
bool AllTablesyncsReady(void)
Definition: tablesync.c:1258
static bool wait_for_worker_state_change(char expected_state)
Definition: tablesync.c:221
static bool table_states_valid
Definition: tablesync.c:122
void invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
Definition: tablesync.c:270
static void pg_attribute_noreturn() finish_sync_worker(void)
Definition: tablesync.c:132
static void process_syncing_tables_for_apply(XLogRecPtr current_lsn)
Definition: tablesync.c:364
static void process_syncing_tables_for_sync(XLogRecPtr current_lsn)
Definition: tablesync.c:284
void ReplicationOriginNameForTablesync(Oid suboid, Oid relid, char *originname, int szorgname)
Definition: tablesync.c:907
static void fetch_remote_table_info(char *nspname, char *relname, LogicalRepRelation *lrel)
Definition: tablesync.c:696
static int copy_read_data(void *outbuf, int minread, int maxread)
Definition: tablesync.c:621
void process_syncing_tables(XLogRecPtr current_lsn)
Definition: tablesync.c:589
char * LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
Definition: tablesync.c:922
static void copy_table(Relation rel)
Definition: tablesync.c:810
static bool wait_for_relation_state_change(Oid relid, char expected_state)
Definition: tablesync.c:173
StringInfo copybuf
Definition: tablesync.c:126
static bool FetchTableStates(bool *started_tx)
Definition: tablesync.c:1197
void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, int szslot)
Definition: tablesync.c:894
static List * make_copy_attnamelist(LogicalRepRelMapEntry *rel)
Definition: tablesync.c:601
void UpdateTwoPhaseState(Oid suboid, char new_state)
Definition: tablesync.c:1283
bool tuplestore_gettupleslot(Tuplestorestate *state, bool forward, bool copy, TupleTableSlot *slot)
Definition: tuplestore.c:1078
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:425
static Datum slot_getattr(TupleTableSlot *slot, int attnum, bool *isnull)
Definition: tuptable.h:381
String * makeString(char *str)
Definition: value.c:63
@ WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE
Definition: wait_event.h:110
@ WAIT_EVENT_LOGICAL_SYNC_DATA
Definition: wait_event.h:109
#define walrcv_create_slot(conn, slotname, temporary, two_phase, snapshot_action, lsn)
Definition: walreceiver.h:426
#define walrcv_connect(conninfo, logical, appname, err)
Definition: walreceiver.h:404
@ WALRCV_OK_COMMAND
Definition: walreceiver.h:201
@ WALRCV_OK_TUPLES
Definition: walreceiver.h:203
@ WALRCV_OK_COPY_OUT
Definition: walreceiver.h:205
static void walrcv_clear_result(WalRcvExecResult *walres)
Definition: walreceiver.h:436
#define walrcv_server_version(conn)
Definition: walreceiver.h:414
#define walrcv_endstreaming(conn, next_tli)
Definition: walreceiver.h:420
#define walrcv_exec(conn, exec, nRetTypes, retTypes)
Definition: walreceiver.h:430
#define walrcv_receive(conn, buffer, wait_fd)
Definition: walreceiver.h:422
@ CRS_USE_SNAPSHOT
Definition: walsender.h:24
static bool am_tablesync_worker(void)
bool IsTransactionState(void)
Definition: xact.c:373
void CommandCounterIncrement(void)
Definition: xact.c:1073
void StartTransactionCommand(void)
Definition: xact.c:2909
void CommitTransactionCommand(void)
Definition: xact.c:3010
uint64 GetSystemIdentifier(void)
Definition: xlog.c:5062
XLogRecPtr GetXLogWriteRecPtr(void)
Definition: xlog.c:12001
int wal_retrieve_retry_interval
Definition: xlog.c:115
void XLogFlush(XLogRecPtr record)
Definition: xlog.c:2924
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
uint16 RepOriginId
Definition: xlogdefs.h:65
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
uint32 TimeLineID
Definition: xlogdefs.h:59