PostgreSQL Source Code git master
Loading...
Searching...
No Matches
conflict.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 * conflict.c
3 * Support routines for logging conflicts.
4 *
5 * Copyright (c) 2024-2026, PostgreSQL Global Development Group
6 *
7 * IDENTIFICATION
8 * src/backend/replication/logical/conflict.c
9 *
10 * This file contains the code for logging conflicts on the subscriber during
11 * logical replication.
12 *-------------------------------------------------------------------------
13 */
14
15#include "postgres.h"
16
17#include "access/commit_ts.h"
18#include "access/genam.h"
19#include "access/tableam.h"
20#include "executor/executor.h"
21#include "pgstat.h"
24#include "storage/lmgr.h"
25#include "utils/lsyscache.h"
26
/*
 * Printable name for each conflict type.  The table uses designated
 * initializers, so each entry is indexed by its CT_* enumerator rather than
 * by its position in this list.
 */
27static const char *const ConflictTypeNames[] = {
28 [CT_INSERT_EXISTS] = "insert_exists",
29 [CT_UPDATE_ORIGIN_DIFFERS] = "update_origin_differs",
30 [CT_UPDATE_EXISTS] = "update_exists",
31 [CT_UPDATE_MISSING] = "update_missing",
32 [CT_DELETE_ORIGIN_DIFFERS] = "delete_origin_differs",
33 [CT_UPDATE_DELETED] = "update_deleted",
34 [CT_DELETE_MISSING] = "delete_missing",
35 [CT_MULTIPLE_UNIQUE_CONFLICTS] = "multiple_unique_conflicts"
36};
37
39static void errdetail_apply_conflict(EState *estate,
45 Oid indexoid, TransactionId localxmin,
48static void get_tuple_desc(EState *estate, ResultRelInfo *relinfo,
53 Oid indexoid);
54static char *build_index_value_desc(EState *estate, Relation localrel,
55 TupleTableSlot *slot, Oid indexoid);
56
57/*
58 * Get the xmin and commit timestamp data (origin and timestamp) associated
59 * with the provided local row.
60 *
61 * Return true if the commit timestamp data was found, false otherwise.
62 */
63bool
66{
68 bool isnull;
69
71 &isnull);
73 Assert(!isnull);
74
75 /*
76 * The commit timestamp data is not available if track_commit_timestamp is
77 * disabled.
78 */
80 {
82 *localts = 0;
83 return false;
84 }
85
87}
88
89/*
90 * This function is used to report a conflict while applying replication
91 * changes.
92 *
93 * 'searchslot' should contain the tuple used to search the local row to be
94 * updated or deleted.
95 *
96 * 'remoteslot' should contain the remote new tuple, if any.
97 *
98 * conflicttuples is a list of local rows that caused the conflict and the
99 * conflict related information. See ConflictTupleInfo.
100 *
101 * The caller must ensure that all the indexes passed in ConflictTupleInfo are
102 * locked so that we can fetch and display the conflicting key values.
103 */
104void
108{
109 Relation localrel = relinfo->ri_RelationDesc;
111
113
114 /* Form errdetail message by combining conflicting tuples information. */
118 conflicttuple->indexoid,
119 conflicttuple->xmin,
120 conflicttuple->origin,
121 conflicttuple->ts,
122 &err_detail);
123
125
126 ereport(elevel,
128 errmsg("conflict detected on relation \"%s.%s\": conflict=%s",
130 RelationGetRelationName(localrel),
132 errdetail_internal("%s", err_detail.data));
133}
134
135/*
136 * Find all unique indexes to check for a conflict and store them into
137 * ResultRelInfo.
138 */
139void
141{
143
144 for (int i = 0; i < relInfo->ri_NumIndices; i++)
145 {
146 Relation indexRelation = relInfo->ri_IndexRelationDescs[i];
147
148 if (indexRelation == NULL)
149 continue;
150
151 /* Detect conflict only for unique indexes */
152 if (!relInfo->ri_IndexRelationInfo[i]->ii_Unique)
153 continue;
154
155 /* Conflict detection is not supported for deferrable indexes */
156 if (!indexRelation->rd_index->indimmediate)
157 continue;
158
160 RelationGetRelid(indexRelation));
161 }
162
163 relInfo->ri_onConflictArbiterIndexes = uniqueIndexes;
164}
165
166/*
167 * Add SQLSTATE error code to the current conflict report.
168 */
169static int
171{
172 switch (type)
173 {
174 case CT_INSERT_EXISTS:
175 case CT_UPDATE_EXISTS:
184 }
185
186 Assert(false);
187 return 0; /* silence compiler warning */
188}
189
190/*
191 * Helper function to build the additional details for conflicting key,
192 * local row, remote row, and replica identity columns.
193 */
194static void
196 bool need_newline)
197{
198 bool first = true;
199
200 Assert(buf != NULL && tuple_values != NIL);
201
203 {
204 /*
205 * Skip if the value is NULL. This means the current user does not
206 * have enough permissions to see all columns in the table. See
207 * get_tuple_desc().
208 */
209 if (!tuple_value)
210 continue;
211
212 if (first)
213 {
214 /*
215 * translator: The colon is used as a separator in conflict
216 * messages. The first part, built in the caller, describes what
217 * happened locally; the second part lists the conflicting keys
218 * and tuple data.
219 */
221 }
222 else
223 {
224 /*
225 * translator: This is a separator in a list of conflicting keys
226 * and tuple data.
227 */
229 }
230
232 first = false;
233 }
234
235 /* translator: This is the terminator of a conflict message */
237
238 if (need_newline)
240}
241
242/*
243 * Add an errdetail() line showing conflict detail.
244 *
245 * The DETAIL line comprises two parts:
246 * 1. Explanation of the conflict type, including the origin and commit
247 * timestamp of the local row.
248 * 2. Display of conflicting key, local row, remote new row, and replica
249 * identity columns, if any. The remote old row is excluded as its
250 * information is covered in the replica identity columns.
251 */
252static void
256 Oid indexoid, TransactionId localxmin,
258 StringInfo err_msg)
259{
261 char *origin_name;
262 char *key_desc = NULL;
263 char *local_desc = NULL;
264 char *remote_desc = NULL;
265 char *search_desc = NULL;
266
267 /* Get key, replica identity, remote, and local value data */
272 indexoid);
273
275
276 /* Construct a detailed message describing the type of conflict */
277 switch (type)
278 {
279 case CT_INSERT_EXISTS:
280 case CT_UPDATE_EXISTS:
282 Assert(OidIsValid(indexoid) &&
284
285 if (err_msg->len == 0)
286 {
287 appendStringInfoString(&err_detail, _("Could not apply remote change"));
288
291 true);
292 }
293
294 if (localts)
295 {
297 appendStringInfo(&err_detail, _("Key already exists in unique index \"%s\", modified locally in transaction %u at %s"),
298 get_rel_name(indexoid),
300 else if (replorigin_by_oid(localorigin, true, &origin_name))
301 appendStringInfo(&err_detail, _("Key already exists in unique index \"%s\", modified by origin \"%s\" in transaction %u at %s"),
302 get_rel_name(indexoid), origin_name,
304
305 /*
306 * The origin that modified this row has been removed. This
307 * can happen if the origin was created by a different apply
308 * worker and its associated subscription and origin were
309 * dropped after updating the row, or if the origin was
310 * manually dropped by the user.
311 */
312 else
313 appendStringInfo(&err_detail, _("Key already exists in unique index \"%s\", modified by a non-existent origin in transaction %u at %s"),
314 get_rel_name(indexoid),
316 }
317 else
318 appendStringInfo(&err_detail, _("Key already exists in unique index \"%s\", modified in transaction %u"),
319 get_rel_name(indexoid), localxmin);
320
323
324 break;
325
328 appendStringInfo(&err_detail, _("Updating the row that was modified locally in transaction %u at %s"),
330 else if (replorigin_by_oid(localorigin, true, &origin_name))
331 appendStringInfo(&err_detail, _("Updating the row that was modified by a different origin \"%s\" in transaction %u at %s"),
332 origin_name, localxmin, timestamptz_to_str(localts));
333
334 /* The origin that modified this row has been removed. */
335 else
336 appendStringInfo(&err_detail, _("Updating the row that was modified by a non-existent origin in transaction %u at %s"),
338
341 search_desc), false);
342
343 break;
344
346 appendStringInfoString(&err_detail, _("Could not find the row to be updated"));
347
350 true);
351
352 if (localts)
353 {
355 appendStringInfo(&err_detail, _("The row to be updated was deleted locally in transaction %u at %s"),
357 else if (replorigin_by_oid(localorigin, true, &origin_name))
358 appendStringInfo(&err_detail, _("The row to be updated was deleted by a different origin \"%s\" in transaction %u at %s"),
359 origin_name, localxmin, timestamptz_to_str(localts));
360
361 /* The origin that modified this row has been removed. */
362 else
363 appendStringInfo(&err_detail, _("The row to be updated was deleted by a non-existent origin in transaction %u at %s"),
365 }
366 else
367 appendStringInfo(&err_detail, _("The row to be updated was deleted"));
368
369 break;
370
372 appendStringInfoString(&err_detail, _("Could not find the row to be updated"));
373
376 false);
377
378 break;
379
382 appendStringInfo(&err_detail, _("Deleting the row that was modified locally in transaction %u at %s"),
384 else if (replorigin_by_oid(localorigin, true, &origin_name))
385 appendStringInfo(&err_detail, _("Deleting the row that was modified by a different origin \"%s\" in transaction %u at %s"),
386 origin_name, localxmin, timestamptz_to_str(localts));
387
388 /* The origin that modified this row has been removed. */
389 else
390 appendStringInfo(&err_detail, _("Deleting the row that was modified by a non-existent origin in transaction %u at %s"),
392
395 search_desc), false);
396
397 break;
398
400 appendStringInfoString(&err_detail, _("Could not find the row to be deleted"));
401
403 list_make1(search_desc), false);
404
405 break;
406 }
407
408 Assert(err_detail.len > 0);
409
410 /*
411 * Insert a blank line to visually separate the new detail line from the
412 * existing ones.
413 */
414 if (err_msg->len > 0)
415 appendStringInfoChar(err_msg, '\n');
416
418}
419
420/*
421 * Extract conflicting key, local row, remote row, and replica identity
422 * columns. Results are stored through the xxx_desc output pointers.
423 *
424 * If the output is NULL, it indicates that the current user lacks permissions
425 * to view the columns involved.
426 */
427static void
429 char **key_desc,
433 Oid indexoid)
434{
435 Relation localrel = relinfo->ri_RelationDesc;
436 Oid relid = RelationGetRelid(localrel);
437 TupleDesc tupdesc = RelationGetDescr(localrel);
438 char *desc = NULL;
439
442
443 /*
444 * Report the conflicting key values in the case of a unique constraint
445 * violation.
446 */
449 {
450 Assert(OidIsValid(indexoid) && localslot);
451
452 desc = build_index_value_desc(estate, localrel, localslot,
453 indexoid);
454
455 if (desc)
456 *key_desc = psprintf(_("key %s"), desc);
457 }
458
459 if (localslot)
460 {
461 /*
462 * The 'modifiedCols' only applies to the new tuple, hence we pass
463 * NULL for the local row.
464 */
465 desc = ExecBuildSlotValueDescription(relid, localslot, tupdesc,
466 NULL, 64);
467
468 if (desc)
469 *local_desc = psprintf(_("local row %s"), desc);
470 }
471
472 if (remoteslot)
473 {
475
476 /*
477 * Although logical replication doesn't maintain the bitmap for the
478 * columns being inserted, we still use it to create 'modifiedCols'
479 * for consistency with other calls to ExecBuildSlotValueDescription.
480 *
481 * Note that generated columns are formed locally on the subscriber.
482 */
484 ExecGetUpdatedCols(relinfo, estate));
486 tupdesc, modifiedCols,
487 64);
488
489 if (desc)
490 *remote_desc = psprintf(_("remote row %s"), desc);
491 }
492
493 if (searchslot)
494 {
495 /*
496 * Note that while an index other than the replica identity may be used
497 * (see IsIndexUsableForReplicaIdentityFull for details) to find the
498 * tuple when applying an update or delete, such an index scan may not
499 * result in a unique tuple and we still compare the complete tuple in
500 * such cases; thus such indexes are not used here.
501 */
503
505
506 /*
507 * If the table has a valid replica identity index, build the index
508 * key value string. Otherwise, construct the full tuple value for
509 * REPLICA IDENTITY FULL cases.
510 */
512 desc = build_index_value_desc(estate, localrel, searchslot, replica_index);
513 else
514 desc = ExecBuildSlotValueDescription(relid, searchslot, tupdesc, NULL, 64);
515
516 if (desc)
517 {
519 *search_desc = psprintf(_("replica identity %s"), desc);
520 else
521 *search_desc = psprintf(_("replica identity full %s"), desc);
522 }
523 }
524}
525
526/*
527 * Helper function to construct a string describing the contents of an index
528 * entry. See BuildIndexValueDescription for details.
529 *
530 * The caller must ensure that the index with the OID 'indexoid' is locked so
531 * that we can fetch and display the conflicting key value.
532 */
533static char *
535 Oid indexoid)
536{
537 char *index_value;
540 bool isnull[INDEX_MAX_KEYS];
541 TupleTableSlot *tableslot = slot;
542
543 if (!tableslot)
544 return NULL;
545
547
548 indexDesc = index_open(indexoid, NoLock);
549
550 /*
551 * If the slot is a virtual slot, copy it into a heap tuple slot as
552 * FormIndexDatum only works with heap tuple slots.
553 */
554 if (TTS_IS_VIRTUAL(slot))
555 {
556 tableslot = table_slot_create(localrel, &estate->es_tupleTable);
557 tableslot = ExecCopySlot(tableslot, slot);
558 }
559
560 /*
561 * Initialize ecxt_scantuple for potential use in FormIndexDatum when
562 * index expressions are present.
563 */
564 GetPerTupleExprContext(estate)->ecxt_scantuple = tableslot;
565
566 /*
567 * The values/nulls arrays passed to BuildIndexValueDescription should be
568 * the results of FormIndexDatum, which are the "raw" input to the index
569 * AM.
570 */
571 FormIndexDatum(BuildIndexInfo(indexDesc), tableslot, estate, values, isnull);
572
574
576
577 return index_value;
578}
Subscription * MySubscription
Definition worker.c:484
const char * timestamptz_to_str(TimestampTz t)
Definition timestamp.c:1856
Bitmapset * bms_union(const Bitmapset *a, const Bitmapset *b)
Definition bitmapset.c:251
static Datum values[MAXATTR]
Definition bootstrap.c:190
#define Assert(condition)
Definition c.h:943
uint32 TransactionId
Definition c.h:736
#define OidIsValid(objectId)
Definition c.h:858
bool track_commit_timestamp
Definition commit_ts.c:121
bool TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts, ReplOriginId *nodeid)
Definition commit_ts.c:283
bool GetTupleTransactionInfo(TupleTableSlot *localslot, TransactionId *xmin, ReplOriginId *localorigin, TimestampTz *localts)
Definition conflict.c:64
void ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel, ConflictType type, TupleTableSlot *searchslot, TupleTableSlot *remoteslot, List *conflicttuples)
Definition conflict.c:105
static const char *const ConflictTypeNames[]
Definition conflict.c:27
static char * build_index_value_desc(EState *estate, Relation localrel, TupleTableSlot *slot, Oid indexoid)
Definition conflict.c:534
static void errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo, ConflictType type, TupleTableSlot *searchslot, TupleTableSlot *localslot, TupleTableSlot *remoteslot, Oid indexoid, TransactionId localxmin, ReplOriginId localorigin, TimestampTz localts, StringInfo err_msg)
Definition conflict.c:253
void InitConflictIndexes(ResultRelInfo *relInfo)
Definition conflict.c:140
static int errcode_apply_conflict(ConflictType type)
Definition conflict.c:170
static void get_tuple_desc(EState *estate, ResultRelInfo *relinfo, ConflictType type, char **key_desc, TupleTableSlot *searchslot, char **search_desc, TupleTableSlot *localslot, char **local_desc, TupleTableSlot *remoteslot, char **remote_desc, Oid indexoid)
Definition conflict.c:428
static void append_tuple_value_detail(StringInfo buf, List *tuple_values, bool need_newline)
Definition conflict.c:195
ConflictType
Definition conflict.h:32
@ CT_UPDATE_DELETED
Definition conflict.h:43
@ CT_MULTIPLE_UNIQUE_CONFLICTS
Definition conflict.h:55
@ CT_DELETE_MISSING
Definition conflict.h:52
@ CT_UPDATE_ORIGIN_DIFFERS
Definition conflict.h:37
@ CT_INSERT_EXISTS
Definition conflict.h:34
@ CT_UPDATE_EXISTS
Definition conflict.h:40
@ CT_UPDATE_MISSING
Definition conflict.h:46
@ CT_DELETE_ORIGIN_DIFFERS
Definition conflict.h:49
int64 TimestampTz
Definition timestamp.h:39
int errcode(int sqlerrcode)
Definition elog.c:874
#define _(x)
Definition elog.c:95
int int errdetail_internal(const char *fmt,...) pg_attribute_printf(1
#define ereport(elevel,...)
Definition elog.h:151
char * ExecBuildSlotValueDescription(Oid reloid, TupleTableSlot *slot, TupleDesc tupdesc, Bitmapset *modifiedCols, int maxfieldlen)
Definition execMain.c:2420
Bitmapset * ExecGetInsertedCols(ResultRelInfo *relinfo, EState *estate)
Definition execUtils.c:1387
Bitmapset * ExecGetUpdatedCols(ResultRelInfo *relinfo, EState *estate)
Definition execUtils.c:1408
#define GetPerTupleExprContext(estate)
Definition executor.h:667
char * BuildIndexValueDescription(Relation indexRelation, const Datum *values, const bool *isnull)
Definition genam.c:178
IndexInfo * BuildIndexInfo(Relation index)
Definition index.c:2446
void FormIndexDatum(IndexInfo *indexInfo, TupleTableSlot *slot, EState *estate, Datum *values, bool *isnull)
Definition index.c:2748
void index_close(Relation relation, LOCKMODE lockmode)
Definition indexam.c:178
Relation index_open(Oid relationId, LOCKMODE lockmode)
Definition indexam.c:134
int i
Definition isn.c:77
List * lappend_oid(List *list, Oid datum)
Definition list.c:375
bool CheckRelationOidLockedByMe(Oid relid, LOCKMODE lockmode, bool orstronger)
Definition lmgr.c:351
#define NoLock
Definition lockdefs.h:34
#define RowExclusiveLock
Definition lockdefs.h:38
char * get_rel_name(Oid relid)
Definition lsyscache.c:2148
char * get_namespace_name(Oid nspid)
Definition lsyscache.c:3588
static char * errmsg
bool replorigin_by_oid(ReplOriginId roident, bool missing_ok, char **roname)
Definition origin.c:513
#define InvalidReplOriginId
Definition origin.h:33
#define INDEX_MAX_KEYS
#define NIL
Definition pg_list.h:68
#define list_make1(x1)
Definition pg_list.h:244
#define foreach_ptr(type, var, lst)
Definition pg_list.h:501
#define list_make3(x1, x2, x3)
Definition pg_list.h:248
#define list_make2(x1, x2)
Definition pg_list.h:246
static char buf[DEFAULT_XLOG_SEG_SIZE]
#define ERRCODE_T_R_SERIALIZATION_FAILURE
Definition pgbench.c:77
void pgstat_report_subscription_conflict(Oid subid, ConflictType type)
uint64_t Datum
Definition postgres.h:70
static TransactionId DatumGetTransactionId(Datum X)
Definition postgres.h:282
unsigned int Oid
static int fb(int x)
char * psprintf(const char *fmt,...)
Definition psprintf.c:43
#define RelationGetRelid(relation)
Definition rel.h:516
#define RelationGetDescr(relation)
Definition rel.h:542
#define RelationGetRelationName(relation)
Definition rel.h:550
#define RelationGetNamespace(relation)
Definition rel.h:557
Oid GetRelationIdentityOrPK(Relation rel)
Definition relation.c:905
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition stringinfo.c:145
void appendStringInfoString(StringInfo str, const char *s)
Definition stringinfo.c:230
void appendStringInfoChar(StringInfo str, char ch)
Definition stringinfo.c:242
void initStringInfo(StringInfo str)
Definition stringinfo.c:97
List * es_tupleTable
Definition execnodes.h:748
Definition pg_list.h:54
Form_pg_index rd_index
Definition rel.h:192
#define MinTransactionIdAttributeNumber
Definition sysattr.h:22
TupleTableSlot * table_slot_create(Relation relation, List **reglist)
Definition tableam.c:92
#define TTS_IS_VIRTUAL(slot)
Definition tuptable.h:253
static Datum slot_getsysattr(TupleTableSlot *slot, int attnum, bool *isnull)
Definition tuptable.h:438
static TupleTableSlot * ExecCopySlot(TupleTableSlot *dstslot, TupleTableSlot *srcslot)
Definition tuptable.h:543
const char * type
uint16 ReplOriginId
Definition xlogdefs.h:69