PostgreSQL Source Code  git master
tablesync.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  * tablesync.c
3  * PostgreSQL logical replication: initial table data synchronization
4  *
5  * Copyright (c) 2012-2023, PostgreSQL Global Development Group
6  *
7  * IDENTIFICATION
8  * src/backend/replication/logical/tablesync.c
9  *
10  * NOTES
11  * This file contains code for initial table data synchronization for
12  * logical replication.
13  *
14  * The initial data synchronization is done separately for each table,
15  * in a separate apply worker that only fetches the initial snapshot data
16  * from the publisher and then synchronizes the position in the stream with
17  * the leader apply worker.
18  *
19  * There are several reasons for doing the synchronization this way:
20  * - It allows us to parallelize the initial data synchronization
21  * which lowers the time needed for it to happen.
22  * - The initial synchronization does not have to hold the xid and LSN
23  * for the time it takes to copy data of all tables, causing less
24  * bloat and lower disk consumption compared to doing the
25  * synchronization in a single process for the whole database.
26  * - It allows us to synchronize any tables added after the initial
27  * synchronization has finished.
28  *
29  * The stream position synchronization works in multiple steps:
30  * - Apply worker requests a tablesync worker to start, setting the new
31  * table state to INIT.
32  * - Tablesync worker starts; changes table state from INIT to DATASYNC while
33  * copying.
34  * - Tablesync worker does initial table copy; there is a FINISHEDCOPY (sync
35  * worker specific) state to indicate when the copy phase has completed, so
36  * if the worker crashes with this (non-memory) state then the copy will not
37  * be re-attempted.
38  * - Tablesync worker then sets table state to SYNCWAIT; waits for state change.
39  * - Apply worker periodically checks for tables in SYNCWAIT state. When
40  * any appear, it sets the table state to CATCHUP and starts loop-waiting
41  * until either the table state is set to SYNCDONE or the sync worker
42  * exits.
43  * - After the sync worker has seen the state change to CATCHUP, it will
44  * read the stream and apply changes (acting like an apply worker) until
45  * it catches up to the specified stream position. Then it sets the
46  * state to SYNCDONE. There might be zero changes applied between
47  * CATCHUP and SYNCDONE, because the sync worker might be ahead of the
48  * apply worker.
49  * - Once the state is set to SYNCDONE, the apply will continue tracking
50  * the table until it reaches the SYNCDONE stream position, at which
51  * point it sets state to READY and stops tracking. Again, there might
52  * be zero changes in between.
53  *
54  * So the state progression is always: INIT -> DATASYNC -> FINISHEDCOPY
55  * -> SYNCWAIT -> CATCHUP -> SYNCDONE -> READY.
56  *
57  * The catalog pg_subscription_rel is used to keep information about
58  * subscribed tables and their state. The catalog holds all states
59  * except SYNCWAIT and CATCHUP which are only in shared memory.
60  *
61  * Example flows look like this:
62  * - Apply is in front:
63  * sync:8
64  * -> set in catalog FINISHEDCOPY
65  * -> set in memory SYNCWAIT
66  * apply:10
67  * -> set in memory CATCHUP
68  * -> enter wait-loop
69  * sync:10
70  * -> set in catalog SYNCDONE
71  * -> exit
72  * apply:10
73  * -> exit wait-loop
74  * -> continue rep
75  * apply:11
76  * -> set in catalog READY
77  *
78  * - Sync is in front:
79  * sync:10
80  * -> set in catalog FINISHEDCOPY
81  * -> set in memory SYNCWAIT
82  * apply:8
83  * -> set in memory CATCHUP
84  * -> continue per-table filtering
85  * sync:10
86  * -> set in catalog SYNCDONE
87  * -> exit
88  * apply:10
89  * -> set in catalog READY
90  * -> stop per-table filtering
91  * -> continue rep
92  *-------------------------------------------------------------------------
93  */
94 
95 #include "postgres.h"
96 
97 #include "access/table.h"
98 #include "access/xact.h"
99 #include "catalog/indexing.h"
101 #include "catalog/pg_type.h"
102 #include "commands/copy.h"
103 #include "miscadmin.h"
104 #include "nodes/makefuncs.h"
105 #include "parser/parse_relation.h"
106 #include "pgstat.h"
109 #include "replication/walreceiver.h"
111 #include "replication/slot.h"
112 #include "replication/origin.h"
113 #include "storage/ipc.h"
114 #include "storage/lmgr.h"
115 #include "utils/acl.h"
116 #include "utils/array.h"
117 #include "utils/builtins.h"
118 #include "utils/lsyscache.h"
119 #include "utils/memutils.h"
120 #include "utils/rls.h"
121 #include "utils/snapmgr.h"
122 #include "utils/syscache.h"
123 
124 static bool table_states_valid = false;
126 static bool FetchTableStates(bool *started_tx);
127 
128 static StringInfo copybuf = NULL;
129 
130 /*
131  * Exit routine for synchronization worker.
132  */
133 static void
135 finish_sync_worker(void)
136 {
137  /*
138  * Commit any outstanding transaction. This is the usual case, unless
139  * there was nothing to do for the table.
140  */
141  if (IsTransactionState())
142  {
144  pgstat_report_stat(true);
145  }
146 
147  /* And flush all writes. */
149 
151  ereport(LOG,
152  (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
156 
157  /* Find the leader apply worker and signal it. */
159 
160  /* Stop gracefully */
161  proc_exit(0);
162 }
163 
164 /*
165  * Wait until the relation sync state is set in the catalog to the expected
166  * one; return true when it happens.
167  *
168  * Returns false if the table sync worker or the table itself have
169  * disappeared, or the table state has been reset.
170  *
171  * Currently, this is used in the apply worker when transitioning from
172  * CATCHUP state to SYNCDONE.
173  */
174 static bool
175 wait_for_relation_state_change(Oid relid, char expected_state)
176 {
177  char state;
178 
179  for (;;)
180  {
181  LogicalRepWorker *worker;
182  XLogRecPtr statelsn;
183 
185 
188  relid, &statelsn);
189 
190  if (state == SUBREL_STATE_UNKNOWN)
191  break;
192 
193  if (state == expected_state)
194  return true;
195 
196  /* Check if the sync worker is still running and bail if not. */
197  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
199  false);
200  LWLockRelease(LogicalRepWorkerLock);
201  if (!worker)
202  break;
203 
204  (void) WaitLatch(MyLatch,
207 
209  }
210 
211  return false;
212 }
213 
214 /*
215  * Wait until the apply worker changes the state of our synchronization
216  * worker to the expected one.
217  *
218  * Used when transitioning from SYNCWAIT state to CATCHUP.
219  *
220  * Returns false if the apply worker has disappeared.
221  */
222 static bool
223 wait_for_worker_state_change(char expected_state)
224 {
225  int rc;
226 
227  for (;;)
228  {
229  LogicalRepWorker *worker;
230 
232 
233  /*
234  * Done if already in correct state. (We assume this fetch is atomic
235  * enough to not give a misleading answer if we do it with no lock.)
236  */
237  if (MyLogicalRepWorker->relstate == expected_state)
238  return true;
239 
240  /*
241  * Bail out if the apply worker has died, else signal it we're
242  * waiting.
243  */
244  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
246  InvalidOid, false);
247  if (worker && worker->proc)
249  LWLockRelease(LogicalRepWorkerLock);
250  if (!worker)
251  break;
252 
253  /*
254  * Wait. We expect to get a latch signal back from the apply worker,
255  * but use a timeout in case it dies without sending one.
256  */
257  rc = WaitLatch(MyLatch,
260 
261  if (rc & WL_LATCH_SET)
263  }
264 
265  return false;
266 }
267 
268 /*
269  * Callback from syscache invalidation.
270  */
271 void
273 {
274  table_states_valid = false;
275 }
276 
277 /*
278  * Handle table synchronization cooperation from the synchronization
279  * worker.
280  *
281  * If the sync worker is in CATCHUP state and reached (or passed) the
282  * predetermined synchronization point in the WAL stream, mark the table as
283  * SYNCDONE and finish.
284  */
285 static void
287 {
289 
290  if (MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP &&
291  current_lsn >= MyLogicalRepWorker->relstate_lsn)
292  {
293  TimeLineID tli;
294  char syncslotname[NAMEDATALEN] = {0};
295  char originname[NAMEDATALEN] = {0};
296 
297  MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCDONE;
298  MyLogicalRepWorker->relstate_lsn = current_lsn;
299 
301 
302  /*
303  * UpdateSubscriptionRelState must be called within a transaction.
304  */
305  if (!IsTransactionState())
307 
312 
313  /*
314  * End streaming so that LogRepWorkerWalRcvConn can be used to drop
315  * the slot.
316  */
318 
319  /*
320  * Cleanup the tablesync slot.
321  *
322  * This has to be done after updating the state, because otherwise, if
323  * there is an error while doing the database operations, we won't be
324  * able to roll back the dropped slot.
325  */
328  syncslotname,
329  sizeof(syncslotname));
330 
331  /*
332  * It is important to give an error if we are unable to drop the slot,
333  * otherwise, it won't be dropped till the corresponding subscription
334  * is dropped. So passing missing_ok = false.
335  */
337 
339  pgstat_report_stat(false);
340 
341  /*
342  * Start a new transaction to clean up the tablesync origin tracking.
343  * This transaction will be ended within the finish_sync_worker().
344  * Even if we fail to remove it here, the apply worker will make sure
345  * that it is cleaned up afterward.
346  *
347  * We need to do this after the table state is set to SYNCDONE.
348  * Otherwise, if an error occurs while performing the database
349  * operation, the worker will be restarted and the in-memory state of
350  * replication progress (remote_lsn) won't be rolled-back which would
351  * have been cleared before restart. So, the restarted worker will use
352  * invalid replication progress state resulting in replay of
353  * transactions that have already been applied.
354  */
356 
359  originname,
360  sizeof(originname));
361 
362  /*
363  * Resetting the origin session removes the ownership of the slot.
364  * This is needed to allow the origin to be dropped.
365  */
370 
371  /*
372  * Drop the tablesync's origin tracking if it exists.
373  *
374  * There is a chance that the user is concurrently performing a refresh
375  * for the subscription, where we remove the table state and its origin,
376  * or that the apply worker has already removed this origin. So we pass
377  * missing_ok = true.
378  */
379  replorigin_drop_by_name(originname, true, false);
380 
381  finish_sync_worker();
382  }
383  else
385 }
386 
387 /*
388  * Handle table synchronization cooperation from the apply worker.
389  *
390  * Walk over all subscription tables that are individually tracked by the
391  * apply process (currently, all that have state other than
392  * SUBREL_STATE_READY) and manage synchronization for them.
393  *
394  * If there are tables that need synchronizing and are not being synchronized
395  * yet, start sync workers for them (if there are free slots for sync
396  * workers). To prevent starting the sync worker for the same relation at a
397  * high frequency after a failure, we store its last start time with each sync
398  * state info. We start the sync worker for the same relation after waiting
399  * at least wal_retrieve_retry_interval.
400  *
401  * For tables that are being synchronized already, check if sync workers
402  * either need action from the apply worker or have finished. This is the
403  * SYNCWAIT to CATCHUP transition.
404  *
405  * If the synchronization position is reached (SYNCDONE), then the table can
406  * be marked as READY and is no longer tracked.
407  */
408 static void
410 {
411  struct tablesync_start_time_mapping
412  {
413  Oid relid;
414  TimestampTz last_start_time;
415  };
416  static HTAB *last_start_times = NULL;
417  ListCell *lc;
418  bool started_tx = false;
419  bool should_exit = false;
420 
422 
423  /* We need up-to-date sync state info for subscription tables here. */
424  FetchTableStates(&started_tx);
425 
426  /*
427  * Prepare a hash table for tracking last start times of workers, to avoid
428  * immediate restarts. We don't need it if there are no tables that need
429  * syncing.
430  */
432  {
433  HASHCTL ctl;
434 
435  ctl.keysize = sizeof(Oid);
436  ctl.entrysize = sizeof(struct tablesync_start_time_mapping);
437  last_start_times = hash_create("Logical replication table sync worker start times",
438  256, &ctl, HASH_ELEM | HASH_BLOBS);
439  }
440 
441  /*
442  * Clean up the hash table when we're done with all tables (just to
443  * release the bit of memory).
444  */
446  {
448  last_start_times = NULL;
449  }
450 
451  /*
452  * Process all tables that are being synchronized.
453  */
454  foreach(lc, table_states_not_ready)
455  {
457 
458  if (rstate->state == SUBREL_STATE_SYNCDONE)
459  {
460  /*
461  * Apply has caught up to the position where the table sync has
462  * finished. Mark the table as ready so that the apply will just
463  * continue to replicate it normally.
464  */
465  if (current_lsn >= rstate->lsn)
466  {
467  char originname[NAMEDATALEN];
468 
469  rstate->state = SUBREL_STATE_READY;
470  rstate->lsn = current_lsn;
471  if (!started_tx)
472  {
474  started_tx = true;
475  }
476 
477  /*
478  * Remove the tablesync origin tracking if it exists.
479  *
480  * There is a chance that the user is concurrently performing a
481  * refresh for the subscription, where we remove the table
482  * state and its origin, or that the tablesync worker has
483  * already removed this origin. We can't rely on the tablesync
484  * worker to remove the origin tracking because, if there is an
485  * error while dropping it, we won't restart the worker to drop
486  * the origin. So we pass missing_ok = true.
487  */
489  rstate->relid,
490  originname,
491  sizeof(originname));
492  replorigin_drop_by_name(originname, true, false);
493 
494  /*
495  * Update the state to READY only after the origin cleanup.
496  */
498  rstate->relid, rstate->state,
499  rstate->lsn);
500  }
501  }
502  else
503  {
504  LogicalRepWorker *syncworker;
505 
506  /*
507  * Look for a sync worker for this relation.
508  */
509  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
510 
512  rstate->relid, false);
513 
514  if (syncworker)
515  {
516  /* Found one, update our copy of its state */
517  SpinLockAcquire(&syncworker->relmutex);
518  rstate->state = syncworker->relstate;
519  rstate->lsn = syncworker->relstate_lsn;
520  if (rstate->state == SUBREL_STATE_SYNCWAIT)
521  {
522  /*
523  * Sync worker is waiting for apply. Tell sync worker it
524  * can catchup now.
525  */
526  syncworker->relstate = SUBREL_STATE_CATCHUP;
527  syncworker->relstate_lsn =
528  Max(syncworker->relstate_lsn, current_lsn);
529  }
530  SpinLockRelease(&syncworker->relmutex);
531 
532  /* If we told worker to catch up, wait for it. */
533  if (rstate->state == SUBREL_STATE_SYNCWAIT)
534  {
535  /* Signal the sync worker, as it may be waiting for us. */
536  if (syncworker->proc)
537  logicalrep_worker_wakeup_ptr(syncworker);
538 
539  /* Now safe to release the LWLock */
540  LWLockRelease(LogicalRepWorkerLock);
541 
542  /*
543  * Enter busy loop and wait for synchronization worker to
544  * reach expected state (or die trying).
545  */
546  if (!started_tx)
547  {
549  started_tx = true;
550  }
551 
553  SUBREL_STATE_SYNCDONE);
554  }
555  else
556  LWLockRelease(LogicalRepWorkerLock);
557  }
558  else
559  {
560  /*
561  * If there is no sync worker for this table yet, count
562  * running sync workers for this subscription, while we have
563  * the lock.
564  */
565  int nsyncworkers =
567 
568  /* Now safe to release the LWLock */
569  LWLockRelease(LogicalRepWorkerLock);
570 
571  /*
572  * If there are free sync worker slot(s), start a new sync
573  * worker for the table.
574  */
575  if (nsyncworkers < max_sync_workers_per_subscription)
576  {
578  struct tablesync_start_time_mapping *hentry;
579  bool found;
580 
581  hentry = hash_search(last_start_times, &rstate->relid,
582  HASH_ENTER, &found);
583 
584  if (!found ||
585  TimestampDifferenceExceeds(hentry->last_start_time, now,
587  {
592  rstate->relid,
594  hentry->last_start_time = now;
595  }
596  }
597  }
598  }
599  }
600 
601  if (started_tx)
602  {
603  /*
604  * Even when the two_phase mode is requested by the user, it remains
605  * as 'pending' until all tablesyncs have reached READY state.
606  *
607  * When this happens, we restart the apply worker and (if the
608  * conditions are still ok) then the two_phase tri-state will become
609  * 'enabled' at that time.
610  *
611  * Note: If the subscription has no tables then leave the state as
612  * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
613  * work.
614  */
616  {
617  CommandCounterIncrement(); /* make updates visible */
618  if (AllTablesyncsReady())
619  {
620  ereport(LOG,
621  (errmsg("logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled",
622  MySubscription->name)));
623  should_exit = true;
624  }
625  }
626 
628  pgstat_report_stat(true);
629  }
630 
631  if (should_exit)
632  {
633  /*
634  * Reset the last-start time for this worker so that the launcher will
635  * restart it without waiting for wal_retrieve_retry_interval.
636  */
638 
639  proc_exit(0);
640  }
641 }
642 
643 /*
644  * Process possible state change(s) of tables that are being synchronized.
645  */
646 void
648 {
649  /*
650  * Skip for parallel apply workers because they only operate on tables
651  * that are in a READY state. See pa_can_start() and
652  * should_apply_changes_for_rel().
653  */
655  return;
656 
657  if (am_tablesync_worker())
658  process_syncing_tables_for_sync(current_lsn);
659  else
661 }
662 
663 /*
664  * Create list of columns for COPY based on logical relation mapping.
665  */
666 static List *
668 {
669  List *attnamelist = NIL;
670  int i;
671 
672  for (i = 0; i < rel->remoterel.natts; i++)
673  {
674  attnamelist = lappend(attnamelist,
675  makeString(rel->remoterel.attnames[i]));
676  }
677 
678 
679  return attnamelist;
680 }
681 
682 /*
683  * Data source callback for the COPY FROM, which reads from the remote
684  * connection and passes the data back to our local COPY.
685  */
686 static int
687 copy_read_data(void *outbuf, int minread, int maxread)
688 {
689  int bytesread = 0;
690  int avail;
691 
692  /* If there are some leftover data from previous read, use it. */
693  avail = copybuf->len - copybuf->cursor;
694  if (avail)
695  {
696  if (avail > maxread)
697  avail = maxread;
698  memcpy(outbuf, &copybuf->data[copybuf->cursor], avail);
699  copybuf->cursor += avail;
700  maxread -= avail;
701  bytesread += avail;
702  }
703 
704  while (maxread > 0 && bytesread < minread)
705  {
707  int len;
708  char *buf = NULL;
709 
710  for (;;)
711  {
712  /* Try read the data. */
714 
716 
717  if (len == 0)
718  break;
719  else if (len < 0)
720  return bytesread;
721  else
722  {
723  /* Process the data */
724  copybuf->data = buf;
725  copybuf->len = len;
726  copybuf->cursor = 0;
727 
728  avail = copybuf->len - copybuf->cursor;
729  if (avail > maxread)
730  avail = maxread;
731  memcpy(outbuf, &copybuf->data[copybuf->cursor], avail);
732  outbuf = (void *) ((char *) outbuf + avail);
733  copybuf->cursor += avail;
734  maxread -= avail;
735  bytesread += avail;
736  }
737 
738  if (maxread <= 0 || bytesread >= minread)
739  return bytesread;
740  }
741 
742  /*
743  * Wait for more data or latch.
744  */
745  (void) WaitLatchOrSocket(MyLatch,
749 
751  }
752 
753  return bytesread;
754 }
755 
756 
757 /*
758  * Get information about the remote relation, similar to what the RELATION
759  * message provides during replication. This function also returns the relation
760  * qualifications to be used in the COPY command.
761  */
762 static void
763 fetch_remote_table_info(char *nspname, char *relname,
764  LogicalRepRelation *lrel, List **qual)
765 {
767  StringInfoData cmd;
768  TupleTableSlot *slot;
769  Oid tableRow[] = {OIDOID, CHAROID, CHAROID};
770  Oid attrRow[] = {INT2OID, TEXTOID, OIDOID, BOOLOID};
771  Oid qualRow[] = {TEXTOID};
772  bool isnull;
773  int natt;
774  ListCell *lc;
775  Bitmapset *included_cols = NULL;
776 
777  lrel->nspname = nspname;
778  lrel->relname = relname;
779 
780  /* First fetch Oid and replica identity. */
781  initStringInfo(&cmd);
782  appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind"
783  " FROM pg_catalog.pg_class c"
784  " INNER JOIN pg_catalog.pg_namespace n"
785  " ON (c.relnamespace = n.oid)"
786  " WHERE n.nspname = %s"
787  " AND c.relname = %s",
788  quote_literal_cstr(nspname),
791  lengthof(tableRow), tableRow);
792 
793  if (res->status != WALRCV_OK_TUPLES)
794  ereport(ERROR,
795  (errcode(ERRCODE_CONNECTION_FAILURE),
796  errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
797  nspname, relname, res->err)));
798 
799  slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
800  if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
801  ereport(ERROR,
802  (errcode(ERRCODE_UNDEFINED_OBJECT),
803  errmsg("table \"%s.%s\" not found on publisher",
804  nspname, relname)));
805 
806  lrel->remoteid = DatumGetObjectId(slot_getattr(slot, 1, &isnull));
807  Assert(!isnull);
808  lrel->replident = DatumGetChar(slot_getattr(slot, 2, &isnull));
809  Assert(!isnull);
810  lrel->relkind = DatumGetChar(slot_getattr(slot, 3, &isnull));
811  Assert(!isnull);
812 
815 
816 
817  /*
818  * Get column lists for each relation.
819  *
820  * We need to do this before fetching info about column names and types,
821  * so that we can skip columns that should not be replicated.
822  */
824  {
825  WalRcvExecResult *pubres;
826  TupleTableSlot *tslot;
827  Oid attrsRow[] = {INT2VECTOROID};
828  StringInfoData pub_names;
829 
830  initStringInfo(&pub_names);
831  foreach(lc, MySubscription->publications)
832  {
833  if (foreach_current_index(lc) > 0)
834  appendStringInfoString(&pub_names, ", ");
836  }
837 
838  /*
839  * Fetch info about column lists for the relation (from all the
840  * publications).
841  */
842  resetStringInfo(&cmd);
843  appendStringInfo(&cmd,
844  "SELECT DISTINCT"
845  " (CASE WHEN (array_length(gpt.attrs, 1) = c.relnatts)"
846  " THEN NULL ELSE gpt.attrs END)"
847  " FROM pg_publication p,"
848  " LATERAL pg_get_publication_tables(p.pubname) gpt,"
849  " pg_class c"
850  " WHERE gpt.relid = %u AND c.oid = gpt.relid"
851  " AND p.pubname IN ( %s )",
852  lrel->remoteid,
853  pub_names.data);
854 
856  lengthof(attrsRow), attrsRow);
857 
858  if (pubres->status != WALRCV_OK_TUPLES)
859  ereport(ERROR,
860  (errcode(ERRCODE_CONNECTION_FAILURE),
861  errmsg("could not fetch column list info for table \"%s.%s\" from publisher: %s",
862  nspname, relname, pubres->err)));
863 
864  /*
865  * We don't support the case where the column list is different for
866  * the same table when combining publications. See comments atop
867  * fetch_table_list. So there should be only one row returned.
868  * Although we already checked this when creating the subscription, we
869  * still need to check here in case the column list was changed after
870  * creating the subscription and before the sync worker is started.
871  */
872  if (tuplestore_tuple_count(pubres->tuplestore) > 1)
873  ereport(ERROR,
874  errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
875  errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
876  nspname, relname));
877 
878  /*
879  * Get the column list and build a single bitmap with the attnums.
880  *
881  * If we find a NULL value, it means all the columns should be
882  * replicated.
883  */
885  if (tuplestore_gettupleslot(pubres->tuplestore, true, false, tslot))
886  {
887  Datum cfval = slot_getattr(tslot, 1, &isnull);
888 
889  if (!isnull)
890  {
891  ArrayType *arr;
892  int nelems;
893  int16 *elems;
894 
895  arr = DatumGetArrayTypeP(cfval);
896  nelems = ARR_DIMS(arr)[0];
897  elems = (int16 *) ARR_DATA_PTR(arr);
898 
899  for (natt = 0; natt < nelems; natt++)
900  included_cols = bms_add_member(included_cols, elems[natt]);
901  }
902 
903  ExecClearTuple(tslot);
904  }
906 
907  walrcv_clear_result(pubres);
908 
909  pfree(pub_names.data);
910  }
911 
912  /*
913  * Now fetch column names and types.
914  */
915  resetStringInfo(&cmd);
916  appendStringInfo(&cmd,
917  "SELECT a.attnum,"
918  " a.attname,"
919  " a.atttypid,"
920  " a.attnum = ANY(i.indkey)"
921  " FROM pg_catalog.pg_attribute a"
922  " LEFT JOIN pg_catalog.pg_index i"
923  " ON (i.indexrelid = pg_get_replica_identity_index(%u))"
924  " WHERE a.attnum > 0::pg_catalog.int2"
925  " AND NOT a.attisdropped %s"
926  " AND a.attrelid = %u"
927  " ORDER BY a.attnum",
928  lrel->remoteid,
930  "AND a.attgenerated = ''" : ""),
931  lrel->remoteid);
933  lengthof(attrRow), attrRow);
934 
935  if (res->status != WALRCV_OK_TUPLES)
936  ereport(ERROR,
937  (errcode(ERRCODE_CONNECTION_FAILURE),
938  errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
939  nspname, relname, res->err)));
940 
941  /* We don't know the number of rows coming, so allocate enough space. */
942  lrel->attnames = palloc0(MaxTupleAttributeNumber * sizeof(char *));
943  lrel->atttyps = palloc0(MaxTupleAttributeNumber * sizeof(Oid));
944  lrel->attkeys = NULL;
945 
946  /*
947  * Store the columns as a list of names. Ignore those that are not
948  * present in the column list, if there is one.
949  */
950  natt = 0;
951  slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
952  while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
953  {
954  char *rel_colname;
956 
957  attnum = DatumGetInt16(slot_getattr(slot, 1, &isnull));
958  Assert(!isnull);
959 
960  /* If the column is not in the column list, skip it. */
961  if (included_cols != NULL && !bms_is_member(attnum, included_cols))
962  {
963  ExecClearTuple(slot);
964  continue;
965  }
966 
967  rel_colname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
968  Assert(!isnull);
969 
970  lrel->attnames[natt] = rel_colname;
971  lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 3, &isnull));
972  Assert(!isnull);
973 
974  if (DatumGetBool(slot_getattr(slot, 4, &isnull)))
975  lrel->attkeys = bms_add_member(lrel->attkeys, natt);
976 
977  /* Should never happen. */
978  if (++natt >= MaxTupleAttributeNumber)
979  elog(ERROR, "too many columns in remote table \"%s.%s\"",
980  nspname, relname);
981 
982  ExecClearTuple(slot);
983  }
985 
986  lrel->natts = natt;
987 
989 
990  /*
991  * Get relation's row filter expressions. DISTINCT avoids the same
992  * expression of a table in multiple publications from being included
993  * multiple times in the final expression.
994  *
995  * We need to copy the row even if it matches just one of the
996  * publications, so we later combine all the quals with OR.
997  *
998  * For initial synchronization, row filtering can be ignored in the
999  * following cases:
1000  *
1001  * 1) one of the subscribed publications for the table hasn't specified
1002  * any row filter
1003  *
1004  * 2) one of the subscribed publications has puballtables set to true
1005  *
1006  * 3) one of the subscribed publications is declared as TABLES IN SCHEMA
1007  * that includes this relation
1008  */
1010  {
1011  StringInfoData pub_names;
1012 
1013  /* Build the pubname list. */
1014  initStringInfo(&pub_names);
1015  foreach(lc, MySubscription->publications)
1016  {
1017  char *pubname = strVal(lfirst(lc));
1018 
1019  if (foreach_current_index(lc) > 0)
1020  appendStringInfoString(&pub_names, ", ");
1021 
1022  appendStringInfoString(&pub_names, quote_literal_cstr(pubname));
1023  }
1024 
1025  /* Check for row filters. */
1026  resetStringInfo(&cmd);
1027  appendStringInfo(&cmd,
1028  "SELECT DISTINCT pg_get_expr(gpt.qual, gpt.relid)"
1029  " FROM pg_publication p,"
1030  " LATERAL pg_get_publication_tables(p.pubname) gpt"
1031  " WHERE gpt.relid = %u"
1032  " AND p.pubname IN ( %s )",
1033  lrel->remoteid,
1034  pub_names.data);
1035 
1036  res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 1, qualRow);
1037 
1038  if (res->status != WALRCV_OK_TUPLES)
1039  ereport(ERROR,
1040  (errmsg("could not fetch table WHERE clause info for table \"%s.%s\" from publisher: %s",
1041  nspname, relname, res->err)));
1042 
1043  /*
1044  * Multiple row filter expressions for the same table will be combined
1045  * by COPY using OR. If any of the filter expressions for this table
1046  * are null, it means the whole table will be copied. In this case it
1047  * is not necessary to construct a unified row filter expression at
1048  * all.
1049  */
1050  slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
1051  while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
1052  {
1053  Datum rf = slot_getattr(slot, 1, &isnull);
1054 
1055  if (!isnull)
1056  *qual = lappend(*qual, makeString(TextDatumGetCString(rf)));
1057  else
1058  {
1059  /* Ignore filters and cleanup as necessary. */
1060  if (*qual)
1061  {
1062  list_free_deep(*qual);
1063  *qual = NIL;
1064  }
1065  break;
1066  }
1067 
1068  ExecClearTuple(slot);
1069  }
1071 
1073  }
1074 
1075  pfree(cmd.data);
1076 }
1077 
1078 /*
1079  * Copy existing data of a table from publisher.
1080  *
1081  * Caller is responsible for locking the local relation.
1082  */
1083 static void
1085 {
1086  LogicalRepRelMapEntry *relmapentry;
1087  LogicalRepRelation lrel;
1088  List *qual = NIL;
1090  StringInfoData cmd;
1091  CopyFromState cstate;
1092  List *attnamelist;
1093  ParseState *pstate;
1094  List *options = NIL;
1095 
1096  /* Get the publisher relation info. */
1098  RelationGetRelationName(rel), &lrel, &qual);
1099 
1100  /* Put the relation into relmap. */
1101  logicalrep_relmap_update(&lrel);
1102 
1103  /* Map the publisher relation to local one. */
1104  relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock);
1105  Assert(rel == relmapentry->localrel);
1106 
1107  /* Start copy on the publisher. */
1108  initStringInfo(&cmd);
1109 
1110  /* Regular table with no row filter */
1111  if (lrel.relkind == RELKIND_RELATION && qual == NIL)
1112  {
1113  appendStringInfo(&cmd, "COPY %s (",
1115 
1116  /*
1117  * XXX Do we need to list the columns in all cases? Maybe we're
1118  * replicating all columns?
1119  */
1120  for (int i = 0; i < lrel.natts; i++)
1121  {
1122  if (i > 0)
1123  appendStringInfoString(&cmd, ", ");
1124 
1126  }
1127 
1128  appendStringInfoString(&cmd, ") TO STDOUT");
1129  }
1130  else
1131  {
1132  /*
1133  * For non-tables and tables with row filters, we need to do COPY
1134  * (SELECT ...), but we can't just do SELECT * because we need to not
1135  * copy generated columns. For tables with any row filters, build a
1136  * SELECT query with OR'ed row filters for COPY.
1137  */
1138  appendStringInfoString(&cmd, "COPY (SELECT ");
1139  for (int i = 0; i < lrel.natts; i++)
1140  {
1142  if (i < lrel.natts - 1)
1143  appendStringInfoString(&cmd, ", ");
1144  }
1145 
1146  appendStringInfoString(&cmd, " FROM ");
1147 
1148  /*
1149  * For regular tables, make sure we don't copy data from a child that
1150  * inherits the named table as those will be copied separately.
1151  */
1152  if (lrel.relkind == RELKIND_RELATION)
1153  appendStringInfoString(&cmd, "ONLY ");
1154 
1156  /* list of OR'ed filters */
1157  if (qual != NIL)
1158  {
1159  ListCell *lc;
1160  char *q = strVal(linitial(qual));
1161 
1162  appendStringInfo(&cmd, " WHERE %s", q);
1163  for_each_from(lc, qual, 1)
1164  {
1165  q = strVal(lfirst(lc));
1166  appendStringInfo(&cmd, " OR %s", q);
1167  }
1168  list_free_deep(qual);
1169  }
1170 
1171  appendStringInfoString(&cmd, ") TO STDOUT");
1172  }
1173 
1174  /*
1175  * Prior to v16, initial table synchronization will use text format even
1176  * if the binary option is enabled for a subscription.
1177  */
1180  {
1181  appendStringInfoString(&cmd, " WITH (FORMAT binary)");
1182  options = list_make1(makeDefElem("format",
1183  (Node *) makeString("binary"), -1));
1184  }
1185 
1186  res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL);
1187  pfree(cmd.data);
1188  if (res->status != WALRCV_OK_COPY_OUT)
1189  ereport(ERROR,
1190  (errcode(ERRCODE_CONNECTION_FAILURE),
1191  errmsg("could not start initial contents copy for table \"%s.%s\": %s",
1192  lrel.nspname, lrel.relname, res->err)));
1194 
1195  copybuf = makeStringInfo();
1196 
1197  pstate = make_parsestate(NULL);
1198  (void) addRangeTableEntryForRelation(pstate, rel, AccessShareLock,
1199  NULL, false, false);
1200 
1201  attnamelist = make_copy_attnamelist(relmapentry);
1202  cstate = BeginCopyFrom(pstate, rel, NULL, NULL, false, copy_read_data, attnamelist, options);
1203 
1204  /* Do the copy */
1205  (void) CopyFrom(cstate);
1206 
1207  logicalrep_rel_close(relmapentry, NoLock);
1208 }
1209 
1210 /*
1211  * Determine the tablesync slot name.
1212  *
1213  * The name must not exceed NAMEDATALEN - 1 because of remote node constraints
1214  * on slot name length. We append system_identifier to avoid slot_name
1215  * collision with subscriptions in other clusters. With the current scheme
1216  * pg_%u_sync_%u_UINT64_FORMAT (3 + 10 + 6 + 10 + 20 + '\0'), the maximum
1217  * length of slot_name will be 50.
1218  *
1219  * The returned slot name is stored in the supplied buffer (syncslotname) with
1220  * the given size.
1221  *
1222  * Note: We don't use the subscription slot name as part of tablesync slot name
1223  * because we are responsible for cleaning up these slots and it could become
1224  * impossible to recalculate what name to cleanup if the subscription slot name
1225  * had changed.
1226  */
1227 void
1229  char *syncslotname, Size szslot)
1230 {
1231  snprintf(syncslotname, szslot, "pg_%u_sync_%u_" UINT64_FORMAT, suboid,
1232  relid, GetSystemIdentifier());
1233 }
1234 
1235 /*
1236  * Start syncing the table in the sync worker.
1237  *
1238  * If nothing needs to be done to sync the table, we exit the worker without
1239  * any further action.
1240  *
1241  * The returned slot name is palloc'ed in current memory context.
1242  */
1243 char *
1245 {
1246  char *slotname;
1247  char *err;
1248  char relstate;
1249  XLogRecPtr relstate_lsn;
1250  Relation rel;
1251  AclResult aclresult;
1253  char originname[NAMEDATALEN];
1254  RepOriginId originid;
1255 
1256  /* Check the state of the table synchronization. */
1260  &relstate_lsn);
1262 
1264  MyLogicalRepWorker->relstate = relstate;
1265  MyLogicalRepWorker->relstate_lsn = relstate_lsn;
1267 
1268  /*
1269  * If synchronization is already done or no longer necessary, exit now
1270  * that we've updated shared memory state.
1271  */
1272  switch (relstate)
1273  {
1274  case SUBREL_STATE_SYNCDONE:
1275  case SUBREL_STATE_READY:
1276  case SUBREL_STATE_UNKNOWN:
1277  finish_sync_worker(); /* doesn't return */
1278  }
1279 
1280  /* Calculate the name of the tablesync slot. */
1281  slotname = (char *) palloc(NAMEDATALEN);
1284  slotname,
1285  NAMEDATALEN);
1286 
1287  /*
1288  * Here we use the slot name instead of the subscription name as the
1289  * application_name, so that it is different from the leader apply worker,
1290  * so that synchronous replication can distinguish them.
1291  */
1293  walrcv_connect(MySubscription->conninfo, true, slotname, &err);
1294  if (LogRepWorkerWalRcvConn == NULL)
1295  ereport(ERROR,
1296  (errcode(ERRCODE_CONNECTION_FAILURE),
1297  errmsg("could not connect to the publisher: %s", err)));
1298 
1299  Assert(MyLogicalRepWorker->relstate == SUBREL_STATE_INIT ||
1300  MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC ||
1301  MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY);
1302 
1303  /* Assign the origin tracking record name. */
1306  originname,
1307  sizeof(originname));
1308 
1309  if (MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC)
1310  {
1311  /*
1312  * We have previously errored out before finishing the copy so the
1313  * replication slot might exist. We want to remove the slot if it
1314  * already exists and proceed.
1315  *
1316  * XXX We could also instead try to drop the slot, last time we failed
1317  * but for that, we might need to clean up the copy state as it might
1318  * be in the middle of fetching the rows. Also, if there is a network
1319  * breakdown then it wouldn't have succeeded so trying it next time
1320  * seems like a better bet.
1321  */
1323  }
1324  else if (MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY)
1325  {
1326  /*
1327  * The COPY phase was previously done, but tablesync then crashed
1328  * before it was able to finish normally.
1329  */
1331 
1332  /*
1333  * The origin tracking name must already exist. It was created first
1334  * time this tablesync was launched.
1335  */
1336  originid = replorigin_by_name(originname, false);
1337  replorigin_session_setup(originid, 0);
1338  replorigin_session_origin = originid;
1339  *origin_startpos = replorigin_session_get_progress(false);
1340 
1342 
1343  goto copy_table_done;
1344  }
1345 
1347  MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC;
1350 
1351  /* Update the state and make it visible to others. */
1358  pgstat_report_stat(true);
1359 
1361 
1362  /*
1363  * Use a standard write lock here. It might be better to disallow access
1364  * to the table while it's being synchronized. But we don't want to block
1365  * the main apply process from working and it has to open the relation in
1366  * RowExclusiveLock when remapping remote relation id to local one.
1367  */
1369 
1370  /*
1371  * Check that our table sync worker has permission to insert into the
1372  * target table.
1373  */
1374  aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(),
1375  ACL_INSERT);
1376  if (aclresult != ACLCHECK_OK)
1377  aclcheck_error(aclresult,
1378  get_relkind_objtype(rel->rd_rel->relkind),
1380 
1381  /*
1382  * COPY FROM does not honor RLS policies. That is not a problem for
1383  * subscriptions owned by roles with BYPASSRLS privilege (or superuser,
1384  * who has it implicitly), but other roles should not be able to
1385  * circumvent RLS. Disallow logical replication into RLS enabled
1386  * relations for such roles.
1387  */
1389  ereport(ERROR,
1390  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1391  errmsg("user \"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
1392  GetUserNameFromId(GetUserId(), true),
1393  RelationGetRelationName(rel))));
1394 
1395  /*
1396  * Start a transaction in the remote node in REPEATABLE READ mode. This
1397  * ensures that both the replication slot we create (see below) and the
1398  * COPY are consistent with each other.
1399  */
1401  "BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ",
1402  0, NULL);
1403  if (res->status != WALRCV_OK_COMMAND)
1404  ereport(ERROR,
1405  (errcode(ERRCODE_CONNECTION_FAILURE),
1406  errmsg("table copy could not start transaction on publisher: %s",
1407  res->err)));
1409 
1410  /*
1411  * Create a new permanent logical decoding slot. This slot will be used
1412  * for the catchup phase after COPY is done, so tell it to use the
1413  * snapshot to make the final data consistent.
1414  */
1416  slotname, false /* permanent */ , false /* two_phase */ ,
1417  CRS_USE_SNAPSHOT, origin_startpos);
1418 
1419  /*
1420  * Setup replication origin tracking. The purpose of doing this before the
1421  * copy is to avoid doing the copy again due to any error in setting up
1422  * origin tracking.
1423  */
1424  originid = replorigin_by_name(originname, true);
1425  if (!OidIsValid(originid))
1426  {
1427  /*
1428  * Origin tracking does not exist, so create it now.
1429  *
1430  * Then advance to the LSN got from walrcv_create_slot. This is WAL
1431  * logged for the purpose of recovery. Locks are to prevent the
1432  * replication origin from vanishing while advancing.
1433  */
1434  originid = replorigin_create(originname);
1435 
1436  LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1437  replorigin_advance(originid, *origin_startpos, InvalidXLogRecPtr,
1438  true /* go backward */ , true /* WAL log */ );
1439  UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1440 
1441  replorigin_session_setup(originid, 0);
1442  replorigin_session_origin = originid;
1443  }
1444  else
1445  {
1446  ereport(ERROR,
1448  errmsg("replication origin \"%s\" already exists",
1449  originname)));
1450  }
1451 
1452  /* Now do the initial data copy */
1454  copy_table(rel);
1456 
1457  res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL);
1458  if (res->status != WALRCV_OK_COMMAND)
1459  ereport(ERROR,
1460  (errcode(ERRCODE_CONNECTION_FAILURE),
1461  errmsg("table copy could not finish transaction on publisher: %s",
1462  res->err)));
1464 
1465  table_close(rel, NoLock);
1466 
1467  /* Make the copy visible. */
1469 
1470  /*
1471  * Update the persisted state to indicate the COPY phase is done; make it
1472  * visible to others.
1473  */
1476  SUBREL_STATE_FINISHEDCOPY,
1478 
1480 
1481 copy_table_done:
1482 
1483  elog(DEBUG1,
1484  "LogicalRepSyncTableStart: '%s' origin_startpos lsn %X/%X",
1485  originname, LSN_FORMAT_ARGS(*origin_startpos));
1486 
1487  /*
1488  * We are done with the initial data synchronization, update the state.
1489  */
1491  MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCWAIT;
1492  MyLogicalRepWorker->relstate_lsn = *origin_startpos;
1494 
1495  /*
1496  * Finally, wait until the leader apply worker tells us to catch up and
1497  * then return to let LogicalRepApplyLoop do it.
1498  */
1499  wait_for_worker_state_change(SUBREL_STATE_CATCHUP);
1500  return slotname;
1501 }
1502 
1503 /*
1504  * Common code to fetch the up-to-date sync state info into the static lists.
1505  *
1506  * Returns true if subscription has 1 or more tables, else false.
1507  *
1508  * Note: If this function started the transaction (indicated by the parameter)
1509  * then it is the caller's responsibility to commit it.
1510  */
1511 static bool
1512 FetchTableStates(bool *started_tx)
1513 {
1514  static bool has_subrels = false;
1515 
1516  *started_tx = false;
1517 
1518  if (!table_states_valid)
1519  {
1520  MemoryContext oldctx;
1521  List *rstates;
1522  ListCell *lc;
1523  SubscriptionRelState *rstate;
1524 
1525  /* Clean the old lists. */
1528 
1529  if (!IsTransactionState())
1530  {
1532  *started_tx = true;
1533  }
1534 
1535  /* Fetch all non-ready tables. */
1536  rstates = GetSubscriptionRelations(MySubscription->oid, true);
1537 
1538  /* Allocate the tracking info in a permanent memory context. */
1540  foreach(lc, rstates)
1541  {
1542  rstate = palloc(sizeof(SubscriptionRelState));
1543  memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState));
1545  }
1546  MemoryContextSwitchTo(oldctx);
1547 
1548  /*
1549  * Does the subscription have tables?
1550  *
1551  * If there were not-READY relations found then we know it does. But
1552  * if table_state_not_ready was empty we still need to check again to
1553  * see if there are 0 tables.
1554  */
1555  has_subrels = (table_states_not_ready != NIL) ||
1557 
1558  table_states_valid = true;
1559  }
1560 
1561  return has_subrels;
1562 }
1563 
1564 /*
1565  * If the subscription has no tables then return false.
1566  *
1567  * Otherwise, are all tablesyncs READY?
1568  *
1569  * Note: This function is not suitable to be called from outside of apply or
1570  * tablesync workers because MySubscription needs to be already initialized.
1571  */
1572 bool
1574 {
1575  bool started_tx = false;
1576  bool has_subrels = false;
1577 
1578  /* We need up-to-date sync state info for subscription tables here. */
1579  has_subrels = FetchTableStates(&started_tx);
1580 
1581  if (started_tx)
1582  {
1584  pgstat_report_stat(true);
1585  }
1586 
1587  /*
1588  * Return false when there are no tables in subscription or not all tables
1589  * are in ready state; true otherwise.
1590  */
1591  return has_subrels && (table_states_not_ready == NIL);
1592 }
1593 
1594 /*
1595  * Update the two_phase state of the specified subscription in pg_subscription.
1596  */
1597 void
1598 UpdateTwoPhaseState(Oid suboid, char new_state)
1599 {
1600  Relation rel;
1601  HeapTuple tup;
1602  bool nulls[Natts_pg_subscription];
1603  bool replaces[Natts_pg_subscription];
1604  Datum values[Natts_pg_subscription];
1605 
1607  new_state == LOGICALREP_TWOPHASE_STATE_PENDING ||
1608  new_state == LOGICALREP_TWOPHASE_STATE_ENABLED);
1609 
1610  rel = table_open(SubscriptionRelationId, RowExclusiveLock);
1612  if (!HeapTupleIsValid(tup))
1613  elog(ERROR,
1614  "cache lookup failed for subscription oid %u",
1615  suboid);
1616 
1617  /* Form a new tuple. */
1618  memset(values, 0, sizeof(values));
1619  memset(nulls, false, sizeof(nulls));
1620  memset(replaces, false, sizeof(replaces));
1621 
1622  /* And update/set two_phase state */
1623  values[Anum_pg_subscription_subtwophasestate - 1] = CharGetDatum(new_state);
1624  replaces[Anum_pg_subscription_subtwophasestate - 1] = true;
1625 
1626  tup = heap_modify_tuple(tup, RelationGetDescr(rel),
1627  values, nulls, replaces);
1628  CatalogTupleUpdate(rel, &tup->t_self, tup);
1629 
1630  heap_freetuple(tup);
1632 }
AclResult
Definition: acl.h:182
@ ACLCHECK_OK
Definition: acl.h:183
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
Definition: aclchk.c:2679
AclResult pg_class_aclcheck(Oid table_oid, Oid roleid, AclMode mode)
Definition: aclchk.c:3931
#define ARR_DATA_PTR(a)
Definition: array.h:315
#define DatumGetArrayTypeP(X)
Definition: array.h:254
#define ARR_DIMS(a)
Definition: array.h:287
int16 AttrNumber
Definition: attnum.h:21
void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, char *originname, Size szoriginname)
Definition: worker.c:457
WalReceiverConn * LogRepWorkerWalRcvConn
Definition: worker.c:312
Subscription * MySubscription
Definition: worker.c:314
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1727
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1582
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1546
bool bms_is_member(int x, const Bitmapset *a)
Definition: bitmapset.c:444
Bitmapset * bms_add_member(Bitmapset *a, int x)
Definition: bitmapset.c:755
static Datum values[MAXATTR]
Definition: bootstrap.c:156
#define TextDatumGetCString(d)
Definition: builtins.h:95
unsigned int uint32
Definition: c.h:490
signed short int16
Definition: c.h:477
#define Max(x, y)
Definition: c.h:982
#define UINT64_FORMAT
Definition: c.h:533
#define lengthof(array)
Definition: c.h:772
#define OidIsValid(objectId)
Definition: c.h:759
size_t Size
Definition: c.h:589
CopyFromState BeginCopyFrom(ParseState *pstate, Relation rel, Node *whereClause, const char *filename, bool is_program, copy_data_source_cb data_source_cb, List *attnamelist, List *options)
Definition: copyfrom.c:1334
uint64 CopyFrom(CopyFromState cstate)
Definition: copyfrom.c:632
int64 TimestampTz
Definition: timestamp.h:39
#define DSM_HANDLE_INVALID
Definition: dsm_impl.h:58
void hash_destroy(HTAB *hashp)
Definition: dynahash.c:863
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:953
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:350
int errcode(int sqlerrcode)
Definition: elog.c:858
int errmsg(const char *fmt,...)
Definition: elog.c:1069
#define LOG
Definition: elog.h:31
#define DEBUG1
Definition: elog.h:30
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:149
void err(int eval, const char *fmt,...)
Definition: err.c:43
void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
Definition: execTuples.c:1254
const TupleTableSlotOps TTSOpsMinimalTuple
Definition: execTuples.c:85
TupleTableSlot * MakeSingleTupleTableSlot(TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1238
struct Latch * MyLatch
Definition: globals.c:58
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, Datum *replValues, bool *replIsnull, bool *doReplace)
Definition: heaptuple.c:1113
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
@ HASH_ENTER
Definition: hsearch.h:114
#define HASH_ELEM
Definition: hsearch.h:95
#define HASH_BLOBS
Definition: hsearch.h:97
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
#define MaxTupleAttributeNumber
Definition: htup_details.h:34
void CatalogTupleUpdate(Relation heapRel, ItemPointer otid, HeapTuple tup)
Definition: indexing.c:313
void proc_exit(int code)
Definition: ipc.c:104
int i
Definition: isn.c:73
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:540
void ResetLatch(Latch *latch)
Definition: latch.c:699
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:492
#define WL_SOCKET_READABLE
Definition: latch.h:126
#define WL_TIMEOUT
Definition: latch.h:128
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:130
#define WL_LATCH_SET
Definition: latch.h:125
LogicalRepWorker * logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
Definition: launcher.c:249
void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
Definition: launcher.c:663
bool logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid, Oid relid, dsm_handle subworker_dsm)
Definition: launcher.c:306
void logicalrep_worker_wakeup(Oid subid, Oid relid)
Definition: launcher.c:643
static dshash_table * last_start_times
Definition: launcher.c:95
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:61
int max_sync_workers_per_subscription
Definition: launcher.c:58
int logicalrep_sync_worker_count(Oid subid)
Definition: launcher.c:811
void ApplyLauncherForgetWorkerStartTime(Oid subid)
Definition: launcher.c:1031
Assert(fmt[strlen(fmt) - 1] !='\n')
List * lappend(List *list, void *datum)
Definition: list.c:338
void list_free_deep(List *list)
Definition: list.c:1559
void UnlockRelationOid(Oid relid, LOCKMODE lockmode)
Definition: lmgr.c:228
void LockRelationOid(Oid relid, LOCKMODE lockmode)
Definition: lmgr.c:109
#define NoLock
Definition: lockdefs.h:34
#define AccessShareLock
Definition: lockdefs.h:36
#define RowExclusiveLock
Definition: lockdefs.h:38
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3331
char * get_rel_name(Oid relid)
Definition: lsyscache.c:1910
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1195
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1803
@ LW_SHARED
Definition: lwlock.h:116
DefElem * makeDefElem(char *name, Node *arg, int location)
Definition: makefuncs.c:548
void pfree(void *pointer)
Definition: mcxt.c:1436
void * palloc0(Size size)
Definition: mcxt.c:1241
MemoryContext CacheMemoryContext
Definition: mcxt.c:144
void * palloc(Size size)
Definition: mcxt.c:1210
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:121
char * GetUserNameFromId(Oid roleid, bool noerr)
Definition: miscinit.c:984
Oid GetUserId(void)
Definition: miscinit.c:510
ObjectType get_relkind_objtype(char relkind)
TimestampTz replorigin_session_origin_timestamp
Definition: origin.c:158
RepOriginId replorigin_by_name(const char *roname, bool missing_ok)
Definition: origin.c:221
RepOriginId replorigin_create(const char *roname)
Definition: origin.c:252
void replorigin_session_reset(void)
Definition: origin.c:1187
void replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
Definition: origin.c:411
RepOriginId replorigin_session_origin
Definition: origin.c:156
void replorigin_advance(RepOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)
Definition: origin.c:888
void replorigin_session_setup(RepOriginId node, int acquired_by)
Definition: origin.c:1095
XLogRecPtr replorigin_session_get_progress(bool flush)
Definition: origin.c:1234
XLogRecPtr replorigin_session_origin_lsn
Definition: origin.c:157
#define InvalidRepOriginId
Definition: origin.h:33
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:138
ParseState * make_parsestate(ParseState *parentParseState)
Definition: parse_node.c:44
ParseNamespaceItem * addRangeTableEntryForRelation(ParseState *pstate, Relation rel, int lockmode, Alias *alias, bool inh, bool inFromCl)
#define ACL_INSERT
Definition: parsenodes.h:83
int16 attnum
Definition: pg_attribute.h:83
void * arg
NameData relname
Definition: pg_class.h:38
#define NAMEDATALEN
const void size_t len
#define lfirst(lc)
Definition: pg_list.h:172
#define NIL
Definition: pg_list.h:68
#define list_make1(x1)
Definition: pg_list.h:212
#define for_each_from(cell, lst, N)
Definition: pg_list.h:414
#define linitial(l)
Definition: pg_list.h:178
#define foreach_current_index(cell)
Definition: pg_list.h:403
List * GetSubscriptionRelations(Oid subid, bool not_ready)
char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
void UpdateSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn)
bool HasSubscriptionRelations(Oid subid)
#define LOGICALREP_TWOPHASE_STATE_DISABLED
#define LOGICALREP_TWOPHASE_STATE_PENDING
#define LOGICALREP_TWOPHASE_STATE_ENABLED
static char * buf
Definition: pg_test_fsync.c:67
long pgstat_report_stat(bool force)
Definition: pgstat.c:575
int pgsocket
Definition: port.h:29
#define snprintf
Definition: port.h:238
#define PGINVALID_SOCKET
Definition: port.h:31
static bool DatumGetBool(Datum X)
Definition: postgres.h:90
uintptr_t Datum
Definition: postgres.h:64
static Oid DatumGetObjectId(Datum X)
Definition: postgres.h:242
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:252
static char DatumGetChar(Datum X)
Definition: postgres.h:112
static int16 DatumGetInt16(Datum X)
Definition: postgres.h:162
static Datum CharGetDatum(char X)
Definition: postgres.h:122
#define InvalidOid
Definition: postgres_ext.h:36
unsigned int Oid
Definition: postgres_ext.h:31
static int fd(const char *x, int i)
Definition: preproc-init.c:105
char * quote_literal_cstr(const char *rawstr)
Definition: quote.c:103
#define RelationGetRelid(relation)
Definition: rel.h:503
#define RelationGetDescr(relation)
Definition: rel.h:529
#define RelationGetRelationName(relation)
Definition: rel.h:537
#define RelationGetNamespace(relation)
Definition: rel.h:544
int check_enable_rls(Oid relid, Oid checkAsUser, bool noError)
Definition: rls.c:52
@ RLS_ENABLED
Definition: rls.h:45
const char * quote_identifier(const char *ident)
Definition: ruleutils.c:11551
char * quote_qualified_identifier(const char *qualifier, const char *ident)
Definition: ruleutils.c:11635
Snapshot GetTransactionSnapshot(void)
Definition: snapmgr.c:251
void PushActiveSnapshot(Snapshot snapshot)
Definition: snapmgr.c:683
void PopActiveSnapshot(void)
Definition: snapmgr.c:778
void InvalidateCatalogSnapshot(void)
Definition: snapmgr.c:457
#define SpinLockRelease(lock)
Definition: spin.h:64
#define SpinLockAcquire(lock)
Definition: spin.h:62
void logicalrep_relmap_update(LogicalRepRelation *remoterel)
Definition: relation.c:162
LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
Definition: relation.c:325
void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
Definition: relation.c:471
#define ERRCODE_DUPLICATE_OBJECT
Definition: streamutil.c:32
StringInfo makeStringInfo(void)
Definition: stringinfo.c:41
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:91
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:176
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
Size keysize
Definition: hsearch.h:75
Size entrysize
Definition: hsearch.h:76
Definition: dynahash.c:220
ItemPointerData t_self
Definition: htup.h:65
Definition: pg_list.h:54
LogicalRepRelation remoterel
LogicalRepRelId remoteid
Definition: logicalproto.h:107
Bitmapset * attkeys
Definition: logicalproto.h:115
XLogRecPtr relstate_lsn
Definition: nodes.h:129
Form_pg_class rd_rel
Definition: rel.h:110
Tuplestorestate * tuplestore
Definition: walreceiver.h:223
TupleDesc tupledesc
Definition: walreceiver.h:224
WalRcvExecStatus status
Definition: walreceiver.h:220
Definition: regguts.h:318
void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok)
#define SearchSysCacheCopy1(cacheId, key1)
Definition: syscache.h:179
@ SUBSCRIPTIONOID
Definition: syscache.h:99
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:40
static List * table_states_not_ready
Definition: tablesync.c:125
bool AllTablesyncsReady(void)
Definition: tablesync.c:1573
static bool wait_for_worker_state_change(char expected_state)
Definition: tablesync.c:223
static bool table_states_valid
Definition: tablesync.c:124
void invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
Definition: tablesync.c:272
static void pg_attribute_noreturn() finish_sync_worker(void)
Definition: tablesync.c:134
static void process_syncing_tables_for_apply(XLogRecPtr current_lsn)
Definition: tablesync.c:409
static void process_syncing_tables_for_sync(XLogRecPtr current_lsn)
Definition: tablesync.c:286
void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, Size szslot)
Definition: tablesync.c:1228
static int copy_read_data(void *outbuf, int minread, int maxread)
Definition: tablesync.c:687
void process_syncing_tables(XLogRecPtr current_lsn)
Definition: tablesync.c:647
char * LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
Definition: tablesync.c:1244
static void copy_table(Relation rel)
Definition: tablesync.c:1084
static bool wait_for_relation_state_change(Oid relid, char expected_state)
Definition: tablesync.c:175
static StringInfo copybuf
Definition: tablesync.c:128
static bool FetchTableStates(bool *started_tx)
Definition: tablesync.c:1512
static void fetch_remote_table_info(char *nspname, char *relname, LogicalRepRelation *lrel, List **qual)
Definition: tablesync.c:763
static List * make_copy_attnamelist(LogicalRepRelMapEntry *rel)
Definition: tablesync.c:667
void UpdateTwoPhaseState(Oid suboid, char new_state)
Definition: tablesync.c:1598
bool tuplestore_gettupleslot(Tuplestorestate *state, bool forward, bool copy, TupleTableSlot *slot)
Definition: tuplestore.c:1078
int64 tuplestore_tuple_count(Tuplestorestate *state)
Definition: tuplestore.c:546
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:471
static Datum slot_getattr(TupleTableSlot *slot, int attnum, bool *isnull)
Definition: tuptable.h:427
String * makeString(char *str)
Definition: value.c:63
#define strVal(v)
Definition: value.h:82
@ WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE
Definition: wait_event.h:112
@ WAIT_EVENT_LOGICAL_SYNC_DATA
Definition: wait_event.h:111
#define walrcv_create_slot(conn, slotname, temporary, two_phase, snapshot_action, lsn)
Definition: walreceiver.h:430
#define walrcv_connect(conninfo, logical, appname, err)
Definition: walreceiver.h:408
@ WALRCV_OK_COMMAND
Definition: walreceiver.h:205
@ WALRCV_OK_TUPLES
Definition: walreceiver.h:207
@ WALRCV_OK_COPY_OUT
Definition: walreceiver.h:209
static void walrcv_clear_result(WalRcvExecResult *walres)
Definition: walreceiver.h:440
#define walrcv_server_version(conn)
Definition: walreceiver.h:418
#define walrcv_endstreaming(conn, next_tli)
Definition: walreceiver.h:424
#define walrcv_exec(conn, exec, nRetTypes, retTypes)
Definition: walreceiver.h:434
#define walrcv_receive(conn, buffer, wait_fd)
Definition: walreceiver.h:426
@ CRS_USE_SNAPSHOT
Definition: walsender.h:24
static bool am_parallel_apply_worker(void)
static bool am_tablesync_worker(void)
bool IsTransactionState(void)
Definition: xact.c:378
void CommandCounterIncrement(void)
Definition: xact.c:1078
void StartTransactionCommand(void)
Definition: xact.c:2944
void CommitTransactionCommand(void)
Definition: xact.c:3041
uint64 GetSystemIdentifier(void)
Definition: xlog.c:4177
XLogRecPtr GetXLogWriteRecPtr(void)
Definition: xlog.c:8873
int wal_retrieve_retry_interval
Definition: xlog.c:137
void XLogFlush(XLogRecPtr record)
Definition: xlog.c:2514
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
uint16 RepOriginId
Definition: xlogdefs.h:65
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
uint32 TimeLineID
Definition: xlogdefs.h:59