PostgreSQL Source Code  git master
execReplication.c File Reference
#include "postgres.h"
#include "access/genam.h"
#include "access/relscan.h"
#include "access/tableam.h"
#include "access/transam.h"
#include "access/xact.h"
#include "commands/trigger.h"
#include "executor/executor.h"
#include "executor/nodeModifyTable.h"
#include "nodes/nodeFuncs.h"
#include "parser/parse_relation.h"
#include "parser/parsetree.h"
#include "storage/bufmgr.h"
#include "storage/lmgr.h"
#include "utils/builtins.h"
#include "utils/datum.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"
#include "utils/typcache.h"
Include dependency graph for execReplication.c:

Go to the source code of this file.

Functions

static bool build_replindex_scan_key (ScanKey skey, Relation rel, Relation idxrel, TupleTableSlot *searchslot)
 
bool RelationFindReplTupleByIndex (Relation rel, Oid idxoid, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
 
static bool tuples_equal (TupleTableSlot *slot1, TupleTableSlot *slot2)
 
bool RelationFindReplTupleSeq (Relation rel, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot)
 
void ExecSimpleRelationInsert (EState *estate, TupleTableSlot *slot)
 
void ExecSimpleRelationUpdate (EState *estate, EPQState *epqstate, TupleTableSlot *searchslot, TupleTableSlot *slot)
 
void ExecSimpleRelationDelete (EState *estate, EPQState *epqstate, TupleTableSlot *searchslot)
 
void CheckCmdReplicaIdentity (Relation rel, CmdType cmd)
 
void CheckSubscriptionRelkind (char relkind, const char *nspname, const char *relname)
 

Function Documentation

◆ build_replindex_scan_key()

static bool build_replindex_scan_key ( ScanKey  skey,
Relation  rel,
Relation  idxrel,
TupleTableSlot searchslot 
)
static

Definition at line 50 of file execReplication.c.

References Assert, BTEqualStrategyNumber, DatumGetPointer, elog, ERROR, get_opclass_family(), get_opclass_input_type(), get_opcode(), get_opfamily_member(), IndexRelationGetNumberOfKeyAttributes, INDEXRELID, OidIsValid, RelationData::rd_indcollation, RelationData::rd_index, RelationData::rd_indextuple, RelationGetRelid, RelationGetReplicaIndex(), ScanKeyInit(), ScanKeyData::sk_collation, ScanKeyData::sk_flags, SK_ISNULL, SysCacheGetAttr(), TupleTableSlot::tts_isnull, TupleTableSlot::tts_values, int2vector::values, and oidvector::values.

Referenced by RelationFindReplTupleByIndex().

52 {
53  int attoff;
54  bool isnull;
55  Datum indclassDatum;
56  oidvector *opclass;
57  int2vector *indkey = &idxrel->rd_index->indkey;
58  bool hasnulls = false;
59 
61 
62  indclassDatum = SysCacheGetAttr(INDEXRELID, idxrel->rd_indextuple,
63  Anum_pg_index_indclass, &isnull);
64  Assert(!isnull);
65  opclass = (oidvector *) DatumGetPointer(indclassDatum);
66 
67  /* Build scankey for every attribute in the index. */
68  for (attoff = 0; attoff < IndexRelationGetNumberOfKeyAttributes(idxrel); attoff++)
69  {
70  Oid operator;
71  Oid opfamily;
72  RegProcedure regop;
73  int pkattno = attoff + 1;
74  int mainattno = indkey->values[attoff];
75  Oid optype = get_opclass_input_type(opclass->values[attoff]);
76 
77  /*
78  * Load the operator info. We need this to get the equality operator
79  * function for the scan key.
80  */
81  opfamily = get_opclass_family(opclass->values[attoff]);
82 
83  operator = get_opfamily_member(opfamily, optype,
84  optype,
86  if (!OidIsValid(operator))
87  elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",
88  BTEqualStrategyNumber, optype, optype, opfamily);
89 
90  regop = get_opcode(operator);
91 
92  /* Initialize the scankey. */
93  ScanKeyInit(&skey[attoff],
94  pkattno,
96  regop,
97  searchslot->tts_values[mainattno - 1]);
98 
99  skey[attoff].sk_collation = idxrel->rd_indcollation[attoff];
100 
101  /* Check for null value. */
102  if (searchslot->tts_isnull[mainattno - 1])
103  {
104  hasnulls = true;
105  skey[attoff].sk_flags |= SK_ISNULL;
106  }
107  }
108 
109  return hasnulls;
110 }
Definition: c.h:594
Oid RelationGetReplicaIndex(Relation relation)
Definition: relcache.c:4542
regproc RegProcedure
Definition: c.h:511
Datum * tts_values
Definition: tuptable.h:126
unsigned int Oid
Definition: postgres_ext.h:31
#define OidIsValid(objectId)
Definition: c.h:644
struct HeapTupleData * rd_indextuple
Definition: rel.h:151
Form_pg_index rd_index
Definition: rel.h:149
Oid * rd_indcollation
Definition: rel.h:174
#define ERROR
Definition: elog.h:43
bool * tts_isnull
Definition: tuptable.h:128
Oid values[FLEXIBLE_ARRAY_MEMBER]
Definition: c.h:602
Oid get_opfamily_member(Oid opfamily, Oid lefttype, Oid righttype, int16 strategy)
Definition: lsyscache.c:163
#define IndexRelationGetNumberOfKeyAttributes(relation)
Definition: rel.h:447
#define SK_ISNULL
Definition: skey.h:115
Definition: c.h:583
uintptr_t Datum
Definition: postgres.h:367
Datum SysCacheGetAttr(int cacheId, HeapTuple tup, AttrNumber attributeNumber, bool *isNull)
Definition: syscache.c:1377
RegProcedure get_opcode(Oid opno)
Definition: lsyscache.c:1092
int sk_flags
Definition: skey.h:66
#define Assert(condition)
Definition: c.h:738
int16 values[FLEXIBLE_ARRAY_MEMBER]
Definition: c.h:591
Oid get_opclass_family(Oid opclass)
Definition: lsyscache.c:1019
#define DatumGetPointer(X)
Definition: postgres.h:549
Oid sk_collation
Definition: skey.h:70
#define elog(elevel,...)
Definition: elog.h:228
void ScanKeyInit(ScanKey entry, AttrNumber attributeNumber, StrategyNumber strategy, RegProcedure procedure, Datum argument)
Definition: scankey.c:76
#define RelationGetRelid(relation)
Definition: rel.h:428
#define BTEqualStrategyNumber
Definition: stratnum.h:31
Oid get_opclass_input_type(Oid opclass)
Definition: lsyscache.c:1041

◆ CheckCmdReplicaIdentity()

void CheckCmdReplicaIdentity ( Relation  rel,
CmdType  cmd 
)

Definition at line 553 of file execReplication.c.

References CMD_DELETE, CMD_UPDATE, ereport, errcode(), errhint(), errmsg(), ERROR, GetRelationPublicationActions(), OidIsValid, PublicationActions::pubdelete, PublicationActions::pubupdate, RelationData::rd_rel, RelationGetRelationName, and RelationGetReplicaIndex().

Referenced by CheckValidResultRel(), exec_rt_fetch(), ExecSimpleRelationDelete(), ExecSimpleRelationInsert(), and ExecSimpleRelationUpdate().

554 {
555  PublicationActions *pubactions;
556 
557  /* We only need to do checks for UPDATE and DELETE. */
558  if (cmd != CMD_UPDATE && cmd != CMD_DELETE)
559  return;
560 
561  /* If relation has replica identity we are always good. */
562  if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
564  return;
565 
566  /*
567  * This is either UPDATE OR DELETE and there is no replica identity.
568  *
569  * Check if the table publishes UPDATES or DELETES.
570  */
571  pubactions = GetRelationPublicationActions(rel);
572  if (cmd == CMD_UPDATE && pubactions->pubupdate)
573  ereport(ERROR,
574  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
575  errmsg("cannot update table \"%s\" because it does not have a replica identity and publishes updates",
577  errhint("To enable updating the table, set REPLICA IDENTITY using ALTER TABLE.")));
578  else if (cmd == CMD_DELETE && pubactions->pubdelete)
579  ereport(ERROR,
580  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
581  errmsg("cannot delete from table \"%s\" because it does not have a replica identity and publishes deletes",
583  errhint("To enable deleting from the table, set REPLICA IDENTITY using ALTER TABLE.")));
584 }
int errhint(const char *fmt,...)
Definition: elog.c:1069
Oid RelationGetReplicaIndex(Relation relation)
Definition: relcache.c:4542
int errcode(int sqlerrcode)
Definition: elog.c:608
Form_pg_class rd_rel
Definition: rel.h:84
#define OidIsValid(objectId)
Definition: c.h:644
#define ERROR
Definition: elog.h:43
#define RelationGetRelationName(relation)
Definition: rel.h:462
#define ereport(elevel, rest)
Definition: elog.h:141
struct PublicationActions * GetRelationPublicationActions(Relation relation)
Definition: relcache.c:5125
int errmsg(const char *fmt,...)
Definition: elog.c:822

◆ CheckSubscriptionRelkind()

void CheckSubscriptionRelkind ( char  relkind,
const char *  nspname,
const char *  relname 
)

Definition at line 593 of file execReplication.c.

References ereport, errcode(), errdetail(), errmsg(), and ERROR.

Referenced by AlterSubscription_refresh(), CreateSubscription(), exec_rt_fetch(), and logicalrep_rel_open().

595 {
596  /*
597  * We currently only support writing to regular tables. However, give a
598  * more specific error for partitioned and foreign tables.
599  */
600  if (relkind == RELKIND_PARTITIONED_TABLE)
601  ereport(ERROR,
602  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
603  errmsg("cannot use relation \"%s.%s\" as logical replication target",
604  nspname, relname),
605  errdetail("\"%s.%s\" is a partitioned table.",
606  nspname, relname)));
607  else if (relkind == RELKIND_FOREIGN_TABLE)
608  ereport(ERROR,
609  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
610  errmsg("cannot use relation \"%s.%s\" as logical replication target",
611  nspname, relname),
612  errdetail("\"%s.%s\" is a foreign table.",
613  nspname, relname)));
614 
615  if (relkind != RELKIND_RELATION)
616  ereport(ERROR,
617  (errcode(ERRCODE_WRONG_OBJECT_TYPE),
618  errmsg("cannot use relation \"%s.%s\" as logical replication target",
619  nspname, relname),
620  errdetail("\"%s.%s\" is not a table.",
621  nspname, relname)));
622 }
int errcode(int sqlerrcode)
Definition: elog.c:608
NameData relname
Definition: pg_class.h:38
#define ERROR
Definition: elog.h:43
int errdetail(const char *fmt,...)
Definition: elog.c:955
#define ereport(elevel, rest)
Definition: elog.h:141
int errmsg(const char *fmt,...)
Definition: elog.c:822

◆ ExecSimpleRelationDelete()

void ExecSimpleRelationDelete ( EState estate,
EPQState epqstate,
TupleTableSlot searchslot 
)

Definition at line 519 of file execReplication.c.

References CheckCmdReplicaIdentity(), CMD_DELETE, EState::es_result_relation_info, EState::es_snapshot, ExecARDeleteTriggers(), ExecBRDeleteTriggers(), ResultRelInfo::ri_RelationDesc, ResultRelInfo::ri_TrigDesc, simple_table_tuple_delete(), TriggerDesc::trig_delete_before_row, and TupleTableSlot::tts_tid.

Referenced by apply_handle_delete(), and exec_rt_fetch().

521 {
522  bool skip_tuple = false;
523  ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
524  Relation rel = resultRelInfo->ri_RelationDesc;
525  ItemPointer tid = &searchslot->tts_tid;
526 
528 
529  /* BEFORE ROW DELETE Triggers */
530  if (resultRelInfo->ri_TrigDesc &&
531  resultRelInfo->ri_TrigDesc->trig_delete_before_row)
532  {
533  skip_tuple = !ExecBRDeleteTriggers(estate, epqstate, resultRelInfo,
534  tid, NULL, NULL);
535 
536  }
537 
538  if (!skip_tuple)
539  {
540  /* OK, delete the tuple */
541  simple_table_tuple_delete(rel, tid, estate->es_snapshot);
542 
543  /* AFTER ROW DELETE Triggers */
544  ExecARDeleteTriggers(estate, resultRelInfo,
545  tid, NULL, NULL);
546  }
547 }
Relation ri_RelationDesc
Definition: execnodes.h:411
Snapshot es_snapshot
Definition: execnodes.h:506
bool ExecBRDeleteTriggers(EState *estate, EPQState *epqstate, ResultRelInfo *relinfo, ItemPointer tupleid, HeapTuple fdw_trigtuple, TupleTableSlot **epqslot)
Definition: trigger.c:2749
void ExecARDeleteTriggers(EState *estate, ResultRelInfo *relinfo, ItemPointer tupleid, HeapTuple fdw_trigtuple, TransitionCaptureState *transition_capture)
Definition: trigger.c:2840
void CheckCmdReplicaIdentity(Relation rel, CmdType cmd)
TriggerDesc * ri_TrigDesc
Definition: execnodes.h:423
void simple_table_tuple_delete(Relation rel, ItemPointer tid, Snapshot snapshot)
Definition: tableam.c:275
ItemPointerData tts_tid
Definition: tuptable.h:130
bool trig_delete_before_row
Definition: reltrigger.h:65
ResultRelInfo * es_result_relation_info
Definition: execnodes.h:525

◆ ExecSimpleRelationInsert()

void ExecSimpleRelationInsert ( EState estate,
TupleTableSlot slot 
)

Definition at line 396 of file execReplication.c.

References Assert, CheckCmdReplicaIdentity(), CMD_INSERT, TupleDescData::constr, EState::es_result_relation_info, ExecARInsertTriggers(), ExecBRInsertTriggers(), ExecComputeStoredGenerated(), ExecConstraints(), ExecInsertIndexTuples(), ExecPartitionCheck(), TupleConstr::has_generated_stored, list_free(), NIL, RelationData::rd_att, RelationData::rd_rel, ResultRelInfo::ri_NumIndices, ResultRelInfo::ri_PartitionCheck, ResultRelInfo::ri_RelationDesc, ResultRelInfo::ri_TrigDesc, simple_table_tuple_insert(), and TriggerDesc::trig_insert_before_row.

Referenced by apply_handle_insert(), and exec_rt_fetch().

397 {
398  bool skip_tuple = false;
399  ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
400  Relation rel = resultRelInfo->ri_RelationDesc;
401 
402  /* For now we support only tables. */
403  Assert(rel->rd_rel->relkind == RELKIND_RELATION);
404 
406 
407  /* BEFORE ROW INSERT Triggers */
408  if (resultRelInfo->ri_TrigDesc &&
409  resultRelInfo->ri_TrigDesc->trig_insert_before_row)
410  {
411  if (!ExecBRInsertTriggers(estate, resultRelInfo, slot))
412  skip_tuple = true; /* "do nothing" */
413  }
414 
415  if (!skip_tuple)
416  {
417  List *recheckIndexes = NIL;
418 
419  /* Compute stored generated columns */
420  if (rel->rd_att->constr &&
422  ExecComputeStoredGenerated(estate, slot, CMD_INSERT);
423 
424  /* Check the constraints of the tuple */
425  if (rel->rd_att->constr)
426  ExecConstraints(resultRelInfo, slot, estate);
427  if (resultRelInfo->ri_PartitionCheck)
428  ExecPartitionCheck(resultRelInfo, slot, estate, true);
429 
430  /* OK, store the tuple and create index entries for it */
431  simple_table_tuple_insert(resultRelInfo->ri_RelationDesc, slot);
432 
433  if (resultRelInfo->ri_NumIndices > 0)
434  recheckIndexes = ExecInsertIndexTuples(slot, estate, false, NULL,
435  NIL);
436 
437  /* AFTER ROW INSERT Triggers */
438  ExecARInsertTriggers(estate, resultRelInfo, slot,
439  recheckIndexes, NULL);
440 
441  /*
442  * XXX we should in theory pass a TransitionCaptureState object to the
443  * above to capture transition tuples, but after statement triggers
444  * don't actually get fired by replication yet anyway
445  */
446 
447  list_free(recheckIndexes);
448  }
449 }
int ri_NumIndices
Definition: execnodes.h:414
#define NIL
Definition: pg_list.h:65
Relation ri_RelationDesc
Definition: execnodes.h:411
void ExecARInsertTriggers(EState *estate, ResultRelInfo *relinfo, TupleTableSlot *slot, List *recheckIndexes, TransitionCaptureState *transition_capture)
Definition: trigger.c:2592
void ExecComputeStoredGenerated(EState *estate, TupleTableSlot *slot, CmdType cmdtype)
void ExecConstraints(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate)
Definition: execMain.c:1896
Form_pg_class rd_rel
Definition: rel.h:84
bool has_generated_stored
Definition: tupdesc.h:45
bool ExecBRInsertTriggers(EState *estate, ResultRelInfo *relinfo, TupleTableSlot *slot)
Definition: trigger.c:2525
List * ExecInsertIndexTuples(TupleTableSlot *slot, EState *estate, bool noDupErr, bool *specConflict, List *arbiterIndexes)
Definition: execIndexing.c:273
TupleConstr * constr
Definition: tupdesc.h:85
void CheckCmdReplicaIdentity(Relation rel, CmdType cmd)
TriggerDesc * ri_TrigDesc
Definition: execnodes.h:423
bool trig_insert_before_row
Definition: reltrigger.h:55
List * ri_PartitionCheck
Definition: execnodes.h:479
TupleDesc rd_att
Definition: rel.h:85
#define Assert(condition)
Definition: c.h:738
void simple_table_tuple_insert(Relation rel, TupleTableSlot *slot)
Definition: tableam.c:261
void list_free(List *list)
Definition: list.c:1377
bool ExecPartitionCheck(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate, bool emitError)
Definition: execMain.c:1783
Definition: pg_list.h:50
ResultRelInfo * es_result_relation_info
Definition: execnodes.h:525

◆ ExecSimpleRelationUpdate()

void ExecSimpleRelationUpdate ( EState estate,
EPQState epqstate,
TupleTableSlot searchslot,
TupleTableSlot slot 
)

Definition at line 458 of file execReplication.c.

References Assert, CheckCmdReplicaIdentity(), CMD_UPDATE, TupleDescData::constr, EState::es_result_relation_info, EState::es_snapshot, ExecARUpdateTriggers(), ExecBRUpdateTriggers(), ExecComputeStoredGenerated(), ExecConstraints(), ExecInsertIndexTuples(), ExecPartitionCheck(), TupleConstr::has_generated_stored, list_free(), NIL, RelationData::rd_att, RelationData::rd_rel, ResultRelInfo::ri_NumIndices, ResultRelInfo::ri_PartitionCheck, ResultRelInfo::ri_RelationDesc, ResultRelInfo::ri_TrigDesc, simple_table_tuple_update(), TriggerDesc::trig_update_before_row, and TupleTableSlot::tts_tid.

Referenced by apply_handle_update(), and exec_rt_fetch().

460 {
461  bool skip_tuple = false;
462  ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
463  Relation rel = resultRelInfo->ri_RelationDesc;
464  ItemPointer tid = &(searchslot->tts_tid);
465 
466  /* For now we support only tables. */
467  Assert(rel->rd_rel->relkind == RELKIND_RELATION);
468 
470 
471  /* BEFORE ROW UPDATE Triggers */
472  if (resultRelInfo->ri_TrigDesc &&
473  resultRelInfo->ri_TrigDesc->trig_update_before_row)
474  {
475  if (!ExecBRUpdateTriggers(estate, epqstate, resultRelInfo,
476  tid, NULL, slot))
477  skip_tuple = true; /* "do nothing" */
478  }
479 
480  if (!skip_tuple)
481  {
482  List *recheckIndexes = NIL;
483  bool update_indexes;
484 
485  /* Compute stored generated columns */
486  if (rel->rd_att->constr &&
488  ExecComputeStoredGenerated(estate, slot, CMD_UPDATE);
489 
490  /* Check the constraints of the tuple */
491  if (rel->rd_att->constr)
492  ExecConstraints(resultRelInfo, slot, estate);
493  if (resultRelInfo->ri_PartitionCheck)
494  ExecPartitionCheck(resultRelInfo, slot, estate, true);
495 
496  simple_table_tuple_update(rel, tid, slot, estate->es_snapshot,
497  &update_indexes);
498 
499  if (resultRelInfo->ri_NumIndices > 0 && update_indexes)
500  recheckIndexes = ExecInsertIndexTuples(slot, estate, false, NULL,
501  NIL);
502 
503  /* AFTER ROW UPDATE Triggers */
504  ExecARUpdateTriggers(estate, resultRelInfo,
505  tid, NULL, slot,
506  recheckIndexes, NULL);
507 
508  list_free(recheckIndexes);
509  }
510 }
int ri_NumIndices
Definition: execnodes.h:414
#define NIL
Definition: pg_list.h:65
Relation ri_RelationDesc
Definition: execnodes.h:411
void ExecComputeStoredGenerated(EState *estate, TupleTableSlot *slot, CmdType cmdtype)
void ExecConstraints(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate)
Definition: execMain.c:1896
void simple_table_tuple_update(Relation rel, ItemPointer otid, TupleTableSlot *slot, Snapshot snapshot, bool *update_indexes)
Definition: tableam.c:320
Snapshot es_snapshot
Definition: execnodes.h:506
Form_pg_class rd_rel
Definition: rel.h:84
bool has_generated_stored
Definition: tupdesc.h:45
List * ExecInsertIndexTuples(TupleTableSlot *slot, EState *estate, bool noDupErr, bool *specConflict, List *arbiterIndexes)
Definition: execIndexing.c:273
TupleConstr * constr
Definition: tupdesc.h:85
bool trig_update_before_row
Definition: reltrigger.h:60
void CheckCmdReplicaIdentity(Relation rel, CmdType cmd)
TriggerDesc * ri_TrigDesc
Definition: execnodes.h:423
void ExecARUpdateTriggers(EState *estate, ResultRelInfo *relinfo, ItemPointer tupleid, HeapTuple fdw_trigtuple, TupleTableSlot *newslot, List *recheckIndexes, TransitionCaptureState *transition_capture)
Definition: trigger.c:3127
List * ri_PartitionCheck
Definition: execnodes.h:479
TupleDesc rd_att
Definition: rel.h:85
#define Assert(condition)
Definition: c.h:738
void list_free(List *list)
Definition: list.c:1377
bool ExecBRUpdateTriggers(EState *estate, EPQState *epqstate, ResultRelInfo *relinfo, ItemPointer tupleid, HeapTuple fdw_trigtuple, TupleTableSlot *newslot)
Definition: trigger.c:2996
bool ExecPartitionCheck(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate, bool emitError)
Definition: execMain.c:1783
Definition: pg_list.h:50
ItemPointerData tts_tid
Definition: tuptable.h:130
ResultRelInfo * es_result_relation_info
Definition: execnodes.h:525

◆ RelationFindReplTupleByIndex()

bool RelationFindReplTupleByIndex ( Relation  rel,
Oid  idxoid,
LockTupleMode  lockmode,
TupleTableSlot searchslot,
TupleTableSlot outslot 
)

Definition at line 119 of file execReplication.c.

References build_replindex_scan_key(), TM_FailureData::ctid, elog, ereport, errcode(), errmsg(), ERROR, ExecMaterializeSlot(), ForwardScanDirection, GetCurrentCommandId(), GetLatestSnapshot(), index_beginscan(), index_close(), index_endscan(), index_getnext_slot(), INDEX_MAX_KEYS, index_open(), index_rescan(), IndexRelationGetNumberOfKeyAttributes, InitDirtySnapshot, ItemPointerIndicatesMovedPartitions, LockWaitBlock, LOG, NoLock, PopActiveSnapshot(), PushActiveSnapshot(), RowExclusiveLock, table_tuple_lock(), TM_Deleted, TM_Invisible, TM_Ok, TM_Updated, TransactionIdIsValid, TupleTableSlot::tts_tid, XactLockTableWait(), XLTW_None, SnapshotData::xmax, and SnapshotData::xmin.

Referenced by apply_handle_delete(), apply_handle_update(), and exec_rt_fetch().

123 {
125  IndexScanDesc scan;
126  SnapshotData snap;
127  TransactionId xwait;
128  Relation idxrel;
129  bool found;
130 
131  /* Open the index. */
132  idxrel = index_open(idxoid, RowExclusiveLock);
133 
134  /* Start an index scan. */
135  InitDirtySnapshot(snap);
136  scan = index_beginscan(rel, idxrel, &snap,
138  0);
139 
140  /* Build scan key. */
141  build_replindex_scan_key(skey, rel, idxrel, searchslot);
142 
143 retry:
144  found = false;
145 
146  index_rescan(scan, skey, IndexRelationGetNumberOfKeyAttributes(idxrel), NULL, 0);
147 
148  /* Try to find the tuple */
149  if (index_getnext_slot(scan, ForwardScanDirection, outslot))
150  {
151  found = true;
152  ExecMaterializeSlot(outslot);
153 
154  xwait = TransactionIdIsValid(snap.xmin) ?
155  snap.xmin : snap.xmax;
156 
157  /*
158  * If the tuple is locked, wait for locking transaction to finish and
159  * retry.
160  */
161  if (TransactionIdIsValid(xwait))
162  {
163  XactLockTableWait(xwait, NULL, NULL, XLTW_None);
164  goto retry;
165  }
166  }
167 
168  /* Found tuple, try to lock it in the lockmode. */
169  if (found)
170  {
171  TM_FailureData tmfd;
172  TM_Result res;
173 
175 
176  res = table_tuple_lock(rel, &(outslot->tts_tid), GetLatestSnapshot(),
177  outslot,
178  GetCurrentCommandId(false),
179  lockmode,
181  0 /* don't follow updates */ ,
182  &tmfd);
183 
185 
186  switch (res)
187  {
188  case TM_Ok:
189  break;
190  case TM_Updated:
191  /* XXX: Improve handling here */
193  ereport(LOG,
194  (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
195  errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
196  else
197  ereport(LOG,
198  (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
199  errmsg("concurrent update, retrying")));
200  goto retry;
201  case TM_Deleted:
202  /* XXX: Improve handling here */
203  ereport(LOG,
204  (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
205  errmsg("concurrent delete, retrying")));
206  goto retry;
207  case TM_Invisible:
208  elog(ERROR, "attempted to lock invisible tuple");
209  break;
210  default:
211  elog(ERROR, "unexpected table_tuple_lock status: %u", res);
212  break;
213  }
214  }
215 
216  index_endscan(scan);
217 
218  /* Don't release lock until commit. */
219  index_close(idxrel, NoLock);
220 
221  return found;
222 }
ItemPointerData ctid
Definition: tableam.h:124
#define InitDirtySnapshot(snapshotdata)
Definition: snapmgr.h:76
uint32 TransactionId
Definition: c.h:513
void index_rescan(IndexScanDesc scan, ScanKey keys, int nkeys, ScanKey orderbys, int norderbys)
Definition: indexam.c:289
int errcode(int sqlerrcode)
Definition: elog.c:608
void PopActiveSnapshot(void)
Definition: snapmgr.c:814
#define LOG
Definition: elog.h:26
#define ERROR
Definition: elog.h:43
Definition: lmgr.h:26
static TM_Result table_tuple_lock(Relation rel, ItemPointer tid, Snapshot snapshot, TupleTableSlot *slot, CommandId cid, LockTupleMode mode, LockWaitPolicy wait_policy, uint8 flags, TM_FailureData *tmfd)
Definition: tableam.h:1333
#define NoLock
Definition: lockdefs.h:34
void PushActiveSnapshot(Snapshot snap)
Definition: snapmgr.c:735
#define RowExclusiveLock
Definition: lockdefs.h:38
TransactionId xmax
Definition: snapshot.h:158
TransactionId xmin
Definition: snapshot.h:157
#define IndexRelationGetNumberOfKeyAttributes(relation)
Definition: rel.h:447
void index_endscan(IndexScanDesc scan)
Definition: indexam.c:315
#define ereport(elevel, rest)
Definition: elog.h:141
TM_Result
Definition: tableam.h:69
#define ItemPointerIndicatesMovedPartitions(pointer)
Definition: itemptr.h:184
static void ExecMaterializeSlot(TupleTableSlot *slot)
Definition: tuptable.h:443
void XactLockTableWait(TransactionId xid, Relation rel, ItemPointer ctid, XLTW_Oper oper)
Definition: lmgr.c:624
Definition: tableam.h:75
#define INDEX_MAX_KEYS
bool index_getnext_slot(IndexScanDesc scan, ScanDirection direction, TupleTableSlot *slot)
Definition: indexam.c:607
Snapshot GetLatestSnapshot(void)
Definition: snapmgr.c:381
void index_close(Relation relation, LOCKMODE lockmode)
Definition: indexam.c:152
int errmsg(const char *fmt,...)
Definition: elog.c:822
#define elog(elevel,...)
Definition: elog.h:228
CommandId GetCurrentCommandId(bool used)
Definition: xact.c:745
#define TransactionIdIsValid(xid)
Definition: transam.h:41
ItemPointerData tts_tid
Definition: tuptable.h:130
static bool build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel, TupleTableSlot *searchslot)
Relation index_open(Oid relationId, LOCKMODE lockmode)
Definition: indexam.c:126
IndexScanDesc index_beginscan(Relation heapRelation, Relation indexRelation, Snapshot snapshot, int nkeys, int norderbys)
Definition: indexam.c:197

◆ RelationFindReplTupleSeq()

bool RelationFindReplTupleSeq ( Relation  rel,
LockTupleMode  lockmode,
TupleTableSlot searchslot,
TupleTableSlot outslot 
)

Definition at line 287 of file execReplication.c.

References Assert, TM_FailureData::ctid, elog, equalTupleDescs(), ereport, errcode(), errmsg(), ERROR, ExecCopySlot(), ExecDropSingleTupleTableSlot(), ForwardScanDirection, GetCurrentCommandId(), GetLatestSnapshot(), InitDirtySnapshot, ItemPointerIndicatesMovedPartitions, LockWaitBlock, LOG, PG_USED_FOR_ASSERTS_ONLY, PopActiveSnapshot(), PushActiveSnapshot(), RelationGetDescr, table_beginscan(), table_endscan(), table_rescan(), table_scan_getnextslot(), table_slot_create(), table_tuple_lock(), TM_Deleted, TM_Invisible, TM_Ok, TM_Updated, TransactionIdIsValid, TupleTableSlot::tts_tid, TupleTableSlot::tts_tupleDescriptor, tuples_equal(), XactLockTableWait(), XLTW_None, SnapshotData::xmax, and SnapshotData::xmin.

Referenced by apply_handle_delete(), apply_handle_update(), and exec_rt_fetch().

289 {
290  TupleTableSlot *scanslot;
291  TableScanDesc scan;
292  SnapshotData snap;
293  TransactionId xwait;
294  bool found;
296 
297  Assert(equalTupleDescs(desc, outslot->tts_tupleDescriptor));
298 
299  /* Start a heap scan. */
300  InitDirtySnapshot(snap);
301  scan = table_beginscan(rel, &snap, 0, NULL);
302  scanslot = table_slot_create(rel, NULL);
303 
304 retry:
305  found = false;
306 
307  table_rescan(scan, NULL);
308 
309  /* Try to find the tuple */
310  while (table_scan_getnextslot(scan, ForwardScanDirection, scanslot))
311  {
312  if (!tuples_equal(scanslot, searchslot))
313  continue;
314 
315  found = true;
316  ExecCopySlot(outslot, scanslot);
317 
318  xwait = TransactionIdIsValid(snap.xmin) ?
319  snap.xmin : snap.xmax;
320 
321  /*
322  * If the tuple is locked, wait for locking transaction to finish and
323  * retry.
324  */
325  if (TransactionIdIsValid(xwait))
326  {
327  XactLockTableWait(xwait, NULL, NULL, XLTW_None);
328  goto retry;
329  }
330 
331  /* Found our tuple and it's not locked */
332  break;
333  }
334 
335  /* Found tuple, try to lock it in the lockmode. */
336  if (found)
337  {
338  TM_FailureData tmfd;
339  TM_Result res;
340 
342 
343  res = table_tuple_lock(rel, &(outslot->tts_tid), GetLatestSnapshot(),
344  outslot,
345  GetCurrentCommandId(false),
346  lockmode,
348  0 /* don't follow updates */ ,
349  &tmfd);
350 
352 
353  switch (res)
354  {
355  case TM_Ok:
356  break;
357  case TM_Updated:
358  /* XXX: Improve handling here */
360  ereport(LOG,
361  (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
362  errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
363  else
364  ereport(LOG,
365  (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
366  errmsg("concurrent update, retrying")));
367  goto retry;
368  case TM_Deleted:
369  /* XXX: Improve handling here */
370  ereport(LOG,
371  (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
372  errmsg("concurrent delete, retrying")));
373  goto retry;
374  case TM_Invisible:
375  elog(ERROR, "attempted to lock invisible tuple");
376  break;
377  default:
378  elog(ERROR, "unexpected table_tuple_lock status: %u", res);
379  break;
380  }
381  }
382 
383  table_endscan(scan);
385 
386  return found;
387 }
TupleTableSlot * table_slot_create(Relation relation, List **reglist)
Definition: tableam.c:77
static TupleTableSlot * ExecCopySlot(TupleTableSlot *dstslot, TupleTableSlot *srcslot)
Definition: tuptable.h:475
ItemPointerData ctid
Definition: tableam.h:124
#define InitDirtySnapshot(snapshotdata)
Definition: snapmgr.h:76
uint32 TransactionId
Definition: c.h:513
#define RelationGetDescr(relation)
Definition: rel.h:454
int errcode(int sqlerrcode)
Definition: elog.c:608
void PopActiveSnapshot(void)
Definition: snapmgr.c:814
static bool table_scan_getnextslot(TableScanDesc sscan, ScanDirection direction, TupleTableSlot *slot)
Definition: tableam.h:904
#define LOG
Definition: elog.h:26
static void table_rescan(TableScanDesc scan, struct ScanKeyData *key)
Definition: tableam.h:872
#define ERROR
Definition: elog.h:43
Definition: lmgr.h:26
static TableScanDesc table_beginscan(Relation rel, Snapshot snapshot, int nkeys, struct ScanKeyData *key)
Definition: tableam.h:755
static TM_Result table_tuple_lock(Relation rel, ItemPointer tid, Snapshot snapshot, TupleTableSlot *slot, CommandId cid, LockTupleMode mode, LockWaitPolicy wait_policy, uint8 flags, TM_FailureData *tmfd)
Definition: tableam.h:1333
void PushActiveSnapshot(Snapshot snap)
Definition: snapmgr.c:735
void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
Definition: execTuples.c:1224
TransactionId xmax
Definition: snapshot.h:158
TransactionId xmin
Definition: snapshot.h:157
#define ereport(elevel, rest)
Definition: elog.h:141
TupleDesc tts_tupleDescriptor
Definition: tuptable.h:124
TM_Result
Definition: tableam.h:69
#define ItemPointerIndicatesMovedPartitions(pointer)
Definition: itemptr.h:184
static bool tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2)
void XactLockTableWait(TransactionId xid, Relation rel, ItemPointer ctid, XLTW_Oper oper)
Definition: lmgr.c:624
#define Assert(condition)
Definition: c.h:738
Definition: tableam.h:75
Snapshot GetLatestSnapshot(void)
Definition: snapmgr.c:381
static void table_endscan(TableScanDesc scan)
Definition: tableam.h:863
int errmsg(const char *fmt,...)
Definition: elog.c:822
#define elog(elevel,...)
Definition: elog.h:228
bool equalTupleDescs(TupleDesc tupdesc1, TupleDesc tupdesc2)
Definition: tupdesc.c:411
CommandId GetCurrentCommandId(bool used)
Definition: xact.c:745
#define TransactionIdIsValid(xid)
Definition: transam.h:41
#define PG_USED_FOR_ASSERTS_ONLY
Definition: c.h:121
ItemPointerData tts_tid
Definition: tuptable.h:130

◆ tuples_equal()

static bool tuples_equal ( TupleTableSlot slot1,
TupleTableSlot slot2 
)
static

Definition at line 228 of file execReplication.c.

References Assert, DatumGetBool, TypeCacheEntry::eq_opr_finfo, ereport, errcode(), errmsg(), ERROR, FmgrInfo::fn_oid, format_type_be(), FunctionCall2Coll(), lookup_type_cache(), TupleDescData::natts, OidIsValid, slot_getallattrs(), TupleTableSlot::tts_isnull, TupleTableSlot::tts_tupleDescriptor, TupleTableSlot::tts_values, TupleDescAttr, and TYPECACHE_EQ_OPR_FINFO.

Referenced by RelationFindReplTupleSeq().

229 {
230  int attrnum;
231 
232  Assert(slot1->tts_tupleDescriptor->natts ==
233  slot2->tts_tupleDescriptor->natts);
234 
235  slot_getallattrs(slot1);
236  slot_getallattrs(slot2);
237 
238  /* Check equality of the attributes. */
239  for (attrnum = 0; attrnum < slot1->tts_tupleDescriptor->natts; attrnum++)
240  {
241  Form_pg_attribute att;
242  TypeCacheEntry *typentry;
243 
244  /*
245  * If one value is NULL and other is not, then they are certainly not
246  * equal
247  */
248  if (slot1->tts_isnull[attrnum] != slot2->tts_isnull[attrnum])
249  return false;
250 
251  /*
252  * If both are NULL, they can be considered equal.
253  */
254  if (slot1->tts_isnull[attrnum] || slot2->tts_isnull[attrnum])
255  continue;
256 
257  att = TupleDescAttr(slot1->tts_tupleDescriptor, attrnum);
258 
259  typentry = lookup_type_cache(att->atttypid, TYPECACHE_EQ_OPR_FINFO);
260  if (!OidIsValid(typentry->eq_opr_finfo.fn_oid))
261  ereport(ERROR,
262  (errcode(ERRCODE_UNDEFINED_FUNCTION),
263  errmsg("could not identify an equality operator for type %s",
264  format_type_be(att->atttypid))));
265 
267  att->attcollation,
268  slot1->tts_values[attrnum],
269  slot2->tts_values[attrnum])))
270  return false;
271  }
272 
273  return true;
274 }
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
#define TYPECACHE_EQ_OPR_FINFO
Definition: typcache.h:133
int errcode(int sqlerrcode)
Definition: elog.c:608
char * format_type_be(Oid type_oid)
Definition: format_type.c:326
Datum * tts_values
Definition: tuptable.h:126
Datum FunctionCall2Coll(FmgrInfo *flinfo, Oid collation, Datum arg1, Datum arg2)
Definition: fmgr.c:1150
#define OidIsValid(objectId)
Definition: c.h:644
#define ERROR
Definition: elog.h:43
static void slot_getallattrs(TupleTableSlot *slot)
Definition: tuptable.h:354
bool * tts_isnull
Definition: tuptable.h:128
#define DatumGetBool(X)
Definition: postgres.h:393
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:200
#define ereport(elevel, rest)
Definition: elog.h:141
TupleDesc tts_tupleDescriptor
Definition: tuptable.h:124
FmgrInfo eq_opr_finfo
Definition: typcache.h:72
TypeCacheEntry * lookup_type_cache(Oid type_id, int flags)
Definition: typcache.c:322
Oid fn_oid
Definition: fmgr.h:59
#define Assert(condition)
Definition: c.h:738
int errmsg(const char *fmt,...)
Definition: elog.c:822