PostgreSQL Source Code  git master
tablesync.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  * tablesync.c
3  * PostgreSQL logical replication: initial table data synchronization
4  *
5  * Copyright (c) 2012-2022, PostgreSQL Global Development Group
6  *
7  * IDENTIFICATION
8  * src/backend/replication/logical/tablesync.c
9  *
10  * NOTES
11  * This file contains code for initial table data synchronization for
12  * logical replication.
13  *
14  * The initial data synchronization is done separately for each table,
15  * in a separate apply worker that only fetches the initial snapshot data
16  * from the publisher and then synchronizes the position in the stream with
17  * the main apply worker.
18  *
19  * There are several reasons for doing the synchronization this way:
20  * - It allows us to parallelize the initial data synchronization
21  * which lowers the time needed for it to happen.
22  * - The initial synchronization does not have to hold the xid and LSN
23  * for the time it takes to copy data of all tables, causing less
24  * bloat and lower disk consumption compared to doing the
25  * synchronization in a single process for the whole database.
26  * - It allows us to synchronize any tables added after the initial
27  * synchronization has finished.
28  *
29  * The stream position synchronization works in multiple steps:
30  * - Apply worker requests a tablesync worker to start, setting the new
31  * table state to INIT.
32  * - Tablesync worker starts; changes table state from INIT to DATASYNC while
33  * copying.
34  * - Tablesync worker does initial table copy; there is a FINISHEDCOPY (sync
35  * worker specific) state to indicate when the copy phase has completed, so
36  * if the worker crashes with this (non-memory) state then the copy will not
37  * be re-attempted.
38  * - Tablesync worker then sets table state to SYNCWAIT; waits for state change.
39  * - Apply worker periodically checks for tables in SYNCWAIT state. When
40  * any appear, it sets the table state to CATCHUP and starts loop-waiting
41  * until either the table state is set to SYNCDONE or the sync worker
42  * exits.
43  * - After the sync worker has seen the state change to CATCHUP, it will
44  * read the stream and apply changes (acting like an apply worker) until
45  * it catches up to the specified stream position. Then it sets the
46  * state to SYNCDONE. There might be zero changes applied between
47  * CATCHUP and SYNCDONE, because the sync worker might be ahead of the
48  * apply worker.
49  * - Once the state is set to SYNCDONE, the apply worker will continue
50  * tracking the table until it reaches the SYNCDONE stream position, at
51  * which point it sets state to READY and stops tracking. Again, there
52  * might be zero changes in between.
53  *
54  * So the state progression is always: INIT -> DATASYNC -> FINISHEDCOPY
55  * -> SYNCWAIT -> CATCHUP -> SYNCDONE -> READY.
56  *
57  * The catalog pg_subscription_rel is used to keep information about
58  * subscribed tables and their state. The catalog holds all states
59  * except SYNCWAIT and CATCHUP which are only in shared memory.
60  *
61  * Example flows look like this:
62  * - Apply is in front:
63  * sync:8
64  * -> set in catalog FINISHEDCOPY
65  * -> set in memory SYNCWAIT
66  * apply:10
67  * -> set in memory CATCHUP
68  * -> enter wait-loop
69  * sync:10
70  * -> set in catalog SYNCDONE
71  * -> exit
72  * apply:10
73  * -> exit wait-loop
74  * -> continue rep
75  * apply:11
76  * -> set in catalog READY
77  *
78  * - Sync is in front:
79  * sync:10
80  * -> set in catalog FINISHEDCOPY
81  * -> set in memory SYNCWAIT
82  * apply:8
83  * -> set in memory CATCHUP
84  * -> continue per-table filtering
85  * sync:10
86  * -> set in catalog SYNCDONE
87  * -> exit
88  * apply:10
89  * -> set in catalog READY
90  * -> stop per-table filtering
91  * -> continue rep
92  *-------------------------------------------------------------------------
93  */
94 
95 #include "postgres.h"
96 
97 #include "access/table.h"
98 #include "access/xact.h"
99 #include "catalog/indexing.h"
101 #include "catalog/pg_type.h"
102 #include "commands/copy.h"
103 #include "miscadmin.h"
104 #include "parser/parse_relation.h"
105 #include "pgstat.h"
108 #include "replication/walreceiver.h"
110 #include "replication/slot.h"
111 #include "replication/origin.h"
112 #include "storage/ipc.h"
113 #include "storage/lmgr.h"
114 #include "utils/acl.h"
115 #include "utils/array.h"
116 #include "utils/builtins.h"
117 #include "utils/lsyscache.h"
118 #include "utils/memutils.h"
119 #include "utils/rls.h"
120 #include "utils/snapmgr.h"
121 #include "utils/syscache.h"
122 
/*
 * File-local state.
 *
 * table_states_valid is set to false by the syscache invalidation callback
 * further down in this file.
 *
 * NOTE(review): source line 124 is absent from this extract. Later code
 * reads a table_states_not_ready list that is not declared anywhere visible
 * here, so it is presumably declared on that line - confirm against the
 * upstream file.
 */
123 static bool table_states_valid = false;
/* Forward declaration; the definition is not within this extract. */
125 static bool FetchTableStates(bool *started_tx);
126 
/*
 * Buffer read by copy_read_data(). It is allocated with makeStringInfo() in
 * the table copy routine before the COPY data source callback is installed.
 */
127 static StringInfo copybuf = NULL;
128 
129 /*
130  * Exit routine for synchronization worker.
 *
 * Takes no arguments and does not return to the caller: the final statement
 * is proc_exit(0). If a transaction is open it reports pending statistics,
 * and it emits a LOG-level "has finished" message naming the subscription
 * and table.
 *
 * NOTE(review): the embedded source numbering skips 133, 142, 147, 149,
 * 152-154 and 157. The statements that the surrounding comments describe
 * (committing the transaction, flushing writes, the ereport arguments and
 * closing parentheses, and signalling the apply worker) are therefore absent
 * from this extract; restore them from the upstream file.
131  */
132 static void
134 finish_sync_worker(void)
135 {
136  /*
137  * Commit any outstanding transaction. This is the usual case, unless
138  * there was nothing to do for the table.
139  */
140  if (IsTransactionState())
141  {
 /* NOTE(review): source line 142 (presumably the commit itself) is missing here. */
143  pgstat_report_stat(true);
144  }
145 
146  /* And flush all writes. */
148 
 /* NOTE(review): the ereport call below is truncated; its arguments (lines 152-154) are missing. */
150  ereport(LOG,
151  (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
155 
156  /* Find the main apply worker and signal it. */
158 
159  /* Stop gracefully */
160  proc_exit(0);
161 }
162 
163 /*
164  * Wait until the relation sync state is set in the catalog to the expected
165  * one; return true when it happens.
166  *
167  * Returns false if the table sync worker or the table itself have
168  * disappeared, or the table state has been reset.
169  *
170  * Currently, this is used in the apply worker when transitioning from
171  * CATCHUP state to SYNCDONE.
 *
 * relid: relation whose state is polled.
 * expected_state: state value that ends the wait with a true result.
 *
 * The loop has three exits: the state is SUBREL_STATE_UNKNOWN (false), the
 * state equals expected_state (true), or no sync worker is found while
 * LogicalRepWorkerLock is held in shared mode (false). Otherwise it waits on
 * MyLatch and polls again.
 *
 * NOTE(review): source lines 183, 185-186, 197, 204-205 and 207 are absent
 * from this extract. The statements that assign "state" and "worker", the
 * WaitLatch arguments, and whatever follows the wait are not visible.
172  */
173 static bool
174 wait_for_relation_state_change(Oid relid, char expected_state)
175 {
176  char state;
177 
178  for (;;)
179  {
180  LogicalRepWorker *worker;
181  XLogRecPtr statelsn;
182 
184 
 /* NOTE(review): the call that fills "state" and "statelsn" is truncated; only its last argument line survives. */
187  relid, &statelsn);
188 
 /* State unknown: stop waiting and report failure. */
189  if (state == SUBREL_STATE_UNKNOWN)
190  break;
191 
192  if (state == expected_state)
193  return true;
194 
195  /* Check if the sync worker is still running and bail if not. */
196  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
 /* NOTE(review): the lookup that assigns "worker" is truncated (line 197 missing). */
198  false);
199  LWLockRelease(LogicalRepWorkerLock);
200  if (!worker)
201  break;
202 
 /* NOTE(review): WaitLatch arguments (lines 204-205) and line 207 are missing. */
203  (void) WaitLatch(MyLatch,
206 
208  }
209 
210  return false;
211 }
212 
213 /*
214  * Wait until the apply worker changes the state of our synchronization
215  * worker to the expected one.
216  *
217  * Used when transitioning from SYNCWAIT state to CATCHUP.
218  *
219  * Returns false if the apply worker has disappeared.
 *
 * expected_state: value of MyLogicalRepWorker->relstate that ends the wait
 * with a true result.
 *
 * NOTE(review): source lines 230, 244, 247, 257-258 and 261 are absent from
 * this extract. In particular the bodies of "if (worker && worker->proc)"
 * and "if (rc & WL_LATCH_SET)" are missing, so as printed here those
 * conditions appear to govern the statement that follows them. Do not read
 * the control flow off this text without restoring the missing lines.
220  */
221 static bool
222 wait_for_worker_state_change(char expected_state)
223 {
224  int rc;
225 
226  for (;;)
227  {
228  LogicalRepWorker *worker;
229 
231 
232  /*
233  * Done if already in correct state. (We assume this fetch is atomic
234  * enough to not give a misleading answer if we do it with no lock.)
235  */
236  if (MyLogicalRepWorker->relstate == expected_state)
237  return true;
238 
239  /*
240  * Bail out if the apply worker has died, else signal it we're
241  * waiting.
242  */
243  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
 /* NOTE(review): the lookup that assigns "worker" is truncated (line 244 missing). */
245  InvalidOid, false);
 /* NOTE(review): the body of this if (line 247, presumably the wakeup signal) is missing. */
246  if (worker && worker->proc)
248  LWLockRelease(LogicalRepWorkerLock);
249  if (!worker)
250  break;
251 
252  /*
253  * Wait. We expect to get a latch signal back from the apply worker,
254  * but use a timeout in case it dies without sending one.
255  */
256  rc = WaitLatch(MyLatch,
259 
 /* NOTE(review): the body of this if (line 261) is missing. */
260  if (rc & WL_LATCH_SET)
262  }
263 
264  return false;
265 }
266 
267 /*
268  * Callback from syscache invalidation.
 *
 * Its only visible effect is to clear table_states_valid.
 *
 * NOTE(review): source line 271, which carries the function name and
 * parameter list, is absent from this extract.
269  */
270 void
272 {
273  table_states_valid = false;
274 }
275 
276 /*
277  * Handle table synchronization cooperation from the synchronization
278  * worker.
279  *
280  * If the sync worker is in CATCHUP state and reached (or passed) the
281  * predetermined synchronization point in the WAL stream, mark the table as
282  * SYNCDONE and finish.
 *
 * When the condition holds, the in-memory relstate and relstate_lsn of
 * MyLogicalRepWorker are updated to SYNCDONE and current_lsn, and the
 * function ends by calling finish_sync_worker(), which does not return.
 *
 * NOTE(review): source lines 285, 287, 298, 305, 307-310, 316, 325-326, 335
 * and 340 are absent from this extract. That covers the signature (the body
 * reads current_lsn, presumably its parameter), the body of
 * "if (!IsTransactionState())", the catalog update, the end-of-streaming
 * call, the slot-name and slot-drop calls, and the body of the final "else".
 * The local "tli" has no visible use; it is presumably consumed by one of
 * the missing calls.
283  */
284 static void
286 {
288 
289  if (MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP &&
290  current_lsn >= MyLogicalRepWorker->relstate_lsn)
291  {
292  TimeLineID tli;
293  char syncslotname[NAMEDATALEN] = {0};
294 
 /* Record the new state in this worker's own state first. */
295  MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCDONE;
296  MyLogicalRepWorker->relstate_lsn = current_lsn;
297 
299 
300  /*
301  * UpdateSubscriptionRelState must be called within a transaction.
302  * That transaction will be ended within the finish_sync_worker().
303  */
 /* NOTE(review): the body of this if and the catalog update (lines 305, 307-310) are missing. */
304  if (!IsTransactionState())
306 
311 
312  /*
313  * End streaming so that LogRepWorkerWalRcvConn can be used to drop
314  * the slot.
315  */
317 
318  /*
319  * Cleanup the tablesync slot.
320  *
321  * This has to be done after updating the state because otherwise if
322  * there is an error while doing the database operations we won't be
323  * able to rollback dropped slot.
324  */
 /* NOTE(review): the head of the slot-name call (lines 325-326) is missing; only its last two arguments survive. */
327  syncslotname,
328  sizeof(syncslotname));
329 
330  /*
331  * It is important to give an error if we are unable to drop the slot,
332  * otherwise, it won't be dropped till the corresponding subscription
333  * is dropped. So passing missing_ok = false.
334  */
336 
337  finish_sync_worker();
338  }
 /* NOTE(review): the body of this else (line 340) is missing. */
339  else
341 }
342 
343 /*
344  * Handle table synchronization cooperation from the apply worker.
345  *
346  * Walk over all subscription tables that are individually tracked by the
347  * apply process (currently, all that have state other than
348  * SUBREL_STATE_READY) and manage synchronization for them.
349  *
350  * If there are tables that need synchronizing and are not being synchronized
351  * yet, start sync workers for them (if there are free slots for sync
352  * workers). To prevent starting the sync worker for the same relation at a
353  * high frequency after a failure, we store its last start time with each sync
354  * state info. We start the sync worker for the same relation after waiting
355  * at least wal_retrieve_retry_interval.
356  *
357  * For tables that are being synchronized already, check if sync workers
358  * either need action from the apply worker or have finished. This is the
359  * SYNCWAIT to CATCHUP transition.
360  *
361  * If the synchronization position is reached (SYNCDONE), then the table can
362  * be marked as READY and is no longer tracked.
 *
 * Outline of the visible code:
 *  - refresh the table state list via FetchTableStates();
 *  - create or destroy the static last_start_times hash table, depending on
 *    whether table_states_not_ready is empty;
 *  - possibly exit the process so that two_phase can be enabled;
 *  - for each tracked table, handle SYNCDONE, hand a SYNCWAIT worker over to
 *    CATCHUP, or consider launching a new sync worker;
 *  - report statistics if this function started a transaction.
 *
 * NOTE(review): source lines 365, 376, 418-419, 433, 450, 467, 476, 490, 527,
 * 531, 545, 556, 565, 567-570 and 581 are absent from this extract. That
 * covers the signature (the body reads current_lsn, presumably its
 * parameter), the declarations of "rstate" and "now", every transaction
 * start/commit call, the catalog update call, the worker lookup and count
 * calls, and the worker launch call.
363  */
364 static void
366 {
 /* Hash entry: last launch time of a sync worker, keyed by relation OID. */
367  struct tablesync_start_time_mapping
368  {
369  Oid relid;
370  TimestampTz last_start_time;
371  };
 /* Static, so the launch-time history persists across calls. */
372  static HTAB *last_start_times = NULL;
373  ListCell *lc;
 /* Tracks whether a transaction was opened here and must be closed at the end. */
374  bool started_tx = false;
375 
377 
378  /* We need up-to-date sync state info for subscription tables here. */
379  FetchTableStates(&started_tx);
380 
381  /*
382  * Prepare a hash table for tracking last start times of workers, to avoid
383  * immediate restarts. We don't need it if there are no tables that need
384  * syncing.
385  */
386  if (table_states_not_ready && !last_start_times)
387  {
388  HASHCTL ctl;
389 
390  ctl.keysize = sizeof(Oid);
391  ctl.entrysize = sizeof(struct tablesync_start_time_mapping);
392  last_start_times = hash_create("Logical replication table sync worker start times",
393  256, &ctl, HASH_ELEM | HASH_BLOBS);
394  }
395 
396  /*
397  * Clean up the hash table when we're done with all tables (just to
398  * release the bit of memory).
399  */
400  else if (!table_states_not_ready && last_start_times)
401  {
402  hash_destroy(last_start_times);
403  last_start_times = NULL;
404  }
405 
406  /*
407  * Even when the two_phase mode is requested by the user, it remains as
408  * 'pending' until all tablesyncs have reached READY state.
409  *
410  * When this happens, we restart the apply worker and (if the conditions
411  * are still ok) then the two_phase tri-state will become 'enabled' at
412  * that time.
413  *
414  * Note: If the subscription has no tables then leave the state as
415  * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
416  * work.
417  */
 /* NOTE(review): the condition guarding this block (lines 418-419) is missing. */
420  {
421  ereport(LOG,
422  (errmsg("logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled",
423  MySubscription->name)));
424 
425  proc_exit(0);
426  }
427 
428  /*
429  * Process all tables that are being synchronized.
430  */
431  foreach(lc, table_states_not_ready)
432  {
 /* NOTE(review): the declaration of "rstate" (line 433) is missing. */
434 
435  if (rstate->state == SUBREL_STATE_SYNCDONE)
436  {
437  /*
438  * Apply has caught up to the position where the table sync has
439  * finished. Mark the table as ready so that the apply will just
440  * continue to replicate it normally.
441  */
442  if (current_lsn >= rstate->lsn)
443  {
444  char originname[NAMEDATALEN];
445 
446  rstate->state = SUBREL_STATE_READY;
447  rstate->lsn = current_lsn;
448  if (!started_tx)
449  {
 /* NOTE(review): the transaction start call (line 450) is missing. */
451  started_tx = true;
452  }
453 
454  /*
455  * Remove the tablesync origin tracking if exists.
456  *
457  * The normal case origin drop is done here instead of in the
458  * process_syncing_tables_for_sync function because we don't
459  * allow to drop the origin till the process owning the origin
460  * is alive.
461  *
462  * There is a chance that the user is concurrently performing
463  * refresh for the subscription where we remove the table
464  * state and its origin and by this time the origin might be
465  * already removed. So passing missing_ok = true.
466  */
 /* NOTE(review): the head of the origin-name call (line 467) is missing. */
468  rstate->relid,
469  originname,
470  sizeof(originname));
471  replorigin_drop_by_name(originname, true, false);
472 
473  /*
474  * Update the state to READY only after the origin cleanup.
475  */
 /* NOTE(review): the head of the catalog update call (line 476) is missing. */
477  rstate->relid, rstate->state,
478  rstate->lsn);
479  }
480  }
481  else
482  {
483  LogicalRepWorker *syncworker;
484 
485  /*
486  * Look for a sync worker for this relation.
487  */
488  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
489 
 /* NOTE(review): the lookup that assigns "syncworker" is truncated (line 490 missing). */
491  rstate->relid, false);
492 
493  if (syncworker)
494  {
495  /* Found one, update our copy of its state */
496  SpinLockAcquire(&syncworker->relmutex);
497  rstate->state = syncworker->relstate;
498  rstate->lsn = syncworker->relstate_lsn;
499  if (rstate->state == SUBREL_STATE_SYNCWAIT)
500  {
501  /*
502  * Sync worker is waiting for apply. Tell sync worker it
503  * can catchup now.
504  */
505  syncworker->relstate = SUBREL_STATE_CATCHUP;
 /* The catch-up target never moves backwards: take the later of the two positions. */
506  syncworker->relstate_lsn =
507  Max(syncworker->relstate_lsn, current_lsn);
508  }
509  SpinLockRelease(&syncworker->relmutex);
510 
511  /* If we told worker to catch up, wait for it. */
 /* rstate->state still holds the value copied before the hand-over, i.e. SYNCWAIT. */
512  if (rstate->state == SUBREL_STATE_SYNCWAIT)
513  {
514  /* Signal the sync worker, as it may be waiting for us. */
515  if (syncworker->proc)
516  logicalrep_worker_wakeup_ptr(syncworker);
517 
518  /* Now safe to release the LWLock */
519  LWLockRelease(LogicalRepWorkerLock);
520 
521  /*
522  * Enter busy loop and wait for synchronization worker to
523  * reach expected state (or die trying).
524  */
525  if (!started_tx)
526  {
 /* NOTE(review): the transaction start call (line 527) is missing. */
528  started_tx = true;
529  }
530 
 /* NOTE(review): the head of the wait call (line 531) is missing; only its last argument survives. */
532  SUBREL_STATE_SYNCDONE);
533  }
534  else
535  LWLockRelease(LogicalRepWorkerLock);
536  }
537  else
538  {
539  /*
540  * If there is no sync worker for this table yet, count
541  * running sync workers for this subscription, while we have
542  * the lock.
543  */
 /* NOTE(review): the initializer expression (line 545) is missing. */
544  int nsyncworkers =
546 
547  /* Now safe to release the LWLock */
548  LWLockRelease(LogicalRepWorkerLock);
549 
550  /*
551  * If there are free sync worker slot(s), start a new sync
552  * worker for the table.
553  */
554  if (nsyncworkers < max_sync_workers_per_subscription)
555  {
 /* NOTE(review): "now" is used below but its declaration (line 556) is missing. */
557  struct tablesync_start_time_mapping *hentry;
558  bool found;
559 
 /* HASH_ENTER creates the entry on first sight of this relation. */
560  hentry = hash_search(last_start_times, &rstate->relid,
561  HASH_ENTER, &found);
562 
 /* Launch on first sight, or once enough time has passed since the last launch. */
563  if (!found ||
564  TimestampDifferenceExceeds(hentry->last_start_time, now,
566  {
 /* NOTE(review): the launch call is truncated (lines 567-570 missing). */
571  rstate->relid);
572  hentry->last_start_time = now;
573  }
574  }
575  }
576  }
577  }
578 
579  if (started_tx)
580  {
 /* NOTE(review): the commit call (line 581) is missing. */
582  pgstat_report_stat(true);
583  }
584 }
585 
586 /*
587  * Process possible state change(s) of tables that are being synchronized.
 *
 * Dispatches on am_tablesync_worker(): a tablesync worker runs
 * process_syncing_tables_for_sync(current_lsn).
 *
 * NOTE(review): source lines 590 and 595 are absent from this extract, i.e.
 * the function name with its parameter list, and the body of the else
 * branch (presumably the apply-worker counterpart - confirm upstream).
588  */
589 void
591 {
592  if (am_tablesync_worker())
593  process_syncing_tables_for_sync(current_lsn);
594  else
596 }
597 
598 /*
599  * Create list of columns for COPY based on logical relation mapping.
 *
 * Returns a List holding one makeString() node per remote attribute name,
 * in the order of rel->remoterel.attnames. The result is NIL when
 * rel->remoterel.natts is zero. The name strings themselves are referenced,
 * not copied.
 *
 * NOTE(review): source line 602, which carries the function name and
 * parameter list, is absent from this extract. The body dereferences "rel",
 * and the table copy routine calls make_copy_attnamelist(relmapentry).
600  */
601 static List *
603 {
604  List *attnamelist = NIL;
605  int i;
606 
607  for (i = 0; i < rel->remoterel.natts; i++)
608  {
609  attnamelist = lappend(attnamelist,
610  makeString(rel->remoterel.attnames[i]));
611  }
612 
613 
614  return attnamelist;
615 }
616 
617 /*
618  * Data source callback for the COPY FROM, which reads from the remote
619  * connection and passes the data back to our local COPY.
 *
 * outbuf: destination buffer for the copied bytes.
 * minread: the receive loop keeps running while fewer than this many bytes
 * have been delivered.
 * maxread: upper bound on the number of bytes written to outbuf.
 *
 * Returns the number of bytes placed in outbuf. This can be less than
 * minread when "len" is negative.
 *
 * Data left in copybuf by a previous call is consumed first. Each received
 * buffer replaces copybuf's data pointer, length and cursor, so any part
 * that does not fit into outbuf stays available for the next call.
 *
 * NOTE(review): the leftover-data branch copies into outbuf without
 * advancing the pointer, whereas the receive loop does advance it. If a
 * caller passes minread larger than the leftover amount, the next memcpy
 * would overwrite the bytes just copied. Confirm that callers never do so,
 * or advance outbuf in the first branch as well.
 *
 * NOTE(review): source lines 641, 648, 650, 681-683 and 685 are absent from
 * this extract: a local declaration, the receive call that assigns "len"
 * and "buf", a statement after it, the WaitLatchOrSocket arguments and the
 * statement after the wait.
620  */
621 static int
622 copy_read_data(void *outbuf, int minread, int maxread)
623 {
624  int bytesread = 0;
625  int avail;
626 
627  /* If there are some leftover data from previous read, use it. */
628  avail = copybuf->len - copybuf->cursor;
629  if (avail)
630  {
 /* Never hand out more than the caller asked for. */
631  if (avail > maxread)
632  avail = maxread;
633  memcpy(outbuf, &copybuf->data[copybuf->cursor], avail);
634  copybuf->cursor += avail;
635  maxread -= avail;
636  bytesread += avail;
637  }
638 
639  while (maxread > 0 && bytesread < minread)
640  {
642  int len;
643  char *buf = NULL;
644 
645  for (;;)
646  {
647  /* Try to read the data. */
649 
651 
 /* len == 0: leave the inner loop and wait; len < 0: return what we have. */
652  if (len == 0)
653  break;
654  else if (len < 0)
655  return bytesread;
656  else
657  {
658  /* Process the data */
659  copybuf->data = buf;
660  copybuf->len = len;
661  copybuf->cursor = 0;
662 
663  avail = copybuf->len - copybuf->cursor;
664  if (avail > maxread)
665  avail = maxread;
666  memcpy(outbuf, &copybuf->data[copybuf->cursor], avail);
667  outbuf = (void *) ((char *) outbuf + avail);
668  copybuf->cursor += avail;
669  maxread -= avail;
670  bytesread += avail;
671  }
672 
 /* Stop as soon as the buffer is full or the minimum has been met. */
673  if (maxread <= 0 || bytesread >= minread)
674  return bytesread;
675  }
676 
677  /*
678  * Wait for more data or latch.
679  */
680  (void) WaitLatchOrSocket(MyLatch,
684 
686  }
687 
688  return bytesread;
689 }
690 
691 
692 /*
693  * Get information about remote relation in similar fashion the RELATION
694  * message provides during replication. This function also returns the relation
695  * qualifications to be used in the COPY command.
 *
 * nspname, relname: schema and table name on the publisher. The pointers
 * are stored in lrel as-is, not copied.
 * lrel: output; the visible code fills nspname, relname, remoteid,
 * replident, relkind, attnames, atttyps, attkeys and natts.
 * qual: output list; one makeString() node is appended per non-NULL row
 * filter expression. If any fetched expression is NULL, the list is freed
 * and reset to NIL.
 *
 * Errors are raised with ereport(ERROR) when a query does not return
 * WALRCV_OK_TUPLES, when the table is not found, and when more than one
 * distinct column list is returned for the table.
 *
 * NOTE(review): the inner "slot" and "first" declared in the column-list
 * block shadow the function-level variables of the same names.
 *
 * NOTE(review): the ereport for the row filter query passes no errcode,
 * while the other "could not fetch" reports in this function use
 * ERRCODE_CONNECTION_FAILURE - confirm whether that is intentional.
 *
 * NOTE(review): source lines 701, 725-726, 749-750, 759, 772, 793, 822, 843,
 * 867, 870, 893, 922, 926, 947, 1011 and 1013 are absent from this extract.
 * That covers the declarations of "res" and "attnum", every walrcv_exec
 * call head except the row filter one, the conditions guarding the
 * column-list and row-filter blocks, and the result/slot cleanup calls.
696  */
697 static void
698 fetch_remote_table_info(char *nspname, char *relname,
699  LogicalRepRelation *lrel, List **qual)
700 {
702  StringInfoData cmd;
703  TupleTableSlot *slot;
 /* Expected result column types of the three catalog queries below. */
704  Oid tableRow[] = {OIDOID, CHAROID, CHAROID};
705  Oid attrRow[] = {INT2OID, TEXTOID, OIDOID, BOOLOID};
706  Oid qualRow[] = {TEXTOID};
707  bool isnull;
708  int natt;
709  ListCell *lc;
710  bool first;
 /* Stays NULL unless a column list is found; NULL means no column filtering below. */
711  Bitmapset *included_cols = NULL;
712 
713  lrel->nspname = nspname;
714  lrel->relname = relname;
715 
716  /* First fetch Oid and replica identity. */
717  initStringInfo(&cmd);
718  appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind"
719  " FROM pg_catalog.pg_class c"
720  " INNER JOIN pg_catalog.pg_namespace n"
721  " ON (c.relnamespace = n.oid)"
722  " WHERE n.nspname = %s"
723  " AND c.relname = %s",
724  quote_literal_cstr(nspname),
 /* NOTE(review): the second format argument and the query execution call head (lines 725-726) are missing. */
727  lengthof(tableRow), tableRow);
728 
729  if (res->status != WALRCV_OK_TUPLES)
730  ereport(ERROR,
731  (errcode(ERRCODE_CONNECTION_FAILURE),
732  errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
733  nspname, relname, res->err)));
734 
735  slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
 /* No row at all means the table does not exist on the publisher. */
736  if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
737  ereport(ERROR,
738  (errcode(ERRCODE_UNDEFINED_OBJECT),
739  errmsg("table \"%s.%s\" not found on publisher",
740  nspname, relname)));
741 
742  lrel->remoteid = DatumGetObjectId(slot_getattr(slot, 1, &isnull));
743  Assert(!isnull);
744  lrel->replident = DatumGetChar(slot_getattr(slot, 2, &isnull));
745  Assert(!isnull);
746  lrel->relkind = DatumGetChar(slot_getattr(slot, 3, &isnull));
747  Assert(!isnull);
748 
751 
752 
753  /*
754  * Get column lists for each relation.
755  *
756  * We need to do this before fetching info about column names and types,
757  * so that we can skip columns that should not be replicated.
758  */
 /* NOTE(review): the condition guarding this block (line 759) is missing. */
760  {
761  WalRcvExecResult *pubres;
762  TupleTableSlot *slot;
763  Oid attrsRow[] = {INT2VECTOROID};
764  StringInfoData pub_names;
765  bool first = true;
766 
767  initStringInfo(&pub_names);
768  foreach(lc, MySubscription->publications)
769  {
770  if (!first)
771  appendStringInfo(&pub_names, ", ");
 /* NOTE(review): the statement that appends the publication name (line 772) is missing. */
773  first = false;
774  }
775 
776  /*
777  * Fetch info about column lists for the relation (from all the
778  * publications).
779  */
780  resetStringInfo(&cmd);
781  appendStringInfo(&cmd,
782  "SELECT DISTINCT"
783  " (CASE WHEN (array_length(gpt.attrs, 1) = c.relnatts)"
784  " THEN NULL ELSE gpt.attrs END)"
785  " FROM pg_publication p,"
786  " LATERAL pg_get_publication_tables(p.pubname) gpt,"
787  " pg_class c"
788  " WHERE gpt.relid = %u AND c.oid = gpt.relid"
789  " AND p.pubname IN ( %s )",
790  lrel->remoteid,
791  pub_names.data);
792 
 /* NOTE(review): the query execution call head that assigns "pubres" (line 793) is missing. */
794  lengthof(attrsRow), attrsRow);
795 
796  if (pubres->status != WALRCV_OK_TUPLES)
797  ereport(ERROR,
798  (errcode(ERRCODE_CONNECTION_FAILURE),
799  errmsg("could not fetch column list info for table \"%s.%s\" from publisher: %s",
800  nspname, relname, pubres->err)));
801 
802  /*
803  * We don't support the case where the column list is different for
804  * the same table when combining publications. See comments atop
805  * fetch_table_list. So there should be only one row returned.
806  * Although we already checked this when creating the subscription, we
807  * still need to check here in case the column list was changed after
808  * creating the subscription and before the sync worker is started.
809  */
810  if (tuplestore_tuple_count(pubres->tuplestore) > 1)
811  ereport(ERROR,
812  errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
813  errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
814  nspname, relname));
815 
816  /*
817  * Get the column list and build a single bitmap with the attnums.
818  *
819  * If we find a NULL value, it means all the columns should be
820  * replicated.
821  */
 /* NOTE(review): the statement that creates the inner "slot" (line 822) is missing. */
823  if (tuplestore_gettupleslot(pubres->tuplestore, true, false, slot))
824  {
825  Datum cfval = slot_getattr(slot, 1, &isnull);
826 
827  if (!isnull)
828  {
829  ArrayType *arr;
830  int nelems;
831  int16 *elems;
832 
 /* The element count is read from the array's first dimension. */
833  arr = DatumGetArrayTypeP(cfval);
834  nelems = ARR_DIMS(arr)[0];
835  elems = (int16 *) ARR_DATA_PTR(arr);
836 
837  for (natt = 0; natt < nelems; natt++)
838  included_cols = bms_add_member(included_cols, elems[natt]);
839  }
840 
841  ExecClearTuple(slot);
842  }
844 
845  walrcv_clear_result(pubres);
846 
847  pfree(pub_names.data);
848  }
849 
850  /*
851  * Now fetch column names and types.
852  */
853  resetStringInfo(&cmd);
854  appendStringInfo(&cmd,
855  "SELECT a.attnum,"
856  " a.attname,"
857  " a.atttypid,"
858  " a.attnum = ANY(i.indkey)"
859  " FROM pg_catalog.pg_attribute a"
860  " LEFT JOIN pg_catalog.pg_index i"
861  " ON (i.indexrelid = pg_get_replica_identity_index(%u))"
862  " WHERE a.attnum > 0::pg_catalog.int2"
863  " AND NOT a.attisdropped %s"
864  " AND a.attrelid = %u"
865  " ORDER BY a.attnum",
866  lrel->remoteid,
 /* NOTE(review): the condition selecting the attgenerated clause (line 867) is missing. */
868  "AND a.attgenerated = ''" : ""),
869  lrel->remoteid);
 /* NOTE(review): the query execution call head that assigns "res" (line 870) is missing. */
871  lengthof(attrRow), attrRow);
872 
873  if (res->status != WALRCV_OK_TUPLES)
874  ereport(ERROR,
875  (errcode(ERRCODE_CONNECTION_FAILURE),
876  errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
877  nspname, relname, res->err)));
878 
879  /* We don't know the number of rows coming, so allocate enough space. */
880  lrel->attnames = palloc0(MaxTupleAttributeNumber * sizeof(char *));
881  lrel->atttyps = palloc0(MaxTupleAttributeNumber * sizeof(Oid));
882  lrel->attkeys = NULL;
883 
884  /*
885  * Store the columns as a list of names. Ignore those that are not
886  * present in the column list, if there is one.
887  */
 /* natt counts the columns actually kept and indexes the output arrays. */
888  natt = 0;
889  slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
890  while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
891  {
892  char *rel_colname;
 /* NOTE(review): the declaration of "attnum" (line 893) is missing. */
894 
895  attnum = DatumGetInt16(slot_getattr(slot, 1, &isnull));
896  Assert(!isnull);
897 
898  /* If the column is not in the column list, skip it. */
899  if (included_cols != NULL && !bms_is_member(attnum, included_cols))
900  {
901  ExecClearTuple(slot);
902  continue;
903  }
904 
905  rel_colname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
906  Assert(!isnull);
907 
908  lrel->attnames[natt] = rel_colname;
909  lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 3, &isnull));
910  Assert(!isnull);
911 
 /* attkeys records the position in the kept-column arrays, not the remote attnum. */
912  if (DatumGetBool(slot_getattr(slot, 4, &isnull)))
913  lrel->attkeys = bms_add_member(lrel->attkeys, natt);
914 
915  /* Should never happen. */
916  if (++natt >= MaxTupleAttributeNumber)
917  elog(ERROR, "too many columns in remote table \"%s.%s\"",
918  nspname, relname);
919 
920  ExecClearTuple(slot);
921  }
923 
924  lrel->natts = natt;
925 
927 
928  /*
929  * Get relation's row filter expressions. DISTINCT avoids the same
930  * expression of a table in multiple publications from being included
931  * multiple times in the final expression.
932  *
933  * We need to copy the row even if it matches just one of the
934  * publications, so we later combine all the quals with OR.
935  *
936  * For initial synchronization, row filtering can be ignored in following
937  * cases:
938  *
939  * 1) one of the subscribed publications for the table hasn't specified
940  * any row filter
941  *
942  * 2) one of the subscribed publications has puballtables set to true
943  *
944  * 3) one of the subscribed publications is declared as ALL TABLES IN
945  * SCHEMA that includes this relation
946  */
 /* NOTE(review): the condition guarding this block (line 947) is missing. */
948  {
949  StringInfoData pub_names;
950 
951  /* Build the pubname list. */
952  initStringInfo(&pub_names);
953  first = true;
954  foreach(lc, MySubscription->publications)
955  {
956  char *pubname = strVal(lfirst(lc));
957 
958  if (first)
959  first = false;
960  else
961  appendStringInfoString(&pub_names, ", ");
962 
 /* Names are embedded as quoted SQL literals. */
963  appendStringInfoString(&pub_names, quote_literal_cstr(pubname));
964  }
965 
966  /* Check for row filters. */
967  resetStringInfo(&cmd);
968  appendStringInfo(&cmd,
969  "SELECT DISTINCT pg_get_expr(gpt.qual, gpt.relid)"
970  " FROM pg_publication p,"
971  " LATERAL pg_get_publication_tables(p.pubname) gpt"
972  " WHERE gpt.relid = %u"
973  " AND p.pubname IN ( %s )",
974  lrel->remoteid,
975  pub_names.data);
976 
977  res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 1, qualRow);
978 
979  if (res->status != WALRCV_OK_TUPLES)
980  ereport(ERROR,
981  (errmsg("could not fetch table WHERE clause info for table \"%s.%s\" from publisher: %s",
982  nspname, relname, res->err)));
983 
984  /*
985  * Multiple row filter expressions for the same table will be combined
986  * by COPY using OR. If any of the filter expressions for this table
987  * are null, it means the whole table will be copied. In this case it
988  * is not necessary to construct a unified row filter expression at
989  * all.
990  */
991  slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
992  while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
993  {
994  Datum rf = slot_getattr(slot, 1, &isnull);
995 
996  if (!isnull)
997  *qual = lappend(*qual, makeString(TextDatumGetCString(rf)));
998  else
999  {
1000  /* Ignore filters and cleanup as necessary. */
1001  if (*qual)
1002  {
1003  list_free_deep(*qual);
1004  *qual = NIL;
1005  }
1006  break;
1007  }
1008 
1009  ExecClearTuple(slot);
1010  }
1012 
1014  }
1015 
1016  pfree(cmd.data);
1017 }
1018 
1019 /*
1020  * Copy existing data of a table from publisher.
1021  *
1022  * Caller is responsible for locking the local relation.
 *
 * Steps in the visible code:
 *  - fetch the remote relation description and row filters, and register
 *    the relation in the relation map;
 *  - build a COPY ... TO STDOUT command: a plain column-list COPY for a
 *    regular table without row filters, otherwise COPY (SELECT ...) with
 *    the row filters OR'ed together in a WHERE clause;
 *  - run it on the publisher connection and feed the stream into a local
 *    COPY FROM through copy_read_data().
 *
 * Raises ERROR if the publisher does not answer with WALRCV_OK_COPY_OUT.
 *
 * NOTE(review): source lines 1025, 1030, 1037, 1054, 1065, 1081, 1095 and
 * 1120 are absent from this extract. That covers the signature (the body
 * uses "rel", presumably the local relation parameter), the declaration of
 * "res", the head of the remote-info fetch call, and every statement that
 * appends a table or column identifier to the command.
1023  */
1024 static void
1026 {
1027  LogicalRepRelMapEntry *relmapentry;
1028  LogicalRepRelation lrel;
1029  List *qual = NIL;
1031  StringInfoData cmd;
1032  CopyFromState cstate;
1033  List *attnamelist;
1034  ParseState *pstate;
1035 
1036  /* Get the publisher relation info. */
 /* NOTE(review): the head of this call (line 1037) is missing. */
1038  RelationGetRelationName(rel), &lrel, &qual);
1039 
1040  /* Put the relation into relmap. */
1041  logicalrep_relmap_update(&lrel);
1042 
1043  /* Map the publisher relation to local one. */
 /* NoLock: per the header comment, the caller already holds the lock. */
1044  relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock);
1045  Assert(rel == relmapentry->localrel);
1046 
1047  /* Start copy on the publisher. */
1048  initStringInfo(&cmd);
1049 
1050  /* Regular table with no row filter */
1051  if (lrel.relkind == RELKIND_RELATION && qual == NIL)
1052  {
 /* NOTE(review): the table-name argument (line 1054) is missing. */
1053  appendStringInfo(&cmd, "COPY %s (",
1055 
1056  /*
1057  * XXX Do we need to list the columns in all cases? Maybe we're
1058  * replicating all columns?
1059  */
1060  for (int i = 0; i < lrel.natts; i++)
1061  {
1062  if (i > 0)
1063  appendStringInfoString(&cmd, ", ");
1064 
 /* NOTE(review): the statement that appends the column name (line 1065) is missing. */
1066  }
1067 
1068  appendStringInfo(&cmd, ") TO STDOUT");
1069  }
1070  else
1071  {
1072  /*
1073  * For non-tables and tables with row filters, we need to do COPY
1074  * (SELECT ...), but we can't just do SELECT * because we need to not
1075  * copy generated columns. For tables with any row filters, build a
1076  * SELECT query with OR'ed row filters for COPY.
1077  */
1078  appendStringInfoString(&cmd, "COPY (SELECT ");
1079  for (int i = 0; i < lrel.natts; i++)
1080  {
 /* NOTE(review): the statement that appends the column name (line 1081) is missing. */
1082  if (i < lrel.natts - 1)
1083  appendStringInfoString(&cmd, ", ");
1084  }
1085 
1086  appendStringInfoString(&cmd, " FROM ");
1087 
1088  /*
1089  * For regular tables, make sure we don't copy data from a child that
1090  * inherits the named table as those will be copied separately.
1091  */
1092  if (lrel.relkind == RELKIND_RELATION)
1093  appendStringInfoString(&cmd, "ONLY ");
1094 
 /* NOTE(review): the statement that appends the table name (line 1095) is missing. */
1096  /* list of OR'ed filters */
1097  if (qual != NIL)
1098  {
1099  ListCell *lc;
1100  char *q = strVal(linitial(qual));
1101 
 /* First filter opens the WHERE clause; the rest are chained with OR. */
1102  appendStringInfo(&cmd, " WHERE %s", q);
1103  for_each_from(lc, qual, 1)
1104  {
1105  q = strVal(lfirst(lc));
1106  appendStringInfo(&cmd, " OR %s", q);
1107  }
 /* The filter strings are no longer needed once embedded in the command. */
1108  list_free_deep(qual);
1109  }
1110 
1111  appendStringInfoString(&cmd, ") TO STDOUT");
1112  }
1113  res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL);
1114  pfree(cmd.data);
1115  if (res->status != WALRCV_OK_COPY_OUT)
1116  ereport(ERROR,
1117  (errcode(ERRCODE_CONNECTION_FAILURE),
1118  errmsg("could not start initial contents copy for table \"%s.%s\": %s",
1119  lrel.nspname, lrel.relname, res->err)));
1121 
 /* Fresh buffer for copy_read_data(), which reads the file-level copybuf. */
1122  copybuf = makeStringInfo();
1123 
1124  pstate = make_parsestate(NULL);
1125  (void) addRangeTableEntryForRelation(pstate, rel, AccessShareLock,
1126  NULL, false, false);
1127 
 /* Local COPY FROM takes its input from copy_read_data, not from a file. */
1128  attnamelist = make_copy_attnamelist(relmapentry);
1129  cstate = BeginCopyFrom(pstate, rel, NULL, NULL, false, copy_read_data, attnamelist, NIL);
1130 
1131  /* Do the copy */
1132  (void) CopyFrom(cstate);
1133 
1134  logicalrep_rel_close(relmapentry, NoLock);
1135 }
1136 
1137 /*
1138  * Determine the tablesync slot name.
1139  *
1140  * The name must not exceed NAMEDATALEN - 1 because of remote node constraints
1141  * on slot name length. We append system_identifier to avoid slot_name
1142  * collision with subscriptions in other clusters. With the current scheme
1143  * pg_%u_sync_%u_UINT64_FORMAT (3 + 10 + 6 + 10 + 20 + '\0'), the maximum
1144  * length of slot_name will be 50.
1145  *
1146  * The returned slot name is stored in the supplied buffer (syncslotname) with
1147  * the given size.
1148  *
1149  * Note: We don't use the subscription slot name as part of tablesync slot name
1150  * because we are responsible for cleaning up these slots and it could become
1151  * impossible to recalculate what name to cleanup if the subscription slot name
1152  * had changed.
 *
 * The name is written with snprintf(), bounded by szslot. The return value
 * of snprintf() is not checked.
 *
 * NOTE(review): source line 1155, which carries the function name and the
 * leading parameters, is absent from this extract. The body reads "suboid"
 * and "relid" and formats them with %u.
1153  */
1154 void
1156  char *syncslotname, int szslot)
1157 {
1158  snprintf(syncslotname, szslot, "pg_%u_sync_%u_" UINT64_FORMAT, suboid,
1159  relid, GetSystemIdentifier());
1160 }
1161 
1162 /*
1163  * Form the origin name for tablesync.
1164  *
1165  * Return the name in the supplied buffer.
1166  */
1167 void
1169  char *originname, int szorgname)
1170 {
1171  snprintf(originname, szorgname, "pg_%u_%u", suboid, relid);
1172 }
1173 
1174 /*
1175  * Start syncing the table in the sync worker.
1176  *
1177  * If nothing needs to be done to sync the table, we exit the worker without
1178  * any further action.
1179  *
1180  * The returned slot name is palloc'ed in current memory context.
1181  */
1182 char *
1184 {
1185  char *slotname;
1186  char *err;
1187  char relstate;
1188  XLogRecPtr relstate_lsn;
1189  Relation rel;
1190  AclResult aclresult;
1192  char originname[NAMEDATALEN];
1193  RepOriginId originid;
1194 
1195  /* Check the state of the table synchronization. */
1199  &relstate_lsn);
1201 
1203  MyLogicalRepWorker->relstate = relstate;
1204  MyLogicalRepWorker->relstate_lsn = relstate_lsn;
1206 
1207  /*
1208  * If synchronization is already done or no longer necessary, exit now
1209  * that we've updated shared memory state.
1210  */
1211  switch (relstate)
1212  {
1213  case SUBREL_STATE_SYNCDONE:
1214  case SUBREL_STATE_READY:
1215  case SUBREL_STATE_UNKNOWN:
1216  finish_sync_worker(); /* doesn't return */
1217  }
1218 
1219  /* Calculate the name of the tablesync slot. */
1220  slotname = (char *) palloc(NAMEDATALEN);
1223  slotname,
1224  NAMEDATALEN);
1225 
1226  /*
1227  * Here we use the slot name instead of the subscription name as the
1228  * application_name, so that it is different from the main apply worker,
1229  * so that synchronous replication can distinguish them.
1230  */
1232  walrcv_connect(MySubscription->conninfo, true, slotname, &err);
1233  if (LogRepWorkerWalRcvConn == NULL)
1234  ereport(ERROR,
1235  (errcode(ERRCODE_CONNECTION_FAILURE),
1236  errmsg("could not connect to the publisher: %s", err)));
1237 
1238  Assert(MyLogicalRepWorker->relstate == SUBREL_STATE_INIT ||
1239  MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC ||
1240  MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY);
1241 
1242  /* Assign the origin tracking record name. */
1245  originname,
1246  sizeof(originname));
1247 
1248  if (MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC)
1249  {
1250  /*
1251  * We have previously errored out before finishing the copy so the
1252  * replication slot might exist. We want to remove the slot if it
1253  * already exists and proceed.
1254  *
1255  * XXX We could also instead try to drop the slot, last time we failed
1256  * but for that, we might need to clean up the copy state as it might
1257  * be in the middle of fetching the rows. Also, if there is a network
1258  * breakdown then it wouldn't have succeeded so trying it next time
1259  * seems like a better bet.
1260  */
1262  }
1263  else if (MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY)
1264  {
1265  /*
1266  * The COPY phase was previously done, but tablesync then crashed
1267  * before it was able to finish normally.
1268  */
1270 
1271  /*
1272  * The origin tracking name must already exist. It was created first
1273  * time this tablesync was launched.
1274  */
1275  originid = replorigin_by_name(originname, false);
1276  replorigin_session_setup(originid);
1277  replorigin_session_origin = originid;
1278  *origin_startpos = replorigin_session_get_progress(false);
1279 
1281 
1282  goto copy_table_done;
1283  }
1284 
1286  MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC;
1289 
1290  /* Update the state and make it visible to others. */
1297  pgstat_report_stat(true);
1298 
1300 
1301  /*
1302  * Use a standard write lock here. It might be better to disallow access
1303  * to the table while it's being synchronized. But we don't want to block
1304  * the main apply process from working and it has to open the relation in
1305  * RowExclusiveLock when remapping remote relation id to local one.
1306  */
1308 
1309  /*
1310  * Check that our table sync worker has permission to insert into the
1311  * target table.
1312  */
1313  aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(),
1314  ACL_INSERT);
1315  if (aclresult != ACLCHECK_OK)
1316  aclcheck_error(aclresult,
1317  get_relkind_objtype(rel->rd_rel->relkind),
1319 
1320  /*
1321  * COPY FROM does not honor RLS policies. That is not a problem for
1322  * subscriptions owned by roles with BYPASSRLS privilege (or superuser,
1323  * who has it implicitly), but other roles should not be able to
1324  * circumvent RLS. Disallow logical replication into RLS enabled
1325  * relations for such roles.
1326  */
1328  ereport(ERROR,
1329  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1330  errmsg("\"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
1331  GetUserNameFromId(GetUserId(), true),
1332  RelationGetRelationName(rel))));
1333 
1334  /*
1335  * Start a transaction in the remote node in REPEATABLE READ mode. This
1336  * ensures that both the replication slot we create (see below) and the
1337  * COPY are consistent with each other.
1338  */
1340  "BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ",
1341  0, NULL);
1342  if (res->status != WALRCV_OK_COMMAND)
1343  ereport(ERROR,
1344  (errcode(ERRCODE_CONNECTION_FAILURE),
1345  errmsg("table copy could not start transaction on publisher: %s",
1346  res->err)));
1348 
1349  /*
1350  * Create a new permanent logical decoding slot. This slot will be used
1351  * for the catchup phase after COPY is done, so tell it to use the
1352  * snapshot to make the final data consistent.
1353  *
1354  * Prevent cancel/die interrupts while creating slot here because it is
1355  * possible that before the server finishes this command, a concurrent
1356  * drop subscription happens which would complete without removing this
1357  * slot leading to a dangling slot on the server.
1358  */
1359  HOLD_INTERRUPTS();
1361  slotname, false /* permanent */ , false /* two_phase */ ,
1362  CRS_USE_SNAPSHOT, origin_startpos);
1364 
1365  /*
1366  * Setup replication origin tracking. The purpose of doing this before the
1367  * copy is to avoid doing the copy again due to any error in setting up
1368  * origin tracking.
1369  */
1370  originid = replorigin_by_name(originname, true);
1371  if (!OidIsValid(originid))
1372  {
1373  /*
1374  * Origin tracking does not exist, so create it now.
1375  *
1376  * Then advance to the LSN got from walrcv_create_slot. This is WAL
1377  * logged for the purpose of recovery. Locks are to prevent the
1378  * replication origin from vanishing while advancing.
1379  */
1380  originid = replorigin_create(originname);
1381 
1382  LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1383  replorigin_advance(originid, *origin_startpos, InvalidXLogRecPtr,
1384  true /* go backward */ , true /* WAL log */ );
1385  UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1386 
1387  replorigin_session_setup(originid);
1388  replorigin_session_origin = originid;
1389  }
1390  else
1391  {
1392  ereport(ERROR,
1394  errmsg("replication origin \"%s\" already exists",
1395  originname)));
1396  }
1397 
1398  /* Now do the initial data copy */
1400  copy_table(rel);
1402 
1403  res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL);
1404  if (res->status != WALRCV_OK_COMMAND)
1405  ereport(ERROR,
1406  (errcode(ERRCODE_CONNECTION_FAILURE),
1407  errmsg("table copy could not finish transaction on publisher: %s",
1408  res->err)));
1410 
1411  table_close(rel, NoLock);
1412 
1413  /* Make the copy visible. */
1415 
1416  /*
1417  * Update the persisted state to indicate the COPY phase is done; make it
1418  * visible to others.
1419  */
1422  SUBREL_STATE_FINISHEDCOPY,
1424 
1426 
1427 copy_table_done:
1428 
1429  elog(DEBUG1,
1430  "LogicalRepSyncTableStart: '%s' origin_startpos lsn %X/%X",
1431  originname, LSN_FORMAT_ARGS(*origin_startpos));
1432 
1433  /*
1434  * We are done with the initial data synchronization, update the state.
1435  */
1437  MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCWAIT;
1438  MyLogicalRepWorker->relstate_lsn = *origin_startpos;
1440 
1441  /*
1442  * Finally, wait until the main apply worker tells us to catch up and then
1443  * return to let LogicalRepApplyLoop do it.
1444  */
1445  wait_for_worker_state_change(SUBREL_STATE_CATCHUP);
1446  return slotname;
1447 }
1448 
1449 /*
1450  * Common code to fetch the up-to-date sync state info into the static lists.
1451  *
1452  * Returns true if subscription has 1 or more tables, else false.
1453  *
1454  * Note: If this function started the transaction (indicated by the parameter)
1455  * then it is the caller's responsibility to commit it.
1456  */
1457 static bool
1458 FetchTableStates(bool *started_tx)
1459 {
1460  static bool has_subrels = false;
1461 
1462  *started_tx = false;
1463 
1464  if (!table_states_valid)
1465  {
1466  MemoryContext oldctx;
1467  List *rstates;
1468  ListCell *lc;
1469  SubscriptionRelState *rstate;
1470 
1471  /* Clean the old lists. */
1474 
1475  if (!IsTransactionState())
1476  {
1478  *started_tx = true;
1479  }
1480 
1481  /* Fetch all non-ready tables. */
1483 
1484  /* Allocate the tracking info in a permanent memory context. */
1486  foreach(lc, rstates)
1487  {
1488  rstate = palloc(sizeof(SubscriptionRelState));
1489  memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState));
1491  }
1492  MemoryContextSwitchTo(oldctx);
1493 
1494  /*
1495  * Does the subscription have tables?
1496  *
1497  * If there were not-READY relations found then we know it does. But
1498  * if table_state_not_ready was empty we still need to check again to
1499  * see if there are 0 tables.
1500  */
1501  has_subrels = (list_length(table_states_not_ready) > 0) ||
1503 
1504  table_states_valid = true;
1505  }
1506 
1507  return has_subrels;
1508 }
1509 
1510 /*
1511  * If the subscription has no tables then return false.
1512  *
1513  * Otherwise, are all tablesyncs READY?
1514  *
1515  * Note: This function is not suitable to be called from outside of apply or
1516  * tablesync workers because MySubscription needs to be already initialized.
1517  */
1518 bool
1520 {
1521  bool started_tx = false;
1522  bool has_subrels = false;
1523 
1524  /* We need up-to-date sync state info for subscription tables here. */
1525  has_subrels = FetchTableStates(&started_tx);
1526 
1527  if (started_tx)
1528  {
1530  pgstat_report_stat(true);
1531  }
1532 
1533  /*
1534  * Return false when there are no tables in subscription or not all tables
1535  * are in ready state; true otherwise.
1536  */
1537  return has_subrels && list_length(table_states_not_ready) == 0;
1538 }
1539 
1540 /*
1541  * Update the two_phase state of the specified subscription in pg_subscription.
1542  */
1543 void
1544 UpdateTwoPhaseState(Oid suboid, char new_state)
1545 {
1546  Relation rel;
1547  HeapTuple tup;
1548  bool nulls[Natts_pg_subscription];
1549  bool replaces[Natts_pg_subscription];
1550  Datum values[Natts_pg_subscription];
1551 
1553  new_state == LOGICALREP_TWOPHASE_STATE_PENDING ||
1554  new_state == LOGICALREP_TWOPHASE_STATE_ENABLED);
1555 
1556  rel = table_open(SubscriptionRelationId, RowExclusiveLock);
1558  if (!HeapTupleIsValid(tup))
1559  elog(ERROR,
1560  "cache lookup failed for subscription oid %u",
1561  suboid);
1562 
1563  /* Form a new tuple. */
1564  memset(values, 0, sizeof(values));
1565  memset(nulls, false, sizeof(nulls));
1566  memset(replaces, false, sizeof(replaces));
1567 
1568  /* And update/set two_phase state */
1569  values[Anum_pg_subscription_subtwophasestate - 1] = CharGetDatum(new_state);
1570  replaces[Anum_pg_subscription_subtwophasestate - 1] = true;
1571 
1572  tup = heap_modify_tuple(tup, RelationGetDescr(rel),
1573  values, nulls, replaces);
1574  CatalogTupleUpdate(rel, &tup->t_self, tup);
1575 
1576  heap_freetuple(tup);
1578 }
AclResult
Definition: acl.h:181
@ ACLCHECK_OK
Definition: acl.h:182
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
Definition: aclchk.c:3512
AclResult pg_class_aclcheck(Oid table_oid, Oid roleid, AclMode mode)
Definition: aclchk.c:5007
#define ARR_DATA_PTR(a)
Definition: array.h:315
#define DatumGetArrayTypeP(X)
Definition: array.h:254
#define ARR_DIMS(a)
Definition: array.h:287
int16 AttrNumber
Definition: attnum.h:21
WalReceiverConn * LogRepWorkerWalRcvConn
Definition: worker.c:251
Subscription * MySubscription
Definition: worker.c:253
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1705
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1574
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1538
bool bms_is_member(int x, const Bitmapset *a)
Definition: bitmapset.c:427
Bitmapset * bms_add_member(Bitmapset *a, int x)
Definition: bitmapset.c:738
static Datum values[MAXATTR]
Definition: bootstrap.c:156
#define TextDatumGetCString(d)
Definition: builtins.h:86
unsigned int uint32
Definition: c.h:452
signed short int16
Definition: c.h:439
#define Max(x, y)
Definition: c.h:991
#define UINT64_FORMAT
Definition: c.h:495
#define lengthof(array)
Definition: c.h:745
#define OidIsValid(objectId)
Definition: c.h:721
CopyFromState BeginCopyFrom(ParseState *pstate, Relation rel, Node *whereClause, const char *filename, bool is_program, copy_data_source_cb data_source_cb, List *attnamelist, List *options)
Definition: copyfrom.c:1189
uint64 CopyFrom(CopyFromState cstate)
Definition: copyfrom.c:528
int64 TimestampTz
Definition: timestamp.h:39
void hash_destroy(HTAB *hashp)
Definition: dynahash.c:862
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:954
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:349
int errcode(int sqlerrcode)
Definition: elog.c:693
int errmsg(const char *fmt,...)
Definition: elog.c:904
#define LOG
Definition: elog.h:25
#define DEBUG1
Definition: elog.h:24
#define ERROR
Definition: elog.h:33
#define elog(elevel,...)
Definition: elog.h:218
#define ereport(elevel,...)
Definition: elog.h:143
void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
Definition: execTuples.c:1254
const TupleTableSlotOps TTSOpsMinimalTuple
Definition: execTuples.c:85
TupleTableSlot * MakeSingleTupleTableSlot(TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1238
struct Latch * MyLatch
Definition: globals.c:58
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, Datum *replValues, bool *replIsnull, bool *doReplace)
Definition: heaptuple.c:1113
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
@ HASH_ENTER
Definition: hsearch.h:114
#define HASH_ELEM
Definition: hsearch.h:95
#define HASH_BLOBS
Definition: hsearch.h:97
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
#define MaxTupleAttributeNumber
Definition: htup_details.h:33
void CatalogTupleUpdate(Relation heapRel, ItemPointer otid, HeapTuple tup)
Definition: indexing.c:301
void proc_exit(int code)
Definition: ipc.c:104
int i
Definition: isn.c:73
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:524
void ResetLatch(Latch *latch)
Definition: latch.c:683
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:476
#define WL_SOCKET_READABLE
Definition: latch.h:126
#define WL_TIMEOUT
Definition: latch.h:128
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:130
#define WL_LATCH_SET
Definition: latch.h:125
LogicalRepWorker * logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
Definition: launcher.c:214
void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
Definition: launcher.c:553
void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid, Oid relid)
Definition: launcher.c:265
void logicalrep_worker_wakeup(Oid subid, Oid relid)
Definition: launcher.c:533
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:58
int max_sync_workers_per_subscription
Definition: launcher.c:56
int logicalrep_sync_worker_count(Oid subid)
Definition: launcher.c:663
Assert(fmt[strlen(fmt) - 1] !='\n')
List * lappend(List *list, void *datum)
Definition: list.c:336
void list_free_deep(List *list)
Definition: list.c:1519
void UnlockRelationOid(Oid relid, LOCKMODE lockmode)
Definition: lmgr.c:228
void LockRelationOid(Oid relid, LOCKMODE lockmode)
Definition: lmgr.c:109
#define NoLock
Definition: lockdefs.h:34
#define AccessShareLock
Definition: lockdefs.h:36
#define RowExclusiveLock
Definition: lockdefs.h:38
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3326
char * get_rel_name(Oid relid)
Definition: lsyscache.c:1909
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1196
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1800
@ LW_SHARED
Definition: lwlock.h:105
void pfree(void *pointer)
Definition: mcxt.c:1175
void * palloc0(Size size)
Definition: mcxt.c:1099
MemoryContext CacheMemoryContext
Definition: mcxt.c:51
void * palloc(Size size)
Definition: mcxt.c:1068
#define RESUME_INTERRUPTS()
Definition: miscadmin.h:134
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:121
#define HOLD_INTERRUPTS()
Definition: miscadmin.h:132
char * GetUserNameFromId(Oid roleid, bool noerr)
Definition: miscinit.c:913
Oid GetUserId(void)
Definition: miscinit.c:492
ObjectType get_relkind_objtype(char relkind)
RepOriginId replorigin_by_name(const char *roname, bool missing_ok)
Definition: origin.c:209
RepOriginId replorigin_create(const char *roname)
Definition: origin.c:240
void replorigin_session_setup(RepOriginId node)
Definition: origin.c:1071
void replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
Definition: origin.c:414
RepOriginId replorigin_session_origin
Definition: origin.c:155
void replorigin_advance(RepOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)
Definition: origin.c:872
XLogRecPtr replorigin_session_get_progress(bool flush)
Definition: origin.c:1206
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
ParseState * make_parsestate(ParseState *parentParseState)
Definition: parse_node.c:43
ParseNamespaceItem * addRangeTableEntryForRelation(ParseState *pstate, Relation rel, int lockmode, Alias *alias, bool inh, bool inFromCl)
#define ACL_INSERT
Definition: parsenodes.h:82
int16 attnum
Definition: pg_attribute.h:83
void * arg
NameData relname
Definition: pg_class.h:38
#define NAMEDATALEN
const void size_t len
#define lfirst(lc)
Definition: pg_list.h:169
static int list_length(const List *l)
Definition: pg_list.h:149
#define NIL
Definition: pg_list.h:65
#define for_each_from(cell, lst, N)
Definition: pg_list.h:393
#define linitial(l)
Definition: pg_list.h:174
List * GetSubscriptionNotReadyRelations(Oid subid)
char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
void UpdateSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn)
bool HasSubscriptionRelations(Oid subid)
#define LOGICALREP_TWOPHASE_STATE_DISABLED
#define LOGICALREP_TWOPHASE_STATE_PENDING
#define LOGICALREP_TWOPHASE_STATE_ENABLED
static char * buf
Definition: pg_test_fsync.c:67
long pgstat_report_stat(bool force)
Definition: pgstat.c:565
int pgsocket
Definition: port.h:29
#define snprintf
Definition: port.h:225
#define PGINVALID_SOCKET
Definition: port.h:31
#define DatumGetObjectId(X)
Definition: postgres.h:544
uintptr_t Datum
Definition: postgres.h:411
#define DatumGetBool(X)
Definition: postgres.h:437
#define ObjectIdGetDatum(X)
Definition: postgres.h:551
#define DatumGetChar(X)
Definition: postgres.h:453
#define CharGetDatum(X)
Definition: postgres.h:460
#define DatumGetInt16(X)
Definition: postgres.h:488
#define InvalidOid
Definition: postgres_ext.h:36
unsigned int Oid
Definition: postgres_ext.h:31
static int fd(const char *x, int i)
Definition: preproc-init.c:105
char * quote_literal_cstr(const char *rawstr)
Definition: quote.c:102
#define RelationGetRelid(relation)
Definition: rel.h:488
#define RelationGetDescr(relation)
Definition: rel.h:514
#define RelationGetRelationName(relation)
Definition: rel.h:522
#define RelationGetNamespace(relation)
Definition: rel.h:529
int check_enable_rls(Oid relid, Oid checkAsUser, bool noError)
Definition: rls.c:52
@ RLS_ENABLED
Definition: rls.h:45
const char * quote_identifier(const char *ident)
Definition: ruleutils.c:12189
char * quote_qualified_identifier(const char *qualifier, const char *ident)
Definition: ruleutils.c:12273
Snapshot GetTransactionSnapshot(void)
Definition: snapmgr.c:250
void PopActiveSnapshot(void)
Definition: snapmgr.c:776
void PushActiveSnapshot(Snapshot snap)
Definition: snapmgr.c:682
void InvalidateCatalogSnapshot(void)
Definition: snapmgr.c:456
#define SpinLockRelease(lock)
Definition: spin.h:64
#define SpinLockAcquire(lock)
Definition: spin.h:62
void logicalrep_relmap_update(LogicalRepRelation *remoterel)
Definition: relation.c:157
LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
Definition: relation.c:319
void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
Definition: relation.c:456
#define ERRCODE_DUPLICATE_OBJECT
Definition: streamutil.c:32
StringInfo makeStringInfo(void)
Definition: stringinfo.c:41
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:91
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:176
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
Size keysize
Definition: hsearch.h:75
Size entrysize
Definition: hsearch.h:76
Definition: dynahash.c:220
ItemPointerData t_self
Definition: htup.h:65
Definition: pg_list.h:51
LogicalRepRelation remoterel
LogicalRepRelId remoteid
Definition: logicalproto.h:102
Bitmapset * attkeys
Definition: logicalproto.h:110
XLogRecPtr relstate_lsn
Form_pg_class rd_rel
Definition: rel.h:109
Tuplestorestate * tuplestore
Definition: walreceiver.h:219
TupleDesc tupledesc
Definition: walreceiver.h:220
WalRcvExecStatus status
Definition: walreceiver.h:216
Definition: regguts.h:318
void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok)
#define SearchSysCacheCopy1(cacheId, key1)
Definition: syscache.h:179
@ SUBSCRIPTIONOID
Definition: syscache.h:99
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:167
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39
static List * table_states_not_ready
Definition: tablesync.c:124
bool AllTablesyncsReady(void)
Definition: tablesync.c:1519
static bool wait_for_worker_state_change(char expected_state)
Definition: tablesync.c:222
static bool table_states_valid
Definition: tablesync.c:123
void invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
Definition: tablesync.c:271
static void pg_attribute_noreturn() finish_sync_worker(void)
Definition: tablesync.c:133
static void process_syncing_tables_for_apply(XLogRecPtr current_lsn)
Definition: tablesync.c:365
static void process_syncing_tables_for_sync(XLogRecPtr current_lsn)
Definition: tablesync.c:285
void ReplicationOriginNameForTablesync(Oid suboid, Oid relid, char *originname, int szorgname)
Definition: tablesync.c:1168
static int copy_read_data(void *outbuf, int minread, int maxread)
Definition: tablesync.c:622
void process_syncing_tables(XLogRecPtr current_lsn)
Definition: tablesync.c:590
char * LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
Definition: tablesync.c:1183
static void copy_table(Relation rel)
Definition: tablesync.c:1025
static bool wait_for_relation_state_change(Oid relid, char expected_state)
Definition: tablesync.c:174
static StringInfo copybuf
Definition: tablesync.c:127
static bool FetchTableStates(bool *started_tx)
Definition: tablesync.c:1458
static void fetch_remote_table_info(char *nspname, char *relname, LogicalRepRelation *lrel, List **qual)
Definition: tablesync.c:698
void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, int szslot)
Definition: tablesync.c:1155
static List * make_copy_attnamelist(LogicalRepRelMapEntry *rel)
Definition: tablesync.c:602
void UpdateTwoPhaseState(Oid suboid, char new_state)
Definition: tablesync.c:1544
bool tuplestore_gettupleslot(Tuplestorestate *state, bool forward, bool copy, TupleTableSlot *slot)
Definition: tuplestore.c:1078
int64 tuplestore_tuple_count(Tuplestorestate *state)
Definition: tuplestore.c:546
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:425
static Datum slot_getattr(TupleTableSlot *slot, int attnum, bool *isnull)
Definition: tuptable.h:381
String * makeString(char *str)
Definition: value.c:63
#define strVal(v)
Definition: value.h:72
@ WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE
Definition: wait_event.h:109
@ WAIT_EVENT_LOGICAL_SYNC_DATA
Definition: wait_event.h:108
#define walrcv_create_slot(conn, slotname, temporary, two_phase, snapshot_action, lsn)
Definition: walreceiver.h:426
#define walrcv_connect(conninfo, logical, appname, err)
Definition: walreceiver.h:404
@ WALRCV_OK_COMMAND
Definition: walreceiver.h:201
@ WALRCV_OK_TUPLES
Definition: walreceiver.h:203
@ WALRCV_OK_COPY_OUT
Definition: walreceiver.h:205
static void walrcv_clear_result(WalRcvExecResult *walres)
Definition: walreceiver.h:436
#define walrcv_server_version(conn)
Definition: walreceiver.h:414
#define walrcv_endstreaming(conn, next_tli)
Definition: walreceiver.h:420
#define walrcv_exec(conn, exec, nRetTypes, retTypes)
Definition: walreceiver.h:430
#define walrcv_receive(conn, buffer, wait_fd)
Definition: walreceiver.h:422
@ CRS_USE_SNAPSHOT
Definition: walsender.h:24
static bool am_tablesync_worker(void)
bool IsTransactionState(void)
Definition: xact.c:374
void CommandCounterIncrement(void)
Definition: xact.c:1074
void StartTransactionCommand(void)
Definition: xact.c:2925
void CommitTransactionCommand(void)
Definition: xact.c:3022
uint64 GetSystemIdentifier(void)
Definition: xlog.c:4206
XLogRecPtr GetXLogWriteRecPtr(void)
Definition: xlog.c:8828
int wal_retrieve_retry_interval
Definition: xlog.c:135
void XLogFlush(XLogRecPtr record)
Definition: xlog.c:2509
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
uint16 RepOriginId
Definition: xlogdefs.h:65
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
uint32 TimeLineID
Definition: xlogdefs.h:59