PostgreSQL Source Code git master
All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
execReplication.c
Go to the documentation of this file.
/*-------------------------------------------------------------------------
 *
 * execReplication.c
 *	  miscellaneous executor routines for logical replication
 *
 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  src/backend/executor/execReplication.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/genam.h"
#include "access/gist.h"
#include "access/relscan.h"
#include "access/tableam.h"
#include "access/transam.h"
#include "access/xact.h"
#include "catalog/pg_am_d.h"
#include "commands/trigger.h"
#include "executor/executor.h"
#include "executor/nodeModifyTable.h"
#include "replication/conflict.h"
#include "replication/logicalrelation.h"
#include "storage/lmgr.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/rel.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"
#include "utils/typcache.h"


38static bool tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2,
39 TypeCacheEntry **eq);
40
41/*
42 * Setup a ScanKey for a search in the relation 'rel' for a tuple 'key' that
43 * is setup to match 'rel' (*NOT* idxrel!).
44 *
45 * Returns how many columns to use for the index scan.
46 *
47 * This is not generic routine, idxrel must be PK, RI, or an index that can be
48 * used for REPLICA IDENTITY FULL table. See FindUsableIndexForReplicaIdentityFull()
49 * for details.
50 *
51 * By definition, replication identity of a rel meets all limitations associated
52 * with that. Note that any other index could also meet these limitations.
53 */
54static int
56 TupleTableSlot *searchslot)
57{
58 int index_attoff;
59 int skey_attoff = 0;
60 Datum indclassDatum;
61 oidvector *opclass;
62 int2vector *indkey = &idxrel->rd_index->indkey;
63
64 indclassDatum = SysCacheGetAttrNotNull(INDEXRELID, idxrel->rd_indextuple,
65 Anum_pg_index_indclass);
66 opclass = (oidvector *) DatumGetPointer(indclassDatum);
67
68 /* Build scankey for every non-expression attribute in the index. */
69 for (index_attoff = 0; index_attoff < IndexRelationGetNumberOfKeyAttributes(idxrel);
70 index_attoff++)
71 {
72 Oid operator;
73 Oid optype;
74 Oid opfamily;
75 RegProcedure regop;
76 int table_attno = indkey->values[index_attoff];
77 StrategyNumber eq_strategy;
78
79 if (!AttributeNumberIsValid(table_attno))
80 {
81 /*
82 * XXX: Currently, we don't support expressions in the scan key,
83 * see code below.
84 */
85 continue;
86 }
87
88 /*
89 * Load the operator info. We need this to get the equality operator
90 * function for the scan key.
91 */
92 optype = get_opclass_input_type(opclass->values[index_attoff]);
93 opfamily = get_opclass_family(opclass->values[index_attoff]);
94 eq_strategy = IndexAmTranslateCompareType(COMPARE_EQ, idxrel->rd_rel->relam, opfamily, false);
95 operator = get_opfamily_member(opfamily, optype,
96 optype,
97 eq_strategy);
98
99 if (!OidIsValid(operator))
100 elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",
101 eq_strategy, optype, optype, opfamily);
102
103 regop = get_opcode(operator);
104
105 /* Initialize the scankey. */
106 ScanKeyInit(&skey[skey_attoff],
107 index_attoff + 1,
108 eq_strategy,
109 regop,
110 searchslot->tts_values[table_attno - 1]);
111
112 skey[skey_attoff].sk_collation = idxrel->rd_indcollation[index_attoff];
113
114 /* Check for null value. */
115 if (searchslot->tts_isnull[table_attno - 1])
116 skey[skey_attoff].sk_flags |= (SK_ISNULL | SK_SEARCHNULL);
117
118 skey_attoff++;
119 }
120
121 /* There must always be at least one attribute for the index scan. */
122 Assert(skey_attoff > 0);
123
124 return skey_attoff;
125}
126
127
128/*
129 * Helper function to check if it is necessary to re-fetch and lock the tuple
130 * due to concurrent modifications. This function should be called after
131 * invoking table_tuple_lock.
132 */
133static bool
135{
136 bool refetch = false;
137
138 switch (res)
139 {
140 case TM_Ok:
141 break;
142 case TM_Updated:
143 /* XXX: Improve handling here */
145 ereport(LOG,
147 errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
148 else
149 ereport(LOG,
151 errmsg("concurrent update, retrying")));
152 refetch = true;
153 break;
154 case TM_Deleted:
155 /* XXX: Improve handling here */
156 ereport(LOG,
158 errmsg("concurrent delete, retrying")));
159 refetch = true;
160 break;
161 case TM_Invisible:
162 elog(ERROR, "attempted to lock invisible tuple");
163 break;
164 default:
165 elog(ERROR, "unexpected table_tuple_lock status: %u", res);
166 break;
167 }
168
169 return refetch;
170}
171
172/*
173 * Search the relation 'rel' for tuple using the index.
174 *
175 * If a matching tuple is found, lock it with lockmode, fill the slot with its
176 * contents, and return true. Return false otherwise.
177 */
178bool
180 LockTupleMode lockmode,
181 TupleTableSlot *searchslot,
182 TupleTableSlot *outslot)
183{
185 int skey_attoff;
186 IndexScanDesc scan;
187 SnapshotData snap;
188 TransactionId xwait;
189 Relation idxrel;
190 bool found;
191 TypeCacheEntry **eq = NULL;
192 bool isIdxSafeToSkipDuplicates;
193
194 /* Open the index. */
195 idxrel = index_open(idxoid, RowExclusiveLock);
196
197 isIdxSafeToSkipDuplicates = (GetRelationIdentityOrPK(rel) == idxoid);
198
199 InitDirtySnapshot(snap);
200
201 /* Build scan key. */
202 skey_attoff = build_replindex_scan_key(skey, rel, idxrel, searchslot);
203
204 /* Start an index scan. */
205 scan = index_beginscan(rel, idxrel, &snap, NULL, skey_attoff, 0);
206
207retry:
208 found = false;
209
210 index_rescan(scan, skey, skey_attoff, NULL, 0);
211
212 /* Try to find the tuple */
213 while (index_getnext_slot(scan, ForwardScanDirection, outslot))
214 {
215 /*
216 * Avoid expensive equality check if the index is primary key or
217 * replica identity index.
218 */
219 if (!isIdxSafeToSkipDuplicates)
220 {
221 if (eq == NULL)
222 eq = palloc0(sizeof(*eq) * outslot->tts_tupleDescriptor->natts);
223
224 if (!tuples_equal(outslot, searchslot, eq))
225 continue;
226 }
227
228 ExecMaterializeSlot(outslot);
229
230 xwait = TransactionIdIsValid(snap.xmin) ?
231 snap.xmin : snap.xmax;
232
233 /*
234 * If the tuple is locked, wait for locking transaction to finish and
235 * retry.
236 */
237 if (TransactionIdIsValid(xwait))
238 {
239 XactLockTableWait(xwait, NULL, NULL, XLTW_None);
240 goto retry;
241 }
242
243 /* Found our tuple and it's not locked */
244 found = true;
245 break;
246 }
247
248 /* Found tuple, try to lock it in the lockmode. */
249 if (found)
250 {
251 TM_FailureData tmfd;
252 TM_Result res;
253
255
256 res = table_tuple_lock(rel, &(outslot->tts_tid), GetActiveSnapshot(),
257 outslot,
258 GetCurrentCommandId(false),
259 lockmode,
261 0 /* don't follow updates */ ,
262 &tmfd);
263
265
266 if (should_refetch_tuple(res, &tmfd))
267 goto retry;
268 }
269
270 index_endscan(scan);
271
272 /* Don't release lock until commit. */
273 index_close(idxrel, NoLock);
274
275 return found;
276}
277
278/*
279 * Compare the tuples in the slots by checking if they have equal values.
280 */
281static bool
283 TypeCacheEntry **eq)
284{
285 int attrnum;
286
288 slot2->tts_tupleDescriptor->natts);
289
290 slot_getallattrs(slot1);
291 slot_getallattrs(slot2);
292
293 /* Check equality of the attributes. */
294 for (attrnum = 0; attrnum < slot1->tts_tupleDescriptor->natts; attrnum++)
295 {
297 TypeCacheEntry *typentry;
298
299 att = TupleDescAttr(slot1->tts_tupleDescriptor, attrnum);
300
301 /*
302 * Ignore dropped and generated columns as the publisher doesn't send
303 * those
304 */
305 if (att->attisdropped || att->attgenerated)
306 continue;
307
308 /*
309 * If one value is NULL and other is not, then they are certainly not
310 * equal
311 */
312 if (slot1->tts_isnull[attrnum] != slot2->tts_isnull[attrnum])
313 return false;
314
315 /*
316 * If both are NULL, they can be considered equal.
317 */
318 if (slot1->tts_isnull[attrnum] || slot2->tts_isnull[attrnum])
319 continue;
320
321 typentry = eq[attrnum];
322 if (typentry == NULL)
323 {
324 typentry = lookup_type_cache(att->atttypid,
326 if (!OidIsValid(typentry->eq_opr_finfo.fn_oid))
328 (errcode(ERRCODE_UNDEFINED_FUNCTION),
329 errmsg("could not identify an equality operator for type %s",
330 format_type_be(att->atttypid))));
331 eq[attrnum] = typentry;
332 }
333
335 att->attcollation,
336 slot1->tts_values[attrnum],
337 slot2->tts_values[attrnum])))
338 return false;
339 }
340
341 return true;
342}
343
344/*
345 * Search the relation 'rel' for tuple using the sequential scan.
346 *
347 * If a matching tuple is found, lock it with lockmode, fill the slot with its
348 * contents, and return true. Return false otherwise.
349 *
350 * Note that this stops on the first matching tuple.
351 *
352 * This can obviously be quite slow on tables that have more than few rows.
353 */
354bool
356 TupleTableSlot *searchslot, TupleTableSlot *outslot)
357{
358 TupleTableSlot *scanslot;
359 TableScanDesc scan;
360 SnapshotData snap;
361 TypeCacheEntry **eq;
362 TransactionId xwait;
363 bool found;
365
367
368 eq = palloc0(sizeof(*eq) * outslot->tts_tupleDescriptor->natts);
369
370 /* Start a heap scan. */
371 InitDirtySnapshot(snap);
372 scan = table_beginscan(rel, &snap, 0, NULL);
373 scanslot = table_slot_create(rel, NULL);
374
375retry:
376 found = false;
377
378 table_rescan(scan, NULL);
379
380 /* Try to find the tuple */
381 while (table_scan_getnextslot(scan, ForwardScanDirection, scanslot))
382 {
383 if (!tuples_equal(scanslot, searchslot, eq))
384 continue;
385
386 found = true;
387 ExecCopySlot(outslot, scanslot);
388
389 xwait = TransactionIdIsValid(snap.xmin) ?
390 snap.xmin : snap.xmax;
391
392 /*
393 * If the tuple is locked, wait for locking transaction to finish and
394 * retry.
395 */
396 if (TransactionIdIsValid(xwait))
397 {
398 XactLockTableWait(xwait, NULL, NULL, XLTW_None);
399 goto retry;
400 }
401
402 /* Found our tuple and it's not locked */
403 break;
404 }
405
406 /* Found tuple, try to lock it in the lockmode. */
407 if (found)
408 {
409 TM_FailureData tmfd;
410 TM_Result res;
411
413
414 res = table_tuple_lock(rel, &(outslot->tts_tid), GetActiveSnapshot(),
415 outslot,
416 GetCurrentCommandId(false),
417 lockmode,
419 0 /* don't follow updates */ ,
420 &tmfd);
421
423
424 if (should_refetch_tuple(res, &tmfd))
425 goto retry;
426 }
427
428 table_endscan(scan);
430
431 return found;
432}
433
434/*
435 * Build additional index information necessary for conflict detection.
436 */
437static void
438BuildConflictIndexInfo(ResultRelInfo *resultRelInfo, Oid conflictindex)
439{
440 for (int i = 0; i < resultRelInfo->ri_NumIndices; i++)
441 {
442 Relation indexRelation = resultRelInfo->ri_IndexRelationDescs[i];
443 IndexInfo *indexRelationInfo = resultRelInfo->ri_IndexRelationInfo[i];
444
445 if (conflictindex != RelationGetRelid(indexRelation))
446 continue;
447
448 /*
449 * This Assert will fail if BuildSpeculativeIndexInfo() is called
450 * twice for the given index.
451 */
452 Assert(indexRelationInfo->ii_UniqueOps == NULL);
453
454 BuildSpeculativeIndexInfo(indexRelation, indexRelationInfo);
455 }
456}
457
458/*
459 * Find the tuple that violates the passed unique index (conflictindex).
460 *
461 * If the conflicting tuple is found return true, otherwise false.
462 *
463 * We lock the tuple to avoid getting it deleted before the caller can fetch
464 * the required information. Note that if the tuple is deleted before a lock
465 * is acquired, we will retry to find the conflicting tuple again.
466 */
467static bool
468FindConflictTuple(ResultRelInfo *resultRelInfo, EState *estate,
469 Oid conflictindex, TupleTableSlot *slot,
470 TupleTableSlot **conflictslot)
471{
472 Relation rel = resultRelInfo->ri_RelationDesc;
473 ItemPointerData conflictTid;
474 TM_FailureData tmfd;
475 TM_Result res;
476
477 *conflictslot = NULL;
478
479 /*
480 * Build additional information required to check constraints violations.
481 * See check_exclusion_or_unique_constraint().
482 */
483 BuildConflictIndexInfo(resultRelInfo, conflictindex);
484
485retry:
486 if (ExecCheckIndexConstraints(resultRelInfo, slot, estate,
487 &conflictTid, &slot->tts_tid,
488 list_make1_oid(conflictindex)))
489 {
490 if (*conflictslot)
491 ExecDropSingleTupleTableSlot(*conflictslot);
492
493 *conflictslot = NULL;
494 return false;
495 }
496
497 *conflictslot = table_slot_create(rel, NULL);
498
500
501 res = table_tuple_lock(rel, &conflictTid, GetActiveSnapshot(),
502 *conflictslot,
503 GetCurrentCommandId(false),
506 0 /* don't follow updates */ ,
507 &tmfd);
508
510
511 if (should_refetch_tuple(res, &tmfd))
512 goto retry;
513
514 return true;
515}
516
517/*
518 * Check all the unique indexes in 'recheckIndexes' for conflict with the
519 * tuple in 'remoteslot' and report if found.
520 */
521static void
523 ConflictType type, List *recheckIndexes,
524 TupleTableSlot *searchslot, TupleTableSlot *remoteslot)
525{
526 List *conflicttuples = NIL;
527 TupleTableSlot *conflictslot;
528
529 /* Check all the unique indexes for conflicts */
530 foreach_oid(uniqueidx, resultRelInfo->ri_onConflictArbiterIndexes)
531 {
532 if (list_member_oid(recheckIndexes, uniqueidx) &&
533 FindConflictTuple(resultRelInfo, estate, uniqueidx, remoteslot,
534 &conflictslot))
535 {
537
538 conflicttuple->slot = conflictslot;
539 conflicttuple->indexoid = uniqueidx;
540
541 GetTupleTransactionInfo(conflictslot, &conflicttuple->xmin,
542 &conflicttuple->origin, &conflicttuple->ts);
543
544 conflicttuples = lappend(conflicttuples, conflicttuple);
545 }
546 }
547
548 /* Report the conflict, if found */
549 if (conflicttuples)
550 ReportApplyConflict(estate, resultRelInfo, ERROR,
551 list_length(conflicttuples) > 1 ? CT_MULTIPLE_UNIQUE_CONFLICTS : type,
552 searchslot, remoteslot, conflicttuples);
553}
554
555/*
556 * Insert tuple represented in the slot to the relation, update the indexes,
557 * and execute any constraints and per-row triggers.
558 *
559 * Caller is responsible for opening the indexes.
560 */
561void
563 EState *estate, TupleTableSlot *slot)
564{
565 bool skip_tuple = false;
566 Relation rel = resultRelInfo->ri_RelationDesc;
567
568 /* For now we support only tables. */
569 Assert(rel->rd_rel->relkind == RELKIND_RELATION);
570
572
573 /* BEFORE ROW INSERT Triggers */
574 if (resultRelInfo->ri_TrigDesc &&
575 resultRelInfo->ri_TrigDesc->trig_insert_before_row)
576 {
577 if (!ExecBRInsertTriggers(estate, resultRelInfo, slot))
578 skip_tuple = true; /* "do nothing" */
579 }
580
581 if (!skip_tuple)
582 {
583 List *recheckIndexes = NIL;
584 List *conflictindexes;
585 bool conflict = false;
586
587 /* Compute stored generated columns */
588 if (rel->rd_att->constr &&
590 ExecComputeStoredGenerated(resultRelInfo, estate, slot,
591 CMD_INSERT);
592
593 /* Check the constraints of the tuple */
594 if (rel->rd_att->constr)
595 ExecConstraints(resultRelInfo, slot, estate);
596 if (rel->rd_rel->relispartition)
597 ExecPartitionCheck(resultRelInfo, slot, estate, true);
598
599 /* OK, store the tuple and create index entries for it */
600 simple_table_tuple_insert(resultRelInfo->ri_RelationDesc, slot);
601
602 conflictindexes = resultRelInfo->ri_onConflictArbiterIndexes;
603
604 if (resultRelInfo->ri_NumIndices > 0)
605 recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
606 slot, estate, false,
607 conflictindexes ? true : false,
608 &conflict,
609 conflictindexes, false);
610
611 /*
612 * Checks the conflict indexes to fetch the conflicting local tuple
613 * and reports the conflict. We perform this check here, instead of
614 * performing an additional index scan before the actual insertion and
615 * reporting the conflict if any conflicting tuples are found. This is
616 * to avoid the overhead of executing the extra scan for each INSERT
617 * operation, even when no conflict arises, which could introduce
618 * significant overhead to replication, particularly in cases where
619 * conflicts are rare.
620 *
621 * XXX OTOH, this could lead to clean-up effort for dead tuples added
622 * in heap and index in case of conflicts. But as conflicts shouldn't
623 * be a frequent thing so we preferred to save the performance
624 * overhead of extra scan before each insertion.
625 */
626 if (conflict)
627 CheckAndReportConflict(resultRelInfo, estate, CT_INSERT_EXISTS,
628 recheckIndexes, NULL, slot);
629
630 /* AFTER ROW INSERT Triggers */
631 ExecARInsertTriggers(estate, resultRelInfo, slot,
632 recheckIndexes, NULL);
633
634 /*
635 * XXX we should in theory pass a TransitionCaptureState object to the
636 * above to capture transition tuples, but after statement triggers
637 * don't actually get fired by replication yet anyway
638 */
639
640 list_free(recheckIndexes);
641 }
642}
643
644/*
645 * Find the searchslot tuple and update it with data in the slot,
646 * update the indexes, and execute any constraints and per-row triggers.
647 *
648 * Caller is responsible for opening the indexes.
649 */
650void
652 EState *estate, EPQState *epqstate,
653 TupleTableSlot *searchslot, TupleTableSlot *slot)
654{
655 bool skip_tuple = false;
656 Relation rel = resultRelInfo->ri_RelationDesc;
657 ItemPointer tid = &(searchslot->tts_tid);
658
659 /*
660 * We support only non-system tables, with
661 * check_publication_add_relation() accountable.
662 */
663 Assert(rel->rd_rel->relkind == RELKIND_RELATION);
665
667
668 /* BEFORE ROW UPDATE Triggers */
669 if (resultRelInfo->ri_TrigDesc &&
670 resultRelInfo->ri_TrigDesc->trig_update_before_row)
671 {
672 if (!ExecBRUpdateTriggers(estate, epqstate, resultRelInfo,
673 tid, NULL, slot, NULL, NULL))
674 skip_tuple = true; /* "do nothing" */
675 }
676
677 if (!skip_tuple)
678 {
679 List *recheckIndexes = NIL;
680 TU_UpdateIndexes update_indexes;
681 List *conflictindexes;
682 bool conflict = false;
683
684 /* Compute stored generated columns */
685 if (rel->rd_att->constr &&
687 ExecComputeStoredGenerated(resultRelInfo, estate, slot,
688 CMD_UPDATE);
689
690 /* Check the constraints of the tuple */
691 if (rel->rd_att->constr)
692 ExecConstraints(resultRelInfo, slot, estate);
693 if (rel->rd_rel->relispartition)
694 ExecPartitionCheck(resultRelInfo, slot, estate, true);
695
696 simple_table_tuple_update(rel, tid, slot, estate->es_snapshot,
697 &update_indexes);
698
699 conflictindexes = resultRelInfo->ri_onConflictArbiterIndexes;
700
701 if (resultRelInfo->ri_NumIndices > 0 && (update_indexes != TU_None))
702 recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
703 slot, estate, true,
704 conflictindexes ? true : false,
705 &conflict, conflictindexes,
706 (update_indexes == TU_Summarizing));
707
708 /*
709 * Refer to the comments above the call to CheckAndReportConflict() in
710 * ExecSimpleRelationInsert to understand why this check is done at
711 * this point.
712 */
713 if (conflict)
714 CheckAndReportConflict(resultRelInfo, estate, CT_UPDATE_EXISTS,
715 recheckIndexes, searchslot, slot);
716
717 /* AFTER ROW UPDATE Triggers */
718 ExecARUpdateTriggers(estate, resultRelInfo,
719 NULL, NULL,
720 tid, NULL, slot,
721 recheckIndexes, NULL, false);
722
723 list_free(recheckIndexes);
724 }
725}
726
727/*
728 * Find the searchslot tuple and delete it, and execute any constraints
729 * and per-row triggers.
730 *
731 * Caller is responsible for opening the indexes.
732 */
733void
735 EState *estate, EPQState *epqstate,
736 TupleTableSlot *searchslot)
737{
738 bool skip_tuple = false;
739 Relation rel = resultRelInfo->ri_RelationDesc;
740 ItemPointer tid = &searchslot->tts_tid;
741
743
744 /* BEFORE ROW DELETE Triggers */
745 if (resultRelInfo->ri_TrigDesc &&
746 resultRelInfo->ri_TrigDesc->trig_delete_before_row)
747 {
748 skip_tuple = !ExecBRDeleteTriggers(estate, epqstate, resultRelInfo,
749 tid, NULL, NULL, NULL, NULL);
750 }
751
752 if (!skip_tuple)
753 {
754 /* OK, delete the tuple */
755 simple_table_tuple_delete(rel, tid, estate->es_snapshot);
756
757 /* AFTER ROW DELETE Triggers */
758 ExecARDeleteTriggers(estate, resultRelInfo,
759 tid, NULL, NULL, false);
760 }
761}
762
763/*
764 * Check if command can be executed with current replica identity.
765 */
766void
768{
769 PublicationDesc pubdesc;
770
771 /*
772 * Skip checking the replica identity for partitioned tables, because the
773 * operations are actually performed on the leaf partitions.
774 */
775 if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
776 return;
777
778 /* We only need to do checks for UPDATE and DELETE. */
779 if (cmd != CMD_UPDATE && cmd != CMD_DELETE)
780 return;
781
782 /*
783 * It is only safe to execute UPDATE/DELETE if the relation does not
784 * publish UPDATEs or DELETEs, or all the following conditions are
785 * satisfied:
786 *
787 * 1. All columns, referenced in the row filters from publications which
788 * the relation is in, are valid - i.e. when all referenced columns are
789 * part of REPLICA IDENTITY.
790 *
791 * 2. All columns, referenced in the column lists are valid - i.e. when
792 * all columns referenced in the REPLICA IDENTITY are covered by the
793 * column list.
794 *
795 * 3. All generated columns in REPLICA IDENTITY of the relation, are valid
796 * - i.e. when all these generated columns are published.
797 *
798 * XXX We could optimize it by first checking whether any of the
799 * publications have a row filter or column list for this relation, or if
800 * the relation contains a generated column. If none of these exist and
801 * the relation has replica identity then we can avoid building the
802 * descriptor but as this happens only one time it doesn't seem worth the
803 * additional complexity.
804 */
805 RelationBuildPublicationDesc(rel, &pubdesc);
806 if (cmd == CMD_UPDATE && !pubdesc.rf_valid_for_update)
808 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
809 errmsg("cannot update table \"%s\"",
811 errdetail("Column used in the publication WHERE expression is not part of the replica identity.")));
812 else if (cmd == CMD_UPDATE && !pubdesc.cols_valid_for_update)
814 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
815 errmsg("cannot update table \"%s\"",
817 errdetail("Column list used by the publication does not cover the replica identity.")));
818 else if (cmd == CMD_UPDATE && !pubdesc.gencols_valid_for_update)
820 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
821 errmsg("cannot update table \"%s\"",
823 errdetail("Replica identity must not contain unpublished generated columns.")));
824 else if (cmd == CMD_DELETE && !pubdesc.rf_valid_for_delete)
826 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
827 errmsg("cannot delete from table \"%s\"",
829 errdetail("Column used in the publication WHERE expression is not part of the replica identity.")));
830 else if (cmd == CMD_DELETE && !pubdesc.cols_valid_for_delete)
832 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
833 errmsg("cannot delete from table \"%s\"",
835 errdetail("Column list used by the publication does not cover the replica identity.")));
836 else if (cmd == CMD_DELETE && !pubdesc.gencols_valid_for_delete)
838 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
839 errmsg("cannot delete from table \"%s\"",
841 errdetail("Replica identity must not contain unpublished generated columns.")));
842
843 /* If relation has replica identity we are always good. */
845 return;
846
847 /* REPLICA IDENTITY FULL is also good for UPDATE/DELETE. */
848 if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
849 return;
850
851 /*
852 * This is UPDATE/DELETE and there is no replica identity.
853 *
854 * Check if the table publishes UPDATES or DELETES.
855 */
856 if (cmd == CMD_UPDATE && pubdesc.pubactions.pubupdate)
858 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
859 errmsg("cannot update table \"%s\" because it does not have a replica identity and publishes updates",
861 errhint("To enable updating the table, set REPLICA IDENTITY using ALTER TABLE.")));
862 else if (cmd == CMD_DELETE && pubdesc.pubactions.pubdelete)
864 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
865 errmsg("cannot delete from table \"%s\" because it does not have a replica identity and publishes deletes",
867 errhint("To enable deleting from the table, set REPLICA IDENTITY using ALTER TABLE.")));
868}
869
870
871/*
872 * Check if we support writing into specific relkind.
873 *
874 * The nspname and relname are only needed for error reporting.
875 */
876void
877CheckSubscriptionRelkind(char relkind, const char *nspname,
878 const char *relname)
879{
880 if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE)
882 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
883 errmsg("cannot use relation \"%s.%s\" as logical replication target",
884 nspname, relname),
886}
StrategyNumber IndexAmTranslateCompareType(CompareType cmptype, Oid amoid, Oid opfamily, bool missing_ok)
Definition: amapi.c:148
#define AttributeNumberIsValid(attributeNumber)
Definition: attnum.h:34
#define PG_USED_FOR_ASSERTS_ONLY
Definition: c.h:224
regproc RegProcedure
Definition: c.h:621
uint32 TransactionId
Definition: c.h:623
#define OidIsValid(objectId)
Definition: c.h:746
bool IsCatalogRelation(Relation relation)
Definition: catalog.c:104
@ COMPARE_EQ
Definition: cmptype.h:36
void ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel, ConflictType type, TupleTableSlot *searchslot, TupleTableSlot *remoteslot, List *conflicttuples)
Definition: conflict.c:103
bool GetTupleTransactionInfo(TupleTableSlot *localslot, TransactionId *xmin, RepOriginId *localorigin, TimestampTz *localts)
Definition: conflict.c:62
ConflictType
Definition: conflict.h:25
@ CT_MULTIPLE_UNIQUE_CONFLICTS
Definition: conflict.h:45
@ CT_INSERT_EXISTS
Definition: conflict.h:27
@ CT_UPDATE_EXISTS
Definition: conflict.h:33
int errdetail(const char *fmt,...)
Definition: elog.c:1204
int errhint(const char *fmt,...)
Definition: elog.c:1318
int errcode(int sqlerrcode)
Definition: elog.c:854
int errmsg(const char *fmt,...)
Definition: elog.c:1071
#define LOG
Definition: elog.h:31
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:225
#define ereport(elevel,...)
Definition: elog.h:149
bool ExecCheckIndexConstraints(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate, ItemPointer conflictTid, ItemPointer tupleid, List *arbiterIndexes)
Definition: execIndexing.c:542
List * ExecInsertIndexTuples(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate, bool update, bool noDupErr, bool *specConflict, List *arbiterIndexes, bool onlySummarizing)
Definition: execIndexing.c:309
bool ExecPartitionCheck(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate, bool emitError)
Definition: execMain.c:1932
void ExecConstraints(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate)
Definition: execMain.c:2056
static void BuildConflictIndexInfo(ResultRelInfo *resultRelInfo, Oid conflictindex)
bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
bool RelationFindReplTupleByIndex(Relation rel, Oid idxoid, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
void ExecSimpleRelationDelete(ResultRelInfo *resultRelInfo, EState *estate, EPQState *epqstate, TupleTableSlot *searchslot)
void CheckSubscriptionRelkind(char relkind, const char *nspname, const char *relname)
static bool tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2, TypeCacheEntry **eq)
static bool should_refetch_tuple(TM_Result res, TM_FailureData *tmfd)
void ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo, EState *estate, EPQState *epqstate, TupleTableSlot *searchslot, TupleTableSlot *slot)
void CheckCmdReplicaIdentity(Relation rel, CmdType cmd)
void ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo, EState *estate, TupleTableSlot *slot)
static bool FindConflictTuple(ResultRelInfo *resultRelInfo, EState *estate, Oid conflictindex, TupleTableSlot *slot, TupleTableSlot **conflictslot)
static int build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel, TupleTableSlot *searchslot)
static void CheckAndReportConflict(ResultRelInfo *resultRelInfo, EState *estate, ConflictType type, List *recheckIndexes, TupleTableSlot *searchslot, TupleTableSlot *remoteslot)
void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
Definition: execTuples.c:1443
#define palloc0_object(type)
Definition: fe_memutils.h:75
Datum FunctionCall2Coll(FmgrInfo *flinfo, Oid collation, Datum arg1, Datum arg2)
Definition: fmgr.c:1149
char * format_type_be(Oid type_oid)
Definition: format_type.c:343
Assert(PointerIsAligned(start, uint64))
void BuildSpeculativeIndexInfo(Relation index, IndexInfo *ii)
Definition: index.c:2669
bool index_getnext_slot(IndexScanDesc scan, ScanDirection direction, TupleTableSlot *slot)
Definition: indexam.c:720
IndexScanDesc index_beginscan(Relation heapRelation, Relation indexRelation, Snapshot snapshot, IndexScanInstrumentation *instrument, int nkeys, int norderbys)
Definition: indexam.c:256
void index_close(Relation relation, LOCKMODE lockmode)
Definition: indexam.c:177
void index_endscan(IndexScanDesc scan)
Definition: indexam.c:382
Relation index_open(Oid relationId, LOCKMODE lockmode)
Definition: indexam.c:133
void index_rescan(IndexScanDesc scan, ScanKey keys, int nkeys, ScanKey orderbys, int norderbys)
Definition: indexam.c:356
int i
Definition: isn.c:77
static bool ItemPointerIndicatesMovedPartitions(const ItemPointerData *pointer)
Definition: itemptr.h:197
List * lappend(List *list, void *datum)
Definition: list.c:339
void list_free(List *list)
Definition: list.c:1546
bool list_member_oid(const List *list, Oid datum)
Definition: list.c:722
void XactLockTableWait(TransactionId xid, Relation rel, ItemPointer ctid, XLTW_Oper oper)
Definition: lmgr.c:663
@ XLTW_None
Definition: lmgr.h:26
#define NoLock
Definition: lockdefs.h:34
#define RowExclusiveLock
Definition: lockdefs.h:38
@ LockWaitBlock
Definition: lockoptions.h:39
LockTupleMode
Definition: lockoptions.h:50
@ LockTupleShare
Definition: lockoptions.h:54
Oid get_opclass_input_type(Oid opclass)
Definition: lsyscache.c:1304
Oid get_opclass_family(Oid opclass)
Definition: lsyscache.c:1282
RegProcedure get_opcode(Oid opno)
Definition: lsyscache.c:1425
Oid get_opfamily_member(Oid opfamily, Oid lefttype, Oid righttype, int16 strategy)
Definition: lsyscache.c:167
void * palloc0(Size size)
Definition: mcxt.c:1973
void ExecComputeStoredGenerated(ResultRelInfo *resultRelInfo, EState *estate, TupleTableSlot *slot, CmdType cmdtype)
CmdType
Definition: nodes.h:269
@ CMD_INSERT
Definition: nodes.h:273
@ CMD_DELETE
Definition: nodes.h:274
@ CMD_UPDATE
Definition: nodes.h:272
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:202
int errdetail_relkind_not_supported(char relkind)
Definition: pg_class.c:24
NameData relname
Definition: pg_class.h:38
#define INDEX_MAX_KEYS
static int list_length(const List *l)
Definition: pg_list.h:152
#define NIL
Definition: pg_list.h:68
#define list_make1_oid(x1)
Definition: pg_list.h:242
#define foreach_oid(var, lst)
Definition: pg_list.h:471
#define ERRCODE_T_R_SERIALIZATION_FAILURE
Definition: pgbench.c:77
static bool DatumGetBool(Datum X)
Definition: postgres.h:95
uintptr_t Datum
Definition: postgres.h:69
static Pointer DatumGetPointer(Datum X)
Definition: postgres.h:317
unsigned int Oid
Definition: postgres_ext.h:30
#define RelationGetRelid(relation)
Definition: rel.h:516
#define RelationGetDescr(relation)
Definition: rel.h:542
#define RelationGetRelationName(relation)
Definition: rel.h:550
#define IndexRelationGetNumberOfKeyAttributes(relation)
Definition: rel.h:535
void RelationBuildPublicationDesc(Relation relation, PublicationDesc *pubdesc)
Definition: relcache.c:5791
Oid RelationGetReplicaIndex(Relation relation)
Definition: relcache.c:5069
void ScanKeyInit(ScanKey entry, AttrNumber attributeNumber, StrategyNumber strategy, RegProcedure procedure, Datum argument)
Definition: scankey.c:76
@ ForwardScanDirection
Definition: sdir.h:28
#define SK_SEARCHNULL
Definition: skey.h:121
#define SK_ISNULL
Definition: skey.h:115
Snapshot GetLatestSnapshot(void)
Definition: snapmgr.c:342
void PushActiveSnapshot(Snapshot snapshot)
Definition: snapmgr.c:669
void PopActiveSnapshot(void)
Definition: snapmgr.c:762
Snapshot GetActiveSnapshot(void)
Definition: snapmgr.c:787
#define InitDirtySnapshot(snapshotdata)
Definition: snapmgr.h:42
Oid GetRelationIdentityOrPK(Relation rel)
Definition: relation.c:891
uint16 StrategyNumber
Definition: stratnum.h:22
TimestampTz ts
Definition: conflict.h:68
RepOriginId origin
Definition: conflict.h:67
TransactionId xmin
Definition: conflict.h:65
TupleTableSlot * slot
Definition: conflict.h:61
Snapshot es_snapshot
Definition: execnodes.h:657
Oid fn_oid
Definition: fmgr.h:59
Oid * ii_UniqueOps
Definition: execnodes.h:206
Definition: pg_list.h:54
PublicationActions pubactions
bool gencols_valid_for_update
bool gencols_valid_for_delete
struct HeapTupleData * rd_indextuple
Definition: rel.h:194
TupleDesc rd_att
Definition: rel.h:112
Form_pg_index rd_index
Definition: rel.h:192
Oid * rd_indcollation
Definition: rel.h:217
Form_pg_class rd_rel
Definition: rel.h:111
int ri_NumIndices
Definition: execnodes.h:478
List * ri_onConflictArbiterIndexes
Definition: execnodes.h:575
Relation ri_RelationDesc
Definition: execnodes.h:475
RelationPtr ri_IndexRelationDescs
Definition: execnodes.h:481
TriggerDesc * ri_TrigDesc
Definition: execnodes.h:510
IndexInfo ** ri_IndexRelationInfo
Definition: execnodes.h:484
int sk_flags
Definition: skey.h:66
Oid sk_collation
Definition: skey.h:70
TransactionId xmin
Definition: snapshot.h:153
TransactionId xmax
Definition: snapshot.h:154
ItemPointerData ctid
Definition: tableam.h:143
bool trig_delete_before_row
Definition: reltrigger.h:66
bool trig_update_before_row
Definition: reltrigger.h:61
bool trig_insert_before_row
Definition: reltrigger.h:56
bool has_generated_stored
Definition: tupdesc.h:46
TupleConstr * constr
Definition: tupdesc.h:141
TupleDesc tts_tupleDescriptor
Definition: tuptable.h:123
bool * tts_isnull
Definition: tuptable.h:127
ItemPointerData tts_tid
Definition: tuptable.h:129
Datum * tts_values
Definition: tuptable.h:125
FmgrInfo eq_opr_finfo
Definition: typcache.h:76
Definition: c.h:686
int16 values[FLEXIBLE_ARRAY_MEMBER]
Definition: c.h:693
Definition: c.h:697
Oid values[FLEXIBLE_ARRAY_MEMBER]
Definition: c.h:704
Datum SysCacheGetAttrNotNull(int cacheId, HeapTuple tup, AttrNumber attributeNumber)
Definition: syscache.c:631
TupleTableSlot * table_slot_create(Relation relation, List **reglist)
Definition: tableam.c:92
void simple_table_tuple_update(Relation rel, ItemPointer otid, TupleTableSlot *slot, Snapshot snapshot, TU_UpdateIndexes *update_indexes)
Definition: tableam.c:336
void simple_table_tuple_insert(Relation rel, TupleTableSlot *slot)
Definition: tableam.c:277
void simple_table_tuple_delete(Relation rel, ItemPointer tid, Snapshot snapshot)
Definition: tableam.c:291
static TableScanDesc table_beginscan(Relation rel, Snapshot snapshot, int nkeys, struct ScanKeyData *key)
Definition: tableam.h:870
TU_UpdateIndexes
Definition: tableam.h:110
@ TU_Summarizing
Definition: tableam.h:118
@ TU_None
Definition: tableam.h:112
static void table_endscan(TableScanDesc scan)
Definition: tableam.h:979
TM_Result
Definition: tableam.h:72
@ TM_Ok
Definition: tableam.h:77
@ TM_Deleted
Definition: tableam.h:92
@ TM_Updated
Definition: tableam.h:89
@ TM_Invisible
Definition: tableam.h:80
static void table_rescan(TableScanDesc scan, struct ScanKeyData *key)
Definition: tableam.h:988
static TM_Result table_tuple_lock(Relation rel, ItemPointer tid, Snapshot snapshot, TupleTableSlot *slot, CommandId cid, LockTupleMode mode, LockWaitPolicy wait_policy, uint8 flags, TM_FailureData *tmfd)
Definition: tableam.h:1540
static bool table_scan_getnextslot(TableScanDesc sscan, ScanDirection direction, TupleTableSlot *slot)
Definition: tableam.h:1015
#define TransactionIdIsValid(xid)
Definition: transam.h:41
bool ExecBRUpdateTriggers(EState *estate, EPQState *epqstate, ResultRelInfo *relinfo, ItemPointer tupleid, HeapTuple fdw_trigtuple, TupleTableSlot *newslot, TM_Result *tmresult, TM_FailureData *tmfd)
Definition: trigger.c:2941
void ExecARDeleteTriggers(EState *estate, ResultRelInfo *relinfo, ItemPointer tupleid, HeapTuple fdw_trigtuple, TransitionCaptureState *transition_capture, bool is_crosspart_update)
Definition: trigger.c:2781
bool ExecBRInsertTriggers(EState *estate, ResultRelInfo *relinfo, TupleTableSlot *slot)
Definition: trigger.c:2463
void ExecARUpdateTriggers(EState *estate, ResultRelInfo *relinfo, ResultRelInfo *src_partinfo, ResultRelInfo *dst_partinfo, ItemPointer tupleid, HeapTuple fdw_trigtuple, TupleTableSlot *newslot, List *recheckIndexes, TransitionCaptureState *transition_capture, bool is_crosspart_update)
Definition: trigger.c:3106
bool ExecBRDeleteTriggers(EState *estate, EPQState *epqstate, ResultRelInfo *relinfo, ItemPointer tupleid, HeapTuple fdw_trigtuple, TupleTableSlot **epqslot, TM_Result *tmresult, TM_FailureData *tmfd)
Definition: trigger.c:2690
void ExecARInsertTriggers(EState *estate, ResultRelInfo *relinfo, TupleTableSlot *slot, List *recheckIndexes, TransitionCaptureState *transition_capture)
Definition: trigger.c:2541
bool equalTupleDescs(TupleDesc tupdesc1, TupleDesc tupdesc2)
Definition: tupdesc.c:583
static FormData_pg_attribute * TupleDescAttr(TupleDesc tupdesc, int i)
Definition: tupdesc.h:160
static void slot_getallattrs(TupleTableSlot *slot)
Definition: tuptable.h:372
static TupleTableSlot * ExecCopySlot(TupleTableSlot *dstslot, TupleTableSlot *srcslot)
Definition: tuptable.h:525
static void ExecMaterializeSlot(TupleTableSlot *slot)
Definition: tuptable.h:476
TypeCacheEntry * lookup_type_cache(Oid type_id, int flags)
Definition: typcache.c:386
#define TYPECACHE_EQ_OPR_FINFO
Definition: typcache.h:143
const char * type
CommandId GetCurrentCommandId(bool used)
Definition: xact.c:829