PostgreSQL Source Code git master
Loading...
Searching...
No Matches
tablesync.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 * tablesync.c
3 * PostgreSQL logical replication: initial table data synchronization
4 *
5 * Copyright (c) 2012-2026, PostgreSQL Global Development Group
6 *
7 * IDENTIFICATION
8 * src/backend/replication/logical/tablesync.c
9 *
10 * NOTES
11 * This file contains code for initial table data synchronization for
12 * logical replication.
13 *
14 * The initial data synchronization is done separately for each table,
15 * in a separate apply worker that only fetches the initial snapshot data
16 * from the publisher and then synchronizes the position in the stream with
17 * the leader apply worker.
18 *
19 * There are several reasons for doing the synchronization this way:
20 * - It allows us to parallelize the initial data synchronization
21 * which lowers the time needed for it to happen.
22 * - The initial synchronization does not have to hold the xid and LSN
23 * for the time it takes to copy data of all tables, causing less
24 * bloat and lower disk consumption compared to doing the
25 * synchronization in a single process for the whole database.
26 * - It allows us to synchronize any tables added after the initial
27 * synchronization has finished.
28 *
29 * The stream position synchronization works in multiple steps:
30 * - Apply worker requests a tablesync worker to start, setting the new
31 * table state to INIT.
32 * - Tablesync worker starts; changes table state from INIT to DATASYNC while
33 * copying.
34 * - Tablesync worker does initial table copy; there is a FINISHEDCOPY (sync
35 * worker specific) state to indicate when the copy phase has completed, so
36 * if the worker crashes with this (non-memory) state then the copy will not
37 * be re-attempted.
38 * - Tablesync worker then sets table state to SYNCWAIT; waits for state change.
39 * - Apply worker periodically checks for tables in SYNCWAIT state. When
40 * any appear, it sets the table state to CATCHUP and starts loop-waiting
41 * until either the table state is set to SYNCDONE or the sync worker
42 * exits.
43 * - After the sync worker has seen the state change to CATCHUP, it will
44 * read the stream and apply changes (acting like an apply worker) until
45 * it catches up to the specified stream position. Then it sets the
46 * state to SYNCDONE. There might be zero changes applied between
47 * CATCHUP and SYNCDONE, because the sync worker might be ahead of the
48 * apply worker.
49 * - Once the state is set to SYNCDONE, the apply will continue tracking
50 * the table until it reaches the SYNCDONE stream position, at which
51 * point it sets state to READY and stops tracking. Again, there might
52 * be zero changes in between.
53 *
54 * So the state progression is always: INIT -> DATASYNC -> FINISHEDCOPY
55 * -> SYNCWAIT -> CATCHUP -> SYNCDONE -> READY.
56 *
57 * The catalog pg_subscription_rel is used to keep information about
58 * subscribed tables and their state. The catalog holds all states
59 * except SYNCWAIT and CATCHUP which are only in shared memory.
60 *
61 * Example flows look like this:
62 * - Apply is in front:
63 * sync:8
64 * -> set in catalog FINISHEDCOPY
65 * -> set in memory SYNCWAIT
66 * apply:10
67 * -> set in memory CATCHUP
68 * -> enter wait-loop
69 * sync:10
70 * -> set in catalog SYNCDONE
71 * -> exit
72 * apply:10
73 * -> exit wait-loop
74 * -> continue rep
75 * apply:11
76 * -> set in catalog READY
77 *
78 * - Sync is in front:
79 * sync:10
80 * -> set in catalog FINISHEDCOPY
81 * -> set in memory SYNCWAIT
82 * apply:8
83 * -> set in memory CATCHUP
84 * -> continue per-table filtering
85 * sync:10
86 * -> set in catalog SYNCDONE
87 * -> exit
88 * apply:10
89 * -> set in catalog READY
90 * -> stop per-table filtering
91 * -> continue rep
92 *-------------------------------------------------------------------------
93 */
94
95#include "postgres.h"
96
97#include "access/table.h"
98#include "access/xact.h"
99#include "catalog/indexing.h"
101#include "catalog/pg_type.h"
102#include "commands/copy.h"
103#include "miscadmin.h"
104#include "nodes/makefuncs.h"
106#include "pgstat.h"
110#include "replication/origin.h"
111#include "replication/slot.h"
114#include "storage/ipc.h"
115#include "storage/latch.h"
116#include "storage/lmgr.h"
117#include "utils/acl.h"
118#include "utils/array.h"
119#include "utils/builtins.h"
120#include "utils/lsyscache.h"
121#include "utils/rls.h"
122#include "utils/snapmgr.h"
123#include "utils/syscache.h"
124#include "utils/usercontext.h"
125#include "utils/wait_event.h"
126
128
130
131/*
132 * Wait until the relation sync state is set in the catalog to the expected
133 * one; return true when it happens.
134 *
135 * Returns false if the table sync worker or the table itself have
136 * disappeared, or the table state has been reset.
137 *
138 * Currently, this is used in the apply worker when transitioning from
139 * CATCHUP state to SYNCDONE.
140 */
141static bool
143{
144 char state;
145
146 for (;;)
147 {
148 LogicalRepWorker *worker;
149 XLogRecPtr statelsn;
150
152
155 relid, &statelsn);
156
158 break;
159
160 if (state == expected_state)
161 return true;
162
163 /* Check if the sync worker is still running and bail if not. */
167 false);
169 if (!worker)
170 break;
171
175
177 }
178
179 return false;
180}
181
182/*
183 * Wait until the apply worker changes the state of our synchronization
184 * worker to the expected one.
185 *
186 * Used when transitioning from SYNCWAIT state to CATCHUP.
187 *
188 * Returns false if the apply worker has disappeared.
189 */
190static bool
192{
193 int rc;
194
195 for (;;)
196 {
197 LogicalRepWorker *worker;
198
200
201 /*
202 * Done if already in correct state. (We assume this fetch is atomic
203 * enough to not give a misleading answer if we do it with no lock.)
204 */
206 return true;
207
208 /*
209 * Bail out if the apply worker has died; otherwise signal it that
210 * we're waiting.
211 */
215 false);
216 if (worker && worker->proc)
219 if (!worker)
220 break;
221
222 /*
223 * Wait. We expect to get a latch signal back from the apply worker,
224 * but use a timeout in case it dies without sending one.
225 */
226 rc = WaitLatch(MyLatch,
229
230 if (rc & WL_LATCH_SET)
232 }
233
234 return false;
235}
236
237/*
238 * Handle table synchronization cooperation from the synchronization
239 * worker.
240 *
241 * If the sync worker is in CATCHUP state and reached (or passed) the
242 * predetermined synchronization point in the WAL stream, mark the table as
243 * SYNCDONE and finish.
244 */
245void
247{
249
252 {
253 TimeLineID tli;
254 char syncslotname[NAMEDATALEN] = {0};
255 char originname[NAMEDATALEN] = {0};
256
259
261
262 /*
263 * UpdateSubscriptionRelState must be called within a transaction.
264 */
265 if (!IsTransactionState())
267
272 false);
273
274 /*
275 * End streaming so that LogRepWorkerWalRcvConn can be used to drop
276 * the slot.
277 */
279
280 /*
281 * Cleanup the tablesync slot.
282 *
283 * This has to be done after updating the state because otherwise, if
284 * there is an error while doing the database operations, we won't be
285 * able to roll back the dropped slot.
286 */
290 sizeof(syncslotname));
291
292 /*
293 * It is important to give an error if we are unable to drop the slot,
294 * otherwise, it won't be dropped till the corresponding subscription
295 * is dropped. So passing missing_ok = false.
296 */
298
300 pgstat_report_stat(false);
301
302 /*
303 * Start a new transaction to clean up the tablesync origin tracking.
304 * This transaction will be ended within FinishSyncWorker(). Even if
305 * we fail to remove the origin here, the apply worker will make sure
306 * to clean it up afterward.
307 *
308 * We need to do this after the table state is set to SYNCDONE.
309 * Otherwise, if an error occurs while performing the database
310 * operation, the worker will be restarted and the in-memory state of
311 * replication progress (remote_lsn), which has already been cleared,
312 * won't be rolled back. The restarted worker would then use an
313 * invalid replication progress state, resulting in replay of
314 * transactions that have already been applied.
315 */
317
321 sizeof(originname));
322
323 /*
324 * Resetting the origin session removes the ownership of the slot.
325 * This is needed to allow the origin to be dropped.
326 */
329
330 /*
331 * Drop the tablesync's origin tracking if it exists.
332 *
333 * There is a chance that the user is concurrently performing refresh
334 * for the subscription where we remove the table state and its origin
335 * or the apply worker would have removed this origin. So passing
336 * missing_ok = true.
337 */
339
341 }
342 else
344}
345
346/*
347 * Handle table synchronization cooperation from the apply worker.
348 *
349 * Walk over all subscription tables that are individually tracked by the
350 * apply process (currently, all that have state other than
351 * SUBREL_STATE_READY) and manage synchronization for them.
352 *
353 * If there are tables that need synchronizing and are not being synchronized
354 * yet, start sync workers for them (if there are free slots for sync
355 * workers). To prevent starting the sync worker for the same relation at a
356 * high frequency after a failure, we store its last start time with each sync
357 * state info. We start the sync worker for the same relation after waiting
358 * at least wal_retrieve_retry_interval.
359 *
360 * For tables that are being synchronized already, check if sync workers
361 * either need action from the apply worker or have finished. This is the
362 * SYNCWAIT to CATCHUP transition.
363 *
364 * If the synchronization position is reached (SYNCDONE), then the table can
365 * be marked as READY and is no longer tracked.
366 */
367void
369{
371 {
372 Oid relid;
373 TimestampTz last_start_time;
374 };
375 static HTAB *last_start_times = NULL;
376 ListCell *lc;
377 bool started_tx;
378 bool should_exit = false;
379 Relation rel = NULL;
380
382
383 /* We need up-to-date sync state info for subscription tables here. */
385
386 /*
387 * Prepare a hash table for tracking last start times of workers, to avoid
388 * immediate restarts. We don't need it if there are no tables that need
389 * syncing.
390 */
392 {
393 HASHCTL ctl;
394
395 ctl.keysize = sizeof(Oid);
396 ctl.entrysize = sizeof(struct tablesync_start_time_mapping);
397 last_start_times = hash_create("Logical replication table sync worker start times",
398 256, &ctl, HASH_ELEM | HASH_BLOBS);
399 }
400
401 /*
402 * Clean up the hash table when we're done with all tables (just to
403 * release the bit of memory).
404 */
406 {
409 }
410
411 /*
412 * Process all tables that are being synchronized.
413 */
414 foreach(lc, table_states_not_ready)
415 {
417
418 if (!started_tx)
419 {
421 started_tx = true;
422 }
423
425
426 if (rstate->state == SUBREL_STATE_SYNCDONE)
427 {
428 /*
429 * Apply has caught up to the position where the table sync has
430 * finished. Mark the table as ready so that the apply will just
431 * continue to replicate it normally.
432 */
433 if (current_lsn >= rstate->lsn)
434 {
436
437 rstate->state = SUBREL_STATE_READY;
438 rstate->lsn = current_lsn;
439
440 /*
441 * Remove the tablesync origin tracking if it exists.
442 *
443 * There is a chance that the user is concurrently performing
444 * refresh for the subscription where we remove the table
445 * state and its origin or the tablesync worker would have
446 * already removed this origin. We can't rely on the tablesync
447 * worker to remove the origin tracking because, if there is any
448 * error while dropping it, we won't restart the worker to drop
449 * the origin. So passing missing_ok = true.
450 *
451 * Lock the subscription and origin in the same order as we
452 * are doing during DDL commands to avoid deadlocks. See
453 * AlterSubscription_refresh.
454 */
456 0, AccessShareLock);
457
458 if (!rel)
460
462 rstate->relid,
464 sizeof(originname));
466
467 /*
468 * Update the state to READY only after the origin cleanup.
469 */
471 rstate->relid, rstate->state,
472 rstate->lsn, true);
473 }
474 }
475 else
476 {
478
479 /*
480 * Look for a sync worker for this relation.
481 */
483
486 rstate->relid, false);
487
488 if (syncworker)
489 {
490 /* Found one, update our copy of its state */
491 SpinLockAcquire(&syncworker->relmutex);
492 rstate->state = syncworker->relstate;
493 rstate->lsn = syncworker->relstate_lsn;
494 if (rstate->state == SUBREL_STATE_SYNCWAIT)
495 {
496 /*
497 * Sync worker is waiting for apply. Tell sync worker it
498 * can catchup now.
499 */
501 syncworker->relstate_lsn =
502 Max(syncworker->relstate_lsn, current_lsn);
503 }
504 SpinLockRelease(&syncworker->relmutex);
505
506 /* If we told worker to catch up, wait for it. */
507 if (rstate->state == SUBREL_STATE_SYNCWAIT)
508 {
509 /* Signal the sync worker, as it may be waiting for us. */
510 if (syncworker->proc)
512
513 /* Now safe to release the LWLock */
515
516 if (started_tx)
517 {
518 /*
519 * We must commit the existing transaction to release
520 * the existing locks before entering a busy loop.
521 * This is required to avoid any undetected deadlocks
522 * due to any existing lock, as the deadlock detector
523 * won't be able to detect the waits on the latch.
524 *
525 * Also close any tables prior to the commit.
526 */
527 if (rel)
528 {
529 table_close(rel, NoLock);
530 rel = NULL;
531 }
533 pgstat_report_stat(false);
534 }
535
536 /*
537 * Enter busy loop and wait for synchronization worker to
538 * reach expected state (or die trying).
539 */
541 started_tx = true;
542
545 }
546 else
548 }
549 else
550 {
551 /*
552 * If there is no sync worker for this table yet, count
553 * running sync workers for this subscription, while we have
554 * the lock.
555 */
556 int nsyncworkers =
559 bool found;
560
561 /* Now safe to release the LWLock */
563
565 HASH_ENTER, &found);
566 if (!found)
567 hentry->last_start_time = 0;
568
570 rstate->relid, &hentry->last_start_time);
571 }
572 }
573 }
574
575 /* Close table if opened */
576 if (rel)
577 table_close(rel, NoLock);
578
579
580 if (started_tx)
581 {
582 /*
583 * Even when the two_phase mode is requested by the user, it remains
584 * as 'pending' until all tablesyncs have reached READY state.
585 *
586 * When this happens, we restart the apply worker and (if the
587 * conditions are still ok) then the two_phase tri-state will become
588 * 'enabled' at that time.
589 *
590 * Note: If the subscription has no tables then leave the state as
591 * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
592 * work.
593 */
595 {
596 CommandCounterIncrement(); /* make updates visible */
597 if (AllTablesyncsReady())
598 {
599 ereport(LOG,
600 (errmsg("logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled",
602 should_exit = true;
603 }
604 }
605
607 pgstat_report_stat(true);
608 }
609
610 if (should_exit)
611 {
612 /*
613 * Reset the last-start time for this worker so that the launcher will
614 * restart it without waiting for wal_retrieve_retry_interval.
615 */
617
618 proc_exit(0);
619 }
620}
621
622/*
623 * Create list of columns for COPY based on logical relation mapping.
624 */
625static List *
627{
629 int i;
630
631 for (i = 0; i < rel->remoterel.natts; i++)
632 {
635 }
636
637
638 return attnamelist;
639}
640
641/*
642 * Data source callback for the COPY FROM, which reads from the remote
643 * connection and passes the data back to our local COPY.
 *
 * outbuf is the destination buffer; at most maxread bytes are written to
 * it. The read loop keeps going only while fewer than minread bytes have
 * been delivered and maxread has not been used up.
 *
 * Bytes left over in copybuf from a previous call (those between
 * copybuf->cursor and copybuf->len) are handed out first.
 *
 * Returns the number of bytes copied into outbuf. This can be fewer than
 * minread when the read reports a negative length (presumably the end of
 * the remote copy stream -- confirm against the receive call).
644 */
645static int
646copy_read_data(void *outbuf, int minread, int maxread)
647{
648 int bytesread = 0;
649 int avail;
650
651 /* If there are some leftover data from previous read, use it. */
652 avail = copybuf->len - copybuf->cursor;
653 if (avail)
654 {
 /* Never hand out more than the caller asked for. */
655 if (avail > maxread)
656 avail = maxread;
 /*
 * NOTE(review): outbuf is not advanced after this memcpy, unlike the
 * copy in the receive loop below. If the leftover is shorter than
 * minread, the next memcpy would overwrite these bytes. Confirm that
 * callers never pass a minread larger than the leftover, or advance
 * outbuf here as well.
 */
657 memcpy(outbuf, &copybuf->data[copybuf->cursor], avail);
658 copybuf->cursor += avail;
659 maxread -= avail;
660 bytesread += avail;
661 }
662
 /* Keep reading until the minimum is satisfied or the buffer is full. */
663 while (maxread > 0 && bytesread < minread)
664 {
666 int len;
667 char *buf = NULL;
668
669 for (;;)
670 {
671 /* Try to read the data. */
673
675
 /* Zero length: nothing available right now, leave the inner loop. */
676 if (len == 0)
677 break;
 /* Negative length: give back whatever has been copied so far. */
678 else if (len < 0)
679 return bytesread;
680 else
681 {
682 /* Process the data */
683 copybuf->data = buf;
684 copybuf->len = len;
685 copybuf->cursor = 0;
686
 /* cursor was just reset, so avail equals len before clamping. */
687 avail = copybuf->len - copybuf->cursor;
688 if (avail > maxread)
689 avail = maxread;
690 memcpy(outbuf, &copybuf->data[copybuf->cursor], avail);
 /* Advance the output position past the bytes just copied. */
691 outbuf = (char *) outbuf + avail;
692 copybuf->cursor += avail;
693 maxread -= avail;
694 bytesread += avail;
695 }
696
698 return bytesread;
699 }
700
701 /*
702 * Wait for more data or latch.
703 */
708
710 }
711
712 return bytesread;
713}
714
715
716/*
717 * Get information about the remote relation, in a fashion similar to
718 * what the RELATION message provides during replication.
719 *
720 * This function also returns (a) the relation qualifications to be used in
721 * the COPY command, and (b) whether the remote relation has published any
722 * generated column.
723 */
724static void
726 List **qual, bool *gencol_published)
727{
728 WalRcvExecResult *res;
729 StringInfoData cmd;
730 TupleTableSlot *slot;
733 Oid qualRow[] = {TEXTOID};
734 bool isnull;
735 int natt;
736 StringInfo pub_names = NULL;
739
740 lrel->nspname = nspname;
741 lrel->relname = relname;
742
743 /* First fetch Oid and replica identity. */
744 initStringInfo(&cmd);
745 appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind"
746 " FROM pg_catalog.pg_class c"
747 " INNER JOIN pg_catalog.pg_namespace n"
748 " ON (c.relnamespace = n.oid)"
749 " WHERE n.nspname = %s"
750 " AND c.relname = %s",
751 quote_literal_cstr(nspname),
755
756 if (res->status != WALRCV_OK_TUPLES)
759 errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
760 nspname, relname, res->err)));
761
763 if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
766 errmsg("table \"%s.%s\" not found on publisher",
767 nspname, relname)));
768
769 lrel->remoteid = DatumGetObjectId(slot_getattr(slot, 1, &isnull));
770 Assert(!isnull);
771 lrel->replident = DatumGetChar(slot_getattr(slot, 2, &isnull));
772 Assert(!isnull);
773 lrel->relkind = DatumGetChar(slot_getattr(slot, 3, &isnull));
774 Assert(!isnull);
775
778
779
780 /*
781 * Get column lists for each relation.
782 *
783 * We need to do this before fetching info about column names and types,
784 * so that we can skip columns that should not be replicated.
785 */
786 if (server_version >= 150000)
787 {
791
792 /* Build the pub_names comma-separated string. */
793 pub_names = makeStringInfo();
795
796 /*
797 * Fetch info about column lists for the relation (from all the
798 * publications).
799 */
800 resetStringInfo(&cmd);
801
802 if (server_version >= 190000)
803 {
804 /*
805 * We can pass both publication names and relid to
806 * pg_get_publication_tables() since version 19.
807 */
808 appendStringInfo(&cmd,
809 "SELECT DISTINCT"
810 " (CASE WHEN (array_length(gpt.attrs, 1) = c.relnatts)"
811 " THEN NULL ELSE gpt.attrs END)"
812 " FROM pg_get_publication_tables(ARRAY[%s], %u) gpt,"
813 " pg_class c"
814 " WHERE c.oid = gpt.relid",
815 pub_names->data,
816 lrel->remoteid);
817 }
818 else
819 appendStringInfo(&cmd,
820 "SELECT DISTINCT"
821 " (CASE WHEN (array_length(gpt.attrs, 1) = c.relnatts)"
822 " THEN NULL ELSE gpt.attrs END)"
823 " FROM pg_publication p,"
824 " LATERAL pg_get_publication_tables(p.pubname) gpt,"
825 " pg_class c"
826 " WHERE gpt.relid = %u AND c.oid = gpt.relid"
827 " AND p.pubname IN ( %s )",
828 lrel->remoteid,
829 pub_names->data);
830
833
834 if (pubres->status != WALRCV_OK_TUPLES)
837 errmsg("could not fetch column list info for table \"%s.%s\" from publisher: %s",
838 nspname, relname, pubres->err)));
839
840 /*
841 * We don't support the case where the column list is different for
842 * the same table when combining publications. See comments atop
843 * fetch_relation_list. So there should be only one row returned.
844 * Although we already checked this when creating the subscription, we
845 * still need to check here in case the column list was changed after
846 * creating the subscription and before the sync worker is started.
847 */
848 if (tuplestore_tuple_count(pubres->tuplestore) > 1)
851 errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
852 nspname, relname));
853
854 /*
855 * Get the column list and build a single bitmap with the attnums.
856 *
857 * If we find a NULL value, it means all the columns should be
858 * replicated.
859 */
861 if (tuplestore_gettupleslot(pubres->tuplestore, true, false, tslot))
862 {
863 Datum cfval = slot_getattr(tslot, 1, &isnull);
864
865 if (!isnull)
866 {
867 ArrayType *arr;
868 int nelems;
869 int16 *elems;
870
872 nelems = ARR_DIMS(arr)[0];
873 elems = (int16 *) ARR_DATA_PTR(arr);
874
875 for (natt = 0; natt < nelems; natt++)
877 }
878
880 }
882
884 }
885
886 /*
887 * Now fetch column names and types.
888 */
889 resetStringInfo(&cmd);
891 "SELECT a.attnum,"
892 " a.attname,"
893 " a.atttypid,"
894 " a.attnum = ANY(i.indkey)");
895
896 /* Generated columns can be replicated since version 18. */
897 if (server_version >= 180000)
898 appendStringInfoString(&cmd, ", a.attgenerated != ''");
899
900 appendStringInfo(&cmd,
901 " FROM pg_catalog.pg_attribute a"
902 " LEFT JOIN pg_catalog.pg_index i"
903 " ON (i.indexrelid = pg_get_replica_identity_index(%u))"
904 " WHERE a.attnum > 0::pg_catalog.int2"
905 " AND NOT a.attisdropped %s"
906 " AND a.attrelid = %u"
907 " ORDER BY a.attnum",
908 lrel->remoteid,
909 (server_version >= 120000 && server_version < 180000 ?
910 "AND a.attgenerated = ''" : ""),
911 lrel->remoteid);
914
915 if (res->status != WALRCV_OK_TUPLES)
918 errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
919 nspname, relname, res->err)));
920
921 /* We don't know the number of rows coming, so allocate enough space. */
922 lrel->attnames = palloc0_array(char *, MaxTupleAttributeNumber);
924 lrel->attkeys = NULL;
925
926 /*
927 * Store the columns as a list of names. Ignore those that are not
928 * present in the column list, if there is one.
929 */
930 natt = 0;
932 while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
933 {
934 char *rel_colname;
936
937 attnum = DatumGetInt16(slot_getattr(slot, 1, &isnull));
938 Assert(!isnull);
939
940 /* If the column is not in the column list, skip it. */
942 {
943 ExecClearTuple(slot);
944 continue;
945 }
946
947 rel_colname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
948 Assert(!isnull);
949
950 lrel->attnames[natt] = rel_colname;
951 lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 3, &isnull));
952 Assert(!isnull);
953
954 if (DatumGetBool(slot_getattr(slot, 4, &isnull)))
955 lrel->attkeys = bms_add_member(lrel->attkeys, natt);
956
957 /* Remember if the remote table has published any generated column. */
958 if (server_version >= 180000 && !(*gencol_published))
959 {
960 *gencol_published = DatumGetBool(slot_getattr(slot, 5, &isnull));
961 Assert(!isnull);
962 }
963
964 /* Should never happen. */
966 elog(ERROR, "too many columns in remote table \"%s.%s\"",
967 nspname, relname);
968
969 ExecClearTuple(slot);
970 }
972
973 lrel->natts = natt;
974
976
977 /*
978 * Get relation's row filter expressions. DISTINCT avoids the same
979 * expression of a table in multiple publications from being included
980 * multiple times in the final expression.
981 *
982 * We need to copy the row even if it matches just one of the
983 * publications, so we later combine all the quals with OR.
984 *
985 * For initial synchronization, row filtering can be ignored in the
986 * following cases:
987 *
988 * 1) one of the subscribed publications for the table hasn't specified
989 * any row filter
990 *
991 * 2) one of the subscribed publications has puballtables set to true
992 *
993 * 3) one of the subscribed publications is declared as TABLES IN SCHEMA
994 * that includes this relation
995 */
996 if (server_version >= 150000)
997 {
998 /* Reuse the already-built pub_names. */
999 Assert(pub_names != NULL);
1000
1001 /* Check for row filters. */
1002 resetStringInfo(&cmd);
1003
1004 if (server_version >= 190000)
1005 {
1006 /*
1007 * We can pass both publication names and relid to
1008 * pg_get_publication_tables() since version 19.
1009 */
1010 appendStringInfo(&cmd,
1011 "SELECT DISTINCT pg_get_expr(gpt.qual, gpt.relid)"
1012 " FROM pg_get_publication_tables(ARRAY[%s], %u) gpt",
1013 pub_names->data,
1014 lrel->remoteid);
1015 }
1016 else
1017 appendStringInfo(&cmd,
1018 "SELECT DISTINCT pg_get_expr(gpt.qual, gpt.relid)"
1019 " FROM pg_publication p,"
1020 " LATERAL pg_get_publication_tables(p.pubname) gpt"
1021 " WHERE gpt.relid = %u"
1022 " AND p.pubname IN ( %s )",
1023 lrel->remoteid,
1024 pub_names->data);
1025
1027
1028 if (res->status != WALRCV_OK_TUPLES)
1029 ereport(ERROR,
1030 (errmsg("could not fetch table WHERE clause info for table \"%s.%s\" from publisher: %s",
1031 nspname, relname, res->err)));
1032
1033 /*
1034 * Multiple row filter expressions for the same table will be combined
1035 * by COPY using OR. If any of the filter expressions for this table
1036 * are null, it means the whole table will be copied. In this case it
1037 * is not necessary to construct a unified row filter expression at
1038 * all.
1039 */
1041 while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
1042 {
1043 Datum rf = slot_getattr(slot, 1, &isnull);
1044
1045 if (!isnull)
1046 *qual = lappend(*qual, makeString(TextDatumGetCString(rf)));
1047 else
1048 {
1049 /* Ignore filters and cleanup as necessary. */
1050 if (*qual)
1051 {
1052 list_free_deep(*qual);
1053 *qual = NIL;
1054 }
1055 break;
1056 }
1057
1058 ExecClearTuple(slot);
1059 }
1061
1063 destroyStringInfo(pub_names);
1064 }
1065
1066 pfree(cmd.data);
1067}
1068
1069/*
1070 * Copy existing data of a table from publisher.
1071 *
1072 * Caller is responsible for locking the local relation.
1073 */
1074static void
1076{
1077 LogicalRepRelMapEntry *relmapentry;
1079 List *qual = NIL;
1080 WalRcvExecResult *res;
1081 StringInfoData cmd;
1082 CopyFromState cstate;
1084 ParseState *pstate;
1085 List *options = NIL;
1086 bool gencol_published = false;
1087
1088 /* Get the publisher relation info. */
1090 RelationGetRelationName(rel), &lrel, &qual,
1092
1093 /* Put the relation into relmap. */
1095
1096 /* Map the publisher relation to local one. */
1097 relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock);
1098 Assert(rel == relmapentry->localrel);
1099
1100 /* Start copy on the publisher. */
1101 initStringInfo(&cmd);
1102
1103 /* Regular or partitioned table with no row filter or generated columns */
1104 if ((lrel.relkind == RELKIND_RELATION || lrel.relkind == RELKIND_PARTITIONED_TABLE)
1105 && qual == NIL && !gencol_published)
1106 {
1107 appendStringInfo(&cmd, "COPY %s",
1108 quote_qualified_identifier(lrel.nspname, lrel.relname));
1109
1110 /* If the table has columns, then specify the columns */
1111 if (lrel.natts)
1112 {
1113 appendStringInfoString(&cmd, " (");
1114
1115 /*
1116 * XXX Do we need to list the columns in all cases? Maybe we're
1117 * replicating all columns?
1118 */
1119 for (int i = 0; i < lrel.natts; i++)
1120 {
1121 if (i > 0)
1122 appendStringInfoString(&cmd, ", ");
1123
1125 }
1126
1127 appendStringInfoChar(&cmd, ')');
1128 }
1129
1130 appendStringInfoString(&cmd, " TO STDOUT");
1131 }
1132 else
1133 {
1134 /*
1135 * For non-tables and tables with row filters, we need to do COPY
1136 * (SELECT ...), but we can't just do SELECT * because we may need to
1137 * copy only a subset of columns including generated columns. For tables
1138 * with any row filters, build a SELECT query with OR'ed row filters
1139 * for COPY.
1140 *
1141 * We also need to use this same COPY (SELECT ...) syntax when
1142 * generated columns are published, because copy of generated columns
1143 * is not supported by the normal COPY.
1144 */
1145 appendStringInfoString(&cmd, "COPY (SELECT ");
1146 for (int i = 0; i < lrel.natts; i++)
1147 {
1149 if (i < lrel.natts - 1)
1150 appendStringInfoString(&cmd, ", ");
1151 }
1152
1153 appendStringInfoString(&cmd, " FROM ");
1154
1155 /*
1156 * For regular tables, make sure we don't copy data from a child that
1157 * inherits the named table as those will be copied separately.
1158 */
1159 if (lrel.relkind == RELKIND_RELATION)
1160 appendStringInfoString(&cmd, "ONLY ");
1161
1163 /* list of OR'ed filters */
1164 if (qual != NIL)
1165 {
1166 ListCell *lc;
1167 char *q = strVal(linitial(qual));
1168
1169 appendStringInfo(&cmd, " WHERE %s", q);
1170 for_each_from(lc, qual, 1)
1171 {
1172 q = strVal(lfirst(lc));
1173 appendStringInfo(&cmd, " OR %s", q);
1174 }
1175 list_free_deep(qual);
1176 }
1177
1178 appendStringInfoString(&cmd, ") TO STDOUT");
1179 }
1180
1181 /*
1182 * Prior to v16, initial table synchronization will use text format even
1183 * if the binary option is enabled for a subscription.
1184 */
1187 {
1188 appendStringInfoString(&cmd, " WITH (FORMAT binary)");
1189 options = list_make1(makeDefElem("format",
1190 (Node *) makeString("binary"), -1));
1191 }
1192
1194 pfree(cmd.data);
1195 if (res->status != WALRCV_OK_COPY_OUT)
1196 ereport(ERROR,
1198 errmsg("could not start initial contents copy for table \"%s.%s\": %s",
1199 lrel.nspname, lrel.relname, res->err)));
1201
1203
1204 pstate = make_parsestate(NULL);
1206 NULL, false, false);
1207
1208 attnamelist = make_copy_attnamelist(relmapentry);
1209 cstate = BeginCopyFrom(pstate, rel, NULL, NULL, false, copy_read_data, attnamelist, options);
1210
1211 /* Do the copy */
1212 (void) CopyFrom(cstate);
1213
1214 logicalrep_rel_close(relmapentry, NoLock);
1215}
1216
1217/*
1218 * Determine the tablesync slot name.
1219 *
1220 * The name must not exceed NAMEDATALEN - 1 because of remote node constraints
1221 * on slot name length. We append system_identifier to avoid slot_name
1222 * collision with subscriptions in other clusters. With the current scheme
1223 * pg_%u_sync_%u_UINT64_FORMAT (3 + 10 + 6 + 10 + 20 + '\0'), the maximum
1224 * length of slot_name will be 50.
1225 *
1226 * The returned slot name is stored in the supplied buffer (syncslotname) with
1227 * the given size.
1228 *
1229 * Note: We don't use the subscription slot name as part of tablesync slot name
1230 * because we are responsible for cleaning up these slots and it could become
1231 * impossible to recalculate what name to cleanup if the subscription slot name
1232 * had changed.
1233 */
1234void
1241
1242/*
1243 * Start syncing the table in the sync worker.
1244 *
1245 * If nothing needs to be done to sync the table, we exit the worker without
1246 * any further action (FinishSyncWorker() does not return).
1247 *
1248 * The returned slot name is palloc'ed in the current memory context.
 *
 * Failure to connect to the publisher, or to run the remote BEGIN or COMMIT
 * that bracket the copy, is reported with ereport(ERROR).
1249 */
1250static char *
1252{
1253 char *slotname;
1254 char *err;
1255 char relstate;
1256 XLogRecPtr relstate_lsn;
1257 Relation rel;
1259 WalRcvExecResult *res;
1260 char originname[NAMEDATALEN];
1263 bool must_use_password;
1264 bool run_as_owner;
1265
1266 /* Check the state of the table synchronization. */
1270 &relstate_lsn);
1272
1273 /* Is the use of a password mandatory? */
1276
1278 MyLogicalRepWorker->relstate = relstate;
1279 MyLogicalRepWorker->relstate_lsn = relstate_lsn;
1281
1282 /*
1283 * If synchronization is already done or no longer necessary, exit now
1284 * that we've updated the shared memory state.
1285 */
1286 switch (relstate)
1287 {
1289 case SUBREL_STATE_READY:
1291 FinishSyncWorker(); /* doesn't return */
1292 }
1293
1294 /* Calculate the name of the tablesync slot; this buffer is returned. */
1295 slotname = (char *) palloc(NAMEDATALEN);
1298 slotname,
1299 NAMEDATALEN);
1300
1301 /*
1302 * Use the slot name rather than the subscription name as the
1303 * application_name. That makes it differ from the leader apply worker,
1304 * which lets synchronous replication distinguish them.
1305 */
1309 slotname, &err);
1311 ereport(ERROR,
1313 errmsg("table synchronization worker for subscription \"%s\" could not connect to the publisher: %s",
1314 MySubscription->name, err)));
1315
1319
1320 /* Assign the origin tracking record name. */
1323 originname,
1324 sizeof(originname));
1325
1327 {
1328 /*
1329 * We have previously errored out before finishing the copy, so the
1330 * replication slot might exist. We want to remove the slot if it
1331 * already exists and then proceed.
1332 *
1333 * XXX Alternatively, we could try to drop the slot at the time of the
1334 * failure, but then we might need to clean up the copy state, as it
1335 * might be in the middle of fetching the rows. Also, after a network
1336 * breakdown the drop wouldn't have succeeded anyway, so trying it on
1337 * the next attempt seems like a better bet.
1338 */
1340 }
1342 {
1343 /*
1344 * The COPY phase was previously done, but tablesync then crashed
1345 * before it was able to finish normally.
1346 */
1348
1349 /*
1350 * The origin tracking name must already exist. It was created the
1351 * first time this tablesync was launched.
1352 */
1357
1359
1360 goto copy_table_done;
1361 }
1362
1367
1368 /*
1369 * Update the state, create the replication origin, and make them visible
1370 * to others.
1371 */
1377 false);
1378
1379 /*
1380 * Create the replication origin in a separate transaction from the one
1381 * that sets up the origin in shared memory. This prevents the risk that
1382 * changes to the origin in shared memory cannot be rolled back if the
1383 * transaction aborts.
1384 */
1386 if (!OidIsValid(originid))
1388
1390 pgstat_report_stat(true);
1391
1393
1394 /*
1395 * Use a standard write lock here. It might be better to disallow access
1396 * to the table while it's being synchronized. But we don't want to block
1397 * the main apply process from working, and it has to open the relation
1398 * in RowExclusiveLock mode when remapping remote relation id to local.
1399 */
1401
1402 /*
1403 * Start a transaction in the remote node in REPEATABLE READ mode. This
1404 * ensures that both the replication slot we create (see below) and the
1405 * COPY are consistent with each other.
1406 */
1408 "BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ",
1409 0, NULL);
1410 if (res->status != WALRCV_OK_COMMAND)
1411 ereport(ERROR,
1413 errmsg("table copy could not start transaction on publisher: %s",
1414 res->err)));
1416
1417 /*
1418 * Create a new permanent logical decoding slot. This slot will be used
1419 * for the catchup phase after COPY is done, so tell it to use the
1420 * snapshot to make the final data consistent.
1421 */
1423 slotname, false /* permanent */ , false /* two_phase */ ,
1426
1427 /*
1428 * Advance the origin to the LSN obtained from walrcv_create_slot and then
1429 * set up the origin. The advancement is WAL logged for the purpose of
1430 * recovery. Locks are to prevent the replication origin from vanishing
1431 * while advancing.
1432 *
1433 * The purpose of doing these before the copy is to avoid doing the copy
1434 * again due to any error in advancing or setting up origin tracking.
1435 */
1438 true /* go backward */ , true /* WAL log */ );
1440
1443
1444 /*
1445 * If the user did not opt to run as the owner of the subscription
1446 * ('run_as_owner'), then copy the table as the owner of the table.
1447 */
1449 if (!run_as_owner)
1450 SwitchToUntrustedUser(rel->rd_rel->relowner, &ucxt);
1451
1452 /*
1453 * Check that our table sync worker has permission to insert into the
1454 * target table.
1455 */
1457 ACL_INSERT);
1458 if (aclresult != ACLCHECK_OK)
1460 get_relkind_objtype(rel->rd_rel->relkind),
1462
1463 /*
1464 * COPY FROM does not honor RLS policies. That is not a problem for
1465 * subscriptions owned by roles with BYPASSRLS privilege (or superuser,
1466 * who has it implicitly), but other roles should not be able to
1467 * circumvent RLS. Disallow logical replication into RLS enabled
1468 * relations for such roles.
1469 */
1471 ereport(ERROR,
1473 errmsg("user \"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
1476
1477 /* Now do the initial data copy. */
1479 copy_table(rel);
1481
 /* End the remote transaction that was opened for the copy. */
1482 res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL);
1483 if (res->status != WALRCV_OK_COMMAND)
1484 ereport(ERROR,
1486 errmsg("table copy could not finish transaction on publisher: %s",
1487 res->err)));
1489
1490 if (!run_as_owner)
1492
1493 table_close(rel, NoLock);
1494
1495 /* Make the copy visible. */
1497
1498 /*
1499 * Update the persisted state to indicate the COPY phase is done; make it
1500 * visible to others.
1501 */
1506 false);
1507
1509
1511
1512 elog(DEBUG1,
1513 "LogicalRepSyncTableStart: '%s' origin_startpos lsn %X/%08X",
1515
1516 /*
1517 * We are done with the initial data synchronization; update the state.
1518 */
1523
1524 /*
1525 * Finally, wait until the leader apply worker tells us to catch up, and
1526 * then return to let LogicalRepApplyLoop do it.
1527 */
1529 return slotname;
1530}
1531
1532/*
1533 * Run the initial sync with error handling, disabling the subscription if
1534 * that is required.
1535 *
1536 * On return, the slot name is allocated in a long-lived context. Note that
1537 * FATAL errors are not handled here; they are probably caused by a system
1538 * resource error and are not repeatable.
1539 */
1540static void
1542{
1543 char *sync_slotname = NULL;
1544
1546
1547 PG_TRY();
1548 {
1549 /* Call initial sync. */
1551 }
1552 PG_CATCH();
1553 {
1556 else
1557 {
1558 /*
1559 * Report that the worker failed during table synchronization.
1560 * Abort the current transaction so that the stats message is sent
1561 * in an idle state, then propagate the error to the caller.
1562 */
1565
1566 PG_RE_THROW();
1567 }
1568 }
1569 PG_END_TRY();
1570
1571 /* Allocate the slot name in a long-lived context. */
1574}
1575
1576/*
1577 * Run the tablesync worker.
1578 *
1579 * Start syncing the table. After a successful sync, set the streaming
1580 * options and start streaming to catch up with the apply worker.
1581 */
1582static void
1584{
1585 char originname[NAMEDATALEN];
1587 char *slotname = NULL;
1589
 /* Do the initial sync; it fills in the start position and slot name. */
1590 start_table_sync(&origin_startpos, &slotname);
1591
1594 originname,
1595 sizeof(originname));
1596
1598
1600
1602
1603 /* Apply the changes until we catch up with the apply worker. */
1605}
1606
1607/* Logical Replication Tablesync worker entry point */
1608void
1619
1620/*
1621 * Return false if the subscription has no tables.
1622 *
1623 * Otherwise, return whether all tablesyncs are READY.
1624 *
1625 * Note: This function is not suitable to be called from outside of apply or
1626 * tablesync workers because MySubscription needs to be already initialized.
1627 */
1628bool
1630{
1631 bool started_tx;
1632 bool has_tables;
1633
1634 /* We need up-to-date sync state info for subscription tables here. */
1636
1637 if (started_tx)
1638 {
1640 pgstat_report_stat(true);
1641 }
1642
1643 /*
1644 * Return false when there are no tables in the subscription or not all
1645 * tables are in ready state; true otherwise.
1646 */
1647 return has_tables && (table_states_not_ready == NIL);
1648}
1649
1650/*
1651 * Return whether the subscription currently has any tables.
1652 *
1653 * Note: Unlike HasSubscriptionTables(), this function relies on cached
1654 * information for subscription tables. Additionally, it should not be
1655 * invoked outside of apply or tablesync workers, as MySubscription must be
1656 * initialized first.
 *
 * Returns true if the subscription has tables, false otherwise.
1657 */
1658bool
1660{
1661 bool started_tx;
1662 bool has_tables;
1663
 /*
  * NOTE(review): the header says this relies on cached information, while
  * the comment below asks for up-to-date info; confirm which is intended.
  */
1664 /* We need up-to-date subscription tables info here */
1666
1667 if (started_tx)
1668 {
1670 pgstat_report_stat(true);
1671 }
1672
1673 return has_tables;
1674}
1675
1676/*
1677 * Update the two_phase state of the specified subscription in pg_subscription.
 *
 * Raises ERROR if the lookup of the subscription with OID suboid fails.
1678 */
1679void
1681{
1682 Relation rel;
1683 HeapTuple tup;
1684 bool nulls[Natts_pg_subscription];
1687
1691
1694 if (!HeapTupleIsValid(tup))
1695 elog(ERROR,
1696 "cache lookup failed for subscription oid %u",
1697 suboid);
1698
1699 /* Form a new tuple; start with no column marked null or replaced. */
1700 memset(values, 0, sizeof(values));
1701 memset(nulls, false, sizeof(nulls));
1702 memset(replaces, false, sizeof(replaces));
1703
1704 /* Set the new two_phase state. */
1707
1709 values, nulls, replaces);
 /* Write the modified tuple back to the catalog. */
1710 CatalogTupleUpdate(rel, &tup->t_self, tup);
1711
1714}
AclResult
Definition acl.h:183
@ ACLCHECK_OK
Definition acl.h:184
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
Definition aclchk.c:2672
AclResult pg_class_aclcheck(Oid table_oid, Oid roleid, AclMode mode)
Definition aclchk.c:4082
#define ARR_DATA_PTR(a)
Definition array.h:322
#define DatumGetArrayTypeP(X)
Definition array.h:261
#define ARR_DIMS(a)
Definition array.h:294
int16 AttrNumber
Definition attnum.h:21
void set_stream_options(WalRcvStreamOptions *options, char *slotname, XLogRecPtr *origin_startpos)
Definition worker.c:5557
void start_apply(XLogRecPtr origin_startpos)
Definition worker.c:5626
void DisableSubscriptionAndExit(void)
Definition worker.c:6007
void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, char *originname, Size szoriginname)
Definition worker.c:648
void set_apply_error_context_origin(char *originname)
Definition worker.c:6364
MemoryContext ApplyContext
Definition worker.c:477
void SetupApplyOrSyncWorker(int worker_slot)
Definition worker.c:5947
WalReceiverConn * LogRepWorkerWalRcvConn
Definition worker.c:482
Subscription * MySubscription
Definition worker.c:484
bool bms_is_member(int x, const Bitmapset *a)
Definition bitmapset.c:510
Bitmapset * bms_add_member(Bitmapset *a, int x)
Definition bitmapset.c:799
static Datum values[MAXATTR]
Definition bootstrap.c:190
#define TextDatumGetCString(d)
Definition builtins.h:99
#define Max(x, y)
Definition c.h:1085
#define Assert(condition)
Definition c.h:943
#define UINT64_FORMAT
Definition c.h:635
int16_t int16
Definition c.h:619
#define lengthof(array)
Definition c.h:873
#define OidIsValid(objectId)
Definition c.h:858
size_t Size
Definition c.h:689
memcpy(sums, checksumBaseOffsets, sizeof(checksumBaseOffsets))
CopyFromState BeginCopyFrom(ParseState *pstate, Relation rel, Node *whereClause, const char *filename, bool is_program, copy_data_source_cb data_source_cb, List *attnamelist, List *options)
Definition copyfrom.c:1535
uint64 CopyFrom(CopyFromState cstate)
Definition copyfrom.c:781
int64 TimestampTz
Definition timestamp.h:39
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition dynahash.c:889
HTAB * hash_create(const char *tabname, int64 nelem, const HASHCTL *info, int flags)
Definition dynahash.c:360
void hash_destroy(HTAB *hashp)
Definition dynahash.c:802
int errcode(int sqlerrcode)
Definition elog.c:874
#define LOG
Definition elog.h:32
#define PG_RE_THROW()
Definition elog.h:407
#define PG_TRY(...)
Definition elog.h:374
#define PG_END_TRY(...)
Definition elog.h:399
#define DEBUG1
Definition elog.h:31
#define ERROR
Definition elog.h:40
#define PG_CATCH(...)
Definition elog.h:384
#define elog(elevel,...)
Definition elog.h:228
#define ereport(elevel,...)
Definition elog.h:152
void err(int eval, const char *fmt,...)
Definition err.c:43
TupleTableSlot * MakeSingleTupleTableSlot(TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
const TupleTableSlotOps TTSOpsMinimalTuple
Definition execTuples.c:86
#define palloc0_array(type, count)
Definition fe_memutils.h:77
struct Latch * MyLatch
Definition globals.c:65
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, const Datum *replValues, const bool *replIsnull, const bool *doReplace)
Definition heaptuple.c:1118
void heap_freetuple(HeapTuple htup)
Definition heaptuple.c:1372
@ HASH_ENTER
Definition hsearch.h:109
#define HASH_ELEM
Definition hsearch.h:90
#define HASH_BLOBS
Definition hsearch.h:92
#define HeapTupleIsValid(tuple)
Definition htup.h:78
#define MaxTupleAttributeNumber
void CatalogTupleUpdate(Relation heapRel, const ItemPointerData *otid, HeapTuple tup)
Definition indexing.c:313
void proc_exit(int code)
Definition ipc.c:105
int i
Definition isn.c:77
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition latch.c:223
void ResetLatch(Latch *latch)
Definition latch.c:374
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition latch.c:172
void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
Definition launcher.c:756
LogicalRepWorker * logicalrep_worker_find(LogicalRepWorkerType wtype, Oid subid, Oid relid, bool only_running)
Definition launcher.c:268
static dshash_table * last_start_times
Definition launcher.c:101
LogicalRepWorker * MyLogicalRepWorker
Definition launcher.c:58
int logicalrep_sync_worker_count(Oid subid)
Definition launcher.c:937
void ApplyLauncherForgetWorkerStartTime(Oid subid)
Definition launcher.c:1155
List * lappend(List *list, void *datum)
Definition list.c:339
void list_free_deep(List *list)
Definition list.c:1560
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition lmgr.c:1088
void UnlockRelationOid(Oid relid, LOCKMODE lockmode)
Definition lmgr.c:229
void LockRelationOid(Oid relid, LOCKMODE lockmode)
Definition lmgr.c:107
#define NoLock
Definition lockdefs.h:34
#define AccessShareLock
Definition lockdefs.h:36
#define RowExclusiveLock
Definition lockdefs.h:38
char get_rel_relkind(Oid relid)
Definition lsyscache.c:2223
char * get_namespace_name(Oid nspid)
Definition lsyscache.c:3588
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1150
void LWLockRelease(LWLock *lock)
Definition lwlock.c:1767
@ LW_SHARED
Definition lwlock.h:105
DefElem * makeDefElem(char *name, Node *arg, int location)
Definition makefuncs.c:637
char * MemoryContextStrdup(MemoryContext context, const char *string)
Definition mcxt.c:1768
void pfree(void *pointer)
Definition mcxt.c:1616
void * palloc(Size size)
Definition mcxt.c:1387
#define CHECK_FOR_INTERRUPTS()
Definition miscadmin.h:125
Oid GetUserId(void)
Definition miscinit.c:470
char * GetUserNameFromId(Oid roleid, bool noerr)
Definition miscinit.c:990
static char * errmsg
ObjectType get_relkind_objtype(char relkind)
ReplOriginId replorigin_create(const char *roname)
Definition origin.c:274
ReplOriginXactState replorigin_xact_state
Definition origin.c:168
ReplOriginId replorigin_by_name(const char *roname, bool missing_ok)
Definition origin.c:243
void replorigin_advance(ReplOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)
Definition origin.c:928
void replorigin_session_reset(void)
Definition origin.c:1301
void replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
Definition origin.c:459
XLogRecPtr replorigin_session_get_progress(bool flush)
Definition origin.c:1353
void replorigin_xact_clear(bool clear_origin)
Definition origin.c:1377
void replorigin_session_setup(ReplOriginId node, int acquired_by)
Definition origin.c:1156
ParseState * make_parsestate(ParseState *parentParseState)
Definition parse_node.c:39
ParseNamespaceItem * addRangeTableEntryForRelation(ParseState *pstate, Relation rel, LOCKMODE lockmode, Alias *alias, bool inh, bool inFromCl)
#define ACL_INSERT
Definition parsenodes.h:76
int16 attnum
NameData relname
Definition pg_class.h:40
#define NAMEDATALEN
const void size_t len
static int server_version
Definition pg_dumpall.c:122
#define lfirst(lc)
Definition pg_list.h:172
#define NIL
Definition pg_list.h:68
#define list_make1(x1)
Definition pg_list.h:244
#define for_each_from(cell, lst, N)
Definition pg_list.h:446
#define linitial(l)
Definition pg_list.h:178
void UpdateSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn, bool already_locked)
char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
void GetPublicationsStr(List *publications, StringInfo dest, bool quote_literal)
static char buf[DEFAULT_XLOG_SEG_SIZE]
long pgstat_report_stat(bool force)
Definition pgstat.c:722
void pgstat_report_subscription_error(Oid subid)
int pgsocket
Definition port.h:29
#define snprintf
Definition port.h:260
#define PGINVALID_SOCKET
Definition port.h:31
static bool DatumGetBool(Datum X)
Definition postgres.h:100
static Oid DatumGetObjectId(Datum X)
Definition postgres.h:242
static Datum ObjectIdGetDatum(Oid X)
Definition postgres.h:252
uint64_t Datum
Definition postgres.h:70
static char DatumGetChar(Datum X)
Definition postgres.h:122
static int16 DatumGetInt16(Datum X)
Definition postgres.h:162
static int32 DatumGetInt32(Datum X)
Definition postgres.h:202
static Datum CharGetDatum(char X)
Definition postgres.h:132
#define InvalidOid
unsigned int Oid
static int fd(const char *x, int i)
static int fb(int x)
char * quote_literal_cstr(const char *rawstr)
Definition quote.c:101
tree ctl
Definition radixtree.h:1838
#define RelationGetRelid(relation)
Definition rel.h:516
#define RelationGetDescr(relation)
Definition rel.h:542
#define RelationGetRelationName(relation)
Definition rel.h:550
#define RelationGetNamespace(relation)
Definition rel.h:557
int check_enable_rls(Oid relid, Oid checkAsUser, bool noError)
Definition rls.c:52
@ RLS_ENABLED
Definition rls.h:45
char * quote_qualified_identifier(const char *qualifier, const char *ident)
const char * quote_identifier(const char *ident)
Snapshot GetTransactionSnapshot(void)
Definition snapmgr.c:272
void PushActiveSnapshot(Snapshot snapshot)
Definition snapmgr.c:682
void PopActiveSnapshot(void)
Definition snapmgr.c:775
void InvalidateCatalogSnapshot(void)
Definition snapmgr.c:455
static void SpinLockRelease(volatile slock_t *lock)
Definition spin.h:62
static void SpinLockAcquire(volatile slock_t *lock)
Definition spin.h:56
void logicalrep_relmap_update(LogicalRepRelation *remoterel)
Definition relation.c:165
void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
Definition relation.c:518
LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
Definition relation.c:362
void destroyStringInfo(StringInfo str)
Definition stringinfo.c:409
StringInfo makeStringInfo(void)
Definition stringinfo.c:72
void resetStringInfo(StringInfo str)
Definition stringinfo.c:126
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition stringinfo.c:145
void appendStringInfoString(StringInfo str, const char *s)
Definition stringinfo.c:230
void appendStringInfoChar(StringInfo str, char ch)
Definition stringinfo.c:242
void initStringInfo(StringInfo str)
Definition stringinfo.c:97
Size keysize
Definition hsearch.h:69
Definition pg_list.h:54
LogicalRepRelation remoterel
XLogRecPtr relstate_lsn
Definition nodes.h:135
Form_pg_class rd_rel
Definition rel.h:111
ReplOriginId origin
Definition origin.h:45
Tuplestorestate * tuplestore
TupleDesc tupledesc
WalRcvExecStatus status
void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok)
void launch_sync_worker(LogicalRepWorkerType wtype, int nsyncworkers, Oid relid, TimestampTz *last_start_time)
Definition syncutils.c:118
pg_noreturn void FinishSyncWorker(void)
Definition syncutils.c:50
void FetchRelationStates(bool *has_pending_subtables, bool *has_pending_subsequences, bool *started_tx)
Definition syncutils.c:203
#define SearchSysCacheCopy1(cacheId, key1)
Definition syscache.h:91
void table_close(Relation relation, LOCKMODE lockmode)
Definition table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition table.c:40
List * table_states_not_ready
Definition tablesync.c:127
bool AllTablesyncsReady(void)
Definition tablesync.c:1629
static bool wait_for_worker_state_change(char expected_state)
Definition tablesync.c:191
static bool wait_for_table_state_change(Oid relid, char expected_state)
Definition tablesync.c:142
static void run_tablesync_worker(void)
Definition tablesync.c:1583
void ProcessSyncingTablesForSync(XLogRecPtr current_lsn)
Definition tablesync.c:246
static List * make_copy_attnamelist(LogicalRepRelMapEntry *rel)
Definition tablesync.c:626
bool HasSubscriptionTablesCached(void)
Definition tablesync.c:1659
void TableSyncWorkerMain(Datum main_arg)
Definition tablesync.c:1609
static void fetch_remote_table_info(char *nspname, char *relname, LogicalRepRelation *lrel, List **qual, bool *gencol_published)
Definition tablesync.c:725
void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, Size szslot)
Definition tablesync.c:1235
static int copy_read_data(void *outbuf, int minread, int maxread)
Definition tablesync.c:646
static char * LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
Definition tablesync.c:1251
static void copy_table(Relation rel)
Definition tablesync.c:1075
void ProcessSyncingTablesForApply(XLogRecPtr current_lsn)
Definition tablesync.c:368
static void start_table_sync(XLogRecPtr *origin_startpos, char **slotname)
Definition tablesync.c:1541
static StringInfo copybuf
Definition tablesync.c:129
void UpdateTwoPhaseState(Oid suboid, char new_state)
Definition tablesync.c:1680
bool tuplestore_gettupleslot(Tuplestorestate *state, bool forward, bool copy, TupleTableSlot *slot)
int64 tuplestore_tuple_count(Tuplestorestate *state)
Definition tuplestore.c:581
static Datum slot_getattr(TupleTableSlot *slot, int attnum, bool *isnull)
Definition tuptable.h:417
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition tuptable.h:476
void SwitchToUntrustedUser(Oid userid, UserContext *context)
Definition usercontext.c:33
void RestoreUserContext(UserContext *context)
Definition usercontext.c:87
String * makeString(char *str)
Definition value.c:63
#define strVal(v)
Definition value.h:82
#define WL_SOCKET_READABLE
#define WL_TIMEOUT
#define WL_EXIT_ON_PM_DEATH
#define WL_LATCH_SET
#define walrcv_startstreaming(conn, options)
#define walrcv_connect(conninfo, replication, logical, must_use_password, appname, err)
@ WALRCV_OK_COMMAND
@ WALRCV_OK_TUPLES
@ WALRCV_OK_COPY_OUT
#define walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn)
static void walrcv_clear_result(WalRcvExecResult *walres)
#define walrcv_server_version(conn)
#define walrcv_endstreaming(conn, next_tli)
#define walrcv_exec(conn, exec, nRetTypes, retTypes)
#define walrcv_receive(conn, buffer, wait_fd)
@ CRS_USE_SNAPSHOT
Definition walsender.h:24
@ WORKERTYPE_TABLESYNC
@ WORKERTYPE_APPLY
static bool am_tablesync_worker(void)
bool IsTransactionState(void)
Definition xact.c:389
void CommandCounterIncrement(void)
Definition xact.c:1130
void StartTransactionCommand(void)
Definition xact.c:3109
void CommitTransactionCommand(void)
Definition xact.c:3207
void AbortOutOfAnyTransaction(void)
Definition xact.c:4913
uint64 GetSystemIdentifier(void)
Definition xlog.c:4647
#define LSN_FORMAT_ARGS(lsn)
Definition xlogdefs.h:47
uint16 ReplOriginId
Definition xlogdefs.h:69
uint64 XLogRecPtr
Definition xlogdefs.h:21
#define InvalidXLogRecPtr
Definition xlogdefs.h:28
uint32 TimeLineID
Definition xlogdefs.h:63