PostgreSQL Source Code  git master
reorderbuffer.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * reorderbuffer.c
4  * PostgreSQL logical replay/reorder buffer management
5  *
6  *
7  * Copyright (c) 2012-2020, PostgreSQL Global Development Group
8  *
9  *
10  * IDENTIFICATION
11  * src/backend/replication/reorderbuffer.c
12  *
13  * NOTES
14  * This module gets handed individual pieces of transactions in the order
15  * they are written to the WAL and is responsible for reassembling them into
16  * toplevel-transaction-sized pieces. When a transaction is completely
17  * reassembled - signaled by reading the transaction commit record - it
18  * will then call the output plugin (cf. ReorderBufferCommit()) with the
19  * individual changes. The output plugins rely on snapshots built by
20  * snapbuild.c which hands them to us.
21  *
22  * Transactions and subtransactions/savepoints in postgres are not
23  * immediately linked to each other from outside the performing
24  * backend. Only at commit/abort (or special xact_assignment records) they
25  * are linked together. Which means that we will have to splice together a
26  * toplevel transaction from its subtransactions. To do that efficiently we
27  * build a binary heap indexed by the smallest current lsn of the individual
28  * subtransactions' changestreams. As the individual streams are inherently
29  * ordered by LSN - since that is where we build them from - the transaction
30  * can easily be reassembled by always using the subtransaction with the
31  * smallest current LSN from the heap.
32  *
33  * In order to cope with large transactions - which can be several times as
34  * big as the available memory - this module supports spooling the contents
35  * of large transactions to disk. When the transaction is replayed the
36  * contents of individual (sub-)transactions will be read from disk in
37  * chunks.
38  *
39  * This module also has to deal with reassembling toast records from the
40  * individual chunks stored in WAL. When a new (or initial) version of a
41  * tuple is stored in WAL it will always be preceded by the toast chunks
42  * emitted for the columns stored out of line. Within a single toplevel
43  * transaction there will be no other data carrying records between a row's
44  * toast chunks and the row data itself. See ReorderBufferToast* for
45  * details.
46  *
47  * ReorderBuffer uses two special memory context types - SlabContext for
48  * allocations of fixed-length structures (changes and transactions), and
49  * GenerationContext for the variable-length transaction data (allocated
50  * and freed in groups with similar lifespans).
51  *
52  * To limit the amount of memory used by decoded changes, we track memory
53  * used at the reorder buffer level (i.e. total amount of memory), and for
54  * each transaction. When the total amount of used memory exceeds the
55  * limit, the transaction consuming the most memory is then serialized to
56  * disk.
57  *
58  * Only decoded changes are evicted from memory (spilled to disk), not the
59  * transaction records. The number of toplevel transactions is limited,
60  * but a transaction with many subtransactions may still consume significant
61  * amounts of memory. However, the transaction records are fairly small and
62  * are not included in the memory limit.
63  *
64  * The current eviction algorithm is very simple - the transaction is
65  * picked merely by size, while it might be useful to also consider age
66  * (LSN) of the changes for example. With the new Generational memory
67  * allocator, evicting the oldest changes would make it more likely the
68  * memory gets actually freed.
69  *
70  * We still rely on max_changes_in_memory when loading serialized changes
71  * back into memory. At that point we can't use the memory limit directly
72  * as we load the subxacts independently. One option to deal with this
73  * would be to count the subxacts, and allow each to allocate 1/N of the
74  * memory limit. That however does not seem very appealing, because with
75  * many subtransactions it may easily cause thrashing (short cycles of
76  * deserializing and applying very few changes). We probably should give
77  * a bit more memory to the oldest subtransactions, because it's likely
78  * they are the source for the next sequence of changes.
79  *
80  * -------------------------------------------------------------------------
81  */
82 #include "postgres.h"
83 
84 #include <unistd.h>
85 #include <sys/stat.h>
86 
87 #include "access/detoast.h"
88 #include "access/heapam.h"
89 #include "access/rewriteheap.h"
90 #include "access/transam.h"
91 #include "access/xact.h"
92 #include "access/xlog_internal.h"
93 #include "catalog/catalog.h"
94 #include "lib/binaryheap.h"
95 #include "miscadmin.h"
96 #include "pgstat.h"
97 #include "replication/logical.h"
99 #include "replication/slot.h"
100 #include "replication/snapbuild.h" /* just for SnapBuildSnapDecRefcount */
101 #include "storage/bufmgr.h"
102 #include "storage/fd.h"
103 #include "storage/sinval.h"
104 #include "utils/builtins.h"
105 #include "utils/combocid.h"
106 #include "utils/memdebug.h"
107 #include "utils/memutils.h"
108 #include "utils/rel.h"
109 #include "utils/relfilenodemap.h"
110 
111 
112 /* entry for a hash table we use to map from xid to our transaction state */
114 {
118 
119 /* data structures for (relfilenode, ctid) => (cmin, cmax) mapping */
121 {
125 
127 {
131  CommandId combocid; /* just for debugging */
133 
134 /*
 * Virtual file descriptor with file offset tracking.
 *
 * The vfd is set to -1 while the file is closed, so the position for the
 * next read/write is remembered explicitly in curOffset (reset to 0 each
 * time the vfd is opened).
 */
135 typedef struct TXNEntryFile
136 {
137  File vfd; /* -1 when the file is closed */
138  off_t curOffset; /* offset for next write or read. Reset to 0
139  * when vfd is opened. */
140 } TXNEntryFile;
141 
142 /* k-way in-order change iteration support structures */
144 {
151 
153 {
159 
160 /* toast datastructures */
161 typedef struct ReorderBufferToastEnt
162 {
163  Oid chunk_id; /* toast_table.chunk_id */
164  int32 last_chunk_seq; /* toast_table.chunk_seq of the last chunk we
165  * have seen */
166  Size num_chunks; /* number of chunks we've already seen */
167  Size size; /* combined size of chunks seen */
168  dlist_head chunks; /* linked list of chunks */
169  struct varlena *reconstructed; /* reconstructed varlena now pointed to in
170  * main tup */
172 
173 /* Disk serialization support datastructures */
175 {
178  /* data follows */
180 
/* Does 'action' denote a speculative insertion? */
181 #define IsSpecInsert(action) \
182 ( \
183  ((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT) \
184 )
/* Does 'action' denote the confirmation of a speculative insertion? */
185 #define IsSpecConfirm(action) \
186 ( \
187  ((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM) \
188 )
/* Does 'action' denote an insert or update (including speculative insert)? */
189 #define IsInsertOrUpdate(action) \
190 ( \
191  (((action) == REORDER_BUFFER_CHANGE_INSERT) || \
192  ((action) == REORDER_BUFFER_CHANGE_UPDATE) || \
193  ((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT)) \
194 )
195 
196 /*
197  * Maximum number of changes kept in memory, per transaction. After that,
198  * changes are spooled to disk.
199  *
200  * The current value should be sufficient to decode the entire transaction
201  * without hitting disk in OLTP workloads, while starting to spool to disk in
202  * other workloads reasonably fast.
203  *
204  * At some point in the future it probably makes sense to have a more elaborate
205  * resource management here, but it's not entirely clear what that would look
206  * like.
207  */
209 static const Size max_changes_in_memory = 4096; /* XXX: used only when restoring spilled changes from disk */
210 
211 /* ---------------------------------------
212  * primary reorderbuffer support routines
213  * ---------------------------------------
214  */
218  TransactionId xid, bool create, bool *is_new,
219  XLogRecPtr lsn, bool create_as_top);
221  ReorderBufferTXN *subtxn);
222 
223 static void AssertTXNLsnOrder(ReorderBuffer *rb);
224 
225 /* ---------------------------------------
226  * support functions for lsn-order iterating over the ->changes of a
227  * transaction and its subtransactions
228  *
229  * used for iteration over the k-way heap merge of a transaction and its
230  * subtransactions
231  * ---------------------------------------
232  */
234  ReorderBufferIterTXNState *volatile *iter_state);
239 
240 /*
241  * ---------------------------------------
242  * Disk serialization support functions
243  * ---------------------------------------
244  */
248  int fd, ReorderBufferChange *change);
250  TXNEntryFile *file, XLogSegNo *segno);
252  char *change);
255 static void ReorderBufferCleanupSerializedTXNs(const char *slotname);
256 static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot,
257  TransactionId xid, XLogSegNo segno);
258 
259 static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap);
262 
263 /*
264  * ---------------------------------------
265  * Streaming support functions
266  * ---------------------------------------
267  */
268 static inline bool ReorderBufferCanStream(ReorderBuffer *rb);
269 static inline bool ReorderBufferCanStartStreaming(ReorderBuffer *rb);
272 
273 /* ---------------------------------------
274  * toast reassembly support
275  * ---------------------------------------
276  */
280  Relation relation, ReorderBufferChange *change);
282  Relation relation, ReorderBufferChange *change);
283 
284 /*
285  * ---------------------------------------
286  * memory accounting
287  * ---------------------------------------
288  */
291  ReorderBufferChange *change, bool addition);
292 
293 /*
294  * Allocate a new ReorderBuffer and clean out any old serialized state from
295  * prior ReorderBuffer instances for the same slot.
296  */
299 {
300  ReorderBuffer *buffer;
301  HASHCTL hash_ctl;
302  MemoryContext new_ctx;
303 
304  Assert(MyReplicationSlot != NULL);
305 
306  /* allocate memory in own context, to have better accountability */
308  "ReorderBuffer",
310 
311  buffer =
312  (ReorderBuffer *) MemoryContextAlloc(new_ctx, sizeof(ReorderBuffer));
313 
314  memset(&hash_ctl, 0, sizeof(hash_ctl));
315 
316  buffer->context = new_ctx;
317 
318  buffer->change_context = SlabContextCreate(new_ctx,
319  "Change",
321  sizeof(ReorderBufferChange));
322 
323  buffer->txn_context = SlabContextCreate(new_ctx,
324  "TXN",
326  sizeof(ReorderBufferTXN));
327 
328  buffer->tup_context = GenerationContextCreate(new_ctx,
329  "Tuples",
331 
332  hash_ctl.keysize = sizeof(TransactionId);
333  hash_ctl.entrysize = sizeof(ReorderBufferTXNByIdEnt);
334  hash_ctl.hcxt = buffer->context;
335 
336  buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
338 
340  buffer->by_txn_last_txn = NULL;
341 
342  buffer->outbuf = NULL;
343  buffer->outbufsize = 0;
344  buffer->size = 0;
345 
347 
348  dlist_init(&buffer->toplevel_by_lsn);
350 
351  /*
352  * Ensure there's no stale data from prior uses of this slot, in case some
353  * prior exit avoided calling ReorderBufferFree. Failure to do this can
354  * produce duplicated txns, and it's very cheap if there's nothing there.
355  */
357 
358  return buffer;
359 }
360 
361 /*
362  * Free a ReorderBuffer
363  */
364 void
366 {
367  MemoryContext context = rb->context;
368 
369  /*
370  * We free separately allocated data by entirely scrapping reorderbuffer's
371  * memory context.
372  */
373  MemoryContextDelete(context);
374 
375  /* Free disk space used by unconsumed reorder buffers */
377 }
378 
379 /*
380  * Get an unused, possibly preallocated, ReorderBufferTXN.
381  */
382 static ReorderBufferTXN *
384 {
386 
387  txn = (ReorderBufferTXN *)
389 
390  memset(txn, 0, sizeof(ReorderBufferTXN));
391 
392  dlist_init(&txn->changes);
393  dlist_init(&txn->tuplecids);
394  dlist_init(&txn->subtxns);
395 
396  /* InvalidCommandId is not zero, so set it explicitly */
398 
399  return txn;
400 }
401 
402 /*
403  * Free a ReorderBufferTXN.
404  */
405 static void
407 {
408  /* clean the lookup cache if we were cached (quite likely) */
409  if (rb->by_txn_last_xid == txn->xid)
410  {
412  rb->by_txn_last_txn = NULL;
413  }
414 
415  /* free data that's contained */
416 
417  if (txn->tuplecid_hash != NULL)
418  {
420  txn->tuplecid_hash = NULL;
421  }
422 
423  if (txn->invalidations)
424  {
425  pfree(txn->invalidations);
426  txn->invalidations = NULL;
427  }
428 
429  pfree(txn);
430 }
431 
432 /*
433  * Get a fresh ReorderBufferChange.
434  */
437 {
438  ReorderBufferChange *change;
439 
440  change = (ReorderBufferChange *)
442 
443  memset(change, 0, sizeof(ReorderBufferChange));
444  return change;
445 }
446 
447 /*
448  * Free a ReorderBufferChange and update memory accounting, if requested.
449  */
450 void
452  bool upd_mem)
453 {
454  /* update memory accounting info */
455  if (upd_mem)
456  ReorderBufferChangeMemoryUpdate(rb, change, false);
457 
458  /* free contained data */
459  switch (change->action)
460  {
465  if (change->data.tp.newtuple)
466  {
467  ReorderBufferReturnTupleBuf(rb, change->data.tp.newtuple);
468  change->data.tp.newtuple = NULL;
469  }
470 
471  if (change->data.tp.oldtuple)
472  {
473  ReorderBufferReturnTupleBuf(rb, change->data.tp.oldtuple);
474  change->data.tp.oldtuple = NULL;
475  }
476  break;
478  if (change->data.msg.prefix != NULL)
479  pfree(change->data.msg.prefix);
480  change->data.msg.prefix = NULL;
481  if (change->data.msg.message != NULL)
482  pfree(change->data.msg.message);
483  change->data.msg.message = NULL;
484  break;
486  if (change->data.snapshot)
487  {
488  ReorderBufferFreeSnap(rb, change->data.snapshot);
489  change->data.snapshot = NULL;
490  }
491  break;
492  /* no data in addition to the struct itself */
494  if (change->data.truncate.relids != NULL)
495  {
496  ReorderBufferReturnRelids(rb, change->data.truncate.relids);
497  change->data.truncate.relids = NULL;
498  }
499  break;
503  break;
504  }
505 
506  pfree(change);
507 }
508 
509 /*
510  * Get a fresh ReorderBufferTupleBuf fitting at least a tuple of size
511  * tuple_len (excluding header overhead).
512  */
515 {
516  ReorderBufferTupleBuf *tuple;
517  Size alloc_len;
518 
519  alloc_len = tuple_len + SizeofHeapTupleHeader;
520 
521  tuple = (ReorderBufferTupleBuf *)
523  sizeof(ReorderBufferTupleBuf) +
524  MAXIMUM_ALIGNOF + alloc_len);
525  tuple->alloc_tuple_size = alloc_len;
526  tuple->tuple.t_data = ReorderBufferTupleBufData(tuple);
527 
528  return tuple;
529 }
530 
531 /*
532  * Free a ReorderBufferTupleBuf.
533  */
534 void
536 {
537  pfree(tuple);
538 }
539 
540 /*
541  * Get an array for relids of truncated relations.
542  *
543  * We use the global memory context (for the whole reorder buffer), because
544  * none of the existing ones seems like a good match (some are SLAB, so we
545  * can't use those, and tup_context is meant for tuple data, not relids). We
546  * could add yet another context, but it seems like an overkill - TRUNCATE is
547  * not particularly common operation, so it does not seem worth it.
548  */
549 Oid *
551 {
552  Oid *relids;
553  Size alloc_len;
554 
555  alloc_len = sizeof(Oid) * nrelids;
556 
557  relids = (Oid *) MemoryContextAlloc(rb->context, alloc_len);
558 
559  return relids;
560 }
561 
562 /*
563  * Free an array of relids.
564  */
565 void
567 {
568  pfree(relids);
569 }
570 
571 /*
572  * Return the ReorderBufferTXN from the given buffer, specified by Xid.
573  * If create is true, and a transaction doesn't already exist, create it
574  * (with the given LSN, and as top transaction if that's specified);
575  * when this happens, is_new is set to true.
576  */
577 static ReorderBufferTXN *
579  bool *is_new, XLogRecPtr lsn, bool create_as_top)
580 {
583  bool found;
584 
586 
587  /*
588  * Check the one-entry lookup cache first
589  */
591  rb->by_txn_last_xid == xid)
592  {
593  txn = rb->by_txn_last_txn;
594 
595  if (txn != NULL)
596  {
597  /* found it, and it's valid */
598  if (is_new)
599  *is_new = false;
600  return txn;
601  }
602 
603  /*
604  * cached as non-existent, and asked not to create? Then nothing else
605  * to do.
606  */
607  if (!create)
608  return NULL;
609  /* otherwise fall through to create it */
610  }
611 
612  /*
613  * If the cache wasn't hit or it yielded an "does-not-exist" and we want
614  * to create an entry.
615  */
616 
617  /* search the lookup table */
618  ent = (ReorderBufferTXNByIdEnt *)
619  hash_search(rb->by_txn,
620  (void *) &xid,
621  create ? HASH_ENTER : HASH_FIND,
622  &found);
623  if (found)
624  txn = ent->txn;
625  else if (create)
626  {
627  /* initialize the new entry, if creation was requested */
628  Assert(ent != NULL);
629  Assert(lsn != InvalidXLogRecPtr);
630 
631  ent->txn = ReorderBufferGetTXN(rb);
632  ent->txn->xid = xid;
633  txn = ent->txn;
634  txn->first_lsn = lsn;
636 
637  if (create_as_top)
638  {
639  dlist_push_tail(&rb->toplevel_by_lsn, &txn->node);
640  AssertTXNLsnOrder(rb);
641  }
642  }
643  else
644  txn = NULL; /* not found and not asked to create */
645 
646  /* update cache */
647  rb->by_txn_last_xid = xid;
648  rb->by_txn_last_txn = txn;
649 
650  if (is_new)
651  *is_new = !found;
652 
653  Assert(!create || txn != NULL);
654  return txn;
655 }
656 
657 /*
658  * Record the partial change for the streaming of in-progress transactions. We
659  * can stream only complete changes so if we have a partial change like toast
660  * table insert or speculative insert then we mark such a 'txn' so that it
661  * can't be streamed. We also ensure that if the changes in such a 'txn' are
662  * above logical_decoding_work_mem threshold then we stream them as soon as we
663  * have a complete change.
664  */
665 static void
/*
 * NOTE(review): the line naming this function and its leading parameters
 * (rb, txn) appears to be missing from this excerpt -- restore from upstream.
 */
667  ReorderBufferChange *change,
668  bool toast_insert)
669 {
670  ReorderBufferTXN *toptxn;
671 
672  /*
673  * The partial changes need to be processed only while streaming
674  * in-progress transactions.
675  */
676  if (!ReorderBufferCanStream(rb))
677  return;
678 
679  /* Get the top transaction. */
680  if (txn->toptxn != NULL)
681  toptxn = txn->toptxn;
682  else
683  toptxn = txn;
684 
685  /*
686  * Set the toast insert bit whenever we get toast insert to indicate a
687  * partial change and clear it when we get the insert or update on main
688  * table (Both update and insert will do the insert in the toast table).
689  */
690  if (toast_insert)
/*
 * NOTE(review): the then-branch statement is missing from this excerpt;
 * presumably it sets RBTXN_HAS_TOAST_INSERT on toptxn, since the
 * else-branch below clears exactly that flag -- confirm against upstream.
 */
692  else if (rbtxn_has_toast_insert(toptxn) &&
693  IsInsertOrUpdate(change->action))
694  toptxn->txn_flags &= ~RBTXN_HAS_TOAST_INSERT;
695 
696  /*
697  * Set the spec insert bit whenever we get the speculative insert to
698  * indicate the partial change and clear the same on speculative confirm.
699  */
700  if (IsSpecInsert(change->action))
701  toptxn->txn_flags |= RBTXN_HAS_SPEC_INSERT;
702  else if (IsSpecConfirm(change->action))
703  {
704  /*
705  * Speculative confirm change must be preceded by speculative
706  * insertion.
707  */
708  Assert(rbtxn_has_spec_insert(toptxn));
709  toptxn->txn_flags &= ~RBTXN_HAS_SPEC_INSERT;
710  }
711 
712  /*
713  * Stream the transaction if it is serialized before and the changes are
714  * now complete in the top-level transaction.
715  *
716  * The reason for doing the streaming of such a transaction as soon as we
717  * get the complete change for it is that previously it would have reached
718  * the memory threshold and wouldn't get streamed because of incomplete
719  * changes. Delaying such transactions would increase apply lag for them.
720  */
/*
 * NOTE(review): the opening line of this if-condition is missing from this
 * excerpt -- restore from upstream before relying on this code.
 */
722  !(rbtxn_has_incomplete_tuple(toptxn)) &&
723  rbtxn_is_serialized(txn))
724  ReorderBufferStreamTXN(rb, toptxn);
725 }
726 
727 /*
728  * Queue a change into a transaction so it can be replayed upon commit or will be
729  * streamed when we reach logical_decoding_work_mem threshold.
730  */
731 void
/* NOTE(review): the signature line for this function is missing from this excerpt. */
733  ReorderBufferChange *change, bool toast_insert)
734 {
/* NOTE(review): the declaration of the local 'txn' appears to be missing here. */
736 
737  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
738 
739  /*
740  * While streaming the previous changes we have detected that the
741  * transaction is aborted. So there is no point in collecting further
742  * changes for it.
743  */
744  if (txn->concurrent_abort)
745  {
746  /*
747  * We don't need to update memory accounting for this change as we
748  * have not added it to the queue yet.
749  */
750  ReorderBufferReturnChange(rb, change, false);
751  return;
752  }
753 
/* Stamp the change with its origin LSN and owning transaction. */
754  change->lsn = lsn;
755  change->txn = txn;
756 
757  Assert(InvalidXLogRecPtr != lsn);
758  dlist_push_tail(&txn->changes, &change->node);
759  txn->nentries++;
760  txn->nentries_mem++;
761 
762  /* update memory accounting information */
763  ReorderBufferChangeMemoryUpdate(rb, change, true);
764 
765  /* process partial change */
766  ReorderBufferProcessPartialChange(rb, txn, change, toast_insert);
767 
768  /* check the memory limits and evict something if needed */
/* NOTE(review): the call implementing the memory-limit check above is missing from this excerpt. */
770 }
771 
772 /*
773  * Queue message into a transaction so it can be processed upon commit.
774  */
775 void
777  Snapshot snapshot, XLogRecPtr lsn,
778  bool transactional, const char *prefix,
779  Size message_size, const char *message)
780 {
781  if (transactional)
782  {
783  MemoryContext oldcontext;
784  ReorderBufferChange *change;
785 
787 
788  oldcontext = MemoryContextSwitchTo(rb->context);
789 
790  change = ReorderBufferGetChange(rb);
792  change->data.msg.prefix = pstrdup(prefix);
793  change->data.msg.message_size = message_size;
794  change->data.msg.message = palloc(message_size);
795  memcpy(change->data.msg.message, message, message_size);
796 
797  ReorderBufferQueueChange(rb, xid, lsn, change, false);
798 
799  MemoryContextSwitchTo(oldcontext);
800  }
801  else
802  {
803  ReorderBufferTXN *txn = NULL;
804  volatile Snapshot snapshot_now = snapshot;
805 
806  if (xid != InvalidTransactionId)
807  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
808 
809  /* setup snapshot to allow catalog access */
810  SetupHistoricSnapshot(snapshot_now, NULL);
811  PG_TRY();
812  {
813  rb->message(rb, txn, lsn, false, prefix, message_size, message);
814 
816  }
817  PG_CATCH();
818  {
820  PG_RE_THROW();
821  }
822  PG_END_TRY();
823  }
824 }
825 
826 /*
827  * AssertTXNLsnOrder
828  * Verify LSN ordering of transaction lists in the reorderbuffer
829  *
830  * Other LSN-related invariants are checked too.
831  *
832  * No-op if assertions are not in use.
833  */
834 static void
836 {
837 #ifdef USE_ASSERT_CHECKING
838  dlist_iter iter;
839  XLogRecPtr prev_first_lsn = InvalidXLogRecPtr;
840  XLogRecPtr prev_base_snap_lsn = InvalidXLogRecPtr;
841 
842  dlist_foreach(iter, &rb->toplevel_by_lsn)
843  {
845  iter.cur);
846 
847  /* start LSN must be set */
848  Assert(cur_txn->first_lsn != InvalidXLogRecPtr);
849 
850  /* If there is an end LSN, it must be higher than start LSN */
851  if (cur_txn->end_lsn != InvalidXLogRecPtr)
852  Assert(cur_txn->first_lsn <= cur_txn->end_lsn);
853 
854  /* Current initial LSN must be strictly higher than previous */
855  if (prev_first_lsn != InvalidXLogRecPtr)
856  Assert(prev_first_lsn < cur_txn->first_lsn);
857 
858  /* known-as-subtxn txns must not be listed */
859  Assert(!rbtxn_is_known_subxact(cur_txn));
860 
861  prev_first_lsn = cur_txn->first_lsn;
862  }
863 
865  {
867  base_snapshot_node,
868  iter.cur);
869 
870  /* base snapshot (and its LSN) must be set */
871  Assert(cur_txn->base_snapshot != NULL);
873 
874  /* current LSN must be strictly higher than previous */
875  if (prev_base_snap_lsn != InvalidXLogRecPtr)
876  Assert(prev_base_snap_lsn < cur_txn->base_snapshot_lsn);
877 
878  /* known-as-subtxn txns must not be listed */
879  Assert(!rbtxn_is_known_subxact(cur_txn));
880 
881  prev_base_snap_lsn = cur_txn->base_snapshot_lsn;
882  }
883 #endif
884 }
885 
886 /*
887  * AssertChangeLsnOrder
888  *
889  * Check ordering of changes in the (sub)transaction.
890  *
891  * No-op unless assertions are enabled.
 */
891 static void
/* NOTE(review): the signature line for this function is missing from this excerpt. */
893 {
894 #ifdef USE_ASSERT_CHECKING
895  dlist_iter iter;
896  XLogRecPtr prev_lsn = txn->first_lsn;
897 
898  dlist_foreach(iter, &txn->changes)
899  {
900  ReorderBufferChange *cur_change;
901 
902  cur_change = dlist_container(ReorderBufferChange, node, iter.cur);
903 
/* Each change must carry a valid LSN within [first_lsn, end_lsn]. */
905  Assert(cur_change->lsn != InvalidXLogRecPtr);
906  Assert(txn->first_lsn <= cur_change->lsn);
907 
908  if (txn->end_lsn != InvalidXLogRecPtr)
909  Assert(cur_change->lsn <= txn->end_lsn);
910 
/* Changes must have been appended in non-decreasing LSN order. */
911  Assert(prev_lsn <= cur_change->lsn);
912 
913  prev_lsn = cur_change->lsn;
914  }
915 #endif
916 }
917 
918 /*
919  * ReorderBufferGetOldestTXN
920  * Return oldest transaction in reorderbuffer
921  */
924 {
926 
927  AssertTXNLsnOrder(rb);
928 
930  return NULL;
931 
933 
936  return txn;
937 }
938 
939 /*
940  * ReorderBufferGetOldestXmin
941  * Return oldest Xmin in reorderbuffer
942  *
943  * Returns oldest possibly running Xid from the point of view of snapshots
944  * used in the transactions kept by reorderbuffer, or InvalidTransactionId if
945  * there are none.
946  *
947  * Since snapshots are assigned monotonically, this equals the Xmin of the
948  * base snapshot with minimal base_snapshot_lsn.
949  */
952 {
954 
955  AssertTXNLsnOrder(rb);
956 
958  return InvalidTransactionId;
959 
960  txn = dlist_head_element(ReorderBufferTXN, base_snapshot_node,
962  return txn->base_snapshot->xmin;
963 }
964 
965 void
967 {
969 }
970 
971 /*
972  * ReorderBufferAssignChild
973  *
974  * Make note that we know that subxid is a subtransaction of xid, seen as of
975  * the given lsn.
976  */
977 void
/* NOTE(review): the signature line for this function is missing from this excerpt. */
979  TransactionId subxid, XLogRecPtr lsn)
980 {
/* NOTE(review): the declaration of the local 'txn' appears to be missing here. */
982  ReorderBufferTXN *subtxn;
983  bool new_top;
984  bool new_sub;
985 
/* Look up (creating if needed) both the top-level txn and the subxact. */
986  txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true);
987  subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false);
988 
989  if (!new_sub)
990  {
991  if (rbtxn_is_known_subxact(subtxn))
992  {
993  /* already associated, nothing to do */
994  return;
995  }
996  else
997  {
998  /*
999  * We already saw this transaction, but initially added it to the
1000  * list of top-level txns. Now that we know it's not top-level,
1001  * remove it from there.
1002  */
1003  dlist_delete(&subtxn->node);
1004  }
1005  }
1006 
/* Mark the subxact as known and remember which top-level xid owns it. */
1007  subtxn->txn_flags |= RBTXN_IS_SUBXACT;
1008  subtxn->toplevel_xid = xid;
1009  Assert(subtxn->nsubtxns == 0);
1010 
1011  /* set the reference to top-level transaction */
1012  subtxn->toptxn = txn;
1013 
1014  /* add to subtransaction list */
1015  dlist_push_tail(&txn->subtxns, &subtxn->node);
1016  txn->nsubtxns++;
1017 
1018  /* Possibly transfer the subtxn's snapshot to its top-level txn. */
1019  ReorderBufferTransferSnapToParent(txn, subtxn);
1020 
1021  /* Verify LSN-ordering invariant */
1022  AssertTXNLsnOrder(rb);
1023 }
1024 
1025 /*
1026  * ReorderBufferTransferSnapToParent
1027  * Transfer base snapshot from subtxn to top-level txn, if needed
1028  *
1029  * This is done if the top-level txn doesn't have a base snapshot, or if the
1030  * subtxn's base snapshot has an earlier LSN than the top-level txn's base
1031  * snapshot's LSN. This can happen if there are no changes in the toplevel
1032  * txn but there are some in the subtxn, or the first change in subtxn has
1033  * earlier LSN than first change in the top-level txn and we learned about
1034  * their kinship only now.
1035  *
1036  * The subtransaction's snapshot is cleared regardless of the transfer
1037  * happening, since it's not needed anymore in either case.
1038  *
1039  * We do this as soon as we become aware of their kinship, to avoid queueing
1040  * extra snapshots to txns known-as-subtxns -- only top-level txns will
1041  * receive further snapshots.
1042  */
1043 static void
/* NOTE(review): the signature line for this function is missing from this excerpt. */
1045  ReorderBufferTXN *subtxn)
1046 {
1047  Assert(subtxn->toplevel_xid == txn->xid);
1048 
1049  if (subtxn->base_snapshot != NULL)
1050  {
1051  if (txn->base_snapshot == NULL ||
1052  subtxn->base_snapshot_lsn < txn->base_snapshot_lsn)
1053  {
1054  /*
1055  * If the toplevel transaction already has a base snapshot but
1056  * it's newer than the subxact's, purge it.
1057  */
1058  if (txn->base_snapshot != NULL)
1059  {
/* NOTE(review): the statements that purge the toplevel's snapshot are missing from this excerpt. */
1062  }
1063 
1064  /*
1065  * The snapshot is now the top transaction's; transfer it, and
1066  * adjust the list position of the top transaction in the list by
1067  * moving it to where the subtransaction is.
1068  */
1069  txn->base_snapshot = subtxn->base_snapshot;
1070  txn->base_snapshot_lsn = subtxn->base_snapshot_lsn;
/* NOTE(review): the list-manipulation call whose continuation appears below is missing from this excerpt. */
1072  &txn->base_snapshot_node);
1073 
1074  /*
1075  * The subtransaction doesn't have a snapshot anymore (so it
1076  * mustn't be in the list.)
1077  */
1078  subtxn->base_snapshot = NULL;
/* NOTE(review): one statement appears to be missing here (line numbering gap). */
1080  dlist_delete(&subtxn->base_snapshot_node);
1081  }
1082  else
1083  {
1084  /* Base snap of toplevel is fine, so subxact's is not needed */
/* NOTE(review): one statement appears to be missing here (line numbering gap). */
1086  dlist_delete(&subtxn->base_snapshot_node);
1087  subtxn->base_snapshot = NULL;
/* NOTE(review): one statement appears to be missing here (line numbering gap). */
1089  }
1090  }
1091 }
1092 
1093 /*
1094  * Associate a subtransaction with its toplevel transaction at commit
1095  * time. There may be no further changes added after this.
1096  */
1097 void
/* NOTE(review): the signature line for this function is missing from this excerpt. */
1099  TransactionId subxid, XLogRecPtr commit_lsn,
1100  XLogRecPtr end_lsn)
1101 {
1102  ReorderBufferTXN *subtxn;
1103 
/* Look up the subxact without creating it -- absence means it had no changes. */
1104  subtxn = ReorderBufferTXNByXid(rb, subxid, false, NULL,
1105  InvalidXLogRecPtr, false);
1106 
1107  /*
1108  * No need to do anything if that subtxn didn't contain any changes
1109  */
1110  if (!subtxn)
1111  return;
1112 
/* Record where the subxact's change stream ends; final_lsn is the commit record's LSN. */
1113  subtxn->final_lsn = commit_lsn;
1114  subtxn->end_lsn = end_lsn;
1115 
1116  /*
1117  * Assign this subxact as a child of the toplevel xact (no-op if already
1118  * done.)
1119  */
1120  ReorderBufferAssignChild(rb, xid, subxid, InvalidXLogRecPtr);
1121 }
1122 
1123 
1124 /*
1125  * Support for efficiently iterating over a transaction's and its
1126  * subtransactions' changes.
1127  *
1128  * We do by doing a k-way merge between transactions/subtransactions. For that
1129  * we model the current heads of the different transactions as a binary heap
1130  * so we easily know which (sub-)transaction has the change with the smallest
1131  * lsn next.
1132  *
1133  * We assume the changes in individual transactions are already sorted by LSN.
1134  */
1135 
1136 /*
1137  * Binary heap comparison function.
1138  */
1139 static int
1141 {
1143  XLogRecPtr pos_a = state->entries[DatumGetInt32(a)].lsn;
1144  XLogRecPtr pos_b = state->entries[DatumGetInt32(b)].lsn;
1145 
1146  if (pos_a < pos_b)
1147  return 1;
1148  else if (pos_a == pos_b)
1149  return 0;
1150  return -1;
1151 }
1152 
1153 /*
1154  * Allocate & initialize an iterator which iterates in lsn order over a
1155  * transaction and all its subtransactions.
1156  *
1157  * Note: The iterator state is returned through iter_state parameter rather
1158  * than the function's return value. This is because the state gets cleaned up
1159  * in a PG_CATCH block in the caller, so we want to make sure the caller gets
1160  * back the state even if this function throws an exception.
1161  */
/*
 * NOTE(review): the extraction dropped several original lines from this
 * function: 1163 (name/parameter line), 1167 (declaration of "state"),
 * 1199 (the allocation call whose cast is visible at 1198), 1214 (the
 * comparator argument to binaryheap_allocate), and 1247/1276 (presumably
 * the binaryheap_add_unordered calls, given the "insert items ... in an
 * unordered fashion" comment — TODO confirm upstream).
 */
1162 static void
1164  ReorderBufferIterTXNState *volatile *iter_state)
1165 {
1166  Size nr_txns = 0;
1168  dlist_iter cur_txn_i;
1169  int32 off;
1170 
     /* Clear first: the caller's PG_CATCH must never see a stale pointer. */
1171  *iter_state = NULL;
1172 
1173  /* Check ordering of changes in the toplevel transaction. */
1174  AssertChangeLsnOrder(txn);
1175 
1176  /*
1177  * Calculate the size of our heap: one element for every transaction that
1178  * contains changes. (Besides the transactions already in the reorder
1179  * buffer, we count the one we were directly passed.)
1180  */
1181  if (txn->nentries > 0)
1182  nr_txns++;
1183 
1184  dlist_foreach(cur_txn_i, &txn->subtxns)
1185  {
1186  ReorderBufferTXN *cur_txn;
1187 
1188  cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
1189 
1190  /* Check ordering of changes in this subtransaction. */
1191  AssertChangeLsnOrder(cur_txn);
1192 
1193  if (cur_txn->nentries > 0)
1194  nr_txns++;
1195  }
1196 
1197  /* allocate iteration state */
1198  state = (ReorderBufferIterTXNState *)
1200  sizeof(ReorderBufferIterTXNState) +
1201  sizeof(ReorderBufferIterTXNEntry) * nr_txns);
1202 
1203  state->nr_txns = nr_txns;
1204  dlist_init(&state->old_change);
1205 
     /* Mark every per-txn spill file as not-open (vfd -1, segment 0). */
1206  for (off = 0; off < state->nr_txns; off++)
1207  {
1208  state->entries[off].file.vfd = -1;
1209  state->entries[off].segno = 0;
1210  }
1211 
1212  /* allocate heap */
1213  state->heap = binaryheap_allocate(state->nr_txns,
1215  state);
1216 
1217  /* Now that the state fields are initialized, it is safe to return it. */
1218  *iter_state = state;
1219 
1220  /*
1221  * Now insert items into the binary heap, in an unordered fashion. (We
1222  * will run a heap assembly step at the end; this is more efficient.)
1223  */
1224 
1225  off = 0;
1226 
1227  /* add toplevel transaction if it contains changes */
1228  if (txn->nentries > 0)
1229  {
1230  ReorderBufferChange *cur_change;
1231 
1232  if (rbtxn_is_serialized(txn))
1233  {
1234  /* serialize remaining changes */
1235  ReorderBufferSerializeTXN(rb, txn);
1236  ReorderBufferRestoreChanges(rb, txn, &state->entries[off].file,
1237  &state->entries[off].segno);
1238  }
1239 
1240  cur_change = dlist_head_element(ReorderBufferChange, node,
1241  &txn->changes);
1242 
1243  state->entries[off].lsn = cur_change->lsn;
1244  state->entries[off].change = cur_change;
1245  state->entries[off].txn = txn;
1246 
1248  }
1249 
1250  /* add subtransactions if they contain changes */
1251  dlist_foreach(cur_txn_i, &txn->subtxns)
1252  {
1253  ReorderBufferTXN *cur_txn;
1254 
1255  cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
1256 
1257  if (cur_txn->nentries > 0)
1258  {
1259  ReorderBufferChange *cur_change;
1260 
1261  if (rbtxn_is_serialized(cur_txn))
1262  {
1263  /* serialize remaining changes */
1264  ReorderBufferSerializeTXN(rb, cur_txn);
1265  ReorderBufferRestoreChanges(rb, cur_txn,
1266  &state->entries[off].file,
1267  &state->entries[off].segno);
1268  }
1269  cur_change = dlist_head_element(ReorderBufferChange, node,
1270  &cur_txn->changes);
1271 
1272  state->entries[off].lsn = cur_change->lsn;
1273  state->entries[off].change = cur_change;
1274  state->entries[off].txn = cur_txn;
1275 
1277  }
1278  }
1279 
1280  /* assemble a valid binary heap */
1281  binaryheap_build(state->heap);
1282 }
1283 
1284 /*
1285  * Return the next change when iterating over a transaction and its
1286  * subtransactions.
1287  *
1288  * Returns NULL when no further changes exist.
1289  */
/*
 * NOTE(review): the extraction dropped original lines 1291 (name/parameter
 * line), 1294 (declaration of "entry"), 1350 (the dlist_head_element call
 * initializing next_change after a disk restore), and 1331/1361 (the calls
 * that re-key the heap top with the next LSN — TODO confirm upstream).
 */
1290 static ReorderBufferChange *
1292 {
1293  ReorderBufferChange *change;
1295  int32 off;
1296 
1297  /* nothing there anymore */
1298  if (state->heap->bh_size == 0)
1299  return NULL;
1300 
     /* The heap top is the transaction with the smallest pending LSN. */
1301  off = DatumGetInt32(binaryheap_first(state->heap));
1302  entry = &state->entries[off];
1303 
1304  /* free memory we might have "leaked" in the previous *Next call */
1305  if (!dlist_is_empty(&state->old_change))
1306  {
1307  change = dlist_container(ReorderBufferChange, node,
1308  dlist_pop_head_node(&state->old_change));
1309  ReorderBufferReturnChange(rb, change, true);
1310  Assert(dlist_is_empty(&state->old_change));
1311  }
1312 
1313  change = entry->change;
1314 
1315  /*
1316  * update heap with information about which transaction has the next
1317  * relevant change in LSN order
1318  */
1319 
1320  /* there are in-memory changes */
1321  if (dlist_has_next(&entry->txn->changes, &entry->change->node))
1322  {
1323  dlist_node *next = dlist_next_node(&entry->txn->changes, &change->node);
1324  ReorderBufferChange *next_change =
1325  dlist_container(ReorderBufferChange, node, next);
1326 
1327  /* txn stays the same */
1328  state->entries[off].lsn = next_change->lsn;
1329  state->entries[off].change = next_change;
1330 
1332  return change;
1333  }
1334 
     /* No in-memory successor; more entries exist than are in memory. */
1335  /* try to load changes from disk */
1336  if (entry->txn->nentries != entry->txn->nentries_mem)
1337  {
1338  /*
1339  * Ugly: restoring changes will reuse *Change records, thus delete the
1340  * current one from the per-tx list and only free in the next call.
1341  */
1342  dlist_delete(&change->node);
1343  dlist_push_tail(&state->old_change, &change->node);
1344 
1345  if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->file,
1346  &state->entries[off].segno))
1347  {
1348  /* successfully restored changes from disk */
1349  ReorderBufferChange *next_change =
1351  &entry->txn->changes);
1352 
1353  elog(DEBUG2, "restored %u/%u changes from disk",
1354  (uint32) entry->txn->nentries_mem,
1355  (uint32) entry->txn->nentries);
1356 
1357  Assert(entry->txn->nentries_mem);
1358  /* txn stays the same */
1359  state->entries[off].lsn = next_change->lsn;
1360  state->entries[off].change = next_change;
1362 
1363  return change;
1364  }
1365  }
1366 
1367  /* ok, no changes there anymore, remove */
1368  binaryheap_remove_first(state->heap);
1369 
1370  return change;
1371 }
1372 
1373 /*
1374  * Deallocate the iterator
1375  */
/*
 * NOTE(review): the extraction dropped original lines 1377-1378 (the
 * function name/parameter line(s)) — confirm against the upstream file.
 *
 * Visible behavior: closes any spill files left open by the iterator,
 * returns the one change deliberately kept alive across *Next calls,
 * then frees the heap and the iterator state itself.
 */
1376 static void
1379 {
1380  int32 off;
1381 
     /* vfd == -1 means the entry's spill file was never opened */
1382  for (off = 0; off < state->nr_txns; off++)
1383  {
1384  if (state->entries[off].file.vfd != -1)
1385  FileClose(state->entries[off].file.vfd);
1386  }
1387 
1388  /* free memory we might have "leaked" in the last *Next call */
1389  if (!dlist_is_empty(&state->old_change))
1390  {
1391  ReorderBufferChange *change;
1392 
1393  change = dlist_container(ReorderBufferChange, node,
1394  dlist_pop_head_node(&state->old_change));
1395  ReorderBufferReturnChange(rb, change, true);
1396  Assert(dlist_is_empty(&state->old_change));
1397  }
1398 
1399  binaryheap_free(state->heap);
1400  pfree(state);
1401 }
1402 
1403 /*
1404  * Cleanup the contents of a transaction, usually after the transaction
1405  * committed or aborted.
1406  */
/*
 * NOTE(review): the extraction dropped original lines 1408 (name/parameter
 * line), 1456 (presumably removing the tuplecid change from its list, given
 * the sibling loop at 1540-1543), 1466-1467 (the base-snapshot release
 * body), and 1476 (the snapshot_now release) — TODO confirm upstream.
 */
1407 static void
1409 {
1410  bool found;
1411  dlist_mutable_iter iter;
1412 
1413  /* cleanup subtransactions & their changes */
1414  dlist_foreach_modify(iter, &txn->subtxns)
1415  {
1416  ReorderBufferTXN *subtxn;
1417 
1418  subtxn = dlist_container(ReorderBufferTXN, node, iter.cur);
1419 
1420  /*
1421  * Subtransactions are always associated to the toplevel TXN, even if
1422  * they originally were happening inside another subtxn, so we won't
1423  * ever recurse more than one level deep here.
1424  */
1425  Assert(rbtxn_is_known_subxact(subtxn));
1426  Assert(subtxn->nsubtxns == 0);
1427 
1428  ReorderBufferCleanupTXN(rb, subtxn);
1429  }
1430 
1431  /* cleanup changes in the toplevel txn */
1432  dlist_foreach_modify(iter, &txn->changes)
1433  {
1434  ReorderBufferChange *change;
1435 
1436  change = dlist_container(ReorderBufferChange, node, iter.cur);
1437 
1438  /* Check we're not mixing changes from different transactions. */
1439  Assert(change->txn == txn);
1440 
1441  ReorderBufferReturnChange(rb, change, true);
1442  }
1443 
1444  /*
1445  * Cleanup the tuplecids we stored for decoding catalog snapshot access.
1446  * They are always stored in the toplevel transaction.
1447  */
1448  dlist_foreach_modify(iter, &txn->tuplecids)
1449  {
1450  ReorderBufferChange *change;
1451 
1452  change = dlist_container(ReorderBufferChange, node, iter.cur);
1453 
1454  /* Check we're not mixing changes from different transactions. */
1455  Assert(change->txn == txn);
1457 
1458  ReorderBufferReturnChange(rb, change, true);
1459  }
1460 
1461  /*
1462  * Cleanup the base snapshot, if set.
1463  */
1464  if (txn->base_snapshot != NULL)
1465  {
1468  }
1469 
1470  /*
1471  * Cleanup the snapshot for the last streamed run.
1472  */
1473  if (txn->snapshot_now != NULL)
1474  {
1475  Assert(rbtxn_is_streamed(txn));
1477  }
1478 
1479  /*
1480  * Remove TXN from its containing list.
1481  *
1482  * Note: if txn is known as subxact, we are deleting the TXN from its
1483  * parent's list of known subxacts; this leaves the parent's nsubxacts
1484  * count too high, but we don't care. Otherwise, we are deleting the TXN
1485  * from the LSN-ordered list of toplevel TXNs.
1486  */
1487  dlist_delete(&txn->node);
1488 
1489  /* now remove reference from buffer */
1490  hash_search(rb->by_txn,
1491  (void *) &txn->xid,
1492  HASH_REMOVE,
1493  &found);
1494  Assert(found);
1495 
1496  /* remove entries spilled to disk */
1497  if (rbtxn_is_serialized(txn))
1498  ReorderBufferRestoreCleanup(rb, txn);
1499 
1500  /* deallocate */
1501  ReorderBufferReturnTXN(rb, txn);
1502 }
1503 
1504 /*
1505  * Discard changes from a transaction (and subtransactions), after streaming
1506  * them. Keep the remaining info - transactions, tuplecids, invalidations and
1507  * snapshots.
1508  */
/*
 * NOTE(review): the extraction dropped original lines 1510 (name/parameter
 * line) and 1573 (the body of the tuplecid_hash teardown, presumably a
 * hash_destroy call given the surrounding comment) — TODO confirm upstream.
 */
1509 static void
1511 {
1512  dlist_mutable_iter iter;
1513 
1514  /* cleanup subtransactions & their changes */
1515  dlist_foreach_modify(iter, &txn->subtxns)
1516  {
1517  ReorderBufferTXN *subtxn;
1518 
1519  subtxn = dlist_container(ReorderBufferTXN, node, iter.cur);
1520 
1521  /*
1522  * Subtransactions are always associated to the toplevel TXN, even if
1523  * they originally were happening inside another subtxn, so we won't
1524  * ever recurse more than one level deep here.
1525  */
1526  Assert(rbtxn_is_known_subxact(subtxn));
1527  Assert(subtxn->nsubtxns == 0);
1528 
1529  ReorderBufferTruncateTXN(rb, subtxn);
1530  }
1531 
1532  /* cleanup changes in the toplevel txn */
1533  dlist_foreach_modify(iter, &txn->changes)
1534  {
1535  ReorderBufferChange *change;
1536 
1537  change = dlist_container(ReorderBufferChange, node, iter.cur);
1538 
1539  /* Check we're not mixing changes from different transactions. */
1540  Assert(change->txn == txn);
1541 
1542  /* remove the change from it's containing list */
1543  dlist_delete(&change->node);
1544 
1545  ReorderBufferReturnChange(rb, change, true);
1546  }
1547 
1548  /*
1549  * Mark the transaction as streamed.
1550  *
1551  * The toplevel transaction, identified by (toptxn==NULL), is marked as
1552  * streamed always, even if it does not contain any changes (that is, when
1553  * all the changes are in subtransactions).
1554  *
1555  * For subtransactions, we only mark them as streamed when there are
1556  * changes in them.
1557  *
1558  * We do it this way because of aborts - we don't want to send aborts for
1559  * XIDs the downstream is not aware of. And of course, it always knows
1560  * about the toplevel xact (we send the XID in all messages), but we never
1561  * stream XIDs of empty subxacts.
1562  */
1563  if ((!txn->toptxn) || (txn->nentries_mem != 0))
1564  txn->txn_flags |= RBTXN_IS_STREAMED;
1565 
1566  /*
1567  * Destroy the (relfilenode, ctid) hashtable, so that we don't leak any
1568  * memory. We could also keep the hash table and update it with new ctid
1569  * values, but this seems simpler and good enough for now.
1570  */
1571  if (txn->tuplecid_hash != NULL)
1572  {
1574  txn->tuplecid_hash = NULL;
1575  }
1576 
1577  /* If this txn is serialized then clean the disk space. */
1578  if (rbtxn_is_serialized(txn))
1579  {
1580  ReorderBufferRestoreCleanup(rb, txn);
     /* clear the flag so a later spill starts from a clean slate */
1581  txn->txn_flags &= ~RBTXN_IS_SERIALIZED;
1582  }
1583 
1584  /* also reset the number of entries in the transaction */
1585  txn->nentries_mem = 0;
1586  txn->nentries = 0;
1587 }
1588 
1589 /*
1590  * Build a hash with a (relfilenode, ctid) -> (cmin, cmax) mapping for use by
1591  * HeapTupleSatisfiesHistoricMVCC.
1592  */
/*
 * NOTE(review): the extraction dropped original lines 1594 (name/parameter
 * line), 1599 (the early-return guard condition before 1600), 1614 (the
 * flags argument to hash_create), 1618-1619 (declarations of "key"/"ent"),
 * 1625 (presumably an Assert on the change action), and 1636/1638 (the
 * hash_search arguments, including the HASH_ENTER action) — TODO confirm
 * against the upstream file.
 */
1593 static void
1595 {
1596  dlist_iter iter;
1597  HASHCTL hash_ctl;
1598 
1600  return;
1601 
1602  memset(&hash_ctl, 0, sizeof(hash_ctl));
1603 
1604  hash_ctl.keysize = sizeof(ReorderBufferTupleCidKey);
1605  hash_ctl.entrysize = sizeof(ReorderBufferTupleCidEnt);
1606  hash_ctl.hcxt = rb->context;
1607 
1608  /*
1609  * create the hash with the exact number of to-be-stored tuplecids from
1610  * the start
1611  */
1612  txn->tuplecid_hash =
1613  hash_create("ReorderBufferTupleCid", txn->ntuplecids, &hash_ctl,
1615 
1616  dlist_foreach(iter, &txn->tuplecids)
1617  {
1620  bool found;
1621  ReorderBufferChange *change;
1622 
1623  change = dlist_container(ReorderBufferChange, node, iter.cur);
1624 
1626 
1627  /* be careful about padding */
1628  memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
1629 
1630  key.relnode = change->data.tuplecid.node;
1631 
1632  ItemPointerCopy(&change->data.tuplecid.tid,
1633  &key.tid);
1634 
1635  ent = (ReorderBufferTupleCidEnt *)
1637  (void *) &key,
1639  &found);
1640  if (!found)
1641  {
       /* first sighting of this (relfilenode, ctid): record as-is */
1642  ent->cmin = change->data.tuplecid.cmin;
1643  ent->cmax = change->data.tuplecid.cmax;
1644  ent->combocid = change->data.tuplecid.combocid;
1645  }
1646  else
1647  {
1648  /*
1649  * Maybe we already saw this tuple before in this transaction, but
1650  * if so it must have the same cmin.
1651  */
1652  Assert(ent->cmin == change->data.tuplecid.cmin);
1653 
1654  /*
1655  * cmax may be initially invalid, but once set it can only grow,
1656  * and never become invalid again.
1657  */
1658  Assert((ent->cmax == InvalidCommandId) ||
1659  ((change->data.tuplecid.cmax != InvalidCommandId) &&
1660  (change->data.tuplecid.cmax > ent->cmax)));
1661  ent->cmax = change->data.tuplecid.cmax;
1662  }
1663  }
1664 }
1665 
1666 /*
1667  * Copy a provided snapshot so we can modify it privately. This is needed so
1668  * that catalog modifying transactions can look into intermediate catalog
1669  * states.
1670  */
/*
 * NOTE(review): the extraction dropped original lines 1672-1673 (the
 * function name/parameter line(s)) — the body uses rb, orig_snap, txn and
 * cid, so presumably those are the parameters; confirm upstream.
 */
1671 static Snapshot
1674 {
1675  Snapshot snap;
1676  dlist_iter iter;
1677  int i = 0;
1678  Size size;
1679 
     /*
      * One allocation holds the SnapshotData header plus both xid arrays:
      * xip (orig_snap->xcnt entries) and subxip (nsubtxns + 1, the +1 being
      * the toplevel xid added below).
      */
1680  size = sizeof(SnapshotData) +
1681  sizeof(TransactionId) * orig_snap->xcnt +
1682  sizeof(TransactionId) * (txn->nsubtxns + 1);
1683 
1684  snap = MemoryContextAllocZero(rb->context, size);
1685  memcpy(snap, orig_snap, sizeof(SnapshotData));
1686 
1687  snap->copied = true;
1688  snap->active_count = 1; /* mark as active so nobody frees it */
1689  snap->regd_count = 0;
1690  snap->xip = (TransactionId *) (snap + 1);
1691 
1692  memcpy(snap->xip, orig_snap->xip, sizeof(TransactionId) * snap->xcnt);
1693 
1694  /*
1695  * snap->subxip contains all txids that belong to our transaction which we
1696  * need to check via cmin/cmax. That's why we store the toplevel
1697  * transaction in there as well.
1698  */
1699  snap->subxip = snap->xip + snap->xcnt;
1700  snap->subxip[i++] = txn->xid;
1701 
1702  /*
1703  * subxcnt isn't decreased when subtransactions abort, so count manually.
1704  * Since it's an upper boundary it is safe to use it for the allocation
1705  * above.
1706  */
1707  snap->subxcnt = 1;
1708 
1709  dlist_foreach(iter, &txn->subtxns)
1710  {
1711  ReorderBufferTXN *sub_txn;
1712 
1713  sub_txn = dlist_container(ReorderBufferTXN, node, iter.cur);
1714  snap->subxip[i++] = sub_txn->xid;
1715  snap->subxcnt++;
1716  }
1717 
1718  /* sort so we can bsearch() later */
1719  qsort(snap->subxip, snap->subxcnt, sizeof(TransactionId), xidComparator);
1720 
1721  /* store the specified current CommandId */
1722  snap->curcid = cid;
1723 
1724  return snap;
1725 }
1726 
1727 /*
1728  * Free a previously ReorderBufferCopySnap'ed snapshot
1729  */
/*
 * NOTE(review): the extraction dropped original lines 1731 (name/parameter
 * line) and 1736 (the else-branch statement; presumably it hands a
 * non-copied snapshot back to snapbuild for refcounted release — confirm
 * upstream).  Copied snapshots were allocated as a single chunk by
 * ReorderBufferCopySnap, so a plain pfree suffices for them.
 */
1730 static void
1732 {
1733  if (snap->copied)
1734  pfree(snap);
1735  else
1737 }
1738 
1739 /*
1740  * If the transaction was (partially) streamed, we need to commit it in a
1741  * 'streamed' way. That is, we first stream the remaining part of the
1742  * transaction, and then invoke stream_commit message.
1743  */
/*
 * NOTE(review): the extraction dropped original line 1745 (the function
 * name/parameter line) — confirm against the upstream file.
 *
 * Visible behavior: streams whatever part of the transaction has not yet
 * been sent, emits the stream_commit callback at the transaction's final
 * LSN, then frees all transaction state.
 */
1744 static void
1746 {
1747  /* we should only call this for previously streamed transactions */
1748  Assert(rbtxn_is_streamed(txn));
1749 
1750  ReorderBufferStreamTXN(rb, txn);
1751 
1752  rb->stream_commit(rb, txn, txn->final_lsn);
1753 
1754  ReorderBufferCleanupTXN(rb, txn);
1755 }
1756 
1757 /*
1758  * Set xid to detect concurrent aborts.
1759  *
1760  * While streaming an in-progress transaction there is a possibility that the
1761  * (sub)transaction might get aborted concurrently. In such case if the
1762  * (sub)transaction has catalog update then we might decode the tuple using
1763  * wrong catalog version. For example, suppose there is one catalog tuple with
1764  * (xmin: 500, xmax: 0). Now, the transaction 501 updates the catalog tuple
1765  * and after that we will have two tuples (xmin: 500, xmax: 501) and
1766  * (xmin: 501, xmax: 0). Now, if 501 is aborted and some other transaction
1767  * say 502 updates the same catalog tuple then the first tuple will be changed
1768  * to (xmin: 500, xmax: 502). So, the problem is that when we try to decode
1769  * the tuple inserted/updated in 501 after the catalog update, we will see the
1770  * catalog tuple with (xmin: 500, xmax: 502) as visible because it will
1771  * consider that the tuple is deleted by xid 502 which is not visible to our
1772  * snapshot. And when we will try to decode with that catalog tuple, it can
1773  * lead to a wrong result or a crash. So, it is necessary to detect
1774  * concurrent aborts to allow streaming of in-progress transactions.
1775  *
1776  * For detecting the concurrent abort we set CheckXidAlive to the current
1777  * (sub)transaction's xid for which this change belongs to. And, during
1778  * catalog scan we can check the status of the xid and if it is aborted we will
1779  * report a specific error so that we can stop streaming current transaction
1780  * and discard the already streamed changes on such an error. We might have
1781  * already streamed some of the changes for the aborted (sub)transaction, but
1782  * that is fine because when we decode the abort we will stream abort message
1783  * to truncate the changes in the subscriber.
1784  */
/*
 * NOTE(review): the extraction dropped original lines 1786 (name/parameter
 * line), 1792 (the condition of the early-return guard at 1793), and 1802
 * (the else-branch statement; presumably it clears CheckXidAlive for an
 * already-committed xid — confirm upstream).
 */
1785 static inline void
1787 {
1788  /*
1789  * If the input transaction id is already set as a CheckXidAlive then
1790  * nothing to do.
1791  */
1793  return;
1794 
1795  /*
1796  * setup CheckXidAlive if it's not committed yet. We don't check if the
1797  * xid is aborted. That will happen during catalog access.
1798  */
1799  if (!TransactionIdDidCommit(xid))
1800  CheckXidAlive = xid;
1801  else
1803 }
1804 
1805 /*
1806  * Helper function for ReorderBufferProcessTXN for applying change.
1807  */
/*
 * NOTE(review): the extraction dropped original line 1809 (the line
 * carrying the function name and the first parameters) — confirm upstream.
 *
 * Dispatcher: routes one change either through the streaming API
 * (stream_change) or the normal apply path (apply_change), depending on
 * whether the caller is streaming an in-progress transaction.
 */
1808 static inline void
1810  Relation relation, ReorderBufferChange *change,
1811  bool streaming)
1812 {
1813  if (streaming)
1814  rb->stream_change(rb, txn, relation, change);
1815  else
1816  rb->apply_change(rb, txn, relation, change);
1817 }
1818 
1819 /*
1820  * Helper function for ReorderBufferProcessTXN for applying the truncate.
1821  */
/*
 * NOTE(review): the extraction dropped original line 1823 (the line
 * carrying the function name and the first parameter) — confirm upstream.
 *
 * Dispatcher: routes a truncate affecting "nrelations" relations through
 * either the streaming (stream_truncate) or the normal (apply_truncate)
 * output-plugin callback.
 */
1822 static inline void
1824  int nrelations, Relation *relations,
1825  ReorderBufferChange *change, bool streaming)
1826 {
1827  if (streaming)
1828  rb->stream_truncate(rb, txn, nrelations, relations, change);
1829  else
1830  rb->apply_truncate(rb, txn, nrelations, relations, change);
1831 }
1832 
1833 /*
1834  * Helper function for ReorderBufferProcessTXN for applying the message.
1835  */
/*
 * NOTE(review): the extraction dropped original line 1837 (the line
 * carrying the function name and the first parameters) — confirm upstream.
 *
 * Dispatcher: forwards a logical decoding message to either the streaming
 * (stream_message) or normal (message) callback.  The hard-coded "true"
 * argument in both calls presumably marks the message as transactional —
 * TODO confirm against the callback signatures.
 */
1836 static inline void
1838  ReorderBufferChange *change, bool streaming)
1839 {
1840  if (streaming)
1841  rb->stream_message(rb, txn, change->lsn, true,
1842  change->data.msg.prefix,
1843  change->data.msg.message_size,
1844  change->data.msg.message);
1845  else
1846  rb->message(rb, txn, change->lsn, true,
1847  change->data.msg.prefix,
1848  change->data.msg.message_size,
1849  change->data.msg.message);
1850 }
1851 
1852 /*
1853  * Function to store the command id and snapshot at the end of the current
1854  * stream so that we can reuse the same while sending the next stream.
1855  */
/*
 * NOTE(review): the extraction dropped original line 1857 (the line
 * carrying the function name and the first parameters) — confirm upstream.
 *
 * Stashes the command id and snapshot on the TXN so the next streaming run
 * can resume from them.  A snapshot that is already a private copy is
 * stored as-is; a shared one is copied first so later modification is safe.
 */
1856 static inline void
1858  Snapshot snapshot_now, CommandId command_id)
1859 {
1860  txn->command_id = command_id;
1861 
1862  /* Avoid copying if it's already copied. */
1863  if (snapshot_now->copied)
1864  txn->snapshot_now = snapshot_now;
1865  else
1866  txn->snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
1867  txn, command_id);
1868 }
1869 
1870 /*
1871  * Helper function for ReorderBufferProcessTXN to handle the concurrent
1872  * abort of the streaming transaction. This resets the TXN such that it
1873  * can be used to stream the remaining data of transaction being processed.
1874  */
/*
 * NOTE(review): the extraction dropped original line 1876 (the line
 * carrying the function name and the first parameters) — confirm upstream.
 *
 * Recovery path after a concurrent abort during streaming: throw away the
 * already-streamed changes and toast state, release any pending
 * speculative-insert record, close the stream, and save the snapshot /
 * command id so the remaining data can be streamed later.
 */
1875 static void
1877  Snapshot snapshot_now,
1878  CommandId command_id,
1879  XLogRecPtr last_lsn,
1880  ReorderBufferChange *specinsert)
1881 {
1882  /* Discard the changes that we just streamed */
1883  ReorderBufferTruncateTXN(rb, txn);
1884 
1885  /* Free all resources allocated for toast reconstruction */
1886  ReorderBufferToastReset(rb, txn);
1887 
1888  /* Return the spec insert change if it is not NULL */
1889  if (specinsert != NULL)
1890  {
1891  ReorderBufferReturnChange(rb, specinsert, true);
     /* NB: only clears the local copy; callers must drop their own pointer */
1892  specinsert = NULL;
1893  }
1894 
1895  /* Stop the stream. */
1896  rb->stream_stop(rb, txn, last_lsn);
1897 
1898  /* Remember the command ID and snapshot for the streaming run */
1899  ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
1900 }
1901 
1902 /*
1903  * Helper function for ReorderBufferCommit and ReorderBufferStreamTXN.
1904  *
1905  * Send data of a transaction (and its subtransactions) to the
1906  * output plugin. We iterate over the top and subtransactions (using a k-way
1907  * merge) and replay the changes in lsn order.
1908  *
1909  * If streaming is true then data will be sent using stream API.
1910  *
1911  * Note: "volatile" markers on some parameters are to avoid trouble with
1912  * PG_TRY inside the function.
1913  */
1914 static void
1916  XLogRecPtr commit_lsn,
1917  volatile Snapshot snapshot_now,
1918  volatile CommandId command_id,
1919  bool streaming)
1920 {
1921  bool using_subtxn;
1923  ReorderBufferIterTXNState *volatile iterstate = NULL;
1924  volatile XLogRecPtr prev_lsn = InvalidXLogRecPtr;
1925  ReorderBufferChange *volatile specinsert = NULL;
1926  volatile bool stream_started = false;
1927  ReorderBufferTXN *volatile curtxn = NULL;
1928 
1929  /* build data to be able to lookup the CommandIds of catalog tuples */
1931 
1932  /* setup the initial snapshot */
1933  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1934 
1935  /*
1936  * Decoding needs access to syscaches et al., which in turn use
1937  * heavyweight locks and such. Thus we need to have enough state around to
1938  * keep track of those. The easiest way is to simply use a transaction
1939  * internally. That also allows us to easily enforce that nothing writes
1940  * to the database by checking for xid assignments.
1941  *
1942  * When we're called via the SQL SRF there's already a transaction
1943  * started, so start an explicit subtransaction there.
1944  */
1945  using_subtxn = IsTransactionOrTransactionBlock();
1946 
1947  PG_TRY();
1948  {
1949  ReorderBufferChange *change;
1950 
1951  if (using_subtxn)
1952  BeginInternalSubTransaction(streaming ? "stream" : "replay");
1953  else
1955 
1956  /* We only need to send begin/commit for non-streamed transactions. */
1957  if (!streaming)
1958  rb->begin(rb, txn);
1959 
1960  ReorderBufferIterTXNInit(rb, txn, &iterstate);
1961  while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
1962  {
1963  Relation relation = NULL;
1964  Oid reloid;
1965 
1966  /*
1967  * We can't call start stream callback before processing first
1968  * change.
1969  */
1970  if (prev_lsn == InvalidXLogRecPtr)
1971  {
1972  if (streaming)
1973  {
1974  txn->origin_id = change->origin_id;
1975  rb->stream_start(rb, txn, change->lsn);
1976  stream_started = true;
1977  }
1978  }
1979 
1980  /*
1981  * Enforce correct ordering of changes, merged from multiple
1982  * subtransactions. The changes may have the same LSN due to
1983  * MULTI_INSERT xlog records.
1984  */
1985  Assert(prev_lsn == InvalidXLogRecPtr || prev_lsn <= change->lsn);
1986 
1987  prev_lsn = change->lsn;
1988 
1989  /* Set the current xid to detect concurrent aborts. */
1990  if (streaming)
1991  {
1992  curtxn = change->txn;
1993  SetupCheckXidLive(curtxn->xid);
1994  }
1995 
1996  switch (change->action)
1997  {
1999 
2000  /*
2001  * Confirmation for speculative insertion arrived. Simply
2002  * use as a normal record. It'll be cleaned up at the end
2003  * of INSERT processing.
2004  */
2005  if (specinsert == NULL)
2006  elog(ERROR, "invalid ordering of speculative insertion changes");
2007  Assert(specinsert->data.tp.oldtuple == NULL);
2008  change = specinsert;
2010 
2011  /* intentionally fall through */
2015  Assert(snapshot_now);
2016 
2017  reloid = RelidByRelfilenode(change->data.tp.relnode.spcNode,
2018  change->data.tp.relnode.relNode);
2019 
2020  /*
2021  * Mapped catalog tuple without data, emitted while
2022  * catalog table was in the process of being rewritten. We
2023  * can fail to look up the relfilenode, because the
2024  * relmapper has no "historic" view, in contrast to normal
2025  * the normal catalog during decoding. Thus repeated
2026  * rewrites can cause a lookup failure. That's OK because
2027  * we do not decode catalog changes anyway. Normally such
2028  * tuples would be skipped over below, but we can't
2029  * identify whether the table should be logically logged
2030  * without mapping the relfilenode to the oid.
2031  */
2032  if (reloid == InvalidOid &&
2033  change->data.tp.newtuple == NULL &&
2034  change->data.tp.oldtuple == NULL)
2035  goto change_done;
2036  else if (reloid == InvalidOid)
2037  elog(ERROR, "could not map filenode \"%s\" to relation OID",
2038  relpathperm(change->data.tp.relnode,
2039  MAIN_FORKNUM));
2040 
2041  relation = RelationIdGetRelation(reloid);
2042 
2043  if (!RelationIsValid(relation))
2044  elog(ERROR, "could not open relation with OID %u (for filenode \"%s\")",
2045  reloid,
2046  relpathperm(change->data.tp.relnode,
2047  MAIN_FORKNUM));
2048 
2049  if (!RelationIsLogicallyLogged(relation))
2050  goto change_done;
2051 
2052  /*
2053  * Ignore temporary heaps created during DDL unless the
2054  * plugin has asked for them.
2055  */
2056  if (relation->rd_rel->relrewrite && !rb->output_rewrites)
2057  goto change_done;
2058 
2059  /*
2060  * For now ignore sequence changes entirely. Most of the
2061  * time they don't log changes using records we
2062  * understand, so it doesn't make sense to handle the few
2063  * cases we do.
2064  */
2065  if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
2066  goto change_done;
2067 
2068  /* user-triggered change */
2069  if (!IsToastRelation(relation))
2070  {
2071  ReorderBufferToastReplace(rb, txn, relation, change);
2072  ReorderBufferApplyChange(rb, txn, relation, change,
2073  streaming);
2074 
2075  /*
2076  * Only clear reassembled toast chunks if we're sure
2077  * they're not required anymore. The creator of the
2078  * tuple tells us.
2079  */
2080  if (change->data.tp.clear_toast_afterwards)
2081  ReorderBufferToastReset(rb, txn);
2082  }
2083  /* we're not interested in toast deletions */
2084  else if (change->action == REORDER_BUFFER_CHANGE_INSERT)
2085  {
2086  /*
2087  * Need to reassemble the full toasted Datum in
2088  * memory, to ensure the chunks don't get reused till
2089  * we're done remove it from the list of this
2090  * transaction's changes. Otherwise it will get
2091  * freed/reused while restoring spooled data from
2092  * disk.
2093  */
2094  Assert(change->data.tp.newtuple != NULL);
2095 
2096  dlist_delete(&change->node);
2097  ReorderBufferToastAppendChunk(rb, txn, relation,
2098  change);
2099  }
2100 
2101  change_done:
2102 
2103  /*
2104  * Either speculative insertion was confirmed, or it was
2105  * unsuccessful and the record isn't needed anymore.
2106  */
2107  if (specinsert != NULL)
2108  {
2109  ReorderBufferReturnChange(rb, specinsert, true);
2110  specinsert = NULL;
2111  }
2112 
2113  if (RelationIsValid(relation))
2114  {
2115  RelationClose(relation);
2116  relation = NULL;
2117  }
2118  break;
2119 
2121 
2122  /*
2123  * Speculative insertions are dealt with by delaying the
2124  * processing of the insert until the confirmation record
2125  * arrives. For that we simply unlink the record from the
2126  * chain, so it does not get freed/reused while restoring
2127  * spooled data from disk.
2128  *
2129  * This is safe in the face of concurrent catalog changes
2130  * because the relevant relation can't be changed between
2131  * speculative insertion and confirmation due to
2132  * CheckTableNotInUse() and locking.
2133  */
2134 
2135  /* clear out a pending (and thus failed) speculation */
2136  if (specinsert != NULL)
2137  {
2138  ReorderBufferReturnChange(rb, specinsert, true);
2139  specinsert = NULL;
2140  }
2141 
2142  /* and memorize the pending insertion */
2143  dlist_delete(&change->node);
2144  specinsert = change;
2145  break;
2146 
2148  {
2149  int i;
2150  int nrelids = change->data.truncate.nrelids;
2151  int nrelations = 0;
2152  Relation *relations;
2153 
2154  relations = palloc0(nrelids * sizeof(Relation));
2155  for (i = 0; i < nrelids; i++)
2156  {
2157  Oid relid = change->data.truncate.relids[i];
2158  Relation relation;
2159 
2160  relation = RelationIdGetRelation(relid);
2161 
2162  if (!RelationIsValid(relation))
2163  elog(ERROR, "could not open relation with OID %u", relid);
2164 
2165  if (!RelationIsLogicallyLogged(relation))
2166  continue;
2167 
2168  relations[nrelations++] = relation;
2169  }
2170 
2171  /* Apply the truncate. */
2172  ReorderBufferApplyTruncate(rb, txn, nrelations,
2173  relations, change,
2174  streaming);
2175 
2176  for (i = 0; i < nrelations; i++)
2177  RelationClose(relations[i]);
2178 
2179  break;
2180  }
2181 
2183  ReorderBufferApplyMessage(rb, txn, change, streaming);
2184  break;
2185 
2187  /* get rid of the old */
2188  TeardownHistoricSnapshot(false);
2189 
2190  if (snapshot_now->copied)
2191  {
2192  ReorderBufferFreeSnap(rb, snapshot_now);
2193  snapshot_now =
2194  ReorderBufferCopySnap(rb, change->data.snapshot,
2195  txn, command_id);
2196  }
2197 
2198  /*
2199  * Restored from disk, need to be careful not to double
2200  * free. We could introduce refcounting for that, but for
2201  * now this seems infrequent enough not to care.
2202  */
2203  else if (change->data.snapshot->copied)
2204  {
2205  snapshot_now =
2206  ReorderBufferCopySnap(rb, change->data.snapshot,
2207  txn, command_id);
2208  }
2209  else
2210  {
2211  snapshot_now = change->data.snapshot;
2212  }
2213 
2214  /* and continue with the new one */
2215  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
2216  break;
2217 
2219  Assert(change->data.command_id != InvalidCommandId);
2220 
2221  if (command_id < change->data.command_id)
2222  {
2223  command_id = change->data.command_id;
2224 
2225  if (!snapshot_now->copied)
2226  {
2227  /* we don't use the global one anymore */
2228  snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
2229  txn, command_id);
2230  }
2231 
2232  snapshot_now->curcid = command_id;
2233 
2234  TeardownHistoricSnapshot(false);
2235  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
2236 
2237  /*
2238  * Every time the CommandId is incremented, we could
2239  * see new catalog contents, so execute all
2240  * invalidations.
2241  */
2243  }
2244 
2245  break;
2246 
2248  elog(ERROR, "tuplecid value in changequeue");
2249  break;
2250  }
2251  }
2252 
2253  /*
2254  * There's a speculative insertion remaining, just clean in up, it
2255  * can't have been successful, otherwise we'd gotten a confirmation
2256  * record.
2257  */
2258  if (specinsert)
2259  {
2260  ReorderBufferReturnChange(rb, specinsert, true);
2261  specinsert = NULL;
2262  }
2263 
2264  /* clean up the iterator */
2265  ReorderBufferIterTXNFinish(rb, iterstate);
2266  iterstate = NULL;
2267 
2268  /*
2269  * Done with current changes, send the last message for this set of
2270  * changes depending upon streaming mode.
2271  */
2272  if (streaming)
2273  {
2274  if (stream_started)
2275  {
2276  rb->stream_stop(rb, txn, prev_lsn);
2277  stream_started = false;
2278  }
2279  }
2280  else
2281  rb->commit(rb, txn, commit_lsn);
2282 
2283  /* this is just a sanity check against bad output plugin behaviour */
2285  elog(ERROR, "output plugin used XID %u",
2287 
2288  /*
2289  * Remember the command ID and snapshot for the next set of changes in
2290  * streaming mode.
2291  */
2292  if (streaming)
2293  ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
2294  else if (snapshot_now->copied)
2295  ReorderBufferFreeSnap(rb, snapshot_now);
2296 
2297  /* cleanup */
2298  TeardownHistoricSnapshot(false);
2299 
2300  /*
2301  * Aborting the current (sub-)transaction as a whole has the right
2302  * semantics. We want all locks acquired in here to be released, not
2303  * reassigned to the parent and we do not want any database access
2304  * have persistent effects.
2305  */
2307 
2308  /* make sure there's no cache pollution */
2310 
2311  if (using_subtxn)
2313 
2314  /*
2315  * If we are streaming the in-progress transaction then discard the
2316  * changes that we just streamed, and mark the transactions as
2317  * streamed (if they contained changes). Otherwise, remove all the
2318  * changes and deallocate the ReorderBufferTXN.
2319  */
2320  if (streaming)
2321  {
2322  ReorderBufferTruncateTXN(rb, txn);
2323 
2324  /* Reset the CheckXidAlive */
2326  }
2327  else
2328  ReorderBufferCleanupTXN(rb, txn);
2329  }
2330  PG_CATCH();
2331  {
2332  MemoryContext ecxt = MemoryContextSwitchTo(ccxt);
2333  ErrorData *errdata = CopyErrorData();
2334 
2335  /* TODO: Encapsulate cleanup from the PG_TRY and PG_CATCH blocks */
2336  if (iterstate)
2337  ReorderBufferIterTXNFinish(rb, iterstate);
2338 
2340 
2341  /*
2342  * Force cache invalidation to happen outside of a valid transaction
2343  * to prevent catalog access as we just caught an error.
2344  */
2346 
2347  /* make sure there's no cache pollution */
2349 
2350  if (using_subtxn)
2352 
2353  /*
2354  * The error code ERRCODE_TRANSACTION_ROLLBACK indicates a concurrent
2355  * abort of the (sub)transaction we are streaming. We need to do the
2356  * cleanup and return gracefully on this error, see SetupCheckXidLive.
2357  */
2358  if (errdata->sqlerrcode == ERRCODE_TRANSACTION_ROLLBACK)
2359  {
2360  /*
2361  * This error can only occur when we are sending the data in
2362  * streaming mode and the streaming is not finished yet.
2363  */
2364  Assert(streaming);
2365  Assert(stream_started);
2366 
2367  /* Cleanup the temporary error state. */
2368  FlushErrorState();
2369  FreeErrorData(errdata);
2370  errdata = NULL;
2371  curtxn->concurrent_abort = true;
2372 
2373  /* Reset the TXN so that it is allowed to stream remaining data. */
2374  ReorderBufferResetTXN(rb, txn, snapshot_now,
2375  command_id, prev_lsn,
2376  specinsert);
2377  }
2378  else
2379  {
2380  ReorderBufferCleanupTXN(rb, txn);
2381  MemoryContextSwitchTo(ecxt);
2382  PG_RE_THROW();
2383  }
2384  }
2385  PG_END_TRY();
2386 }
2387 
2388 /*
2389  * Perform the replay of a transaction and its non-aborted subtransactions.
2390  *
2391  * Subtransactions previously have to be processed by
2392  * ReorderBufferCommitChild(), even if previously assigned to the toplevel
2393  * transaction with ReorderBufferAssignChild.
2394  *
2395  * This interface is called once a toplevel commit is read for both streamed
2396  * as well as non-streamed transactions.
2397  */
2398 void
2400  XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
2401  TimestampTz commit_time,
2402  RepOriginId origin_id, XLogRecPtr origin_lsn)
2403 {
2405  Snapshot snapshot_now;
2406  CommandId command_id = FirstCommandId;
2407 
2408  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2409  false);
2410 
2411  /* unknown transaction, nothing to replay */
2412  if (txn == NULL)
2413  return;
2414 
2415  txn->final_lsn = commit_lsn;
2416  txn->end_lsn = end_lsn;
2417  txn->commit_time = commit_time;
2418  txn->origin_id = origin_id;
2419  txn->origin_lsn = origin_lsn;
2420 
2421  /*
2422  * If the transaction was (partially) streamed, we need to commit it in a
2423  * 'streamed' way. That is, we first stream the remaining part of the
2424  * transaction, and then invoke stream_commit message.
2425  *
2426  * Called after everything (origin ID, LSN, ...) is stored in the
2427  * transaction to avoid passing that information directly.
2428  */
2429  if (rbtxn_is_streamed(txn))
2430  {
2431  ReorderBufferStreamCommit(rb, txn);
2432  return;
2433  }
2434 
2435  /*
2436  * If this transaction has no snapshot, it didn't make any changes to the
2437  * database, so there's nothing to decode. Note that
2438  * ReorderBufferCommitChild will have transferred any snapshots from
2439  * subtransactions if there were any.
2440  */
2441  if (txn->base_snapshot == NULL)
2442  {
2443  Assert(txn->ninvalidations == 0);
2444  ReorderBufferCleanupTXN(rb, txn);
2445  return;
2446  }
2447 
2448  snapshot_now = txn->base_snapshot;
2449 
2450  /* Process and send the changes to output plugin. */
2451  ReorderBufferProcessTXN(rb, txn, commit_lsn, snapshot_now,
2452  command_id, false);
2453 }
2454 
2455 /*
2456  * Abort a transaction that possibly has previous changes. Needs to be first
2457  * called for subtransactions and then for the toplevel xid.
2458  *
2459  * NB: Transactions handled here have to have actively aborted (i.e. have
2460  * produced an abort record). Implicitly aborted transactions are handled via
2461  * ReorderBufferAbortOld(); transactions we're just not interested in, but
2462  * which have committed are handled in ReorderBufferForget().
2463  *
2464  * This function purges this transaction and its contents from memory and
2465  * disk.
2466  */
2467 void
2469 {
2471 
2472  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2473  false);
2474 
2475  /* unknown, nothing to remove */
2476  if (txn == NULL)
2477  return;
2478 
2479  /* For streamed transactions notify the remote node about the abort. */
2480  if (rbtxn_is_streamed(txn))
2481  {
2482  rb->stream_abort(rb, txn, lsn);
2483 
2484  /*
2485  * We might have decoded changes for this transaction that could load
2486  * the cache as per the current transaction's view (consider DDL's
2487  * happened in this transaction). We don't want the decoding of future
2488  * transactions to use those cache entries so execute invalidations.
2489  */
2490  if (txn->ninvalidations > 0)
2492  txn->invalidations);
2493  }
2494 
2495  /* cosmetic... */
2496  txn->final_lsn = lsn;
2497 
2498  /* remove potential on-disk data, and deallocate */
2499  ReorderBufferCleanupTXN(rb, txn);
2500 }
2501 
2502 /*
2503  * Abort all transactions that aren't actually running anymore because the
2504  * server restarted.
2505  *
2506  * NB: These really have to be transactions that have aborted due to a server
2507  * crash/immediate restart, as we don't deal with invalidations here.
2508  */
2509 void
2511 {
2512  dlist_mutable_iter it;
2513 
2514  /*
2515  * Iterate through all (potential) toplevel TXNs and abort all that are
2516  * older than what possibly can be running. Once we've found the first
2517  * that is alive we stop, there might be some that acquired an xid earlier
2518  * but started writing later, but it's unlikely and they will be cleaned
2519  * up in a later call to this function.
2520  */
2522  {
2524 
2525  txn = dlist_container(ReorderBufferTXN, node, it.cur);
2526 
2527  if (TransactionIdPrecedes(txn->xid, oldestRunningXid))
2528  {
2529  elog(DEBUG2, "aborting old transaction %u", txn->xid);
2530 
2531  /* remove potential on-disk data, and deallocate this tx */
2532  ReorderBufferCleanupTXN(rb, txn);
2533  }
2534  else
2535  return;
2536  }
2537 }
2538 
2539 /*
2540  * Forget the contents of a transaction if we aren't interested in its
2541  * contents. Needs to be first called for subtransactions and then for the
2542  * toplevel xid.
2543  *
2544  * This is significantly different to ReorderBufferAbort() because
2545  * transactions that have committed need to be treated differently from aborted
2546  * ones since they may have modified the catalog.
2547  *
2548  * Note that this is only allowed to be called in the moment a transaction
2549  * commit has just been read, not earlier; otherwise later records referring
2550  * to this xid might re-create the transaction incompletely.
2551  */
2552 void
2554 {
2556 
2557  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2558  false);
2559 
2560  /* unknown, nothing to forget */
2561  if (txn == NULL)
2562  return;
2563 
2564  /* For streamed transactions notify the remote node about the abort. */
2565  if (rbtxn_is_streamed(txn))
2566  rb->stream_abort(rb, txn, lsn);
2567 
2568  /* cosmetic... */
2569  txn->final_lsn = lsn;
2570 
2571  /*
2572  * Process cache invalidation messages if there are any. Even if we're not
2573  * interested in the transaction's contents, it could have manipulated the
2574  * catalog and we need to update the caches according to that.
2575  */
2576  if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
2578  txn->invalidations);
2579  else
2580  Assert(txn->ninvalidations == 0);
2581 
2582  /* remove potential on-disk data, and deallocate */
2583  ReorderBufferCleanupTXN(rb, txn);
2584 }
2585 
2586 /*
2587  * Execute invalidations happening outside the context of a decoded
2588  * transaction. That currently happens either for xid-less commits
2589  * (cf. RecordTransactionCommit()) or for invalidations in uninteresting
2590  * transactions (via ReorderBufferForget()).
2591  */
2592 void
2594  SharedInvalidationMessage *invalidations)
2595 {
2596  bool use_subtxn = IsTransactionOrTransactionBlock();
2597  int i;
2598 
2599  if (use_subtxn)
2600  BeginInternalSubTransaction("replay");
2601 
2602  /*
2603  * Force invalidations to happen outside of a valid transaction - that way
2604  * entries will just be marked as invalid without accessing the catalog.
2605  * That's advantageous because we don't need to setup the full state
2606  * necessary for catalog access.
2607  */
2608  if (use_subtxn)
2610 
2611  for (i = 0; i < ninvalidations; i++)
2612  LocalExecuteInvalidationMessage(&invalidations[i]);
2613 
2614  if (use_subtxn)
2616 }
2617 
2618 /*
2619  * Tell reorderbuffer about an xid seen in the WAL stream. Has to be called at
2620  * least once for every xid in XLogRecord->xl_xid (other places in records
2621  * may, but do not have to be passed through here).
2622  *
2623  * Reorderbuffer keeps some datastructures about transactions in LSN order,
2624  * for efficiency. To do that it has to know about when transactions are seen
2625  * first in the WAL. As many types of records are not actually interesting for
2626  * logical decoding, they do not necessarily pass though here.
2627  */
2628 void
2630 {
2631  /* many records won't have an xid assigned, centralize check here */
2632  if (xid != InvalidTransactionId)
2633  ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
2634 }
2635 
2636 /*
2637  * Add a new snapshot to this transaction that may only used after lsn 'lsn'
2638  * because the previous snapshot doesn't describe the catalog correctly for
2639  * following rows.
2640  */
2641 void
2643  XLogRecPtr lsn, Snapshot snap)
2644 {
2646 
2647  change->data.snapshot = snap;
2649 
2650  ReorderBufferQueueChange(rb, xid, lsn, change, false);
2651 }
2652 
2653 /*
2654  * Set up the transaction's base snapshot.
2655  *
2656  * If we know that xid is a subtransaction, set the base snapshot on the
2657  * top-level transaction instead.
2658  */
2659 void
2661  XLogRecPtr lsn, Snapshot snap)
2662 {
2664  bool is_new;
2665 
2666  AssertArg(snap != NULL);
2667 
2668  /*
2669  * Fetch the transaction to operate on. If we know it's a subtransaction,
2670  * operate on its top-level transaction instead.
2671  */
2672  txn = ReorderBufferTXNByXid(rb, xid, true, &is_new, lsn, true);
2673  if (rbtxn_is_known_subxact(txn))
2674  txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
2675  NULL, InvalidXLogRecPtr, false);
2676  Assert(txn->base_snapshot == NULL);
2677 
2678  txn->base_snapshot = snap;
2679  txn->base_snapshot_lsn = lsn;
2681 
2682  AssertTXNLsnOrder(rb);
2683 }
2684 
2685 /*
2686  * Access the catalog with this CommandId at this point in the changestream.
2687  *
2688  * May only be called for command ids > 1
2689  */
2690 void
2692  XLogRecPtr lsn, CommandId cid)
2693 {
2695 
2696  change->data.command_id = cid;
2698 
2699  ReorderBufferQueueChange(rb, xid, lsn, change, false);
2700 }
2701 
2702 /*
2703  * Update memory counters to account for the new or removed change.
2704  *
2705  * We update two counters - in the reorder buffer, and in the transaction
2706  * containing the change. The reorder buffer counter allows us to quickly
2707  * decide if we reached the memory limit, the transaction counter allows
2708  * us to quickly pick the largest transaction for eviction.
2709  *
2710  * When streaming is enabled, we need to update the toplevel transaction
2711  * counters instead - we don't really care about subtransactions as we
2712  * can't stream them individually anyway, and we only pick toplevel
2713  * transactions for eviction. So only toplevel transactions matter.
2714  */
2715 static void
2717  ReorderBufferChange *change,
2718  bool addition)
2719 {
2720  Size sz;
2722  ReorderBufferTXN *toptxn = NULL;
2723 
2724  Assert(change->txn);
2725 
2726  /*
2727  * Ignore tuple CID changes, because those are not evicted when reaching
2728  * memory limit. So we just don't count them, because it might easily
2729  * trigger a pointless attempt to spill.
2730  */
2732  return;
2733 
2734  txn = change->txn;
2735 
2736  /* If streaming supported, update the total size in top level as well. */
2737  if (ReorderBufferCanStream(rb))
2738  {
2739  if (txn->toptxn != NULL)
2740  toptxn = txn->toptxn;
2741  else
2742  toptxn = txn;
2743  }
2744 
2745  sz = ReorderBufferChangeSize(change);
2746 
2747  if (addition)
2748  {
2749  txn->size += sz;
2750  rb->size += sz;
2751 
2752  /* Update the total size in the top transaction. */
2753  if (toptxn)
2754  toptxn->total_size += sz;
2755  }
2756  else
2757  {
2758  Assert((rb->size >= sz) && (txn->size >= sz));
2759  txn->size -= sz;
2760  rb->size -= sz;
2761 
2762  /* Update the total size in the top transaction. */
2763  if (toptxn)
2764  toptxn->total_size -= sz;
2765  }
2766 
2767  Assert(txn->size <= rb->size);
2768 }
2769 
2770 /*
2771  * Add new (relfilenode, tid) -> (cmin, cmax) mappings.
2772  *
2773  * We do not include this change type in memory accounting, because we
2774  * keep CIDs in a separate list and do not evict them when reaching
2775  * the memory limit.
2776  */
2777 void
2779  XLogRecPtr lsn, RelFileNode node,
2780  ItemPointerData tid, CommandId cmin,
2781  CommandId cmax, CommandId combocid)
2782 {
2785 
2786  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
2787 
2788  change->data.tuplecid.node = node;
2789  change->data.tuplecid.tid = tid;
2790  change->data.tuplecid.cmin = cmin;
2791  change->data.tuplecid.cmax = cmax;
2792  change->data.tuplecid.combocid = combocid;
2793  change->lsn = lsn;
2794  change->txn = txn;
2796 
2797  dlist_push_tail(&txn->tuplecids, &change->node);
2798  txn->ntuplecids++;
2799 }
2800 
2801 /*
2802  * Setup the invalidation of the toplevel transaction.
2803  *
2804  * This needs to be called for each XLOG_XACT_INVALIDATIONS message and
2805  * accumulates all the invalidation messages in the toplevel transaction.
2806  * This is required because in some cases where we skip processing the
2807  * transaction (see ReorderBufferForget), we need to execute all the
2808  * invalidations together.
2809  */
2810 void
2812  XLogRecPtr lsn, Size nmsgs,
2814 {
2816 
2817  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
2818 
2819  /*
2820  * We collect all the invalidations under the top transaction so that we
2821  * can execute them all together.
2822  */
2823  if (txn->toptxn)
2824  txn = txn->toptxn;
2825 
2826  Assert(nmsgs > 0);
2827 
2828  /* Accumulate invalidations. */
2829  if (txn->ninvalidations == 0)
2830  {
2831  txn->ninvalidations = nmsgs;
2834  sizeof(SharedInvalidationMessage) * nmsgs);
2835  memcpy(txn->invalidations, msgs,
2836  sizeof(SharedInvalidationMessage) * nmsgs);
2837  }
2838  else
2839  {
2842  (txn->ninvalidations + nmsgs));
2843 
2844  memcpy(txn->invalidations + txn->ninvalidations, msgs,
2845  nmsgs * sizeof(SharedInvalidationMessage));
2846  txn->ninvalidations += nmsgs;
2847  }
2848 }
2849 
2850 /*
2851  * Apply all invalidations we know. Possibly we only need parts at this point
2852  * in the changestream but we don't know which those are.
2853  */
2854 static void
2856 {
2857  int i;
2858 
2859  for (i = 0; i < txn->ninvalidations; i++)
2861 }
2862 
2863 /*
2864  * Mark a transaction as containing catalog changes
2865  */
2866 void
2868  XLogRecPtr lsn)
2869 {
2871 
2872  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
2873 
2875 
2876  /*
2877  * Mark top-level transaction as having catalog changes too if one of its
2878  * children has so that the ReorderBufferBuildTupleCidHash can
2879  * conveniently check just top-level transaction and decide whether to
2880  * build the hash table or not.
2881  */
2882  if (txn->toptxn != NULL)
2884 }
2885 
2886 /*
2887  * Query whether a transaction is already *known* to contain catalog
2888  * changes. This can be wrong until directly before the commit!
2889  */
2890 bool
2892 {
2894 
2895  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2896  false);
2897  if (txn == NULL)
2898  return false;
2899 
2900  return rbtxn_has_catalog_changes(txn);
2901 }
2902 
2903 /*
2904  * ReorderBufferXidHasBaseSnapshot
2905  * Have we already set the base snapshot for the given txn/subtxn?
2906  */
2907 bool
2909 {
2911 
2912  txn = ReorderBufferTXNByXid(rb, xid, false,
2913  NULL, InvalidXLogRecPtr, false);
2914 
2915  /* transaction isn't known yet, ergo no snapshot */
2916  if (txn == NULL)
2917  return false;
2918 
2919  /* a known subtxn? operate on top-level txn instead */
2920  if (rbtxn_is_known_subxact(txn))
2921  txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
2922  NULL, InvalidXLogRecPtr, false);
2923 
2924  return txn->base_snapshot != NULL;
2925 }
2926 
2927 
2928 /*
2929  * ---------------------------------------
2930  * Disk serialization support
2931  * ---------------------------------------
2932  */
2933 
2934 /*
2935  * Ensure the IO buffer is >= sz.
2936  */
2937 static void
2939 {
2940  if (!rb->outbufsize)
2941  {
2942  rb->outbuf = MemoryContextAlloc(rb->context, sz);
2943  rb->outbufsize = sz;
2944  }
2945  else if (rb->outbufsize < sz)
2946  {
2947  rb->outbuf = repalloc(rb->outbuf, sz);
2948  rb->outbufsize = sz;
2949  }
2950 }
2951 
2952 /*
2953  * Find the largest transaction (toplevel or subxact) to evict (spill to disk).
2954  *
2955  * XXX With many subtransactions this might be quite slow, because we'll have
2956  * to walk through all of them. There are some options how we could improve
2957  * that: (a) maintain some secondary structure with transactions sorted by
2958  * amount of changes, (b) not looking for the entirely largest transaction,
2959  * but e.g. for transaction using at least some fraction of the memory limit,
2960  * and (c) evicting multiple transactions at once, e.g. to free a given portion
2961  * of the memory limit (e.g. 50%).
2962  */
2963 static ReorderBufferTXN *
2965 {
2966  HASH_SEQ_STATUS hash_seq;
2968  ReorderBufferTXN *largest = NULL;
2969 
2970  hash_seq_init(&hash_seq, rb->by_txn);
2971  while ((ent = hash_seq_search(&hash_seq)) != NULL)
2972  {
2973  ReorderBufferTXN *txn = ent->txn;
2974 
2975  /* if the current transaction is larger, remember it */
2976  if ((!largest) || (txn->size > largest->size))
2977  largest = txn;
2978  }
2979 
2980  Assert(largest);
2981  Assert(largest->size > 0);
2982  Assert(largest->size <= rb->size);
2983 
2984  return largest;
2985 }
2986 
2987 /*
2988  * Find the largest toplevel transaction to evict (by streaming).
2989  *
2990  * This can be seen as an optimized version of ReorderBufferLargestTXN, which
2991  * should give us the same transaction (because we don't update memory account
2992  * for subtransaction with streaming, so it's always 0). But we can simply
2993  * iterate over the limited number of toplevel transactions.
2994  *
2995  * Note that, we skip transactions that contains incomplete changes. There
2996  * is a scope of optimization here such that we can select the largest transaction
2997  * which has complete changes. But that will make the code and design quite complex
2998  * and that might not be worth the benefit. If we plan to stream the transactions
2999  * that contains incomplete changes then we need to find a way to partially
3000  * stream/truncate the transaction changes in-memory and build a mechanism to
3001  * partially truncate the spilled files. Additionally, whenever we partially
3002  * stream the transaction we need to maintain the last streamed lsn and next time
3003  * we need to restore from that segment and the offset in WAL. As we stream the
3004  * changes from the top transaction and restore them subtransaction wise, we need
3005  * to even remember the subxact from where we streamed the last change.
3006  */
3007 static ReorderBufferTXN *
3009 {
3010  dlist_iter iter;
3011  Size largest_size = 0;
3012  ReorderBufferTXN *largest = NULL;
3013 
3014  /* Find the largest top-level transaction. */
3015  dlist_foreach(iter, &rb->toplevel_by_lsn)
3016  {
3018 
3019  txn = dlist_container(ReorderBufferTXN, node, iter.cur);
3020 
3021  if ((largest != NULL || txn->total_size > largest_size) &&
3022  (txn->total_size > 0) && !(rbtxn_has_incomplete_tuple(txn)))
3023  {
3024  largest = txn;
3025  largest_size = txn->total_size;
3026  }
3027  }
3028 
3029  return largest;
3030 }
3031 
3032 /*
3033  * Check whether the logical_decoding_work_mem limit was reached, and if yes
3034  * pick the largest (sub)transaction at-a-time to evict and spill its changes to
3035  * disk until we reach under the memory limit.
3036  *
3037  * XXX At this point we select the transactions until we reach under the memory
3038  * limit, but we might also adapt a more elaborate eviction strategy - for example
3039  * evicting enough transactions to free certain fraction (e.g. 50%) of the memory
3040  * limit.
3041  */
3042 static void
3044 {
3046 
3047  /* bail out if we haven't exceeded the memory limit */
3048  if (rb->size < logical_decoding_work_mem * 1024L)
3049  return;
3050 
3051  /*
3052  * Loop until we reach under the memory limit. One might think that just
3053  * by evicting the largest (sub)transaction we will come under the memory
3054  * limit based on assumption that the selected transaction is at least as
3055  * large as the most recent change (which caused us to go over the memory
3056  * limit). However, that is not true because a user can reduce the
3057  * logical_decoding_work_mem to a smaller value before the most recent
3058  * change.
3059  */
3060  while (rb->size >= logical_decoding_work_mem * 1024L)
3061  {
3062  /*
3063  * Pick the largest transaction (or subtransaction) and evict it from
3064  * memory by streaming, if possible. Otherwise, spill to disk.
3065  */
3067  (txn = ReorderBufferLargestTopTXN(rb)) != NULL)
3068  {
3069  /* we know there has to be one, because the size is not zero */
3070  Assert(txn && !txn->toptxn);
3071  Assert(txn->total_size > 0);
3072  Assert(rb->size >= txn->total_size);
3073 
3074  ReorderBufferStreamTXN(rb, txn);
3075  }
3076  else
3077  {
3078  /*
3079  * Pick the largest transaction (or subtransaction) and evict it
3080  * from memory by serializing it to disk.
3081  */
3082  txn = ReorderBufferLargestTXN(rb);
3083 
3084  /* we know there has to be one, because the size is not zero */
3085  Assert(txn);
3086  Assert(txn->size > 0);
3087  Assert(rb->size >= txn->size);
3088 
3089  ReorderBufferSerializeTXN(rb, txn);
3090  }
3091 
3092  /*
3093  * After eviction, the transaction should have no entries in memory,
3094  * and should use 0 bytes for changes.
3095  */
3096  Assert(txn->size == 0);
3097  Assert(txn->nentries_mem == 0);
3098  }
3099 
3100  /* We must be under the memory limit now. */
3101  Assert(rb->size < logical_decoding_work_mem * 1024L);
3102 }
3103 
3104 /*
3105  * Spill data of a large transaction (and its subtransactions) to disk.
3106  */
3107 static void
3109 {
3110  dlist_iter subtxn_i;
3111  dlist_mutable_iter change_i;
3112  int fd = -1;
3113  XLogSegNo curOpenSegNo = 0;
3114  Size spilled = 0;
3115 
3116  elog(DEBUG2, "spill %u changes in XID %u to disk",
3117  (uint32) txn->nentries_mem, txn->xid);
3118 
3119  /* do the same to all child TXs */
3120  dlist_foreach(subtxn_i, &txn->subtxns)
3121  {
3122  ReorderBufferTXN *subtxn;
3123 
3124  subtxn = dlist_container(ReorderBufferTXN, node, subtxn_i.cur);
3125  ReorderBufferSerializeTXN(rb, subtxn);
3126  }
3127 
3128  /* serialize changestream */
3129  dlist_foreach_modify(change_i, &txn->changes)
3130  {
3131  ReorderBufferChange *change;
3132 
3133  change = dlist_container(ReorderBufferChange, node, change_i.cur);
3134 
3135  /*
3136  * store in segment in which it belongs by start lsn, don't split over
3137  * multiple segments tho
3138  */
3139  if (fd == -1 ||
3140  !XLByteInSeg(change->lsn, curOpenSegNo, wal_segment_size))
3141  {
3142  char path[MAXPGPATH];
3143 
3144  if (fd != -1)
3145  CloseTransientFile(fd);
3146 
3147  XLByteToSeg(change->lsn, curOpenSegNo, wal_segment_size);
3148 
3149  /*
3150  * No need to care about TLIs here, only used during a single run,
3151  * so each LSN only maps to a specific WAL record.
3152  */
3154  curOpenSegNo);
3155 
3156  /* open segment, create it if necessary */
3157  fd = OpenTransientFile(path,
3158  O_CREAT | O_WRONLY | O_APPEND | PG_BINARY);
3159 
3160  if (fd < 0)
3161  ereport(ERROR,
3163  errmsg("could not open file \"%s\": %m", path)));
3164  }
3165 
3166  ReorderBufferSerializeChange(rb, txn, fd, change);
3167  dlist_delete(&change->node);
3168  ReorderBufferReturnChange(rb, change, true);
3169 
3170  spilled++;
3171  }
3172 
3173  Assert(spilled == txn->nentries_mem);
3174  Assert(dlist_is_empty(&txn->changes));
3175  txn->nentries_mem = 0;
3177 
3178  if (fd != -1)
3179  CloseTransientFile(fd);
3180 }
3181 
3182 /*
3183  * Serialize individual change to disk.
3184  */
3185 static void
3187  int fd, ReorderBufferChange *change)
3188 {
3189  ReorderBufferDiskChange *ondisk;
3190  Size sz = sizeof(ReorderBufferDiskChange);
3191 
3193 
3194  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3195  memcpy(&ondisk->change, change, sizeof(ReorderBufferChange));
3196 
3197  switch (change->action)
3198  {
3199  /* fall through these, they're all similar enough */
3204  {
3205  char *data;
3206  ReorderBufferTupleBuf *oldtup,
3207  *newtup;
3208  Size oldlen = 0;
3209  Size newlen = 0;
3210 
3211  oldtup = change->data.tp.oldtuple;
3212  newtup = change->data.tp.newtuple;
3213 
3214  if (oldtup)
3215  {
3216  sz += sizeof(HeapTupleData);
3217  oldlen = oldtup->tuple.t_len;
3218  sz += oldlen;
3219  }
3220 
3221  if (newtup)
3222  {
3223  sz += sizeof(HeapTupleData);
3224  newlen = newtup->tuple.t_len;
3225  sz += newlen;
3226  }
3227 
3228  /* make sure we have enough space */
3230 
3231  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3232  /* might have been reallocated above */
3233  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3234 
3235  if (oldlen)
3236  {
3237  memcpy(data, &oldtup->tuple, sizeof(HeapTupleData));
3238  data += sizeof(HeapTupleData);
3239 
3240  memcpy(data, oldtup->tuple.t_data, oldlen);
3241  data += oldlen;
3242  }
3243 
3244  if (newlen)
3245  {
3246  memcpy(data, &newtup->tuple, sizeof(HeapTupleData));
3247  data += sizeof(HeapTupleData);
3248 
3249  memcpy(data, newtup->tuple.t_data, newlen);
3250  data += newlen;
3251  }
3252  break;
3253  }
3255  {
3256  char *data;
3257  Size prefix_size = strlen(change->data.msg.prefix) + 1;
3258 
3259  sz += prefix_size + change->data.msg.message_size +
3260  sizeof(Size) + sizeof(Size);
3262 
3263  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3264 
3265  /* might have been reallocated above */
3266  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3267 
3268  /* write the prefix including the size */
3269  memcpy(data, &prefix_size, sizeof(Size));
3270  data += sizeof(Size);
3271  memcpy(data, change->data.msg.prefix,
3272  prefix_size);
3273  data += prefix_size;
3274 
3275  /* write the message including the size */
3276  memcpy(data, &change->data.msg.message_size, sizeof(Size));
3277  data += sizeof(Size);
3278  memcpy(data, change->data.msg.message,
3279  change->data.msg.message_size);
3280  data += change->data.msg.message_size;
3281 
3282  break;
3283  }
3285  {
3286  Snapshot snap;
3287  char *data;
3288 
3289  snap = change->data.snapshot;
3290 
3291  sz += sizeof(SnapshotData) +
3292  sizeof(TransactionId) * snap->xcnt +
3293  sizeof(TransactionId) * snap->subxcnt;
3294 
3295  /* make sure we have enough space */
3297  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3298  /* might have been reallocated above */
3299  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3300 
3301  memcpy(data, snap, sizeof(SnapshotData));
3302  data += sizeof(SnapshotData);
3303 
3304  if (snap->xcnt)
3305  {
3306  memcpy(data, snap->xip,
3307  sizeof(TransactionId) * snap->xcnt);
3308  data += sizeof(TransactionId) * snap->xcnt;
3309  }
3310 
3311  if (snap->subxcnt)
3312  {
3313  memcpy(data, snap->subxip,
3314  sizeof(TransactionId) * snap->subxcnt);
3315  data += sizeof(TransactionId) * snap->subxcnt;
3316  }
3317  break;
3318  }
3320  {
3321  Size size;
3322  char *data;
3323 
3324  /* account for the OIDs of truncated relations */
3325  size = sizeof(Oid) * change->data.truncate.nrelids;
3326  sz += size;
3327 
3328  /* make sure we have enough space */
3330 
3331  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3332  /* might have been reallocated above */
3333  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3334 
3335  memcpy(data, change->data.truncate.relids, size);
3336  data += size;
3337 
3338  break;
3339  }
3343  /* ReorderBufferChange contains everything important */
3344  break;
3345  }
3346 
3347  ondisk->size = sz;
3348 
3349  errno = 0;
3351  if (write(fd, rb->outbuf, ondisk->size) != ondisk->size)
3352  {
3353  int save_errno = errno;
3354 
3355  CloseTransientFile(fd);
3356 
3357  /* if write didn't set errno, assume problem is no disk space */
3358  errno = save_errno ? save_errno : ENOSPC;
3359  ereport(ERROR,
3361  errmsg("could not write to data file for XID %u: %m",
3362  txn->xid)));
3363  }
3365 
3366  /*
3367  * Keep the transaction's final_lsn up to date with each change we send to
3368  * disk, so that ReorderBufferRestoreCleanup works correctly. (We used to
3369  * only do this on commit and abort records, but that doesn't work if a
3370  * system crash leaves a transaction without its abort record).
3371  *
3372  * Make sure not to move it backwards.
3373  */
3374  if (txn->final_lsn < change->lsn)
3375  txn->final_lsn = change->lsn;
3376 
3377  Assert(ondisk->change.action == change->action);
3378 }
3379 
3380 /* Returns true, if the output plugin supports streaming, false, otherwise. */
3381 static inline bool
3383 {
3385 
3386  return ctx->streaming;
3387 }
3388 
3389 /* Returns true, if the streaming can be started now, false, otherwise. */
3390 static inline bool
3392 {
3394  SnapBuild *builder = ctx->snapshot_builder;
3395 
3396  /*
3397  * We can't start streaming immediately even if the streaming is enabled
3398  * because we previously decoded this transaction and now just are
3399  * restarting.
3400  */
3401  if (ReorderBufferCanStream(rb) &&
3402  !SnapBuildXactNeedsSkip(builder, ctx->reader->EndRecPtr))
3403  {
3404  /* We must have a consistent snapshot by this time */
3406  return true;
3407  }
3408 
3409  return false;
3410 }
3411 
3412 /*
3413  * Send data of a large transaction (and its subtransactions) to the
3414  * output plugin, but using the stream API.
3415  */
3416 static void
3418 {
3419  Snapshot snapshot_now;
3420  CommandId command_id;
3421 
3422  /* We can never reach here for a subtransaction. */
3423  Assert(txn->toptxn == NULL);
3424 
3425  /*
3426  * We can't make any assumptions about base snapshot here, similar to what
3427  * ReorderBufferCommit() does. That relies on base_snapshot getting
3428  * transferred from subxact in ReorderBufferCommitChild(), but that was
3429  * not yet called as the transaction is in-progress.
3430  *
3431  * So just walk the subxacts and use the same logic here. But we only need
3432  * to do that once, when the transaction is streamed for the first time.
3433  * After that we need to reuse the snapshot from the previous run.
3434  *
3435  * Unlike DecodeCommit which adds xids of all the subtransactions in
3436  * snapshot's xip array via SnapBuildCommittedTxn, we can't do that here
3437  * but we do add them to subxip array instead via ReorderBufferCopySnap.
3438  * This allows the catalog changes made in subtransactions decoded till
3439  * now to be visible.
3440  */
3441  if (txn->snapshot_now == NULL)
3442  {
3443  dlist_iter subxact_i;
3444 
3445  /* make sure this transaction is streamed for the first time */
3446  Assert(!rbtxn_is_streamed(txn));
3447 
3448  /* at the beginning we should have invalid command ID */
3450 
3451  dlist_foreach(subxact_i, &txn->subtxns)
3452  {
3453  ReorderBufferTXN *subtxn;
3454 
3455  subtxn = dlist_container(ReorderBufferTXN, node, subxact_i.cur);
3456  ReorderBufferTransferSnapToParent(txn, subtxn);
3457  }
3458 
3459  /*
3460  * If this transaction has no snapshot, it didn't make any changes to
3461  * the database till now, so there's nothing to decode.
3462  */
3463  if (txn->base_snapshot == NULL)
3464  {
3465  Assert(txn->ninvalidations == 0);
3466  return;
3467  }
3468 
3469  command_id = FirstCommandId;
3470  snapshot_now = ReorderBufferCopySnap(rb, txn->base_snapshot,
3471  txn, command_id);
3472  }
3473  else
3474  {
3475  /* the transaction must have been already streamed */
3476  Assert(rbtxn_is_streamed(txn));
3477 
3478  /*
3479  * Nah, we already have snapshot from the previous streaming run. We
3480  * assume new subxacts can't move the LSN backwards, and so can't beat
3481  * the LSN condition in the previous branch (so no need to walk
3482  * through subxacts again). In fact, we must not do that as we may be
3483  * using snapshot half-way through the subxact.
3484  */
3485  command_id = txn->command_id;
3486 
3487  /*
3488  * We can't use txn->snapshot_now directly because after the last
3489  * streaming run, we might have got some new sub-transactions. So we
3490  * need to add them to the snapshot.
3491  */
3492  snapshot_now = ReorderBufferCopySnap(rb, txn->snapshot_now,
3493  txn, command_id);
3494 
3495  /* Free the previously copied snapshot. */
3496  Assert(txn->snapshot_now->copied);
3498  txn->snapshot_now = NULL;
3499  }
3500 
3501  /* Process and send the changes to output plugin. */
3502  ReorderBufferProcessTXN(rb, txn, InvalidXLogRecPtr, snapshot_now,
3503  command_id, true);
3504 
3505  Assert(dlist_is_empty(&txn->changes));
3506  Assert(txn->nentries == 0);
3507  Assert(txn->nentries_mem == 0);
3508 }
3509 
3510 /*
3511  * Size of a change in memory.
3512  */
3513 static Size
3515 {
3516  Size sz = sizeof(ReorderBufferChange);
3517 
3518  switch (change->action)
3519  {
3520  /* fall through these, they're all similar enough */
3525  {
3526  ReorderBufferTupleBuf *oldtup,
3527  *newtup;
3528  Size oldlen = 0;
3529  Size newlen = 0;
3530 
3531  oldtup = change->data.tp.oldtuple;
3532  newtup = change->data.tp.newtuple;
3533 
3534  if (oldtup)
3535  {
3536  sz += sizeof(HeapTupleData);
3537  oldlen = oldtup->tuple.t_len;
3538  sz += oldlen;
3539  }
3540 
3541  if (newtup)
3542  {
3543  sz += sizeof(HeapTupleData);
3544  newlen = newtup->tuple.t_len;
3545  sz += newlen;
3546  }
3547 
3548  break;
3549  }
3551  {
3552  Size prefix_size = strlen(change->data.msg.prefix) + 1;
3553 
3554  sz += prefix_size + change->data.msg.message_size +
3555  sizeof(Size) + sizeof(Size);
3556 
3557  break;
3558  }
3560  {
3561  Snapshot snap;
3562 
3563  snap = change->data.snapshot;
3564 
3565  sz += sizeof(SnapshotData) +
3566  sizeof(TransactionId) * snap->xcnt +
3567  sizeof(TransactionId) * snap->subxcnt;
3568 
3569  break;
3570  }
3572  {
3573  sz += sizeof(Oid) * change->data.truncate.nrelids;
3574 
3575  break;
3576  }
3580  /* ReorderBufferChange contains everything important */
3581  break;
3582  }
3583 
3584  return sz;
3585 }
3586 
3587 
3588 /*
3589  * Restore a number of changes spilled to disk back into memory.
3590  */
3591 static Size
3593  TXNEntryFile *file, XLogSegNo *segno)
3594 {
3595  Size restored = 0;
3596  XLogSegNo last_segno;
3597  dlist_mutable_iter cleanup_iter;
3598  File *fd = &file->vfd;
3599 
3602 
3603  /* free current entries, so we have memory for more */
3604  dlist_foreach_modify(cleanup_iter, &txn->changes)
3605  {
3607  dlist_container(ReorderBufferChange, node, cleanup_iter.cur);
3608 
3609  dlist_delete(&cleanup->node);
3610  ReorderBufferReturnChange(rb, cleanup, true);
3611  }
3612  txn->nentries_mem = 0;
3613  Assert(dlist_is_empty(&txn->changes));
3614 
3615  XLByteToSeg(txn->final_lsn, last_segno, wal_segment_size);
3616 
3617  while (restored < max_changes_in_memory && *segno <= last_segno)
3618  {
3619  int readBytes;
3620  ReorderBufferDiskChange *ondisk;
3621 
3622  if (*fd == -1)
3623  {
3624  char path[MAXPGPATH];
3625 
3626  /* first time in */
3627  if (*segno == 0)
3628  XLByteToSeg(txn->first_lsn, *segno, wal_segment_size);
3629 
3630  Assert(*segno != 0 || dlist_is_empty(&txn->changes));
3631 
3632  /*
3633  * No need to care about TLIs here, only used during a single run,
3634  * so each LSN only maps to a specific WAL record.
3635  */
3637  *segno);
3638 
3639  *fd = PathNameOpenFile(path, O_RDONLY | PG_BINARY);
3640 
3641  /* No harm in resetting the offset even in case of failure */
3642  file->curOffset = 0;
3643 
3644  if (*fd < 0 && errno == ENOENT)
3645  {
3646  *fd = -1;
3647  (*segno)++;
3648  continue;
3649  }
3650  else if (*fd < 0)
3651  ereport(ERROR,
3653  errmsg("could not open file \"%s\": %m",
3654  path)));
3655  }
3656 
3657  /*
3658  * Read the statically sized part of a change which has information
3659  * about the total size. If we couldn't read a record, we're at the
3660  * end of this file.
3661  */
3663  readBytes = FileRead(file->vfd, rb->outbuf,
3664  sizeof(ReorderBufferDiskChange),
3666 
3667  /* eof */
3668  if (readBytes == 0)
3669  {
3670  FileClose(*fd);
3671  *fd = -1;
3672  (*segno)++;
3673  continue;
3674  }
3675  else if (readBytes < 0)
3676  ereport(ERROR,
3678  errmsg("could not read from reorderbuffer spill file: %m")));
3679  else if (readBytes != sizeof(ReorderBufferDiskChange))
3680  ereport(ERROR,
3682  errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
3683  readBytes,
3684  (uint32) sizeof(ReorderBufferDiskChange))));
3685 
3686  file->curOffset += readBytes;
3687 
3688  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3689 
3691  sizeof(ReorderBufferDiskChange) + ondisk->size);
3692  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3693 
3694  readBytes = FileRead(file->vfd,
3695  rb->outbuf + sizeof(ReorderBufferDiskChange),
3696  ondisk->size - sizeof(ReorderBufferDiskChange),
3697  file->curOffset,
3699 
3700  if (readBytes < 0)
3701  ereport(ERROR,
3703  errmsg("could not read from reorderbuffer spill file: %m")));
3704  else if (readBytes != ondisk->size - sizeof(ReorderBufferDiskChange))
3705  ereport(ERROR,
3707  errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
3708  readBytes,
3709  (uint32) (ondisk->size - sizeof(ReorderBufferDiskChange)))));
3710 
3711  file->curOffset += readBytes;
3712 
3713  /*
3714  * ok, read a full change from disk, now restore it into proper
3715  * in-memory format
3716  */
3717  ReorderBufferRestoreChange(rb, txn, rb->outbuf);
3718  restored++;
3719  }
3720 
3721  return restored;
3722 }
3723 
3724 /*
3725  * Convert change from its on-disk format to in-memory format and queue it onto
3726  * the TXN's ->changes list.
3727  *
3728  * Note: although "data" is declared char*, at entry it points to a
3729  * maxalign'd buffer, making it safe in most of this function to assume
3730  * that the pointed-to data is suitably aligned for direct access.
3731  */
3732 static void
3734  char *data)
3735 {
3736  ReorderBufferDiskChange *ondisk;
3737  ReorderBufferChange *change;
3738 
3739  ondisk = (ReorderBufferDiskChange *) data;
3740 
3741  change = ReorderBufferGetChange(rb);
3742 
3743  /* copy static part */
3744  memcpy(change, &ondisk->change, sizeof(ReorderBufferChange));
3745 
3746  data += sizeof(ReorderBufferDiskChange);
3747 
3748  /* restore individual stuff */
3749  switch (change->action)
3750  {
3751  /* fall through these, they're all similar enough */
3756  if (change->data.tp.oldtuple)
3757  {
3758  uint32 tuplelen = ((HeapTuple) data)->t_len;
3759 
3760  change->data.tp.oldtuple =
3762 
3763  /* restore ->tuple */
3764  memcpy(&change->data.tp.oldtuple->tuple, data,
3765  sizeof(HeapTupleData));
3766  data += sizeof(HeapTupleData);
3767 
3768  /* reset t_data pointer into the new tuplebuf */
3769  change->data.tp.oldtuple->tuple.t_data =
3770  ReorderBufferTupleBufData(change->data.tp.oldtuple);
3771 
3772  /* restore tuple data itself */
3773  memcpy(change->data.tp.oldtuple->tuple.t_data, data, tuplelen);
3774  data += tuplelen;
3775  }
3776 
3777  if (change->data.tp.newtuple)
3778  {
3779  /* here, data might not be suitably aligned! */
3780  uint32 tuplelen;
3781 
3782  memcpy(&tuplelen, data + offsetof(HeapTupleData, t_len),
3783  sizeof(uint32));
3784 
3785  change->data.tp.newtuple =
3787 
3788  /* restore ->tuple */
3789  memcpy(&change->data.tp.newtuple->tuple, data,
3790  sizeof(HeapTupleData));
3791  data += sizeof(HeapTupleData);
3792 
3793  /* reset t_data pointer into the new tuplebuf */
3794  change->data.tp.newtuple->tuple.t_data =
3795  ReorderBufferTupleBufData(change->data.tp.newtuple);
3796 
3797  /* restore tuple data itself */
3798  memcpy(change->data.tp.newtuple->tuple.t_data, data, tuplelen);
3799  data += tuplelen;
3800  }
3801 
3802  break;
3804  {
3805  Size prefix_size;
3806 
3807  /* read prefix */
3808  memcpy(&prefix_size, data, sizeof(Size));
3809  data += sizeof(Size);
3810  change->data.msg.prefix = MemoryContextAlloc(rb->context,
3811  prefix_size);
3812  memcpy(change->data.msg.prefix, data, prefix_size);
3813  Assert(change->data.msg.prefix[prefix_size - 1] == '\0');
3814  data += prefix_size;
3815 
3816  /* read the message */
3817  memcpy(&change->data.msg.message_size, data, sizeof(Size));
3818  data += sizeof(Size);
3819  change->data.msg.message = MemoryContextAlloc(rb->context,
3820  change->data.msg.message_size);
3821  memcpy(change->data.msg.message, data,
3822  change->data.msg.message_size);
3823  data += change->data.msg.message_size;
3824 
3825  break;
3826  }
3828  {
3829  Snapshot oldsnap;
3830  Snapshot newsnap;
3831  Size size;
3832 
3833  oldsnap = (Snapshot) data;
3834 
3835  size = sizeof(SnapshotData) +
3836  sizeof(TransactionId) * oldsnap->xcnt +
3837  sizeof(TransactionId) * (oldsnap->subxcnt + 0);
3838 
3839  change->data.snapshot = MemoryContextAllocZero(rb->context, size);
3840 
3841  newsnap = change->data.snapshot;
3842 
3843  memcpy(newsnap, data, size);
3844  newsnap->xip = (TransactionId *)
3845  (((char *) newsnap) + sizeof(SnapshotData));
3846  newsnap->subxip = newsnap->xip + newsnap->xcnt;
3847  newsnap->copied = true;
3848  break;
3849  }
3850  /* the base struct contains all the data, easy peasy */
3852  {
3853  Oid *relids;
3854 
3855  relids = ReorderBufferGetRelids(rb,
3856  change->data.truncate.nrelids);
3857  memcpy(relids, data, change->data.truncate.nrelids * sizeof(Oid));
3858  change->data.truncate.relids = relids;
3859 
3860  break;
3861  }
3865  break;
3866  }
3867 
3868  dlist_push_tail(&txn->changes, &change->node);
3869  txn->nentries_mem++;
3870 
3871  /*
3872  * Update memory accounting for the restored change. We need to do this
3873  * although we don't check the memory limit when restoring the changes in
3874  * this branch (we only do that when initially queueing the changes after
3875  * decoding), because we will release the changes later, and that will
3876  * update the accounting too (subtracting the size from the counters). And
3877  * we don't want to underflow there.
3878  */
3879  ReorderBufferChangeMemoryUpdate(rb, change, true);
3880 }
3881 
3882 /*
3883  * Remove all on-disk stored for the passed in transaction.
3884  */
3885 static void
3887 {
3888  XLogSegNo first;
3889  XLogSegNo cur;
3890  XLogSegNo last;
3891 
3894 
3895  XLByteToSeg(txn->first_lsn, first, wal_segment_size);
3896  XLByteToSeg(txn->final_lsn, last, wal_segment_size);
3897 
3898  /* iterate over all possible filenames, and delete them */
3899  for (cur = first; cur <= last; cur++)
3900  {
3901  char path[MAXPGPATH];
3902 
3904  if (unlink(path) != 0 && errno != ENOENT)
3905  ereport(ERROR,
3907  errmsg("could not remove file \"%s\": %m", path)));
3908  }
3909 }
3910 
3911 /*
3912  * Remove any leftover serialized reorder buffers from a slot directory after a
3913  * prior crash or decoding session exit.
3914  */
3915 static void
3917 {
3918  DIR *spill_dir;
3919  struct dirent *spill_de;
3920  struct stat statbuf;
3921  char path[MAXPGPATH * 2 + 12];
3922 
3923  sprintf(path, "pg_replslot/%s", slotname);
3924 
3925  /* we're only handling directories here, skip if it's not ours */
3926  if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
3927  return;
3928 
3929  spill_dir = AllocateDir(path);
3930  while ((spill_de = ReadDirExtended(spill_dir, path, INFO)) != NULL)
3931  {
3932  /* only look at names that can be ours */
3933  if (strncmp(spill_de->d_name, "xid", 3) == 0)
3934  {
3935  snprintf(path, sizeof(path),
3936  "pg_replslot/%s/%s", slotname,
3937  spill_de->d_name);
3938 
3939  if (unlink(path) != 0)
3940  ereport(ERROR,
3942  errmsg("could not remove file \"%s\" during removal of pg_replslot/%s/xid*: %m",
3943  path, slotname)));
3944  }
3945  }
3946  FreeDir(spill_dir);
3947 }
3948 
3949 /*
3950  * Given a replication slot, transaction ID and segment number, fill in the
3951  * corresponding spill file into 'path', which is a caller-owned buffer of size
3952  * at least MAXPGPATH.
3953  */
3954 static void
3956  XLogSegNo segno)
3957 {
3958  XLogRecPtr recptr;
3959 
3960  XLogSegNoOffsetToRecPtr(segno, 0, wal_segment_size, recptr);
3961 
3962  snprintf(path, MAXPGPATH, "pg_replslot/%s/xid-%u-lsn-%X-%X.spill",
3964  xid,
3965  (uint32) (recptr >> 32), (uint32) recptr);
3966 }
3967 
3968 /*
3969  * Delete all data spilled to disk after we've restarted/crashed. It will be
3970  * recreated when the respective slots are reused.
3971  */
3972 void
3974 {
3975  DIR *logical_dir;
3976  struct dirent *logical_de;
3977 
3978  logical_dir = AllocateDir("pg_replslot");
3979  while ((logical_de = ReadDir(logical_dir, "pg_replslot")) != NULL)
3980  {
3981  if (strcmp(logical_de->d_name, ".") == 0 ||
3982  strcmp(logical_de->d_name, "..") == 0)
3983  continue;
3984 
3985  /* if it cannot be a slot, skip the directory */
3986  if (!ReplicationSlotValidateName(logical_de->d_name, DEBUG2))
3987  continue;
3988 
3989  /*
3990  * ok, has to be a surviving logical slot, iterate and delete
3991  * everything starting with xid-*
3992  */
3994  }
3995  FreeDir(logical_dir);
3996 }
3997 
3998 /* ---------------------------------------
3999  * toast reassembly support
4000  * ---------------------------------------
4001  */
4002 
4003 /*
4004  * Initialize per tuple toast reconstruction support.
4005  */
4006 static void
4008 {
4009  HASHCTL hash_ctl;
4010 
4011  Assert(txn->toast_hash == NULL);
4012 
4013  memset(&hash_ctl, 0, sizeof(hash_ctl));
4014  hash_ctl.keysize = sizeof(Oid);
4015  hash_ctl.entrysize = sizeof(ReorderBufferToastEnt);
4016  hash_ctl.hcxt = rb->context;
4017  txn->toast_hash = hash_create("ReorderBufferToastHash", 5, &hash_ctl,
4019 }
4020 
4021 /*
4022  * Per toast-chunk handling for toast reconstruction
4023  *
4024  * Appends a toast chunk so we can reconstruct it when the tuple "owning" the
4025  * toasted Datum comes along.
4026  */
4027 static void
4029  Relation relation, ReorderBufferChange *change)
4030 {
4031  ReorderBufferToastEnt *ent;
4032  ReorderBufferTupleBuf *newtup;
4033  bool found;
4034  int32 chunksize;
4035  bool isnull;
4036  Pointer chunk;
4037  TupleDesc desc = RelationGetDescr(relation);
4038  Oid chunk_id;
4039  int32 chunk_seq;
4040 
4041  if (txn->toast_hash == NULL)
4042  ReorderBufferToastInitHash(rb, txn);
4043 
4044  Assert(IsToastRelation(relation));
4045 
4046  newtup = change->data.tp.newtuple;
4047  chunk_id = DatumGetObjectId(fastgetattr(&newtup->tuple, 1, desc, &isnull));
4048  Assert(!isnull);
4049  chunk_seq = DatumGetInt32(fastgetattr(&newtup->tuple, 2, desc, &isnull));
4050  Assert(!isnull);
4051 
4052  ent = (ReorderBufferToastEnt *)
4053  hash_search(txn->toast_hash,
4054  (void *) &chunk_id,
4055  HASH_ENTER,
4056  &found);
4057 
4058  if (!found)
4059  {
4060  Assert(ent->chunk_id == chunk_id);
4061  ent->num_chunks = 0;
4062  ent->last_chunk_seq = 0;
4063  ent->size = 0;
4064  ent->reconstructed = NULL;
4065  dlist_init(&ent->chunks);
4066 
4067  if (chunk_seq != 0)
4068  elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq 0",
4069  chunk_seq, chunk_id);
4070  }
4071  else if (found && chunk_seq != ent->last_chunk_seq + 1)
4072  elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq %d",
4073  chunk_seq, chunk_id, ent->last_chunk_seq + 1);
4074 
4075  chunk = DatumGetPointer(fastgetattr(&newtup->tuple, 3, desc, &isnull));
4076  Assert(!isnull);
4077 
4078  /* calculate size so we can allocate the right size at once later */
4079  if (!VARATT_IS_EXTENDED(chunk))
4080  chunksize = VARSIZE(chunk) - VARHDRSZ;
4081  else if (VARATT_IS_SHORT(chunk))
4082  /* could happen due to heap_form_tuple doing its thing */
4083  chunksize = VARSIZE_SHORT(chunk) - VARHDRSZ_SHORT;
4084  else
4085  elog(ERROR, "unexpected type of toast chunk");
4086 
4087  ent->size += chunksize;
4088  ent->last_chunk_seq = chunk_seq;
4089  ent->num_chunks++;
4090  dlist_push_tail(&ent->chunks, &change->node);
4091 }
4092 
4093 /*
4094  * Rejigger change->newtuple to point to in-memory toast tuples instead to
4095  * on-disk toast tuples that may not longer exist (think DROP TABLE or VACUUM).
4096  *
4097  * We cannot replace unchanged toast tuples though, so those will still point
4098  * to on-disk toast data.
4099  *
4100  * While updating the existing change with detoasted tuple data, we need to
4101  * update the memory accounting info, because the change size will differ.
4102  * Otherwise the accounting may get out of sync, triggering serialization
4103  * at unexpected times.
4104  *
4105  * We simply subtract size of the change before rejiggering the tuple, and
4106  * then adding the new size. This makes it look like the change was removed
4107  * and then added back, except it only tweaks the accounting info.
4108  *
4109  * In particular it can't trigger serialization, which would be pointless
4110  * anyway as it happens during commit processing right before handing
4111  * the change to the output plugin.
4112  */
4113 static void
4115  Relation relation, ReorderBufferChange *change)
4116 {
4117  TupleDesc desc;
4118  int natt;
4119  Datum *attrs;
4120  bool *isnull;
4121  bool *free;
4122  HeapTuple tmphtup;
4123  Relation toast_rel;
4124  TupleDesc toast_desc;
4125  MemoryContext oldcontext;
4126  ReorderBufferTupleBuf *newtup;
4127 
4128  /* no toast tuples changed */
4129  if (txn->toast_hash == NULL)
4130  return;
4131 
4132  /*
4133  * We're going to modify the size of the change, so to make sure the
4134  * accounting is correct we'll make it look like we're removing the change
4135  * now (with the old size), and then re-add it at the end.
4136  */
4137  ReorderBufferChangeMemoryUpdate(rb, change, false);
4138 
4139  oldcontext = MemoryContextSwitchTo(rb->context);
4140 
4141  /* we should only have toast tuples in an INSERT or UPDATE */
4142  Assert(change->data.tp.newtuple);
4143 
4144  desc = RelationGetDescr(relation);
4145 
4146  toast_rel = RelationIdGetRelation(relation->rd_rel->reltoastrelid);
4147  if (!RelationIsValid(toast_rel))
4148  elog(ERROR, "could not open relation with OID %u",
4149  relation->rd_rel->reltoastrelid);
4150 
4151  toast_desc = RelationGetDescr(toast_rel);
4152 
4153  /* should we allocate from stack instead? */
4154  attrs = palloc0(sizeof(Datum) * desc->natts);
4155  isnull = palloc0(sizeof(bool) * desc->natts);
4156  free = palloc0(sizeof(bool) * desc->natts);
4157 
4158  newtup = change->data.tp.newtuple;
4159 
4160  heap_deform_tuple(&newtup->tuple, desc, attrs, isnull);
4161 
4162  for (natt = 0; natt < desc->natts; natt++)
4163  {
4164  Form_pg_attribute attr = TupleDescAttr(desc, natt);
4165  ReorderBufferToastEnt *ent;
4166  struct varlena *varlena;
4167 
4168  /* va_rawsize is the size of the original datum -- including header */
4169  struct varatt_external toast_pointer;
4170  struct varatt_indirect redirect_pointer;
4171  struct varlena *new_datum = NULL;
4172  struct varlena *reconstructed;
4173  dlist_iter it;
4174  Size data_done = 0;
4175 
4176  /* system columns aren't toasted */
4177  if (attr->attnum < 0)
4178  continue;
4179 
4180  if (attr->attisdropped)
4181  continue;
4182 
4183  /* not a varlena datatype */
4184  if (attr->attlen != -1)
4185  continue;
4186 
4187  /* no data */
4188  if (isnull[natt])
4189  continue;
4190 
4191  /* ok, we know we have a toast datum */
4192  varlena = (struct varlena *) DatumGetPointer(attrs[natt]);
4193 
4194  /* no need to do anything if the tuple isn't external */
4195  if (!VARATT_IS_EXTERNAL(varlena))
4196  continue;
4197 
4198  VARATT_EXTERNAL_GET_POINTER(toast_pointer, varlena);
4199 
4200  /*
4201  * Check whether the toast tuple changed, replace if so.
4202  */
4203  ent = (ReorderBufferToastEnt *)
4204  hash_search(txn->toast_hash,
4205  (void *) &toast_pointer.va_valueid,
4206  HASH_FIND,
4207  NULL);
4208  if (ent == NULL)
4209  continue;
4210 
4211  new_datum =
4212  (struct varlena *) palloc0(INDIRECT_POINTER_SIZE);
4213 
4214  free[natt] = true;
4215 
4216  reconstructed = palloc0(toast_pointer.va_rawsize);
4217 
4218  ent->reconstructed = reconstructed;
4219 
4220  /* stitch toast tuple back together from its parts */
4221  dlist_foreach(it, &ent->chunks)
4222  {
4223  bool isnull;
4224  ReorderBufferChange *cchange;
4225  ReorderBufferTupleBuf *ctup;
4226  Pointer chunk;
4227 
4228  cchange = dlist_container(ReorderBufferChange, node, it.cur);
4229  ctup = cchange->data.tp.newtuple;
4230  chunk = DatumGetPointer(fastgetattr(&ctup->tuple, 3, toast_desc, &isnull));
4231 
4232  Assert(!isnull);
4233  Assert(!VARATT_IS_EXTERNAL(chunk));
4234  Assert(!VARATT_IS_SHORT(chunk));
4235 
4236  memcpy(VARDATA(reconstructed) + data_done,
4237  VARDATA(chunk),
4238  VARSIZE(chunk) - VARHDRSZ);
4239  data_done += VARSIZE(chunk) - VARHDRSZ;
4240  }
4241  Assert(data_done == toast_pointer.va_extsize);
4242 
4243  /* make sure its marked as compressed or not */
4244  if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
4245  SET_VARSIZE_COMPRESSED(reconstructed, data_done + VARHDRSZ);
4246  else
4247  SET_VARSIZE(reconstructed, data_done + VARHDRSZ);
4248 
4249  memset(&redirect_pointer, 0, sizeof(redirect_pointer));
4250  redirect_pointer.pointer = reconstructed;
4251 
4253  memcpy(VARDATA_EXTERNAL(new_datum), &redirect_pointer,
4254  sizeof(redirect_pointer));
4255 
4256  attrs[natt] = PointerGetDatum(new_datum);
4257  }
4258 
4259  /*
4260  * Build tuple in separate memory & copy tuple back into the tuplebuf
4261  * passed to the output plugin. We can't directly heap_fill_tuple() into
4262  * the tuplebuf because attrs[] will point back into the current content.
4263  */
4264  tmphtup = heap_form_tuple(desc, attrs, isnull);
4265  Assert(newtup->tuple.t_len <= MaxHeapTupleSize);
4266  Assert(ReorderBufferTupleBufData(newtup) == newtup->tuple.t_data);
4267 
4268  memcpy(newtup->tuple.t_data, tmphtup->t_data, tmphtup->t_len);
4269  newtup->tuple.t_len = tmphtup->t_len;
4270 
4271  /*
4272  * free resources we won't further need, more persistent stuff will be
4273  * free'd in ReorderBufferToastReset().
4274  */
4275  RelationClose(toast_rel);
4276  pfree(tmphtup);
4277  for (natt = 0; natt < desc->natts; natt++)
4278  {
4279  if (free[natt])
4280  pfree(DatumGetPointer(attrs[natt]));
4281  }
4282  pfree(attrs);
4283  pfree(free);
4284  pfree(isnull);
4285 
4286  MemoryContextSwitchTo(oldcontext);
4287 
4288  /* now add the change back, with the correct size */
4289  ReorderBufferChangeMemoryUpdate(rb, change, true);
4290 }
4291 
4292 /*
4293  * Free all resources allocated for toast reconstruction.
4294  */
4295 static void
4297 {
4298  HASH_SEQ_STATUS hstat;
4299  ReorderBufferToastEnt *ent;
4300 
4301  if (txn->toast_hash == NULL)
4302  return;
4303 
4304  /* sequentially walk over the hash and free everything */
4305  hash_seq_init(&hstat, txn->toast_hash);
4306  while ((ent = (ReorderBufferToastEnt *) hash_seq_search(&hstat)) != NULL)
4307  {
4308  dlist_mutable_iter it;
4309 
4310  if (ent->reconstructed != NULL)
4311  pfree(ent->reconstructed);
4312 
4313  dlist_foreach_modify(it, &ent->chunks)
4314  {
4315  ReorderBufferChange *change =
4317 
4318  dlist_delete(&change->node);
4319  ReorderBufferReturnChange(rb, change, true);
4320  }
4321  }
4322 
4323  hash_destroy(txn->toast_hash);
4324  txn->toast_hash = NULL;
4325 }
4326 
4327 
4328 /* ---------------------------------------
4329  * Visibility support for logical decoding
4330  *
4331  *
4332  * Lookup actual cmin/cmax values when using decoding snapshot. We can't
4333  * always rely on stored cmin/cmax values because of two scenarios:
4334  *
4335  * * A tuple got changed multiple times during a single transaction and thus
4336  * has got a combocid. Combocid's are only valid for the duration of a
4337  * single transaction.
4338  * * A tuple with a cmin but no cmax (and thus no combocid) got
4339  * deleted/updated in another transaction than the one which created it
4340  * which we are looking at right now. As only one of cmin, cmax or combocid
4341  * is actually stored in the heap we don't have access to the value we
4342  * need anymore.
4343  *
4344  * To resolve those problems we have a per-transaction hash of (cmin,
4345  * cmax) tuples keyed by (relfilenode, ctid) which contains the actual
4346  * (cmin, cmax) values. That also takes care of combocids by simply
4347  * not caring about them at all. As we have the real cmin/cmax values
4348  * combocids aren't interesting.
4349  *
4350  * As we only care about catalog tuples here the overhead of this
4351  * hashtable should be acceptable.
4352  *
4353  * Heap rewrites complicate this a bit, check rewriteheap.c for
4354  * details.
4355  * -------------------------------------------------------------------------
4356  */
4357 
4358 /* struct for sorting mapping files by LSN efficiently */
4359 typedef struct RewriteMappingFile
4360 {
4362  char fname[MAXPGPATH];
4364 
#ifdef NOT_USED
/* debugging aid: dump the whole cmin/cmax mapping hash via elog */
static void
DisplayMapping(HTAB *tuplecid_data)
{
	HASH_SEQ_STATUS hstat;
	ReorderBufferTupleCidEnt *ent;

	hash_seq_init(&hstat, tuplecid_data);
	while ((ent = (ReorderBufferTupleCidEnt *) hash_seq_search(&hstat)) != NULL)
	{
		elog(DEBUG3, "mapping: node: %u/%u/%u tid: %u/%u cmin: %u, cmax: %u",
			 ent->key.relnode.dbNode,
			 ent->key.relnode.spcNode,
			 ent->key.relnode.relNode,
			 ItemPointerGetBlockNumber(&ent->key.tid),
			 ItemPointerGetOffsetNumber(&ent->key.tid),
			 ent->cmin,
			 ent->cmax
			);
	}
}
#endif
4387 
4388 /*
4389  * Apply a single mapping file to tuplecid_data.
4390  *
4391  * The mapping file has to have been verified to be a) committed b) for our
4392  * transaction c) applied in LSN order.
4393  */
4394 static void
4395 ApplyLogicalMappingFile(HTAB *tuplecid_data, Oid relid, const char *fname)
4396 {
4397  char path[MAXPGPATH];
4398  int fd;
4399  int readBytes;
4401 
4402  sprintf(path, "pg_logical/mappings/%s", fname);
4403  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
4404  if (fd < 0)
4405  ereport(ERROR,
4407  errmsg("could not open file \"%s\": %m", path)));
4408 
4409  while (true)
4410  {
4413  ReorderBufferTupleCidEnt *new_ent;
4414  bool found;
4415 
4416  /* be careful about padding */
4417  memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
4418 
4419  /* read all mappings till the end of the file */
4421  readBytes = read(fd, &map, sizeof(LogicalRewriteMappingData));
4423 
4424  if (readBytes < 0)
4425  ereport(ERROR,
4427  errmsg("could not read file \"%s\": %m",
4428  path)));
4429  else if (readBytes == 0) /* EOF */
4430  break;
4431  else if (readBytes != sizeof(LogicalRewriteMappingData))
4432  ereport(ERROR,
4434  errmsg("could not read from file \"%s\": read %d instead of %d bytes",
4435  path, readBytes,
4436  (int32) sizeof(LogicalRewriteMappingData))));
4437 
4438  key.relnode = map.old_node;
4439  ItemPointerCopy(&map.old_tid,
4440  &key.tid);
4441 
4442 
4443  ent = (ReorderBufferTupleCidEnt *)
4444  hash_search(tuplecid_data,
4445  (void *) &key,
4446  HASH_FIND,
4447  NULL);
4448 
4449  /* no existing mapping, no need to update */
4450  if (!ent)
4451  continue;
4452 
4453  key.relnode = map.new_node;
4454  ItemPointerCopy(&map.new_tid,
4455  &key.tid);
4456 
4457  new_ent = (ReorderBufferTupleCidEnt *)
4458  hash_search(tuplecid_data,
4459  (void *) &key,
4460  HASH_ENTER,
4461  &found);
4462 
4463  if (found)
4464  {
4465  /*
4466  * Make sure the existing mapping makes sense. We sometime update
4467  * old records that did not yet have a cmax (e.g. pg_class' own
4468  * entry while rewriting it) during rewrites, so allow that.
4469  */
4470  Assert(ent->cmin == InvalidCommandId || ent->cmin == new_ent->cmin);
4471  Assert(ent->cmax == InvalidCommandId || ent->cmax == new_ent->cmax);
4472  }
4473  else
4474  {
4475  /* update mapping */
4476  new_ent->cmin = ent->cmin;
4477  new_ent->cmax = ent->cmax;
4478  new_ent->combocid = ent->combocid;
4479  }
4480  }
4481 
4482  if (CloseTransientFile(fd) != 0)
4483  ereport(ERROR,
4485  errmsg("could not close file \"%s\": %m", path)));
4486 }
4487 
4488 
4489 /*
4490  * Check whether the TransactionId 'xid' is in the pre-sorted array 'xip'.
4491  */
4492 static bool
4494 {
4495  return bsearch(&xid, xip, num,
4496  sizeof(TransactionId), xidComparator) != NULL;
4497 }
4498 
4499 /*
4500  * list_sort() comparator for sorting RewriteMappingFiles in LSN order.
4501  */
4502 static int
4503 file_sort_by_lsn(const ListCell *a_p, const ListCell *b_p)
4504 {
4507 
4508  if (a->lsn < b->lsn)
4509  return -1;
4510  else if (a->lsn > b->lsn)
4511  return 1;
4512  return 0;
4513 }
4514 
4515 /*
4516  * Apply any existing logical remapping files if there are any targeted at our
4517  * transaction for relid.
4518  */
4519 static void
4521 {
4522  DIR *mapping_dir;
4523  struct dirent *mapping_de;
4524  List *files = NIL;
4525  ListCell *file;
4526  Oid dboid = IsSharedRelation(relid) ? InvalidOid : MyDatabaseId;
4527 
4528  mapping_dir = AllocateDir("pg_logical/mappings");
4529  while ((mapping_de = ReadDir(mapping_dir, "pg_logical/mappings")) != NULL)
4530  {
4531  Oid f_dboid;
4532  Oid f_relid;
4533  TransactionId f_mapped_xid;
4534  TransactionId f_create_xid;
4535  XLogRecPtr f_lsn;
4536  uint32 f_hi,
4537  f_lo;
4538  RewriteMappingFile *f;
4539 
4540  if (strcmp(mapping_de->d_name, ".") == 0 ||
4541  strcmp(mapping_de->d_name, "..") == 0)
4542  continue;
4543 
4544  /* Ignore files that aren't ours */
4545  if (strncmp(mapping_de->d_name, "map-", 4) != 0)
4546  continue;
4547 
4548  if (sscanf(mapping_de->d_name, LOGICAL_REWRITE_FORMAT,
4549  &f_dboid, &f_relid, &f_hi, &f_lo,
4550  &f_mapped_xid, &f_create_xid) != 6)
4551  elog(ERROR, "could not parse filename \"%s\"", mapping_de->d_name);
4552 
4553  f_lsn = ((uint64) f_hi) << 32 | f_lo;
4554 
4555  /* mapping for another database */
4556  if (f_dboid != dboid)
4557  continue;
4558 
4559  /* mapping for another relation */
4560  if (f_relid != relid)
4561  continue;
4562 
4563  /* did the creating transaction abort? */
4564  if (!TransactionIdDidCommit(f_create_xid))
4565  continue;
4566 
4567  /* not for our transaction */
4568  if (!TransactionIdInArray(f_mapped_xid, snapshot->subxip, snapshot->subxcnt))
4569  continue;
4570 
4571  /* ok, relevant, queue for apply */
4572  f = palloc(sizeof(RewriteMappingFile));
4573  f->lsn = f_lsn;
4574  strcpy(f->fname, mapping_de->d_name);
4575  files = lappend(files, f);
4576  }
4577  FreeDir(mapping_dir);
4578 
4579  /* sort files so we apply them in LSN order */
4580  list_sort(files, file_sort_by_lsn);
4581 
4582  foreach(file, files)
4583  {
4585 
4586  elog(DEBUG1, "applying mapping: \"%s\" in %u", f->fname,
4587  snapshot->subxip[0]);
4588  ApplyLogicalMappingFile(tuplecid_data, relid, f->fname);
4589  pfree(f);
4590  }
4591 }
4592 
4593 /*
4594  * Lookup cmin/cmax of a tuple, during logical decoding where we can't rely on
4595  * combocids.
4596  */
4597 bool
4599  Snapshot snapshot,
4600  HeapTuple htup, Buffer buffer,
4601  CommandId *cmin, CommandId *cmax)
4602 {
4605  ForkNumber forkno;
4606  BlockNumber blockno;
4607  bool updated_mapping = false;
4608 
4609  /*
4610  * Return unresolved if tuplecid_data is not valid. That's because when
4611  * streaming in-progress transactions we may run into tuples with the CID
4612  * before actually decoding them. Think e.g. about INSERT followed by
4613  * TRUNCATE, where the TRUNCATE may not be decoded yet when applying the
4614  * INSERT. So in such cases, we assume the CID is from the future
4615  * command.
4616  */
4617  if (tuplecid_data == NULL)
4618  return false;
4619 
4620  /* be careful about padding */
4621  memset(&key, 0, sizeof(key));
4622 
4623  Assert(!BufferIsLocal(buffer));
4624 
4625  /*
4626  * get relfilenode from the buffer, no convenient way to access it other
4627  * than that.
4628  */
4629  BufferGetTag(buffer, &key.relnode, &forkno, &blockno);
4630 
4631  /* tuples can only be in the main fork */
4632  Assert(forkno == MAIN_FORKNUM);
4633  Assert(blockno == ItemPointerGetBlockNumber(&htup->t_self));
4634 
4635  ItemPointerCopy(&htup->t_self,
4636  &key.tid);
4637 
4638 restart:
4639  ent = (ReorderBufferTupleCidEnt *)
4640  hash_search(tuplecid_data,
4641  (void *) &key,
4642  HASH_FIND,
4643  NULL);
4644 
4645  /*
4646  * failed to find a mapping, check whether the table was rewritten and
4647  * apply mapping if so, but only do that once - there can be no new
4648  * mappings while we are in here since we have to hold a lock on the
4649  * relation.
4650  */
4651  if (ent == NULL && !updated_mapping)
4652  {
4653  UpdateLogicalMappings(tuplecid_data, htup->t_tableOid, snapshot);
4654  /* now check but don't update for a mapping again */
4655  updated_mapping = true;
4656  goto restart;
4657  }
4658  else if (ent == NULL)
4659  return false;
4660 
4661  if (cmin)
4662  *cmin = ent->cmin;
4663  if (cmax)
4664  *cmax = ent->cmax;
4665  return true;
4666 }
static void ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
XLogRecPtr first_lsn
bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *rb, TransactionId xid)
#define NIL
Definition: pg_list.h:65
uint32 CommandId
Definition: c.h:534
TimestampTz commit_time
struct ReorderBufferToastEnt ReorderBufferToastEnt
struct TXNEntryFile TXNEntryFile
void AbortCurrentTransaction(void)
Definition: xact.c:3211
ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER]
#define SizeofHeapTupleHeader
Definition: htup_details.h:184
bool IsToastRelation(Relation relation)
Definition: catalog.c:140
void hash_destroy(HTAB *hashp)
Definition: dynahash.c:835
void ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, Snapshot snapshot, XLogRecPtr lsn, bool transactional, const char *prefix, Size message_size, const char *message)
#define relpathperm(rnode, forknum)
Definition: relpath.h:83
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
ReorderBufferApplyChangeCB apply_change
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:211
#define AllocSetContextCreate
Definition: memutils.h:170
HeapTupleData * HeapTuple
Definition: htup.h:71
void * private_data
dlist_node base_snapshot_node
#define DEBUG1
Definition: elog.h:25
dlist_node * cur
Definition: ilist.h:180
static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
RepOriginId origin_id
void StartupReorderBuffer(void)
#define VARDATA(PTR)
Definition: postgres.h:302
static void ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define fastgetattr(tup, attnum, tupleDesc, isnull)
Definition: htup_details.h:712
static int32 next
Definition: blutils.c:219
File PathNameOpenFile(const char *fileName, int fileFlags)
Definition: fd.c:1436
#define RBTXN_HAS_TOAST_INSERT
#define TransactionIdEquals(id1, id2)
Definition: transam.h:43
#define HASH_CONTEXT
Definition: hsearch.h:93
struct ReorderBufferChange::@99::@101 truncate
#define HASH_ELEM
Definition: hsearch.h:87
int wal_segment_size
Definition: xlog.c:116
void ReorderBufferAbortOld(ReorderBuffer *rb, TransactionId oldestRunningXid)
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
uint32 TransactionId
Definition: c.h:520
void ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, TXNEntryFile *file, XLogSegNo *segno)
bool copied
Definition: snapshot.h:185
bool SnapBuildXactNeedsSkip(SnapBuild *builder, XLogRecPtr ptr)
Definition: snapbuild.c:404
void ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
#define RBTXN_IS_STREAMED
MemoryContext hcxt
Definition: hsearch.h:78
#define DatumGetInt32(X)
Definition: postgres.h:472
int sqlerrcode
Definition: elog.h:364
#define RelationGetDescr(relation)
Definition: rel.h:482
static ReorderBufferTXN * ReorderBufferLargestTXN(ReorderBuffer *rb)
static void ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn, ReorderBufferTXN *subtxn)
#define DEBUG3
Definition: elog.h:23
SnapBuildState SnapBuildCurrentState(SnapBuild *builder)
Definition: snapbuild.c:395
#define write(a, b, c)
Definition: win32.h:14
#define VARHDRSZ_SHORT
Definition: postgres.h:268
TransactionId by_txn_last_xid
#define VARSIZE(PTR)
Definition: postgres.h:303
ErrorData * CopyErrorData(void)
Definition: elog.c:1474
#define VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr)
Definition: detoast.h:32
int64 TimestampTz
Definition: timestamp.h:39
struct ReorderBufferChange::@99::@102 msg
#define PointerGetDatum(X)
Definition: postgres.h:556
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
ReorderBufferTXN * txn
static bool TransactionIdInArray(TransactionId xid, TransactionId *xip, Size num)
#define VARHDRSZ
Definition: c.h:568
#define dlist_foreach(iter, lhead)
Definition: ilist.h:507
ReorderBufferStreamAbortCB stream_abort
XLogRecPtr current_restart_decoding_lsn
#define DatumGetObjectId(X)
Definition: postgres.h:500
char * pstrdup(const char *in)
Definition: mcxt.c:1186
#define rbtxn_has_incomplete_tuple(txn)
struct dirent * ReadDirExtended(DIR *dir, const char *dirname, int elevel)
Definition: fd.c:2664
static ReorderBufferTXN * ReorderBufferGetTXN(ReorderBuffer *rb)
static void ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id, XLogRecPtr last_lsn, ReorderBufferChange *specinsert)
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
struct ReorderBufferTXN * txn
Definition: reorderbuffer.h:86
Oid RelidByRelfilenode(Oid reltablespace, Oid relfilenode)
struct ReorderBufferChange::@99::@103 tuplecid
void ReorderBufferSetBaseSnapshot(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void ReorderBufferFree(ReorderBuffer *rb)
uint16 RepOriginId
Definition: xlogdefs.h:58
Size entrysize
Definition: hsearch.h:73
CommandId command_id
#define FLEXIBLE_ARRAY_MEMBER
Definition: c.h:283
Snapshot snapshot_now
struct cursor * cur
Definition: ecpg.c:28
char fname[MAXPGPATH]
int32 va_rawsize
Definition: postgres.h:69
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4702
#define INFO
Definition: elog.h:33
MemoryContext SlabContextCreate(MemoryContext parent, const char *name, Size blockSize, Size chunkSize)
Definition: slab.c:174
void binaryheap_replace_first(binaryheap *heap, Datum d)
Definition: binaryheap.c:204
void ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, RelFileNode node, ItemPointerData tid, CommandId cmin, CommandId cmax, CommandId combocid)
static void ReorderBufferIterTXNFinish(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
uint32 BlockNumber
Definition: block.h:31
void TeardownHistoricSnapshot(bool is_error)
Definition: snapmgr.c:2051
ReorderBufferCommitCB commit
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:1020
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
#define RelationIsLogicallyLogged(relation)
Definition: rel.h:635
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:927
static int64 files
Definition: pg_checksums.c:34
bool TransactionIdDidCommit(TransactionId transactionId)
Definition: transam.c:125
ReplicationSlotPersistentData data
Definition: slot.h:143
struct ReorderBufferTupleCidKey ReorderBufferTupleCidKey
struct SnapshotData * Snapshot
Definition: snapshot.h:121
Form_pg_class rd_rel
Definition: rel.h:109
unsigned int Oid
Definition: postgres_ext.h:31
XLogRecPtr base_snapshot_lsn
ReorderBufferStreamCommitCB stream_commit
Definition: dirent.h:9
uint32 regd_count
Definition: snapshot.h:199
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:83
void binaryheap_add_unordered(binaryheap *heap, Datum d)
Definition: binaryheap.c:110
MemoryContext change_context
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define VARDATA_EXTERNAL(PTR)
Definition: postgres.h:310
#define PG_BINARY
Definition: c.h:1211
void FlushErrorState(void)
Definition: elog.c:1568
XLogRecPtr origin_lsn
static void ApplyLogicalMappingFile(HTAB *tuplecid_data, Oid relid, const char *fname)
signed int int32
Definition: c.h:362
#define FirstCommandId
Definition: c.h:536
XLogRecPtr EndRecPtr
Definition: xlogreader.h:176
#define XLByteInSeg(xlrp, logSegNo, wal_segsz_bytes)
void ReorderBufferSetRestartPoint(ReorderBuffer *rb, XLogRecPtr ptr)
HeapTupleHeader t_data
Definition: htup.h:68
#define VARATT_IS_EXTERNAL(PTR)
Definition: postgres.h:313
#define sprintf
Definition: port.h:195
bool ReplicationSlotValidateName(const char *name, int elevel)
Definition: slot.c:175
#define rbtxn_is_streamed(txn)
static dlist_node * dlist_next_node(dlist_head *head, dlist_node *node)
Definition: ilist.h:421
Definition: dynahash.c:220
ReorderBufferStreamMessageCB stream_message
#define RBTXN_HAS_CATALOG_CHANGES
#define ReorderBufferTupleBufData(p)
Definition: reorderbuffer.h:38
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
void pfree(void *pointer)
Definition: mcxt.c:1056
char * Pointer
Definition: c.h:351
void FreeErrorData(ErrorData *edata)
Definition: elog.c:1530
static void ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
Definition: dirent.c:25
#define ERROR
Definition: elog.h:43
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2372
#define rbtxn_has_toast_insert(txn)
#define SLAB_LARGE_BLOCK_SIZE
Definition: memutils.h:222
#define VARATT_IS_SHORT(PTR)
Definition: postgres.h:326
#define RelationIsValid(relation)
Definition: rel.h:429
dlist_head changes
void ReorderBufferReturnRelids(ReorderBuffer *rb, Oid *relids)
dlist_head txns_by_base_snapshot_lsn
Datum binaryheap_first(binaryheap *heap)
Definition: binaryheap.c:159
#define IsSpecInsert(action)
#define MAXPGPATH
ItemPointerData t_self
Definition: htup.h:65
ReorderBufferTupleCidKey key
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:192
#define DEBUG2
Definition: elog.h:24
static bool ReorderBufferCanStream(ReorderBuffer *rb)
TransactionId GetCurrentTransactionId(void)
Definition: xact.c:438
void ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
uint32 t_len
Definition: htup.h:64
#define MaxHeapTupleSize
Definition: htup_details.h:560
struct varlena * reconstructed
void RollbackAndReleaseCurrentSubTransaction(void)
Definition: xact.c:4513
#define SET_VARTAG_EXTERNAL(PTR, tag)
Definition: postgres.h:333
uint64 XLogSegNo
Definition: xlogdefs.h:41
int errcode_for_file_access(void)
Definition: elog.c:633
HeapTupleData tuple
Definition: reorderbuffer.h:29
struct SnapshotData SnapshotData
TransactionId GetCurrentTransactionIdIfAny(void)
Definition: xact.c:455
static ReorderBufferTXN * ReorderBufferLargestTopTXN(ReorderBuffer *rb)
static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, TransactionId xid, XLogSegNo segno)
#define InvalidTransactionId
Definition: transam.h:31
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:193
bool ReorderBufferXidHasCatalogChanges(ReorderBuffer *rb, TransactionId xid)
unsigned int uint32
Definition: c.h:374
XLogRecPtr final_lsn
DIR * AllocateDir(const char *dirname)
Definition: fd.c:2583
#define RBTXN_IS_SUBXACT
Oid t_tableOid
Definition: htup.h:66
static int file_sort_by_lsn(const ListCell *a_p, const ListCell *b_p)
struct ReorderBufferTXN * toptxn
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1381
void RelationClose(Relation relation)
Definition: relcache.c:2103
TransactionId xmin
Definition: snapshot.h:157
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
#define INDIRECT_POINTER_SIZE
Definition: detoast.h:44
ReorderBufferMessageCB message
ReorderBufferStreamChangeCB stream_change
#define AssertArg(condition)
Definition: c.h:747
int bh_size
Definition: binaryheap.h:32
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
TransactionId * xip
Definition: snapshot.h:168
static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap, ReorderBufferTXN *txn, CommandId cid)
#define VARSIZE_SHORT(PTR)
Definition: postgres.h:305
ForkNumber
Definition: relpath.h:40
RepOriginId origin_id
Definition: reorderbuffer.h:88
ReorderBufferStreamTruncateCB stream_truncate
List * lappend(List *list, void *datum)
Definition: list.c:321
#define rbtxn_has_spec_insert(txn)
static HTAB * tuplecid_data
Definition: snapmgr.c:172
int CloseTransientFile(int fd)
Definition: fd.c:2549
static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
TransactionId CheckXidAlive
Definition: xact.c:95
#define stat(a, b)
Definition: win32_port.h:255
struct ReorderBufferTupleCidEnt ReorderBufferTupleCidEnt
void ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr lsn)
static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, char *change)
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:487
#define HASH_BLOBS
Definition: hsearch.h:88
TransactionId ReorderBufferGetOldestXmin(ReorderBuffer *rb)
ReorderBufferChange * change
MemoryContext GenerationContextCreate(MemoryContext parent, const char *name, Size blockSize)
Definition: generation.c:196
static int ReorderBufferIterCompare(Datum a, Datum b, void *arg)
MemoryContext context
static void ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferIterTXNState *volatile *iter_state)
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change, bool toast_insert)
void ReorderBufferCommitChild(ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn)
static void ReorderBufferApplyChange(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change, bool streaming)
void * palloc0(Size size)
Definition: mcxt.c:980
static void ReorderBufferApplyTruncate(ReorderBuffer *rb, ReorderBufferTXN *txn, int nrelations, Relation *relations, ReorderBufferChange *change, bool streaming)
static void dlist_insert_before(dlist_node *before, dlist_node *node)
Definition: ilist.h:346
static bool dlist_has_next(dlist_head *head, dlist_node *node)
Definition: ilist.h:402
#define InvalidCommandId
Definition: c.h:537
uintptr_t Datum
Definition: postgres.h:367
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
Definition: dynahash.c:328
ReorderBufferTXN * by_txn_last_txn
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
union ReorderBufferChange::@99 data
static void cleanup(void)
Definition: bootstrap.c:886
dlist_head toplevel_by_lsn
struct RewriteMappingFile RewriteMappingFile
Oid MyDatabaseId
Definition: globals.c:85
TransactionId xid
static void ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, ReorderBufferChange *change, bool addition)
bool IsSharedRelation(Oid relationId)
Definition: catalog.c:238
Size keysize
Definition: hsearch.h:72
dlist_node * cur
Definition: ilist.h:161
static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
static Size ReorderBufferChangeSize(ReorderBufferChange *change)
static void AssertChangeLsnOrder(ReorderBufferTXN *txn)
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:839
static void AssertTXNLsnOrder(ReorderBuffer *rb)
void ReorderBufferAddSnapshot(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
#define InvalidOid
Definition: postgres_ext.h:36
ReorderBufferStreamStartCB stream_start
static void dlist_init(dlist_head *head)
Definition: ilist.h:278
CommandId curcid
Definition: snapshot.h:187
static void ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
struct ReorderBufferIterTXNState ReorderBufferIterTXNState
#define ereport(elevel,...)
Definition: elog.h:144
static void ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTXN *txn)
struct ReorderBufferDiskChange ReorderBufferDiskChange
void binaryheap_build(binaryheap *heap)
Definition: binaryheap.c:126
#define free(a)
Definition: header.h:65
struct ReorderBufferIterTXNEntry ReorderBufferIterTXNEntry
struct SnapBuild * snapshot_builder
Definition: logical.h:43
#define PG_CATCH()
Definition: elog.h:305
void FileClose(File file)
Definition: fd.c:1826
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
ItemPointerData new_tid
Definition: rewriteheap.h:40
static void ReorderBufferProcessPartialChange(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool toast_insert)
struct ReorderBufferChange::@99::@100 tp
#define XLogSegNoOffsetToRecPtr(segno, offset, wal_segsz_bytes, dest)
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:745
#define lfirst(lc)
Definition: pg_list.h:190
ReorderBufferTXN * ReorderBufferGetOldestTXN(ReorderBuffer *rb)
static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, int fd, ReorderBufferChange *change)
static void ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn)
Definition: regguts.h:298
#define IsSpecConfirm(action)
struct dirent * ReadDir(DIR *dir, const char *dirname)
Definition: fd.c:2649
static ReorderBufferChange * ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
int32 va_extsize
Definition: postgres.h:70
XLogRecPtr end_lsn
void StartTransactionCommand(void)
Definition: xact.c:2846
void BeginInternalSubTransaction(const char *name)
Definition: xact.c:4408
void ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs)
int logical_decoding_work_mem
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
size_t Size
Definition: c.h:473
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: pgstat.h:1357
void binaryheap_free(binaryheap *heap)
Definition: binaryheap.c:69
#define IsInsertOrUpdate(action)
SharedInvalidationMessage * invalidations
static bool ReorderBufferCanStartStreaming(ReorderBuffer *rb)
#define BufferIsLocal(buffer)
Definition: buf.h:37
#define RBTXN_HAS_SPEC_INSERT
#define SLAB_DEFAULT_BLOCK_SIZE
Definition: memutils.h:221
#define ItemPointerGetOffsetNumber(pointer)
Definition: itemptr.h:117
void ReorderBufferReturnTupleBuf(ReorderBuffer *rb, ReorderBufferTupleBuf *tuple)
#define PG_RE_THROW()
Definition: elog.h:336
ReorderBuffer * ReorderBufferAllocate(void)
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1410
void * repalloc(void *pointer, Size size)
Definition: mcxt.c:1069
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1400
static void ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz)
struct varlena * pointer
Definition: postgres.h:86
void SnapBuildSnapDecRefcount(Snapshot snap)
Definition: snapbuild.c:428
#define LOGICAL_REWRITE_FORMAT
Definition: rewriteheap.h:54
dlist_head subtxns
binaryheap * binaryheap_allocate(int capacity, binaryheap_comparator compare, void *arg)
Definition: binaryheap.c:33
#define VARATT_IS_EXTENDED(PTR)
Definition: postgres.h:327
#define S_ISDIR(m)
Definition: win32_port.h:296
#define DatumGetPointer(X)
Definition: postgres.h:549
#define lstat(path, sb)
Definition: win32_port.h:244
void heap_deform_tuple(HeapTuple tuple, TupleDesc tupleDesc, Datum *values, bool *isnull)
Definition: heaptuple.c:1249
void LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
Definition: inval.c:559
#define Int32GetDatum(X)
Definition: postgres.h:479
static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define rbtxn_has_catalog_changes(txn)
static void ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn, volatile Snapshot snapshot_now, volatile CommandId command_id, bool streaming)
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
uint32 xcnt
Definition: snapshot.h:169
void * palloc(Size size)
Definition: mcxt.c:949
void list_sort(List *list, list_sort_comparator cmp)
Definition: list.c:1481
int errmsg(const char *fmt,...)
Definition: elog.c:824
XLogReaderState * reader
Definition: logical.h:41
static void UpdateLogicalMappings(HTAB *tuplecid_data, Oid relid, Snapshot snapshot)
ReorderBufferApplyTruncateCB apply_truncate
#define RBTXN_IS_SERIALIZED
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:796
ReorderBufferTXN * txn
void ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
static dlist_node * dlist_pop_head_node(dlist_head *head)
Definition: ilist.h:368
void SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
Definition: snapmgr.c:2035
#define elog(elevel,...)
Definition: elog.h:214
Oid * ReorderBufferGetRelids(ReorderBuffer *rb, int nrelids)
int i
XLogRecPtr restart_decoding_lsn
#define NameStr(name)
Definition: c.h:622
#define SET_VARSIZE_COMPRESSED(PTR, len)
Definition: postgres.h:331
#define VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer)
Definition: detoast.h:22
void * arg
Datum binaryheap_remove_first(binaryheap *heap)
Definition: binaryheap.c:174
static const Size max_changes_in_memory
Definition: c.h:562
static void ReorderBufferSaveTXNSnapshot(ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id)
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)
struct ReorderBufferTXNByIdEnt ReorderBufferTXNByIdEnt
void ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
MemoryContext tup_context
#define SET_VARSIZE(PTR, len)
Definition: postgres.h:329
char d_name[MAX_PATH]
Definition: dirent.h:14
#define ItemPointerGetBlockNumber(pointer)
Definition: itemptr.h:98
#define rbtxn_is_serialized(txn)
static void ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define qsort(a, b, c, d)
Definition: port.h:479
#define TransactionIdIsValid(xid)
Definition: transam.h:41
static void SetupCheckXidLive(TransactionId xid)
ReorderBufferChange change
ReorderBufferBeginCB begin
static void ReorderBufferToastInitHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
void BufferGetTag(Buffer buffer, RelFileNode *rnode, ForkNumber *forknum, BlockNumber *blknum)
Definition: bufmgr.c:2689
#define PG_TRY()
Definition: elog.h:295
static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
void ReorderBufferProcessXid(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
static void ReorderBufferCleanupSerializedTXNs(const char *slotname)
#define rbtxn_is_known_subxact(txn)
int FileRead(File file, char *buffer, int amount, off_t offset, uint32 wait_event_info)
Definition: fd.c:1973
Definition: pg_list.h:50
#define snprintf
Definition: port.h:193
int Buffer
Definition: buf.h:23
Relation RelationIdGetRelation(Oid relationId)
Definition: relcache.c:1997
#define PG_END_TRY()
Definition: elog.h:320
int File
Definition: fd.h:49
#define read(a, b, c)
Definition: win32.h:13
int FreeDir(DIR *dir)
Definition: fd.c:2701
struct HeapTupleData HeapTupleData
TransactionId toplevel_xid
#define offsetof(type, field)
Definition: c.h:668
MemoryContext txn_context
dlist_head tuplecids
static void ReorderBufferApplyMessage(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool streaming)
#define ItemPointerCopy(fromPointer, toPointer)
Definition: itemptr.h:161
TransactionId * subxip
Definition: snapshot.h:180
uint32 active_count
Definition: snapshot.h:198
int xidComparator(const void *arg1, const void *arg2)
Definition: xid.c:139
struct ReorderBufferChange ReorderBufferChange
int32 subxcnt
Definition: snapshot.h:181
void ReorderBufferAddNewCommandId(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, CommandId cid)
ReorderBufferStreamStopCB stream_stop
ReorderBufferTupleBuf * ReorderBufferGetTupleBuf(ReorderBuffer *rb, Size tuple_len)
bool ResolveCminCmaxDuringDecoding(HT