PostgreSQL Source Code git master
All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
execReplication.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * execReplication.c
4 * miscellaneous executor routines for logical replication
5 *
6 * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 *
9 * IDENTIFICATION
10 * src/backend/executor/execReplication.c
11 *
12 *-------------------------------------------------------------------------
13 */
14
#include "postgres.h"

#include "access/genam.h"
#include "access/gist.h"
#include "access/relscan.h"
#include "access/tableam.h"
#include "access/transam.h"
#include "access/xact.h"
#include "catalog/pg_am_d.h"
#include "commands/trigger.h"
#include "executor/executor.h"
#include "executor/nodeModifyTable.h"
#include "replication/conflict.h"
#include "replication/logicalrelation.h"
#include "storage/lmgr.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/rel.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"
#include "utils/typcache.h"
36
37
38static bool tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2,
39 TypeCacheEntry **eq);
40
41/*
42 * Returns the fixed strategy number, if any, of the equality operator for the
43 * given operator class, otherwise, InvalidStrategy.
44 */
47{
48 Oid am = get_opclass_method(opclass);
49 int ret;
50
51 switch (am)
52 {
53 case BTREE_AM_OID:
55 break;
56 case HASH_AM_OID:
58 break;
59 case GIST_AM_OID:
61 break;
62 default:
63 ret = InvalidStrategy;
64 break;
65 }
66
67 return ret;
68}
69
70/*
71 * Setup a ScanKey for a search in the relation 'rel' for a tuple 'key' that
72 * is setup to match 'rel' (*NOT* idxrel!).
73 *
74 * Returns how many columns to use for the index scan.
75 *
76 * This is not generic routine, idxrel must be PK, RI, or an index that can be
77 * used for REPLICA IDENTITY FULL table. See FindUsableIndexForReplicaIdentityFull()
78 * for details.
79 *
80 * By definition, replication identity of a rel meets all limitations associated
81 * with that. Note that any other index could also meet these limitations.
82 */
83static int
85 TupleTableSlot *searchslot)
86{
87 int index_attoff;
88 int skey_attoff = 0;
89 Datum indclassDatum;
90 oidvector *opclass;
91 int2vector *indkey = &idxrel->rd_index->indkey;
92
93 indclassDatum = SysCacheGetAttrNotNull(INDEXRELID, idxrel->rd_indextuple,
94 Anum_pg_index_indclass);
95 opclass = (oidvector *) DatumGetPointer(indclassDatum);
96
97 /* Build scankey for every non-expression attribute in the index. */
98 for (index_attoff = 0; index_attoff < IndexRelationGetNumberOfKeyAttributes(idxrel);
99 index_attoff++)
100 {
101 Oid operator;
102 Oid optype;
103 Oid opfamily;
104 RegProcedure regop;
105 int table_attno = indkey->values[index_attoff];
106 StrategyNumber eq_strategy;
107
108 if (!AttributeNumberIsValid(table_attno))
109 {
110 /*
111 * XXX: Currently, we don't support expressions in the scan key,
112 * see code below.
113 */
114 continue;
115 }
116
117 /*
118 * Load the operator info. We need this to get the equality operator
119 * function for the scan key.
120 */
121 optype = get_opclass_input_type(opclass->values[index_attoff]);
122 opfamily = get_opclass_family(opclass->values[index_attoff]);
123 eq_strategy = get_equal_strategy_number(opclass->values[index_attoff]);
124 if (!eq_strategy)
125 elog(ERROR, "missing equal strategy for opclass %u", opclass->values[index_attoff]);
126
127 operator = get_opfamily_member(opfamily, optype,
128 optype,
129 eq_strategy);
130
131 if (!OidIsValid(operator))
132 elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",
133 eq_strategy, optype, optype, opfamily);
134
135 regop = get_opcode(operator);
136
137 /* Initialize the scankey. */
138 ScanKeyInit(&skey[skey_attoff],
139 index_attoff + 1,
140 eq_strategy,
141 regop,
142 searchslot->tts_values[table_attno - 1]);
143
144 skey[skey_attoff].sk_collation = idxrel->rd_indcollation[index_attoff];
145
146 /* Check for null value. */
147 if (searchslot->tts_isnull[table_attno - 1])
148 skey[skey_attoff].sk_flags |= (SK_ISNULL | SK_SEARCHNULL);
149
150 skey_attoff++;
151 }
152
153 /* There must always be at least one attribute for the index scan. */
154 Assert(skey_attoff > 0);
155
156 return skey_attoff;
157}
158
159
160/*
161 * Helper function to check if it is necessary to re-fetch and lock the tuple
162 * due to concurrent modifications. This function should be called after
163 * invoking table_tuple_lock.
164 */
165static bool
167{
168 bool refetch = false;
169
170 switch (res)
171 {
172 case TM_Ok:
173 break;
174 case TM_Updated:
175 /* XXX: Improve handling here */
177 ereport(LOG,
179 errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
180 else
181 ereport(LOG,
183 errmsg("concurrent update, retrying")));
184 refetch = true;
185 break;
186 case TM_Deleted:
187 /* XXX: Improve handling here */
188 ereport(LOG,
190 errmsg("concurrent delete, retrying")));
191 refetch = true;
192 break;
193 case TM_Invisible:
194 elog(ERROR, "attempted to lock invisible tuple");
195 break;
196 default:
197 elog(ERROR, "unexpected table_tuple_lock status: %u", res);
198 break;
199 }
200
201 return refetch;
202}
203
204/*
205 * Search the relation 'rel' for tuple using the index.
206 *
207 * If a matching tuple is found, lock it with lockmode, fill the slot with its
208 * contents, and return true. Return false otherwise.
209 */
210bool
212 LockTupleMode lockmode,
213 TupleTableSlot *searchslot,
214 TupleTableSlot *outslot)
215{
217 int skey_attoff;
218 IndexScanDesc scan;
219 SnapshotData snap;
220 TransactionId xwait;
221 Relation idxrel;
222 bool found;
223 TypeCacheEntry **eq = NULL;
224 bool isIdxSafeToSkipDuplicates;
225
226 /* Open the index. */
227 idxrel = index_open(idxoid, RowExclusiveLock);
228
229 isIdxSafeToSkipDuplicates = (GetRelationIdentityOrPK(rel) == idxoid);
230
231 InitDirtySnapshot(snap);
232
233 /* Build scan key. */
234 skey_attoff = build_replindex_scan_key(skey, rel, idxrel, searchslot);
235
236 /* Start an index scan. */
237 scan = index_beginscan(rel, idxrel, &snap, skey_attoff, 0);
238
239retry:
240 found = false;
241
242 index_rescan(scan, skey, skey_attoff, NULL, 0);
243
244 /* Try to find the tuple */
245 while (index_getnext_slot(scan, ForwardScanDirection, outslot))
246 {
247 /*
248 * Avoid expensive equality check if the index is primary key or
249 * replica identity index.
250 */
251 if (!isIdxSafeToSkipDuplicates)
252 {
253 if (eq == NULL)
254 eq = palloc0(sizeof(*eq) * outslot->tts_tupleDescriptor->natts);
255
256 if (!tuples_equal(outslot, searchslot, eq))
257 continue;
258 }
259
260 ExecMaterializeSlot(outslot);
261
262 xwait = TransactionIdIsValid(snap.xmin) ?
263 snap.xmin : snap.xmax;
264
265 /*
266 * If the tuple is locked, wait for locking transaction to finish and
267 * retry.
268 */
269 if (TransactionIdIsValid(xwait))
270 {
271 XactLockTableWait(xwait, NULL, NULL, XLTW_None);
272 goto retry;
273 }
274
275 /* Found our tuple and it's not locked */
276 found = true;
277 break;
278 }
279
280 /* Found tuple, try to lock it in the lockmode. */
281 if (found)
282 {
283 TM_FailureData tmfd;
285
287
288 res = table_tuple_lock(rel, &(outslot->tts_tid), GetLatestSnapshot(),
289 outslot,
290 GetCurrentCommandId(false),
291 lockmode,
293 0 /* don't follow updates */ ,
294 &tmfd);
295
297
298 if (should_refetch_tuple(res, &tmfd))
299 goto retry;
300 }
301
302 index_endscan(scan);
303
304 /* Don't release lock until commit. */
305 index_close(idxrel, NoLock);
306
307 return found;
308}
309
310/*
311 * Compare the tuples in the slots by checking if they have equal values.
312 */
313static bool
315 TypeCacheEntry **eq)
316{
317 int attrnum;
318
320 slot2->tts_tupleDescriptor->natts);
321
322 slot_getallattrs(slot1);
323 slot_getallattrs(slot2);
324
325 /* Check equality of the attributes. */
326 for (attrnum = 0; attrnum < slot1->tts_tupleDescriptor->natts; attrnum++)
327 {
329 TypeCacheEntry *typentry;
330
331 att = TupleDescAttr(slot1->tts_tupleDescriptor, attrnum);
332
333 /*
334 * Ignore dropped and generated columns as the publisher doesn't send
335 * those
336 */
337 if (att->attisdropped || att->attgenerated)
338 continue;
339
340 /*
341 * If one value is NULL and other is not, then they are certainly not
342 * equal
343 */
344 if (slot1->tts_isnull[attrnum] != slot2->tts_isnull[attrnum])
345 return false;
346
347 /*
348 * If both are NULL, they can be considered equal.
349 */
350 if (slot1->tts_isnull[attrnum] || slot2->tts_isnull[attrnum])
351 continue;
352
353 typentry = eq[attrnum];
354 if (typentry == NULL)
355 {
356 typentry = lookup_type_cache(att->atttypid,
358 if (!OidIsValid(typentry->eq_opr_finfo.fn_oid))
360 (errcode(ERRCODE_UNDEFINED_FUNCTION),
361 errmsg("could not identify an equality operator for type %s",
362 format_type_be(att->atttypid))));
363 eq[attrnum] = typentry;
364 }
365
367 att->attcollation,
368 slot1->tts_values[attrnum],
369 slot2->tts_values[attrnum])))
370 return false;
371 }
372
373 return true;
374}
375
376/*
377 * Search the relation 'rel' for tuple using the sequential scan.
378 *
379 * If a matching tuple is found, lock it with lockmode, fill the slot with its
380 * contents, and return true. Return false otherwise.
381 *
382 * Note that this stops on the first matching tuple.
383 *
384 * This can obviously be quite slow on tables that have more than few rows.
385 */
386bool
388 TupleTableSlot *searchslot, TupleTableSlot *outslot)
389{
390 TupleTableSlot *scanslot;
391 TableScanDesc scan;
392 SnapshotData snap;
393 TypeCacheEntry **eq;
394 TransactionId xwait;
395 bool found;
397
399
400 eq = palloc0(sizeof(*eq) * outslot->tts_tupleDescriptor->natts);
401
402 /* Start a heap scan. */
403 InitDirtySnapshot(snap);
404 scan = table_beginscan(rel, &snap, 0, NULL);
405 scanslot = table_slot_create(rel, NULL);
406
407retry:
408 found = false;
409
410 table_rescan(scan, NULL);
411
412 /* Try to find the tuple */
413 while (table_scan_getnextslot(scan, ForwardScanDirection, scanslot))
414 {
415 if (!tuples_equal(scanslot, searchslot, eq))
416 continue;
417
418 found = true;
419 ExecCopySlot(outslot, scanslot);
420
421 xwait = TransactionIdIsValid(snap.xmin) ?
422 snap.xmin : snap.xmax;
423
424 /*
425 * If the tuple is locked, wait for locking transaction to finish and
426 * retry.
427 */
428 if (TransactionIdIsValid(xwait))
429 {
430 XactLockTableWait(xwait, NULL, NULL, XLTW_None);
431 goto retry;
432 }
433
434 /* Found our tuple and it's not locked */
435 break;
436 }
437
438 /* Found tuple, try to lock it in the lockmode. */
439 if (found)
440 {
441 TM_FailureData tmfd;
443
445
446 res = table_tuple_lock(rel, &(outslot->tts_tid), GetLatestSnapshot(),
447 outslot,
448 GetCurrentCommandId(false),
449 lockmode,
451 0 /* don't follow updates */ ,
452 &tmfd);
453
455
456 if (should_refetch_tuple(res, &tmfd))
457 goto retry;
458 }
459
460 table_endscan(scan);
462
463 return found;
464}
465
466/*
467 * Find the tuple that violates the passed unique index (conflictindex).
468 *
469 * If the conflicting tuple is found return true, otherwise false.
470 *
471 * We lock the tuple to avoid getting it deleted before the caller can fetch
472 * the required information. Note that if the tuple is deleted before a lock
473 * is acquired, we will retry to find the conflicting tuple again.
474 */
475static bool
476FindConflictTuple(ResultRelInfo *resultRelInfo, EState *estate,
477 Oid conflictindex, TupleTableSlot *slot,
478 TupleTableSlot **conflictslot)
479{
480 Relation rel = resultRelInfo->ri_RelationDesc;
481 ItemPointerData conflictTid;
482 TM_FailureData tmfd;
484
485 *conflictslot = NULL;
486
487retry:
488 if (ExecCheckIndexConstraints(resultRelInfo, slot, estate,
489 &conflictTid, &slot->tts_tid,
490 list_make1_oid(conflictindex)))
491 {
492 if (*conflictslot)
493 ExecDropSingleTupleTableSlot(*conflictslot);
494
495 *conflictslot = NULL;
496 return false;
497 }
498
499 *conflictslot = table_slot_create(rel, NULL);
500
502
503 res = table_tuple_lock(rel, &conflictTid, GetLatestSnapshot(),
504 *conflictslot,
505 GetCurrentCommandId(false),
508 0 /* don't follow updates */ ,
509 &tmfd);
510
512
513 if (should_refetch_tuple(res, &tmfd))
514 goto retry;
515
516 return true;
517}
518
519/*
520 * Check all the unique indexes in 'recheckIndexes' for conflict with the
521 * tuple in 'remoteslot' and report if found.
522 */
523static void
525 ConflictType type, List *recheckIndexes,
526 TupleTableSlot *searchslot, TupleTableSlot *remoteslot)
527{
528 /* Check all the unique indexes for a conflict */
529 foreach_oid(uniqueidx, resultRelInfo->ri_onConflictArbiterIndexes)
530 {
531 TupleTableSlot *conflictslot;
532
533 if (list_member_oid(recheckIndexes, uniqueidx) &&
534 FindConflictTuple(resultRelInfo, estate, uniqueidx, remoteslot,
535 &conflictslot))
536 {
537 RepOriginId origin;
538 TimestampTz committs;
539 TransactionId xmin;
540
541 GetTupleTransactionInfo(conflictslot, &xmin, &origin, &committs);
542 ReportApplyConflict(estate, resultRelInfo, ERROR, type,
543 searchslot, conflictslot, remoteslot,
544 uniqueidx, xmin, origin, committs);
545 }
546 }
547}
548
549/*
550 * Insert tuple represented in the slot to the relation, update the indexes,
551 * and execute any constraints and per-row triggers.
552 *
553 * Caller is responsible for opening the indexes.
554 */
555void
557 EState *estate, TupleTableSlot *slot)
558{
559 bool skip_tuple = false;
560 Relation rel = resultRelInfo->ri_RelationDesc;
561
562 /* For now we support only tables. */
563 Assert(rel->rd_rel->relkind == RELKIND_RELATION);
564
566
567 /* BEFORE ROW INSERT Triggers */
568 if (resultRelInfo->ri_TrigDesc &&
569 resultRelInfo->ri_TrigDesc->trig_insert_before_row)
570 {
571 if (!ExecBRInsertTriggers(estate, resultRelInfo, slot))
572 skip_tuple = true; /* "do nothing" */
573 }
574
575 if (!skip_tuple)
576 {
577 List *recheckIndexes = NIL;
578 List *conflictindexes;
579 bool conflict = false;
580
581 /* Compute stored generated columns */
582 if (rel->rd_att->constr &&
584 ExecComputeStoredGenerated(resultRelInfo, estate, slot,
585 CMD_INSERT);
586
587 /* Check the constraints of the tuple */
588 if (rel->rd_att->constr)
589 ExecConstraints(resultRelInfo, slot, estate);
590 if (rel->rd_rel->relispartition)
591 ExecPartitionCheck(resultRelInfo, slot, estate, true);
592
593 /* OK, store the tuple and create index entries for it */
594 simple_table_tuple_insert(resultRelInfo->ri_RelationDesc, slot);
595
596 conflictindexes = resultRelInfo->ri_onConflictArbiterIndexes;
597
598 if (resultRelInfo->ri_NumIndices > 0)
599 recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
600 slot, estate, false,
601 conflictindexes ? true : false,
602 &conflict,
603 conflictindexes, false);
604
605 /*
606 * Checks the conflict indexes to fetch the conflicting local tuple
607 * and reports the conflict. We perform this check here, instead of
608 * performing an additional index scan before the actual insertion and
609 * reporting the conflict if any conflicting tuples are found. This is
610 * to avoid the overhead of executing the extra scan for each INSERT
611 * operation, even when no conflict arises, which could introduce
612 * significant overhead to replication, particularly in cases where
613 * conflicts are rare.
614 *
615 * XXX OTOH, this could lead to clean-up effort for dead tuples added
616 * in heap and index in case of conflicts. But as conflicts shouldn't
617 * be a frequent thing so we preferred to save the performance
618 * overhead of extra scan before each insertion.
619 */
620 if (conflict)
621 CheckAndReportConflict(resultRelInfo, estate, CT_INSERT_EXISTS,
622 recheckIndexes, NULL, slot);
623
624 /* AFTER ROW INSERT Triggers */
625 ExecARInsertTriggers(estate, resultRelInfo, slot,
626 recheckIndexes, NULL);
627
628 /*
629 * XXX we should in theory pass a TransitionCaptureState object to the
630 * above to capture transition tuples, but after statement triggers
631 * don't actually get fired by replication yet anyway
632 */
633
634 list_free(recheckIndexes);
635 }
636}
637
638/*
639 * Find the searchslot tuple and update it with data in the slot,
640 * update the indexes, and execute any constraints and per-row triggers.
641 *
642 * Caller is responsible for opening the indexes.
643 */
644void
646 EState *estate, EPQState *epqstate,
647 TupleTableSlot *searchslot, TupleTableSlot *slot)
648{
649 bool skip_tuple = false;
650 Relation rel = resultRelInfo->ri_RelationDesc;
651 ItemPointer tid = &(searchslot->tts_tid);
652
653 /*
654 * We support only non-system tables, with
655 * check_publication_add_relation() accountable.
656 */
657 Assert(rel->rd_rel->relkind == RELKIND_RELATION);
659
661
662 /* BEFORE ROW UPDATE Triggers */
663 if (resultRelInfo->ri_TrigDesc &&
664 resultRelInfo->ri_TrigDesc->trig_update_before_row)
665 {
666 if (!ExecBRUpdateTriggers(estate, epqstate, resultRelInfo,
667 tid, NULL, slot, NULL, NULL))
668 skip_tuple = true; /* "do nothing" */
669 }
670
671 if (!skip_tuple)
672 {
673 List *recheckIndexes = NIL;
674 TU_UpdateIndexes update_indexes;
675 List *conflictindexes;
676 bool conflict = false;
677
678 /* Compute stored generated columns */
679 if (rel->rd_att->constr &&
681 ExecComputeStoredGenerated(resultRelInfo, estate, slot,
682 CMD_UPDATE);
683
684 /* Check the constraints of the tuple */
685 if (rel->rd_att->constr)
686 ExecConstraints(resultRelInfo, slot, estate);
687 if (rel->rd_rel->relispartition)
688 ExecPartitionCheck(resultRelInfo, slot, estate, true);
689
690 simple_table_tuple_update(rel, tid, slot, estate->es_snapshot,
691 &update_indexes);
692
693 conflictindexes = resultRelInfo->ri_onConflictArbiterIndexes;
694
695 if (resultRelInfo->ri_NumIndices > 0 && (update_indexes != TU_None))
696 recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
697 slot, estate, true,
698 conflictindexes ? true : false,
699 &conflict, conflictindexes,
700 (update_indexes == TU_Summarizing));
701
702 /*
703 * Refer to the comments above the call to CheckAndReportConflict() in
704 * ExecSimpleRelationInsert to understand why this check is done at
705 * this point.
706 */
707 if (conflict)
708 CheckAndReportConflict(resultRelInfo, estate, CT_UPDATE_EXISTS,
709 recheckIndexes, searchslot, slot);
710
711 /* AFTER ROW UPDATE Triggers */
712 ExecARUpdateTriggers(estate, resultRelInfo,
713 NULL, NULL,
714 tid, NULL, slot,
715 recheckIndexes, NULL, false);
716
717 list_free(recheckIndexes);
718 }
719}
720
721/*
722 * Find the searchslot tuple and delete it, and execute any constraints
723 * and per-row triggers.
724 *
725 * Caller is responsible for opening the indexes.
726 */
727void
729 EState *estate, EPQState *epqstate,
730 TupleTableSlot *searchslot)
731{
732 bool skip_tuple = false;
733 Relation rel = resultRelInfo->ri_RelationDesc;
734 ItemPointer tid = &searchslot->tts_tid;
735
737
738 /* BEFORE ROW DELETE Triggers */
739 if (resultRelInfo->ri_TrigDesc &&
740 resultRelInfo->ri_TrigDesc->trig_delete_before_row)
741 {
742 skip_tuple = !ExecBRDeleteTriggers(estate, epqstate, resultRelInfo,
743 tid, NULL, NULL, NULL, NULL);
744 }
745
746 if (!skip_tuple)
747 {
748 /* OK, delete the tuple */
749 simple_table_tuple_delete(rel, tid, estate->es_snapshot);
750
751 /* AFTER ROW DELETE Triggers */
752 ExecARDeleteTriggers(estate, resultRelInfo,
753 tid, NULL, NULL, false);
754 }
755}
756
757/*
758 * Check if command can be executed with current replica identity.
759 */
760void
762{
763 PublicationDesc pubdesc;
764
765 /*
766 * Skip checking the replica identity for partitioned tables, because the
767 * operations are actually performed on the leaf partitions.
768 */
769 if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
770 return;
771
772 /* We only need to do checks for UPDATE and DELETE. */
773 if (cmd != CMD_UPDATE && cmd != CMD_DELETE)
774 return;
775
776 /*
777 * It is only safe to execute UPDATE/DELETE if the relation does not
778 * publish UPDATEs or DELETEs, or all the following conditions are
779 * satisfied:
780 *
781 * 1. All columns, referenced in the row filters from publications which
782 * the relation is in, are valid - i.e. when all referenced columns are
783 * part of REPLICA IDENTITY.
784 *
785 * 2. All columns, referenced in the column lists are valid - i.e. when
786 * all columns referenced in the REPLICA IDENTITY are covered by the
787 * column list.
788 *
789 * 3. All generated columns in REPLICA IDENTITY of the relation, are valid
790 * - i.e. when all these generated columns are published.
791 *
792 * XXX We could optimize it by first checking whether any of the
793 * publications have a row filter or column list for this relation, or if
794 * the relation contains a generated column. If none of these exist and
795 * the relation has replica identity then we can avoid building the
796 * descriptor but as this happens only one time it doesn't seem worth the
797 * additional complexity.
798 */
799 RelationBuildPublicationDesc(rel, &pubdesc);
800 if (cmd == CMD_UPDATE && !pubdesc.rf_valid_for_update)
802 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
803 errmsg("cannot update table \"%s\"",
805 errdetail("Column used in the publication WHERE expression is not part of the replica identity.")));
806 else if (cmd == CMD_UPDATE && !pubdesc.cols_valid_for_update)
808 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
809 errmsg("cannot update table \"%s\"",
811 errdetail("Column list used by the publication does not cover the replica identity.")));
812 else if (cmd == CMD_UPDATE && !pubdesc.gencols_valid_for_update)
814 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
815 errmsg("cannot update table \"%s\"",
817 errdetail("Replica identity must not contain unpublished generated columns.")));
818 else if (cmd == CMD_DELETE && !pubdesc.rf_valid_for_delete)
820 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
821 errmsg("cannot delete from table \"%s\"",
823 errdetail("Column used in the publication WHERE expression is not part of the replica identity.")));
824 else if (cmd == CMD_DELETE && !pubdesc.cols_valid_for_delete)
826 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
827 errmsg("cannot delete from table \"%s\"",
829 errdetail("Column list used by the publication does not cover the replica identity.")));
830 else if (cmd == CMD_DELETE && !pubdesc.gencols_valid_for_delete)
832 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
833 errmsg("cannot delete from table \"%s\"",
835 errdetail("Replica identity must not contain unpublished generated columns.")));
836
837 /* If relation has replica identity we are always good. */
839 return;
840
841 /* REPLICA IDENTITY FULL is also good for UPDATE/DELETE. */
842 if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
843 return;
844
845 /*
846 * This is UPDATE/DELETE and there is no replica identity.
847 *
848 * Check if the table publishes UPDATES or DELETES.
849 */
850 if (cmd == CMD_UPDATE && pubdesc.pubactions.pubupdate)
852 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
853 errmsg("cannot update table \"%s\" because it does not have a replica identity and publishes updates",
855 errhint("To enable updating the table, set REPLICA IDENTITY using ALTER TABLE.")));
856 else if (cmd == CMD_DELETE && pubdesc.pubactions.pubdelete)
858 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
859 errmsg("cannot delete from table \"%s\" because it does not have a replica identity and publishes deletes",
861 errhint("To enable deleting from the table, set REPLICA IDENTITY using ALTER TABLE.")));
862}
863
864
865/*
866 * Check if we support writing into specific relkind.
867 *
868 * The nspname and relname are only needed for error reporting.
869 */
870void
871CheckSubscriptionRelkind(char relkind, const char *nspname,
872 const char *relname)
873{
874 if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE)
876 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
877 errmsg("cannot use relation \"%s.%s\" as logical replication target",
878 nspname, relname),
880}
#define AttributeNumberIsValid(attributeNumber)
Definition: attnum.h:34
#define PG_USED_FOR_ASSERTS_ONLY
Definition: c.h:201
#define Assert(condition)
Definition: c.h:812
regproc RegProcedure
Definition: c.h:604
uint32 TransactionId
Definition: c.h:606
#define OidIsValid(objectId)
Definition: c.h:729
bool IsCatalogRelation(Relation relation)
Definition: catalog.c:103
void ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel, ConflictType type, TupleTableSlot *searchslot, TupleTableSlot *localslot, TupleTableSlot *remoteslot, Oid indexoid, TransactionId localxmin, RepOriginId localorigin, TimestampTz localts)
Definition: conflict.c:107
bool GetTupleTransactionInfo(TupleTableSlot *localslot, TransactionId *xmin, RepOriginId *localorigin, TimestampTz *localts)
Definition: conflict.c:61
ConflictType
Definition: conflict.h:25
@ CT_INSERT_EXISTS
Definition: conflict.h:27
@ CT_UPDATE_EXISTS
Definition: conflict.h:33
int64 TimestampTz
Definition: timestamp.h:39
int errdetail(const char *fmt,...)
Definition: elog.c:1203
int errhint(const char *fmt,...)
Definition: elog.c:1317
int errcode(int sqlerrcode)
Definition: elog.c:853
int errmsg(const char *fmt,...)
Definition: elog.c:1070
#define LOG
Definition: elog.h:31
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:225
#define ereport(elevel,...)
Definition: elog.h:149
bool ExecCheckIndexConstraints(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate, ItemPointer conflictTid, ItemPointer tupleid, List *arbiterIndexes)
Definition: execIndexing.c:536
List * ExecInsertIndexTuples(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate, bool update, bool noDupErr, bool *specConflict, List *arbiterIndexes, bool onlySummarizing)
Definition: execIndexing.c:303
bool ExecPartitionCheck(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate, bool emitError)
Definition: execMain.c:1801
void ExecConstraints(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate)
Definition: execMain.c:1925
bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
bool RelationFindReplTupleByIndex(Relation rel, Oid idxoid, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
void ExecSimpleRelationDelete(ResultRelInfo *resultRelInfo, EState *estate, EPQState *epqstate, TupleTableSlot *searchslot)
void CheckSubscriptionRelkind(char relkind, const char *nspname, const char *relname)
static bool tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2, TypeCacheEntry **eq)
static bool should_refetch_tuple(TM_Result res, TM_FailureData *tmfd)
void ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo, EState *estate, EPQState *epqstate, TupleTableSlot *searchslot, TupleTableSlot *slot)
void CheckCmdReplicaIdentity(Relation rel, CmdType cmd)
void ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo, EState *estate, TupleTableSlot *slot)
static bool FindConflictTuple(ResultRelInfo *resultRelInfo, EState *estate, Oid conflictindex, TupleTableSlot *slot, TupleTableSlot **conflictslot)
static int build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel, TupleTableSlot *searchslot)
static void CheckAndReportConflict(ResultRelInfo *resultRelInfo, EState *estate, ConflictType type, List *recheckIndexes, TupleTableSlot *searchslot, TupleTableSlot *remoteslot)
StrategyNumber get_equal_strategy_number(Oid opclass)
void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
Definition: execTuples.c:1441
Datum FunctionCall2Coll(FmgrInfo *flinfo, Oid collation, Datum arg1, Datum arg2)
Definition: fmgr.c:1149
char * format_type_be(Oid type_oid)
Definition: format_type.c:343
StrategyNumber GistTranslateStratnum(Oid opclass, StrategyNumber strat)
Definition: gistutil.c:1081
bool index_getnext_slot(IndexScanDesc scan, ScanDirection direction, TupleTableSlot *slot)
Definition: indexam.c:675
void index_close(Relation relation, LOCKMODE lockmode)
Definition: indexam.c:177
IndexScanDesc index_beginscan(Relation heapRelation, Relation indexRelation, Snapshot snapshot, int nkeys, int norderbys)
Definition: indexam.c:256
void index_endscan(IndexScanDesc scan)
Definition: indexam.c:378
Relation index_open(Oid relationId, LOCKMODE lockmode)
Definition: indexam.c:133
void index_rescan(IndexScanDesc scan, ScanKey keys, int nkeys, ScanKey orderbys, int norderbys)
Definition: indexam.c:352
static bool ItemPointerIndicatesMovedPartitions(const ItemPointerData *pointer)
Definition: itemptr.h:197
void list_free(List *list)
Definition: list.c:1546
bool list_member_oid(const List *list, Oid datum)
Definition: list.c:722
void XactLockTableWait(TransactionId xid, Relation rel, ItemPointer ctid, XLTW_Oper oper)
Definition: lmgr.c:656
@ XLTW_None
Definition: lmgr.h:26
#define NoLock
Definition: lockdefs.h:34
#define RowExclusiveLock
Definition: lockdefs.h:38
@ LockWaitBlock
Definition: lockoptions.h:39
LockTupleMode
Definition: lockoptions.h:50
@ LockTupleShare
Definition: lockoptions.h:54
Oid get_opclass_method(Oid opclass)
Definition: lsyscache.c:1260
Oid get_opclass_input_type(Oid opclass)
Definition: lsyscache.c:1212
Oid get_opclass_family(Oid opclass)
Definition: lsyscache.c:1190
RegProcedure get_opcode(Oid opno)
Definition: lsyscache.c:1285
Oid get_opfamily_member(Oid opfamily, Oid lefttype, Oid righttype, int16 strategy)
Definition: lsyscache.c:166
void * palloc0(Size size)
Definition: mcxt.c:1347
void ExecComputeStoredGenerated(ResultRelInfo *resultRelInfo, EState *estate, TupleTableSlot *slot, CmdType cmdtype)
CmdType
Definition: nodes.h:263
@ CMD_INSERT
Definition: nodes.h:267
@ CMD_DELETE
Definition: nodes.h:268
@ CMD_UPDATE
Definition: nodes.h:266
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:200
int errdetail_relkind_not_supported(char relkind)
Definition: pg_class.c:24
NameData relname
Definition: pg_class.h:38
#define INDEX_MAX_KEYS
#define NIL
Definition: pg_list.h:68
#define list_make1_oid(x1)
Definition: pg_list.h:242
#define foreach_oid(var, lst)
Definition: pg_list.h:471
#define ERRCODE_T_R_SERIALIZATION_FAILURE
Definition: pgbench.c:76
static bool DatumGetBool(Datum X)
Definition: postgres.h:90
uintptr_t Datum
Definition: postgres.h:64
static Pointer DatumGetPointer(Datum X)
Definition: postgres.h:312
unsigned int Oid
Definition: postgres_ext.h:31
#define RelationGetDescr(relation)
Definition: rel.h:531
#define RelationGetRelationName(relation)
Definition: rel.h:539
#define IndexRelationGetNumberOfKeyAttributes(relation)
Definition: rel.h:524
void RelationBuildPublicationDesc(Relation relation, PublicationDesc *pubdesc)
Definition: relcache.c:5713
Oid RelationGetReplicaIndex(Relation relation)
Definition: relcache.c:4991
void ScanKeyInit(ScanKey entry, AttrNumber attributeNumber, StrategyNumber strategy, RegProcedure procedure, Datum argument)
Definition: scankey.c:76
@ ForwardScanDirection
Definition: sdir.h:28
#define SK_SEARCHNULL
Definition: skey.h:121
#define SK_ISNULL
Definition: skey.h:115
Snapshot GetLatestSnapshot(void)
Definition: snapmgr.c:283
void PushActiveSnapshot(Snapshot snapshot)
Definition: snapmgr.c:610
void PopActiveSnapshot(void)
Definition: snapmgr.c:703
#define InitDirtySnapshot(snapshotdata)
Definition: snapmgr.h:42
Oid GetRelationIdentityOrPK(Relation rel)
Definition: relation.c:887
uint16 StrategyNumber
Definition: stratnum.h:22
#define RTEqualStrategyNumber
Definition: stratnum.h:68
#define InvalidStrategy
Definition: stratnum.h:24
#define HTEqualStrategyNumber
Definition: stratnum.h:41
#define BTEqualStrategyNumber
Definition: stratnum.h:31
Snapshot es_snapshot
Definition: execnodes.h:632
Oid fn_oid
Definition: fmgr.h:59
Definition: pg_list.h:54
PublicationActions pubactions
bool gencols_valid_for_update
bool cols_valid_for_update
bool gencols_valid_for_delete
struct HeapTupleData * rd_indextuple
Definition: rel.h:194
TupleDesc rd_att
Definition: rel.h:112
Form_pg_index rd_index
Definition: rel.h:192
Oid * rd_indcollation
Definition: rel.h:217
Form_pg_class rd_rel
Definition: rel.h:111
int ri_NumIndices
Definition: execnodes.h:462
List * ri_onConflictArbiterIndexes
Definition: execnodes.h:550
Relation ri_RelationDesc
Definition: execnodes.h:459
TriggerDesc * ri_TrigDesc
Definition: execnodes.h:492
int sk_flags
Definition: skey.h:66
Oid sk_collation
Definition: skey.h:70
TransactionId xmin
Definition: snapshot.h:153
TransactionId xmax
Definition: snapshot.h:154
ItemPointerData ctid
Definition: tableam.h:149
bool trig_delete_before_row
Definition: reltrigger.h:66
bool trig_update_before_row
Definition: reltrigger.h:61
bool trig_insert_before_row
Definition: reltrigger.h:56
bool has_generated_stored
Definition: tupdesc.h:45
TupleConstr * constr
Definition: tupdesc.h:133
TupleDesc tts_tupleDescriptor
Definition: tuptable.h:123
bool * tts_isnull
Definition: tuptable.h:127
ItemPointerData tts_tid
Definition: tuptable.h:129
Datum * tts_values
Definition: tuptable.h:125
FmgrInfo eq_opr_finfo
Definition: typcache.h:75
Definition: c.h:669
int16 values[FLEXIBLE_ARRAY_MEMBER]
Definition: c.h:676
Definition: c.h:680
Oid values[FLEXIBLE_ARRAY_MEMBER]
Definition: c.h:687
Datum SysCacheGetAttrNotNull(int cacheId, HeapTuple tup, AttrNumber attributeNumber)
Definition: syscache.c:631
TupleTableSlot * table_slot_create(Relation relation, List **reglist)
Definition: tableam.c:91
void simple_table_tuple_update(Relation rel, ItemPointer otid, TupleTableSlot *slot, Snapshot snapshot, TU_UpdateIndexes *update_indexes)
Definition: tableam.c:335
void simple_table_tuple_insert(Relation rel, TupleTableSlot *slot)
Definition: tableam.c:276
void simple_table_tuple_delete(Relation rel, ItemPointer tid, Snapshot snapshot)
Definition: tableam.c:290
static TableScanDesc table_beginscan(Relation rel, Snapshot snapshot, int nkeys, struct ScanKeyData *key)
Definition: tableam.h:912
TU_UpdateIndexes
Definition: tableam.h:117
@ TU_Summarizing
Definition: tableam.h:125
@ TU_None
Definition: tableam.h:119
static void table_endscan(TableScanDesc scan)
Definition: tableam.h:1024
TM_Result
Definition: tableam.h:79
@ TM_Ok
Definition: tableam.h:84
@ TM_Deleted
Definition: tableam.h:99
@ TM_Updated
Definition: tableam.h:96
@ TM_Invisible
Definition: tableam.h:87
static void table_rescan(TableScanDesc scan, struct ScanKeyData *key)
Definition: tableam.h:1033
static TM_Result table_tuple_lock(Relation rel, ItemPointer tid, Snapshot snapshot, TupleTableSlot *slot, CommandId cid, LockTupleMode mode, LockWaitPolicy wait_policy, uint8 flags, TM_FailureData *tmfd)
Definition: tableam.h:1585
static bool table_scan_getnextslot(TableScanDesc sscan, ScanDirection direction, TupleTableSlot *slot)
Definition: tableam.h:1060
#define TransactionIdIsValid(xid)
Definition: transam.h:41
bool ExecBRUpdateTriggers(EState *estate, EPQState *epqstate, ResultRelInfo *relinfo, ItemPointer tupleid, HeapTuple fdw_trigtuple, TupleTableSlot *newslot, TM_Result *tmresult, TM_FailureData *tmfd)
Definition: trigger.c:2935
void ExecARDeleteTriggers(EState *estate, ResultRelInfo *relinfo, ItemPointer tupleid, HeapTuple fdw_trigtuple, TransitionCaptureState *transition_capture, bool is_crosspart_update)
Definition: trigger.c:2775
bool ExecBRInsertTriggers(EState *estate, ResultRelInfo *relinfo, TupleTableSlot *slot)
Definition: trigger.c:2459
void ExecARUpdateTriggers(EState *estate, ResultRelInfo *relinfo, ResultRelInfo *src_partinfo, ResultRelInfo *dst_partinfo, ItemPointer tupleid, HeapTuple fdw_trigtuple, TupleTableSlot *newslot, List *recheckIndexes, TransitionCaptureState *transition_capture, bool is_crosspart_update)
Definition: trigger.c:3098
bool ExecBRDeleteTriggers(EState *estate, EPQState *epqstate, ResultRelInfo *relinfo, ItemPointer tupleid, HeapTuple fdw_trigtuple, TupleTableSlot **epqslot, TM_Result *tmresult, TM_FailureData *tmfd)
Definition: trigger.c:2684
void ExecARInsertTriggers(EState *estate, ResultRelInfo *relinfo, TupleTableSlot *slot, List *recheckIndexes, TransitionCaptureState *transition_capture)
Definition: trigger.c:2535
bool equalTupleDescs(TupleDesc tupdesc1, TupleDesc tupdesc2)
Definition: tupdesc.c:566
static FormData_pg_attribute * TupleDescAttr(TupleDesc tupdesc, int i)
Definition: tupdesc.h:152
static void slot_getallattrs(TupleTableSlot *slot)
Definition: tuptable.h:368
static TupleTableSlot * ExecCopySlot(TupleTableSlot *dstslot, TupleTableSlot *srcslot)
Definition: tuptable.h:509
static void ExecMaterializeSlot(TupleTableSlot *slot)
Definition: tuptable.h:472
TypeCacheEntry * lookup_type_cache(Oid type_id, int flags)
Definition: typcache.c:386
#define TYPECACHE_EQ_OPR_FINFO
Definition: typcache.h:142
const char * type
CommandId GetCurrentCommandId(bool used)
Definition: xact.c:828
uint16 RepOriginId
Definition: xlogdefs.h:65