PostgreSQL Source Code  git master
conflict.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  * conflict.c
3  * Support routines for logging conflicts.
4  *
5  * Copyright (c) 2024, PostgreSQL Global Development Group
6  *
7  * IDENTIFICATION
8  * src/backend/replication/logical/conflict.c
9  *
10  * This file contains the code for logging conflicts on the subscriber during
11  * logical replication.
12  *-------------------------------------------------------------------------
13  */
14 
15 #include "postgres.h"
16 
17 #include "access/commit_ts.h"
18 #include "access/tableam.h"
19 #include "executor/executor.h"
20 #include "pgstat.h"
21 #include "replication/conflict.h"
23 #include "storage/lmgr.h"
24 #include "utils/lsyscache.h"
25 
26 static const char *const ConflictTypeNames[] = {
27  [CT_INSERT_EXISTS] = "insert_exists",
28  [CT_UPDATE_ORIGIN_DIFFERS] = "update_origin_differs",
29  [CT_UPDATE_EXISTS] = "update_exists",
30  [CT_UPDATE_MISSING] = "update_missing",
31  [CT_DELETE_ORIGIN_DIFFERS] = "delete_origin_differs",
32  [CT_DELETE_MISSING] = "delete_missing"
33 };
34 
36 static int errdetail_apply_conflict(EState *estate,
37  ResultRelInfo *relinfo,
39  TupleTableSlot *searchslot,
40  TupleTableSlot *localslot,
41  TupleTableSlot *remoteslot,
42  Oid indexoid, TransactionId localxmin,
43  RepOriginId localorigin,
44  TimestampTz localts);
45 static char *build_tuple_value_details(EState *estate, ResultRelInfo *relinfo,
47  TupleTableSlot *searchslot,
48  TupleTableSlot *localslot,
49  TupleTableSlot *remoteslot,
50  Oid indexoid);
51 static char *build_index_value_desc(EState *estate, Relation localrel,
52  TupleTableSlot *slot, Oid indexoid);
53 
54 /*
55  * Get the xmin and commit timestamp data (origin and timestamp) associated
56  * with the provided local tuple.
57  *
58  * Return true if the commit timestamp data was found, false otherwise.
59  */
60 bool
62  RepOriginId *localorigin, TimestampTz *localts)
63 {
64  Datum xminDatum;
65  bool isnull;
66 
67  xminDatum = slot_getsysattr(localslot, MinTransactionIdAttributeNumber,
68  &isnull);
69  *xmin = DatumGetTransactionId(xminDatum);
70  Assert(!isnull);
71 
72  /*
73  * The commit timestamp data is not available if track_commit_timestamp is
74  * disabled.
75  */
77  {
78  *localorigin = InvalidRepOriginId;
79  *localts = 0;
80  return false;
81  }
82 
83  return TransactionIdGetCommitTsData(*xmin, localts, localorigin);
84 }
85 
86 /*
87  * This function is used to report a conflict while applying replication
88  * changes.
89  *
90  * 'searchslot' should contain the tuple used to search the local tuple to be
91  * updated or deleted.
92  *
93  * 'localslot' should contain the existing local tuple, if any, that conflicts
94  * with the remote tuple. 'localxmin', 'localorigin', and 'localts' provide the
95  * transaction information related to this existing local tuple.
96  *
97  * 'remoteslot' should contain the remote new tuple, if any.
98  *
99  * The 'indexoid' represents the OID of the unique index that triggered the
100  * constraint violation error. We use this to report the key values for
101  * conflicting tuple.
102  *
103  * The caller must ensure that the index with the OID 'indexoid' is locked so
104  * that we can fetch and display the conflicting key value.
105  */
106 void
107 ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
108  ConflictType type, TupleTableSlot *searchslot,
109  TupleTableSlot *localslot, TupleTableSlot *remoteslot,
110  Oid indexoid, TransactionId localxmin,
111  RepOriginId localorigin, TimestampTz localts)
112 {
113  Relation localrel = relinfo->ri_RelationDesc;
114 
115  Assert(!OidIsValid(indexoid) ||
117 
119 
120  ereport(elevel,
122  errmsg("conflict detected on relation \"%s.%s\": conflict=%s",
124  RelationGetRelationName(localrel),
126  errdetail_apply_conflict(estate, relinfo, type, searchslot,
127  localslot, remoteslot, indexoid,
128  localxmin, localorigin, localts));
129 }
130 
131 /*
132  * Find all unique indexes to check for a conflict and store them into
133  * ResultRelInfo.
134  */
135 void
137 {
138  List *uniqueIndexes = NIL;
139 
140  for (int i = 0; i < relInfo->ri_NumIndices; i++)
141  {
142  Relation indexRelation = relInfo->ri_IndexRelationDescs[i];
143 
144  if (indexRelation == NULL)
145  continue;
146 
147  /* Detect conflict only for unique indexes */
148  if (!relInfo->ri_IndexRelationInfo[i]->ii_Unique)
149  continue;
150 
151  /* Don't support conflict detection for deferrable index */
152  if (!indexRelation->rd_index->indimmediate)
153  continue;
154 
155  uniqueIndexes = lappend_oid(uniqueIndexes,
156  RelationGetRelid(indexRelation));
157  }
158 
159  relInfo->ri_onConflictArbiterIndexes = uniqueIndexes;
160 }
161 
162 /*
163  * Add SQLSTATE error code to the current conflict report.
164  */
165 static int
167 {
168  switch (type)
169  {
170  case CT_INSERT_EXISTS:
171  case CT_UPDATE_EXISTS:
172  return errcode(ERRCODE_UNIQUE_VIOLATION);
174  case CT_UPDATE_MISSING:
176  case CT_DELETE_MISSING:
178  }
179 
180  Assert(false);
181  return 0; /* silence compiler warning */
182 }
183 
184 /*
185  * Add an errdetail() line showing conflict detail.
186  *
187  * The DETAIL line comprises of two parts:
188  * 1. Explanation of the conflict type, including the origin and commit
189  * timestamp of the existing local tuple.
190  * 2. Display of conflicting key, existing local tuple, remote new tuple, and
191  * replica identity columns, if any. The remote old tuple is excluded as its
192  * information is covered in the replica identity columns.
193  */
194 static int
196  ConflictType type, TupleTableSlot *searchslot,
197  TupleTableSlot *localslot, TupleTableSlot *remoteslot,
198  Oid indexoid, TransactionId localxmin,
199  RepOriginId localorigin, TimestampTz localts)
200 {
201  StringInfoData err_detail;
202  char *val_desc;
203  char *origin_name;
204 
205  initStringInfo(&err_detail);
206 
207  /* First, construct a detailed message describing the type of conflict */
208  switch (type)
209  {
210  case CT_INSERT_EXISTS:
211  case CT_UPDATE_EXISTS:
212  Assert(OidIsValid(indexoid));
213 
214  if (localts)
215  {
216  if (localorigin == InvalidRepOriginId)
217  appendStringInfo(&err_detail, _("Key already exists in unique index \"%s\", modified locally in transaction %u at %s."),
218  get_rel_name(indexoid),
219  localxmin, timestamptz_to_str(localts));
220  else if (replorigin_by_oid(localorigin, true, &origin_name))
221  appendStringInfo(&err_detail, _("Key already exists in unique index \"%s\", modified by origin \"%s\" in transaction %u at %s."),
222  get_rel_name(indexoid), origin_name,
223  localxmin, timestamptz_to_str(localts));
224 
225  /*
226  * The origin that modified this row has been removed. This
227  * can happen if the origin was created by a different apply
228  * worker and its associated subscription and origin were
229  * dropped after updating the row, or if the origin was
230  * manually dropped by the user.
231  */
232  else
233  appendStringInfo(&err_detail, _("Key already exists in unique index \"%s\", modified by a non-existent origin in transaction %u at %s."),
234  get_rel_name(indexoid),
235  localxmin, timestamptz_to_str(localts));
236  }
237  else
238  appendStringInfo(&err_detail, _("Key already exists in unique index \"%s\", modified in transaction %u."),
239  get_rel_name(indexoid), localxmin);
240 
241  break;
242 
244  if (localorigin == InvalidRepOriginId)
245  appendStringInfo(&err_detail, _("Updating the row that was modified locally in transaction %u at %s."),
246  localxmin, timestamptz_to_str(localts));
247  else if (replorigin_by_oid(localorigin, true, &origin_name))
248  appendStringInfo(&err_detail, _("Updating the row that was modified by a different origin \"%s\" in transaction %u at %s."),
249  origin_name, localxmin, timestamptz_to_str(localts));
250 
251  /* The origin that modified this row has been removed. */
252  else
253  appendStringInfo(&err_detail, _("Updating the row that was modified by a non-existent origin in transaction %u at %s."),
254  localxmin, timestamptz_to_str(localts));
255 
256  break;
257 
258  case CT_UPDATE_MISSING:
259  appendStringInfo(&err_detail, _("Could not find the row to be updated."));
260  break;
261 
263  if (localorigin == InvalidRepOriginId)
264  appendStringInfo(&err_detail, _("Deleting the row that was modified locally in transaction %u at %s."),
265  localxmin, timestamptz_to_str(localts));
266  else if (replorigin_by_oid(localorigin, true, &origin_name))
267  appendStringInfo(&err_detail, _("Deleting the row that was modified by a different origin \"%s\" in transaction %u at %s."),
268  origin_name, localxmin, timestamptz_to_str(localts));
269 
270  /* The origin that modified this row has been removed. */
271  else
272  appendStringInfo(&err_detail, _("Deleting the row that was modified by a non-existent origin in transaction %u at %s."),
273  localxmin, timestamptz_to_str(localts));
274 
275  break;
276 
277  case CT_DELETE_MISSING:
278  appendStringInfo(&err_detail, _("Could not find the row to be deleted."));
279  break;
280  }
281 
282  Assert(err_detail.len > 0);
283 
284  val_desc = build_tuple_value_details(estate, relinfo, type, searchslot,
285  localslot, remoteslot, indexoid);
286 
287  /*
288  * Next, append the key values, existing local tuple, remote tuple and
289  * replica identity columns after the message.
290  */
291  if (val_desc)
292  appendStringInfo(&err_detail, "\n%s", val_desc);
293 
294  return errdetail_internal("%s", err_detail.data);
295 }
296 
297 /*
298  * Helper function to build the additional details for conflicting key,
299  * existing local tuple, remote tuple, and replica identity columns.
300  *
301  * If the return value is NULL, it indicates that the current user lacks
302  * permissions to view the columns involved.
303  */
304 static char *
307  TupleTableSlot *searchslot,
308  TupleTableSlot *localslot,
309  TupleTableSlot *remoteslot,
310  Oid indexoid)
311 {
312  Relation localrel = relinfo->ri_RelationDesc;
313  Oid relid = RelationGetRelid(localrel);
314  TupleDesc tupdesc = RelationGetDescr(localrel);
315  StringInfoData tuple_value;
316  char *desc = NULL;
317 
318  Assert(searchslot || localslot || remoteslot);
319 
320  initStringInfo(&tuple_value);
321 
322  /*
323  * Report the conflicting key values in the case of a unique constraint
324  * violation.
325  */
327  {
328  Assert(OidIsValid(indexoid) && localslot);
329 
330  desc = build_index_value_desc(estate, localrel, localslot, indexoid);
331 
332  if (desc)
333  appendStringInfo(&tuple_value, _("Key %s"), desc);
334  }
335 
336  if (localslot)
337  {
338  /*
339  * The 'modifiedCols' only applies to the new tuple, hence we pass
340  * NULL for the existing local tuple.
341  */
342  desc = ExecBuildSlotValueDescription(relid, localslot, tupdesc,
343  NULL, 64);
344 
345  if (desc)
346  {
347  if (tuple_value.len > 0)
348  {
349  appendStringInfoString(&tuple_value, "; ");
350  appendStringInfo(&tuple_value, _("existing local tuple %s"),
351  desc);
352  }
353  else
354  {
355  appendStringInfo(&tuple_value, _("Existing local tuple %s"),
356  desc);
357  }
358  }
359  }
360 
361  if (remoteslot)
362  {
363  Bitmapset *modifiedCols;
364 
365  /*
366  * Although logical replication doesn't maintain the bitmap for the
367  * columns being inserted, we still use it to create 'modifiedCols'
368  * for consistency with other calls to ExecBuildSlotValueDescription.
369  *
370  * Note that generated columns are formed locally on the subscriber.
371  */
372  modifiedCols = bms_union(ExecGetInsertedCols(relinfo, estate),
373  ExecGetUpdatedCols(relinfo, estate));
374  desc = ExecBuildSlotValueDescription(relid, remoteslot, tupdesc,
375  modifiedCols, 64);
376 
377  if (desc)
378  {
379  if (tuple_value.len > 0)
380  {
381  appendStringInfoString(&tuple_value, "; ");
382  appendStringInfo(&tuple_value, _("remote tuple %s"), desc);
383  }
384  else
385  {
386  appendStringInfo(&tuple_value, _("Remote tuple %s"), desc);
387  }
388  }
389  }
390 
391  if (searchslot)
392  {
393  /*
394  * Note that while index other than replica identity may be used (see
395  * IsIndexUsableForReplicaIdentityFull for details) to find the tuple
396  * when applying update or delete, such an index scan may not result
397  * in a unique tuple and we still compare the complete tuple in such
398  * cases, thus such indexes are not used here.
399  */
400  Oid replica_index = GetRelationIdentityOrPK(localrel);
401 
403 
404  /*
405  * If the table has a valid replica identity index, build the index
406  * key value string. Otherwise, construct the full tuple value for
407  * REPLICA IDENTITY FULL cases.
408  */
409  if (OidIsValid(replica_index))
410  desc = build_index_value_desc(estate, localrel, searchslot, replica_index);
411  else
412  desc = ExecBuildSlotValueDescription(relid, searchslot, tupdesc, NULL, 64);
413 
414  if (desc)
415  {
416  if (tuple_value.len > 0)
417  {
418  appendStringInfoString(&tuple_value, "; ");
419  appendStringInfo(&tuple_value, OidIsValid(replica_index)
420  ? _("replica identity %s")
421  : _("replica identity full %s"), desc);
422  }
423  else
424  {
425  appendStringInfo(&tuple_value, OidIsValid(replica_index)
426  ? _("Replica identity %s")
427  : _("Replica identity full %s"), desc);
428  }
429  }
430  }
431 
432  if (tuple_value.len == 0)
433  return NULL;
434 
435  appendStringInfoChar(&tuple_value, '.');
436  return tuple_value.data;
437 }
438 
439 /*
440  * Helper functions to construct a string describing the contents of an index
441  * entry. See BuildIndexValueDescription for details.
442  *
443  * The caller must ensure that the index with the OID 'indexoid' is locked so
444  * that we can fetch and display the conflicting key value.
445  */
446 static char *
448  Oid indexoid)
449 {
450  char *index_value;
451  Relation indexDesc;
453  bool isnull[INDEX_MAX_KEYS];
454  TupleTableSlot *tableslot = slot;
455 
456  if (!tableslot)
457  return NULL;
458 
460 
461  indexDesc = index_open(indexoid, NoLock);
462 
463  /*
464  * If the slot is a virtual slot, copy it into a heap tuple slot as
465  * FormIndexDatum only works with heap tuple slots.
466  */
467  if (TTS_IS_VIRTUAL(slot))
468  {
469  tableslot = table_slot_create(localrel, &estate->es_tupleTable);
470  tableslot = ExecCopySlot(tableslot, slot);
471  }
472 
473  /*
474  * Initialize ecxt_scantuple for potential use in FormIndexDatum when
475  * index expressions are present.
476  */
477  GetPerTupleExprContext(estate)->ecxt_scantuple = tableslot;
478 
479  /*
480  * The values/nulls arrays passed to BuildIndexValueDescription should be
481  * the results of FormIndexDatum, which are the "raw" input to the index
482  * AM.
483  */
484  FormIndexDatum(BuildIndexInfo(indexDesc), tableslot, estate, values, isnull);
485 
486  index_value = BuildIndexValueDescription(indexDesc, values, isnull);
487 
488  index_close(indexDesc, NoLock);
489 
490  return index_value;
491 }
Subscription * MySubscription
Definition: worker.c:299
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1843
Bitmapset * bms_union(const Bitmapset *a, const Bitmapset *b)
Definition: bitmapset.c:251
static Datum values[MAXATTR]
Definition: bootstrap.c:150
#define Assert(condition)
Definition: c.h:849
uint32 TransactionId
Definition: c.h:643
#define OidIsValid(objectId)
Definition: c.h:766
bool track_commit_timestamp
Definition: commit_ts.c:109
bool TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts, RepOriginId *nodeid)
Definition: commit_ts.c:274
static const char *const ConflictTypeNames[]
Definition: conflict.c:26
static char * build_tuple_value_details(EState *estate, ResultRelInfo *relinfo, ConflictType type, TupleTableSlot *searchslot, TupleTableSlot *localslot, TupleTableSlot *remoteslot, Oid indexoid)
Definition: conflict.c:305
static char * build_index_value_desc(EState *estate, Relation localrel, TupleTableSlot *slot, Oid indexoid)
Definition: conflict.c:447
void InitConflictIndexes(ResultRelInfo *relInfo)
Definition: conflict.c:136
static int errcode_apply_conflict(ConflictType type)
Definition: conflict.c:166
static int errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo, ConflictType type, TupleTableSlot *searchslot, TupleTableSlot *localslot, TupleTableSlot *remoteslot, Oid indexoid, TransactionId localxmin, RepOriginId localorigin, TimestampTz localts)
Definition: conflict.c:195
void ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel, ConflictType type, TupleTableSlot *searchslot, TupleTableSlot *localslot, TupleTableSlot *remoteslot, Oid indexoid, TransactionId localxmin, RepOriginId localorigin, TimestampTz localts)
Definition: conflict.c:107
bool GetTupleTransactionInfo(TupleTableSlot *localslot, TransactionId *xmin, RepOriginId *localorigin, TimestampTz *localts)
Definition: conflict.c:61
ConflictType
Definition: conflict.h:25
@ CT_DELETE_MISSING
Definition: conflict.h:42
@ CT_UPDATE_ORIGIN_DIFFERS
Definition: conflict.h:30
@ CT_INSERT_EXISTS
Definition: conflict.h:27
@ CT_UPDATE_EXISTS
Definition: conflict.h:33
@ CT_UPDATE_MISSING
Definition: conflict.h:36
@ CT_DELETE_ORIGIN_DIFFERS
Definition: conflict.h:39
int64 TimestampTz
Definition: timestamp.h:39
int errdetail_internal(const char *fmt,...)
Definition: elog.c:1230
int errcode(int sqlerrcode)
Definition: elog.c:853
int errmsg(const char *fmt,...)
Definition: elog.c:1070
#define _(x)
Definition: elog.c:90
#define ereport(elevel,...)
Definition: elog.h:149
char * ExecBuildSlotValueDescription(Oid reloid, TupleTableSlot *slot, TupleDesc tupdesc, Bitmapset *modifiedCols, int maxfieldlen)
Definition: execMain.c:2218
Bitmapset * ExecGetUpdatedCols(ResultRelInfo *relinfo, EState *estate)
Definition: execUtils.c:1290
Bitmapset * ExecGetInsertedCols(ResultRelInfo *relinfo, EState *estate)
Definition: execUtils.c:1269
#define GetPerTupleExprContext(estate)
Definition: executor.h:561
char * BuildIndexValueDescription(Relation indexRelation, const Datum *values, const bool *isnull)
Definition: genam.c:177
void FormIndexDatum(IndexInfo *indexInfo, TupleTableSlot *slot, EState *estate, Datum *values, bool *isnull)
Definition: index.c:2731
IndexInfo * BuildIndexInfo(Relation index)
Definition: index.c:2430
void index_close(Relation relation, LOCKMODE lockmode)
Definition: indexam.c:177
Relation index_open(Oid relationId, LOCKMODE lockmode)
Definition: indexam.c:133
int i
Definition: isn.c:73
List * lappend_oid(List *list, Oid datum)
Definition: list.c:375
bool CheckRelationOidLockedByMe(Oid relid, LOCKMODE lockmode, bool orstronger)
Definition: lmgr.c:347
#define NoLock
Definition: lockdefs.h:34
#define RowExclusiveLock
Definition: lockdefs.h:38
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3366
char * get_rel_name(Oid relid)
Definition: lsyscache.c:1928
bool replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
Definition: origin.c:469
#define InvalidRepOriginId
Definition: origin.h:33
#define INDEX_MAX_KEYS
#define NIL
Definition: pg_list.h:68
#define ERRCODE_T_R_SERIALIZATION_FAILURE
Definition: pgbench.c:76
void pgstat_report_subscription_conflict(Oid subid, ConflictType type)
uintptr_t Datum
Definition: postgres.h:64
static TransactionId DatumGetTransactionId(Datum X)
Definition: postgres.h:262
unsigned int Oid
Definition: postgres_ext.h:31
#define RelationGetRelid(relation)
Definition: rel.h:505
#define RelationGetDescr(relation)
Definition: rel.h:531
#define RelationGetRelationName(relation)
Definition: rel.h:539
#define RelationGetNamespace(relation)
Definition: rel.h:546
Oid GetRelationIdentityOrPK(Relation rel)
Definition: relation.c:851
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:97
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:182
void appendStringInfoChar(StringInfo str, char ch)
Definition: stringinfo.c:194
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
List * es_tupleTable
Definition: execnodes.h:677
bool ii_Unique
Definition: execnodes.h:199
Definition: pg_list.h:54
Form_pg_index rd_index
Definition: rel.h:192
int ri_NumIndices
Definition: execnodes.h:462
List * ri_onConflictArbiterIndexes
Definition: execnodes.h:550
Relation ri_RelationDesc
Definition: execnodes.h:459
RelationPtr ri_IndexRelationDescs
Definition: execnodes.h:465
IndexInfo ** ri_IndexRelationInfo
Definition: execnodes.h:468
#define MinTransactionIdAttributeNumber
Definition: sysattr.h:22
TupleTableSlot * table_slot_create(Relation relation, List **reglist)
Definition: tableam.c:91
#define TTS_IS_VIRTUAL(slot)
Definition: tuptable.h:234
static TupleTableSlot * ExecCopySlot(TupleTableSlot *dstslot, TupleTableSlot *srcslot)
Definition: tuptable.h:509
static Datum slot_getsysattr(TupleTableSlot *slot, int attnum, bool *isnull)
Definition: tuptable.h:416
const char * type
uint16 RepOriginId
Definition: xlogdefs.h:65