PostgreSQL Source Code git master
All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
execReplication.c File Reference
#include "postgres.h"
#include "access/genam.h"
#include "access/gist.h"
#include "access/relscan.h"
#include "access/tableam.h"
#include "access/transam.h"
#include "access/xact.h"
#include "catalog/pg_am_d.h"
#include "commands/trigger.h"
#include "executor/executor.h"
#include "executor/nodeModifyTable.h"
#include "replication/conflict.h"
#include "replication/logicalrelation.h"
#include "storage/lmgr.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/rel.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"
#include "utils/typcache.h"
Include dependency graph for execReplication.c:

Go to the source code of this file.

Functions

static bool tuples_equal (TupleTableSlot *slot1, TupleTableSlot *slot2, TypeCacheEntry **eq)
 
static int build_replindex_scan_key (ScanKey skey, Relation rel, Relation idxrel, TupleTableSlot *searchslot)
 
static bool should_refetch_tuple (TM_Result res, TM_FailureData *tmfd)
 
bool RelationFindReplTupleByIndex (Relation rel, Oid idxoid, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
 
bool RelationFindReplTupleSeq (Relation rel, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
 
static bool FindConflictTuple (ResultRelInfo *resultRelInfo, EState *estate, Oid conflictindex, TupleTableSlot *slot, TupleTableSlot **conflictslot)
 
static void CheckAndReportConflict (ResultRelInfo *resultRelInfo, EState *estate, ConflictType type, List *recheckIndexes, TupleTableSlot *searchslot, TupleTableSlot *remoteslot)
 
void ExecSimpleRelationInsert (ResultRelInfo *resultRelInfo, EState *estate, TupleTableSlot *slot)
 
void ExecSimpleRelationUpdate (ResultRelInfo *resultRelInfo, EState *estate, EPQState *epqstate, TupleTableSlot *searchslot, TupleTableSlot *slot)
 
void ExecSimpleRelationDelete (ResultRelInfo *resultRelInfo, EState *estate, EPQState *epqstate, TupleTableSlot *searchslot)
 
void CheckCmdReplicaIdentity (Relation rel, CmdType cmd)
 
void CheckSubscriptionRelkind (char relkind, const char *nspname, const char *relname)
 

Function Documentation

◆ build_replindex_scan_key()

static int build_replindex_scan_key ( ScanKey  skey,
Relation  rel,
Relation  idxrel,
TupleTableSlot searchslot 
)
static

Definition at line 55 of file execReplication.c.

57{
58 int index_attoff;
59 int skey_attoff = 0;
60 Datum indclassDatum;
61 oidvector *opclass;
62 int2vector *indkey = &idxrel->rd_index->indkey;
63
64 indclassDatum = SysCacheGetAttrNotNull(INDEXRELID, idxrel->rd_indextuple,
65 Anum_pg_index_indclass);
66 opclass = (oidvector *) DatumGetPointer(indclassDatum);
67
68 /* Build scankey for every non-expression attribute in the index. */
69 for (index_attoff = 0; index_attoff < IndexRelationGetNumberOfKeyAttributes(idxrel);
70 index_attoff++)
71 {
72 Oid operator;
73 Oid optype;
74 Oid opfamily;
75 RegProcedure regop;
76 int table_attno = indkey->values[index_attoff];
77 StrategyNumber eq_strategy;
78
79 if (!AttributeNumberIsValid(table_attno))
80 {
81 /*
82 * XXX: Currently, we don't support expressions in the scan key,
83 * see code below.
84 */
85 continue;
86 }
87
88 /*
89 * Load the operator info. We need this to get the equality operator
90 * function for the scan key.
91 */
92 optype = get_opclass_input_type(opclass->values[index_attoff]);
93 opfamily = get_opclass_family(opclass->values[index_attoff]);
94 eq_strategy = IndexAmTranslateCompareType(COMPARE_EQ, idxrel->rd_rel->relam, opfamily, false);
95 operator = get_opfamily_member(opfamily, optype,
96 optype,
97 eq_strategy);
98
99 if (!OidIsValid(operator))
100 elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",
101 eq_strategy, optype, optype, opfamily);
102
103 regop = get_opcode(operator);
104
105 /* Initialize the scankey. */
106 ScanKeyInit(&skey[skey_attoff],
107 index_attoff + 1,
108 eq_strategy,
109 regop,
110 searchslot->tts_values[table_attno - 1]);
111
112 skey[skey_attoff].sk_collation = idxrel->rd_indcollation[index_attoff];
113
114 /* Check for null value. */
115 if (searchslot->tts_isnull[table_attno - 1])
116 skey[skey_attoff].sk_flags |= (SK_ISNULL | SK_SEARCHNULL);
117
118 skey_attoff++;
119 }
120
121 /* There must always be at least one attribute for the index scan. */
122 Assert(skey_attoff > 0);
123
124 return skey_attoff;
125}
StrategyNumber IndexAmTranslateCompareType(CompareType cmptype, Oid amoid, Oid opfamily, bool missing_ok)
Definition: amapi.c:143
#define AttributeNumberIsValid(attributeNumber)
Definition: attnum.h:34
regproc RegProcedure
Definition: c.h:621
#define OidIsValid(objectId)
Definition: c.h:746
@ COMPARE_EQ
Definition: cmptype.h:36
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:225
Assert(PointerIsAligned(start, uint64))
Oid get_opclass_input_type(Oid opclass)
Definition: lsyscache.c:1225
Oid get_opclass_family(Oid opclass)
Definition: lsyscache.c:1203
RegProcedure get_opcode(Oid opno)
Definition: lsyscache.c:1324
Oid get_opfamily_member(Oid opfamily, Oid lefttype, Oid righttype, int16 strategy)
Definition: lsyscache.c:167
uintptr_t Datum
Definition: postgres.h:69
static Pointer DatumGetPointer(Datum X)
Definition: postgres.h:317
unsigned int Oid
Definition: postgres_ext.h:32
#define IndexRelationGetNumberOfKeyAttributes(relation)
Definition: rel.h:531
void ScanKeyInit(ScanKey entry, AttrNumber attributeNumber, StrategyNumber strategy, RegProcedure procedure, Datum argument)
Definition: scankey.c:76
#define SK_SEARCHNULL
Definition: skey.h:121
#define SK_ISNULL
Definition: skey.h:115
uint16 StrategyNumber
Definition: stratnum.h:22
struct HeapTupleData * rd_indextuple
Definition: rel.h:194
Form_pg_index rd_index
Definition: rel.h:192
Oid * rd_indcollation
Definition: rel.h:217
Form_pg_class rd_rel
Definition: rel.h:111
int sk_flags
Definition: skey.h:66
Oid sk_collation
Definition: skey.h:70
bool * tts_isnull
Definition: tuptable.h:127
Datum * tts_values
Definition: tuptable.h:125
Definition: c.h:686
int16 values[FLEXIBLE_ARRAY_MEMBER]
Definition: c.h:693
Definition: c.h:697
Oid values[FLEXIBLE_ARRAY_MEMBER]
Definition: c.h:704
Datum SysCacheGetAttrNotNull(int cacheId, HeapTuple tup, AttrNumber attributeNumber)
Definition: syscache.c:631

References Assert(), AttributeNumberIsValid, COMPARE_EQ, DatumGetPointer(), elog, ERROR, get_opclass_family(), get_opclass_input_type(), get_opcode(), get_opfamily_member(), IndexAmTranslateCompareType(), IndexRelationGetNumberOfKeyAttributes, OidIsValid, RelationData::rd_indcollation, RelationData::rd_index, RelationData::rd_indextuple, RelationData::rd_rel, ScanKeyInit(), ScanKeyData::sk_collation, ScanKeyData::sk_flags, SK_ISNULL, SK_SEARCHNULL, SysCacheGetAttrNotNull(), TupleTableSlot::tts_isnull, TupleTableSlot::tts_values, int2vector::values, and oidvector::values.

Referenced by RelationFindReplTupleByIndex().

◆ CheckAndReportConflict()

static void CheckAndReportConflict ( ResultRelInfo resultRelInfo,
EState estate,
ConflictType  type,
List recheckIndexes,
TupleTableSlot searchslot,
TupleTableSlot remoteslot 
)
static

Definition at line 492 of file execReplication.c.

495{
496 /* Check all the unique indexes for a conflict */
497 foreach_oid(uniqueidx, resultRelInfo->ri_onConflictArbiterIndexes)
498 {
499 TupleTableSlot *conflictslot;
500
501 if (list_member_oid(recheckIndexes, uniqueidx) &&
502 FindConflictTuple(resultRelInfo, estate, uniqueidx, remoteslot,
503 &conflictslot))
504 {
505 RepOriginId origin;
506 TimestampTz committs;
507 TransactionId xmin;
508
509 GetTupleTransactionInfo(conflictslot, &xmin, &origin, &committs);
510 ReportApplyConflict(estate, resultRelInfo, ERROR, type,
511 searchslot, conflictslot, remoteslot,
512 uniqueidx, xmin, origin, committs);
513 }
514 }
515}
uint32 TransactionId
Definition: c.h:623
void ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel, ConflictType type, TupleTableSlot *searchslot, TupleTableSlot *localslot, TupleTableSlot *remoteslot, Oid indexoid, TransactionId localxmin, RepOriginId localorigin, TimestampTz localts)
Definition: conflict.c:107
bool GetTupleTransactionInfo(TupleTableSlot *localslot, TransactionId *xmin, RepOriginId *localorigin, TimestampTz *localts)
Definition: conflict.c:61
int64 TimestampTz
Definition: timestamp.h:39
static bool FindConflictTuple(ResultRelInfo *resultRelInfo, EState *estate, Oid conflictindex, TupleTableSlot *slot, TupleTableSlot **conflictslot)
bool list_member_oid(const List *list, Oid datum)
Definition: list.c:722
#define foreach_oid(var, lst)
Definition: pg_list.h:471
List * ri_onConflictArbiterIndexes
Definition: execnodes.h:569
const char * type
uint16 RepOriginId
Definition: xlogdefs.h:65

References ERROR, FindConflictTuple(), foreach_oid, GetTupleTransactionInfo(), list_member_oid(), ReportApplyConflict(), ResultRelInfo::ri_onConflictArbiterIndexes, and type.

Referenced by ExecSimpleRelationInsert(), and ExecSimpleRelationUpdate().

◆ CheckCmdReplicaIdentity()

void CheckCmdReplicaIdentity ( Relation  rel,
CmdType  cmd 
)

Definition at line 729 of file execReplication.c.

730{
731 PublicationDesc pubdesc;
732
733 /*
734 * Skip checking the replica identity for partitioned tables, because the
735 * operations are actually performed on the leaf partitions.
736 */
737 if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
738 return;
739
740 /* We only need to do checks for UPDATE and DELETE. */
741 if (cmd != CMD_UPDATE && cmd != CMD_DELETE)
742 return;
743
744 /*
745 * It is only safe to execute UPDATE/DELETE if the relation does not
746 * publish UPDATEs or DELETEs, or all the following conditions are
747 * satisfied:
748 *
749 * 1. All columns, referenced in the row filters from publications which
750 * the relation is in, are valid - i.e. when all referenced columns are
751 * part of REPLICA IDENTITY.
752 *
753 * 2. All columns, referenced in the column lists are valid - i.e. when
754 * all columns referenced in the REPLICA IDENTITY are covered by the
755 * column list.
756 *
757 * 3. All generated columns in REPLICA IDENTITY of the relation, are valid
758 * - i.e. when all these generated columns are published.
759 *
760 * XXX We could optimize it by first checking whether any of the
761 * publications have a row filter or column list for this relation, or if
762 * the relation contains a generated column. If none of these exist and
763 * the relation has replica identity then we can avoid building the
764 * descriptor but as this happens only one time it doesn't seem worth the
765 * additional complexity.
766 */
767 RelationBuildPublicationDesc(rel, &pubdesc);
768 if (cmd == CMD_UPDATE && !pubdesc.rf_valid_for_update)
770 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
771 errmsg("cannot update table \"%s\"",
773 errdetail("Column used in the publication WHERE expression is not part of the replica identity.")));
774 else if (cmd == CMD_UPDATE && !pubdesc.cols_valid_for_update)
776 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
777 errmsg("cannot update table \"%s\"",
779 errdetail("Column list used by the publication does not cover the replica identity.")));
780 else if (cmd == CMD_UPDATE && !pubdesc.gencols_valid_for_update)
782 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
783 errmsg("cannot update table \"%s\"",
785 errdetail("Replica identity must not contain unpublished generated columns.")));
786 else if (cmd == CMD_DELETE && !pubdesc.rf_valid_for_delete)
788 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
789 errmsg("cannot delete from table \"%s\"",
791 errdetail("Column used in the publication WHERE expression is not part of the replica identity.")));
792 else if (cmd == CMD_DELETE && !pubdesc.cols_valid_for_delete)
794 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
795 errmsg("cannot delete from table \"%s\"",
797 errdetail("Column list used by the publication does not cover the replica identity.")));
798 else if (cmd == CMD_DELETE && !pubdesc.gencols_valid_for_delete)
800 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
801 errmsg("cannot delete from table \"%s\"",
803 errdetail("Replica identity must not contain unpublished generated columns.")));
804
805 /* If relation has replica identity we are always good. */
807 return;
808
809 /* REPLICA IDENTITY FULL is also good for UPDATE/DELETE. */
810 if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
811 return;
812
813 /*
814 * This is UPDATE/DELETE and there is no replica identity.
815 *
816 * Check if the table publishes UPDATES or DELETES.
817 */
818 if (cmd == CMD_UPDATE && pubdesc.pubactions.pubupdate)
820 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
821 errmsg("cannot update table \"%s\" because it does not have a replica identity and publishes updates",
823 errhint("To enable updating the table, set REPLICA IDENTITY using ALTER TABLE.")));
824 else if (cmd == CMD_DELETE && pubdesc.pubactions.pubdelete)
826 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
827 errmsg("cannot delete from table \"%s\" because it does not have a replica identity and publishes deletes",
829 errhint("To enable deleting from the table, set REPLICA IDENTITY using ALTER TABLE.")));
830}
int errdetail(const char *fmt,...)
Definition: elog.c:1203
int errhint(const char *fmt,...)
Definition: elog.c:1317
int errcode(int sqlerrcode)
Definition: elog.c:853
int errmsg(const char *fmt,...)
Definition: elog.c:1070
#define ereport(elevel,...)
Definition: elog.h:149
@ CMD_DELETE
Definition: nodes.h:268
@ CMD_UPDATE
Definition: nodes.h:266
#define RelationGetRelationName(relation)
Definition: rel.h:546
void RelationBuildPublicationDesc(Relation relation, PublicationDesc *pubdesc)
Definition: relcache.c:5722
Oid RelationGetReplicaIndex(Relation relation)
Definition: relcache.c:5000
PublicationActions pubactions
bool gencols_valid_for_update
bool gencols_valid_for_delete

References CMD_DELETE, CMD_UPDATE, PublicationDesc::cols_valid_for_delete, PublicationDesc::cols_valid_for_update, ereport, errcode(), errdetail(), errhint(), errmsg(), ERROR, PublicationDesc::gencols_valid_for_delete, PublicationDesc::gencols_valid_for_update, OidIsValid, PublicationDesc::pubactions, PublicationActions::pubdelete, PublicationActions::pubupdate, RelationData::rd_rel, RelationBuildPublicationDesc(), RelationGetRelationName, RelationGetReplicaIndex(), PublicationDesc::rf_valid_for_delete, and PublicationDesc::rf_valid_for_update.

Referenced by CheckValidResultRel(), ExecSimpleRelationDelete(), ExecSimpleRelationInsert(), and ExecSimpleRelationUpdate().

◆ CheckSubscriptionRelkind()

void CheckSubscriptionRelkind ( char  relkind,
const char *  nspname,
const char *  relname 
)

Definition at line 839 of file execReplication.c.

841{
842 if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE)
844 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
845 errmsg("cannot use relation \"%s.%s\" as logical replication target",
846 nspname, relname),
848}
int errdetail_relkind_not_supported(char relkind)
Definition: pg_class.c:24
NameData relname
Definition: pg_class.h:38

References ereport, errcode(), errdetail_relkind_not_supported(), errmsg(), ERROR, and relname.

Referenced by AlterSubscription_refresh(), apply_handle_tuple_routing(), CreateSubscription(), and logicalrep_rel_open().

◆ ExecSimpleRelationDelete()

void ExecSimpleRelationDelete ( ResultRelInfo resultRelInfo,
EState estate,
EPQState epqstate,
TupleTableSlot searchslot 
)

Definition at line 696 of file execReplication.c.

699{
700 bool skip_tuple = false;
701 Relation rel = resultRelInfo->ri_RelationDesc;
702 ItemPointer tid = &searchslot->tts_tid;
703
705
706 /* BEFORE ROW DELETE Triggers */
707 if (resultRelInfo->ri_TrigDesc &&
708 resultRelInfo->ri_TrigDesc->trig_delete_before_row)
709 {
710 skip_tuple = !ExecBRDeleteTriggers(estate, epqstate, resultRelInfo,
711 tid, NULL, NULL, NULL, NULL);
712 }
713
714 if (!skip_tuple)
715 {
716 /* OK, delete the tuple */
717 simple_table_tuple_delete(rel, tid, estate->es_snapshot);
718
719 /* AFTER ROW DELETE Triggers */
720 ExecARDeleteTriggers(estate, resultRelInfo,
721 tid, NULL, NULL, false);
722 }
723}
void CheckCmdReplicaIdentity(Relation rel, CmdType cmd)
Snapshot es_snapshot
Definition: execnodes.h:651
Relation ri_RelationDesc
Definition: execnodes.h:475
TriggerDesc * ri_TrigDesc
Definition: execnodes.h:510
bool trig_delete_before_row
Definition: reltrigger.h:66
ItemPointerData tts_tid
Definition: tuptable.h:129
void simple_table_tuple_delete(Relation rel, ItemPointer tid, Snapshot snapshot)
Definition: tableam.c:291
void ExecARDeleteTriggers(EState *estate, ResultRelInfo *relinfo, ItemPointer tupleid, HeapTuple fdw_trigtuple, TransitionCaptureState *transition_capture, bool is_crosspart_update)
Definition: trigger.c:2781
bool ExecBRDeleteTriggers(EState *estate, EPQState *epqstate, ResultRelInfo *relinfo, ItemPointer tupleid, HeapTuple fdw_trigtuple, TupleTableSlot **epqslot, TM_Result *tmresult, TM_FailureData *tmfd)
Definition: trigger.c:2690

References CheckCmdReplicaIdentity(), CMD_DELETE, EState::es_snapshot, ExecARDeleteTriggers(), ExecBRDeleteTriggers(), ResultRelInfo::ri_RelationDesc, ResultRelInfo::ri_TrigDesc, simple_table_tuple_delete(), TriggerDesc::trig_delete_before_row, and TupleTableSlot::tts_tid.

Referenced by apply_handle_delete_internal(), and apply_handle_tuple_routing().

◆ ExecSimpleRelationInsert()

void ExecSimpleRelationInsert ( ResultRelInfo resultRelInfo,
EState estate,
TupleTableSlot slot 
)

Definition at line 524 of file execReplication.c.

526{
527 bool skip_tuple = false;
528 Relation rel = resultRelInfo->ri_RelationDesc;
529
530 /* For now we support only tables. */
531 Assert(rel->rd_rel->relkind == RELKIND_RELATION);
532
534
535 /* BEFORE ROW INSERT Triggers */
536 if (resultRelInfo->ri_TrigDesc &&
537 resultRelInfo->ri_TrigDesc->trig_insert_before_row)
538 {
539 if (!ExecBRInsertTriggers(estate, resultRelInfo, slot))
540 skip_tuple = true; /* "do nothing" */
541 }
542
543 if (!skip_tuple)
544 {
545 List *recheckIndexes = NIL;
546 List *conflictindexes;
547 bool conflict = false;
548
549 /* Compute stored generated columns */
550 if (rel->rd_att->constr &&
552 ExecComputeStoredGenerated(resultRelInfo, estate, slot,
553 CMD_INSERT);
554
555 /* Check the constraints of the tuple */
556 if (rel->rd_att->constr)
557 ExecConstraints(resultRelInfo, slot, estate);
558 if (rel->rd_rel->relispartition)
559 ExecPartitionCheck(resultRelInfo, slot, estate, true);
560
561 /* OK, store the tuple and create index entries for it */
562 simple_table_tuple_insert(resultRelInfo->ri_RelationDesc, slot);
563
564 conflictindexes = resultRelInfo->ri_onConflictArbiterIndexes;
565
566 if (resultRelInfo->ri_NumIndices > 0)
567 recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
568 slot, estate, false,
569 conflictindexes ? true : false,
570 &conflict,
571 conflictindexes, false);
572
573 /*
574 * Checks the conflict indexes to fetch the conflicting local tuple
575 * and reports the conflict. We perform this check here, instead of
576 * performing an additional index scan before the actual insertion and
577 * reporting the conflict if any conflicting tuples are found. This is
578 * to avoid the overhead of executing the extra scan for each INSERT
579 * operation, even when no conflict arises, which could introduce
580 * significant overhead to replication, particularly in cases where
581 * conflicts are rare.
582 *
583 * XXX OTOH, this could lead to clean-up effort for dead tuples added
584 * in heap and index in case of conflicts. But as conflicts shouldn't
585 * be a frequent thing so we preferred to save the performance
586 * overhead of extra scan before each insertion.
587 */
588 if (conflict)
589 CheckAndReportConflict(resultRelInfo, estate, CT_INSERT_EXISTS,
590 recheckIndexes, NULL, slot);
591
592 /* AFTER ROW INSERT Triggers */
593 ExecARInsertTriggers(estate, resultRelInfo, slot,
594 recheckIndexes, NULL);
595
596 /*
597 * XXX we should in theory pass a TransitionCaptureState object to the
598 * above to capture transition tuples, but after statement triggers
599 * don't actually get fired by replication yet anyway
600 */
601
602 list_free(recheckIndexes);
603 }
604}
@ CT_INSERT_EXISTS
Definition: conflict.h:27
List * ExecInsertIndexTuples(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate, bool update, bool noDupErr, bool *specConflict, List *arbiterIndexes, bool onlySummarizing)
Definition: execIndexing.c:310
bool ExecPartitionCheck(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate, bool emitError)
Definition: execMain.c:1930
void ExecConstraints(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate)
Definition: execMain.c:2054
static void CheckAndReportConflict(ResultRelInfo *resultRelInfo, EState *estate, ConflictType type, List *recheckIndexes, TupleTableSlot *searchslot, TupleTableSlot *remoteslot)
void list_free(List *list)
Definition: list.c:1546
void ExecComputeStoredGenerated(ResultRelInfo *resultRelInfo, EState *estate, TupleTableSlot *slot, CmdType cmdtype)
@ CMD_INSERT
Definition: nodes.h:267
#define NIL
Definition: pg_list.h:68
Definition: pg_list.h:54
TupleDesc rd_att
Definition: rel.h:112
int ri_NumIndices
Definition: execnodes.h:478
bool trig_insert_before_row
Definition: reltrigger.h:56
bool has_generated_stored
Definition: tupdesc.h:46
TupleConstr * constr
Definition: tupdesc.h:135
void simple_table_tuple_insert(Relation rel, TupleTableSlot *slot)
Definition: tableam.c:277
bool ExecBRInsertTriggers(EState *estate, ResultRelInfo *relinfo, TupleTableSlot *slot)
Definition: trigger.c:2463
void ExecARInsertTriggers(EState *estate, ResultRelInfo *relinfo, TupleTableSlot *slot, List *recheckIndexes, TransitionCaptureState *transition_capture)
Definition: trigger.c:2541

References Assert(), CheckAndReportConflict(), CheckCmdReplicaIdentity(), CMD_INSERT, TupleDescData::constr, CT_INSERT_EXISTS, ExecARInsertTriggers(), ExecBRInsertTriggers(), ExecComputeStoredGenerated(), ExecConstraints(), ExecInsertIndexTuples(), ExecPartitionCheck(), TupleConstr::has_generated_stored, list_free(), NIL, RelationData::rd_att, RelationData::rd_rel, ResultRelInfo::ri_NumIndices, ResultRelInfo::ri_onConflictArbiterIndexes, ResultRelInfo::ri_RelationDesc, ResultRelInfo::ri_TrigDesc, simple_table_tuple_insert(), and TriggerDesc::trig_insert_before_row.

Referenced by apply_handle_insert_internal().

◆ ExecSimpleRelationUpdate()

void ExecSimpleRelationUpdate ( ResultRelInfo resultRelInfo,
EState estate,
EPQState epqstate,
TupleTableSlot searchslot,
TupleTableSlot slot 
)

Definition at line 613 of file execReplication.c.

616{
617 bool skip_tuple = false;
618 Relation rel = resultRelInfo->ri_RelationDesc;
619 ItemPointer tid = &(searchslot->tts_tid);
620
621 /*
622 * We support only non-system tables, with
623 * check_publication_add_relation() accountable.
624 */
625 Assert(rel->rd_rel->relkind == RELKIND_RELATION);
627
629
630 /* BEFORE ROW UPDATE Triggers */
631 if (resultRelInfo->ri_TrigDesc &&
632 resultRelInfo->ri_TrigDesc->trig_update_before_row)
633 {
634 if (!ExecBRUpdateTriggers(estate, epqstate, resultRelInfo,
635 tid, NULL, slot, NULL, NULL))
636 skip_tuple = true; /* "do nothing" */
637 }
638
639 if (!skip_tuple)
640 {
641 List *recheckIndexes = NIL;
642 TU_UpdateIndexes update_indexes;
643 List *conflictindexes;
644 bool conflict = false;
645
646 /* Compute stored generated columns */
647 if (rel->rd_att->constr &&
649 ExecComputeStoredGenerated(resultRelInfo, estate, slot,
650 CMD_UPDATE);
651
652 /* Check the constraints of the tuple */
653 if (rel->rd_att->constr)
654 ExecConstraints(resultRelInfo, slot, estate);
655 if (rel->rd_rel->relispartition)
656 ExecPartitionCheck(resultRelInfo, slot, estate, true);
657
658 simple_table_tuple_update(rel, tid, slot, estate->es_snapshot,
659 &update_indexes);
660
661 conflictindexes = resultRelInfo->ri_onConflictArbiterIndexes;
662
663 if (resultRelInfo->ri_NumIndices > 0 && (update_indexes != TU_None))
664 recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
665 slot, estate, true,
666 conflictindexes ? true : false,
667 &conflict, conflictindexes,
668 (update_indexes == TU_Summarizing));
669
670 /*
671 * Refer to the comments above the call to CheckAndReportConflict() in
672 * ExecSimpleRelationInsert to understand why this check is done at
673 * this point.
674 */
675 if (conflict)
676 CheckAndReportConflict(resultRelInfo, estate, CT_UPDATE_EXISTS,
677 recheckIndexes, searchslot, slot);
678
679 /* AFTER ROW UPDATE Triggers */
680 ExecARUpdateTriggers(estate, resultRelInfo,
681 NULL, NULL,
682 tid, NULL, slot,
683 recheckIndexes, NULL, false);
684
685 list_free(recheckIndexes);
686 }
687}
bool IsCatalogRelation(Relation relation)
Definition: catalog.c:103
@ CT_UPDATE_EXISTS
Definition: conflict.h:33
bool trig_update_before_row
Definition: reltrigger.h:61
void simple_table_tuple_update(Relation rel, ItemPointer otid, TupleTableSlot *slot, Snapshot snapshot, TU_UpdateIndexes *update_indexes)
Definition: tableam.c:336
TU_UpdateIndexes
Definition: tableam.h:117
@ TU_Summarizing
Definition: tableam.h:125
@ TU_None
Definition: tableam.h:119
bool ExecBRUpdateTriggers(EState *estate, EPQState *epqstate, ResultRelInfo *relinfo, ItemPointer tupleid, HeapTuple fdw_trigtuple, TupleTableSlot *newslot, TM_Result *tmresult, TM_FailureData *tmfd)
Definition: trigger.c:2941
void ExecARUpdateTriggers(EState *estate, ResultRelInfo *relinfo, ResultRelInfo *src_partinfo, ResultRelInfo *dst_partinfo, ItemPointer tupleid, HeapTuple fdw_trigtuple, TupleTableSlot *newslot, List *recheckIndexes, TransitionCaptureState *transition_capture, bool is_crosspart_update)
Definition: trigger.c:3106

References Assert(), CheckAndReportConflict(), CheckCmdReplicaIdentity(), CMD_UPDATE, TupleDescData::constr, CT_UPDATE_EXISTS, EState::es_snapshot, ExecARUpdateTriggers(), ExecBRUpdateTriggers(), ExecComputeStoredGenerated(), ExecConstraints(), ExecInsertIndexTuples(), ExecPartitionCheck(), TupleConstr::has_generated_stored, IsCatalogRelation(), list_free(), NIL, RelationData::rd_att, RelationData::rd_rel, ResultRelInfo::ri_NumIndices, ResultRelInfo::ri_onConflictArbiterIndexes, ResultRelInfo::ri_RelationDesc, ResultRelInfo::ri_TrigDesc, simple_table_tuple_update(), TriggerDesc::trig_update_before_row, TupleTableSlot::tts_tid, TU_None, and TU_Summarizing.

Referenced by apply_handle_tuple_routing(), and apply_handle_update_internal().

◆ FindConflictTuple()

static bool FindConflictTuple ( ResultRelInfo resultRelInfo,
EState estate,
Oid  conflictindex,
TupleTableSlot slot,
TupleTableSlot **  conflictslot 
)
static

Definition at line 444 of file execReplication.c.

447{
448 Relation rel = resultRelInfo->ri_RelationDesc;
449 ItemPointerData conflictTid;
450 TM_FailureData tmfd;
451 TM_Result res;
452
453 *conflictslot = NULL;
454
455retry:
456 if (ExecCheckIndexConstraints(resultRelInfo, slot, estate,
457 &conflictTid, &slot->tts_tid,
458 list_make1_oid(conflictindex)))
459 {
460 if (*conflictslot)
461 ExecDropSingleTupleTableSlot(*conflictslot);
462
463 *conflictslot = NULL;
464 return false;
465 }
466
467 *conflictslot = table_slot_create(rel, NULL);
468
470
471 res = table_tuple_lock(rel, &conflictTid, GetActiveSnapshot(),
472 *conflictslot,
473 GetCurrentCommandId(false),
476 0 /* don't follow updates */ ,
477 &tmfd);
478
480
481 if (should_refetch_tuple(res, &tmfd))
482 goto retry;
483
484 return true;
485}
bool ExecCheckIndexConstraints(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate, ItemPointer conflictTid, ItemPointer tupleid, List *arbiterIndexes)
Definition: execIndexing.c:543
static bool should_refetch_tuple(TM_Result res, TM_FailureData *tmfd)
void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
Definition: execTuples.c:1441
@ LockWaitBlock
Definition: lockoptions.h:39
@ LockTupleShare
Definition: lockoptions.h:54
#define list_make1_oid(x1)
Definition: pg_list.h:242
Snapshot GetLatestSnapshot(void)
Definition: snapmgr.c:342
void PushActiveSnapshot(Snapshot snapshot)
Definition: snapmgr.c:669
void PopActiveSnapshot(void)
Definition: snapmgr.c:762
Snapshot GetActiveSnapshot(void)
Definition: snapmgr.c:787
TupleTableSlot * table_slot_create(Relation relation, List **reglist)
Definition: tableam.c:92
TM_Result
Definition: tableam.h:79
static TM_Result table_tuple_lock(Relation rel, ItemPointer tid, Snapshot snapshot, TupleTableSlot *slot, CommandId cid, LockTupleMode mode, LockWaitPolicy wait_policy, uint8 flags, TM_FailureData *tmfd)
Definition: tableam.h:1586
CommandId GetCurrentCommandId(bool used)
Definition: xact.c:828

References ExecCheckIndexConstraints(), ExecDropSingleTupleTableSlot(), GetActiveSnapshot(), GetCurrentCommandId(), GetLatestSnapshot(), list_make1_oid, LockTupleShare, LockWaitBlock, PopActiveSnapshot(), PushActiveSnapshot(), ResultRelInfo::ri_RelationDesc, should_refetch_tuple(), table_slot_create(), table_tuple_lock(), and TupleTableSlot::tts_tid.

Referenced by CheckAndReportConflict().

◆ RelationFindReplTupleByIndex()

bool RelationFindReplTupleByIndex ( Relation  rel,
Oid  idxoid,
LockTupleMode  lockmode,
TupleTableSlot searchslot,
TupleTableSlot outslot 
)

Definition at line 179 of file execReplication.c.

183{
185 int skey_attoff;
186 IndexScanDesc scan;
187 SnapshotData snap;
188 TransactionId xwait;
189 Relation idxrel;
190 bool found;
191 TypeCacheEntry **eq = NULL;
192 bool isIdxSafeToSkipDuplicates;
193
194 /* Open the index. */
195 idxrel = index_open(idxoid, RowExclusiveLock);
196
197 isIdxSafeToSkipDuplicates = (GetRelationIdentityOrPK(rel) == idxoid);
198
199 InitDirtySnapshot(snap);
200
201 /* Build scan key. */
202 skey_attoff = build_replindex_scan_key(skey, rel, idxrel, searchslot);
203
204 /* Start an index scan. */
205 scan = index_beginscan(rel, idxrel, &snap, NULL, skey_attoff, 0);
206
207retry:
208 found = false;
209
210 index_rescan(scan, skey, skey_attoff, NULL, 0);
211
212 /* Try to find the tuple */
213 while (index_getnext_slot(scan, ForwardScanDirection, outslot))
214 {
215 /*
216 * Avoid expensive equality check if the index is primary key or
217 * replica identity index.
218 */
219 if (!isIdxSafeToSkipDuplicates)
220 {
221 if (eq == NULL)
222 eq = palloc0(sizeof(*eq) * outslot->tts_tupleDescriptor->natts);
223
224 if (!tuples_equal(outslot, searchslot, eq))
225 continue;
226 }
227
228 ExecMaterializeSlot(outslot);
229
230 xwait = TransactionIdIsValid(snap.xmin) ?
231 snap.xmin : snap.xmax;
232
233 /*
234 * If the tuple is locked, wait for locking transaction to finish and
235 * retry.
236 */
237 if (TransactionIdIsValid(xwait))
238 {
239 XactLockTableWait(xwait, NULL, NULL, XLTW_None);
240 goto retry;
241 }
242
243 /* Found our tuple and it's not locked */
244 found = true;
245 break;
246 }
247
248 /* Found tuple, try to lock it in the lockmode. */
249 if (found)
250 {
251 TM_FailureData tmfd;
252 TM_Result res;
253
255
256 res = table_tuple_lock(rel, &(outslot->tts_tid), GetActiveSnapshot(),
257 outslot,
258 GetCurrentCommandId(false),
259 lockmode,
261 0 /* don't follow updates */ ,
262 &tmfd);
263
265
266 if (should_refetch_tuple(res, &tmfd))
267 goto retry;
268 }
269
270 index_endscan(scan);
271
272 /* Don't release lock until commit. */
273 index_close(idxrel, NoLock);
274
275 return found;
276}
static bool tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2, TypeCacheEntry **eq)
static int build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel, TupleTableSlot *searchslot)
bool index_getnext_slot(IndexScanDesc scan, ScanDirection direction, TupleTableSlot *slot)
Definition: indexam.c:719
IndexScanDesc index_beginscan(Relation heapRelation, Relation indexRelation, Snapshot snapshot, IndexScanInstrumentation *instrument, int nkeys, int norderbys)
Definition: indexam.c:256
void index_close(Relation relation, LOCKMODE lockmode)
Definition: indexam.c:177
void index_endscan(IndexScanDesc scan)
Definition: indexam.c:382
Relation index_open(Oid relationId, LOCKMODE lockmode)
Definition: indexam.c:133
void index_rescan(IndexScanDesc scan, ScanKey keys, int nkeys, ScanKey orderbys, int norderbys)
Definition: indexam.c:356
void XactLockTableWait(TransactionId xid, Relation rel, ItemPointer ctid, XLTW_Oper oper)
Definition: lmgr.c:656
@ XLTW_None
Definition: lmgr.h:26
#define NoLock
Definition: lockdefs.h:34
#define RowExclusiveLock
Definition: lockdefs.h:38
void * palloc0(Size size)
Definition: mcxt.c:1347
#define INDEX_MAX_KEYS
@ ForwardScanDirection
Definition: sdir.h:28
#define InitDirtySnapshot(snapshotdata)
Definition: snapmgr.h:42
Oid GetRelationIdentityOrPK(Relation rel)
Definition: relation.c:891
TransactionId xmin
Definition: snapshot.h:153
TransactionId xmax
Definition: snapshot.h:154
TupleDesc tts_tupleDescriptor
Definition: tuptable.h:123
#define TransactionIdIsValid(xid)
Definition: transam.h:41
static void ExecMaterializeSlot(TupleTableSlot *slot)
Definition: tuptable.h:472

References build_replindex_scan_key(), ExecMaterializeSlot(), ForwardScanDirection, GetActiveSnapshot(), GetCurrentCommandId(), GetLatestSnapshot(), GetRelationIdentityOrPK(), index_beginscan(), index_close(), index_endscan(), index_getnext_slot(), INDEX_MAX_KEYS, index_open(), index_rescan(), InitDirtySnapshot, LockWaitBlock, TupleDescData::natts, NoLock, palloc0(), PopActiveSnapshot(), PushActiveSnapshot(), RowExclusiveLock, should_refetch_tuple(), table_tuple_lock(), TransactionIdIsValid, TupleTableSlot::tts_tid, TupleTableSlot::tts_tupleDescriptor, tuples_equal(), XactLockTableWait(), XLTW_None, SnapshotData::xmax, and SnapshotData::xmin.

Referenced by FindReplTupleInLocalRel().

◆ RelationFindReplTupleSeq()

bool RelationFindReplTupleSeq ( Relation  rel,
LockTupleMode  lockmode,
TupleTableSlot searchslot,
TupleTableSlot outslot 
)

Definition at line 355 of file execReplication.c.

357{
358 TupleTableSlot *scanslot;
359 TableScanDesc scan;
360 SnapshotData snap;
361 TypeCacheEntry **eq;
362 TransactionId xwait;
363 bool found;
365
367
368 eq = palloc0(sizeof(*eq) * outslot->tts_tupleDescriptor->natts);
369
370 /* Start a heap scan. */
371 InitDirtySnapshot(snap);
372 scan = table_beginscan(rel, &snap, 0, NULL);
373 scanslot = table_slot_create(rel, NULL);
374
375retry:
376 found = false;
377
378 table_rescan(scan, NULL);
379
380 /* Try to find the tuple */
381 while (table_scan_getnextslot(scan, ForwardScanDirection, scanslot))
382 {
383 if (!tuples_equal(scanslot, searchslot, eq))
384 continue;
385
386 found = true;
387 ExecCopySlot(outslot, scanslot);
388
389 xwait = TransactionIdIsValid(snap.xmin) ?
390 snap.xmin : snap.xmax;
391
392 /*
393 * If the tuple is locked, wait for locking transaction to finish and
394 * retry.
395 */
396 if (TransactionIdIsValid(xwait))
397 {
398 XactLockTableWait(xwait, NULL, NULL, XLTW_None);
399 goto retry;
400 }
401
402 /* Found our tuple and it's not locked */
403 break;
404 }
405
406 /* Found tuple, try to lock it in the lockmode. */
407 if (found)
408 {
409 TM_FailureData tmfd;
410 TM_Result res;
411
413
414 res = table_tuple_lock(rel, &(outslot->tts_tid), GetActiveSnapshot(),
415 outslot,
416 GetCurrentCommandId(false),
417 lockmode,
419 0 /* don't follow updates */ ,
420 &tmfd);
421
423
424 if (should_refetch_tuple(res, &tmfd))
425 goto retry;
426 }
427
428 table_endscan(scan);
430
431 return found;
432}
#define PG_USED_FOR_ASSERTS_ONLY
Definition: c.h:224
#define RelationGetDescr(relation)
Definition: rel.h:538
static TableScanDesc table_beginscan(Relation rel, Snapshot snapshot, int nkeys, struct ScanKeyData *key)
Definition: tableam.h:913
static void table_endscan(TableScanDesc scan)
Definition: tableam.h:1025
static void table_rescan(TableScanDesc scan, struct ScanKeyData *key)
Definition: tableam.h:1034
static bool table_scan_getnextslot(TableScanDesc sscan, ScanDirection direction, TupleTableSlot *slot)
Definition: tableam.h:1061
bool equalTupleDescs(TupleDesc tupdesc1, TupleDesc tupdesc2)
Definition: tupdesc.c:567
static TupleTableSlot * ExecCopySlot(TupleTableSlot *dstslot, TupleTableSlot *srcslot)
Definition: tuptable.h:509

References Assert(), equalTupleDescs(), ExecCopySlot(), ExecDropSingleTupleTableSlot(), ForwardScanDirection, GetActiveSnapshot(), GetCurrentCommandId(), GetLatestSnapshot(), InitDirtySnapshot, LockWaitBlock, TupleDescData::natts, palloc0(), PG_USED_FOR_ASSERTS_ONLY, PopActiveSnapshot(), PushActiveSnapshot(), RelationGetDescr, should_refetch_tuple(), table_beginscan(), table_endscan(), table_rescan(), table_scan_getnextslot(), table_slot_create(), table_tuple_lock(), TransactionIdIsValid, TupleTableSlot::tts_tid, TupleTableSlot::tts_tupleDescriptor, tuples_equal(), XactLockTableWait(), XLTW_None, SnapshotData::xmax, and SnapshotData::xmin.

Referenced by FindReplTupleInLocalRel().

◆ should_refetch_tuple()

static bool should_refetch_tuple ( TM_Result  res,
TM_FailureData tmfd 
)
static

Definition at line 134 of file execReplication.c.

135{
136 bool refetch = false;
137
138 switch (res)
139 {
140 case TM_Ok:
141 break;
142 case TM_Updated:
143 /* XXX: Improve handling here */
145 ereport(LOG,
147 errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
148 else
149 ereport(LOG,
151 errmsg("concurrent update, retrying")));
152 refetch = true;
153 break;
154 case TM_Deleted:
155 /* XXX: Improve handling here */
156 ereport(LOG,
158 errmsg("concurrent delete, retrying")));
159 refetch = true;
160 break;
161 case TM_Invisible:
162 elog(ERROR, "attempted to lock invisible tuple");
163 break;
164 default:
165 elog(ERROR, "unexpected table_tuple_lock status: %u", res);
166 break;
167 }
168
169 return refetch;
170}
#define LOG
Definition: elog.h:31
static bool ItemPointerIndicatesMovedPartitions(const ItemPointerData *pointer)
Definition: itemptr.h:197
#define ERRCODE_T_R_SERIALIZATION_FAILURE
Definition: pgbench.c:77
ItemPointerData ctid
Definition: tableam.h:150
@ TM_Ok
Definition: tableam.h:84
@ TM_Deleted
Definition: tableam.h:99
@ TM_Updated
Definition: tableam.h:96
@ TM_Invisible
Definition: tableam.h:87

References TM_FailureData::ctid, elog, ereport, errcode(), ERRCODE_T_R_SERIALIZATION_FAILURE, errmsg(), ERROR, ItemPointerIndicatesMovedPartitions(), LOG, TM_Deleted, TM_Invisible, TM_Ok, and TM_Updated.

Referenced by FindConflictTuple(), RelationFindReplTupleByIndex(), and RelationFindReplTupleSeq().

◆ tuples_equal()

static bool tuples_equal ( TupleTableSlot slot1,
TupleTableSlot slot2,
TypeCacheEntry **  eq 
)
static

Definition at line 282 of file execReplication.c.

284{
285 int attrnum;
286
288 slot2->tts_tupleDescriptor->natts);
289
290 slot_getallattrs(slot1);
291 slot_getallattrs(slot2);
292
293 /* Check equality of the attributes. */
294 for (attrnum = 0; attrnum < slot1->tts_tupleDescriptor->natts; attrnum++)
295 {
297 TypeCacheEntry *typentry;
298
299 att = TupleDescAttr(slot1->tts_tupleDescriptor, attrnum);
300
301 /*
302 * Ignore dropped and generated columns as the publisher doesn't send
303 * those
304 */
305 if (att->attisdropped || att->attgenerated)
306 continue;
307
308 /*
309 * If one value is NULL and other is not, then they are certainly not
310 * equal
311 */
312 if (slot1->tts_isnull[attrnum] != slot2->tts_isnull[attrnum])
313 return false;
314
315 /*
316 * If both are NULL, they can be considered equal.
317 */
318 if (slot1->tts_isnull[attrnum] || slot2->tts_isnull[attrnum])
319 continue;
320
321 typentry = eq[attrnum];
322 if (typentry == NULL)
323 {
324 typentry = lookup_type_cache(att->atttypid,
326 if (!OidIsValid(typentry->eq_opr_finfo.fn_oid))
328 (errcode(ERRCODE_UNDEFINED_FUNCTION),
329 errmsg("could not identify an equality operator for type %s",
330 format_type_be(att->atttypid))));
331 eq[attrnum] = typentry;
332 }
333
335 att->attcollation,
336 slot1->tts_values[attrnum],
337 slot2->tts_values[attrnum])))
338 return false;
339 }
340
341 return true;
342}
Datum FunctionCall2Coll(FmgrInfo *flinfo, Oid collation, Datum arg1, Datum arg2)
Definition: fmgr.c:1149
char * format_type_be(Oid type_oid)
Definition: format_type.c:343
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:200
static bool DatumGetBool(Datum X)
Definition: postgres.h:95
Oid fn_oid
Definition: fmgr.h:59
FmgrInfo eq_opr_finfo
Definition: typcache.h:75
static FormData_pg_attribute * TupleDescAttr(TupleDesc tupdesc, int i)
Definition: tupdesc.h:154
static void slot_getallattrs(TupleTableSlot *slot)
Definition: tuptable.h:368
TypeCacheEntry * lookup_type_cache(Oid type_id, int flags)
Definition: typcache.c:386
#define TYPECACHE_EQ_OPR_FINFO
Definition: typcache.h:142

References Assert(), DatumGetBool(), TypeCacheEntry::eq_opr_finfo, ereport, errcode(), errmsg(), ERROR, FmgrInfo::fn_oid, format_type_be(), FunctionCall2Coll(), lookup_type_cache(), TupleDescData::natts, OidIsValid, slot_getallattrs(), TupleTableSlot::tts_isnull, TupleTableSlot::tts_tupleDescriptor, TupleTableSlot::tts_values, TupleDescAttr(), and TYPECACHE_EQ_OPR_FINFO.

Referenced by RelationFindReplTupleByIndex(), and RelationFindReplTupleSeq().