PostgreSQL Source Code git master
Loading...
Searching...
No Matches
conflict.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 * conflict.c
3 * Support routines for logging conflicts.
4 *
5 * Copyright (c) 2024-2026, PostgreSQL Global Development Group
6 *
7 * IDENTIFICATION
8 * src/backend/replication/logical/conflict.c
9 *
10 * This file contains the code for logging conflicts on the subscriber during
11 * logical replication.
12 *-------------------------------------------------------------------------
13 */
14
15#include "postgres.h"
16
17#include "access/commit_ts.h"
18#include "access/genam.h"
19#include "access/tableam.h"
20#include "executor/executor.h"
21#include "pgstat.h"
24#include "storage/lmgr.h"
25#include "utils/lsyscache.h"
26
/*
 * Textual name for each conflict type. Designated initializers key every
 * entry by its ConflictType enum constant, so the table is looked up by
 * enum value rather than by position in this list.
 */
27static const char *const ConflictTypeNames[] = {
28 [CT_INSERT_EXISTS] = "insert_exists",
29 [CT_UPDATE_ORIGIN_DIFFERS] = "update_origin_differs",
30 [CT_UPDATE_EXISTS] = "update_exists",
31 [CT_UPDATE_MISSING] = "update_missing",
32 [CT_DELETE_ORIGIN_DIFFERS] = "delete_origin_differs",
33 [CT_UPDATE_DELETED] = "update_deleted",
34 [CT_DELETE_MISSING] = "delete_missing",
35 [CT_MULTIPLE_UNIQUE_CONFLICTS] = "multiple_unique_conflicts"
36};
37
39static void errdetail_apply_conflict(EState *estate,
45 Oid indexoid, TransactionId localxmin,
48static void get_tuple_desc(EState *estate, ResultRelInfo *relinfo,
53 Oid indexoid);
54static char *build_index_value_desc(EState *estate, Relation localrel,
55 TupleTableSlot *slot, Oid indexoid);
56
57/*
58 * Get the xmin and commit timestamp data (origin and timestamp) associated
59 * with the provided local row.
60 *
61 * Return true if the commit timestamp data was found, false otherwise.
62 */
63bool
66{
68 bool isnull;
69
71 &isnull);
73 Assert(!isnull);
74
75 /*
76 * The commit timestamp data is not available if track_commit_timestamp is
77 * disabled.
78 */
80 {
82 *localts = 0;
83 return false;
84 }
85
87}
88
89/*
90 * This function is used to report a conflict while applying replication
91 * changes.
92 *
93 * 'searchslot' should contain the tuple used to search the local row to be
94 * updated or deleted.
95 *
96 * 'remoteslot' should contain the remote new tuple, if any.
97 *
98 * 'conflicttuples' is a list of the local rows that caused the conflict,
99 * along with the conflict-related information. See ConflictTupleInfo.
100 *
101 * The caller must ensure that all the indexes passed in ConflictTupleInfo are
102 * locked so that we can fetch and display the conflicting key values.
103 */
104void
108{
109 Relation localrel = relinfo->ri_RelationDesc;
111
113
114 /* Form errdetail message by combining conflicting tuples information. */
118 conflicttuple->indexoid,
119 conflicttuple->xmin,
120 conflicttuple->origin,
121 conflicttuple->ts,
122 &err_detail);
123
125
126 ereport(elevel,
128 errmsg("conflict detected on relation \"%s.%s\": conflict=%s",
130 RelationGetRelationName(localrel),
132 errdetail_internal("%s", err_detail.data));
133}
134
135/*
136 * Find all unique indexes to check for a conflict and store them into
137 * ResultRelInfo.
138 */
139void
141{
143
144 for (int i = 0; i < relInfo->ri_NumIndices; i++)
145 {
146 Relation indexRelation = relInfo->ri_IndexRelationDescs[i];
147
148 if (indexRelation == NULL)
149 continue;
150
151 /* Detect conflict only for unique indexes */
152 if (!relInfo->ri_IndexRelationInfo[i]->ii_Unique)
153 continue;
154
155 /* Don't support conflict detection for deferrable index */
156 if (!indexRelation->rd_index->indimmediate)
157 continue;
158
160 RelationGetRelid(indexRelation));
161 }
162
163 relInfo->ri_onConflictArbiterIndexes = uniqueIndexes;
164}
165
166/*
167 * Add SQLSTATE error code to the current conflict report.
168 */
169static int
171{
172 switch (type)
173 {
174 case CT_INSERT_EXISTS:
175 case CT_UPDATE_EXISTS:
184 }
185
186 Assert(false);
187 return 0; /* silence compiler warning */
188}
189
190/*
191 * Helper function to build the additional details for conflicting key,
192 * local row, remote row, and replica identity columns.
193 */
194static void
196{
197 bool first = true;
198
199 Assert(buf != NULL && tuple_values != NIL);
200
202 {
203 /*
204 * Skip if the value is NULL. This means the current user does not
205 * have enough permissions to see all columns in the table. See
206 * get_tuple_desc().
207 */
208 if (!tuple_value)
209 continue;
210
211 /* standard SQL punctuation, not translated */
212 if (!first)
214
216 first = false;
217 }
218}
219
220/*
221 * Add an errdetail() line showing conflict detail.
222 *
223 * The DETAIL line consists of two parts:
224 * 1. Explanation of the conflict type, including the origin and commit
225 * timestamp of the local row.
226 * 2. Display of conflicting key, local row, remote new row, and replica
227 * identity columns, if any. The remote old row is excluded as its
228 * information is covered in the replica identity columns.
229 */
230static void
234 Oid indexoid, TransactionId localxmin,
236 StringInfo err_msg)
237{
240 char *origin_name;
241 char *key_desc = NULL;
242 char *local_desc = NULL;
243 char *remote_desc = NULL;
244 char *search_desc = NULL;
245
246 /* Get key, replica identity, remote, and local value data */
251 indexoid);
252
255
256 /* Construct a detailed message describing the type of conflict */
257 switch (type)
258 {
259 case CT_INSERT_EXISTS:
260 case CT_UPDATE_EXISTS:
262 Assert(OidIsValid(indexoid) &&
264
265 if (err_msg->len == 0)
266 {
269
270 if (tuple_buf.len)
271 appendStringInfo(&err_detail, _("Could not apply remote change: %s.\n"),
272 tuple_buf.data);
273 else
274 appendStringInfo(&err_detail, _("Could not apply remote change.\n"));
275
276
278 }
279
282
283 if (localts)
284 {
286 {
287 if (tuple_buf.len)
288 appendStringInfo(&err_detail, _("Key already exists in unique index \"%s\", modified locally in transaction %u at %s: %s."),
289 get_rel_name(indexoid),
291 tuple_buf.data);
292 else
293 appendStringInfo(&err_detail, _("Key already exists in unique index \"%s\", modified locally in transaction %u at %s."),
294 get_rel_name(indexoid),
296 }
297 else if (replorigin_by_oid(localorigin, true, &origin_name))
298 {
299 if (tuple_buf.len)
300 appendStringInfo(&err_detail, _("Key already exists in unique index \"%s\", modified by origin \"%s\" in transaction %u at %s: %s."),
301 get_rel_name(indexoid), origin_name,
303 tuple_buf.data);
304 else
305 appendStringInfo(&err_detail, _("Key already exists in unique index \"%s\", modified by origin \"%s\" in transaction %u at %s."),
306 get_rel_name(indexoid), origin_name,
308 }
309
310 /*
311 * The origin that modified this row has been removed. This
312 * can happen if the origin was created by a different apply
313 * worker and its associated subscription and origin were
314 * dropped after updating the row, or if the origin was
315 * manually dropped by the user.
316 */
317 else
318 {
319 if (tuple_buf.len)
320 appendStringInfo(&err_detail, _("Key already exists in unique index \"%s\", modified by a non-existent origin in transaction %u at %s: %s."),
321 get_rel_name(indexoid),
323 tuple_buf.data);
324 else
325 appendStringInfo(&err_detail, _("Key already exists in unique index \"%s\", modified by a non-existent origin in transaction %u at %s."),
326 get_rel_name(indexoid),
328 }
329 }
330 else
331 {
332 if (tuple_buf.len)
333 appendStringInfo(&err_detail, _("Key already exists in unique index \"%s\", modified in transaction %u: %s."),
334 get_rel_name(indexoid), localxmin,
335 tuple_buf.data);
336 else
337 appendStringInfo(&err_detail, _("Key already exists in unique index \"%s\", modified in transaction %u."),
338 get_rel_name(indexoid), localxmin);
339 }
340
341 break;
342
346 search_desc));
347
349 {
350 if (tuple_buf.len)
351 appendStringInfo(&err_detail, _("Updating the row that was modified locally in transaction %u at %s: %s."),
353 tuple_buf.data);
354 else
355 appendStringInfo(&err_detail, _("Updating the row that was modified locally in transaction %u at %s."),
357 }
358 else if (replorigin_by_oid(localorigin, true, &origin_name))
359 {
360 if (tuple_buf.len)
361 appendStringInfo(&err_detail, _("Updating the row that was modified by a different origin \"%s\" in transaction %u at %s: %s."),
362 origin_name, localxmin,
364 tuple_buf.data);
365 else
366 appendStringInfo(&err_detail, _("Updating the row that was modified by a different origin \"%s\" in transaction %u at %s."),
367 origin_name, localxmin,
369 }
370
371 /* The origin that modified this row has been removed. */
372 else
373 {
374 if (tuple_buf.len)
375 appendStringInfo(&err_detail, _("Updating the row that was modified by a non-existent origin in transaction %u at %s: %s."),
377 tuple_buf.data);
378 else
379 appendStringInfo(&err_detail, _("Updating the row that was modified by a non-existent origin in transaction %u at %s."),
381 }
382
383 break;
384
388
389 if (tuple_buf.len)
390 appendStringInfo(&err_detail, _("Could not find the row to be updated: %s.\n"),
391 tuple_buf.data);
392 else
393 appendStringInfo(&err_detail, _("Could not find the row to be updated.\n"));
394
395 if (localts)
396 {
398 appendStringInfo(&err_detail, _("The row to be updated was deleted locally in transaction %u at %s"),
400 else if (replorigin_by_oid(localorigin, true, &origin_name))
401 appendStringInfo(&err_detail, _("The row to be updated was deleted by a different origin \"%s\" in transaction %u at %s"),
402 origin_name, localxmin, timestamptz_to_str(localts));
403
404 /* The origin that modified this row has been removed. */
405 else
406 appendStringInfo(&err_detail, _("The row to be updated was deleted by a non-existent origin in transaction %u at %s"),
408 }
409 else
410 appendStringInfoString(&err_detail, _("The row to be updated was deleted"));
411
412 break;
413
417
418 if (tuple_buf.len)
419 appendStringInfo(&err_detail, _("Could not find the row to be updated: %s."),
420 tuple_buf.data);
421 else
422 appendStringInfo(&err_detail, _("Could not find the row to be updated."));
423
424 break;
425
429 search_desc));
430
432 {
433 if (tuple_buf.len)
434 appendStringInfo(&err_detail, _("Deleting the row that was modified locally in transaction %u at %s: %s."),
436 tuple_buf.data);
437 else
438 appendStringInfo(&err_detail, _("Deleting the row that was modified locally in transaction %u at %s."),
440 }
441 else if (replorigin_by_oid(localorigin, true, &origin_name))
442 {
443 if (tuple_buf.len)
444 appendStringInfo(&err_detail, _("Deleting the row that was modified by a different origin \"%s\" in transaction %u at %s: %s."),
445 origin_name, localxmin,
447 tuple_buf.data);
448 else
449 appendStringInfo(&err_detail, _("Deleting the row that was modified by a different origin \"%s\" in transaction %u at %s."),
450 origin_name, localxmin,
452 }
453
454 /* The origin that modified this row has been removed. */
455 else
456 {
457 if (tuple_buf.len)
458 appendStringInfo(&err_detail, _("Deleting the row that was modified by a non-existent origin in transaction %u at %s: %s."),
460 tuple_buf.data);
461 else
462 appendStringInfo(&err_detail, _("Deleting the row that was modified by a non-existent origin in transaction %u at %s."),
464 }
465
466 break;
467
471
472 if (tuple_buf.len)
473 appendStringInfo(&err_detail, _("Could not find the row to be deleted: %s."),
474 tuple_buf.data);
475 else
476 appendStringInfo(&err_detail, _("Could not find the row to be deleted."));
477
478 break;
479 }
480
481 Assert(err_detail.len > 0);
482
483 /*
484 * Insert a blank line to visually separate the new detail line from the
485 * existing ones.
486 */
487 if (err_msg->len > 0)
488 appendStringInfoChar(err_msg, '\n');
489
491}
492
493/*
494 * Extract conflicting key, local row, remote row, and replica identity
495 * columns. Results are stored in the corresponding xxx_desc output arguments.
496 *
497 * If the output is NULL, it indicates that the current user lacks permissions
498 * to view the columns involved.
499 */
500static void
502 char **key_desc,
506 Oid indexoid)
507{
508 Relation localrel = relinfo->ri_RelationDesc;
509 Oid relid = RelationGetRelid(localrel);
510 TupleDesc tupdesc = RelationGetDescr(localrel);
511 char *desc = NULL;
512
515
516 /*
517 * Report the conflicting key values in the case of a unique constraint
518 * violation.
519 */
522 {
523 Assert(OidIsValid(indexoid) && localslot);
524
525 desc = build_index_value_desc(estate, localrel, localslot,
526 indexoid);
527
528 if (desc)
529 *key_desc = psprintf(_("key %s"), desc);
530 }
531
532 if (localslot)
533 {
534 /*
535 * The 'modifiedCols' only applies to the new tuple, hence we pass
536 * NULL for the local row.
537 */
538 desc = ExecBuildSlotValueDescription(relid, localslot, tupdesc,
539 NULL, 64);
540
541 if (desc)
542 *local_desc = psprintf(_("local row %s"), desc);
543 }
544
545 if (remoteslot)
546 {
548
549 /*
550 * Although logical replication doesn't maintain the bitmap for the
551 * columns being inserted, we still use it to create 'modifiedCols'
552 * for consistency with other calls to ExecBuildSlotValueDescription.
553 *
554 * Note that generated columns are formed locally on the subscriber.
555 */
557 ExecGetUpdatedCols(relinfo, estate));
559 tupdesc, modifiedCols,
560 64);
561
562 if (desc)
563 *remote_desc = psprintf(_("remote row %s"), desc);
564 }
565
566 if (searchslot)
567 {
568 /*
569 * Note that while an index other than the replica identity may be used (see
570 * IsIndexUsableForReplicaIdentityFull for details) to find the tuple
571 * when applying update or delete, such an index scan may not result
572 * in a unique tuple and we still compare the complete tuple in such
573 * cases, thus such indexes are not used here.
574 */
576
578
579 /*
580 * If the table has a valid replica identity index, build the index
581 * key value string. Otherwise, construct the full tuple value for
582 * REPLICA IDENTITY FULL cases.
583 */
585 desc = build_index_value_desc(estate, localrel, searchslot, replica_index);
586 else
587 desc = ExecBuildSlotValueDescription(relid, searchslot, tupdesc, NULL, 64);
588
589 if (desc)
590 {
592 *search_desc = psprintf(_("replica identity %s"), desc);
593 else
594 *search_desc = psprintf(_("replica identity full %s"), desc);
595 }
596 }
597}
598
599/*
600 * Helper function to construct a string describing the contents of an index
601 * entry. See BuildIndexValueDescription for details.
602 *
603 * The caller must ensure that the index with the OID 'indexoid' is locked so
604 * that we can fetch and display the conflicting key value.
605 */
606static char *
608 Oid indexoid)
609{
610 char *index_value;
613 bool isnull[INDEX_MAX_KEYS];
614 TupleTableSlot *tableslot = slot;
615
616 if (!tableslot)
617 return NULL;
618
620
621 indexDesc = index_open(indexoid, NoLock);
622
623 /*
624 * If the slot is a virtual slot, copy it into a heap tuple slot as
625 * FormIndexDatum only works with heap tuple slots.
626 */
627 if (TTS_IS_VIRTUAL(slot))
628 {
629 tableslot = table_slot_create(localrel, &estate->es_tupleTable);
630 tableslot = ExecCopySlot(tableslot, slot);
631 }
632
633 /*
634 * Initialize ecxt_scantuple for potential use in FormIndexDatum when
635 * index expressions are present.
636 */
637 GetPerTupleExprContext(estate)->ecxt_scantuple = tableslot;
638
639 /*
640 * The values/nulls arrays passed to BuildIndexValueDescription should be
641 * the results of FormIndexDatum, which are the "raw" input to the index
642 * AM.
643 */
644 FormIndexDatum(BuildIndexInfo(indexDesc), tableslot, estate, values, isnull);
645
647
649
650 return index_value;
651}
Subscription * MySubscription
Definition worker.c:484
const char * timestamptz_to_str(TimestampTz t)
Definition timestamp.c:1870
Bitmapset * bms_union(const Bitmapset *a, const Bitmapset *b)
Definition bitmapset.c:251
static Datum values[MAXATTR]
Definition bootstrap.c:190
#define Assert(condition)
Definition c.h:943
uint32 TransactionId
Definition c.h:736
#define OidIsValid(objectId)
Definition c.h:858
bool track_commit_timestamp
Definition commit_ts.c:121
bool TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts, ReplOriginId *nodeid)
Definition commit_ts.c:283
bool GetTupleTransactionInfo(TupleTableSlot *localslot, TransactionId *xmin, ReplOriginId *localorigin, TimestampTz *localts)
Definition conflict.c:64
void ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel, ConflictType type, TupleTableSlot *searchslot, TupleTableSlot *remoteslot, List *conflicttuples)
Definition conflict.c:105
static const char *const ConflictTypeNames[]
Definition conflict.c:27
static char * build_index_value_desc(EState *estate, Relation localrel, TupleTableSlot *slot, Oid indexoid)
Definition conflict.c:607
static void append_tuple_value_detail(StringInfo buf, List *tuple_values)
Definition conflict.c:195
static void get_tuple_desc(EState *estate, ResultRelInfo *relinfo, ConflictType type, char **key_desc, TupleTableSlot *localslot, char **local_desc, TupleTableSlot *remoteslot, char **remote_desc, TupleTableSlot *searchslot, char **search_desc, Oid indexoid)
Definition conflict.c:501
static void errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo, ConflictType type, TupleTableSlot *searchslot, TupleTableSlot *localslot, TupleTableSlot *remoteslot, Oid indexoid, TransactionId localxmin, ReplOriginId localorigin, TimestampTz localts, StringInfo err_msg)
Definition conflict.c:231
void InitConflictIndexes(ResultRelInfo *relInfo)
Definition conflict.c:140
static int errcode_apply_conflict(ConflictType type)
Definition conflict.c:170
ConflictType
Definition conflict.h:32
@ CT_UPDATE_DELETED
Definition conflict.h:43
@ CT_MULTIPLE_UNIQUE_CONFLICTS
Definition conflict.h:55
@ CT_DELETE_MISSING
Definition conflict.h:52
@ CT_UPDATE_ORIGIN_DIFFERS
Definition conflict.h:37
@ CT_INSERT_EXISTS
Definition conflict.h:34
@ CT_UPDATE_EXISTS
Definition conflict.h:40
@ CT_UPDATE_MISSING
Definition conflict.h:46
@ CT_DELETE_ORIGIN_DIFFERS
Definition conflict.h:49
int64 TimestampTz
Definition timestamp.h:39
int errcode(int sqlerrcode)
Definition elog.c:875
#define _(x)
Definition elog.c:96
int int errdetail_internal(const char *fmt,...) pg_attribute_printf(1
#define ereport(elevel,...)
Definition elog.h:152
char * ExecBuildSlotValueDescription(Oid reloid, TupleTableSlot *slot, TupleDesc tupdesc, Bitmapset *modifiedCols, int maxfieldlen)
Definition execMain.c:2420
Bitmapset * ExecGetInsertedCols(ResultRelInfo *relinfo, EState *estate)
Definition execUtils.c:1387
Bitmapset * ExecGetUpdatedCols(ResultRelInfo *relinfo, EState *estate)
Definition execUtils.c:1408
#define GetPerTupleExprContext(estate)
Definition executor.h:667
char * BuildIndexValueDescription(Relation indexRelation, const Datum *values, const bool *isnull)
Definition genam.c:178
IndexInfo * BuildIndexInfo(Relation index)
Definition index.c:2446
void FormIndexDatum(IndexInfo *indexInfo, TupleTableSlot *slot, EState *estate, Datum *values, bool *isnull)
Definition index.c:2748
void index_close(Relation relation, LOCKMODE lockmode)
Definition indexam.c:178
Relation index_open(Oid relationId, LOCKMODE lockmode)
Definition indexam.c:134
int i
Definition isn.c:77
List * lappend_oid(List *list, Oid datum)
Definition list.c:375
bool CheckRelationOidLockedByMe(Oid relid, LOCKMODE lockmode, bool orstronger)
Definition lmgr.c:351
#define NoLock
Definition lockdefs.h:34
#define RowExclusiveLock
Definition lockdefs.h:38
char * get_rel_name(Oid relid)
Definition lsyscache.c:2159
char * get_namespace_name(Oid nspid)
Definition lsyscache.c:3599
static char * errmsg
bool replorigin_by_oid(ReplOriginId roident, bool missing_ok, char **roname)
Definition origin.c:513
#define InvalidReplOriginId
Definition origin.h:33
#define INDEX_MAX_KEYS
#define NIL
Definition pg_list.h:68
#define list_make1(x1)
Definition pg_list.h:244
#define foreach_ptr(type, var, lst)
Definition pg_list.h:501
#define list_make3(x1, x2, x3)
Definition pg_list.h:248
#define list_make2(x1, x2)
Definition pg_list.h:246
static char buf[DEFAULT_XLOG_SEG_SIZE]
#define ERRCODE_T_R_SERIALIZATION_FAILURE
Definition pgbench.c:77
void pgstat_report_subscription_conflict(Oid subid, ConflictType type)
uint64_t Datum
Definition postgres.h:70
static TransactionId DatumGetTransactionId(Datum X)
Definition postgres.h:282
unsigned int Oid
static int fb(int x)
char * psprintf(const char *fmt,...)
Definition psprintf.c:43
#define RelationGetRelid(relation)
Definition rel.h:516
#define RelationGetDescr(relation)
Definition rel.h:542
#define RelationGetRelationName(relation)
Definition rel.h:550
#define RelationGetNamespace(relation)
Definition rel.h:557
Oid GetRelationIdentityOrPK(Relation rel)
Definition relation.c:905
void resetStringInfo(StringInfo str)
Definition stringinfo.c:126
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition stringinfo.c:145
void appendStringInfoString(StringInfo str, const char *s)
Definition stringinfo.c:230
void appendStringInfoChar(StringInfo str, char ch)
Definition stringinfo.c:242
void initStringInfo(StringInfo str)
Definition stringinfo.c:97
List * es_tupleTable
Definition execnodes.h:748
Definition pg_list.h:54
Form_pg_index rd_index
Definition rel.h:192
#define MinTransactionIdAttributeNumber
Definition sysattr.h:22
TupleTableSlot * table_slot_create(Relation relation, List **reglist)
Definition tableam.c:92
#define TTS_IS_VIRTUAL(slot)
Definition tuptable.h:253
static Datum slot_getsysattr(TupleTableSlot *slot, int attnum, bool *isnull)
Definition tuptable.h:438
static TupleTableSlot * ExecCopySlot(TupleTableSlot *dstslot, TupleTableSlot *srcslot)
Definition tuptable.h:544
const char * type
uint16 ReplOriginId
Definition xlogdefs.h:69