reorderbuffer.c — PostgreSQL source (git master), as rendered by the doxygen source browser.
Note: the leading number on each line below is the original source line number; gaps in that
numbering indicate lines lost during extraction, so some definitions here are incomplete.
1 /*-------------------------------------------------------------------------
2  *
3  * reorderbuffer.c
4  * PostgreSQL logical replay/reorder buffer management
5  *
6  *
7  * Copyright (c) 2012-2021, PostgreSQL Global Development Group
8  *
9  *
10  * IDENTIFICATION
11  * src/backend/replication/reorderbuffer.c
12  *
13  * NOTES
14  * This module gets handed individual pieces of transactions in the order
15  * they are written to the WAL and is responsible to reassemble them into
16  * toplevel transaction sized pieces. When a transaction is completely
17  * reassembled - signaled by reading the transaction commit record - it
18  * will then call the output plugin (cf. ReorderBufferCommit()) with the
19  * individual changes. The output plugins rely on snapshots built by
20  * snapbuild.c which hands them to us.
21  *
22  * Transactions and subtransactions/savepoints in postgres are not
23  * immediately linked to each other from outside the performing
24  * backend. Only at commit/abort (or special xact_assignment records) they
25  * are linked together. Which means that we will have to splice together a
26  * toplevel transaction from its subtransactions. To do that efficiently we
27  * build a binary heap indexed by the smallest current lsn of the individual
28  * subtransactions' changestreams. As the individual streams are inherently
29  * ordered by LSN - since that is where we build them from - the transaction
30  * can easily be reassembled by always using the subtransaction with the
31  * smallest current LSN from the heap.
32  *
33  * In order to cope with large transactions - which can be several times as
34  * big as the available memory - this module supports spooling the contents
35  * of a large transactions to disk. When the transaction is replayed the
36  * contents of individual (sub-)transactions will be read from disk in
37  * chunks.
38  *
39  * This module also has to deal with reassembling toast records from the
40  * individual chunks stored in WAL. When a new (or initial) version of a
41  * tuple is stored in WAL it will always be preceded by the toast chunks
42  * emitted for the columns stored out of line. Within a single toplevel
43  * transaction there will be no other data carrying records between a row's
44  * toast chunks and the row data itself. See ReorderBufferToast* for
45  * details.
46  *
47  * ReorderBuffer uses two special memory context types - SlabContext for
48  * allocations of fixed-length structures (changes and transactions), and
49  * GenerationContext for the variable-length transaction data (allocated
50  * and freed in groups with similar lifespans).
51  *
52  * To limit the amount of memory used by decoded changes, we track memory
53  * used at the reorder buffer level (i.e. total amount of memory), and for
54  * each transaction. When the total amount of used memory exceeds the
55  * limit, the transaction consuming the most memory is then serialized to
56  * disk.
57  *
58  * Only decoded changes are evicted from memory (spilled to disk), not the
59  * transaction records. The number of toplevel transactions is limited,
60  * but a transaction with many subtransactions may still consume significant
61  * amounts of memory. However, the transaction records are fairly small and
62  * are not included in the memory limit.
63  *
64  * The current eviction algorithm is very simple - the transaction is
65  * picked merely by size, while it might be useful to also consider age
66  * (LSN) of the changes for example. With the new Generational memory
67  * allocator, evicting the oldest changes would make it more likely the
68  * memory gets actually freed.
69  *
70  * We still rely on max_changes_in_memory when loading serialized changes
71  * back into memory. At that point we can't use the memory limit directly
72  * as we load the subxacts independently. One option to deal with this
73  * would be to count the subxacts, and allow each to allocate 1/N of the
74  * memory limit. That however does not seem very appealing, because with
75  * many subtransactions it may easily cause thrashing (short cycles of
76  * deserializing and applying very few changes). We probably should give
77  * a bit more memory to the oldest subtransactions, because it's likely
78  * they are the source for the next sequence of changes.
79  *
80  * -------------------------------------------------------------------------
81  */
82 #include "postgres.h"
83 
84 #include <unistd.h>
85 #include <sys/stat.h>
86 
87 #include "access/detoast.h"
88 #include "access/heapam.h"
89 #include "access/rewriteheap.h"
90 #include "access/transam.h"
91 #include "access/xact.h"
92 #include "access/xlog_internal.h"
93 #include "catalog/catalog.h"
94 #include "lib/binaryheap.h"
95 #include "miscadmin.h"
96 #include "pgstat.h"
97 #include "replication/logical.h"
99 #include "replication/slot.h"
100 #include "replication/snapbuild.h" /* just for SnapBuildSnapDecRefcount */
101 #include "storage/bufmgr.h"
102 #include "storage/fd.h"
103 #include "storage/sinval.h"
104 #include "utils/builtins.h"
105 #include "utils/combocid.h"
106 #include "utils/memdebug.h"
107 #include "utils/memutils.h"
108 #include "utils/rel.h"
109 #include "utils/relfilenodemap.h"
110 
111 
112 /* entry for a hash table we use to map from xid to our transaction state */
114 {
118 
119 /* data structures for (relfilenode, ctid) => (cmin, cmax) mapping */
121 {
125 
127 {
131  CommandId combocid; /* just for debugging */
133 
134 /*
 * Virtual file descriptor with file offset tracking.
 *
 * Wraps an fd.c File handle together with the offset at which the next
 * read or write should happen; used when restoring serialized (spilled)
 * transaction changes from disk (cf. ReorderBufferRestoreChanges), since
 * the caller needs to remember its position across calls.
 */
135 typedef struct TXNEntryFile
136 {
137  File vfd; /* fd.c virtual file descriptor; -1 when the file is closed */
138  off_t curOffset; /* offset for next write or read. Reset to 0
139  * when vfd is opened. */
140 } TXNEntryFile;
141 
142 /* k-way in-order change iteration support structures */
144 {
151 
153 {
159 
160 /* toast datastructures */
161 typedef struct ReorderBufferToastEnt
162 {
163  Oid chunk_id; /* toast_table.chunk_id */
164  int32 last_chunk_seq; /* toast_table.chunk_seq of the last chunk we
165  * have seen */
166  Size num_chunks; /* number of chunks we've already seen */
167  Size size; /* combined size of chunks seen */
168  dlist_head chunks; /* linked list of chunks */
169  struct varlena *reconstructed; /* reconstructed varlena now pointed to in
170  * main tup */
172 
173 /* Disk serialization support datastructures */
175 {
178  /* data follows */
180 
/*
 * Change-action classification helpers.
 *
 * IsSpecInsert: the change is a speculative insert (INSERT ... ON CONFLICT
 * arbiter path); such a change is incomplete until a confirm/abort token
 * arrives.
 *
 * IsSpecConfirmOrAbort: the change is the token that completes a pending
 * speculative insert, either confirming or aborting it.
 *
 * IsInsertOrUpdate: the change inserts or updates a row, counting the
 * speculative-insert variant; used (e.g. by
 * ReorderBufferProcessPartialChange) to detect the main-table change that
 * makes previously queued toast chunks complete.
 *
 * All three take a ReorderBufferChangeType value; each argument is
 * evaluated exactly once.
 */
181 #define IsSpecInsert(action) \
182 ( \
183  ((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT) \
184 )
185 #define IsSpecConfirmOrAbort(action) \
186 ( \
187  (((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM) || \
188  ((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT)) \
189 )
190 #define IsInsertOrUpdate(action) \
191 ( \
192  (((action) == REORDER_BUFFER_CHANGE_INSERT) || \
193  ((action) == REORDER_BUFFER_CHANGE_UPDATE) || \
194  ((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT)) \
195 )
196 
197 /*
198  * Maximum number of changes kept in memory, per transaction. After that,
199  * changes are spooled to disk.
200  *
201  * The current value should be sufficient to decode the entire transaction
202  * without hitting disk in OLTP workloads, while starting to spool to disk in
203  * other workloads reasonably fast.
204  *
205  * At some point in the future it probably makes sense to have a more elaborate
206  * resource management here, but it's not entirely clear what that would look
207  * like.
208  */
210 static const Size max_changes_in_memory = 4096; /* XXX applies only when
                                                  * restoring serialized
                                                  * changes from disk, not
                                                  * to normal decoding (see
                                                  * comment above) */
211 
212 /* ---------------------------------------
213  * primary reorderbuffer support routines
214  * ---------------------------------------
215  */
219  TransactionId xid, bool create, bool *is_new,
220  XLogRecPtr lsn, bool create_as_top);
222  ReorderBufferTXN *subtxn);
223 
224 static void AssertTXNLsnOrder(ReorderBuffer *rb);
225 
226 /* ---------------------------------------
227  * support functions for lsn-order iterating over the ->changes of a
228  * transaction and its subtransactions
229  *
230  * used for iteration over the k-way heap merge of a transaction and its
231  * subtransactions
232  * ---------------------------------------
233  */
235  ReorderBufferIterTXNState *volatile *iter_state);
240 
241 /*
242  * ---------------------------------------
243  * Disk serialization support functions
244  * ---------------------------------------
245  */
249  int fd, ReorderBufferChange *change);
251  TXNEntryFile *file, XLogSegNo *segno);
253  char *change);
256  bool txn_prepared);
257 static void ReorderBufferCleanupSerializedTXNs(const char *slotname);
258 static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot,
259  TransactionId xid, XLogSegNo segno);
260 
261 static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap);
264 
265 /*
266  * ---------------------------------------
267  * Streaming support functions
268  * ---------------------------------------
269  */
270 static inline bool ReorderBufferCanStream(ReorderBuffer *rb);
271 static inline bool ReorderBufferCanStartStreaming(ReorderBuffer *rb);
274 
275 /* ---------------------------------------
276  * toast reassembly support
277  * ---------------------------------------
278  */
282  Relation relation, ReorderBufferChange *change);
284  Relation relation, ReorderBufferChange *change);
285 
286 /*
287  * ---------------------------------------
288  * memory accounting
289  * ---------------------------------------
290  */
293  ReorderBufferChange *change,
294  bool addition, Size sz);
295 
296 /*
297  * Allocate a new ReorderBuffer and clean out any old serialized state from
298  * prior ReorderBuffer instances for the same slot.
299  */
302 {
303  ReorderBuffer *buffer;
304  HASHCTL hash_ctl;
305  MemoryContext new_ctx;
306 
307  Assert(MyReplicationSlot != NULL);
308 
309  /* allocate memory in own context, to have better accountability */
311  "ReorderBuffer",
313 
314  buffer =
315  (ReorderBuffer *) MemoryContextAlloc(new_ctx, sizeof(ReorderBuffer));
316 
317  memset(&hash_ctl, 0, sizeof(hash_ctl));
318 
319  buffer->context = new_ctx;
320 
321  buffer->change_context = SlabContextCreate(new_ctx,
322  "Change",
324  sizeof(ReorderBufferChange));
325 
326  buffer->txn_context = SlabContextCreate(new_ctx,
327  "TXN",
329  sizeof(ReorderBufferTXN));
330 
331  buffer->tup_context = GenerationContextCreate(new_ctx,
332  "Tuples",
334 
335  hash_ctl.keysize = sizeof(TransactionId);
336  hash_ctl.entrysize = sizeof(ReorderBufferTXNByIdEnt);
337  hash_ctl.hcxt = buffer->context;
338 
339  buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
341 
343  buffer->by_txn_last_txn = NULL;
344 
345  buffer->outbuf = NULL;
346  buffer->outbufsize = 0;
347  buffer->size = 0;
348 
349  buffer->spillTxns = 0;
350  buffer->spillCount = 0;
351  buffer->spillBytes = 0;
352  buffer->streamTxns = 0;
353  buffer->streamCount = 0;
354  buffer->streamBytes = 0;
355  buffer->totalTxns = 0;
356  buffer->totalBytes = 0;
357 
359 
360  dlist_init(&buffer->toplevel_by_lsn);
362 
363  /*
364  * Ensure there's no stale data from prior uses of this slot, in case some
365  * prior exit avoided calling ReorderBufferFree. Failure to do this can
366  * produce duplicated txns, and it's very cheap if there's nothing there.
367  */
369 
370  return buffer;
371 }
372 
373 /*
374  * Free a ReorderBuffer
375  */
376 void
378 {
379  MemoryContext context = rb->context;
380 
381  /*
382  * We free separately allocated data by entirely scrapping reorderbuffer's
383  * memory context.
384  */
385  MemoryContextDelete(context);
386 
387  /* Free disk space used by unconsumed reorder buffers */
389 }
390 
391 /*
392  * Get an unused, possibly preallocated, ReorderBufferTXN.
393  */
394 static ReorderBufferTXN *
396 {
398 
399  txn = (ReorderBufferTXN *)
401 
402  memset(txn, 0, sizeof(ReorderBufferTXN));
403 
404  dlist_init(&txn->changes);
405  dlist_init(&txn->tuplecids);
406  dlist_init(&txn->subtxns);
407 
408  /* InvalidCommandId is not zero, so set it explicitly */
410  txn->output_plugin_private = NULL;
411 
412  return txn;
413 }
414 
415 /*
416  * Free a ReorderBufferTXN.
417  */
418 static void
420 {
421  /* clean the lookup cache if we were cached (quite likely) */
422  if (rb->by_txn_last_xid == txn->xid)
423  {
425  rb->by_txn_last_txn = NULL;
426  }
427 
428  /* free data that's contained */
429 
430  if (txn->gid != NULL)
431  {
432  pfree(txn->gid);
433  txn->gid = NULL;
434  }
435 
436  if (txn->tuplecid_hash != NULL)
437  {
439  txn->tuplecid_hash = NULL;
440  }
441 
442  if (txn->invalidations)
443  {
444  pfree(txn->invalidations);
445  txn->invalidations = NULL;
446  }
447 
448  /* Reset the toast hash */
449  ReorderBufferToastReset(rb, txn);
450 
451  pfree(txn);
452 }
453 
454 /*
455  * Get an fresh ReorderBufferChange.
456  */
459 {
460  ReorderBufferChange *change;
461 
462  change = (ReorderBufferChange *)
464 
465  memset(change, 0, sizeof(ReorderBufferChange));
466  return change;
467 }
468 
469 /*
470  * Free a ReorderBufferChange and update memory accounting, if requested.
471  */
472 void
474  bool upd_mem)
475 {
476  /* update memory accounting info */
477  if (upd_mem)
478  ReorderBufferChangeMemoryUpdate(rb, change, false,
479  ReorderBufferChangeSize(change));
480 
481  /* free contained data */
482  switch (change->action)
483  {
488  if (change->data.tp.newtuple)
489  {
490  ReorderBufferReturnTupleBuf(rb, change->data.tp.newtuple);
491  change->data.tp.newtuple = NULL;
492  }
493 
494  if (change->data.tp.oldtuple)
495  {
496  ReorderBufferReturnTupleBuf(rb, change->data.tp.oldtuple);
497  change->data.tp.oldtuple = NULL;
498  }
499  break;
501  if (change->data.msg.prefix != NULL)
502  pfree(change->data.msg.prefix);
503  change->data.msg.prefix = NULL;
504  if (change->data.msg.message != NULL)
505  pfree(change->data.msg.message);
506  change->data.msg.message = NULL;
507  break;
509  if (change->data.inval.invalidations)
510  pfree(change->data.inval.invalidations);
511  change->data.inval.invalidations = NULL;
512  break;
514  if (change->data.snapshot)
515  {
516  ReorderBufferFreeSnap(rb, change->data.snapshot);
517  change->data.snapshot = NULL;
518  }
519  break;
520  /* no data in addition to the struct itself */
522  if (change->data.truncate.relids != NULL)
523  {
524  ReorderBufferReturnRelids(rb, change->data.truncate.relids);
525  change->data.truncate.relids = NULL;
526  }
527  break;
532  break;
533  }
534 
535  pfree(change);
536 }
537 
538 /*
539  * Get a fresh ReorderBufferTupleBuf fitting at least a tuple of size
540  * tuple_len (excluding header overhead).
541  */
544 {
545  ReorderBufferTupleBuf *tuple;
546  Size alloc_len;
547 
548  alloc_len = tuple_len + SizeofHeapTupleHeader;
549 
550  tuple = (ReorderBufferTupleBuf *)
552  sizeof(ReorderBufferTupleBuf) +
553  MAXIMUM_ALIGNOF + alloc_len);
554  tuple->alloc_tuple_size = alloc_len;
555  tuple->tuple.t_data = ReorderBufferTupleBufData(tuple);
556 
557  return tuple;
558 }
559 
560 /*
561  * Free an ReorderBufferTupleBuf.
562  */
563 void
565 {
566  pfree(tuple);
567 }
568 
569 /*
570  * Get an array for relids of truncated relations.
571  *
572  * We use the global memory context (for the whole reorder buffer), because
573  * none of the existing ones seems like a good match (some are SLAB, so we
574  * can't use those, and tup_context is meant for tuple data, not relids). We
575  * could add yet another context, but it seems like an overkill - TRUNCATE is
576  * not particularly common operation, so it does not seem worth it.
577  */
578 Oid *
580 {
581  Oid *relids;
582  Size alloc_len;
583 
584  alloc_len = sizeof(Oid) * nrelids;
585 
586  relids = (Oid *) MemoryContextAlloc(rb->context, alloc_len);
587 
588  return relids;
589 }
590 
591 /*
592  * Free an array of relids.
593  */
594 void
596 {
597  pfree(relids);
598 }
599 
600 /*
601  * Return the ReorderBufferTXN from the given buffer, specified by Xid.
602  * If create is true, and a transaction doesn't already exist, create it
603  * (with the given LSN, and as top transaction if that's specified);
604  * when this happens, is_new is set to true.
605  */
606 static ReorderBufferTXN *
608  bool *is_new, XLogRecPtr lsn, bool create_as_top)
609 {
612  bool found;
613 
615 
616  /*
617  * Check the one-entry lookup cache first
618  */
620  rb->by_txn_last_xid == xid)
621  {
622  txn = rb->by_txn_last_txn;
623 
624  if (txn != NULL)
625  {
626  /* found it, and it's valid */
627  if (is_new)
628  *is_new = false;
629  return txn;
630  }
631 
632  /*
633  * cached as non-existent, and asked not to create? Then nothing else
634  * to do.
635  */
636  if (!create)
637  return NULL;
638  /* otherwise fall through to create it */
639  }
640 
641  /*
642  * If the cache wasn't hit or it yielded an "does-not-exist" and we want
643  * to create an entry.
644  */
645 
646  /* search the lookup table */
647  ent = (ReorderBufferTXNByIdEnt *)
648  hash_search(rb->by_txn,
649  (void *) &xid,
650  create ? HASH_ENTER : HASH_FIND,
651  &found);
652  if (found)
653  txn = ent->txn;
654  else if (create)
655  {
656  /* initialize the new entry, if creation was requested */
657  Assert(ent != NULL);
658  Assert(lsn != InvalidXLogRecPtr);
659 
660  ent->txn = ReorderBufferGetTXN(rb);
661  ent->txn->xid = xid;
662  txn = ent->txn;
663  txn->first_lsn = lsn;
665 
666  if (create_as_top)
667  {
668  dlist_push_tail(&rb->toplevel_by_lsn, &txn->node);
669  AssertTXNLsnOrder(rb);
670  }
671  }
672  else
673  txn = NULL; /* not found and not asked to create */
674 
675  /* update cache */
676  rb->by_txn_last_xid = xid;
677  rb->by_txn_last_txn = txn;
678 
679  if (is_new)
680  *is_new = !found;
681 
682  Assert(!create || txn != NULL);
683  return txn;
684 }
685 
686 /*
687  * Record the partial change for the streaming of in-progress transactions. We
688  * can stream only complete changes so if we have a partial change like toast
689  * table insert or speculative insert then we mark such a 'txn' so that it
690  * can't be streamed. We also ensure that if the changes in such a 'txn' are
691  * above logical_decoding_work_mem threshold then we stream them as soon as we
692  * have a complete change.
693  */
694 static void
696  ReorderBufferChange *change,
697  bool toast_insert)
698 {
699  ReorderBufferTXN *toptxn;
700 
701  /*
702  * The partial changes need to be processed only while streaming
703  * in-progress transactions.
704  */
705  if (!ReorderBufferCanStream(rb))
706  return;
707 
708  /* Get the top transaction. */
709  if (txn->toptxn != NULL)
710  toptxn = txn->toptxn;
711  else
712  toptxn = txn;
713 
714  /*
715  * Indicate a partial change for toast inserts. The change will be
716  * considered as complete once we get the insert or update on the main
717  * table and we are sure that the pending toast chunks are not required
718  * anymore.
719  *
720  * If we allow streaming when there are pending toast chunks then such
721  * chunks won't be released till the insert (multi_insert) is complete and
722  * we expect the txn to have streamed all changes after streaming. This
723  * restriction is mainly to ensure the correctness of streamed
724  * transactions and it doesn't seem worth uplifting such a restriction
725  * just to allow this case because anyway we will stream the transaction
726  * once such an insert is complete.
727  */
728  if (toast_insert)
730  else if (rbtxn_has_partial_change(toptxn) &&
731  IsInsertOrUpdate(change->action) &&
732  change->data.tp.clear_toast_afterwards)
734 
735  /*
736  * Indicate a partial change for speculative inserts. The change will be
737  * considered as complete once we get the speculative confirm or abort
738  * token.
739  */
740  if (IsSpecInsert(change->action))
742  else if (rbtxn_has_partial_change(toptxn) &&
743  IsSpecConfirmOrAbort(change->action))
745 
746  /*
747  * Stream the transaction if it is serialized before and the changes are
748  * now complete in the top-level transaction.
749  *
750  * The reason for doing the streaming of such a transaction as soon as we
751  * get the complete change for it is that previously it would have reached
752  * the memory threshold and wouldn't get streamed because of incomplete
753  * changes. Delaying such transactions would increase apply lag for them.
754  */
756  !(rbtxn_has_partial_change(toptxn)) &&
757  rbtxn_is_serialized(txn))
758  ReorderBufferStreamTXN(rb, toptxn);
759 }
760 
761 /*
762  * Queue a change into a transaction so it can be replayed upon commit or will be
763  * streamed when we reach logical_decoding_work_mem threshold.
764  */
765 void
767  ReorderBufferChange *change, bool toast_insert)
768 {
770 
771  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
772 
773  /*
774  * While streaming the previous changes we have detected that the
775  * transaction is aborted. So there is no point in collecting further
776  * changes for it.
777  */
778  if (txn->concurrent_abort)
779  {
780  /*
781  * We don't need to update memory accounting for this change as we
782  * have not added it to the queue yet.
783  */
784  ReorderBufferReturnChange(rb, change, false);
785  return;
786  }
787 
788  change->lsn = lsn;
789  change->txn = txn;
790 
791  Assert(InvalidXLogRecPtr != lsn);
792  dlist_push_tail(&txn->changes, &change->node);
793  txn->nentries++;
794  txn->nentries_mem++;
795 
796  /* update memory accounting information */
797  ReorderBufferChangeMemoryUpdate(rb, change, true,
798  ReorderBufferChangeSize(change));
799 
800  /* process partial change */
801  ReorderBufferProcessPartialChange(rb, txn, change, toast_insert);
802 
803  /* check the memory limits and evict something if needed */
805 }
806 
807 /*
808  * A transactional message is queued to be processed upon commit and a
809  * non-transactional message gets processed immediately.
810  */
811 void
813  Snapshot snapshot, XLogRecPtr lsn,
814  bool transactional, const char *prefix,
815  Size message_size, const char *message)
816 {
817  if (transactional)
818  {
819  MemoryContext oldcontext;
820  ReorderBufferChange *change;
821 
823 
824  oldcontext = MemoryContextSwitchTo(rb->context);
825 
826  change = ReorderBufferGetChange(rb);
828  change->data.msg.prefix = pstrdup(prefix);
829  change->data.msg.message_size = message_size;
830  change->data.msg.message = palloc(message_size);
831  memcpy(change->data.msg.message, message, message_size);
832 
833  ReorderBufferQueueChange(rb, xid, lsn, change, false);
834 
835  MemoryContextSwitchTo(oldcontext);
836  }
837  else
838  {
839  ReorderBufferTXN *txn = NULL;
840  volatile Snapshot snapshot_now = snapshot;
841 
842  if (xid != InvalidTransactionId)
843  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
844 
845  /* setup snapshot to allow catalog access */
846  SetupHistoricSnapshot(snapshot_now, NULL);
847  PG_TRY();
848  {
849  rb->message(rb, txn, lsn, false, prefix, message_size, message);
850 
852  }
853  PG_CATCH();
854  {
856  PG_RE_THROW();
857  }
858  PG_END_TRY();
859  }
860 }
861 
862 /*
863  * AssertTXNLsnOrder
864  * Verify LSN ordering of transaction lists in the reorderbuffer
865  *
866  * Other LSN-related invariants are checked too.
867  *
868  * No-op if assertions are not in use.
869  */
870 static void
872 {
873 #ifdef USE_ASSERT_CHECKING
874  dlist_iter iter;
875  XLogRecPtr prev_first_lsn = InvalidXLogRecPtr;
876  XLogRecPtr prev_base_snap_lsn = InvalidXLogRecPtr;
877 
878  dlist_foreach(iter, &rb->toplevel_by_lsn)
879  {
881  iter.cur);
882 
883  /* start LSN must be set */
884  Assert(cur_txn->first_lsn != InvalidXLogRecPtr);
885 
886  /* If there is an end LSN, it must be higher than start LSN */
887  if (cur_txn->end_lsn != InvalidXLogRecPtr)
888  Assert(cur_txn->first_lsn <= cur_txn->end_lsn);
889 
890  /* Current initial LSN must be strictly higher than previous */
891  if (prev_first_lsn != InvalidXLogRecPtr)
892  Assert(prev_first_lsn < cur_txn->first_lsn);
893 
894  /* known-as-subtxn txns must not be listed */
895  Assert(!rbtxn_is_known_subxact(cur_txn));
896 
897  prev_first_lsn = cur_txn->first_lsn;
898  }
899 
901  {
903  base_snapshot_node,
904  iter.cur);
905 
906  /* base snapshot (and its LSN) must be set */
907  Assert(cur_txn->base_snapshot != NULL);
909 
910  /* current LSN must be strictly higher than previous */
911  if (prev_base_snap_lsn != InvalidXLogRecPtr)
912  Assert(prev_base_snap_lsn < cur_txn->base_snapshot_lsn);
913 
914  /* known-as-subtxn txns must not be listed */
915  Assert(!rbtxn_is_known_subxact(cur_txn));
916 
917  prev_base_snap_lsn = cur_txn->base_snapshot_lsn;
918  }
919 #endif
920 }
921 
922 /*
923  * AssertChangeLsnOrder
924  *
925  * Check ordering of changes in the (sub)transaction.
926  */
927 static void
929 {
930 #ifdef USE_ASSERT_CHECKING
931  dlist_iter iter;
932  XLogRecPtr prev_lsn = txn->first_lsn;
933 
934  dlist_foreach(iter, &txn->changes)
935  {
936  ReorderBufferChange *cur_change;
937 
938  cur_change = dlist_container(ReorderBufferChange, node, iter.cur);
939 
941  Assert(cur_change->lsn != InvalidXLogRecPtr);
942  Assert(txn->first_lsn <= cur_change->lsn);
943 
944  if (txn->end_lsn != InvalidXLogRecPtr)
945  Assert(cur_change->lsn <= txn->end_lsn);
946 
947  Assert(prev_lsn <= cur_change->lsn);
948 
949  prev_lsn = cur_change->lsn;
950  }
951 #endif
952 }
953 
954 /*
955  * ReorderBufferGetOldestTXN
956  * Return oldest transaction in reorderbuffer
957  */
960 {
962 
963  AssertTXNLsnOrder(rb);
964 
966  return NULL;
967 
969 
972  return txn;
973 }
974 
975 /*
976  * ReorderBufferGetOldestXmin
977  * Return oldest Xmin in reorderbuffer
978  *
979  * Returns oldest possibly running Xid from the point of view of snapshots
980  * used in the transactions kept by reorderbuffer, or InvalidTransactionId if
981  * there are none.
982  *
983  * Since snapshots are assigned monotonically, this equals the Xmin of the
984  * base snapshot with minimal base_snapshot_lsn.
985  */
988 {
990 
991  AssertTXNLsnOrder(rb);
992 
994  return InvalidTransactionId;
995 
996  txn = dlist_head_element(ReorderBufferTXN, base_snapshot_node,
998  return txn->base_snapshot->xmin;
999 }
1000 
1001 void
1003 {
1004  rb->current_restart_decoding_lsn = ptr;
1005 }
1006 
1007 /*
1008  * ReorderBufferAssignChild
1009  *
1010  * Make note that we know that subxid is a subtransaction of xid, seen as of
1011  * the given lsn.
1012  */
1013 void
1015  TransactionId subxid, XLogRecPtr lsn)
1016 {
1018  ReorderBufferTXN *subtxn;
1019  bool new_top;
1020  bool new_sub;
1021 
1022  txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true);
1023  subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false);
1024 
1025  if (!new_sub)
1026  {
1027  if (rbtxn_is_known_subxact(subtxn))
1028  {
1029  /* already associated, nothing to do */
1030  return;
1031  }
1032  else
1033  {
1034  /*
1035  * We already saw this transaction, but initially added it to the
1036  * list of top-level txns. Now that we know it's not top-level,
1037  * remove it from there.
1038  */
1039  dlist_delete(&subtxn->node);
1040  }
1041  }
1042 
1043  subtxn->txn_flags |= RBTXN_IS_SUBXACT;
1044  subtxn->toplevel_xid = xid;
1045  Assert(subtxn->nsubtxns == 0);
1046 
1047  /* set the reference to top-level transaction */
1048  subtxn->toptxn = txn;
1049 
1050  /* add to subtransaction list */
1051  dlist_push_tail(&txn->subtxns, &subtxn->node);
1052  txn->nsubtxns++;
1053 
1054  /* Possibly transfer the subtxn's snapshot to its top-level txn. */
1055  ReorderBufferTransferSnapToParent(txn, subtxn);
1056 
1057  /* Verify LSN-ordering invariant */
1058  AssertTXNLsnOrder(rb);
1059 }
1060 
1061 /*
1062  * ReorderBufferTransferSnapToParent
1063  * Transfer base snapshot from subtxn to top-level txn, if needed
1064  *
1065  * This is done if the top-level txn doesn't have a base snapshot, or if the
1066  * subtxn's base snapshot has an earlier LSN than the top-level txn's base
1067  * snapshot's LSN. This can happen if there are no changes in the toplevel
1068  * txn but there are some in the subtxn, or the first change in subtxn has
1069  * earlier LSN than first change in the top-level txn and we learned about
1070  * their kinship only now.
1071  *
1072  * The subtransaction's snapshot is cleared regardless of the transfer
1073  * happening, since it's not needed anymore in either case.
1074  *
1075  * We do this as soon as we become aware of their kinship, to avoid queueing
1076  * extra snapshots to txns known-as-subtxns -- only top-level txns will
1077  * receive further snapshots.
1078  */
1079 static void
1081  ReorderBufferTXN *subtxn)
1082 {
1083  Assert(subtxn->toplevel_xid == txn->xid);
1084 
1085  if (subtxn->base_snapshot != NULL)
1086  {
1087  if (txn->base_snapshot == NULL ||
1088  subtxn->base_snapshot_lsn < txn->base_snapshot_lsn)
1089  {
1090  /*
1091  * If the toplevel transaction already has a base snapshot but
1092  * it's newer than the subxact's, purge it.
1093  */
1094  if (txn->base_snapshot != NULL)
1095  {
1098  }
1099 
1100  /*
1101  * The snapshot is now the top transaction's; transfer it, and
1102  * adjust the list position of the top transaction in the list by
1103  * moving it to where the subtransaction is.
1104  */
1105  txn->base_snapshot = subtxn->base_snapshot;
1106  txn->base_snapshot_lsn = subtxn->base_snapshot_lsn;
1108  &txn->base_snapshot_node);
1109 
1110  /*
1111  * The subtransaction doesn't have a snapshot anymore (so it
1112  * mustn't be in the list.)
1113  */
1114  subtxn->base_snapshot = NULL;
1116  dlist_delete(&subtxn->base_snapshot_node);
1117  }
1118  else
1119  {
1120  /* Base snap of toplevel is fine, so subxact's is not needed */
1122  dlist_delete(&subtxn->base_snapshot_node);
1123  subtxn->base_snapshot = NULL;
1125  }
1126  }
1127 }
1128 
1129 /*
1130  * Associate a subtransaction with its toplevel transaction at commit
1131  * time. There may be no further changes added after this.
1132  */
1133 void
1135  TransactionId subxid, XLogRecPtr commit_lsn,
1136  XLogRecPtr end_lsn)
1137 {
1138  ReorderBufferTXN *subtxn;
1139 
1140  subtxn = ReorderBufferTXNByXid(rb, subxid, false, NULL,
1141  InvalidXLogRecPtr, false);
1142 
1143  /*
1144  * No need to do anything if that subtxn didn't contain any changes
1145  */
1146  if (!subtxn)
1147  return;
1148 
1149  subtxn->final_lsn = commit_lsn;
1150  subtxn->end_lsn = end_lsn;
1151 
1152  /*
1153  * Assign this subxact as a child of the toplevel xact (no-op if already
1154  * done.)
1155  */
1156  ReorderBufferAssignChild(rb, xid, subxid, InvalidXLogRecPtr);
1157 }
1158 
1159 
1160 /*
1161  * Support for efficiently iterating over a transaction's and its
1162  * subtransactions' changes.
1163  *
1164  * We do by doing a k-way merge between transactions/subtransactions. For that
1165  * we model the current heads of the different transactions as a binary heap
1166  * so we easily know which (sub-)transaction has the change with the smallest
1167  * lsn next.
1168  *
1169  * We assume the changes in individual transactions are already sorted by LSN.
1170  */
1171 
1172 /*
1173  * Binary heap comparison function.
1174  */
1175 static int
1177 {
1179  XLogRecPtr pos_a = state->entries[DatumGetInt32(a)].lsn;
1180  XLogRecPtr pos_b = state->entries[DatumGetInt32(b)].lsn;
1181 
1182  if (pos_a < pos_b)
1183  return 1;
1184  else if (pos_a == pos_b)
1185  return 0;
1186  return -1;
1187 }
1188 
1189 /*
1190  * Allocate & initialize an iterator which iterates in lsn order over a
1191  * transaction and all its subtransactions.
1192  *
1193  * Note: The iterator state is returned through iter_state parameter rather
1194  * than the function's return value. This is because the state gets cleaned up
1195  * in a PG_CATCH block in the caller, so we want to make sure the caller gets
1196  * back the state even if this function throws an exception.
1197  */
1198 static void
1200  ReorderBufferIterTXNState *volatile *iter_state)
1201 {
1202  Size nr_txns = 0;
1204  dlist_iter cur_txn_i;
1205  int32 off;
1206 
1207  *iter_state = NULL;
1208 
1209  /* Check ordering of changes in the toplevel transaction. */
1210  AssertChangeLsnOrder(txn);
1211 
1212  /*
1213  * Calculate the size of our heap: one element for every transaction that
1214  * contains changes. (Besides the transactions already in the reorder
1215  * buffer, we count the one we were directly passed.)
1216  */
1217  if (txn->nentries > 0)
1218  nr_txns++;
1219 
1220  dlist_foreach(cur_txn_i, &txn->subtxns)
1221  {
1222  ReorderBufferTXN *cur_txn;
1223 
1224  cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
1225 
1226  /* Check ordering of changes in this subtransaction. */
1227  AssertChangeLsnOrder(cur_txn);
1228 
1229  if (cur_txn->nentries > 0)
1230  nr_txns++;
1231  }
1232 
1233  /* allocate iteration state */
1234  state = (ReorderBufferIterTXNState *)
1236  sizeof(ReorderBufferIterTXNState) +
1237  sizeof(ReorderBufferIterTXNEntry) * nr_txns);
1238 
1239  state->nr_txns = nr_txns;
1240  dlist_init(&state->old_change);
1241 
1242  for (off = 0; off < state->nr_txns; off++)
1243  {
1244  state->entries[off].file.vfd = -1;
1245  state->entries[off].segno = 0;
1246  }
1247 
1248  /* allocate heap */
1249  state->heap = binaryheap_allocate(state->nr_txns,
1251  state);
1252 
1253  /* Now that the state fields are initialized, it is safe to return it. */
1254  *iter_state = state;
1255 
1256  /*
1257  * Now insert items into the binary heap, in an unordered fashion. (We
1258  * will run a heap assembly step at the end; this is more efficient.)
1259  */
1260 
1261  off = 0;
1262 
1263  /* add toplevel transaction if it contains changes */
1264  if (txn->nentries > 0)
1265  {
1266  ReorderBufferChange *cur_change;
1267 
1268  if (rbtxn_is_serialized(txn))
1269  {
1270  /* serialize remaining changes */
1271  ReorderBufferSerializeTXN(rb, txn);
1272  ReorderBufferRestoreChanges(rb, txn, &state->entries[off].file,
1273  &state->entries[off].segno);
1274  }
1275 
1276  cur_change = dlist_head_element(ReorderBufferChange, node,
1277  &txn->changes);
1278 
1279  state->entries[off].lsn = cur_change->lsn;
1280  state->entries[off].change = cur_change;
1281  state->entries[off].txn = txn;
1282 
1284  }
1285 
1286  /* add subtransactions if they contain changes */
1287  dlist_foreach(cur_txn_i, &txn->subtxns)
1288  {
1289  ReorderBufferTXN *cur_txn;
1290 
1291  cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
1292 
1293  if (cur_txn->nentries > 0)
1294  {
1295  ReorderBufferChange *cur_change;
1296 
1297  if (rbtxn_is_serialized(cur_txn))
1298  {
1299  /* serialize remaining changes */
1300  ReorderBufferSerializeTXN(rb, cur_txn);
1301  ReorderBufferRestoreChanges(rb, cur_txn,
1302  &state->entries[off].file,
1303  &state->entries[off].segno);
1304  }
1305  cur_change = dlist_head_element(ReorderBufferChange, node,
1306  &cur_txn->changes);
1307 
1308  state->entries[off].lsn = cur_change->lsn;
1309  state->entries[off].change = cur_change;
1310  state->entries[off].txn = cur_txn;
1311 
1313  }
1314  }
1315 
1316  /* assemble a valid binary heap */
1317  binaryheap_build(state->heap);
1318 }
1319 
1320 /*
1321  * Return the next change when iterating over a transaction and its
1322  * subtransactions.
1323  *
1324  * Returns NULL when no further changes exist.
1325  */
1326 static ReorderBufferChange *
1328 {
1329  ReorderBufferChange *change;
1331  int32 off;
1332 
1333  /* nothing there anymore */
1334  if (state->heap->bh_size == 0)
1335  return NULL;
1336 
1337  off = DatumGetInt32(binaryheap_first(state->heap));
1338  entry = &state->entries[off];
1339 
1340  /* free memory we might have "leaked" in the previous *Next call */
1341  if (!dlist_is_empty(&state->old_change))
1342  {
1343  change = dlist_container(ReorderBufferChange, node,
1344  dlist_pop_head_node(&state->old_change));
1345  ReorderBufferReturnChange(rb, change, true);
1346  Assert(dlist_is_empty(&state->old_change));
1347  }
1348 
1349  change = entry->change;
1350 
1351  /*
1352  * update heap with information about which transaction has the next
1353  * relevant change in LSN order
1354  */
1355 
1356  /* there are in-memory changes */
1357  if (dlist_has_next(&entry->txn->changes, &entry->change->node))
1358  {
1359  dlist_node *next = dlist_next_node(&entry->txn->changes, &change->node);
1360  ReorderBufferChange *next_change =
1361  dlist_container(ReorderBufferChange, node, next);
1362 
1363  /* txn stays the same */
1364  state->entries[off].lsn = next_change->lsn;
1365  state->entries[off].change = next_change;
1366 
1368  return change;
1369  }
1370 
1371  /* try to load changes from disk */
1372  if (entry->txn->nentries != entry->txn->nentries_mem)
1373  {
1374  /*
1375  * Ugly: restoring changes will reuse *Change records, thus delete the
1376  * current one from the per-tx list and only free in the next call.
1377  */
1378  dlist_delete(&change->node);
1379  dlist_push_tail(&state->old_change, &change->node);
1380 
1381  /*
1382  * Update the total bytes processed by the txn for which we are
1383  * releasing the current set of changes and restoring the new set of
1384  * changes.
1385  */
1386  rb->totalBytes += entry->txn->size;
1387  if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->file,
1388  &state->entries[off].segno))
1389  {
1390  /* successfully restored changes from disk */
1391  ReorderBufferChange *next_change =
1393  &entry->txn->changes);
1394 
1395  elog(DEBUG2, "restored %u/%u changes from disk",
1396  (uint32) entry->txn->nentries_mem,
1397  (uint32) entry->txn->nentries);
1398 
1399  Assert(entry->txn->nentries_mem);
1400  /* txn stays the same */
1401  state->entries[off].lsn = next_change->lsn;
1402  state->entries[off].change = next_change;
1404 
1405  return change;
1406  }
1407  }
1408 
1409  /* ok, no changes there anymore, remove */
1410  binaryheap_remove_first(state->heap);
1411 
1412  return change;
1413 }
1414 
1415 /*
1416  * Deallocate the iterator
1417  */
1418 static void
1421 {
1422  int32 off;
1423 
1424  for (off = 0; off < state->nr_txns; off++)
1425  {
1426  if (state->entries[off].file.vfd != -1)
1427  FileClose(state->entries[off].file.vfd);
1428  }
1429 
1430  /* free memory we might have "leaked" in the last *Next call */
1431  if (!dlist_is_empty(&state->old_change))
1432  {
1433  ReorderBufferChange *change;
1434 
1435  change = dlist_container(ReorderBufferChange, node,
1436  dlist_pop_head_node(&state->old_change));
1437  ReorderBufferReturnChange(rb, change, true);
1438  Assert(dlist_is_empty(&state->old_change));
1439  }
1440 
1441  binaryheap_free(state->heap);
1442  pfree(state);
1443 }
1444 
1445 /*
1446  * Cleanup the contents of a transaction, usually after the transaction
1447  * committed or aborted.
1448  */
1449 static void
1451 {
1452  bool found;
1453  dlist_mutable_iter iter;
1454 
1455  /* cleanup subtransactions & their changes */
1456  dlist_foreach_modify(iter, &txn->subtxns)
1457  {
1458  ReorderBufferTXN *subtxn;
1459 
1460  subtxn = dlist_container(ReorderBufferTXN, node, iter.cur);
1461 
1462  /*
1463  * Subtransactions are always associated to the toplevel TXN, even if
1464  * they originally were happening inside another subtxn, so we won't
1465  * ever recurse more than one level deep here.
1466  */
1467  Assert(rbtxn_is_known_subxact(subtxn));
1468  Assert(subtxn->nsubtxns == 0);
1469 
1470  ReorderBufferCleanupTXN(rb, subtxn);
1471  }
1472 
1473  /* cleanup changes in the txn */
1474  dlist_foreach_modify(iter, &txn->changes)
1475  {
1476  ReorderBufferChange *change;
1477 
1478  change = dlist_container(ReorderBufferChange, node, iter.cur);
1479 
1480  /* Check we're not mixing changes from different transactions. */
1481  Assert(change->txn == txn);
1482 
1483  ReorderBufferReturnChange(rb, change, true);
1484  }
1485 
1486  /*
1487  * Cleanup the tuplecids we stored for decoding catalog snapshot access.
1488  * They are always stored in the toplevel transaction.
1489  */
1490  dlist_foreach_modify(iter, &txn->tuplecids)
1491  {
1492  ReorderBufferChange *change;
1493 
1494  change = dlist_container(ReorderBufferChange, node, iter.cur);
1495 
1496  /* Check we're not mixing changes from different transactions. */
1497  Assert(change->txn == txn);
1499 
1500  ReorderBufferReturnChange(rb, change, true);
1501  }
1502 
1503  /*
1504  * Cleanup the base snapshot, if set.
1505  */
1506  if (txn->base_snapshot != NULL)
1507  {
1510  }
1511 
1512  /*
1513  * Cleanup the snapshot for the last streamed run.
1514  */
1515  if (txn->snapshot_now != NULL)
1516  {
1517  Assert(rbtxn_is_streamed(txn));
1519  }
1520 
1521  /*
1522  * Remove TXN from its containing list.
1523  *
1524  * Note: if txn is known as subxact, we are deleting the TXN from its
1525  * parent's list of known subxacts; this leaves the parent's nsubxacts
1526  * count too high, but we don't care. Otherwise, we are deleting the TXN
1527  * from the LSN-ordered list of toplevel TXNs.
1528  */
1529  dlist_delete(&txn->node);
1530 
1531  /* now remove reference from buffer */
1532  hash_search(rb->by_txn,
1533  (void *) &txn->xid,
1534  HASH_REMOVE,
1535  &found);
1536  Assert(found);
1537 
1538  /* remove entries spilled to disk */
1539  if (rbtxn_is_serialized(txn))
1540  ReorderBufferRestoreCleanup(rb, txn);
1541 
1542  /* deallocate */
1543  ReorderBufferReturnTXN(rb, txn);
1544 }
1545 
1546 /*
1547  * Discard changes from a transaction (and subtransactions), either after
1548  * streaming or decoding them at PREPARE. Keep the remaining info -
1549  * transactions, tuplecids, invalidations and snapshots.
1550  *
1551  * We additionally remove tuplecids after decoding the transaction at prepare
1552  * time as we only need to perform invalidation at rollback or commit prepared.
1553  *
1554  * 'txn_prepared' indicates that we have decoded the transaction at prepare
1555  * time.
1556  */
1557 static void
1559 {
1560  dlist_mutable_iter iter;
1561 
1562  /* cleanup subtransactions & their changes */
1563  dlist_foreach_modify(iter, &txn->subtxns)
1564  {
1565  ReorderBufferTXN *subtxn;
1566 
1567  subtxn = dlist_container(ReorderBufferTXN, node, iter.cur);
1568 
1569  /*
1570  * Subtransactions are always associated to the toplevel TXN, even if
1571  * they originally were happening inside another subtxn, so we won't
1572  * ever recurse more than one level deep here.
1573  */
1574  Assert(rbtxn_is_known_subxact(subtxn));
1575  Assert(subtxn->nsubtxns == 0);
1576 
1577  ReorderBufferTruncateTXN(rb, subtxn, txn_prepared);
1578  }
1579 
1580  /* cleanup changes in the txn */
1581  dlist_foreach_modify(iter, &txn->changes)
1582  {
1583  ReorderBufferChange *change;
1584 
1585  change = dlist_container(ReorderBufferChange, node, iter.cur);
1586 
1587  /* Check we're not mixing changes from different transactions. */
1588  Assert(change->txn == txn);
1589 
1590  /* remove the change from it's containing list */
1591  dlist_delete(&change->node);
1592 
1593  ReorderBufferReturnChange(rb, change, true);
1594  }
1595 
1596  /*
1597  * Mark the transaction as streamed.
1598  *
1599  * The toplevel transaction, identified by (toptxn==NULL), is marked as
1600  * streamed always, even if it does not contain any changes (that is, when
1601  * all the changes are in subtransactions).
1602  *
1603  * For subtransactions, we only mark them as streamed when there are
1604  * changes in them.
1605  *
1606  * We do it this way because of aborts - we don't want to send aborts for
1607  * XIDs the downstream is not aware of. And of course, it always knows
1608  * about the toplevel xact (we send the XID in all messages), but we never
1609  * stream XIDs of empty subxacts.
1610  */
1611  if ((!txn_prepared) && ((!txn->toptxn) || (txn->nentries_mem != 0)))
1612  txn->txn_flags |= RBTXN_IS_STREAMED;
1613 
1614  if (txn_prepared)
1615  {
1616  /*
1617  * If this is a prepared txn, cleanup the tuplecids we stored for
1618  * decoding catalog snapshot access. They are always stored in the
1619  * toplevel transaction.
1620  */
1621  dlist_foreach_modify(iter, &txn->tuplecids)
1622  {
1623  ReorderBufferChange *change;
1624 
1625  change = dlist_container(ReorderBufferChange, node, iter.cur);
1626 
1627  /* Check we're not mixing changes from different transactions. */
1628  Assert(change->txn == txn);
1630 
1631  /* Remove the change from its containing list. */
1632  dlist_delete(&change->node);
1633 
1634  ReorderBufferReturnChange(rb, change, true);
1635  }
1636  }
1637 
1638  /*
1639  * Destroy the (relfilenode, ctid) hashtable, so that we don't leak any
1640  * memory. We could also keep the hash table and update it with new ctid
1641  * values, but this seems simpler and good enough for now.
1642  */
1643  if (txn->tuplecid_hash != NULL)
1644  {
1646  txn->tuplecid_hash = NULL;
1647  }
1648 
1649  /* If this txn is serialized then clean the disk space. */
1650  if (rbtxn_is_serialized(txn))
1651  {
1652  ReorderBufferRestoreCleanup(rb, txn);
1653  txn->txn_flags &= ~RBTXN_IS_SERIALIZED;
1654 
1655  /*
1656  * We set this flag to indicate if the transaction is ever serialized.
1657  * We need this to accurately update the stats as otherwise the same
1658  * transaction can be counted as serialized multiple times.
1659  */
1661  }
1662 
1663  /* also reset the number of entries in the transaction */
1664  txn->nentries_mem = 0;
1665  txn->nentries = 0;
1666 }
1667 
1668 /*
1669  * Build a hash with a (relfilenode, ctid) -> (cmin, cmax) mapping for use by
1670  * HeapTupleSatisfiesHistoricMVCC.
1671  */
1672 static void
1674 {
1675  dlist_iter iter;
1676  HASHCTL hash_ctl;
1677 
1679  return;
1680 
1681  hash_ctl.keysize = sizeof(ReorderBufferTupleCidKey);
1682  hash_ctl.entrysize = sizeof(ReorderBufferTupleCidEnt);
1683  hash_ctl.hcxt = rb->context;
1684 
1685  /*
1686  * create the hash with the exact number of to-be-stored tuplecids from
1687  * the start
1688  */
1689  txn->tuplecid_hash =
1690  hash_create("ReorderBufferTupleCid", txn->ntuplecids, &hash_ctl,
1692 
1693  dlist_foreach(iter, &txn->tuplecids)
1694  {
1697  bool found;
1698  ReorderBufferChange *change;
1699 
1700  change = dlist_container(ReorderBufferChange, node, iter.cur);
1701 
1703 
1704  /* be careful about padding */
1705  memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
1706 
1707  key.relnode = change->data.tuplecid.node;
1708 
1709  ItemPointerCopy(&change->data.tuplecid.tid,
1710  &key.tid);
1711 
1712  ent = (ReorderBufferTupleCidEnt *)
1714  (void *) &key,
1715  HASH_ENTER,
1716  &found);
1717  if (!found)
1718  {
1719  ent->cmin = change->data.tuplecid.cmin;
1720  ent->cmax = change->data.tuplecid.cmax;
1721  ent->combocid = change->data.tuplecid.combocid;
1722  }
1723  else
1724  {
1725  /*
1726  * Maybe we already saw this tuple before in this transaction, but
1727  * if so it must have the same cmin.
1728  */
1729  Assert(ent->cmin == change->data.tuplecid.cmin);
1730 
1731  /*
1732  * cmax may be initially invalid, but once set it can only grow,
1733  * and never become invalid again.
1734  */
1735  Assert((ent->cmax == InvalidCommandId) ||
1736  ((change->data.tuplecid.cmax != InvalidCommandId) &&
1737  (change->data.tuplecid.cmax > ent->cmax)));
1738  ent->cmax = change->data.tuplecid.cmax;
1739  }
1740  }
1741 }
1742 
1743 /*
1744  * Copy a provided snapshot so we can modify it privately. This is needed so
1745  * that catalog modifying transactions can look into intermediate catalog
1746  * states.
1747  */
1748 static Snapshot
1751 {
1752  Snapshot snap;
1753  dlist_iter iter;
1754  int i = 0;
1755  Size size;
1756 
1757  size = sizeof(SnapshotData) +
1758  sizeof(TransactionId) * orig_snap->xcnt +
1759  sizeof(TransactionId) * (txn->nsubtxns + 1);
1760 
1761  snap = MemoryContextAllocZero(rb->context, size);
1762  memcpy(snap, orig_snap, sizeof(SnapshotData));
1763 
1764  snap->copied = true;
1765  snap->active_count = 1; /* mark as active so nobody frees it */
1766  snap->regd_count = 0;
1767  snap->xip = (TransactionId *) (snap + 1);
1768 
1769  memcpy(snap->xip, orig_snap->xip, sizeof(TransactionId) * snap->xcnt);
1770 
1771  /*
1772  * snap->subxip contains all txids that belong to our transaction which we
1773  * need to check via cmin/cmax. That's why we store the toplevel
1774  * transaction in there as well.
1775  */
1776  snap->subxip = snap->xip + snap->xcnt;
1777  snap->subxip[i++] = txn->xid;
1778 
1779  /*
1780  * subxcnt isn't decreased when subtransactions abort, so count manually.
1781  * Since it's an upper boundary it is safe to use it for the allocation
1782  * above.
1783  */
1784  snap->subxcnt = 1;
1785 
1786  dlist_foreach(iter, &txn->subtxns)
1787  {
1788  ReorderBufferTXN *sub_txn;
1789 
1790  sub_txn = dlist_container(ReorderBufferTXN, node, iter.cur);
1791  snap->subxip[i++] = sub_txn->xid;
1792  snap->subxcnt++;
1793  }
1794 
1795  /* sort so we can bsearch() later */
1796  qsort(snap->subxip, snap->subxcnt, sizeof(TransactionId), xidComparator);
1797 
1798  /* store the specified current CommandId */
1799  snap->curcid = cid;
1800 
1801  return snap;
1802 }
1803 
1804 /*
1805  * Free a previously ReorderBufferCopySnap'ed snapshot
1806  */
1807 static void
1809 {
1810  if (snap->copied)
1811  pfree(snap);
1812  else
1814 }
1815 
1816 /*
1817  * If the transaction was (partially) streamed, we need to prepare or commit
1818  * it in a 'streamed' way. That is, we first stream the remaining part of the
1819  * transaction, and then invoke stream_prepare or stream_commit message as per
1820  * the case.
1821  */
1822 static void
1824 {
1825  /* we should only call this for previously streamed transactions */
1826  Assert(rbtxn_is_streamed(txn));
1827 
1828  ReorderBufferStreamTXN(rb, txn);
1829 
1830  if (rbtxn_prepared(txn))
1831  {
1832  /*
1833  * Note, we send stream prepare even if a concurrent abort is
1834  * detected. See DecodePrepare for more information.
1835  */
1836  rb->stream_prepare(rb, txn, txn->final_lsn);
1837 
1838  /*
1839  * This is a PREPARED transaction, part of a two-phase commit. The
1840  * full cleanup will happen as part of the COMMIT PREPAREDs, so now
1841  * just truncate txn by removing changes and tuple_cids.
1842  */
1843  ReorderBufferTruncateTXN(rb, txn, true);
1844  /* Reset the CheckXidAlive */
1846  }
1847  else
1848  {
1849  rb->stream_commit(rb, txn, txn->final_lsn);
1850  ReorderBufferCleanupTXN(rb, txn);
1851  }
1852 }
1853 
1854 /*
1855  * Set xid to detect concurrent aborts.
1856  *
1857  * While streaming an in-progress transaction or decoding a prepared
1858  * transaction there is a possibility that the (sub)transaction might get
1859  * aborted concurrently. In such case if the (sub)transaction has catalog
1860  * update then we might decode the tuple using wrong catalog version. For
1861  * example, suppose there is one catalog tuple with (xmin: 500, xmax: 0). Now,
1862  * the transaction 501 updates the catalog tuple and after that we will have
1863  * two tuples (xmin: 500, xmax: 501) and (xmin: 501, xmax: 0). Now, if 501 is
1864  * aborted and some other transaction say 502 updates the same catalog tuple
1865  * then the first tuple will be changed to (xmin: 500, xmax: 502). So, the
1866  * problem is that when we try to decode the tuple inserted/updated in 501
1867  * after the catalog update, we will see the catalog tuple with (xmin: 500,
1868  * xmax: 502) as visible because it will consider that the tuple is deleted by
1869  * xid 502 which is not visible to our snapshot. And when we will try to
1870  * decode with that catalog tuple, it can lead to a wrong result or a crash.
1871  * So, it is necessary to detect concurrent aborts to allow streaming of
1872  * in-progress transactions or decoding of prepared transactions.
1873  *
1874  * For detecting the concurrent abort we set CheckXidAlive to the current
1875  * (sub)transaction's xid for which this change belongs to. And, during
1876  * catalog scan we can check the status of the xid and if it is aborted we will
1877  * report a specific error so that we can stop streaming current transaction
1878  * and discard the already streamed changes on such an error. We might have
1879  * already streamed some of the changes for the aborted (sub)transaction, but
1880  * that is fine because when we decode the abort we will stream abort message
1881  * to truncate the changes in the subscriber. Similarly, for prepared
1882  * transactions, we stop decoding if concurrent abort is detected and then
1883  * rollback the changes when rollback prepared is encountered. See
1884  * DecodePrepare.
1885  */
1886 static inline void
1888 {
1889  /*
1890  * If the input transaction id is already set as a CheckXidAlive then
1891  * nothing to do.
1892  */
1894  return;
1895 
1896  /*
1897  * setup CheckXidAlive if it's not committed yet. We don't check if the
1898  * xid is aborted. That will happen during catalog access.
1899  */
1900  if (!TransactionIdDidCommit(xid))
1901  CheckXidAlive = xid;
1902  else
1904 }
1905 
1906 /*
1907  * Helper function for ReorderBufferProcessTXN for applying change.
1908  */
1909 static inline void
1911  Relation relation, ReorderBufferChange *change,
1912  bool streaming)
1913 {
1914  if (streaming)
1915  rb->stream_change(rb, txn, relation, change);
1916  else
1917  rb->apply_change(rb, txn, relation, change);
1918 }
1919 
1920 /*
1921  * Helper function for ReorderBufferProcessTXN for applying the truncate.
1922  */
1923 static inline void
1925  int nrelations, Relation *relations,
1926  ReorderBufferChange *change, bool streaming)
1927 {
1928  if (streaming)
1929  rb->stream_truncate(rb, txn, nrelations, relations, change);
1930  else
1931  rb->apply_truncate(rb, txn, nrelations, relations, change);
1932 }
1933 
1934 /*
1935  * Helper function for ReorderBufferProcessTXN for applying the message.
1936  */
1937 static inline void
1939  ReorderBufferChange *change, bool streaming)
1940 {
1941  if (streaming)
1942  rb->stream_message(rb, txn, change->lsn, true,
1943  change->data.msg.prefix,
1944  change->data.msg.message_size,
1945  change->data.msg.message);
1946  else
1947  rb->message(rb, txn, change->lsn, true,
1948  change->data.msg.prefix,
1949  change->data.msg.message_size,
1950  change->data.msg.message);
1951 }
1952 
1953 /*
1954  * Function to store the command id and snapshot at the end of the current
1955  * stream so that we can reuse the same while sending the next stream.
1956  */
1957 static inline void
1959  Snapshot snapshot_now, CommandId command_id)
1960 {
1961  txn->command_id = command_id;
1962 
1963  /* Avoid copying if it's already copied. */
1964  if (snapshot_now->copied)
1965  txn->snapshot_now = snapshot_now;
1966  else
1967  txn->snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
1968  txn, command_id);
1969 }
1970 
1971 /*
1972  * Helper function for ReorderBufferProcessTXN to handle the concurrent
1973  * abort of the streaming transaction. This resets the TXN such that it
1974  * can be used to stream the remaining data of transaction being processed.
1975  * This can happen when the subtransaction is aborted and we still want to
1976  * continue processing the main or other subtransactions data.
1977  */
1978 static void
1980  Snapshot snapshot_now,
1981  CommandId command_id,
1982  XLogRecPtr last_lsn,
1983  ReorderBufferChange *specinsert)
1984 {
1985  /* Discard the changes that we just streamed */
1987 
1988  /* Free all resources allocated for toast reconstruction */
1989  ReorderBufferToastReset(rb, txn);
1990 
1991  /* Return the spec insert change if it is not NULL */
1992  if (specinsert != NULL)
1993  {
1994  ReorderBufferReturnChange(rb, specinsert, true);
1995  specinsert = NULL;
1996  }
1997 
1998  /*
1999  * For the streaming case, stop the stream and remember the command ID and
2000  * snapshot for the streaming run.
2001  */
2002  if (rbtxn_is_streamed(txn))
2003  {
2004  rb->stream_stop(rb, txn, last_lsn);
2005  ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
2006  }
2007 }
2008 
2009 /*
2010  * Helper function for ReorderBufferReplay and ReorderBufferStreamTXN.
2011  *
2012  * Send data of a transaction (and its subtransactions) to the
2013  * output plugin. We iterate over the top and subtransactions (using a k-way
2014  * merge) and replay the changes in lsn order.
2015  *
2016  * If streaming is true then data will be sent using stream API.
2017  *
2018  * Note: "volatile" markers on some parameters are to avoid trouble with
2019  * PG_TRY inside the function.
2020  */
2021 static void
2023  XLogRecPtr commit_lsn,
2024  volatile Snapshot snapshot_now,
2025  volatile CommandId command_id,
2026  bool streaming)
2027 {
2028  bool using_subtxn;
2030  ReorderBufferIterTXNState *volatile iterstate = NULL;
2031  volatile XLogRecPtr prev_lsn = InvalidXLogRecPtr;
2032  ReorderBufferChange *volatile specinsert = NULL;
2033  volatile bool stream_started = false;
2034  ReorderBufferTXN *volatile curtxn = NULL;
2035 
2036  /* build data to be able to lookup the CommandIds of catalog tuples */
2038 
2039  /* setup the initial snapshot */
2040  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
2041 
2042  /*
2043  * Decoding needs access to syscaches et al., which in turn use
2044  * heavyweight locks and such. Thus we need to have enough state around to
2045  * keep track of those. The easiest way is to simply use a transaction
2046  * internally. That also allows us to easily enforce that nothing writes
2047  * to the database by checking for xid assignments.
2048  *
2049  * When we're called via the SQL SRF there's already a transaction
2050  * started, so start an explicit subtransaction there.
2051  */
2052  using_subtxn = IsTransactionOrTransactionBlock();
2053 
2054  PG_TRY();
2055  {
2056  ReorderBufferChange *change;
2057 
2058  if (using_subtxn)
2059  BeginInternalSubTransaction(streaming ? "stream" : "replay");
2060  else
2062 
2063  /*
2064  * We only need to send begin/begin-prepare for non-streamed
2065  * transactions.
2066  */
2067  if (!streaming)
2068  {
2069  if (rbtxn_prepared(txn))
2070  rb->begin_prepare(rb, txn);
2071  else
2072  rb->begin(rb, txn);
2073  }
2074 
2075  ReorderBufferIterTXNInit(rb, txn, &iterstate);
2076  while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
2077  {
2078  Relation relation = NULL;
2079  Oid reloid;
2080 
2081  /*
2082  * We can't call start stream callback before processing first
2083  * change.
2084  */
2085  if (prev_lsn == InvalidXLogRecPtr)
2086  {
2087  if (streaming)
2088  {
2089  txn->origin_id = change->origin_id;
2090  rb->stream_start(rb, txn, change->lsn);
2091  stream_started = true;
2092  }
2093  }
2094 
2095  /*
2096  * Enforce correct ordering of changes, merged from multiple
2097  * subtransactions. The changes may have the same LSN due to
2098  * MULTI_INSERT xlog records.
2099  */
2100  Assert(prev_lsn == InvalidXLogRecPtr || prev_lsn <= change->lsn);
2101 
2102  prev_lsn = change->lsn;
2103 
2104  /*
2105  * Set the current xid to detect concurrent aborts. This is
2106  * required for the cases when we decode the changes before the
2107  * COMMIT record is processed.
2108  */
2109  if (streaming || rbtxn_prepared(change->txn))
2110  {
2111  curtxn = change->txn;
2112  SetupCheckXidLive(curtxn->xid);
2113  }
2114 
2115  switch (change->action)
2116  {
2118 
2119  /*
2120  * Confirmation for speculative insertion arrived. Simply
2121  * use as a normal record. It'll be cleaned up at the end
2122  * of INSERT processing.
2123  */
2124  if (specinsert == NULL)
2125  elog(ERROR, "invalid ordering of speculative insertion changes");
2126  Assert(specinsert->data.tp.oldtuple == NULL);
2127  change = specinsert;
2129 
2130  /* intentionally fall through */
2134  Assert(snapshot_now);
2135 
2136  reloid = RelidByRelfilenode(change->data.tp.relnode.spcNode,
2137  change->data.tp.relnode.relNode);
2138 
2139  /*
2140  * Mapped catalog tuple without data, emitted while
2141  * catalog table was in the process of being rewritten. We
2142  * can fail to look up the relfilenode, because the
2143  * relmapper has no "historic" view, in contrast to the
2144  * normal catalog during decoding. Thus repeated rewrites
2145  * can cause a lookup failure. That's OK because we do not
2146  * decode catalog changes anyway. Normally such tuples
2147  * would be skipped over below, but we can't identify
2148  * whether the table should be logically logged without
2149  * mapping the relfilenode to the oid.
2150  */
2151  if (reloid == InvalidOid &&
2152  change->data.tp.newtuple == NULL &&
2153  change->data.tp.oldtuple == NULL)
2154  goto change_done;
2155  else if (reloid == InvalidOid)
2156  elog(ERROR, "could not map filenode \"%s\" to relation OID",
2157  relpathperm(change->data.tp.relnode,
2158  MAIN_FORKNUM));
2159 
2160  relation = RelationIdGetRelation(reloid);
2161 
2162  if (!RelationIsValid(relation))
2163  elog(ERROR, "could not open relation with OID %u (for filenode \"%s\")",
2164  reloid,
2165  relpathperm(change->data.tp.relnode,
2166  MAIN_FORKNUM));
2167 
2168  if (!RelationIsLogicallyLogged(relation))
2169  goto change_done;
2170 
2171  /*
2172  * Ignore temporary heaps created during DDL unless the
2173  * plugin has asked for them.
2174  */
2175  if (relation->rd_rel->relrewrite && !rb->output_rewrites)
2176  goto change_done;
2177 
2178  /*
2179  * For now ignore sequence changes entirely. Most of the
2180  * time they don't log changes using records we
2181  * understand, so it doesn't make sense to handle the few
2182  * cases we do.
2183  */
2184  if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
2185  goto change_done;
2186 
2187  /* user-triggered change */
2188  if (!IsToastRelation(relation))
2189  {
2190  ReorderBufferToastReplace(rb, txn, relation, change);
2191  ReorderBufferApplyChange(rb, txn, relation, change,
2192  streaming);
2193 
2194  /*
2195  * Only clear reassembled toast chunks if we're sure
2196  * they're not required anymore. The creator of the
2197  * tuple tells us.
2198  */
2199  if (change->data.tp.clear_toast_afterwards)
2200  ReorderBufferToastReset(rb, txn);
2201  }
2202  /* we're not interested in toast deletions */
2203  else if (change->action == REORDER_BUFFER_CHANGE_INSERT)
2204  {
2205  /*
2206  * Need to reassemble the full toasted Datum in
2207  * memory, to ensure the chunks don't get reused till
2208  * we're done remove it from the list of this
2209  * transaction's changes. Otherwise it will get
2210  * freed/reused while restoring spooled data from
2211  * disk.
2212  */
2213  Assert(change->data.tp.newtuple != NULL);
2214 
2215  dlist_delete(&change->node);
2216  ReorderBufferToastAppendChunk(rb, txn, relation,
2217  change);
2218  }
2219 
2220  change_done:
2221 
2222  /*
2223  * If speculative insertion was confirmed, the record
2224  * isn't needed anymore.
2225  */
2226  if (specinsert != NULL)
2227  {
2228  ReorderBufferReturnChange(rb, specinsert, true);
2229  specinsert = NULL;
2230  }
2231 
2232  if (RelationIsValid(relation))
2233  {
2234  RelationClose(relation);
2235  relation = NULL;
2236  }
2237  break;
2238 
2240 
2241  /*
2242  * Speculative insertions are dealt with by delaying the
2243  * processing of the insert until the confirmation record
2244  * arrives. For that we simply unlink the record from the
2245  * chain, so it does not get freed/reused while restoring
2246  * spooled data from disk.
2247  *
2248  * This is safe in the face of concurrent catalog changes
2249  * because the relevant relation can't be changed between
2250  * speculative insertion and confirmation due to
2251  * CheckTableNotInUse() and locking.
2252  */
2253 
2254  /* clear out a pending (and thus failed) speculation */
2255  if (specinsert != NULL)
2256  {
2257  ReorderBufferReturnChange(rb, specinsert, true);
2258  specinsert = NULL;
2259  }
2260 
2261  /* and memorize the pending insertion */
2262  dlist_delete(&change->node);
2263  specinsert = change;
2264  break;
2265 
2267 
2268  /*
2269  * Abort for speculative insertion arrived. So cleanup the
2270  * specinsert tuple and toast hash.
2271  *
2272  * Note that we get the spec abort change for each toast
2273  * entry but we need to perform the cleanup only the first
2274  * time we get it for the main table.
2275  */
2276  if (specinsert != NULL)
2277  {
2278  /*
2279  * We must clean the toast hash before processing a
2280  * completely new tuple to avoid confusion about the
2281  * previous tuple's toast chunks.
2282  */
2283  Assert(change->data.tp.clear_toast_afterwards);
2284  ReorderBufferToastReset(rb, txn);
2285 
2286  /* We don't need this record anymore. */
2287  ReorderBufferReturnChange(rb, specinsert, true);
2288  specinsert = NULL;
2289  }
2290  break;
2291 
2293  {
2294  int i;
2295  int nrelids = change->data.truncate.nrelids;
2296  int nrelations = 0;
2297  Relation *relations;
2298 
2299  relations = palloc0(nrelids * sizeof(Relation));
2300  for (i = 0; i < nrelids; i++)
2301  {
2302  Oid relid = change->data.truncate.relids[i];
2303  Relation relation;
2304 
2305  relation = RelationIdGetRelation(relid);
2306 
2307  if (!RelationIsValid(relation))
2308  elog(ERROR, "could not open relation with OID %u", relid);
2309 
2310  if (!RelationIsLogicallyLogged(relation))
2311  continue;
2312 
2313  relations[nrelations++] = relation;
2314  }
2315 
2316  /* Apply the truncate. */
2317  ReorderBufferApplyTruncate(rb, txn, nrelations,
2318  relations, change,
2319  streaming);
2320 
2321  for (i = 0; i < nrelations; i++)
2322  RelationClose(relations[i]);
2323 
2324  break;
2325  }
2326 
2328  ReorderBufferApplyMessage(rb, txn, change, streaming);
2329  break;
2330 
2332  /* Execute the invalidation messages locally */
2334  change->data.inval.ninvalidations,
2335  change->data.inval.invalidations);
2336  break;
2337 
2339  /* get rid of the old */
2340  TeardownHistoricSnapshot(false);
2341 
2342  if (snapshot_now->copied)
2343  {
2344  ReorderBufferFreeSnap(rb, snapshot_now);
2345  snapshot_now =
2346  ReorderBufferCopySnap(rb, change->data.snapshot,
2347  txn, command_id);
2348  }
2349 
2350  /*
2351  * Restored from disk, need to be careful not to double
2352  * free. We could introduce refcounting for that, but for
2353  * now this seems infrequent enough not to care.
2354  */
2355  else if (change->data.snapshot->copied)
2356  {
2357  snapshot_now =
2358  ReorderBufferCopySnap(rb, change->data.snapshot,
2359  txn, command_id);
2360  }
2361  else
2362  {
2363  snapshot_now = change->data.snapshot;
2364  }
2365 
2366  /* and continue with the new one */
2367  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
2368  break;
2369 
2371  Assert(change->data.command_id != InvalidCommandId);
2372 
2373  if (command_id < change->data.command_id)
2374  {
2375  command_id = change->data.command_id;
2376 
2377  if (!snapshot_now->copied)
2378  {
2379  /* we don't use the global one anymore */
2380  snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
2381  txn, command_id);
2382  }
2383 
2384  snapshot_now->curcid = command_id;
2385 
2386  TeardownHistoricSnapshot(false);
2387  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
2388  }
2389 
2390  break;
2391 
2393  elog(ERROR, "tuplecid value in changequeue");
2394  break;
2395  }
2396  }
2397 
2398  /* speculative insertion record must be freed by now */
2399  Assert(!specinsert);
2400 
2401  /* clean up the iterator */
2402  ReorderBufferIterTXNFinish(rb, iterstate);
2403  iterstate = NULL;
2404 
2405  /*
2406  * Update total transaction count and total bytes processed by the
2407  * transaction and its subtransactions. Ensure to not count the
2408  * streamed transaction multiple times.
2409  *
2410  * Note that the statistics computation has to be done after
2411  * ReorderBufferIterTXNFinish as it releases the serialized change
2412  * which we have already accounted in ReorderBufferIterTXNNext.
2413  */
2414  if (!rbtxn_is_streamed(txn))
2415  rb->totalTxns++;
2416 
2417  rb->totalBytes += txn->total_size;
2418 
2419  /*
2420  * Done with current changes, send the last message for this set of
2421  * changes depending upon streaming mode.
2422  */
2423  if (streaming)
2424  {
2425  if (stream_started)
2426  {
2427  rb->stream_stop(rb, txn, prev_lsn);
2428  stream_started = false;
2429  }
2430  }
2431  else
2432  {
2433  /*
2434  * Call either PREPARE (for two-phase transactions) or COMMIT (for
2435  * regular ones).
2436  */
2437  if (rbtxn_prepared(txn))
2438  rb->prepare(rb, txn, commit_lsn);
2439  else
2440  rb->commit(rb, txn, commit_lsn);
2441  }
2442 
2443  /* this is just a sanity check against bad output plugin behaviour */
2445  elog(ERROR, "output plugin used XID %u",
2447 
2448  /*
2449  * Remember the command ID and snapshot for the next set of changes in
2450  * streaming mode.
2451  */
2452  if (streaming)
2453  ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
2454  else if (snapshot_now->copied)
2455  ReorderBufferFreeSnap(rb, snapshot_now);
2456 
2457  /* cleanup */
2458  TeardownHistoricSnapshot(false);
2459 
2460  /*
2461  * Aborting the current (sub-)transaction as a whole has the right
2462  * semantics. We want all locks acquired in here to be released, not
2463  * reassigned to the parent and we do not want any database access
2464  * have persistent effects.
2465  */
2467 
2468  /* make sure there's no cache pollution */
2470 
2471  if (using_subtxn)
2473 
2474  /*
2475  * We are here due to one of the four reasons: 1. Decoding an
2476  * in-progress txn. 2. Decoding a prepared txn. 3. Decoding of a
2477  * prepared txn that was (partially) streamed. 4. Decoding a committed
2478  * txn.
2479  *
2480  * For 1, we allow truncation of txn data by removing the changes
2481  * already streamed but still keeping other things like invalidations,
2482  * snapshot, and tuplecids. For 2 and 3, we indicate
2483  * ReorderBufferTruncateTXN to do more elaborate truncation of txn
2484  * data as the entire transaction has been decoded except for commit.
2485  * For 4, as the entire txn has been decoded, we can fully clean up
2486  * the TXN reorder buffer.
2487  */
2488  if (streaming || rbtxn_prepared(txn))
2489  {
2491  /* Reset the CheckXidAlive */
2493  }
2494  else
2495  ReorderBufferCleanupTXN(rb, txn);
2496  }
2497  PG_CATCH();
2498  {
2499  MemoryContext ecxt = MemoryContextSwitchTo(ccxt);
2500  ErrorData *errdata = CopyErrorData();
2501 
2502  /* TODO: Encapsulate cleanup from the PG_TRY and PG_CATCH blocks */
2503  if (iterstate)
2504  ReorderBufferIterTXNFinish(rb, iterstate);
2505 
2507 
2508  /*
2509  * Force cache invalidation to happen outside of a valid transaction
2510  * to prevent catalog access as we just caught an error.
2511  */
2513 
2514  /* make sure there's no cache pollution */
2516  txn->invalidations);
2517 
2518  if (using_subtxn)
2520 
2521  /*
2522  * The error code ERRCODE_TRANSACTION_ROLLBACK indicates a concurrent
2523  * abort of the (sub)transaction we are streaming or preparing. We
2524  * need to do the cleanup and return gracefully on this error, see
2525  * SetupCheckXidLive.
2526  *
2527  * This error code can be thrown by one of the callbacks we call
2528  * during decoding so we need to ensure that we return gracefully only
2529  * when we are sending the data in streaming mode and the streaming is
2530  * not finished yet or when we are sending the data out on a PREPARE
2531  * during a two-phase commit.
2532  */
2533  if (errdata->sqlerrcode == ERRCODE_TRANSACTION_ROLLBACK &&
2534  (stream_started || rbtxn_prepared(txn)))
2535  {
2536  /* curtxn must be set for streaming or prepared transactions */
2537  Assert(curtxn);
2538 
2539  /* Cleanup the temporary error state. */
2540  FlushErrorState();
2541  FreeErrorData(errdata);
2542  errdata = NULL;
2543  curtxn->concurrent_abort = true;
2544 
2545  /* Reset the TXN so that it is allowed to stream remaining data. */
2546  ReorderBufferResetTXN(rb, txn, snapshot_now,
2547  command_id, prev_lsn,
2548  specinsert);
2549  }
2550  else
2551  {
2552  ReorderBufferCleanupTXN(rb, txn);
2553  MemoryContextSwitchTo(ecxt);
2554  PG_RE_THROW();
2555  }
2556  }
2557  PG_END_TRY();
2558 }
2559 
2560 /*
2561  * Perform the replay of a transaction and its non-aborted subtransactions.
2562  *
2563  * Subtransactions previously have to be processed by
2564  * ReorderBufferCommitChild(), even if previously assigned to the toplevel
2565  * transaction with ReorderBufferAssignChild.
2566  *
2567  * This interface is called once a prepare or toplevel commit is read for both
2568  * streamed as well as non-streamed transactions.
2569  */
2570 static void
2573  XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
2574  TimestampTz commit_time,
2575  RepOriginId origin_id, XLogRecPtr origin_lsn)
2576 {
2577  Snapshot snapshot_now;
2578  CommandId command_id = FirstCommandId;
2579 
2580  txn->final_lsn = commit_lsn;
2581  txn->end_lsn = end_lsn;
2582  txn->xact_time.commit_time = commit_time;
2583  txn->origin_id = origin_id;
2584  txn->origin_lsn = origin_lsn;
2585 
2586  /*
2587  * If the transaction was (partially) streamed, we need to commit it in a
2588  * 'streamed' way. That is, we first stream the remaining part of the
2589  * transaction, and then invoke stream_commit message.
2590  *
2591  * Called after everything (origin ID, LSN, ...) is stored in the
2592  * transaction to avoid passing that information directly.
2593  */
2594  if (rbtxn_is_streamed(txn))
2595  {
2596  ReorderBufferStreamCommit(rb, txn);
2597  return;
2598  }
2599 
2600  /*
2601  * If this transaction has no snapshot, it didn't make any changes to the
2602  * database, so there's nothing to decode. Note that
2603  * ReorderBufferCommitChild will have transferred any snapshots from
2604  * subtransactions if there were any.
2605  */
2606  if (txn->base_snapshot == NULL)
2607  {
2608  Assert(txn->ninvalidations == 0);
2609 
2610  /*
2611  * Removing this txn before a commit might result in the computation
2612  * of an incorrect restart_lsn. See SnapBuildProcessRunningXacts.
2613  */
2614  if (!rbtxn_prepared(txn))
2615  ReorderBufferCleanupTXN(rb, txn);
2616  return;
2617  }
2618 
2619  snapshot_now = txn->base_snapshot;
2620 
2621  /* Process and send the changes to output plugin. */
2622  ReorderBufferProcessTXN(rb, txn, commit_lsn, snapshot_now,
2623  command_id, false);
2624 }
2625 
2626 /*
2627  * Commit a transaction.
2628  *
2629  * See comments for ReorderBufferReplay().
2630  */
2631 void
2633  XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
2634  TimestampTz commit_time,
2635  RepOriginId origin_id, XLogRecPtr origin_lsn)
2636 {
2638 
2639  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2640  false);
2641 
2642  /* unknown transaction, nothing to replay */
2643  if (txn == NULL)
2644  return;
2645 
2646  ReorderBufferReplay(txn, rb, xid, commit_lsn, end_lsn, commit_time,
2647  origin_id, origin_lsn);
2648 }
2649 
2650 /*
2651  * Record the prepare information for a transaction.
2652  */
2653 bool
2655  XLogRecPtr prepare_lsn, XLogRecPtr end_lsn,
2656  TimestampTz prepare_time,
2657  RepOriginId origin_id, XLogRecPtr origin_lsn)
2658 {
2660 
2661  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false);
2662 
2663  /* unknown transaction, nothing to do */
2664  if (txn == NULL)
2665  return false;
2666 
2667  /*
2668  * Remember the prepare information to be later used by commit prepared in
2669  * case we skip doing prepare.
2670  */
2671  txn->final_lsn = prepare_lsn;
2672  txn->end_lsn = end_lsn;
2673  txn->xact_time.prepare_time = prepare_time;
2674  txn->origin_id = origin_id;
2675  txn->origin_lsn = origin_lsn;
2676 
2677  return true;
2678 }
2679 
2680 /* Remember that we have skipped prepare */
2681 void
2683 {
2685 
2686  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false);
2687 
2688  /* unknown transaction, nothing to do */
2689  if (txn == NULL)
2690  return;
2691 
2693 }
2694 
2695 /*
2696  * Prepare a two-phase transaction.
2697  *
2698  * See comments for ReorderBufferReplay().
2699  */
2700 void
2702  char *gid)
2703 {
2705 
2706  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2707  false);
2708 
2709  /* unknown transaction, nothing to replay */
2710  if (txn == NULL)
2711  return;
2712 
2713  txn->txn_flags |= RBTXN_PREPARE;
2714  txn->gid = pstrdup(gid);
2715 
2716  /* The prepare info must have been updated in txn by now. */
2718 
2719  ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
2720  txn->xact_time.prepare_time, txn->origin_id, txn->origin_lsn);
2721 
2722  /*
2723  * We send the prepare for the concurrently aborted xacts so that later
2724  * when rollback prepared is decoded and sent, the downstream should be
2725  * able to rollback such a xact. See comments atop DecodePrepare.
2726  *
2727  * Note, for the concurrent_abort + streaming case a stream_prepare was
2728  * already sent within the ReorderBufferReplay call above.
2729  */
2730  if (txn->concurrent_abort && !rbtxn_is_streamed(txn))
2731  rb->prepare(rb, txn, txn->final_lsn);
2732 }
2733 
2734 /*
2735  * This is used to handle COMMIT/ROLLBACK PREPARED.
2736  */
2737 void
2739  XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
2740  XLogRecPtr two_phase_at,
2741  TimestampTz commit_time, RepOriginId origin_id,
2742  XLogRecPtr origin_lsn, char *gid, bool is_commit)
2743 {
2745  XLogRecPtr prepare_end_lsn;
2746  TimestampTz prepare_time;
2747 
2748  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, commit_lsn, false);
2749 
2750  /* unknown transaction, nothing to do */
2751  if (txn == NULL)
2752  return;
2753 
2754  /*
2755  * By this time the txn has the prepare record information, remember it to
2756  * be later used for rollback.
2757  */
2758  prepare_end_lsn = txn->end_lsn;
2759  prepare_time = txn->xact_time.prepare_time;
2760 
2761  /* add the gid in the txn */
2762  txn->gid = pstrdup(gid);
2763 
2764  /*
2765  * It is possible that this transaction is not decoded at prepare time
2766  * either because by that time we didn't have a consistent snapshot, or
2767  * two_phase was not enabled, or it was decoded earlier but we have
2768  * restarted. We only need to send the prepare if it was not decoded
2769  * earlier. We don't need to decode the xact for aborts if it is not done
2770  * already.
2771  */
2772  if ((txn->final_lsn < two_phase_at) && is_commit)
2773  {
2774  txn->txn_flags |= RBTXN_PREPARE;
2775 
2776  /*
2777  * The prepare info must have been updated in txn even if we skip
2778  * prepare.
2779  */
2781 
2782  /*
2783  * By this time the txn has the prepare record information and it is
2784  * important to use that so that downstream gets the accurate
2785  * information. If instead, we have passed commit information here
2786  * then downstream can behave as it has already replayed commit
2787  * prepared after the restart.
2788  */
2789  ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
2790  txn->xact_time.prepare_time, txn->origin_id, txn->origin_lsn);
2791  }
2792 
2793  txn->final_lsn = commit_lsn;
2794  txn->end_lsn = end_lsn;
2795  txn->xact_time.commit_time = commit_time;
2796  txn->origin_id = origin_id;
2797  txn->origin_lsn = origin_lsn;
2798 
2799  if (is_commit)
2800  rb->commit_prepared(rb, txn, commit_lsn);
2801  else
2802  rb->rollback_prepared(rb, txn, prepare_end_lsn, prepare_time);
2803 
2804  /* cleanup: make sure there's no cache pollution */
2806  txn->invalidations);
2807  ReorderBufferCleanupTXN(rb, txn);
2808 }
2809 
2810 /*
2811  * Abort a transaction that possibly has previous changes. Needs to be first
2812  * called for subtransactions and then for the toplevel xid.
2813  *
2814  * NB: Transactions handled here have to have actively aborted (i.e. have
2815  * produced an abort record). Implicitly aborted transactions are handled via
2816  * ReorderBufferAbortOld(); transactions we're just not interested in, but
2817  * which have committed are handled in ReorderBufferForget().
2818  *
2819  * This function purges this transaction and its contents from memory and
2820  * disk.
2821  */
2822 void
2824 {
2826 
2827  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2828  false);
2829 
2830  /* unknown, nothing to remove */
2831  if (txn == NULL)
2832  return;
2833 
2834  /* For streamed transactions notify the remote node about the abort. */
2835  if (rbtxn_is_streamed(txn))
2836  {
2837  rb->stream_abort(rb, txn, lsn);
2838 
2839  /*
2840  * We might have decoded changes for this transaction that could load
2841  * the cache as per the current transaction's view (consider DDL's
2842  * happened in this transaction). We don't want the decoding of future
2843  * transactions to use those cache entries so execute invalidations.
2844  */
2845  if (txn->ninvalidations > 0)
2847  txn->invalidations);
2848  }
2849 
2850  /* cosmetic... */
2851  txn->final_lsn = lsn;
2852 
2853  /* remove potential on-disk data, and deallocate */
2854  ReorderBufferCleanupTXN(rb, txn);
2855 }
2856 
2857 /*
2858  * Abort all transactions that aren't actually running anymore because the
2859  * server restarted.
2860  *
2861  * NB: These really have to be transactions that have aborted due to a server
2862  * crash/immediate restart, as we don't deal with invalidations here.
2863  */
2864 void
2866 {
2867  dlist_mutable_iter it;
2868 
2869  /*
2870  * Iterate through all (potential) toplevel TXNs and abort all that are
2871  * older than what possibly can be running. Once we've found the first
2872  * that is alive we stop, there might be some that acquired an xid earlier
2873  * but started writing later, but it's unlikely and they will be cleaned
2874  * up in a later call to this function.
2875  */
2877  {
2879 
2880  txn = dlist_container(ReorderBufferTXN, node, it.cur);
2881 
2882  if (TransactionIdPrecedes(txn->xid, oldestRunningXid))
2883  {
2884  elog(DEBUG2, "aborting old transaction %u", txn->xid);
2885 
2886  /* remove potential on-disk data, and deallocate this tx */
2887  ReorderBufferCleanupTXN(rb, txn);
2888  }
2889  else
2890  return;
2891  }
2892 }
2893 
2894 /*
2895  * Forget the contents of a transaction if we aren't interested in its
2896  * contents. Needs to be first called for subtransactions and then for the
2897  * toplevel xid.
2898  *
2899  * This is significantly different to ReorderBufferAbort() because
2900  * transactions that have committed need to be treated differently from aborted
2901  * ones since they may have modified the catalog.
2902  *
2903  * Note that this is only allowed to be called in the moment a transaction
2904  * commit has just been read, not earlier; otherwise later records referring
2905  * to this xid might re-create the transaction incompletely.
2906  */
2907 void
2909 {
2911 
2912  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2913  false);
2914 
2915  /* unknown, nothing to forget */
2916  if (txn == NULL)
2917  return;
2918 
2919  /* For streamed transactions notify the remote node about the abort. */
2920  if (rbtxn_is_streamed(txn))
2921  rb->stream_abort(rb, txn, lsn);
2922 
2923  /* cosmetic... */
2924  txn->final_lsn = lsn;
2925 
2926  /*
2927  * Process cache invalidation messages if there are any. Even if we're not
2928  * interested in the transaction's contents, it could have manipulated the
2929  * catalog and we need to update the caches according to that.
2930  */
2931  if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
2933  txn->invalidations);
2934  else
2935  Assert(txn->ninvalidations == 0);
2936 
2937  /* remove potential on-disk data, and deallocate */
2938  ReorderBufferCleanupTXN(rb, txn);
2939 }
2940 
2941 /*
2942  * Invalidate cache for those transactions that need to be skipped just in case
2943  * catalogs were manipulated as part of the transaction.
2944  *
2945  * Note that this is a special-purpose function for prepared transactions where
2946  * we don't want to clean up the TXN even when we decide to skip it. See
2947  * DecodePrepare.
2948  */
2949 void
2951 {
2953 
2954  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2955  false);
2956 
2957  /* unknown, nothing to do */
2958  if (txn == NULL)
2959  return;
2960 
2961  /*
2962  * Process cache invalidation messages if there are any. Even if we're not
2963  * interested in the transaction's contents, it could have manipulated the
2964  * catalog and we need to update the caches according to that.
2965  */
2966  if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
2968  txn->invalidations);
2969  else
2970  Assert(txn->ninvalidations == 0);
2971 }
2972 
2973 
2974 /*
2975  * Execute invalidations happening outside the context of a decoded
2976  * transaction. That currently happens either for xid-less commits
2977  * (cf. RecordTransactionCommit()) or for invalidations in uninteresting
2978  * transactions (via ReorderBufferForget()).
2979  */
2980 void
2982  SharedInvalidationMessage *invalidations)
2983 {
2984  bool use_subtxn = IsTransactionOrTransactionBlock();
2985  int i;
2986 
2987  if (use_subtxn)
2988  BeginInternalSubTransaction("replay");
2989 
2990  /*
2991  * Force invalidations to happen outside of a valid transaction - that way
2992  * entries will just be marked as invalid without accessing the catalog.
2993  * That's advantageous because we don't need to setup the full state
2994  * necessary for catalog access.
2995  */
2996  if (use_subtxn)
2998 
2999  for (i = 0; i < ninvalidations; i++)
3000  LocalExecuteInvalidationMessage(&invalidations[i]);
3001 
3002  if (use_subtxn)
3004 }
3005 
3006 /*
3007  * Tell reorderbuffer about an xid seen in the WAL stream. Has to be called at
3008  * least once for every xid in XLogRecord->xl_xid (other places in records
3009  * may, but do not have to be passed through here).
3010  *
3011  * Reorderbuffer keeps some datastructures about transactions in LSN order,
3012  * for efficiency. To do that it has to know about when transactions are seen
3013  * first in the WAL. As many types of records are not actually interesting for
3014  * logical decoding, they do not necessarily pass though here.
3015  */
3016 void
3018 {
3019  /* many records won't have an xid assigned, centralize check here */
3020  if (xid != InvalidTransactionId)
3021  ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3022 }
3023 
3024 /*
3025  * Add a new snapshot to this transaction that may only used after lsn 'lsn'
3026  * because the previous snapshot doesn't describe the catalog correctly for
3027  * following rows.
3028  */
3029 void
3031  XLogRecPtr lsn, Snapshot snap)
3032 {
3034 
3035  change->data.snapshot = snap;
3037 
3038  ReorderBufferQueueChange(rb, xid, lsn, change, false);
3039 }
3040 
3041 /*
3042  * Set up the transaction's base snapshot.
3043  *
3044  * If we know that xid is a subtransaction, set the base snapshot on the
3045  * top-level transaction instead.
3046  */
3047 void
3049  XLogRecPtr lsn, Snapshot snap)
3050 {
3052  bool is_new;
3053 
3054  AssertArg(snap != NULL);
3055 
3056  /*
3057  * Fetch the transaction to operate on. If we know it's a subtransaction,
3058  * operate on its top-level transaction instead.
3059  */
3060  txn = ReorderBufferTXNByXid(rb, xid, true, &is_new, lsn, true);
3061  if (rbtxn_is_known_subxact(txn))
3062  txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
3063  NULL, InvalidXLogRecPtr, false);
3064  Assert(txn->base_snapshot == NULL);
3065 
3066  txn->base_snapshot = snap;
3067  txn->base_snapshot_lsn = lsn;
3069 
3070  AssertTXNLsnOrder(rb);
3071 }
3072 
3073 /*
3074  * Access the catalog with this CommandId at this point in the changestream.
3075  *
3076  * May only be called for command ids > 1
3077  */
3078 void
3080  XLogRecPtr lsn, CommandId cid)
3081 {
3083 
3084  change->data.command_id = cid;
3086 
3087  ReorderBufferQueueChange(rb, xid, lsn, change, false);
3088 }
3089 
3090 /*
3091  * Update memory counters to account for the new or removed change.
3092  *
3093  * We update two counters - in the reorder buffer, and in the transaction
3094  * containing the change. The reorder buffer counter allows us to quickly
3095  * decide if we reached the memory limit, the transaction counter allows
3096  * us to quickly pick the largest transaction for eviction.
3097  *
3098  * When streaming is enabled, we need to update the toplevel transaction
3099  * counters instead - we don't really care about subtransactions as we
3100  * can't stream them individually anyway, and we only pick toplevel
3101  * transactions for eviction. So only toplevel transactions matter.
3102  */
3103 static void
3105  ReorderBufferChange *change,
3106  bool addition, Size sz)
3107 {
3109  ReorderBufferTXN *toptxn;
3110 
3111  Assert(change->txn);
3112 
3113  /*
3114  * Ignore tuple CID changes, because those are not evicted when reaching
3115  * memory limit. So we just don't count them, because it might easily
3116  * trigger a pointless attempt to spill.
3117  */
3119  return;
3120 
3121  txn = change->txn;
3122 
3123  /*
3124  * Update the total size in top level as well. This is later used to
3125  * compute the decoding stats.
3126  */
3127  if (txn->toptxn != NULL)
3128  toptxn = txn->toptxn;
3129  else
3130  toptxn = txn;
3131 
3132  if (addition)
3133  {
3134  txn->size += sz;
3135  rb->size += sz;
3136 
3137  /* Update the total size in the top transaction. */
3138  toptxn->total_size += sz;
3139  }
3140  else
3141  {
3142  Assert((rb->size >= sz) && (txn->size >= sz));
3143  txn->size -= sz;
3144  rb->size -= sz;
3145 
3146  /* Update the total size in the top transaction. */
3147  toptxn->total_size -= sz;
3148  }
3149 
3150  Assert(txn->size <= rb->size);
3151 }
3152 
3153 /*
3154  * Add new (relfilenode, tid) -> (cmin, cmax) mappings.
3155  *
3156  * We do not include this change type in memory accounting, because we
3157  * keep CIDs in a separate list and do not evict them when reaching
3158  * the memory limit.
3159  */
3160 void
3162  XLogRecPtr lsn, RelFileNode node,
3163  ItemPointerData tid, CommandId cmin,
3164  CommandId cmax, CommandId combocid)
3165 {
3168 
3169  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3170 
3171  change->data.tuplecid.node = node;
3172  change->data.tuplecid.tid = tid;
3173  change->data.tuplecid.cmin = cmin;
3174  change->data.tuplecid.cmax = cmax;
3175  change->data.tuplecid.combocid = combocid;
3176  change->lsn = lsn;
3177  change->txn = txn;
3179 
3180  dlist_push_tail(&txn->tuplecids, &change->node);
3181  txn->ntuplecids++;
3182 }
3183 
3184 /*
3185  * Setup the invalidation of the toplevel transaction.
3186  *
3187  * This needs to be called for each XLOG_XACT_INVALIDATIONS message and
3188  * accumulates all the invalidation messages in the toplevel transaction as
3189  * well as in the form of change in reorder buffer. We require to record it in
3190  * form of the change so that we can execute only the required invalidations
3191  * instead of executing all the invalidations on each CommandId increment. We
3192  * also need to accumulate these in the toplevel transaction because in some
3193  * cases we skip processing the transaction (see ReorderBufferForget), we need
3194  * to execute all the invalidations together.
3195  */
3196 void
3198  XLogRecPtr lsn, Size nmsgs,
3200 {
3202  MemoryContext oldcontext;
3203  ReorderBufferChange *change;
3204 
3205  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3206 
3207  oldcontext = MemoryContextSwitchTo(rb->context);
3208 
3209  /*
3210  * Collect all the invalidations under the top transaction so that we can
3211  * execute them all together. See comment atop this function
3212  */
3213  if (txn->toptxn)
3214  txn = txn->toptxn;
3215 
3216  Assert(nmsgs > 0);
3217 
3218  /* Accumulate invalidations. */
3219  if (txn->ninvalidations == 0)
3220  {
3221  txn->ninvalidations = nmsgs;
3223  palloc(sizeof(SharedInvalidationMessage) * nmsgs);
3224  memcpy(txn->invalidations, msgs,
3225  sizeof(SharedInvalidationMessage) * nmsgs);
3226  }
3227  else
3228  {
3231  (txn->ninvalidations + nmsgs));
3232 
3233  memcpy(txn->invalidations + txn->ninvalidations, msgs,
3234  nmsgs * sizeof(SharedInvalidationMessage));
3235  txn->ninvalidations += nmsgs;
3236  }
3237 
3238  change = ReorderBufferGetChange(rb);
3240  change->data.inval.ninvalidations = nmsgs;
3241  change->data.inval.invalidations = (SharedInvalidationMessage *)
3242  palloc(sizeof(SharedInvalidationMessage) * nmsgs);
3243  memcpy(change->data.inval.invalidations, msgs,
3244  sizeof(SharedInvalidationMessage) * nmsgs);
3245 
3246  ReorderBufferQueueChange(rb, xid, lsn, change, false);
3247 
3248  MemoryContextSwitchTo(oldcontext);
3249 }
3250 
3251 /*
3252  * Apply all invalidations we know. Possibly we only need parts at this point
3253  * in the changestream but we don't know which those are.
3254  */
3255 static void
3257 {
3258  int i;
3259 
3260  for (i = 0; i < nmsgs; i++)
3262 }
3263 
3264 /*
3265  * Mark a transaction as containing catalog changes
3266  */
3267 void
3269  XLogRecPtr lsn)
3270 {
3272 
3273  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3274 
3276 
3277  /*
3278  * Mark top-level transaction as having catalog changes too if one of its
3279  * children has so that the ReorderBufferBuildTupleCidHash can
3280  * conveniently check just top-level transaction and decide whether to
3281  * build the hash table or not.
3282  */
3283  if (txn->toptxn != NULL)
3285 }
3286 
3287 /*
3288  * Query whether a transaction is already *known* to contain catalog
3289  * changes. This can be wrong until directly before the commit!
3290  */
3291 bool
3293 {
3295 
3296  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
3297  false);
3298  if (txn == NULL)
3299  return false;
3300 
3301  return rbtxn_has_catalog_changes(txn);
3302 }
3303 
3304 /*
3305  * ReorderBufferXidHasBaseSnapshot
3306  * Have we already set the base snapshot for the given txn/subtxn?
3307  */
3308 bool
3310 {
3312 
3313  txn = ReorderBufferTXNByXid(rb, xid, false,
3314  NULL, InvalidXLogRecPtr, false);
3315 
3316  /* transaction isn't known yet, ergo no snapshot */
3317  if (txn == NULL)
3318  return false;
3319 
3320  /* a known subtxn? operate on top-level txn instead */
3321  if (rbtxn_is_known_subxact(txn))
3322  txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
3323  NULL, InvalidXLogRecPtr, false);
3324 
3325  return txn->base_snapshot != NULL;
3326 }
3327 
3328 
3329 /*
3330  * ---------------------------------------
3331  * Disk serialization support
3332  * ---------------------------------------
3333  */
3334 
3335 /*
3336  * Ensure the IO buffer is >= sz.
3337  */
3338 static void
3340 {
3341  if (!rb->outbufsize)
3342  {
3343  rb->outbuf = MemoryContextAlloc(rb->context, sz);
3344  rb->outbufsize = sz;
3345  }
3346  else if (rb->outbufsize < sz)
3347  {
3348  rb->outbuf = repalloc(rb->outbuf, sz);
3349  rb->outbufsize = sz;
3350  }
3351 }
3352 
3353 /*
3354  * Find the largest transaction (toplevel or subxact) to evict (spill to disk).
3355  *
3356  * XXX With many subtransactions this might be quite slow, because we'll have
3357  * to walk through all of them. There are some options how we could improve
3358  * that: (a) maintain some secondary structure with transactions sorted by
3359  * amount of changes, (b) not looking for the entirely largest transaction,
3360  * but e.g. for transaction using at least some fraction of the memory limit,
3361  * and (c) evicting multiple transactions at once, e.g. to free a given portion
3362  * of the memory limit (e.g. 50%).
3363  */
3364 static ReorderBufferTXN *
3366 {
3367  HASH_SEQ_STATUS hash_seq;
3369  ReorderBufferTXN *largest = NULL;
3370 
3371  hash_seq_init(&hash_seq, rb->by_txn);
3372  while ((ent = hash_seq_search(&hash_seq)) != NULL)
3373  {
3374  ReorderBufferTXN *txn = ent->txn;
3375 
3376  /* if the current transaction is larger, remember it */
3377  if ((!largest) || (txn->size > largest->size))
3378  largest = txn;
3379  }
3380 
3381  Assert(largest);
3382  Assert(largest->size > 0);
3383  Assert(largest->size <= rb->size);
3384 
3385  return largest;
3386 }
3387 
3388 /*
3389  * Find the largest toplevel transaction to evict (by streaming).
3390  *
3391  * This can be seen as an optimized version of ReorderBufferLargestTXN, which
3392  * should give us the same transaction (because we don't update memory account
3393  * for subtransaction with streaming, so it's always 0). But we can simply
3394  * iterate over the limited number of toplevel transactions that have a base
3395  * snapshot. There is no use of selecting a transaction that doesn't have base
3396  * snapshot because we don't decode such transactions.
3397  *
3398  * Note that, we skip transactions that contains incomplete changes. There
3399  * is a scope of optimization here such that we can select the largest
3400  * transaction which has incomplete changes. But that will make the code and
3401  * design quite complex and that might not be worth the benefit. If we plan to
3402  * stream the transactions that contains incomplete changes then we need to
3403  * find a way to partially stream/truncate the transaction changes in-memory
3404  * and build a mechanism to partially truncate the spilled files.
3405  * Additionally, whenever we partially stream the transaction we need to
3406  * maintain the last streamed lsn and next time we need to restore from that
3407  * segment and the offset in WAL. As we stream the changes from the top
3408  * transaction and restore them subtransaction wise, we need to even remember
3409  * the subxact from where we streamed the last change.
3410  */
3411 static ReorderBufferTXN *
3413 {
3414  dlist_iter iter;
3415  Size largest_size = 0;
3416  ReorderBufferTXN *largest = NULL;
3417 
3418  /* Find the largest top-level transaction having a base snapshot. */
3420  {
3422 
3423  txn = dlist_container(ReorderBufferTXN, base_snapshot_node, iter.cur);
3424 
3425  /* must not be a subtxn */
3427  /* base_snapshot must be set */
3428  Assert(txn->base_snapshot != NULL);
3429 
3430  if ((largest == NULL || txn->total_size > largest_size) &&
3431  (txn->total_size > 0) && !(rbtxn_has_partial_change(txn)))
3432  {
3433  largest = txn;
3434  largest_size = txn->total_size;
3435  }
3436  }
3437 
3438  return largest;
3439 }
3440 
3441 /*
3442  * Check whether the logical_decoding_work_mem limit was reached, and if yes
3443  * pick the largest (sub)transaction at-a-time to evict and spill its changes to
3444  * disk until we reach under the memory limit.
3445  *
3446  * XXX At this point we select the transactions until we reach under the memory
3447  * limit, but we might also adapt a more elaborate eviction strategy - for example
3448  * evicting enough transactions to free certain fraction (e.g. 50%) of the memory
3449  * limit.
3450  */
3451 static void
3453 {
3455 
3456  /* bail out if we haven't exceeded the memory limit */
3457  if (rb->size < logical_decoding_work_mem * 1024L)
3458  return;
3459 
3460  /*
3461  * Loop until we reach under the memory limit. One might think that just
3462  * by evicting the largest (sub)transaction we will come under the memory
3463  * limit based on assumption that the selected transaction is at least as
3464  * large as the most recent change (which caused us to go over the memory
3465  * limit). However, that is not true because a user can reduce the
3466  * logical_decoding_work_mem to a smaller value before the most recent
3467  * change.
3468  */
3469  while (rb->size >= logical_decoding_work_mem * 1024L)
3470  {
3471  /*
3472  * Pick the largest transaction (or subtransaction) and evict it from
3473  * memory by streaming, if possible. Otherwise, spill to disk.
3474  */
3476  (txn = ReorderBufferLargestTopTXN(rb)) != NULL)
3477  {
3478  /* we know there has to be one, because the size is not zero */
3479  Assert(txn && !txn->toptxn);
3480  Assert(txn->total_size > 0);
3481  Assert(rb->size >= txn->total_size);
3482 
3483  ReorderBufferStreamTXN(rb, txn);
3484  }
3485  else
3486  {
3487  /*
3488  * Pick the largest transaction (or subtransaction) and evict it
3489  * from memory by serializing it to disk.
3490  */
3491  txn = ReorderBufferLargestTXN(rb);
3492 
3493  /* we know there has to be one, because the size is not zero */
3494  Assert(txn);
3495  Assert(txn->size > 0);
3496  Assert(rb->size >= txn->size);
3497 
3498  ReorderBufferSerializeTXN(rb, txn);
3499  }
3500 
3501  /*
3502  * After eviction, the transaction should have no entries in memory,
3503  * and should use 0 bytes for changes.
3504  */
3505  Assert(txn->size == 0);
3506  Assert(txn->nentries_mem == 0);
3507  }
3508 
3509  /* We must be under the memory limit now. */
3510  Assert(rb->size < logical_decoding_work_mem * 1024L);
3511 }
3512 
3513 /*
3514  * Spill data of a large transaction (and its subtransactions) to disk.
3515  */
3516 static void
3518 {
3519  dlist_iter subtxn_i;
3520  dlist_mutable_iter change_i;
3521  int fd = -1;
3522  XLogSegNo curOpenSegNo = 0;
3523  Size spilled = 0;
3524  Size size = txn->size;
3525 
3526  elog(DEBUG2, "spill %u changes in XID %u to disk",
3527  (uint32) txn->nentries_mem, txn->xid);
3528 
3529  /* do the same to all child TXs */
3530  dlist_foreach(subtxn_i, &txn->subtxns)
3531  {
3532  ReorderBufferTXN *subtxn;
3533 
3534  subtxn = dlist_container(ReorderBufferTXN, node, subtxn_i.cur);
3535  ReorderBufferSerializeTXN(rb, subtxn);
3536  }
3537 
3538  /* serialize changestream */
3539  dlist_foreach_modify(change_i, &txn->changes)
3540  {
3541  ReorderBufferChange *change;
3542 
3543  change = dlist_container(ReorderBufferChange, node, change_i.cur);
3544 
3545  /*
3546  * store in segment in which it belongs by start lsn, don't split over
3547  * multiple segments tho
3548  */
3549  if (fd == -1 ||
3550  !XLByteInSeg(change->lsn, curOpenSegNo, wal_segment_size))
3551  {
3552  char path[MAXPGPATH];
3553 
3554  if (fd != -1)
3555  CloseTransientFile(fd);
3556 
3557  XLByteToSeg(change->lsn, curOpenSegNo, wal_segment_size);
3558 
3559  /*
3560  * No need to care about TLIs here, only used during a single run,
3561  * so each LSN only maps to a specific WAL record.
3562  */
3564  curOpenSegNo);
3565 
3566  /* open segment, create it if necessary */
3567  fd = OpenTransientFile(path,
3568  O_CREAT | O_WRONLY | O_APPEND | PG_BINARY);
3569 
3570  if (fd < 0)
3571  ereport(ERROR,
3573  errmsg("could not open file \"%s\": %m", path)));
3574  }
3575 
3576  ReorderBufferSerializeChange(rb, txn, fd, change);
3577  dlist_delete(&change->node);
3578  ReorderBufferReturnChange(rb, change, true);
3579 
3580  spilled++;
3581  }
3582 
3583  /* update the statistics iff we have spilled anything */
3584  if (spilled)
3585  {
3586  rb->spillCount += 1;
3587  rb->spillBytes += size;
3588 
3589  /* don't consider already serialized transactions */
3590  rb->spillTxns += (rbtxn_is_serialized(txn) || rbtxn_is_serialized_clear(txn)) ? 0 : 1;
3591 
3592  /* update the decoding stats */
3594  }
3595 
3596  Assert(spilled == txn->nentries_mem);
3597  Assert(dlist_is_empty(&txn->changes));
3598  txn->nentries_mem = 0;
3600 
3601  if (fd != -1)
3602  CloseTransientFile(fd);
3603 }
3604 
3605 /*
3606  * Serialize individual change to disk.
3607  */
3608 static void
3610  int fd, ReorderBufferChange *change)
3611 {
3612  ReorderBufferDiskChange *ondisk;
3613  Size sz = sizeof(ReorderBufferDiskChange);
3614 
3616 
3617  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3618  memcpy(&ondisk->change, change, sizeof(ReorderBufferChange));
3619 
3620  switch (change->action)
3621  {
3622  /* fall through these, they're all similar enough */
3627  {
3628  char *data;
3629  ReorderBufferTupleBuf *oldtup,
3630  *newtup;
3631  Size oldlen = 0;
3632  Size newlen = 0;
3633 
3634  oldtup = change->data.tp.oldtuple;
3635  newtup = change->data.tp.newtuple;
3636 
3637  if (oldtup)
3638  {
3639  sz += sizeof(HeapTupleData);
3640  oldlen = oldtup->tuple.t_len;
3641  sz += oldlen;
3642  }
3643 
3644  if (newtup)
3645  {
3646  sz += sizeof(HeapTupleData);
3647  newlen = newtup->tuple.t_len;
3648  sz += newlen;
3649  }
3650 
3651  /* make sure we have enough space */
3653 
3654  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3655  /* might have been reallocated above */
3656  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3657 
3658  if (oldlen)
3659  {
3660  memcpy(data, &oldtup->tuple, sizeof(HeapTupleData));
3661  data += sizeof(HeapTupleData);
3662 
3663  memcpy(data, oldtup->tuple.t_data, oldlen);
3664  data += oldlen;
3665  }
3666 
3667  if (newlen)
3668  {
3669  memcpy(data, &newtup->tuple, sizeof(HeapTupleData));
3670  data += sizeof(HeapTupleData);
3671 
3672  memcpy(data, newtup->tuple.t_data, newlen);
3673  data += newlen;
3674  }
3675  break;
3676  }
3678  {
3679  char *data;
3680  Size prefix_size = strlen(change->data.msg.prefix) + 1;
3681 
3682  sz += prefix_size + change->data.msg.message_size +
3683  sizeof(Size) + sizeof(Size);
3685 
3686  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3687 
3688  /* might have been reallocated above */
3689  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3690 
3691  /* write the prefix including the size */
3692  memcpy(data, &prefix_size, sizeof(Size));
3693  data += sizeof(Size);
3694  memcpy(data, change->data.msg.prefix,
3695  prefix_size);
3696  data += prefix_size;
3697 
3698  /* write the message including the size */
3699  memcpy(data, &change->data.msg.message_size, sizeof(Size));
3700  data += sizeof(Size);
3701  memcpy(data, change->data.msg.message,
3702  change->data.msg.message_size);
3703  data += change->data.msg.message_size;
3704 
3705  break;
3706  }
3708  {
3709  char *data;
3710  Size inval_size = sizeof(SharedInvalidationMessage) *
3711  change->data.inval.ninvalidations;
3712 
3713  sz += inval_size;
3714 
3716  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3717 
3718  /* might have been reallocated above */
3719  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3720  memcpy(data, change->data.inval.invalidations, inval_size);
3721  data += inval_size;
3722 
3723  break;
3724  }
3726  {
3727  Snapshot snap;
3728  char *data;
3729 
3730  snap = change->data.snapshot;
3731 
3732  sz += sizeof(SnapshotData) +
3733  sizeof(TransactionId) * snap->xcnt +
3734  sizeof(TransactionId) * snap->subxcnt;
3735 
3736  /* make sure we have enough space */
3738  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3739  /* might have been reallocated above */
3740  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3741 
3742  memcpy(data, snap, sizeof(SnapshotData));
3743  data += sizeof(SnapshotData);
3744 
3745  if (snap->xcnt)
3746  {
3747  memcpy(data, snap->xip,
3748  sizeof(TransactionId) * snap->xcnt);
3749  data += sizeof(TransactionId) * snap->xcnt;
3750  }
3751 
3752  if (snap->subxcnt)
3753  {
3754  memcpy(data, snap->subxip,
3755  sizeof(TransactionId) * snap->subxcnt);
3756  data += sizeof(TransactionId) * snap->subxcnt;
3757  }
3758  break;
3759  }
3761  {
3762  Size size;
3763  char *data;
3764 
3765  /* account for the OIDs of truncated relations */
3766  size = sizeof(Oid) * change->data.truncate.nrelids;
3767  sz += size;
3768 
3769  /* make sure we have enough space */
3771 
3772  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3773  /* might have been reallocated above */
3774  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3775 
3776  memcpy(data, change->data.truncate.relids, size);
3777  data += size;
3778 
3779  break;
3780  }
3785  /* ReorderBufferChange contains everything important */
3786  break;
3787  }
3788 
3789  ondisk->size = sz;
3790 
3791  errno = 0;
3793  if (write(fd, rb->outbuf, ondisk->size) != ondisk->size)
3794  {
3795  int save_errno = errno;
3796 
3797  CloseTransientFile(fd);
3798 
3799  /* if write didn't set errno, assume problem is no disk space */
3800  errno = save_errno ? save_errno : ENOSPC;
3801  ereport(ERROR,
3803  errmsg("could not write to data file for XID %u: %m",
3804  txn->xid)));
3805  }
3807 
3808  /*
3809  * Keep the transaction's final_lsn up to date with each change we send to
3810  * disk, so that ReorderBufferRestoreCleanup works correctly. (We used to
3811  * only do this on commit and abort records, but that doesn't work if a
3812  * system crash leaves a transaction without its abort record).
3813  *
3814  * Make sure not to move it backwards.
3815  */
3816  if (txn->final_lsn < change->lsn)
3817  txn->final_lsn = change->lsn;
3818 
3819  Assert(ondisk->change.action == change->action);
3820 }
3821 
3822 /* Returns true, if the output plugin supports streaming, false, otherwise. */
3823 static inline bool
3825 {
3827 
3828  return ctx->streaming;
3829 }
3830 
3831 /* Returns true, if the streaming can be started now, false, otherwise. */
3832 static inline bool
3834 {
3836  SnapBuild *builder = ctx->snapshot_builder;
3837 
3838  /* We can't start streaming unless a consistent state is reached. */
3840  return false;
3841 
3842  /*
3843  * We can't start streaming immediately even if the streaming is enabled
3844  * because we previously decoded this transaction and now just are
3845  * restarting.
3846  */
3847  if (ReorderBufferCanStream(rb) &&
3848  !SnapBuildXactNeedsSkip(builder, ctx->reader->EndRecPtr))
3849  return true;
3850 
3851  return false;
3852 }
3853 
3854 /*
3855  * Send data of a large transaction (and its subtransactions) to the
3856  * output plugin, but using the stream API.
3857  */
3858 static void
3860 {
3861  Snapshot snapshot_now;
3862  CommandId command_id;
3863  Size stream_bytes;
3864  bool txn_is_streamed;
3865 
3866  /* We can never reach here for a subtransaction. */
3867  Assert(txn->toptxn == NULL);
3868 
3869  /*
3870  * We can't make any assumptions about base snapshot here, similar to what
3871  * ReorderBufferCommit() does. That relies on base_snapshot getting
3872  * transferred from subxact in ReorderBufferCommitChild(), but that was
3873  * not yet called as the transaction is in-progress.
3874  *
3875  * So just walk the subxacts and use the same logic here. But we only need
3876  * to do that once, when the transaction is streamed for the first time.
3877  * After that we need to reuse the snapshot from the previous run.
3878  *
3879  * Unlike DecodeCommit which adds xids of all the subtransactions in
3880  * snapshot's xip array via SnapBuildCommittedTxn, we can't do that here
3881  * but we do add them to subxip array instead via ReorderBufferCopySnap.
3882  * This allows the catalog changes made in subtransactions decoded till
3883  * now to be visible.
3884  */
3885  if (txn->snapshot_now == NULL)
3886  {
3887  dlist_iter subxact_i;
3888 
3889  /* make sure this transaction is streamed for the first time */
3890  Assert(!rbtxn_is_streamed(txn));
3891 
3892  /* at the beginning we should have invalid command ID */
3894 
3895  dlist_foreach(subxact_i, &txn->subtxns)
3896  {
3897  ReorderBufferTXN *subtxn;
3898 
3899  subtxn = dlist_container(ReorderBufferTXN, node, subxact_i.cur);
3900  ReorderBufferTransferSnapToParent(txn, subtxn);
3901  }
3902 
3903  /*
3904  * If this transaction has no snapshot, it didn't make any changes to
3905  * the database till now, so there's nothing to decode.
3906  */
3907  if (txn->base_snapshot == NULL)
3908  {
3909  Assert(txn->ninvalidations == 0);
3910  return;
3911  }
3912 
3913  command_id = FirstCommandId;
3914  snapshot_now = ReorderBufferCopySnap(rb, txn->base_snapshot,
3915  txn, command_id);
3916  }
3917  else
3918  {
3919  /* the transaction must have been already streamed */
3920  Assert(rbtxn_is_streamed(txn));
3921 
3922  /*
3923  * Nah, we already have snapshot from the previous streaming run. We
3924  * assume new subxacts can't move the LSN backwards, and so can't beat
3925  * the LSN condition in the previous branch (so no need to walk
3926  * through subxacts again). In fact, we must not do that as we may be
3927  * using snapshot half-way through the subxact.
3928  */
3929  command_id = txn->command_id;
3930 
3931  /*
3932  * We can't use txn->snapshot_now directly because after the last
3933  * streaming run, we might have got some new sub-transactions. So we
3934  * need to add them to the snapshot.
3935  */
3936  snapshot_now = ReorderBufferCopySnap(rb, txn->snapshot_now,
3937  txn, command_id);
3938 
3939  /* Free the previously copied snapshot. */
3940  Assert(txn->snapshot_now->copied);
3942  txn->snapshot_now = NULL;
3943  }
3944 
3945  /*
3946  * Remember this information to be used later to update stats. We can't
3947  * update the stats here as an error while processing the changes would
3948  * lead to the accumulation of stats even though we haven't streamed all
3949  * the changes.
3950  */
3951  txn_is_streamed = rbtxn_is_streamed(txn);
3952  stream_bytes = txn->total_size;
3953 
3954  /* Process and send the changes to output plugin. */
3955  ReorderBufferProcessTXN(rb, txn, InvalidXLogRecPtr, snapshot_now,
3956  command_id, true);
3957 
3958  rb->streamCount += 1;
3959  rb->streamBytes += stream_bytes;
3960 
3961  /* Don't consider already streamed transaction. */
3962  rb->streamTxns += (txn_is_streamed) ? 0 : 1;
3963 
3964  /* update the decoding stats */
3966 
3967  Assert(dlist_is_empty(&txn->changes));
3968  Assert(txn->nentries == 0);
3969  Assert(txn->nentries_mem == 0);
3970 }
3971 
3972 /*
3973  * Size of a change in memory.
3974  */
3975 static Size
3977 {
3978  Size sz = sizeof(ReorderBufferChange);
3979 
3980  switch (change->action)
3981  {
3982  /* fall through these, they're all similar enough */
3987  {
3988  ReorderBufferTupleBuf *oldtup,
3989  *newtup;
3990  Size oldlen = 0;
3991  Size newlen = 0;
3992 
3993  oldtup = change->data.tp.oldtuple;
3994  newtup = change->data.tp.newtuple;
3995 
3996  if (oldtup)
3997  {
3998  sz += sizeof(HeapTupleData);
3999  oldlen = oldtup->tuple.t_len;
4000  sz += oldlen;
4001  }
4002 
4003  if (newtup)
4004  {
4005  sz += sizeof(HeapTupleData);
4006  newlen = newtup->tuple.t_len;
4007  sz += newlen;
4008  }
4009 
4010  break;
4011  }
4013  {
4014  Size prefix_size = strlen(change->data.msg.prefix) + 1;
4015 
4016  sz += prefix_size + change->data.msg.message_size +
4017  sizeof(Size) + sizeof(Size);
4018 
4019  break;
4020  }
4022  {
4023  sz += sizeof(SharedInvalidationMessage) *
4024  change->data.inval.ninvalidations;
4025  break;
4026  }
4028  {
4029  Snapshot snap;
4030 
4031  snap = change->data.snapshot;
4032 
4033  sz += sizeof(SnapshotData) +
4034  sizeof(TransactionId) * snap->xcnt +
4035  sizeof(TransactionId) * snap->subxcnt;
4036 
4037  break;
4038  }
4040  {
4041  sz += sizeof(Oid) * change->data.truncate.nrelids;
4042 
4043  break;
4044  }
4049  /* ReorderBufferChange contains everything important */
4050  break;
4051  }
4052 
4053  return sz;
4054 }
4055 
4056 
4057 /*
4058  * Restore a number of changes spilled to disk back into memory.
4059  */
4060 static Size
4062  TXNEntryFile *file, XLogSegNo *segno)
4063 {
4064  Size restored = 0;
4065  XLogSegNo last_segno;
4066  dlist_mutable_iter cleanup_iter;
4067  File *fd = &file->vfd;
4068 
4071 
4072  /* free current entries, so we have memory for more */
4073  dlist_foreach_modify(cleanup_iter, &txn->changes)
4074  {
4076  dlist_container(ReorderBufferChange, node, cleanup_iter.cur);
4077 
4078  dlist_delete(&cleanup->node);
4079  ReorderBufferReturnChange(rb, cleanup, true);
4080  }
4081  txn->nentries_mem = 0;
4082  Assert(dlist_is_empty(&txn->changes));
4083 
4084  XLByteToSeg(txn->final_lsn, last_segno, wal_segment_size);
4085 
4086  while (restored < max_changes_in_memory && *segno <= last_segno)
4087  {
4088  int readBytes;
4089  ReorderBufferDiskChange *ondisk;
4090 
4091  if (*fd == -1)
4092  {
4093  char path[MAXPGPATH];
4094 
4095  /* first time in */
4096  if (*segno == 0)
4097  XLByteToSeg(txn->first_lsn, *segno, wal_segment_size);
4098 
4099  Assert(*segno != 0 || dlist_is_empty(&txn->changes));
4100 
4101  /*
4102  * No need to care about TLIs here, only used during a single run,
4103  * so each LSN only maps to a specific WAL record.
4104  */
4106  *segno);
4107 
4108  *fd = PathNameOpenFile(path, O_RDONLY | PG_BINARY);
4109 
4110  /* No harm in resetting the offset even in case of failure */
4111  file->curOffset = 0;
4112 
4113  if (*fd < 0 && errno == ENOENT)
4114  {
4115  *fd = -1;
4116  (*segno)++;
4117  continue;
4118  }
4119  else if (*fd < 0)
4120  ereport(ERROR,
4122  errmsg("could not open file \"%s\": %m",
4123  path)));
4124  }
4125 
4126  /*
4127  * Read the statically sized part of a change which has information
4128  * about the total size. If we couldn't read a record, we're at the
4129  * end of this file.
4130  */
4132  readBytes = FileRead(file->vfd, rb->outbuf,
4133  sizeof(ReorderBufferDiskChange),
4135 
4136  /* eof */
4137  if (readBytes == 0)
4138  {
4139  FileClose(*fd);
4140  *fd = -1;
4141  (*segno)++;
4142  continue;
4143  }
4144  else if (readBytes < 0)
4145  ereport(ERROR,
4147  errmsg("could not read from reorderbuffer spill file: %m")));
4148  else if (readBytes != sizeof(ReorderBufferDiskChange))
4149  ereport(ERROR,
4151  errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
4152  readBytes,
4153  (uint32) sizeof(ReorderBufferDiskChange))));
4154 
4155  file->curOffset += readBytes;
4156 
4157  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
4158 
4160  sizeof(ReorderBufferDiskChange) + ondisk->size);
4161  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
4162 
4163  readBytes = FileRead(file->vfd,
4164  rb->outbuf + sizeof(ReorderBufferDiskChange),
4165  ondisk->size - sizeof(ReorderBufferDiskChange),
4166  file->curOffset,
4168 
4169  if (readBytes < 0)
4170  ereport(ERROR,
4172  errmsg("could not read from reorderbuffer spill file: %m")));
4173  else if (readBytes != ondisk->size - sizeof(ReorderBufferDiskChange))
4174  ereport(ERROR,
4176  errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
4177  readBytes,
4178  (uint32) (ondisk->size - sizeof(ReorderBufferDiskChange)))));
4179 
4180  file->curOffset += readBytes;
4181 
4182  /*
4183  * ok, read a full change from disk, now restore it into proper
4184  * in-memory format
4185  */
4186  ReorderBufferRestoreChange(rb, txn, rb->outbuf);
4187  restored++;
4188  }
4189 
4190  return restored;
4191 }
4192 
4193 /*
4194  * Convert change from its on-disk format to in-memory format and queue it onto
4195  * the TXN's ->changes list.
4196  *
4197  * Note: although "data" is declared char*, at entry it points to a
4198  * maxalign'd buffer, making it safe in most of this function to assume
4199  * that the pointed-to data is suitably aligned for direct access.
4200  */
4201 static void
4203  char *data)
4204 {
4205  ReorderBufferDiskChange *ondisk;
4206  ReorderBufferChange *change;
4207 
4208  ondisk = (ReorderBufferDiskChange *) data;
4209 
4210  change = ReorderBufferGetChange(rb);
4211 
4212  /* copy static part */
4213  memcpy(change, &ondisk->change, sizeof(ReorderBufferChange));
4214 
4215  data += sizeof(ReorderBufferDiskChange);
4216 
4217  /* restore individual stuff */
4218  switch (change->action)
4219  {
4220  /* fall through these, they're all similar enough */
4225  if (change->data.tp.oldtuple)
4226  {
4227  uint32 tuplelen = ((HeapTuple) data)->t_len;
4228 
4229  change->data.tp.oldtuple =
4231 
4232  /* restore ->tuple */
4233  memcpy(&change->data.tp.oldtuple->tuple, data,
4234  sizeof(HeapTupleData));
4235  data += sizeof(HeapTupleData);
4236 
4237  /* reset t_data pointer into the new tuplebuf */
4238  change->data.tp.oldtuple->tuple.t_data =
4239  ReorderBufferTupleBufData(change->data.tp.oldtuple);
4240 
4241  /* restore tuple data itself */
4242  memcpy(change->data.tp.oldtuple->tuple.t_data, data, tuplelen);
4243  data += tuplelen;
4244  }
4245 
4246  if (change->data.tp.newtuple)
4247  {
4248  /* here, data might not be suitably aligned! */
4249  uint32 tuplelen;
4250 
4251  memcpy(&tuplelen, data + offsetof(HeapTupleData, t_len),
4252  sizeof(uint32));
4253 
4254  change->data.tp.newtuple =
4256 
4257  /* restore ->tuple */
4258  memcpy(&change->data.tp.newtuple->tuple, data,
4259  sizeof(HeapTupleData));
4260  data += sizeof(HeapTupleData);
4261 
4262  /* reset t_data pointer into the new tuplebuf */
4263  change->data.tp.newtuple->tuple.t_data =
4264  ReorderBufferTupleBufData(change->data.tp.newtuple);
4265 
4266  /* restore tuple data itself */
4267  memcpy(change->data.tp.newtuple->tuple.t_data, data, tuplelen);
4268  data += tuplelen;
4269  }
4270 
4271  break;
4273  {
4274  Size prefix_size;
4275 
4276  /* read prefix */
4277  memcpy(&prefix_size, data, sizeof(Size));
4278  data += sizeof(Size);
4279  change->data.msg.prefix = MemoryContextAlloc(rb->context,
4280  prefix_size);
4281  memcpy(change->data.msg.prefix, data, prefix_size);
4282  Assert(change->data.msg.prefix[prefix_size - 1] == '\0');
4283  data += prefix_size;
4284 
4285  /* read the message */
4286  memcpy(&change->data.msg.message_size, data, sizeof(Size));
4287  data += sizeof(Size);
4288  change->data.msg.message = MemoryContextAlloc(rb->context,
4289  change->data.msg.message_size);
4290  memcpy(change->data.msg.message, data,
4291  change->data.msg.message_size);
4292  data += change->data.msg.message_size;
4293 
4294  break;
4295  }
4297  {
4298  Size inval_size = sizeof(SharedInvalidationMessage) *
4299  change->data.inval.ninvalidations;
4300 
4301  change->data.inval.invalidations =
4302  MemoryContextAlloc(rb->context, inval_size);
4303 
4304  /* read the message */
4305  memcpy(change->data.inval.invalidations, data, inval_size);
4306 
4307  break;
4308  }
4310  {
4311  Snapshot oldsnap;
4312  Snapshot newsnap;
4313  Size size;
4314 
4315  oldsnap = (Snapshot) data;
4316 
4317  size = sizeof(SnapshotData) +
4318  sizeof(TransactionId) * oldsnap->xcnt +
4319  sizeof(TransactionId) * (oldsnap->subxcnt + 0);
4320 
4321  change->data.snapshot = MemoryContextAllocZero(rb->context, size);
4322 
4323  newsnap = change->data.snapshot;
4324 
4325  memcpy(newsnap, data, size);
4326  newsnap->xip = (TransactionId *)
4327  (((char *) newsnap) + sizeof(SnapshotData));
4328  newsnap->subxip = newsnap->xip + newsnap->xcnt;
4329  newsnap->copied = true;
4330  break;
4331  }
4332  /* the base struct contains all the data, easy peasy */
4334  {
4335  Oid *relids;
4336 
4337  relids = ReorderBufferGetRelids(rb,
4338  change->data.truncate.nrelids);
4339  memcpy(relids, data, change->data.truncate.nrelids * sizeof(Oid));
4340  change->data.truncate.relids = relids;
4341 
4342  break;
4343  }
4348  break;
4349  }
4350 
4351  dlist_push_tail(&txn->changes, &change->node);
4352  txn->nentries_mem++;
4353 
4354  /*
4355  * Update memory accounting for the restored change. We need to do this
4356  * although we don't check the memory limit when restoring the changes in
4357  * this branch (we only do that when initially queueing the changes after
4358  * decoding), because we will release the changes later, and that will
4359  * update the accounting too (subtracting the size from the counters). And
4360  * we don't want to underflow there.
4361  */
4362  ReorderBufferChangeMemoryUpdate(rb, change, true,
4363  ReorderBufferChangeSize(change));
4364 }
4365 
4366 /*
4367  * Remove all on-disk stored for the passed in transaction.
4368  */
4369 static void
4371 {
4372  XLogSegNo first;
4373  XLogSegNo cur;
4374  XLogSegNo last;
4375 
4378 
4379  XLByteToSeg(txn->first_lsn, first, wal_segment_size);
4380  XLByteToSeg(txn->final_lsn, last, wal_segment_size);
4381 
4382  /* iterate over all possible filenames, and delete them */
4383  for (cur = first; cur <= last; cur++)
4384  {
4385  char path[MAXPGPATH];
4386 
4388  if (unlink(path) != 0 && errno != ENOENT)
4389  ereport(ERROR,
4391  errmsg("could not remove file \"%s\": %m", path)));
4392  }
4393 }
4394 
4395 /*
4396  * Remove any leftover serialized reorder buffers from a slot directory after a
4397  * prior crash or decoding session exit.
4398  */
4399 static void
4401 {
4402  DIR *spill_dir;
4403  struct dirent *spill_de;
4404  struct stat statbuf;
4405  char path[MAXPGPATH * 2 + 12];
4406 
4407  sprintf(path, "pg_replslot/%s", slotname);
4408 
4409  /* we're only handling directories here, skip if it's not ours */
4410  if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
4411  return;
4412 
4413  spill_dir = AllocateDir(path);
4414  while ((spill_de = ReadDirExtended(spill_dir, path, INFO)) != NULL)
4415  {
4416  /* only look at names that can be ours */
4417  if (strncmp(spill_de->d_name, "xid", 3) == 0)
4418  {
4419  snprintf(path, sizeof(path),
4420  "pg_replslot/%s/%s", slotname,
4421  spill_de->d_name);
4422 
4423  if (unlink(path) != 0)
4424  ereport(ERROR,
4426  errmsg("could not remove file \"%s\" during removal of pg_replslot/%s/xid*: %m",
4427  path, slotname)));
4428  }
4429  }
4430  FreeDir(spill_dir);
4431 }
4432 
4433 /*
4434  * Given a replication slot, transaction ID and segment number, fill in the
4435  * corresponding spill file into 'path', which is a caller-owned buffer of size
4436  * at least MAXPGPATH.
4437  */
4438 static void
4440  XLogSegNo segno)
4441 {
4442  XLogRecPtr recptr;
4443 
4444  XLogSegNoOffsetToRecPtr(segno, 0, wal_segment_size, recptr);
4445 
4446  snprintf(path, MAXPGPATH, "pg_replslot/%s/xid-%u-lsn-%X-%X.spill",
4448  xid, LSN_FORMAT_ARGS(recptr));
4449 }
4450 
4451 /*
4452  * Delete all data spilled to disk after we've restarted/crashed. It will be
4453  * recreated when the respective slots are reused.
4454  */
4455 void
4457 {
4458  DIR *logical_dir;
4459  struct dirent *logical_de;
4460 
4461  logical_dir = AllocateDir("pg_replslot");
4462  while ((logical_de = ReadDir(logical_dir, "pg_replslot")) != NULL)
4463  {
4464  if (strcmp(logical_de->d_name, ".") == 0 ||
4465  strcmp(logical_de->d_name, "..") == 0)
4466  continue;
4467 
4468  /* if it cannot be a slot, skip the directory */
4469  if (!ReplicationSlotValidateName(logical_de->d_name, DEBUG2))
4470  continue;
4471 
4472  /*
4473  * ok, has to be a surviving logical slot, iterate and delete
4474  * everything starting with xid-*
4475  */
4477  }
4478  FreeDir(logical_dir);
4479 }
4480 
4481 /* ---------------------------------------
4482  * toast reassembly support
4483  * ---------------------------------------
4484  */
4485 
4486 /*
4487  * Initialize per tuple toast reconstruction support.
4488  */
4489 static void
4491 {
4492  HASHCTL hash_ctl;
4493 
4494  Assert(txn->toast_hash == NULL);
4495 
4496  hash_ctl.keysize = sizeof(Oid);
4497  hash_ctl.entrysize = sizeof(ReorderBufferToastEnt);
4498  hash_ctl.hcxt = rb->context;
4499  txn->toast_hash = hash_create("ReorderBufferToastHash", 5, &hash_ctl,
4501 }
4502 
4503 /*
4504  * Per toast-chunk handling for toast reconstruction
4505  *
4506  * Appends a toast chunk so we can reconstruct it when the tuple "owning" the
4507  * toasted Datum comes along.
4508  */
4509 static void
4511  Relation relation, ReorderBufferChange *change)
4512 {
4513  ReorderBufferToastEnt *ent;
4514  ReorderBufferTupleBuf *newtup;
4515  bool found;
4516  int32 chunksize;
4517  bool isnull;
4518  Pointer chunk;
4519  TupleDesc desc = RelationGetDescr(relation);
4520  Oid chunk_id;
4521  int32 chunk_seq;
4522 
4523  if (txn->toast_hash == NULL)
4524  ReorderBufferToastInitHash(rb, txn);
4525 
4526  Assert(IsToastRelation(relation));
4527 
4528  newtup = change->data.tp.newtuple;
4529  chunk_id = DatumGetObjectId(fastgetattr(&newtup->tuple, 1, desc, &isnull));
4530  Assert(!isnull);
4531  chunk_seq = DatumGetInt32(fastgetattr(&newtup->tuple, 2, desc, &isnull));
4532  Assert(!isnull);
4533 
4534  ent = (ReorderBufferToastEnt *)
4535  hash_search(txn->toast_hash,
4536  (void *) &chunk_id,
4537  HASH_ENTER,
4538  &found);
4539 
4540  if (!found)
4541  {
4542  Assert(ent->chunk_id == chunk_id);
4543  ent->num_chunks = 0;
4544  ent->last_chunk_seq = 0;
4545  ent->size = 0;
4546  ent->reconstructed = NULL;
4547  dlist_init(&ent->chunks);
4548 
4549  if (chunk_seq != 0)
4550  elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq 0",
4551  chunk_seq, chunk_id);
4552  }
4553  else if (found && chunk_seq != ent->last_chunk_seq + 1)
4554  elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq %d",
4555  chunk_seq, chunk_id, ent->last_chunk_seq + 1);
4556 
4557  chunk = DatumGetPointer(fastgetattr(&newtup->tuple, 3, desc, &isnull));
4558  Assert(!isnull);
4559 
4560  /* calculate size so we can allocate the right size at once later */
4561  if (!VARATT_IS_EXTENDED(chunk))
4562  chunksize = VARSIZE(chunk) - VARHDRSZ;
4563  else if (VARATT_IS_SHORT(chunk))
4564  /* could happen due to heap_form_tuple doing its thing */
4565  chunksize = VARSIZE_SHORT(chunk) - VARHDRSZ_SHORT;
4566  else
4567  elog(ERROR, "unexpected type of toast chunk");
4568 
4569  ent->size += chunksize;
4570  ent->last_chunk_seq = chunk_seq;
4571  ent->num_chunks++;
4572  dlist_push_tail(&ent->chunks, &change->node);
4573 }
4574 
4575 /*
4576  * Rejigger change->newtuple to point to in-memory toast tuples instead to
4577  * on-disk toast tuples that may not longer exist (think DROP TABLE or VACUUM).
4578  *
4579  * We cannot replace unchanged toast tuples though, so those will still point
4580  * to on-disk toast data.
4581  *
4582  * While updating the existing change with detoasted tuple data, we need to
4583  * update the memory accounting info, because the change size will differ.
4584  * Otherwise the accounting may get out of sync, triggering serialization
4585  * at unexpected times.
4586  *
4587  * We simply subtract size of the change before rejiggering the tuple, and
4588  * then adding the new size. This makes it look like the change was removed
4589  * and then added back, except it only tweaks the accounting info.
4590  *
4591  * In particular it can't trigger serialization, which would be pointless
4592  * anyway as it happens during commit processing right before handing
4593  * the change to the output plugin.
4594  */
4595 static void
4597  Relation relation, ReorderBufferChange *change)
4598 {
4599  TupleDesc desc;
4600  int natt;
4601  Datum *attrs;
4602  bool *isnull;
4603  bool *free;
4604  HeapTuple tmphtup;
4605  Relation toast_rel;
4606  TupleDesc toast_desc;
4607  MemoryContext oldcontext;
4608  ReorderBufferTupleBuf *newtup;
4609  Size old_size;
4610 
4611  /* no toast tuples changed */
4612  if (txn->toast_hash == NULL)
4613  return;
4614 
4615  /*
4616  * We're going to modify the size of the change. So, to make sure the
4617  * accounting is correct we record the current change size and then after
4618  * re-computing the change we'll subtract the recorded size and then
4619  * re-add the new change size at the end. We don't immediately subtract
4620  * the old size because if there is any error before we add the new size,
4621  * we will release the changes and that will update the accounting info
4622  * (subtracting the size from the counters). And we don't want to
4623  * underflow there.
4624  */
4625  old_size = ReorderBufferChangeSize(change);
4626 
4627  oldcontext = MemoryContextSwitchTo(rb->context);
4628 
4629  /* we should only have toast tuples in an INSERT or UPDATE */
4630  Assert(change->data.tp.newtuple);
4631 
4632  desc = RelationGetDescr(relation);
4633 
4634  toast_rel = RelationIdGetRelation(relation->rd_rel->reltoastrelid);
4635  if (!RelationIsValid(toast_rel))
4636  elog(ERROR, "could not open toast relation with OID %u (base relation \"%s\")",
4637  relation->rd_rel->reltoastrelid, RelationGetRelationName(relation));
4638 
4639  toast_desc = RelationGetDescr(toast_rel);
4640 
4641  /* should we allocate from stack instead? */
4642  attrs = palloc0(sizeof(Datum) * desc->natts);
4643  isnull = palloc0(sizeof(bool) * desc->natts);
4644  free = palloc0(sizeof(bool) * desc->natts);
4645 
4646  newtup = change->data.tp.newtuple;
4647 
4648  heap_deform_tuple(&newtup->tuple, desc, attrs, isnull);
4649 
4650  for (natt = 0; natt < desc->natts; natt++)
4651  {
4652  Form_pg_attribute attr = TupleDescAttr(desc, natt);
4653  ReorderBufferToastEnt *ent;
4654  struct varlena *varlena;
4655 
4656  /* va_rawsize is the size of the original datum -- including header */
4657  struct varatt_external toast_pointer;
4658  struct varatt_indirect redirect_pointer;
4659  struct varlena *new_datum = NULL;
4660  struct varlena *reconstructed;
4661  dlist_iter it;
4662  Size data_done = 0;
4663 
4664  /* system columns aren't toasted */
4665  if (attr->attnum < 0)
4666  continue;
4667 
4668  if (attr->attisdropped)
4669  continue;
4670 
4671  /* not a varlena datatype */
4672  if (attr->attlen != -1)
4673  continue;
4674 
4675  /* no data */
4676  if (isnull[natt])
4677  continue;
4678 
4679  /* ok, we know we have a toast datum */
4680  varlena = (struct varlena *) DatumGetPointer(attrs[natt]);
4681 
4682  /* no need to do anything if the tuple isn't external */
4683  if (!VARATT_IS_EXTERNAL(varlena))
4684  continue;
4685 
4686  VARATT_EXTERNAL_GET_POINTER(toast_pointer, varlena);
4687 
4688  /*
4689  * Check whether the toast tuple changed, replace if so.
4690  */
4691  ent = (ReorderBufferToastEnt *)
4692  hash_search(txn->toast_hash,
4693  (void *) &toast_pointer.va_valueid,
4694  HASH_FIND,
4695  NULL);
4696  if (ent == NULL)
4697  continue;
4698 
4699  new_datum =
4700  (struct varlena *) palloc0(INDIRECT_POINTER_SIZE);
4701 
4702  free[natt] = true;
4703 
4704  reconstructed = palloc0(toast_pointer.va_rawsize);
4705 
4706  ent->reconstructed = reconstructed;
4707 
4708  /* stitch toast tuple back together from its parts */
4709  dlist_foreach(it, &ent->chunks)
4710  {
4711  bool isnull;
4712  ReorderBufferChange *cchange;
4713  ReorderBufferTupleBuf *ctup;
4714  Pointer chunk;
4715 
4716  cchange = dlist_container(ReorderBufferChange, node, it.cur);
4717  ctup = cchange->data.tp.newtuple;
4718  chunk = DatumGetPointer(fastgetattr(&ctup->tuple, 3, toast_desc, &isnull));
4719 
4720  Assert(!isnull);
4721  Assert(!VARATT_IS_EXTERNAL(chunk));
4722  Assert(!VARATT_IS_SHORT(chunk));
4723 
4724  memcpy(VARDATA(reconstructed) + data_done,
4725  VARDATA(chunk),
4726  VARSIZE(chunk) - VARHDRSZ);
4727  data_done += VARSIZE(chunk) - VARHDRSZ;
4728  }
4729  Assert(data_done == VARATT_EXTERNAL_GET_EXTSIZE(toast_pointer));
4730 
4731  /* make sure its marked as compressed or not */
4732  if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
4733  SET_VARSIZE_COMPRESSED(reconstructed, data_done + VARHDRSZ);
4734  else
4735  SET_VARSIZE(reconstructed, data_done + VARHDRSZ);
4736 
4737  memset(&redirect_pointer, 0, sizeof(redirect_pointer));
4738  redirect_pointer.pointer = reconstructed;
4739 
4741  memcpy(VARDATA_EXTERNAL(new_datum), &redirect_pointer,
4742  sizeof(redirect_pointer));
4743 
4744  attrs[natt] = PointerGetDatum(new_datum);
4745  }
4746 
4747  /*
4748  * Build tuple in separate memory & copy tuple back into the tuplebuf
4749  * passed to the output plugin. We can't directly heap_fill_tuple() into
4750  * the tuplebuf because attrs[] will point back into the current content.
4751  */
4752  tmphtup = heap_form_tuple(desc, attrs, isnull);
4753  Assert(newtup->tuple.t_len <= MaxHeapTupleSize);
4754  Assert(ReorderBufferTupleBufData(newtup) == newtup->tuple.t_data);
4755 
4756  memcpy(newtup->tuple.t_data, tmphtup->t_data, tmphtup->t_len);
4757  newtup->tuple.t_len = tmphtup->t_len;
4758 
4759  /*
4760  * free resources we won't further need, more persistent stuff will be
4761  * free'd in ReorderBufferToastReset().
4762  */
4763  RelationClose(toast_rel);
4764  pfree(tmphtup);
4765  for (natt = 0; natt < desc->natts; natt++)
4766  {
4767  if (free[natt])
4768  pfree(DatumGetPointer(attrs[natt]));
4769  }
4770  pfree(attrs);
4771  pfree(free);
4772  pfree(isnull);
4773 
4774  MemoryContextSwitchTo(oldcontext);
4775 
4776  /* subtract the old change size */
4777  ReorderBufferChangeMemoryUpdate(rb, change, false, old_size);
4778  /* now add the change back, with the correct size */
4779  ReorderBufferChangeMemoryUpdate(rb, change, true,
4780  ReorderBufferChangeSize(change));
4781 }
4782 
4783 /*
4784  * Free all resources allocated for toast reconstruction.
4785  */
4786 static void
4788 {
4789  HASH_SEQ_STATUS hstat;
4790  ReorderBufferToastEnt *ent;
4791 
4792  if (txn->toast_hash == NULL)
4793  return;
4794 
4795  /* sequentially walk over the hash and free everything */
4796  hash_seq_init(&hstat, txn->toast_hash);
4797  while ((ent = (ReorderBufferToastEnt *) hash_seq_search(&hstat)) != NULL)
4798  {
4799  dlist_mutable_iter it;
4800 
4801  if (ent->reconstructed != NULL)
4802  pfree(ent->reconstructed);
4803 
4804  dlist_foreach_modify(it, &ent->chunks)
4805  {
4806  ReorderBufferChange *change =
4808 
4809  dlist_delete(&change->node);
4810  ReorderBufferReturnChange(rb, change, true);
4811  }
4812  }
4813 
4814  hash_destroy(txn->toast_hash);
4815  txn->toast_hash = NULL;
4816 }
4817 
4818 
4819 /* ---------------------------------------
4820  * Visibility support for logical decoding
4821  *
4822  *
4823  * Lookup actual cmin/cmax values when using decoding snapshot. We can't
4824  * always rely on stored cmin/cmax values because of two scenarios:
4825  *
4826  * * A tuple got changed multiple times during a single transaction and thus
4827  * has got a combo CID. Combo CIDs are only valid for the duration of a
4828  * single transaction.
4829  * * A tuple with a cmin but no cmax (and thus no combo CID) got
4830  * deleted/updated in another transaction than the one which created it
4831  * which we are looking at right now. As only one of cmin, cmax or combo CID
4832  * is actually stored in the heap we don't have access to the value we
4833  * need anymore.
4834  *
4835  * To resolve those problems we have a per-transaction hash of (cmin,
4836  * cmax) tuples keyed by (relfilenode, ctid) which contains the actual
4837  * (cmin, cmax) values. That also takes care of combo CIDs by simply
4838  * not caring about them at all. As we have the real cmin/cmax values
4839  * combo CIDs aren't interesting.
4840  *
4841  * As we only care about catalog tuples here the overhead of this
4842  * hashtable should be acceptable.
4843  *
4844  * Heap rewrites complicate this a bit, check rewriteheap.c for
4845  * details.
4846  * -------------------------------------------------------------------------
4847  */
4848 
4849 /* struct for sorting mapping files by LSN efficiently */
4850 typedef struct RewriteMappingFile
4851 {
4853  char fname[MAXPGPATH];
4855 
#ifdef NOT_USED
/* debugging helper: dump the whole (relfilenode, tid) -> (cmin, cmax) map */
static void
DisplayMapping(HTAB *tuplecid_data)
{
	HASH_SEQ_STATUS hstat;
	ReorderBufferTupleCidEnt *ent;

	hash_seq_init(&hstat, tuplecid_data);
	while ((ent = (ReorderBufferTupleCidEnt *) hash_seq_search(&hstat)) != NULL)
	{
		elog(DEBUG3, "mapping: node: %u/%u/%u tid: %u/%u cmin: %u, cmax: %u",
			 ent->key.relnode.dbNode,
			 ent->key.relnode.spcNode,
			 ent->key.relnode.relNode,
			 ItemPointerGetBlockNumber(&ent->key.tid),
			 ItemPointerGetOffsetNumber(&ent->key.tid),
			 ent->cmin,
			 ent->cmax
			);
	}
}
#endif
4878 
4879 /*
4880  * Apply a single mapping file to tuplecid_data.
4881  *
4882  * The mapping file has to have been verified to be a) committed b) for our
4883  * transaction c) applied in LSN order.
4884  */
4885 static void
4886 ApplyLogicalMappingFile(HTAB *tuplecid_data, Oid relid, const char *fname)
4887 {
4888  char path[MAXPGPATH];
4889  int fd;
4890  int readBytes;
4892 
4893  sprintf(path, "pg_logical/mappings/%s", fname);
4894  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
4895  if (fd < 0)
4896  ereport(ERROR,
4898  errmsg("could not open file \"%s\": %m", path)));
4899 
4900  while (true)
4901  {
4904  ReorderBufferTupleCidEnt *new_ent;
4905  bool found;
4906 
4907  /* be careful about padding */
4908  memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
4909 
4910  /* read all mappings till the end of the file */
4912  readBytes = read(fd, &map, sizeof(LogicalRewriteMappingData));
4914 
4915  if (readBytes < 0)
4916  ereport(ERROR,
4918  errmsg("could not read file \"%s\": %m",
4919  path)));
4920  else if (readBytes == 0) /* EOF */
4921  break;
4922  else if (readBytes != sizeof(LogicalRewriteMappingData))
4923  ereport(ERROR,
4925  errmsg("could not read from file \"%s\": read %d instead of %d bytes",
4926  path, readBytes,
4927  (int32) sizeof(LogicalRewriteMappingData))));
4928 
4929  key.relnode = map.old_node;
4930  ItemPointerCopy(&map.old_tid,
4931  &key.tid);
4932 
4933 
4934  ent = (ReorderBufferTupleCidEnt *)
4935  hash_search(tuplecid_data,
4936  (void *) &key,
4937  HASH_FIND,
4938  NULL);
4939 
4940  /* no existing mapping, no need to update */
4941  if (!ent)
4942  continue;
4943 
4944  key.relnode = map.new_node;
4945  ItemPointerCopy(&map.new_tid,
4946  &key.tid);
4947 
4948  new_ent = (ReorderBufferTupleCidEnt *)
4949  hash_search(tuplecid_data,
4950  (void *) &key,
4951  HASH_ENTER,
4952  &found);
4953 
4954  if (found)
4955  {
4956  /*
4957  * Make sure the existing mapping makes sense. We sometime update
4958  * old records that did not yet have a cmax (e.g. pg_class' own
4959  * entry while rewriting it) during rewrites, so allow that.
4960  */
4961  Assert(ent->cmin == InvalidCommandId || ent->cmin == new_ent->cmin);
4962  Assert(ent->cmax == InvalidCommandId || ent->cmax == new_ent->cmax);
4963  }
4964  else
4965  {
4966  /* update mapping */
4967  new_ent->cmin = ent->cmin;
4968  new_ent->cmax = ent->cmax;
4969  new_ent->combocid = ent->combocid;
4970  }
4971  }
4972 
4973  if (CloseTransientFile(fd) != 0)
4974  ereport(ERROR,
4976  errmsg("could not close file \"%s\": %m", path)));
4977 }
4978 
4979 
4980 /*
4981  * Check whether the TransactionId 'xid' is in the pre-sorted array 'xip'.
4982  */
4983 static bool
4985 {
4986  return bsearch(&xid, xip, num,
4987  sizeof(TransactionId), xidComparator) != NULL;
4988 }
4989 
4990 /*
4991  * list_sort() comparator for sorting RewriteMappingFiles in LSN order.
4992  */
4993 static int
4994 file_sort_by_lsn(const ListCell *a_p, const ListCell *b_p)
4995 {
4998 
4999  if (a->lsn < b->lsn)
5000  return -1;
5001  else if (a->lsn > b->lsn)
5002  return 1;
5003  return 0;
5004 }
5005 
5006 /*
5007  * Apply any existing logical remapping files if there are any targeted at our
5008  * transaction for relid.
5009  */
5010 static void
5012 {
5013  DIR *mapping_dir;
5014  struct dirent *mapping_de;
5015  List *files = NIL;
5016  ListCell *file;
5017  Oid dboid = IsSharedRelation(relid) ? InvalidOid : MyDatabaseId;
5018 
5019  mapping_dir = AllocateDir("pg_logical/mappings");
5020  while ((mapping_de = ReadDir(mapping_dir, "pg_logical/mappings")) != NULL)
5021  {
5022  Oid f_dboid;
5023  Oid f_relid;
5024  TransactionId f_mapped_xid;
5025  TransactionId f_create_xid;
5026  XLogRecPtr f_lsn;
5027  uint32 f_hi,
5028  f_lo;
5029  RewriteMappingFile *f;
5030 
5031  if (strcmp(mapping_de->d_name, ".") == 0 ||
5032  strcmp(mapping_de->d_name, "..") == 0)
5033  continue;
5034 
5035  /* Ignore files that aren't ours */
5036  if (strncmp(mapping_de->d_name, "map-", 4) != 0)
5037  continue;
5038 
5039  if (sscanf(mapping_de->d_name, LOGICAL_REWRITE_FORMAT,
5040  &f_dboid, &f_relid, &f_hi, &f_lo,
5041  &f_mapped_xid, &f_create_xid) != 6)
5042  elog(ERROR, "could not parse filename \"%s\"", mapping_de->d_name);
5043 
5044  f_lsn = ((uint64) f_hi) << 32 | f_lo;
5045 
5046  /* mapping for another database */
5047  if (f_dboid != dboid)
5048  continue;
5049 
5050  /* mapping for another relation */
5051  if (f_relid != relid)
5052  continue;
5053 
5054  /* did the creating transaction abort? */
5055  if (!TransactionIdDidCommit(f_create_xid))
5056  continue;
5057 
5058  /* not for our transaction */
5059  if (!TransactionIdInArray(f_mapped_xid, snapshot->subxip, snapshot->subxcnt))
5060  continue;
5061 
5062  /* ok, relevant, queue for apply */
5063  f = palloc(sizeof(RewriteMappingFile));
5064  f->lsn = f_lsn;
5065  strcpy(f->fname, mapping_de->d_name);
5066  files = lappend(files, f);
5067  }
5068  FreeDir(mapping_dir);
5069 
5070  /* sort files so we apply them in LSN order */
5071  list_sort(files, file_sort_by_lsn);
5072 
5073  foreach(file, files)
5074  {
5076 
5077  elog(DEBUG1, "applying mapping: \"%s\" in %u", f->fname,
5078  snapshot->subxip[0]);
5079  ApplyLogicalMappingFile(tuplecid_data, relid, f->fname);
5080  pfree(f);
5081  }
5082 }
5083 
5084 /*
5085  * Lookup cmin/cmax of a tuple, during logical decoding where we can't rely on
5086  * combo CIDs.
5087  */
5088 bool
5090  Snapshot snapshot,
5091  HeapTuple htup, Buffer buffer,
5092  CommandId *cmin, CommandId *cmax)
5093 {
5096  ForkNumber forkno;
5097  BlockNumber blockno;
5098  bool updated_mapping = false;
5099 
5100  /*
5101  * Return unresolved if tuplecid_data is not valid. That's because when
5102  * streaming in-progress transactions we may run into tuples with the CID
5103  * before actually decoding them. Think e.g. about INSERT followed by
5104  * TRUNCATE, where the TRUNCATE may not be decoded yet when applying the
5105  * INSERT. So in such cases, we assume the CID is from the future
5106  * command.
5107  */
5108  if (tuplecid_data == NULL)
5109  return false;
5110 
5111  /* be careful about padding */
5112  memset(&key, 0, sizeof(key));
5113 
5114  Assert(!BufferIsLocal(buffer));
5115 
5116  /*
5117  * get relfilenode from the buffer, no convenient way to access it other
5118  * than that.
5119  */
5120  BufferGetTag(buffer, &key.relnode, &forkno, &blockno);
5121 
5122  /* tuples can only be in the main fork */
5123  Assert(forkno == MAIN_FORKNUM);
5124  Assert(blockno == ItemPointerGetBlockNumber(&htup->t_self));
5125 
5126  ItemPointerCopy(&htup->t_self,
5127  &key.tid);
5128 
5129 restart:
5130  ent = (ReorderBufferTupleCidEnt *)
5131  hash_search(tuplecid_data,
5132  (void *) &key,
5133  HASH_FIND,
5134  NULL);
5135 
5136  /*
5137  * failed to find a mapping, check whether the table was rewritten and
5138  * apply mapping if so, but only do that once - there can be no new
5139  * mappings while we are in here since we have to hold a lock on the
5140  * relation.
5141  */
5142  if (ent == NULL && !updated_mapping)
5143  {
5144  UpdateLogicalMappings(tuplecid_data, htup->t_tableOid, snapshot);
5145  /* now check but don't update for a mapping again */
5146  updated_mapping = true;
5147  goto restart;
5148  }
5149  else if (ent == NULL)
5150  return false;
5151 
5152  if (cmin)
5153  *cmin = ent->cmin;
5154  if (cmax)
5155  *cmax = ent->cmax;
5156  return true;
5157 }
static void ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
XLogRecPtr first_lsn
bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *rb, TransactionId xid)
#define NIL
Definition: pg_list.h:65
uint32 CommandId
Definition: c.h:601
TimestampTz commit_time
struct ReorderBufferToastEnt ReorderBufferToastEnt
struct TXNEntryFile TXNEntryFile
void AbortCurrentTransaction(void)
Definition: xact.c:3224
ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER]
#define SizeofHeapTupleHeader
Definition: htup_details.h:184
bool IsToastRelation(Relation relation)
Definition: catalog.c:146
void hash_destroy(HTAB *hashp)
Definition: dynahash.c:862
void ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, Snapshot snapshot, XLogRecPtr lsn, bool transactional, const char *prefix, Size message_size, const char *message)
#define relpathperm(rnode, forknum)
Definition: relpath.h:83
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
ReorderBufferApplyChangeCB apply_change
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:218
#define AllocSetContextCreate
Definition: memutils.h:173
HeapTupleData * HeapTuple
Definition: htup.h:71
#define rbtxn_prepared(txn)
void * private_data
dlist_node base_snapshot_node
#define DEBUG1
Definition: elog.h:25
dlist_node * cur
Definition: ilist.h:180
static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
RepOriginId origin_id
void StartupReorderBuffer(void)
#define VARDATA(PTR)
Definition: postgres.h:315
static void ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define fastgetattr(tup, attnum, tupleDesc, isnull)
Definition: htup_details.h:711
static void pgstat_report_wait_end(void)
Definition: wait_event.h:274
static int32 next
Definition: blutils.c:219
File PathNameOpenFile(const char *fileName, int fileFlags)
Definition: fd.c:1564
#define VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer)
Definition: postgres.h:391
#define TransactionIdEquals(id1, id2)
Definition: transam.h:43
#define HASH_CONTEXT
Definition: hsearch.h:102
#define HASH_ELEM
Definition: hsearch.h:95
int wal_segment_size
Definition: xlog.c:119
void ReorderBufferAbortOld(ReorderBuffer *rb, TransactionId oldestRunningXid)
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:543
uint32 TransactionId
Definition: c.h:587
void ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, TXNEntryFile *file, XLogSegNo *segno)
bool copied
Definition: snapshot.h:185
bool SnapBuildXactNeedsSkip(SnapBuild *builder, XLogRecPtr ptr)
Definition: snapbuild.c:394
void ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
#define RBTXN_IS_STREAMED
MemoryContext hcxt
Definition: hsearch.h:86
#define DatumGetInt32(X)
Definition: postgres.h:516
int sqlerrcode
Definition: elog.h:381
#define RelationGetDescr(relation)
Definition: rel.h:503
static ReorderBufferTXN * ReorderBufferLargestTXN(ReorderBuffer *rb)
static void ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn, ReorderBufferTXN *subtxn)
#define DEBUG3
Definition: elog.h:23
SnapBuildState SnapBuildCurrentState(SnapBuild *builder)
Definition: snapbuild.c:367
#define write(a, b, c)
Definition: win32.h:14
#define VARHDRSZ_SHORT
Definition: postgres.h:292
TransactionId by_txn_last_xid
#define VARSIZE(PTR)
Definition: postgres.h:316
ErrorData * CopyErrorData(void)
Definition: elog.c:1560
#define VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr)
Definition: detoast.h:22
int64 TimestampTz
Definition: timestamp.h:39
#define PointerGetDatum(X)
Definition: postgres.h:600
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
ReorderBufferTXN * txn
static bool TransactionIdInArray(TransactionId xid, TransactionId *xip, Size num)
#define VARHDRSZ
Definition: c.h:627
#define dlist_foreach(iter, lhead)
Definition: ilist.h:526
ReorderBufferStreamAbortCB stream_abort
XLogRecPtr current_restart_decoding_lsn
#define DatumGetObjectId(X)
Definition: postgres.h:544
char * pstrdup(const char *in)
Definition: mcxt.c:1299
struct dirent * ReadDirExtended(DIR *dir, const char *dirname, int elevel)
Definition: fd.c:2801
static ReorderBufferTXN * ReorderBufferGetTXN(ReorderBuffer *rb)
static void ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id, XLogRecPtr last_lsn, ReorderBufferChange *specinsert)
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
struct ReorderBufferTXN * txn
Definition: reorderbuffer.h:88
Oid RelidByRelfilenode(Oid reltablespace, Oid relfilenode)
void ReorderBufferSetBaseSnapshot(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void ReorderBufferFree(ReorderBuffer *rb)
#define rbtxn_is_serialized_clear(txn)
uint16 RepOriginId
Definition: xlogdefs.h:65
Size entrysize
Definition: hsearch.h:76
CommandId command_id
#define FLEXIBLE_ARRAY_MEMBER
Definition: c.h:350
Snapshot snapshot_now
struct cursor * cur
Definition: ecpg.c:28
#define rbtxn_has_partial_change(txn)
char fname[MAXPGPATH]
int32 va_rawsize
Definition: postgres.h:71
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4715
#define INFO
Definition: elog.h:33
MemoryContext SlabContextCreate(MemoryContext parent, const char *name, Size blockSize, Size chunkSize)
Definition: slab.c:175
void binaryheap_replace_first(binaryheap *heap, Datum d)
Definition: binaryheap.c:204
void ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, RelFileNode node, ItemPointerData tid, CommandId cmin, CommandId cmax, CommandId combocid)
static void ReorderBufferIterTXNFinish(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
uint32 BlockNumber
Definition: block.h:31
void TeardownHistoricSnapshot(bool is_error)
Definition: snapmgr.c:2066
ReorderBufferCommitCB commit
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:1020
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
#define RelationIsLogicallyLogged(relation)
Definition: rel.h:674
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:954
bool TransactionIdDidCommit(TransactionId transactionId)
Definition: transam.c:125
ReplicationSlotPersistentData data
Definition: slot.h:147
struct ReorderBufferTupleCidKey ReorderBufferTupleCidKey
struct SnapshotData * Snapshot
Definition: snapshot.h:121
Form_pg_class rd_rel
Definition: rel.h:109
unsigned int Oid
Definition: postgres_ext.h:31
XLogRecPtr base_snapshot_lsn
ReorderBufferStreamCommitCB stream_commit
Definition: dirent.h:9
uint32 regd_count
Definition: snapshot.h:205
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:85
void binaryheap_add_unordered(binaryheap *heap, Datum d)
Definition: binaryheap.c:110
MemoryContext change_context
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define VARDATA_EXTERNAL(PTR)
Definition: postgres.h:323
#define PG_BINARY
Definition: c.h:1271
void FlushErrorState(void)
Definition: elog.c:1654
XLogRecPtr origin_lsn
static void ApplyLogicalMappingFile(HTAB *tuplecid_data, Oid relid, const char *fname)
signed int int32
Definition: c.h:429
#define FirstCommandId
Definition: c.h:603
XLogRecPtr EndRecPtr
Definition: xlogreader.h:176
#define XLByteInSeg(xlrp, logSegNo, wal_segsz_bytes)
void ReorderBufferSetRestartPoint(ReorderBuffer *rb, XLogRecPtr ptr)
#define RBTXN_SKIPPED_PREPARE
HeapTupleHeader t_data
Definition: htup.h:68
#define VARATT_IS_EXTERNAL(PTR)
Definition: postgres.h:326
#define sprintf
Definition: port.h:219
bool ReplicationSlotValidateName(const char *name, int elevel)
Definition: slot.c:172
#define rbtxn_is_streamed(txn)
static dlist_node * dlist_next_node(dlist_head *head, dlist_node *node)
Definition: ilist.h:440
Definition: dynahash.c:219
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
ReorderBufferStreamMessageCB stream_message
#define RBTXN_HAS_CATALOG_CHANGES
#define ReorderBufferTupleBufData(p)
Definition: reorderbuffer.h:38
#define dlist_container(type, membername, ptr)
Definition: ilist.h:496
void pfree(void *pointer)
Definition: mcxt.c:1169
char * Pointer
Definition: c.h:418
void FreeErrorData(ErrorData *edata)
Definition: elog.c:1616
static void ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
Definition: dirent.c:25
#define ERROR
Definition: elog.h:46
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2509
static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared)
#define SLAB_LARGE_BLOCK_SIZE
Definition: memutils.h:225
#define VARATT_IS_SHORT(PTR)
Definition: postgres.h:339
#define RelationIsValid(relation)
Definition: rel.h:450
dlist_head changes
void ReorderBufferReturnRelids(ReorderBuffer *rb, Oid *relids)
dlist_head txns_by_base_snapshot_lsn
Datum binaryheap_first(binaryheap *heap)
Definition: binaryheap.c:159
#define IsSpecInsert(action)
ReorderBufferStreamPrepareCB stream_prepare
#define MAXPGPATH
ItemPointerData t_self
Definition: htup.h:65
ReorderBufferTupleCidKey key
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:195
#define DEBUG2
Definition: elog.h:24
static bool ReorderBufferCanStream(ReorderBuffer *rb)
TransactionId GetCurrentTransactionId(void)
Definition: xact.c:439
void ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
uint32 t_len
Definition: htup.h:64
#define MaxHeapTupleSize
Definition: htup_details.h:559
void ReorderBufferSkipPrepare(ReorderBuffer *rb, TransactionId xid)
struct varlena * reconstructed
void RollbackAndReleaseCurrentSubTransaction(void)
Definition: xact.c:4526
#define SET_VARTAG_EXTERNAL(PTR, tag)
Definition: postgres.h:346
uint64 XLogSegNo
Definition: xlogdefs.h:48
int errcode_for_file_access(void)
Definition: elog.c:721
HeapTupleData tuple
Definition: reorderbuffer.h:29
struct SnapshotData SnapshotData
TransactionId GetCurrentTransactionIdIfAny(void)
Definition: xact.c:456
static ReorderBufferTXN * ReorderBufferLargestTopTXN(ReorderBuffer *rb)
static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, TransactionId xid, XLogSegNo segno)
#define InvalidTransactionId
Definition: transam.h:31
#define RelationGetRelationName(relation)
Definition: rel.h:511
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:349
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:207
bool ReorderBufferXidHasCatalogChanges(ReorderBuffer *rb, TransactionId xid)
unsigned int uint32
Definition: c.h:441
XLogRecPtr final_lsn
DIR * AllocateDir(const char *dirname)
Definition: fd.c:2720