PostgreSQL Source Code  git master
reorderbuffer.h
Go to the documentation of this file.
1 /*
2  * reorderbuffer.h
3  * PostgreSQL logical replay/reorder buffer management.
4  *
5  * Copyright (c) 2012-2020, PostgreSQL Global Development Group
6  *
7  * src/include/replication/reorderbuffer.h
8  */
9 #ifndef REORDERBUFFER_H
10 #define REORDERBUFFER_H
11 
12 #include "access/htup_details.h"
13 #include "lib/ilist.h"
14 #include "storage/sinval.h"
15 #include "utils/hsearch.h"
16 #include "utils/relcache.h"
17 #include "utils/snapshot.h"
18 #include "utils/timestamp.h"
19 
21 
22 /* an individual tuple, stored in one chunk of memory */
23 typedef struct ReorderBufferTupleBuf
24 {
25  /* position in preallocated list */
27 
28  /* tuple header, the interesting bit for users of logical decoding */
30 
31  /* pre-allocated size of tuple buffer, different from tuple size */
33 
34  /* actual tuple data follows */
36 
37 /* pointer to the data stored in a TupleBuf: the MAXALIGN'ed address just past the ReorderBufferTupleBuf header; (p) is parenthesized so expression arguments such as (buf + 1) are cast as a whole */
38 #define ReorderBufferTupleBufData(p) \
39  ((HeapTupleHeader) MAXALIGN(((char *) (p)) + sizeof(ReorderBufferTupleBuf)))
40 
41 /*
42  * Types of the change passed to a 'change' callback.
43  *
44  * For efficiency and simplicity reasons we want to keep Snapshots, CommandIds
45  * and ComboCids in the same list with the user visible INSERT/UPDATE/DELETE
46  * changes. Users of the decoding facilities will never see changes with
47  * *_INTERNAL_* actions.
48  *
49  * The INTERNAL_SPEC_INSERT and INTERNAL_SPEC_CONFIRM changes concern
50  * "speculative insertions", and their confirmation respectively. They're
51  * used by INSERT .. ON CONFLICT .. UPDATE. Users of logical decoding don't
52  * have to care about these.
53  */
55 {
66 };
67 
68 /* forward declaration */
69 struct ReorderBufferTXN;
70 
71 /*
72  * a single 'change', can be an insert (with one tuple), an update (old, new),
73  * or a delete (old).
74  *
75  * The same struct is also used internally for other purposes but that should
76  * never be visible outside reorderbuffer.c.
77  */
78 typedef struct ReorderBufferChange
79 {
81 
82  /* The type of change. */
84 
85  /* Transaction this change belongs to. */
87 
89 
90  /*
91  * Context data for the change. Which part of the union is valid depends
92  * on action.
93  */
94  union
95  {
96  /* Old, new tuples when action == *_INSERT|UPDATE|DELETE */
97  struct
98  {
99  /* relation that has been changed */
101 
102  /* no previously reassembled toast chunks are necessary anymore */
104 
105  /* valid for DELETE || UPDATE */
107  /* valid for INSERT || UPDATE */
109  } tp;
110 
111  /*
112  * Truncate data for REORDER_BUFFER_CHANGE_TRUNCATE representing one
113  * set of relations to be truncated.
114  */
115  struct
116  {
118  bool cascade;
121  } truncate;
122 
123  /* Message with arbitrary data. */
124  struct
125  {
126  char *prefix;
128  char *message;
129  } msg;
130 
131  /* New snapshot, set when action == *_INTERNAL_SNAPSHOT */
133 
134  /*
135  * New command id for existing snapshot in a catalog changing tx. Set
136  * when action == *_INTERNAL_COMMAND_ID.
137  */
139 
140  /*
141  * New cid mapping for catalog changing transaction, set when action
142  * == *_INTERNAL_TUPLECID.
143  */
144  struct
145  {
151  } tuplecid;
152  } data;
153 
154  /*
155  * While in use this is how a change is linked into a transaction,
156  * otherwise it's the preallocated list.
157  */
160 
161 /* ReorderBufferTXN txn_flags: bit masks tested by the rbtxn_* macros below */
162 #define RBTXN_HAS_CATALOG_CHANGES 0x0001 /* transaction has catalog changes */
163 #define RBTXN_IS_SUBXACT 0x0002 /* transaction is known to be a subxact */
164 #define RBTXN_IS_SERIALIZED 0x0004 /* transaction has been spilled to disk */
165 
166 /* Does the transaction have catalog changes? Evaluates to 1 if RBTXN_HAS_CATALOG_CHANGES is set in txn->txn_flags, else 0. */
167 #define rbtxn_has_catalog_changes(txn) \
168 ( \
169  ((txn)->txn_flags & RBTXN_HAS_CATALOG_CHANGES) != 0 \
170 )
171 
172 /* Is the transaction known as a subxact? Evaluates to 1 if RBTXN_IS_SUBXACT is set in txn->txn_flags, else 0. */
173 #define rbtxn_is_known_subxact(txn) \
174 ( \
175  ((txn)->txn_flags & RBTXN_IS_SUBXACT) != 0 \
176 )
177 
178 /* Has this transaction been spilled to disk? Evaluates to 1 if RBTXN_IS_SERIALIZED is set in txn->txn_flags, else 0. */
179 #define rbtxn_is_serialized(txn) \
180 ( \
181  ((txn)->txn_flags & RBTXN_IS_SERIALIZED) != 0 \
182 )
183 
184 typedef struct ReorderBufferTXN
185 {
186  /* See above */
188 
189  /* The transaction's transaction id, can be a toplevel or sub xid. */
191 
192  /* Xid of top-level transaction, if known */
194 
195  /*
196  * LSN of the first data-carrying WAL record with knowledge about this
197  * xid. This is allowed to *not* be the first record adorned with this xid, if
198  * the previous records aren't relevant for logical decoding.
199  */
201 
202  /* ----
203  * LSN of the record that led to this xact to be committed or
204  * aborted. This can be a
205  * * plain commit record
206  * * plain commit record, of a parent transaction
207  * * prepared transaction commit
208  * * plain abort record
209  * * prepared transaction abort
210  *
211  * This can also become set to earlier values than transaction end when
212  * a transaction is spilled to disk; specifically it's set to the LSN of
213  * the latest change written to disk so far.
214  * ----
215  */
217 
218  /*
219  * LSN pointing to the end of the commit record + 1.
220  */
222 
223  /*
224  * Last LSN at which snapshot information resides, so we can
225  * restart decoding from there and fully recover this transaction from
226  * WAL.
227  */
229 
230  /* origin of the change that caused this transaction */
233 
234  /*
235  * Commit time, only known when we read the actual commit record.
236  */
238 
239  /*
240  * The base snapshot is used to decode all changes until either this
241  * transaction modifies the catalog, or another catalog-modifying
242  * transaction commits.
243  */
246  dlist_node base_snapshot_node; /* link in txns_by_base_snapshot_lsn */
247 
248  /*
249  * How many ReorderBufferChange's do we have in this txn.
250  *
251  * Changes in subtransactions are *not* included but tracked separately.
252  */
253  uint64 nentries;
254 
255  /*
256  * How many of the above entries are stored in memory in contrast to being
257  * spilled to disk.
258  */
259  uint64 nentries_mem;
260 
261  /*
262  * List of ReorderBufferChange structs, including new Snapshots and new
263  * CommandIds
264  */
266 
267  /*
268  * List of (relation, ctid) => (cmin, cmax) mappings for catalog tuples.
269  * Those are always assigned to the toplevel transaction. (Keep track of
270  * #entries to create a hash of the right size)
271  */
273  uint64 ntuplecids;
274 
275  /*
276  * On-demand built hash for looking up the above values.
277  */
279 
280  /*
281  * Hash containing (potentially partial) toast entries. NULL if no toast
282  * tuples have been found for the current change.
283  */
285 
286  /*
287  * non-hierarchical list of subtransactions that are *not* aborted. Only
288  * used in toplevel transactions.
289  */
292 
293  /*
294  * Stored cache invalidations. This is not a linked list because we get
295  * all the invalidations at once.
296  */
299 
300  /* ---
301  * Position in one of three lists:
302  * * list of subtransactions if we are *known* to be subxact
303  * * list of toplevel xacts (can be an as-yet unknown subxact)
304  * * list of preallocated ReorderBufferTXNs (if unused)
305  * ---
306  */
308 
309  /*
310  * Size of this transaction (changes currently in memory, in bytes).
311  */
314 
315 /* so we can define the callbacks used inside struct ReorderBuffer itself */
317 
318 /* change callback signature */
320  ReorderBufferTXN *txn,
321  Relation relation,
322  ReorderBufferChange *change);
323 
324 /* truncate callback signature */
326  ReorderBufferTXN *txn,
327  int nrelations,
328  Relation relations[],
329  ReorderBufferChange *change);
330 
331 /* begin callback signature: takes the reorder buffer and the transaction, returns nothing */
332 typedef void (*ReorderBufferBeginCB) (ReorderBuffer *rb,
333  ReorderBufferTXN *txn);
334 
335 /* commit callback signature */
337  ReorderBufferTXN *txn,
338  XLogRecPtr commit_lsn);
339 
340 /* message callback signature */
342  ReorderBufferTXN *txn,
343  XLogRecPtr message_lsn,
344  bool transactional,
345  const char *prefix, Size sz,
346  const char *message);
347 
349 {
350  /*
351  * xid => ReorderBufferTXN lookup table
352  */
354 
355  /*
356  * Transactions that could be a toplevel xact, ordered by LSN of the first
357  * record bearing that xid.
358  */
360 
361  /*
362  * Transactions and subtransactions that have a base snapshot, ordered by
363  * LSN of the record which caused us to first obtain the base snapshot.
364  * This is not the same as toplevel_by_lsn, because we only set the base
365  * snapshot on the first logical-decoding-relevant record (eg. heap
366  * writes), whereas the initial LSN could be set by other operations.
367  */
369 
370  /*
371  * one-entry sized cache for by_txn. Very frequently the same txn gets
372  * looked up over and over again.
373  */
376 
377  /*
378  * Callbacks to be called when a transaction commits.
379  */
385 
386  /*
387  * Pointer that will be passed untouched to the callbacks.
388  */
390 
391  /*
392  * Saved output plugin option
393  */
395 
396  /*
397  * Private memory context.
398  */
400 
401  /*
402  * Memory contexts for specific types of objects
403  */
407 
409 
410  /* buffer for disk<->memory conversions */
411  char *outbuf;
413 
414  /* memory accounting */
416 
417  /*
418  * Statistics about transactions spilled to disk.
419  *
420  * A single transaction may be spilled repeatedly, which is why we keep
421  * two different counters. For spilling, the transaction counter includes
422  * both toplevel transactions and subtransactions.
423  */
424  int64 spillCount; /* spill-to-disk invocation counter */
425  int64 spillTxns; /* number of transactions spilled to disk */
426  int64 spillBytes; /* amount of data spilled to disk */
427 };
428 
429 
432 
437 
438 Oid *ReorderBufferGetRelids(ReorderBuffer *, int nrelids);
440 
443  bool transactional, const char *prefix,
444  Size message_size, const char *message);
446  XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
447  TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn);
450  XLogRecPtr commit_lsn, XLogRecPtr end_lsn);
454 
458  CommandId cid);
461  CommandId cmin, CommandId cmax, CommandId combocid);
463  Size nmsgs, SharedInvalidationMessage *msgs);
465  SharedInvalidationMessage *invalidations);
470 
473 
475 
476 void StartupReorderBuffer(void);
477 
478 #endif
ReorderBuffer * ReorderBufferAllocate(void)
XLogRecPtr first_lsn
uint32 CommandId
Definition: c.h:528
TimestampTz commit_time
ReorderBufferTupleBuf * ReorderBufferGetTupleBuf(ReorderBuffer *, Size tuple_len)
void ReorderBufferCommit(ReorderBuffer *, TransactionId, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
Snapshot base_snapshot
bool ReorderBufferXidHasCatalogChanges(ReorderBuffer *, TransactionId xid)
void ReorderBufferQueueMessage(ReorderBuffer *, TransactionId, Snapshot snapshot, XLogRecPtr lsn, bool transactional, const char *prefix, Size message_size, const char *message)
ReorderBufferApplyChangeCB apply_change
void * private_data
dlist_node base_snapshot_node
RepOriginId origin_id
ReorderBufferTupleBuf * oldtuple
void ReorderBufferProcessXid(ReorderBuffer *, TransactionId xid, XLogRecPtr lsn)
uint32 TransactionId
Definition: c.h:514
bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *, TransactionId xid)
void ReorderBufferAssignChild(ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr commit_lsn)
TransactionId by_txn_last_xid
void ReorderBufferAddSnapshot(ReorderBuffer *, TransactionId, XLogRecPtr lsn, struct SnapshotData *snap)
int64 TimestampTz
Definition: timestamp.h:39
XLogRecPtr current_restart_decoding_lsn
void(* ReorderBufferBeginCB)(ReorderBuffer *rb, ReorderBufferTXN *txn)
struct ReorderBufferTXN * txn
Definition: reorderbuffer.h:86
PGDLLIMPORT int logical_decoding_work_mem
void StartupReorderBuffer(void)
uint16 RepOriginId
Definition: xlogdefs.h:58
void ReorderBufferForget(ReorderBuffer *, TransactionId, XLogRecPtr lsn)
ReorderBufferCommitCB commit
void ReorderBufferQueueChange(ReorderBuffer *, TransactionId, XLogRecPtr lsn, ReorderBufferChange *)
unsigned int Oid
Definition: postgres_ext.h:31
XLogRecPtr base_snapshot_lsn
Oid * ReorderBufferGetRelids(ReorderBuffer *, int nrelids)
MemoryContext change_context
void ReorderBufferAddInvalidations(ReorderBuffer *, TransactionId, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs)
XLogRecPtr origin_lsn
#define PGDLLIMPORT
Definition: c.h:1271
void ReorderBufferCommitChild(ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr commit_lsn, XLogRecPtr end_lsn)
void ReorderBufferReturnTupleBuf(ReorderBuffer *, ReorderBufferTupleBuf *tuple)
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *)
Definition: dynahash.c:208
ReorderBufferChangeType
Definition: reorderbuffer.h:54
ReorderBufferTupleBuf * newtuple
dlist_head changes
dlist_head txns_by_base_snapshot_lsn
void ReorderBufferSetBaseSnapshot(ReorderBuffer *, TransactionId, XLogRecPtr lsn, struct SnapshotData *snap)
void ReorderBufferSetRestartPoint(ReorderBuffer *, XLogRecPtr ptr)
HeapTupleData tuple
Definition: reorderbuffer.h:29
unsigned int uint32
Definition: c.h:359
XLogRecPtr final_lsn
ItemPointerData tid
ReorderBufferMessageCB message
void ReorderBufferReturnRelids(ReorderBuffer *, Oid *relids)
void(* ReorderBufferCommitCB)(ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
TransactionId ReorderBufferGetOldestXmin(ReorderBuffer *rb)
void ReorderBufferXidSetCatalogChanges(ReorderBuffer *, TransactionId xid, XLogRecPtr lsn)
void(* ReorderBufferApplyChangeCB)(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
RepOriginId origin_id
Definition: reorderbuffer.h:88
void ReorderBufferAddNewTupleCids(ReorderBuffer *, TransactionId, XLogRecPtr lsn, RelFileNode node, ItemPointerData pt, CommandId cmin, CommandId cmax, CommandId combocid)
MemoryContext context
void ReorderBufferFree(ReorderBuffer *)
ReorderBufferTXN * by_txn_last_txn
void ReorderBufferAbort(ReorderBuffer *, TransactionId, XLogRecPtr lsn)
dlist_head toplevel_by_lsn
TransactionId xid
ReorderBufferTXN * ReorderBufferGetOldestTXN(ReorderBuffer *)
void ReorderBufferAddNewCommandId(ReorderBuffer *, TransactionId, XLogRecPtr lsn, CommandId cid)
uint32 bits32
Definition: c.h:368
uint64 XLogRecPtr
Definition: xlogdefs.h:21
struct ReorderBufferTupleBuf ReorderBufferTupleBuf
XLogRecPtr end_lsn
void ReorderBufferAbortOld(ReorderBuffer *, TransactionId xid)
void(* ReorderBufferMessageCB)(ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)
size_t Size
Definition: c.h:467
SharedInvalidationMessage * invalidations
void ReorderBufferReturnChange(ReorderBuffer *, ReorderBufferChange *)
dlist_head subtxns
void ReorderBufferImmediateInvalidation(ReorderBuffer *, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
void(* ReorderBufferApplyTruncateCB)(ReorderBuffer *rb, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
ReorderBufferApplyTruncateCB apply_truncate
XLogRecPtr restart_decoding_lsn
MemoryContext tup_context
ReorderBufferBeginCB begin
TransactionId toplevel_xid
MemoryContext txn_context
dlist_head tuplecids
struct ReorderBufferTXN ReorderBufferTXN
struct ReorderBufferChange ReorderBufferChange