PostgreSQL Source Code  git master
tablesync.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  * tablesync.c
3  * PostgreSQL logical replication: initial table data synchronization
4  *
5  * Copyright (c) 2012-2024, PostgreSQL Global Development Group
6  *
7  * IDENTIFICATION
8  * src/backend/replication/logical/tablesync.c
9  *
10  * NOTES
11  * This file contains code for initial table data synchronization for
12  * logical replication.
13  *
14  * The initial data synchronization is done separately for each table,
15  * in a separate apply worker that only fetches the initial snapshot data
16  * from the publisher and then synchronizes the position in the stream with
17  * the leader apply worker.
18  *
19  * There are several reasons for doing the synchronization this way:
20  * - It allows us to parallelize the initial data synchronization
21  * which lowers the time needed for it to happen.
22  * - The initial synchronization does not have to hold the xid and LSN
23  * for the time it takes to copy data of all tables, causing less
24  * bloat and lower disk consumption compared to doing the
25  * synchronization in a single process for the whole database.
26  * - It allows us to synchronize any tables added after the initial
27  * synchronization has finished.
28  *
29  * The stream position synchronization works in multiple steps:
30  * - Apply worker requests a tablesync worker to start, setting the new
31  * table state to INIT.
32  * - Tablesync worker starts; changes table state from INIT to DATASYNC while
33  * copying.
34  * - Tablesync worker does initial table copy; there is a FINISHEDCOPY (sync
35  * worker specific) state to indicate when the copy phase has completed, so
36  * if the worker crashes with this (non-memory) state then the copy will not
37  * be re-attempted.
38  * - Tablesync worker then sets table state to SYNCWAIT; waits for state change.
39  * - Apply worker periodically checks for tables in SYNCWAIT state. When
40  * any appear, it sets the table state to CATCHUP and starts loop-waiting
41  * until either the table state is set to SYNCDONE or the sync worker
42  * exits.
43  * - After the sync worker has seen the state change to CATCHUP, it will
44  * read the stream and apply changes (acting like an apply worker) until
45  * it catches up to the specified stream position. Then it sets the
46  * state to SYNCDONE. There might be zero changes applied between
47  * CATCHUP and SYNCDONE, because the sync worker might be ahead of the
48  * apply worker.
49  * - Once the state is set to SYNCDONE, the apply will continue tracking
50  * the table until it reaches the SYNCDONE stream position, at which
51  * point it sets state to READY and stops tracking. Again, there might
52  * be zero changes in between.
53  *
54  * So the state progression is always: INIT -> DATASYNC -> FINISHEDCOPY
55  * -> SYNCWAIT -> CATCHUP -> SYNCDONE -> READY.
56  *
57  * The catalog pg_subscription_rel is used to keep information about
58  * subscribed tables and their state. The catalog holds all states
59  * except SYNCWAIT and CATCHUP which are only in shared memory.
60  *
61  * Example flows look like this:
62  * - Apply is in front:
63  * sync:8
64  * -> set in catalog FINISHEDCOPY
65  * -> set in memory SYNCWAIT
66  * apply:10
67  * -> set in memory CATCHUP
68  * -> enter wait-loop
69  * sync:10
70  * -> set in catalog SYNCDONE
71  * -> exit
72  * apply:10
73  * -> exit wait-loop
74  * -> continue rep
75  * apply:11
76  * -> set in catalog READY
77  *
78  * - Sync is in front:
79  * sync:10
80  * -> set in catalog FINISHEDCOPY
81  * -> set in memory SYNCWAIT
82  * apply:8
83  * -> set in memory CATCHUP
84  * -> continue per-table filtering
85  * sync:10
86  * -> set in catalog SYNCDONE
87  * -> exit
88  * apply:10
89  * -> set in catalog READY
90  * -> stop per-table filtering
91  * -> continue rep
92  *-------------------------------------------------------------------------
93  */
94 
95 #include "postgres.h"
96 
97 #include "access/table.h"
98 #include "access/xact.h"
99 #include "catalog/indexing.h"
101 #include "catalog/pg_type.h"
102 #include "commands/copy.h"
103 #include "miscadmin.h"
104 #include "nodes/makefuncs.h"
105 #include "parser/parse_relation.h"
106 #include "pgstat.h"
110 #include "replication/origin.h"
111 #include "replication/slot.h"
112 #include "replication/walreceiver.h"
114 #include "storage/ipc.h"
115 #include "storage/lmgr.h"
116 #include "utils/acl.h"
117 #include "utils/array.h"
118 #include "utils/builtins.h"
119 #include "utils/lsyscache.h"
120 #include "utils/memutils.h"
121 #include "utils/rls.h"
122 #include "utils/snapmgr.h"
123 #include "utils/syscache.h"
124 #include "utils/usercontext.h"
125 
126 typedef enum
127 {
132 
/*
 * Forward declaration; the definition is not visible in this part of the
 * file. The caller seen here passes the address of a flag that tracks
 * whether a transaction has been started.
 */
135 static bool FetchTableStates(bool *started_tx);
136 
/*
 * Buffer shared between the table copy routine, which allocates it with
 * makeStringInfo(), and copy_read_data(), which points it at received data
 * and uses its cursor to remember how much has already been handed out.
 */
137 static StringInfo copybuf = NULL;
138 
139 /*
140  * Exit routine for synchronization worker.
 *
 * Reports pending statistics when a transaction is still open, logs that
 * the worker has finished, and ends the process with proc_exit(0).
 *
 * NOTE(review): several statements announced by the comments below (the
 * commit, the write flush, the arguments of the log message and the wakeup
 * of the leader apply worker) are not present in this listing; check them
 * against the original file.
141  */
142 static void
144 finish_sync_worker(void)
145 {
146  /*
147  * Commit any outstanding transaction. This is the usual case, unless
148  * there was nothing to do for the table.
149  */
150  if (IsTransactionState())
151  {
153  pgstat_report_stat(true);
154  }
155 
156  /* And flush all writes. */
158 
160  ereport(LOG,
161  (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
165 
166  /* Find the leader apply worker and signal it. */
168 
169  /* Stop gracefully */
170  proc_exit(0);
171 }
172 
173 /*
174  * Wait until the relation sync state is set in the catalog to the expected
175  * one; return true when it happens.
176  *
177  * Returns false if the table sync worker or the table itself has
178  * disappeared, or the table state has been reset.
179  *
180  * Currently, this is used in the apply worker when transitioning from
181  * CATCHUP state to SYNCDONE.
 *
 * relid: relation whose sync state is polled.
 * expected_state: state value that ends the wait successfully.
182  */
183 static bool
184 wait_for_relation_state_change(Oid relid, char expected_state)
185 {
186  char state;
187 
188  for (;;)
189  {
190  LogicalRepWorker *worker;
191  XLogRecPtr statelsn;
192 
194 
 /*
  * NOTE(review): the beginning of the statement that assigns "state" is
  * missing from this listing; only its trailing arguments remain below.
  */
197  relid, &statelsn);
198 
 /* An unknown state ends the loop, so the function returns false. */
199  if (state == SUBREL_STATE_UNKNOWN)
200  break;
201 
202  if (state == expected_state)
203  return true;
204 
205  /* Check if the sync worker is still running and bail if not. */
206  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
 /* NOTE(review): the start of the lookup that sets "worker" is missing here. */
208  false);
209  LWLockRelease(LogicalRepWorkerLock);
210  if (!worker)
211  break;
212 
 /*
  * Sleep before polling again. 1000L is the timeout argument
  * (presumably milliseconds -- confirm); the line with the wake-event
  * flags is missing from this listing.
  */
213  (void) WaitLatch(MyLatch,
215  1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
216 
218  }
219 
220  return false;
221 }
222 
223 /*
224  * Wait until the apply worker changes the state of our synchronization
225  * worker to the expected one.
226  *
227  * Used when transitioning from SYNCWAIT state to CATCHUP.
228  *
229  * Returns true once MyLogicalRepWorker->relstate equals expected_state, and
 * false if the apply worker has disappeared.
230  */
231 static bool
232 wait_for_worker_state_change(char expected_state)
233 {
234  int rc;
235 
236  for (;;)
237  {
238  LogicalRepWorker *worker;
239 
241 
242  /*
243  * Done if already in correct state. (We assume this fetch is atomic
244  * enough to not give a misleading answer if we do it with no lock.)
245  */
246  if (MyLogicalRepWorker->relstate == expected_state)
247  return true;
248 
249  /*
250  * Bail out if the apply worker has died, else signal it that we're
251  * waiting.
252  */
253  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
 /* NOTE(review): the start of the lookup that sets "worker" is missing here. */
255  InvalidOid, false);
 /*
  * NOTE(review): the statement controlled by this "if" (listing line 257,
  * presumably the wakeup of the apply worker) is missing; as printed,
  * the "if" appears to guard the LWLockRelease() instead.
  */
256  if (worker && worker->proc)
258  LWLockRelease(LogicalRepWorkerLock);
259  if (!worker)
260  break;
261 
262  /*
263  * Wait. We expect to get a latch signal back from the apply worker,
264  * but use a timeout in case it dies without sending one.
265  */
266  rc = WaitLatch(MyLatch,
268  1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
269 
 /* NOTE(review): the statement controlled by this "if" is missing from the listing. */
270  if (rc & WL_LATCH_SET)
272  }
273 
274  return false;
275 }
276 
277 /*
278  * Callback from syscache invalidation.
 *
 * NOTE(review): the line carrying the function name and parameters, and the
 * body line, are missing from this listing; consult the original file.
279  */
280 void
282 {
284 }
285 
286 /*
287  * Handle table synchronization cooperation from the synchronization
288  * worker.
289  *
290  * If the sync worker is in CATCHUP state and reached (or passed) the
291  * predetermined synchronization point in the WAL stream, mark the table as
292  * SYNCDONE and finish.
 *
 * In that case the function does not return to its caller: it ends with
 * finish_sync_worker(), which exits the process.
 *
 * NOTE(review): the line with the function name and parameter list is
 * missing from this listing (current_lsn is presumably the parameter), as
 * are many statements in the body. The remaining comments describe what
 * those statements are for.
293  */
294 static void
296 {
298 
299  if (MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP &&
300  current_lsn >= MyLogicalRepWorker->relstate_lsn)
301  {
302  TimeLineID tli;
303  char syncslotname[NAMEDATALEN] = {0};
304  char originname[NAMEDATALEN] = {0};
305 
 /* Record the new state and the position at which it was reached. */
306  MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCDONE;
307  MyLogicalRepWorker->relstate_lsn = current_lsn;
308 
310 
311  /*
312  * UpdateSubscriptionRelState must be called within a transaction.
313  */
314  if (!IsTransactionState())
316 
321 
322  /*
323  * End streaming so that LogRepWorkerWalRcvConn can be used to drop
324  * the slot.
325  */
327 
328  /*
329  * Clean up the tablesync slot.
330  *
331  * This has to be done after updating the state because otherwise if
332  * there is an error while doing the database operations we won't be
333  * able to roll back the dropped slot.
334  */
337  syncslotname,
338  sizeof(syncslotname));
339 
340  /*
341  * It is important to give an error if we are unable to drop the slot,
342  * otherwise, it won't be dropped till the corresponding subscription
343  * is dropped. So passing missing_ok = false.
344  */
346 
348  pgstat_report_stat(false);
349 
350  /*
351  * Start a new transaction to clean up the tablesync origin tracking.
352  * This transaction will be ended within the finish_sync_worker().
353  * Even if we fail to remove the origin here, the apply worker will
354  * ensure it is cleaned up afterward.
355  *
356  * We need to do this after the table state is set to SYNCDONE.
357  * Otherwise, if an error occurs while performing the database
358  * operation, the worker will be restarted and the in-memory state of
359  * replication progress (remote_lsn) won't be rolled-back which would
360  * have been cleared before restart. So, the restarted worker will use
361  * invalid replication progress state resulting in replay of
362  * transactions that have already been applied.
363  */
365 
368  originname,
369  sizeof(originname));
370 
371  /*
372  * Resetting the origin session removes the ownership of the slot.
373  * This is needed to allow the origin to be dropped.
374  */
379 
380  /*
381  * Drop the tablesync's origin tracking if it exists.
382  *
383  * There is a chance that the user is concurrently performing refresh
384  * for the subscription where we remove the table state and its origin
385  * or the apply worker would have removed this origin. So passing
386  * missing_ok = true.
387  */
388  replorigin_drop_by_name(originname, true, false);
389 
390  finish_sync_worker();
391  }
 /* NOTE(review): the statement of this "else" branch is missing from the listing. */
392  else
394 }
395 
396 /*
397  * Handle table synchronization cooperation from the apply worker.
398  *
399  * Walk over all subscription tables that are individually tracked by the
400  * apply process (currently, all that have state other than
401  * SUBREL_STATE_READY) and manage synchronization for them.
402  *
403  * If there are tables that need synchronizing and are not being synchronized
404  * yet, start sync workers for them (if there are free slots for sync
405  * workers). To prevent starting the sync worker for the same relation at a
406  * high frequency after a failure, we store its last start time with each sync
407  * state info. We start the sync worker for the same relation after waiting
408  * at least wal_retrieve_retry_interval.
409  *
410  * For tables that are being synchronized already, check if sync workers
411  * either need action from the apply worker or have finished. This is the
412  * SYNCWAIT to CATCHUP transition.
413  *
414  * If the synchronization position is reached (SYNCDONE), then the table can
415  * be marked as READY and is no longer tracked.
 *
 * The function may end the process with proc_exit(0) when should_exit is
 * set (see the two_phase handling near the end).
 *
 * NOTE(review): the line with the function name and parameter list is
 * missing from this listing (current_lsn is presumably the parameter), and
 * so are a number of statements and conditions in the body; each gap that
 * affects reading the code is flagged below.
416  */
417 static void
419 {
 /* Per-relation record of when a sync worker was last started. */
420  struct tablesync_start_time_mapping
421  {
422  Oid relid;
423  TimestampTz last_start_time;
424  };
 /* Static, so the table persists across calls until it is reset to NULL below. */
425  static HTAB *last_start_times = NULL;
426  ListCell *lc;
427  bool started_tx = false;
428  bool should_exit = false;
429 
431 
432  /* We need up-to-date sync state info for subscription tables here. */
433  FetchTableStates(&started_tx);
434 
435  /*
436  * Prepare a hash table for tracking last start times of workers, to avoid
437  * immediate restarts. We don't need it if there are no tables that need
438  * syncing.
 *
 * NOTE(review): the condition guarding this block is missing from the
 * listing.
439  */
441  {
442  HASHCTL ctl;
443 
444  ctl.keysize = sizeof(Oid);
445  ctl.entrysize = sizeof(struct tablesync_start_time_mapping);
446  last_start_times = hash_create("Logical replication table sync worker start times",
447  256, &ctl, HASH_ELEM | HASH_BLOBS);
448  }
449 
450  /*
451  * Clean up the hash table when we're done with all tables (just to
452  * release the bit of memory).
 *
 * NOTE(review): the condition and the statement that releases the table
 * are missing from the listing; only the pointer reset remains.
453  */
455  {
457  last_start_times = NULL;
458  }
459 
460  /*
461  * Process all tables that are being synchronized.
462  */
463  foreach(lc, table_states_not_ready)
464  {
 /* NOTE(review): the declaration of "rstate" (the current list entry) is missing here. */
466 
467  if (rstate->state == SUBREL_STATE_SYNCDONE)
468  {
469  /*
470  * Apply has caught up to the position where the table sync has
471  * finished. Mark the table as ready so that the apply will just
472  * continue to replicate it normally.
473  */
474  if (current_lsn >= rstate->lsn)
475  {
476  char originname[NAMEDATALEN];
477 
478  rstate->state = SUBREL_STATE_READY;
479  rstate->lsn = current_lsn;
480  if (!started_tx)
481  {
483  started_tx = true;
484  }
485 
486  /*
487  * Remove the tablesync origin tracking if it exists.
488  *
489  * There is a chance that the user is concurrently performing
490  * refresh for the subscription where we remove the table
491  * state and its origin or the tablesync worker would have
492  * already removed this origin. We can't rely on tablesync
493  * worker to remove the origin tracking as if there is any
494  * error while dropping we won't restart it to drop the
495  * origin. So passing missing_ok = true.
496  */
498  rstate->relid,
499  originname,
500  sizeof(originname));
501  replorigin_drop_by_name(originname, true, false);
502 
503  /*
504  * Update the state to READY only after the origin cleanup.
505  */
507  rstate->relid, rstate->state,
508  rstate->lsn);
509  }
510  }
511  else
512  {
513  LogicalRepWorker *syncworker;
514 
515  /*
516  * Look for a sync worker for this relation.
517  */
518  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
519 
 /* NOTE(review): the start of the lookup that sets "syncworker" is missing here. */
521  rstate->relid, false);
522 
523  if (syncworker)
524  {
525  /* Found one, update our copy of its state */
526  SpinLockAcquire(&syncworker->relmutex);
527  rstate->state = syncworker->relstate;
528  rstate->lsn = syncworker->relstate_lsn;
529  if (rstate->state == SUBREL_STATE_SYNCWAIT)
530  {
531  /*
532  * Sync worker is waiting for apply. Tell sync worker it
533  * can catchup now.
 *
 * The catch-up target is the later of the worker's own
 * position and current_lsn.
534  */
535  syncworker->relstate = SUBREL_STATE_CATCHUP;
536  syncworker->relstate_lsn =
537  Max(syncworker->relstate_lsn, current_lsn);
538  }
539  SpinLockRelease(&syncworker->relmutex);
540 
541  /*
 * If we told worker to catch up, wait for it. Our local copy in
 * rstate->state still holds the value read before the update
 * above, so this test is true exactly when we just switched the
 * worker to CATCHUP.
 */
542  if (rstate->state == SUBREL_STATE_SYNCWAIT)
543  {
544  /* Signal the sync worker, as it may be waiting for us. */
545  if (syncworker->proc)
546  logicalrep_worker_wakeup_ptr(syncworker);
547 
548  /* Now safe to release the LWLock */
549  LWLockRelease(LogicalRepWorkerLock);
550 
551  if (started_tx)
552  {
553  /*
554  * We must commit the existing transaction to release
555  * the existing locks before entering a busy loop.
556  * This is required to avoid any undetected deadlocks
557  * due to any existing lock as deadlock detector won't
558  * be able to detect the waits on the latch.
559  */
561  pgstat_report_stat(false);
562  }
563 
564  /*
565  * Enter busy loop and wait for synchronization worker to
566  * reach expected state (or die trying).
567  */
569  started_tx = true;
570 
 /* NOTE(review): the start of the wait call is missing; only its last argument remains. */
572  SUBREL_STATE_SYNCDONE);
573  }
574  else
575  LWLockRelease(LogicalRepWorkerLock);
576  }
577  else
578  {
579  /*
580  * If there is no sync worker for this table yet, count
581  * running sync workers for this subscription, while we have
582  * the lock.
583  */
 /* NOTE(review): the initializer expression of nsyncworkers is missing here. */
584  int nsyncworkers =
586 
587  /* Now safe to release the LWLock */
588  LWLockRelease(LogicalRepWorkerLock);
589 
590  /*
591  * If there are free sync worker slot(s), start a new sync
592  * worker for the table.
593  */
594  if (nsyncworkers < max_sync_workers_per_subscription)
595  {
 /* NOTE(review): the declaration of "now" is missing from the listing. */
597  struct tablesync_start_time_mapping *hentry;
598  bool found;
599 
600  hentry = hash_search(last_start_times, &rstate->relid,
601  HASH_ENTER, &found);
602 
 /*
  * A relation without a previous entry (!found) passes
  * immediately; otherwise the time since last_start_time
  * must exceed the limit given on the (missing) next line.
  */
603  if (!found ||
604  TimestampDifferenceExceeds(hentry->last_start_time, now,
606  {
612  rstate->relid,
614  hentry->last_start_time = now;
615  }
616  }
617  }
618  }
619  }
620 
621  if (started_tx)
622  {
623  /*
624  * Even when the two_phase mode is requested by the user, it remains
625  * as 'pending' until all tablesyncs have reached READY state.
626  *
627  * When this happens, we restart the apply worker and (if the
628  * conditions are still ok) then the two_phase tri-state will become
629  * 'enabled' at that time.
630  *
631  * Note: If the subscription has no tables then leave the state as
632  * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
633  * work.
 *
 * NOTE(review): the condition guarding this block is missing from the
 * listing.
634  */
636  {
637  CommandCounterIncrement(); /* make updates visible */
638  if (AllTablesyncsReady())
639  {
640  ereport(LOG,
641  (errmsg("logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled",
642  MySubscription->name)));
643  should_exit = true;
644  }
645  }
646 
648  pgstat_report_stat(true);
649  }
650 
651  if (should_exit)
652  {
653  /*
654  * Reset the last-start time for this worker so that the launcher will
655  * restart it without waiting for wal_retrieve_retry_interval.
656  */
658 
659  proc_exit(0);
660  }
661 }
662 
663 /*
664  * Process possible state change(s) of tables that are being synchronized.
 *
 * Dispatches on MyLogicalRepWorker->type to the handler for that kind of
 * worker; an unknown worker type raises an ERROR.
 *
 * NOTE(review): the line with the function name and parameter list, two of
 * the case labels, and the call made for WORKERTYPE_APPLY are missing from
 * this listing.
665  */
666 void
668 {
669  switch (MyLogicalRepWorker->type)
670  {
672 
673  /*
674  * Skip for parallel apply workers because they only operate on
675  * tables that are in a READY state. See pa_can_start() and
676  * should_apply_changes_for_rel().
677  */
678  break;
679 
 /* Tablesync worker path (the case label itself is missing here). */
681  process_syncing_tables_for_sync(current_lsn);
682  break;
683 
684  case WORKERTYPE_APPLY:
686  break;
687 
688  case WORKERTYPE_UNKNOWN:
689  /* Should never happen. */
690  elog(ERROR, "Unknown worker type");
691  }
692 }
693 
694 /*
695  * Create list of columns for COPY based on logical relation mapping.
 *
 * Returns a new List with one String node per remote attribute name, in
 * the order of rel->remoterel.attnames; NIL when there are no attributes.
 * The String nodes point at the existing names (no copy is made here).
 *
 * NOTE(review): the line with the function name and parameter list is
 * missing from this listing; the caller visible in this file passes a
 * LogicalRepRelMapEntry pointer as "rel".
696  */
697 static List *
699 {
700  List *attnamelist = NIL;
701  int i;
702 
703  for (i = 0; i < rel->remoterel.natts; i++)
704  {
705  attnamelist = lappend(attnamelist,
706  makeString(rel->remoterel.attnames[i]));
707  }
708 
709 
710  return attnamelist;
711 }
712 
713 /*
714  * Data source callback for the COPY FROM, which reads from the remote
715  * connection and passes the data back to our local COPY.
 *
 * outbuf: destination buffer.
 * minread: keep reading until at least this many bytes have been copied.
 * maxread: never copy more than this many bytes.
 *
 * Returns the number of bytes placed in outbuf. This can be less than
 * minread when the receive call reports a negative length. Data that does
 * not fit stays in copybuf (tracked by its cursor) for the next call.
716  */
717 static int
718 copy_read_data(void *outbuf, int minread, int maxread)
719 {
720  int bytesread = 0;
721  int avail;
722 
723  /*
 * If there are some leftover data from previous read, use it.
 *
 * NOTE(review): unlike the copy inside the loop below, this path does not
 * advance outbuf. If fewer than minread bytes were left over, the loop
 * would write the newly received data at the start of outbuf, on top of
 * the bytes copied here. Confirm that callers never hit this (for
 * example, by always passing minread <= 1).
 */
724  avail = copybuf->len - copybuf->cursor;
725  if (avail)
726  {
727  if (avail > maxread)
728  avail = maxread;
729  memcpy(outbuf, &copybuf->data[copybuf->cursor], avail);
730  copybuf->cursor += avail;
731  maxread -= avail;
732  bytesread += avail;
733  }
734 
735  while (maxread > 0 && bytesread < minread)
736  {
 /* NOTE(review): the declaration of "fd" (used by the wait below) is missing here. */
738  int len;
739  char *buf = NULL;
740 
741  for (;;)
742  {
743  /*
 * Try to read the data.
 *
 * NOTE(review): the statement that assigns "len" and "buf" is
 * missing from this listing.
 */
745 
747 
 /* Zero length: leave the inner loop and wait; negative: stop and return what we have. */
748  if (len == 0)
749  break;
750  else if (len < 0)
751  return bytesread;
752  else
753  {
754  /* Process the data: make the received buffer the current copybuf contents. */
755  copybuf->data = buf;
756  copybuf->len = len;
757  copybuf->cursor = 0;
758 
759  avail = copybuf->len - copybuf->cursor;
760  if (avail > maxread)
761  avail = maxread;
762  memcpy(outbuf, &copybuf->data[copybuf->cursor], avail);
763  outbuf = (void *) ((char *) outbuf + avail);
764  copybuf->cursor += avail;
765  maxread -= avail;
766  bytesread += avail;
767  }
768 
769  if (maxread <= 0 || bytesread >= minread)
770  return bytesread;
771  }
772 
773  /*
774  * Wait for more data or latch.
 *
 * NOTE(review): the lines with the wake-event flags are missing from
 * this listing; 1000L is the timeout argument (presumably
 * milliseconds -- confirm).
775  */
776  (void) WaitLatchOrSocket(MyLatch,
779  fd, 1000L, WAIT_EVENT_LOGICAL_SYNC_DATA);
780 
782  }
783 
784  return bytesread;
785 }
786 
787 
788 /*
789  * Get information about the remote relation, in a similar fashion to what
790  * the RELATION message provides during replication. This function also
791  * returns the relation qualifications to be used in the COPY command.
 *
 * nspname, relname: schema and table name to look up on the publisher; the
 * pointers are stored in lrel as-is (not copied).
 * lrel: output; remoteid, replident, relkind, attnames, atttyps, attkeys and
 * natts are filled in here.
 * qual: output list; each row filter expression is appended as a String
 * node, and the list is reset to NIL as soon as a NULL filter is seen.
 *
 * Raises an ERROR when a query does not return tuples, when the table is
 * not found, when more than one distinct column list is returned, or when
 * the column count reaches MaxTupleAttributeNumber.
 *
 * NOTE(review): this listing lacks the declaration of "res", the calls that
 * run the first three queries, the conditions guarding the column-list and
 * row-filter sections, and several cleanup statements; check them against
 * the original file.
792  */
793 static void
794 fetch_remote_table_info(char *nspname, char *relname,
795  LogicalRepRelation *lrel, List **qual)
796 {
798  StringInfoData cmd;
799  TupleTableSlot *slot;
 /* Expected result column types of the three catalog queries below. */
800  Oid tableRow[] = {OIDOID, CHAROID, CHAROID};
801  Oid attrRow[] = {INT2OID, TEXTOID, OIDOID, BOOLOID};
802  Oid qualRow[] = {TEXTOID};
803  bool isnull;
804  int natt;
805  ListCell *lc;
806  Bitmapset *included_cols = NULL;
807 
808  lrel->nspname = nspname;
809  lrel->relname = relname;
810 
811  /* First fetch Oid and replica identity. */
812  initStringInfo(&cmd);
813  appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind"
814  " FROM pg_catalog.pg_class c"
815  " INNER JOIN pg_catalog.pg_namespace n"
816  " ON (c.relnamespace = n.oid)"
817  " WHERE n.nspname = %s"
818  " AND c.relname = %s",
819  quote_literal_cstr(nspname),
822  lengthof(tableRow), tableRow);
823 
824  if (res->status != WALRCV_OK_TUPLES)
825  ereport(ERROR,
826  (errcode(ERRCODE_CONNECTION_FAILURE),
827  errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
828  nspname, relname, res->err)));
829 
 /* No row at all means the table does not exist on the publisher. */
830  slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
831  if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
832  ereport(ERROR,
833  (errcode(ERRCODE_UNDEFINED_OBJECT),
834  errmsg("table \"%s.%s\" not found on publisher",
835  nspname, relname)));
836 
837  lrel->remoteid = DatumGetObjectId(slot_getattr(slot, 1, &isnull));
838  Assert(!isnull);
839  lrel->replident = DatumGetChar(slot_getattr(slot, 2, &isnull));
840  Assert(!isnull);
841  lrel->relkind = DatumGetChar(slot_getattr(slot, 3, &isnull));
842  Assert(!isnull);
843 
846 
847 
848  /*
849  * Get column lists for each relation.
850  *
851  * We need to do this before fetching info about column names and types,
852  * so that we can skip columns that should not be replicated.
853  */
855  {
856  WalRcvExecResult *pubres;
857  TupleTableSlot *tslot;
858  Oid attrsRow[] = {INT2VECTOROID};
859  StringInfoData pub_names;
860 
 /* Build a comma-separated list of publication names for the IN (...) clause. */
861  initStringInfo(&pub_names);
862  foreach(lc, MySubscription->publications)
863  {
864  if (foreach_current_index(lc) > 0)
865  appendStringInfoString(&pub_names, ", ");
 /* NOTE(review): the statement appending the publication name is missing here. */
867  }
868 
869  /*
870  * Fetch info about column lists for the relation (from all the
871  * publications).
 *
 * The CASE expression yields NULL when the column list has as many
 * entries as the table has columns.
872  */
873  resetStringInfo(&cmd);
874  appendStringInfo(&cmd,
875  "SELECT DISTINCT"
876  " (CASE WHEN (array_length(gpt.attrs, 1) = c.relnatts)"
877  " THEN NULL ELSE gpt.attrs END)"
878  " FROM pg_publication p,"
879  " LATERAL pg_get_publication_tables(p.pubname) gpt,"
880  " pg_class c"
881  " WHERE gpt.relid = %u AND c.oid = gpt.relid"
882  " AND p.pubname IN ( %s )",
883  lrel->remoteid,
884  pub_names.data);
885 
887  lengthof(attrsRow), attrsRow);
888 
889  if (pubres->status != WALRCV_OK_TUPLES)
890  ereport(ERROR,
891  (errcode(ERRCODE_CONNECTION_FAILURE),
892  errmsg("could not fetch column list info for table \"%s.%s\" from publisher: %s",
893  nspname, relname, pubres->err)));
894 
895  /*
896  * We don't support the case where the column list is different for
897  * the same table when combining publications. See comments atop
898  * fetch_table_list. So there should be only one row returned.
899  * Although we already checked this when creating the subscription, we
900  * still need to check here in case the column list was changed after
901  * creating the subscription and before the sync worker is started.
902  */
903  if (tuplestore_tuple_count(pubres->tuplestore) > 1)
904  ereport(ERROR,
905  errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
906  errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
907  nspname, relname));
908 
909  /*
910  * Get the column list and build a single bitmap with the attnums.
911  *
912  * If we find a NULL value, it means all the columns should be
913  * replicated.
 *
 * NOTE(review): the statement that creates "tslot" is missing from
 * this listing.
914  */
916  if (tuplestore_gettupleslot(pubres->tuplestore, true, false, tslot))
917  {
918  Datum cfval = slot_getattr(tslot, 1, &isnull);
919 
920  if (!isnull)
921  {
922  ArrayType *arr;
923  int nelems;
924  int16 *elems;
925 
926  arr = DatumGetArrayTypeP(cfval);
927  nelems = ARR_DIMS(arr)[0];
928  elems = (int16 *) ARR_DATA_PTR(arr);
929 
930  for (natt = 0; natt < nelems; natt++)
931  included_cols = bms_add_member(included_cols, elems[natt]);
932  }
933 
934  ExecClearTuple(tslot);
935  }
937 
938  walrcv_clear_result(pubres);
939 
940  pfree(pub_names.data);
941  }
942 
943  /*
944  * Now fetch column names and types.
 *
 * The fourth result column tells whether the attribute is part of the
 * index returned by pg_get_replica_identity_index().
945  */
946  resetStringInfo(&cmd);
947  appendStringInfo(&cmd,
948  "SELECT a.attnum,"
949  " a.attname,"
950  " a.atttypid,"
951  " a.attnum = ANY(i.indkey)"
952  " FROM pg_catalog.pg_attribute a"
953  " LEFT JOIN pg_catalog.pg_index i"
954  " ON (i.indexrelid = pg_get_replica_identity_index(%u))"
955  " WHERE a.attnum > 0::pg_catalog.int2"
956  " AND NOT a.attisdropped %s"
957  " AND a.attrelid = %u"
958  " ORDER BY a.attnum",
959  lrel->remoteid,
961  "AND a.attgenerated = ''" : ""),
962  lrel->remoteid);
964  lengthof(attrRow), attrRow);
965 
966  if (res->status != WALRCV_OK_TUPLES)
967  ereport(ERROR,
968  (errcode(ERRCODE_CONNECTION_FAILURE),
969  errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
970  nspname, relname, res->err)));
971 
972  /* We don't know the number of rows coming, so allocate enough space. */
973  lrel->attnames = palloc0(MaxTupleAttributeNumber * sizeof(char *));
974  lrel->atttyps = palloc0(MaxTupleAttributeNumber * sizeof(Oid));
975  lrel->attkeys = NULL;
976 
977  /*
978  * Store the columns as a list of names. Ignore those that are not
979  * present in the column list, if there is one.
 *
 * natt counts the columns actually stored, so it indexes attnames and
 * atttyps; attkeys records those array positions, not remote attnums.
980  */
981  natt = 0;
982  slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
983  while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
984  {
985  char *rel_colname;
 /* NOTE(review): the declaration of "attnum" is missing from the listing. */
987 
988  attnum = DatumGetInt16(slot_getattr(slot, 1, &isnull));
989  Assert(!isnull);
990 
991  /* If the column is not in the column list, skip it. */
992  if (included_cols != NULL && !bms_is_member(attnum, included_cols))
993  {
994  ExecClearTuple(slot);
995  continue;
996  }
997 
998  rel_colname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
999  Assert(!isnull);
1000 
1001  lrel->attnames[natt] = rel_colname;
1002  lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 3, &isnull));
1003  Assert(!isnull);
1004 
1005  if (DatumGetBool(slot_getattr(slot, 4, &isnull)))
1006  lrel->attkeys = bms_add_member(lrel->attkeys, natt);
1007 
1008  /* Should never happen. */
1009  if (++natt >= MaxTupleAttributeNumber)
1010  elog(ERROR, "too many columns in remote table \"%s.%s\"",
1011  nspname, relname);
1012 
1013  ExecClearTuple(slot);
1014  }
1016 
1017  lrel->natts = natt;
1018 
1020 
1021  /*
1022  * Get relation's row filter expressions. DISTINCT avoids the same
1023  * expression of a table in multiple publications from being included
1024  * multiple times in the final expression.
1025  *
1026  * We need to copy the row even if it matches just one of the
1027  * publications, so we later combine all the quals with OR.
1028  *
1029  * For initial synchronization, row filtering can be ignored in following
1030  * cases:
1031  *
1032  * 1) one of the subscribed publications for the table hasn't specified
1033  * any row filter
1034  *
1035  * 2) one of the subscribed publications has puballtables set to true
1036  *
1037  * 3) one of the subscribed publications is declared as TABLES IN SCHEMA
1038  * that includes this relation
1039  */
1041  {
1042  StringInfoData pub_names;
1043 
1044  /* Build the pubname list. */
1045  initStringInfo(&pub_names);
 /* NOTE(review): the loop header that defines "pubstr" is missing here. */
1047  {
1048  char *pubname = strVal(pubstr);
1049 
1050  if (foreach_current_index(pubstr) > 0)
1051  appendStringInfoString(&pub_names, ", ");
1052 
1053  appendStringInfoString(&pub_names, quote_literal_cstr(pubname));
1054  }
1055 
1056  /* Check for row filters. */
1057  resetStringInfo(&cmd);
1058  appendStringInfo(&cmd,
1059  "SELECT DISTINCT pg_get_expr(gpt.qual, gpt.relid)"
1060  " FROM pg_publication p,"
1061  " LATERAL pg_get_publication_tables(p.pubname) gpt"
1062  " WHERE gpt.relid = %u"
1063  " AND p.pubname IN ( %s )",
1064  lrel->remoteid,
1065  pub_names.data);
1066 
1067  res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 1, qualRow);
1068 
 /*
  * NOTE(review): unlike the two similar checks earlier in this function,
  * this report sets no errcode(ERRCODE_CONNECTION_FAILURE); confirm
  * whether that difference is intended.
  */
1069  if (res->status != WALRCV_OK_TUPLES)
1070  ereport(ERROR,
1071  (errmsg("could not fetch table WHERE clause info for table \"%s.%s\" from publisher: %s",
1072  nspname, relname, res->err)));
1073 
1074  /*
1075  * Multiple row filter expressions for the same table will be combined
1076  * by COPY using OR. If any of the filter expressions for this table
1077  * are null, it means the whole table will be copied. In this case it
1078  * is not necessary to construct a unified row filter expression at
1079  * all.
1080  */
1081  slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
1082  while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
1083  {
1084  Datum rf = slot_getattr(slot, 1, &isnull);
1085 
1086  if (!isnull)
1087  *qual = lappend(*qual, makeString(TextDatumGetCString(rf)));
1088  else
1089  {
1090  /* Ignore filters and cleanup as necessary. */
1091  if (*qual)
1092  {
1093  list_free_deep(*qual);
1094  *qual = NIL;
1095  }
1096  break;
1097  }
1098 
1099  ExecClearTuple(slot);
1100  }
1102 
1104  }
1105 
1106  pfree(cmd.data);
1107 }
1108 
1109 /*
1110  * Copy existing data of a table from publisher.
1111  *
1112  * Caller is responsible for locking the local relation.
 *
 * Steps: fetch the remote table description and row filters, register the
 * remote relation in the relation map, build and send a COPY ... TO STDOUT
 * command to the publisher, then run a local COPY FROM that pulls its input
 * through copy_read_data().
 *
 * Raises an ERROR when the publisher does not answer the COPY command with
 * WALRCV_OK_COPY_OUT.
 *
 * NOTE(review): the line with the function name and parameter list is
 * missing from this listing ("rel" is presumably the local relation passed
 * by the caller), as are the declaration of "res" and several statements
 * flagged below.
1113  */
1114 static void
1116 {
1117  LogicalRepRelMapEntry *relmapentry;
1118  LogicalRepRelation lrel;
1119  List *qual = NIL;
1121  StringInfoData cmd;
1122  CopyFromState cstate;
1123  List *attnamelist;
1124  ParseState *pstate;
1125  List *options = NIL;
1126 
1127  /* Get the publisher relation info. (The first line of this call is missing here.) */
1129  RelationGetRelationName(rel), &lrel, &qual);
1130 
1131  /* Put the relation into relmap. */
1132  logicalrep_relmap_update(&lrel);
1133 
1134  /* Map the publisher relation to local one. */
1135  relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock);
1136  Assert(rel == relmapentry->localrel);
1137 
1138  /* Start copy on the publisher. */
1139  initStringInfo(&cmd);
1140 
1141  /* Regular table with no row filter */
1142  if (lrel.relkind == RELKIND_RELATION && qual == NIL)
1143  {
 /* NOTE(review): the argument supplying the quoted table name is missing here. */
1144  appendStringInfo(&cmd, "COPY %s",
1146 
1147  /* If the table has columns, then specify the columns */
1148  if (lrel.natts)
1149  {
1150  appendStringInfoString(&cmd, " (");
1151 
1152  /*
1153  * XXX Do we need to list the columns in all cases? Maybe we're
1154  * replicating all columns?
1155  */
1156  for (int i = 0; i < lrel.natts; i++)
1157  {
1158  if (i > 0)
1159  appendStringInfoString(&cmd, ", ");
1160 
 /* NOTE(review): the statement appending the column name is missing here. */
1162  }
1163 
1164  appendStringInfoChar(&cmd, ')');
1165  }
1166 
1167  appendStringInfoString(&cmd, " TO STDOUT");
1168  }
1169  else
1170  {
1171  /*
1172  * For non-tables and tables with row filters, we need to do COPY
1173  * (SELECT ...), but we can't just do SELECT * because we need to not
1174  * copy generated columns. For tables with any row filters, build a
1175  * SELECT query with OR'ed row filters for COPY.
1176  */
1177  appendStringInfoString(&cmd, "COPY (SELECT ");
1178  for (int i = 0; i < lrel.natts; i++)
1179  {
 /* NOTE(review): the statement appending the column name is missing here. */
1181  if (i < lrel.natts - 1)
1182  appendStringInfoString(&cmd, ", ");
1183  }
1184 
1185  appendStringInfoString(&cmd, " FROM ");
1186 
1187  /*
1188  * For regular tables, make sure we don't copy data from a child that
1189  * inherits the named table as those will be copied separately.
1190  */
1191  if (lrel.relkind == RELKIND_RELATION)
1192  appendStringInfoString(&cmd, "ONLY ");
1193 
 /* NOTE(review): the statement appending the table name is missing here. */
1195  /* list of OR'ed filters */
1196  if (qual != NIL)
1197  {
1198  ListCell *lc;
1199  char *q = strVal(linitial(qual));
1200 
 /* First filter opens the WHERE clause; the rest are joined with OR. */
1201  appendStringInfo(&cmd, " WHERE %s", q);
1202  for_each_from(lc, qual, 1)
1203  {
1204  q = strVal(lfirst(lc));
1205  appendStringInfo(&cmd, " OR %s", q);
1206  }
1207  list_free_deep(qual);
1208  }
1209 
1210  appendStringInfoString(&cmd, ") TO STDOUT");
1211  }
1212 
1213  /*
1214  * Prior to v16, initial table synchronization will use text format even
1215  * if the binary option is enabled for a subscription.
 *
 * When binary format is chosen, the same format is requested from the
 * publisher (in the command text) and from the local COPY (in options).
 *
 * NOTE(review): the condition guarding this block is missing from the
 * listing.
1216  */
1219  {
1220  appendStringInfoString(&cmd, " WITH (FORMAT binary)");
1221  options = list_make1(makeDefElem("format",
1222  (Node *) makeString("binary"), -1));
1223  }
1224 
1225  res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL);
1226  pfree(cmd.data);
1227  if (res->status != WALRCV_OK_COPY_OUT)
1228  ereport(ERROR,
1229  (errcode(ERRCODE_CONNECTION_FAILURE),
1230  errmsg("could not start initial contents copy for table \"%s.%s\": %s",
1231  lrel.nspname, lrel.relname, res->err)));
1233 
 /* Fresh receive buffer for copy_read_data(). */
1234  copybuf = makeStringInfo();
1235 
1236  pstate = make_parsestate(NULL);
1237  (void) addRangeTableEntryForRelation(pstate, rel, AccessShareLock,
1238  NULL, false, false);
1239 
 /* Local COPY FROM reads its input through the copy_read_data callback. */
1240  attnamelist = make_copy_attnamelist(relmapentry);
1241  cstate = BeginCopyFrom(pstate, rel, NULL, NULL, false, copy_read_data, attnamelist, options);
1242 
1243  /* Do the copy */
1244  (void) CopyFrom(cstate);
1245 
1246  logicalrep_rel_close(relmapentry, NoLock);
1247 }
1248 
1249 /*
1250  * Determine the tablesync slot name.
1251  *
1252  * The name must not exceed NAMEDATALEN - 1 because of remote node constraints
1253  * on slot name length. We append system_identifier to avoid slot_name
1254  * collision with subscriptions in other clusters. With the current scheme
1255  * pg_%u_sync_%u_UINT64_FORMAT (3 + 10 + 6 + 10 + 20 + '\0'), the maximum
1256  * length of slot_name will be 50.
1257  *
1258  * The returned slot name is stored in the supplied buffer (syncslotname) with
1259  * the given size.
1260  *
1261  * Note: We don't use the subscription slot name as part of tablesync slot name
1262  * because we are responsible for cleaning up these slots and it could become
1263  * impossible to recalculate what name to cleanup if the subscription slot name
1264  * had changed.
1265  */
1266 void
1268  char *syncslotname, Size szslot)
1269 {
1270  snprintf(syncslotname, szslot, "pg_%u_sync_%u_" UINT64_FORMAT, suboid,
1271  relid, GetSystemIdentifier());
1272 }
1273 
1274 /*
1275  * Start syncing the table in the sync worker.
1276  *
1277  * If nothing needs to be done to sync the table, we exit the worker without
1278  * any further action.
1279  *
1280  * The returned slot name is palloc'ed in current memory context.
1281  */
1282 static char *
1284 {
1285  char *slotname;
1286  char *err;
1287  char relstate;
1288  XLogRecPtr relstate_lsn;
1289  Relation rel;
1290  AclResult aclresult;
1292  char originname[NAMEDATALEN];
1293  RepOriginId originid;
1294  UserContext ucxt;
1295  bool must_use_password;
1296  bool run_as_owner;
1297 
1298  /* Check the state of the table synchronization. */
1302  &relstate_lsn);
1304 
1305  /* Is the use of a password mandatory? */
1306  must_use_password = MySubscription->passwordrequired &&
1308 
1310  MyLogicalRepWorker->relstate = relstate;
1311  MyLogicalRepWorker->relstate_lsn = relstate_lsn;
1313 
1314  /*
1315  * If synchronization is already done or no longer necessary, exit now
1316  * that we've updated shared memory state.
1317  */
1318  switch (relstate)
1319  {
1320  case SUBREL_STATE_SYNCDONE:
1321  case SUBREL_STATE_READY:
1322  case SUBREL_STATE_UNKNOWN:
1323  finish_sync_worker(); /* doesn't return */
1324  }
1325 
1326  /* Calculate the name of the tablesync slot. */
1327  slotname = (char *) palloc(NAMEDATALEN);
1330  slotname,
1331  NAMEDATALEN);
1332 
1333  /*
1334  * Here we use the slot name instead of the subscription name as the
1335  * application_name, so that it is different from the leader apply worker,
1336  * so that synchronous replication can distinguish them.
1337  */
1339  walrcv_connect(MySubscription->conninfo, true, true,
1340  must_use_password,
1341  slotname, &err);
1342  if (LogRepWorkerWalRcvConn == NULL)
1343  ereport(ERROR,
1344  (errcode(ERRCODE_CONNECTION_FAILURE),
1345  errmsg("table synchronization worker for subscription \"%s\" could not connect to the publisher: %s",
1346  MySubscription->name, err)));
1347 
1348  Assert(MyLogicalRepWorker->relstate == SUBREL_STATE_INIT ||
1349  MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC ||
1350  MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY);
1351 
1352  /* Assign the origin tracking record name. */
1355  originname,
1356  sizeof(originname));
1357 
1358  if (MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC)
1359  {
1360  /*
1361  * We have previously errored out before finishing the copy so the
1362  * replication slot might exist. We want to remove the slot if it
1363  * already exists and proceed.
1364  *
1365  * XXX We could also instead try to drop the slot, last time we failed
1366  * but for that, we might need to clean up the copy state as it might
1367  * be in the middle of fetching the rows. Also, if there is a network
1368  * breakdown then it wouldn't have succeeded so trying it next time
1369  * seems like a better bet.
1370  */
1372  }
1373  else if (MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY)
1374  {
1375  /*
1376  * The COPY phase was previously done, but tablesync then crashed
1377  * before it was able to finish normally.
1378  */
1380 
1381  /*
1382  * The origin tracking name must already exist. It was created first
1383  * time this tablesync was launched.
1384  */
1385  originid = replorigin_by_name(originname, false);
1386  replorigin_session_setup(originid, 0);
1387  replorigin_session_origin = originid;
1388  *origin_startpos = replorigin_session_get_progress(false);
1389 
1391 
1392  goto copy_table_done;
1393  }
1394 
1396  MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC;
1399 
1400  /* Update the state and make it visible to others. */
1407  pgstat_report_stat(true);
1408 
1410 
1411  /*
1412  * Use a standard write lock here. It might be better to disallow access
1413  * to the table while it's being synchronized. But we don't want to block
1414  * the main apply process from working and it has to open the relation in
1415  * RowExclusiveLock when remapping remote relation id to local one.
1416  */
1418 
1419  /*
1420  * Start a transaction in the remote node in REPEATABLE READ mode. This
1421  * ensures that both the replication slot we create (see below) and the
1422  * COPY are consistent with each other.
1423  */
1425  "BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ",
1426  0, NULL);
1427  if (res->status != WALRCV_OK_COMMAND)
1428  ereport(ERROR,
1429  (errcode(ERRCODE_CONNECTION_FAILURE),
1430  errmsg("table copy could not start transaction on publisher: %s",
1431  res->err)));
1433 
1434  /*
1435  * Create a new permanent logical decoding slot. This slot will be used
1436  * for the catchup phase after COPY is done, so tell it to use the
1437  * snapshot to make the final data consistent.
1438  */
1440  slotname, false /* permanent */ , false /* two_phase */ ,
1442  CRS_USE_SNAPSHOT, origin_startpos);
1443 
1444  /*
1445  * Setup replication origin tracking. The purpose of doing this before the
1446  * copy is to avoid doing the copy again due to any error in setting up
1447  * origin tracking.
1448  */
1449  originid = replorigin_by_name(originname, true);
1450  if (!OidIsValid(originid))
1451  {
1452  /*
1453  * Origin tracking does not exist, so create it now.
1454  *
1455  * Then advance to the LSN got from walrcv_create_slot. This is WAL
1456  * logged for the purpose of recovery. Locks are to prevent the
1457  * replication origin from vanishing while advancing.
1458  */
1459  originid = replorigin_create(originname);
1460 
1461  LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1462  replorigin_advance(originid, *origin_startpos, InvalidXLogRecPtr,
1463  true /* go backward */ , true /* WAL log */ );
1464  UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1465 
1466  replorigin_session_setup(originid, 0);
1467  replorigin_session_origin = originid;
1468  }
1469  else
1470  {
1471  ereport(ERROR,
1473  errmsg("replication origin \"%s\" already exists",
1474  originname)));
1475  }
1476 
1477  /*
1478  * Make sure that the copy command runs as the table owner, unless the
1479  * user has opted out of that behaviour.
1480  */
1481  run_as_owner = MySubscription->runasowner;
1482  if (!run_as_owner)
1483  SwitchToUntrustedUser(rel->rd_rel->relowner, &ucxt);
1484 
1485  /*
1486  * Check that our table sync worker has permission to insert into the
1487  * target table.
1488  */
1489  aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(),
1490  ACL_INSERT);
1491  if (aclresult != ACLCHECK_OK)
1492  aclcheck_error(aclresult,
1493  get_relkind_objtype(rel->rd_rel->relkind),
1495 
1496  /*
1497  * COPY FROM does not honor RLS policies. That is not a problem for
1498  * subscriptions owned by roles with BYPASSRLS privilege (or superuser,
1499  * who has it implicitly), but other roles should not be able to
1500  * circumvent RLS. Disallow logical replication into RLS enabled
1501  * relations for such roles.
1502  */
1504  ereport(ERROR,
1505  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1506  errmsg("user \"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
1507  GetUserNameFromId(GetUserId(), true),
1508  RelationGetRelationName(rel))));
1509 
1510  /* Now do the initial data copy */
1512  copy_table(rel);
1514 
1515  res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL);
1516  if (res->status != WALRCV_OK_COMMAND)
1517  ereport(ERROR,
1518  (errcode(ERRCODE_CONNECTION_FAILURE),
1519  errmsg("table copy could not finish transaction on publisher: %s",
1520  res->err)));
1522 
1523  if (!run_as_owner)
1524  RestoreUserContext(&ucxt);
1525 
1526  table_close(rel, NoLock);
1527 
1528  /* Make the copy visible. */
1530 
1531  /*
1532  * Update the persisted state to indicate the COPY phase is done; make it
1533  * visible to others.
1534  */
1537  SUBREL_STATE_FINISHEDCOPY,
1539 
1541 
1542 copy_table_done:
1543 
1544  elog(DEBUG1,
1545  "LogicalRepSyncTableStart: '%s' origin_startpos lsn %X/%X",
1546  originname, LSN_FORMAT_ARGS(*origin_startpos));
1547 
1548  /*
1549  * We are done with the initial data synchronization, update the state.
1550  */
1552  MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCWAIT;
1553  MyLogicalRepWorker->relstate_lsn = *origin_startpos;
1555 
1556  /*
1557  * Finally, wait until the leader apply worker tells us to catch up and
1558  * then return to let LogicalRepApplyLoop do it.
1559  */
1560  wait_for_worker_state_change(SUBREL_STATE_CATCHUP);
1561  return slotname;
1562 }
1563 
1564 /*
1565  * Common code to fetch the up-to-date sync state info into the static lists.
1566  *
1567  * Returns true if subscription has 1 or more tables, else false.
1568  *
1569  * Note: If this function started the transaction (indicated by the parameter)
1570  * then it is the caller's responsibility to commit it.
1571  */
1572 static bool
1573 FetchTableStates(bool *started_tx)
1574 {
1575  static bool has_subrels = false;
1576 
1577  *started_tx = false;
1578 
1580  {
1581  MemoryContext oldctx;
1582  List *rstates;
1583  ListCell *lc;
1584  SubscriptionRelState *rstate;
1585 
1587 
1588  /* Clean the old lists. */
1591 
1592  if (!IsTransactionState())
1593  {
1595  *started_tx = true;
1596  }
1597 
1598  /* Fetch all non-ready tables. */
1599  rstates = GetSubscriptionRelations(MySubscription->oid, true);
1600 
1601  /* Allocate the tracking info in a permanent memory context. */
1603  foreach(lc, rstates)
1604  {
1605  rstate = palloc(sizeof(SubscriptionRelState));
1606  memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState));
1608  }
1609  MemoryContextSwitchTo(oldctx);
1610 
1611  /*
1612  * Does the subscription have tables?
1613  *
1614  * If there were not-READY relations found then we know it does. But
1615  * if table_states_not_ready was empty we still need to check again to
1616  * see if there are 0 tables.
1617  */
1618  has_subrels = (table_states_not_ready != NIL) ||
1620 
1621  /*
1622  * If the subscription relation cache has been invalidated since we
1623  * entered this routine, we still use and return the relations we just
1624  * finished constructing, to avoid infinite loops, but we leave the
1625  * table states marked as stale so that we'll rebuild it again on next
1626  * access. Otherwise, we mark the table states as valid.
1627  */
1630  }
1631 
1632  return has_subrels;
1633 }
1634 
1635 /*
1636  * Execute the initial sync with error handling. Disable the subscription,
1637  * if it's required.
1638  *
1639  * Allocate the slot name in long-lived context on return. Note that we don't
1640  * handle FATAL errors which are probably because of system resource error and
1641  * are not repeatable.
1642  */
1643 static void
1644 start_table_sync(XLogRecPtr *origin_startpos, char **slotname)
1645 {
1646  char *sync_slotname = NULL;
1647 
1649 
1650  PG_TRY();
1651  {
1652  /* Call initial sync. */
1653  sync_slotname = LogicalRepSyncTableStart(origin_startpos);
1654  }
1655  PG_CATCH();
1656  {
1659  else
1660  {
1661  /*
1662  * Report the worker failed during table synchronization. Abort
1663  * the current transaction so that the stats message is sent in an
1664  * idle state.
1665  */
1668 
1669  PG_RE_THROW();
1670  }
1671  }
1672  PG_END_TRY();
1673 
1674  /* allocate slot name in long-lived context */
1675  *slotname = MemoryContextStrdup(ApplyContext, sync_slotname);
1676  pfree(sync_slotname);
1677 }
1678 
1679 /*
1680  * Runs the tablesync worker.
1681  *
1682  * It starts syncing tables. After a successful sync, sets streaming options
1683  * and starts streaming to catchup with apply worker.
1684  */
1685 static void
1687 {
1688  char originname[NAMEDATALEN];
1689  XLogRecPtr origin_startpos = InvalidXLogRecPtr;
1690  char *slotname = NULL;
1692 
1693  start_table_sync(&origin_startpos, &slotname);
1694 
1697  originname,
1698  sizeof(originname));
1699 
1700  set_apply_error_context_origin(originname);
1701 
1702  set_stream_options(&options, slotname, &origin_startpos);
1703 
1705 
1706  /* Apply the changes till we catchup with the apply worker. */
1707  start_apply(origin_startpos);
1708 }
1709 
1710 /* Logical Replication Tablesync worker entry point */
1711 void
1713 {
1714  int worker_slot = DatumGetInt32(main_arg);
1715 
1716  SetupApplyOrSyncWorker(worker_slot);
1717 
1719 
1720  finish_sync_worker();
1721 }
1722 
1723 /*
1724  * If the subscription has no tables then return false.
1725  *
1726  * Otherwise, are all tablesyncs READY?
1727  *
1728  * Note: This function is not suitable to be called from outside of apply or
1729  * tablesync workers because MySubscription needs to be already initialized.
1730  */
1731 bool
1733 {
1734  bool started_tx = false;
1735  bool has_subrels = false;
1736 
1737  /* We need up-to-date sync state info for subscription tables here. */
1738  has_subrels = FetchTableStates(&started_tx);
1739 
1740  if (started_tx)
1741  {
1743  pgstat_report_stat(true);
1744  }
1745 
1746  /*
1747  * Return false when there are no tables in subscription or not all tables
1748  * are in ready state; true otherwise.
1749  */
1750  return has_subrels && (table_states_not_ready == NIL);
1751 }
1752 
1753 /*
1754  * Update the two_phase state of the specified subscription in pg_subscription.
1755  */
1756 void
1757 UpdateTwoPhaseState(Oid suboid, char new_state)
1758 {
1759  Relation rel;
1760  HeapTuple tup;
1761  bool nulls[Natts_pg_subscription];
1762  bool replaces[Natts_pg_subscription];
1763  Datum values[Natts_pg_subscription];
1764 
1766  new_state == LOGICALREP_TWOPHASE_STATE_PENDING ||
1767  new_state == LOGICALREP_TWOPHASE_STATE_ENABLED);
1768 
1769  rel = table_open(SubscriptionRelationId, RowExclusiveLock);
1770  tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(suboid));
1771  if (!HeapTupleIsValid(tup))
1772  elog(ERROR,
1773  "cache lookup failed for subscription oid %u",
1774  suboid);
1775 
1776  /* Form a new tuple. */
1777  memset(values, 0, sizeof(values));
1778  memset(nulls, false, sizeof(nulls));
1779  memset(replaces, false, sizeof(replaces));
1780 
1781  /* And update/set two_phase state */
1782  values[Anum_pg_subscription_subtwophasestate - 1] = CharGetDatum(new_state);
1783  replaces[Anum_pg_subscription_subtwophasestate - 1] = true;
1784 
1785  tup = heap_modify_tuple(tup, RelationGetDescr(rel),
1786  values, nulls, replaces);
1787  CatalogTupleUpdate(rel, &tup->t_self, tup);
1788 
1789  heap_freetuple(tup);
1791 }
AclResult
Definition: acl.h:182
@ ACLCHECK_OK
Definition: acl.h:183
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
Definition: aclchk.c:2698
AclResult pg_class_aclcheck(Oid table_oid, Oid roleid, AclMode mode)
Definition: aclchk.c:4089
#define ARR_DATA_PTR(a)
Definition: array.h:322
#define DatumGetArrayTypeP(X)
Definition: array.h:261
#define ARR_DIMS(a)
Definition: array.h:294
int16 AttrNumber
Definition: attnum.h:21
void set_stream_options(WalRcvStreamOptions *options, char *slotname, XLogRecPtr *origin_startpos)
Definition: worker.c:4430
void start_apply(XLogRecPtr origin_startpos)
Definition: worker.c:4499
void DisableSubscriptionAndExit(void)
Definition: worker.c:4804
void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, char *originname, Size szoriginname)
Definition: worker.c:426
void set_apply_error_context_origin(char *originname)
Definition: worker.c:5145
MemoryContext ApplyContext
Definition: worker.c:292
void SetupApplyOrSyncWorker(int worker_slot)
Definition: worker.c:4730
WalReceiverConn * LogRepWorkerWalRcvConn
Definition: worker.c:297
Subscription * MySubscription
Definition: worker.c:299
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1780
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1644
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1608
bool bms_is_member(int x, const Bitmapset *a)
Definition: bitmapset.c:510
Bitmapset * bms_add_member(Bitmapset *a, int x)
Definition: bitmapset.c:815
static Datum values[MAXATTR]
Definition: bootstrap.c:150
#define TextDatumGetCString(d)
Definition: builtins.h:98
unsigned int uint32
Definition: c.h:506
signed short int16
Definition: c.h:493
#define Max(x, y)
Definition: c.h:998
#define Assert(condition)
Definition: c.h:858
#define UINT64_FORMAT
Definition: c.h:549
#define lengthof(array)
Definition: c.h:788
#define OidIsValid(objectId)
Definition: c.h:775
size_t Size
Definition: c.h:605
CopyFromState BeginCopyFrom(ParseState *pstate, Relation rel, Node *whereClause, const char *filename, bool is_program, copy_data_source_cb data_source_cb, List *attnamelist, List *options)
Definition: copyfrom.c:1380
uint64 CopyFrom(CopyFromState cstate)
Definition: copyfrom.c:640
int64 TimestampTz
Definition: timestamp.h:39
#define DSM_HANDLE_INVALID
Definition: dsm_impl.h:58
void hash_destroy(HTAB *hashp)
Definition: dynahash.c:865
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:955
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:352
int errcode(int sqlerrcode)
Definition: elog.c:853
int errmsg(const char *fmt,...)
Definition: elog.c:1070
#define LOG
Definition: elog.h:31
#define PG_RE_THROW()
Definition: elog.h:412
#define PG_TRY(...)
Definition: elog.h:371
#define PG_END_TRY(...)
Definition: elog.h:396
#define DEBUG1
Definition: elog.h:30
#define ERROR
Definition: elog.h:39
#define PG_CATCH(...)
Definition: elog.h:381
#define elog(elevel,...)
Definition: elog.h:225
#define ereport(elevel,...)
Definition: elog.h:149
void err(int eval, const char *fmt,...)
Definition: err.c:43
void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
Definition: execTuples.c:1341
const TupleTableSlotOps TTSOpsMinimalTuple
Definition: execTuples.c:86
TupleTableSlot * MakeSingleTupleTableSlot(TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1325
struct Latch * MyLatch
Definition: globals.c:62
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, const Datum *replValues, const bool *replIsnull, const bool *doReplace)
Definition: heaptuple.c:1209
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1434
@ HASH_ENTER
Definition: hsearch.h:114
#define HASH_ELEM
Definition: hsearch.h:95
#define HASH_BLOBS
Definition: hsearch.h:97
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
#define MaxTupleAttributeNumber
Definition: htup_details.h:34
void CatalogTupleUpdate(Relation heapRel, ItemPointer otid, HeapTuple tup)
Definition: indexing.c:313
void proc_exit(int code)
Definition: ipc.c:104
int i
Definition: isn.c:73
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:565
void ResetLatch(Latch *latch)
Definition: latch.c:724
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:517
#define WL_SOCKET_READABLE
Definition: latch.h:128
#define WL_TIMEOUT
Definition: latch.h:130
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:132
#define WL_LATCH_SET
Definition: latch.h:127
bool logicalrep_worker_launch(LogicalRepWorkerType wtype, Oid dbid, Oid subid, const char *subname, Oid userid, Oid relid, dsm_handle subworker_dsm)
Definition: launcher.c:306
LogicalRepWorker * logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
Definition: launcher.c:243
void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
Definition: launcher.c:702
void logicalrep_worker_wakeup(Oid subid, Oid relid)
Definition: launcher.c:682
static dshash_table * last_start_times
Definition: launcher.c:89
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:54
int max_sync_workers_per_subscription
Definition: launcher.c:51
int logicalrep_sync_worker_count(Oid subid)
Definition: launcher.c:854
void ApplyLauncherForgetWorkerStartTime(Oid subid)
Definition: launcher.c:1081
List * lappend(List *list, void *datum)
Definition: list.c:339
void list_free_deep(List *list)
Definition: list.c:1560
void UnlockRelationOid(Oid relid, LOCKMODE lockmode)
Definition: lmgr.c:227
void LockRelationOid(Oid relid, LOCKMODE lockmode)
Definition: lmgr.c:108
#define NoLock
Definition: lockdefs.h:34
#define AccessShareLock
Definition: lockdefs.h:36
#define RowExclusiveLock
Definition: lockdefs.h:38
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3366
char * get_rel_name(Oid relid)
Definition: lsyscache.c:1928
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1168
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1781
@ LW_SHARED
Definition: lwlock.h:115
DefElem * makeDefElem(char *name, Node *arg, int location)
Definition: makefuncs.c:564
void pfree(void *pointer)
Definition: mcxt.c:1521
void * palloc0(Size size)
Definition: mcxt.c:1347
char * MemoryContextStrdup(MemoryContext context, const char *string)
Definition: mcxt.c:1683
MemoryContext CacheMemoryContext
Definition: mcxt.c:152
void * palloc(Size size)
Definition: mcxt.c:1317
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:122
char * GetUserNameFromId(Oid roleid, bool noerr)
Definition: miscinit.c:980
Oid GetUserId(void)
Definition: miscinit.c:514
ObjectType get_relkind_objtype(char relkind)
TimestampTz replorigin_session_origin_timestamp
Definition: origin.c:161
RepOriginId replorigin_by_name(const char *roname, bool missing_ok)
Definition: origin.c:225
RepOriginId replorigin_create(const char *roname)
Definition: origin.c:256
void replorigin_session_reset(void)
Definition: origin.c:1194
void replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
Definition: origin.c:415
RepOriginId replorigin_session_origin
Definition: origin.c:159
void replorigin_advance(RepOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)
Definition: origin.c:892
void replorigin_session_setup(RepOriginId node, int acquired_by)
Definition: origin.c:1101
XLogRecPtr replorigin_session_get_progress(bool flush)
Definition: origin.c:1241
XLogRecPtr replorigin_session_origin_lsn
Definition: origin.c:160
#define InvalidRepOriginId
Definition: origin.h:33
ParseState * make_parsestate(ParseState *parentParseState)
Definition: parse_node.c:39
ParseNamespaceItem * addRangeTableEntryForRelation(ParseState *pstate, Relation rel, int lockmode, Alias *alias, bool inh, bool inFromCl)
#define ACL_INSERT
Definition: parsenodes.h:76
int16 attnum
Definition: pg_attribute.h:74
void * arg
NameData relname
Definition: pg_class.h:38
#define NAMEDATALEN
const void size_t len
#define lfirst(lc)
Definition: pg_list.h:172
#define NIL
Definition: pg_list.h:68
#define foreach_current_index(var_or_cell)
Definition: pg_list.h:403
#define list_make1(x1)
Definition: pg_list.h:212
#define for_each_from(cell, lst, N)
Definition: pg_list.h:414
#define linitial(l)
Definition: pg_list.h:178
#define foreach_node(type, var, lst)
Definition: pg_list.h:496
static char ** options
List * GetSubscriptionRelations(Oid subid, bool not_ready)
char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
void UpdateSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn)
bool HasSubscriptionRelations(Oid subid)
#define LOGICALREP_TWOPHASE_STATE_DISABLED
#define LOGICALREP_TWOPHASE_STATE_PENDING
#define LOGICALREP_TWOPHASE_STATE_ENABLED
static char * buf
Definition: pg_test_fsync.c:73
long pgstat_report_stat(bool force)
Definition: pgstat.c:660
void pgstat_report_subscription_error(Oid subid, bool is_apply_error)
int pgsocket
Definition: port.h:29
#define snprintf
Definition: port.h:238
#define PGINVALID_SOCKET
Definition: port.h:31
static bool DatumGetBool(Datum X)
Definition: postgres.h:90
uintptr_t Datum
Definition: postgres.h:64
static Oid DatumGetObjectId(Datum X)
Definition: postgres.h:242
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:252
static char DatumGetChar(Datum X)
Definition: postgres.h:112
static int16 DatumGetInt16(Datum X)
Definition: postgres.h:162
static int32 DatumGetInt32(Datum X)
Definition: postgres.h:202
static Datum CharGetDatum(char X)
Definition: postgres.h:122
#define InvalidOid
Definition: postgres_ext.h:36
unsigned int Oid
Definition: postgres_ext.h:31
static int fd(const char *x, int i)
Definition: preproc-init.c:105
char * quote_literal_cstr(const char *rawstr)
Definition: quote.c:103
MemoryContextSwitchTo(old_ctx)
tree ctl
Definition: radixtree.h:1853
#define RelationGetRelid(relation)
Definition: rel.h:505
#define RelationGetDescr(relation)
Definition: rel.h:531
#define RelationGetRelationName(relation)
Definition: rel.h:539
#define RelationGetNamespace(relation)
Definition: rel.h:546
int check_enable_rls(Oid relid, Oid checkAsUser, bool noError)
Definition: rls.c:52
@ RLS_ENABLED
Definition: rls.h:45
const char * quote_identifier(const char *ident)
Definition: ruleutils.c:12840
char * quote_qualified_identifier(const char *qualifier, const char *ident)
Definition: ruleutils.c:12924
Snapshot GetTransactionSnapshot(void)
Definition: snapmgr.c:216
void PushActiveSnapshot(Snapshot snapshot)
Definition: snapmgr.c:648
void PopActiveSnapshot(void)
Definition: snapmgr.c:743
void InvalidateCatalogSnapshot(void)
Definition: snapmgr.c:422
#define SpinLockRelease(lock)
Definition: spin.h:61
#define SpinLockAcquire(lock)
Definition: spin.h:59
void logicalrep_relmap_update(LogicalRepRelation *remoterel)
Definition: relation.c:164
LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
Definition: relation.c:327
void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
Definition: relation.c:473
#define ERRCODE_DUPLICATE_OBJECT
Definition: streamutil.c:32
StringInfo makeStringInfo(void)
Definition: stringinfo.c:41
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:78
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:97
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:182
void appendStringInfoChar(StringInfo str, char ch)
Definition: stringinfo.c:194
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
Definition: dynahash.c:220
ItemPointerData t_self
Definition: htup.h:65
Definition: pg_list.h:54
LogicalRepRelation remoterel
LogicalRepRelId remoteid
Definition: logicalproto.h:107
Bitmapset * attkeys
Definition: logicalproto.h:115
XLogRecPtr relstate_lsn
LogicalRepWorkerType type
Definition: nodes.h:129
Form_pg_class rd_rel
Definition: rel.h:111
Definition: value.h:64
Tuplestorestate * tuplestore
Definition: walreceiver.h:222
TupleDesc tupledesc
Definition: walreceiver.h:223
WalRcvExecStatus status
Definition: walreceiver.h:219
Definition: regguts.h:323
void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok)
#define SearchSysCacheCopy1(cacheId, key1)
Definition: syscache.h:86
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:40
static List * table_states_not_ready
Definition: tablesync.c:134
bool AllTablesyncsReady(void)
Definition: tablesync.c:1732
static bool wait_for_worker_state_change(char expected_state)
Definition: tablesync.c:232
static char * LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
Definition: tablesync.c:1283
void invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
Definition: tablesync.c:281
SyncingTablesState
Definition: tablesync.c:127
@ SYNC_TABLE_STATE_REBUILD_STARTED
Definition: tablesync.c:129
@ SYNC_TABLE_STATE_VALID
Definition: tablesync.c:130
@ SYNC_TABLE_STATE_NEEDS_REBUILD
Definition: tablesync.c:128
static void pg_attribute_noreturn() finish_sync_worker(void)
Definition: tablesync.c:143
static void process_syncing_tables_for_apply(XLogRecPtr current_lsn)
Definition: tablesync.c:418
void TablesyncWorkerMain(Datum main_arg)
Definition: tablesync.c:1712
static void process_syncing_tables_for_sync(XLogRecPtr current_lsn)
Definition: tablesync.c:295
void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, Size szslot)
Definition: tablesync.c:1267
static void run_tablesync_worker()
Definition: tablesync.c:1686
static int copy_read_data(void *outbuf, int minread, int maxread)
Definition: tablesync.c:718
static SyncingTablesState table_states_validity
Definition: tablesync.c:133
void process_syncing_tables(XLogRecPtr current_lsn)
Definition: tablesync.c:667
static void copy_table(Relation rel)
Definition: tablesync.c:1115
static bool wait_for_relation_state_change(Oid relid, char expected_state)
Definition: tablesync.c:184
static void start_table_sync(XLogRecPtr *origin_startpos, char **slotname)
Definition: tablesync.c:1644
static StringInfo copybuf
Definition: tablesync.c:137
static bool FetchTableStates(bool *started_tx)
Definition: tablesync.c:1573
static void fetch_remote_table_info(char *nspname, char *relname, LogicalRepRelation *lrel, List **qual)
Definition: tablesync.c:794
static List * make_copy_attnamelist(LogicalRepRelMapEntry *rel)
Definition: tablesync.c:698
void UpdateTwoPhaseState(Oid suboid, char new_state)
Definition: tablesync.c:1757
bool tuplestore_gettupleslot(Tuplestorestate *state, bool forward, bool copy, TupleTableSlot *slot)
Definition: tuplestore.c:1130
int64 tuplestore_tuple_count(Tuplestorestate *state)
Definition: tuplestore.c:580
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:454
static Datum slot_getattr(TupleTableSlot *slot, int attnum, bool *isnull)
Definition: tuptable.h:395
void SwitchToUntrustedUser(Oid userid, UserContext *context)
Definition: usercontext.c:33
void RestoreUserContext(UserContext *context)
Definition: usercontext.c:87
String * makeString(char *str)
Definition: value.c:63
#define strVal(v)
Definition: value.h:82
#define walrcv_startstreaming(conn, options)
Definition: walreceiver.h:450
#define walrcv_connect(conninfo, replication, logical, must_use_password, appname, err)
Definition: walreceiver.h:434
@ WALRCV_OK_COMMAND
Definition: walreceiver.h:204
@ WALRCV_OK_TUPLES
Definition: walreceiver.h:206
@ WALRCV_OK_COPY_OUT
Definition: walreceiver.h:208
#define walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn)
Definition: walreceiver.h:458
static void walrcv_clear_result(WalRcvExecResult *walres)
Definition: walreceiver.h:470
#define walrcv_server_version(conn)
Definition: walreceiver.h:446
#define walrcv_endstreaming(conn, next_tli)
Definition: walreceiver.h:452
#define walrcv_exec(conn, exec, nRetTypes, retTypes)
Definition: walreceiver.h:464
#define walrcv_receive(conn, buffer, wait_fd)
Definition: walreceiver.h:454
@ CRS_USE_SNAPSHOT
Definition: walsender.h:24
@ WORKERTYPE_TABLESYNC
@ WORKERTYPE_UNKNOWN
@ WORKERTYPE_PARALLEL_APPLY
@ WORKERTYPE_APPLY
static bool am_tablesync_worker(void)
bool IsTransactionState(void)
Definition: xact.c:386
void CommandCounterIncrement(void)
Definition: xact.c:1099
void StartTransactionCommand(void)
Definition: xact.c:3039
void CommitTransactionCommand(void)
Definition: xact.c:3137
void AbortOutOfAnyTransaction(void)
Definition: xact.c:4855
uint64 GetSystemIdentifier(void)
Definition: xlog.c:4561
XLogRecPtr GetXLogWriteRecPtr(void)
Definition: xlog.c:9443
int wal_retrieve_retry_interval
Definition: xlog.c:133
void XLogFlush(XLogRecPtr record)
Definition: xlog.c:2795
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
uint16 RepOriginId
Definition: xlogdefs.h:65
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
uint32 TimeLineID
Definition: xlogdefs.h:59