PostgreSQL Source Code git master
tablesync.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 * tablesync.c
3 * PostgreSQL logical replication: initial table data synchronization
4 *
5 * Copyright (c) 2012-2025, PostgreSQL Global Development Group
6 *
7 * IDENTIFICATION
8 * src/backend/replication/logical/tablesync.c
9 *
10 * NOTES
11 * This file contains code for initial table data synchronization for
12 * logical replication.
13 *
14 * The initial data synchronization is done separately for each table,
15 * in a separate apply worker that only fetches the initial snapshot data
16 * from the publisher and then synchronizes the position in the stream with
17 * the leader apply worker.
18 *
19 * There are several reasons for doing the synchronization this way:
20 * - It allows us to parallelize the initial data synchronization
21 * which lowers the time needed for it to happen.
22 * - The initial synchronization does not have to hold the xid and LSN
23 * for the time it takes to copy data of all tables, causing less
24 * bloat and lower disk consumption compared to doing the
25 * synchronization in a single process for the whole database.
26 * - It allows us to synchronize any tables added after the initial
27 * synchronization has finished.
28 *
29 * The stream position synchronization works in multiple steps:
30 * - Apply worker requests a tablesync worker to start, setting the new
31 * table state to INIT.
32 * - Tablesync worker starts; changes table state from INIT to DATASYNC while
33 * copying.
34 * - Tablesync worker does initial table copy; there is a FINISHEDCOPY (sync
35 * worker specific) state to indicate when the copy phase has completed, so
36 * if the worker crashes with this (non-memory) state then the copy will not
37 * be re-attempted.
38 * - Tablesync worker then sets table state to SYNCWAIT; waits for state change.
39 * - Apply worker periodically checks for tables in SYNCWAIT state. When
40 * any appear, it sets the table state to CATCHUP and starts loop-waiting
41 * until either the table state is set to SYNCDONE or the sync worker
42 * exits.
43 * - After the sync worker has seen the state change to CATCHUP, it will
44 * read the stream and apply changes (acting like an apply worker) until
45 * it catches up to the specified stream position. Then it sets the
46 * state to SYNCDONE. There might be zero changes applied between
47 * CATCHUP and SYNCDONE, because the sync worker might be ahead of the
48 * apply worker.
49 * - Once the state is set to SYNCDONE, the apply worker will continue tracking
50 * the table until it reaches the SYNCDONE stream position, at which
51 * point it sets state to READY and stops tracking. Again, there might
52 * be zero changes in between.
53 *
54 * So the state progression is always: INIT -> DATASYNC -> FINISHEDCOPY
55 * -> SYNCWAIT -> CATCHUP -> SYNCDONE -> READY.
56 *
57 * The catalog pg_subscription_rel is used to keep information about
58 * subscribed tables and their state. The catalog holds all states
59 * except SYNCWAIT and CATCHUP which are only in shared memory.
60 *
61 * Example flows look like this:
62 * - Apply is in front:
63 * sync:8
64 * -> set in catalog FINISHEDCOPY
65 * -> set in memory SYNCWAIT
66 * apply:10
67 * -> set in memory CATCHUP
68 * -> enter wait-loop
69 * sync:10
70 * -> set in catalog SYNCDONE
71 * -> exit
72 * apply:10
73 * -> exit wait-loop
74 * -> continue rep
75 * apply:11
76 * -> set in catalog READY
77 *
78 * - Sync is in front:
79 * sync:10
80 * -> set in catalog FINISHEDCOPY
81 * -> set in memory SYNCWAIT
82 * apply:8
83 * -> set in memory CATCHUP
84 * -> continue per-table filtering
85 * sync:10
86 * -> set in catalog SYNCDONE
87 * -> exit
88 * apply:10
89 * -> set in catalog READY
90 * -> stop per-table filtering
91 * -> continue rep
92 *-------------------------------------------------------------------------
93 */
94
95#include "postgres.h"
96
97#include "access/table.h"
98#include "access/xact.h"
99#include "catalog/indexing.h"
101#include "catalog/pg_type.h"
102#include "commands/copy.h"
103#include "miscadmin.h"
104#include "nodes/makefuncs.h"
106#include "pgstat.h"
110#include "replication/origin.h"
111#include "replication/slot.h"
114#include "storage/ipc.h"
115#include "storage/lmgr.h"
116#include "utils/acl.h"
117#include "utils/array.h"
118#include "utils/builtins.h"
119#include "utils/lsyscache.h"
120#include "utils/memutils.h"
121#include "utils/rls.h"
122#include "utils/snapmgr.h"
123#include "utils/syscache.h"
124#include "utils/usercontext.h"
125
/*
 * NOTE(review): the enumerators and closing brace of this enum, and any
 * declarations between it and FetchTableStates(), are absent from this
 * extract; confirm against the full file.
 */
126typedef enum
127{
132
/*
 * Forward declaration; process_syncing_tables_for_apply() calls it to get
 * up-to-date sync state info for the subscription's tables.
 */
135static bool FetchTableStates(bool *started_tx);
136
/*
 * Receive buffer for the initial COPY data. copy_read_data() points it at
 * each chunk received from the publisher and keeps any bytes not yet
 * consumed (len - cursor) for the next call.
 */
137static StringInfo copybuf = NULL;
138
139/*
140 * Exit routine for synchronization worker.
 *
 * Commits any outstanding transaction (reporting stats), flushes writes,
 * logs that the worker has finished, signals the leader apply worker and
 * then terminates the process with proc_exit(0).
 *
 * NOTE(review): the commit, the flush, the remaining ereport() arguments and
 * the signalling call are absent from this extract.
141 */
142static void
144finish_sync_worker(void)
145{
146 /*
147 * Commit any outstanding transaction. This is the usual case, unless
148 * there was nothing to do for the table.
149 */
150 if (IsTransactionState())
151 {
153 pgstat_report_stat(true);
154 }
155
156 /* And flush all writes. */
158
160 ereport(LOG,
161 (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
165
166 /* Find the leader apply worker and signal it. */
168
169 /* Stop gracefully */
170 proc_exit(0);
171}
172
173/*
174 * Wait until the relation sync state is set in the catalog to the expected
175 * one; return true when it happens.
176 *
177 * Returns false if the table sync worker or the table itself have
178 * disappeared, or the table state has been reset.
179 *
180 * Currently, this is used in the apply worker when transitioning from
181 * CATCHUP state to SYNCDONE.
 *
 * The loop re-reads the state, checks under LogicalRepWorkerLock that the
 * sync worker for relid still exists, and otherwise sleeps on MyLatch with
 * a 1000L timeout before retrying.
 *
 * NOTE(review): the catalog-read call that assigns "state" and the
 * worker-lookup call that assigns "worker" are only partially present in
 * this extract.
182 */
183static bool
184wait_for_relation_state_change(Oid relid, char expected_state)
185{
186 char state;
187
188 for (;;)
189 {
190 LogicalRepWorker *worker;
191 XLogRecPtr statelsn;
192
194
197 relid, &statelsn);
198
 /* UNKNOWN: the table is gone or its state was reset (see header). */
199 if (state == SUBREL_STATE_UNKNOWN)
200 break;
201
202 if (state == expected_state)
203 return true;
204
205 /* Check if the sync worker is still running and bail if not. */
206 LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
208 false);
209 LWLockRelease(LogicalRepWorkerLock);
210 if (!worker)
211 break;
212
 /* Sleep until signalled or the timeout expires, then poll again. */
213 (void) WaitLatch(MyLatch,
215 1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
216
218 }
219
220 return false;
221}
222
223/*
224 * Wait until the apply worker changes the state of our synchronization
225 * worker to the expected one.
226 *
227 * Used when transitioning from SYNCWAIT state to CATCHUP.
228 *
229 * Returns false if the apply worker has disappeared.
 *
 * Returns true as soon as MyLogicalRepWorker->relstate equals
 * expected_state. Each iteration looks up the apply worker under
 * LogicalRepWorkerLock and waits on MyLatch with a 1000L timeout.
 *
 * NOTE(review): the function name/parameter line and several statements are
 * absent from this extract. In particular, the bodies of
 * "if (worker && worker->proc)" (presumably a latch signal to the apply
 * worker) and "if (rc & WL_LATCH_SET)" (presumably a latch reset) are
 * missing, so the text below does not compile as shown.
230 */
231static bool
233{
234 int rc;
235
236 for (;;)
237 {
238 LogicalRepWorker *worker;
239
241
242 /*
243 * Done if already in correct state. (We assume this fetch is atomic
244 * enough to not give a misleading answer if we do it with no lock.)
245 */
246 if (MyLogicalRepWorker->relstate == expected_state)
247 return true;
248
249 /*
250 * Bail out if the apply worker has died, else signal it we're
251 * waiting.
252 */
253 LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
255 InvalidOid, false);
256 if (worker && worker->proc)
258 LWLockRelease(LogicalRepWorkerLock);
259 if (!worker)
260 break;
261
262 /*
263 * Wait. We expect to get a latch signal back from the apply worker,
264 * but use a timeout in case it dies without sending one.
265 */
266 rc = WaitLatch(MyLatch,
268 1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
269
270 if (rc & WL_LATCH_SET)
272 }
273
274 return false;
275}
276
277/*
278 * Callback from syscache invalidation.
 *
 * Not declared static, so it is visible outside this file.
 *
 * NOTE(review): the function name/parameter line and its single body
 * statement are absent from this extract; confirm what state it resets
 * against the full file.
279 */
280void
282{
284}
285
286/*
287 * Handle table synchronization cooperation from the synchronization
288 * worker.
289 *
290 * If the sync worker is in CATCHUP state and reached (or passed) the
291 * predetermined synchronization point in the WAL stream, mark the table as
292 * SYNCDONE and finish.
 *
 * On that path this function does not return. It records SYNCDONE at
 * current_lsn, drops the tablesync slot and origin, and ends by calling
 * finish_sync_worker(), which exits the process.
 *
 * NOTE(review): the parameter line (current_lsn), the body of
 * "if (!IsTransactionState())", the body of the trailing "else", and
 * several calls (state update, end of streaming, slot drop, origin reset)
 * are absent from this extract.
293 */
294static void
296{
298
299 if (MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP &&
300 current_lsn >= MyLogicalRepWorker->relstate_lsn)
301 {
302 TimeLineID tli;
303 char syncslotname[NAMEDATALEN] = {0};
304 char originname[NAMEDATALEN] = {0};
305
306 MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCDONE;
307 MyLogicalRepWorker->relstate_lsn = current_lsn;
308
310
311 /*
312 * UpdateSubscriptionRelState must be called within a transaction.
313 */
314 if (!IsTransactionState())
316
321
322 /*
323 * End streaming so that LogRepWorkerWalRcvConn can be used to drop
324 * the slot.
325 */
327
328 /*
329 * Cleanup the tablesync slot.
330 *
331 * This has to be done after updating the state because otherwise if
332 * there is an error while doing the database operations we won't be
333 * able to rollback dropped slot.
334 */
337 syncslotname,
338 sizeof(syncslotname));
339
340 /*
341 * It is important to give an error if we are unable to drop the slot,
342 * otherwise, it won't be dropped till the corresponding subscription
343 * is dropped. So passing missing_ok = false.
344 */
346
348 pgstat_report_stat(false);
349
350 /*
351 * Start a new transaction to clean up the tablesync origin tracking.
352 * This transaction will be ended within the finish_sync_worker().
353 * Even if we fail to remove it here, the apply worker will still
354 * clean it up afterward.
355 *
356 * We need to do this after the table state is set to SYNCDONE.
357 * Otherwise, if an error occurs while performing the database
358 * operation, the worker will be restarted and the in-memory state of
359 * replication progress (remote_lsn) won't be rolled-back which would
360 * have been cleared before restart. So, the restarted worker will use
361 * invalid replication progress state resulting in replay of
362 * transactions that have already been applied.
363 */
365
368 originname,
369 sizeof(originname));
370
371 /*
372 * Resetting the origin session removes the ownership of the slot.
373 * This is needed to allow the origin to be dropped.
374 */
379
380 /*
381 * Drop the tablesync's origin tracking if exists.
382 *
383 * There is a chance that the user is concurrently performing refresh
384 * for the subscription where we remove the table state and its origin
385 * or the apply worker would have removed this origin. So passing
386 * missing_ok = true.
387 */
388 replorigin_drop_by_name(originname, true, false);
389
390 finish_sync_worker();
391 }
392 else
394}
395
396/*
397 * Handle table synchronization cooperation from the apply worker.
398 *
399 * Walk over all subscription tables that are individually tracked by the
400 * apply process (currently, all that have state other than
401 * SUBREL_STATE_READY) and manage synchronization for them.
402 *
403 * If there are tables that need synchronizing and are not being synchronized
404 * yet, start sync workers for them (if there are free slots for sync
405 * workers). To prevent starting the sync worker for the same relation at a
406 * high frequency after a failure, we store its last start time with each sync
407 * state info. We start the sync worker for the same relation after waiting
408 * at least wal_retrieve_retry_interval.
409 *
410 * For tables that are being synchronized already, check if sync workers
411 * either need action from the apply worker or have finished. This is the
412 * SYNCWAIT to CATCHUP transition.
413 *
414 * If the synchronization position is reached (SYNCDONE), then the table can
415 * be marked as READY and is no longer tracked.
 *
 * last_start_times is a function-local static, so the start-time hash table
 * survives across calls until it is cleaned up below.
 *
 * If two_phase is pending and all tablesyncs are now ready, the worker logs
 * a message and exits via proc_exit(0) so it can be restarted.
 *
 * NOTE(review): the parameter line (current_lsn), the hash-table
 * create/cleanup conditions, and several calls (transaction start/commit,
 * worker lookup, state update, worker launch) are absent from this extract.
416 */
417static void
419{
420 struct tablesync_start_time_mapping
421 {
422 Oid relid;
423 TimestampTz last_start_time;
424 };
425 static HTAB *last_start_times = NULL;
426 ListCell *lc;
427 bool started_tx = false;
428 bool should_exit = false;
429
431
432 /* We need up-to-date sync state info for subscription tables here. */
433 FetchTableStates(&started_tx);
434
435 /*
436 * Prepare a hash table for tracking last start times of workers, to avoid
437 * immediate restarts. We don't need it if there are no tables that need
438 * syncing.
439 */
441 {
442 HASHCTL ctl;
443
444 ctl.keysize = sizeof(Oid);
445 ctl.entrysize = sizeof(struct tablesync_start_time_mapping);
446 last_start_times = hash_create("Logical replication table sync worker start times",
447 256, &ctl, HASH_ELEM | HASH_BLOBS);
448 }
449
450 /*
451 * Clean up the hash table when we're done with all tables (just to
452 * release the bit of memory).
453 */
455 {
457 last_start_times = NULL;
458 }
459
460 /*
461 * Process all tables that are being synchronized.
462 */
463 foreach(lc, table_states_not_ready)
464 {
466
467 if (rstate->state == SUBREL_STATE_SYNCDONE)
468 {
469 /*
470 * Apply has caught up to the position where the table sync has
471 * finished. Mark the table as ready so that the apply worker will just
472 * continue to replicate it normally.
473 */
474 if (current_lsn >= rstate->lsn)
475 {
476 char originname[NAMEDATALEN];
477
478 rstate->state = SUBREL_STATE_READY;
479 rstate->lsn = current_lsn;
480 if (!started_tx)
481 {
483 started_tx = true;
484 }
485
486 /*
487 * Remove the tablesync origin tracking if exists.
488 *
489 * There is a chance that the user is concurrently performing
490 * refresh for the subscription where we remove the table
491 * state and its origin or the tablesync worker would have
492 * already removed this origin. We can't rely on tablesync
493 * worker to remove the origin tracking as if there is any
494 * error while dropping we won't restart it to drop the
495 * origin. So passing missing_ok = true.
496 */
498 rstate->relid,
499 originname,
500 sizeof(originname));
501 replorigin_drop_by_name(originname, true, false);
502
503 /*
504 * Update the state to READY only after the origin cleanup.
505 */
507 rstate->relid, rstate->state,
508 rstate->lsn);
509 }
510 }
511 else
512 {
513 LogicalRepWorker *syncworker;
514
515 /*
516 * Look for a sync worker for this relation.
517 */
518 LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
519
521 rstate->relid, false);
522
523 if (syncworker)
524 {
525 /* Found one, update our copy of its state */
526 SpinLockAcquire(&syncworker->relmutex);
527 rstate->state = syncworker->relstate;
528 rstate->lsn = syncworker->relstate_lsn;
529 if (rstate->state == SUBREL_STATE_SYNCWAIT)
530 {
531 /*
532 * Sync worker is waiting for apply. Tell sync worker it
533 * can catchup now.
534 */
535 syncworker->relstate = SUBREL_STATE_CATCHUP;
 /* Catch-up target: the later of its LSN and ours. */
536 syncworker->relstate_lsn =
537 Max(syncworker->relstate_lsn, current_lsn);
538 }
539 SpinLockRelease(&syncworker->relmutex);
540
541 /* If we told worker to catch up, wait for it. */
542 if (rstate->state == SUBREL_STATE_SYNCWAIT)
543 {
544 /* Signal the sync worker, as it may be waiting for us. */
545 if (syncworker->proc)
547
548 /* Now safe to release the LWLock */
549 LWLockRelease(LogicalRepWorkerLock);
550
551 if (started_tx)
552 {
553 /*
554 * We must commit the existing transaction to release
555 * the existing locks before entering a busy loop.
556 * This is required to avoid any undetected deadlocks
557 * due to any existing lock as deadlock detector won't
558 * be able to detect the waits on the latch.
559 */
561 pgstat_report_stat(false);
562 }
563
564 /*
565 * Enter busy loop and wait for synchronization worker to
566 * reach expected state (or die trying).
567 */
569 started_tx = true;
570
572 SUBREL_STATE_SYNCDONE);
573 }
574 else
575 LWLockRelease(LogicalRepWorkerLock);
576 }
577 else
578 {
579 /*
580 * If there is no sync worker for this table yet, count
581 * running sync workers for this subscription, while we have
582 * the lock.
583 */
584 int nsyncworkers =
586
587 /* Now safe to release the LWLock */
588 LWLockRelease(LogicalRepWorkerLock);
589
590 /*
591 * If there are free sync worker slot(s), start a new sync
592 * worker for the table.
593 */
594 if (nsyncworkers < max_sync_workers_per_subscription)
595 {
597 struct tablesync_start_time_mapping *hentry;
598 bool found;
599
600 hentry = hash_search(last_start_times, &rstate->relid,
601 HASH_ENTER, &found);
602
 /* Launch only on first sight or after the retry interval. */
603 if (!found ||
604 TimestampDifferenceExceeds(hentry->last_start_time, now,
606 {
612 rstate->relid,
614 hentry->last_start_time = now;
615 }
616 }
617 }
618 }
619 }
620
621 if (started_tx)
622 {
623 /*
624 * Even when the two_phase mode is requested by the user, it remains
625 * as 'pending' until all tablesyncs have reached READY state.
626 *
627 * When this happens, we restart the apply worker and (if the
628 * conditions are still ok) then the two_phase tri-state will become
629 * 'enabled' at that time.
630 *
631 * Note: If the subscription has no tables then leave the state as
632 * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
633 * work.
634 */
635 if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING)
636 {
637 CommandCounterIncrement(); /* make updates visible */
638 if (AllTablesyncsReady())
639 {
640 ereport(LOG,
641 (errmsg("logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled",
643 should_exit = true;
644 }
645 }
646
648 pgstat_report_stat(true);
649 }
650
651 if (should_exit)
652 {
653 /*
654 * Reset the last-start time for this worker so that the launcher will
655 * restart it without waiting for wal_retrieve_retry_interval.
656 */
658
659 proc_exit(0);
660 }
661}
662
663/*
664 * Process possible state change(s) of tables that are being synchronized.
 *
 * Dispatches on MyLogicalRepWorker->type. Parallel apply workers do nothing;
 * WORKERTYPE_APPLY has its own case; any unrecognized type raises
 * elog(ERROR).
 *
 * NOTE(review): the parameter line, two case labels and the per-type calls
 * are absent from this extract. Presumably the apply case calls
 * process_syncing_tables_for_apply() and the tablesync case calls
 * process_syncing_tables_for_sync() -- confirm against the full file.
665 */
666void
668{
669 switch (MyLogicalRepWorker->type)
670 {
672
673 /*
674 * Skip for parallel apply workers because they only operate on
675 * tables that are in a READY state. See pa_can_start() and
676 * should_apply_changes_for_rel().
677 */
678 break;
679
682 break;
683
684 case WORKERTYPE_APPLY:
686 break;
687
689 /* Should never happen. */
690 elog(ERROR, "Unknown worker type");
691 }
692}
693
694/*
695 * Create list of columns for COPY based on logical relation mapping.
 *
 * Returns a List with one element per remote attribute
 * (rel->remoterel.natts), in remote attribute order. copy_table() passes it
 * to BeginCopyFrom() as the attribute name list.
 *
 * NOTE(review): the parameter line and the appended element expression are
 * absent from this extract; presumably each element wraps
 * rel->remoterel.attnames[i] -- confirm.
696 */
697static List *
699{
700 List *attnamelist = NIL;
701 int i;
702
703 for (i = 0; i < rel->remoterel.natts; i++)
704 {
705 attnamelist = lappend(attnamelist,
707 }
708
709
710 return attnamelist;
711}
712
713/*
714 * Data source callback for the COPY FROM, which reads from the remote
715 * connection and passes the data back to our local COPY.
 *
 * outbuf:  destination buffer supplied by COPY.
 * minread: stop once at least this many bytes have been copied.
 * maxread: never copy more than this many bytes.
 *
 * Returns the number of bytes copied into outbuf. Bytes received but not
 * yet consumed are kept in copybuf (its cursor marks the next unread byte)
 * for the next call. A negative length from the receive call ends reading
 * early and returns what has been copied so far (presumably end of the COPY
 * stream). A zero length means no data is available yet, so we wait on the
 * latch/socket and retry.
 *
 * NOTE(review): the fd declaration, the receive call, the wait call and the
 * latch reset are absent from this extract.
716 */
717static int
718copy_read_data(void *outbuf, int minread, int maxread)
719{
720 int bytesread = 0;
721 int avail;
722
723 /* If there are some leftover data from previous read, use it. */
724 avail = copybuf->len - copybuf->cursor;
725 if (avail)
726 {
727 if (avail > maxread)
728 avail = maxread;
729 memcpy(outbuf, &copybuf->data[copybuf->cursor], avail);
730 copybuf->cursor += avail;
731 maxread -= avail;
732 bytesread += avail;
733 }
 /*
 * NOTE(review): unlike the loop below, outbuf is not advanced after the
 * leftover copy above. If avail < minread, the loop would then memcpy new
 * data over those leftover bytes while still counting both in bytesread.
 * This is only safe if callers always pass minread <= 1 -- confirm against
 * the COPY FROM caller.
 */
734
735 while (maxread > 0 && bytesread < minread)
736 {
738 int len;
739 char *buf = NULL;
740
741 for (;;)
742 {
743 /* Try read the data. */
745
747
748 if (len == 0)
749 break;
750 else if (len < 0)
751 return bytesread;
752 else
753 {
754 /* Process the data */
755 copybuf->data = buf;
756 copybuf->len = len;
757 copybuf->cursor = 0;
758
759 avail = copybuf->len - copybuf->cursor;
760 if (avail > maxread)
761 avail = maxread;
762 memcpy(outbuf, &copybuf->data[copybuf->cursor], avail);
763 outbuf = (char *) outbuf + avail;
764 copybuf->cursor += avail;
765 maxread -= avail;
766 bytesread += avail;
767 }
768
769 if (maxread <= 0 || bytesread >= minread)
770 return bytesread;
771 }
772
773 /*
774 * Wait for more data or latch.
775 */
779 fd, 1000L, WAIT_EVENT_LOGICAL_SYNC_DATA);
780
782 }
783
784 return bytesread;
785}
786
787
788/*
789 * Get information about remote relation in similar fashion the RELATION
790 * message provides during replication.
791 *
792 * This function also returns (a) the relation qualifications to be used in
793 * the COPY command, and (b) whether the remote relation has published any
794 * generated column.
 *
 * On return, *lrel holds nspname, relname, remoteid, replident, relkind,
 * attnames, atttyps, attkeys and natts.
 *
 * For publisher versions >= 150000:
 *   - a publication column list, if present, restricts which columns are
 *     reported;
 *   - row-filter expressions are appended to *qual, which is reset to NIL
 *     when any subscribed publication has no filter for the table.
 *
 * *gencol_published is only assigned for publisher versions >= 180000.
 *
 * Raises ERROR if the table is not found on the publisher or any remote
 * query fails.
 *
 * NOTE(review): the first parameter line, the declaration of "res", the
 * walrcv_exec() calls, several ereport(ERROR, openings and the result/slot
 * cleanup calls are absent from this extract.
795 */
796static void
798 List **qual, bool *gencol_published)
799{
801 StringInfoData cmd;
802 TupleTableSlot *slot;
803 Oid tableRow[] = {OIDOID, CHAROID, CHAROID};
804 Oid attrRow[] = {INT2OID, TEXTOID, OIDOID, BOOLOID, BOOLOID};
805 Oid qualRow[] = {TEXTOID};
806 bool isnull;
807 int natt;
808 StringInfo pub_names = NULL;
809 Bitmapset *included_cols = NULL;
811
812 lrel->nspname = nspname;
813 lrel->relname = relname;
814
815 /* First fetch Oid and replica identity. */
816 initStringInfo(&cmd);
817 appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind"
818 " FROM pg_catalog.pg_class c"
819 " INNER JOIN pg_catalog.pg_namespace n"
820 " ON (c.relnamespace = n.oid)"
821 " WHERE n.nspname = %s"
822 " AND c.relname = %s",
823 quote_literal_cstr(nspname),
826 lengthof(tableRow), tableRow);
827
828 if (res->status != WALRCV_OK_TUPLES)
830 (errcode(ERRCODE_CONNECTION_FAILURE),
831 errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
832 nspname, relname, res->err)));
833
835 if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
837 (errcode(ERRCODE_UNDEFINED_OBJECT),
838 errmsg("table \"%s.%s\" not found on publisher",
839 nspname, relname)));
840
841 lrel->remoteid = DatumGetObjectId(slot_getattr(slot, 1, &isnull));
842 Assert(!isnull);
843 lrel->replident = DatumGetChar(slot_getattr(slot, 2, &isnull));
844 Assert(!isnull);
845 lrel->relkind = DatumGetChar(slot_getattr(slot, 3, &isnull));
846 Assert(!isnull);
847
850
851
852 /*
853 * Get column lists for each relation.
854 *
855 * We need to do this before fetching info about column names and types,
856 * so that we can skip columns that should not be replicated.
857 */
858 if (server_version >= 150000)
859 {
860 WalRcvExecResult *pubres;
861 TupleTableSlot *tslot;
862 Oid attrsRow[] = {INT2VECTOROID};
863
864 /* Build the pub_names comma-separated string. */
865 pub_names = makeStringInfo();
867
868 /*
869 * Fetch info about column lists for the relation (from all the
870 * publications).
871 */
872 resetStringInfo(&cmd);
873 appendStringInfo(&cmd,
874 "SELECT DISTINCT"
875 " (CASE WHEN (array_length(gpt.attrs, 1) = c.relnatts)"
876 " THEN NULL ELSE gpt.attrs END)"
877 " FROM pg_publication p,"
878 " LATERAL pg_get_publication_tables(p.pubname) gpt,"
879 " pg_class c"
880 " WHERE gpt.relid = %u AND c.oid = gpt.relid"
881 " AND p.pubname IN ( %s )",
882 lrel->remoteid,
883 pub_names->data);
884
886 lengthof(attrsRow), attrsRow);
887
888 if (pubres->status != WALRCV_OK_TUPLES)
890 (errcode(ERRCODE_CONNECTION_FAILURE),
891 errmsg("could not fetch column list info for table \"%s.%s\" from publisher: %s",
892 nspname, relname, pubres->err)));
893
894 /*
895 * We don't support the case where the column list is different for
896 * the same table when combining publications. See comments atop
897 * fetch_table_list. So there should be only one row returned.
898 * Although we already checked this when creating the subscription, we
899 * still need to check here in case the column list was changed after
900 * creating the subscription and before the sync worker is started.
901 */
902 if (tuplestore_tuple_count(pubres->tuplestore) > 1)
904 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
905 errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
906 nspname, relname));
907
908 /*
909 * Get the column list and build a single bitmap with the attnums.
910 *
911 * If we find a NULL value, it means all the columns should be
912 * replicated.
913 */
915 if (tuplestore_gettupleslot(pubres->tuplestore, true, false, tslot))
916 {
917 Datum cfval = slot_getattr(tslot, 1, &isnull);
918
919 if (!isnull)
920 {
921 ArrayType *arr;
922 int nelems;
923 int16 *elems;
924
925 arr = DatumGetArrayTypeP(cfval);
926 nelems = ARR_DIMS(arr)[0];
927 elems = (int16 *) ARR_DATA_PTR(arr);
928
 /* natt is reused as a loop index here; it is reset to 0 below. */
929 for (natt = 0; natt < nelems; natt++)
930 included_cols = bms_add_member(included_cols, elems[natt]);
931 }
932
933 ExecClearTuple(tslot);
934 }
936
937 walrcv_clear_result(pubres);
938 }
939
940 /*
941 * Now fetch column names and types.
942 */
943 resetStringInfo(&cmd);
944 appendStringInfo(&cmd,
945 "SELECT a.attnum,"
946 " a.attname,"
947 " a.atttypid,"
948 " a.attnum = ANY(i.indkey)");
949
950 /* Generated columns can be replicated since version 18. */
951 if (server_version >= 180000)
952 appendStringInfo(&cmd, ", a.attgenerated != ''");
953
 /* Before v18, generated columns are filtered out (v12+ only). */
954 appendStringInfo(&cmd,
955 " FROM pg_catalog.pg_attribute a"
956 " LEFT JOIN pg_catalog.pg_index i"
957 " ON (i.indexrelid = pg_get_replica_identity_index(%u))"
958 " WHERE a.attnum > 0::pg_catalog.int2"
959 " AND NOT a.attisdropped %s"
960 " AND a.attrelid = %u"
961 " ORDER BY a.attnum",
962 lrel->remoteid,
963 (server_version >= 120000 && server_version < 180000 ?
964 "AND a.attgenerated = ''" : ""),
965 lrel->remoteid);
 /* The 5th result column (attgenerated flag) exists only for v18+. */
967 server_version >= 180000 ? lengthof(attrRow) : lengthof(attrRow) - 1, attrRow);
968
969 if (res->status != WALRCV_OK_TUPLES)
971 (errcode(ERRCODE_CONNECTION_FAILURE),
972 errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
973 nspname, relname, res->err)));
974
975 /* We don't know the number of rows coming, so allocate enough space. */
976 lrel->attnames = palloc0(MaxTupleAttributeNumber * sizeof(char *));
977 lrel->atttyps = palloc0(MaxTupleAttributeNumber * sizeof(Oid));
978 lrel->attkeys = NULL;
979
980 /*
981 * Store the columns as a list of names. Ignore those that are not
982 * present in the column list, if there is one.
983 */
984 natt = 0;
986 while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
987 {
988 char *rel_colname;
990
991 attnum = DatumGetInt16(slot_getattr(slot, 1, &isnull));
992 Assert(!isnull);
993
994 /* If the column is not in the column list, skip it. */
995 if (included_cols != NULL && !bms_is_member(attnum, included_cols))
996 {
997 ExecClearTuple(slot);
998 continue;
999 }
1000
1001 rel_colname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
1002 Assert(!isnull);
1003
1004 lrel->attnames[natt] = rel_colname;
1005 lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 3, &isnull));
1006 Assert(!isnull);
1007
 /*
 * Replica-identity membership. With the LEFT JOIN this may be NULL when
 * there is no identity index; isnull is deliberately not asserted here.
 */
1008 if (DatumGetBool(slot_getattr(slot, 4, &isnull)))
1009 lrel->attkeys = bms_add_member(lrel->attkeys, natt);
1010
1011 /* Remember if the remote table has published any generated column. */
1012 if (server_version >= 180000 && !(*gencol_published))
1013 {
1014 *gencol_published = DatumGetBool(slot_getattr(slot, 5, &isnull));
1015 Assert(!isnull);
1016 }
1017
1018 /* Should never happen. */
1019 if (++natt >= MaxTupleAttributeNumber)
1020 elog(ERROR, "too many columns in remote table \"%s.%s\"",
1021 nspname, relname);
1022
1023 ExecClearTuple(slot);
1024 }
1026
1027 lrel->natts = natt;
1028
1030
1031 /*
1032 * Get relation's row filter expressions. DISTINCT avoids the same
1033 * expression of a table in multiple publications from being included
1034 * multiple times in the final expression.
1035 *
1036 * We need to copy the row even if it matches just one of the
1037 * publications, so we later combine all the quals with OR.
1038 *
1039 * For initial synchronization, row filtering can be ignored in following
1040 * cases:
1041 *
1042 * 1) one of the subscribed publications for the table hasn't specified
1043 * any row filter
1044 *
1045 * 2) one of the subscribed publications has puballtables set to true
1046 *
1047 * 3) one of the subscribed publications is declared as TABLES IN SCHEMA
1048 * that includes this relation
1049 */
1050 if (server_version >= 150000)
1051 {
1052 /* Reuse the already-built pub_names. */
1053 Assert(pub_names != NULL);
1054
1055 /* Check for row filters. */
1056 resetStringInfo(&cmd);
1057 appendStringInfo(&cmd,
1058 "SELECT DISTINCT pg_get_expr(gpt.qual, gpt.relid)"
1059 " FROM pg_publication p,"
1060 " LATERAL pg_get_publication_tables(p.pubname) gpt"
1061 " WHERE gpt.relid = %u"
1062 " AND p.pubname IN ( %s )",
1063 lrel->remoteid,
1064 pub_names->data);
1065
1066 res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 1, qualRow);
1067
 /*
 * NOTE(review): unlike the sibling remote-query failures in this function,
 * this ereport carries no errcode(ERRCODE_CONNECTION_FAILURE); consider
 * adding it for consistent SQLSTATE reporting.
 */
1068 if (res->status != WALRCV_OK_TUPLES)
1069 ereport(ERROR,
1070 (errmsg("could not fetch table WHERE clause info for table \"%s.%s\" from publisher: %s",
1071 nspname, relname, res->err)));
1072
1073 /*
1074 * Multiple row filter expressions for the same table will be combined
1075 * by COPY using OR. If any of the filter expressions for this table
1076 * are null, it means the whole table will be copied. In this case it
1077 * is not necessary to construct a unified row filter expression at
1078 * all.
1079 */
1080 slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
1081 while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
1082 {
1083 Datum rf = slot_getattr(slot, 1, &isnull);
1084
1085 if (!isnull)
1086 *qual = lappend(*qual, makeString(TextDatumGetCString(rf)));
1087 else
1088 {
1089 /* Ignore filters and cleanup as necessary. */
1090 if (*qual)
1091 {
1092 list_free_deep(*qual);
1093 *qual = NIL;
1094 }
1095 break;
1096 }
1097
1098 ExecClearTuple(slot);
1099 }
1101
1103 destroyStringInfo(pub_names);
1104 }
1105
1106 pfree(cmd.data);
1107}
1108
1109/*
1110 * Copy existing data of a table from publisher.
1111 *
1112 * Caller is responsible for locking the local relation.
 *
 * Steps:
 *   1. Fetch the remote relation description (fetch_remote_table_info).
 *   2. Build a COPY ... TO STDOUT command for the publisher. Regular tables
 *      with no row filter and no published generated columns use a plain
 *      column-list COPY; everything else uses COPY (SELECT ...) with the
 *      row filters OR'ed together.
 *   3. Stream the result into a local COPY FROM, using copy_read_data() as
 *      the data source.
 *
 * NOTE(review): the parameter line, the declaration of "res", and several
 * calls (relmap update, quoting helpers, binary-format condition, copybuf
 * setup, range-table setup) are absent from this extract.
1113 */
1114static void
1116{
1117 LogicalRepRelMapEntry *relmapentry;
1118 LogicalRepRelation lrel;
1119 List *qual = NIL;
1121 StringInfoData cmd;
1122 CopyFromState cstate;
1123 List *attnamelist;
1124 ParseState *pstate;
1125 List *options = NIL;
1126 bool gencol_published = false;
1127
1128 /* Get the publisher relation info. */
1130 RelationGetRelationName(rel), &lrel, &qual,
1131 &gencol_published);
1132
1133 /* Put the relation into relmap. */
1135
1136 /* Map the publisher relation to local one. */
1137 relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock);
1138 Assert(rel == relmapentry->localrel);
1139
1140 /* Start copy on the publisher. */
1141 initStringInfo(&cmd);
1142
1143 /* Regular table with no row filter or generated columns */
1144 if (lrel.relkind == RELKIND_RELATION && qual == NIL && !gencol_published)
1145 {
1146 appendStringInfo(&cmd, "COPY %s",
1148
1149 /* If the table has columns, then specify the columns */
1150 if (lrel.natts)
1151 {
1152 appendStringInfoString(&cmd, " (");
1153
1154 /*
1155 * XXX Do we need to list the columns in all cases? Maybe we're
1156 * replicating all columns?
1157 */
1158 for (int i = 0; i < lrel.natts; i++)
1159 {
1160 if (i > 0)
1161 appendStringInfoString(&cmd, ", ");
1162
1164 }
1165
1166 appendStringInfoChar(&cmd, ')');
1167 }
1168
1169 appendStringInfoString(&cmd, " TO STDOUT");
1170 }
1171 else
1172 {
1173 /*
1174 * For non-tables and tables with row filters, we need to do COPY
1175 * (SELECT ...), but we can't just do SELECT * because we may need to
1176 * copy only subset of columns including generated columns. For tables
1177 * with any row filters, build a SELECT query with OR'ed row filters
1178 * for COPY.
1179 *
1180 * We also need to use this same COPY (SELECT ...) syntax when
1181 * generated columns are published, because copy of generated columns
1182 * is not supported by the normal COPY.
1183 */
1184 appendStringInfoString(&cmd, "COPY (SELECT ");
1185 for (int i = 0; i < lrel.natts; i++)
1186 {
1188 if (i < lrel.natts - 1)
1189 appendStringInfoString(&cmd, ", ");
1190 }
1191
1192 appendStringInfoString(&cmd, " FROM ");
1193
1194 /*
1195 * For regular tables, make sure we don't copy data from a child that
1196 * inherits the named table as those will be copied separately.
1197 */
1198 if (lrel.relkind == RELKIND_RELATION)
1199 appendStringInfoString(&cmd, "ONLY ");
1200
1202 /* list of OR'ed filters */
1203 if (qual != NIL)
1204 {
1205 ListCell *lc;
1206 char *q = strVal(linitial(qual));
1207
1208 appendStringInfo(&cmd, " WHERE %s", q);
1209 for_each_from(lc, qual, 1)
1210 {
1211 q = strVal(lfirst(lc));
1212 appendStringInfo(&cmd, " OR %s", q);
1213 }
 /* The filter strings are no longer needed once embedded in cmd. */
1214 list_free_deep(qual);
1215 }
1216
1217 appendStringInfoString(&cmd, ") TO STDOUT");
1218 }
1219
1220 /*
1221 * Prior to v16, initial table synchronization will use text format even
1222 * if the binary option is enabled for a subscription.
1223 */
1226 {
1227 appendStringInfoString(&cmd, " WITH (FORMAT binary)");
1228 options = list_make1(makeDefElem("format",
1229 (Node *) makeString("binary"), -1));
1230 }
1231
1232 res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL);
1233 pfree(cmd.data);
1234 if (res->status != WALRCV_OK_COPY_OUT)
1235 ereport(ERROR,
1236 (errcode(ERRCODE_CONNECTION_FAILURE),
1237 errmsg("could not start initial contents copy for table \"%s.%s\": %s",
1238 lrel.nspname, lrel.relname, res->err)));
1240
1242
1243 pstate = make_parsestate(NULL);
1245 NULL, false, false);
1246
1247 attnamelist = make_copy_attnamelist(relmapentry);
1248 cstate = BeginCopyFrom(pstate, rel, NULL, NULL, false, copy_read_data, attnamelist, options);
1249
1250 /* Do the copy */
1251 (void) CopyFrom(cstate);
1252
1253 logicalrep_rel_close(relmapentry, NoLock);
1254}
1255
1256/*
1257 * Determine the tablesync slot name.
1258 *
1259 * The name must not exceed NAMEDATALEN - 1 because of remote node constraints
1260 * on slot name length. We append system_identifier to avoid slot_name
1261 * collision with subscriptions in other clusters. With the current scheme
1262 * pg_%u_sync_%u_UINT64_FORMAT (3 + 10 + 6 + 10 + 1 + 20 characters, plus the
1263 * terminating '\0'), the maximum length of slot_name will be 50.
1264 *
1265 * The returned slot name is stored in the supplied buffer (syncslotname) with
1266 * the given size.
 * snprintf() silently truncates the name if szslot is too small, so callers
 * should supply a buffer of at least NAMEDATALEN bytes.
1267 *
1268 * Note: We don't use the subscription slot name as part of tablesync slot name
1269 * because we are responsible for cleaning up these slots and it could become
1270 * impossible to recalculate what name to cleanup if the subscription slot name
1271 * had changed.
1272 */
1273void
1275 char *syncslotname, Size szslot)
1276{
1277 snprintf(syncslotname, szslot, "pg_%u_sync_%u_" UINT64_FORMAT, suboid,
1278 relid, GetSystemIdentifier());
1279}
1280
1281/*
1282 * Start syncing the table in the sync worker.
1283 *
1284 * If nothing needs to be done to sync the table, we exit the worker without
1285 * any further action.
1286 *
1287 * The returned slot name is palloc'ed in current memory context.
 *
 * On return, *origin_startpos holds the LSN from which the catchup phase
 * starts streaming. It is either the LSN reported when the tablesync slot
 * was created, or, when resuming a table whose COPY had already finished
 * (FINISHEDCOPY), the progress recorded by the table's replication origin.
 *
 * State flow handled here: INIT -> DATASYNC (COPY) -> FINISHEDCOPY ->
 * SYNCWAIT. The function then waits for the leader apply worker to set
 * CATCHUP. A worker restarted in DATASYNC redoes the COPY, and one restarted
 * in FINISHEDCOPY skips it.
1288 */
1289static char *
1291{
1292 char *slotname;
1293 char *err;
1294 char relstate;
1295 XLogRecPtr relstate_lsn;
1296 Relation rel;
1297 AclResult aclresult;
1299 char originname[NAMEDATALEN];
1300 RepOriginId originid;
1301 UserContext ucxt;
1302 bool must_use_password;
1303 bool run_as_owner;
1304
1305 /* Check the state of the table synchronization. */
1309 &relstate_lsn);
1311
1312 /* Is the use of a password mandatory? */
1313 must_use_password = MySubscription->passwordrequired &&
1315
1317 MyLogicalRepWorker->relstate = relstate;
1318 MyLogicalRepWorker->relstate_lsn = relstate_lsn;
1320
1321 /*
1322 * If synchronization is already done or no longer necessary, exit now
1323 * that we've updated shared memory state.
1324 */
1325 switch (relstate)
1326 {
1327 case SUBREL_STATE_SYNCDONE:
1328 case SUBREL_STATE_READY:
1329 case SUBREL_STATE_UNKNOWN:
1330 finish_sync_worker(); /* doesn't return */
1331 }
 /*
  * Only INIT, DATASYNC and FINISHEDCOPY are expected past this point; the
  * Assert after connecting checks this.
  */
1332
1333 /* Calculate the name of the tablesync slot. */
1334 slotname = (char *) palloc(NAMEDATALEN);
1337 slotname,
1338 NAMEDATALEN);
1339
1340 /*
1341 * Here we use the slot name instead of the subscription name as the
1342 * application_name, so that it is different from the leader apply worker,
1343 * so that synchronous replication can distinguish them.
1344 */
1347 must_use_password,
1348 slotname, &err);
1349 if (LogRepWorkerWalRcvConn == NULL)
1350 ereport(ERROR,
1351 (errcode(ERRCODE_CONNECTION_FAILURE),
1352 errmsg("table synchronization worker for subscription \"%s\" could not connect to the publisher: %s",
1353 MySubscription->name, err)));
1354
1355 Assert(MyLogicalRepWorker->relstate == SUBREL_STATE_INIT ||
1356 MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC ||
1357 MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY);
1358
1359 /* Assign the origin tracking record name. */
1362 originname,
1363 sizeof(originname));
1364
1365 if (MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC)
1366 {
1367 /*
1368 * We have previously errored out before finishing the copy so the
1369 * replication slot might exist. We want to remove the slot if it
1370 * already exists and proceed.
1371 *
1372 * XXX We could also instead try to drop the slot, last time we failed
1373 * but for that, we might need to clean up the copy state as it might
1374 * be in the middle of fetching the rows. Also, if there is a network
1375 * breakdown then it wouldn't have succeeded so trying it next time
1376 * seems like a better bet.
1377 */
1379 }
1380 else if (MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY)
1381 {
1382 /*
1383 * The COPY phase was previously done, but tablesync then crashed
1384 * before it was able to finish normally.
1385 */
1387
1388 /*
1389 * The origin tracking name must already exist. It was created first
1390 * time this tablesync was launched.
1391 */
1392 originid = replorigin_by_name(originname, false);
1393 replorigin_session_setup(originid, 0);
1394 replorigin_session_origin = originid;
1395 *origin_startpos = replorigin_session_get_progress(false);
1396
1398
1399 goto copy_table_done;
1400 }
1401
1403 MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC;
1406
1407 /* Update the state and make it visible to others. */
1414 pgstat_report_stat(true);
1415
1417
1418 /*
1419 * Use a standard write lock here. It might be better to disallow access
1420 * to the table while it's being synchronized. But we don't want to block
1421 * the main apply process from working and it has to open the relation in
1422 * RowExclusiveLock when remapping remote relation id to local one.
1423 */
1425
1426 /*
1427 * Start a transaction in the remote node in REPEATABLE READ mode. This
1428 * ensures that both the replication slot we create (see below) and the
1429 * COPY are consistent with each other.
1430 */
1432 "BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ",
1433 0, NULL);
1434 if (res->status != WALRCV_OK_COMMAND)
1435 ereport(ERROR,
1436 (errcode(ERRCODE_CONNECTION_FAILURE),
1437 errmsg("table copy could not start transaction on publisher: %s",
1438 res->err)));
1440
1441 /*
1442 * Create a new permanent logical decoding slot. This slot will be used
1443 * for the catchup phase after COPY is done, so tell it to use the
1444 * snapshot to make the final data consistent.
1445 */
1447 slotname, false /* permanent */ , false /* two_phase */ ,
1449 CRS_USE_SNAPSHOT, origin_startpos);
1450
1451 /*
1452 * Setup replication origin tracking. The purpose of doing this before the
1453 * copy is to avoid doing the copy again due to any error in setting up
1454 * origin tracking.
1455 */
1456 originid = replorigin_by_name(originname, true);
1457 if (!OidIsValid(originid))
1458 {
1459 /*
1460 * Origin tracking does not exist, so create it now.
1461 *
1462 * Then advance to the LSN got from walrcv_create_slot. This is WAL
1463 * logged for the purpose of recovery. Locks are to prevent the
1464 * replication origin from vanishing while advancing.
1465 */
1466 originid = replorigin_create(originname);
1467
1468 LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1469 replorigin_advance(originid, *origin_startpos, InvalidXLogRecPtr,
1470 true /* go backward */ , true /* WAL log */ );
1471 UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1472
1473 replorigin_session_setup(originid, 0);
1474 replorigin_session_origin = originid;
1475 }
1476 else
1477 {
1478 ereport(ERROR,
1480 errmsg("replication origin \"%s\" already exists",
1481 originname)));
1482 }
1483
1484 /*
1485 * Make sure that the copy command runs as the table owner, unless the
1486 * user has opted out of that behaviour.
1487 */
1488 run_as_owner = MySubscription->runasowner;
1489 if (!run_as_owner)
1490 SwitchToUntrustedUser(rel->rd_rel->relowner, &ucxt);
1491
1492 /*
1493 * Check that our table sync worker has permission to insert into the
1494 * target table.
1495 */
1496 aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(),
1497 ACL_INSERT);
1498 if (aclresult != ACLCHECK_OK)
1499 aclcheck_error(aclresult,
1500 get_relkind_objtype(rel->rd_rel->relkind),
1502
1503 /*
1504 * COPY FROM does not honor RLS policies. That is not a problem for
1505 * subscriptions owned by roles with BYPASSRLS privilege (or superuser,
1506 * who has it implicitly), but other roles should not be able to
1507 * circumvent RLS. Disallow logical replication into RLS enabled
1508 * relations for such roles.
1509 */
1511 ereport(ERROR,
1512 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1513 errmsg("user \"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
1516
1517 /* Now do the initial data copy */
1519 copy_table(rel);
1521
1522 res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL);
1523 if (res->status != WALRCV_OK_COMMAND)
1524 ereport(ERROR,
1525 (errcode(ERRCODE_CONNECTION_FAILURE),
1526 errmsg("table copy could not finish transaction on publisher: %s",
1527 res->err)));
1529
1530 if (!run_as_owner)
1531 RestoreUserContext(&ucxt);
1532
1533 table_close(rel, NoLock);
1534
1535 /* Make the copy visible. */
1537
1538 /*
1539 * Update the persisted state to indicate the COPY phase is done; make it
1540 * visible to others.
1541 */
1544 SUBREL_STATE_FINISHEDCOPY,
1546
1548
1549copy_table_done:
 /*
  * Reached after a fresh COPY, or directly via goto when resuming a table
  * that was already in FINISHEDCOPY state. Either way *origin_startpos has
  * been set by now.
  */
1550
1551 elog(DEBUG1,
1552 "LogicalRepSyncTableStart: '%s' origin_startpos lsn %X/%X",
1553 originname, LSN_FORMAT_ARGS(*origin_startpos));
1554
1555 /*
1556 * We are done with the initial data synchronization, update the state.
1557 */
1559 MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCWAIT;
1560 MyLogicalRepWorker->relstate_lsn = *origin_startpos;
1562
1563 /*
1564 * Finally, wait until the leader apply worker tells us to catch up and
1565 * then return to let LogicalRepApplyLoop do it.
1566 */
1567 wait_for_worker_state_change(SUBREL_STATE_CATCHUP);
1568 return slotname;
1569}
1570
1571/*
1572 * Common code to fetch the up-to-date sync state info into the static lists.
1573 *
1574 * Returns true if subscription has 1 or more tables, else false.
1575 *
 * started_tx: always reset to false on entry; set to true if this call had
 * to start a transaction in order to read the catalog.
 *
1576 * Note: If this function started the transaction (indicated by the parameter)
1577 * then it is the caller's responsibility to commit it.
1578 */
1579static bool
1580FetchTableStates(bool *started_tx)
1581{
 /*
  * Static: retains the answer computed by the most recent rebuild of the
  * table-state lists, so it can be returned on calls that reuse the cached
  * lists.
  */
1582 static bool has_subrels = false;
1583
1584 *started_tx = false;
1585
1587 {
1588 MemoryContext oldctx;
1589 List *rstates;
1590 ListCell *lc;
1591 SubscriptionRelState *rstate;
1592
1594
1595 /* Clean the old lists. */
1598
1599 if (!IsTransactionState())
1600 {
1602 *started_tx = true;
1603 }
1604
1605 /* Fetch all non-ready tables. */
1607
1608 /* Allocate the tracking info in a permanent memory context. */
1610 foreach(lc, rstates)
1611 {
1612 rstate = palloc(sizeof(SubscriptionRelState));
1613 memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState));
1615 }
1616 MemoryContextSwitchTo(oldctx);
1617
1618 /*
1619 * Does the subscription have tables?
1620 *
1621 * If there were not-READY relations found then we know it does. But
1622 * if table_states_not_ready was empty we still need to check again to
1623 * see if there are 0 tables.
1624 */
1625 has_subrels = (table_states_not_ready != NIL) ||
1627
1628 /*
1629 * If the subscription relation cache has been invalidated since we
1630 * entered this routine, we still use and return the relations we just
1631 * finished constructing, to avoid infinite loops, but we leave the
1632 * table states marked as stale so that we'll rebuild it again on next
1633 * access. Otherwise, we mark the table states as valid.
1634 */
1637 }
1638
1639 return has_subrels;
1640}
1641
1642/*
1643 * Execute the initial sync with error handling. Disable the subscription,
1644 * if it's required.
1645 *
1646 * Allocate the slot name in long-lived context on return. Note that we don't
1647 * handle FATAL errors which are probably because of system resource error and
1648 * are not repeatable.
 *
 * origin_startpos: passed through to LogicalRepSyncTableStart(), which fills
 * it in with the LSN the catchup phase starts from.
 * slotname: on successful return, points to a copy of the tablesync slot
 * name allocated in ApplyContext.
 *
 * Any ERROR raised by LogicalRepSyncTableStart() is either handled by the
 * disable-on-error path or re-thrown after the failure has been reported.
1649 */
1650static void
1651start_table_sync(XLogRecPtr *origin_startpos, char **slotname)
1652{
1653 char *sync_slotname = NULL;
1654
1656
1657 PG_TRY();
1658 {
1659 /* Call initial sync. */
1660 sync_slotname = LogicalRepSyncTableStart(origin_startpos);
1661 }
1662 PG_CATCH();
1663 {
1666 else
1667 {
1668 /*
1669 * Report the worker failed during table synchronization. Abort
1670 * the current transaction so that the stats message is sent in an
1671 * idle state.
1672 */
1675
1676 PG_RE_THROW();
1677 }
1678 }
1679 PG_END_TRY();
1680
1681 /* allocate slot name in long-lived context */
1682 *slotname = MemoryContextStrdup(ApplyContext, sync_slotname);
 /*
  * LogicalRepSyncTableStart() palloc'd the name in the then-current memory
  * context; the ApplyContext copy above replaces it.
  */
1683 pfree(sync_slotname);
1684}
1685
1686/*
1687 * Runs the tablesync worker.
1688 *
1689 * It performs the initial sync of the table assigned to this worker. After a
1690 * successful sync, it sets the streaming options and starts streaming to
 * catch up with the leader apply worker.
1691 */
1692static void
1694{
1695 char originname[NAMEDATALEN];
1696 XLogRecPtr origin_startpos = InvalidXLogRecPtr;
1697 char *slotname = NULL;
1699
 /* Fills in origin_startpos and slotname (the latter in ApplyContext). */
1700 start_table_sync(&origin_startpos, &slotname);
1701
1704 originname,
1705 sizeof(originname));
1706
1708
1709 set_stream_options(&options, slotname, &origin_startpos);
1710
1712
1713 /* Apply the changes till we catch up with the apply worker. */
1714 start_apply(origin_startpos);
1715}
1716
1717/*
 * Logical Replication Tablesync worker entry point.
 *
 * main_arg carries the worker slot number, which is handed to
 * SetupApplyOrSyncWorker(). The worker always ends in finish_sync_worker(),
 * which does not return.
 */
1718void
1720{
1721 int worker_slot = DatumGetInt32(main_arg);
1722
1723 SetupApplyOrSyncWorker(worker_slot);
1724
1726
1727 finish_sync_worker();
1728}
1729
1730/*
1731 * If the subscription has no tables then return false.
1732 *
1733 * Otherwise, are all tablesyncs READY?
 * (Returns true only if every table of the subscription is in READY state.)
1734 *
1735 * Note: This function is not suitable to be called from outside of apply or
1736 * tablesync workers because MySubscription needs to be already initialized.
1737 */
1738bool
1740{
1741 bool started_tx = false;
1742 bool has_subrels = false;
1743
1744 /* We need up-to-date sync state info for subscription tables here. */
1745 has_subrels = FetchTableStates(&started_tx);
1746
1747 if (started_tx)
1748 {
 /*
  * FetchTableStates() started a transaction; per its contract, finishing
  * it is our responsibility.
  */
1750 pgstat_report_stat(true);
1751 }
1752
1753 /*
1754 * Return false when there are no tables in subscription or not all tables
1755 * are in ready state; true otherwise.
1756 */
1757 return has_subrels && (table_states_not_ready == NIL);
1758}
1759
1760/*
1761 * Update the two_phase state of the specified subscription in pg_subscription.
1762 */
1763void
1764UpdateTwoPhaseState(Oid suboid, char new_state)
1765{
1766 Relation rel;
1767 HeapTuple tup;
1768 bool nulls[Natts_pg_subscription];
1769 bool replaces[Natts_pg_subscription];
1770 Datum values[Natts_pg_subscription];
1771
1772 Assert(new_state == LOGICALREP_TWOPHASE_STATE_DISABLED ||
1773 new_state == LOGICALREP_TWOPHASE_STATE_PENDING ||
1774 new_state == LOGICALREP_TWOPHASE_STATE_ENABLED);
1775
1776 rel = table_open(SubscriptionRelationId, RowExclusiveLock);
1777 tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(suboid));
1778 if (!HeapTupleIsValid(tup))
1779 elog(ERROR,
1780 "cache lookup failed for subscription oid %u",
1781 suboid);
1782
1783 /* Form a new tuple. */
1784 memset(values, 0, sizeof(values));
1785 memset(nulls, false, sizeof(nulls));
1786 memset(replaces, false, sizeof(replaces));
1787
1788 /* And update/set two_phase state */
1789 values[Anum_pg_subscription_subtwophasestate - 1] = CharGetDatum(new_state);
1790 replaces[Anum_pg_subscription_subtwophasestate - 1] = true;
1791
1792 tup = heap_modify_tuple(tup, RelationGetDescr(rel),
1793 values, nulls, replaces);
1794 CatalogTupleUpdate(rel, &tup->t_self, tup);
1795
1796 heap_freetuple(tup);
1798}
AclResult
Definition: acl.h:182
@ ACLCHECK_OK
Definition: acl.h:183
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
Definition: aclchk.c:2622
AclResult pg_class_aclcheck(Oid table_oid, Oid roleid, AclMode mode)
Definition: aclchk.c:4007
#define ARR_DATA_PTR(a)
Definition: array.h:322
#define DatumGetArrayTypeP(X)
Definition: array.h:261
#define ARR_DIMS(a)
Definition: array.h:294
int16 AttrNumber
Definition: attnum.h:21
void set_stream_options(WalRcvStreamOptions *options, char *slotname, XLogRecPtr *origin_startpos)
Definition: worker.c:4433
void start_apply(XLogRecPtr origin_startpos)
Definition: worker.c:4502
void DisableSubscriptionAndExit(void)
Definition: worker.c:4807
void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, char *originname, Size szoriginname)
Definition: worker.c:426
void set_apply_error_context_origin(char *originname)
Definition: worker.c:5148
MemoryContext ApplyContext
Definition: worker.c:292
void SetupApplyOrSyncWorker(int worker_slot)
Definition: worker.c:4733
WalReceiverConn * LogRepWorkerWalRcvConn
Definition: worker.c:297
Subscription * MySubscription
Definition: worker.c:299
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1780
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1644
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1608
bool bms_is_member(int x, const Bitmapset *a)
Definition: bitmapset.c:510
Bitmapset * bms_add_member(Bitmapset *a, int x)
Definition: bitmapset.c:815
static Datum values[MAXATTR]
Definition: bootstrap.c:151
#define TextDatumGetCString(d)
Definition: builtins.h:98
#define Max(x, y)
Definition: c.h:955
#define Assert(condition)
Definition: c.h:815
#define UINT64_FORMAT
Definition: c.h:507
int16_t int16
Definition: c.h:483
uint32_t uint32
Definition: c.h:488
#define lengthof(array)
Definition: c.h:745
#define OidIsValid(objectId)
Definition: c.h:732
size_t Size
Definition: c.h:562
CopyFromState BeginCopyFrom(ParseState *pstate, Relation rel, Node *whereClause, const char *filename, bool is_program, copy_data_source_cb data_source_cb, List *attnamelist, List *options)
Definition: copyfrom.c:1383
uint64 CopyFrom(CopyFromState cstate)
Definition: copyfrom.c:640
int64 TimestampTz
Definition: timestamp.h:39
#define DSM_HANDLE_INVALID
Definition: dsm_impl.h:58
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:955
void hash_destroy(HTAB *hashp)
Definition: dynahash.c:865
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:352
int errcode(int sqlerrcode)
Definition: elog.c:853
int errmsg(const char *fmt,...)
Definition: elog.c:1070
#define LOG
Definition: elog.h:31
#define PG_RE_THROW()
Definition: elog.h:412
#define PG_TRY(...)
Definition: elog.h:371
#define PG_END_TRY(...)
Definition: elog.h:396
#define DEBUG1
Definition: elog.h:30
#define ERROR
Definition: elog.h:39
#define PG_CATCH(...)
Definition: elog.h:381
#define elog(elevel,...)
Definition: elog.h:225
#define ereport(elevel,...)
Definition: elog.h:149
void err(int eval, const char *fmt,...)
Definition: err.c:43
TupleTableSlot * MakeSingleTupleTableSlot(TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1425
void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
Definition: execTuples.c:1441
const TupleTableSlotOps TTSOpsMinimalTuple
Definition: execTuples.c:86
struct Latch * MyLatch
Definition: globals.c:62
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, const Datum *replValues, const bool *replIsnull, const bool *doReplace)
Definition: heaptuple.c:1210
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1435
@ HASH_ENTER
Definition: hsearch.h:114
#define HASH_ELEM
Definition: hsearch.h:95
#define HASH_BLOBS
Definition: hsearch.h:97
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
#define MaxTupleAttributeNumber
Definition: htup_details.h:34
void CatalogTupleUpdate(Relation heapRel, ItemPointer otid, HeapTuple tup)
Definition: indexing.c:313
void proc_exit(int code)
Definition: ipc.c:104
int i
Definition: isn.c:72
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:565
void ResetLatch(Latch *latch)
Definition: latch.c:724
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:517
#define WL_SOCKET_READABLE
Definition: latch.h:128
#define WL_TIMEOUT
Definition: latch.h:130
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:132
#define WL_LATCH_SET
Definition: latch.h:127
bool logicalrep_worker_launch(LogicalRepWorkerType wtype, Oid dbid, Oid subid, const char *subname, Oid userid, Oid relid, dsm_handle subworker_dsm)
Definition: launcher.c:297
void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
Definition: launcher.c:693
LogicalRepWorker * logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
Definition: launcher.c:234
void logicalrep_worker_wakeup(Oid subid, Oid relid)
Definition: launcher.c:673
static dshash_table * last_start_times
Definition: launcher.c:89
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:54
int max_sync_workers_per_subscription
Definition: launcher.c:51
int logicalrep_sync_worker_count(Oid subid)
Definition: launcher.c:845
void ApplyLauncherForgetWorkerStartTime(Oid subid)
Definition: launcher.c:1072
List * lappend(List *list, void *datum)
Definition: list.c:339
void list_free_deep(List *list)
Definition: list.c:1560
void UnlockRelationOid(Oid relid, LOCKMODE lockmode)
Definition: lmgr.c:226
void LockRelationOid(Oid relid, LOCKMODE lockmode)
Definition: lmgr.c:107
#define NoLock
Definition: lockdefs.h:34
#define AccessShareLock
Definition: lockdefs.h:36
#define RowExclusiveLock
Definition: lockdefs.h:38
char * get_rel_name(Oid relid)
Definition: lsyscache.c:1928
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3366
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1168
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1781
@ LW_SHARED
Definition: lwlock.h:115
DefElem * makeDefElem(char *name, Node *arg, int location)
Definition: makefuncs.c:590
char * MemoryContextStrdup(MemoryContext context, const char *string)
Definition: mcxt.c:1683
void pfree(void *pointer)
Definition: mcxt.c:1521
void * palloc0(Size size)
Definition: mcxt.c:1347
void * palloc(Size size)
Definition: mcxt.c:1317
MemoryContext CacheMemoryContext
Definition: mcxt.c:152
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:122
Oid GetUserId(void)
Definition: miscinit.c:517
char * GetUserNameFromId(Oid roleid, bool noerr)
Definition: miscinit.c:1036
ObjectType get_relkind_objtype(char relkind)
TimestampTz replorigin_session_origin_timestamp
Definition: origin.c:161
RepOriginId replorigin_by_name(const char *roname, bool missing_ok)
Definition: origin.c:225
RepOriginId replorigin_create(const char *roname)
Definition: origin.c:256
void replorigin_session_reset(void)
Definition: origin.c:1194
void replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
Definition: origin.c:415
RepOriginId replorigin_session_origin
Definition: origin.c:159
void replorigin_advance(RepOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)
Definition: origin.c:892
void replorigin_session_setup(RepOriginId node, int acquired_by)
Definition: origin.c:1101
XLogRecPtr replorigin_session_get_progress(bool flush)
Definition: origin.c:1241
XLogRecPtr replorigin_session_origin_lsn
Definition: origin.c:160
#define InvalidRepOriginId
Definition: origin.h:33
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:124
ParseState * make_parsestate(ParseState *parentParseState)
Definition: parse_node.c:39
ParseNamespaceItem * addRangeTableEntryForRelation(ParseState *pstate, Relation rel, int lockmode, Alias *alias, bool inh, bool inFromCl)
#define ACL_INSERT
Definition: parsenodes.h:76
int16 attnum
Definition: pg_attribute.h:74
void * arg
NameData relname
Definition: pg_class.h:38
#define NAMEDATALEN
const void size_t len
static int server_version
Definition: pg_dumpall.c:110
#define lfirst(lc)
Definition: pg_list.h:172
#define NIL
Definition: pg_list.h:68
#define list_make1(x1)
Definition: pg_list.h:212
#define for_each_from(cell, lst, N)
Definition: pg_list.h:414
#define linitial(l)
Definition: pg_list.h:178
static char ** options
List * GetSubscriptionRelations(Oid subid, bool not_ready)
char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
void GetPublicationsStr(List *publications, StringInfo dest, bool quote_literal)
void UpdateSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn)
bool HasSubscriptionRelations(Oid subid)
static char * buf
Definition: pg_test_fsync.c:72
long pgstat_report_stat(bool force)
Definition: pgstat.c:692
void pgstat_report_subscription_error(Oid subid, bool is_apply_error)
int pgsocket
Definition: port.h:29
#define snprintf
Definition: port.h:238
#define PGINVALID_SOCKET
Definition: port.h:31
static bool DatumGetBool(Datum X)
Definition: postgres.h:95
uintptr_t Datum
Definition: postgres.h:69
static Oid DatumGetObjectId(Datum X)
Definition: postgres.h:247
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:257
static char DatumGetChar(Datum X)
Definition: postgres.h:117
static int16 DatumGetInt16(Datum X)
Definition: postgres.h:167
static int32 DatumGetInt32(Datum X)
Definition: postgres.h:207
static Datum CharGetDatum(char X)
Definition: postgres.h:127
#define InvalidOid
Definition: postgres_ext.h:37
unsigned int Oid
Definition: postgres_ext.h:32
static int fd(const char *x, int i)
Definition: preproc-init.c:105
char * quote_literal_cstr(const char *rawstr)
Definition: quote.c:103
tree ctl
Definition: radixtree.h:1838
#define RelationGetRelid(relation)
Definition: rel.h:505
#define RelationGetDescr(relation)
Definition: rel.h:531
#define RelationGetRelationName(relation)
Definition: rel.h:539
#define RelationGetNamespace(relation)
Definition: rel.h:546
int check_enable_rls(Oid relid, Oid checkAsUser, bool noError)
Definition: rls.c:52
@ RLS_ENABLED
Definition: rls.h:45
char * quote_qualified_identifier(const char *qualifier, const char *ident)
Definition: ruleutils.c:13024
const char * quote_identifier(const char *ident)
Definition: ruleutils.c:12940
Snapshot GetTransactionSnapshot(void)
Definition: snapmgr.c:212
void PushActiveSnapshot(Snapshot snapshot)
Definition: snapmgr.c:610
void PopActiveSnapshot(void)
Definition: snapmgr.c:703
void InvalidateCatalogSnapshot(void)
Definition: snapmgr.c:384
#define SpinLockRelease(lock)
Definition: spin.h:61
#define SpinLockAcquire(lock)
Definition: spin.h:59
void logicalrep_relmap_update(LogicalRepRelation *remoterel)
Definition: relation.c:163
void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
Definition: relation.c:503
LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
Definition: relation.c:348
#define ERRCODE_DUPLICATE_OBJECT
Definition: streamutil.c:30
void destroyStringInfo(StringInfo str)
Definition: stringinfo.c:409
StringInfo makeStringInfo(void)
Definition: stringinfo.c:72
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:126
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:145
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:230
void appendStringInfoChar(StringInfo str, char ch)
Definition: stringinfo.c:242
void initStringInfo(StringInfo str)
Definition: stringinfo.c:97
Definition: dynahash.c:220
ItemPointerData t_self
Definition: htup.h:65
Definition: pg_list.h:54
LogicalRepRelation remoterel
LogicalRepRelId remoteid
Definition: logicalproto.h:107
Bitmapset * attkeys
Definition: logicalproto.h:115
XLogRecPtr relstate_lsn
LogicalRepWorkerType type
Definition: nodes.h:129
Form_pg_class rd_rel
Definition: rel.h:111
Tuplestorestate * tuplestore
Definition: walreceiver.h:223
TupleDesc tupledesc
Definition: walreceiver.h:224
WalRcvExecStatus status
Definition: walreceiver.h:220
Definition: regguts.h:323
void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok)
#define SearchSysCacheCopy1(cacheId, key1)
Definition: syscache.h:91
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:40
static List * table_states_not_ready
Definition: tablesync.c:134
bool AllTablesyncsReady(void)
Definition: tablesync.c:1739
static bool wait_for_worker_state_change(char expected_state)
Definition: tablesync.c:232
void invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
Definition: tablesync.c:281
SyncingTablesState
Definition: tablesync.c:127
@ SYNC_TABLE_STATE_REBUILD_STARTED
Definition: tablesync.c:129
@ SYNC_TABLE_STATE_VALID
Definition: tablesync.c:130
@ SYNC_TABLE_STATE_NEEDS_REBUILD
Definition: tablesync.c:128
static void pg_attribute_noreturn() finish_sync_worker(void)
Definition: tablesync.c:143
static void process_syncing_tables_for_apply(XLogRecPtr current_lsn)
Definition: tablesync.c:418
static List * make_copy_attnamelist(LogicalRepRelMapEntry *rel)
Definition: tablesync.c:698
void TablesyncWorkerMain(Datum main_arg)
Definition: tablesync.c:1719
static void process_syncing_tables_for_sync(XLogRecPtr current_lsn)
Definition: tablesync.c:295
static void fetch_remote_table_info(char *nspname, char *relname, LogicalRepRelation *lrel, List **qual, bool *gencol_published)
Definition: tablesync.c:797
void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, Size szslot)
Definition: tablesync.c:1274
static void run_tablesync_worker()
Definition: tablesync.c:1693
static int copy_read_data(void *outbuf, int minread, int maxread)
Definition: tablesync.c:718
static SyncingTablesState table_states_validity
Definition: tablesync.c:133
void process_syncing_tables(XLogRecPtr current_lsn)
Definition: tablesync.c:667
static char * LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
Definition: tablesync.c:1290
static void copy_table(Relation rel)
Definition: tablesync.c:1115
static bool wait_for_relation_state_change(Oid relid, char expected_state)
Definition: tablesync.c:184
static void start_table_sync(XLogRecPtr *origin_startpos, char **slotname)
Definition: tablesync.c:1651
static StringInfo copybuf
Definition: tablesync.c:137
static bool FetchTableStates(bool *started_tx)
Definition: tablesync.c:1580
void UpdateTwoPhaseState(Oid suboid, char new_state)
Definition: tablesync.c:1764
bool tuplestore_gettupleslot(Tuplestorestate *state, bool forward, bool copy, TupleTableSlot *slot)
Definition: tuplestore.c:1130
int64 tuplestore_tuple_count(Tuplestorestate *state)
Definition: tuplestore.c:580
static Datum slot_getattr(TupleTableSlot *slot, int attnum, bool *isnull)
Definition: tuptable.h:395
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:454
void SwitchToUntrustedUser(Oid userid, UserContext *context)
Definition: usercontext.c:33
void RestoreUserContext(UserContext *context)
Definition: usercontext.c:87
String * makeString(char *str)
Definition: value.c:63
#define strVal(v)
Definition: value.h:82
#define walrcv_startstreaming(conn, options)
Definition: walreceiver.h:451
#define walrcv_connect(conninfo, replication, logical, must_use_password, appname, err)
Definition: walreceiver.h:435
@ WALRCV_OK_COMMAND
Definition: walreceiver.h:205
@ WALRCV_OK_TUPLES
Definition: walreceiver.h:207
@ WALRCV_OK_COPY_OUT
Definition: walreceiver.h:209
#define walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn)
Definition: walreceiver.h:459
static void walrcv_clear_result(WalRcvExecResult *walres)
Definition: walreceiver.h:471
#define walrcv_server_version(conn)
Definition: walreceiver.h:447
#define walrcv_endstreaming(conn, next_tli)
Definition: walreceiver.h:453
#define walrcv_exec(conn, exec, nRetTypes, retTypes)
Definition: walreceiver.h:465
#define walrcv_receive(conn, buffer, wait_fd)
Definition: walreceiver.h:455
@ CRS_USE_SNAPSHOT
Definition: walsender.h:24
@ WORKERTYPE_TABLESYNC
@ WORKERTYPE_UNKNOWN
@ WORKERTYPE_PARALLEL_APPLY
@ WORKERTYPE_APPLY
static bool am_tablesync_worker(void)
bool IsTransactionState(void)
Definition: xact.c:386
void CommandCounterIncrement(void)
Definition: xact.c:1099
void StartTransactionCommand(void)
Definition: xact.c:3051
void CommitTransactionCommand(void)
Definition: xact.c:3149
void AbortOutOfAnyTransaction(void)
Definition: xact.c:4854
uint64 GetSystemIdentifier(void)
Definition: xlog.c:4568
XLogRecPtr GetXLogWriteRecPtr(void)
Definition: xlog.c:9451
int wal_retrieve_retry_interval
Definition: xlog.c:134
void XLogFlush(XLogRecPtr record)
Definition: xlog.c:2802
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
uint16 RepOriginId
Definition: xlogdefs.h:65
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
uint32 TimeLineID
Definition: xlogdefs.h:59