PostgreSQL Source Code  git master
reorderbuffer.h
Go to the documentation of this file.
1 /*
2  * reorderbuffer.h
3  * PostgreSQL logical replay/reorder buffer management.
4  *
5  * Copyright (c) 2012-2020, PostgreSQL Global Development Group
6  *
7  * src/include/replication/reorderbuffer.h
8  */
9 #ifndef REORDERBUFFER_H
10 #define REORDERBUFFER_H
11 
12 #include "access/htup_details.h"
13 #include "lib/ilist.h"
14 #include "storage/sinval.h"
15 #include "utils/hsearch.h"
16 #include "utils/relcache.h"
17 #include "utils/snapshot.h"
18 #include "utils/timestamp.h"
19 
21 
22 /* an individual tuple, stored in one chunk of memory */
23 typedef struct ReorderBufferTupleBuf
24 {
25  /* position in preallocated list */
27 
28  /* tuple header, the interesting bit for users of logical decoding */
30 
31  /* pre-allocated size of tuple buffer, different from tuple size */
33 
34  /* actual tuple data follows */
36 
/*
 * Pointer to the tuple data stored in a TupleBuf.  The data begins at the
 * first MAXALIGN'd address past the ReorderBufferTupleBuf header.
 *
 * The argument is parenthesized so that a pointer expression such as
 * "buf + 1" is evaluated before the cast to (char *), not after it.
 */
#define ReorderBufferTupleBufData(p) \
	((HeapTupleHeader) MAXALIGN(((char *) (p)) + sizeof(ReorderBufferTupleBuf)))
40 
41 /*
42  * Types of the change passed to a 'change' callback.
43  *
44  * For efficiency and simplicity reasons we want to keep Snapshots, CommandIds
45  * and ComboCids in the same list with the user visible INSERT/UPDATE/DELETE
46  * changes. Users of the decoding facilities will never see changes with
47  * *_INTERNAL_* actions.
48  *
49  * The INTERNAL_SPEC_INSERT and INTERNAL_SPEC_CONFIRM changes concern
50  * "speculative insertions", and their confirmation respectively. They're
51  * used by INSERT .. ON CONFLICT .. UPDATE. Users of logical decoding don't
52  * have to care about these.
53  */
55 {
66 };
67 
68 /* forward declaration */
69 struct ReorderBufferTXN;
70 
71 /*
72  * A single 'change': can be an insert (with one tuple), an update (old, new),
73  * or a delete (old).
74  *
75  * The same struct is also used internally for other purposes, but that should
76  * never be visible outside reorderbuffer.c.
77  */
78 typedef struct ReorderBufferChange
79 {
81
82  /* The type of change. */
84
85  /* Transaction this change belongs to. */
87
89
90  /*
91  * Context data for the change. Which part of the union is valid depends
92  * on action.
93  */
94  union
95  {
96  /* Old, new tuples when action == *_INSERT|UPDATE|DELETE */
97  struct
98  {
99  /* relation that has been changed */
101
102  /* no previously reassembled toast chunks are necessary anymore */
104
105  /* valid for DELETE || UPDATE */
107  /* valid for INSERT || UPDATE */
109  } tp;
110
111  /*
112  * Truncate data for REORDER_BUFFER_CHANGE_TRUNCATE representing one
113  * set of relations to be truncated.
114  */
115  struct
116  {
118  bool cascade;
121  } truncate;
122
123  /* Message with arbitrary data. */
124  struct
125  {
126  char *prefix;
128  char *message;
129  } msg;
130
131  /* New snapshot, set when action == *_INTERNAL_SNAPSHOT */
133
134  /*
135  * New command id for existing snapshot in a catalog-changing tx. Set
136  * when action == *_INTERNAL_COMMAND_ID.
137  */
139
140  /*
141  * New cid mapping for a catalog-changing transaction, set when action
142  * == *_INTERNAL_TUPLECID.
143  */
144  struct
145  {
151  } tuplecid;
152  } data;
153
154  /*
155  * While in use, this is how a change is linked into its transaction's
156  * list of changes; otherwise it links the change into the preallocated list.
157  */
160 
161 /* Bits stored in ReorderBufferTXN.txn_flags */
162 #define RBTXN_HAS_CATALOG_CHANGES 0x0001
163 #define RBTXN_IS_SUBXACT 0x0002
164 #define RBTXN_IS_SERIALIZED 0x0004
165 #define RBTXN_IS_STREAMED 0x0008
166 #define RBTXN_HAS_TOAST_INSERT 0x0010
167 #define RBTXN_HAS_SPEC_INSERT 0x0020
168
169 /* Does the transaction have catalog changes? */
170 #define rbtxn_has_catalog_changes(txn) \
171 ( \
172  ((txn)->txn_flags & RBTXN_HAS_CATALOG_CHANGES) != 0 \
173 )
174
175 /* Is the transaction known to be a subxact? */
176 #define rbtxn_is_known_subxact(txn) \
177 ( \
178  ((txn)->txn_flags & RBTXN_IS_SUBXACT) != 0 \
179 )
180
181 /* Has this transaction been spilled to disk? */
182 #define rbtxn_is_serialized(txn) \
183 ( \
184  ((txn)->txn_flags & RBTXN_IS_SERIALIZED) != 0 \
185 )
186
187 /* Do this transaction's changes include a toast insert without its main table insert? */
188 #define rbtxn_has_toast_insert(txn) \
189 ( \
190  ((txn)->txn_flags & RBTXN_HAS_TOAST_INSERT) != 0 \
191 )
192 /*
193  * Do this transaction's changes include a speculative insert without its
194  * speculative confirm?
195  */
196 #define rbtxn_has_spec_insert(txn) \
197 ( \
198  ((txn)->txn_flags & RBTXN_HAS_SPEC_INSERT) != 0 \
199 )
200
201 /* Does this transaction have an incomplete change (pending toast or speculative insert)? */
202 #define rbtxn_has_incomplete_tuple(txn) \
203 ( \
204  rbtxn_has_toast_insert(txn) || rbtxn_has_spec_insert(txn) \
205 )
206
207 /*
208  * Has this transaction been streamed to downstream?
209  *
210  * (It's not possible to deduce this from nentries and nentries_mem for
211  * various reasons. For example, all changes may be in subtransactions, in
212  * which case we'd have nentries==0 for the toplevel one, which would say
213  * nothing about the streaming. So we maintain this flag, but only for the
214  * toplevel transaction.)
215  */
216 #define rbtxn_is_streamed(txn) \
217 ( \
218  ((txn)->txn_flags & RBTXN_IS_STREAMED) != 0 \
219 )
220 
221 typedef struct ReorderBufferTXN
222 {
223  /* See above */
225
226  /* The transaction's transaction id; can be a toplevel or sub xid. */
228
229  /* Xid of the top-level transaction, if known */
231
232  /*
233  * LSN of the first data-carrying WAL record with knowledge about this
234  * xid. This is allowed to *not* be the first record adorned with this
235  * xid, if the previous records aren't relevant for logical decoding.
236  */
238
239  /* ----
240  * LSN of the record that led to this xact being committed or
241  * aborted. This can be a
242  * * plain commit record
243  * * plain commit record, of a parent transaction
244  * * prepared transaction commit
245  * * plain abort record
246  * * prepared transaction abort
247  *
248  * This can also become set to earlier values than transaction end when
249  * a transaction is spilled to disk; specifically it's set to the LSN of
250  * the latest change written to disk so far.
251  * ----
252  */
254
255  /*
256  * LSN pointing to the end of the commit record + 1.
257  */
259
260  /* Toplevel transaction for this subxact (NULL for top-level). */
262
263  /*
264  * The last LSN at which snapshot information resides, so we can
265  * restart decoding from there and fully recover this transaction from
266  * WAL.
267  */
269
270  /* origin of the change that caused this transaction */
273
274  /*
275  * Commit time, only known when we read the actual commit record.
276  */
278
279  /*
280  * The base snapshot is used to decode all changes until either this
281  * transaction modifies the catalog, or another catalog-modifying
282  * transaction commits.
283  */
286  dlist_node base_snapshot_node; /* link in txns_by_base_snapshot_lsn */
287
288  /*
289  * Snapshot/CID from the previous streaming run. Only valid for already
290  * streamed transactions (NULL/InvalidCommandId otherwise).
291  */
294
295  /*
296  * How many ReorderBufferChanges we have in this txn.
297  *
298  * Changes in subtransactions are *not* included but tracked separately.
299  */
300  uint64 nentries;
301
302  /*
303  * How many of the above entries are stored in memory, as opposed to being
304  * spilled to disk.
305  */
306  uint64 nentries_mem;
307
308  /*
309  * List of ReorderBufferChange structs, including new Snapshots and new
310  * CommandIds.
311  */
313
314  /*
315  * List of (relation, ctid) => (cmin, cmax) mappings for catalog tuples.
316  * Those are always assigned to the toplevel transaction. (Keep track of
317  * #entries to create a hash of the right size.)
318  */
320  uint64 ntuplecids;
321
322  /*
323  * On-demand built hash for looking up the above values.
324  */
326
327  /*
328  * Hash containing (potentially partial) toast entries. NULL if no toast
329  * tuples have been found for the current change.
330  */
332
333  /*
334  * Non-hierarchical list of subtransactions that are *not* aborted. Only
335  * used in toplevel transactions.
336  */
339
340  /*
341  * Stored cache invalidations. This is not a linked list because we get
342  * all the invalidations at once.
343  */
346
347  /* ---
348  * Position in one of three lists:
349  * * list of subtransactions if we are *known* to be a subxact
350  * * list of toplevel xacts (can be an as-yet unknown subxact)
351  * * list of preallocated ReorderBufferTXNs (if unused)
352  * ---
353  */
355
356  /*
357  * Size of this transaction (changes currently in memory, in bytes).
358  */
360
361  /* Size of the top-level transaction, including sub-transactions. */
363
364  /* If we have detected a concurrent abort, ignore future changes. */
367 
368 /* so we can define the callbacks used inside struct ReorderBuffer itself */
370 
371 /* change callback signature */
373  ReorderBufferTXN *txn,
374  Relation relation,
375  ReorderBufferChange *change);
376 
377 /* truncate callback signature */
379  ReorderBufferTXN *txn,
380  int nrelations,
381  Relation relations[],
382  ReorderBufferChange *change);
383 
384 /* begin callback signature */
385 typedef void (*ReorderBufferBeginCB) (ReorderBuffer *rb,
386  ReorderBufferTXN *txn);
387 
388 /* commit callback signature */
390  ReorderBufferTXN *txn,
391  XLogRecPtr commit_lsn);
392 
393 /* message callback signature */
395  ReorderBufferTXN *txn,
396  XLogRecPtr message_lsn,
397  bool transactional,
398  const char *prefix, Size sz,
399  const char *message);
400 
401 /* start streaming transaction callback signature */
402 typedef void (*ReorderBufferStreamStartCB) (
403  ReorderBuffer *rb,
404  ReorderBufferTXN *txn,
405  XLogRecPtr first_lsn);
406 
407 /* stop streaming transaction callback signature */
408 typedef void (*ReorderBufferStreamStopCB) (
409  ReorderBuffer *rb,
410  ReorderBufferTXN *txn,
411  XLogRecPtr last_lsn);
412 
413 /* discard streamed transaction callback signature */
414 typedef void (*ReorderBufferStreamAbortCB) (
415  ReorderBuffer *rb,
416  ReorderBufferTXN *txn,
417  XLogRecPtr abort_lsn);
418 
419 /* commit streamed transaction callback signature */
421  ReorderBuffer *rb,
422  ReorderBufferTXN *txn,
423  XLogRecPtr commit_lsn);
424 
425 /* stream change callback signature */
427  ReorderBuffer *rb,
428  ReorderBufferTXN *txn,
429  Relation relation,
430  ReorderBufferChange *change);
431 
432 /* stream message callback signature */
434  ReorderBuffer *rb,
435  ReorderBufferTXN *txn,
436  XLogRecPtr message_lsn,
437  bool transactional,
438  const char *prefix, Size sz,
439  const char *message);
440 
441 /* stream truncate callback signature */
443  ReorderBuffer *rb,
444  ReorderBufferTXN *txn,
445  int nrelations,
446  Relation relations[],
447  ReorderBufferChange *change);
448 
450 {
451  /*
452  * xid => ReorderBufferTXN lookup table
453  */
455
456  /*
457  * Transactions that could be a toplevel xact, ordered by LSN of the first
458  * record bearing that xid.
459  */
461
462  /*
463  * Transactions and subtransactions that have a base snapshot, ordered by
464  * LSN of the record which caused us to first obtain the base snapshot.
465  * This is not the same as toplevel_by_lsn, because we only set the base
466  * snapshot on the first logical-decoding-relevant record (e.g. heap
467  * writes), whereas the initial LSN could be set by other operations.
468  */
470
471  /*
472  * One-entry cache for by_txn. Very frequently the same txn gets
473  * looked up over and over again.
474  */
477
478  /*
479  * Callbacks to be called when a transaction commits.
480  */
486
487  /*
488  * Callbacks to be called when streaming a transaction.
489  */
497
498  /*
499  * Pointer that will be passed untouched to the callbacks.
500  */
502
503  /*
504  * Saved output plugin option
505  */
507
508  /*
509  * Private memory context.
510  */
512
513  /*
514  * Memory contexts for specific types of objects
515  */
519
521
522  /* buffer for disk<->memory conversions */
523  char *outbuf;
525
526  /* memory accounting */
528 };
529 
530 
533 
538 
539 Oid *ReorderBufferGetRelids(ReorderBuffer *, int nrelids);
541 
544  bool toast_insert);
546  bool transactional, const char *prefix,
547  Size message_size, const char *message);
549  XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
550  TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn);
553  XLogRecPtr commit_lsn, XLogRecPtr end_lsn);
557 
561  CommandId cid);
564  CommandId cmin, CommandId cmax, CommandId combocid);
566  Size nmsgs, SharedInvalidationMessage *msgs);
568  SharedInvalidationMessage *invalidations);
573 
576 
578 
579 void StartupReorderBuffer(void);
580 
581 #endif
void(* ReorderBufferStreamTruncateCB)(ReorderBuffer *rb, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
ReorderBuffer * ReorderBufferAllocate(void)
XLogRecPtr first_lsn
uint32 CommandId
Definition: c.h:534
TimestampTz commit_time
ReorderBufferTupleBuf * ReorderBufferGetTupleBuf(ReorderBuffer *, Size tuple_len)
void ReorderBufferCommit(ReorderBuffer *, TransactionId, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
Snapshot base_snapshot
bool ReorderBufferXidHasCatalogChanges(ReorderBuffer *, TransactionId xid)
void ReorderBufferQueueMessage(ReorderBuffer *, TransactionId, Snapshot snapshot, XLogRecPtr lsn, bool transactional, const char *prefix, Size message_size, const char *message)
ReorderBufferApplyChangeCB apply_change
void * private_data
dlist_node base_snapshot_node
RepOriginId origin_id
ReorderBufferTupleBuf * oldtuple
void(* ReorderBufferStreamMessageCB)(ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)
void ReorderBufferProcessXid(ReorderBuffer *, TransactionId xid, XLogRecPtr lsn)
uint32 TransactionId
Definition: c.h:520
bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *, TransactionId xid)
void ReorderBufferAssignChild(ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr commit_lsn)
TransactionId by_txn_last_xid
void ReorderBufferAddSnapshot(ReorderBuffer *, TransactionId, XLogRecPtr lsn, struct SnapshotData *snap)
int64 TimestampTz
Definition: timestamp.h:39
ReorderBufferStreamAbortCB stream_abort
XLogRecPtr current_restart_decoding_lsn
void(* ReorderBufferStreamAbortCB)(ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr abort_lsn)
void(* ReorderBufferBeginCB)(ReorderBuffer *rb, ReorderBufferTXN *txn)
struct ReorderBufferTXN * txn
Definition: reorderbuffer.h:86
void(* ReorderBufferStreamChangeCB)(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
PGDLLIMPORT int logical_decoding_work_mem
void StartupReorderBuffer(void)
uint16 RepOriginId
Definition: xlogdefs.h:58
CommandId command_id
Snapshot snapshot_now
void ReorderBufferForget(ReorderBuffer *, TransactionId, XLogRecPtr lsn)
ReorderBufferCommitCB commit
unsigned int Oid
Definition: postgres_ext.h:31
XLogRecPtr base_snapshot_lsn
ReorderBufferStreamCommitCB stream_commit
Oid * ReorderBufferGetRelids(ReorderBuffer *, int nrelids)
MemoryContext change_context
void ReorderBufferAddInvalidations(ReorderBuffer *, TransactionId, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs)
XLogRecPtr origin_lsn
#define PGDLLIMPORT
Definition: c.h:1257
void ReorderBufferCommitChild(ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr commit_lsn, XLogRecPtr end_lsn)
void ReorderBufferReturnTupleBuf(ReorderBuffer *, ReorderBufferTupleBuf *tuple)
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *)
Definition: dynahash.c:218
ReorderBufferStreamMessageCB stream_message
ReorderBufferChangeType
Definition: reorderbuffer.h:54
void(* ReorderBufferStreamCommitCB)(ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
ReorderBufferTupleBuf * newtuple
dlist_head changes
dlist_head txns_by_base_snapshot_lsn
void ReorderBufferSetBaseSnapshot(ReorderBuffer *, TransactionId, XLogRecPtr lsn, struct SnapshotData *snap)
void ReorderBufferSetRestartPoint(ReorderBuffer *, XLogRecPtr ptr)
HeapTupleData tuple
Definition: reorderbuffer.h:29
unsigned int uint32
Definition: c.h:374
XLogRecPtr final_lsn
struct ReorderBufferTXN * toptxn
ItemPointerData tid
ReorderBufferMessageCB message
void ReorderBufferReturnRelids(ReorderBuffer *, Oid *relids)
ReorderBufferStreamChangeCB stream_change
void(* ReorderBufferCommitCB)(ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
TransactionId ReorderBufferGetOldestXmin(ReorderBuffer *rb)
void ReorderBufferXidSetCatalogChanges(ReorderBuffer *, TransactionId xid, XLogRecPtr lsn)
void(* ReorderBufferApplyChangeCB)(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
RepOriginId origin_id
Definition: reorderbuffer.h:88
ReorderBufferStreamTruncateCB stream_truncate
void ReorderBufferAddNewTupleCids(ReorderBuffer *, TransactionId, XLogRecPtr lsn, RelFileNode node, ItemPointerData pt, CommandId cmin, CommandId cmax, CommandId combocid)
MemoryContext context
void(* ReorderBufferStreamStopCB)(ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr last_lsn)
void ReorderBufferFree(ReorderBuffer *)
ReorderBufferTXN * by_txn_last_txn
void ReorderBufferAbort(ReorderBuffer *, TransactionId, XLogRecPtr lsn)
dlist_head toplevel_by_lsn
TransactionId xid
ReorderBufferStreamStartCB stream_start
ReorderBufferTXN * ReorderBufferGetOldestTXN(ReorderBuffer *)
void ReorderBufferAddNewCommandId(ReorderBuffer *, TransactionId, XLogRecPtr lsn, CommandId cid)
uint32 bits32
Definition: c.h:383
uint64 XLogRecPtr
Definition: xlogdefs.h:21
struct ReorderBufferTupleBuf ReorderBufferTupleBuf
XLogRecPtr end_lsn
void ReorderBufferAbortOld(ReorderBuffer *, TransactionId xid)
void(* ReorderBufferMessageCB)(ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)
size_t Size
Definition: c.h:473
SharedInvalidationMessage * invalidations
dlist_head subtxns
void ReorderBufferImmediateInvalidation(ReorderBuffer *, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
void(* ReorderBufferApplyTruncateCB)(ReorderBuffer *rb, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
void(* ReorderBufferStreamStartCB)(ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr first_lsn)
ReorderBufferApplyTruncateCB apply_truncate
void ReorderBufferQueueChange(ReorderBuffer *, TransactionId, XLogRecPtr lsn, ReorderBufferChange *, bool toast_insert)
void ReorderBufferReturnChange(ReorderBuffer *, ReorderBufferChange *, bool)
XLogRecPtr restart_decoding_lsn
MemoryContext tup_context
ReorderBufferBeginCB begin
TransactionId toplevel_xid
MemoryContext txn_context
dlist_head tuplecids
struct ReorderBufferTXN ReorderBufferTXN
struct ReorderBufferChange ReorderBufferChange
ReorderBufferStreamStopCB stream_stop