PostgreSQL Source Code  git master
execReplication.c File Reference
#include "postgres.h"
#include "access/genam.h"
#include "access/relscan.h"
#include "access/tableam.h"
#include "access/transam.h"
#include "access/xact.h"
#include "commands/trigger.h"
#include "executor/executor.h"
#include "executor/nodeModifyTable.h"
#include "nodes/nodeFuncs.h"
#include "parser/parse_relation.h"
#include "parser/parsetree.h"
#include "replication/logicalrelation.h"
#include "storage/bufmgr.h"
#include "storage/lmgr.h"
#include "utils/builtins.h"
#include "utils/datum.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"
#include "utils/typcache.h"
Include dependency graph for execReplication.c:

Go to the source code of this file.

Functions

static bool tuples_equal (TupleTableSlot *slot1, TupleTableSlot *slot2, TypeCacheEntry **eq)
 
static int build_replindex_scan_key (ScanKey skey, Relation rel, Relation idxrel, TupleTableSlot *searchslot)
 
bool RelationFindReplTupleByIndex (Relation rel, Oid idxoid, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
 
bool RelationFindReplTupleSeq (Relation rel, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
 
void ExecSimpleRelationInsert (ResultRelInfo *resultRelInfo, EState *estate, TupleTableSlot *slot)
 
void ExecSimpleRelationUpdate (ResultRelInfo *resultRelInfo, EState *estate, EPQState *epqstate, TupleTableSlot *searchslot, TupleTableSlot *slot)
 
void ExecSimpleRelationDelete (ResultRelInfo *resultRelInfo, EState *estate, EPQState *epqstate, TupleTableSlot *searchslot)
 
void CheckCmdReplicaIdentity (Relation rel, CmdType cmd)
 
void CheckSubscriptionRelkind (char relkind, const char *nspname, const char *relname)
 

Function Documentation

◆ build_replindex_scan_key()

static int build_replindex_scan_key ( ScanKey  skey,
Relation  rel,
Relation  idxrel,
TupleTableSlot *  searchslot 
)
static

Definition at line 58 of file execReplication.c.

60 {
61  int index_attoff;
62  int skey_attoff = 0;
63  Datum indclassDatum;
64  oidvector *opclass;
65  int2vector *indkey = &idxrel->rd_index->indkey;
66 
67  indclassDatum = SysCacheGetAttrNotNull(INDEXRELID, idxrel->rd_indextuple,
68  Anum_pg_index_indclass);
69  opclass = (oidvector *) DatumGetPointer(indclassDatum);
70 
71  /* Build scankey for every non-expression attribute in the index. */
72  for (index_attoff = 0; index_attoff < IndexRelationGetNumberOfKeyAttributes(idxrel);
73  index_attoff++)
74  {
75  Oid operator;
76  Oid optype;
77  Oid opfamily;
78  RegProcedure regop;
79  int table_attno = indkey->values[index_attoff];
80 
81  if (!AttributeNumberIsValid(table_attno))
82  {
83  /*
84  * XXX: Currently, we don't support expressions in the scan key,
85  * see code below.
86  */
87  continue;
88  }
89 
90  /*
91  * Load the operator info. We need this to get the equality operator
92  * function for the scan key.
93  */
94  optype = get_opclass_input_type(opclass->values[index_attoff]);
95  opfamily = get_opclass_family(opclass->values[index_attoff]);
96 
97  operator = get_opfamily_member(opfamily, optype,
98  optype,
99  BTEqualStrategyNumber);
100  if (!OidIsValid(operator))
101  elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",
102  BTEqualStrategyNumber, optype, optype, opfamily);
103 
104  regop = get_opcode(operator);
105 
106  /* Initialize the scankey. */
107  ScanKeyInit(&skey[skey_attoff],
108  index_attoff + 1,
109  BTEqualStrategyNumber,
110  regop,
111  searchslot->tts_values[table_attno - 1]);
112 
113  skey[skey_attoff].sk_collation = idxrel->rd_indcollation[index_attoff];
114 
115  /* Check for null value. */
116  if (searchslot->tts_isnull[table_attno - 1])
117  skey[skey_attoff].sk_flags |= (SK_ISNULL | SK_SEARCHNULL);
118 
119  skey_attoff++;
120  }
121 
122  /* There must always be at least one attribute for the index scan. */
123  Assert(skey_attoff > 0);
124 
125  return skey_attoff;
126 }
#define AttributeNumberIsValid(attributeNumber)
Definition: attnum.h:34
regproc RegProcedure
Definition: c.h:634
#define OidIsValid(objectId)
Definition: c.h:759
#define ERROR
Definition: elog.h:39
Assert(fmt[strlen(fmt) - 1] !='\n')
Oid get_opclass_input_type(Oid opclass)
Definition: lsyscache.c:1216
Oid get_opclass_family(Oid opclass)
Definition: lsyscache.c:1194
RegProcedure get_opcode(Oid opno)
Definition: lsyscache.c:1267
Oid get_opfamily_member(Oid opfamily, Oid lefttype, Oid righttype, int16 strategy)
Definition: lsyscache.c:165
uintptr_t Datum
Definition: postgres.h:64
static Pointer DatumGetPointer(Datum X)
Definition: postgres.h:312
unsigned int Oid
Definition: postgres_ext.h:31
#define IndexRelationGetNumberOfKeyAttributes(relation)
Definition: rel.h:522
void ScanKeyInit(ScanKey entry, AttrNumber attributeNumber, StrategyNumber strategy, RegProcedure procedure, Datum argument)
Definition: scankey.c:76
#define SK_SEARCHNULL
Definition: skey.h:121
#define SK_ISNULL
Definition: skey.h:115
#define BTEqualStrategyNumber
Definition: stratnum.h:31
struct HeapTupleData * rd_indextuple
Definition: rel.h:192
Form_pg_index rd_index
Definition: rel.h:190
Oid * rd_indcollation
Definition: rel.h:215
int sk_flags
Definition: skey.h:66
Oid sk_collation
Definition: skey.h:70
bool * tts_isnull
Definition: tuptable.h:128
Datum * tts_values
Definition: tuptable.h:126
Definition: c.h:699
int16 values[FLEXIBLE_ARRAY_MEMBER]
Definition: c.h:706
Definition: c.h:710
Oid values[FLEXIBLE_ARRAY_MEMBER]
Definition: c.h:717
Datum SysCacheGetAttrNotNull(int cacheId, HeapTuple tup, AttrNumber attributeNumber)
Definition: syscache.c:1110
@ INDEXRELID
Definition: syscache.h:66

References Assert(), AttributeNumberIsValid, BTEqualStrategyNumber, DatumGetPointer(), elog(), ERROR, get_opclass_family(), get_opclass_input_type(), get_opcode(), get_opfamily_member(), IndexRelationGetNumberOfKeyAttributes, INDEXRELID, OidIsValid, RelationData::rd_indcollation, RelationData::rd_index, RelationData::rd_indextuple, ScanKeyInit(), ScanKeyData::sk_collation, ScanKeyData::sk_flags, SK_ISNULL, SK_SEARCHNULL, SysCacheGetAttrNotNull(), TupleTableSlot::tts_isnull, TupleTableSlot::tts_values, int2vector::values, and oidvector::values.

Referenced by RelationFindReplTupleByIndex().

◆ CheckCmdReplicaIdentity()

void CheckCmdReplicaIdentity ( Relation  rel,
CmdType  cmd 
)

Definition at line 620 of file execReplication.c.

621 {
622  PublicationDesc pubdesc;
623 
624  /*
625  * Skip checking the replica identity for partitioned tables, because the
626  * operations are actually performed on the leaf partitions.
627  */
628  if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
629  return;
630 
631  /* We only need to do checks for UPDATE and DELETE. */
632  if (cmd != CMD_UPDATE && cmd != CMD_DELETE)
633  return;
634 
635  /*
636  * It is only safe to execute UPDATE/DELETE when all columns, referenced
637  * in the row filters from publications which the relation is in, are
638  * valid - i.e. when all referenced columns are part of REPLICA IDENTITY
639  * or the table does not publish UPDATEs or DELETEs.
640  *
641  * XXX We could optimize it by first checking whether any of the
642  * publications have a row filter for this relation. If not and relation
643  * has replica identity then we can avoid building the descriptor but as
644  * this happens only one time it doesn't seem worth the additional
645  * complexity.
646  */
647  RelationBuildPublicationDesc(rel, &pubdesc);
648  if (cmd == CMD_UPDATE && !pubdesc.rf_valid_for_update)
649  ereport(ERROR,
650  (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
651  errmsg("cannot update table \"%s\"",
652  RelationGetRelationName(rel)),
653  errdetail("Column used in the publication WHERE expression is not part of the replica identity.")));
654  else if (cmd == CMD_UPDATE && !pubdesc.cols_valid_for_update)
655  ereport(ERROR,
656  (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
657  errmsg("cannot update table \"%s\"",
658  RelationGetRelationName(rel)),
659  errdetail("Column list used by the publication does not cover the replica identity.")));
660  else if (cmd == CMD_DELETE && !pubdesc.rf_valid_for_delete)
661  ereport(ERROR,
662  (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
663  errmsg("cannot delete from table \"%s\"",
664  RelationGetRelationName(rel)),
665  errdetail("Column used in the publication WHERE expression is not part of the replica identity.")));
666  else if (cmd == CMD_DELETE && !pubdesc.cols_valid_for_delete)
667  ereport(ERROR,
668  (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
669  errmsg("cannot delete from table \"%s\"",
670  RelationGetRelationName(rel)),
671  errdetail("Column list used by the publication does not cover the replica identity.")));
672 
673  /* If relation has replica identity we are always good. */
674  if (OidIsValid(RelationGetReplicaIndex(rel)))
675  return;
676 
677  /* REPLICA IDENTITY FULL is also good for UPDATE/DELETE. */
678  if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
679  return;
680 
681  /*
682  * This is UPDATE/DELETE and there is no replica identity.
683  *
684  * Check if the table publishes UPDATES or DELETES.
685  */
686  if (cmd == CMD_UPDATE && pubdesc.pubactions.pubupdate)
687  ereport(ERROR,
688  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
689  errmsg("cannot update table \"%s\" because it does not have a replica identity and publishes updates",
690  RelationGetRelationName(rel)),
691  errhint("To enable updating the table, set REPLICA IDENTITY using ALTER TABLE.")));
692  else if (cmd == CMD_DELETE && pubdesc.pubactions.pubdelete)
693  ereport(ERROR,
694  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
695  errmsg("cannot delete from table \"%s\" because it does not have a replica identity and publishes deletes",
696  RelationGetRelationName(rel)),
697  errhint("To enable deleting from the table, set REPLICA IDENTITY using ALTER TABLE.")));
698 }
int errdetail(const char *fmt,...)
Definition: elog.c:1202
int errhint(const char *fmt,...)
Definition: elog.c:1316
int errcode(int sqlerrcode)
Definition: elog.c:858
int errmsg(const char *fmt,...)
Definition: elog.c:1069
#define ereport(elevel,...)
Definition: elog.h:149
@ CMD_DELETE
Definition: nodes.h:279
@ CMD_UPDATE
Definition: nodes.h:277
#define RelationGetRelationName(relation)
Definition: rel.h:537
void RelationBuildPublicationDesc(Relation relation, PublicationDesc *pubdesc)
Definition: relcache.c:5648
Oid RelationGetReplicaIndex(Relation relation)
Definition: relcache.c:4948
PublicationActions pubactions
bool cols_valid_for_delete
bool cols_valid_for_update
Form_pg_class rd_rel
Definition: rel.h:110

References CMD_DELETE, CMD_UPDATE, PublicationDesc::cols_valid_for_delete, PublicationDesc::cols_valid_for_update, ereport, errcode(), errdetail(), errhint(), errmsg(), ERROR, OidIsValid, PublicationDesc::pubactions, PublicationActions::pubdelete, PublicationActions::pubupdate, RelationData::rd_rel, RelationBuildPublicationDesc(), RelationGetRelationName, RelationGetReplicaIndex(), PublicationDesc::rf_valid_for_delete, and PublicationDesc::rf_valid_for_update.

Referenced by CheckValidResultRel(), ExecSimpleRelationDelete(), ExecSimpleRelationInsert(), and ExecSimpleRelationUpdate().

◆ CheckSubscriptionRelkind()

void CheckSubscriptionRelkind ( char  relkind,
const char *  nspname,
const char *  relname 
)

Definition at line 707 of file execReplication.c.

709 {
710  if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE)
711  ereport(ERROR,
712  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
713  errmsg("cannot use relation \"%s.%s\" as logical replication target",
714  nspname, relname),
715  errdetail_relkind_not_supported(relkind)));
716 }
int errdetail_relkind_not_supported(char relkind)
Definition: pg_class.c:24
NameData relname
Definition: pg_class.h:38

References ereport, errcode(), errdetail_relkind_not_supported(), errmsg(), ERROR, and relname.

Referenced by AlterSubscription_refresh(), apply_handle_tuple_routing(), CreateSubscription(), and logicalrep_rel_open().

◆ ExecSimpleRelationDelete()

void ExecSimpleRelationDelete ( ResultRelInfo *  resultRelInfo,
EState *  estate,
EPQState *  epqstate,
TupleTableSlot *  searchslot 
)

Definition at line 587 of file execReplication.c.

590 {
591  bool skip_tuple = false;
592  Relation rel = resultRelInfo->ri_RelationDesc;
593  ItemPointer tid = &searchslot->tts_tid;
594 
595  CheckCmdReplicaIdentity(rel, CMD_DELETE);
596 
597  /* BEFORE ROW DELETE Triggers */
598  if (resultRelInfo->ri_TrigDesc &&
599  resultRelInfo->ri_TrigDesc->trig_delete_before_row)
600  {
601  skip_tuple = !ExecBRDeleteTriggers(estate, epqstate, resultRelInfo,
602  tid, NULL, NULL, NULL, NULL);
603  }
604 
605  if (!skip_tuple)
606  {
607  /* OK, delete the tuple */
608  simple_table_tuple_delete(rel, tid, estate->es_snapshot);
609 
610  /* AFTER ROW DELETE Triggers */
611  ExecARDeleteTriggers(estate, resultRelInfo,
612  tid, NULL, NULL, false);
613  }
614 }
void CheckCmdReplicaIdentity(Relation rel, CmdType cmd)
Snapshot es_snapshot
Definition: execnodes.h:616
Relation ri_RelationDesc
Definition: execnodes.h:450
TriggerDesc * ri_TrigDesc
Definition: execnodes.h:480
bool trig_delete_before_row
Definition: reltrigger.h:66
ItemPointerData tts_tid
Definition: tuptable.h:130
void simple_table_tuple_delete(Relation rel, ItemPointer tid, Snapshot snapshot)
Definition: tableam.c:300
void ExecARDeleteTriggers(EState *estate, ResultRelInfo *relinfo, ItemPointer tupleid, HeapTuple fdw_trigtuple, TransitionCaptureState *transition_capture, bool is_crosspart_update)
Definition: trigger.c:2785
bool ExecBRDeleteTriggers(EState *estate, EPQState *epqstate, ResultRelInfo *relinfo, ItemPointer tupleid, HeapTuple fdw_trigtuple, TupleTableSlot **epqslot, TM_Result *tmresult, TM_FailureData *tmfd)
Definition: trigger.c:2694

References CheckCmdReplicaIdentity(), CMD_DELETE, EState::es_snapshot, ExecARDeleteTriggers(), ExecBRDeleteTriggers(), ResultRelInfo::ri_RelationDesc, ResultRelInfo::ri_TrigDesc, simple_table_tuple_delete(), TriggerDesc::trig_delete_before_row, and TupleTableSlot::tts_tid.

Referenced by apply_handle_delete_internal().

◆ ExecSimpleRelationInsert()

void ExecSimpleRelationInsert ( ResultRelInfo *  resultRelInfo,
EState *  estate,
TupleTableSlot *  slot 
)

Definition at line 458 of file execReplication.c.

460 {
461  bool skip_tuple = false;
462  Relation rel = resultRelInfo->ri_RelationDesc;
463 
464  /* For now we support only tables. */
465  Assert(rel->rd_rel->relkind == RELKIND_RELATION);
466 
467  CheckCmdReplicaIdentity(rel, CMD_INSERT);
468 
469  /* BEFORE ROW INSERT Triggers */
470  if (resultRelInfo->ri_TrigDesc &&
471  resultRelInfo->ri_TrigDesc->trig_insert_before_row)
472  {
473  if (!ExecBRInsertTriggers(estate, resultRelInfo, slot))
474  skip_tuple = true; /* "do nothing" */
475  }
476 
477  if (!skip_tuple)
478  {
479  List *recheckIndexes = NIL;
480 
481  /* Compute stored generated columns */
482  if (rel->rd_att->constr &&
483  rel->rd_att->constr->has_generated_stored)
484  ExecComputeStoredGenerated(resultRelInfo, estate, slot,
485  CMD_INSERT);
486 
487  /* Check the constraints of the tuple */
488  if (rel->rd_att->constr)
489  ExecConstraints(resultRelInfo, slot, estate);
490  if (rel->rd_rel->relispartition)
491  ExecPartitionCheck(resultRelInfo, slot, estate, true);
492 
493  /* OK, store the tuple and create index entries for it */
494  simple_table_tuple_insert(resultRelInfo->ri_RelationDesc, slot);
495 
496  if (resultRelInfo->ri_NumIndices > 0)
497  recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
498  slot, estate, false, false,
499  NULL, NIL, false);
500 
501  /* AFTER ROW INSERT Triggers */
502  ExecARInsertTriggers(estate, resultRelInfo, slot,
503  recheckIndexes, NULL);
504 
505  /*
506  * XXX we should in theory pass a TransitionCaptureState object to the
507  * above to capture transition tuples, but after statement triggers
508  * don't actually get fired by replication yet anyway
509  */
510 
511  list_free(recheckIndexes);
512  }
513 }
List * ExecInsertIndexTuples(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate, bool update, bool noDupErr, bool *specConflict, List *arbiterIndexes, bool onlySummarizing)
Definition: execIndexing.c:293
bool ExecPartitionCheck(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate, bool emitError)
Definition: execMain.c:1779
void ExecConstraints(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate)
Definition: execMain.c:1903
void list_free(List *list)
Definition: list.c:1545
void ExecComputeStoredGenerated(ResultRelInfo *resultRelInfo, EState *estate, TupleTableSlot *slot, CmdType cmdtype)
@ CMD_INSERT
Definition: nodes.h:278
#define NIL
Definition: pg_list.h:68
Definition: pg_list.h:54
TupleDesc rd_att
Definition: rel.h:111
int ri_NumIndices
Definition: execnodes.h:453
bool trig_insert_before_row
Definition: reltrigger.h:56
bool has_generated_stored
Definition: tupdesc.h:45
TupleConstr * constr
Definition: tupdesc.h:85
void simple_table_tuple_insert(Relation rel, TupleTableSlot *slot)
Definition: tableam.c:286
bool ExecBRInsertTriggers(EState *estate, ResultRelInfo *relinfo, TupleTableSlot *slot)
Definition: trigger.c:2469
void ExecARInsertTriggers(EState *estate, ResultRelInfo *relinfo, TupleTableSlot *slot, List *recheckIndexes, TransitionCaptureState *transition_capture)
Definition: trigger.c:2545

References Assert(), CheckCmdReplicaIdentity(), CMD_INSERT, TupleDescData::constr, ExecARInsertTriggers(), ExecBRInsertTriggers(), ExecComputeStoredGenerated(), ExecConstraints(), ExecInsertIndexTuples(), ExecPartitionCheck(), TupleConstr::has_generated_stored, list_free(), NIL, RelationData::rd_att, RelationData::rd_rel, ResultRelInfo::ri_NumIndices, ResultRelInfo::ri_RelationDesc, ResultRelInfo::ri_TrigDesc, simple_table_tuple_insert(), and TriggerDesc::trig_insert_before_row.

Referenced by apply_handle_insert_internal().

◆ ExecSimpleRelationUpdate()

void ExecSimpleRelationUpdate ( ResultRelInfo *  resultRelInfo,
EState *  estate,
EPQState *  epqstate,
TupleTableSlot *  searchslot,
TupleTableSlot *  slot 
)

Definition at line 522 of file execReplication.c.

525 {
526  bool skip_tuple = false;
527  Relation rel = resultRelInfo->ri_RelationDesc;
528  ItemPointer tid = &(searchslot->tts_tid);
529 
530  /* For now we support only tables. */
531  Assert(rel->rd_rel->relkind == RELKIND_RELATION);
532 
533  CheckCmdReplicaIdentity(rel, CMD_UPDATE);
534 
535  /* BEFORE ROW UPDATE Triggers */
536  if (resultRelInfo->ri_TrigDesc &&
537  resultRelInfo->ri_TrigDesc->trig_update_before_row)
538  {
539  if (!ExecBRUpdateTriggers(estate, epqstate, resultRelInfo,
540  tid, NULL, slot, NULL, NULL))
541  skip_tuple = true; /* "do nothing" */
542  }
543 
544  if (!skip_tuple)
545  {
546  List *recheckIndexes = NIL;
547  TU_UpdateIndexes update_indexes;
548 
549  /* Compute stored generated columns */
550  if (rel->rd_att->constr &&
551  rel->rd_att->constr->has_generated_stored)
552  ExecComputeStoredGenerated(resultRelInfo, estate, slot,
553  CMD_UPDATE);
554 
555  /* Check the constraints of the tuple */
556  if (rel->rd_att->constr)
557  ExecConstraints(resultRelInfo, slot, estate);
558  if (rel->rd_rel->relispartition)
559  ExecPartitionCheck(resultRelInfo, slot, estate, true);
560 
561  simple_table_tuple_update(rel, tid, slot, estate->es_snapshot,
562  &update_indexes);
563 
564  if (resultRelInfo->ri_NumIndices > 0 && (update_indexes != TU_None))
565  recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
566  slot, estate, true, false,
567  NULL, NIL,
568  (update_indexes == TU_Summarizing));
569 
570  /* AFTER ROW UPDATE Triggers */
571  ExecARUpdateTriggers(estate, resultRelInfo,
572  NULL, NULL,
573  tid, NULL, slot,
574  recheckIndexes, NULL, false);
575 
576  list_free(recheckIndexes);
577  }
578 }
bool trig_update_before_row
Definition: reltrigger.h:61
void simple_table_tuple_update(Relation rel, ItemPointer otid, TupleTableSlot *slot, Snapshot snapshot, TU_UpdateIndexes *update_indexes)
Definition: tableam.c:346
TU_UpdateIndexes
Definition: tableam.h:110
@ TU_Summarizing
Definition: tableam.h:118
@ TU_None
Definition: tableam.h:112
bool ExecBRUpdateTriggers(EState *estate, EPQState *epqstate, ResultRelInfo *relinfo, ItemPointer tupleid, HeapTuple fdw_trigtuple, TupleTableSlot *newslot, TM_Result *tmresult, TM_FailureData *tmfd)
Definition: trigger.c:2945
void ExecARUpdateTriggers(EState *estate, ResultRelInfo *relinfo, ResultRelInfo *src_partinfo, ResultRelInfo *dst_partinfo, ItemPointer tupleid, HeapTuple fdw_trigtuple, TupleTableSlot *newslot, List *recheckIndexes, TransitionCaptureState *transition_capture, bool is_crosspart_update)
Definition: trigger.c:3090

References Assert(), CheckCmdReplicaIdentity(), CMD_UPDATE, TupleDescData::constr, EState::es_snapshot, ExecARUpdateTriggers(), ExecBRUpdateTriggers(), ExecComputeStoredGenerated(), ExecConstraints(), ExecInsertIndexTuples(), ExecPartitionCheck(), TupleConstr::has_generated_stored, list_free(), NIL, RelationData::rd_att, RelationData::rd_rel, ResultRelInfo::ri_NumIndices, ResultRelInfo::ri_RelationDesc, ResultRelInfo::ri_TrigDesc, simple_table_tuple_update(), TriggerDesc::trig_update_before_row, TupleTableSlot::tts_tid, TU_None, and TU_Summarizing.

Referenced by apply_handle_tuple_routing(), and apply_handle_update_internal().

◆ RelationFindReplTupleByIndex()

bool RelationFindReplTupleByIndex ( Relation  rel,
Oid  idxoid,
LockTupleMode  lockmode,
TupleTableSlot *  searchslot,
TupleTableSlot *  outslot 
)

Definition at line 135 of file execReplication.c.

139 {
140  ScanKeyData skey[INDEX_MAX_KEYS];
141  int skey_attoff;
142  IndexScanDesc scan;
143  SnapshotData snap;
144  TransactionId xwait;
145  Relation idxrel;
146  bool found;
147  TypeCacheEntry **eq = NULL;
148  bool isIdxSafeToSkipDuplicates;
149 
150  /* Open the index. */
151  idxrel = index_open(idxoid, RowExclusiveLock);
152 
153  isIdxSafeToSkipDuplicates = (GetRelationIdentityOrPK(rel) == idxoid);
154 
155  InitDirtySnapshot(snap);
156 
157  /* Build scan key. */
158  skey_attoff = build_replindex_scan_key(skey, rel, idxrel, searchslot);
159 
160  /* Start an index scan. */
161  scan = index_beginscan(rel, idxrel, &snap, skey_attoff, 0);
162 
163 retry:
164  found = false;
165 
166  index_rescan(scan, skey, skey_attoff, NULL, 0);
167 
168  /* Try to find the tuple */
169  while (index_getnext_slot(scan, ForwardScanDirection, outslot))
170  {
171  /*
172  * Avoid expensive equality check if the index is primary key or
173  * replica identity index.
174  */
175  if (!isIdxSafeToSkipDuplicates)
176  {
177  if (eq == NULL)
178  {
179 #ifdef USE_ASSERT_CHECKING
180  /* apply assertions only once for the input idxoid */
181  IndexInfo *indexInfo = BuildIndexInfo(idxrel);
182 
183  Assert(IsIndexUsableForReplicaIdentityFull(indexInfo));
184 #endif
185 
186  eq = palloc0(sizeof(*eq) * outslot->tts_tupleDescriptor->natts);
187  }
188 
189  if (!tuples_equal(outslot, searchslot, eq))
190  continue;
191  }
192 
193  ExecMaterializeSlot(outslot);
194 
195  xwait = TransactionIdIsValid(snap.xmin) ?
196  snap.xmin : snap.xmax;
197 
198  /*
199  * If the tuple is locked, wait for locking transaction to finish and
200  * retry.
201  */
202  if (TransactionIdIsValid(xwait))
203  {
204  XactLockTableWait(xwait, NULL, NULL, XLTW_None);
205  goto retry;
206  }
207 
208  /* Found our tuple and it's not locked */
209  found = true;
210  break;
211  }
212 
213  /* Found tuple, try to lock it in the lockmode. */
214  if (found)
215  {
216  TM_FailureData tmfd;
217  TM_Result res;
218 
219  PushActiveSnapshot(GetLatestSnapshot());
220 
221  res = table_tuple_lock(rel, &(outslot->tts_tid), GetLatestSnapshot(),
222  outslot,
223  GetCurrentCommandId(false),
224  lockmode,
225  LockWaitBlock,
226  0 /* don't follow updates */ ,
227  &tmfd);
228 
229  PopActiveSnapshot();
230 
231  switch (res)
232  {
233  case TM_Ok:
234  break;
235  case TM_Updated:
236  /* XXX: Improve handling here */
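237  if (ItemPointerIndicatesMovedPartitions(&tmfd.ctid))
238  ereport(LOG,
239  (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
240  errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
241  else
242  ereport(LOG,
243  (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
244  errmsg("concurrent update, retrying")));
245  goto retry;
246  case TM_Deleted:
247  /* XXX: Improve handling here */
248  ereport(LOG,
249  (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
250  errmsg("concurrent delete, retrying")));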
251  goto retry;
252  case TM_Invisible:
253  elog(ERROR, "attempted to lock invisible tuple");
254  break;
255  default:
256  elog(ERROR, "unexpected table_tuple_lock status: %u", res);
257  break;
258  }
259  }
260 
261  index_endscan(scan);
262 
263  /* Don't release lock until commit. */
264  index_close(idxrel, NoLock);
265 
266  return found;
267 }
uint32 TransactionId
Definition: c.h:636
#define LOG
Definition: elog.h:31
static bool tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2, TypeCacheEntry **eq)
static int build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel, TupleTableSlot *searchslot)
IndexInfo * BuildIndexInfo(Relation index)
Definition: index.c:2430
bool index_getnext_slot(IndexScanDesc scan, ScanDirection direction, TupleTableSlot *slot)
Definition: indexam.c:624
void index_close(Relation relation, LOCKMODE lockmode)
Definition: indexam.c:158
IndexScanDesc index_beginscan(Relation heapRelation, Relation indexRelation, Snapshot snapshot, int nkeys, int norderbys)
Definition: indexam.c:205
void index_endscan(IndexScanDesc scan)
Definition: indexam.c:327
Relation index_open(Oid relationId, LOCKMODE lockmode)
Definition: indexam.c:132
void index_rescan(IndexScanDesc scan, ScanKey keys, int nkeys, ScanKey orderbys, int norderbys)
Definition: indexam.c:301
static bool ItemPointerIndicatesMovedPartitions(const ItemPointerData *pointer)
Definition: itemptr.h:197
void XactLockTableWait(TransactionId xid, Relation rel, ItemPointer ctid, XLTW_Oper oper)
Definition: lmgr.c:668
@ XLTW_None
Definition: lmgr.h:26
#define NoLock
Definition: lockdefs.h:34
#define RowExclusiveLock
Definition: lockdefs.h:38
@ LockWaitBlock
Definition: lockoptions.h:39
void * palloc0(Size size)
Definition: mcxt.c:1241
#define INDEX_MAX_KEYS
#define ERRCODE_T_R_SERIALIZATION_FAILURE
Definition: pgbench.c:76
@ ForwardScanDirection
Definition: sdir.h:28
Snapshot GetLatestSnapshot(void)
Definition: snapmgr.c:326
void PushActiveSnapshot(Snapshot snapshot)
Definition: snapmgr.c:683
void PopActiveSnapshot(void)
Definition: snapmgr.c:778
#define InitDirtySnapshot(snapshotdata)
Definition: snapmgr.h:74
Oid GetRelationIdentityOrPK(Relation rel)
Definition: relation.c:857
bool IsIndexUsableForReplicaIdentityFull(IndexInfo *indexInfo)
Definition: relation.c:842
TransactionId xmin
Definition: snapshot.h:157
TransactionId xmax
Definition: snapshot.h:158
ItemPointerData ctid
Definition: tableam.h:142
TupleDesc tts_tupleDescriptor
Definition: tuptable.h:124
TM_Result
Definition: tableam.h:72
@ TM_Ok
Definition: tableam.h:77
@ TM_Deleted
Definition: tableam.h:92
@ TM_Updated
Definition: tableam.h:89
@ TM_Invisible
Definition: tableam.h:80
static TM_Result table_tuple_lock(Relation rel, ItemPointer tid, Snapshot snapshot, TupleTableSlot *slot, CommandId cid, LockTupleMode mode, LockWaitPolicy wait_policy, uint8 flags, TM_FailureData *tmfd)
Definition: tableam.h:1585
#define TransactionIdIsValid(xid)
Definition: transam.h:41
static void ExecMaterializeSlot(TupleTableSlot *slot)
Definition: tuptable.h:489
CommandId GetCurrentCommandId(bool used)
Definition: xact.c:818

References Assert(), build_replindex_scan_key(), BuildIndexInfo(), TM_FailureData::ctid, elog(), ereport, errcode(), ERRCODE_T_R_SERIALIZATION_FAILURE, errmsg(), ERROR, ExecMaterializeSlot(), ForwardScanDirection, GetCurrentCommandId(), GetLatestSnapshot(), GetRelationIdentityOrPK(), index_beginscan(), index_close(), index_endscan(), index_getnext_slot(), INDEX_MAX_KEYS, index_open(), index_rescan(), InitDirtySnapshot, IsIndexUsableForReplicaIdentityFull(), ItemPointerIndicatesMovedPartitions(), LockWaitBlock, LOG, TupleDescData::natts, NoLock, palloc0(), PopActiveSnapshot(), PushActiveSnapshot(), res, RowExclusiveLock, table_tuple_lock(), TM_Deleted, TM_Invisible, TM_Ok, TM_Updated, TransactionIdIsValid, TupleTableSlot::tts_tid, TupleTableSlot::tts_tupleDescriptor, tuples_equal(), XactLockTableWait(), XLTW_None, SnapshotData::xmax, and SnapshotData::xmin.

Referenced by FindReplTupleInLocalRel().

◆ RelationFindReplTupleSeq()

bool RelationFindReplTupleSeq ( Relation  rel,
LockTupleMode  lockmode,
TupleTableSlot *  searchslot,
TupleTableSlot *  outslot 
)

Definition at line 346 of file execReplication.c.

348 {
349  TupleTableSlot *scanslot;
350  TableScanDesc scan;
351  SnapshotData snap;
352  TypeCacheEntry **eq;
353  TransactionId xwait;
354  bool found;
355  TupleDesc desc PG_USED_FOR_ASSERTS_ONLY = RelationGetDescr(rel);
356 
357  Assert(equalTupleDescs(desc, outslot->tts_tupleDescriptor));
358 
359  eq = palloc0(sizeof(*eq) * outslot->tts_tupleDescriptor->natts);
360 
361  /* Start a heap scan. */
362  InitDirtySnapshot(snap);
363  scan = table_beginscan(rel, &snap, 0, NULL);
364  scanslot = table_slot_create(rel, NULL);
365 
366 retry:
367  found = false;
368 
369  table_rescan(scan, NULL);
370 
371  /* Try to find the tuple */
372  while (table_scan_getnextslot(scan, ForwardScanDirection, scanslot))
373  {
374  if (!tuples_equal(scanslot, searchslot, eq))
375  continue;
376 
377  found = true;
378  ExecCopySlot(outslot, scanslot);
379 
380  xwait = TransactionIdIsValid(snap.xmin) ?
381  snap.xmin : snap.xmax;
382 
383  /*
384  * If the tuple is locked, wait for locking transaction to finish and
385  * retry.
386  */
387  if (TransactionIdIsValid(xwait))
388  {
389  XactLockTableWait(xwait, NULL, NULL, XLTW_None);
390  goto retry;
391  }
392 
393  /* Found our tuple and it's not locked */
394  break;
395  }
396 
397  /* Found tuple, try to lock it in the lockmode. */
398  if (found)
399  {
400  TM_FailureData tmfd;
401  TM_Result res;
402 
403  PushActiveSnapshot(GetLatestSnapshot());
404 
405  res = table_tuple_lock(rel, &(outslot->tts_tid), GetLatestSnapshot(),
406  outslot,
407  GetCurrentCommandId(false),
408  lockmode,
409  LockWaitBlock,
410  0 /* don't follow updates */ ,
411  &tmfd);
412 
413  PopActiveSnapshot();
414 
415  switch (res)
416  {
417  case TM_Ok:
418  break;
419  case TM_Updated:
420  /* XXX: Improve handling here */
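421  if (ItemPointerIndicatesMovedPartitions(&tmfd.ctid))
422  ereport(LOG,
423  (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
424  errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
425  else
426  ereport(LOG,
427  (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
428  errmsg("concurrent update, retrying")));
429  goto retry;
430  case TM_Deleted:
431  /* XXX: Improve handling here */
432  ereport(LOG,
433  (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
434  errmsg("concurrent delete, retrying")));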
435  goto retry;
436  case TM_Invisible:
437  elog(ERROR, "attempted to lock invisible tuple");
438  break;
439  default:
440  elog(ERROR, "unexpected table_tuple_lock status: %u", res);
441  break;
442  }
443  }
444 
445  table_endscan(scan);
446  ExecDropSingleTupleTableSlot(scanslot);
447 
448  return found;
449 }
#define PG_USED_FOR_ASSERTS_ONLY
Definition: c.h:166
void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
Definition: execTuples.c:1255
#define RelationGetDescr(relation)
Definition: rel.h:529
TupleTableSlot * table_slot_create(Relation relation, List **reglist)
Definition: tableam.c:91
static TableScanDesc table_beginscan(Relation rel, Snapshot snapshot, int nkeys, struct ScanKeyData *key)
Definition: tableam.h:903
static void table_endscan(TableScanDesc scan)
Definition: tableam.h:1011
static void table_rescan(TableScanDesc scan, struct ScanKeyData *key)
Definition: tableam.h:1020
static bool table_scan_getnextslot(TableScanDesc sscan, ScanDirection direction, TupleTableSlot *slot)
Definition: tableam.h:1052
bool equalTupleDescs(TupleDesc tupdesc1, TupleDesc tupdesc2)
Definition: tupdesc.c:402
static TupleTableSlot * ExecCopySlot(TupleTableSlot *dstslot, TupleTableSlot *srcslot)
Definition: tuptable.h:521

References Assert(), TM_FailureData::ctid, elog(), equalTupleDescs(), ereport, errcode(), ERRCODE_T_R_SERIALIZATION_FAILURE, errmsg(), ERROR, ExecCopySlot(), ExecDropSingleTupleTableSlot(), ForwardScanDirection, GetCurrentCommandId(), GetLatestSnapshot(), InitDirtySnapshot, ItemPointerIndicatesMovedPartitions(), LockWaitBlock, LOG, TupleDescData::natts, palloc0(), PG_USED_FOR_ASSERTS_ONLY, PopActiveSnapshot(), PushActiveSnapshot(), RelationGetDescr, res, table_beginscan(), table_endscan(), table_rescan(), table_scan_getnextslot(), table_slot_create(), table_tuple_lock(), TM_Deleted, TM_Invisible, TM_Ok, TM_Updated, TransactionIdIsValid, TupleTableSlot::tts_tid, TupleTableSlot::tts_tupleDescriptor, tuples_equal(), XactLockTableWait(), XLTW_None, SnapshotData::xmax, and SnapshotData::xmin.

Referenced by FindReplTupleInLocalRel().

◆ tuples_equal()

static bool tuples_equal ( TupleTableSlot *  slot1,
TupleTableSlot *  slot2,
TypeCacheEntry **  eq 
)
static

Definition at line 273 of file execReplication.c.

275 {
276  int attrnum;
277 
278  Assert(slot1->tts_tupleDescriptor->natts ==
279  slot2->tts_tupleDescriptor->natts);
280 
281  slot_getallattrs(slot1);
282  slot_getallattrs(slot2);
283 
284  /* Check equality of the attributes. */
285  for (attrnum = 0; attrnum < slot1->tts_tupleDescriptor->natts; attrnum++)
286  {
287  Form_pg_attribute att;
288  TypeCacheEntry *typentry;
289 
290  att = TupleDescAttr(slot1->tts_tupleDescriptor, attrnum);
291 
292  /*
293  * Ignore dropped and generated columns as the publisher doesn't send
294  * those
295  */
296  if (att->attisdropped || att->attgenerated)
297  continue;
298 
299  /*
300  * If one value is NULL and other is not, then they are certainly not
301  * equal
302  */
303  if (slot1->tts_isnull[attrnum] != slot2->tts_isnull[attrnum])
304  return false;
305 
306  /*
307  * If both are NULL, they can be considered equal.
308  */
309  if (slot1->tts_isnull[attrnum] || slot2->tts_isnull[attrnum])
310  continue;
311 
312  typentry = eq[attrnum];
313  if (typentry == NULL)
314  {
315  typentry = lookup_type_cache(att->atttypid,
316  TYPECACHE_EQ_OPR_FINFO);
317  if (!OidIsValid(typentry->eq_opr_finfo.fn_oid))
318  ereport(ERROR,
319  (errcode(ERRCODE_UNDEFINED_FUNCTION),
320  errmsg("could not identify an equality operator for type %s",
321  format_type_be(att->atttypid))));
322  eq[attrnum] = typentry;
323  }
324 
325  if (!DatumGetBool(FunctionCall2Coll(&typentry->eq_opr_finfo,
326  att->attcollation,
327  slot1->tts_values[attrnum],
328  slot2->tts_values[attrnum])))
329  return false;
330  }
331 
332  return true;
333 }
Datum FunctionCall2Coll(FmgrInfo *flinfo, Oid collation, Datum arg1, Datum arg2)
Definition: fmgr.c:1121
char * format_type_be(Oid type_oid)
Definition: format_type.c:339
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:209
static bool DatumGetBool(Datum X)
Definition: postgres.h:90
Oid fn_oid
Definition: fmgr.h:59
FmgrInfo eq_opr_finfo
Definition: typcache.h:75
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
static void slot_getallattrs(TupleTableSlot *slot)
Definition: tuptable.h:400
TypeCacheEntry * lookup_type_cache(Oid type_id, int flags)
Definition: typcache.c:339
#define TYPECACHE_EQ_OPR_FINFO
Definition: typcache.h:141

References Assert(), DatumGetBool(), TypeCacheEntry::eq_opr_finfo, ereport, errcode(), errmsg(), ERROR, FmgrInfo::fn_oid, format_type_be(), FunctionCall2Coll(), lookup_type_cache(), TupleDescData::natts, OidIsValid, slot_getallattrs(), TupleTableSlot::tts_isnull, TupleTableSlot::tts_tupleDescriptor, TupleTableSlot::tts_values, TupleDescAttr, and TYPECACHE_EQ_OPR_FINFO.

Referenced by RelationFindReplTupleByIndex(), and RelationFindReplTupleSeq().