PostgreSQL Source Code  git master
reorderbuffer.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * reorderbuffer.c
4  * PostgreSQL logical replay/reorder buffer management
5  *
6  *
7  * Copyright (c) 2012-2021, PostgreSQL Global Development Group
8  *
9  *
10  * IDENTIFICATION
11  * src/backend/replication/reorderbuffer.c
12  *
13  * NOTES
14  * This module gets handed individual pieces of transactions in the order
15  * they are written to the WAL and is responsible to reassemble them into
16  * toplevel transaction sized pieces. When a transaction is completely
17  * reassembled - signaled by reading the transaction commit record - it
18  * will then call the output plugin (cf. ReorderBufferCommit()) with the
19  * individual changes. The output plugins rely on snapshots built by
20  * snapbuild.c which hands them to us.
21  *
22  * Transactions and subtransactions/savepoints in postgres are not
23  * immediately linked to each other from outside the performing
24  * backend. Only at commit/abort (or special xact_assignment records) they
25  * are linked together. Which means that we will have to splice together a
26  * toplevel transaction from its subtransactions. To do that efficiently we
27  * build a binary heap indexed by the smallest current lsn of the individual
28  * subtransactions' changestreams. As the individual streams are inherently
29  * ordered by LSN - since that is where we build them from - the transaction
30  * can easily be reassembled by always using the subtransaction with the
31  * smallest current LSN from the heap.
32  *
33  * In order to cope with large transactions - which can be several times as
34  * big as the available memory - this module supports spooling the contents
35  * of a large transactions to disk. When the transaction is replayed the
36  * contents of individual (sub-)transactions will be read from disk in
37  * chunks.
38  *
39  * This module also has to deal with reassembling toast records from the
40  * individual chunks stored in WAL. When a new (or initial) version of a
41  * tuple is stored in WAL it will always be preceded by the toast chunks
42  * emitted for the columns stored out of line. Within a single toplevel
43  * transaction there will be no other data carrying records between a row's
44  * toast chunks and the row data itself. See ReorderBufferToast* for
45  * details.
46  *
47  * ReorderBuffer uses two special memory context types - SlabContext for
48  * allocations of fixed-length structures (changes and transactions), and
49  * GenerationContext for the variable-length transaction data (allocated
50  * and freed in groups with similar lifespans).
51  *
52  * To limit the amount of memory used by decoded changes, we track memory
53  * used at the reorder buffer level (i.e. total amount of memory), and for
54  * each transaction. When the total amount of used memory exceeds the
55  * limit, the transaction consuming the most memory is then serialized to
56  * disk.
57  *
58  * Only decoded changes are evicted from memory (spilled to disk), not the
59  * transaction records. The number of toplevel transactions is limited,
60  * but a transaction with many subtransactions may still consume significant
61  * amounts of memory. However, the transaction records are fairly small and
62  * are not included in the memory limit.
63  *
64  * The current eviction algorithm is very simple - the transaction is
65  * picked merely by size, while it might be useful to also consider age
66  * (LSN) of the changes for example. With the new Generational memory
67  * allocator, evicting the oldest changes would make it more likely the
68  * memory gets actually freed.
69  *
70  * We still rely on max_changes_in_memory when loading serialized changes
71  * back into memory. At that point we can't use the memory limit directly
72  * as we load the subxacts independently. One option to deal with this
73  * would be to count the subxacts, and allow each to allocate 1/N of the
74  * memory limit. That however does not seem very appealing, because with
75  * many subtransactions it may easily cause thrashing (short cycles of
76  * deserializing and applying very few changes). We probably should give
77  * a bit more memory to the oldest subtransactions, because it's likely
78  * they are the source for the next sequence of changes.
79  *
80  * -------------------------------------------------------------------------
81  */
82 #include "postgres.h"
83 
84 #include <unistd.h>
85 #include <sys/stat.h>
86 
87 #include "access/detoast.h"
88 #include "access/heapam.h"
89 #include "access/rewriteheap.h"
90 #include "access/transam.h"
91 #include "access/xact.h"
92 #include "access/xlog_internal.h"
93 #include "catalog/catalog.h"
94 #include "lib/binaryheap.h"
95 #include "miscadmin.h"
96 #include "pgstat.h"
97 #include "replication/logical.h"
99 #include "replication/slot.h"
100 #include "replication/snapbuild.h" /* just for SnapBuildSnapDecRefcount */
101 #include "storage/bufmgr.h"
102 #include "storage/fd.h"
103 #include "storage/sinval.h"
104 #include "utils/builtins.h"
105 #include "utils/combocid.h"
106 #include "utils/memdebug.h"
107 #include "utils/memutils.h"
108 #include "utils/rel.h"
109 #include "utils/relfilenodemap.h"
110 
111 
112 /* entry for a hash table we use to map from xid to our transaction state */
114 {
118 
119 /* data structures for (relfilenode, ctid) => (cmin, cmax) mapping */
121 {
125 
127 {
131  CommandId combocid; /* just for debugging */
133 
134 /*
 * Virtual file descriptor with file offset tracking.
 *
 * Used while reading serialized (spilled) changes back from disk (cf.
 * ReorderBufferRestoreChanges and the iterator entries).  The read/write
 * offset is remembered here explicitly because the underlying File may be
 * closed and later reopened, per the member comments below.
 */
135 typedef struct TXNEntryFile
136 {
137  File vfd; /* -1 when the file is closed */
138  off_t curOffset; /* offset for next write or read. Reset to 0
139  * when vfd is opened. */
140 } TXNEntryFile;
141 
142 /* k-way in-order change iteration support structures */
144 {
151 
153 {
159 
160 /* toast datastructures */
161 typedef struct ReorderBufferToastEnt
162 {
163  Oid chunk_id; /* toast_table.chunk_id */
164  int32 last_chunk_seq; /* toast_table.chunk_seq of the last chunk we
165  * have seen */
166  Size num_chunks; /* number of chunks we've already seen */
167  Size size; /* combined size of chunks seen */
168  dlist_head chunks; /* linked list of chunks */
169  struct varlena *reconstructed; /* reconstructed varlena now pointed to in
170  * main tup */
172 
173 /* Disk serialization support datastructures */
175 {
178  /* data follows */
180 
/* Does this change record a speculative insertion? */
181 #define IsSpecInsert(action) \
182 ( \
183  ((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT) \
184 )
/*
 * Does this change resolve a pending speculative insertion, either by
 * confirming it or by aborting it?
 */
185 #define IsSpecConfirmOrAbort(action) \
186 ( \
187  (((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM) || \
188  ((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT)) \
189 )
/*
 * Is this change an insert or update of a row (including a speculative
 * insert)?  Used e.g. to detect the main-table change that completes a
 * preceding run of toast-chunk inserts.
 */
190 #define IsInsertOrUpdate(action) \
191 ( \
192  (((action) == REORDER_BUFFER_CHANGE_INSERT) || \
193  ((action) == REORDER_BUFFER_CHANGE_UPDATE) || \
194  ((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT)) \
195 )
196 
197 /*
198  * Maximum number of changes kept in memory, per transaction. After that,
199  * changes are spooled to disk.
200  *
201  * The current value should be sufficient to decode the entire transaction
202  * without hitting disk in OLTP workloads, while starting to spool to disk in
203  * other workloads reasonably fast.
204  *
 * As the trailing remark notes, this cap is only consulted while restoring
 * previously serialized (spilled) changes back into memory; eviction of
 * in-memory changes is instead driven by the per-buffer memory accounting
 * (see the file header comments about the memory limit).
 *
205  * At some point in the future it probably makes sense to have a more elaborate
206  * resource management here, but it's not entirely clear what that would look
207  * like.
208  */
210 static const Size max_changes_in_memory = 4096; /* XXX for restore only */
211 
212 /* ---------------------------------------
213  * primary reorderbuffer support routines
214  * ---------------------------------------
215  */
219  TransactionId xid, bool create, bool *is_new,
220  XLogRecPtr lsn, bool create_as_top);
222  ReorderBufferTXN *subtxn);
223 
224 static void AssertTXNLsnOrder(ReorderBuffer *rb);
225 
226 /* ---------------------------------------
227  * support functions for lsn-order iterating over the ->changes of a
228  * transaction and its subtransactions
229  *
230  * used for iteration over the k-way heap merge of a transaction and its
231  * subtransactions
232  * ---------------------------------------
233  */
235  ReorderBufferIterTXNState *volatile *iter_state);
240 
241 /*
242  * ---------------------------------------
243  * Disk serialization support functions
244  * ---------------------------------------
245  */
249  int fd, ReorderBufferChange *change);
251  TXNEntryFile *file, XLogSegNo *segno);
253  char *change);
256  bool txn_prepared);
257 static void ReorderBufferCleanupSerializedTXNs(const char *slotname);
258 static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot,
259  TransactionId xid, XLogSegNo segno);
260 
261 static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap);
264 
265 /*
266  * ---------------------------------------
267  * Streaming support functions
268  * ---------------------------------------
269  */
270 static inline bool ReorderBufferCanStream(ReorderBuffer *rb);
271 static inline bool ReorderBufferCanStartStreaming(ReorderBuffer *rb);
274 
275 /* ---------------------------------------
276  * toast reassembly support
277  * ---------------------------------------
278  */
282  Relation relation, ReorderBufferChange *change);
284  Relation relation, ReorderBufferChange *change);
285 
286 /*
287  * ---------------------------------------
288  * memory accounting
289  * ---------------------------------------
290  */
293  ReorderBufferChange *change, bool addition);
294 
295 /*
296  * Allocate a new ReorderBuffer and clean out any old serialized state from
297  * prior ReorderBuffer instances for the same slot.
298  */
301 {
302  ReorderBuffer *buffer;
303  HASHCTL hash_ctl;
304  MemoryContext new_ctx;
305 
306  Assert(MyReplicationSlot != NULL);
307 
308  /* allocate memory in own context, to have better accountability */
310  "ReorderBuffer",
312 
313  buffer =
314  (ReorderBuffer *) MemoryContextAlloc(new_ctx, sizeof(ReorderBuffer));
315 
316  memset(&hash_ctl, 0, sizeof(hash_ctl));
317 
318  buffer->context = new_ctx;
319 
320  buffer->change_context = SlabContextCreate(new_ctx,
321  "Change",
323  sizeof(ReorderBufferChange));
324 
325  buffer->txn_context = SlabContextCreate(new_ctx,
326  "TXN",
328  sizeof(ReorderBufferTXN));
329 
330  buffer->tup_context = GenerationContextCreate(new_ctx,
331  "Tuples",
333 
334  hash_ctl.keysize = sizeof(TransactionId);
335  hash_ctl.entrysize = sizeof(ReorderBufferTXNByIdEnt);
336  hash_ctl.hcxt = buffer->context;
337 
338  buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
340 
342  buffer->by_txn_last_txn = NULL;
343 
344  buffer->outbuf = NULL;
345  buffer->outbufsize = 0;
346  buffer->size = 0;
347 
348  buffer->spillTxns = 0;
349  buffer->spillCount = 0;
350  buffer->spillBytes = 0;
351  buffer->streamTxns = 0;
352  buffer->streamCount = 0;
353  buffer->streamBytes = 0;
354  buffer->totalTxns = 0;
355  buffer->totalBytes = 0;
356 
358 
359  dlist_init(&buffer->toplevel_by_lsn);
361 
362  /*
363  * Ensure there's no stale data from prior uses of this slot, in case some
364  * prior exit avoided calling ReorderBufferFree. Failure to do this can
365  * produce duplicated txns, and it's very cheap if there's nothing there.
366  */
368 
369  return buffer;
370 }
371 
372 /*
373  * Free a ReorderBuffer
374  */
375 void
377 {
378  MemoryContext context = rb->context;
379 
380  /*
381  * We free separately allocated data by entirely scrapping reorderbuffer's
382  * memory context.
383  */
384  MemoryContextDelete(context);
385 
386  /* Free disk space used by unconsumed reorder buffers */
388 }
389 
390 /*
391  * Get an unused, possibly preallocated, ReorderBufferTXN.
392  */
393 static ReorderBufferTXN *
395 {
397 
398  txn = (ReorderBufferTXN *)
400 
401  memset(txn, 0, sizeof(ReorderBufferTXN));
402 
403  dlist_init(&txn->changes);
404  dlist_init(&txn->tuplecids);
405  dlist_init(&txn->subtxns);
406 
407  /* InvalidCommandId is not zero, so set it explicitly */
409  txn->output_plugin_private = NULL;
410 
411  return txn;
412 }
413 
414 /*
415  * Free a ReorderBufferTXN.
416  */
417 static void
419 {
420  /* clean the lookup cache if we were cached (quite likely) */
421  if (rb->by_txn_last_xid == txn->xid)
422  {
424  rb->by_txn_last_txn = NULL;
425  }
426 
427  /* free data that's contained */
428 
429  if (txn->gid != NULL)
430  {
431  pfree(txn->gid);
432  txn->gid = NULL;
433  }
434 
435  if (txn->tuplecid_hash != NULL)
436  {
438  txn->tuplecid_hash = NULL;
439  }
440 
441  if (txn->invalidations)
442  {
443  pfree(txn->invalidations);
444  txn->invalidations = NULL;
445  }
446 
447  /* Reset the toast hash */
448  ReorderBufferToastReset(rb, txn);
449 
450  pfree(txn);
451 }
452 
453 /*
454  * Get an fresh ReorderBufferChange.
455  */
458 {
459  ReorderBufferChange *change;
460 
461  change = (ReorderBufferChange *)
463 
464  memset(change, 0, sizeof(ReorderBufferChange));
465  return change;
466 }
467 
468 /*
469  * Free a ReorderBufferChange and update memory accounting, if requested.
470  */
471 void
473  bool upd_mem)
474 {
475  /* update memory accounting info */
476  if (upd_mem)
477  ReorderBufferChangeMemoryUpdate(rb, change, false);
478 
479  /* free contained data */
480  switch (change->action)
481  {
486  if (change->data.tp.newtuple)
487  {
488  ReorderBufferReturnTupleBuf(rb, change->data.tp.newtuple);
489  change->data.tp.newtuple = NULL;
490  }
491 
492  if (change->data.tp.oldtuple)
493  {
494  ReorderBufferReturnTupleBuf(rb, change->data.tp.oldtuple);
495  change->data.tp.oldtuple = NULL;
496  }
497  break;
499  if (change->data.msg.prefix != NULL)
500  pfree(change->data.msg.prefix);
501  change->data.msg.prefix = NULL;
502  if (change->data.msg.message != NULL)
503  pfree(change->data.msg.message);
504  change->data.msg.message = NULL;
505  break;
507  if (change->data.inval.invalidations)
508  pfree(change->data.inval.invalidations);
509  change->data.inval.invalidations = NULL;
510  break;
512  if (change->data.snapshot)
513  {
514  ReorderBufferFreeSnap(rb, change->data.snapshot);
515  change->data.snapshot = NULL;
516  }
517  break;
518  /* no data in addition to the struct itself */
520  if (change->data.truncate.relids != NULL)
521  {
522  ReorderBufferReturnRelids(rb, change->data.truncate.relids);
523  change->data.truncate.relids = NULL;
524  }
525  break;
530  break;
531  }
532 
533  pfree(change);
534 }
535 
536 /*
537  * Get a fresh ReorderBufferTupleBuf fitting at least a tuple of size
538  * tuple_len (excluding header overhead).
539  */
542 {
543  ReorderBufferTupleBuf *tuple;
544  Size alloc_len;
545 
546  alloc_len = tuple_len + SizeofHeapTupleHeader;
547 
548  tuple = (ReorderBufferTupleBuf *)
550  sizeof(ReorderBufferTupleBuf) +
551  MAXIMUM_ALIGNOF + alloc_len);
552  tuple->alloc_tuple_size = alloc_len;
553  tuple->tuple.t_data = ReorderBufferTupleBufData(tuple);
554 
555  return tuple;
556 }
557 
558 /*
559  * Free an ReorderBufferTupleBuf.
560  */
561 void
563 {
564  pfree(tuple);
565 }
566 
567 /*
568  * Get an array for relids of truncated relations.
569  *
570  * We use the global memory context (for the whole reorder buffer), because
571  * none of the existing ones seems like a good match (some are SLAB, so we
572  * can't use those, and tup_context is meant for tuple data, not relids). We
573  * could add yet another context, but it seems like an overkill - TRUNCATE is
574  * not particularly common operation, so it does not seem worth it.
575  */
576 Oid *
578 {
579  Oid *relids;
580  Size alloc_len;
581 
582  alloc_len = sizeof(Oid) * nrelids;
583 
584  relids = (Oid *) MemoryContextAlloc(rb->context, alloc_len);
585 
586  return relids;
587 }
588 
589 /*
590  * Free an array of relids.
591  */
592 void
594 {
595  pfree(relids);
596 }
597 
598 /*
599  * Return the ReorderBufferTXN from the given buffer, specified by Xid.
600  * If create is true, and a transaction doesn't already exist, create it
601  * (with the given LSN, and as top transaction if that's specified);
602  * when this happens, is_new is set to true.
 *
 * Returns NULL only when create is false and no transaction with this xid
 * exists (see the Assert at the bottom).  When creating, lsn must be a
 * valid LSN, as it becomes the new transaction's first_lsn.
603  */
604 static ReorderBufferTXN *
606  bool *is_new, XLogRecPtr lsn, bool create_as_top)
607 {
610  bool found;
611 
613 
614  /*
615  * Check the one-entry lookup cache first
616  */
618  rb->by_txn_last_xid == xid)
619  {
620  txn = rb->by_txn_last_txn;
621 
622  if (txn != NULL)
623  {
624  /* found it, and it's valid */
625  if (is_new)
626  *is_new = false;
627  return txn;
628  }
629 
630  /*
631  * cached as non-existent, and asked not to create? Then nothing else
632  * to do.
633  */
634  if (!create)
635  return NULL;
636  /* otherwise fall through to create it */
637  }
638 
639  /*
640  * If the cache wasn't hit or it yielded an "does-not-exist" and we want
641  * to create an entry.
642  */
643 
644  /* search the lookup table */
645  ent = (ReorderBufferTXNByIdEnt *)
646  hash_search(rb->by_txn,
647  (void *) &xid,
648  create ? HASH_ENTER : HASH_FIND,
649  &found);
650  if (found)
651  txn = ent->txn;
652  else if (create)
653  {
654  /* initialize the new entry, if creation was requested */
655  Assert(ent != NULL);
656  Assert(lsn != InvalidXLogRecPtr);
657 
658  ent->txn = ReorderBufferGetTXN(rb);
659  ent->txn->xid = xid;
660  txn = ent->txn;
661  txn->first_lsn = lsn;
663 
 /* only top-level transactions live on the LSN-ordered list */
664  if (create_as_top)
665  {
666  dlist_push_tail(&rb->toplevel_by_lsn, &txn->node);
667  AssertTXNLsnOrder(rb);
668  }
669  }
670  else
671  txn = NULL; /* not found and not asked to create */
672 
673  /*
 * update cache -- note we cache even a negative result (txn == NULL),
 * so repeated probes for a nonexistent xid can bail out quickly via
 * the cache check above.
 */
674  rb->by_txn_last_xid = xid;
675  rb->by_txn_last_txn = txn;
676 
677  if (is_new)
678  *is_new = !found;
679 
680  Assert(!create || txn != NULL);
681  return txn;
682 }
683 
684 /*
685  * Record the partial change for the streaming of in-progress transactions. We
686  * can stream only complete changes so if we have a partial change like toast
687  * table insert or speculative insert then we mark such a 'txn' so that it
688  * can't be streamed. We also ensure that if the changes in such a 'txn' are
689  * above logical_decoding_work_mem threshold then we stream them as soon as we
690  * have a complete change.
691  */
692 static void
694  ReorderBufferChange *change,
695  bool toast_insert)
696 {
697  ReorderBufferTXN *toptxn;
698 
699  /*
700  * The partial changes need to be processed only while streaming
701  * in-progress transactions.
702  */
703  if (!ReorderBufferCanStream(rb))
704  return;
705 
706  /* Get the top transaction. */
707  if (txn->toptxn != NULL)
708  toptxn = txn->toptxn;
709  else
710  toptxn = txn;
711 
712  /*
713  * Indicate a partial change for toast inserts. The change will be
714  * considered as complete once we get the insert or update on the main
715  * table and we are sure that the pending toast chunks are not required
716  * anymore.
717  *
718  * If we allow streaming when there are pending toast chunks then such
719  * chunks won't be released till the insert (multi_insert) is complete and
720  * we expect the txn to have streamed all changes after streaming. This
721  * restriction is mainly to ensure the correctness of streamed
722  * transactions and it doesn't seem worth uplifting such a restriction
723  * just to allow this case because anyway we will stream the transaction
724  * once such an insert is complete.
725  */
726  if (toast_insert)
728  else if (rbtxn_has_partial_change(toptxn) &&
729  IsInsertOrUpdate(change->action) &&
730  change->data.tp.clear_toast_afterwards)
732 
733  /*
734  * Indicate a partial change for speculative inserts. The change will be
735  * considered as complete once we get the speculative confirm or abort
736  * token.
737  */
738  if (IsSpecInsert(change->action))
740  else if (rbtxn_has_partial_change(toptxn) &&
741  IsSpecConfirmOrAbort(change->action))
743 
744  /*
745  * Stream the transaction if it is serialized before and the changes are
746  * now complete in the top-level transaction.
747  *
748  * The reason for doing the streaming of such a transaction as soon as we
749  * get the complete change for it is that previously it would have reached
750  * the memory threshold and wouldn't get streamed because of incomplete
751  * changes. Delaying such transactions would increase apply lag for them.
752  */
754  !(rbtxn_has_partial_change(toptxn)) &&
755  rbtxn_is_serialized(txn))
756  ReorderBufferStreamTXN(rb, toptxn);
757 }
758 
759 /*
760  * Queue a change into a transaction so it can be replayed upon commit or will be
761  * streamed when we reach logical_decoding_work_mem threshold.
762  */
763 void
765  ReorderBufferChange *change, bool toast_insert)
766 {
768 
769  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
770 
771  /*
772  * While streaming the previous changes we have detected that the
773  * transaction is aborted. So there is no point in collecting further
774  * changes for it.
775  */
776  if (txn->concurrent_abort)
777  {
778  /*
779  * We don't need to update memory accounting for this change as we
780  * have not added it to the queue yet.
781  */
782  ReorderBufferReturnChange(rb, change, false);
783  return;
784  }
785 
786  change->lsn = lsn;
787  change->txn = txn;
788 
789  Assert(InvalidXLogRecPtr != lsn);
790  dlist_push_tail(&txn->changes, &change->node);
791  txn->nentries++;
792  txn->nentries_mem++;
793 
794  /* update memory accounting information */
795  ReorderBufferChangeMemoryUpdate(rb, change, true);
796 
797  /* process partial change */
798  ReorderBufferProcessPartialChange(rb, txn, change, toast_insert);
799 
800  /* check the memory limits and evict something if needed */
802 }
803 
804 /*
805  * A transactional message is queued to be processed upon commit and a
806  * non-transactional message gets processed immediately.
807  */
808 void
810  Snapshot snapshot, XLogRecPtr lsn,
811  bool transactional, const char *prefix,
812  Size message_size, const char *message)
813 {
814  if (transactional)
815  {
816  MemoryContext oldcontext;
817  ReorderBufferChange *change;
818 
820 
821  oldcontext = MemoryContextSwitchTo(rb->context);
822 
823  change = ReorderBufferGetChange(rb);
825  change->data.msg.prefix = pstrdup(prefix);
826  change->data.msg.message_size = message_size;
827  change->data.msg.message = palloc(message_size);
828  memcpy(change->data.msg.message, message, message_size);
829 
830  ReorderBufferQueueChange(rb, xid, lsn, change, false);
831 
832  MemoryContextSwitchTo(oldcontext);
833  }
834  else
835  {
836  ReorderBufferTXN *txn = NULL;
837  volatile Snapshot snapshot_now = snapshot;
838 
839  if (xid != InvalidTransactionId)
840  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
841 
842  /* setup snapshot to allow catalog access */
843  SetupHistoricSnapshot(snapshot_now, NULL);
844  PG_TRY();
845  {
846  rb->message(rb, txn, lsn, false, prefix, message_size, message);
847 
849  }
850  PG_CATCH();
851  {
853  PG_RE_THROW();
854  }
855  PG_END_TRY();
856  }
857 }
858 
859 /*
860  * AssertTXNLsnOrder
861  * Verify LSN ordering of transaction lists in the reorderbuffer
862  *
863  * Other LSN-related invariants are checked too.
864  *
865  * No-op if assertions are not in use.
866  */
867 static void
869 {
870 #ifdef USE_ASSERT_CHECKING
871  dlist_iter iter;
872  XLogRecPtr prev_first_lsn = InvalidXLogRecPtr;
873  XLogRecPtr prev_base_snap_lsn = InvalidXLogRecPtr;
874 
875  dlist_foreach(iter, &rb->toplevel_by_lsn)
876  {
878  iter.cur);
879 
880  /* start LSN must be set */
881  Assert(cur_txn->first_lsn != InvalidXLogRecPtr);
882 
883  /* If there is an end LSN, it must be higher than start LSN */
884  if (cur_txn->end_lsn != InvalidXLogRecPtr)
885  Assert(cur_txn->first_lsn <= cur_txn->end_lsn);
886 
887  /* Current initial LSN must be strictly higher than previous */
888  if (prev_first_lsn != InvalidXLogRecPtr)
889  Assert(prev_first_lsn < cur_txn->first_lsn);
890 
891  /* known-as-subtxn txns must not be listed */
892  Assert(!rbtxn_is_known_subxact(cur_txn));
893 
894  prev_first_lsn = cur_txn->first_lsn;
895  }
896 
898  {
900  base_snapshot_node,
901  iter.cur);
902 
903  /* base snapshot (and its LSN) must be set */
904  Assert(cur_txn->base_snapshot != NULL);
906 
907  /* current LSN must be strictly higher than previous */
908  if (prev_base_snap_lsn != InvalidXLogRecPtr)
909  Assert(prev_base_snap_lsn < cur_txn->base_snapshot_lsn);
910 
911  /* known-as-subtxn txns must not be listed */
912  Assert(!rbtxn_is_known_subxact(cur_txn));
913 
914  prev_base_snap_lsn = cur_txn->base_snapshot_lsn;
915  }
916 #endif
917 }
918 
919 /*
920  * AssertChangeLsnOrder
921  *
922  * Check ordering of changes in the (sub)transaction.
923  */
924 static void
926 {
927 #ifdef USE_ASSERT_CHECKING
928  dlist_iter iter;
929  XLogRecPtr prev_lsn = txn->first_lsn;
930 
931  dlist_foreach(iter, &txn->changes)
932  {
933  ReorderBufferChange *cur_change;
934 
935  cur_change = dlist_container(ReorderBufferChange, node, iter.cur);
936 
938  Assert(cur_change->lsn != InvalidXLogRecPtr);
939  Assert(txn->first_lsn <= cur_change->lsn);
940 
941  if (txn->end_lsn != InvalidXLogRecPtr)
942  Assert(cur_change->lsn <= txn->end_lsn);
943 
944  Assert(prev_lsn <= cur_change->lsn);
945 
946  prev_lsn = cur_change->lsn;
947  }
948 #endif
949 }
950 
951 /*
952  * ReorderBufferGetOldestTXN
953  * Return oldest transaction in reorderbuffer
954  */
957 {
959 
960  AssertTXNLsnOrder(rb);
961 
963  return NULL;
964 
966 
969  return txn;
970 }
971 
972 /*
973  * ReorderBufferGetOldestXmin
974  * Return oldest Xmin in reorderbuffer
975  *
976  * Returns oldest possibly running Xid from the point of view of snapshots
977  * used in the transactions kept by reorderbuffer, or InvalidTransactionId if
978  * there are none.
979  *
980  * Since snapshots are assigned monotonically, this equals the Xmin of the
981  * base snapshot with minimal base_snapshot_lsn.
982  */
985 {
987 
988  AssertTXNLsnOrder(rb);
989 
991  return InvalidTransactionId;
992 
993  txn = dlist_head_element(ReorderBufferTXN, base_snapshot_node,
995  return txn->base_snapshot->xmin;
996 }
997 
998 void
1000 {
1001  rb->current_restart_decoding_lsn = ptr;
1002 }
1003 
1004 /*
1005  * ReorderBufferAssignChild
1006  *
1007  * Make note that we know that subxid is a subtransaction of xid, seen as of
1008  * the given lsn.
1009  */
1010 void
1012  TransactionId subxid, XLogRecPtr lsn)
1013 {
1015  ReorderBufferTXN *subtxn;
1016  bool new_top;
1017  bool new_sub;
1018 
1019  txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true);
1020  subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false);
1021 
1022  if (!new_sub)
1023  {
1024  if (rbtxn_is_known_subxact(subtxn))
1025  {
1026  /* already associated, nothing to do */
1027  return;
1028  }
1029  else
1030  {
1031  /*
1032  * We already saw this transaction, but initially added it to the
1033  * list of top-level txns. Now that we know it's not top-level,
1034  * remove it from there.
1035  */
1036  dlist_delete(&subtxn->node);
1037  }
1038  }
1039 
1040  subtxn->txn_flags |= RBTXN_IS_SUBXACT;
1041  subtxn->toplevel_xid = xid;
1042  Assert(subtxn->nsubtxns == 0);
1043 
1044  /* set the reference to top-level transaction */
1045  subtxn->toptxn = txn;
1046 
1047  /* add to subtransaction list */
1048  dlist_push_tail(&txn->subtxns, &subtxn->node);
1049  txn->nsubtxns++;
1050 
1051  /* Possibly transfer the subtxn's snapshot to its top-level txn. */
1052  ReorderBufferTransferSnapToParent(txn, subtxn);
1053 
1054  /* Verify LSN-ordering invariant */
1055  AssertTXNLsnOrder(rb);
1056 }
1057 
1058 /*
1059  * ReorderBufferTransferSnapToParent
1060  * Transfer base snapshot from subtxn to top-level txn, if needed
1061  *
1062  * This is done if the top-level txn doesn't have a base snapshot, or if the
1063  * subtxn's base snapshot has an earlier LSN than the top-level txn's base
1064  * snapshot's LSN. This can happen if there are no changes in the toplevel
1065  * txn but there are some in the subtxn, or the first change in subtxn has
1066  * earlier LSN than first change in the top-level txn and we learned about
1067  * their kinship only now.
1068  *
1069  * The subtransaction's snapshot is cleared regardless of the transfer
1070  * happening, since it's not needed anymore in either case.
1071  *
1072  * We do this as soon as we become aware of their kinship, to avoid queueing
1073  * extra snapshots to txns known-as-subtxns -- only top-level txns will
1074  * receive further snapshots.
1075  */
1076 static void
1078  ReorderBufferTXN *subtxn)
1079 {
1080  Assert(subtxn->toplevel_xid == txn->xid);
1081 
1082  if (subtxn->base_snapshot != NULL)
1083  {
1084  if (txn->base_snapshot == NULL ||
1085  subtxn->base_snapshot_lsn < txn->base_snapshot_lsn)
1086  {
1087  /*
1088  * If the toplevel transaction already has a base snapshot but
1089  * it's newer than the subxact's, purge it.
1090  */
1091  if (txn->base_snapshot != NULL)
1092  {
1095  }
1096 
1097  /*
1098  * The snapshot is now the top transaction's; transfer it, and
1099  * adjust the list position of the top transaction in the list by
1100  * moving it to where the subtransaction is.
1101  */
1102  txn->base_snapshot = subtxn->base_snapshot;
1103  txn->base_snapshot_lsn = subtxn->base_snapshot_lsn;
1105  &txn->base_snapshot_node);
1106 
1107  /*
1108  * The subtransaction doesn't have a snapshot anymore (so it
1109  * mustn't be in the list.)
1110  */
1111  subtxn->base_snapshot = NULL;
1113  dlist_delete(&subtxn->base_snapshot_node);
1114  }
1115  else
1116  {
1117  /* Base snap of toplevel is fine, so subxact's is not needed */
1119  dlist_delete(&subtxn->base_snapshot_node);
1120  subtxn->base_snapshot = NULL;
1122  }
1123  }
1124 }
1125 
1126 /*
1127  * Associate a subtransaction with its toplevel transaction at commit
1128  * time. There may be no further changes added after this.
1129  */
1130 void
1132  TransactionId subxid, XLogRecPtr commit_lsn,
1133  XLogRecPtr end_lsn)
1134 {
1135  ReorderBufferTXN *subtxn;
1136 
1137  subtxn = ReorderBufferTXNByXid(rb, subxid, false, NULL,
1138  InvalidXLogRecPtr, false);
1139 
1140  /*
1141  * No need to do anything if that subtxn didn't contain any changes
1142  */
1143  if (!subtxn)
1144  return;
1145 
1146  subtxn->final_lsn = commit_lsn;
1147  subtxn->end_lsn = end_lsn;
1148 
1149  /*
1150  * Assign this subxact as a child of the toplevel xact (no-op if already
1151  * done.)
1152  */
1153  ReorderBufferAssignChild(rb, xid, subxid, InvalidXLogRecPtr);
1154 }
1155 
1156 
1157 /*
1158  * Support for efficiently iterating over a transaction's and its
1159  * subtransactions' changes.
1160  *
1161  * We do by doing a k-way merge between transactions/subtransactions. For that
1162  * we model the current heads of the different transactions as a binary heap
1163  * so we easily know which (sub-)transaction has the change with the smallest
1164  * lsn next.
1165  *
1166  * We assume the changes in individual transactions are already sorted by LSN.
1167  */
1168 
1169 /*
1170  * Binary heap comparison function.
 *
 * Compares two heap members -- passed as indexes into state->entries --
 * by the LSN of their next pending change.  The result is deliberately
 * inverted (a smaller LSN ranks "higher"), so the heap surfaces the entry
 * with the smallest LSN first, as the k-way merge described above requires.
1171  */
1172 static int
1174 {
1176  XLogRecPtr pos_a = state->entries[DatumGetInt32(a)].lsn;
1177  XLogRecPtr pos_b = state->entries[DatumGetInt32(b)].lsn;
1178 
1179  if (pos_a < pos_b)
 /* lower LSN sorts toward the top of the heap */
1180  return 1;
1181  else if (pos_a == pos_b)
1182  return 0;
1183  return -1;
1184 }
1185 
1186 /*
1187  * Allocate & initialize an iterator which iterates in lsn order over a
1188  * transaction and all its subtransactions.
1189  *
1190  * Note: The iterator state is returned through iter_state parameter rather
1191  * than the function's return value. This is because the state gets cleaned up
1192  * in a PG_CATCH block in the caller, so we want to make sure the caller gets
1193  * back the state even if this function throws an exception.
1194  */
1195 static void
1197  ReorderBufferIterTXNState *volatile *iter_state)
1198 {
1199  Size nr_txns = 0;
1201  dlist_iter cur_txn_i;
1202  int32 off;
1203 
1204  *iter_state = NULL;
1205 
1206  /* Check ordering of changes in the toplevel transaction. */
1207  AssertChangeLsnOrder(txn);
1208 
1209  /*
1210  * Calculate the size of our heap: one element for every transaction that
1211  * contains changes. (Besides the transactions already in the reorder
1212  * buffer, we count the one we were directly passed.)
1213  */
1214  if (txn->nentries > 0)
1215  nr_txns++;
1216 
1217  dlist_foreach(cur_txn_i, &txn->subtxns)
1218  {
1219  ReorderBufferTXN *cur_txn;
1220 
1221  cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
1222 
1223  /* Check ordering of changes in this subtransaction. */
1224  AssertChangeLsnOrder(cur_txn);
1225 
1226  if (cur_txn->nentries > 0)
1227  nr_txns++;
1228  }
1229 
1230  /* allocate iteration state */
1231  state = (ReorderBufferIterTXNState *)
1233  sizeof(ReorderBufferIterTXNState) +
1234  sizeof(ReorderBufferIterTXNEntry) * nr_txns);
1235 
1236  state->nr_txns = nr_txns;
1237  dlist_init(&state->old_change);
1238 
1239  for (off = 0; off < state->nr_txns; off++)
1240  {
1241  state->entries[off].file.vfd = -1;
1242  state->entries[off].segno = 0;
1243  }
1244 
1245  /* allocate heap */
1246  state->heap = binaryheap_allocate(state->nr_txns,
1248  state);
1249 
1250  /* Now that the state fields are initialized, it is safe to return it. */
1251  *iter_state = state;
1252 
1253  /*
1254  * Now insert items into the binary heap, in an unordered fashion. (We
1255  * will run a heap assembly step at the end; this is more efficient.)
1256  */
1257 
1258  off = 0;
1259 
1260  /* add toplevel transaction if it contains changes */
1261  if (txn->nentries > 0)
1262  {
1263  ReorderBufferChange *cur_change;
1264 
1265  if (rbtxn_is_serialized(txn))
1266  {
1267  /* serialize remaining changes */
1268  ReorderBufferSerializeTXN(rb, txn);
1269  ReorderBufferRestoreChanges(rb, txn, &state->entries[off].file,
1270  &state->entries[off].segno);
1271  }
1272 
1273  cur_change = dlist_head_element(ReorderBufferChange, node,
1274  &txn->changes);
1275 
1276  state->entries[off].lsn = cur_change->lsn;
1277  state->entries[off].change = cur_change;
1278  state->entries[off].txn = txn;
1279 
1281  }
1282 
1283  /* add subtransactions if they contain changes */
1284  dlist_foreach(cur_txn_i, &txn->subtxns)
1285  {
1286  ReorderBufferTXN *cur_txn;
1287 
1288  cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
1289 
1290  if (cur_txn->nentries > 0)
1291  {
1292  ReorderBufferChange *cur_change;
1293 
1294  if (rbtxn_is_serialized(cur_txn))
1295  {
1296  /* serialize remaining changes */
1297  ReorderBufferSerializeTXN(rb, cur_txn);
1298  ReorderBufferRestoreChanges(rb, cur_txn,
1299  &state->entries[off].file,
1300  &state->entries[off].segno);
1301  }
1302  cur_change = dlist_head_element(ReorderBufferChange, node,
1303  &cur_txn->changes);
1304 
1305  state->entries[off].lsn = cur_change->lsn;
1306  state->entries[off].change = cur_change;
1307  state->entries[off].txn = cur_txn;
1308 
1310  }
1311  }
1312 
1313  /* assemble a valid binary heap */
1314  binaryheap_build(state->heap);
1315 }
1316 
1317 /*
1318  * Return the next change when iterating over a transaction and its
1319  * subtransactions.
1320  *
1321  * Returns NULL when no further changes exist.
1322  */
1323 static ReorderBufferChange *
1325 {
1326  ReorderBufferChange *change;
1328  int32 off;
1329 
1330  /* nothing there anymore */
1331  if (state->heap->bh_size == 0)
1332  return NULL;
1333 
1334  off = DatumGetInt32(binaryheap_first(state->heap));
1335  entry = &state->entries[off];
1336 
1337  /* free memory we might have "leaked" in the previous *Next call */
1338  if (!dlist_is_empty(&state->old_change))
1339  {
1340  change = dlist_container(ReorderBufferChange, node,
1341  dlist_pop_head_node(&state->old_change));
1342  ReorderBufferReturnChange(rb, change, true);
1343  Assert(dlist_is_empty(&state->old_change));
1344  }
1345 
1346  change = entry->change;
1347 
1348  /*
1349  * update heap with information about which transaction has the next
1350  * relevant change in LSN order
1351  */
1352 
1353  /* there are in-memory changes */
1354  if (dlist_has_next(&entry->txn->changes, &entry->change->node))
1355  {
1356  dlist_node *next = dlist_next_node(&entry->txn->changes, &change->node);
1357  ReorderBufferChange *next_change =
1358  dlist_container(ReorderBufferChange, node, next);
1359 
1360  /* txn stays the same */
1361  state->entries[off].lsn = next_change->lsn;
1362  state->entries[off].change = next_change;
1363 
1365  return change;
1366  }
1367 
1368  /* try to load changes from disk */
1369  if (entry->txn->nentries != entry->txn->nentries_mem)
1370  {
1371  /*
1372  * Ugly: restoring changes will reuse *Change records, thus delete the
1373  * current one from the per-tx list and only free in the next call.
1374  */
1375  dlist_delete(&change->node);
1376  dlist_push_tail(&state->old_change, &change->node);
1377 
1378  /*
1379  * Update the total bytes processed by the txn for which we are
1380  * releasing the current set of changes and restoring the new set of
1381  * changes.
1382  */
1383  rb->totalBytes += entry->txn->size;
1384  if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->file,
1385  &state->entries[off].segno))
1386  {
1387  /* successfully restored changes from disk */
1388  ReorderBufferChange *next_change =
1390  &entry->txn->changes);
1391 
1392  elog(DEBUG2, "restored %u/%u changes from disk",
1393  (uint32) entry->txn->nentries_mem,
1394  (uint32) entry->txn->nentries);
1395 
1396  Assert(entry->txn->nentries_mem);
1397  /* txn stays the same */
1398  state->entries[off].lsn = next_change->lsn;
1399  state->entries[off].change = next_change;
1401 
1402  return change;
1403  }
1404  }
1405 
1406  /* ok, no changes there anymore, remove */
1407  binaryheap_remove_first(state->heap);
1408 
1409  return change;
1410 }
1411 
1412 /*
1413  * Deallocate the iterator
1414  */
1415 static void
1418 {
1419  int32 off;
1420 
1421  for (off = 0; off < state->nr_txns; off++)
1422  {
1423  if (state->entries[off].file.vfd != -1)
1424  FileClose(state->entries[off].file.vfd);
1425  }
1426 
1427  /* free memory we might have "leaked" in the last *Next call */
1428  if (!dlist_is_empty(&state->old_change))
1429  {
1430  ReorderBufferChange *change;
1431 
1432  change = dlist_container(ReorderBufferChange, node,
1433  dlist_pop_head_node(&state->old_change));
1434  ReorderBufferReturnChange(rb, change, true);
1435  Assert(dlist_is_empty(&state->old_change));
1436  }
1437 
1438  binaryheap_free(state->heap);
1439  pfree(state);
1440 }
1441 
1442 /*
1443  * Cleanup the contents of a transaction, usually after the transaction
1444  * committed or aborted.
1445  */
1446 static void
1448 {
1449  bool found;
1450  dlist_mutable_iter iter;
1451 
1452  /* cleanup subtransactions & their changes */
1453  dlist_foreach_modify(iter, &txn->subtxns)
1454  {
1455  ReorderBufferTXN *subtxn;
1456 
1457  subtxn = dlist_container(ReorderBufferTXN, node, iter.cur);
1458 
1459  /*
1460  * Subtransactions are always associated to the toplevel TXN, even if
1461  * they originally were happening inside another subtxn, so we won't
1462  * ever recurse more than one level deep here.
1463  */
1464  Assert(rbtxn_is_known_subxact(subtxn));
1465  Assert(subtxn->nsubtxns == 0);
1466 
1467  ReorderBufferCleanupTXN(rb, subtxn);
1468  }
1469 
1470  /* cleanup changes in the txn */
1471  dlist_foreach_modify(iter, &txn->changes)
1472  {
1473  ReorderBufferChange *change;
1474 
1475  change = dlist_container(ReorderBufferChange, node, iter.cur);
1476 
1477  /* Check we're not mixing changes from different transactions. */
1478  Assert(change->txn == txn);
1479 
1480  ReorderBufferReturnChange(rb, change, true);
1481  }
1482 
1483  /*
1484  * Cleanup the tuplecids we stored for decoding catalog snapshot access.
1485  * They are always stored in the toplevel transaction.
1486  */
1487  dlist_foreach_modify(iter, &txn->tuplecids)
1488  {
1489  ReorderBufferChange *change;
1490 
1491  change = dlist_container(ReorderBufferChange, node, iter.cur);
1492 
1493  /* Check we're not mixing changes from different transactions. */
1494  Assert(change->txn == txn);
1496 
1497  ReorderBufferReturnChange(rb, change, true);
1498  }
1499 
1500  /*
1501  * Cleanup the base snapshot, if set.
1502  */
1503  if (txn->base_snapshot != NULL)
1504  {
1507  }
1508 
1509  /*
1510  * Cleanup the snapshot for the last streamed run.
1511  */
1512  if (txn->snapshot_now != NULL)
1513  {
1514  Assert(rbtxn_is_streamed(txn));
1516  }
1517 
1518  /*
1519  * Remove TXN from its containing list.
1520  *
1521  * Note: if txn is known as subxact, we are deleting the TXN from its
1522  * parent's list of known subxacts; this leaves the parent's nsubxacts
1523  * count too high, but we don't care. Otherwise, we are deleting the TXN
1524  * from the LSN-ordered list of toplevel TXNs.
1525  */
1526  dlist_delete(&txn->node);
1527 
1528  /* now remove reference from buffer */
1529  hash_search(rb->by_txn,
1530  (void *) &txn->xid,
1531  HASH_REMOVE,
1532  &found);
1533  Assert(found);
1534 
1535  /* remove entries spilled to disk */
1536  if (rbtxn_is_serialized(txn))
1537  ReorderBufferRestoreCleanup(rb, txn);
1538 
1539  /* deallocate */
1540  ReorderBufferReturnTXN(rb, txn);
1541 }
1542 
1543 /*
1544  * Discard changes from a transaction (and subtransactions), either after
1545  * streaming or decoding them at PREPARE. Keep the remaining info -
1546  * transactions, tuplecids, invalidations and snapshots.
1547  *
1548  * We additionally remove tuplecids after decoding the transaction at prepare
1549  * time as we only need to perform invalidation at rollback or commit prepared.
1550  *
1551  * 'txn_prepared' indicates that we have decoded the transaction at prepare
1552  * time.
1553  */
1554 static void
1556 {
1557  dlist_mutable_iter iter;
1558 
1559  /* cleanup subtransactions & their changes */
1560  dlist_foreach_modify(iter, &txn->subtxns)
1561  {
1562  ReorderBufferTXN *subtxn;
1563 
1564  subtxn = dlist_container(ReorderBufferTXN, node, iter.cur);
1565 
1566  /*
1567  * Subtransactions are always associated to the toplevel TXN, even if
1568  * they originally were happening inside another subtxn, so we won't
1569  * ever recurse more than one level deep here.
1570  */
1571  Assert(rbtxn_is_known_subxact(subtxn));
1572  Assert(subtxn->nsubtxns == 0);
1573 
1574  ReorderBufferTruncateTXN(rb, subtxn, txn_prepared);
1575  }
1576 
1577  /* cleanup changes in the txn */
1578  dlist_foreach_modify(iter, &txn->changes)
1579  {
1580  ReorderBufferChange *change;
1581 
1582  change = dlist_container(ReorderBufferChange, node, iter.cur);
1583 
1584  /* Check we're not mixing changes from different transactions. */
1585  Assert(change->txn == txn);
1586 
1587  /* remove the change from it's containing list */
1588  dlist_delete(&change->node);
1589 
1590  ReorderBufferReturnChange(rb, change, true);
1591  }
1592 
1593  /*
1594  * Mark the transaction as streamed.
1595  *
1596  * The toplevel transaction, identified by (toptxn==NULL), is marked as
1597  * streamed always, even if it does not contain any changes (that is, when
1598  * all the changes are in subtransactions).
1599  *
1600  * For subtransactions, we only mark them as streamed when there are
1601  * changes in them.
1602  *
1603  * We do it this way because of aborts - we don't want to send aborts for
1604  * XIDs the downstream is not aware of. And of course, it always knows
1605  * about the toplevel xact (we send the XID in all messages), but we never
1606  * stream XIDs of empty subxacts.
1607  */
1608  if ((!txn_prepared) && ((!txn->toptxn) || (txn->nentries_mem != 0)))
1609  txn->txn_flags |= RBTXN_IS_STREAMED;
1610 
1611  if (txn_prepared)
1612  {
1613  /*
1614  * If this is a prepared txn, cleanup the tuplecids we stored for
1615  * decoding catalog snapshot access. They are always stored in the
1616  * toplevel transaction.
1617  */
1618  dlist_foreach_modify(iter, &txn->tuplecids)
1619  {
1620  ReorderBufferChange *change;
1621 
1622  change = dlist_container(ReorderBufferChange, node, iter.cur);
1623 
1624  /* Check we're not mixing changes from different transactions. */
1625  Assert(change->txn == txn);
1627 
1628  /* Remove the change from its containing list. */
1629  dlist_delete(&change->node);
1630 
1631  ReorderBufferReturnChange(rb, change, true);
1632  }
1633  }
1634 
1635  /*
1636  * Destroy the (relfilenode, ctid) hashtable, so that we don't leak any
1637  * memory. We could also keep the hash table and update it with new ctid
1638  * values, but this seems simpler and good enough for now.
1639  */
1640  if (txn->tuplecid_hash != NULL)
1641  {
1643  txn->tuplecid_hash = NULL;
1644  }
1645 
1646  /* If this txn is serialized then clean the disk space. */
1647  if (rbtxn_is_serialized(txn))
1648  {
1649  ReorderBufferRestoreCleanup(rb, txn);
1650  txn->txn_flags &= ~RBTXN_IS_SERIALIZED;
1651 
1652  /*
1653  * We set this flag to indicate if the transaction is ever serialized.
1654  * We need this to accurately update the stats as otherwise the same
1655  * transaction can be counted as serialized multiple times.
1656  */
1658  }
1659 
1660  /* also reset the number of entries in the transaction */
1661  txn->nentries_mem = 0;
1662  txn->nentries = 0;
1663 }
1664 
1665 /*
1666  * Build a hash with a (relfilenode, ctid) -> (cmin, cmax) mapping for use by
1667  * HeapTupleSatisfiesHistoricMVCC.
1668  */
1669 static void
1671 {
1672  dlist_iter iter;
1673  HASHCTL hash_ctl;
1674 
1676  return;
1677 
1678  hash_ctl.keysize = sizeof(ReorderBufferTupleCidKey);
1679  hash_ctl.entrysize = sizeof(ReorderBufferTupleCidEnt);
1680  hash_ctl.hcxt = rb->context;
1681 
1682  /*
1683  * create the hash with the exact number of to-be-stored tuplecids from
1684  * the start
1685  */
1686  txn->tuplecid_hash =
1687  hash_create("ReorderBufferTupleCid", txn->ntuplecids, &hash_ctl,
1689 
1690  dlist_foreach(iter, &txn->tuplecids)
1691  {
1694  bool found;
1695  ReorderBufferChange *change;
1696 
1697  change = dlist_container(ReorderBufferChange, node, iter.cur);
1698 
1700 
1701  /* be careful about padding */
1702  memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
1703 
1704  key.relnode = change->data.tuplecid.node;
1705 
1706  ItemPointerCopy(&change->data.tuplecid.tid,
1707  &key.tid);
1708 
1709  ent = (ReorderBufferTupleCidEnt *)
1711  (void *) &key,
1712  HASH_ENTER,
1713  &found);
1714  if (!found)
1715  {
1716  ent->cmin = change->data.tuplecid.cmin;
1717  ent->cmax = change->data.tuplecid.cmax;
1718  ent->combocid = change->data.tuplecid.combocid;
1719  }
1720  else
1721  {
1722  /*
1723  * Maybe we already saw this tuple before in this transaction, but
1724  * if so it must have the same cmin.
1725  */
1726  Assert(ent->cmin == change->data.tuplecid.cmin);
1727 
1728  /*
1729  * cmax may be initially invalid, but once set it can only grow,
1730  * and never become invalid again.
1731  */
1732  Assert((ent->cmax == InvalidCommandId) ||
1733  ((change->data.tuplecid.cmax != InvalidCommandId) &&
1734  (change->data.tuplecid.cmax > ent->cmax)));
1735  ent->cmax = change->data.tuplecid.cmax;
1736  }
1737  }
1738 }
1739 
1740 /*
1741  * Copy a provided snapshot so we can modify it privately. This is needed so
1742  * that catalog modifying transactions can look into intermediate catalog
1743  * states.
1744  */
1745 static Snapshot
1748 {
1749  Snapshot snap;
1750  dlist_iter iter;
1751  int i = 0;
1752  Size size;
1753 
1754  size = sizeof(SnapshotData) +
1755  sizeof(TransactionId) * orig_snap->xcnt +
1756  sizeof(TransactionId) * (txn->nsubtxns + 1);
1757 
1758  snap = MemoryContextAllocZero(rb->context, size);
1759  memcpy(snap, orig_snap, sizeof(SnapshotData));
1760 
1761  snap->copied = true;
1762  snap->active_count = 1; /* mark as active so nobody frees it */
1763  snap->regd_count = 0;
1764  snap->xip = (TransactionId *) (snap + 1);
1765 
1766  memcpy(snap->xip, orig_snap->xip, sizeof(TransactionId) * snap->xcnt);
1767 
1768  /*
1769  * snap->subxip contains all txids that belong to our transaction which we
1770  * need to check via cmin/cmax. That's why we store the toplevel
1771  * transaction in there as well.
1772  */
1773  snap->subxip = snap->xip + snap->xcnt;
1774  snap->subxip[i++] = txn->xid;
1775 
1776  /*
1777  * subxcnt isn't decreased when subtransactions abort, so count manually.
1778  * Since it's an upper boundary it is safe to use it for the allocation
1779  * above.
1780  */
1781  snap->subxcnt = 1;
1782 
1783  dlist_foreach(iter, &txn->subtxns)
1784  {
1785  ReorderBufferTXN *sub_txn;
1786 
1787  sub_txn = dlist_container(ReorderBufferTXN, node, iter.cur);
1788  snap->subxip[i++] = sub_txn->xid;
1789  snap->subxcnt++;
1790  }
1791 
1792  /* sort so we can bsearch() later */
1793  qsort(snap->subxip, snap->subxcnt, sizeof(TransactionId), xidComparator);
1794 
1795  /* store the specified current CommandId */
1796  snap->curcid = cid;
1797 
1798  return snap;
1799 }
1800 
1801 /*
1802  * Free a previously ReorderBufferCopySnap'ed snapshot
1803  */
1804 static void
1806 {
1807  if (snap->copied)
1808  pfree(snap);
1809  else
1811 }
1812 
1813 /*
1814  * If the transaction was (partially) streamed, we need to prepare or commit
1815  * it in a 'streamed' way. That is, we first stream the remaining part of the
1816  * transaction, and then invoke stream_prepare or stream_commit message as per
1817  * the case.
1818  */
1819 static void
1821 {
1822  /* we should only call this for previously streamed transactions */
1823  Assert(rbtxn_is_streamed(txn));
1824 
1825  ReorderBufferStreamTXN(rb, txn);
1826 
1827  if (rbtxn_prepared(txn))
1828  {
1829  /*
1830  * Note, we send stream prepare even if a concurrent abort is
1831  * detected. See DecodePrepare for more information.
1832  */
1833  rb->stream_prepare(rb, txn, txn->final_lsn);
1834 
1835  /*
1836  * This is a PREPARED transaction, part of a two-phase commit. The
1837  * full cleanup will happen as part of the COMMIT PREPAREDs, so now
1838  * just truncate txn by removing changes and tuple_cids.
1839  */
1840  ReorderBufferTruncateTXN(rb, txn, true);
1841  /* Reset the CheckXidAlive */
1843  }
1844  else
1845  {
1846  rb->stream_commit(rb, txn, txn->final_lsn);
1847  ReorderBufferCleanupTXN(rb, txn);
1848  }
1849 }
1850 
1851 /*
1852  * Set xid to detect concurrent aborts.
1853  *
1854  * While streaming an in-progress transaction or decoding a prepared
1855  * transaction there is a possibility that the (sub)transaction might get
1856  * aborted concurrently. In such case if the (sub)transaction has catalog
1857  * update then we might decode the tuple using wrong catalog version. For
1858  * example, suppose there is one catalog tuple with (xmin: 500, xmax: 0). Now,
1859  * the transaction 501 updates the catalog tuple and after that we will have
1860  * two tuples (xmin: 500, xmax: 501) and (xmin: 501, xmax: 0). Now, if 501 is
1861  * aborted and some other transaction say 502 updates the same catalog tuple
1862  * then the first tuple will be changed to (xmin: 500, xmax: 502). So, the
1863  * problem is that when we try to decode the tuple inserted/updated in 501
1864  * after the catalog update, we will see the catalog tuple with (xmin: 500,
1865  * xmax: 502) as visible because it will consider that the tuple is deleted by
1866  * xid 502 which is not visible to our snapshot. And when we will try to
1867  * decode with that catalog tuple, it can lead to a wrong result or a crash.
1868  * So, it is necessary to detect concurrent aborts to allow streaming of
1869  * in-progress transactions or decoding of prepared transactions.
1870  *
1871  * For detecting the concurrent abort we set CheckXidAlive to the current
1872  * (sub)transaction's xid for which this change belongs to. And, during
1873  * catalog scan we can check the status of the xid and if it is aborted we will
1874  * report a specific error so that we can stop streaming current transaction
1875  * and discard the already streamed changes on such an error. We might have
1876  * already streamed some of the changes for the aborted (sub)transaction, but
1877  * that is fine because when we decode the abort we will stream abort message
1878  * to truncate the changes in the subscriber. Similarly, for prepared
1879  * transactions, we stop decoding if concurrent abort is detected and then
1880  * rollback the changes when rollback prepared is encountered. See
1881  * DecodePrepare.
1882  */
1883 static inline void
1885 {
1886  /*
1887  * If the input transaction id is already set as a CheckXidAlive then
1888  * nothing to do.
1889  */
1891  return;
1892 
1893  /*
1894  * setup CheckXidAlive if it's not committed yet. We don't check if the
1895  * xid is aborted. That will happen during catalog access.
1896  */
1897  if (!TransactionIdDidCommit(xid))
1898  CheckXidAlive = xid;
1899  else
1901 }
1902 
1903 /*
1904  * Helper function for ReorderBufferProcessTXN for applying change.
1905  */
1906 static inline void
1908  Relation relation, ReorderBufferChange *change,
1909  bool streaming)
1910 {
1911  if (streaming)
1912  rb->stream_change(rb, txn, relation, change);
1913  else
1914  rb->apply_change(rb, txn, relation, change);
1915 }
1916 
1917 /*
1918  * Helper function for ReorderBufferProcessTXN for applying the truncate.
1919  */
1920 static inline void
1922  int nrelations, Relation *relations,
1923  ReorderBufferChange *change, bool streaming)
1924 {
1925  if (streaming)
1926  rb->stream_truncate(rb, txn, nrelations, relations, change);
1927  else
1928  rb->apply_truncate(rb, txn, nrelations, relations, change);
1929 }
1930 
1931 /*
1932  * Helper function for ReorderBufferProcessTXN for applying the message.
1933  */
1934 static inline void
1936  ReorderBufferChange *change, bool streaming)
1937 {
1938  if (streaming)
1939  rb->stream_message(rb, txn, change->lsn, true,
1940  change->data.msg.prefix,
1941  change->data.msg.message_size,
1942  change->data.msg.message);
1943  else
1944  rb->message(rb, txn, change->lsn, true,
1945  change->data.msg.prefix,
1946  change->data.msg.message_size,
1947  change->data.msg.message);
1948 }
1949 
1950 /*
1951  * Function to store the command id and snapshot at the end of the current
1952  * stream so that we can reuse the same while sending the next stream.
1953  */
1954 static inline void
1956  Snapshot snapshot_now, CommandId command_id)
1957 {
1958  txn->command_id = command_id;
1959 
1960  /* Avoid copying if it's already copied. */
1961  if (snapshot_now->copied)
1962  txn->snapshot_now = snapshot_now;
1963  else
1964  txn->snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
1965  txn, command_id);
1966 }
1967 
1968 /*
1969  * Helper function for ReorderBufferProcessTXN to handle the concurrent
1970  * abort of the streaming transaction. This resets the TXN such that it
1971  * can be used to stream the remaining data of transaction being processed.
1972  * This can happen when the subtransaction is aborted and we still want to
1973  * continue processing the main or other subtransactions data.
1974  */
1975 static void
1977  Snapshot snapshot_now,
1978  CommandId command_id,
1979  XLogRecPtr last_lsn,
1980  ReorderBufferChange *specinsert)
1981 {
1982  /* Discard the changes that we just streamed */
1984 
1985  /* Free all resources allocated for toast reconstruction */
1986  ReorderBufferToastReset(rb, txn);
1987 
1988  /* Return the spec insert change if it is not NULL */
1989  if (specinsert != NULL)
1990  {
1991  ReorderBufferReturnChange(rb, specinsert, true);
1992  specinsert = NULL;
1993  }
1994 
1995  /*
1996  * For the streaming case, stop the stream and remember the command ID and
1997  * snapshot for the streaming run.
1998  */
1999  if (rbtxn_is_streamed(txn))
2000  {
2001  rb->stream_stop(rb, txn, last_lsn);
2002  ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
2003  }
2004 }
2005 
2006 /*
2007  * Helper function for ReorderBufferReplay and ReorderBufferStreamTXN.
2008  *
2009  * Send data of a transaction (and its subtransactions) to the
2010  * output plugin. We iterate over the top and subtransactions (using a k-way
2011  * merge) and replay the changes in lsn order.
2012  *
2013  * If streaming is true then data will be sent using stream API.
2014  *
2015  * Note: "volatile" markers on some parameters are to avoid trouble with
2016  * PG_TRY inside the function.
2017  */
2018 static void
2020  XLogRecPtr commit_lsn,
2021  volatile Snapshot snapshot_now,
2022  volatile CommandId command_id,
2023  bool streaming)
2024 {
2025  bool using_subtxn;
2027  ReorderBufferIterTXNState *volatile iterstate = NULL;
2028  volatile XLogRecPtr prev_lsn = InvalidXLogRecPtr;
2029  ReorderBufferChange *volatile specinsert = NULL;
2030  volatile bool stream_started = false;
2031  ReorderBufferTXN *volatile curtxn = NULL;
2032 
2033  /* build data to be able to lookup the CommandIds of catalog tuples */
2035 
2036  /* setup the initial snapshot */
2037  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
2038 
2039  /*
2040  * Decoding needs access to syscaches et al., which in turn use
2041  * heavyweight locks and such. Thus we need to have enough state around to
2042  * keep track of those. The easiest way is to simply use a transaction
2043  * internally. That also allows us to easily enforce that nothing writes
2044  * to the database by checking for xid assignments.
2045  *
2046  * When we're called via the SQL SRF there's already a transaction
2047  * started, so start an explicit subtransaction there.
2048  */
2049  using_subtxn = IsTransactionOrTransactionBlock();
2050 
2051  PG_TRY();
2052  {
2053  ReorderBufferChange *change;
2054 
2055  if (using_subtxn)
2056  BeginInternalSubTransaction(streaming ? "stream" : "replay");
2057  else
2059 
2060  /*
2061  * We only need to send begin/begin-prepare for non-streamed
2062  * transactions.
2063  */
2064  if (!streaming)
2065  {
2066  if (rbtxn_prepared(txn))
2067  rb->begin_prepare(rb, txn);
2068  else
2069  rb->begin(rb, txn);
2070  }
2071 
2072  ReorderBufferIterTXNInit(rb, txn, &iterstate);
2073  while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
2074  {
2075  Relation relation = NULL;
2076  Oid reloid;
2077 
2078  /*
2079  * We can't call start stream callback before processing first
2080  * change.
2081  */
2082  if (prev_lsn == InvalidXLogRecPtr)
2083  {
2084  if (streaming)
2085  {
2086  txn->origin_id = change->origin_id;
2087  rb->stream_start(rb, txn, change->lsn);
2088  stream_started = true;
2089  }
2090  }
2091 
2092  /*
2093  * Enforce correct ordering of changes, merged from multiple
2094  * subtransactions. The changes may have the same LSN due to
2095  * MULTI_INSERT xlog records.
2096  */
2097  Assert(prev_lsn == InvalidXLogRecPtr || prev_lsn <= change->lsn);
2098 
2099  prev_lsn = change->lsn;
2100 
2101  /*
2102  * Set the current xid to detect concurrent aborts. This is
2103  * required for the cases when we decode the changes before the
2104  * COMMIT record is processed.
2105  */
2106  if (streaming || rbtxn_prepared(change->txn))
2107  {
2108  curtxn = change->txn;
2109  SetupCheckXidLive(curtxn->xid);
2110  }
2111 
2112  switch (change->action)
2113  {
2115 
2116  /*
2117  * Confirmation for speculative insertion arrived. Simply
2118  * use as a normal record. It'll be cleaned up at the end
2119  * of INSERT processing.
2120  */
2121  if (specinsert == NULL)
2122  elog(ERROR, "invalid ordering of speculative insertion changes");
2123  Assert(specinsert->data.tp.oldtuple == NULL);
2124  change = specinsert;
2126 
2127  /* intentionally fall through */
2131  Assert(snapshot_now);
2132 
2133  reloid = RelidByRelfilenode(change->data.tp.relnode.spcNode,
2134  change->data.tp.relnode.relNode);
2135 
2136  /*
2137  * Mapped catalog tuple without data, emitted while
2138  * catalog table was in the process of being rewritten. We
2139  * can fail to look up the relfilenode, because the
2140  * relmapper has no "historic" view, in contrast to the
2141  * normal catalog during decoding. Thus repeated rewrites
2142  * can cause a lookup failure. That's OK because we do not
2143  * decode catalog changes anyway. Normally such tuples
2144  * would be skipped over below, but we can't identify
2145  * whether the table should be logically logged without
2146  * mapping the relfilenode to the oid.
2147  */
2148  if (reloid == InvalidOid &&
2149  change->data.tp.newtuple == NULL &&
2150  change->data.tp.oldtuple == NULL)
2151  goto change_done;
2152  else if (reloid == InvalidOid)
2153  elog(ERROR, "could not map filenode \"%s\" to relation OID",
2154  relpathperm(change->data.tp.relnode,
2155  MAIN_FORKNUM));
2156 
2157  relation = RelationIdGetRelation(reloid);
2158 
2159  if (!RelationIsValid(relation))
2160  elog(ERROR, "could not open relation with OID %u (for filenode \"%s\")",
2161  reloid,
2162  relpathperm(change->data.tp.relnode,
2163  MAIN_FORKNUM));
2164 
2165  if (!RelationIsLogicallyLogged(relation))
2166  goto change_done;
2167 
2168  /*
2169  * Ignore temporary heaps created during DDL unless the
2170  * plugin has asked for them.
2171  */
2172  if (relation->rd_rel->relrewrite && !rb->output_rewrites)
2173  goto change_done;
2174 
2175  /*
2176  * For now ignore sequence changes entirely. Most of the
2177  * time they don't log changes using records we
2178  * understand, so it doesn't make sense to handle the few
2179  * cases we do.
2180  */
2181  if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
2182  goto change_done;
2183 
2184  /* user-triggered change */
2185  if (!IsToastRelation(relation))
2186  {
2187  ReorderBufferToastReplace(rb, txn, relation, change);
2188  ReorderBufferApplyChange(rb, txn, relation, change,
2189  streaming);
2190 
2191  /*
2192  * Only clear reassembled toast chunks if we're sure
2193  * they're not required anymore. The creator of the
2194  * tuple tells us.
2195  */
2196  if (change->data.tp.clear_toast_afterwards)
2197  ReorderBufferToastReset(rb, txn);
2198  }
2199  /* we're not interested in toast deletions */
2200  else if (change->action == REORDER_BUFFER_CHANGE_INSERT)
2201  {
2202  /*
2203  * Need to reassemble the full toasted Datum in
2204  * memory, to ensure the chunks don't get reused till
2205  * we're done remove it from the list of this
2206  * transaction's changes. Otherwise it will get
2207  * freed/reused while restoring spooled data from
2208  * disk.
2209  */
2210  Assert(change->data.tp.newtuple != NULL);
2211 
2212  dlist_delete(&change->node);
2213  ReorderBufferToastAppendChunk(rb, txn, relation,
2214  change);
2215  }
2216 
2217  change_done:
2218 
2219  /*
2220  * If speculative insertion was confirmed, the record
2221  * isn't needed anymore.
2222  */
2223  if (specinsert != NULL)
2224  {
2225  ReorderBufferReturnChange(rb, specinsert, true);
2226  specinsert = NULL;
2227  }
2228 
2229  if (RelationIsValid(relation))
2230  {
2231  RelationClose(relation);
2232  relation = NULL;
2233  }
2234  break;
2235 
2237 
2238  /*
2239  * Speculative insertions are dealt with by delaying the
2240  * processing of the insert until the confirmation record
2241  * arrives. For that we simply unlink the record from the
2242  * chain, so it does not get freed/reused while restoring
2243  * spooled data from disk.
2244  *
2245  * This is safe in the face of concurrent catalog changes
2246  * because the relevant relation can't be changed between
2247  * speculative insertion and confirmation due to
2248  * CheckTableNotInUse() and locking.
2249  */
2250 
2251  /* clear out a pending (and thus failed) speculation */
2252  if (specinsert != NULL)
2253  {
2254  ReorderBufferReturnChange(rb, specinsert, true);
2255  specinsert = NULL;
2256  }
2257 
2258  /* and memorize the pending insertion */
2259  dlist_delete(&change->node);
2260  specinsert = change;
2261  break;
2262 
2264 
2265  /*
2266  * Abort for speculative insertion arrived. So cleanup the
2267  * specinsert tuple and toast hash.
2268  *
2269  * Note that we get the spec abort change for each toast
2270  * entry but we need to perform the cleanup only the first
2271  * time we get it for the main table.
2272  */
2273  if (specinsert != NULL)
2274  {
2275  /*
2276  * We must clean the toast hash before processing a
2277  * completely new tuple to avoid confusion about the
2278  * previous tuple's toast chunks.
2279  */
2280  Assert(change->data.tp.clear_toast_afterwards);
2281  ReorderBufferToastReset(rb, txn);
2282 
2283  /* We don't need this record anymore. */
2284  ReorderBufferReturnChange(rb, specinsert, true);
2285  specinsert = NULL;
2286  }
2287  break;
2288 
2290  {
2291  int i;
2292  int nrelids = change->data.truncate.nrelids;
2293  int nrelations = 0;
2294  Relation *relations;
2295 
2296  relations = palloc0(nrelids * sizeof(Relation));
2297  for (i = 0; i < nrelids; i++)
2298  {
2299  Oid relid = change->data.truncate.relids[i];
2300  Relation relation;
2301 
2302  relation = RelationIdGetRelation(relid);
2303 
2304  if (!RelationIsValid(relation))
2305  elog(ERROR, "could not open relation with OID %u", relid);
2306 
2307  if (!RelationIsLogicallyLogged(relation))
2308  continue;
2309 
2310  relations[nrelations++] = relation;
2311  }
2312 
2313  /* Apply the truncate. */
2314  ReorderBufferApplyTruncate(rb, txn, nrelations,
2315  relations, change,
2316  streaming);
2317 
2318  for (i = 0; i < nrelations; i++)
2319  RelationClose(relations[i]);
2320 
2321  break;
2322  }
2323 
2325  ReorderBufferApplyMessage(rb, txn, change, streaming);
2326  break;
2327 
2329  /* Execute the invalidation messages locally */
2331  change->data.inval.ninvalidations,
2332  change->data.inval.invalidations);
2333  break;
2334 
2336  /* get rid of the old */
2337  TeardownHistoricSnapshot(false);
2338 
2339  if (snapshot_now->copied)
2340  {
2341  ReorderBufferFreeSnap(rb, snapshot_now);
2342  snapshot_now =
2343  ReorderBufferCopySnap(rb, change->data.snapshot,
2344  txn, command_id);
2345  }
2346 
2347  /*
2348  * Restored from disk, need to be careful not to double
2349  * free. We could introduce refcounting for that, but for
2350  * now this seems infrequent enough not to care.
2351  */
2352  else if (change->data.snapshot->copied)
2353  {
2354  snapshot_now =
2355  ReorderBufferCopySnap(rb, change->data.snapshot,
2356  txn, command_id);
2357  }
2358  else
2359  {
2360  snapshot_now = change->data.snapshot;
2361  }
2362 
2363  /* and continue with the new one */
2364  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
2365  break;
2366 
2368  Assert(change->data.command_id != InvalidCommandId);
2369 
2370  if (command_id < change->data.command_id)
2371  {
2372  command_id = change->data.command_id;
2373 
2374  if (!snapshot_now->copied)
2375  {
2376  /* we don't use the global one anymore */
2377  snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
2378  txn, command_id);
2379  }
2380 
2381  snapshot_now->curcid = command_id;
2382 
2383  TeardownHistoricSnapshot(false);
2384  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
2385  }
2386 
2387  break;
2388 
2390  elog(ERROR, "tuplecid value in changequeue");
2391  break;
2392  }
2393  }
2394 
2395  /* speculative insertion record must be freed by now */
2396  Assert(!specinsert);
2397 
2398  /* clean up the iterator */
2399  ReorderBufferIterTXNFinish(rb, iterstate);
2400  iterstate = NULL;
2401 
2402  /*
2403  * Update total transaction count and total bytes processed by the
2404  * transaction and its subtransactions. Ensure to not count the
2405  * streamed transaction multiple times.
2406  *
2407  * Note that the statistics computation has to be done after
2408  * ReorderBufferIterTXNFinish as it releases the serialized change
2409  * which we have already accounted in ReorderBufferIterTXNNext.
2410  */
2411  if (!rbtxn_is_streamed(txn))
2412  rb->totalTxns++;
2413 
2414  rb->totalBytes += txn->total_size;
2415 
2416  /*
2417  * Done with current changes, send the last message for this set of
2418  * changes depending upon streaming mode.
2419  */
2420  if (streaming)
2421  {
2422  if (stream_started)
2423  {
2424  rb->stream_stop(rb, txn, prev_lsn);
2425  stream_started = false;
2426  }
2427  }
2428  else
2429  {
2430  /*
2431  * Call either PREPARE (for two-phase transactions) or COMMIT (for
2432  * regular ones).
2433  */
2434  if (rbtxn_prepared(txn))
2435  rb->prepare(rb, txn, commit_lsn);
2436  else
2437  rb->commit(rb, txn, commit_lsn);
2438  }
2439 
2440  /* this is just a sanity check against bad output plugin behaviour */
2442  elog(ERROR, "output plugin used XID %u",
2444 
2445  /*
2446  * Remember the command ID and snapshot for the next set of changes in
2447  * streaming mode.
2448  */
2449  if (streaming)
2450  ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
2451  else if (snapshot_now->copied)
2452  ReorderBufferFreeSnap(rb, snapshot_now);
2453 
2454  /* cleanup */
2455  TeardownHistoricSnapshot(false);
2456 
2457  /*
2458  * Aborting the current (sub-)transaction as a whole has the right
2459  * semantics. We want all locks acquired in here to be released, not
2460  * reassigned to the parent and we do not want any database access
2461  * have persistent effects.
2462  */
2464 
2465  /* make sure there's no cache pollution */
2467 
2468  if (using_subtxn)
2470 
2471  /*
2472  * We are here due to one of the four reasons: 1. Decoding an
2473  * in-progress txn. 2. Decoding a prepared txn. 3. Decoding of a
2474  * prepared txn that was (partially) streamed. 4. Decoding a committed
2475  * txn.
2476  *
2477  * For 1, we allow truncation of txn data by removing the changes
2478  * already streamed but still keeping other things like invalidations,
2479  * snapshot, and tuplecids. For 2 and 3, we indicate
2480  * ReorderBufferTruncateTXN to do more elaborate truncation of txn
2481  * data as the entire transaction has been decoded except for commit.
2482  * For 4, as the entire txn has been decoded, we can fully clean up
2483  * the TXN reorder buffer.
2484  */
2485  if (streaming || rbtxn_prepared(txn))
2486  {
2488  /* Reset the CheckXidAlive */
2490  }
2491  else
2492  ReorderBufferCleanupTXN(rb, txn);
2493  }
2494  PG_CATCH();
2495  {
2496  MemoryContext ecxt = MemoryContextSwitchTo(ccxt);
2497  ErrorData *errdata = CopyErrorData();
2498 
2499  /* TODO: Encapsulate cleanup from the PG_TRY and PG_CATCH blocks */
2500  if (iterstate)
2501  ReorderBufferIterTXNFinish(rb, iterstate);
2502 
2504 
2505  /*
2506  * Force cache invalidation to happen outside of a valid transaction
2507  * to prevent catalog access as we just caught an error.
2508  */
2510 
2511  /* make sure there's no cache pollution */
2513  txn->invalidations);
2514 
2515  if (using_subtxn)
2517 
2518  /*
2519  * The error code ERRCODE_TRANSACTION_ROLLBACK indicates a concurrent
2520  * abort of the (sub)transaction we are streaming or preparing. We
2521  * need to do the cleanup and return gracefully on this error, see
2522  * SetupCheckXidLive.
2523  *
2524  * This error code can be thrown by one of the callbacks we call
2525  * during decoding so we need to ensure that we return gracefully only
2526  * when we are sending the data in streaming mode and the streaming is
2527  * not finished yet or when we are sending the data out on a PREPARE
2528  * during a two-phase commit.
2529  */
2530  if (errdata->sqlerrcode == ERRCODE_TRANSACTION_ROLLBACK &&
2531  (stream_started || rbtxn_prepared(txn)))
2532  {
2533  /* curtxn must be set for streaming or prepared transactions */
2534  Assert(curtxn);
2535 
2536  /* Cleanup the temporary error state. */
2537  FlushErrorState();
2538  FreeErrorData(errdata);
2539  errdata = NULL;
2540  curtxn->concurrent_abort = true;
2541 
2542  /* Reset the TXN so that it is allowed to stream remaining data. */
2543  ReorderBufferResetTXN(rb, txn, snapshot_now,
2544  command_id, prev_lsn,
2545  specinsert);
2546  }
2547  else
2548  {
2549  ReorderBufferCleanupTXN(rb, txn);
2550  MemoryContextSwitchTo(ecxt);
2551  PG_RE_THROW();
2552  }
2553  }
2554  PG_END_TRY();
2555 }
2556 
2557 /*
2558  * Perform the replay of a transaction and its non-aborted subtransactions.
2559  *
2560  * Subtransactions previously have to be processed by
2561  * ReorderBufferCommitChild(), even if previously assigned to the toplevel
2562  * transaction with ReorderBufferAssignChild.
2563  *
2564  * This interface is called once a prepare or toplevel commit is read for both
2565  * streamed as well as non-streamed transactions.
2566  */
/*
 * Replay entry point: records commit metadata (final/end LSN, commit time,
 * origin) in the txn, then either finishes a partially-streamed txn via
 * ReorderBufferStreamCommit or processes the changes via
 * ReorderBufferProcessTXN.
 *
 * NOTE(review): this doxygen rendering dropped the hyperlinked line carrying
 * the function name and leading parameters (callers pass txn, rb, xid ahead
 * of the arguments below) -- confirm against the original source file.
 */
2567 static void
 2570  XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
 2571  TimestampTz commit_time,
 2572  RepOriginId origin_id, XLogRecPtr origin_lsn)
 2573 {
 2574  Snapshot snapshot_now;
 2575  CommandId command_id = FirstCommandId;
 2576 
 2577  txn->final_lsn = commit_lsn;
 2578  txn->end_lsn = end_lsn;
 2579  txn->xact_time.commit_time = commit_time;
 2580  txn->origin_id = origin_id;
 2581  txn->origin_lsn = origin_lsn;
 2582 
 2583  /*
 2584  * If the transaction was (partially) streamed, we need to commit it in a
 2585  * 'streamed' way. That is, we first stream the remaining part of the
 2586  * transaction, and then invoke stream_commit message.
 2587  *
 2588  * Called after everything (origin ID, LSN, ...) is stored in the
 2589  * transaction to avoid passing that information directly.
 2590  */
 2591  if (rbtxn_is_streamed(txn))
 2592  {
 2593  ReorderBufferStreamCommit(rb, txn);
 2594  return;
 2595  }
 2596 
 2597  /*
 2598  * If this transaction has no snapshot, it didn't make any changes to the
 2599  * database, so there's nothing to decode. Note that
 2600  * ReorderBufferCommitChild will have transferred any snapshots from
 2601  * subtransactions if there were any.
 2602  */
 2603  if (txn->base_snapshot == NULL)
 2604  {
 2605  Assert(txn->ninvalidations == 0);
 2606 
 2607  /*
 2608  * Removing this txn before a commit might result in the computation
 2609  * of an incorrect restart_lsn. See SnapBuildProcessRunningXacts.
 2610  */
 2611  if (!rbtxn_prepared(txn))
 2612  ReorderBufferCleanupTXN(rb, txn);
 2613  return;
 2614  }
 2615 
 2616  snapshot_now = txn->base_snapshot;
 2617 
 2618  /* Process and send the changes to output plugin. */
 2619  ReorderBufferProcessTXN(rb, txn, commit_lsn, snapshot_now,
 2620  command_id, false);
 2621 }
2622 
2623 /*
2624  * Commit a transaction.
2625  *
2626  * See comments for ReorderBufferReplay().
2627  */
/*
 * Look up the txn by xid and replay it; a NULL lookup result means we never
 * saw the transaction, so there is nothing to do.
 *
 * NOTE(review): extraction dropped the function-name line and the
 * ReorderBufferTXN declaration line; code below is otherwise verbatim.
 */
2628 void
 2630  XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
 2631  TimestampTz commit_time,
 2632  RepOriginId origin_id, XLogRecPtr origin_lsn)
 2633 {
 2635 
 2636  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
 2637  false);
 2638 
 2639  /* unknown transaction, nothing to replay */
 2640  if (txn == NULL)
 2641  return;
 2642 
 2643  ReorderBufferReplay(txn, rb, xid, commit_lsn, end_lsn, commit_time,
 2644  origin_id, origin_lsn);
 2645 }
2646 
2647 /*
2648  * Record the prepare information for a transaction.
2649  */
/*
 * Stash prepare-record metadata (LSNs, time, origin) on the txn so a later
 * COMMIT PREPARED can use it even when the prepare itself is skipped.
 * Returns false when the xid is unknown to the reorder buffer.
 *
 * NOTE(review): extraction dropped the function-name line and the
 * ReorderBufferTXN declaration line; code below is otherwise verbatim.
 */
2650 bool
 2652  XLogRecPtr prepare_lsn, XLogRecPtr end_lsn,
 2653  TimestampTz prepare_time,
 2654  RepOriginId origin_id, XLogRecPtr origin_lsn)
 2655 {
 2657 
 2658  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false);
 2659 
 2660  /* unknown transaction, nothing to do */
 2661  if (txn == NULL)
 2662  return false;
 2663 
 2664  /*
 2665  * Remember the prepare information to be later used by commit prepared in
 2666  * case we skip doing prepare.
 2667  */
 2668  txn->final_lsn = prepare_lsn;
 2669  txn->end_lsn = end_lsn;
 2670  txn->xact_time.prepare_time = prepare_time;
 2671  txn->origin_id = origin_id;
 2672  txn->origin_lsn = origin_lsn;
 2673 
 2674  return true;
 2675 }
2676 
2677 /* Remember that we have skipped prepare */
/*
 * Mark a transaction as having had its prepare skipped.
 *
 * NOTE(review): extraction dropped the function-name line, the txn
 * declaration, AND the statement at original line 2689 that actually sets
 * the skipped-prepare flag on the txn -- the visible body below only
 * performs the lookup; confirm the flag assignment against the original.
 */
2678 void
 2680 {
 2682 
 2683  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false);
 2684 
 2685  /* unknown transaction, nothing to do */
 2686  if (txn == NULL)
 2687  return;
 2688 
 2690 }
2691 
2692 /*
2693  * Prepare a two-phase transaction.
2694  *
2695  * See comments for ReorderBufferReplay().
2696  */
/*
 * Flag the txn as prepared, record its GID, and replay it using the prepare
 * metadata previously stored by ReorderBufferRememberPrepareInfo.
 *
 * NOTE(review): extraction dropped the function-name line, the txn
 * declaration, and the Assert at original line 2714 that validated the
 * prepare info ("must have been updated in txn by now").
 */
2697 void
 2699  char *gid)
 2700 {
 2702 
 2703  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
 2704  false);
 2705 
 2706  /* unknown transaction, nothing to replay */
 2707  if (txn == NULL)
 2708  return;
 2709 
 2710  txn->txn_flags |= RBTXN_PREPARE;
 2711  txn->gid = pstrdup(gid);
 2712 
 2713  /* The prepare info must have been updated in txn by now. */
 2715 
 2716  ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
 2717  txn->xact_time.prepare_time, txn->origin_id, txn->origin_lsn);
 2718 
 2719  /*
 2720  * We send the prepare for the concurrently aborted xacts so that later
 2721  * when rollback prepared is decoded and sent, the downstream should be
 2722  * able to rollback such a xact. See comments atop DecodePrepare.
 2723  *
 2724  * Note, for the concurrent_abort + streaming case a stream_prepare was
 2725  * already sent within the ReorderBufferReplay call above.
 2726  */
 2727  if (txn->concurrent_abort && !rbtxn_is_streamed(txn))
 2728  rb->prepare(rb, txn, txn->final_lsn);
 2729 }
2730 
2731 /*
2732  * This is used to handle COMMIT/ROLLBACK PREPARED.
2733  */
/*
 * Handle COMMIT PREPARED / ROLLBACK PREPARED: optionally replay the txn as a
 * prepare first (when it was not decoded at prepare time), then invoke the
 * commit_prepared or rollback_prepared output-plugin callback and clean up.
 *
 * NOTE(review): extraction dropped the function-name line, the txn
 * declaration, the Assert at original line 2777, and the first half of the
 * ReorderBufferExecuteInvalidations(...) call at original line 2802 (only
 * its second argument line survives below).
 */
2734 void
 2736  XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
 2737  XLogRecPtr two_phase_at,
 2738  TimestampTz commit_time, RepOriginId origin_id,
 2739  XLogRecPtr origin_lsn, char *gid, bool is_commit)
 2740 {
 2742  XLogRecPtr prepare_end_lsn;
 2743  TimestampTz prepare_time;
 2744 
 2745  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, commit_lsn, false);
 2746 
 2747  /* unknown transaction, nothing to do */
 2748  if (txn == NULL)
 2749  return;
 2750 
 2751  /*
 2752  * By this time the txn has the prepare record information, remember it to
 2753  * be later used for rollback.
 2754  */
 2755  prepare_end_lsn = txn->end_lsn;
 2756  prepare_time = txn->xact_time.prepare_time;
 2757 
 2758  /* add the gid in the txn */
 2759  txn->gid = pstrdup(gid);
 2760 
 2761  /*
 2762  * It is possible that this transaction is not decoded at prepare time
 2763  * either because by that time we didn't have a consistent snapshot, or
 2764  * two_phase was not enabled, or it was decoded earlier but we have
 2765  * restarted. We only need to send the prepare if it was not decoded
 2766  * earlier. We don't need to decode the xact for aborts if it is not done
 2767  * already.
 2768  */
 2769  if ((txn->final_lsn < two_phase_at) && is_commit)
 2770  {
 2771  txn->txn_flags |= RBTXN_PREPARE;
 2772 
 2773  /*
 2774  * The prepare info must have been updated in txn even if we skip
 2775  * prepare.
 2776  */
 2778 
 2779  /*
 2780  * By this time the txn has the prepare record information and it is
 2781  * important to use that so that downstream gets the accurate
 2782  * information. If instead, we have passed commit information here
 2783  * then downstream can behave as it has already replayed commit
 2784  * prepared after the restart.
 2785  */
 2786  ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
 2787  txn->xact_time.prepare_time, txn->origin_id, txn->origin_lsn);
 2788  }
 2789 
 2790  txn->final_lsn = commit_lsn;
 2791  txn->end_lsn = end_lsn;
 2792  txn->xact_time.commit_time = commit_time;
 2793  txn->origin_id = origin_id;
 2794  txn->origin_lsn = origin_lsn;
 2795 
 2796  if (is_commit)
 2797  rb->commit_prepared(rb, txn, commit_lsn);
 2798  else
 2799  rb->rollback_prepared(rb, txn, prepare_end_lsn, prepare_time);
 2800 
 2801  /* cleanup: make sure there's no cache pollution */
 2803  txn->invalidations);
 2804  ReorderBufferCleanupTXN(rb, txn);
 2805 }
2806 
2807 /*
2808  * Abort a transaction that possibly has previous changes. Needs to be first
2809  * called for subtransactions and then for the toplevel xid.
2810  *
2811  * NB: Transactions handled here have to have actively aborted (i.e. have
2812  * produced an abort record). Implicitly aborted transactions are handled via
2813  * ReorderBufferAbortOld(); transactions we're just not interested in, but
2814  * which have committed are handled in ReorderBufferForget().
2815  *
2816  * This function purges this transaction and its contents from memory and
2817  * disk.
2818  */
/*
 * Explicit abort: notify the downstream for streamed txns, execute any
 * pending invalidations, then purge the txn from memory and disk.
 *
 * NOTE(review): extraction dropped the function-name line, the txn
 * declaration, and the first half of the invalidation call at original line
 * 2843 (only its trailing argument line survives below).
 */
2819 void
 2821 {
 2823 
 2824  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
 2825  false);
 2826 
 2827  /* unknown, nothing to remove */
 2828  if (txn == NULL)
 2829  return;
 2830 
 2831  /* For streamed transactions notify the remote node about the abort. */
 2832  if (rbtxn_is_streamed(txn))
 2833  {
 2834  rb->stream_abort(rb, txn, lsn);
 2835 
 2836  /*
 2837  * We might have decoded changes for this transaction that could load
 2838  * the cache as per the current transaction's view (consider DDL's
 2839  * happened in this transaction). We don't want the decoding of future
 2840  * transactions to use those cache entries so execute invalidations.
 2841  */
 2842  if (txn->ninvalidations > 0)
 2844  txn->invalidations);
 2845  }
 2846 
 2847  /* cosmetic... */
 2848  txn->final_lsn = lsn;
 2849 
 2850  /* remove potential on-disk data, and deallocate */
 2851  ReorderBufferCleanupTXN(rb, txn);
 2852 }
2853 
2854 /*
2855  * Abort all transactions that aren't actually running anymore because the
2856  * server restarted.
2857  *
2858  * NB: These really have to be transactions that have aborted due to a server
2859  * crash/immediate restart, as we don't deal with invalidations here.
2860  */
/*
 * Clean up toplevel txns older than oldestRunningXid after a server restart;
 * stops at the first still-live txn (see the loop comment below).
 *
 * NOTE(review): extraction dropped the function-name line, the
 * dlist_foreach_modify(...) loop header at original line 2873, and the txn
 * declaration -- the bare "{" below opens that loop body.
 */
2861 void
 2863 {
 2864  dlist_mutable_iter it;
 2865 
 2866  /*
 2867  * Iterate through all (potential) toplevel TXNs and abort all that are
 2868  * older than what possibly can be running. Once we've found the first
 2869  * that is alive we stop, there might be some that acquired an xid earlier
 2870  * but started writing later, but it's unlikely and they will be cleaned
 2871  * up in a later call to this function.
 2872  */
 2874  {
 2876 
 2877  txn = dlist_container(ReorderBufferTXN, node, it.cur);
 2878 
 2879  if (TransactionIdPrecedes(txn->xid, oldestRunningXid))
 2880  {
 2881  elog(DEBUG2, "aborting old transaction %u", txn->xid);
 2882 
 2883  /* remove potential on-disk data, and deallocate this tx */
 2884  ReorderBufferCleanupTXN(rb, txn);
 2885  }
 2886  else
 2887  return;
 2888  }
 2889 }
2890 
2891 /*
2892  * Forget the contents of a transaction if we aren't interested in its
2893  * contents. Needs to be first called for subtransactions and then for the
2894  * toplevel xid.
2895  *
2896  * This is significantly different to ReorderBufferAbort() because
2897  * transactions that have committed need to be treated differently from aborted
2898  * ones since they may have modified the catalog.
2899  *
2900  * Note that this is only allowed to be called in the moment a transaction
2901  * commit has just been read, not earlier; otherwise later records referring
2902  * to this xid might re-create the transaction incompletely.
2903  */
/*
 * Discard a committed-but-uninteresting transaction: send stream_abort for
 * streamed txns, apply its cache invalidations (the txn may have changed the
 * catalog even though we skip its contents), then free it.
 *
 * NOTE(review): extraction dropped the function-name line, the txn
 * declaration, and the first half of the invalidation call whose trailing
 * "txn->invalidations);" argument line survives below.
 */
2904 void
 2906 {
 2908 
 2909  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
 2910  false);
 2911 
 2912  /* unknown, nothing to forget */
 2913  if (txn == NULL)
 2914  return;
 2915 
 2916  /* For streamed transactions notify the remote node about the abort. */
 2917  if (rbtxn_is_streamed(txn))
 2918  rb->stream_abort(rb, txn, lsn);
 2919 
 2920  /* cosmetic... */
 2921  txn->final_lsn = lsn;
 2922 
 2923  /*
 2924  * Process cache invalidation messages if there are any. Even if we're not
 2925  * interested in the transaction's contents, it could have manipulated the
 2926  * catalog and we need to update the caches according to that.
 2927  */
 2928  if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
 2930  txn->invalidations);
 2931  else
 2932  Assert(txn->ninvalidations == 0);
 2933 
 2934  /* remove potential on-disk data, and deallocate */
 2935  ReorderBufferCleanupTXN(rb, txn);
 2936 }
2937 
2938 /*
2939  * Invalidate cache for those transactions that need to be skipped just in case
2940  * catalogs were manipulated as part of the transaction.
2941  *
2942  * Note that this is a special-purpose function for prepared transactions where
2943  * we don't want to clean up the TXN even when we decide to skip it. See
2944  * DecodePrepare.
2945  */
/*
 * Run a skipped (prepared) transaction's cache invalidations WITHOUT freeing
 * the txn -- unlike ReorderBufferForget, the txn must survive for a later
 * COMMIT/ROLLBACK PREPARED (see the header comment above).
 *
 * NOTE(review): extraction dropped the function-name line, the txn
 * declaration, and the first half of the invalidation call whose trailing
 * argument line survives below.
 */
2946 void
 2948 {
 2950 
 2951  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
 2952  false);
 2953 
 2954  /* unknown, nothing to do */
 2955  if (txn == NULL)
 2956  return;
 2957 
 2958  /*
 2959  * Process cache invalidation messages if there are any. Even if we're not
 2960  * interested in the transaction's contents, it could have manipulated the
 2961  * catalog and we need to update the caches according to that.
 2962  */
 2963  if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
 2965  txn->invalidations);
 2966  else
 2967  Assert(txn->ninvalidations == 0);
 2968 }
2969 
2970 
2971 /*
2972  * Execute invalidations happening outside the context of a decoded
2973  * transaction. That currently happens either for xid-less commits
2974  * (cf. RecordTransactionCommit()) or for invalidations in uninteresting
2975  * transactions (via ReorderBufferForget()).
2976  */
/*
 * Execute invalidations outside any decoded transaction, optionally inside a
 * throwaway internal subtransaction so they run without a valid transaction
 * state (entries are just marked invalid, no catalog access).
 *
 * NOTE(review): extraction dropped the function-name line and the statement
 * bodies of the two trailing "if (use_subtxn)" guards (original lines 2994
 * and 3000 -- presumably the abort/release of the subtransaction begun
 * above; confirm against the original source).
 */
2977 void
 2979  SharedInvalidationMessage *invalidations)
 2980 {
 2981  bool use_subtxn = IsTransactionOrTransactionBlock();
 2982  int i;
 2983 
 2984  if (use_subtxn)
 2985  BeginInternalSubTransaction("replay");
 2986 
 2987  /*
 2988  * Force invalidations to happen outside of a valid transaction - that way
 2989  * entries will just be marked as invalid without accessing the catalog.
 2990  * That's advantageous because we don't need to setup the full state
 2991  * necessary for catalog access.
 2992  */
 2993  if (use_subtxn)
 2995 
 2996  for (i = 0; i < ninvalidations; i++)
 2997  LocalExecuteInvalidationMessage(&invalidations[i]);
 2998 
 2999  if (use_subtxn)
 3001 }
3002 
3003 /*
3004  * Tell reorderbuffer about an xid seen in the WAL stream. Has to be called at
3005  * least once for every xid in XLogRecord->xl_xid (other places in records
3006  * may, but do not have to be passed through here).
3007  *
3008  * Reorderbuffer keeps some datastructures about transactions in LSN order,
3009  * for efficiency. To do that it has to know about when transactions are seen
3010  * first in the WAL. As many types of records are not actually interesting for
3011  * logical decoding, they do not necessarily pass through here.
3012  */
/*
 * Ensure a ReorderBufferTXN exists for every xid seen in WAL; the TXNByXid
 * call with create=true does the work, InvalidTransactionId is filtered here.
 *
 * NOTE(review): extraction dropped the function-name line; body is verbatim.
 */
2758 3013 void
 3015 {
 3016  /* many records won't have an xid assigned, centralize check here */
 3017  if (xid != InvalidTransactionId)
 3018  ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
 3019 }
3020 
3021 /*
3022  * Add a new snapshot to this transaction that may only be used after lsn 'lsn'
3023  * because the previous snapshot doesn't describe the catalog correctly for
3024  * following rows.
3025  */
/*
 * Queue a snapshot-change entry for this xid at the given LSN.
 *
 * NOTE(review): extraction dropped the function-name line, the change
 * allocation (presumably ReorderBufferGetChange at original line 3030), and
 * the action-tag assignment at original line 3033; code below is verbatim.
 */
3026 void
 3028  XLogRecPtr lsn, Snapshot snap)
 3029 {
 3031 
 3032  change->data.snapshot = snap;
 3034 
 3035  ReorderBufferQueueChange(rb, xid, lsn, change, false);
 3036 }
3037 
3038 /*
3039  * Set up the transaction's base snapshot.
3040  *
3041  * If we know that xid is a subtransaction, set the base snapshot on the
3042  * top-level transaction instead.
3043  */
/*
 * Install the base snapshot on the txn (redirected to its toplevel txn if the
 * xid is a known subxact), then verify the rb's LSN ordering invariant.
 *
 * NOTE(review): extraction dropped the function-name line, the txn
 * declaration, and the statement at original line 3065 between the
 * base_snapshot_lsn assignment and AssertTXNLsnOrder; confirm against the
 * original source.
 */
3044 void
 3046  XLogRecPtr lsn, Snapshot snap)
 3047 {
 3049 
 3050  bool is_new;
 3051 
 3052  AssertArg(snap != NULL);
 3053 
 3054  /*
 3055  * Fetch the transaction to operate on. If we know it's a subtransaction,
 3056  * operate on its top-level transaction instead.
 3057  */
 3058  txn = ReorderBufferTXNByXid(rb, xid, true, &is_new, lsn, true);
 3059  if (rbtxn_is_known_subxact(txn))
 3060  txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
 3061  NULL, InvalidXLogRecPtr, false);
 3062  Assert(txn->base_snapshot == NULL);
 3063 
 3064  txn->base_snapshot = snap;
 3066 
 3067  AssertTXNLsnOrder(rb);
 3068 }
3069 
3070 /*
3071  * Access the catalog with this CommandId at this point in the changestream.
3072  *
3073  * May only be called for command ids > 1
3074  */
/*
 * Queue a command-id change entry for this xid at the given LSN.
 *
 * NOTE(review): extraction dropped the function-name line, the change
 * allocation (presumably ReorderBufferGetChange at original line 3079), and
 * the action-tag assignment at original line 3082; code below is verbatim.
 */
3075 void
 3077  XLogRecPtr lsn, CommandId cid)
 3078 {
 3080 
 3081  change->data.command_id = cid;
 3083 
 3084  ReorderBufferQueueChange(rb, xid, lsn, change, false);
 3085 }
3086 
3087 /*
3088  * Update memory counters to account for the new or removed change.
3089  *
3090  * We update two counters - in the reorder buffer, and in the transaction
3091  * containing the change. The reorder buffer counter allows us to quickly
3092  * decide if we reached the memory limit, the transaction counter allows
3093  * us to quickly pick the largest transaction for eviction.
3094  *
3095  * When streaming is enabled, we need to update the toplevel transaction
3096  * counters instead - we don't really care about subtransactions as we
3097  * can't stream them individually anyway, and we only pick toplevel
3098  * transactions for eviction. So only toplevel transactions matter.
3099  */
/*
 * Adjust per-txn and whole-buffer memory accounting for one change, and keep
 * the toplevel txn's total_size in sync (used for decoding stats and for
 * picking the largest txn to evict).
 *
 * NOTE(review): extraction dropped the function-name line, the txn
 * declaration, and the tuple-CID early-exit condition at original line 3116
 * whose "return;" survives below (the preceding comment describes it).
 */
3100 static void
 3102  ReorderBufferChange *change,
 3103  bool addition)
 3104 {
 3105  Size sz;
 3107  ReorderBufferTXN *toptxn;
 3108 
 3109  Assert(change->txn);
 3110 
 3111  /*
 3112  * Ignore tuple CID changes, because those are not evicted when reaching
 3113  * memory limit. So we just don't count them, because it might easily
 3114  * trigger a pointless attempt to spill.
 3115  */
 3117  return;
 3118 
 3119  txn = change->txn;
 3120 
 3121  /*
 3122  * Update the total size in top level as well. This is later used to
 3123  * compute the decoding stats.
 3124  */
 3125  if (txn->toptxn != NULL)
 3126  toptxn = txn->toptxn;
 3127  else
 3128  toptxn = txn;
 3129 
 3130  sz = ReorderBufferChangeSize(change);
 3131 
 3132  if (addition)
 3133  {
 3134  txn->size += sz;
 3135  rb->size += sz;
 3136 
 3137  /* Update the total size in the top transaction. */
 3138  toptxn->total_size += sz;
 3139  }
 3140  else
 3141  {
 3142  Assert((rb->size >= sz) && (txn->size >= sz));
 3143  txn->size -= sz;
 3144  rb->size -= sz;
 3145 
 3146  /* Update the total size in the top transaction. */
 3147  toptxn->total_size -= sz;
 3148  }
 3149 
 3150  Assert(txn->size <= rb->size);
 3151 }
3152 
3153 /*
3154  * Add new (relfilenode, tid) -> (cmin, cmax) mappings.
3155  *
3156  * We do not include this change type in memory accounting, because we
3157  * keep CIDs in a separate list and do not evict them when reaching
3158  * the memory limit.
3159  */
/*
 * Record a (relfilenode, tid) -> (cmin, cmax, combocid) mapping on the txn's
 * tuplecids list; deliberately not counted in memory accounting (see header
 * comment above).
 *
 * NOTE(review): extraction dropped the function-name line, the txn/change
 * declarations, and the action-tag assignment at original line 3178.
 */
3160 void
 3162  XLogRecPtr lsn, RelFileNode node,
 3163  ItemPointerData tid, CommandId cmin,
 3164  CommandId cmax, CommandId combocid)
 3165 {
 3168 
 3169  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
 3170 
 3171  change->data.tuplecid.node = node;
 3172  change->data.tuplecid.tid = tid;
 3173  change->data.tuplecid.cmin = cmin;
 3174  change->data.tuplecid.cmax = cmax;
 3175  change->data.tuplecid.combocid = combocid;
 3176  change->lsn = lsn;
 3177  change->txn = txn;
 3179 
 3180  dlist_push_tail(&txn->tuplecids, &change->node);
 3181  txn->ntuplecids++;
 3182 }
3183 
3184 /*
3185  * Setup the invalidation of the toplevel transaction.
3186  *
3187  * This needs to be called for each XLOG_XACT_INVALIDATIONS message and
3188  * accumulates all the invalidation messages in the toplevel transaction as
3189  * well as in the form of change in reorder buffer. We require to record it in
3190  * form of the change so that we can execute only the required invalidations
3191  * instead of executing all the invalidations on each CommandId increment. We
3192  * also need to accumulate these in the toplevel transaction because in some
3193  * cases we skip processing the transaction (see ReorderBufferForget), we need
3194  * to execute all the invalidations together.
3195  */
/*
 * Accumulate invalidation messages on the TOPLEVEL txn (so a skipped txn can
 * still execute them all together) and additionally queue them as a change so
 * only the required invalidations run per command-id increment.
 *
 * NOTE(review): extraction dropped the function-name line, the msgs
 * parameter line (original 3199), the txn declaration, the left-hand sides
 * of the two txn->invalidations (re)allocation statements (original lines
 * 3222 and 3229-3230, presumably palloc/repalloc assignments), and the
 * action-tag assignment at original line 3239.
 */
3196 void
 3198  XLogRecPtr lsn, Size nmsgs,
 3200 {
 3202  MemoryContext oldcontext;
 3203  ReorderBufferChange *change;
 3204 
 3205  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
 3206 
 3207  oldcontext = MemoryContextSwitchTo(rb->context);
 3208 
 3209  /*
 3210  * Collect all the invalidations under the top transaction so that we can
 3211  * execute them all together. See comment atop this function
 3212  */
 3213  if (txn->toptxn)
 3214  txn = txn->toptxn;
 3215 
 3216  Assert(nmsgs > 0);
 3217 
 3218  /* Accumulate invalidations. */
 3219  if (txn->ninvalidations == 0)
 3220  {
 3221  txn->ninvalidations = nmsgs;
 3223  palloc(sizeof(SharedInvalidationMessage) * nmsgs);
 3224  memcpy(txn->invalidations, msgs,
 3225  sizeof(SharedInvalidationMessage) * nmsgs);
 3226  }
 3227  else
 3228  {
 3231  (txn->ninvalidations + nmsgs));
 3232 
 3233  memcpy(txn->invalidations + txn->ninvalidations, msgs,
 3234  nmsgs * sizeof(SharedInvalidationMessage));
 3235  txn->ninvalidations += nmsgs;
 3236  }
 3237 
 3238  change = ReorderBufferGetChange(rb);
 3240  change->data.inval.ninvalidations = nmsgs;
 3241  change->data.inval.invalidations = (SharedInvalidationMessage *)
 3242  palloc(sizeof(SharedInvalidationMessage) * nmsgs);
 3243  memcpy(change->data.inval.invalidations, msgs,
 3244  sizeof(SharedInvalidationMessage) * nmsgs);
 3245 
 3246  ReorderBufferQueueChange(rb, xid, lsn, change, false);
 3247 
 3248  MemoryContextSwitchTo(oldcontext);
 3249 }
3250 
3251 /*
3252  * Apply all invalidations we know. Possibly we only need parts at this point
3253  * in the changestream but we don't know which those are.
3254  */
/*
 * Apply all given invalidation messages locally.
 *
 * NOTE(review): extraction dropped the function-name/parameter line and the
 * loop body at original line 3261 (presumably a
 * LocalExecuteInvalidationMessage(&msgs[i]) call, matching the loop in
 * ReorderBufferImmediateInvalidation above).
 */
3255 static void
 3257 {
 3258  int i;
 3259 
 3260  for (i = 0; i < nmsgs; i++)
 3262 }
3263 
3264 /*
3265  * Mark a transaction as containing catalog changes
3266  */
/*
 * Flag a txn (and, per the comment below, its toplevel txn) as containing
 * catalog changes so ReorderBufferBuildTupleCidHash can decide cheaply.
 *
 * NOTE(review): extraction dropped the function-name line, the txn
 * declaration, and the flag-setting statements at original lines 3275 and
 * 3284 (the latter is the then-branch of the trailing if); confirm against
 * the original source.
 */
3267 void
 3269  XLogRecPtr lsn)
 3270 {
 3272 
 3273  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
 3274 
 3276 
 3277  /*
 3278  * Mark top-level transaction as having catalog changes too if one of its
 3279  * children has so that the ReorderBufferBuildTupleCidHash can
 3280  * conveniently check just top-level transaction and decide whether to
 3281  * build the hash table or not.
 3282  */
 3283  if (txn->toptxn != NULL)
 3285 }
3286 
3287 /*
3288  * Query whether a transaction is already *known* to contain catalog
3289  * changes. This can be wrong until directly before the commit!
3290  */
/*
 * Return whether the xid is already *known* to contain catalog changes;
 * an unknown xid reports false (see header comment: answer can change up
 * until just before commit).
 *
 * NOTE(review): extraction dropped the function-name line and the txn
 * declaration; body is verbatim.
 */
3291 bool
 3293 {
 3295 
 3296  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
 3297  false);
 3298  if (txn == NULL)
 3299  return false;
 3300 
 3301  return rbtxn_has_catalog_changes(txn);
 3302 }
3303 
3304 /*
3305  * ReorderBufferXidHasBaseSnapshot
3306  * Have we already set the base snapshot for the given txn/subtxn?
3307  */
/*
 * Report whether a base snapshot has been set for this txn; for a known
 * subxact the check is redirected to its toplevel txn, where
 * ReorderBufferSetBaseSnapshot stores the snapshot.
 *
 * NOTE(review): extraction dropped the function-name line and the txn
 * declaration; body is verbatim.
 */
3308 bool
 3310 {
 3312 
 3313  txn = ReorderBufferTXNByXid(rb, xid, false,
 3314  NULL, InvalidXLogRecPtr, false);
 3315 
 3316  /* transaction isn't known yet, ergo no snapshot */
 3317  if (txn == NULL)
 3318  return false;
 3319 
 3320  /* a known subtxn? operate on top-level txn instead */
 3321  if (rbtxn_is_known_subxact(txn))
 3322  txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
 3323  NULL, InvalidXLogRecPtr, false);
 3324 
 3325  return txn->base_snapshot != NULL;
 3326 }
3327 
3328 
3329 /*
3330  * ---------------------------------------
3331  * Disk serialization support
3332  * ---------------------------------------
3333  */
3334 
3335 /*
3336  * Ensure the IO buffer is >= sz.
3337  */
/*
 * Grow rb->outbuf to at least sz bytes: first allocation comes from
 * rb->context, later growth uses repalloc; never shrinks.
 *
 * NOTE(review): extraction dropped the function-name/parameter line; body is
 * verbatim.
 */
3338 static void
 3340 {
 3341  if (!rb->outbufsize)
 3342  {
 3343  rb->outbuf = MemoryContextAlloc(rb->context, sz);
 3344  rb->outbufsize = sz;
 3345  }
 3346  else if (rb->outbufsize < sz)
 3347  {
 3348  rb->outbuf = repalloc(rb->outbuf, sz);
 3349  rb->outbufsize = sz;
 3350  }
 3351 }
3352 
3353 /*
3354  * Find the largest transaction (toplevel or subxact) to evict (spill to disk).
3355  *
3356  * XXX With many subtransactions this might be quite slow, because we'll have
3357  * to walk through all of them. There are some options how we could improve
3358  * that: (a) maintain some secondary structure with transactions sorted by
3359  * amount of changes, (b) not looking for the entirely largest transaction,
3360  * but e.g. for transaction using at least some fraction of the memory limit,
3361  * and (c) evicting multiple transactions at once, e.g. to free a given portion
3362  * of the memory limit (e.g. 50%).
3363  */
/*
 * Linear scan of the by_txn hash to find the txn with the largest accounted
 * size (spill-to-disk candidate); asserts that something non-empty is found.
 *
 * NOTE(review): extraction dropped the function-name line and the hash-entry
 * declaration ("ent", original line 3368) used by the loop below.
 */
3364 static ReorderBufferTXN *
 3366 {
 3367  HASH_SEQ_STATUS hash_seq;
 3369  ReorderBufferTXN *largest = NULL;
 3370 
 3371  hash_seq_init(&hash_seq, rb->by_txn);
 3372  while ((ent = hash_seq_search(&hash_seq)) != NULL)
 3373  {
 3374  ReorderBufferTXN *txn = ent->txn;
 3375 
 3376  /* if the current transaction is larger, remember it */
 3377  if ((!largest) || (txn->size > largest->size))
 3378  largest = txn;
 3379  }
 3380 
 3381  Assert(largest);
 3382  Assert(largest->size > 0);
 3383  Assert(largest->size <= rb->size);
 3384 
 3385  return largest;
 3386 }
3387 
3388 /*
3389  * Find the largest toplevel transaction to evict (by streaming).
3390  *
3391  * This can be seen as an optimized version of ReorderBufferLargestTXN, which
3392  * should give us the same transaction (because we don't update memory account
3393  * for subtransaction with streaming, so it's always 0). But we can simply
3394  * iterate over the limited number of toplevel transactions that have a base
3395  * snapshot. There is no use of selecting a transaction that doesn't have base
3396  * snapshot because we don't decode such transactions.
3397  *
3398  * Note that, we skip transactions that contains incomplete changes. There
3399  * is a scope of optimization here such that we can select the largest
3400  * transaction which has incomplete changes. But that will make the code and
3401  * design quite complex and that might not be worth the benefit. If we plan to
3402  * stream the transactions that contains incomplete changes then we need to
3403  * find a way to partially stream/truncate the transaction changes in-memory
3404  * and build a mechanism to partially truncate the spilled files.
3405  * Additionally, whenever we partially stream the transaction we need to
3406  * maintain the last streamed lsn and next time we need to restore from that
3407  * segment and the offset in WAL. As we stream the changes from the top
3408  * transaction and restore them subtransaction wise, we need to even remember
3409  * the subxact from where we streamed the last change.
3410  */
3411 static ReorderBufferTXN *
3413 {
3414  dlist_iter iter;
3415  Size largest_size = 0;
3416  ReorderBufferTXN *largest = NULL;
3417 
3418  /* Find the largest top-level transaction having a base snapshot. */
3420  {
3422 
3423  txn = dlist_container(ReorderBufferTXN, base_snapshot_node, iter.cur);
3424 
3425  /* must not be a subtxn */
3427  /* base_snapshot must be set */
3428  Assert(txn->base_snapshot != NULL);
3429 
3430  if ((largest == NULL || txn->total_size > largest_size) &&
3431  (txn->total_size > 0) && !(rbtxn_has_partial_change(txn)))
3432  {
3433  largest = txn;
3434  largest_size = txn->total_size;
3435  }
3436  }
3437 
3438  return largest;
3439 }
3440 
3441 /*
3442  * Check whether the logical_decoding_work_mem limit was reached, and if yes
3443  * pick the largest (sub)transaction at-a-time to evict and spill its changes to
3444  * disk until we reach under the memory limit.
3445  *
3446  * XXX At this point we select the transactions until we reach under the memory
3447  * limit, but we might also adapt a more elaborate eviction strategy - for example
3448  * evicting enough transactions to free certain fraction (e.g. 50%) of the memory
3449  * limit.
3450  */
3451 static void
3453 {
3455 
3456  /* bail out if we haven't exceeded the memory limit */
3457  if (rb->size < logical_decoding_work_mem * 1024L)
3458  return;
3459 
3460  /*
3461  * Loop until we reach under the memory limit. One might think that just
3462  * by evicting the largest (sub)transaction we will come under the memory
3463  * limit based on assumption that the selected transaction is at least as
3464  * large as the most recent change (which caused us to go over the memory
3465  * limit). However, that is not true because a user can reduce the
3466  * logical_decoding_work_mem to a smaller value before the most recent
3467  * change.
3468  */
3469  while (rb->size >= logical_decoding_work_mem * 1024L)
3470  {
3471  /*
3472  * Pick the largest transaction (or subtransaction) and evict it from
3473  * memory by streaming, if possible. Otherwise, spill to disk.
3474  */
3476  (txn = ReorderBufferLargestTopTXN(rb)) != NULL)
3477  {
3478  /* we know there has to be one, because the size is not zero */
3479  Assert(txn && !txn->toptxn);
3480  Assert(txn->total_size > 0);
3481  Assert(rb->size >= txn->total_size);
3482 
3483  ReorderBufferStreamTXN(rb, txn);
3484  }
3485  else
3486  {
3487  /*
3488  * Pick the largest transaction (or subtransaction) and evict it
3489  * from memory by serializing it to disk.
3490  */
3491  txn = ReorderBufferLargestTXN(rb);
3492 
3493  /* we know there has to be one, because the size is not zero */
3494  Assert(txn);
3495  Assert(txn->size > 0);
3496  Assert(rb->size >= txn->size);
3497 
3498  ReorderBufferSerializeTXN(rb, txn);
3499  }
3500 
3501  /*
3502  * After eviction, the transaction should have no entries in memory,
3503  * and should use 0 bytes for changes.
3504  */
3505  Assert(txn->size == 0);
3506  Assert(txn->nentries_mem == 0);
3507  }
3508 
3509  /* We must be under the memory limit now. */
3510  Assert(rb->size < logical_decoding_work_mem * 1024L);
3511 }
3512 
3513 /*
3514  * Spill data of a large transaction (and its subtransactions) to disk.
3515  */
3516 static void
3518 {
3519  dlist_iter subtxn_i;
3520  dlist_mutable_iter change_i;
3521  int fd = -1;
3522  XLogSegNo curOpenSegNo = 0;
3523  Size spilled = 0;
3524  Size size = txn->size;
3525 
3526  elog(DEBUG2, "spill %u changes in XID %u to disk",
3527  (uint32) txn->nentries_mem, txn->xid);
3528 
3529  /* do the same to all child TXs */
3530  dlist_foreach(subtxn_i, &txn->subtxns)
3531  {
3532  ReorderBufferTXN *subtxn;
3533 
3534  subtxn = dlist_container(ReorderBufferTXN, node, subtxn_i.cur);
3535  ReorderBufferSerializeTXN(rb, subtxn);
3536  }
3537 
3538  /* serialize changestream */
3539  dlist_foreach_modify(change_i, &txn->changes)
3540  {
3541  ReorderBufferChange *change;
3542 
3543  change = dlist_container(ReorderBufferChange, node, change_i.cur);
3544 
3545  /*
3546  * store in segment in which it belongs by start lsn, don't split over
3547  * multiple segments tho
3548  */
3549  if (fd == -1 ||
3550  !XLByteInSeg(change->lsn, curOpenSegNo, wal_segment_size))
3551  {
3552  char path[MAXPGPATH];
3553 
3554  if (fd != -1)
3555  CloseTransientFile(fd);
3556 
3557  XLByteToSeg(change->lsn, curOpenSegNo, wal_segment_size);
3558 
3559  /*
3560  * No need to care about TLIs here, only used during a single run,
3561  * so each LSN only maps to a specific WAL record.
3562  */
3564  curOpenSegNo);
3565 
3566  /* open segment, create it if necessary */
3567  fd = OpenTransientFile(path,
3568  O_CREAT | O_WRONLY | O_APPEND | PG_BINARY);
3569 
3570  if (fd < 0)
3571  ereport(ERROR,
3573  errmsg("could not open file \"%s\": %m", path)));
3574  }
3575 
3576  ReorderBufferSerializeChange(rb, txn, fd, change);
3577  dlist_delete(&change->node);
3578  ReorderBufferReturnChange(rb, change, true);
3579 
3580  spilled++;
3581  }
3582 
3583  /* update the statistics iff we have spilled anything */
3584  if (spilled)
3585  {
3586  rb->spillCount += 1;
3587  rb->spillBytes += size;
3588 
3589  /* don't consider already serialized transactions */
3590  rb->spillTxns += (rbtxn_is_serialized(txn) || rbtxn_is_serialized_clear(txn)) ? 0 : 1;
3591 
3592  /* update the decoding stats */
3594  }
3595 
3596  Assert(spilled == txn->nentries_mem);
3597  Assert(dlist_is_empty(&txn->changes));
3598  txn->nentries_mem = 0;
3600 
3601  if (fd != -1)
3602  CloseTransientFile(fd);
3603 }
3604 
3605 /*
3606  * Serialize individual change to disk.
3607  */
3608 static void
3610  int fd, ReorderBufferChange *change)
3611 {
3612  ReorderBufferDiskChange *ondisk;
3613  Size sz = sizeof(ReorderBufferDiskChange);
3614 
3616 
3617  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3618  memcpy(&ondisk->change, change, sizeof(ReorderBufferChange));
3619 
3620  switch (change->action)
3621  {
3622  /* fall through these, they're all similar enough */
3627  {
3628  char *data;
3629  ReorderBufferTupleBuf *oldtup,
3630  *newtup;
3631  Size oldlen = 0;
3632  Size newlen = 0;
3633 
3634  oldtup = change->data.tp.oldtuple;
3635  newtup = change->data.tp.newtuple;
3636 
3637  if (oldtup)
3638  {
3639  sz += sizeof(HeapTupleData);
3640  oldlen = oldtup->tuple.t_len;
3641  sz += oldlen;
3642  }
3643 
3644  if (newtup)
3645  {
3646  sz += sizeof(HeapTupleData);
3647  newlen = newtup->tuple.t_len;
3648  sz += newlen;
3649  }
3650 
3651  /* make sure we have enough space */
3653 
3654  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3655  /* might have been reallocated above */
3656  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3657 
3658  if (oldlen)
3659  {
3660  memcpy(data, &oldtup->tuple, sizeof(HeapTupleData));
3661  data += sizeof(HeapTupleData);
3662 
3663  memcpy(data, oldtup->tuple.t_data, oldlen);
3664  data += oldlen;
3665  }
3666 
3667  if (newlen)
3668  {
3669  memcpy(data, &newtup->tuple, sizeof(HeapTupleData));
3670  data += sizeof(HeapTupleData);
3671 
3672  memcpy(data, newtup->tuple.t_data, newlen);
3673  data += newlen;
3674  }
3675  break;
3676  }
3678  {
3679  char *data;
3680  Size prefix_size = strlen(change->data.msg.prefix) + 1;
3681 
3682  sz += prefix_size + change->data.msg.message_size +
3683  sizeof(Size) + sizeof(Size);
3685 
3686  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3687 
3688  /* might have been reallocated above */
3689  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3690 
3691  /* write the prefix including the size */
3692  memcpy(data, &prefix_size, sizeof(Size));
3693  data += sizeof(Size);
3694  memcpy(data, change->data.msg.prefix,
3695  prefix_size);
3696  data += prefix_size;
3697 
3698  /* write the message including the size */
3699  memcpy(data, &change->data.msg.message_size, sizeof(Size));
3700  data += sizeof(Size);
3701  memcpy(data, change->data.msg.message,
3702  change->data.msg.message_size);
3703  data += change->data.msg.message_size;
3704 
3705  break;
3706  }
3708  {
3709  char *data;
3710  Size inval_size = sizeof(SharedInvalidationMessage) *
3711  change->data.inval.ninvalidations;
3712 
3713  sz += inval_size;
3714 
3716  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3717 
3718  /* might have been reallocated above */
3719  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3720  memcpy(data, change->data.inval.invalidations, inval_size);
3721  data += inval_size;
3722 
3723  break;
3724  }
3726  {
3727  Snapshot snap;
3728  char *data;
3729 
3730  snap = change->data.snapshot;
3731 
3732  sz += sizeof(SnapshotData) +
3733  sizeof(TransactionId) * snap->xcnt +
3734  sizeof(TransactionId) * snap->subxcnt;
3735 
3736  /* make sure we have enough space */
3738  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3739  /* might have been reallocated above */
3740  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3741 
3742  memcpy(data, snap, sizeof(SnapshotData));
3743  data += sizeof(SnapshotData);
3744 
3745  if (snap->xcnt)
3746  {
3747  memcpy(data, snap->xip,
3748  sizeof(TransactionId) * snap->xcnt);
3749  data += sizeof(TransactionId) * snap->xcnt;
3750  }
3751 
3752  if (snap->subxcnt)
3753  {
3754  memcpy(data, snap->subxip,
3755  sizeof(TransactionId) * snap->subxcnt);
3756  data += sizeof(TransactionId) * snap->subxcnt;
3757  }
3758  break;
3759  }
3761  {
3762  Size size;
3763  char *data;
3764 
3765  /* account for the OIDs of truncated relations */
3766  size = sizeof(Oid) * change->data.truncate.nrelids;
3767  sz += size;
3768 
3769  /* make sure we have enough space */
3771 
3772  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3773  /* might have been reallocated above */
3774  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3775 
3776  memcpy(data, change->data.truncate.relids, size);
3777  data += size;
3778 
3779  break;
3780  }
3785  /* ReorderBufferChange contains everything important */
3786  break;
3787  }
3788 
3789  ondisk->size = sz;
3790 
3791  errno = 0;
3793  if (write(fd, rb->outbuf, ondisk->size) != ondisk->size)
3794  {
3795  int save_errno = errno;
3796 
3797  CloseTransientFile(fd);
3798 
3799  /* if write didn't set errno, assume problem is no disk space */
3800  errno = save_errno ? save_errno : ENOSPC;
3801  ereport(ERROR,
3803  errmsg("could not write to data file for XID %u: %m",
3804  txn->xid)));
3805  }
3807 
3808  /*
3809  * Keep the transaction's final_lsn up to date with each change we send to
3810  * disk, so that ReorderBufferRestoreCleanup works correctly. (We used to
3811  * only do this on commit and abort records, but that doesn't work if a
3812  * system crash leaves a transaction without its abort record).
3813  *
3814  * Make sure not to move it backwards.
3815  */
3816  if (txn->final_lsn < change->lsn)
3817  txn->final_lsn = change->lsn;
3818 
3819  Assert(ondisk->change.action == change->action);
3820 }
3821 
3822 /* Returns true, if the output plugin supports streaming, false, otherwise. */
3823 static inline bool
3825 {
3827 
3828  return ctx->streaming;
3829 }
3830 
3831 /* Returns true, if the streaming can be started now, false, otherwise. */
3832 static inline bool
3834 {
3836  SnapBuild *builder = ctx->snapshot_builder;
3837 
3838  /* We can't start streaming unless a consistent state is reached. */
3840  return false;
3841 
3842  /*
3843  * We can't start streaming immediately even if the streaming is enabled
3844  * because we previously decoded this transaction and now just are
3845  * restarting.
3846  */
3847  if (ReorderBufferCanStream(rb) &&
3848  !SnapBuildXactNeedsSkip(builder, ctx->reader->EndRecPtr))
3849  return true;
3850 
3851  return false;
3852 }
3853 
3854 /*
3855  * Send data of a large transaction (and its subtransactions) to the
3856  * output plugin, but using the stream API.
3857  */
3858 static void
3860 {
3861  Snapshot snapshot_now;
3862  CommandId command_id;
3863  Size stream_bytes;
3864  bool txn_is_streamed;
3865 
3866  /* We can never reach here for a subtransaction. */
3867  Assert(txn->toptxn == NULL);
3868 
3869  /*
3870  * We can't make any assumptions about base snapshot here, similar to what
3871  * ReorderBufferCommit() does. That relies on base_snapshot getting
3872  * transferred from subxact in ReorderBufferCommitChild(), but that was
3873  * not yet called as the transaction is in-progress.
3874  *
3875  * So just walk the subxacts and use the same logic here. But we only need
3876  * to do that once, when the transaction is streamed for the first time.
3877  * After that we need to reuse the snapshot from the previous run.
3878  *
3879  * Unlike DecodeCommit which adds xids of all the subtransactions in
3880  * snapshot's xip array via SnapBuildCommittedTxn, we can't do that here
3881  * but we do add them to subxip array instead via ReorderBufferCopySnap.
3882  * This allows the catalog changes made in subtransactions decoded till
3883  * now to be visible.
3884  */
3885  if (txn->snapshot_now == NULL)
3886  {
3887  dlist_iter subxact_i;
3888 
3889  /* make sure this transaction is streamed for the first time */
3890  Assert(!rbtxn_is_streamed(txn));
3891 
3892  /* at the beginning we should have invalid command ID */
3894 
3895  dlist_foreach(subxact_i, &txn->subtxns)
3896  {
3897  ReorderBufferTXN *subtxn;
3898 
3899  subtxn = dlist_container(ReorderBufferTXN, node, subxact_i.cur);
3900  ReorderBufferTransferSnapToParent(txn, subtxn);
3901  }
3902 
3903  /*
3904  * If this transaction has no snapshot, it didn't make any changes to
3905  * the database till now, so there's nothing to decode.
3906  */
3907  if (txn->base_snapshot == NULL)
3908  {
3909  Assert(txn->ninvalidations == 0);
3910  return;
3911  }
3912 
3913  command_id = FirstCommandId;
3914  snapshot_now = ReorderBufferCopySnap(rb, txn->base_snapshot,
3915  txn, command_id);
3916  }
3917  else
3918  {
3919  /* the transaction must have been already streamed */
3920  Assert(rbtxn_is_streamed(txn));
3921 
3922  /*
3923  * Nah, we already have snapshot from the previous streaming run. We
3924  * assume new subxacts can't move the LSN backwards, and so can't beat
3925  * the LSN condition in the previous branch (so no need to walk
3926  * through subxacts again). In fact, we must not do that as we may be
3927  * using snapshot half-way through the subxact.
3928  */
3929  command_id = txn->command_id;
3930 
3931  /*
3932  * We can't use txn->snapshot_now directly because after the last
3933  * streaming run, we might have got some new sub-transactions. So we
3934  * need to add them to the snapshot.
3935  */
3936  snapshot_now = ReorderBufferCopySnap(rb, txn->snapshot_now,
3937  txn, command_id);
3938 
3939  /* Free the previously copied snapshot. */
3940  Assert(txn->snapshot_now->copied);
3942  txn->snapshot_now = NULL;
3943  }
3944 
3945  /*
3946  * Remember this information to be used later to update stats. We can't
3947  * update the stats here as an error while processing the changes would
3948  * lead to the accumulation of stats even though we haven't streamed all
3949  * the changes.
3950  */
3951  txn_is_streamed = rbtxn_is_streamed(txn);
3952  stream_bytes = txn->total_size;
3953 
3954  /* Process and send the changes to output plugin. */
3955  ReorderBufferProcessTXN(rb, txn, InvalidXLogRecPtr, snapshot_now,
3956  command_id, true);
3957 
3958  rb->streamCount += 1;
3959  rb->streamBytes += stream_bytes;
3960 
3961  /* Don't consider already streamed transaction. */
3962  rb->streamTxns += (txn_is_streamed) ? 0 : 1;
3963 
3964  /* update the decoding stats */
3966 
3967  Assert(dlist_is_empty(&txn->changes));
3968  Assert(txn->nentries == 0);
3969  Assert(txn->nentries_mem == 0);
3970 }
3971 
3972 /*
3973  * Size of a change in memory.
3974  */
3975 static Size
3977 {
3978  Size sz = sizeof(ReorderBufferChange);
3979 
3980  switch (change->action)
3981  {
3982  /* fall through these, they're all similar enough */
3987  {
3988  ReorderBufferTupleBuf *oldtup,
3989  *newtup;
3990  Size oldlen = 0;
3991  Size newlen = 0;
3992 
3993  oldtup = change->data.tp.oldtuple;
3994  newtup = change->data.tp.newtuple;
3995 
3996  if (oldtup)
3997  {
3998  sz += sizeof(HeapTupleData);
3999  oldlen = oldtup->tuple.t_len;
4000  sz += oldlen;
4001  }
4002 
4003  if (newtup)
4004  {
4005  sz += sizeof(HeapTupleData);
4006  newlen = newtup->tuple.t_len;
4007  sz += newlen;
4008  }
4009 
4010  break;
4011  }
4013  {
4014  Size prefix_size = strlen(change->data.msg.prefix) + 1;
4015 
4016  sz += prefix_size + change->data.msg.message_size +
4017  sizeof(Size) + sizeof(Size);
4018 
4019  break;
4020  }
4022  {
4023  sz += sizeof(SharedInvalidationMessage) *
4024  change->data.inval.ninvalidations;
4025  break;
4026  }
4028  {
4029  Snapshot snap;
4030 
4031  snap = change->data.snapshot;
4032 
4033  sz += sizeof(SnapshotData) +
4034  sizeof(TransactionId) * snap->xcnt +
4035  sizeof(TransactionId) * snap->subxcnt;
4036 
4037  break;
4038  }
4040  {
4041  sz += sizeof(Oid) * change->data.truncate.nrelids;
4042 
4043  break;
4044  }
4049  /* ReorderBufferChange contains everything important */
4050  break;
4051  }
4052 
4053  return sz;
4054 }
4055 
4056 
4057 /*
4058  * Restore a number of changes spilled to disk back into memory.
4059  */
4060 static Size
4062  TXNEntryFile *file, XLogSegNo *segno)
4063 {
4064  Size restored = 0;
4065  XLogSegNo last_segno;
4066  dlist_mutable_iter cleanup_iter;
4067  File *fd = &file->vfd;
4068 
4071 
4072  /* free current entries, so we have memory for more */
4073  dlist_foreach_modify(cleanup_iter, &txn->changes)
4074  {
4076  dlist_container(ReorderBufferChange, node, cleanup_iter.cur);
4077 
4078  dlist_delete(&cleanup->node);
4079  ReorderBufferReturnChange(rb, cleanup, true);
4080  }
4081  txn->nentries_mem = 0;
4082  Assert(dlist_is_empty(&txn->changes));
4083 
4084  XLByteToSeg(txn->final_lsn, last_segno, wal_segment_size);
4085 
4086  while (restored < max_changes_in_memory && *segno <= last_segno)
4087  {
4088  int readBytes;
4089  ReorderBufferDiskChange *ondisk;
4090 
4091  if (*fd == -1)
4092  {
4093  char path[MAXPGPATH];
4094 
4095  /* first time in */
4096  if (*segno == 0)
4097  XLByteToSeg(txn->first_lsn, *segno, wal_segment_size);
4098 
4099  Assert(*segno != 0 || dlist_is_empty(&txn->changes));
4100 
4101  /*
4102  * No need to care about TLIs here, only used during a single run,
4103  * so each LSN only maps to a specific WAL record.
4104  */
4106  *segno);
4107 
4108  *fd = PathNameOpenFile(path, O_RDONLY | PG_BINARY);
4109 
4110  /* No harm in resetting the offset even in case of failure */
4111  file->curOffset = 0;
4112 
4113  if (*fd < 0 && errno == ENOENT)
4114  {
4115  *fd = -1;
4116  (*segno)++;
4117  continue;
4118  }
4119  else if (*fd < 0)
4120  ereport(ERROR,
4122  errmsg("could not open file \"%s\": %m",
4123  path)));
4124  }
4125 
4126  /*
4127  * Read the statically sized part of a change which has information
4128  * about the total size. If we couldn't read a record, we're at the
4129  * end of this file.
4130  */
4132  readBytes = FileRead(file->vfd, rb->outbuf,
4133  sizeof(ReorderBufferDiskChange),
4135 
4136  /* eof */
4137  if (readBytes == 0)
4138  {
4139  FileClose(*fd);
4140  *fd = -1;
4141  (*segno)++;
4142  continue;
4143  }
4144  else if (readBytes < 0)
4145  ereport(ERROR,
4147  errmsg("could not read from reorderbuffer spill file: %m")));
4148  else if (readBytes != sizeof(ReorderBufferDiskChange))
4149  ereport(ERROR,
4151  errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
4152  readBytes,
4153  (uint32) sizeof(ReorderBufferDiskChange))));
4154 
4155  file->curOffset += readBytes;
4156 
4157  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
4158 
4160  sizeof(ReorderBufferDiskChange) + ondisk->size);
4161  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
4162 
4163  readBytes = FileRead(file->vfd,
4164  rb->outbuf + sizeof(ReorderBufferDiskChange),
4165  ondisk->size - sizeof(ReorderBufferDiskChange),
4166  file->curOffset,
4168 
4169  if (readBytes < 0)
4170  ereport(ERROR,
4172  errmsg("could not read from reorderbuffer spill file: %m")));
4173  else if (readBytes != ondisk->size - sizeof(ReorderBufferDiskChange))
4174  ereport(ERROR,
4176  errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
4177  readBytes,
4178  (uint32) (ondisk->size - sizeof(ReorderBufferDiskChange)))));
4179 
4180  file->curOffset += readBytes;
4181 
4182  /*
4183  * ok, read a full change from disk, now restore it into proper
4184  * in-memory format
4185  */
4186  ReorderBufferRestoreChange(rb, txn, rb->outbuf);
4187  restored++;
4188  }
4189 
4190  return restored;
4191 }
4192 
4193 /*
4194  * Convert change from its on-disk format to in-memory format and queue it onto
4195  * the TXN's ->changes list.
4196  *
4197  * Note: although "data" is declared char*, at entry it points to a
4198  * maxalign'd buffer, making it safe in most of this function to assume
4199  * that the pointed-to data is suitably aligned for direct access.
4200  */
4201 static void
4203  char *data)
4204 {
4205  ReorderBufferDiskChange *ondisk;
4206  ReorderBufferChange *change;
4207 
4208  ondisk = (ReorderBufferDiskChange *) data;
4209 
4210  change = ReorderBufferGetChange(rb);
4211 
4212  /* copy static part */
4213  memcpy(change, &ondisk->change, sizeof(ReorderBufferChange));
4214 
4215  data += sizeof(ReorderBufferDiskChange);
4216 
4217  /* restore individual stuff */
4218  switch (change->action)
4219  {
4220  /* fall through these, they're all similar enough */
4225  if (change->data.tp.oldtuple)
4226  {
4227  uint32 tuplelen = ((HeapTuple) data)->t_len;
4228 
4229  change->data.tp.oldtuple =
4231 
4232  /* restore ->tuple */
4233  memcpy(&change->data.tp.oldtuple->tuple, data,
4234  sizeof(HeapTupleData));
4235  data += sizeof(HeapTupleData);
4236 
4237  /* reset t_data pointer into the new tuplebuf */
4238  change->data.tp.oldtuple->tuple.t_data =
4239  ReorderBufferTupleBufData(change->data.tp.oldtuple);
4240 
4241  /* restore tuple data itself */
4242  memcpy(change->data.tp.oldtuple->tuple.t_data, data, tuplelen);
4243  data += tuplelen;
4244  }
4245 
4246  if (change->data.tp.newtuple)
4247  {
4248  /* here, data might not be suitably aligned! */
4249  uint32 tuplelen;
4250 
4251  memcpy(&tuplelen, data + offsetof(HeapTupleData, t_len),
4252  sizeof(uint32));
4253 
4254  change->data.tp.newtuple =
4256 
4257  /* restore ->tuple */
4258  memcpy(&change->data.tp.newtuple->tuple, data,
4259  sizeof(HeapTupleData));
4260  data += sizeof(HeapTupleData);
4261 
4262  /* reset t_data pointer into the new tuplebuf */
4263  change->data.tp.newtuple->tuple.t_data =
4264  ReorderBufferTupleBufData(change->data.tp.newtuple);
4265 
4266  /* restore tuple data itself */
4267  memcpy(change->data.tp.newtuple->tuple.t_data, data, tuplelen);
4268  data += tuplelen;
4269  }
4270 
4271  break;
4273  {
4274  Size prefix_size;
4275 
4276  /* read prefix */
4277  memcpy(&prefix_size, data, sizeof(Size));
4278  data += sizeof(Size);
4279  change->data.msg.prefix = MemoryContextAlloc(rb->context,
4280  prefix_size);
4281  memcpy(change->data.msg.prefix, data, prefix_size);
4282  Assert(change->data.msg.prefix[prefix_size - 1] == '\0');
4283  data += prefix_size;
4284 
4285  /* read the message */
4286  memcpy(&change->data.msg.message_size, data, sizeof(Size));
4287  data += sizeof(Size);
4288  change->data.msg.message = MemoryContextAlloc(rb->context,
4289  change->data.msg.message_size);
4290  memcpy(change->data.msg.message, data,
4291  change->data.msg.message_size);
4292  data += change->data.msg.message_size;
4293 
4294  break;
4295  }
4297  {
4298  Size inval_size = sizeof(SharedInvalidationMessage) *
4299  change->data.inval.ninvalidations;
4300 
4301  change->data.inval.invalidations =
4302  MemoryContextAlloc(rb->context, inval_size);
4303 
4304  /* read the message */
4305  memcpy(change->data.inval.invalidations, data, inval_size);
4306 
4307  break;
4308  }
4310  {
4311  Snapshot oldsnap;
4312  Snapshot newsnap;
4313  Size size;
4314 
4315  oldsnap = (Snapshot) data;
4316 
4317  size = sizeof(SnapshotData) +
4318  sizeof(TransactionId) * oldsnap->xcnt +
4319  sizeof(TransactionId) * (oldsnap->subxcnt + 0);
4320 
4321  change->data.snapshot = MemoryContextAllocZero(rb->context, size);
4322 
4323  newsnap = change->data.snapshot;
4324 
4325  memcpy(newsnap, data, size);
4326  newsnap->xip = (TransactionId *)
4327  (((char *) newsnap) + sizeof(SnapshotData));
4328  newsnap->subxip = newsnap->xip + newsnap->xcnt;
4329  newsnap->copied = true;
4330  break;
4331  }
4332  /* the base struct contains all the data, easy peasy */
4334  {
4335  Oid *relids;
4336 
4337  relids = ReorderBufferGetRelids(rb,
4338  change->data.truncate.nrelids);
4339  memcpy(relids, data, change->data.truncate.nrelids * sizeof(Oid));
4340  change->data.truncate.relids = relids;
4341 
4342  break;
4343  }
4348  break;
4349  }
4350 
4351  dlist_push_tail(&txn->changes, &change->node);
4352  txn->nentries_mem++;
4353 
4354  /*
4355  * Update memory accounting for the restored change. We need to do this
4356  * although we don't check the memory limit when restoring the changes in
4357  * this branch (we only do that when initially queueing the changes after
4358  * decoding), because we will release the changes later, and that will
4359  * update the accounting too (subtracting the size from the counters). And
4360  * we don't want to underflow there.
4361  */
4362  ReorderBufferChangeMemoryUpdate(rb, change, true);
4363 }
4364 
4365 /*
4366  * Remove all on-disk stored for the passed in transaction.
4367  */
4368 static void
4370 {
4371  XLogSegNo first;
4372  XLogSegNo cur;
4373  XLogSegNo last;
4374 
4377 
4378  XLByteToSeg(txn->first_lsn, first, wal_segment_size);
4379  XLByteToSeg(txn->final_lsn, last, wal_segment_size);
4380 
4381  /* iterate over all possible filenames, and delete them */
4382  for (cur = first; cur <= last; cur++)
4383  {
4384  char path[MAXPGPATH];
4385 
4387  if (unlink(path) != 0 && errno != ENOENT)
4388  ereport(ERROR,
4390  errmsg("could not remove file \"%s\": %m", path)));
4391  }
4392 }
4393 
4394 /*
4395  * Remove any leftover serialized reorder buffers from a slot directory after a
4396  * prior crash or decoding session exit.
4397  */
4398 static void
4400 {
4401  DIR *spill_dir;
4402  struct dirent *spill_de;
4403  struct stat statbuf;
4404  char path[MAXPGPATH * 2 + 12];
4405 
4406  sprintf(path, "pg_replslot/%s", slotname);
4407 
4408  /* we're only handling directories here, skip if it's not ours */
4409  if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
4410  return;
4411 
4412  spill_dir = AllocateDir(path);
4413  while ((spill_de = ReadDirExtended(spill_dir, path, INFO)) != NULL)
4414  {
4415  /* only look at names that can be ours */
4416  if (strncmp(spill_de->d_name, "xid", 3) == 0)
4417  {
4418  snprintf(path, sizeof(path),
4419  "pg_replslot/%s/%s", slotname,
4420  spill_de->d_name);
4421 
4422  if (unlink(path) != 0)
4423  ereport(ERROR,
4425  errmsg("could not remove file \"%s\" during removal of pg_replslot/%s/xid*: %m",
4426  path, slotname)));
4427  }
4428  }
4429  FreeDir(spill_dir);
4430 }
4431 
4432 /*
4433  * Given a replication slot, transaction ID and segment number, fill in the
4434  * corresponding spill file into 'path', which is a caller-owned buffer of size
4435  * at least MAXPGPATH.
4436  */
4437 static void
4439  XLogSegNo segno)
4440 {
4441  XLogRecPtr recptr;
4442 
4443  XLogSegNoOffsetToRecPtr(segno, 0, wal_segment_size, recptr);
4444 
4445  snprintf(path, MAXPGPATH, "pg_replslot/%s/xid-%u-lsn-%X-%X.spill",
4447  xid, LSN_FORMAT_ARGS(recptr));
4448 }
4449 
4450 /*
4451  * Delete all data spilled to disk after we've restarted/crashed. It will be
4452  * recreated when the respective slots are reused.
4453  */
4454 void
4456 {
4457  DIR *logical_dir;
4458  struct dirent *logical_de;
4459 
4460  logical_dir = AllocateDir("pg_replslot");
4461  while ((logical_de = ReadDir(logical_dir, "pg_replslot")) != NULL)
4462  {
4463  if (strcmp(logical_de->d_name, ".") == 0 ||
4464  strcmp(logical_de->d_name, "..") == 0)
4465  continue;
4466 
4467  /* if it cannot be a slot, skip the directory */
4468  if (!ReplicationSlotValidateName(logical_de->d_name, DEBUG2))
4469  continue;
4470 
4471  /*
4472  * ok, has to be a surviving logical slot, iterate and delete
4473  * everything starting with xid-*
4474  */
4476  }
4477  FreeDir(logical_dir);
4478 }
4479 
4480 /* ---------------------------------------
4481  * toast reassembly support
4482  * ---------------------------------------
4483  */
4484 
4485 /*
4486  * Initialize per tuple toast reconstruction support.
4487  */
4488 static void
4490 {
4491  HASHCTL hash_ctl;
4492 
4493  Assert(txn->toast_hash == NULL);
4494 
4495  hash_ctl.keysize = sizeof(Oid);
4496  hash_ctl.entrysize = sizeof(ReorderBufferToastEnt);
4497  hash_ctl.hcxt = rb->context;
4498  txn->toast_hash = hash_create("ReorderBufferToastHash", 5, &hash_ctl,
4500 }
4501 
4502 /*
4503  * Per toast-chunk handling for toast reconstruction
4504  *
4505  * Appends a toast chunk so we can reconstruct it when the tuple "owning" the
4506  * toasted Datum comes along.
4507  */
4508 static void
 /* NOTE(review): the first signature line (function name plus the leading
  * "ReorderBuffer *rb, ReorderBufferTXN *txn" parameters) is missing from
  * this extraction; cross references identify it as
  * ReorderBufferToastAppendChunk. */
4510  Relation relation, ReorderBufferChange *change)
4511 {
4512  ReorderBufferToastEnt *ent;
4513  ReorderBufferTupleBuf *newtup;
4514  bool found;
4515  int32 chunksize;
4516  bool isnull;
4517  Pointer chunk;
4518  TupleDesc desc = RelationGetDescr(relation);
4519  Oid chunk_id;
4520  int32 chunk_seq;
4521 
 /* create the per-transaction toast hash on first use */
4522  if (txn->toast_hash == NULL)
4523  ReorderBufferToastInitHash(rb, txn);
4524 
4525  Assert(IsToastRelation(relation));
4526 
 /* a toast-table row is (chunk_id oid, chunk_seq int4, chunk_data bytea);
  * pull the first two attributes out of the new tuple */
4527  newtup = change->data.tp.newtuple;
4528  chunk_id = DatumGetObjectId(fastgetattr(&newtup->tuple, 1, desc, &isnull));
4529  Assert(!isnull);
4530  chunk_seq = DatumGetInt32(fastgetattr(&newtup->tuple, 2, desc, &isnull));
4531  Assert(!isnull);
4532 
4533  ent = (ReorderBufferToastEnt *)
4534  hash_search(txn->toast_hash,
4535  (void *) &chunk_id,
4536  HASH_ENTER,
4537  &found);
4538 
4539  if (!found)
4540  {
 /* new entry: HASH_ENTER filled in the key; initialize the rest */
4541  Assert(ent->chunk_id == chunk_id);
4542  ent->num_chunks = 0;
4543  ent->last_chunk_seq = 0;
4544  ent->size = 0;
4545  ent->reconstructed = NULL;
4546  dlist_init(&ent->chunks);
4547 
 /* chunks of one toast value must arrive in order, starting at 0 */
4548  if (chunk_seq != 0)
4549  elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq 0",
4550  chunk_seq, chunk_id);
4551  }
 /* existing entry: enforce strictly consecutive sequence numbers */
4552  else if (found && chunk_seq != ent->last_chunk_seq + 1)
4553  elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq %d",
4554  chunk_seq, chunk_id, ent->last_chunk_seq + 1);
4555 
 /* third attribute is the chunk payload */
4556  chunk = DatumGetPointer(fastgetattr(&newtup->tuple, 3, desc, &isnull));
4557  Assert(!isnull);
4558 
4559  /* calculate size so we can allocate the right size at once later */
4560  if (!VARATT_IS_EXTENDED(chunk))
4561  chunksize = VARSIZE(chunk) - VARHDRSZ;
4562  else if (VARATT_IS_SHORT(chunk))
4563  /* could happen due to heap_form_tuple doing its thing */
4564  chunksize = VARSIZE_SHORT(chunk) - VARHDRSZ_SHORT;
4565  else
4566  elog(ERROR, "unexpected type of toast chunk");
4567 
 /* account for the chunk and queue the change on the entry's chunk list;
  * the change is consumed/freed later by ReorderBufferToastReset() */
4568  ent->size += chunksize;
4569  ent->last_chunk_seq = chunk_seq;
4570  ent->num_chunks++;
4571  dlist_push_tail(&ent->chunks, &change->node);
4572 }
4573 
4574 /*
4575  * Rejigger change->newtuple to point to in-memory toast tuples instead of
4576  * on-disk toast tuples that may no longer exist (think DROP TABLE or VACUUM).
4577  *
4578  * We cannot replace unchanged toast tuples though, so those will still point
4579  * to on-disk toast data.
4580  *
4581  * While updating the existing change with detoasted tuple data, we need to
4582  * update the memory accounting info, because the change size will differ.
4583  * Otherwise the accounting may get out of sync, triggering serialization
4584  * at unexpected times.
4585  *
4586  * We simply subtract size of the change before rejiggering the tuple, and
4587  * then adding the new size. This makes it look like the change was removed
4588  * and then added back, except it only tweaks the accounting info.
4589  *
4590  * In particular it can't trigger serialization, which would be pointless
4591  * anyway as it happens during commit processing right before handing
4592  * the change to the output plugin.
4593  */
4594 static void
 /* NOTE(review): the first signature line (function name plus the leading
  * "ReorderBuffer *rb, ReorderBufferTXN *txn" parameters) is missing from
  * this extraction; cross references identify it as
  * ReorderBufferToastReplace. */
4596  Relation relation, ReorderBufferChange *change)
4597 {
4598  TupleDesc desc;
4599  int natt;
4600  Datum *attrs;
4601  bool *isnull;
4602  bool *free;
4603  HeapTuple tmphtup;
4604  Relation toast_rel;
4605  TupleDesc toast_desc;
4606  MemoryContext oldcontext;
4607  ReorderBufferTupleBuf *newtup;
4608 
4609  /* no toast tuples changed */
4610  if (txn->toast_hash == NULL)
4611  return;
4612 
4613  /*
4614  * We're going to modify the size of the change, so to make sure the
4615  * accounting is correct we'll make it look like we're removing the change
4616  * now (with the old size), and then re-add it at the end.
4617  */
4618  ReorderBufferChangeMemoryUpdate(rb, change, false);
4619 
4620  oldcontext = MemoryContextSwitchTo(rb->context);
4621 
4622  /* we should only have toast tuples in an INSERT or UPDATE */
4623  Assert(change->data.tp.newtuple);
4624 
4625  desc = RelationGetDescr(relation);
4626 
 /* open the relation's toast table to get its tuple descriptor */
4627  toast_rel = RelationIdGetRelation(relation->rd_rel->reltoastrelid);
4628  if (!RelationIsValid(toast_rel))
4629  elog(ERROR, "could not open relation with OID %u",
4630  relation->rd_rel->reltoastrelid);
4631 
4632  toast_desc = RelationGetDescr(toast_rel);
4633 
4634  /* should we allocate from stack instead? */
4635  attrs = palloc0(sizeof(Datum) * desc->natts);
4636  isnull = palloc0(sizeof(bool) * desc->natts);
4637  free = palloc0(sizeof(bool) * desc->natts);
4638 
4639  newtup = change->data.tp.newtuple;
4640 
4641  heap_deform_tuple(&newtup->tuple, desc, attrs, isnull);
4642 
 /* walk the columns, replacing changed external toast datums with
  * in-memory indirect pointers built from the accumulated chunks */
4643  for (natt = 0; natt < desc->natts; natt++)
4644  {
4645  Form_pg_attribute attr = TupleDescAttr(desc, natt);
4646  ReorderBufferToastEnt *ent;
4647  struct varlena *varlena;
4648 
4649  /* va_rawsize is the size of the original datum -- including header */
4650  struct varatt_external toast_pointer;
4651  struct varatt_indirect redirect_pointer;
4652  struct varlena *new_datum = NULL;
4653  struct varlena *reconstructed;
4654  dlist_iter it;
4655  Size data_done = 0;
4656 
4657  /* system columns aren't toasted */
4658  if (attr->attnum < 0)
4659  continue;
4660 
4661  if (attr->attisdropped)
4662  continue;
4663 
4664  /* not a varlena datatype */
4665  if (attr->attlen != -1)
4666  continue;
4667 
4668  /* no data */
4669  if (isnull[natt])
4670  continue;
4671 
4672  /* ok, we know we have a toast datum */
4673  varlena = (struct varlena *) DatumGetPointer(attrs[natt]);
4674 
4675  /* no need to do anything if the tuple isn't external */
4676  if (!VARATT_IS_EXTERNAL(varlena))
4677  continue;
4678 
4679  VARATT_EXTERNAL_GET_POINTER(toast_pointer, varlena);
4680 
4681  /*
4682  * Check whether the toast tuple changed, replace if so.
4683  */
4684  ent = (ReorderBufferToastEnt *)
4685  hash_search(txn->toast_hash,
4686  (void *) &toast_pointer.va_valueid,
4687  HASH_FIND,
4688  NULL);
4689  if (ent == NULL)
4690  continue;
4691 
4692  new_datum =
4693  (struct varlena *) palloc0(INDIRECT_POINTER_SIZE);
4694 
 /* mark for pfree after the tuple has been re-formed below */
4695  free[natt] = true;
4696 
4697  reconstructed = palloc0(toast_pointer.va_rawsize);
4698 
 /* remember it so ReorderBufferToastReset() can free it later */
4699  ent->reconstructed = reconstructed;
4700 
4701  /* stitch toast tuple back together from its parts */
4702  dlist_foreach(it, &ent->chunks)
4703  {
4704  bool isnull;
4705  ReorderBufferChange *cchange;
4706  ReorderBufferTupleBuf *ctup;
4707  Pointer chunk;
4708 
4709  cchange = dlist_container(ReorderBufferChange, node, it.cur);
4710  ctup = cchange->data.tp.newtuple;
4711  chunk = DatumGetPointer(fastgetattr(&ctup->tuple, 3, toast_desc, &isnull));
4712 
4713  Assert(!isnull);
4714  Assert(!VARATT_IS_EXTERNAL(chunk));
4715  Assert(!VARATT_IS_SHORT(chunk));
4716 
4717  memcpy(VARDATA(reconstructed) + data_done,
4718  VARDATA(chunk),
4719  VARSIZE(chunk) - VARHDRSZ);
4720  data_done += VARSIZE(chunk) - VARHDRSZ;
4721  }
4722  Assert(data_done == VARATT_EXTERNAL_GET_EXTSIZE(toast_pointer));
4723 
4724  /* make sure it's marked as compressed or not */
4725  if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
4726  SET_VARSIZE_COMPRESSED(reconstructed, data_done + VARHDRSZ);
4727  else
4728  SET_VARSIZE(reconstructed, data_done + VARHDRSZ);
4729 
4730  memset(&redirect_pointer, 0, sizeof(redirect_pointer));
4731  redirect_pointer.pointer = reconstructed;
4732 
 /* NOTE(review): a line is missing here in this extraction — presumably
  * the SET_VARTAG_EXTERNAL(new_datum, ...) call that tags new_datum as an
  * indirect external datum before its payload is copied in; confirm
  * against upstream. */
4734  memcpy(VARDATA_EXTERNAL(new_datum), &redirect_pointer,
4735  sizeof(redirect_pointer));
4736 
4737  attrs[natt] = PointerGetDatum(new_datum);
4738  }
4739 
4740  /*
4741  * Build tuple in separate memory & copy tuple back into the tuplebuf
4742  * passed to the output plugin. We can't directly heap_fill_tuple() into
4743  * the tuplebuf because attrs[] will point back into the current content.
4744  */
4745  tmphtup = heap_form_tuple(desc, attrs, isnull);
4746  Assert(newtup->tuple.t_len <= MaxHeapTupleSize);
4747  Assert(ReorderBufferTupleBufData(newtup) == newtup->tuple.t_data);
4748 
4749  memcpy(newtup->tuple.t_data, tmphtup->t_data, tmphtup->t_len);
4750  newtup->tuple.t_len = tmphtup->t_len;
4751 
4752  /*
4753  * free resources we won't further need, more persistent stuff will be
4754  * free'd in ReorderBufferToastReset().
4755  */
4756  RelationClose(toast_rel);
4757  pfree(tmphtup);
4758  for (natt = 0; natt < desc->natts; natt++)
4759  {
4760  if (free[natt])
4761  pfree(DatumGetPointer(attrs[natt]));
4762  }
4763  pfree(attrs);
4764  pfree(free);
4765  pfree(isnull);
4766 
4767  MemoryContextSwitchTo(oldcontext);
4768 
4769  /* now add the change back, with the correct size */
4770  ReorderBufferChangeMemoryUpdate(rb, change, true);
4771 }
4772 
4773 /*
4774  * Free all resources allocated for toast reconstruction.
4775  */
4776 static void
 /* NOTE(review): the line carrying the function name and parameter list is
  * missing from this extraction; cross references identify it as
  * ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn). */
4778 {
4779  HASH_SEQ_STATUS hstat;
4780  ReorderBufferToastEnt *ent;
4781 
 /* nothing was toasted in this transaction */
4782  if (txn->toast_hash == NULL)
4783  return;
4784 
4785  /* sequentially walk over the hash and free everything */
4786  hash_seq_init(&hstat, txn->toast_hash);
4787  while ((ent = (ReorderBufferToastEnt *) hash_seq_search(&hstat)) != NULL)
4788  {
4789  dlist_mutable_iter it;
4790 
 /* free the stitched-together datum, if we built one */
4791  if (ent->reconstructed != NULL)
4792  pfree(ent->reconstructed);
4793 
 /* return every queued chunk change to the reorder buffer */
4794  dlist_foreach_modify(it, &ent->chunks)
4795  {
4796  ReorderBufferChange *change =
 /* NOTE(review): the continuation line with the dlist_container(...)
  * initializer for 'change' is missing from this extraction. */
4798 
4799  dlist_delete(&change->node);
4800  ReorderBufferReturnChange(rb, change, true);
4801  }
4802  }
4803 
4804  hash_destroy(txn->toast_hash);
4805  txn->toast_hash = NULL;
4806 }
4807 
4808 
4809 /* ---------------------------------------
4810  * Visibility support for logical decoding
4811  *
4812  *
4813  * Lookup actual cmin/cmax values when using decoding snapshot. We can't
4814  * always rely on stored cmin/cmax values because of two scenarios:
4815  *
4816  * * A tuple got changed multiple times during a single transaction and thus
4817  * has got a combo CID. Combo CIDs are only valid for the duration of a
4818  * single transaction.
4819  * * A tuple with a cmin but no cmax (and thus no combo CID) got
4820  * deleted/updated in another transaction than the one which created it
4821  * which we are looking at right now. As only one of cmin, cmax or combo CID
4822  * is actually stored in the heap we don't have access to the value we
4823  * need anymore.
4824  *
4825  * To resolve those problems we have a per-transaction hash of (cmin,
4826  * cmax) tuples keyed by (relfilenode, ctid) which contains the actual
4827  * (cmin, cmax) values. That also takes care of combo CIDs by simply
4828  * not caring about them at all. As we have the real cmin/cmax values
4829  * combo CIDs aren't interesting.
4830  *
4831  * As we only care about catalog tuples here the overhead of this
4832  * hashtable should be acceptable.
4833  *
4834  * Heap rewrites complicate this a bit, check rewriteheap.c for
4835  * details.
4836  * -------------------------------------------------------------------------
4837  */
4838 
4839 /* struct for sorting mapping files by LSN efficiently */
4840 typedef struct RewriteMappingFile
4841 {
 /* NOTE(review): the member declared on the missing line 4842 is
  * presumably the file's XLogRecPtr lsn (file_sort_by_lsn() below compares
  * an 'lsn' field) — confirm against upstream. */
4843  char fname[MAXPGPATH];
 /* NOTE(review): the closing "} RewriteMappingFile;" line is missing from
  * this extraction. */
4845 
4846 #ifdef NOT_USED
 /* Debug helper: dump every (relfilenode, tid) -> (cmin, cmax) mapping in
  * tuplecid_data at DEBUG3. Compiled out (NOT_USED). */
4847 static void
4848 DisplayMapping(HTAB *tuplecid_data)
4849 {
4850  HASH_SEQ_STATUS hstat;
 /* NOTE(review): the declaration of 'ent' (ReorderBufferTupleCidEnt *) on
  * the missing line 4851 was lost in this extraction. */
4852 
4853  hash_seq_init(&hstat, tuplecid_data);
4854  while ((ent = (ReorderBufferTupleCidEnt *) hash_seq_search(&hstat)) != NULL)
4855  {
4856  elog(DEBUG3, "mapping: node: %u/%u/%u tid: %u/%u cmin: %u, cmax: %u",
4857  ent->key.relnode.dbNode,
4858  ent->key.relnode.spcNode,
4859  ent->key.relnode.relNode,
 /* NOTE(review): the two argument lines supplying the tid's block and
  * offset numbers (original lines 4860-4861) are missing from this
  * extraction. */
4862  ent->cmin,
4863  ent->cmax
4864  );
4865  }
4866 }
4867 #endif
4868 
4869 /*
4870  * Apply a single mapping file to tuplecid_data.
4871  *
4872  * The mapping file has to have been verified to be a) committed b) for our
4873  * transaction c) applied in LSN order.
4874  */
4875 static void
4876 ApplyLogicalMappingFile(HTAB *tuplecid_data, Oid relid, const char *fname)
4877 {
4878  char path[MAXPGPATH];
4879  int fd;
4880  int readBytes;
 /* NOTE(review): a declaration line (original 4881, presumably
  * "LogicalRewriteMappingData map;") is missing from this extraction;
  * 'map' is used below. */
4882 
4883  sprintf(path, "pg_logical/mappings/%s", fname);
4884  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
4885  if (fd < 0)
4886  ereport(ERROR,
 /* NOTE(review): the errcode_for_file_access() line of this ereport is
  * missing from this extraction. */
4888  errmsg("could not open file \"%s\": %m", path)));
4889 
4890  while (true)
4891  {
 /* NOTE(review): the declarations of 'key' (ReorderBufferTupleCidKey)
  * and 'ent' (original lines 4892-4893) are missing from this
  * extraction. */
4894  ReorderBufferTupleCidEnt *new_ent;
4895  bool found;
4896 
4897  /* be careful about padding */
4898  memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
4899 
4900  /* read all mappings till the end of the file */
 /* NOTE(review): the pgstat_report_wait_start/_end() lines bracketing
  * this read() (original 4901/4903) are missing from this extraction. */
4902  readBytes = read(fd, &map, sizeof(LogicalRewriteMappingData));
4904 
4905  if (readBytes < 0)
4906  ereport(ERROR,
4908  errmsg("could not read file \"%s\": %m",
4909  path)));
4910  else if (readBytes == 0) /* EOF */
4911  break;
4912  else if (readBytes != sizeof(LogicalRewriteMappingData))
4913  ereport(ERROR,
4915  errmsg("could not read from file \"%s\": read %d instead of %d bytes",
4916  path, readBytes,
4917  (int32) sizeof(LogicalRewriteMappingData))));
4918 
 /* look up the pre-rewrite (old) tuple identity */
4919  key.relnode = map.old_node;
4920  ItemPointerCopy(&map.old_tid,
4921  &key.tid);
4922 
4923 
4924  ent = (ReorderBufferTupleCidEnt *)
4925  hash_search(tuplecid_data,
4926  (void *) &key,
4927  HASH_FIND,
4928  NULL);
4929 
4930  /* no existing mapping, no need to update */
4931  if (!ent)
4932  continue;
4933 
 /* re-key to the post-rewrite (new) tuple identity */
4934  key.relnode = map.new_node;
4935  ItemPointerCopy(&map.new_tid,
4936  &key.tid);
4937 
4938  new_ent = (ReorderBufferTupleCidEnt *)
4939  hash_search(tuplecid_data,
4940  (void *) &key,
4941  HASH_ENTER,
4942  &found);
4943 
4944  if (found)
4945  {
4946  /*
4947  * Make sure the existing mapping makes sense. We sometimes update
4948  * old records that did not yet have a cmax (e.g. pg_class' own
4949  * entry while rewriting it) during rewrites, so allow that.
4950  */
4951  Assert(ent->cmin == InvalidCommandId || ent->cmin == new_ent->cmin);
4952  Assert(ent->cmax == InvalidCommandId || ent->cmax == new_ent->cmax);
4953  }
4954  else
4955  {
4956  /* update mapping */
4957  new_ent->cmin = ent->cmin;
4958  new_ent->cmax = ent->cmax;
4959  new_ent->combocid = ent->combocid;
4960  }
4961  }
4962 
4963  if (CloseTransientFile(fd) != 0)
4964  ereport(ERROR,
4966  errmsg("could not close file \"%s\": %m", path)));
4967 }
4968 
4969 
4970 /*
4971  * Check whether the TransactionId 'xid' is in the pre-sorted array 'xip'.
4972  */
4973 static bool
 /* NOTE(review): the signature line is missing from this extraction; the
  * tooltip index gives it as
  * TransactionIdInArray(TransactionId xid, TransactionId *xip, Size num).
  * The array must be pre-sorted with xidComparator for bsearch to work. */
4975 {
4976  return bsearch(&xid, xip, num,
4977  sizeof(TransactionId), xidComparator) != NULL;
4978 }
4979 
4980 /*
4981  * list_sort() comparator for sorting RewriteMappingFiles in LSN order.
4982  */
4983 static int
4984 file_sort_by_lsn(const ListCell *a_p, const ListCell *b_p)
4985 {
 /* NOTE(review): the two declaration lines extracting RewriteMappingFile
  * pointers 'a' and 'b' from the ListCells (original lines 4986-4987) are
  * missing from this extraction. */
4988 
 /* ascending LSN order: -1 / 1 / 0 per list_sort() convention */
4989  if (a->lsn < b->lsn)
4990  return -1;
4991  else if (a->lsn > b->lsn)
4992  return 1;
4993  return 0;
4994 }
4995 
4996 /*
4997  * Apply any existing logical remapping files if there are any targeted at our
4998  * transaction for relid.
4999  */
5000 static void
 /* NOTE(review): the signature line is missing from this extraction; the
  * caller invokes this as
  * UpdateLogicalMappings(tuplecid_data, htup->t_tableOid, snapshot). */
5002 {
5003  DIR *mapping_dir;
5004  struct dirent *mapping_de;
5005  List *files = NIL;
5006  ListCell *file;
 /* shared catalogs store their mappings under InvalidOid */
5007  Oid dboid = IsSharedRelation(relid) ? InvalidOid : MyDatabaseId;
5008 
 /* scan pg_logical/mappings for files relevant to us */
5009  mapping_dir = AllocateDir("pg_logical/mappings");
5010  while ((mapping_de = ReadDir(mapping_dir, "pg_logical/mappings")) != NULL)
5011  {
5012  Oid f_dboid;
5013  Oid f_relid;
5014  TransactionId f_mapped_xid;
5015  TransactionId f_create_xid;
5016  XLogRecPtr f_lsn;
5017  uint32 f_hi,
5018  f_lo;
5019  RewriteMappingFile *f;
5020 
5021  if (strcmp(mapping_de->d_name, ".") == 0 ||
5022  strcmp(mapping_de->d_name, "..") == 0)
5023  continue;
5024 
5025  /* Ignore files that aren't ours */
5026  if (strncmp(mapping_de->d_name, "map-", 4) != 0)
5027  continue;
5028 
 /* parse dboid/relid/lsn/xids out of the file name */
5029  if (sscanf(mapping_de->d_name, LOGICAL_REWRITE_FORMAT,
5030  &f_dboid, &f_relid, &f_hi, &f_lo,
5031  &f_mapped_xid, &f_create_xid) != 6)
5032  elog(ERROR, "could not parse filename \"%s\"", mapping_de->d_name);
5033 
5034  f_lsn = ((uint64) f_hi) << 32 | f_lo;
5035 
5036  /* mapping for another database */
5037  if (f_dboid != dboid)
5038  continue;
5039 
5040  /* mapping for another relation */
5041  if (f_relid != relid)
5042  continue;
5043 
5044  /* did the creating transaction abort? */
5045  if (!TransactionIdDidCommit(f_create_xid))
5046  continue;
5047 
5048  /* not for our transaction */
5049  if (!TransactionIdInArray(f_mapped_xid, snapshot->subxip, snapshot->subxcnt))
5050  continue;
5051 
5052  /* ok, relevant, queue for apply */
5053  f = palloc(sizeof(RewriteMappingFile));
5054  f->lsn = f_lsn;
5055  strcpy(f->fname, mapping_de->d_name);
5056  files = lappend(files, f);
5057  }
5058  FreeDir(mapping_dir);
5059 
5060  /* sort files so we apply them in LSN order */
5061  list_sort(files, file_sort_by_lsn);
5062 
5063  foreach(file, files)
5064  {
 /* NOTE(review): the line fetching 'f' from the ListCell (original line
  * 5065, presumably via lfirst(file)) is missing from this extraction. */
5066 
5067  elog(DEBUG1, "applying mapping: \"%s\" in %u", f->fname,
5068  snapshot->subxip[0]);
5069  ApplyLogicalMappingFile(tuplecid_data, relid, f->fname);
5070  pfree(f);
5071  }
5072 }
5073 
5074 /*
5075  * Lookup cmin/cmax of a tuple, during logical decoding where we can't rely on
5076  * combo CIDs.
5077  */
5078 bool
 /* NOTE(review): the first signature line (function name plus the leading
  * "HTAB *tuplecid_data," parameter) is missing from this extraction; per
  * the file's comment above this is ResolveCminCmaxDuringDecoding. Returns
  * true and fills *cmin/*cmax when a mapping is found, false otherwise. */
5080  Snapshot snapshot,
5081  HeapTuple htup, Buffer buffer,
5082  CommandId *cmin, CommandId *cmax)
5083 {
 /* NOTE(review): the declarations of 'key' (ReorderBufferTupleCidKey) and
  * 'ent' (original lines 5084-5085) are missing from this extraction. */
5086  ForkNumber forkno;
5087  BlockNumber blockno;
5088  bool updated_mapping = false;
5089 
5090  /*
5091  * Return unresolved if tuplecid_data is not valid. That's because when
5092  * streaming in-progress transactions we may run into tuples with the CID
5093  * before actually decoding them. Think e.g. about INSERT followed by
5094  * TRUNCATE, where the TRUNCATE may not be decoded yet when applying the
5095  * INSERT. So in such cases, we assume the CID is from the future
5096  * command.
5097  */
5098  if (tuplecid_data == NULL)
5099  return false;
5100 
5101  /* be careful about padding */
5102  memset(&key, 0, sizeof(key));
5103 
5104  Assert(!BufferIsLocal(buffer));
5105 
5106  /*
5107  * get relfilenode from the buffer, no convenient way to access it other
5108  * than that.
5109  */
5110  BufferGetTag(buffer, &key.relnode, &forkno, &blockno);
5111 
5112  /* tuples can only be in the main fork */
5113  Assert(forkno == MAIN_FORKNUM);
5114  Assert(blockno == ItemPointerGetBlockNumber(&htup->t_self));
5115 
5116  ItemPointerCopy(&htup->t_self,
5117  &key.tid);
5118 
5119 restart:
5120  ent = (ReorderBufferTupleCidEnt *)
5121  hash_search(tuplecid_data,
5122  (void *) &key,
5123  HASH_FIND,
5124  NULL);
5125 
5126  /*
5127  * failed to find a mapping, check whether the table was rewritten and
5128  * apply mapping if so, but only do that once - there can be no new
5129  * mappings while we are in here since we have to hold a lock on the
5130  * relation.
5131  */
5132  if (ent == NULL && !updated_mapping)
5133  {
5134  UpdateLogicalMappings(tuplecid_data, htup->t_tableOid, snapshot);
5135  /* now check but don't update for a mapping again */
5136  updated_mapping = true;
5137  goto restart;
5138  }
5139  else if (ent == NULL)
5140  return false;
5141 
 /* found a mapping: hand back whichever values the caller asked for */
5142  if (cmin)
5143  *cmin = ent->cmin;
5144  if (cmax)
5145  *cmax = ent->cmax;
5146  return true;
5147 }
static void ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
XLogRecPtr first_lsn
bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *rb, TransactionId xid)
#define NIL
Definition: pg_list.h:65
uint32 CommandId
Definition: c.h:601
TimestampTz commit_time
struct ReorderBufferToastEnt ReorderBufferToastEnt
struct TXNEntryFile TXNEntryFile
void AbortCurrentTransaction(void)
Definition: xact.c:3210
ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER]
#define SizeofHeapTupleHeader
Definition: htup_details.h:184
bool IsToastRelation(Relation relation)
Definition: catalog.c:146
void hash_destroy(HTAB *hashp)
Definition: dynahash.c:862
void ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, Snapshot snapshot, XLogRecPtr lsn, bool transactional, const char *prefix, Size message_size, const char *message)
#define relpathperm(rnode, forknum)
Definition: relpath.h:83
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
ReorderBufferApplyChangeCB apply_change
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:218
#define AllocSetContextCreate
Definition: memutils.h:173
HeapTupleData * HeapTuple
Definition: htup.h:71
#define rbtxn_prepared(txn)
void * private_data
dlist_node base_snapshot_node
#define DEBUG1
Definition: elog.h:25
dlist_node * cur
Definition: ilist.h:180
static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
RepOriginId origin_id
void StartupReorderBuffer(void)
#define VARDATA(PTR)
Definition: postgres.h:315
static void ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define fastgetattr(tup, attnum, tupleDesc, isnull)
Definition: htup_details.h:711
static void pgstat_report_wait_end(void)
Definition: wait_event.h:278
static int32 next
Definition: blutils.c:219
File PathNameOpenFile(const char *fileName, int fileFlags)
Definition: fd.c:1528
#define VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer)
Definition: postgres.h:391
#define TransactionIdEquals(id1, id2)
Definition: transam.h:43
#define HASH_CONTEXT
Definition: hsearch.h:102
#define HASH_ELEM
Definition: hsearch.h:95
int wal_segment_size
Definition: xlog.c:119
void ReorderBufferAbortOld(ReorderBuffer *rb, TransactionId oldestRunningXid)
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:543
uint32 TransactionId
Definition: c.h:587
void ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, TXNEntryFile *file, XLogSegNo *segno)
bool copied
Definition: snapshot.h:185
bool SnapBuildXactNeedsSkip(SnapBuild *builder, XLogRecPtr ptr)
Definition: snapbuild.c:394
void ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
#define RBTXN_IS_STREAMED
MemoryContext hcxt
Definition: hsearch.h:86
#define DatumGetInt32(X)
Definition: postgres.h:516
int sqlerrcode
Definition: elog.h:381
#define RelationGetDescr(relation)
Definition: rel.h:503
static ReorderBufferTXN * ReorderBufferLargestTXN(ReorderBuffer *rb)
static void ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn, ReorderBufferTXN *subtxn)
#define DEBUG3
Definition: elog.h:23
SnapBuildState SnapBuildCurrentState(SnapBuild *builder)
Definition: snapbuild.c:367
#define write(a, b, c)
Definition: win32.h:14
#define VARHDRSZ_SHORT
Definition: postgres.h:292
TransactionId by_txn_last_xid
#define VARSIZE(PTR)
Definition: postgres.h:316
ErrorData * CopyErrorData(void)
Definition: elog.c:1560
#define VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr)
Definition: detoast.h:22
int64 TimestampTz
Definition: timestamp.h:39
#define PointerGetDatum(X)
Definition: postgres.h:600
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
ReorderBufferTXN * txn
static bool TransactionIdInArray(TransactionId xid, TransactionId *xip, Size num)
#define VARHDRSZ
Definition: c.h:627
#define dlist_foreach(iter, lhead)
Definition: ilist.h:526
ReorderBufferStreamAbortCB stream_abort
XLogRecPtr current_restart_decoding_lsn
#define DatumGetObjectId(X)
Definition: postgres.h:544
char * pstrdup(const char *in)
Definition: mcxt.c:1299
struct dirent * ReadDirExtended(DIR *dir, const char *dirname, int elevel)
Definition: fd.c:2759
static ReorderBufferTXN * ReorderBufferGetTXN(ReorderBuffer *rb)
static void ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id, XLogRecPtr last_lsn, ReorderBufferChange *specinsert)
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
struct ReorderBufferTXN * txn
Definition: reorderbuffer.h:88
Oid RelidByRelfilenode(Oid reltablespace, Oid relfilenode)
void ReorderBufferSetBaseSnapshot(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void ReorderBufferFree(ReorderBuffer *rb)
#define rbtxn_is_serialized_clear(txn)
uint16 RepOriginId
Definition: xlogdefs.h:65
Size entrysize
Definition: hsearch.h:76
CommandId command_id
#define FLEXIBLE_ARRAY_MEMBER
Definition: c.h:350
Snapshot snapshot_now
struct cursor * cur
Definition: ecpg.c:28
#define rbtxn_has_partial_change(txn)
char fname[MAXPGPATH]
int32 va_rawsize
Definition: postgres.h:71
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4701
#define INFO
Definition: elog.h:33
MemoryContext SlabContextCreate(MemoryContext parent, const char *name, Size blockSize, Size chunkSize)
Definition: slab.c:175
void binaryheap_replace_first(binaryheap *heap, Datum d)
Definition: binaryheap.c:204
void ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, RelFileNode node, ItemPointerData tid, CommandId cmin, CommandId cmax, CommandId combocid)
static void ReorderBufferIterTXNFinish(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
uint32 BlockNumber
Definition: block.h:31
void TeardownHistoricSnapshot(bool is_error)
Definition: snapmgr.c:2051
ReorderBufferCommitCB commit
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:1020
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
#define RelationIsLogicallyLogged(relation)
Definition: rel.h:674
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:954
bool TransactionIdDidCommit(TransactionId transactionId)
Definition: transam.c:125
ReplicationSlotPersistentData data
Definition: slot.h:147
struct ReorderBufferTupleCidKey ReorderBufferTupleCidKey
struct SnapshotData * Snapshot
Definition: snapshot.h:121
Form_pg_class rd_rel
Definition: rel.h:109
unsigned int Oid
Definition: postgres_ext.h:31
XLogRecPtr base_snapshot_lsn
ReorderBufferStreamCommitCB stream_commit
Definition: dirent.h:9
uint32 regd_count
Definition: snapshot.h:205
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:85
void binaryheap_add_unordered(binaryheap *heap, Datum d)
Definition: binaryheap.c:110
MemoryContext change_context
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define VARDATA_EXTERNAL(PTR)
Definition: postgres.h:323
#define PG_BINARY
Definition: c.h:1271
void FlushErrorState(void)
Definition: elog.c:1654
XLogRecPtr origin_lsn
static void ApplyLogicalMappingFile(HTAB *tuplecid_data, Oid relid, const char *fname)
signed int int32
Definition: c.h:429
#define FirstCommandId
Definition: c.h:603
XLogRecPtr EndRecPtr
Definition: xlogreader.h:176
#define XLByteInSeg(xlrp, logSegNo, wal_segsz_bytes)
void ReorderBufferSetRestartPoint(ReorderBuffer *rb, XLogRecPtr ptr)
#define RBTXN_SKIPPED_PREPARE
HeapTupleHeader t_data
Definition: htup.h:68
#define VARATT_IS_EXTERNAL(PTR)
Definition: postgres.h:326
#define sprintf
Definition: port.h:218
bool ReplicationSlotValidateName(const char *name, int elevel)
Definition: slot.c:172
#define rbtxn_is_streamed(txn)
static dlist_node * dlist_next_node(dlist_head *head, dlist_node *node)
Definition: ilist.h:440
Definition: dynahash.c:219
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
ReorderBufferStreamMessageCB stream_message
#define RBTXN_HAS_CATALOG_CHANGES
#define ReorderBufferTupleBufData(p)
Definition: reorderbuffer.h:38
#define dlist_container(type, membername, ptr)
Definition: ilist.h:496
void pfree(void *pointer)
Definition: mcxt.c:1169
char * Pointer
Definition: c.h:418
void FreeErrorData(ErrorData *edata)
Definition: elog.c:1616
static void ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
Definition: dirent.c:25
#define ERROR
Definition: elog.h:46
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2467
static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared)
#define SLAB_LARGE_BLOCK_SIZE
Definition: memutils.h:225
#define VARATT_IS_SHORT(PTR)
Definition: postgres.h:339
#define RelationIsValid(relation)
Definition: rel.h:450
dlist_head changes
void ReorderBufferReturnRelids(ReorderBuffer *rb, Oid *relids)
dlist_head txns_by_base_snapshot_lsn
Datum binaryheap_first(binaryheap *heap)
Definition: binaryheap.c:159
#define IsSpecInsert(action)
ReorderBufferStreamPrepareCB stream_prepare
#define MAXPGPATH
ItemPointerData t_self
Definition: htup.h:65
ReorderBufferTupleCidKey key
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:195
#define DEBUG2
Definition: elog.h:24
static bool ReorderBufferCanStream(ReorderBuffer *rb)
TransactionId GetCurrentTransactionId(void)
Definition: xact.c:438
void ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
uint32 t_len
Definition: htup.h:64
#define MaxHeapTupleSize
Definition: htup_details.h:559
void ReorderBufferSkipPrepare(ReorderBuffer *rb, TransactionId xid)
struct varlena * reconstructed
void RollbackAndReleaseCurrentSubTransaction(void)
Definition: xact.c:4512
#define SET_VARTAG_EXTERNAL(PTR, tag)
Definition: postgres.h:346
uint64 XLogSegNo
Definition: xlogdefs.h:48
int errcode_for_file_access(void)
Definition: elog.c:721
HeapTupleData tuple
Definition: reorderbuffer.h:29
struct SnapshotData SnapshotData
TransactionId GetCurrentTransactionIdIfAny(void)
Definition: xact.c:455
static ReorderBufferTXN * ReorderBufferLargestTopTXN(ReorderBuffer *rb)
static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, TransactionId xid, XLogSegNo segno)
#define InvalidTransactionId
Definition: transam.h:31
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:349
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:207
bool ReorderBufferXidHasCatalogChanges(ReorderBuffer *rb, TransactionId xid)
unsigned int uint32
Definition: c.h:441
XLogRecPtr final_lsn
DIR * AllocateDir(const char *dirname)
Definition: fd.c:2678
#define RBTXN_IS_SUBXACT
Oid t_tableOid
Definition: htup.h:66
static int file_sort_by_lsn(const ListCell *a_p, const ListCell *b_p)
ReorderBufferBeginCB begin_prepare
struct ReorderBufferTXN * toptxn
void RelationClose(Relation relation)