PostgreSQL Source Code  git master
execReplication.c File Reference
#include "postgres.h"
#include "access/genam.h"
#include "access/relscan.h"
#include "access/tableam.h"
#include "access/transam.h"
#include "access/xact.h"
#include "commands/trigger.h"
#include "executor/executor.h"
#include "executor/nodeModifyTable.h"
#include "nodes/nodeFuncs.h"
#include "parser/parse_relation.h"
#include "parser/parsetree.h"
#include "storage/bufmgr.h"
#include "storage/lmgr.h"
#include "utils/builtins.h"
#include "utils/datum.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"
#include "utils/typcache.h"
Include dependency graph for execReplication.c:

Go to the source code of this file.

Functions

static bool build_replindex_scan_key (ScanKey skey, Relation rel, Relation idxrel, TupleTableSlot *searchslot)
 
bool RelationFindReplTupleByIndex (Relation rel, Oid idxoid, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
 
static bool tuples_equal (TupleTableSlot *slot1, TupleTableSlot *slot2, TypeCacheEntry **eq)
 
bool RelationFindReplTupleSeq (Relation rel, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
 
void ExecSimpleRelationInsert (EState *estate, TupleTableSlot *slot)
 
void ExecSimpleRelationUpdate (EState *estate, EPQState *epqstate, TupleTableSlot *searchslot, TupleTableSlot *slot)
 
void ExecSimpleRelationDelete (EState *estate, EPQState *epqstate, TupleTableSlot *searchslot)
 
void CheckCmdReplicaIdentity (Relation rel, CmdType cmd)
 
void CheckSubscriptionRelkind (char relkind, const char *nspname, const char *relname)
 

Function Documentation

◆ build_replindex_scan_key()

static bool build_replindex_scan_key ( ScanKey  skey,
Relation  rel,
Relation  idxrel,
TupleTableSlot *  searchslot 
)
static

Definition at line 50 of file execReplication.c.

References Assert, BTEqualStrategyNumber, DatumGetPointer, elog, ERROR, get_opclass_family(), get_opclass_input_type(), get_opcode(), get_opfamily_member(), IndexRelationGetNumberOfKeyAttributes, INDEXRELID, OidIsValid, RelationData::rd_indcollation, RelationData::rd_index, RelationData::rd_indextuple, RelationGetPrimaryKeyIndex(), RelationGetRelid, RelationGetReplicaIndex(), ScanKeyInit(), ScanKeyData::sk_collation, ScanKeyData::sk_flags, SK_ISNULL, SysCacheGetAttr(), TupleTableSlot::tts_isnull, TupleTableSlot::tts_values, int2vector::values, and oidvector::values.

Referenced by RelationFindReplTupleByIndex().

52 {
53  int attoff;
54  bool isnull;
55  Datum indclassDatum;
56  oidvector *opclass;
57  int2vector *indkey = &idxrel->rd_index->indkey;
58  bool hasnulls = false;
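59 
60  Assert(RelationGetReplicaIndex(rel) == RelationGetRelid(idxrel) ||
61  RelationGetPrimaryKeyIndex(rel) == RelationGetRelid(idxrel));
62 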
63  indclassDatum = SysCacheGetAttr(INDEXRELID, idxrel->rd_indextuple,
64  Anum_pg_index_indclass, &isnull);
65  Assert(!isnull);
66  opclass = (oidvector *) DatumGetPointer(indclassDatum);
67 
68  /* Build scankey for every attribute in the index. */
69  for (attoff = 0; attoff < IndexRelationGetNumberOfKeyAttributes(idxrel); attoff++)
70  {
71  Oid operator;
72  Oid opfamily;
73  RegProcedure regop;
74  int pkattno = attoff + 1;
75  int mainattno = indkey->values[attoff];
76  Oid optype = get_opclass_input_type(opclass->values[attoff]);
77 
78  /*
79  * Load the operator info. We need this to get the equality operator
80  * function for the scan key.
81  */
82  opfamily = get_opclass_family(opclass->values[attoff]);
83 
84  operator = get_opfamily_member(opfamily, optype,
85  optype,
86  BTEqualStrategyNumber);
87  if (!OidIsValid(operator))
88  elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",
89  BTEqualStrategyNumber, optype, optype, opfamily);
90 
91  regop = get_opcode(operator);
92 
93  /* Initialize the scankey. */
94  ScanKeyInit(&skey[attoff],
95  pkattno,
96  BTEqualStrategyNumber,
97  regop,
98  searchslot->tts_values[mainattno - 1]);
99 
100  skey[attoff].sk_collation = idxrel->rd_indcollation[attoff];
101 
102  /* Check for null value. */
103  if (searchslot->tts_isnull[mainattno - 1])
104  {
105  hasnulls = true;
106  skey[attoff].sk_flags |= SK_ISNULL;
107  }
108  }
109 
110  return hasnulls;
111 }
Definition: c.h:594
Oid RelationGetReplicaIndex(Relation relation)
Definition: relcache.c:4715
regproc RegProcedure
Definition: c.h:511
Datum * tts_values
Definition: tuptable.h:126
unsigned int Oid
Definition: postgres_ext.h:31
#define OidIsValid(objectId)
Definition: c.h:644
struct HeapTupleData * rd_indextuple
Definition: rel.h:176
Form_pg_index rd_index
Definition: rel.h:174
Oid * rd_indcollation
Definition: rel.h:199
#define ERROR
Definition: elog.h:43
bool * tts_isnull
Definition: tuptable.h:128
Oid values[FLEXIBLE_ARRAY_MEMBER]
Definition: c.h:602
Oid get_opfamily_member(Oid opfamily, Oid lefttype, Oid righttype, int16 strategy)
Definition: lsyscache.c:164
#define IndexRelationGetNumberOfKeyAttributes(relation)
Definition: rel.h:475
#define SK_ISNULL
Definition: skey.h:115
Definition: c.h:583
uintptr_t Datum
Definition: postgres.h:367
Datum SysCacheGetAttr(int cacheId, HeapTuple tup, AttrNumber attributeNumber, bool *isNull)
Definition: syscache.c:1377
RegProcedure get_opcode(Oid opno)
Definition: lsyscache.c:1153
int sk_flags
Definition: skey.h:66
#define Assert(condition)
Definition: c.h:738
int16 values[FLEXIBLE_ARRAY_MEMBER]
Definition: c.h:591
Oid get_opclass_family(Oid opclass)
Definition: lsyscache.c:1080
#define DatumGetPointer(X)
Definition: postgres.h:549
Oid sk_collation
Definition: skey.h:70
#define elog(elevel,...)
Definition: elog.h:214
void ScanKeyInit(ScanKey entry, AttrNumber attributeNumber, StrategyNumber strategy, RegProcedure procedure, Datum argument)
Definition: scankey.c:76
#define RelationGetRelid(relation)
Definition: rel.h:456
Oid RelationGetPrimaryKeyIndex(Relation relation)
Definition: relcache.c:4694
#define BTEqualStrategyNumber
Definition: stratnum.h:31
Oid get_opclass_input_type(Oid opclass)
Definition: lsyscache.c:1102

◆ CheckCmdReplicaIdentity()

void CheckCmdReplicaIdentity ( Relation  rel,
CmdType  cmd 
)

Definition at line 564 of file execReplication.c.

References CMD_DELETE, CMD_UPDATE, ereport, errcode(), errhint(), errmsg(), ERROR, GetRelationPublicationActions(), OidIsValid, PublicationActions::pubdelete, PublicationActions::pubupdate, RelationData::rd_rel, RelationGetRelationName, and RelationGetReplicaIndex().

Referenced by CheckValidResultRel(), exec_rt_fetch(), ExecSimpleRelationDelete(), ExecSimpleRelationInsert(), and ExecSimpleRelationUpdate().

565 {
566  PublicationActions *pubactions;
567 
568  /* We only need to do checks for UPDATE and DELETE. */
569  if (cmd != CMD_UPDATE && cmd != CMD_DELETE)
570  return;
571 
572  /* If relation has replica identity we are always good. */
573  if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
574  OidIsValid(RelationGetReplicaIndex(rel)))
575  return;
576 
577  /*
578  * This is either UPDATE OR DELETE and there is no replica identity.
579  *
580  * Check if the table publishes UPDATES or DELETES.
581  */
582  pubactions = GetRelationPublicationActions(rel);
583  if (cmd == CMD_UPDATE && pubactions->pubupdate)
584  ereport(ERROR,
585  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
586  errmsg("cannot update table \"%s\" because it does not have a replica identity and publishes updates",
587  RelationGetRelationName(rel)),
588  errhint("To enable updating the table, set REPLICA IDENTITY using ALTER TABLE.")));
589  else if (cmd == CMD_DELETE && pubactions->pubdelete)
590  ereport(ERROR,
591  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
592  errmsg("cannot delete from table \"%s\" because it does not have a replica identity and publishes deletes",
593  RelationGetRelationName(rel)),
594  errhint("To enable deleting from the table, set REPLICA IDENTITY using ALTER TABLE.")));
595 }
int errhint(const char *fmt,...)
Definition: elog.c:1071
Oid RelationGetReplicaIndex(Relation relation)
Definition: relcache.c:4715
int errcode(int sqlerrcode)
Definition: elog.c:610
Form_pg_class rd_rel
Definition: rel.h:109
#define OidIsValid(objectId)
Definition: c.h:644
#define ERROR
Definition: elog.h:43
#define RelationGetRelationName(relation)
Definition: rel.h:490
#define ereport(elevel,...)
Definition: elog.h:144
struct PublicationActions * GetRelationPublicationActions(Relation relation)
Definition: relcache.c:5298
int errmsg(const char *fmt,...)
Definition: elog.c:824

◆ CheckSubscriptionRelkind()

void CheckSubscriptionRelkind ( char  relkind,
const char *  nspname,
const char *  relname 
)

Definition at line 604 of file execReplication.c.

References ereport, errcode(), errdetail(), errmsg(), and ERROR.

Referenced by AlterSubscription_refresh(), CreateSubscription(), exec_rt_fetch(), and logicalrep_rel_open().

606 {
607  /*
608  * Give a more specific error for foreign tables.
609  */
610  if (relkind == RELKIND_FOREIGN_TABLE)
611  ereport(ERROR,
612  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
613  errmsg("cannot use relation \"%s.%s\" as logical replication target",
614  nspname, relname),
615  errdetail("\"%s.%s\" is a foreign table.",
616  nspname, relname)));
617 
618  if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE)
619  ereport(ERROR,
620  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
621  errmsg("cannot use relation \"%s.%s\" as logical replication target",
622  nspname, relname),
623  errdetail("\"%s.%s\" is not a table.",
624  nspname, relname)));
625 }
int errcode(int sqlerrcode)
Definition: elog.c:610
NameData relname
Definition: pg_class.h:38
#define ERROR
Definition: elog.h:43
int errdetail(const char *fmt,...)
Definition: elog.c:957
#define ereport(elevel,...)
Definition: elog.h:144
int errmsg(const char *fmt,...)
Definition: elog.c:824

◆ ExecSimpleRelationDelete()

void ExecSimpleRelationDelete ( EState *  estate,
EPQState *  epqstate,
TupleTableSlot *  searchslot 
)

Definition at line 530 of file execReplication.c.

References CheckCmdReplicaIdentity(), CMD_DELETE, EState::es_result_relation_info, EState::es_snapshot, ExecARDeleteTriggers(), ExecBRDeleteTriggers(), ResultRelInfo::ri_RelationDesc, ResultRelInfo::ri_TrigDesc, simple_table_tuple_delete(), TriggerDesc::trig_delete_before_row, and TupleTableSlot::tts_tid.

Referenced by apply_handle_delete_internal(), and exec_rt_fetch().

532 {
533  bool skip_tuple = false;
534  ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
535  Relation rel = resultRelInfo->ri_RelationDesc;
536  ItemPointer tid = &searchslot->tts_tid;
537 
538  CheckCmdReplicaIdentity(rel, CMD_DELETE);
539 
540  /* BEFORE ROW DELETE Triggers */
541  if (resultRelInfo->ri_TrigDesc &&
542  resultRelInfo->ri_TrigDesc->trig_delete_before_row)
543  {
544  skip_tuple = !ExecBRDeleteTriggers(estate, epqstate, resultRelInfo,
545  tid, NULL, NULL);
546 
547  }
548 
549  if (!skip_tuple)
550  {
551  /* OK, delete the tuple */
552  simple_table_tuple_delete(rel, tid, estate->es_snapshot);
553 
554  /* AFTER ROW DELETE Triggers */
555  ExecARDeleteTriggers(estate, resultRelInfo,
556  tid, NULL, NULL);
557  }
558 }
Relation ri_RelationDesc
Definition: execnodes.h:413
Snapshot es_snapshot
Definition: execnodes.h:508
bool ExecBRDeleteTriggers(EState *estate, EPQState *epqstate, ResultRelInfo *relinfo, ItemPointer tupleid, HeapTuple fdw_trigtuple, TupleTableSlot **epqslot)
Definition: trigger.c:2413
void ExecARDeleteTriggers(EState *estate, ResultRelInfo *relinfo, ItemPointer tupleid, HeapTuple fdw_trigtuple, TransitionCaptureState *transition_capture)
Definition: trigger.c:2498
void CheckCmdReplicaIdentity(Relation rel, CmdType cmd)
TriggerDesc * ri_TrigDesc
Definition: execnodes.h:425
void simple_table_tuple_delete(Relation rel, ItemPointer tid, Snapshot snapshot)
Definition: tableam.c:275
ItemPointerData tts_tid
Definition: tuptable.h:130
bool trig_delete_before_row
Definition: reltrigger.h:66
ResultRelInfo * es_result_relation_info
Definition: execnodes.h:527

◆ ExecSimpleRelationInsert()

void ExecSimpleRelationInsert ( EState *  estate,
TupleTableSlot *  slot 
)

Definition at line 407 of file execReplication.c.

References Assert, CheckCmdReplicaIdentity(), CMD_INSERT, TupleDescData::constr, EState::es_result_relation_info, ExecARInsertTriggers(), ExecBRInsertTriggers(), ExecComputeStoredGenerated(), ExecConstraints(), ExecInsertIndexTuples(), ExecPartitionCheck(), TupleConstr::has_generated_stored, list_free(), NIL, RelationData::rd_att, RelationData::rd_rel, ResultRelInfo::ri_NumIndices, ResultRelInfo::ri_PartitionCheck, ResultRelInfo::ri_RelationDesc, ResultRelInfo::ri_TrigDesc, simple_table_tuple_insert(), and TriggerDesc::trig_insert_before_row.

Referenced by apply_handle_insert_internal(), and exec_rt_fetch().

408 {
409  bool skip_tuple = false;
410  ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
411  Relation rel = resultRelInfo->ri_RelationDesc;
412 
413  /* For now we support only tables. */
414  Assert(rel->rd_rel->relkind == RELKIND_RELATION);
415 
416  CheckCmdReplicaIdentity(rel, CMD_INSERT);
417 
418  /* BEFORE ROW INSERT Triggers */
419  if (resultRelInfo->ri_TrigDesc &&
420  resultRelInfo->ri_TrigDesc->trig_insert_before_row)
421  {
422  if (!ExecBRInsertTriggers(estate, resultRelInfo, slot))
423  skip_tuple = true; /* "do nothing" */
424  }
425 
426  if (!skip_tuple)
427  {
428  List *recheckIndexes = NIL;
429 
430  /* Compute stored generated columns */
431  if (rel->rd_att->constr &&
432  rel->rd_att->constr->has_generated_stored)
433  ExecComputeStoredGenerated(estate, slot, CMD_INSERT);
434 
435  /* Check the constraints of the tuple */
436  if (rel->rd_att->constr)
437  ExecConstraints(resultRelInfo, slot, estate);
438  if (resultRelInfo->ri_PartitionCheck)
439  ExecPartitionCheck(resultRelInfo, slot, estate, true);
440 
441  /* OK, store the tuple and create index entries for it */
442  simple_table_tuple_insert(resultRelInfo->ri_RelationDesc, slot);
443 
444  if (resultRelInfo->ri_NumIndices > 0)
445  recheckIndexes = ExecInsertIndexTuples(slot, estate, false, NULL,
446  NIL);
447 
448  /* AFTER ROW INSERT Triggers */
449  ExecARInsertTriggers(estate, resultRelInfo, slot,
450  recheckIndexes, NULL);
451 
452  /*
453  * XXX we should in theory pass a TransitionCaptureState object to the
454  * above to capture transition tuples, but after statement triggers
455  * don't actually get fired by replication yet anyway
456  */
457 
458  list_free(recheckIndexes);
459  }
460 }
int ri_NumIndices
Definition: execnodes.h:416
#define NIL
Definition: pg_list.h:65
Relation ri_RelationDesc
Definition: execnodes.h:413
void ExecARInsertTriggers(EState *estate, ResultRelInfo *relinfo, TupleTableSlot *slot, List *recheckIndexes, TransitionCaptureState *transition_capture)
Definition: trigger.c:2268
void ExecComputeStoredGenerated(EState *estate, TupleTableSlot *slot, CmdType cmdtype)
void ExecConstraints(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate)
Definition: execMain.c:1897
Form_pg_class rd_rel
Definition: rel.h:109
bool has_generated_stored
Definition: tupdesc.h:45
bool ExecBRInsertTriggers(EState *estate, ResultRelInfo *relinfo, TupleTableSlot *slot)
Definition: trigger.c:2192
List * ExecInsertIndexTuples(TupleTableSlot *slot, EState *estate, bool noDupErr, bool *specConflict, List *arbiterIndexes)
Definition: execIndexing.c:273
TupleConstr * constr
Definition: tupdesc.h:85
void CheckCmdReplicaIdentity(Relation rel, CmdType cmd)
TriggerDesc * ri_TrigDesc
Definition: execnodes.h:425
bool trig_insert_before_row
Definition: reltrigger.h:56
List * ri_PartitionCheck
Definition: execnodes.h:481
TupleDesc rd_att
Definition: rel.h:110
#define Assert(condition)
Definition: c.h:738
void simple_table_tuple_insert(Relation rel, TupleTableSlot *slot)
Definition: tableam.c:261
void list_free(List *list)
Definition: list.c:1376
bool ExecPartitionCheck(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate, bool emitError)
Definition: execMain.c:1783
Definition: pg_list.h:50
ResultRelInfo * es_result_relation_info
Definition: execnodes.h:527

◆ ExecSimpleRelationUpdate()

void ExecSimpleRelationUpdate ( EState *  estate,
EPQState *  epqstate,
TupleTableSlot *  searchslot,
TupleTableSlot *  slot 
)

Definition at line 469 of file execReplication.c.

References Assert, CheckCmdReplicaIdentity(), CMD_UPDATE, TupleDescData::constr, EState::es_result_relation_info, EState::es_snapshot, ExecARUpdateTriggers(), ExecBRUpdateTriggers(), ExecComputeStoredGenerated(), ExecConstraints(), ExecInsertIndexTuples(), ExecPartitionCheck(), TupleConstr::has_generated_stored, list_free(), NIL, RelationData::rd_att, RelationData::rd_rel, ResultRelInfo::ri_NumIndices, ResultRelInfo::ri_PartitionCheck, ResultRelInfo::ri_RelationDesc, ResultRelInfo::ri_TrigDesc, simple_table_tuple_update(), TriggerDesc::trig_update_before_row, and TupleTableSlot::tts_tid.

Referenced by apply_handle_tuple_routing(), apply_handle_update_internal(), and exec_rt_fetch().

471 {
472  bool skip_tuple = false;
473  ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
474  Relation rel = resultRelInfo->ri_RelationDesc;
475  ItemPointer tid = &(searchslot->tts_tid);
476 
477  /* For now we support only tables. */
478  Assert(rel->rd_rel->relkind == RELKIND_RELATION);
479 
480  CheckCmdReplicaIdentity(rel, CMD_UPDATE);
481 
482  /* BEFORE ROW UPDATE Triggers */
483  if (resultRelInfo->ri_TrigDesc &&
484  resultRelInfo->ri_TrigDesc->trig_update_before_row)
485  {
486  if (!ExecBRUpdateTriggers(estate, epqstate, resultRelInfo,
487  tid, NULL, slot))
488  skip_tuple = true; /* "do nothing" */
489  }
490 
491  if (!skip_tuple)
492  {
493  List *recheckIndexes = NIL;
494  bool update_indexes;
495 
496  /* Compute stored generated columns */
497  if (rel->rd_att->constr &&
498  rel->rd_att->constr->has_generated_stored)
499  ExecComputeStoredGenerated(estate, slot, CMD_UPDATE);
500 
501  /* Check the constraints of the tuple */
502  if (rel->rd_att->constr)
503  ExecConstraints(resultRelInfo, slot, estate);
504  if (resultRelInfo->ri_PartitionCheck)
505  ExecPartitionCheck(resultRelInfo, slot, estate, true);
506 
507  simple_table_tuple_update(rel, tid, slot, estate->es_snapshot,
508  &update_indexes);
509 
510  if (resultRelInfo->ri_NumIndices > 0 && update_indexes)
511  recheckIndexes = ExecInsertIndexTuples(slot, estate, false, NULL,
512  NIL);
513 
514  /* AFTER ROW UPDATE Triggers */
515  ExecARUpdateTriggers(estate, resultRelInfo,
516  tid, NULL, slot,
517  recheckIndexes, NULL);
518 
519  list_free(recheckIndexes);
520  }
521 }
int ri_NumIndices
Definition: execnodes.h:416
#define NIL
Definition: pg_list.h:65
Relation ri_RelationDesc
Definition: execnodes.h:413
void ExecComputeStoredGenerated(EState *estate, TupleTableSlot *slot, CmdType cmdtype)
void ExecConstraints(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate)
Definition: execMain.c:1897
void simple_table_tuple_update(Relation rel, ItemPointer otid, TupleTableSlot *slot, Snapshot snapshot, bool *update_indexes)
Definition: tableam.c:320
Snapshot es_snapshot
Definition: execnodes.h:508
Form_pg_class rd_rel
Definition: rel.h:109
bool has_generated_stored
Definition: tupdesc.h:45
List * ExecInsertIndexTuples(TupleTableSlot *slot, EState *estate, bool noDupErr, bool *specConflict, List *arbiterIndexes)
Definition: execIndexing.c:273
TupleConstr * constr
Definition: tupdesc.h:85
bool trig_update_before_row
Definition: reltrigger.h:61
void CheckCmdReplicaIdentity(Relation rel, CmdType cmd)
TriggerDesc * ri_TrigDesc
Definition: execnodes.h:425
void ExecARUpdateTriggers(EState *estate, ResultRelInfo *relinfo, ItemPointer tupleid, HeapTuple fdw_trigtuple, TupleTableSlot *newslot, List *recheckIndexes, TransitionCaptureState *transition_capture)
Definition: trigger.c:2783
List * ri_PartitionCheck
Definition: execnodes.h:481
TupleDesc rd_att
Definition: rel.h:110
#define Assert(condition)
Definition: c.h:738
void list_free(List *list)
Definition: list.c:1376
bool ExecBRUpdateTriggers(EState *estate, EPQState *epqstate, ResultRelInfo *relinfo, ItemPointer tupleid, HeapTuple fdw_trigtuple, TupleTableSlot *newslot)
Definition: trigger.c:2643
bool ExecPartitionCheck(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate, bool emitError)
Definition: execMain.c:1783
Definition: pg_list.h:50
ItemPointerData tts_tid
Definition: tuptable.h:130
ResultRelInfo * es_result_relation_info
Definition: execnodes.h:527

◆ RelationFindReplTupleByIndex()

bool RelationFindReplTupleByIndex ( Relation  rel,
Oid  idxoid,
LockTupleMode  lockmode,
TupleTableSlot *  searchslot,
TupleTableSlot *  outslot 
)

Definition at line 120 of file execReplication.c.

References build_replindex_scan_key(), TM_FailureData::ctid, elog, ereport, errcode(), errmsg(), ERROR, ExecMaterializeSlot(), ForwardScanDirection, GetCurrentCommandId(), GetLatestSnapshot(), index_beginscan(), index_close(), index_endscan(), index_getnext_slot(), INDEX_MAX_KEYS, index_open(), index_rescan(), IndexRelationGetNumberOfKeyAttributes, InitDirtySnapshot, ItemPointerIndicatesMovedPartitions, LockWaitBlock, LOG, NoLock, PopActiveSnapshot(), PushActiveSnapshot(), RowExclusiveLock, table_tuple_lock(), TM_Deleted, TM_Invisible, TM_Ok, TM_Updated, TransactionIdIsValid, TupleTableSlot::tts_tid, XactLockTableWait(), XLTW_None, SnapshotData::xmax, and SnapshotData::xmin.

Referenced by exec_rt_fetch(), and FindReplTupleInLocalRel().

124 {
125  ScanKeyData skey[INDEX_MAX_KEYS];
126  IndexScanDesc scan;
127  SnapshotData snap;
128  TransactionId xwait;
129  Relation idxrel;
130  bool found;
131 
132  /* Open the index. */
133  idxrel = index_open(idxoid, RowExclusiveLock);
134 
135  /* Start an index scan. */
136  InitDirtySnapshot(snap);
137  scan = index_beginscan(rel, idxrel, &snap,
138  IndexRelationGetNumberOfKeyAttributes(idxrel),
139  0);
140 
141  /* Build scan key. */
142  build_replindex_scan_key(skey, rel, idxrel, searchslot);
143 
144 retry:
145  found = false;
146 
147  index_rescan(scan, skey, IndexRelationGetNumberOfKeyAttributes(idxrel), NULL, 0);
148 
149  /* Try to find the tuple */
150  if (index_getnext_slot(scan, ForwardScanDirection, outslot))
151  {
152  found = true;
153  ExecMaterializeSlot(outslot);
154 
155  xwait = TransactionIdIsValid(snap.xmin) ?
156  snap.xmin : snap.xmax;
157 
158  /*
159  * If the tuple is locked, wait for locking transaction to finish and
160  * retry.
161  */
162  if (TransactionIdIsValid(xwait))
163  {
164  XactLockTableWait(xwait, NULL, NULL, XLTW_None);
165  goto retry;
166  }
167  }
168 
169  /* Found tuple, try to lock it in the lockmode. */
170  if (found)
171  {
172  TM_FailureData tmfd;
173  TM_Result res;
174 
175  PushActiveSnapshot(GetLatestSnapshot());
176 
177  res = table_tuple_lock(rel, &(outslot->tts_tid), GetLatestSnapshot(),
178  outslot,
179  GetCurrentCommandId(false),
180  lockmode,
181  LockWaitBlock,
182  0 /* don't follow updates */ ,
183  &tmfd);
184 
185  PopActiveSnapshot();
186 
187  switch (res)
188  {
189  case TM_Ok:
190  break;
191  case TM_Updated:
192  /* XXX: Improve handling here */
193  if (ItemPointerIndicatesMovedPartitions(&tmfd.ctid))
194  ereport(LOG,
195  (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
196  errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
197  else
198  ereport(LOG,
199  (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
200  errmsg("concurrent update, retrying")));
201  goto retry;
202  case TM_Deleted:
203  /* XXX: Improve handling here */
204  ereport(LOG,
205  (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
206  errmsg("concurrent delete, retrying")));
207  goto retry;
208  case TM_Invisible:
209  elog(ERROR, "attempted to lock invisible tuple");
210  break;
211  default:
212  elog(ERROR, "unexpected table_tuple_lock status: %u", res);
213  break;
214  }
215  }
216 
217  index_endscan(scan);
218 
219  /* Don't release lock until commit. */
220  index_close(idxrel, NoLock);
221 
222  return found;
223 }
ItemPointerData ctid
Definition: tableam.h:124
#define InitDirtySnapshot(snapshotdata)
Definition: snapmgr.h:76
uint32 TransactionId
Definition: c.h:513
void index_rescan(IndexScanDesc scan, ScanKey keys, int nkeys, ScanKey orderbys, int norderbys)
Definition: indexam.c:295
int errcode(int sqlerrcode)
Definition: elog.c:610
void PopActiveSnapshot(void)
Definition: snapmgr.c:814
#define LOG
Definition: elog.h:26
#define ERROR
Definition: elog.h:43
Definition: lmgr.h:26
static TM_Result table_tuple_lock(Relation rel, ItemPointer tid, Snapshot snapshot, TupleTableSlot *slot, CommandId cid, LockTupleMode mode, LockWaitPolicy wait_policy, uint8 flags, TM_FailureData *tmfd)
Definition: tableam.h:1328
#define NoLock
Definition: lockdefs.h:34
void PushActiveSnapshot(Snapshot snap)
Definition: snapmgr.c:735
#define RowExclusiveLock
Definition: lockdefs.h:38
TransactionId xmax
Definition: snapshot.h:158
TransactionId xmin
Definition: snapshot.h:157
#define IndexRelationGetNumberOfKeyAttributes(relation)
Definition: rel.h:475
void index_endscan(IndexScanDesc scan)
Definition: indexam.c:321
TM_Result
Definition: tableam.h:69
#define ItemPointerIndicatesMovedPartitions(pointer)
Definition: itemptr.h:184
static void ExecMaterializeSlot(TupleTableSlot *slot)
Definition: tuptable.h:443
#define ereport(elevel,...)
Definition: elog.h:144
void XactLockTableWait(TransactionId xid, Relation rel, ItemPointer ctid, XLTW_Oper oper)
Definition: lmgr.c:624
Definition: tableam.h:75
#define INDEX_MAX_KEYS
bool index_getnext_slot(IndexScanDesc scan, ScanDirection direction, TupleTableSlot *slot)
Definition: indexam.c:613
Snapshot GetLatestSnapshot(void)
Definition: snapmgr.c:381
void index_close(Relation relation, LOCKMODE lockmode)
Definition: indexam.c:158
int errmsg(const char *fmt,...)
Definition: elog.c:824
#define elog(elevel,...)
Definition: elog.h:214
CommandId GetCurrentCommandId(bool used)
Definition: xact.c:746
#define TransactionIdIsValid(xid)
Definition: transam.h:41
ItemPointerData tts_tid
Definition: tuptable.h:130
static bool build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel, TupleTableSlot *searchslot)
Relation index_open(Oid relationId, LOCKMODE lockmode)
Definition: indexam.c:132
IndexScanDesc index_beginscan(Relation heapRelation, Relation indexRelation, Snapshot snapshot, int nkeys, int norderbys)
Definition: indexam.c:203

◆ RelationFindReplTupleSeq()

bool RelationFindReplTupleSeq ( Relation  rel,
LockTupleMode  lockmode,
TupleTableSlot *  searchslot,
TupleTableSlot *  outslot 
)

Definition at line 295 of file execReplication.c.

References Assert, TM_FailureData::ctid, elog, equalTupleDescs(), ereport, errcode(), errmsg(), ERROR, ExecCopySlot(), ExecDropSingleTupleTableSlot(), ForwardScanDirection, GetCurrentCommandId(), GetLatestSnapshot(), InitDirtySnapshot, ItemPointerIndicatesMovedPartitions, LockWaitBlock, LOG, TupleDescData::natts, palloc0(), PG_USED_FOR_ASSERTS_ONLY, PopActiveSnapshot(), PushActiveSnapshot(), RelationGetDescr, table_beginscan(), table_endscan(), table_rescan(), table_scan_getnextslot(), table_slot_create(), table_tuple_lock(), TM_Deleted, TM_Invisible, TM_Ok, TM_Updated, TransactionIdIsValid, TupleTableSlot::tts_tid, TupleTableSlot::tts_tupleDescriptor, tuples_equal(), XactLockTableWait(), XLTW_None, SnapshotData::xmax, and SnapshotData::xmin.

Referenced by exec_rt_fetch(), and FindReplTupleInLocalRel().

297 {
298  TupleTableSlot *scanslot;
299  TableScanDesc scan;
300  SnapshotData snap;
301  TypeCacheEntry **eq;
302  TransactionId xwait;
303  bool found;
304  TupleDesc desc PG_USED_FOR_ASSERTS_ONLY = RelationGetDescr(rel);
305 
306  Assert(equalTupleDescs(desc, outslot->tts_tupleDescriptor));
307 
308  eq = palloc0(sizeof(*eq) * outslot->tts_tupleDescriptor->natts);
309 
310  /* Start a heap scan. */
311  InitDirtySnapshot(snap);
312  scan = table_beginscan(rel, &snap, 0, NULL);
313  scanslot = table_slot_create(rel, NULL);
314 
315 retry:
316  found = false;
317 
318  table_rescan(scan, NULL);
319 
320  /* Try to find the tuple */
321  while (table_scan_getnextslot(scan, ForwardScanDirection, scanslot))
322  {
323  if (!tuples_equal(scanslot, searchslot, eq))
324  continue;
325 
326  found = true;
327  ExecCopySlot(outslot, scanslot);
328 
329  xwait = TransactionIdIsValid(snap.xmin) ?
330  snap.xmin : snap.xmax;
331 
332  /*
333  * If the tuple is locked, wait for locking transaction to finish and
334  * retry.
335  */
336  if (TransactionIdIsValid(xwait))
337  {
338  XactLockTableWait(xwait, NULL, NULL, XLTW_None);
339  goto retry;
340  }
341 
342  /* Found our tuple and it's not locked */
343  break;
344  }
345 
346  /* Found tuple, try to lock it in the lockmode. */
347  if (found)
348  {
349  TM_FailureData tmfd;
350  TM_Result res;
351 
352  PushActiveSnapshot(GetLatestSnapshot());
353 
354  res = table_tuple_lock(rel, &(outslot->tts_tid), GetLatestSnapshot(),
355  outslot,
356  GetCurrentCommandId(false),
357  lockmode,
358  LockWaitBlock,
359  0 /* don't follow updates */ ,
360  &tmfd);
361 
362  PopActiveSnapshot();
363 
364  switch (res)
365  {
366  case TM_Ok:
367  break;
368  case TM_Updated:
369  /* XXX: Improve handling here */
370  if (ItemPointerIndicatesMovedPartitions(&tmfd.ctid))
371  ereport(LOG,
372  (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
373  errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
374  else
375  ereport(LOG,
376  (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
377  errmsg("concurrent update, retrying")));
378  goto retry;
379  case TM_Deleted:
380  /* XXX: Improve handling here */
381  ereport(LOG,
382  (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
383  errmsg("concurrent delete, retrying")));
384  goto retry;
385  case TM_Invisible:
386  elog(ERROR, "attempted to lock invisible tuple");
387  break;
388  default:
389  elog(ERROR, "unexpected table_tuple_lock status: %u", res);
390  break;
391  }
392  }
393 
394  table_endscan(scan);
395  ExecDropSingleTupleTableSlot(scanslot);
396 
397  return found;
398 }
TupleTableSlot * table_slot_create(Relation relation, List **reglist)
Definition: tableam.c:77
static TupleTableSlot * ExecCopySlot(TupleTableSlot *dstslot, TupleTableSlot *srcslot)
Definition: tuptable.h:475
ItemPointerData ctid
Definition: tableam.h:124
#define InitDirtySnapshot(snapshotdata)
Definition: snapmgr.h:76
uint32 TransactionId
Definition: c.h:513
#define RelationGetDescr(relation)
Definition: rel.h:482
int errcode(int sqlerrcode)
Definition: elog.c:610
static bool tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2, TypeCacheEntry **eq)
void PopActiveSnapshot(void)
Definition: snapmgr.c:814
static bool table_scan_getnextslot(TableScanDesc sscan, ScanDirection direction, TupleTableSlot *slot)
Definition: tableam.h:903
#define LOG
Definition: elog.h:26
static void table_rescan(TableScanDesc scan, struct ScanKeyData *key)
Definition: tableam.h:871
#define ERROR
Definition: elog.h:43
Definition: lmgr.h:26
static TableScanDesc table_beginscan(Relation rel, Snapshot snapshot, int nkeys, struct ScanKeyData *key)
Definition: tableam.h:754
static TM_Result table_tuple_lock(Relation rel, ItemPointer tid, Snapshot snapshot, TupleTableSlot *slot, CommandId cid, LockTupleMode mode, LockWaitPolicy wait_policy, uint8 flags, TM_FailureData *tmfd)
Definition: tableam.h:1328
void PushActiveSnapshot(Snapshot snap)
Definition: snapmgr.c:735
void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
Definition: execTuples.c:1224
TransactionId xmax
Definition: snapshot.h:158
TransactionId xmin
Definition: snapshot.h:157
TupleDesc tts_tupleDescriptor
Definition: tuptable.h:124
TM_Result
Definition: tableam.h:69
void * palloc0(Size size)
Definition: mcxt.c:980
#define ItemPointerIndicatesMovedPartitions(pointer)
Definition: itemptr.h:184
#define ereport(elevel,...)
Definition: elog.h:144
void XactLockTableWait(TransactionId xid, Relation rel, ItemPointer ctid, XLTW_Oper oper)
Definition: lmgr.c:624
#define Assert(condition)
Definition: c.h:738
Definition: tableam.h:75
Snapshot GetLatestSnapshot(void)
Definition: snapmgr.c:381
static void table_endscan(TableScanDesc scan)
Definition: tableam.h:862
int errmsg(const char *fmt,...)
Definition: elog.c:824
#define elog(elevel,...)
Definition: elog.h:214
bool equalTupleDescs(TupleDesc tupdesc1, TupleDesc tupdesc2)
Definition: tupdesc.c:411
CommandId GetCurrentCommandId(bool used)
Definition: xact.c:746
#define TransactionIdIsValid(xid)
Definition: transam.h:41
ItemPointerData tts_tid
Definition: tuptable.h:130
#define PG_USED_FOR_ASSERTS_ONLY
Definition: c.h:121

◆ tuples_equal()

static bool tuples_equal ( TupleTableSlot *  slot1,
TupleTableSlot *  slot2,
TypeCacheEntry **  eq 
)
static

Definition at line 229 of file execReplication.c.

References Assert, DatumGetBool, TypeCacheEntry::eq_opr_finfo, ereport, errcode(), errmsg(), ERROR, FmgrInfo::fn_oid, format_type_be(), FunctionCall2Coll(), lookup_type_cache(), TupleDescData::natts, OidIsValid, slot_getallattrs(), TupleTableSlot::tts_isnull, TupleTableSlot::tts_tupleDescriptor, TupleTableSlot::tts_values, TupleDescAttr, and TYPECACHE_EQ_OPR_FINFO.

Referenced by RelationFindReplTupleSeq().

231 {
232  int attrnum;
233 
234  Assert(slot1->tts_tupleDescriptor->natts ==
235  slot2->tts_tupleDescriptor->natts);
236 
237  slot_getallattrs(slot1);
238  slot_getallattrs(slot2);
239 
240  /* Check equality of the attributes. */
241  for (attrnum = 0; attrnum < slot1->tts_tupleDescriptor->natts; attrnum++)
242  {
243  Form_pg_attribute att;
244  TypeCacheEntry *typentry;
245 
246  /*
247  * If one value is NULL and other is not, then they are certainly not
248  * equal
249  */
250  if (slot1->tts_isnull[attrnum] != slot2->tts_isnull[attrnum])
251  return false;
252 
253  /*
254  * If both are NULL, they can be considered equal.
255  */
256  if (slot1->tts_isnull[attrnum] || slot2->tts_isnull[attrnum])
257  continue;
258 
259  att = TupleDescAttr(slot1->tts_tupleDescriptor, attrnum);
260 
261  typentry = eq[attrnum];
262  if (typentry == NULL)
263  {
264  typentry = lookup_type_cache(att->atttypid,
265  TYPECACHE_EQ_OPR_FINFO);
266  if (!OidIsValid(typentry->eq_opr_finfo.fn_oid))
267  ereport(ERROR,
268  (errcode(ERRCODE_UNDEFINED_FUNCTION),
269  errmsg("could not identify an equality operator for type %s",
270  format_type_be(att->atttypid))));
271  eq[attrnum] = typentry;
272  }
273 
274  if (!DatumGetBool(FunctionCall2Coll(&typentry->eq_opr_finfo,
275  att->attcollation,
276  slot1->tts_values[attrnum],
277  slot2->tts_values[attrnum])))
278  return false;
279  }
280 
281  return true;
282 }
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
#define TYPECACHE_EQ_OPR_FINFO
Definition: typcache.h:135
int errcode(int sqlerrcode)
Definition: elog.c:610
char * format_type_be(Oid type_oid)
Definition: format_type.c:327
Datum * tts_values
Definition: tuptable.h:126
Datum FunctionCall2Coll(FmgrInfo *flinfo, Oid collation, Datum arg1, Datum arg2)
Definition: fmgr.c:1152
#define OidIsValid(objectId)
Definition: c.h:644
#define ERROR
Definition: elog.h:43
static void slot_getallattrs(TupleTableSlot *slot)
Definition: tuptable.h:354
bool * tts_isnull
Definition: tuptable.h:128
#define DatumGetBool(X)
Definition: postgres.h:393
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:193
TupleDesc tts_tupleDescriptor
Definition: tuptable.h:124
FmgrInfo eq_opr_finfo
Definition: typcache.h:74
TypeCacheEntry * lookup_type_cache(Oid type_id, int flags)
Definition: typcache.c:331
Oid fn_oid
Definition: fmgr.h:59
#define ereport(elevel,...)
Definition: elog.h:144
#define Assert(condition)
Definition: c.h:738
int errmsg(const char *fmt,...)
Definition: elog.c:824