PostgreSQL Source Code  git master
execReplication.c File Reference
#include "postgres.h"
#include "access/relscan.h"
#include "access/transam.h"
#include "access/xact.h"
#include "commands/trigger.h"
#include "executor/executor.h"
#include "nodes/nodeFuncs.h"
#include "parser/parse_relation.h"
#include "parser/parsetree.h"
#include "storage/bufmgr.h"
#include "storage/lmgr.h"
#include "utils/builtins.h"
#include "utils/datum.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"
#include "utils/typcache.h"
#include "utils/tqual.h"
Include dependency graph for execReplication.c:

Go to the source code of this file.

Functions

static bool build_replindex_scan_key (ScanKey skey, Relation rel, Relation idxrel, TupleTableSlot *searchslot)
 
bool RelationFindReplTupleByIndex (Relation rel, Oid idxoid, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
 
static bool tuple_equals_slot (TupleDesc desc, HeapTuple tup, TupleTableSlot *slot)
 
bool RelationFindReplTupleSeq (Relation rel, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
 
void ExecSimpleRelationInsert (EState *estate, TupleTableSlot *slot)
 
void ExecSimpleRelationUpdate (EState *estate, EPQState *epqstate, TupleTableSlot *searchslot, TupleTableSlot *slot)
 
void ExecSimpleRelationDelete (EState *estate, EPQState *epqstate, TupleTableSlot *searchslot)
 
void CheckCmdReplicaIdentity (Relation rel, CmdType cmd)
 
void CheckSubscriptionRelkind (char relkind, const char *nspname, const char *relname)
 

Function Documentation

◆ build_replindex_scan_key()

static bool build_replindex_scan_key ( ScanKey  skey,
Relation  rel,
Relation  idxrel,
TupleTableSlot *  searchslot 
)
static

Definition at line 48 of file execReplication.c.

References Assert, BTEqualStrategyNumber, DatumGetPointer, elog, ERROR, get_opclass_family(), get_opclass_input_type(), get_opcode(), get_opfamily_member(), IndexRelationGetNumberOfKeyAttributes, INDEXRELID, OidIsValid, RelationData::rd_index, RelationData::rd_indextuple, RelationGetRelid, RelationGetReplicaIndex(), ScanKeyInit(), ScanKeyData::sk_flags, SK_ISNULL, SysCacheGetAttr(), TupleTableSlot::tts_isnull, TupleTableSlot::tts_values, int2vector::values, and oidvector::values.

Referenced by RelationFindReplTupleByIndex().

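48 build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel,
49  TupleTableSlot *searchslot)
50 {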
51  int attoff;
52  bool isnull;
53  Datum indclassDatum;
54  oidvector *opclass;
55  int2vector *indkey = &idxrel->rd_index->indkey;
56  bool hasnulls = false;
57 
58  Assert(RelationGetReplicaIndex(rel) == RelationGetRelid(idxrel));
59 
60  indclassDatum = SysCacheGetAttr(INDEXRELID, idxrel->rd_indextuple,
61  Anum_pg_index_indclass, &isnull);
62  Assert(!isnull);
63  opclass = (oidvector *) DatumGetPointer(indclassDatum);
64 
65  /* Build scankey for every attribute in the index. */
66  for (attoff = 0; attoff < IndexRelationGetNumberOfKeyAttributes(idxrel); attoff++)
67  {
68  Oid operator;
69  Oid opfamily;
70  RegProcedure regop;
71  int pkattno = attoff + 1;
72  int mainattno = indkey->values[attoff];
73  Oid optype = get_opclass_input_type(opclass->values[attoff]);
74 
75  /*
76  * Load the operator info. We need this to get the equality operator
77  * function for the scan key.
78  */
79  opfamily = get_opclass_family(opclass->values[attoff]);
80 
81  operator = get_opfamily_member(opfamily, optype,
82  optype,
83  BTEqualStrategyNumber);
84  if (!OidIsValid(operator))
85  elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",
86  BTEqualStrategyNumber, optype, optype, opfamily);
87 
88  regop = get_opcode(operator);
89 
90  /* Initialize the scankey. */
91  ScanKeyInit(&skey[attoff],
92  pkattno,
93  BTEqualStrategyNumber,
94  regop,
95  searchslot->tts_values[mainattno - 1]);
96 
97  /* Check for null value. */
98  if (searchslot->tts_isnull[mainattno - 1])
99  {
100  hasnulls = true;
101  skey[attoff].sk_flags |= SK_ISNULL;
102  }
103  }
104 
105  return hasnulls;
106 }
oidvector
Definition: c.h:555
Oid RelationGetReplicaIndex(Relation relation)
Definition: relcache.c:4591
regproc RegProcedure
Definition: c.h:472
Datum * tts_values
Definition: tuptable.h:130
unsigned int Oid
Definition: postgres_ext.h:31
#define OidIsValid(objectId)
Definition: c.h:605
struct HeapTupleData * rd_indextuple
Definition: rel.h:133
Form_pg_index rd_index
Definition: rel.h:131
#define ERROR
Definition: elog.h:43
bool * tts_isnull
Definition: tuptable.h:132
Oid values[FLEXIBLE_ARRAY_MEMBER]
Definition: c.h:563
Oid get_opfamily_member(Oid opfamily, Oid lefttype, Oid righttype, int16 strategy)
Definition: lsyscache.c:163
#define IndexRelationGetNumberOfKeyAttributes(relation)
Definition: rel.h:426
#define SK_ISNULL
Definition: skey.h:115
int2vector
Definition: c.h:544
uintptr_t Datum
Definition: postgres.h:367
Datum SysCacheGetAttr(int cacheId, HeapTuple tup, AttrNumber attributeNumber, bool *isNull)
Definition: syscache.c:1368
RegProcedure get_opcode(Oid opno)
Definition: lsyscache.c:1079
int sk_flags
Definition: skey.h:66
#define Assert(condition)
Definition: c.h:699
int16 values[FLEXIBLE_ARRAY_MEMBER]
Definition: c.h:552
Oid get_opclass_family(Oid opclass)
Definition: lsyscache.c:1032
#define DatumGetPointer(X)
Definition: postgres.h:534
void ScanKeyInit(ScanKey entry, AttrNumber attributeNumber, StrategyNumber strategy, RegProcedure procedure, Datum argument)
Definition: scankey.c:76
#define elog
Definition: elog.h:219
#define RelationGetRelid(relation)
Definition: rel.h:407
#define BTEqualStrategyNumber
Definition: stratnum.h:31
Oid get_opclass_input_type(Oid opclass)
Definition: lsyscache.c:1054

◆ CheckCmdReplicaIdentity()

void CheckCmdReplicaIdentity ( Relation  rel,
CmdType  cmd 
)

Definition at line 556 of file execReplication.c.

References CMD_DELETE, CMD_UPDATE, ereport, errcode(), errhint(), errmsg(), ERROR, GetRelationPublicationActions(), OidIsValid, PublicationActions::pubdelete, PublicationActions::pubupdate, RelationData::rd_rel, RelationGetRelationName, and RelationGetReplicaIndex().

Referenced by CheckValidResultRel(), ExecSimpleRelationDelete(), ExecSimpleRelationInsert(), and ExecSimpleRelationUpdate().

556 CheckCmdReplicaIdentity(Relation rel, CmdType cmd)
557 {
558  PublicationActions *pubactions;
559 
560  /* We only need to do checks for UPDATE and DELETE. */
561  if (cmd != CMD_UPDATE && cmd != CMD_DELETE)
562  return;
563 
564  /* If relation has replica identity we are always good. */
565  if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
566  OidIsValid(RelationGetReplicaIndex(rel)))
567  return;
568 
569  /*
570  * This is either UPDATE OR DELETE and there is no replica identity.
571  *
572  * Check if the table publishes UPDATES or DELETES.
573  */
574  pubactions = GetRelationPublicationActions(rel);
575  if (cmd == CMD_UPDATE && pubactions->pubupdate)
576  ereport(ERROR,
577  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
578  errmsg("cannot update table \"%s\" because it does not have a replica identity and publishes updates",
579  RelationGetRelationName(rel)),
580  errhint("To enable updating the table, set REPLICA IDENTITY using ALTER TABLE.")));
581  else if (cmd == CMD_DELETE && pubactions->pubdelete)
582  ereport(ERROR,
583  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
584  errmsg("cannot delete from table \"%s\" because it does not have a replica identity and publishes deletes",
585  RelationGetRelationName(rel)),
586  errhint("To enable deleting from the table, set REPLICA IDENTITY using ALTER TABLE.")));
587 }
int errhint(const char *fmt,...)
Definition: elog.c:987
Oid RelationGetReplicaIndex(Relation relation)
Definition: relcache.c:4591
int errcode(int sqlerrcode)
Definition: elog.c:575
Form_pg_class rd_rel
Definition: rel.h:84
#define OidIsValid(objectId)
Definition: c.h:605
#define ERROR
Definition: elog.h:43
#define RelationGetRelationName(relation)
Definition: rel.h:441
#define ereport(elevel, rest)
Definition: elog.h:122
struct PublicationActions * GetRelationPublicationActions(Relation relation)
Definition: relcache.c:5191
int errmsg(const char *fmt,...)
Definition: elog.c:797

◆ CheckSubscriptionRelkind()

void CheckSubscriptionRelkind ( char  relkind,
const char *  nspname,
const char *  relname 
)

Definition at line 596 of file execReplication.c.

References ereport, errcode(), errmsg(), and ERROR.

Referenced by AlterSubscription_refresh(), CreateSubscription(), and logicalrep_rel_open().

596 CheckSubscriptionRelkind(char relkind, const char *nspname,
597  const char *relname)
598 {
599  /*
600  * We currently only support writing to regular tables.
601  */
602  if (relkind != RELKIND_RELATION)
603  ereport(ERROR,
604  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
605  errmsg("logical replication target relation \"%s.%s\" is not a table",
606  nspname, relname)));
607 }
int errcode(int sqlerrcode)
Definition: elog.c:575
char relkind
Definition: pg_class.h:51
#define ERROR
Definition: elog.h:43
#define ereport(elevel, rest)
Definition: elog.h:122
int errmsg(const char *fmt,...)
Definition: elog.c:797

◆ ExecSimpleRelationDelete()

void ExecSimpleRelationDelete ( EState *  estate,
EPQState *  epqstate,
TupleTableSlot *  searchslot 
)

Definition at line 516 of file execReplication.c.

References Assert, CheckCmdReplicaIdentity(), CMD_DELETE, EState::es_result_relation_info, ExecARDeleteTriggers(), ExecBRDeleteTriggers(), list_free(), NIL, RelationData::rd_rel, ResultRelInfo::ri_RelationDesc, ResultRelInfo::ri_TrigDesc, simple_heap_delete(), HeapTupleData::t_self, TriggerDesc::trig_delete_before_row, and TupleTableSlot::tts_tuple.

Referenced by apply_handle_delete().

516 ExecSimpleRelationDelete(EState *estate, EPQState *epqstate,
517  TupleTableSlot *searchslot)
518 {
519  bool skip_tuple = false;
520  ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
521  Relation rel = resultRelInfo->ri_RelationDesc;
522 
523  /* For now we support only tables. */
524  Assert(rel->rd_rel->relkind == RELKIND_RELATION);
525 
526  CheckCmdReplicaIdentity(rel, CMD_DELETE);
527 
528  /* BEFORE ROW DELETE Triggers */
529  if (resultRelInfo->ri_TrigDesc &&
530  resultRelInfo->ri_TrigDesc->trig_delete_before_row)
531  {
532  skip_tuple = !ExecBRDeleteTriggers(estate, epqstate, resultRelInfo,
533  &searchslot->tts_tuple->t_self,
534  NULL);
535  }
536 
537  if (!skip_tuple)
538  {
539  List *recheckIndexes = NIL;
540 
541  /* OK, delete the tuple */
542  simple_heap_delete(rel, &searchslot->tts_tuple->t_self);
543 
544  /* AFTER ROW DELETE Triggers */
545  ExecARDeleteTriggers(estate, resultRelInfo,
546  &searchslot->tts_tuple->t_self, NULL, NULL);
547 
548  list_free(recheckIndexes);
549  }
550 }
#define NIL
Definition: pg_list.h:69
Relation ri_RelationDesc
Definition: execnodes.h:397
Form_pg_class rd_rel
Definition: rel.h:84
void ExecARDeleteTriggers(EState *estate, ResultRelInfo *relinfo, ItemPointer tupleid, HeapTuple fdw_trigtuple, TransitionCaptureState *transition_capture)
Definition: trigger.c:2799
ItemPointerData t_self
Definition: htup.h:65
void CheckCmdReplicaIdentity(Relation rel, CmdType cmd)
TriggerDesc * ri_TrigDesc
Definition: execnodes.h:409
bool ExecBRDeleteTriggers(EState *estate, EPQState *epqstate, ResultRelInfo *relinfo, ItemPointer tupleid, HeapTuple fdw_trigtuple)
Definition: trigger.c:2730
#define Assert(condition)
Definition: c.h:699
void simple_heap_delete(Relation relation, ItemPointer tid)
Definition: heapam.c:3455
void list_free(List *list)
Definition: list.c:1133
HeapTuple tts_tuple
Definition: tuptable.h:122
List
Definition: pg_list.h:45
bool trig_delete_before_row
Definition: reltrigger.h:65
ResultRelInfo * es_result_relation_info
Definition: execnodes.h:492

◆ ExecSimpleRelationInsert()

void ExecSimpleRelationInsert ( EState *  estate,
TupleTableSlot *  slot 
)

Definition at line 388 of file execReplication.c.

References Assert, CheckCmdReplicaIdentity(), CMD_INSERT, tupleDesc::constr, EState::es_result_relation_info, ExecARInsertTriggers(), ExecBRInsertTriggers(), ExecConstraints(), ExecInsertIndexTuples(), ExecMaterializeSlot(), ExecPartitionCheck(), list_free(), NIL, RelationData::rd_att, RelationData::rd_rel, ResultRelInfo::ri_NumIndices, ResultRelInfo::ri_PartitionCheck, ResultRelInfo::ri_RelationDesc, ResultRelInfo::ri_TrigDesc, simple_heap_insert(), HeapTupleData::t_self, and TriggerDesc::trig_insert_before_row.

Referenced by apply_handle_insert().

388 ExecSimpleRelationInsert(EState *estate, TupleTableSlot *slot)
389 {
390  bool skip_tuple = false;
391  HeapTuple tuple;
392  ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
393  Relation rel = resultRelInfo->ri_RelationDesc;
394 
395  /* For now we support only tables. */
396  Assert(rel->rd_rel->relkind == RELKIND_RELATION);
397 
398  CheckCmdReplicaIdentity(rel, CMD_INSERT);
399 
400  /* BEFORE ROW INSERT Triggers */
401  if (resultRelInfo->ri_TrigDesc &&
402  resultRelInfo->ri_TrigDesc->trig_insert_before_row)
403  {
404  slot = ExecBRInsertTriggers(estate, resultRelInfo, slot);
405 
406  if (slot == NULL) /* "do nothing" */
407  skip_tuple = true;
408  }
409 
410  if (!skip_tuple)
411  {
412  List *recheckIndexes = NIL;
413 
414  /* Check the constraints of the tuple */
415  if (rel->rd_att->constr)
416  ExecConstraints(resultRelInfo, slot, estate);
417  if (resultRelInfo->ri_PartitionCheck)
418  ExecPartitionCheck(resultRelInfo, slot, estate, true);
419 
420  /* Store the slot into tuple that we can inspect. */
421  tuple = ExecMaterializeSlot(slot);
422 
423  /* OK, store the tuple and create index entries for it */
424  simple_heap_insert(rel, tuple);
425 
426  if (resultRelInfo->ri_NumIndices > 0)
427  recheckIndexes = ExecInsertIndexTuples(slot, &(tuple->t_self),
428  estate, false, NULL,
429  NIL);
430 
431  /* AFTER ROW INSERT Triggers */
432  ExecARInsertTriggers(estate, resultRelInfo, tuple,
433  recheckIndexes, NULL);
434 
435  /*
436  * XXX we should in theory pass a TransitionCaptureState object to the
437  * above to capture transition tuples, but after statement triggers
438  * don't actually get fired by replication yet anyway
439  */
440 
441  list_free(recheckIndexes);
442  }
443 }
int ri_NumIndices
Definition: execnodes.h:400
#define NIL
Definition: pg_list.h:69
Relation ri_RelationDesc
Definition: execnodes.h:397
List * ExecInsertIndexTuples(TupleTableSlot *slot, ItemPointer tupleid, EState *estate, bool noDupErr, bool *specConflict, List *arbiterIndexes)
Definition: execIndexing.c:271
void ExecConstraints(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate)
Definition: execMain.c:1971
void ExecARInsertTriggers(EState *estate, ResultRelInfo *relinfo, HeapTuple trigtuple, List *recheckIndexes, TransitionCaptureState *transition_capture)
Definition: trigger.c:2581
Form_pg_class rd_rel
Definition: rel.h:84
ItemPointerData t_self
Definition: htup.h:65
void CheckCmdReplicaIdentity(Relation rel, CmdType cmd)
TriggerDesc * ri_TrigDesc
Definition: execnodes.h:409
bool trig_insert_before_row
Definition: reltrigger.h:55
Oid simple_heap_insert(Relation relation, HeapTuple tup)
Definition: heapam.c:2986
List * ri_PartitionCheck
Definition: execnodes.h:454
TupleDesc rd_att
Definition: rel.h:85
#define Assert(condition)
Definition: c.h:699
TupleConstr * constr
Definition: tupdesc.h:87
HeapTuple ExecMaterializeSlot(TupleTableSlot *slot)
Definition: execTuples.c:781
void list_free(List *list)
Definition: list.c:1133
bool ExecPartitionCheck(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate, bool emitError)
Definition: execMain.c:1864
TupleTableSlot * ExecBRInsertTriggers(EState *estate, ResultRelInfo *relinfo, TupleTableSlot *slot)
Definition: trigger.c:2515
List
Definition: pg_list.h:45
ResultRelInfo * es_result_relation_info
Definition: execnodes.h:492

◆ ExecSimpleRelationUpdate()

void ExecSimpleRelationUpdate ( EState *  estate,
EPQState *  epqstate,
TupleTableSlot *  searchslot,
TupleTableSlot *  slot 
)

Definition at line 452 of file execReplication.c.

References Assert, CheckCmdReplicaIdentity(), CMD_UPDATE, tupleDesc::constr, EState::es_result_relation_info, ExecARUpdateTriggers(), ExecBRUpdateTriggers(), ExecConstraints(), ExecInsertIndexTuples(), ExecMaterializeSlot(), ExecPartitionCheck(), HeapTupleIsHeapOnly, list_free(), NIL, RelationData::rd_att, RelationData::rd_rel, ResultRelInfo::ri_NumIndices, ResultRelInfo::ri_PartitionCheck, ResultRelInfo::ri_RelationDesc, ResultRelInfo::ri_TrigDesc, simple_heap_update(), HeapTupleData::t_self, TriggerDesc::trig_update_before_row, and TupleTableSlot::tts_tuple.

Referenced by apply_handle_update().

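452 ExecSimpleRelationUpdate(EState *estate, EPQState *epqstate,
453  TupleTableSlot *searchslot, TupleTableSlot *slot)
454 {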
455  bool skip_tuple = false;
456  HeapTuple tuple;
457  ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
458  Relation rel = resultRelInfo->ri_RelationDesc;
459 
460  /* For now we support only tables. */
461  Assert(rel->rd_rel->relkind == RELKIND_RELATION);
462 
463  CheckCmdReplicaIdentity(rel, CMD_UPDATE);
464 
465  /* BEFORE ROW UPDATE Triggers */
466  if (resultRelInfo->ri_TrigDesc &&
467  resultRelInfo->ri_TrigDesc->trig_update_before_row)
468  {
469  slot = ExecBRUpdateTriggers(estate, epqstate, resultRelInfo,
470  &searchslot->tts_tuple->t_self,
471  NULL, slot);
472 
473  if (slot == NULL) /* "do nothing" */
474  skip_tuple = true;
475  }
476 
477  if (!skip_tuple)
478  {
479  List *recheckIndexes = NIL;
480 
481  /* Check the constraints of the tuple */
482  if (rel->rd_att->constr)
483  ExecConstraints(resultRelInfo, slot, estate);
484  if (resultRelInfo->ri_PartitionCheck)
485  ExecPartitionCheck(resultRelInfo, slot, estate, true);
486 
487  /* Store the slot into tuple that we can write. */
488  tuple = ExecMaterializeSlot(slot);
489 
490  /* OK, update the tuple and index entries for it */
491  simple_heap_update(rel, &searchslot->tts_tuple->t_self,
492  slot->tts_tuple);
493 
494  if (resultRelInfo->ri_NumIndices > 0 &&
495  !HeapTupleIsHeapOnly(slot->tts_tuple))
496  recheckIndexes = ExecInsertIndexTuples(slot, &(tuple->t_self),
497  estate, false, NULL,
498  NIL);
499 
500  /* AFTER ROW UPDATE Triggers */
501  ExecARUpdateTriggers(estate, resultRelInfo,
502  &searchslot->tts_tuple->t_self,
503  NULL, tuple, recheckIndexes, NULL);
504 
505  list_free(recheckIndexes);
506  }
507 }
int ri_NumIndices
Definition: execnodes.h:400
#define NIL
Definition: pg_list.h:69
Relation ri_RelationDesc
Definition: execnodes.h:397
List * ExecInsertIndexTuples(TupleTableSlot *slot, ItemPointer tupleid, EState *estate, bool noDupErr, bool *specConflict, List *arbiterIndexes)
Definition: execIndexing.c:271
TupleTableSlot * ExecBRUpdateTriggers(EState *estate, EPQState *epqstate, ResultRelInfo *relinfo, ItemPointer tupleid, HeapTuple fdw_trigtuple, TupleTableSlot *slot)
Definition: trigger.c:2951
void ExecConstraints(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate)
Definition: execMain.c:1971
Form_pg_class rd_rel
Definition: rel.h:84
ItemPointerData t_self
Definition: htup.h:65
bool trig_update_before_row
Definition: reltrigger.h:60
void CheckCmdReplicaIdentity(Relation rel, CmdType cmd)
TriggerDesc * ri_TrigDesc
Definition: execnodes.h:409
void ExecARUpdateTriggers(EState *estate, ResultRelInfo *relinfo, ItemPointer tupleid, HeapTuple fdw_trigtuple, HeapTuple newtuple, List *recheckIndexes, TransitionCaptureState *transition_capture)
Definition: trigger.c:3068
List * ri_PartitionCheck
Definition: execnodes.h:454
TupleDesc rd_att
Definition: rel.h:85
#define HeapTupleIsHeapOnly(tuple)
Definition: htup_details.h:698
#define Assert(condition)
Definition: c.h:699
TupleConstr * constr
Definition: tupdesc.h:87
void simple_heap_update(Relation relation, ItemPointer otid, HeapTuple tup)
Definition: heapam.c:4599
HeapTuple ExecMaterializeSlot(TupleTableSlot *slot)
Definition: execTuples.c:781
void list_free(List *list)
Definition: list.c:1133
bool ExecPartitionCheck(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate, bool emitError)
Definition: execMain.c:1864
HeapTuple tts_tuple
Definition: tuptable.h:122
List
Definition: pg_list.h:45
ResultRelInfo * es_result_relation_info
Definition: execnodes.h:492

◆ RelationFindReplTupleByIndex()

bool RelationFindReplTupleByIndex ( Relation  rel,
Oid  idxoid,
LockTupleMode  lockmode,
TupleTableSlot *  searchslot,
TupleTableSlot *  outslot 
)

Definition at line 115 of file execReplication.c.

References buf, build_replindex_scan_key(), HeapUpdateFailureData::ctid, elog, ereport, errcode(), errmsg(), ERROR, ExecMaterializeSlot(), ExecStoreTuple(), ForwardScanDirection, GetCurrentCommandId(), GetLatestSnapshot(), heap_lock_tuple(), HeapTupleInvisible, HeapTupleMayBeUpdated, HeapTupleUpdated, index_beginscan(), index_close(), index_endscan(), index_getnext(), INDEX_MAX_KEYS, index_open(), index_rescan(), IndexRelationGetNumberOfKeyAttributes, InitDirtySnapshot, InvalidBuffer, ItemPointerCopy, ItemPointerIndicatesMovedPartitions, LockWaitBlock, LOG, NoLock, PopActiveSnapshot(), PushActiveSnapshot(), ReleaseBuffer(), RowExclusiveLock, HeapTupleData::t_self, TransactionIdIsValid, TupleTableSlot::tts_tuple, XactLockTableWait(), XLTW_None, SnapshotData::xmax, and SnapshotData::xmin.

Referenced by apply_handle_delete(), and apply_handle_update().

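115 RelationFindReplTupleByIndex(Relation rel, Oid idxoid,
116  LockTupleMode lockmode,
117  TupleTableSlot *searchslot,
118  TupleTableSlot *outslot)
119 {
120  HeapTuple scantuple;
121  ScanKeyData skey[INDEX_MAX_KEYS];
122  IndexScanDesc scan;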
123  SnapshotData snap;
124  TransactionId xwait;
125  Relation idxrel;
126  bool found;
127 
128  /* Open the index. */
129  idxrel = index_open(idxoid, RowExclusiveLock);
130 
131  /* Start an index scan. */
132  InitDirtySnapshot(snap);
133  scan = index_beginscan(rel, idxrel, &snap,
134  IndexRelationGetNumberOfKeyAttributes(idxrel),
135  0);
136 
137  /* Build scan key. */
138  build_replindex_scan_key(skey, rel, idxrel, searchslot);
139 
140 retry:
141  found = false;
142 
143  index_rescan(scan, skey, IndexRelationGetNumberOfKeyAttributes(idxrel), NULL, 0);
144 
145  /* Try to find the tuple */
146  if ((scantuple = index_getnext(scan, ForwardScanDirection)) != NULL)
147  {
148  found = true;
149  ExecStoreTuple(scantuple, outslot, InvalidBuffer, false);
150  ExecMaterializeSlot(outslot);
151 
152  xwait = TransactionIdIsValid(snap.xmin) ?
153  snap.xmin : snap.xmax;
154 
155  /*
156  * If the tuple is locked, wait for locking transaction to finish and
157  * retry.
158  */
159  if (TransactionIdIsValid(xwait))
160  {
161  XactLockTableWait(xwait, NULL, NULL, XLTW_None);
162  goto retry;
163  }
164  }
165 
166  /* Found tuple, try to lock it in the lockmode. */
167  if (found)
168  {
169  Buffer buf;
170  HeapUpdateFailureData hufd;
171  HTSU_Result res;
172  HeapTupleData locktup;
173 
174  ItemPointerCopy(&outslot->tts_tuple->t_self, &locktup.t_self);
175 
176  PushActiveSnapshot(GetLatestSnapshot());
177 
178  res = heap_lock_tuple(rel, &locktup, GetCurrentCommandId(false),
179  lockmode,
180  LockWaitBlock,
181  false /* don't follow updates */ ,
182  &buf, &hufd);
183  /* the tuple slot already has the buffer pinned */
184  ReleaseBuffer(buf);
185 
186  PopActiveSnapshot();
187 
188  switch (res)
189  {
190  case HeapTupleMayBeUpdated:
191  break;
192  case HeapTupleUpdated:
193  /* XXX: Improve handling here */
194  if (ItemPointerIndicatesMovedPartitions(&hufd.ctid))
195  ereport(LOG,
196  (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
197  errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
198  else
199  ereport(LOG,
200  (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
201  errmsg("concurrent update, retrying")));
202  goto retry;
203  case HeapTupleInvisible:
204  elog(ERROR, "attempted to lock invisible tuple");
205  break;
206  default:
207  elog(ERROR, "unexpected heap_lock_tuple status: %u", res);
208  break;
209  }
210  }
211 
212  index_endscan(scan);
213 
214  /* Don't release lock until commit. */
215  index_close(idxrel, NoLock);
216 
217  return found;
218 }
TupleTableSlot * ExecStoreTuple(HeapTuple tuple, TupleTableSlot *slot, Buffer buffer, bool shouldFree)
Definition: execTuples.c:356
uint32 TransactionId
Definition: c.h:474
HTSU_Result heap_lock_tuple(Relation relation, HeapTuple tuple, CommandId cid, LockTupleMode mode, LockWaitPolicy wait_policy, bool follow_updates, Buffer *buffer, HeapUpdateFailureData *hufd)
Definition: heapam.c:4688
#define InvalidBuffer
Definition: buf.h:25
void index_rescan(IndexScanDesc scan, ScanKey keys, int nkeys, ScanKey orderbys, int norderbys)
Definition: indexam.c:310
int errcode(int sqlerrcode)
Definition: elog.c:575
void ReleaseBuffer(Buffer buffer)
Definition: bufmgr.c:3309
void PopActiveSnapshot(void)
Definition: snapmgr.c:812
#define LOG
Definition: elog.h:26
#define ERROR
Definition: elog.h:43
XLTW_None
Definition: lmgr.h:26
#define InitDirtySnapshot(snapshotdata)
Definition: tqual.h:103
ItemPointerData t_self
Definition: htup.h:65
#define NoLock
Definition: lockdefs.h:34
static char * buf
Definition: pg_test_fsync.c:67
void PushActiveSnapshot(Snapshot snap)
Definition: snapmgr.c:733
#define RowExclusiveLock
Definition: lockdefs.h:38
HTSU_Result
Definition: snapshot.h:121
TransactionId xmax
Definition: snapshot.h:69
TransactionId xmin
Definition: snapshot.h:68
#define IndexRelationGetNumberOfKeyAttributes(relation)
Definition: rel.h:426
void index_endscan(IndexScanDesc scan)
Definition: indexam.c:340
#define ereport(elevel, rest)
Definition: elog.h:122
#define ItemPointerIndicatesMovedPartitions(pointer)
Definition: itemptr.h:184
void XactLockTableWait(TransactionId xid, Relation rel, ItemPointer ctid, XLTW_Oper oper)
Definition: lmgr.c:554
#define INDEX_MAX_KEYS
HeapTuple ExecMaterializeSlot(TupleTableSlot *slot)
Definition: execTuples.c:781
Snapshot GetLatestSnapshot(void)
Definition: snapmgr.c:379
void index_close(Relation relation, LOCKMODE lockmode)
Definition: indexam.c:176
int errmsg(const char *fmt,...)
Definition: elog.c:797
ItemPointerData ctid
Definition: heapam.h:70
HeapTuple tts_tuple
Definition: tuptable.h:122
CommandId GetCurrentCommandId(bool used)
Definition: xact.c:679
#define elog
Definition: elog.h:219
#define TransactionIdIsValid(xid)
Definition: transam.h:41
int Buffer
Definition: buf.h:23
static bool build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel, TupleTableSlot *searchslot)
Relation index_open(Oid relationId, LOCKMODE lockmode)
Definition: indexam.c:150
HeapTuple index_getnext(IndexScanDesc scan, ScanDirection direction)
Definition: indexam.c:659
IndexScanDesc index_beginscan(Relation heapRelation, Relation indexRelation, Snapshot snapshot, int nkeys, int norderbys)
Definition: indexam.c:221
#define ItemPointerCopy(fromPointer, toPointer)
Definition: itemptr.h:161

◆ RelationFindReplTupleSeq()

bool RelationFindReplTupleSeq ( Relation  rel,
LockTupleMode  lockmode,
TupleTableSlot *  searchslot,
TupleTableSlot *  outslot 
)

Definition at line 285 of file execReplication.c.

References Assert, buf, HeapUpdateFailureData::ctid, elog, equalTupleDescs(), ereport, errcode(), errmsg(), ERROR, ExecMaterializeSlot(), ExecStoreTuple(), ForwardScanDirection, GetCurrentCommandId(), GetLatestSnapshot(), heap_beginscan(), heap_endscan(), heap_getnext(), heap_lock_tuple(), heap_rescan(), HeapTupleInvisible, HeapTupleMayBeUpdated, HeapTupleUpdated, InitDirtySnapshot, InvalidBuffer, ItemPointerCopy, ItemPointerIndicatesMovedPartitions, LockWaitBlock, LOG, PopActiveSnapshot(), PushActiveSnapshot(), RelationGetDescr, ReleaseBuffer(), HeapTupleData::t_self, TransactionIdIsValid, TupleTableSlot::tts_tuple, TupleTableSlot::tts_tupleDescriptor, tuple_equals_slot(), XactLockTableWait(), XLTW_None, SnapshotData::xmax, and SnapshotData::xmin.

Referenced by apply_handle_delete(), and apply_handle_update().

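285 RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode,
286  TupleTableSlot *searchslot, TupleTableSlot *outslot)
287 {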
288  HeapTuple scantuple;
289  HeapScanDesc scan;
290  SnapshotData snap;
291  TransactionId xwait;
292  bool found;
293  TupleDesc desc = RelationGetDescr(rel);
294 
295  Assert(equalTupleDescs(desc, outslot->tts_tupleDescriptor));
296 
297  /* Start a heap scan. */
298  InitDirtySnapshot(snap);
299  scan = heap_beginscan(rel, &snap, 0, NULL);
300 
301 retry:
302  found = false;
303 
304  heap_rescan(scan, NULL);
305 
306  /* Try to find the tuple */
307  while ((scantuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
308  {
309  if (!tuple_equals_slot(desc, scantuple, searchslot))
310  continue;
311 
312  found = true;
313  ExecStoreTuple(scantuple, outslot, InvalidBuffer, false);
314  ExecMaterializeSlot(outslot);
315 
316  xwait = TransactionIdIsValid(snap.xmin) ?
317  snap.xmin : snap.xmax;
318 
319  /*
320  * If the tuple is locked, wait for locking transaction to finish and
321  * retry.
322  */
323  if (TransactionIdIsValid(xwait))
324  {
325  XactLockTableWait(xwait, NULL, NULL, XLTW_None);
326  goto retry;
327  }
328  }
329 
330  /* Found tuple, try to lock it in the lockmode. */
331  if (found)
332  {
333  Buffer buf;
334  HeapUpdateFailureData hufd;
335  HTSU_Result res;
336  HeapTupleData locktup;
337 
338  ItemPointerCopy(&outslot->tts_tuple->t_self, &locktup.t_self);
339 
340  PushActiveSnapshot(GetLatestSnapshot());
341 
342  res = heap_lock_tuple(rel, &locktup, GetCurrentCommandId(false),
343  lockmode,
344  LockWaitBlock,
345  false /* don't follow updates */ ,
346  &buf, &hufd);
347  /* the tuple slot already has the buffer pinned */
348  ReleaseBuffer(buf);
349 
350  PopActiveSnapshot();
351 
352  switch (res)
353  {
354  case HeapTupleMayBeUpdated:
355  break;
356  case HeapTupleUpdated:
357  /* XXX: Improve handling here */
358  if (ItemPointerIndicatesMovedPartitions(&hufd.ctid))
359  ereport(LOG,
360  (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
361  errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
362  else
363  ereport(LOG,
364  (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
365  errmsg("concurrent update, retrying")));
366  goto retry;
367  case HeapTupleInvisible:
368  elog(ERROR, "attempted to lock invisible tuple");
369  break;
370  default:
371  elog(ERROR, "unexpected heap_lock_tuple status: %u", res);
372  break;
373  }
374  }
375 
376  heap_endscan(scan);
377 
378  return found;
379 }
TupleTableSlot * ExecStoreTuple(HeapTuple tuple, TupleTableSlot *slot, Buffer buffer, bool shouldFree)
Definition: execTuples.c:356
void heap_endscan(HeapScanDesc scan)
Definition: heapam.c:1572
uint32 TransactionId
Definition: c.h:474
#define RelationGetDescr(relation)
Definition: rel.h:433
HTSU_Result heap_lock_tuple(Relation relation, HeapTuple tuple, CommandId cid, LockTupleMode mode, LockWaitPolicy wait_policy, bool follow_updates, Buffer *buffer, HeapUpdateFailureData *hufd)
Definition: heapam.c:4688
#define InvalidBuffer
Definition: buf.h:25
int errcode(int sqlerrcode)
Definition: elog.c:575
void ReleaseBuffer(Buffer buffer)
Definition: bufmgr.c:3309
void PopActiveSnapshot(void)
Definition: snapmgr.c:812
#define LOG
Definition: elog.h:26
static bool tuple_equals_slot(TupleDesc desc, HeapTuple tup, TupleTableSlot *slot)
#define ERROR
Definition: elog.h:43
XLTW_None
Definition: lmgr.h:26
#define InitDirtySnapshot(snapshotdata)
Definition: tqual.h:103
ItemPointerData t_self
Definition: htup.h:65
static char * buf
Definition: pg_test_fsync.c:67
void PushActiveSnapshot(Snapshot snap)
Definition: snapmgr.c:733
HTSU_Result
Definition: snapshot.h:121
void heap_rescan(HeapScanDesc scan, ScanKey key)
Definition: heapam.c:1528
TransactionId xmax
Definition: snapshot.h:69
TransactionId xmin
Definition: snapshot.h:68
#define ereport(elevel, rest)
Definition: elog.h:122
TupleDesc tts_tupleDescriptor
Definition: tuptable.h:124
HeapTuple heap_getnext(HeapScanDesc scan, ScanDirection direction)
Definition: heapam.c:1835
#define ItemPointerIndicatesMovedPartitions(pointer)
Definition: itemptr.h:184
void XactLockTableWait(TransactionId xid, Relation rel, ItemPointer ctid, XLTW_Oper oper)
Definition: lmgr.c:554
#define Assert(condition)
Definition: c.h:699
HeapTuple ExecMaterializeSlot(TupleTableSlot *slot)
Definition: execTuples.c:781
Snapshot GetLatestSnapshot(void)
Definition: snapmgr.c:379
int errmsg(const char *fmt,...)
Definition: elog.c:797
bool equalTupleDescs(TupleDesc tupdesc1, TupleDesc tupdesc2)
Definition: tupdesc.c:409
ItemPointerData ctid
Definition: heapam.h:70
HeapTuple tts_tuple
Definition: tuptable.h:122
CommandId GetCurrentCommandId(bool used)
Definition: xact.c:679
#define elog
Definition: elog.h:219
#define TransactionIdIsValid(xid)
Definition: transam.h:41
HeapScanDesc heap_beginscan(Relation relation, Snapshot snapshot, int nkeys, ScanKey key)
Definition: heapam.c:1404
int Buffer
Definition: buf.h:23
#define ItemPointerCopy(fromPointer, toPointer)
Definition: itemptr.h:161

◆ tuple_equals_slot()

static bool tuple_equals_slot ( TupleDesc  desc,
HeapTuple  tup,
TupleTableSlot *  slot 
)
static

Definition at line 229 of file execReplication.c.

References DatumGetBool, TypeCacheEntry::eq_opr_finfo, ereport, errcode(), errmsg(), ERROR, FmgrInfo::fn_oid, format_type_be(), FunctionCall2, heap_deform_tuple(), lookup_type_cache(), MaxTupleAttributeNumber, tupleDesc::natts, OidIsValid, TupleTableSlot::tts_isnull, TupleTableSlot::tts_values, TupleDescAttr, TYPECACHE_EQ_OPR_FINFO, and values.

Referenced by RelationFindReplTupleSeq().

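229 tuple_equals_slot(TupleDesc desc, HeapTuple tup, TupleTableSlot *slot)
230 {
231  Datum values[MaxTupleAttributeNumber];
232  bool isnull[MaxTupleAttributeNumber];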
233  int attrnum;
234 
235  heap_deform_tuple(tup, desc, values, isnull);
236 
237  /* Check equality of the attributes. */
238  for (attrnum = 0; attrnum < desc->natts; attrnum++)
239  {
240  Form_pg_attribute att;
241  TypeCacheEntry *typentry;
242 
243  /*
244  * If one value is NULL and other is not, then they are certainly not
245  * equal
246  */
247  if (isnull[attrnum] != slot->tts_isnull[attrnum])
248  return false;
249 
250  /*
251  * If both are NULL, they can be considered equal.
252  */
253  if (isnull[attrnum])
254  continue;
255 
256  att = TupleDescAttr(desc, attrnum);
257 
258  typentry = lookup_type_cache(att->atttypid, TYPECACHE_EQ_OPR_FINFO);
259  if (!OidIsValid(typentry->eq_opr_finfo.fn_oid))
260  ereport(ERROR,
261  (errcode(ERRCODE_UNDEFINED_FUNCTION),
262  errmsg("could not identify an equality operator for type %s",
263  format_type_be(att->atttypid))));
264 
265  if (!DatumGetBool(FunctionCall2(&typentry->eq_opr_finfo,
266  values[attrnum],
267  slot->tts_values[attrnum])))
268  return false;
269  }
270 
271  return true;
272 }
#define MaxTupleAttributeNumber
Definition: htup_details.h:33
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:93
#define TYPECACHE_EQ_OPR_FINFO
Definition: typcache.h:132
#define FunctionCall2(flinfo, arg1, arg2)
Definition: fmgr.h:610
int errcode(int sqlerrcode)
Definition: elog.c:575
char * format_type_be(Oid type_oid)
Definition: format_type.c:328
Datum * tts_values
Definition: tuptable.h:130
#define OidIsValid(objectId)
Definition: c.h:605
int natts
Definition: tupdesc.h:82
#define ERROR
Definition: elog.h:43
bool * tts_isnull
Definition: tuptable.h:132
#define DatumGetBool(X)
Definition: postgres.h:378
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:197
#define ereport(elevel, rest)
Definition: elog.h:122
uintptr_t Datum
Definition: postgres.h:367
FmgrInfo eq_opr_finfo
Definition: typcache.h:71
TypeCacheEntry * lookup_type_cache(Oid type_id, int flags)
Definition: typcache.c:321
Oid fn_oid
Definition: fmgr.h:59
void heap_deform_tuple(HeapTuple tuple, TupleDesc tupleDesc, Datum *values, bool *isnull)
Definition: heaptuple.c:1315
static Datum values[MAXATTR]
Definition: bootstrap.c:164
int errmsg(const char *fmt,...)
Definition: elog.c:797