PostgreSQL Source Code git master
Loading...
Searching...
No Matches
execReplication.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * execReplication.c
4 * miscellaneous executor routines for logical replication
5 *
6 * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 *
9 * IDENTIFICATION
10 * src/backend/executor/execReplication.c
11 *
12 *-------------------------------------------------------------------------
13 */
14
15#include "postgres.h"
16
17#include "access/amapi.h"
18#include "access/commit_ts.h"
19#include "access/genam.h"
20#include "access/gist.h"
21#include "access/relscan.h"
22#include "access/tableam.h"
23#include "access/transam.h"
24#include "access/xact.h"
25#include "access/heapam.h"
26#include "catalog/pg_am_d.h"
27#include "commands/trigger.h"
28#include "executor/executor.h"
32#include "storage/lmgr.h"
33#include "utils/builtins.h"
34#include "utils/lsyscache.h"
35#include "utils/rel.h"
36#include "utils/snapmgr.h"
37#include "utils/syscache.h"
38#include "utils/typcache.h"
39
40
42 TypeCacheEntry **eq, Bitmapset *columns);
43
44/*
45 * Setup a ScanKey for a search in the relation 'rel' for a tuple 'key' that
46 * is setup to match 'rel' (*NOT* idxrel!).
47 *
48 * Returns how many columns to use for the index scan.
49 *
50 * This is not a generic routine, idxrel must be PK, RI, or an index that can be
51 * used for a REPLICA IDENTITY FULL table. See FindUsableIndexForReplicaIdentityFull()
52 * for details.
53 *
54 * By definition, replication identity of a rel meets all limitations associated
55 * with that. Note that any other index could also meet these limitations.
 *
 * The tuple referred to as 'key' above is passed in 'searchslot'. One entry
 * is written to 'skey' for each index column that refers to a plain table
 * column; expression columns are skipped. Index attribute numbers in the
 * scan key are 1-based (index_attoff + 1). The key value is read from
 * 'searchslot' at the corresponding table attribute number, the operator is
 * the opfamily member for 'optype' at the AM's equality strategy, and the
 * collation is the index column's collation. A NULL search value is flagged
 * with SK_ISNULL | SK_SEARCHNULL. Raises ERROR if the opfamily has no
 * matching equality operator.
56 */
57static int
60{
61 int index_attoff;
62 int skey_attoff = 0;
64 oidvector *opclass;
65 int2vector *indkey = &idxrel->rd_index->indkey;
66
70
71 /* Build scankey for every non-expression attribute in the index. */
74 {
75 Oid operator;
76 Oid optype;
77 Oid opfamily;
81
83 {
84 /*
85 * XXX: Currently, we don't support expressions in the scan key,
86 * see code below.
87 */
88 continue;
89 }
90
91 /*
92 * Load the operator info. We need this to get the equality operator
93 * function for the scan key.
94 */
96 opfamily = get_opclass_family(opclass->values[index_attoff]);
 /* Map the generic COMPARE_EQ onto this index AM's own strategy number. */
97 eq_strategy = IndexAmTranslateCompareType(COMPARE_EQ, idxrel->rd_rel->relam, opfamily, false);
98 operator = get_opfamily_member(opfamily, optype,
99 optype,
101
102 if (!OidIsValid(operator))
103 elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",
104 eq_strategy, optype, optype, opfamily);
105
106 regop = get_opcode(operator);
107
108 /* Initialize the scankey. */
110 index_attoff + 1,
112 regop,
113 searchslot->tts_values[table_attno - 1]);
114
 /* Compare using the index column's collation (rd_indcollation). */
115 skey[skey_attoff].sk_collation = idxrel->rd_indcollation[index_attoff];
116
117 /* Check for null value. */
118 if (searchslot->tts_isnull[table_attno - 1])
119 skey[skey_attoff].sk_flags |= (SK_ISNULL | SK_SEARCHNULL);
120
121 skey_attoff++;
122 }
123
124 /* There must always be at least one attribute for the index scan. */
125 Assert(skey_attoff > 0);
126
127 return skey_attoff;
128}
129
130
131/*
132 * Helper function to check if it is necessary to re-fetch and lock the tuple
133 * due to concurrent modifications. This function should be called after
134 * invoking table_tuple_lock.
 *
 * Returns false for TM_Ok. For TM_Updated and TM_Deleted the concurrent
 * change is logged at LOG level and true is returned, so the caller can
 * rescan and retry the lock. TM_Invisible and any other result raise ERROR.
135 */
136static bool
138{
139 bool refetch = false;
140
141 switch (res)
142 {
143 case TM_Ok:
144 break;
145 case TM_Updated:
146 /* XXX: Improve handling here */
 /* Use a distinct message when the row moved to another partition. */
148 ereport(LOG,
150 errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
151 else
152 ereport(LOG,
154 errmsg("concurrent update, retrying")));
155 refetch = true;
156 break;
157 case TM_Deleted:
158 /* XXX: Improve handling here */
159 ereport(LOG,
161 errmsg("concurrent delete, retrying")));
162 refetch = true;
163 break;
164 case TM_Invisible:
165 elog(ERROR, "attempted to lock invisible tuple");
166 break;
167 default:
168 elog(ERROR, "unexpected table_tuple_lock status: %u", res);
169 break;
170 }
171
172 return refetch;
173}
174
175/*
176 * Search the relation 'rel' for tuple using the index.
177 *
178 * If a matching tuple is found, lock it with lockmode, fill the slot with its
179 * contents, and return true. Return false otherwise.
 *
 * If the matching tuple is still being modified by an in-progress
 * transaction, we wait for that transaction and restart the scan from the
 * 'retry' label. The scan is also restarted when should_refetch_tuple()
 * reports a concurrent update or delete while locking. For indexes other
 * than the primary key or replica identity index, each index hit must also
 * pass an equality check before it is accepted.
180 */
181bool
183 LockTupleMode lockmode,
186{
188 int skey_attoff;
189 IndexScanDesc scan;
193 bool found;
196
197 /* Open the index. */
199
201
203
204 /* Build scan key. */
206
207 /* Start an index scan. */
208 scan = index_beginscan(rel, idxrel,
210
211retry:
212 found = false;
213
214 index_rescan(scan, skey, skey_attoff, NULL, 0);
215
216 /* Try to find the tuple */
218 {
219 /*
220 * Avoid expensive equality check if the index is primary key or
221 * replica identity index.
222 */
224 {
 /* The per-column equality cache is allocated lazily, on first use. */
225 if (eq == NULL)
226 eq = palloc0_array(TypeCacheEntry *, outslot->tts_tupleDescriptor->natts);
227
229 continue;
230 }
231
233
235 snap.xmin : snap.xmax;
236
237 /*
238 * If the tuple is locked, wait for locking transaction to finish and
239 * retry.
240 */
242 {
244 goto retry;
245 }
246
247 /* Found our tuple and it's not locked */
248 found = true;
249 break;
250 }
251
252 /* Found tuple, try to lock it in the lockmode. */
253 if (found)
254 {
255 TM_FailureData tmfd;
256 TM_Result res;
257
259
 /*
  * Lock without following the update chain; on a concurrent update or
  * delete should_refetch_tuple() tells us to rescan.
  */
260 res = table_tuple_lock(rel, &(outslot->tts_tid), GetActiveSnapshot(),
261 outslot,
262 GetCurrentCommandId(false),
263 lockmode,
265 0 /* don't follow updates */ ,
266 &tmfd);
267
269
270 if (should_refetch_tuple(res, &tmfd))
271 goto retry;
272 }
273
274 index_endscan(scan);
275
276 /* Don't release lock until commit. */
278
279 return found;
280}
281
282/*
283 * Compare the tuples in the slots by checking if they have equal values.
284 *
285 * If 'columns' is not null, only the columns specified within it will be
286 * considered for the equality check, ignoring all other columns.
 *
 * Dropped and generated columns are always skipped, and two NULLs compare
 * as equal. 'eq' is a per-attribute cache of type-cache entries, indexed by
 * attribute number and filled in lazily; callers pass a zeroed array with
 * one element per attribute. Returns false at the first mismatching column.
 * Raises ERROR if a compared column's type has no equality operator.
287 */
288static bool
290 TypeCacheEntry **eq, Bitmapset *columns)
291{
292 int attrnum;
293
294 Assert(slot1->tts_tupleDescriptor->natts ==
295 slot2->tts_tupleDescriptor->natts);
296
299
300 /* Check equality of the attributes. */
301 for (attrnum = 0; attrnum < slot1->tts_tupleDescriptor->natts; attrnum++)
302 {
304 TypeCacheEntry *typentry;
305
306 att = TupleDescAttr(slot1->tts_tupleDescriptor, attrnum);
307
308 /*
309 * Ignore dropped and generated columns as the publisher doesn't send
310 * those
311 */
312 if (att->attisdropped || att->attgenerated)
313 continue;
314
315 /*
316 * Ignore columns that are not listed for checking.
317 */
318 if (columns &&
320 columns))
321 continue;
322
323 /*
324 * If one value is NULL and other is not, then they are certainly not
325 * equal
326 */
327 if (slot1->tts_isnull[attrnum] != slot2->tts_isnull[attrnum])
328 return false;
329
330 /*
331 * If both are NULL, they can be considered equal.
332 */
333 if (slot1->tts_isnull[attrnum] || slot2->tts_isnull[attrnum])
334 continue;
335
 /* Look up and cache the type's equality support the first time. */
336 typentry = eq[attrnum];
337 if (typentry == NULL)
338 {
339 typentry = lookup_type_cache(att->atttypid,
341 if (!OidIsValid(typentry->eq_opr_finfo.fn_oid))
344 errmsg("could not identify an equality operator for type %s",
345 format_type_be(att->atttypid))));
346 eq[attrnum] = typentry;
347 }
348
350 att->attcollation,
351 slot1->tts_values[attrnum],
352 slot2->tts_values[attrnum])))
353 return false;
354 }
355
356 return true;
357}
358
359/*
360 * Search the relation 'rel' for tuple using the sequential scan.
361 *
362 * If a matching tuple is found, lock it with lockmode, fill the slot with its
363 * contents, and return true. Return false otherwise.
364 *
365 * Note that this stops on the first matching tuple.
366 *
367 * This can obviously be quite slow on tables that have more than few rows.
 *
 * If the matching tuple is being modified by an in-progress transaction, we
 * wait for that transaction and rescan from the beginning. We also rescan
 * when should_refetch_tuple() reports a concurrent update or delete while
 * the tuple is being locked.
368 */
369bool
372{
374 TableScanDesc scan;
378 bool found;
380
381 Assert(equalTupleDescs(desc, outslot->tts_tupleDescriptor));
382
 /* Zeroed per-attribute equality cache, filled in lazily. */
383 eq = palloc0_array(TypeCacheEntry *, outslot->tts_tupleDescriptor->natts);
384
385 /* Start a heap scan. */
387 scan = table_beginscan(rel, &snap, 0, NULL,
388 SO_NONE);
390
391retry:
392 found = false;
393
 /* Restart the scan from the beginning (also on every retry). */
394 table_rescan(scan, NULL);
395
396 /* Try to find the tuple */
398 {
400 continue;
401
402 found = true;
404
406 snap.xmin : snap.xmax;
407
408 /*
409 * If the tuple is locked, wait for locking transaction to finish and
410 * retry.
411 */
413 {
415 goto retry;
416 }
417
418 /* Found our tuple and it's not locked */
419 break;
420 }
421
422 /* Found tuple, try to lock it in the lockmode. */
423 if (found)
424 {
425 TM_FailureData tmfd;
426 TM_Result res;
427
429
430 res = table_tuple_lock(rel, &(outslot->tts_tid), GetActiveSnapshot(),
431 outslot,
432 GetCurrentCommandId(false),
433 lockmode,
435 0 /* don't follow updates */ ,
436 &tmfd);
437
439
440 if (should_refetch_tuple(res, &tmfd))
441 goto retry;
442 }
443
444 table_endscan(scan);
446
447 return found;
448}
449
450/*
451 * Build additional index information necessary for conflict detection.
 *
 * Only the entry of ri_IndexRelationDescs whose OID equals 'conflictindex'
 * is processed; every other index is skipped. Must be called at most once
 * per index (see the Assert below).
452 */
453static void
455{
456 for (int i = 0; i < resultRelInfo->ri_NumIndices; i++)
457 {
458 Relation indexRelation = resultRelInfo->ri_IndexRelationDescs[i];
460
461 if (conflictindex != RelationGetRelid(indexRelation))
462 continue;
463
464 /*
465 * This Assert will fail if BuildSpeculativeIndexInfo() is called
466 * twice for the given index.
467 */
468 Assert(indexRelationInfo->ii_UniqueOps == NULL);
469
471 }
472}
473
474/*
475 * If the tuple is recently dead and was deleted by a transaction with a newer
476 * commit timestamp than previously recorded, update the associated transaction
477 * ID, commit time, and origin. This helps ensure that conflict detection uses
478 * the most recent and relevant deletion metadata.
 *
 * The tuple in 'scanslot' is fetched as a heap tuple, and its dead/alive
 * status is judged relative to 'oldestxmin'. Nothing is updated if the tuple
 * is not recently dead or has no valid update xid.
479 */
480static void
486{
488 HeapTuple tuple;
489 Buffer buf;
490 bool recently_dead = false;
491 TransactionId xmax;
494
496
497 tuple = ExecFetchSlotHeapTuple(scanslot, false, NULL);
498 buf = hslot->buffer;
499
501
502 /*
503 * We do not consider HEAPTUPLE_DEAD status because it indicates either
504 * tuples whose inserting transaction was aborted (meaning there is no
505 * commit timestamp or origin), or tuples deleted by a transaction older
506 * than oldestxmin, making it safe to ignore them during conflict
507 * detection (See comments atop worker.c for details).
508 */
510 recently_dead = true;
511
513
514 if (!recently_dead)
515 return;
516
 /* The deleting transaction is the tuple's update xid. */
517 xmax = HeapTupleHeaderGetUpdateXid(tuple->t_data);
518 if (!TransactionIdIsValid(xmax))
519 return;
520
521 /* Select the dead tuple with the most recent commit timestamp */
524 {
525 *delete_xid = xmax;
528 }
529}
530
531/*
532 * Searches the relation 'rel' for the most recently deleted tuple that matches
533 * the values in 'searchslot' and is not yet removable by VACUUM. The function
534 * returns the transaction ID, origin, and commit timestamp of the transaction
535 * that deleted this tuple.
536 *
537 * 'oldestxmin' acts as a cutoff transaction ID. Tuples deleted by transactions
538 * with IDs >= 'oldestxmin' are considered recently dead and are eligible for
539 * conflict detection.
540 *
541 * Instead of stopping at the first match, we scan all matching dead tuples to
542 * identify most recent deletion. This is crucial because only the latest
543 * deletion is relevant for resolving conflicts.
544 *
545 * For example, consider a scenario on the subscriber where a row is deleted,
546 * re-inserted, and then deleted again only on the subscriber:
547 *
548 * - (pk, 1) - deleted at 9:00,
549 * - (pk, 1) - deleted at 9:02,
550 *
551 * Now, a remote update arrives: (pk, 1) -> (pk, 2), timestamped at 9:01.
552 *
553 * If we mistakenly return the older deletion (9:00), the system may wrongly
554 * apply the remote update using a last-update-wins strategy. Instead, we must
555 * recognize the more recent deletion at 9:02 and skip the update. See
556 * comments atop worker.c for details. Note, as of now, conflict resolution
557 * is not implemented. Consequently, the system may incorrectly report the
558 * older tuple as the conflicted one, leading to misleading results.
559 *
560 * The commit timestamp of the deleting transaction is used to determine which
561 * tuple was deleted most recently.
 *
 * '*delete_time' is reset to 0 on entry. The function returns true only if
 * some qualifying deleted tuple set it to a non-zero value during the scan.
562 */
563bool
569{
571 TableScanDesc scan;
575
576 Assert(equalTupleDescs(desc, searchslot->tts_tupleDescriptor));
577
580 *delete_time = 0;
581
582 /*
583 * If the relation has a replica identity key or a primary key that is
584 * unusable for locating deleted tuples (see
585 * IsIndexUsableForFindingDeletedTuple), a full table scan becomes
586 * necessary. In such cases, comparing the entire tuple is not required,
587 * since the remote tuple might not include all column values. Instead,
588 * the indexed columns alone are sufficient to identify the target tuple
589 * (see logicalrep_rel_mark_updatable).
590 */
593
594 /* fallback to PK if no replica identity */
595 if (!indexbitmap)
598
599 eq = palloc0_array(TypeCacheEntry *, searchslot->tts_tupleDescriptor->natts);
600
601 /*
602 * Start a heap scan using SnapshotAny to identify dead tuples that are
603 * not visible under a standard MVCC snapshot. Tuples from transactions
604 * not yet committed or those just committed prior to the scan are
605 * excluded in update_most_recent_deletion_info().
606 */
607 scan = table_beginscan(rel, SnapshotAny, 0, NULL,
608 SO_NONE);
610
611 table_rescan(scan, NULL);
612
613 /* Try to find the tuple */
615 {
617 continue;
618
621 }
622
623 table_endscan(scan);
625
 /* Non-zero only if some matching tuple recorded a deletion. */
626 return *delete_time != 0;
627}
628
629/*
630 * Similar to RelationFindDeletedTupleInfoSeq() but using index scan to locate
631 * the deleted tuple.
 *
 * '*delete_time' is reset to 0 on entry. The function returns true only if
 * some qualifying deleted tuple found through the index set it to a non-zero
 * value. The per-column equality cache is allocated only when an index hit
 * needs a full equality check (i.e. the index is neither the primary key nor
 * the replica identity index).
632 */
633bool
640{
643 int skey_attoff;
644 IndexScanDesc scan;
649
650 Assert(equalTupleDescs(desc, searchslot->tts_tupleDescriptor));
652
654 *delete_time = 0;
656
658
660
662
663 /* Build scan key. */
665
666 /*
667 * Start an index scan using SnapshotAny to identify dead tuples that are
668 * not visible under a standard MVCC snapshot. Tuples from transactions
669 * not yet committed or those just committed prior to the scan are
670 * excluded in update_most_recent_deletion_info().
671 */
672 scan = index_beginscan(rel, idxrel,
674
675 index_rescan(scan, skey, skey_attoff, NULL, 0);
676
677 /* Try to find the tuple */
679 {
680 /*
681 * Avoid expensive equality check if the index is primary key or
682 * replica identity index.
683 */
685 {
686 if (eq == NULL)
687 eq = palloc0_array(TypeCacheEntry *, scanslot->tts_tupleDescriptor->natts);
688
690 continue;
691 }
692
695 }
696
697 index_endscan(scan);
698
700
702
 /* Non-zero only if some matching tuple recorded a deletion. */
703 return *delete_time != 0;
704}
705
706/*
707 * Find the tuple that violates the passed unique index (conflictindex).
708 *
709 * If the conflicting tuple is found return true, otherwise false.
710 *
711 * We lock the tuple to avoid getting it deleted before the caller can fetch
712 * the required information. Note that if the tuple is deleted before a lock
713 * is acquired, we will retry to find the conflicting tuple again.
 *
 * Each pass through 'retry' re-runs ExecCheckIndexConstraints(). When that
 * call reports no conflict (returns true), we return false. Otherwise the
 * conflicting tuple is locked with GetCurrentCommandId(false), without
 * following updates. If should_refetch_tuple() reports a concurrent update
 * or delete, the check is repeated.
714 */
715static bool
716FindConflictTuple(ResultRelInfo *resultRelInfo, EState *estate,
719{
720 Relation rel = resultRelInfo->ri_RelationDesc;
722 TM_FailureData tmfd;
723 TM_Result res;
724
726
727 /*
728 * Build additional information required to check constraints violations.
729 * See check_exclusion_or_unique_constraint().
730 */
732
733retry:
734 if (ExecCheckIndexConstraints(resultRelInfo, slot, estate,
735 &conflictTid, &slot->tts_tid,
737 {
738 if (*conflictslot)
740
742 return false;
743 }
744
746
748
751 GetCurrentCommandId(false),
754 0 /* don't follow updates */ ,
755 &tmfd);
756
758
759 if (should_refetch_tuple(res, &tmfd))
760 goto retry;
761
762 return true;
763}
764
765/*
766 * Check all the unique indexes in 'recheckIndexes' for conflict with the
767 * tuple in 'remoteslot' and report if found.
 *
 * For every conflicting local tuple found, the index OID and the tuple's
 * origin and timestamp are recorded. If any conflicts were collected, they
 * are reported together through ReportApplyConflict() at ERROR level.
768 */
769static void
773{
776
777 /* Check all the unique indexes for conflicts */
779 {
781 FindConflictTuple(resultRelInfo, estate, uniqueidx, remoteslot,
782 &conflictslot))
783 {
785
787 conflicttuple->indexoid = uniqueidx;
788
790 &conflicttuple->origin, &conflicttuple->ts);
791
793 }
794 }
795
796 /* Report the conflict, if found */
797 if (conflicttuples)
798 ReportApplyConflict(estate, resultRelInfo, ERROR,
801}
802
803/*
804 * Insert tuple represented in the slot to the relation, update the indexes,
805 * and execute any constraints and per-row triggers.
806 *
807 * Caller is responsible for opening the indexes.
 *
 * Only plain tables are supported (asserted). If a BEFORE ROW INSERT trigger
 * returns "do nothing", the tuple is skipped entirely. When the index
 * insertion sets the 'conflict' flag, the conflicting local rows are looked
 * up and reported by CheckAndReportConflict().
808 */
809void
811 EState *estate, TupleTableSlot *slot)
812{
813 bool skip_tuple = false;
814 Relation rel = resultRelInfo->ri_RelationDesc;
815
816 /* For now we support only tables. */
817 Assert(rel->rd_rel->relkind == RELKIND_RELATION);
818
820
821 /* BEFORE ROW INSERT Triggers */
822 if (resultRelInfo->ri_TrigDesc &&
823 resultRelInfo->ri_TrigDesc->trig_insert_before_row)
824 {
825 if (!ExecBRInsertTriggers(estate, resultRelInfo, slot))
826 skip_tuple = true; /* "do nothing" */
827 }
828
829 if (!skip_tuple)
830 {
833 bool conflict = false;
834
835 /* Compute stored generated columns */
836 if (rel->rd_att->constr &&
838 ExecComputeStoredGenerated(resultRelInfo, estate, slot,
839 CMD_INSERT);
840
841 /* Check the constraints of the tuple */
842 if (rel->rd_att->constr)
843 ExecConstraints(resultRelInfo, slot, estate);
844 if (rel->rd_rel->relispartition)
845 ExecPartitionCheck(resultRelInfo, slot, estate, true);
846
847 /* OK, store the tuple and create index entries for it */
848 simple_table_tuple_insert(resultRelInfo->ri_RelationDesc, slot);
849
851
852 if (resultRelInfo->ri_NumIndices > 0)
853 {
854 uint32 flags;
855
 /*
  * With conflict indexes, pass EIIT_NO_DUPE_ERROR so duplicates are
  * signalled through 'conflict' and handled below.
  */
856 if (conflictindexes != NIL)
857 flags = EIIT_NO_DUPE_ERROR;
858 else
859 flags = 0;
861 estate, flags,
862 slot, conflictindexes,
863 &conflict);
864 }
865
866 /*
867 * Checks the conflict indexes to fetch the conflicting local row and
868 * reports the conflict. We perform this check here, instead of
869 * performing an additional index scan before the actual insertion and
870 * reporting the conflict if any conflicting rows are found. This is
871 * to avoid the overhead of executing the extra scan for each INSERT
872 * operation, even when no conflict arises, which could introduce
873 * significant overhead to replication, particularly in cases where
874 * conflicts are rare.
875 *
876 * XXX OTOH, this could lead to clean-up effort for dead tuples added
877 * in heap and index in case of conflicts. But as conflicts shouldn't
878 * be a frequent thing so we preferred to save the performance
879 * overhead of extra scan before each insertion.
880 */
881 if (conflict)
882 CheckAndReportConflict(resultRelInfo, estate, CT_INSERT_EXISTS,
883 recheckIndexes, NULL, slot);
884
885 /* AFTER ROW INSERT Triggers */
886 ExecARInsertTriggers(estate, resultRelInfo, slot,
888
889 /*
890 * XXX we should in theory pass a TransitionCaptureState object to the
891 * above to capture transition tuples, but after statement triggers
892 * don't actually get fired by replication yet anyway
893 */
894
896 }
897}
898
899/*
900 * Find the searchslot tuple and update it with data in the slot,
901 * update the indexes, and execute any constraints and per-row triggers.
902 *
903 * Caller is responsible for opening the indexes.
 *
 * The target row is identified by searchslot's TID. If a BEFORE ROW UPDATE
 * trigger returns "do nothing", the update is skipped. Index entries are
 * inserted only when update_indexes is not TU_None after the tuple update.
904 */
905void
907 EState *estate, EPQState *epqstate,
909{
910 bool skip_tuple = false;
911 Relation rel = resultRelInfo->ri_RelationDesc;
912 ItemPointer tid = &(searchslot->tts_tid);
913
914 /*
915 * We support only non-system tables, with
916 * check_publication_add_relation() accountable.
917 */
918 Assert(rel->rd_rel->relkind == RELKIND_RELATION);
920
922
923 /* BEFORE ROW UPDATE Triggers */
924 if (resultRelInfo->ri_TrigDesc &&
925 resultRelInfo->ri_TrigDesc->trig_update_before_row)
926 {
927 if (!ExecBRUpdateTriggers(estate, epqstate, resultRelInfo,
928 tid, NULL, slot, NULL, NULL, false))
929 skip_tuple = true; /* "do nothing" */
930 }
931
932 if (!skip_tuple)
933 {
937 bool conflict = false;
938
939 /* Compute stored generated columns */
940 if (rel->rd_att->constr &&
942 ExecComputeStoredGenerated(resultRelInfo, estate, slot,
943 CMD_UPDATE);
944
945 /* Check the constraints of the tuple */
946 if (rel->rd_att->constr)
947 ExecConstraints(resultRelInfo, slot, estate);
948 if (rel->rd_rel->relispartition)
949 ExecPartitionCheck(resultRelInfo, slot, estate, true);
950
951 simple_table_tuple_update(rel, tid, slot, estate->es_snapshot,
953
955
 /* Skip index maintenance entirely when no index update is needed. */
956 if (resultRelInfo->ri_NumIndices > 0 && (update_indexes != TU_None))
957 {
958 uint32 flags = EIIT_IS_UPDATE;
959
960 if (conflictindexes != NIL)
961 flags |= EIIT_NO_DUPE_ERROR;
963 flags |= EIIT_ONLY_SUMMARIZING;
965 estate, flags,
966 slot, conflictindexes,
967 &conflict);
968 }
969
970 /*
971 * Refer to the comments above the call to CheckAndReportConflict() in
972 * ExecSimpleRelationInsert to understand why this check is done at
973 * this point.
974 */
975 if (conflict)
976 CheckAndReportConflict(resultRelInfo, estate, CT_UPDATE_EXISTS,
978
979 /* AFTER ROW UPDATE Triggers */
980 ExecARUpdateTriggers(estate, resultRelInfo,
981 NULL, NULL,
982 tid, NULL, slot,
983 recheckIndexes, NULL, false);
984
986 }
987}
988
989/*
990 * Find the searchslot tuple and delete it, and execute any constraints
991 * and per-row triggers.
992 *
993 * Caller is responsible for opening the indexes.
 *
 * The row to delete is identified by searchslot's TID. If a BEFORE ROW
 * DELETE trigger returns false, both the delete and the AFTER ROW DELETE
 * triggers are skipped.
994 */
995void
997 EState *estate, EPQState *epqstate,
999{
1000 bool skip_tuple = false;
1001 Relation rel = resultRelInfo->ri_RelationDesc;
1002 ItemPointer tid = &searchslot->tts_tid;
1003
1005
1006 /* BEFORE ROW DELETE Triggers */
1007 if (resultRelInfo->ri_TrigDesc &&
1008 resultRelInfo->ri_TrigDesc->trig_delete_before_row)
1009 {
1010 skip_tuple = !ExecBRDeleteTriggers(estate, epqstate, resultRelInfo,
1011 tid, NULL, NULL, NULL, NULL, false);
1012 }
1013
1014 if (!skip_tuple)
1015 {
1016 /* OK, delete the tuple */
1017 simple_table_tuple_delete(rel, tid, estate->es_snapshot);
1018
1019 /* AFTER ROW DELETE Triggers */
1020 ExecARDeleteTriggers(estate, resultRelInfo,
1021 tid, NULL, NULL, false);
1022 }
1023}
1024
1025/*
1026 * Check if command can be executed with current replica identity.
 *
 * Raises ERROR when an UPDATE or DELETE on 'rel' conflicts with the
 * publication settings for its replica identity. Partitioned tables, and
 * commands other than UPDATE/DELETE, return without any check.
1027 */
1028void
1030{
1032
1033 /*
1034 * Skip checking the replica identity for partitioned tables, because the
1035 * operations are actually performed on the leaf partitions.
1036 */
1037 if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1038 return;
1039
1040 /* We only need to do checks for UPDATE and DELETE. */
1041 if (cmd != CMD_UPDATE && cmd != CMD_DELETE)
1042 return;
1043
1044 /*
1045 * It is only safe to execute UPDATE/DELETE if the relation does not
1046 * publish UPDATEs or DELETEs, or all the following conditions are
1047 * satisfied:
1048 *
1049 * 1. All columns, referenced in the row filters from publications which
1050 * the relation is in, are valid - i.e. when all referenced columns are
1051 * part of REPLICA IDENTITY.
1052 *
1053 * 2. All columns, referenced in the column lists are valid - i.e. when
1054 * all columns referenced in the REPLICA IDENTITY are covered by the
1055 * column list.
1056 *
1057 * 3. All generated columns in REPLICA IDENTITY of the relation, are valid
1058 * - i.e. when all these generated columns are published.
1059 *
1060 * XXX We could optimize it by first checking whether any of the
1061 * publications have a row filter or column list for this relation, or if
1062 * the relation contains a generated column. If none of these exist and
1063 * the relation has replica identity then we can avoid building the
1064 * descriptor but as this happens only one time it doesn't seem worth the
1065 * additional complexity.
1066 */
1068 if (cmd == CMD_UPDATE && !pubdesc.rf_valid_for_update)
1069 ereport(ERROR,
1071 errmsg("cannot update table \"%s\"",
1073 errdetail("Column used in the publication WHERE expression is not part of the replica identity.")));
1074 else if (cmd == CMD_UPDATE && !pubdesc.cols_valid_for_update)
1075 ereport(ERROR,
1077 errmsg("cannot update table \"%s\"",
1079 errdetail("Column list used by the publication does not cover the replica identity.")));
1080 else if (cmd == CMD_UPDATE && !pubdesc.gencols_valid_for_update)
1081 ereport(ERROR,
1083 errmsg("cannot update table \"%s\"",
1085 errdetail("Replica identity must not contain unpublished generated columns.")));
1086 else if (cmd == CMD_DELETE && !pubdesc.rf_valid_for_delete)
1087 ereport(ERROR,
1089 errmsg("cannot delete from table \"%s\"",
1091 errdetail("Column used in the publication WHERE expression is not part of the replica identity.")));
1092 else if (cmd == CMD_DELETE && !pubdesc.cols_valid_for_delete)
1093 ereport(ERROR,
1095 errmsg("cannot delete from table \"%s\"",
1097 errdetail("Column list used by the publication does not cover the replica identity.")));
1098 else if (cmd == CMD_DELETE && !pubdesc.gencols_valid_for_delete)
1099 ereport(ERROR,
1101 errmsg("cannot delete from table \"%s\"",
1103 errdetail("Replica identity must not contain unpublished generated columns.")));
1104
 /*
  * Reaching this point means none of the row filter, column list or
  * generated column checks above raised an error.
  */
1105 /* If relation has replica identity we are always good. */
1107 return;
1108
1109 /* REPLICA IDENTITY FULL is also good for UPDATE/DELETE. */
1110 if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
1111 return;
1112
1113 /*
1114 * This is UPDATE/DELETE and there is no replica identity.
1115 *
1116 * Check if the table publishes UPDATES or DELETES.
1117 */
1118 if (cmd == CMD_UPDATE && pubdesc.pubactions.pubupdate)
1119 ereport(ERROR,
1121 errmsg("cannot update table \"%s\" because it does not have a replica identity and publishes updates",
1123 errhint("To enable updating the table, set REPLICA IDENTITY using ALTER TABLE.")));
1124 else if (cmd == CMD_DELETE && pubdesc.pubactions.pubdelete)
1125 ereport(ERROR,
1127 errmsg("cannot delete from table \"%s\" because it does not have a replica identity and publishes deletes",
1129 errhint("To enable deleting from the table, set REPLICA IDENTITY using ALTER TABLE.")));
1130}
1131
1132
1133/*
1134 * Check if we support writing into specific relkind of local relation and check
1135 * if it aligns with the relkind of the relation on the publisher.
1136 *
1137 * The nspname and relname are only needed for error reporting.
 *
 * Raises ERROR if the local relation cannot be used as a logical replication
 * target, or if the local and remote relkinds are incompatible. Tables and
 * partitioned tables are interchangeable, but a sequence on one side must be
 * a sequence on the other.
1138 */
1139void
1141 const char *nspname, const char *relname)
1142{
1146 ereport(ERROR,
1148 errmsg("cannot use relation \"%s.%s\" as logical replication target",
1149 nspname, relname),
1151
1152 /*
1153 * Allow RELKIND_RELATION and RELKIND_PARTITIONED_TABLE to be treated
1154 * interchangeably, but ensure that sequences (RELKIND_SEQUENCE) match
1155 * exactly on both publisher and subscriber.
1156 */
1159 ereport(ERROR,
1161 /* translator: 3rd and 4th %s are "sequence" or "table" */
1162 errmsg("relation \"%s.%s\" type mismatch: source \"%s\", target \"%s\"",
1163 nspname, relname,
1164 remoterelkind == RELKIND_SEQUENCE ? "sequence" : "table",
1165 localrelkind == RELKIND_SEQUENCE ? "sequence" : "table"));
1166}
StrategyNumber IndexAmTranslateCompareType(CompareType cmptype, Oid amoid, Oid opfamily, bool missing_ok)
Definition amapi.c:161
#define AttributeNumberIsValid(attributeNumber)
Definition attnum.h:34
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition timestamp.c:1775
bool bms_is_member(int x, const Bitmapset *a)
Definition bitmapset.c:510
int Buffer
Definition buf.h:23
@ BUFFER_LOCK_SHARE
Definition bufmgr.h:212
@ BUFFER_LOCK_UNLOCK
Definition bufmgr.h:207
static void LockBuffer(Buffer buffer, BufferLockMode mode)
Definition bufmgr.h:334
#define PG_USED_FOR_ASSERTS_ONLY
Definition c.h:249
#define Assert(condition)
Definition c.h:943
regproc RegProcedure
Definition c.h:734
uint32_t uint32
Definition c.h:624
uint32 TransactionId
Definition c.h:736
#define OidIsValid(objectId)
Definition c.h:858
bool IsCatalogRelation(Relation relation)
Definition catalog.c:104
@ COMPARE_EQ
Definition cmptype.h:36
bool TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts, ReplOriginId *nodeid)
Definition commit_ts.c:283
bool GetTupleTransactionInfo(TupleTableSlot *localslot, TransactionId *xmin, ReplOriginId *localorigin, TimestampTz *localts)
Definition conflict.c:64
void ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel, ConflictType type, TupleTableSlot *searchslot, TupleTableSlot *remoteslot, List *conflicttuples)
Definition conflict.c:105
ConflictType
Definition conflict.h:32
@ CT_MULTIPLE_UNIQUE_CONFLICTS
Definition conflict.h:55
@ CT_INSERT_EXISTS
Definition conflict.h:34
@ CT_UPDATE_EXISTS
Definition conflict.h:40
int64 TimestampTz
Definition timestamp.h:39
int errcode(int sqlerrcode)
Definition elog.c:874
#define LOG
Definition elog.h:32
int errhint(const char *fmt,...) pg_attribute_printf(1
int errdetail(const char *fmt,...) pg_attribute_printf(1
#define ERROR
Definition elog.h:40
#define elog(elevel,...)
Definition elog.h:228
#define ereport(elevel,...)
Definition elog.h:152
List * ExecInsertIndexTuples(ResultRelInfo *resultRelInfo, EState *estate, uint32 flags, TupleTableSlot *slot, List *arbiterIndexes, bool *specConflict)
bool ExecCheckIndexConstraints(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate, ItemPointer conflictTid, const ItemPointerData *tupleid, List *arbiterIndexes)
bool ExecPartitionCheck(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate, bool emitError)
Definition execMain.c:1885
void ExecConstraints(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate)
Definition execMain.c:2009
static void BuildConflictIndexInfo(ResultRelInfo *resultRelInfo, Oid conflictindex)
void CheckSubscriptionRelkind(char localrelkind, char remoterelkind, const char *nspname, const char *relname)
static bool tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2, TypeCacheEntry **eq, Bitmapset *columns)
bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
bool RelationFindReplTupleByIndex(Relation rel, Oid idxoid, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
void ExecSimpleRelationDelete(ResultRelInfo *resultRelInfo, EState *estate, EPQState *epqstate, TupleTableSlot *searchslot)
static bool should_refetch_tuple(TM_Result res, TM_FailureData *tmfd)
void ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo, EState *estate, EPQState *epqstate, TupleTableSlot *searchslot, TupleTableSlot *slot)
void CheckCmdReplicaIdentity(Relation rel, CmdType cmd)
void ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo, EState *estate, TupleTableSlot *slot)
bool RelationFindDeletedTupleInfoByIndex(Relation rel, Oid idxoid, TupleTableSlot *searchslot, TransactionId oldestxmin, TransactionId *delete_xid, ReplOriginId *delete_origin, TimestampTz *delete_time)
bool RelationFindDeletedTupleInfoSeq(Relation rel, TupleTableSlot *searchslot, TransactionId oldestxmin, TransactionId *delete_xid, ReplOriginId *delete_origin, TimestampTz *delete_time)
static bool FindConflictTuple(ResultRelInfo *resultRelInfo, EState *estate, Oid conflictindex, TupleTableSlot *slot, TupleTableSlot **conflictslot)
static int build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel, TupleTableSlot *searchslot)
static void update_most_recent_deletion_info(TupleTableSlot *scanslot, TransactionId oldestxmin, TransactionId *delete_xid, TimestampTz *delete_time, ReplOriginId *delete_origin)
static void CheckAndReportConflict(ResultRelInfo *resultRelInfo, EState *estate, ConflictType type, List *recheckIndexes, TupleTableSlot *searchslot, TupleTableSlot *remoteslot)
void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
HeapTuple ExecFetchSlotHeapTuple(TupleTableSlot *slot, bool materialize, bool *shouldFree)
#define EIIT_IS_UPDATE
Definition executor.h:757
#define EIIT_ONLY_SUMMARIZING
Definition executor.h:759
#define EIIT_NO_DUPE_ERROR
Definition executor.h:758
#define palloc0_array(type, count)
Definition fe_memutils.h:77
#define palloc0_object(type)
Definition fe_memutils.h:75
Datum FunctionCall2Coll(FmgrInfo *flinfo, Oid collation, Datum arg1, Datum arg2)
Definition fmgr.c:1151
char * format_type_be(Oid type_oid)
@ HEAPTUPLE_RECENTLY_DEAD
Definition heapam.h:140
HTSV_Result HeapTupleSatisfiesVacuum(HeapTuple htup, TransactionId OldestXmin, Buffer buffer)
static TransactionId HeapTupleHeaderGetUpdateXid(const HeapTupleHeaderData *tup)
void BuildSpeculativeIndexInfo(Relation index, IndexInfo *ii)
Definition index.c:2687
bool index_getnext_slot(IndexScanDesc scan, ScanDirection direction, TupleTableSlot *slot)
Definition indexam.c:698
IndexScanDesc index_beginscan(Relation heapRelation, Relation indexRelation, Snapshot snapshot, IndexScanInstrumentation *instrument, int nkeys, int norderbys, uint32 flags)
Definition indexam.c:257
void index_close(Relation relation, LOCKMODE lockmode)
Definition indexam.c:178
void index_endscan(IndexScanDesc scan)
Definition indexam.c:394
Relation index_open(Oid relationId, LOCKMODE lockmode)
Definition indexam.c:134
void index_rescan(IndexScanDesc scan, ScanKey keys, int nkeys, ScanKey orderbys, int norderbys)
Definition indexam.c:368
int i
Definition isn.c:77
static bool ItemPointerIndicatesMovedPartitions(const ItemPointerData *pointer)
Definition itemptr.h:197
List * lappend(List *list, void *datum)
Definition list.c:339
void list_free(List *list)
Definition list.c:1546
bool list_member_oid(const List *list, Oid datum)
Definition list.c:722
void XactLockTableWait(TransactionId xid, Relation rel, const ItemPointerData *ctid, XLTW_Oper oper)
Definition lmgr.c:663
@ XLTW_None
Definition lmgr.h:26
#define NoLock
Definition lockdefs.h:34
#define RowExclusiveLock
Definition lockdefs.h:38
@ LockWaitBlock
Definition lockoptions.h:40
LockTupleMode
Definition lockoptions.h:51
@ LockTupleShare
Definition lockoptions.h:55
Oid get_opclass_input_type(Oid opclass)
Definition lsyscache.c:1384
Oid get_opclass_family(Oid opclass)
Definition lsyscache.c:1362
RegProcedure get_opcode(Oid opno)
Definition lsyscache.c:1505
Oid get_opfamily_member(Oid opfamily, Oid lefttype, Oid righttype, int16 strategy)
Definition lsyscache.c:170
void ExecComputeStoredGenerated(ResultRelInfo *resultRelInfo, EState *estate, TupleTableSlot *slot, CmdType cmdtype)
CmdType
Definition nodes.h:273
@ CMD_INSERT
Definition nodes.h:277
@ CMD_DELETE
Definition nodes.h:278
@ CMD_UPDATE
Definition nodes.h:276
static char * errmsg
#define InvalidReplOriginId
Definition origin.h:33
FormData_pg_attribute * Form_pg_attribute
int errdetail_relkind_not_supported(char relkind)
Definition pg_class.c:24
NameData relname
Definition pg_class.h:40
#define INDEX_MAX_KEYS
static int list_length(const List *l)
Definition pg_list.h:152
#define NIL
Definition pg_list.h:68
#define list_make1_oid(x1)
Definition pg_list.h:274
#define foreach_oid(var, lst)
Definition pg_list.h:503
static char buf[DEFAULT_XLOG_SEG_SIZE]
#define ERRCODE_T_R_SERIALIZATION_FAILURE
Definition pgbench.c:77
static bool DatumGetBool(Datum X)
Definition postgres.h:100
uint64_t Datum
Definition postgres.h:70
static Pointer DatumGetPointer(Datum X)
Definition postgres.h:332
unsigned int Oid
static int fb(int x)
#define RelationGetRelid(relation)
Definition rel.h:516
#define RelationGetDescr(relation)
Definition rel.h:542
#define RelationGetRelationName(relation)
Definition rel.h:550
#define IndexRelationGetNumberOfKeyAttributes(relation)
Definition rel.h:535
void RelationBuildPublicationDesc(Relation relation, PublicationDesc *pubdesc)
Definition relcache.c:5785
Bitmapset * RelationGetIndexAttrBitmap(Relation relation, IndexAttrBitmapKind attrKind)
Definition relcache.c:5294
Oid RelationGetReplicaIndex(Relation relation)
Definition relcache.c:5063
@ INDEX_ATTR_BITMAP_PRIMARY_KEY
Definition relcache.h:70
@ INDEX_ATTR_BITMAP_IDENTITY_KEY
Definition relcache.h:71
void ScanKeyInit(ScanKey entry, AttrNumber attributeNumber, StrategyNumber strategy, RegProcedure procedure, Datum argument)
Definition scankey.c:76
@ ForwardScanDirection
Definition sdir.h:28
#define SK_SEARCHNULL
Definition skey.h:121
#define SK_ISNULL
Definition skey.h:115
Snapshot GetLatestSnapshot(void)
Definition snapmgr.c:354
void PushActiveSnapshot(Snapshot snapshot)
Definition snapmgr.c:682
void PopActiveSnapshot(void)
Definition snapmgr.c:775
Snapshot GetActiveSnapshot(void)
Definition snapmgr.c:800
#define SnapshotAny
Definition snapmgr.h:33
#define InitDirtySnapshot(snapshotdata)
Definition snapmgr.h:42
Oid GetRelationIdentityOrPK(Relation rel)
Definition relation.c:905
uint16 StrategyNumber
Definition stratnum.h:22
Snapshot es_snapshot
Definition execnodes.h:696
Oid fn_oid
Definition fmgr.h:59
HeapTupleHeader t_data
Definition htup.h:68
Definition pg_list.h:54
TupleDesc rd_att
Definition rel.h:112
Form_pg_class rd_rel
Definition rel.h:111
List * ri_onConflictArbiterIndexes
Definition execnodes.h:613
Relation ri_RelationDesc
Definition execnodes.h:513
RelationPtr ri_IndexRelationDescs
Definition execnodes.h:519
TriggerDesc * ri_TrigDesc
Definition execnodes.h:548
IndexInfo ** ri_IndexRelationInfo
Definition execnodes.h:522
ItemPointerData ctid
Definition tableam.h:171
bool trig_delete_before_row
Definition reltrigger.h:66
bool trig_update_before_row
Definition reltrigger.h:61
bool trig_insert_before_row
Definition reltrigger.h:56
bool has_generated_stored
Definition tupdesc.h:46
TupleConstr * constr
Definition tupdesc.h:159
ItemPointerData tts_tid
Definition tuptable.h:142
FmgrInfo eq_opr_finfo
Definition typcache.h:76
Definition c.h:815
Oid values[FLEXIBLE_ARRAY_MEMBER]
Definition c.h:822
#define FirstLowInvalidHeapAttributeNumber
Definition sysattr.h:27
Datum SysCacheGetAttrNotNull(SysCacheIdentifier cacheId, HeapTuple tup, AttrNumber attributeNumber)
Definition syscache.c:626
TupleTableSlot * table_slot_create(Relation relation, List **reglist)
Definition tableam.c:92
void simple_table_tuple_update(Relation rel, ItemPointer otid, TupleTableSlot *slot, Snapshot snapshot, TU_UpdateIndexes *update_indexes)
Definition tableam.c:361
void simple_table_tuple_insert(Relation rel, TupleTableSlot *slot)
Definition tableam.c:302
void simple_table_tuple_delete(Relation rel, ItemPointer tid, Snapshot snapshot)
Definition tableam.c:316
@ SO_NONE
Definition tableam.h:49
TU_UpdateIndexes
Definition tableam.h:133
@ TU_Summarizing
Definition tableam.h:141
@ TU_None
Definition tableam.h:135
static void table_endscan(TableScanDesc scan)
Definition tableam.h:1061
TM_Result
Definition tableam.h:95
@ TM_Ok
Definition tableam.h:100
@ TM_Deleted
Definition tableam.h:115
@ TM_Updated
Definition tableam.h:112
@ TM_Invisible
Definition tableam.h:103
static TM_Result table_tuple_lock(Relation rel, ItemPointer tid, Snapshot snapshot, TupleTableSlot *slot, CommandId cid, LockTupleMode mode, LockWaitPolicy wait_policy, uint8 flags, TM_FailureData *tmfd)
Definition tableam.h:1648
static void table_rescan(TableScanDesc scan, ScanKeyData *key)
Definition tableam.h:1070
static TableScanDesc table_beginscan(Relation rel, Snapshot snapshot, int nkeys, ScanKeyData *key, uint32 flags)
Definition tableam.h:943
static bool table_scan_getnextslot(TableScanDesc sscan, ScanDirection direction, TupleTableSlot *slot)
Definition tableam.h:1096
#define InvalidTransactionId
Definition transam.h:31
#define TransactionIdIsValid(xid)
Definition transam.h:41
bool ExecBRUpdateTriggers(EState *estate, EPQState *epqstate, ResultRelInfo *relinfo, ItemPointer tupleid, HeapTuple fdw_trigtuple, TupleTableSlot *newslot, TM_Result *tmresult, TM_FailureData *tmfd, bool is_merge_update)
Definition trigger.c:2973
void ExecARDeleteTriggers(EState *estate, ResultRelInfo *relinfo, ItemPointer tupleid, HeapTuple fdw_trigtuple, TransitionCaptureState *transition_capture, bool is_crosspart_update)
Definition trigger.c:2803
bool ExecBRInsertTriggers(EState *estate, ResultRelInfo *relinfo, TupleTableSlot *slot)
Definition trigger.c:2467
void ExecARUpdateTriggers(EState *estate, ResultRelInfo *relinfo, ResultRelInfo *src_partinfo, ResultRelInfo *dst_partinfo, ItemPointer tupleid, HeapTuple fdw_trigtuple, TupleTableSlot *newslot, List *recheckIndexes, TransitionCaptureState *transition_capture, bool is_crosspart_update)
Definition trigger.c:3146
bool ExecBRDeleteTriggers(EState *estate, EPQState *epqstate, ResultRelInfo *relinfo, ItemPointer tupleid, HeapTuple fdw_trigtuple, TupleTableSlot **epqslot, TM_Result *tmresult, TM_FailureData *tmfd, bool is_merge_delete)
Definition trigger.c:2703
void ExecARInsertTriggers(EState *estate, ResultRelInfo *relinfo, TupleTableSlot *slot, List *recheckIndexes, TransitionCaptureState *transition_capture)
Definition trigger.c:2545
bool equalTupleDescs(TupleDesc tupdesc1, TupleDesc tupdesc2)
Definition tupdesc.c:648
static FormData_pg_attribute * TupleDescAttr(TupleDesc tupdesc, int i)
Definition tupdesc.h:178
static void slot_getallattrs(TupleTableSlot *slot)
Definition tuptable.h:390
static TupleTableSlot * ExecCopySlot(TupleTableSlot *dstslot, TupleTableSlot *srcslot)
Definition tuptable.h:543
static void ExecMaterializeSlot(TupleTableSlot *slot)
Definition tuptable.h:494
TypeCacheEntry * lookup_type_cache(Oid type_id, int flags)
Definition typcache.c:389
#define TYPECACHE_EQ_OPR_FINFO
Definition typcache.h:143
const char * type
CommandId GetCurrentCommandId(bool used)
Definition xact.c:831
uint16 ReplOriginId
Definition xlogdefs.h:69