PostgreSQL Source Code git master
Loading...
Searching...
No Matches
conflict.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 * conflict.c
3 * Support routines for logging conflicts.
4 *
5 * Copyright (c) 2024-2026, PostgreSQL Global Development Group
6 *
7 * IDENTIFICATION
8 * src/backend/replication/logical/conflict.c
9 *
10 * This file contains the code for logging conflicts on the subscriber during
11 * logical replication.
12 *-------------------------------------------------------------------------
13 */
14
15#include "postgres.h"
16
17#include "access/commit_ts.h"
18#include "access/tableam.h"
19#include "executor/executor.h"
20#include "pgstat.h"
23#include "storage/lmgr.h"
24#include "utils/lsyscache.h"
25
26static const char *const ConflictTypeNames[] = {
27 [CT_INSERT_EXISTS] = "insert_exists",
28 [CT_UPDATE_ORIGIN_DIFFERS] = "update_origin_differs",
29 [CT_UPDATE_EXISTS] = "update_exists",
30 [CT_UPDATE_MISSING] = "update_missing",
31 [CT_DELETE_ORIGIN_DIFFERS] = "delete_origin_differs",
32 [CT_UPDATE_DELETED] = "update_deleted",
33 [CT_DELETE_MISSING] = "delete_missing",
34 [CT_MULTIPLE_UNIQUE_CONFLICTS] = "multiple_unique_conflicts"
35};
36
38static void errdetail_apply_conflict(EState *estate,
44 Oid indexoid, TransactionId localxmin,
47static void get_tuple_desc(EState *estate, ResultRelInfo *relinfo,
52 Oid indexoid);
53static char *build_index_value_desc(EState *estate, Relation localrel,
54 TupleTableSlot *slot, Oid indexoid);
55
56/*
57 * Get the xmin and commit timestamp data (origin and timestamp) associated
58 * with the provided local row.
59 *
60 * Return true if the commit timestamp data was found, false otherwise.
61 */
62bool
65{
67 bool isnull;
68
70 &isnull);
72 Assert(!isnull);
73
74 /*
75 * The commit timestamp data is not available if track_commit_timestamp is
76 * disabled.
77 */
79 {
81 *localts = 0;
82 return false;
83 }
84
86}
87
88/*
89 * This function is used to report a conflict while applying replication
90 * changes.
91 *
92 * 'searchslot' should contain the tuple used to search the local row to be
93 * updated or deleted.
94 *
95 * 'remoteslot' should contain the remote new tuple, if any.
96 *
97 * conflicttuples is a list of local rows that caused the conflict and the
98 * conflict related information. See ConflictTupleInfo.
99 *
100 * The caller must ensure that all the indexes passed in ConflictTupleInfo are
101 * locked so that we can fetch and display the conflicting key values.
102 */
103void
107{
108 Relation localrel = relinfo->ri_RelationDesc;
110
112
113 /* Form errdetail message by combining conflicting tuples information. */
117 conflicttuple->indexoid,
118 conflicttuple->xmin,
119 conflicttuple->origin,
120 conflicttuple->ts,
121 &err_detail);
122
124
125 ereport(elevel,
127 errmsg("conflict detected on relation \"%s.%s\": conflict=%s",
129 RelationGetRelationName(localrel),
131 errdetail_internal("%s", err_detail.data));
132}
133
134/*
135 * Find all unique indexes to check for a conflict and store them into
136 * ResultRelInfo.
137 */
138void
140{
142
143 for (int i = 0; i < relInfo->ri_NumIndices; i++)
144 {
145 Relation indexRelation = relInfo->ri_IndexRelationDescs[i];
146
147 if (indexRelation == NULL)
148 continue;
149
150 /* Detect conflict only for unique indexes */
151 if (!relInfo->ri_IndexRelationInfo[i]->ii_Unique)
152 continue;
153
154 /* Don't support conflict detection for deferrable index */
155 if (!indexRelation->rd_index->indimmediate)
156 continue;
157
159 RelationGetRelid(indexRelation));
160 }
161
162 relInfo->ri_onConflictArbiterIndexes = uniqueIndexes;
163}
164
165/*
166 * Add SQLSTATE error code to the current conflict report.
167 */
168static int
170{
171 switch (type)
172 {
173 case CT_INSERT_EXISTS:
174 case CT_UPDATE_EXISTS:
183 }
184
185 Assert(false);
186 return 0; /* silence compiler warning */
187}
188
189/*
190 * Helper function to build the additional details for conflicting key,
191 * local row, remote row, and replica identity columns.
192 */
193static void
195 bool need_newline)
196{
197 bool first = true;
198
199 Assert(buf != NULL && tuple_values != NIL);
200
202 {
203 /*
204 * Skip if the value is NULL. This means the current user does not
205 * have enough permissions to see all columns in the table. See
206 * get_tuple_desc().
207 */
208 if (!tuple_value)
209 continue;
210
211 if (first)
212 {
213 /*
214 * translator: The colon is used as a separator in conflict
215 * messages. The first part, built in the caller, describes what
216 * happened locally; the second part lists the conflicting keys
217 * and tuple data.
218 */
220 }
221 else
222 {
223 /*
224 * translator: This is a separator in a list of conflicting keys
225 * and tuple data.
226 */
228 }
229
231 first = false;
232 }
233
234 /* translator: This is the terminator of a conflict message */
236
237 if (need_newline)
239}
240
241/*
242 * Add an errdetail() line showing conflict detail.
243 *
244 * The DETAIL line comprises of two parts:
245 * 1. Explanation of the conflict type, including the origin and commit
246 * timestamp of the local row.
247 * 2. Display of conflicting key, local row, remote new row, and replica
248 * identity columns, if any. The remote old row is excluded as its
249 * information is covered in the replica identity columns.
250 */
251static void
255 Oid indexoid, TransactionId localxmin,
257 StringInfo err_msg)
258{
260 char *origin_name;
261 char *key_desc = NULL;
262 char *local_desc = NULL;
263 char *remote_desc = NULL;
264 char *search_desc = NULL;
265
266 /* Get key, replica identity, remote, and local value data */
271 indexoid);
272
274
275 /* Construct a detailed message describing the type of conflict */
276 switch (type)
277 {
278 case CT_INSERT_EXISTS:
279 case CT_UPDATE_EXISTS:
281 Assert(OidIsValid(indexoid) &&
283
284 if (err_msg->len == 0)
285 {
286 appendStringInfoString(&err_detail, _("Could not apply remote change"));
287
290 true);
291 }
292
293 if (localts)
294 {
296 appendStringInfo(&err_detail, _("Key already exists in unique index \"%s\", modified locally in transaction %u at %s"),
297 get_rel_name(indexoid),
299 else if (replorigin_by_oid(localorigin, true, &origin_name))
300 appendStringInfo(&err_detail, _("Key already exists in unique index \"%s\", modified by origin \"%s\" in transaction %u at %s"),
301 get_rel_name(indexoid), origin_name,
303
304 /*
305 * The origin that modified this row has been removed. This
306 * can happen if the origin was created by a different apply
307 * worker and its associated subscription and origin were
308 * dropped after updating the row, or if the origin was
309 * manually dropped by the user.
310 */
311 else
312 appendStringInfo(&err_detail, _("Key already exists in unique index \"%s\", modified by a non-existent origin in transaction %u at %s"),
313 get_rel_name(indexoid),
315 }
316 else
317 appendStringInfo(&err_detail, _("Key already exists in unique index \"%s\", modified in transaction %u"),
318 get_rel_name(indexoid), localxmin);
319
322
323 break;
324
327 appendStringInfo(&err_detail, _("Updating the row that was modified locally in transaction %u at %s"),
329 else if (replorigin_by_oid(localorigin, true, &origin_name))
330 appendStringInfo(&err_detail, _("Updating the row that was modified by a different origin \"%s\" in transaction %u at %s"),
331 origin_name, localxmin, timestamptz_to_str(localts));
332
333 /* The origin that modified this row has been removed. */
334 else
335 appendStringInfo(&err_detail, _("Updating the row that was modified by a non-existent origin in transaction %u at %s"),
337
340 search_desc), false);
341
342 break;
343
345 appendStringInfoString(&err_detail, _("Could not find the row to be updated"));
346
349 true);
350
351 if (localts)
352 {
354 appendStringInfo(&err_detail, _("The row to be updated was deleted locally in transaction %u at %s"),
356 else if (replorigin_by_oid(localorigin, true, &origin_name))
357 appendStringInfo(&err_detail, _("The row to be updated was deleted by a different origin \"%s\" in transaction %u at %s"),
358 origin_name, localxmin, timestamptz_to_str(localts));
359
360 /* The origin that modified this row has been removed. */
361 else
362 appendStringInfo(&err_detail, _("The row to be updated was deleted by a non-existent origin in transaction %u at %s"),
364 }
365 else
366 appendStringInfo(&err_detail, _("The row to be updated was deleted"));
367
368 break;
369
371 appendStringInfoString(&err_detail, _("Could not find the row to be updated"));
372
375 false);
376
377 break;
378
381 appendStringInfo(&err_detail, _("Deleting the row that was modified locally in transaction %u at %s"),
383 else if (replorigin_by_oid(localorigin, true, &origin_name))
384 appendStringInfo(&err_detail, _("Deleting the row that was modified by a different origin \"%s\" in transaction %u at %s"),
385 origin_name, localxmin, timestamptz_to_str(localts));
386
387 /* The origin that modified this row has been removed. */
388 else
389 appendStringInfo(&err_detail, _("Deleting the row that was modified by a non-existent origin in transaction %u at %s"),
391
394 search_desc), false);
395
396 break;
397
399 appendStringInfoString(&err_detail, _("Could not find the row to be deleted"));
400
402 list_make1(search_desc), false);
403
404 break;
405 }
406
407 Assert(err_detail.len > 0);
408
409 /*
410 * Insert a blank line to visually separate the new detail line from the
411 * existing ones.
412 */
413 if (err_msg->len > 0)
414 appendStringInfoChar(err_msg, '\n');
415
417}
418
419/*
420 * Extract conflicting key, local row, remote row, and replica identity
421 * columns. Results are set at xxx_desc.
422 *
423 * If the output is NULL, it indicates that the current user lacks permissions
424 * to view the columns involved.
425 */
426static void
428 char **key_desc,
432 Oid indexoid)
433{
434 Relation localrel = relinfo->ri_RelationDesc;
435 Oid relid = RelationGetRelid(localrel);
436 TupleDesc tupdesc = RelationGetDescr(localrel);
437 char *desc = NULL;
438
441
442 /*
443 * Report the conflicting key values in the case of a unique constraint
444 * violation.
445 */
448 {
449 Assert(OidIsValid(indexoid) && localslot);
450
451 desc = build_index_value_desc(estate, localrel, localslot,
452 indexoid);
453
454 if (desc)
455 *key_desc = psprintf(_("key %s"), desc);
456 }
457
458 if (localslot)
459 {
460 /*
461 * The 'modifiedCols' only applies to the new tuple, hence we pass
462 * NULL for the local row.
463 */
464 desc = ExecBuildSlotValueDescription(relid, localslot, tupdesc,
465 NULL, 64);
466
467 if (desc)
468 *local_desc = psprintf(_("local row %s"), desc);
469 }
470
471 if (remoteslot)
472 {
474
475 /*
476 * Although logical replication doesn't maintain the bitmap for the
477 * columns being inserted, we still use it to create 'modifiedCols'
478 * for consistency with other calls to ExecBuildSlotValueDescription.
479 *
480 * Note that generated columns are formed locally on the subscriber.
481 */
483 ExecGetUpdatedCols(relinfo, estate));
485 tupdesc, modifiedCols,
486 64);
487
488 if (desc)
489 *remote_desc = psprintf(_("remote row %s"), desc);
490 }
491
492 if (searchslot)
493 {
494 /*
495 * Note that while index other than replica identity may be used (see
496 * IsIndexUsableForReplicaIdentityFull for details) to find the tuple
497 * when applying update or delete, such an index scan may not result
498 * in a unique tuple and we still compare the complete tuple in such
499 * cases, thus such indexes are not used here.
500 */
502
504
505 /*
506 * If the table has a valid replica identity index, build the index
507 * key value string. Otherwise, construct the full tuple value for
508 * REPLICA IDENTITY FULL cases.
509 */
511 desc = build_index_value_desc(estate, localrel, searchslot, replica_index);
512 else
513 desc = ExecBuildSlotValueDescription(relid, searchslot, tupdesc, NULL, 64);
514
515 if (desc)
516 {
518 *search_desc = psprintf(_("replica identity %s"), desc);
519 else
520 *search_desc = psprintf(_("replica identity full %s"), desc);
521 }
522 }
523}
524
525/*
526 * Helper functions to construct a string describing the contents of an index
527 * entry. See BuildIndexValueDescription for details.
528 *
529 * The caller must ensure that the index with the OID 'indexoid' is locked so
530 * that we can fetch and display the conflicting key value.
531 */
532static char *
534 Oid indexoid)
535{
536 char *index_value;
539 bool isnull[INDEX_MAX_KEYS];
540 TupleTableSlot *tableslot = slot;
541
542 if (!tableslot)
543 return NULL;
544
546
547 indexDesc = index_open(indexoid, NoLock);
548
549 /*
550 * If the slot is a virtual slot, copy it into a heap tuple slot as
551 * FormIndexDatum only works with heap tuple slots.
552 */
553 if (TTS_IS_VIRTUAL(slot))
554 {
555 tableslot = table_slot_create(localrel, &estate->es_tupleTable);
556 tableslot = ExecCopySlot(tableslot, slot);
557 }
558
559 /*
560 * Initialize ecxt_scantuple for potential use in FormIndexDatum when
561 * index expressions are present.
562 */
563 GetPerTupleExprContext(estate)->ecxt_scantuple = tableslot;
564
565 /*
566 * The values/nulls arrays passed to BuildIndexValueDescription should be
567 * the results of FormIndexDatum, which are the "raw" input to the index
568 * AM.
569 */
570 FormIndexDatum(BuildIndexInfo(indexDesc), tableslot, estate, values, isnull);
571
573
575
576 return index_value;
577}
Subscription * MySubscription
Definition worker.c:479
const char * timestamptz_to_str(TimestampTz t)
Definition timestamp.c:1862
Bitmapset * bms_union(const Bitmapset *a, const Bitmapset *b)
Definition bitmapset.c:251
static Datum values[MAXATTR]
Definition bootstrap.c:155
#define Assert(condition)
Definition c.h:873
uint32 TransactionId
Definition c.h:666
#define OidIsValid(objectId)
Definition c.h:788
bool track_commit_timestamp
Definition commit_ts.c:109
bool TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts, RepOriginId *nodeid)
Definition commit_ts.c:272
void ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel, ConflictType type, TupleTableSlot *searchslot, TupleTableSlot *remoteslot, List *conflicttuples)
Definition conflict.c:104
static void errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo, ConflictType type, TupleTableSlot *searchslot, TupleTableSlot *localslot, TupleTableSlot *remoteslot, Oid indexoid, TransactionId localxmin, RepOriginId localorigin, TimestampTz localts, StringInfo err_msg)
Definition conflict.c:252
static const char *const ConflictTypeNames[]
Definition conflict.c:26
static char * build_index_value_desc(EState *estate, Relation localrel, TupleTableSlot *slot, Oid indexoid)
Definition conflict.c:533
void InitConflictIndexes(ResultRelInfo *relInfo)
Definition conflict.c:139
static int errcode_apply_conflict(ConflictType type)
Definition conflict.c:169
static void get_tuple_desc(EState *estate, ResultRelInfo *relinfo, ConflictType type, char **key_desc, TupleTableSlot *searchslot, char **search_desc, TupleTableSlot *localslot, char **local_desc, TupleTableSlot *remoteslot, char **remote_desc, Oid indexoid)
Definition conflict.c:427
bool GetTupleTransactionInfo(TupleTableSlot *localslot, TransactionId *xmin, RepOriginId *localorigin, TimestampTz *localts)
Definition conflict.c:63
static void append_tuple_value_detail(StringInfo buf, List *tuple_values, bool need_newline)
Definition conflict.c:194
ConflictType
Definition conflict.h:32
@ CT_UPDATE_DELETED
Definition conflict.h:43
@ CT_MULTIPLE_UNIQUE_CONFLICTS
Definition conflict.h:55
@ CT_DELETE_MISSING
Definition conflict.h:52
@ CT_UPDATE_ORIGIN_DIFFERS
Definition conflict.h:37
@ CT_INSERT_EXISTS
Definition conflict.h:34
@ CT_UPDATE_EXISTS
Definition conflict.h:40
@ CT_UPDATE_MISSING
Definition conflict.h:46
@ CT_DELETE_ORIGIN_DIFFERS
Definition conflict.h:49
int64 TimestampTz
Definition timestamp.h:39
int errdetail_internal(const char *fmt,...)
Definition elog.c:1243
int errcode(int sqlerrcode)
Definition elog.c:863
int errmsg(const char *fmt,...)
Definition elog.c:1080
#define _(x)
Definition elog.c:91
#define ereport(elevel,...)
Definition elog.h:150
char * ExecBuildSlotValueDescription(Oid reloid, TupleTableSlot *slot, TupleDesc tupdesc, Bitmapset *modifiedCols, int maxfieldlen)
Definition execMain.c:2395
Bitmapset * ExecGetInsertedCols(ResultRelInfo *relinfo, EState *estate)
Definition execUtils.c:1361
Bitmapset * ExecGetUpdatedCols(ResultRelInfo *relinfo, EState *estate)
Definition execUtils.c:1382
#define GetPerTupleExprContext(estate)
Definition executor.h:656
char * BuildIndexValueDescription(Relation indexRelation, const Datum *values, const bool *isnull)
Definition genam.c:178
IndexInfo * BuildIndexInfo(Relation index)
Definition index.c:2426
void FormIndexDatum(IndexInfo *indexInfo, TupleTableSlot *slot, EState *estate, Datum *values, bool *isnull)
Definition index.c:2728
void index_close(Relation relation, LOCKMODE lockmode)
Definition indexam.c:177
Relation index_open(Oid relationId, LOCKMODE lockmode)
Definition indexam.c:133
int i
Definition isn.c:77
List * lappend_oid(List *list, Oid datum)
Definition list.c:375
bool CheckRelationOidLockedByMe(Oid relid, LOCKMODE lockmode, bool orstronger)
Definition lmgr.c:351
#define NoLock
Definition lockdefs.h:34
#define RowExclusiveLock
Definition lockdefs.h:38
char * get_rel_name(Oid relid)
Definition lsyscache.c:2078
char * get_namespace_name(Oid nspid)
Definition lsyscache.c:3516
bool replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
Definition origin.c:499
#define InvalidRepOriginId
Definition origin.h:33
#define INDEX_MAX_KEYS
#define NIL
Definition pg_list.h:68
#define list_make1(x1)
Definition pg_list.h:212
#define foreach_ptr(type, var, lst)
Definition pg_list.h:469
#define list_make3(x1, x2, x3)
Definition pg_list.h:216
#define list_make2(x1, x2)
Definition pg_list.h:214
static char buf[DEFAULT_XLOG_SEG_SIZE]
#define ERRCODE_T_R_SERIALIZATION_FAILURE
Definition pgbench.c:77
void pgstat_report_subscription_conflict(Oid subid, ConflictType type)
uint64_t Datum
Definition postgres.h:70
static TransactionId DatumGetTransactionId(Datum X)
Definition postgres.h:292
unsigned int Oid
static int fb(int x)
char * psprintf(const char *fmt,...)
Definition psprintf.c:43
#define RelationGetRelid(relation)
Definition rel.h:514
#define RelationGetDescr(relation)
Definition rel.h:540
#define RelationGetRelationName(relation)
Definition rel.h:548
#define RelationGetNamespace(relation)
Definition rel.h:555
Oid GetRelationIdentityOrPK(Relation rel)
Definition relation.c:905
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition stringinfo.c:145
void appendStringInfoString(StringInfo str, const char *s)
Definition stringinfo.c:230
void appendStringInfoChar(StringInfo str, char ch)
Definition stringinfo.c:242
void initStringInfo(StringInfo str)
Definition stringinfo.c:97
List * es_tupleTable
Definition execnodes.h:714
Definition pg_list.h:54
Form_pg_index rd_index
Definition rel.h:192
#define MinTransactionIdAttributeNumber
Definition sysattr.h:22
TupleTableSlot * table_slot_create(Relation relation, List **reglist)
Definition tableam.c:92
#define TTS_IS_VIRTUAL(slot)
Definition tuptable.h:237
static Datum slot_getsysattr(TupleTableSlot *slot, int attnum, bool *isnull)
Definition tuptable.h:419
static TupleTableSlot * ExecCopySlot(TupleTableSlot *dstslot, TupleTableSlot *srcslot)
Definition tuptable.h:524
const char * type
uint16 RepOriginId
Definition xlogdefs.h:69