PostgreSQL Source Code git master
Loading...
Searching...
No Matches
execReplication.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * execReplication.c
4 * miscellaneous executor routines for logical replication
5 *
6 * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 *
9 * IDENTIFICATION
10 * src/backend/executor/execReplication.c
11 *
12 *-------------------------------------------------------------------------
13 */
14
15#include "postgres.h"
16
17#include "access/amapi.h"
18#include "access/commit_ts.h"
19#include "access/genam.h"
20#include "access/gist.h"
21#include "access/relscan.h"
22#include "access/tableam.h"
23#include "access/transam.h"
24#include "access/xact.h"
25#include "access/heapam.h"
26#include "catalog/pg_am_d.h"
27#include "commands/trigger.h"
28#include "executor/executor.h"
32#include "storage/lmgr.h"
33#include "utils/builtins.h"
34#include "utils/lsyscache.h"
35#include "utils/rel.h"
36#include "utils/snapmgr.h"
37#include "utils/syscache.h"
38#include "utils/typcache.h"
39
40
42 TypeCacheEntry **eq, Bitmapset *columns);
43
44/*
45 * Set up a ScanKey for a search in the relation 'rel' for a tuple 'key' that
46 * is set up to match 'rel' (*NOT* idxrel!).
47 *
48 * Returns how many columns to use for the index scan.
49 *
50 * This is not a generic routine, idxrel must be PK, RI, or an index that can be
51 * used for a REPLICA IDENTITY FULL table. See FindUsableIndexForReplicaIdentityFull()
52 * for details.
53 *
54 * By definition, replication identity of a rel meets all limitations associated
55 * with that. Note that any other index could also meet these limitations.
56 */
57static int
60{
61 int index_attoff;
62 int skey_attoff = 0;
64 oidvector *opclass;
65 int2vector *indkey = &idxrel->rd_index->indkey;
66
70
71 /* Build scankey for every non-expression attribute in the index. */
74 {
75 Oid operator;
76 Oid optype;
77 Oid opfamily;
81
83 {
84 /*
85 * XXX: Currently, we don't support expressions in the scan key,
86 * see code below.
87 */
88 continue;
89 }
90
91 /*
92 * Load the operator info. We need this to get the equality operator
93 * function for the scan key.
94 */
96 opfamily = get_opclass_family(opclass->values[index_attoff]);
97 eq_strategy = IndexAmTranslateCompareType(COMPARE_EQ, idxrel->rd_rel->relam, opfamily, false);
98 operator = get_opfamily_member(opfamily, optype,
99 optype,
101
102 if (!OidIsValid(operator))
103 elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",
104 eq_strategy, optype, optype, opfamily);
105
106 regop = get_opcode(operator);
107
108 /* Initialize the scankey. */
110 index_attoff + 1,
112 regop,
113 searchslot->tts_values[table_attno - 1]);
114
115 skey[skey_attoff].sk_collation = idxrel->rd_indcollation[index_attoff];
116
117 /* Check for null value. */
118 if (searchslot->tts_isnull[table_attno - 1])
119 skey[skey_attoff].sk_flags |= (SK_ISNULL | SK_SEARCHNULL);
120
121 skey_attoff++;
122 }
123
124 /* There must always be at least one attribute for the index scan. */
125 Assert(skey_attoff > 0);
126
127 return skey_attoff;
128}
129
130
131/*
132 * Helper function to check if it is necessary to re-fetch and lock the tuple
133 * due to concurrent modifications. This function should be called after
134 * invoking table_tuple_lock.
135 */
136static bool
138{
139 bool refetch = false;
140
141 switch (res)
142 {
143 case TM_Ok:
144 break;
145 case TM_Updated:
146 /* XXX: Improve handling here */
148 ereport(LOG,
150 errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
151 else
152 ereport(LOG,
154 errmsg("concurrent update, retrying")));
155 refetch = true;
156 break;
157 case TM_Deleted:
158 /* XXX: Improve handling here */
159 ereport(LOG,
161 errmsg("concurrent delete, retrying")));
162 refetch = true;
163 break;
164 case TM_Invisible:
165 elog(ERROR, "attempted to lock invisible tuple");
166 break;
167 default:
168 elog(ERROR, "unexpected table_tuple_lock status: %u", res);
169 break;
170 }
171
172 return refetch;
173}
174
175/*
176 * Search the relation 'rel' for tuple using the index.
177 *
178 * If a matching tuple is found, lock it with lockmode, fill the slot with its
179 * contents, and return true. Return false otherwise.
180 */
181bool
183 LockTupleMode lockmode,
186{
188 int skey_attoff;
189 IndexScanDesc scan;
193 bool found;
196
197 /* Open the index. */
199
201
203
204 /* Build scan key. */
206
207 /* Start an index scan. */
208 scan = index_beginscan(rel, idxrel, &snap, NULL, skey_attoff, 0);
209
210retry:
211 found = false;
212
213 index_rescan(scan, skey, skey_attoff, NULL, 0);
214
215 /* Try to find the tuple */
217 {
218 /*
219 * Avoid expensive equality check if the index is primary key or
220 * replica identity index.
221 */
223 {
224 if (eq == NULL)
225 eq = palloc0_array(TypeCacheEntry *, outslot->tts_tupleDescriptor->natts);
226
228 continue;
229 }
230
232
234 snap.xmin : snap.xmax;
235
236 /*
237 * If the tuple is locked, wait for locking transaction to finish and
238 * retry.
239 */
241 {
243 goto retry;
244 }
245
246 /* Found our tuple and it's not locked */
247 found = true;
248 break;
249 }
250
251 /* Found tuple, try to lock it in the lockmode. */
252 if (found)
253 {
254 TM_FailureData tmfd;
255 TM_Result res;
256
258
259 res = table_tuple_lock(rel, &(outslot->tts_tid), GetActiveSnapshot(),
260 outslot,
261 GetCurrentCommandId(false),
262 lockmode,
264 0 /* don't follow updates */ ,
265 &tmfd);
266
268
269 if (should_refetch_tuple(res, &tmfd))
270 goto retry;
271 }
272
273 index_endscan(scan);
274
275 /* Don't release lock until commit. */
277
278 return found;
279}
280
281/*
282 * Compare the tuples in the slots by checking if they have equal values.
283 *
284 * If 'columns' is not null, only the columns specified within it will be
285 * considered for the equality check, ignoring all other columns.
286 */
287static bool
289 TypeCacheEntry **eq, Bitmapset *columns)
290{
291 int attrnum;
292
293 Assert(slot1->tts_tupleDescriptor->natts ==
294 slot2->tts_tupleDescriptor->natts);
295
298
299 /* Check equality of the attributes. */
300 for (attrnum = 0; attrnum < slot1->tts_tupleDescriptor->natts; attrnum++)
301 {
303 TypeCacheEntry *typentry;
304
305 att = TupleDescAttr(slot1->tts_tupleDescriptor, attrnum);
306
307 /*
308 * Ignore dropped and generated columns as the publisher doesn't send
309 * those
310 */
311 if (att->attisdropped || att->attgenerated)
312 continue;
313
314 /*
315 * Ignore columns that are not listed for checking.
316 */
317 if (columns &&
319 columns))
320 continue;
321
322 /*
323 * If one value is NULL and other is not, then they are certainly not
324 * equal
325 */
326 if (slot1->tts_isnull[attrnum] != slot2->tts_isnull[attrnum])
327 return false;
328
329 /*
330 * If both are NULL, they can be considered equal.
331 */
332 if (slot1->tts_isnull[attrnum] || slot2->tts_isnull[attrnum])
333 continue;
334
335 typentry = eq[attrnum];
336 if (typentry == NULL)
337 {
338 typentry = lookup_type_cache(att->atttypid,
340 if (!OidIsValid(typentry->eq_opr_finfo.fn_oid))
343 errmsg("could not identify an equality operator for type %s",
344 format_type_be(att->atttypid))));
345 eq[attrnum] = typentry;
346 }
347
349 att->attcollation,
350 slot1->tts_values[attrnum],
351 slot2->tts_values[attrnum])))
352 return false;
353 }
354
355 return true;
356}
357
358/*
359 * Search the relation 'rel' for tuple using the sequential scan.
360 *
361 * If a matching tuple is found, lock it with lockmode, fill the slot with its
362 * contents, and return true. Return false otherwise.
363 *
364 * Note that this stops on the first matching tuple.
365 *
366 * This can obviously be quite slow on tables that have more than a few rows.
367 */
368bool
371{
373 TableScanDesc scan;
377 bool found;
379
380 Assert(equalTupleDescs(desc, outslot->tts_tupleDescriptor));
381
382 eq = palloc0_array(TypeCacheEntry *, outslot->tts_tupleDescriptor->natts);
383
384 /* Start a heap scan. */
386 scan = table_beginscan(rel, &snap, 0, NULL);
388
389retry:
390 found = false;
391
392 table_rescan(scan, NULL);
393
394 /* Try to find the tuple */
396 {
398 continue;
399
400 found = true;
402
404 snap.xmin : snap.xmax;
405
406 /*
407 * If the tuple is locked, wait for locking transaction to finish and
408 * retry.
409 */
411 {
413 goto retry;
414 }
415
416 /* Found our tuple and it's not locked */
417 break;
418 }
419
420 /* Found tuple, try to lock it in the lockmode. */
421 if (found)
422 {
423 TM_FailureData tmfd;
424 TM_Result res;
425
427
428 res = table_tuple_lock(rel, &(outslot->tts_tid), GetActiveSnapshot(),
429 outslot,
430 GetCurrentCommandId(false),
431 lockmode,
433 0 /* don't follow updates */ ,
434 &tmfd);
435
437
438 if (should_refetch_tuple(res, &tmfd))
439 goto retry;
440 }
441
442 table_endscan(scan);
444
445 return found;
446}
447
448/*
449 * Build additional index information necessary for conflict detection.
450 */
451static void
453{
454 for (int i = 0; i < resultRelInfo->ri_NumIndices; i++)
455 {
456 Relation indexRelation = resultRelInfo->ri_IndexRelationDescs[i];
458
459 if (conflictindex != RelationGetRelid(indexRelation))
460 continue;
461
462 /*
463 * This Assert will fail if BuildSpeculativeIndexInfo() is called
464 * twice for the given index.
465 */
466 Assert(indexRelationInfo->ii_UniqueOps == NULL);
467
469 }
470}
471
472/*
473 * If the tuple is recently dead and was deleted by a transaction with a newer
474 * commit timestamp than previously recorded, update the associated transaction
475 * ID, commit time, and origin. This helps ensure that conflict detection uses
476 * the most recent and relevant deletion metadata.
477 */
478static void
484{
486 HeapTuple tuple;
487 Buffer buf;
488 bool recently_dead = false;
489 TransactionId xmax;
492
494
495 tuple = ExecFetchSlotHeapTuple(scanslot, false, NULL);
496 buf = hslot->buffer;
497
499
500 /*
501 * We do not consider HEAPTUPLE_DEAD status because it indicates either
502 * tuples whose inserting transaction was aborted (meaning there is no
503 * commit timestamp or origin), or tuples deleted by a transaction older
504 * than oldestxmin, making it safe to ignore them during conflict
505 * detection (See comments atop worker.c for details).
506 */
508 recently_dead = true;
509
511
512 if (!recently_dead)
513 return;
514
515 xmax = HeapTupleHeaderGetUpdateXid(tuple->t_data);
516 if (!TransactionIdIsValid(xmax))
517 return;
518
519 /* Select the dead tuple with the most recent commit timestamp */
522 {
523 *delete_xid = xmax;
526 }
527}
528
529/*
530 * Searches the relation 'rel' for the most recently deleted tuple that matches
531 * the values in 'searchslot' and is not yet removable by VACUUM. The function
532 * returns the transaction ID, origin, and commit timestamp of the transaction
533 * that deleted this tuple.
534 *
535 * 'oldestxmin' acts as a cutoff transaction ID. Tuples deleted by transactions
536 * with IDs >= 'oldestxmin' are considered recently dead and are eligible for
537 * conflict detection.
538 *
539 * Instead of stopping at the first match, we scan all matching dead tuples to
540 * identify the most recent deletion. This is crucial because only the latest
541 * deletion is relevant for resolving conflicts.
542 *
543 * For example, consider a scenario on the subscriber where a row is deleted,
544 * re-inserted, and then deleted again only on the subscriber:
545 *
546 * - (pk, 1) - deleted at 9:00,
547 * - (pk, 1) - deleted at 9:02,
548 *
549 * Now, a remote update arrives: (pk, 1) -> (pk, 2), timestamped at 9:01.
550 *
551 * If we mistakenly return the older deletion (9:00), the system may wrongly
552 * apply the remote update using a last-update-wins strategy. Instead, we must
553 * recognize the more recent deletion at 9:02 and skip the update. See
554 * comments atop worker.c for details. Note, as of now, conflict resolution
555 * is not implemented. Consequently, the system may incorrectly report the
556 * older tuple as the conflicted one, leading to misleading results.
557 *
558 * The commit timestamp of the deleting transaction is used to determine which
559 * tuple was deleted most recently.
560 */
561bool
567{
569 TableScanDesc scan;
573
574 Assert(equalTupleDescs(desc, searchslot->tts_tupleDescriptor));
575
578 *delete_time = 0;
579
580 /*
581 * If the relation has a replica identity key or a primary key that is
582 * unusable for locating deleted tuples (see
583 * IsIndexUsableForFindingDeletedTuple), a full table scan becomes
584 * necessary. In such cases, comparing the entire tuple is not required,
585 * since the remote tuple might not include all column values. Instead,
586 * the indexed columns alone are sufficient to identify the target tuple
587 * (see logicalrep_rel_mark_updatable).
588 */
591
592 /* fallback to PK if no replica identity */
593 if (!indexbitmap)
596
597 eq = palloc0_array(TypeCacheEntry *, searchslot->tts_tupleDescriptor->natts);
598
599 /*
600 * Start a heap scan using SnapshotAny to identify dead tuples that are
601 * not visible under a standard MVCC snapshot. Tuples from transactions
602 * not yet committed or those just committed prior to the scan are
603 * excluded in update_most_recent_deletion_info().
604 */
605 scan = table_beginscan(rel, SnapshotAny, 0, NULL);
607
608 table_rescan(scan, NULL);
609
610 /* Try to find the tuple */
612 {
614 continue;
615
618 }
619
620 table_endscan(scan);
622
623 return *delete_time != 0;
624}
625
626/*
627 * Similar to RelationFindDeletedTupleInfoSeq() but using index scan to locate
628 * the deleted tuple.
629 */
630bool
637{
640 int skey_attoff;
641 IndexScanDesc scan;
646
647 Assert(equalTupleDescs(desc, searchslot->tts_tupleDescriptor));
649
651 *delete_time = 0;
653
655
657
659
660 /* Build scan key. */
662
663 /*
664 * Start an index scan using SnapshotAny to identify dead tuples that are
665 * not visible under a standard MVCC snapshot. Tuples from transactions
666 * not yet committed or those just committed prior to the scan are
667 * excluded in update_most_recent_deletion_info().
668 */
670
671 index_rescan(scan, skey, skey_attoff, NULL, 0);
672
673 /* Try to find the tuple */
675 {
676 /*
677 * Avoid expensive equality check if the index is primary key or
678 * replica identity index.
679 */
681 {
682 if (eq == NULL)
683 eq = palloc0_array(TypeCacheEntry *, scanslot->tts_tupleDescriptor->natts);
684
686 continue;
687 }
688
691 }
692
693 index_endscan(scan);
694
696
698
699 return *delete_time != 0;
700}
701
702/*
703 * Find the tuple that violates the passed unique index (conflictindex).
704 *
705 * If the conflicting tuple is found return true, otherwise false.
706 *
707 * We lock the tuple to avoid getting it deleted before the caller can fetch
708 * the required information. Note that if the tuple is deleted before a lock
709 * is acquired, we will retry to find the conflicting tuple again.
710 */
711static bool
712FindConflictTuple(ResultRelInfo *resultRelInfo, EState *estate,
715{
716 Relation rel = resultRelInfo->ri_RelationDesc;
718 TM_FailureData tmfd;
719 TM_Result res;
720
722
723 /*
724 * Build additional information required to check constraint violations.
725 * See check_exclusion_or_unique_constraint().
726 */
728
729retry:
730 if (ExecCheckIndexConstraints(resultRelInfo, slot, estate,
731 &conflictTid, &slot->tts_tid,
733 {
734 if (*conflictslot)
736
738 return false;
739 }
740
742
744
747 GetCurrentCommandId(false),
750 0 /* don't follow updates */ ,
751 &tmfd);
752
754
755 if (should_refetch_tuple(res, &tmfd))
756 goto retry;
757
758 return true;
759}
760
761/*
762 * Check all the unique indexes in 'recheckIndexes' for conflict with the
763 * tuple in 'remoteslot' and report if found.
764 */
765static void
769{
772
773 /* Check all the unique indexes for conflicts */
775 {
777 FindConflictTuple(resultRelInfo, estate, uniqueidx, remoteslot,
778 &conflictslot))
779 {
781
783 conflicttuple->indexoid = uniqueidx;
784
786 &conflicttuple->origin, &conflicttuple->ts);
787
789 }
790 }
791
792 /* Report the conflict, if found */
793 if (conflicttuples)
794 ReportApplyConflict(estate, resultRelInfo, ERROR,
797}
798
799/*
800 * Insert tuple represented in the slot to the relation, update the indexes,
801 * and execute any constraints and per-row triggers.
802 *
803 * Caller is responsible for opening the indexes.
804 */
805void
807 EState *estate, TupleTableSlot *slot)
808{
809 bool skip_tuple = false;
810 Relation rel = resultRelInfo->ri_RelationDesc;
811
812 /* For now we support only tables. */
813 Assert(rel->rd_rel->relkind == RELKIND_RELATION);
814
816
817 /* BEFORE ROW INSERT Triggers */
818 if (resultRelInfo->ri_TrigDesc &&
819 resultRelInfo->ri_TrigDesc->trig_insert_before_row)
820 {
821 if (!ExecBRInsertTriggers(estate, resultRelInfo, slot))
822 skip_tuple = true; /* "do nothing" */
823 }
824
825 if (!skip_tuple)
826 {
829 bool conflict = false;
830
831 /* Compute stored generated columns */
832 if (rel->rd_att->constr &&
834 ExecComputeStoredGenerated(resultRelInfo, estate, slot,
835 CMD_INSERT);
836
837 /* Check the constraints of the tuple */
838 if (rel->rd_att->constr)
839 ExecConstraints(resultRelInfo, slot, estate);
840 if (rel->rd_rel->relispartition)
841 ExecPartitionCheck(resultRelInfo, slot, estate, true);
842
843 /* OK, store the tuple and create index entries for it */
844 simple_table_tuple_insert(resultRelInfo->ri_RelationDesc, slot);
845
847
848 if (resultRelInfo->ri_NumIndices > 0)
850 slot, estate, false,
851 conflictindexes ? true : false,
852 &conflict,
853 conflictindexes, false);
854
855 /*
856 * Checks the conflict indexes to fetch the conflicting local row and
857 * reports the conflict. We perform this check here, instead of
858 * performing an additional index scan before the actual insertion and
859 * reporting the conflict if any conflicting rows are found. This is
860 * to avoid the overhead of executing the extra scan for each INSERT
861 * operation, even when no conflict arises, which could introduce
862 * significant overhead to replication, particularly in cases where
863 * conflicts are rare.
864 *
865 * XXX OTOH, this could lead to clean-up effort for dead tuples added
866 * in heap and index in case of conflicts. But since conflicts shouldn't
867 * be a frequent thing, we preferred to save the performance
868 * overhead of an extra scan before each insertion.
869 */
870 if (conflict)
871 CheckAndReportConflict(resultRelInfo, estate, CT_INSERT_EXISTS,
872 recheckIndexes, NULL, slot);
873
874 /* AFTER ROW INSERT Triggers */
875 ExecARInsertTriggers(estate, resultRelInfo, slot,
877
878 /*
879 * XXX we should in theory pass a TransitionCaptureState object to the
880 * above to capture transition tuples, but after statement triggers
881 * don't actually get fired by replication yet anyway
882 */
883
885 }
886}
887
888/*
889 * Find the searchslot tuple and update it with data in the slot,
890 * update the indexes, and execute any constraints and per-row triggers.
891 *
892 * Caller is responsible for opening the indexes.
893 */
894void
896 EState *estate, EPQState *epqstate,
898{
899 bool skip_tuple = false;
900 Relation rel = resultRelInfo->ri_RelationDesc;
901 ItemPointer tid = &(searchslot->tts_tid);
902
903 /*
904 * We support only non-system tables; check_publication_add_relation()
905 * is accountable for enforcing that.
906 */
907 Assert(rel->rd_rel->relkind == RELKIND_RELATION);
909
911
912 /* BEFORE ROW UPDATE Triggers */
913 if (resultRelInfo->ri_TrigDesc &&
914 resultRelInfo->ri_TrigDesc->trig_update_before_row)
915 {
916 if (!ExecBRUpdateTriggers(estate, epqstate, resultRelInfo,
917 tid, NULL, slot, NULL, NULL, false))
918 skip_tuple = true; /* "do nothing" */
919 }
920
921 if (!skip_tuple)
922 {
926 bool conflict = false;
927
928 /* Compute stored generated columns */
929 if (rel->rd_att->constr &&
931 ExecComputeStoredGenerated(resultRelInfo, estate, slot,
932 CMD_UPDATE);
933
934 /* Check the constraints of the tuple */
935 if (rel->rd_att->constr)
936 ExecConstraints(resultRelInfo, slot, estate);
937 if (rel->rd_rel->relispartition)
938 ExecPartitionCheck(resultRelInfo, slot, estate, true);
939
940 simple_table_tuple_update(rel, tid, slot, estate->es_snapshot,
942
944
945 if (resultRelInfo->ri_NumIndices > 0 && (update_indexes != TU_None))
947 slot, estate, true,
948 conflictindexes ? true : false,
951
952 /*
953 * Refer to the comments above the call to CheckAndReportConflict() in
954 * ExecSimpleRelationInsert to understand why this check is done at
955 * this point.
956 */
957 if (conflict)
958 CheckAndReportConflict(resultRelInfo, estate, CT_UPDATE_EXISTS,
960
961 /* AFTER ROW UPDATE Triggers */
962 ExecARUpdateTriggers(estate, resultRelInfo,
963 NULL, NULL,
964 tid, NULL, slot,
965 recheckIndexes, NULL, false);
966
968 }
969}
970
971/*
972 * Find the searchslot tuple and delete it, and execute any constraints
973 * and per-row triggers.
974 *
975 * Caller is responsible for opening the indexes.
976 */
977void
979 EState *estate, EPQState *epqstate,
981{
982 bool skip_tuple = false;
983 Relation rel = resultRelInfo->ri_RelationDesc;
984 ItemPointer tid = &searchslot->tts_tid;
985
987
988 /* BEFORE ROW DELETE Triggers */
989 if (resultRelInfo->ri_TrigDesc &&
990 resultRelInfo->ri_TrigDesc->trig_delete_before_row)
991 {
992 skip_tuple = !ExecBRDeleteTriggers(estate, epqstate, resultRelInfo,
993 tid, NULL, NULL, NULL, NULL, false);
994 }
995
996 if (!skip_tuple)
997 {
998 /* OK, delete the tuple */
999 simple_table_tuple_delete(rel, tid, estate->es_snapshot);
1000
1001 /* AFTER ROW DELETE Triggers */
1002 ExecARDeleteTriggers(estate, resultRelInfo,
1003 tid, NULL, NULL, false);
1004 }
1005}
1006
1007/*
1008 * Check if command can be executed with current replica identity.
1009 */
1010void
1012{
1014
1015 /*
1016 * Skip checking the replica identity for partitioned tables, because the
1017 * operations are actually performed on the leaf partitions.
1018 */
1019 if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1020 return;
1021
1022 /* We only need to do checks for UPDATE and DELETE. */
1023 if (cmd != CMD_UPDATE && cmd != CMD_DELETE)
1024 return;
1025
1026 /*
1027 * It is only safe to execute UPDATE/DELETE if the relation does not
1028 * publish UPDATEs or DELETEs, or all the following conditions are
1029 * satisfied:
1030 *
1031 * 1. All columns, referenced in the row filters from publications which
1032 * the relation is in, are valid - i.e. when all referenced columns are
1033 * part of REPLICA IDENTITY.
1034 *
1035 * 2. All columns, referenced in the column lists are valid - i.e. when
1036 * all columns referenced in the REPLICA IDENTITY are covered by the
1037 * column list.
1038 *
1039 * 3. All generated columns in REPLICA IDENTITY of the relation, are valid
1040 * - i.e. when all these generated columns are published.
1041 *
1042 * XXX We could optimize it by first checking whether any of the
1043 * publications have a row filter or column list for this relation, or if
1044 * the relation contains a generated column. If none of these exist and
1045 * the relation has replica identity then we can avoid building the
1046 * descriptor but as this happens only one time it doesn't seem worth the
1047 * additional complexity.
1048 */
1050 if (cmd == CMD_UPDATE && !pubdesc.rf_valid_for_update)
1051 ereport(ERROR,
1053 errmsg("cannot update table \"%s\"",
1055 errdetail("Column used in the publication WHERE expression is not part of the replica identity.")));
1056 else if (cmd == CMD_UPDATE && !pubdesc.cols_valid_for_update)
1057 ereport(ERROR,
1059 errmsg("cannot update table \"%s\"",
1061 errdetail("Column list used by the publication does not cover the replica identity.")));
1062 else if (cmd == CMD_UPDATE && !pubdesc.gencols_valid_for_update)
1063 ereport(ERROR,
1065 errmsg("cannot update table \"%s\"",
1067 errdetail("Replica identity must not contain unpublished generated columns.")));
1068 else if (cmd == CMD_DELETE && !pubdesc.rf_valid_for_delete)
1069 ereport(ERROR,
1071 errmsg("cannot delete from table \"%s\"",
1073 errdetail("Column used in the publication WHERE expression is not part of the replica identity.")));
1074 else if (cmd == CMD_DELETE && !pubdesc.cols_valid_for_delete)
1075 ereport(ERROR,
1077 errmsg("cannot delete from table \"%s\"",
1079 errdetail("Column list used by the publication does not cover the replica identity.")));
1080 else if (cmd == CMD_DELETE && !pubdesc.gencols_valid_for_delete)
1081 ereport(ERROR,
1083 errmsg("cannot delete from table \"%s\"",
1085 errdetail("Replica identity must not contain unpublished generated columns.")));
1086
1087 /* If relation has replica identity we are always good. */
1089 return;
1090
1091 /* REPLICA IDENTITY FULL is also good for UPDATE/DELETE. */
1092 if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
1093 return;
1094
1095 /*
1096 * This is UPDATE/DELETE and there is no replica identity.
1097 *
1098 * Check if the table publishes UPDATEs or DELETEs.
1099 */
1100 if (cmd == CMD_UPDATE && pubdesc.pubactions.pubupdate)
1101 ereport(ERROR,
1103 errmsg("cannot update table \"%s\" because it does not have a replica identity and publishes updates",
1105 errhint("To enable updating the table, set REPLICA IDENTITY using ALTER TABLE.")));
1106 else if (cmd == CMD_DELETE && pubdesc.pubactions.pubdelete)
1107 ereport(ERROR,
1109 errmsg("cannot delete from table \"%s\" because it does not have a replica identity and publishes deletes",
1111 errhint("To enable deleting from the table, set REPLICA IDENTITY using ALTER TABLE.")));
1112}
1113
1114
1115/*
1116 * Check if we support writing into the specific relkind of the local relation
1117 * and check if it aligns with the relkind of the relation on the publisher.
1118 *
1119 * The nspname and relname are only needed for error reporting.
1120 */
1121void
1123 const char *nspname, const char *relname)
1124{
1128 ereport(ERROR,
1130 errmsg("cannot use relation \"%s.%s\" as logical replication target",
1131 nspname, relname),
1133
1134 /*
1135 * Allow RELKIND_RELATION and RELKIND_PARTITIONED_TABLE to be treated
1136 * interchangeably, but ensure that sequences (RELKIND_SEQUENCE) match
1137 * exactly on both publisher and subscriber.
1138 */
1141 ereport(ERROR,
1143 /* translator: 3rd and 4th %s are "sequence" or "table" */
1144 errmsg("relation \"%s.%s\" type mismatch: source \"%s\", target \"%s\"",
1145 nspname, relname,
1146 remoterelkind == RELKIND_SEQUENCE ? "sequence" : "table",
1147 localrelkind == RELKIND_SEQUENCE ? "sequence" : "table"));
1148}
StrategyNumber IndexAmTranslateCompareType(CompareType cmptype, Oid amoid, Oid opfamily, bool missing_ok)
Definition amapi.c:161
#define AttributeNumberIsValid(attributeNumber)
Definition attnum.h:34
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition timestamp.c:1781
bool bms_is_member(int x, const Bitmapset *a)
Definition bitmapset.c:510
int Buffer
Definition buf.h:23
@ BUFFER_LOCK_SHARE
Definition bufmgr.h:210
@ BUFFER_LOCK_UNLOCK
Definition bufmgr.h:205
static void LockBuffer(Buffer buffer, BufferLockMode mode)
Definition bufmgr.h:328
#define PG_USED_FOR_ASSERTS_ONLY
Definition c.h:223
#define Assert(condition)
Definition c.h:873
regproc RegProcedure
Definition c.h:664
uint32 TransactionId
Definition c.h:666
#define OidIsValid(objectId)
Definition c.h:788
bool IsCatalogRelation(Relation relation)
Definition catalog.c:104
@ COMPARE_EQ
Definition cmptype.h:36
bool TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts, ReplOriginId *nodeid)
Definition commit_ts.c:272
bool GetTupleTransactionInfo(TupleTableSlot *localslot, TransactionId *xmin, ReplOriginId *localorigin, TimestampTz *localts)
Definition conflict.c:63
void ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel, ConflictType type, TupleTableSlot *searchslot, TupleTableSlot *remoteslot, List *conflicttuples)
Definition conflict.c:104
ConflictType
Definition conflict.h:32
@ CT_MULTIPLE_UNIQUE_CONFLICTS
Definition conflict.h:55
@ CT_INSERT_EXISTS
Definition conflict.h:34
@ CT_UPDATE_EXISTS
Definition conflict.h:40
int64 TimestampTz
Definition timestamp.h:39
int errdetail(const char *fmt,...)
Definition elog.c:1216
int errhint(const char *fmt,...)
Definition elog.c:1330
int errcode(int sqlerrcode)
Definition elog.c:863
int errmsg(const char *fmt,...)
Definition elog.c:1080
#define LOG
Definition elog.h:31
#define ERROR
Definition elog.h:39
#define elog(elevel,...)
Definition elog.h:226
#define ereport(elevel,...)
Definition elog.h:150
List * ExecInsertIndexTuples(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate, bool update, bool noDupErr, bool *specConflict, List *arbiterIndexes, bool onlySummarizing)
bool ExecCheckIndexConstraints(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate, ItemPointer conflictTid, const ItemPointerData *tupleid, List *arbiterIndexes)
bool ExecPartitionCheck(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate, bool emitError)
Definition execMain.c:1860
void ExecConstraints(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate)
Definition execMain.c:1984
static void BuildConflictIndexInfo(ResultRelInfo *resultRelInfo, Oid conflictindex)
void CheckSubscriptionRelkind(char localrelkind, char remoterelkind, const char *nspname, const char *relname)
static bool tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2, TypeCacheEntry **eq, Bitmapset *columns)
bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
bool RelationFindReplTupleByIndex(Relation rel, Oid idxoid, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
void ExecSimpleRelationDelete(ResultRelInfo *resultRelInfo, EState *estate, EPQState *epqstate, TupleTableSlot *searchslot)
static bool should_refetch_tuple(TM_Result res, TM_FailureData *tmfd)
void ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo, EState *estate, EPQState *epqstate, TupleTableSlot *searchslot, TupleTableSlot *slot)
void CheckCmdReplicaIdentity(Relation rel, CmdType cmd)
void ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo, EState *estate, TupleTableSlot *slot)
bool RelationFindDeletedTupleInfoByIndex(Relation rel, Oid idxoid, TupleTableSlot *searchslot, TransactionId oldestxmin, TransactionId *delete_xid, ReplOriginId *delete_origin, TimestampTz *delete_time)
bool RelationFindDeletedTupleInfoSeq(Relation rel, TupleTableSlot *searchslot, TransactionId oldestxmin, TransactionId *delete_xid, ReplOriginId *delete_origin, TimestampTz *delete_time)
static bool FindConflictTuple(ResultRelInfo *resultRelInfo, EState *estate, Oid conflictindex, TupleTableSlot *slot, TupleTableSlot **conflictslot)
static int build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel, TupleTableSlot *searchslot)
static void update_most_recent_deletion_info(TupleTableSlot *scanslot, TransactionId oldestxmin, TransactionId *delete_xid, TimestampTz *delete_time, ReplOriginId *delete_origin)
static void CheckAndReportConflict(ResultRelInfo *resultRelInfo, EState *estate, ConflictType type, List *recheckIndexes, TupleTableSlot *searchslot, TupleTableSlot *remoteslot)
void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
HeapTuple ExecFetchSlotHeapTuple(TupleTableSlot *slot, bool materialize, bool *shouldFree)
#define palloc0_array(type, count)
Definition fe_memutils.h:77
#define palloc0_object(type)
Definition fe_memutils.h:75
Datum FunctionCall2Coll(FmgrInfo *flinfo, Oid collation, Datum arg1, Datum arg2)
Definition fmgr.c:1150
char * format_type_be(Oid type_oid)
@ HEAPTUPLE_RECENTLY_DEAD
Definition heapam.h:128
HTSV_Result HeapTupleSatisfiesVacuum(HeapTuple htup, TransactionId OldestXmin, Buffer buffer)
static TransactionId HeapTupleHeaderGetUpdateXid(const HeapTupleHeaderData *tup)
void BuildSpeculativeIndexInfo(Relation index, IndexInfo *ii)
Definition index.c:2667
bool index_getnext_slot(IndexScanDesc scan, ScanDirection direction, TupleTableSlot *slot)
Definition indexam.c:730
IndexScanDesc index_beginscan(Relation heapRelation, Relation indexRelation, Snapshot snapshot, IndexScanInstrumentation *instrument, int nkeys, int norderbys)
Definition indexam.c:256
void index_close(Relation relation, LOCKMODE lockmode)
Definition indexam.c:177
void index_endscan(IndexScanDesc scan)
Definition indexam.c:392
Relation index_open(Oid relationId, LOCKMODE lockmode)
Definition indexam.c:133
void index_rescan(IndexScanDesc scan, ScanKey keys, int nkeys, ScanKey orderbys, int norderbys)
Definition indexam.c:366
int i
Definition isn.c:77
static bool ItemPointerIndicatesMovedPartitions(const ItemPointerData *pointer)
Definition itemptr.h:197
List * lappend(List *list, void *datum)
Definition list.c:339
void list_free(List *list)
Definition list.c:1546
bool list_member_oid(const List *list, Oid datum)
Definition list.c:722
void XactLockTableWait(TransactionId xid, Relation rel, const ItemPointerData *ctid, XLTW_Oper oper)
Definition lmgr.c:663
@ XLTW_None
Definition lmgr.h:26
#define NoLock
Definition lockdefs.h:34
#define RowExclusiveLock
Definition lockdefs.h:38
@ LockWaitBlock
Definition lockoptions.h:39
LockTupleMode
Definition lockoptions.h:50
@ LockTupleShare
Definition lockoptions.h:54
Oid get_opclass_input_type(Oid opclass)
Definition lsyscache.c:1314
Oid get_opclass_family(Oid opclass)
Definition lsyscache.c:1292
RegProcedure get_opcode(Oid opno)
Definition lsyscache.c:1435
Oid get_opfamily_member(Oid opfamily, Oid lefttype, Oid righttype, int16 strategy)
Definition lsyscache.c:168
void ExecComputeStoredGenerated(ResultRelInfo *resultRelInfo, EState *estate, TupleTableSlot *slot, CmdType cmdtype)
CmdType
Definition nodes.h:273
@ CMD_INSERT
Definition nodes.h:277
@ CMD_DELETE
Definition nodes.h:278
@ CMD_UPDATE
Definition nodes.h:276
#define InvalidReplOriginId
Definition origin.h:33
FormData_pg_attribute * Form_pg_attribute
int errdetail_relkind_not_supported(char relkind)
Definition pg_class.c:24
NameData relname
Definition pg_class.h:38
#define INDEX_MAX_KEYS
static int list_length(const List *l)
Definition pg_list.h:152
#define NIL
Definition pg_list.h:68
#define list_make1_oid(x1)
Definition pg_list.h:242
#define foreach_oid(var, lst)
Definition pg_list.h:471
static char buf[DEFAULT_XLOG_SEG_SIZE]
#define ERRCODE_T_R_SERIALIZATION_FAILURE
Definition pgbench.c:77
static bool DatumGetBool(Datum X)
Definition postgres.h:100
uint64_t Datum
Definition postgres.h:70
static Pointer DatumGetPointer(Datum X)
Definition postgres.h:342
unsigned int Oid
static int fb(int x)
#define RelationGetRelid(relation)
Definition rel.h:514
#define RelationGetDescr(relation)
Definition rel.h:540
#define RelationGetRelationName(relation)
Definition rel.h:548
#define IndexRelationGetNumberOfKeyAttributes(relation)
Definition rel.h:533
void RelationBuildPublicationDesc(Relation relation, PublicationDesc *pubdesc)
Definition relcache.c:5789
Bitmapset * RelationGetIndexAttrBitmap(Relation relation, IndexAttrBitmapKind attrKind)
Definition relcache.c:5298
Oid RelationGetReplicaIndex(Relation relation)
Definition relcache.c:5067
@ INDEX_ATTR_BITMAP_PRIMARY_KEY
Definition relcache.h:70
@ INDEX_ATTR_BITMAP_IDENTITY_KEY
Definition relcache.h:71
void ScanKeyInit(ScanKey entry, AttrNumber attributeNumber, StrategyNumber strategy, RegProcedure procedure, Datum argument)
Definition scankey.c:76
@ ForwardScanDirection
Definition sdir.h:28
#define SK_SEARCHNULL
Definition skey.h:121
#define SK_ISNULL
Definition skey.h:115
Snapshot GetLatestSnapshot(void)
Definition snapmgr.c:354
void PushActiveSnapshot(Snapshot snapshot)
Definition snapmgr.c:682
void PopActiveSnapshot(void)
Definition snapmgr.c:775
Snapshot GetActiveSnapshot(void)
Definition snapmgr.c:800
#define SnapshotAny
Definition snapmgr.h:33
#define InitDirtySnapshot(snapshotdata)
Definition snapmgr.h:42
Oid GetRelationIdentityOrPK(Relation rel)
Definition relation.c:905
uint16 StrategyNumber
Definition stratnum.h:22
Snapshot es_snapshot
Definition execnodes.h:662
Oid fn_oid
Definition fmgr.h:59
HeapTupleHeader t_data
Definition htup.h:68
Definition pg_list.h:54
TupleDesc rd_att
Definition rel.h:112
Form_pg_class rd_rel
Definition rel.h:111
List * ri_onConflictArbiterIndexes
Definition execnodes.h:582
Relation ri_RelationDesc
Definition execnodes.h:482
RelationPtr ri_IndexRelationDescs
Definition execnodes.h:488
TriggerDesc * ri_TrigDesc
Definition execnodes.h:517
IndexInfo ** ri_IndexRelationInfo
Definition execnodes.h:491
ItemPointerData ctid
Definition tableam.h:149
bool trig_delete_before_row
Definition reltrigger.h:66
bool trig_update_before_row
Definition reltrigger.h:61
bool trig_insert_before_row
Definition reltrigger.h:56
bool has_generated_stored
Definition tupdesc.h:46
TupleConstr * constr
Definition tupdesc.h:141
ItemPointerData tts_tid
Definition tuptable.h:128
FmgrInfo eq_opr_finfo
Definition typcache.h:76
Definition c.h:745
Oid values[FLEXIBLE_ARRAY_MEMBER]
Definition c.h:752
#define FirstLowInvalidHeapAttributeNumber
Definition sysattr.h:27
Datum SysCacheGetAttrNotNull(int cacheId, HeapTuple tup, AttrNumber attributeNumber)
Definition syscache.c:625
TupleTableSlot * table_slot_create(Relation relation, List **reglist)
Definition tableam.c:92
void simple_table_tuple_update(Relation rel, ItemPointer otid, TupleTableSlot *slot, Snapshot snapshot, TU_UpdateIndexes *update_indexes)
Definition tableam.c:359
void simple_table_tuple_insert(Relation rel, TupleTableSlot *slot)
Definition tableam.c:300
void simple_table_tuple_delete(Relation rel, ItemPointer tid, Snapshot snapshot)
Definition tableam.c:314
TU_UpdateIndexes
Definition tableam.h:111
@ TU_Summarizing
Definition tableam.h:119
@ TU_None
Definition tableam.h:113
static void table_endscan(TableScanDesc scan)
Definition tableam.h:1005
TM_Result
Definition tableam.h:73
@ TM_Ok
Definition tableam.h:78
@ TM_Deleted
Definition tableam.h:93
@ TM_Updated
Definition tableam.h:90
@ TM_Invisible
Definition tableam.h:81
static TM_Result table_tuple_lock(Relation rel, ItemPointer tid, Snapshot snapshot, TupleTableSlot *slot, CommandId cid, LockTupleMode mode, LockWaitPolicy wait_policy, uint8 flags, TM_FailureData *tmfd)
Definition tableam.h:1571
static void table_rescan(TableScanDesc scan, ScanKeyData *key)
Definition tableam.h:1014
static bool table_scan_getnextslot(TableScanDesc sscan, ScanDirection direction, TupleTableSlot *slot)
Definition tableam.h:1040
static TableScanDesc table_beginscan(Relation rel, Snapshot snapshot, int nkeys, ScanKeyData *key)
Definition tableam.h:897
#define InvalidTransactionId
Definition transam.h:31
#define TransactionIdIsValid(xid)
Definition transam.h:41
bool ExecBRUpdateTriggers(EState *estate, EPQState *epqstate, ResultRelInfo *relinfo, ItemPointer tupleid, HeapTuple fdw_trigtuple, TupleTableSlot *newslot, TM_Result *tmresult, TM_FailureData *tmfd, bool is_merge_update)
Definition trigger.c:2971
void ExecARDeleteTriggers(EState *estate, ResultRelInfo *relinfo, ItemPointer tupleid, HeapTuple fdw_trigtuple, TransitionCaptureState *transition_capture, bool is_crosspart_update)
Definition trigger.c:2801
bool ExecBRInsertTriggers(EState *estate, ResultRelInfo *relinfo, TupleTableSlot *slot)
Definition trigger.c:2465
void ExecARUpdateTriggers(EState *estate, ResultRelInfo *relinfo, ResultRelInfo *src_partinfo, ResultRelInfo *dst_partinfo, ItemPointer tupleid, HeapTuple fdw_trigtuple, TupleTableSlot *newslot, List *recheckIndexes, TransitionCaptureState *transition_capture, bool is_crosspart_update)
Definition trigger.c:3144
bool ExecBRDeleteTriggers(EState *estate, EPQState *epqstate, ResultRelInfo *relinfo, ItemPointer tupleid, HeapTuple fdw_trigtuple, TupleTableSlot **epqslot, TM_Result *tmresult, TM_FailureData *tmfd, bool is_merge_delete)
Definition trigger.c:2701
void ExecARInsertTriggers(EState *estate, ResultRelInfo *relinfo, TupleTableSlot *slot, List *recheckIndexes, TransitionCaptureState *transition_capture)
Definition trigger.c:2543
bool equalTupleDescs(TupleDesc tupdesc1, TupleDesc tupdesc2)
Definition tupdesc.c:590
static FormData_pg_attribute * TupleDescAttr(TupleDesc tupdesc, int i)
Definition tupdesc.h:160
static void slot_getallattrs(TupleTableSlot *slot)
Definition tuptable.h:371
static TupleTableSlot * ExecCopySlot(TupleTableSlot *dstslot, TupleTableSlot *srcslot)
Definition tuptable.h:524
static void ExecMaterializeSlot(TupleTableSlot *slot)
Definition tuptable.h:475
TypeCacheEntry * lookup_type_cache(Oid type_id, int flags)
Definition typcache.c:386
#define TYPECACHE_EQ_OPR_FINFO
Definition typcache.h:143
const char * type
CommandId GetCurrentCommandId(bool used)
Definition xact.c:830
uint16 ReplOriginId
Definition xlogdefs.h:69