PostgreSQL Source Code  git master
execReplication.c File Reference
#include "postgres.h"
#include "access/genam.h"
#include "access/relscan.h"
#include "access/tableam.h"
#include "access/transam.h"
#include "access/xact.h"
#include "commands/trigger.h"
#include "executor/executor.h"
#include "executor/nodeModifyTable.h"
#include "nodes/nodeFuncs.h"
#include "parser/parse_relation.h"
#include "parser/parsetree.h"
#include "storage/bufmgr.h"
#include "storage/lmgr.h"
#include "utils/builtins.h"
#include "utils/datum.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"
#include "utils/typcache.h"
Include dependency graph for execReplication.c:

Go to the source code of this file.

Functions

static bool build_replindex_scan_key (ScanKey skey, Relation rel, Relation idxrel, TupleTableSlot *searchslot)
 
bool RelationFindReplTupleByIndex (Relation rel, Oid idxoid, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
 
static bool tuples_equal (TupleTableSlot *slot1, TupleTableSlot *slot2, TypeCacheEntry **eq)
 
bool RelationFindReplTupleSeq (Relation rel, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
 
void ExecSimpleRelationInsert (ResultRelInfo *resultRelInfo, EState *estate, TupleTableSlot *slot)
 
void ExecSimpleRelationUpdate (ResultRelInfo *resultRelInfo, EState *estate, EPQState *epqstate, TupleTableSlot *searchslot, TupleTableSlot *slot)
 
void ExecSimpleRelationDelete (ResultRelInfo *resultRelInfo, EState *estate, EPQState *epqstate, TupleTableSlot *searchslot)
 
void CheckCmdReplicaIdentity (Relation rel, CmdType cmd)
 
void CheckSubscriptionRelkind (char relkind, const char *nspname, const char *relname)
 

Function Documentation

◆ build_replindex_scan_key()

static bool build_replindex_scan_key ( ScanKey  skey,
Relation  rel,
Relation  idxrel,
TupleTableSlot searchslot 
)
static

Definition at line 50 of file execReplication.c.

52 {
53  int attoff;
54  bool isnull;
55  Datum indclassDatum;
56  oidvector *opclass;
57  int2vector *indkey = &idxrel->rd_index->indkey;
58  bool hasnulls = false;
59 
62 
63  indclassDatum = SysCacheGetAttr(INDEXRELID, idxrel->rd_indextuple,
64  Anum_pg_index_indclass, &isnull);
65  Assert(!isnull);
66  opclass = (oidvector *) DatumGetPointer(indclassDatum);
67 
68  /* Build scankey for every attribute in the index. */
69  for (attoff = 0; attoff < IndexRelationGetNumberOfKeyAttributes(idxrel); attoff++)
70  {
71  Oid operator;
72  Oid opfamily;
73  RegProcedure regop;
74  int pkattno = attoff + 1;
75  int mainattno = indkey->values[attoff];
76  Oid optype = get_opclass_input_type(opclass->values[attoff]);
77 
78  /*
79  * Load the operator info. We need this to get the equality operator
80  * function for the scan key.
81  */
82  opfamily = get_opclass_family(opclass->values[attoff]);
83 
84  operator = get_opfamily_member(opfamily, optype,
85  optype,
87  if (!OidIsValid(operator))
88  elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",
89  BTEqualStrategyNumber, optype, optype, opfamily);
90 
91  regop = get_opcode(operator);
92 
93  /* Initialize the scankey. */
94  ScanKeyInit(&skey[attoff],
95  pkattno,
97  regop,
98  searchslot->tts_values[mainattno - 1]);
99 
100  skey[attoff].sk_collation = idxrel->rd_indcollation[attoff];
101 
102  /* Check for null value. */
103  if (searchslot->tts_isnull[mainattno - 1])
104  {
105  hasnulls = true;
106  skey[attoff].sk_flags |= SK_ISNULL;
107  }
108  }
109 
110  return hasnulls;
111 }
regproc RegProcedure
Definition: c.h:596
#define OidIsValid(objectId)
Definition: c.h:721
#define ERROR
Definition: elog.h:33
Assert(fmt[strlen(fmt) - 1] !='\n')
Oid get_opclass_input_type(Oid opclass)
Definition: lsyscache.c:1215
Oid get_opclass_family(Oid opclass)
Definition: lsyscache.c:1193
RegProcedure get_opcode(Oid opno)
Definition: lsyscache.c:1266
Oid get_opfamily_member(Oid opfamily, Oid lefttype, Oid righttype, int16 strategy)
Definition: lsyscache.c:164
uintptr_t Datum
Definition: postgres.h:411
#define DatumGetPointer(X)
Definition: postgres.h:593
unsigned int Oid
Definition: postgres_ext.h:31
#define RelationGetRelid(relation)
Definition: rel.h:488
#define IndexRelationGetNumberOfKeyAttributes(relation)
Definition: rel.h:507
Oid RelationGetPrimaryKeyIndex(Relation relation)
Definition: relcache.c:4863
Oid RelationGetReplicaIndex(Relation relation)
Definition: relcache.c:4884
void ScanKeyInit(ScanKey entry, AttrNumber attributeNumber, StrategyNumber strategy, RegProcedure procedure, Datum argument)
Definition: scankey.c:76
#define SK_ISNULL
Definition: skey.h:115
#define BTEqualStrategyNumber
Definition: stratnum.h:31
struct HeapTupleData * rd_indextuple
Definition: rel.h:189
Form_pg_index rd_index
Definition: rel.h:187
Oid * rd_indcollation
Definition: rel.h:212
int sk_flags
Definition: skey.h:66
Oid sk_collation
Definition: skey.h:70
bool * tts_isnull
Definition: tuptable.h:128
Datum * tts_values
Definition: tuptable.h:126
Definition: c.h:661
int16 values[FLEXIBLE_ARRAY_MEMBER]
Definition: c.h:668
Definition: c.h:672
Oid values[FLEXIBLE_ARRAY_MEMBER]
Definition: c.h:679
Datum SysCacheGetAttr(int cacheId, HeapTuple tup, AttrNumber attributeNumber, bool *isNull)
Definition: syscache.c:1434
@ INDEXRELID
Definition: syscache.h:66

References Assert(), BTEqualStrategyNumber, DatumGetPointer, elog(), ERROR, get_opclass_family(), get_opclass_input_type(), get_opcode(), get_opfamily_member(), IndexRelationGetNumberOfKeyAttributes, INDEXRELID, OidIsValid, RelationData::rd_indcollation, RelationData::rd_index, RelationData::rd_indextuple, RelationGetPrimaryKeyIndex(), RelationGetRelid, RelationGetReplicaIndex(), ScanKeyInit(), ScanKeyData::sk_collation, ScanKeyData::sk_flags, SK_ISNULL, SysCacheGetAttr(), TupleTableSlot::tts_isnull, TupleTableSlot::tts_values, int2vector::values, and oidvector::values.

Referenced by RelationFindReplTupleByIndex().

◆ CheckCmdReplicaIdentity()

void CheckCmdReplicaIdentity ( Relation  rel,
CmdType  cmd 
)

Definition at line 568 of file execReplication.c.

569 {
570  PublicationDesc pubdesc;
571 
572  /* We only need to do checks for UPDATE and DELETE. */
573  if (cmd != CMD_UPDATE && cmd != CMD_DELETE)
574  return;
575 
576  /*
577  * It is only safe to execute UPDATE/DELETE when all columns, referenced
578  * in the row filters from publications which the relation is in, are
579  * valid - i.e. when all referenced columns are part of REPLICA IDENTITY
580  * or the table does not publish UPDATEs or DELETEs.
581  *
582  * XXX We could optimize it by first checking whether any of the
583  * publications have a row filter for this relation. If not and relation
584  * has replica identity then we can avoid building the descriptor but as
585  * this happens only one time it doesn't seem worth the additional
586  * complexity.
587  */
588  RelationBuildPublicationDesc(rel, &pubdesc);
589  if (cmd == CMD_UPDATE && !pubdesc.rf_valid_for_update)
590  ereport(ERROR,
591  (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
592  errmsg("cannot update table \"%s\"",
594  errdetail("Column used in the publication WHERE expression is not part of the replica identity.")));
595  else if (cmd == CMD_UPDATE && !pubdesc.cols_valid_for_update)
596  ereport(ERROR,
597  (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
598  errmsg("cannot update table \"%s\"",
600  errdetail("Column list used by the publication does not cover the replica identity.")));
601  else if (cmd == CMD_DELETE && !pubdesc.rf_valid_for_delete)
602  ereport(ERROR,
603  (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
604  errmsg("cannot delete from table \"%s\"",
606  errdetail("Column used in the publication WHERE expression is not part of the replica identity.")));
607  else if (cmd == CMD_DELETE && !pubdesc.cols_valid_for_delete)
608  ereport(ERROR,
609  (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
610  errmsg("cannot delete from table \"%s\"",
612  errdetail("Column list used by the publication does not cover the replica identity.")));
613 
614  /* If relation has replica identity we are always good. */
616  return;
617 
618  /* REPLICA IDENTITY FULL is also good for UPDATE/DELETE. */
619  if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
620  return;
621 
622  /*
623  * This is UPDATE/DELETE and there is no replica identity.
624  *
625  * Check if the table publishes UPDATES or DELETES.
626  */
627  if (cmd == CMD_UPDATE && pubdesc.pubactions.pubupdate)
628  ereport(ERROR,
629  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
630  errmsg("cannot update table \"%s\" because it does not have a replica identity and publishes updates",
632  errhint("To enable updating the table, set REPLICA IDENTITY using ALTER TABLE.")));
633  else if (cmd == CMD_DELETE && pubdesc.pubactions.pubdelete)
634  ereport(ERROR,
635  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
636  errmsg("cannot delete from table \"%s\" because it does not have a replica identity and publishes deletes",
638  errhint("To enable deleting from the table, set REPLICA IDENTITY using ALTER TABLE.")));
639 }
int errdetail(const char *fmt,...)
Definition: elog.c:1037
int errhint(const char *fmt,...)
Definition: elog.c:1151
int errcode(int sqlerrcode)
Definition: elog.c:693
int errmsg(const char *fmt,...)
Definition: elog.c:904
#define ereport(elevel,...)
Definition: elog.h:143
@ CMD_DELETE
Definition: nodes.h:725
@ CMD_UPDATE
Definition: nodes.h:723
#define RelationGetRelationName(relation)
Definition: rel.h:522
void RelationBuildPublicationDesc(Relation relation, PublicationDesc *pubdesc)
Definition: relcache.c:5555
PublicationActions pubactions
bool cols_valid_for_delete
bool cols_valid_for_update
Form_pg_class rd_rel
Definition: rel.h:109

References CMD_DELETE, CMD_UPDATE, PublicationDesc::cols_valid_for_delete, PublicationDesc::cols_valid_for_update, ereport, errcode(), errdetail(), errhint(), errmsg(), ERROR, OidIsValid, PublicationDesc::pubactions, PublicationActions::pubdelete, PublicationActions::pubupdate, RelationData::rd_rel, RelationBuildPublicationDesc(), RelationGetRelationName, RelationGetReplicaIndex(), PublicationDesc::rf_valid_for_delete, and PublicationDesc::rf_valid_for_update.

Referenced by CheckValidResultRel(), ExecSimpleRelationDelete(), ExecSimpleRelationInsert(), and ExecSimpleRelationUpdate().

◆ CheckSubscriptionRelkind()

void CheckSubscriptionRelkind ( char  relkind,
const char *  nspname,
const char *  relname 
)

Definition at line 648 of file execReplication.c.

650 {
651  if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE)
652  ereport(ERROR,
653  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
654  errmsg("cannot use relation \"%s.%s\" as logical replication target",
655  nspname, relname),
657 }
int errdetail_relkind_not_supported(char relkind)
Definition: pg_class.c:24
NameData relname
Definition: pg_class.h:38

References ereport, errcode(), errdetail_relkind_not_supported(), errmsg(), ERROR, and relname.

Referenced by AlterSubscription_refresh(), CreateSubscription(), and logicalrep_rel_open().

◆ ExecSimpleRelationDelete()

void ExecSimpleRelationDelete ( ResultRelInfo resultRelInfo,
EState estate,
EPQState epqstate,
TupleTableSlot searchslot 
)

Definition at line 535 of file execReplication.c.

538 {
539  bool skip_tuple = false;
540  Relation rel = resultRelInfo->ri_RelationDesc;
541  ItemPointer tid = &searchslot->tts_tid;
542 
544 
545  /* BEFORE ROW DELETE Triggers */
546  if (resultRelInfo->ri_TrigDesc &&
547  resultRelInfo->ri_TrigDesc->trig_delete_before_row)
548  {
549  skip_tuple = !ExecBRDeleteTriggers(estate, epqstate, resultRelInfo,
550  tid, NULL, NULL);
551  }
552 
553  if (!skip_tuple)
554  {
555  /* OK, delete the tuple */
556  simple_table_tuple_delete(rel, tid, estate->es_snapshot);
557 
558  /* AFTER ROW DELETE Triggers */
559  ExecARDeleteTriggers(estate, resultRelInfo,
560  tid, NULL, NULL, false);
561  }
562 }
void CheckCmdReplicaIdentity(Relation rel, CmdType cmd)
Snapshot es_snapshot
Definition: execnodes.h:590
Relation ri_RelationDesc
Definition: execnodes.h:433
TriggerDesc * ri_TrigDesc
Definition: execnodes.h:460
bool trig_delete_before_row
Definition: reltrigger.h:66
ItemPointerData tts_tid
Definition: tuptable.h:130
void simple_table_tuple_delete(Relation rel, ItemPointer tid, Snapshot snapshot)
Definition: tableam.c:301
void ExecARDeleteTriggers(EState *estate, ResultRelInfo *relinfo, ItemPointer tupleid, HeapTuple fdw_trigtuple, TransitionCaptureState *transition_capture, bool is_crosspart_update)
Definition: trigger.c:2786
bool ExecBRDeleteTriggers(EState *estate, EPQState *epqstate, ResultRelInfo *relinfo, ItemPointer tupleid, HeapTuple fdw_trigtuple, TupleTableSlot **epqslot)
Definition: trigger.c:2697

References CheckCmdReplicaIdentity(), CMD_DELETE, EState::es_snapshot, ExecARDeleteTriggers(), ExecBRDeleteTriggers(), ResultRelInfo::ri_RelationDesc, ResultRelInfo::ri_TrigDesc, simple_table_tuple_delete(), TriggerDesc::trig_delete_before_row, and TupleTableSlot::tts_tid.

Referenced by apply_handle_delete_internal().

◆ ExecSimpleRelationInsert()

void ExecSimpleRelationInsert ( ResultRelInfo resultRelInfo,
EState estate,
TupleTableSlot slot 
)

Definition at line 407 of file execReplication.c.

409 {
410  bool skip_tuple = false;
411  Relation rel = resultRelInfo->ri_RelationDesc;
412 
413  /* For now we support only tables. */
414  Assert(rel->rd_rel->relkind == RELKIND_RELATION);
415 
417 
418  /* BEFORE ROW INSERT Triggers */
419  if (resultRelInfo->ri_TrigDesc &&
420  resultRelInfo->ri_TrigDesc->trig_insert_before_row)
421  {
422  if (!ExecBRInsertTriggers(estate, resultRelInfo, slot))
423  skip_tuple = true; /* "do nothing" */
424  }
425 
426  if (!skip_tuple)
427  {
428  List *recheckIndexes = NIL;
429 
430  /* Compute stored generated columns */
431  if (rel->rd_att->constr &&
433  ExecComputeStoredGenerated(resultRelInfo, estate, slot,
434  CMD_INSERT);
435 
436  /* Check the constraints of the tuple */
437  if (rel->rd_att->constr)
438  ExecConstraints(resultRelInfo, slot, estate);
439  if (rel->rd_rel->relispartition)
440  ExecPartitionCheck(resultRelInfo, slot, estate, true);
441 
442  /* OK, store the tuple and create index entries for it */
443  simple_table_tuple_insert(resultRelInfo->ri_RelationDesc, slot);
444 
445  if (resultRelInfo->ri_NumIndices > 0)
446  recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
447  slot, estate, false, false,
448  NULL, NIL);
449 
450  /* AFTER ROW INSERT Triggers */
451  ExecARInsertTriggers(estate, resultRelInfo, slot,
452  recheckIndexes, NULL);
453 
454  /*
455  * XXX we should in theory pass a TransitionCaptureState object to the
456  * above to capture transition tuples, but after statement triggers
457  * don't actually get fired by replication yet anyway
458  */
459 
460  list_free(recheckIndexes);
461  }
462 }
List * ExecInsertIndexTuples(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate, bool update, bool noDupErr, bool *specConflict, List *arbiterIndexes)
Definition: execIndexing.c:284
bool ExecPartitionCheck(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate, bool emitError)
Definition: execMain.c:1782
void ExecConstraints(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate)
Definition: execMain.c:1906
void list_free(List *list)
Definition: list.c:1545
void ExecComputeStoredGenerated(ResultRelInfo *resultRelInfo, EState *estate, TupleTableSlot *slot, CmdType cmdtype)
@ CMD_INSERT
Definition: nodes.h:724
#define NIL
Definition: pg_list.h:66
Definition: pg_list.h:52
TupleDesc rd_att
Definition: rel.h:110
int ri_NumIndices
Definition: execnodes.h:436
bool trig_insert_before_row
Definition: reltrigger.h:56
bool has_generated_stored
Definition: tupdesc.h:45
TupleConstr * constr
Definition: tupdesc.h:85
void simple_table_tuple_insert(Relation rel, TupleTableSlot *slot)
Definition: tableam.c:287
bool ExecBRInsertTriggers(EState *estate, ResultRelInfo *relinfo, TupleTableSlot *slot)
Definition: trigger.c:2472
void ExecARInsertTriggers(EState *estate, ResultRelInfo *relinfo, TupleTableSlot *slot, List *recheckIndexes, TransitionCaptureState *transition_capture)
Definition: trigger.c:2548

References Assert(), CheckCmdReplicaIdentity(), CMD_INSERT, TupleDescData::constr, ExecARInsertTriggers(), ExecBRInsertTriggers(), ExecComputeStoredGenerated(), ExecConstraints(), ExecInsertIndexTuples(), ExecPartitionCheck(), TupleConstr::has_generated_stored, list_free(), NIL, RelationData::rd_att, RelationData::rd_rel, ResultRelInfo::ri_NumIndices, ResultRelInfo::ri_RelationDesc, ResultRelInfo::ri_TrigDesc, simple_table_tuple_insert(), and TriggerDesc::trig_insert_before_row.

Referenced by apply_handle_insert_internal().

◆ ExecSimpleRelationUpdate()

void ExecSimpleRelationUpdate ( ResultRelInfo resultRelInfo,
EState estate,
EPQState epqstate,
TupleTableSlot searchslot,
TupleTableSlot slot 
)

Definition at line 471 of file execReplication.c.

474 {
475  bool skip_tuple = false;
476  Relation rel = resultRelInfo->ri_RelationDesc;
477  ItemPointer tid = &(searchslot->tts_tid);
478 
479  /* For now we support only tables. */
480  Assert(rel->rd_rel->relkind == RELKIND_RELATION);
481 
483 
484  /* BEFORE ROW UPDATE Triggers */
485  if (resultRelInfo->ri_TrigDesc &&
486  resultRelInfo->ri_TrigDesc->trig_update_before_row)
487  {
488  if (!ExecBRUpdateTriggers(estate, epqstate, resultRelInfo,
489  tid, NULL, slot, NULL))
490  skip_tuple = true; /* "do nothing" */
491  }
492 
493  if (!skip_tuple)
494  {
495  List *recheckIndexes = NIL;
496  bool update_indexes;
497 
498  /* Compute stored generated columns */
499  if (rel->rd_att->constr &&
501  ExecComputeStoredGenerated(resultRelInfo, estate, slot,
502  CMD_UPDATE);
503 
504  /* Check the constraints of the tuple */
505  if (rel->rd_att->constr)
506  ExecConstraints(resultRelInfo, slot, estate);
507  if (rel->rd_rel->relispartition)
508  ExecPartitionCheck(resultRelInfo, slot, estate, true);
509 
510  simple_table_tuple_update(rel, tid, slot, estate->es_snapshot,
511  &update_indexes);
512 
513  if (resultRelInfo->ri_NumIndices > 0 && update_indexes)
514  recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
515  slot, estate, true, false,
516  NULL, NIL);
517 
518  /* AFTER ROW UPDATE Triggers */
519  ExecARUpdateTriggers(estate, resultRelInfo,
520  NULL, NULL,
521  tid, NULL, slot,
522  recheckIndexes, NULL, false);
523 
524  list_free(recheckIndexes);
525  }
526 }
bool trig_update_before_row
Definition: reltrigger.h:61
void simple_table_tuple_update(Relation rel, ItemPointer otid, TupleTableSlot *slot, Snapshot snapshot, bool *update_indexes)
Definition: tableam.c:346
void ExecARUpdateTriggers(EState *estate, ResultRelInfo *relinfo, ResultRelInfo *src_partinfo, ResultRelInfo *dst_partinfo, ItemPointer tupleid, HeapTuple fdw_trigtuple, TupleTableSlot *newslot, List *recheckIndexes, TransitionCaptureState *transition_capture, bool is_crosspart_update)
Definition: trigger.c:3089
bool ExecBRUpdateTriggers(EState *estate, EPQState *epqstate, ResultRelInfo *relinfo, ItemPointer tupleid, HeapTuple fdw_trigtuple, TupleTableSlot *newslot, TM_FailureData *tmfd)
Definition: trigger.c:2945

References Assert(), CheckCmdReplicaIdentity(), CMD_UPDATE, TupleDescData::constr, EState::es_snapshot, ExecARUpdateTriggers(), ExecBRUpdateTriggers(), ExecComputeStoredGenerated(), ExecConstraints(), ExecInsertIndexTuples(), ExecPartitionCheck(), TupleConstr::has_generated_stored, list_free(), NIL, RelationData::rd_att, RelationData::rd_rel, ResultRelInfo::ri_NumIndices, ResultRelInfo::ri_RelationDesc, ResultRelInfo::ri_TrigDesc, simple_table_tuple_update(), TriggerDesc::trig_update_before_row, and TupleTableSlot::tts_tid.

Referenced by apply_handle_tuple_routing(), and apply_handle_update_internal().

◆ RelationFindReplTupleByIndex()

bool RelationFindReplTupleByIndex ( Relation  rel,
Oid  idxoid,
LockTupleMode  lockmode,
TupleTableSlot searchslot,
TupleTableSlot outslot 
)

Definition at line 120 of file execReplication.c.

124 {
126  IndexScanDesc scan;
127  SnapshotData snap;
128  TransactionId xwait;
129  Relation idxrel;
130  bool found;
131 
132  /* Open the index. */
133  idxrel = index_open(idxoid, RowExclusiveLock);
134 
135  /* Start an index scan. */
136  InitDirtySnapshot(snap);
137  scan = index_beginscan(rel, idxrel, &snap,
139  0);
140 
141  /* Build scan key. */
142  build_replindex_scan_key(skey, rel, idxrel, searchslot);
143 
144 retry:
145  found = false;
146 
147  index_rescan(scan, skey, IndexRelationGetNumberOfKeyAttributes(idxrel), NULL, 0);
148 
149  /* Try to find the tuple */
150  if (index_getnext_slot(scan, ForwardScanDirection, outslot))
151  {
152  found = true;
153  ExecMaterializeSlot(outslot);
154 
155  xwait = TransactionIdIsValid(snap.xmin) ?
156  snap.xmin : snap.xmax;
157 
158  /*
159  * If the tuple is locked, wait for locking transaction to finish and
160  * retry.
161  */
162  if (TransactionIdIsValid(xwait))
163  {
164  XactLockTableWait(xwait, NULL, NULL, XLTW_None);
165  goto retry;
166  }
167  }
168 
169  /* Found tuple, try to lock it in the lockmode. */
170  if (found)
171  {
172  TM_FailureData tmfd;
173  TM_Result res;
174 
176 
177  res = table_tuple_lock(rel, &(outslot->tts_tid), GetLatestSnapshot(),
178  outslot,
179  GetCurrentCommandId(false),
180  lockmode,
182  0 /* don't follow updates */ ,
183  &tmfd);
184 
186 
187  switch (res)
188  {
189  case TM_Ok:
190  break;
191  case TM_Updated:
192  /* XXX: Improve handling here */
194  ereport(LOG,
196  errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
197  else
198  ereport(LOG,
200  errmsg("concurrent update, retrying")));
201  goto retry;
202  case TM_Deleted:
203  /* XXX: Improve handling here */
204  ereport(LOG,
206  errmsg("concurrent delete, retrying")));
207  goto retry;
208  case TM_Invisible:
209  elog(ERROR, "attempted to lock invisible tuple");
210  break;
211  default:
212  elog(ERROR, "unexpected table_tuple_lock status: %u", res);
213  break;
214  }
215  }
216 
217  index_endscan(scan);
218 
219  /* Don't release lock until commit. */
220  index_close(idxrel, NoLock);
221 
222  return found;
223 }
uint32 TransactionId
Definition: c.h:598
#define LOG
Definition: elog.h:25
static bool build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel, TupleTableSlot *searchslot)
bool index_getnext_slot(IndexScanDesc scan, ScanDirection direction, TupleTableSlot *slot)
Definition: indexam.c:616
void index_close(Relation relation, LOCKMODE lockmode)
Definition: indexam.c:158
IndexScanDesc index_beginscan(Relation heapRelation, Relation indexRelation, Snapshot snapshot, int nkeys, int norderbys)
Definition: indexam.c:205
void index_endscan(IndexScanDesc scan)
Definition: indexam.c:323
Relation index_open(Oid relationId, LOCKMODE lockmode)
Definition: indexam.c:132
void index_rescan(IndexScanDesc scan, ScanKey keys, int nkeys, ScanKey orderbys, int norderbys)
Definition: indexam.c:297
#define ItemPointerIndicatesMovedPartitions(pointer)
Definition: itemptr.h:184
void XactLockTableWait(TransactionId xid, Relation rel, ItemPointer ctid, XLTW_Oper oper)
Definition: lmgr.c:668
@ XLTW_None
Definition: lmgr.h:26
#define NoLock
Definition: lockdefs.h:34
#define RowExclusiveLock
Definition: lockdefs.h:38
@ LockWaitBlock
Definition: lockoptions.h:39
#define INDEX_MAX_KEYS
#define ERRCODE_T_R_SERIALIZATION_FAILURE
Definition: pgbench.c:79
@ ForwardScanDirection
Definition: sdir.h:26
Snapshot GetLatestSnapshot(void)
Definition: snapmgr.c:325
void PopActiveSnapshot(void)
Definition: snapmgr.c:776
void PushActiveSnapshot(Snapshot snap)
Definition: snapmgr.c:682
#define InitDirtySnapshot(snapshotdata)
Definition: snapmgr.h:74
TransactionId xmin
Definition: snapshot.h:157
TransactionId xmax
Definition: snapshot.h:158
ItemPointerData ctid
Definition: tableam.h:126
TM_Result
Definition: tableam.h:72
@ TM_Ok
Definition: tableam.h:77
@ TM_Deleted
Definition: tableam.h:92
@ TM_Updated
Definition: tableam.h:89
@ TM_Invisible
Definition: tableam.h:80
static TM_Result table_tuple_lock(Relation rel, ItemPointer tid, Snapshot snapshot, TupleTableSlot *slot, CommandId cid, LockTupleMode mode, LockWaitPolicy wait_policy, uint8 flags, TM_FailureData *tmfd)
Definition: tableam.h:1551
#define TransactionIdIsValid(xid)
Definition: transam.h:41
static void ExecMaterializeSlot(TupleTableSlot *slot)
Definition: tuptable.h:443
CommandId GetCurrentCommandId(bool used)
Definition: xact.c:814

References build_replindex_scan_key(), TM_FailureData::ctid, elog(), ereport, errcode(), ERRCODE_T_R_SERIALIZATION_FAILURE, errmsg(), ERROR, ExecMaterializeSlot(), ForwardScanDirection, GetCurrentCommandId(), GetLatestSnapshot(), index_beginscan(), index_close(), index_endscan(), index_getnext_slot(), INDEX_MAX_KEYS, index_open(), index_rescan(), IndexRelationGetNumberOfKeyAttributes, InitDirtySnapshot, ItemPointerIndicatesMovedPartitions, LockWaitBlock, LOG, NoLock, PopActiveSnapshot(), PushActiveSnapshot(), res, RowExclusiveLock, table_tuple_lock(), TM_Deleted, TM_Invisible, TM_Ok, TM_Updated, TransactionIdIsValid, TupleTableSlot::tts_tid, XactLockTableWait(), XLTW_None, SnapshotData::xmax, and SnapshotData::xmin.

Referenced by FindReplTupleInLocalRel().

◆ RelationFindReplTupleSeq()

bool RelationFindReplTupleSeq ( Relation  rel,
LockTupleMode  lockmode,
TupleTableSlot searchslot,
TupleTableSlot outslot 
)

Definition at line 295 of file execReplication.c.

297 {
298  TupleTableSlot *scanslot;
299  TableScanDesc scan;
300  SnapshotData snap;
301  TypeCacheEntry **eq;
302  TransactionId xwait;
303  bool found;
305 
306  Assert(equalTupleDescs(desc, outslot->tts_tupleDescriptor));
307 
308  eq = palloc0(sizeof(*eq) * outslot->tts_tupleDescriptor->natts);
309 
310  /* Start a heap scan. */
311  InitDirtySnapshot(snap);
312  scan = table_beginscan(rel, &snap, 0, NULL);
313  scanslot = table_slot_create(rel, NULL);
314 
315 retry:
316  found = false;
317 
318  table_rescan(scan, NULL);
319 
320  /* Try to find the tuple */
321  while (table_scan_getnextslot(scan, ForwardScanDirection, scanslot))
322  {
323  if (!tuples_equal(scanslot, searchslot, eq))
324  continue;
325 
326  found = true;
327  ExecCopySlot(outslot, scanslot);
328 
329  xwait = TransactionIdIsValid(snap.xmin) ?
330  snap.xmin : snap.xmax;
331 
332  /*
333  * If the tuple is locked, wait for locking transaction to finish and
334  * retry.
335  */
336  if (TransactionIdIsValid(xwait))
337  {
338  XactLockTableWait(xwait, NULL, NULL, XLTW_None);
339  goto retry;
340  }
341 
342  /* Found our tuple and it's not locked */
343  break;
344  }
345 
346  /* Found tuple, try to lock it in the lockmode. */
347  if (found)
348  {
349  TM_FailureData tmfd;
350  TM_Result res;
351 
353 
354  res = table_tuple_lock(rel, &(outslot->tts_tid), GetLatestSnapshot(),
355  outslot,
356  GetCurrentCommandId(false),
357  lockmode,
359  0 /* don't follow updates */ ,
360  &tmfd);
361 
363 
364  switch (res)
365  {
366  case TM_Ok:
367  break;
368  case TM_Updated:
369  /* XXX: Improve handling here */
371  ereport(LOG,
373  errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
374  else
375  ereport(LOG,
377  errmsg("concurrent update, retrying")));
378  goto retry;
379  case TM_Deleted:
380  /* XXX: Improve handling here */
381  ereport(LOG,
383  errmsg("concurrent delete, retrying")));
384  goto retry;
385  case TM_Invisible:
386  elog(ERROR, "attempted to lock invisible tuple");
387  break;
388  default:
389  elog(ERROR, "unexpected table_tuple_lock status: %u", res);
390  break;
391  }
392  }
393 
394  table_endscan(scan);
396 
397  return found;
398 }
#define PG_USED_FOR_ASSERTS_ONLY
Definition: c.h:166
static bool tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2, TypeCacheEntry **eq)
void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
Definition: execTuples.c:1254
void * palloc0(Size size)
Definition: mcxt.c:1099
#define RelationGetDescr(relation)
Definition: rel.h:514
TupleDesc tts_tupleDescriptor
Definition: tuptable.h:124
TupleTableSlot * table_slot_create(Relation relation, List **reglist)
Definition: tableam.c:91
static TableScanDesc table_beginscan(Relation rel, Snapshot snapshot, int nkeys, struct ScanKeyData *key)
Definition: tableam.h:885
static void table_endscan(TableScanDesc scan)
Definition: tableam.h:993
static void table_rescan(TableScanDesc scan, struct ScanKeyData *key)
Definition: tableam.h:1002
static bool table_scan_getnextslot(TableScanDesc sscan, ScanDirection direction, TupleTableSlot *slot)
Definition: tableam.h:1034
bool equalTupleDescs(TupleDesc tupdesc1, TupleDesc tupdesc2)
Definition: tupdesc.c:402
static TupleTableSlot * ExecCopySlot(TupleTableSlot *dstslot, TupleTableSlot *srcslot)
Definition: tuptable.h:475

References Assert(), TM_FailureData::ctid, elog(), equalTupleDescs(), ereport, errcode(), ERRCODE_T_R_SERIALIZATION_FAILURE, errmsg(), ERROR, ExecCopySlot(), ExecDropSingleTupleTableSlot(), ForwardScanDirection, GetCurrentCommandId(), GetLatestSnapshot(), InitDirtySnapshot, ItemPointerIndicatesMovedPartitions, LockWaitBlock, LOG, TupleDescData::natts, palloc0(), PG_USED_FOR_ASSERTS_ONLY, PopActiveSnapshot(), PushActiveSnapshot(), RelationGetDescr, res, table_beginscan(), table_endscan(), table_rescan(), table_scan_getnextslot(), table_slot_create(), table_tuple_lock(), TM_Deleted, TM_Invisible, TM_Ok, TM_Updated, TransactionIdIsValid, TupleTableSlot::tts_tid, TupleTableSlot::tts_tupleDescriptor, tuples_equal(), XactLockTableWait(), XLTW_None, SnapshotData::xmax, and SnapshotData::xmin.

Referenced by FindReplTupleInLocalRel().

◆ tuples_equal()

static bool tuples_equal ( TupleTableSlot slot1,
TupleTableSlot slot2,
TypeCacheEntry **  eq 
)
static

Definition at line 229 of file execReplication.c.

231 {
232  int attrnum;
233 
234  Assert(slot1->tts_tupleDescriptor->natts ==
235  slot2->tts_tupleDescriptor->natts);
236 
237  slot_getallattrs(slot1);
238  slot_getallattrs(slot2);
239 
240  /* Check equality of the attributes. */
241  for (attrnum = 0; attrnum < slot1->tts_tupleDescriptor->natts; attrnum++)
242  {
243  Form_pg_attribute att;
244  TypeCacheEntry *typentry;
245 
246  /*
247  * If one value is NULL and other is not, then they are certainly not
248  * equal
249  */
250  if (slot1->tts_isnull[attrnum] != slot2->tts_isnull[attrnum])
251  return false;
252 
253  /*
254  * If both are NULL, they can be considered equal.
255  */
256  if (slot1->tts_isnull[attrnum] || slot2->tts_isnull[attrnum])
257  continue;
258 
259  att = TupleDescAttr(slot1->tts_tupleDescriptor, attrnum);
260 
261  typentry = eq[attrnum];
262  if (typentry == NULL)
263  {
264  typentry = lookup_type_cache(att->atttypid,
266  if (!OidIsValid(typentry->eq_opr_finfo.fn_oid))
267  ereport(ERROR,
268  (errcode(ERRCODE_UNDEFINED_FUNCTION),
269  errmsg("could not identify an equality operator for type %s",
270  format_type_be(att->atttypid))));
271  eq[attrnum] = typentry;
272  }
273 
275  att->attcollation,
276  slot1->tts_values[attrnum],
277  slot2->tts_values[attrnum])))
278  return false;
279  }
280 
281  return true;
282 }
Datum FunctionCall2Coll(FmgrInfo *flinfo, Oid collation, Datum arg1, Datum arg2)
Definition: fmgr.c:1134
char * format_type_be(Oid type_oid)
Definition: format_type.c:343
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:207
#define DatumGetBool(X)
Definition: postgres.h:437
Oid fn_oid
Definition: fmgr.h:59
FmgrInfo eq_opr_finfo
Definition: typcache.h:75
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
static void slot_getallattrs(TupleTableSlot *slot)
Definition: tuptable.h:354
TypeCacheEntry * lookup_type_cache(Oid type_id, int flags)
Definition: typcache.c:339
#define TYPECACHE_EQ_OPR_FINFO
Definition: typcache.h:141

References Assert(), DatumGetBool, TypeCacheEntry::eq_opr_finfo, ereport, errcode(), errmsg(), ERROR, FmgrInfo::fn_oid, format_type_be(), FunctionCall2Coll(), lookup_type_cache(), TupleDescData::natts, OidIsValid, slot_getallattrs(), TupleTableSlot::tts_isnull, TupleTableSlot::tts_tupleDescriptor, TupleTableSlot::tts_values, TupleDescAttr, and TYPECACHE_EQ_OPR_FINFO.

Referenced by RelationFindReplTupleSeq().