1 /*-------------------------------------------------------------------------
2  *
3  * reorderbuffer.c
4  * PostgreSQL logical replay/reorder buffer management
5  *
6  *
7  * Copyright (c) 2012-2024, PostgreSQL Global Development Group
8  *
9  *
10  * IDENTIFICATION
11  * src/backend/replication/logical/reorderbuffer.c
12  *
13  * NOTES
14  * This module gets handed individual pieces of transactions in the order
15  * they are written to the WAL and is responsible for reassembling them into
16  * toplevel-transaction-sized pieces. When a transaction is completely
17  * reassembled - signaled by reading the transaction commit record - it
18  * will then call the output plugin (cf. ReorderBufferCommit()) with the
19  * individual changes. The output plugins rely on snapshots built by
20  * snapbuild.c which hands them to us.
21  *
22  * Transactions and subtransactions/savepoints in postgres are not
23  * immediately linked to each other from outside the performing
24  * backend. Only at commit/abort (or via special xact_assignment records) are
25  * they linked together, which means that we have to splice together a
26  * toplevel transaction from its subtransactions. To do that efficiently we
27  * build a binary heap indexed by the smallest current lsn of the individual
28  * subtransactions' changestreams. As the individual streams are inherently
29  * ordered by LSN - since that is where we build them from - the transaction
30  * can easily be reassembled by always using the subtransaction with the
31  * smallest current LSN from the heap (see the sketch after this comment).
32  *
33  * In order to cope with large transactions - which can be several times as
34  * big as the available memory - this module supports spooling the contents
35  * of large transactions to disk. When the transaction is replayed the
36  * contents of individual (sub-)transactions will be read from disk in
37  * chunks.
38  *
39  * This module also has to deal with reassembling toast records from the
40  * individual chunks stored in WAL. When a new (or initial) version of a
41  * tuple is stored in WAL it will always be preceded by the toast chunks
42  * emitted for the columns stored out of line. Within a single toplevel
43  * transaction there will be no other data carrying records between a row's
44  * toast chunks and the row data itself. See ReorderBufferToast* for
45  * details.
46  *
47  * ReorderBuffer uses two special memory context types - SlabContext for
48  * allocations of fixed-length structures (changes and transactions), and
49  * GenerationContext for the variable-length transaction data (allocated
50  * and freed in groups with similar lifespans).
51  *
52  * To limit the amount of memory used by decoded changes, we track memory
53  * used at the reorder buffer level (i.e. total amount of memory), and for
54  * each transaction. When the total amount of used memory exceeds the
55  * limit, the transaction consuming the most memory is then serialized to
56  * disk.
57  *
58  * Only decoded changes are evicted from memory (spilled to disk), not the
59  * transaction records. The number of toplevel transactions is limited,
60  * but a transaction with many subtransactions may still consume significant
61  * amounts of memory. However, the transaction records are fairly small and
62  * are not included in the memory limit.
63  *
64  * The current eviction algorithm is very simple - the transaction is
65  * picked merely by size, while it might be useful to also consider age
66  * (LSN) of the changes for example. With the new Generational memory
67  * allocator, evicting the oldest changes would make it more likely the
68  * memory gets actually freed.
69  *
70  * We use a max-heap with transaction size as the key to efficiently find
71  * the largest transaction. We update the max-heap whenever the memory
72  * counter is updated; however transactions with size 0 are not stored in
73  * the heap, because they have no changes to evict.
74  *
75  * We still rely on max_changes_in_memory when loading serialized changes
76  * back into memory. At that point we can't use the memory limit directly
77  * as we load the subxacts independently. One option to deal with this
78  * would be to count the subxacts, and allow each to allocate 1/N of the
79  * memory limit. That however does not seem very appealing, because with
80  * many subtransactions it may easily cause thrashing (short cycles of
81  * deserializing and applying very few changes). We probably should give
82  * a bit more memory to the oldest subtransactions, because it's likely
83  * they are the source for the next sequence of changes.
84  *
85  * -------------------------------------------------------------------------
86  */
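/*
 * Illustrative sketch of the k-way merge described above. This is not the
 * code below (ReorderBufferIterTXNInit/Next implement the real thing); the
 * helper names are hypothetical and the types simplified. A heap keyed by
 * each (sub-)transaction's smallest pending LSN always yields the globally
 * next change:
 *
 *	while (!heap_is_empty(heap))
 *	{
 *		Stream *s = heap_min(heap);		// stream with smallest head LSN
 *		replay(stream_pop(s));			// consume that change
 *		if (stream_is_empty(s))
 *			heap_delete_min(heap);
 *		else
 *			heap_siftdown(heap);		// head LSN advanced
 *	}
 */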
87 #include "postgres.h"
88 
89 #include <unistd.h>
90 #include <sys/stat.h>
91 
92 #include "access/detoast.h"
93 #include "access/heapam.h"
94 #include "access/rewriteheap.h"
95 #include "access/transam.h"
96 #include "access/xact.h"
97 #include "access/xlog_internal.h"
98 #include "catalog/catalog.h"
99 #include "common/int.h"
100 #include "lib/binaryheap.h"
101 #include "miscadmin.h"
102 #include "pgstat.h"
103 #include "replication/logical.h"
104 #include "replication/reorderbuffer.h"
105 #include "replication/slot.h"
106 #include "replication/snapbuild.h" /* just for SnapBuildSnapDecRefcount */
107 #include "storage/bufmgr.h"
108 #include "storage/fd.h"
109 #include "storage/sinval.h"
110 #include "utils/builtins.h"
111 #include "utils/memutils.h"
112 #include "utils/rel.h"
113 #include "utils/relfilenumbermap.h"
114 
115 /* entry for a hash table we use to map from xid to our transaction state */
116 typedef struct ReorderBufferTXNByIdEnt
117 {
118  TransactionId xid;
119  ReorderBufferTXN *txn;
120 } ReorderBufferTXNByIdEnt;
121 
122 /* data structures for (relfilelocator, ctid) => (cmin, cmax) mapping */
123 typedef struct ReorderBufferTupleCidKey
124 {
125  RelFileLocator rlocator;
126  ItemPointerData tid;
127 } ReorderBufferTupleCidKey;
128 
129 typedef struct ReorderBufferTupleCidEnt
130 {
131  ReorderBufferTupleCidKey key;
132  CommandId cmin;
133  CommandId cmax;
134  CommandId combocid; /* just for debugging */
135 } ReorderBufferTupleCidEnt;
136 
137 /* Virtual file descriptor with file offset tracking */
138 typedef struct TXNEntryFile
139 {
140  File vfd; /* -1 when the file is closed */
141  off_t curOffset; /* offset for next write or read. Reset to 0
142  * when vfd is opened. */
143 } TXNEntryFile;
144 
145 /* k-way in-order change iteration support structures */
146 typedef struct ReorderBufferIterTXNEntry
147 {
148  XLogRecPtr lsn;
149  ReorderBufferChange *change;
150  ReorderBufferTXN *txn;
151  TXNEntryFile file;
152  XLogSegNo segno;
153 } ReorderBufferIterTXNEntry;
154 
155 typedef struct ReorderBufferIterTXNState
156 {
157  binaryheap *heap;
158  Size nr_txns;
159  dlist_head old_change;
160  ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER];
161 } ReorderBufferIterTXNState;
162 
163 /* toast datastructures */
164 typedef struct ReorderBufferToastEnt
165 {
166  Oid chunk_id; /* toast_table.chunk_id */
167  int32 last_chunk_seq; /* toast_table.chunk_seq of the last chunk we
168  * have seen */
169  Size num_chunks; /* number of chunks we've already seen */
170  Size size; /* combined size of chunks seen */
171  dlist_head chunks; /* linked list of chunks */
172  struct varlena *reconstructed; /* reconstructed varlena now pointed to in
173  * main tup */
174 } ReorderBufferToastEnt;
175 
176 /* Disk serialization support datastructures */
177 typedef struct ReorderBufferDiskChange
178 {
179  Size size;
180  ReorderBufferChange change;
181  /* data follows */
182 } ReorderBufferDiskChange;
183 
184 #define IsSpecInsert(action) \
185 ( \
186  ((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT) \
187 )
188 #define IsSpecConfirmOrAbort(action) \
189 ( \
190  (((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM) || \
191  ((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT)) \
192 )
193 #define IsInsertOrUpdate(action) \
194 ( \
195  (((action) == REORDER_BUFFER_CHANGE_INSERT) || \
196  ((action) == REORDER_BUFFER_CHANGE_UPDATE) || \
197  ((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT)) \
198 )
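/*
 * Example of how these predicates are used (hypothetical snippet): they
 * classify a change's action when deciding whether a transaction still
 * contains incomplete data, e.g.
 *
 *	if (IsSpecInsert(change->action))
 *		... a speculative insertion is pending confirm/abort ...
 *	else if (IsInsertOrUpdate(change->action))
 *		... a data-carrying change that may complete pending toast chunks ...
 */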
199 
200 /*
201  * Maximum number of changes kept in memory, per transaction. After that,
202  * changes are spooled to disk.
203  *
204  * The current value should be sufficient to decode the entire transaction
205  * without hitting disk in OLTP workloads, while starting to spool to disk in
206  * other workloads reasonably fast.
207  *
208  * At some point in the future it probably makes sense to have a more elaborate
209  * resource management here, but it's not entirely clear what that would look
210  * like.
211  */
213 static const Size max_changes_in_memory = 4096; /* XXX for restore only */
214 
215 /* GUC variable */
216 int logical_decoding_work_mem;
217 
218 /* ---------------------------------------
219  * primary reorderbuffer support routines
220  * ---------------------------------------
221  */
222 static ReorderBufferTXN *ReorderBufferGetTXN(ReorderBuffer *rb);
223 static void ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
224 static ReorderBufferTXN *ReorderBufferTXNByXid(ReorderBuffer *rb,
225  TransactionId xid, bool create, bool *is_new,
226  XLogRecPtr lsn, bool create_as_top);
227 static void ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn,
228  ReorderBufferTXN *subtxn);
229 
230 static void AssertTXNLsnOrder(ReorderBuffer *rb);
231 
232 /* ---------------------------------------
233  * support functions for lsn-order iterating over the ->changes of a
234  * transaction and its subtransactions
235  *
236  * used for iteration over the k-way heap merge of a transaction and its
237  * subtransactions
238  * ---------------------------------------
239  */
240 static void ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
241  ReorderBufferIterTXNState *volatile *iter_state);
242 static ReorderBufferChange *ReorderBufferIterTXNNext(ReorderBuffer *rb,
243  ReorderBufferIterTXNState *state);
244 static void ReorderBufferIterTXNFinish(ReorderBuffer *rb,
245  ReorderBufferIterTXNState *state);
246 
247 /*
248  * ---------------------------------------
249  * Disk serialization support functions
250  * ---------------------------------------
251  */
252 static void ReorderBufferCheckMemoryLimit(ReorderBuffer *rb);
253 static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
254 static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
255  int fd, ReorderBufferChange *change);
256 static int ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
257  TXNEntryFile *file, XLogSegNo *segno);
258 static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
259  char *data);
260 static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn);
261 static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
262  bool txn_prepared);
263 static void ReorderBufferCleanupSerializedTXNs(const char *slotname);
264 static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot,
265  TransactionId xid, XLogSegNo segno);
266 static int ReorderBufferTXNSizeCompare(const pairingheap_node *a, const pairingheap_node *b, void *arg);
267 
268 static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap);
269 static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap,
270  ReorderBufferTXN *txn, CommandId cid);
271 
272 /*
273  * ---------------------------------------
274  * Streaming support functions
275  * ---------------------------------------
276  */
277 static inline bool ReorderBufferCanStream(ReorderBuffer *rb);
278 static inline bool ReorderBufferCanStartStreaming(ReorderBuffer *rb);
279 static void ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
280 static void ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn);
281 
282 /* ---------------------------------------
283  * toast reassembly support
284  * ---------------------------------------
285  */
286 static void ReorderBufferToastInitHash(ReorderBuffer *rb, ReorderBufferTXN *txn);
287 static void ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn);
288 static void ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
289  Relation relation, ReorderBufferChange *change);
290 static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn,
291  Relation relation, ReorderBufferChange *change);
292 
293 /*
294  * ---------------------------------------
295  * memory accounting
296  * ---------------------------------------
297  */
298 static Size ReorderBufferChangeSize(ReorderBufferChange *change);
299 static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
300  ReorderBufferChange *change,
301  ReorderBufferTXN *txn,
302  bool addition, Size sz);
303 
304 /*
305  * Allocate a new ReorderBuffer and clean out any old serialized state from
306  * prior ReorderBuffer instances for the same slot.
307  */
308 ReorderBuffer *
309 ReorderBufferAllocate(void)
310 {
311  ReorderBuffer *buffer;
312  HASHCTL hash_ctl;
313  MemoryContext new_ctx;
314 
315  Assert(MyReplicationSlot != NULL);
316 
317  /* allocate memory in own context, to have better accountability */
318  new_ctx = AllocSetContextCreate(CurrentMemoryContext,
319  "ReorderBuffer",
320  ALLOCSET_DEFAULT_SIZES);
321 
322  buffer =
323  (ReorderBuffer *) MemoryContextAlloc(new_ctx, sizeof(ReorderBuffer));
324 
325  memset(&hash_ctl, 0, sizeof(hash_ctl));
326 
327  buffer->context = new_ctx;
328 
329  buffer->change_context = SlabContextCreate(new_ctx,
330  "Change",
332  sizeof(ReorderBufferChange));
333 
334  buffer->txn_context = SlabContextCreate(new_ctx,
335  "TXN",
337  sizeof(ReorderBufferTXN));
338 
339  /*
340  * To minimize memory fragmentation caused by long-running transactions
341  * with changes spanning multiple memory blocks, we use a single
342  * fixed-size memory block for decoded tuple storage. The performance
343  * testing showed that the default memory block size maintains logical
344  * decoding performance without causing fragmentation due to concurrent
345  * transactions. One might think that we can use the max size as
346  * SLAB_LARGE_BLOCK_SIZE but the test also showed it doesn't help resolve
347  * the memory fragmentation.
348  */
349  buffer->tup_context = GenerationContextCreate(new_ctx,
350  "Tuples",
354 
355  hash_ctl.keysize = sizeof(TransactionId);
356  hash_ctl.entrysize = sizeof(ReorderBufferTXNByIdEnt);
357  hash_ctl.hcxt = buffer->context;
358 
359  buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
360  HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
361 
362  buffer->by_txn_last_xid = InvalidTransactionId;
363  buffer->by_txn_last_txn = NULL;
364 
365  buffer->outbuf = NULL;
366  buffer->outbufsize = 0;
367  buffer->size = 0;
368 
369  /* txn_heap is ordered by transaction size */
370  buffer->txn_heap = pairingheap_allocate(ReorderBufferTXNSizeCompare, NULL);
371 
372  buffer->spillTxns = 0;
373  buffer->spillCount = 0;
374  buffer->spillBytes = 0;
375  buffer->streamTxns = 0;
376  buffer->streamCount = 0;
377  buffer->streamBytes = 0;
378  buffer->totalTxns = 0;
379  buffer->totalBytes = 0;
380 
381  buffer->current_restart_decoding_lsn = InvalidXLogRecPtr;
382 
383  dlist_init(&buffer->toplevel_by_lsn);
384  dlist_init(&buffer->txns_by_base_snapshot_lsn);
385  dclist_init(&buffer->catchange_txns);
386 
387  /*
388  * Ensure there's no stale data from prior uses of this slot, in case some
389  * prior exit avoided calling ReorderBufferFree. Failure to do this can
390  * produce duplicated txns, and it's very cheap if there's nothing there.
391  */
392  ReorderBufferCleanupSerializedTXNs(NameStr(MyReplicationSlot->data.name));
393 
394  return buffer;
395 }
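/*
 * Typical lifecycle, as a hedged sketch (in reality logical.c wires up the
 * callbacks for the active output plugin; the "my_*" names are hypothetical):
 *
 *	ReorderBuffer *rb = ReorderBufferAllocate();
 *	rb->begin = my_begin_cb;
 *	rb->apply_change = my_change_cb;
 *	rb->commit = my_commit_cb;
 *	... decode WAL records, calling ReorderBufferQueueChange() etc. ...
 *	ReorderBufferFree(rb);
 */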
396 
397 /*
398  * Free a ReorderBuffer
399  */
400 void
401 ReorderBufferFree(ReorderBuffer *rb)
402 {
403  MemoryContext context = rb->context;
404 
405  /*
406  * We free separately allocated data by entirely scrapping reorderbuffer's
407  * memory context.
408  */
409  MemoryContextDelete(context);
410 
411  /* Free disk space used by unconsumed reorder buffers */
412  ReorderBufferCleanupSerializedTXNs(NameStr(MyReplicationSlot->data.name));
413 }
414 
415 /*
416  * Get an unused, possibly preallocated, ReorderBufferTXN.
417  */
418 static ReorderBufferTXN *
419 ReorderBufferGetTXN(ReorderBuffer *rb)
420 {
421  ReorderBufferTXN *txn;
422 
423  txn = (ReorderBufferTXN *)
424  MemoryContextAlloc(rb->txn_context, sizeof(ReorderBufferTXN));
425 
426  memset(txn, 0, sizeof(ReorderBufferTXN));
427 
428  dlist_init(&txn->changes);
429  dlist_init(&txn->tuplecids);
430  dlist_init(&txn->subtxns);
431 
432  /* InvalidCommandId is not zero, so set it explicitly */
433  txn->command_id = InvalidCommandId;
434  txn->output_plugin_private = NULL;
435 
436  return txn;
437 }
438 
439 /*
440  * Free a ReorderBufferTXN.
441  */
442 static void
443 ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
444 {
445  /* clean the lookup cache if we were cached (quite likely) */
446  if (rb->by_txn_last_xid == txn->xid)
447  {
448  rb->by_txn_last_xid = InvalidTransactionId;
449  rb->by_txn_last_txn = NULL;
450  }
451 
452  /* free data that's contained */
453 
454  if (txn->gid != NULL)
455  {
456  pfree(txn->gid);
457  txn->gid = NULL;
458  }
459 
460  if (txn->tuplecid_hash != NULL)
461  {
462  hash_destroy(txn->tuplecid_hash);
463  txn->tuplecid_hash = NULL;
464  }
465 
466  if (txn->invalidations)
467  {
468  pfree(txn->invalidations);
469  txn->invalidations = NULL;
470  }
471 
472  /* Reset the toast hash */
473  ReorderBufferToastReset(rb, txn);
474 
475  /* All changes must be deallocated */
476  Assert(txn->size == 0);
477 
478  pfree(txn);
479 }
480 
481 /*
482  * Get a fresh ReorderBufferChange.
483  */
484 ReorderBufferChange *
485 ReorderBufferGetChange(ReorderBuffer *rb)
486 {
487  ReorderBufferChange *change;
488 
489  change = (ReorderBufferChange *)
490  MemoryContextAlloc(rb->change_context, sizeof(ReorderBufferChange));
491 
492  memset(change, 0, sizeof(ReorderBufferChange));
493  return change;
494 }
495 
496 /*
497  * Free a ReorderBufferChange and update memory accounting, if requested.
498  */
499 void
500 ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change,
501  bool upd_mem)
502 {
503  /* update memory accounting info */
504  if (upd_mem)
505  ReorderBufferChangeMemoryUpdate(rb, change, NULL, false,
506  ReorderBufferChangeSize(change));
507 
508  /* free contained data */
509  switch (change->action)
510  {
511  case REORDER_BUFFER_CHANGE_INSERT:
512  case REORDER_BUFFER_CHANGE_UPDATE:
513  case REORDER_BUFFER_CHANGE_DELETE:
514  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
515  if (change->data.tp.newtuple)
516  {
517  ReorderBufferReturnTupleBuf(change->data.tp.newtuple);
518  change->data.tp.newtuple = NULL;
519  }
520 
521  if (change->data.tp.oldtuple)
522  {
523  ReorderBufferReturnTupleBuf(change->data.tp.oldtuple);
524  change->data.tp.oldtuple = NULL;
525  }
526  break;
527  case REORDER_BUFFER_CHANGE_MESSAGE:
528  if (change->data.msg.prefix != NULL)
529  pfree(change->data.msg.prefix);
530  change->data.msg.prefix = NULL;
531  if (change->data.msg.message != NULL)
532  pfree(change->data.msg.message);
533  change->data.msg.message = NULL;
534  break;
535  case REORDER_BUFFER_CHANGE_INVALIDATION:
536  if (change->data.inval.invalidations)
537  pfree(change->data.inval.invalidations);
538  change->data.inval.invalidations = NULL;
539  break;
540  case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
541  if (change->data.snapshot)
542  {
543  ReorderBufferFreeSnap(rb, change->data.snapshot);
544  change->data.snapshot = NULL;
545  }
546  break;
547  /* no data in addition to the struct itself */
548  case REORDER_BUFFER_CHANGE_TRUNCATE:
549  if (change->data.truncate.relids != NULL)
550  {
551  ReorderBufferReturnRelids(rb, change->data.truncate.relids);
552  change->data.truncate.relids = NULL;
553  }
554  break;
555  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
556  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
557  case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
558  case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
559  break;
560  }
561 
562  pfree(change);
563 }
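/*
 * Ownership sketch (illustrative): a change allocated with
 * ReorderBufferGetChange() is either queued, which transfers ownership to
 * the reorder buffer, or must be returned explicitly as above:
 *
 *	ReorderBufferChange *change = ReorderBufferGetChange(rb);
 *	change->action = REORDER_BUFFER_CHANGE_INSERT;
 *	... fill in change->data.tp ...
 *	ReorderBufferQueueChange(rb, xid, lsn, change, false);	// now owned by rb
 */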
564 
565 /*
566  * Get a fresh HeapTuple fitting a tuple of size tuple_len (excluding header
567  * overhead).
568  */
569 HeapTuple
570 ReorderBufferGetTupleBuf(ReorderBuffer *rb, Size tuple_len)
571 {
572  HeapTuple tuple;
573  Size alloc_len;
574 
575  alloc_len = tuple_len + SizeofHeapTupleHeader;
576 
577  tuple = (HeapTuple) MemoryContextAlloc(rb->tup_context,
578  HEAPTUPLESIZE + alloc_len);
579  tuple->t_data = (HeapTupleHeader) ((char *) tuple + HEAPTUPLESIZE);
580 
581  return tuple;
582 }
583 
584 /*
585  * Free a HeapTuple returned by ReorderBufferGetTupleBuf().
586  */
587 void
588 ReorderBufferReturnTupleBuf(HeapTuple tuple)
589 {
590  pfree(tuple);
591 }
592 
593 /*
594  * Get an array for relids of truncated relations.
595  *
596  * We use the global memory context (for the whole reorder buffer), because
597  * none of the existing ones seems like a good match (some are SLAB, so we
598  * can't use those, and tup_context is meant for tuple data, not relids). We
599  * could add yet another context, but it seems like overkill - TRUNCATE is
600  * not a particularly common operation, so it does not seem worth it.
601  */
602 Oid *
603 ReorderBufferGetRelids(ReorderBuffer *rb, int nrelids)
604 {
605  Oid *relids;
606  Size alloc_len;
607 
608  alloc_len = sizeof(Oid) * nrelids;
609 
610  relids = (Oid *) MemoryContextAlloc(rb->context, alloc_len);
611 
612  return relids;
613 }
614 
615 /*
616  * Free an array of relids.
617  */
618 void
619 ReorderBufferReturnRelids(ReorderBuffer *rb, Oid *relids)
620 {
621  pfree(relids);
622 }
623 
624 /*
625  * Return the ReorderBufferTXN from the given buffer, specified by Xid.
626  * If create is true, and a transaction doesn't already exist, create it
627  * (with the given LSN, and as top transaction if that's specified);
628  * when this happens, is_new is set to true.
629  */
630 static ReorderBufferTXN *
632  bool *is_new, XLogRecPtr lsn, bool create_as_top)
633 {
634  ReorderBufferTXN *txn;
635  ReorderBufferTXNByIdEnt *ent;
636  bool found;
637 
638  Assert(TransactionIdIsValid(xid));
639 
640  /*
641  * Check the one-entry lookup cache first
642  */
643  if (TransactionIdIsValid(rb->by_txn_last_xid) &&
644  rb->by_txn_last_xid == xid)
645  {
646  txn = rb->by_txn_last_txn;
647 
648  if (txn != NULL)
649  {
650  /* found it, and it's valid */
651  if (is_new)
652  *is_new = false;
653  return txn;
654  }
655 
656  /*
657  * cached as non-existent, and asked not to create? Then nothing else
658  * to do.
659  */
660  if (!create)
661  return NULL;
662  /* otherwise fall through to create it */
663  }
664 
665  /*
666  * If the cache wasn't hit, or it yielded "does-not-exist" and we want to
667  * create an entry, do a full hash table lookup.
668  */
669 
670  /* search the lookup table */
671  ent = (ReorderBufferTXNByIdEnt *)
672  hash_search(rb->by_txn,
673  &xid,
674  create ? HASH_ENTER : HASH_FIND,
675  &found);
676  if (found)
677  txn = ent->txn;
678  else if (create)
679  {
680  /* initialize the new entry, if creation was requested */
681  Assert(ent != NULL);
682  Assert(lsn != InvalidXLogRecPtr);
683 
684  ent->txn = ReorderBufferGetTXN(rb);
685  ent->txn->xid = xid;
686  txn = ent->txn;
687  txn->first_lsn = lsn;
688  txn->restart_decoding_lsn = rb->current_restart_decoding_lsn;
689 
690  if (create_as_top)
691  {
692  dlist_push_tail(&rb->toplevel_by_lsn, &txn->node);
693  AssertTXNLsnOrder(rb);
694  }
695  }
696  else
697  txn = NULL; /* not found and not asked to create */
698 
699  /* update cache */
700  rb->by_txn_last_xid = xid;
701  rb->by_txn_last_txn = txn;
702 
703  if (is_new)
704  *is_new = !found;
705 
706  Assert(!create || txn != NULL);
707  return txn;
708 }
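/*
 * Cache behavior sketch (hypothetical xids): consecutive WAL records mostly
 * belong to the same transaction, so the one-entry cache above usually
 * short-circuits the hash lookup:
 *
 *	ReorderBufferTXNByXid(rb, 501, ...);	// hash probe; cache = (501, txn)
 *	ReorderBufferTXNByXid(rb, 501, ...);	// served from the cache
 *	ReorderBufferTXNByXid(rb, 502, ...);	// miss; probes and re-caches
 */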
709 
710 /*
711  * Record the partial change for the streaming of in-progress transactions. We
712  * can stream only complete changes, so if we have a partial change like a
713  * toast-table insert or a speculative insert, we mark such a 'txn' so that it
714  * can't be streamed. We also ensure that if the changes in such a 'txn' can
715  * be streamed and are above the logical_decoding_work_mem threshold, we
716  * stream them as soon as we have a complete change.
717  */
718 static void
720  ReorderBufferChange *change,
721  bool toast_insert)
722 {
723  ReorderBufferTXN *toptxn;
724 
725  /*
726  * The partial changes need to be processed only while streaming
727  * in-progress transactions.
728  */
729  if (!ReorderBufferCanStream(rb))
730  return;
731 
732  /* Get the top transaction. */
733  toptxn = rbtxn_get_toptxn(txn);
734 
735  /*
736  * Indicate a partial change for toast inserts. The change will be
737  * considered as complete once we get the insert or update on the main
738  * table and we are sure that the pending toast chunks are not required
739  * anymore.
740  *
741  * If we allow streaming when there are pending toast chunks then such
742  * chunks won't be released till the insert (multi_insert) is complete and
743  * we expect the txn to have streamed all changes after streaming. This
744  * restriction is mainly to ensure the correctness of streamed
745  * transactions and it doesn't seem worth uplifting such a restriction
746  * just to allow this case because anyway we will stream the transaction
747  * once such an insert is complete.
748  */
749  if (toast_insert)
750  toptxn->txn_flags |= RBTXN_HAS_PARTIAL_CHANGE;
751  else if (rbtxn_has_partial_change(toptxn) &&
752  IsInsertOrUpdate(change->action) &&
753  change->data.tp.clear_toast_afterwards)
754  toptxn->txn_flags &= ~RBTXN_HAS_PARTIAL_CHANGE;
755 
756  /*
757  * Indicate a partial change for speculative inserts. The change will be
758  * considered as complete once we get the speculative confirm or abort
759  * token.
760  */
761  if (IsSpecInsert(change->action))
762  toptxn->txn_flags |= RBTXN_HAS_PARTIAL_CHANGE;
763  else if (rbtxn_has_partial_change(toptxn) &&
764  IsSpecConfirmOrAbort(change->action))
765  toptxn->txn_flags &= ~RBTXN_HAS_PARTIAL_CHANGE;
766 
767  /*
768  * Stream the transaction if it is serialized before and the changes are
769  * now complete in the top-level transaction.
770  *
771  * The reason for doing the streaming of such a transaction as soon as we
772  * get the complete change for it is that previously it would have reached
773  * the memory threshold and wouldn't get streamed because of incomplete
774  * changes. Delaying such transactions would increase apply lag for them.
775  */
776  if (ReorderBufferCanStartStreaming(rb) &&
777  !(rbtxn_has_partial_change(toptxn)) &&
778  rbtxn_is_serialized(txn) &&
779  rbtxn_has_streamable_change(toptxn))
780  ReorderBufferStreamTXN(rb, toptxn);
781 }
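/*
 * Illustrative WAL ordering (hypothetical LSNs) showing why a pending toast
 * insert blocks streaming until the main-table change arrives:
 *
 *	0/0100  INSERT into pg_toast chunk 0   -> txn marked partial
 *	0/0110  INSERT into pg_toast chunk 1   -> still partial
 *	0/0120  INSERT into main table         -> partial flag cleared,
 *	                                          streaming possible again
 */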
782 
783 /*
784  * Queue a change into a transaction so it can be replayed upon commit, or
785  * streamed when we reach the logical_decoding_work_mem threshold.
786  */
787 void
788 ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
789  ReorderBufferChange *change, bool toast_insert)
790 {
791  ReorderBufferTXN *txn;
792 
793  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
794 
795  /*
796  * While streaming the previous changes we have detected that the
797  * transaction is aborted. So there is no point in collecting further
798  * changes for it.
799  */
800  if (txn->concurrent_abort)
801  {
802  /*
803  * We don't need to update memory accounting for this change as we
804  * have not added it to the queue yet.
805  */
806  ReorderBufferReturnChange(rb, change, false);
807  return;
808  }
809 
810  /*
811  * The changes that are sent downstream are considered streamable. We
812  * remember such transactions so that only those will later be considered
813  * for streaming.
814  */
815  if (change->action == REORDER_BUFFER_CHANGE_INSERT ||
816  change->action == REORDER_BUFFER_CHANGE_UPDATE ||
817  change->action == REORDER_BUFFER_CHANGE_DELETE ||
818  change->action == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT ||
819  change->action == REORDER_BUFFER_CHANGE_TRUNCATE ||
820  change->action == REORDER_BUFFER_CHANGE_MESSAGE)
821  {
822  ReorderBufferTXN *toptxn = rbtxn_get_toptxn(txn);
823 
824  toptxn->txn_flags |= RBTXN_HAS_STREAMABLE_CHANGE;
825  }
826 
827  change->lsn = lsn;
828  change->txn = txn;
829 
830  Assert(InvalidXLogRecPtr != lsn);
831  dlist_push_tail(&txn->changes, &change->node);
832  txn->nentries++;
833  txn->nentries_mem++;
834 
835  /* update memory accounting information */
836  ReorderBufferChangeMemoryUpdate(rb, change, NULL, true,
837  ReorderBufferChangeSize(change));
838 
839  /* process partial change */
840  ReorderBufferProcessPartialChange(rb, txn, change, toast_insert);
841 
842  /* check the memory limits and evict something if needed */
843  ReorderBufferCheckMemoryLimit(rb);
844 }
845 
846 /*
847  * A transactional message is queued to be processed upon commit and a
848  * non-transactional message gets processed immediately.
849  */
850 void
851 ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid,
852  Snapshot snap, XLogRecPtr lsn,
853  bool transactional, const char *prefix,
854  Size message_size, const char *message)
855 {
856  if (transactional)
857  {
858  MemoryContext oldcontext;
859  ReorderBufferChange *change;
860 
861  Assert(xid != InvalidTransactionId);
862 
863  /*
864  * We don't expect snapshots for transactional changes - we'll use the
865  * snapshot derived later during apply (unless the change gets
866  * skipped).
867  */
868  Assert(!snap);
869 
870  oldcontext = MemoryContextSwitchTo(rb->context);
871 
872  change = ReorderBufferGetChange(rb);
873  change->action = REORDER_BUFFER_CHANGE_MESSAGE;
874  change->data.msg.prefix = pstrdup(prefix);
875  change->data.msg.message_size = message_size;
876  change->data.msg.message = palloc(message_size);
877  memcpy(change->data.msg.message, message, message_size);
878 
879  ReorderBufferQueueChange(rb, xid, lsn, change, false);
880 
881  MemoryContextSwitchTo(oldcontext);
882  }
883  else
884  {
885  ReorderBufferTXN *txn = NULL;
886  volatile Snapshot snapshot_now = snap;
887 
888  /* Non-transactional changes require a valid snapshot. */
889  Assert(snapshot_now);
890 
891  if (xid != InvalidTransactionId)
892  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
893 
894  /* setup snapshot to allow catalog access */
895  SetupHistoricSnapshot(snapshot_now, NULL);
896  PG_TRY();
897  {
898  rb->message(rb, txn, lsn, false, prefix, message_size, message);
899 
901  }
902  PG_CATCH();
903  {
904  TeardownHistoricSnapshot(true);
905  PG_RE_THROW();
906  }
907  PG_END_TRY();
908  }
909 }
910 
911 /*
912  * AssertTXNLsnOrder
913  * Verify LSN ordering of transaction lists in the reorderbuffer
914  *
915  * Other LSN-related invariants are checked too.
916  *
917  * No-op if assertions are not in use.
918  */
919 static void
920 AssertTXNLsnOrder(ReorderBuffer *rb)
921 {
922 #ifdef USE_ASSERT_CHECKING
923  LogicalDecodingContext *ctx = rb->private_data;
924  dlist_iter iter;
925  XLogRecPtr prev_first_lsn = InvalidXLogRecPtr;
926  XLogRecPtr prev_base_snap_lsn = InvalidXLogRecPtr;
927 
928  /*
929  * Skip the verification if we don't reach the LSN at which we start
930  * decoding the contents of transactions yet because until we reach the
931  * LSN, we could have transactions that don't have the association between
932  * the top-level transaction and subtransaction yet and consequently have
933  * the same LSN. We don't guarantee this association until we try to
934  * decode the actual contents of transaction. The ordering of the records
935  * prior to the start_decoding_at LSN should have been checked before the
936  * restart.
937  */
938  if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, ctx->reader->EndRecPtr))
939  return;
940 
941  dlist_foreach(iter, &rb->toplevel_by_lsn)
942  {
943  ReorderBufferTXN *cur_txn = dlist_container(ReorderBufferTXN, node,
944  iter.cur);
945 
946  /* start LSN must be set */
947  Assert(cur_txn->first_lsn != InvalidXLogRecPtr);
948 
949  /* If there is an end LSN, it must be higher than start LSN */
950  if (cur_txn->end_lsn != InvalidXLogRecPtr)
951  Assert(cur_txn->first_lsn <= cur_txn->end_lsn);
952 
953  /* Current initial LSN must be strictly higher than previous */
954  if (prev_first_lsn != InvalidXLogRecPtr)
955  Assert(prev_first_lsn < cur_txn->first_lsn);
956 
957  /* known-as-subtxn txns must not be listed */
958  Assert(!rbtxn_is_known_subxact(cur_txn));
959 
960  prev_first_lsn = cur_txn->first_lsn;
961  }
962 
963  dlist_foreach(iter, &rb->txns_by_base_snapshot_lsn)
964  {
965  ReorderBufferTXN *cur_txn = dlist_container(ReorderBufferTXN,
966  base_snapshot_node,
967  iter.cur);
968 
969  /* base snapshot (and its LSN) must be set */
970  Assert(cur_txn->base_snapshot != NULL);
971  Assert(cur_txn->base_snapshot_lsn != InvalidXLogRecPtr);
972 
973  /* current LSN must be strictly higher than previous */
974  if (prev_base_snap_lsn != InvalidXLogRecPtr)
975  Assert(prev_base_snap_lsn < cur_txn->base_snapshot_lsn);
976 
977  /* known-as-subtxn txns must not be listed */
978  Assert(!rbtxn_is_known_subxact(cur_txn));
979 
980  prev_base_snap_lsn = cur_txn->base_snapshot_lsn;
981  }
982 #endif
983 }
984 
985 /*
986  * AssertChangeLsnOrder
987  *
988  * Check ordering of changes in the (sub)transaction.
989  */
990 static void
991 AssertChangeLsnOrder(ReorderBufferTXN *txn)
992 {
993 #ifdef USE_ASSERT_CHECKING
994  dlist_iter iter;
995  XLogRecPtr prev_lsn = txn->first_lsn;
996 
997  dlist_foreach(iter, &txn->changes)
998  {
999  ReorderBufferChange *cur_change;
1000 
1001  cur_change = dlist_container(ReorderBufferChange, node, iter.cur);
1002 
1003  Assert(txn->first_lsn != InvalidXLogRecPtr);
1004  Assert(cur_change->lsn != InvalidXLogRecPtr);
1005  Assert(txn->first_lsn <= cur_change->lsn);
1006 
1007  if (txn->end_lsn != InvalidXLogRecPtr)
1008  Assert(cur_change->lsn <= txn->end_lsn);
1009 
1010  Assert(prev_lsn <= cur_change->lsn);
1011 
1012  prev_lsn = cur_change->lsn;
1013  }
1014 #endif
1015 }
1016 
1017 /*
1018  * ReorderBufferGetOldestTXN
1019  * Return oldest transaction in reorderbuffer
1020  */
1021 ReorderBufferTXN *
1022 ReorderBufferGetOldestTXN(ReorderBuffer *rb)
1023 {
1024  ReorderBufferTXN *txn;
1025 
1026  AssertTXNLsnOrder(rb);
1027 
1028  if (dlist_is_empty(&rb->toplevel_by_lsn))
1029  return NULL;
1030 
1031  txn = dlist_head_element(ReorderBufferTXN, node, &rb->toplevel_by_lsn);
1032 
1033  Assert(!rbtxn_is_known_subxact(txn));
1034  Assert(txn->first_lsn != InvalidXLogRecPtr);
1035  return txn;
1036 }
1037 
1038 /*
1039  * ReorderBufferGetOldestXmin
1040  * Return oldest Xmin in reorderbuffer
1041  *
1042  * Returns oldest possibly running Xid from the point of view of snapshots
1043  * used in the transactions kept by reorderbuffer, or InvalidTransactionId if
1044  * there are none.
1045  *
1046  * Since snapshots are assigned monotonically, this equals the Xmin of the
1047  * base snapshot with minimal base_snapshot_lsn.
1048  */
1049 TransactionId
1050 ReorderBufferGetOldestXmin(ReorderBuffer *rb)
1051 {
1052  ReorderBufferTXN *txn;
1053 
1054  AssertTXNLsnOrder(rb);
1055 
1056  if (dlist_is_empty(&rb->txns_by_base_snapshot_lsn))
1057  return InvalidTransactionId;
1058 
1059  txn = dlist_head_element(ReorderBufferTXN, base_snapshot_node,
1060  &rb->txns_by_base_snapshot_lsn);
1061  return txn->base_snapshot->xmin;
1062 }
1063 
1064 void
1065 ReorderBufferSetRestartPoint(ReorderBuffer *rb, XLogRecPtr ptr)
1066 {
1067  rb->current_restart_decoding_lsn = ptr;
1068 }
1069 
1070 /*
1071  * ReorderBufferAssignChild
1072  *
1073  * Make note that we know that subxid is a subtransaction of xid, seen as of
1074  * the given lsn.
1075  */
1076 void
1077 ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid,
1078  TransactionId subxid, XLogRecPtr lsn)
1079 {
1080  ReorderBufferTXN *txn;
1081  ReorderBufferTXN *subtxn;
1082  bool new_top;
1083  bool new_sub;
1084 
1085  txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true);
1086  subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false);
1087 
1088  if (!new_sub)
1089  {
1090  if (rbtxn_is_known_subxact(subtxn))
1091  {
1092  /* already associated, nothing to do */
1093  return;
1094  }
1095  else
1096  {
1097  /*
1098  * We already saw this transaction, but initially added it to the
1099  * list of top-level txns. Now that we know it's not top-level,
1100  * remove it from there.
1101  */
1102  dlist_delete(&subtxn->node);
1103  }
1104  }
1105 
1106  subtxn->txn_flags |= RBTXN_IS_SUBXACT;
1107  subtxn->toplevel_xid = xid;
1108  Assert(subtxn->nsubtxns == 0);
1109 
1110  /* set the reference to top-level transaction */
1111  subtxn->toptxn = txn;
1112 
1113  /* add to subtransaction list */
1114  dlist_push_tail(&txn->subtxns, &subtxn->node);
1115  txn->nsubtxns++;
1116 
1117  /* Possibly transfer the subtxn's snapshot to its top-level txn. */
1118  ReorderBufferTransferSnapToParent(txn, subtxn);
1119 
1120  /* Verify LSN-ordering invariant */
1121  AssertTXNLsnOrder(rb);
1122 }
1123 
1124 /*
1125  * ReorderBufferTransferSnapToParent
1126  * Transfer base snapshot from subtxn to top-level txn, if needed
1127  *
1128  * This is done if the top-level txn doesn't have a base snapshot, or if the
1129  * subtxn's base snapshot has an earlier LSN than the top-level txn's base
1130  * snapshot's LSN. This can happen if there are no changes in the toplevel
1131  * txn but there are some in the subtxn, or the first change in subtxn has
1132  * earlier LSN than first change in the top-level txn and we learned about
1133  * their kinship only now.
1134  *
1135  * The subtransaction's snapshot is cleared regardless of the transfer
1136  * happening, since it's not needed anymore in either case.
1137  *
1138  * We do this as soon as we become aware of their kinship, to avoid queueing
1139  * extra snapshots to txns known-as-subtxns -- only top-level txns will
1140  * receive further snapshots.
1141  */
1142 static void
1143 ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn,
1144  ReorderBufferTXN *subtxn)
1145 {
1146  Assert(subtxn->toplevel_xid == txn->xid);
1147 
1148  if (subtxn->base_snapshot != NULL)
1149  {
1150  if (txn->base_snapshot == NULL ||
1151  subtxn->base_snapshot_lsn < txn->base_snapshot_lsn)
1152  {
1153  /*
1154  * If the toplevel transaction already has a base snapshot but
1155  * it's newer than the subxact's, purge it.
1156  */
1157  if (txn->base_snapshot != NULL)
1158  {
1159  SnapBuildSnapDecRefcount(txn->base_snapshot);
1160  dlist_delete(&txn->base_snapshot_node);
1161  }
1162 
1163  /*
1164  * The snapshot is now the top transaction's; transfer it, and
1165  * adjust the list position of the top transaction in the list by
1166  * moving it to where the subtransaction is.
1167  */
1168  txn->base_snapshot = subtxn->base_snapshot;
1169  txn->base_snapshot_lsn = subtxn->base_snapshot_lsn;
1170  dlist_insert_before(&subtxn->base_snapshot_node,
1171  &txn->base_snapshot_node);
1172 
1173  /*
1174  * The subtransaction doesn't have a snapshot anymore (so it
1175  * mustn't be in the list.)
1176  */
1177  subtxn->base_snapshot = NULL;
1178  subtxn->base_snapshot_lsn = InvalidXLogRecPtr;
1179  dlist_delete(&subtxn->base_snapshot_node);
1180  }
1181  else
1182  {
1183  /* Base snap of toplevel is fine, so subxact's is not needed */
1184  SnapBuildSnapDecRefcount(subtxn->base_snapshot);
1185  dlist_delete(&subtxn->base_snapshot_node);
1186  subtxn->base_snapshot = NULL;
1187  subtxn->base_snapshot_lsn = InvalidXLogRecPtr;
1188  }
1189  }
1190 }
1191 
1192 /*
1193  * Associate a subtransaction with its toplevel transaction at commit
1194  * time. No further changes may be added after this.
1195  */
1196 void
1197 ReorderBufferCommitChild(ReorderBuffer *rb, TransactionId xid,
1198  TransactionId subxid, XLogRecPtr commit_lsn,
1199  XLogRecPtr end_lsn)
1200 {
1201  ReorderBufferTXN *subtxn;
1202 
1203  subtxn = ReorderBufferTXNByXid(rb, subxid, false, NULL,
1204  InvalidXLogRecPtr, false);
1205 
1206  /*
1207  * No need to do anything if that subtxn didn't contain any changes
1208  */
1209  if (!subtxn)
1210  return;
1211 
1212  subtxn->final_lsn = commit_lsn;
1213  subtxn->end_lsn = end_lsn;
1214 
1215  /*
1216  * Assign this subxact as a child of the toplevel xact (no-op if already
1217  * done.)
1218  */
1219  ReorderBufferAssignChild(rb, xid, subxid, InvalidXLogRecPtr);
1220 }
1221 
1222 
1223 /*
1224  * Support for efficiently iterating over a transaction's and its
1225  * subtransactions' changes.
1226  *
1227  * We do this with a k-way merge between transactions/subtransactions. For that
1228  * we model the current heads of the different transactions as a binary heap
1229  * so we easily know which (sub-)transaction has the change with the smallest
1230  * lsn next.
1231  *
1232  * We assume the changes in individual transactions are already sorted by LSN.
1233  */
1234 
1235 /*
1236  * Binary heap comparison function.
1237  */
1238 static int
1239 ReorderBufferIterCompare(Datum a, Datum b, void *arg)
1240 {
1241  ReorderBufferIterTXNState *state = (ReorderBufferIterTXNState *) arg;
1242  XLogRecPtr pos_a = state->entries[DatumGetInt32(a)].lsn;
1243  XLogRecPtr pos_b = state->entries[DatumGetInt32(b)].lsn;
1244 
1245  if (pos_a < pos_b)
1246  return 1;
1247  else if (pos_a == pos_b)
1248  return 0;
1249  return -1;
1250 }
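/*
 * Note on the inverted comparison (illustrative): binaryheap.c builds a
 * max-heap, so reporting "greater" for the smaller LSN turns it into a
 * min-heap on LSN. E.g. with head LSNs {0/30, 0/10, 0/20} (hypothetical),
 * binaryheap_first() returns the entry at 0/10, the oldest pending change.
 */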
1251 
1252 /*
1253  * Allocate & initialize an iterator which iterates in lsn order over a
1254  * transaction and all its subtransactions.
1255  *
1256  * Note: The iterator state is returned through iter_state parameter rather
1257  * than the function's return value. This is because the state gets cleaned up
1258  * in a PG_CATCH block in the caller, so we want to make sure the caller gets
1259  * back the state even if this function throws an exception.
1260  */
1261 static void
1262 ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
1263  ReorderBufferIterTXNState *volatile *iter_state)
1264 {
1265  Size nr_txns = 0;
1266  ReorderBufferIterTXNState *state;
1267  dlist_iter cur_txn_i;
1268  int32 off;
1269 
1270  *iter_state = NULL;
1271 
1272  /* Check ordering of changes in the toplevel transaction. */
1273  AssertChangeLsnOrder(txn);
1274 
1275  /*
1276  * Calculate the size of our heap: one element for every transaction that
1277  * contains changes. (Besides the transactions already in the reorder
1278  * buffer, we count the one we were directly passed.)
1279  */
1280  if (txn->nentries > 0)
1281  nr_txns++;
1282 
1283  dlist_foreach(cur_txn_i, &txn->subtxns)
1284  {
1285  ReorderBufferTXN *cur_txn;
1286 
1287  cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
1288 
1289  /* Check ordering of changes in this subtransaction. */
1290  AssertChangeLsnOrder(cur_txn);
1291 
1292  if (cur_txn->nentries > 0)
1293  nr_txns++;
1294  }
1295 
1296  /* allocate iteration state */
1297  state = (ReorderBufferIterTXNState *)
1298  MemoryContextAllocZero(rb->context,
1299  sizeof(ReorderBufferIterTXNState) +
1300  sizeof(ReorderBufferIterTXNEntry) * nr_txns);
1301 
1302  state->nr_txns = nr_txns;
1303  dlist_init(&state->old_change);
1304 
1305  for (off = 0; off < state->nr_txns; off++)
1306  {
1307  state->entries[off].file.vfd = -1;
1308  state->entries[off].segno = 0;
1309  }
1310 
1311  /* allocate heap */
1312  state->heap = binaryheap_allocate(state->nr_txns,
1313  ReorderBufferIterCompare,
1314  state);
1315 
1316  /* Now that the state fields are initialized, it is safe to return it. */
1317  *iter_state = state;
1318 
1319  /*
1320  * Now insert items into the binary heap, in an unordered fashion. (We
1321  * will run a heap assembly step at the end; this is more efficient.)
1322  */
1323 
1324  off = 0;
1325 
1326  /* add toplevel transaction if it contains changes */
1327  if (txn->nentries > 0)
1328  {
1329  ReorderBufferChange *cur_change;
1330 
1331  if (rbtxn_is_serialized(txn))
1332  {
1333  /* serialize remaining changes */
1334  ReorderBufferSerializeTXN(rb, txn);
1335  ReorderBufferRestoreChanges(rb, txn, &state->entries[off].file,
1336  &state->entries[off].segno);
1337  }
1338 
1339  cur_change = dlist_head_element(ReorderBufferChange, node,
1340  &txn->changes);
1341 
1342  state->entries[off].lsn = cur_change->lsn;
1343  state->entries[off].change = cur_change;
1344  state->entries[off].txn = txn;
1345 
1346  binaryheap_add_unordered(state->heap, Int32GetDatum(off++));
1347  }
1348 
1349  /* add subtransactions if they contain changes */
1350  dlist_foreach(cur_txn_i, &txn->subtxns)
1351  {
1352  ReorderBufferTXN *cur_txn;
1353 
1354  cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
1355 
1356  if (cur_txn->nentries > 0)
1357  {
1358  ReorderBufferChange *cur_change;
1359 
1360  if (rbtxn_is_serialized(cur_txn))
1361  {
1362  /* serialize remaining changes */
1363  ReorderBufferSerializeTXN(rb, cur_txn);
1364  ReorderBufferRestoreChanges(rb, cur_txn,
1365  &state->entries[off].file,
1366  &state->entries[off].segno);
1367  }
1368  cur_change = dlist_head_element(ReorderBufferChange, node,
1369  &cur_txn->changes);
1370 
1371  state->entries[off].lsn = cur_change->lsn;
1372  state->entries[off].change = cur_change;
1373  state->entries[off].txn = cur_txn;
1374 
1375  binaryheap_add_unordered(state->heap, Int32GetDatum(off++));
1376  }
1377  }
1378 
1379  /* assemble a valid binary heap */
1380  binaryheap_build(state->heap);
1381 }
1382 
1383 /*
1384  * Return the next change when iterating over a transaction and its
1385  * subtransactions.
1386  *
1387  * Returns NULL when no further changes exist.
1388  */
1389 static ReorderBufferChange *
1390 ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
1391 {
1392  ReorderBufferChange *change;
1393  ReorderBufferIterTXNEntry *entry;
1394  int32 off;
1395 
1396  /* nothing there anymore */
1397  if (state->heap->bh_size == 0)
1398  return NULL;
1399 
1400  off = DatumGetInt32(binaryheap_first(state->heap));
1401  entry = &state->entries[off];
1402 
1403  /* free memory we might have "leaked" in the previous *Next call */
1404  if (!dlist_is_empty(&state->old_change))
1405  {
1406  change = dlist_container(ReorderBufferChange, node,
1407  dlist_pop_head_node(&state->old_change));
1408  ReorderBufferReturnChange(rb, change, true);
1409  Assert(dlist_is_empty(&state->old_change));
1410  }
1411 
1412  change = entry->change;
1413 
1414  /*
1415  * update heap with information about which transaction has the next
1416  * relevant change in LSN order
1417  */
1418 
1419  /* there are in-memory changes */
1420  if (dlist_has_next(&entry->txn->changes, &entry->change->node))
1421  {
1422  dlist_node *next = dlist_next_node(&entry->txn->changes, &change->node);
1423  ReorderBufferChange *next_change =
1424  dlist_container(ReorderBufferChange, node, next);
1425 
1426  /* txn stays the same */
1427  state->entries[off].lsn = next_change->lsn;
1428  state->entries[off].change = next_change;
1429  binaryheap_replace_first(state->heap, Int32GetDatum(off));
1430 
1431  return change;
1432  }
1433 
1434  /* try to load changes from disk */
1435  if (entry->txn->nentries != entry->txn->nentries_mem)
1436  {
1437  /*
1438  * Ugly: restoring changes will reuse *Change records, thus delete the
1439  * current one from the per-tx list and only free in the next call.
1440  */
1441  dlist_delete(&change->node);
1442  dlist_push_tail(&state->old_change, &change->node);
1443 
1444  /*
1445  * Update the total bytes processed by the txn for which we are
1446  * releasing the current set of changes and restoring the new set of
1447  * changes.
1448  */
1449  rb->totalBytes += entry->txn->size;
1450  if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->file,
1451  &state->entries[off].segno))
1452  {
1453  /* successfully restored changes from disk */
1454  ReorderBufferChange *next_change =
1455  dlist_head_element(ReorderBufferChange, node,
1456  &entry->txn->changes);
1457 
1458  elog(DEBUG2, "restored %u/%u changes from disk",
1459  (uint32) entry->txn->nentries_mem,
1460  (uint32) entry->txn->nentries);
1461 
1462  Assert(entry->txn->nentries_mem);
1463  /* txn stays the same */
1464  state->entries[off].lsn = next_change->lsn;
1465  state->entries[off].change = next_change;
1466  binaryheap_replace_first(state->heap, Int32GetDatum(off));
1467 
1468  return change;
1469  }
1470  }
1471 
1472  /* ok, no changes there anymore, remove */
1473  binaryheap_remove_first(state->heap);
1474 
1475  return change;
1476 }
1477 
1478 /*
1479  * Deallocate the iterator
1480  */
1481 static void
1482 ReorderBufferIterTXNFinish(ReorderBuffer *rb,
1483  ReorderBufferIterTXNState *state)
1484 {
1485  int32 off;
1486 
1487  for (off = 0; off < state->nr_txns; off++)
1488  {
1489  if (state->entries[off].file.vfd != -1)
1490  FileClose(state->entries[off].file.vfd);
1491  }
1492 
1493  /* free memory we might have "leaked" in the last *Next call */
1494  if (!dlist_is_empty(&state->old_change))
1495  {
1496  ReorderBufferChange *change;
1497 
1498  change = dlist_container(ReorderBufferChange, node,
1499  dlist_pop_head_node(&state->old_change));
1500  ReorderBufferReturnChange(rb, change, true);
1501  Assert(dlist_is_empty(&state->old_change));
1502  }
1503 
1504  binaryheap_free(state->heap);
1505  pfree(state);
1506 }
1507 
1508 /*
1509  * Cleanup the contents of a transaction, usually after the transaction
1510  * committed or aborted.
1511  */
1512 static void
1513 ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
1514 {
1515  bool found;
1516  dlist_mutable_iter iter;
1517  Size mem_freed = 0;
1518 
1519  /* cleanup subtransactions & their changes */
1520  dlist_foreach_modify(iter, &txn->subtxns)
1521  {
1522  ReorderBufferTXN *subtxn;
1523 
1524  subtxn = dlist_container(ReorderBufferTXN, node, iter.cur);
1525 
1526  /*
1527  * Subtransactions are always associated to the toplevel TXN, even if
1528  * they originally were happening inside another subtxn, so we won't
1529  * ever recurse more than one level deep here.
1530  */
1531  Assert(rbtxn_is_known_subxact(subtxn));
1532  Assert(subtxn->nsubtxns == 0);
1533 
1534  ReorderBufferCleanupTXN(rb, subtxn);
1535  }
1536 
1537  /* cleanup changes in the txn */
1538  dlist_foreach_modify(iter, &txn->changes)
1539  {
1540  ReorderBufferChange *change;
1541 
1542  change = dlist_container(ReorderBufferChange, node, iter.cur);
1543 
1544  /* Check we're not mixing changes from different transactions. */
1545  Assert(change->txn == txn);
1546 
1547  /*
1548  * Instead of updating the memory counter for individual changes, we
1549  * sum up the size of memory to free so we can update the memory
1550  * counter all together below. This saves costs of maintaining the
1551  * max-heap.
1552  */
1553  mem_freed += ReorderBufferChangeSize(change);
1554 
1555  ReorderBufferReturnChange(rb, change, false);
1556  }
1557 
1558  /* Update the memory counter */
1559  ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, mem_freed);
1560 
1561  /*
1562  * Cleanup the tuplecids we stored for decoding catalog snapshot access.
1563  * They are always stored in the toplevel transaction.
1564  */
1565  dlist_foreach_modify(iter, &txn->tuplecids)
1566  {
1567  ReorderBufferChange *change;
1568 
1569  change = dlist_container(ReorderBufferChange, node, iter.cur);
1570 
1571  /* Check we're not mixing changes from different transactions. */
1572  Assert(change->txn == txn);
1573  Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID);
1574 
1575  ReorderBufferReturnChange(rb, change, true);
1576  }
1577 
1578  /*
1579  * Cleanup the base snapshot, if set.
1580  */
1581  if (txn->base_snapshot != NULL)
1582  {
1583  SnapBuildSnapDecRefcount(txn->base_snapshot);
1584  dlist_delete(&txn->base_snapshot_node);
1585  }
1586 
1587  /*
1588  * Cleanup the snapshot for the last streamed run.
1589  */
1590  if (txn->snapshot_now != NULL)
1591  {
1592  Assert(rbtxn_is_streamed(txn));
1593  ReorderBufferFreeSnap(rb, txn->snapshot_now);
1594  }
1595 
1596  /*
1597  * Remove TXN from its containing lists.
1598  *
1599  * Note: if txn is known as subxact, we are deleting the TXN from its
1600  * parent's list of known subxacts; this leaves the parent's nsubxacts
1601  * count too high, but we don't care. Otherwise, we are deleting the TXN
1602  * from the LSN-ordered list of toplevel TXNs. We remove the TXN from the
1603  * list of catalog modifying transactions as well.
1604  */
1605  dlist_delete(&txn->node);
1606  if (rbtxn_has_catalog_changes(txn))
1607  dclist_delete_from(&rb->catchange_txns, &txn->catchange_node);
1608 
1609  /* now remove reference from buffer */
1610  hash_search(rb->by_txn, &txn->xid, HASH_REMOVE, &found);
1611  Assert(found);
1612 
1613  /* remove entries spilled to disk */
1614  if (rbtxn_is_serialized(txn))
1615  ReorderBufferRestoreCleanup(rb, txn);
1616 
1617  /* deallocate */
1618  ReorderBufferReturnTXN(rb, txn);
1619 }
1620 
1621 /*
1622  * Discard changes from a transaction (and subtransactions), either after
1623  * streaming or decoding them at PREPARE. Keep the remaining info -
1624  * transactions, tuplecids, invalidations and snapshots.
1625  *
1626  * We additionally remove tuplecids after decoding the transaction at prepare
1627  * time as we only need to perform invalidation at rollback or commit prepared.
1628  *
1629  * 'txn_prepared' indicates that we have decoded the transaction at prepare
1630  * time.
1631  */
1632 static void
1633 ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared)
1634 {
1635  dlist_mutable_iter iter;
1636  Size mem_freed = 0;
1637 
1638  /* cleanup subtransactions & their changes */
1639  dlist_foreach_modify(iter, &txn->subtxns)
1640  {
1641  ReorderBufferTXN *subtxn;
1642 
1643  subtxn = dlist_container(ReorderBufferTXN, node, iter.cur);
1644 
1645  /*
1646  * Subtransactions are always associated to the toplevel TXN, even if
1647  * they originally were happening inside another subtxn, so we won't
1648  * ever recurse more than one level deep here.
1649  */
1650  Assert(rbtxn_is_known_subxact(subtxn));
1651  Assert(subtxn->nsubtxns == 0);
1652 
1653  ReorderBufferTruncateTXN(rb, subtxn, txn_prepared);
1654  }
1655 
1656  /* cleanup changes in the txn */
1657  dlist_foreach_modify(iter, &txn->changes)
1658  {
1659  ReorderBufferChange *change;
1660 
1661  change = dlist_container(ReorderBufferChange, node, iter.cur);
1662 
1663  /* Check we're not mixing changes from different transactions. */
1664  Assert(change->txn == txn);
1665 
1666  /* remove the change from its containing list */
1667  dlist_delete(&change->node);
1668 
1669  /*
1670  * Instead of updating the memory counter for individual changes, we
1671  * sum up the size of memory to free so we can update the memory
1672  * counter all together below. This saves costs of maintaining the
1673  * max-heap.
1674  */
1675  mem_freed += ReorderBufferChangeSize(change);
1676 
1677  ReorderBufferReturnChange(rb, change, false);
1678  }
1679 
1680  /* Update the memory counter */
1681  ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, mem_freed);
1682 
1683  /*
1684  * Mark the transaction as streamed.
1685  *
1686  * The top-level transaction is always marked as streamed, even if it
1687  * does not contain any changes (that is, when all the changes are in
1688  * subtransactions).
1689  *
1690  * For subtransactions, we only mark them as streamed when there are
1691  * changes in them.
1692  *
1693  * We do it this way because of aborts - we don't want to send aborts for
1694  * XIDs the downstream is not aware of. And of course, it always knows
1695  * about the toplevel xact (we send the XID in all messages), but we never
1696  * stream XIDs of empty subxacts.
1697  */
1698  if ((!txn_prepared) && (rbtxn_is_toptxn(txn) || (txn->nentries_mem != 0)))
1699  txn->txn_flags |= RBTXN_IS_STREAMED;
1700 
1701  if (txn_prepared)
1702  {
1703  /*
1704  * If this is a prepared txn, cleanup the tuplecids we stored for
1705  * decoding catalog snapshot access. They are always stored in the
1706  * toplevel transaction.
1707  */
1708  dlist_foreach_modify(iter, &txn->tuplecids)
1709  {
1710  ReorderBufferChange *change;
1711 
1712  change = dlist_container(ReorderBufferChange, node, iter.cur);
1713 
1714  /* Check we're not mixing changes from different transactions. */
1715  Assert(change->txn == txn);
1716  Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID);
1717 
1718  /* Remove the change from its containing list. */
1719  dlist_delete(&change->node);
1720 
1721  ReorderBufferReturnChange(rb, change, true);
1722  }
1723  }
1724 
1725  /*
1726  * Destroy the (relfilelocator, ctid) hashtable, so that we don't leak any
1727  * memory. We could also keep the hash table and update it with new ctid
1728  * values, but this seems simpler and good enough for now.
1729  */
1730  if (txn->tuplecid_hash != NULL)
1731  {
1732  hash_destroy(txn->tuplecid_hash);
1733  txn->tuplecid_hash = NULL;
1734  }
1735 
1736  /* If this txn is serialized then clean the disk space. */
1737  if (rbtxn_is_serialized(txn))
1738  {
1739  ReorderBufferRestoreCleanup(rb, txn);
1740  txn->txn_flags &= ~RBTXN_IS_SERIALIZED;
1741 
1742  /*
1743  * We set this flag to indicate if the transaction is ever serialized.
1744  * We need this to accurately update the stats as otherwise the same
1745  * transaction can be counted as serialized multiple times.
1746  */
1747  txn->txn_flags |= RBTXN_IS_SERIALIZED_CLEAR;
1748  }
1749 
1750  /* also reset the number of entries in the transaction */
1751  txn->nentries_mem = 0;
1752  txn->nentries = 0;
1753 }
1754 
1755 /*
1756  * Build a hash with a (relfilelocator, ctid) -> (cmin, cmax) mapping for use by
1757  * HeapTupleSatisfiesHistoricMVCC.
1758  */
1759 static void
1760 ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
1761 {
1762  dlist_iter iter;
1763  HASHCTL hash_ctl;
1764 
1765  if (!rbtxn_has_catalog_changes(txn) || dlist_is_empty(&txn->tuplecids))
1766  return;
1767 
1768  hash_ctl.keysize = sizeof(ReorderBufferTupleCidKey);
1769  hash_ctl.entrysize = sizeof(ReorderBufferTupleCidEnt);
1770  hash_ctl.hcxt = rb->context;
1771 
1772  /*
1773  * create the hash with the exact number of to-be-stored tuplecids from
1774  * the start
1775  */
1776  txn->tuplecid_hash =
1777  hash_create("ReorderBufferTupleCid", txn->ntuplecids, &hash_ctl,
1779 
1780  dlist_foreach(iter, &txn->tuplecids)
1781  {
1782  ReorderBufferTupleCidKey key;
1783  ReorderBufferTupleCidEnt *ent;
1784  bool found;
1785  ReorderBufferChange *change;
1786 
1787  change = dlist_container(ReorderBufferChange, node, iter.cur);
1788 
1789  Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID);
1790 
1791  /* be careful about padding */
1792  memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
1793 
1794  key.rlocator = change->data.tuplecid.locator;
1795 
1796  ItemPointerCopy(&change->data.tuplecid.tid,
1797  &key.tid);
1798 
1799  ent = (ReorderBufferTupleCidEnt *)
1800  hash_search(txn->tuplecid_hash, &key, HASH_ENTER, &found);
1801  if (!found)
1802  {
1803  ent->cmin = change->data.tuplecid.cmin;
1804  ent->cmax = change->data.tuplecid.cmax;
1805  ent->combocid = change->data.tuplecid.combocid;
1806  }
1807  else
1808  {
1809  /*
1810  * Maybe we already saw this tuple before in this transaction, but
1811  * if so it must have the same cmin.
1812  */
1813  Assert(ent->cmin == change->data.tuplecid.cmin);
1814 
1815  /*
1816  * cmax may be initially invalid, but once set it can only grow,
1817  * and never become invalid again.
1818  */
1819  Assert((ent->cmax == InvalidCommandId) ||
1820  ((change->data.tuplecid.cmax != InvalidCommandId) &&
1821  (change->data.tuplecid.cmax > ent->cmax)));
1822  ent->cmax = change->data.tuplecid.cmax;
1823  }
1824  }
1825 }
1826 
1827 /*
1828  * Copy a provided snapshot so we can modify it privately. This is needed so
1829  * that catalog modifying transactions can look into intermediate catalog
1830  * states.
1831  */
1832 static Snapshot
1833 ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap,
1834  ReorderBufferTXN *txn, CommandId cid)
1835 {
1836  Snapshot snap;
1837  dlist_iter iter;
1838  int i = 0;
1839  Size size;
1840 
1841  size = sizeof(SnapshotData) +
1842  sizeof(TransactionId) * orig_snap->xcnt +
1843  sizeof(TransactionId) * (txn->nsubtxns + 1);
1844 
1845  snap = MemoryContextAllocZero(rb->context, size);
1846  memcpy(snap, orig_snap, sizeof(SnapshotData));
1847 
1848  snap->copied = true;
1849  snap->active_count = 1; /* mark as active so nobody frees it */
1850  snap->regd_count = 0;
1851  snap->xip = (TransactionId *) (snap + 1);
1852 
1853  memcpy(snap->xip, orig_snap->xip, sizeof(TransactionId) * snap->xcnt);
1854 
1855  /*
1856  * snap->subxip contains all txids that belong to our transaction which we
1857  * need to check via cmin/cmax. That's why we store the toplevel
1858  * transaction in there as well.
1859  */
1860  snap->subxip = snap->xip + snap->xcnt;
1861  snap->subxip[i++] = txn->xid;
1862 
1863  /*
1864  * subxcnt isn't decreased when subtransactions abort, so count manually.
1865  * Since it's an upper boundary it is safe to use it for the allocation
1866  * above.
1867  */
1868  snap->subxcnt = 1;
1869 
1870  dlist_foreach(iter, &txn->subtxns)
1871  {
1872  ReorderBufferTXN *sub_txn;
1873 
1874  sub_txn = dlist_container(ReorderBufferTXN, node, iter.cur);
1875  snap->subxip[i++] = sub_txn->xid;
1876  snap->subxcnt++;
1877  }
1878 
1879  /* sort so we can bsearch() later */
1880  qsort(snap->subxip, snap->subxcnt, sizeof(TransactionId), xidComparator);
1881 
1882  /* store the specified current CommandId */
1883  snap->curcid = cid;
1884 
1885  return snap;
1886 }
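/*
 * Resulting layout, as a sketch (counts hypothetical): a single allocation
 * holds the struct followed by both xid arrays,
 *
 *	[ SnapshotData | xip[0 .. xcnt-1] | subxip[0] = toplevel xid | ... ]
 *
 * which is why snap->xip points just past the struct and snap->subxip is
 * set to snap->xip + snap->xcnt above.
 */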
1887 
1888 /*
1889  * Free a previously ReorderBufferCopySnap'ed snapshot
1890  */
1891 static void
1892 ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
1893 {
1894  if (snap->copied)
1895  pfree(snap);
1896  else
1897  SnapBuildSnapDecRefcount(snap);
1898 }
1899 
1900 /*
1901  * If the transaction was (partially) streamed, we need to prepare or commit
1902  * it in a 'streamed' way. That is, we first stream the remaining part of the
1903  * transaction, and then invoke stream_prepare or stream_commit message as per
1904  * the case.
1905  */
1906 static void
1907 ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn)
1908 {
1909  /* we should only call this for previously streamed transactions */
1910  Assert(rbtxn_is_streamed(txn));
1911 
1912  ReorderBufferStreamTXN(rb, txn);
1913 
1914  if (rbtxn_prepared(txn))
1915  {
1916  /*
1917  * Note, we send stream prepare even if a concurrent abort is
1918  * detected. See DecodePrepare for more information.
1919  */
1920  rb->stream_prepare(rb, txn, txn->final_lsn);
1921 
1922  /*
1923  * This is a PREPARED transaction, part of a two-phase commit. The
1924  * full cleanup will happen as part of the COMMIT PREPAREDs, so now
1925  * just truncate txn by removing changes and tuplecids.
1926  */
1927  ReorderBufferTruncateTXN(rb, txn, true);
1928  /* Reset the CheckXidAlive */
1929  CheckXidAlive = InvalidTransactionId;
1930  }
1931  else
1932  {
1933  rb->stream_commit(rb, txn, txn->final_lsn);
1934  ReorderBufferCleanupTXN(rb, txn);
1935  }
1936 }
1937 
1938 /*
1939  * Set xid to detect concurrent aborts.
1940  *
1941  * While streaming an in-progress transaction or decoding a prepared
1942  * transaction there is a possibility that the (sub)transaction might get
1943  * aborted concurrently. In such case if the (sub)transaction has catalog
1944  * update then we might decode the tuple using wrong catalog version. For
1945  * example, suppose there is one catalog tuple with (xmin: 500, xmax: 0). Now,
1946  * the transaction 501 updates the catalog tuple and after that we will have
1947  * two tuples (xmin: 500, xmax: 501) and (xmin: 501, xmax: 0). Now, if 501 is
1948  * aborted and some other transaction say 502 updates the same catalog tuple
1949  * then the first tuple will be changed to (xmin: 500, xmax: 502). So, the
1950  * problem is that when we try to decode the tuple inserted/updated in 501
1951  * after the catalog update, we will see the catalog tuple with (xmin: 500,
1952  * xmax: 502) as visible because it will consider that the tuple is deleted by
1953  * xid 502 which is not visible to our snapshot. And when we will try to
1954  * decode with that catalog tuple, it can lead to a wrong result or a crash.
1955  * So, it is necessary to detect concurrent aborts to allow streaming of
1956  * in-progress transactions or decoding of prepared transactions.
1957  *
1958  * For detecting the concurrent abort we set CheckXidAlive to the current
1959  * (sub)transaction's xid for which this change belongs to. And, during
1960  * catalog scan we can check the status of the xid and if it is aborted we will
1961  * report a specific error so that we can stop streaming current transaction
1962  * and discard the already streamed changes on such an error. We might have
1963  * already streamed some of the changes for the aborted (sub)transaction, but
1964  * that is fine because when we decode the abort we will stream abort message
1965  * to truncate the changes in the subscriber. Similarly, for prepared
1966  * transactions, we stop decoding if concurrent abort is detected and then
1967  * rollback the changes when rollback prepared is encountered. See
1968  * DecodePrepare.
1969  */
1970 static inline void
1971 SetupCheckXidLive(TransactionId xid)
1972 {
1973  /*
1974  * If the input transaction id is already set as a CheckXidAlive then
1975  * nothing to do.
1976  */
1977  if (TransactionIdEquals(CheckXidAlive, xid))
1978  return;
1979 
1980  /*
1981  * setup CheckXidAlive if it's not committed yet. We don't check if the
1982  * xid is aborted. That will happen during catalog access.
1983  */
1984  if (!TransactionIdDidCommit(xid))
1985  CheckXidAlive = xid;
1986  else
1987  CheckXidAlive = InvalidTransactionId;
1988 }
1989 
1990 /*
1991  * Helper function for ReorderBufferProcessTXN for applying change.
1992  */
1993 static inline void
1994 ReorderBufferApplyChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
1995  Relation relation, ReorderBufferChange *change,
1996  bool streaming)
1997 {
1998  if (streaming)
1999  rb->stream_change(rb, txn, relation, change);
2000  else
2001  rb->apply_change(rb, txn, relation, change);
2002 }
2003 
2004 /*
2005  * Helper function for ReorderBufferProcessTXN for applying the truncate.
2006  */
2007 static inline void
2008 ReorderBufferApplyTruncate(ReorderBuffer *rb, ReorderBufferTXN *txn,
2009  int nrelations, Relation *relations,
2010  ReorderBufferChange *change, bool streaming)
2011 {
2012  if (streaming)
2013  rb->stream_truncate(rb, txn, nrelations, relations, change);
2014  else
2015  rb->apply_truncate(rb, txn, nrelations, relations, change);
2016 }
2017 
2018 /*
2019  * Helper function for ReorderBufferProcessTXN for applying the message.
2020  */
2021 static inline void
2022 ReorderBufferApplyMessage(ReorderBuffer *rb, ReorderBufferTXN *txn,
2023  ReorderBufferChange *change, bool streaming)
2024 {
2025  if (streaming)
2026  rb->stream_message(rb, txn, change->lsn, true,
2027  change->data.msg.prefix,
2028  change->data.msg.message_size,
2029  change->data.msg.message);
2030  else
2031  rb->message(rb, txn, change->lsn, true,
2032  change->data.msg.prefix,
2033  change->data.msg.message_size,
2034  change->data.msg.message);
2035 }
2036 
2037 /*
2038  * Function to store the command id and snapshot at the end of the current
2039  * stream so that we can reuse them when sending the next stream.
2040  */
2041 static inline void
2042 ReorderBufferSaveTXNSnapshot(ReorderBuffer *rb, ReorderBufferTXN *txn,
2043  Snapshot snapshot_now, CommandId command_id)
2044 {
2045  txn->command_id = command_id;
2046 
2047  /* Avoid copying if it's already copied. */
2048  if (snapshot_now->copied)
2049  txn->snapshot_now = snapshot_now;
2050  else
2051  txn->snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
2052  txn, command_id);
2053 }
2054 
2055 /*
2056  * Helper function for ReorderBufferProcessTXN to handle the concurrent
2057  * abort of the streaming transaction. This resets the TXN such that it
2058  * can be used to stream the remaining data of the transaction being processed.
2059  * This can happen when a subtransaction is aborted and we still want to
2060  * continue processing the main transaction's or other subtransactions' data.
2061  */
2062 static void
2063 ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
2064  Snapshot snapshot_now,
2065  CommandId command_id,
2066  XLogRecPtr last_lsn,
2067  ReorderBufferChange *specinsert)
2068 {
2069  /* Discard the changes that we just streamed */
2070  ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn));
2071 
2072  /* Free all resources allocated for toast reconstruction */
2073  ReorderBufferToastReset(rb, txn);
2074 
2075  /* Return the spec insert change if it is not NULL */
2076  if (specinsert != NULL)
2077  {
2078  ReorderBufferReturnChange(rb, specinsert, true);
2079  specinsert = NULL;
2080  }
2081 
2082  /*
2083  * For the streaming case, stop the stream and remember the command ID and
2084  * snapshot for the streaming run.
2085  */
2086  if (rbtxn_is_streamed(txn))
2087  {
2088  rb->stream_stop(rb, txn, last_lsn);
2089  ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
2090  }
2091 
2092  /* All changes must be deallocated */
2093  Assert(txn->size == 0);
2094 }
2095 
2096 /*
2097  * Helper function for ReorderBufferReplay and ReorderBufferStreamTXN.
2098  *
2099  * Send data of a transaction (and its subtransactions) to the
2100  * output plugin. We iterate over the top and subtransactions (using a k-way
2101  * merge) and replay the changes in lsn order.
2102  *
2103  * If streaming is true then data will be sent using stream API.
2104  *
2105  * Note: "volatile" markers on some parameters are to avoid trouble with
2106  * PG_TRY inside the function.
2107  */
2108 static void
2109 ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
2110  XLogRecPtr commit_lsn,
2111  volatile Snapshot snapshot_now,
2112  volatile CommandId command_id,
2113  bool streaming)
2114 {
2115  bool using_subtxn;
2116  MemoryContext ccxt = CurrentMemoryContext;
2117  ReorderBufferIterTXNState *volatile iterstate = NULL;
2118  volatile XLogRecPtr prev_lsn = InvalidXLogRecPtr;
2119  ReorderBufferChange *volatile specinsert = NULL;
2120  volatile bool stream_started = false;
2121  ReorderBufferTXN *volatile curtxn = NULL;
2122 
2123  /* build data to be able to lookup the CommandIds of catalog tuples */
2124  ReorderBufferBuildTupleCidHash(rb, txn);
2125 
2126  /* setup the initial snapshot */
2127  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
2128 
2129  /*
2130  * Decoding needs access to syscaches et al., which in turn use
2131  * heavyweight locks and such. Thus we need to have enough state around to
2132  * keep track of those. The easiest way is to simply use a transaction
2133  * internally. That also allows us to easily enforce that nothing writes
2134  * to the database by checking for xid assignments.
2135  *
2136  * When we're called via the SQL SRF there's already a transaction
2137  * started, so start an explicit subtransaction there.
2138  */
2139  using_subtxn = IsTransactionOrTransactionBlock();
2140 
2141  PG_TRY();
2142  {
2143  ReorderBufferChange *change;
2144  int changes_count = 0; /* used to accumulate the number of
2145  * changes */
2146 
2147  if (using_subtxn)
2148  BeginInternalSubTransaction(streaming ? "stream" : "replay");
2149  else
2150  StartTransactionCommand();
2151 
2152  /*
2153  * We only need to send begin/begin-prepare for non-streamed
2154  * transactions.
2155  */
2156  if (!streaming)
2157  {
2158  if (rbtxn_prepared(txn))
2159  rb->begin_prepare(rb, txn);
2160  else
2161  rb->begin(rb, txn);
2162  }
2163 
2164  ReorderBufferIterTXNInit(rb, txn, &iterstate);
2165  while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
2166  {
2167  Relation relation = NULL;
2168  Oid reloid;
2169 
2170  CHECK_FOR_INTERRUPTS();
2171 
2172  /*
2173  * We can't call the stream_start callback before processing the
2174  * first change.
2175  */
2176  if (prev_lsn == InvalidXLogRecPtr)
2177  {
2178  if (streaming)
2179  {
2180  txn->origin_id = change->origin_id;
2181  rb->stream_start(rb, txn, change->lsn);
2182  stream_started = true;
2183  }
2184  }
2185 
2186  /*
2187  * Enforce correct ordering of changes, merged from multiple
2188  * subtransactions. The changes may have the same LSN due to
2189  * MULTI_INSERT xlog records.
2190  */
2191  Assert(prev_lsn == InvalidXLogRecPtr || prev_lsn <= change->lsn);
2192 
2193  prev_lsn = change->lsn;
2194 
2195  /*
2196  * Set the current xid to detect concurrent aborts. This is
2197  * required for the cases when we decode the changes before the
2198  * COMMIT record is processed.
2199  */
2200  if (streaming || rbtxn_prepared(change->txn))
2201  {
2202  curtxn = change->txn;
2203  SetupCheckXidLive(curtxn->xid);
2204  }
2205 
2206  switch (change->action)
2207  {
2208  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
2209 
2210  /*
2211  * Confirmation for speculative insertion arrived. Simply
2212  * use as a normal record. It'll be cleaned up at the end
2213  * of INSERT processing.
2214  */
2215  if (specinsert == NULL)
2216  elog(ERROR, "invalid ordering of speculative insertion changes");
2217  Assert(specinsert->data.tp.oldtuple == NULL);
2218  change = specinsert;
2219  change->action = REORDER_BUFFER_CHANGE_INSERT;
2220 
2221  /* intentionally fall through */
2222  case REORDER_BUFFER_CHANGE_INSERT:
2223  case REORDER_BUFFER_CHANGE_UPDATE:
2224  case REORDER_BUFFER_CHANGE_DELETE:
2225  Assert(snapshot_now);
2226 
2227  reloid = RelidByRelfilenumber(change->data.tp.rlocator.spcOid,
2228  change->data.tp.rlocator.relNumber);
2229 
2230  /*
2231  * Mapped catalog tuple without data, emitted while
2232  * catalog table was in the process of being rewritten. We
2233  * can fail to look up the relfilenumber, because the
2234  * relmapper has no "historic" view, in contrast to the
2235  * normal catalog during decoding. Thus repeated rewrites
2236  * can cause a lookup failure. That's OK because we do not
2237  * decode catalog changes anyway. Normally such tuples
2238  * would be skipped over below, but we can't identify
2239  * whether the table should be logically logged without
2240  * mapping the relfilenumber to the oid.
2241  */
2242  if (reloid == InvalidOid &&
2243  change->data.tp.newtuple == NULL &&
2244  change->data.tp.oldtuple == NULL)
2245  goto change_done;
2246  else if (reloid == InvalidOid)
2247  elog(ERROR, "could not map filenumber \"%s\" to relation OID",
2248  relpathperm(change->data.tp.rlocator,
2249  MAIN_FORKNUM));
2250 
2251  relation = RelationIdGetRelation(reloid);
2252 
2253  if (!RelationIsValid(relation))
2254  elog(ERROR, "could not open relation with OID %u (for filenumber \"%s\")",
2255  reloid,
2256  relpathperm(change->data.tp.rlocator,
2257  MAIN_FORKNUM));
2258 
2259  if (!RelationIsLogicallyLogged(relation))
2260  goto change_done;
2261 
2262  /*
2263  * Ignore temporary heaps created during DDL unless the
2264  * plugin has asked for them.
2265  */
2266  if (relation->rd_rel->relrewrite && !rb->output_rewrites)
2267  goto change_done;
2268 
2269  /*
2270  * For now ignore sequence changes entirely. Most of the
2271  * time they don't log changes using records we
2272  * understand, so it doesn't make sense to handle the few
2273  * cases we do.
2274  */
2275  if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
2276  goto change_done;
2277 
2278  /* user-triggered change */
2279  if (!IsToastRelation(relation))
2280  {
2281  ReorderBufferToastReplace(rb, txn, relation, change);
2282  ReorderBufferApplyChange(rb, txn, relation, change,
2283  streaming);
2284 
2285  /*
2286  * Only clear reassembled toast chunks if we're sure
2287  * they're not required anymore. The creator of the
2288  * tuple tells us.
2289  */
2290  if (change->data.tp.clear_toast_afterwards)
2291  ReorderBufferToastReset(rb, txn);
2292  }
2293  /* we're not interested in toast deletions */
2294  else if (change->action == REORDER_BUFFER_CHANGE_INSERT)
2295  {
2296  /*
2297  * Need to reassemble the full toasted Datum in
2298  * memory, to ensure the chunks don't get reused till
2299  * we're done; remove it from the list of this
2300  * transaction's changes. Otherwise it will get
2301  * freed/reused while restoring spooled data from
2302  * disk.
2303  */
2304  Assert(change->data.tp.newtuple != NULL);
2305 
2306  dlist_delete(&change->node);
2307  ReorderBufferToastAppendChunk(rb, txn, relation,
2308  change);
2309  }
2310 
2311  change_done:
2312 
2313  /*
2314  * If speculative insertion was confirmed, the record
2315  * isn't needed anymore.
2316  */
2317  if (specinsert != NULL)
2318  {
2319  ReorderBufferReturnChange(rb, specinsert, true);
2320  specinsert = NULL;
2321  }
2322 
2323  if (RelationIsValid(relation))
2324  {
2325  RelationClose(relation);
2326  relation = NULL;
2327  }
2328  break;
2329 
2330  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
2331 
2332  /*
2333  * Speculative insertions are dealt with by delaying the
2334  * processing of the insert until the confirmation record
2335  * arrives. For that we simply unlink the record from the
2336  * chain, so it does not get freed/reused while restoring
2337  * spooled data from disk.
2338  *
2339  * This is safe in the face of concurrent catalog changes
2340  * because the relevant relation can't be changed between
2341  * speculative insertion and confirmation due to
2342  * CheckTableNotInUse() and locking.
2343  */
2344 
2345  /* clear out a pending (and thus failed) speculation */
2346  if (specinsert != NULL)
2347  {
2348  ReorderBufferReturnChange(rb, specinsert, true);
2349  specinsert = NULL;
2350  }
2351 
2352  /* and memorize the pending insertion */
2353  dlist_delete(&change->node);
2354  specinsert = change;
2355  break;
2356 
2357  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
2358 
2359  /*
2360  * Abort for speculative insertion arrived. So clean up the
2361  * specinsert tuple and toast hash.
2362  *
2363  * Note that we get the spec abort change for each toast
2364  * entry but we need to perform the cleanup only the first
2365  * time we get it for the main table.
2366  */
2367  if (specinsert != NULL)
2368  {
2369  /*
2370  * We must clean the toast hash before processing a
2371  * completely new tuple to avoid confusion about the
2372  * previous tuple's toast chunks.
2373  */
2374  Assert(change->data.tp.clear_toast_afterwards);
2375  ReorderBufferToastReset(rb, txn);
2376 
2377  /* We don't need this record anymore. */
2378  ReorderBufferReturnChange(rb, specinsert, true);
2379  specinsert = NULL;
2380  }
2381  break;
2382 
2383  case REORDER_BUFFER_CHANGE_TRUNCATE:
2384  {
2385  int i;
2386  int nrelids = change->data.truncate.nrelids;
2387  int nrelations = 0;
2388  Relation *relations;
2389 
2390  relations = palloc0(nrelids * sizeof(Relation));
2391  for (i = 0; i < nrelids; i++)
2392  {
2393  Oid relid = change->data.truncate.relids[i];
2394  Relation rel;
2395 
2396  rel = RelationIdGetRelation(relid);
2397 
2398  if (!RelationIsValid(rel))
2399  elog(ERROR, "could not open relation with OID %u", relid);
2400 
2401  if (!RelationIsLogicallyLogged(rel))
2402  continue;
2403 
2404  relations[nrelations++] = rel;
2405  }
2406 
2407  /* Apply the truncate. */
2408  ReorderBufferApplyTruncate(rb, txn, nrelations,
2409  relations, change,
2410  streaming);
2411 
2412  for (i = 0; i < nrelations; i++)
2413  RelationClose(relations[i]);
2414 
2415  break;
2416  }
2417 
2418  case REORDER_BUFFER_CHANGE_MESSAGE:
2419  ReorderBufferApplyMessage(rb, txn, change, streaming);
2420  break;
2421 
2422  case REORDER_BUFFER_CHANGE_INVALIDATION:
2423  /* Execute the invalidation messages locally */
2424  ReorderBufferExecuteInvalidations(change->data.inval.ninvalidations,
2425  change->data.inval.invalidations);
2426  break;
2427 
2428  case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
2429  /* get rid of the old */
2430  TeardownHistoricSnapshot(false);
2431 
2432  if (snapshot_now->copied)
2433  {
2434  ReorderBufferFreeSnap(rb, snapshot_now);
2435  snapshot_now =
2436  ReorderBufferCopySnap(rb, change->data.snapshot,
2437  txn, command_id);
2438  }
2439 
2440  /*
2441  * Restored from disk, need to be careful not to double
2442  * free. We could introduce refcounting for that, but for
2443  * now this seems infrequent enough not to care.
2444  */
2445  else if (change->data.snapshot->copied)
2446  {
2447  snapshot_now =
2448  ReorderBufferCopySnap(rb, change->data.snapshot,
2449  txn, command_id);
2450  }
2451  else
2452  {
2453  snapshot_now = change->data.snapshot;
2454  }
2455 
2456  /* and continue with the new one */
2457  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
2458  break;
2459 
2460  case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
2461  Assert(change->data.command_id != InvalidCommandId);
2462 
2463  if (command_id < change->data.command_id)
2464  {
2465  command_id = change->data.command_id;
2466 
2467  if (!snapshot_now->copied)
2468  {
2469  /* we don't use the global one anymore */
2470  snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
2471  txn, command_id);
2472  }
2473 
2474  snapshot_now->curcid = command_id;
2475 
2476  TeardownHistoricSnapshot(false);
2477  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
2478  }
2479 
2480  break;
2481 
2482  case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
2483  elog(ERROR, "tuplecid value in changequeue");
2484  break;
2485  }
2486 
2487  /*
2488  * It is possible that the data is not sent downstream for a
2489  * long time either because the output plugin filtered it or there
2490  * is a DDL that generates a lot of data that is not processed by
2491  * the plugin. So, in such cases, the downstream can timeout. To
2492  * avoid that we try to send a keepalive message if required.
2493  * Trying to send a keepalive message after every change has some
2494  * overhead, but testing showed there is no noticeable overhead if
2495  * we do it after every ~100 changes.
2496  */
2497 #define CHANGES_THRESHOLD 100
2498 
2499  if (++changes_count >= CHANGES_THRESHOLD)
2500  {
2501  rb->update_progress_txn(rb, txn, change->lsn);
2502  changes_count = 0;
2503  }
2504  }
2505 
2506  /* speculative insertion record must be freed by now */
2507  Assert(!specinsert);
2508 
2509  /* clean up the iterator */
2510  ReorderBufferIterTXNFinish(rb, iterstate);
2511  iterstate = NULL;
2512 
2513  /*
2514  * Update total transaction count and total bytes processed by the
2515  * transaction and its subtransactions. Take care not to count a
2516  * streamed transaction multiple times.
2517  *
2518  * Note that the statistics computation has to be done after
2519  * ReorderBufferIterTXNFinish as it releases the serialized change
2520  * which we have already accounted for in ReorderBufferIterTXNNext.
2521  */
2522  if (!rbtxn_is_streamed(txn))
2523  rb->totalTxns++;
2524 
2525  rb->totalBytes += txn->total_size;
2526 
2527  /*
2528  * Done with current changes, send the last message for this set of
2529  * changes depending upon streaming mode.
2530  */
2531  if (streaming)
2532  {
2533  if (stream_started)
2534  {
2535  rb->stream_stop(rb, txn, prev_lsn);
2536  stream_started = false;
2537  }
2538  }
2539  else
2540  {
2541  /*
2542  * Call either PREPARE (for two-phase transactions) or COMMIT (for
2543  * regular ones).
2544  */
2545  if (rbtxn_prepared(txn))
2546  rb->prepare(rb, txn, commit_lsn);
2547  else
2548  rb->commit(rb, txn, commit_lsn);
2549  }
2550 
2551  /* this is just a sanity check against bad output plugin behaviour */
2552  if (GetCurrentTransactionIdIfAny() != InvalidTransactionId)
2553  elog(ERROR, "output plugin used XID %u",
2554  GetCurrentTransactionId());
2555 
2556  /*
2557  * Remember the command ID and snapshot for the next set of changes in
2558  * streaming mode.
2559  */
2560  if (streaming)
2561  ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
2562  else if (snapshot_now->copied)
2563  ReorderBufferFreeSnap(rb, snapshot_now);
2564 
2565  /* cleanup */
2566  TeardownHistoricSnapshot(false);
2567 
2568  /*
2569  * Aborting the current (sub-)transaction as a whole has the right
2570  * semantics. We want all locks acquired in here to be released, not
2571  * reassigned to the parent, and we do not want any database access
2572  * to have persistent effects.
2573  */
2574  AbortCurrentTransaction();
2575 
2576  /* make sure there's no cache pollution */
2577  ReorderBufferExecuteInvalidations(txn->ninvalidations, txn->invalidations);
2578 
2579  if (using_subtxn)
2580  RollbackAndReleaseCurrentSubTransaction();
2581 
2582  /*
2583  * We are here due to one of the four reasons: 1. Decoding an
2584  * in-progress txn. 2. Decoding a prepared txn. 3. Decoding of a
2585  * prepared txn that was (partially) streamed. 4. Decoding a committed
2586  * txn.
2587  *
2588  * For 1, we allow truncation of txn data by removing the changes
2589  * already streamed but still keeping other things like invalidations,
2590  * snapshot, and tuplecids. For 2 and 3, we tell
2591  * ReorderBufferTruncateTXN to do a more elaborate truncation of txn
2592  * data as the entire transaction has been decoded except for commit.
2593  * For 4, as the entire txn has been decoded, we can fully clean up
2594  * the TXN reorder buffer.
2595  */
2596  if (streaming || rbtxn_prepared(txn))
2597  {
2598  ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn));
2599  /* Reset the CheckXidAlive */
2600  CheckXidAlive = InvalidTransactionId;
2601  }
2602  else
2603  ReorderBufferCleanupTXN(rb, txn);
2604  }
2605  PG_CATCH();
2606  {
2607  MemoryContext ecxt = MemoryContextSwitchTo(ccxt);
2608  ErrorData *errdata = CopyErrorData();
2609 
2610  /* TODO: Encapsulate cleanup from the PG_TRY and PG_CATCH blocks */
2611  if (iterstate)
2612  ReorderBufferIterTXNFinish(rb, iterstate);
2613 
2614  TeardownHistoricSnapshot(true);
2615 
2616  /*
2617  * Force cache invalidation to happen outside of a valid transaction
2618  * to prevent catalog access as we just caught an error.
2619  */
2620  AbortCurrentTransaction();
2621 
2622  /* make sure there's no cache pollution */
2623  ReorderBufferExecuteInvalidations(txn->ninvalidations,
2624  txn->invalidations);
2625 
2626  if (using_subtxn)
2627  RollbackAndReleaseCurrentSubTransaction();
2628 
2629  /*
2630  * The error code ERRCODE_TRANSACTION_ROLLBACK indicates a concurrent
2631  * abort of the (sub)transaction we are streaming or preparing. We
2632  * need to do the cleanup and return gracefully on this error, see
2633  * SetupCheckXidLive.
2634  *
2635  * This error code can be thrown by one of the callbacks we call
2636  * during decoding so we need to ensure that we return gracefully only
2637  * when we are sending the data in streaming mode and the streaming is
2638  * not finished yet or when we are sending the data out on a PREPARE
2639  * during a two-phase commit.
2640  */
2641  if (errdata->sqlerrcode == ERRCODE_TRANSACTION_ROLLBACK &&
2642  (stream_started || rbtxn_prepared(txn)))
2643  {
2644  /* curtxn must be set for streaming or prepared transactions */
2645  Assert(curtxn);
2646 
2647  /* Cleanup the temporary error state. */
2648  FlushErrorState();
2649  FreeErrorData(errdata);
2650  errdata = NULL;
2651  curtxn->concurrent_abort = true;
2652 
2653  /* Reset the TXN so that it is allowed to stream remaining data. */
2654  ReorderBufferResetTXN(rb, txn, snapshot_now,
2655  command_id, prev_lsn,
2656  specinsert);
2657  }
2658  else
2659  {
2660  ReorderBufferCleanupTXN(rb, txn);
2661  MemoryContextSwitchTo(ecxt);
2662  PG_RE_THROW();
2663  }
2664  }
2665  PG_END_TRY();
2666 }
2667 
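/*
 * Callback-sequence sketch (summarizing ReorderBufferProcessTXN above, for
 * illustration only): a non-streamed transaction reaches the output plugin
 * as
 *
 *	begin -> apply_change / apply_truncate / message ... -> commit
 *
 * a prepared transaction as
 *
 *	begin_prepare -> apply_change ... -> prepare
 *
 * and a streamed run as
 *
 *	stream_start -> stream_change / stream_message ... -> stream_stop
 */
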
2668 /*
2669  * Perform the replay of a transaction and its non-aborted subtransactions.
2670  *
2671  * Subtransactions have to be processed first, by
2672  * ReorderBufferCommitChild(), even if they were previously assigned to the
2673  * toplevel transaction with ReorderBufferAssignChild.
2674  *
2675  * This interface is called once a prepare or toplevel commit is read for both
2676  * streamed as well as non-streamed transactions.
2677  */
2678 static void
2679 ReorderBufferReplay(ReorderBufferTXN *txn,
2680  ReorderBuffer *rb, TransactionId xid,
2681  XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
2682  TimestampTz commit_time,
2683  RepOriginId origin_id, XLogRecPtr origin_lsn)
2684 {
2685  Snapshot snapshot_now;
2686  CommandId command_id = FirstCommandId;
2687 
2688  txn->final_lsn = commit_lsn;
2689  txn->end_lsn = end_lsn;
2690  txn->xact_time.commit_time = commit_time;
2691  txn->origin_id = origin_id;
2692  txn->origin_lsn = origin_lsn;
2693 
2694  /*
2695  * If the transaction was (partially) streamed, we need to commit it in a
2696  * 'streamed' way. That is, we first stream the remaining part of the
2697  * transaction, and then invoke the stream_commit callback.
2698  *
2699  * Called after everything (origin ID, LSN, ...) is stored in the
2700  * transaction to avoid passing that information directly.
2701  */
2702  if (rbtxn_is_streamed(txn))
2703  {
2704  ReorderBufferStreamCommit(rb, txn);
2705  return;
2706  }
2707 
2708  /*
2709  * If this transaction has no snapshot, it didn't make any changes to the
2710  * database, so there's nothing to decode. Note that
2711  * ReorderBufferCommitChild will have transferred any snapshots from
2712  * subtransactions if there were any.
2713  */
2714  if (txn->base_snapshot == NULL)
2715  {
2716  Assert(txn->ninvalidations == 0);
2717 
2718  /*
2719  * Removing this txn before a commit might result in the computation
2720  * of an incorrect restart_lsn. See SnapBuildProcessRunningXacts.
2721  */
2722  if (!rbtxn_prepared(txn))
2723  ReorderBufferCleanupTXN(rb, txn);
2724  return;
2725  }
2726 
2727  snapshot_now = txn->base_snapshot;
2728 
2729  /* Process and send the changes to output plugin. */
2730  ReorderBufferProcessTXN(rb, txn, commit_lsn, snapshot_now,
2731  command_id, false);
2732 }
2733 
2734 /*
2735  * Commit a transaction.
2736  *
2737  * See comments for ReorderBufferReplay().
2738  */
2739 void
2740 ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
2741  XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
2742  TimestampTz commit_time,
2743  RepOriginId origin_id, XLogRecPtr origin_lsn)
2744 {
2745  ReorderBufferTXN *txn;
2746 
2747  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2748  false);
2749 
2750  /* unknown transaction, nothing to replay */
2751  if (txn == NULL)
2752  return;
2753 
2754  ReorderBufferReplay(txn, rb, xid, commit_lsn, end_lsn, commit_time,
2755  origin_id, origin_lsn);
2756 }
2757 
2758 /*
2759  * Record the prepare information for a transaction.
2760  */
2761 bool
2762 ReorderBufferRememberPrepareInfo(ReorderBuffer *rb, TransactionId xid,
2763  XLogRecPtr prepare_lsn, XLogRecPtr end_lsn,
2764  TimestampTz prepare_time,
2765  RepOriginId origin_id, XLogRecPtr origin_lsn)
2766 {
2767  ReorderBufferTXN *txn;
2768 
2769  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false);
2770 
2771  /* unknown transaction, nothing to do */
2772  if (txn == NULL)
2773  return false;
2774 
2775  /*
2776  * Remember the prepare information to be later used by commit prepared in
2777  * case we skip doing prepare.
2778  */
2779  txn->final_lsn = prepare_lsn;
2780  txn->end_lsn = end_lsn;
2781  txn->xact_time.prepare_time = prepare_time;
2782  txn->origin_id = origin_id;
2783  txn->origin_lsn = origin_lsn;
2784 
2785  return true;
2786 }
2787 
2788 /* Remember that we have skipped prepare */
2789 void
2790 ReorderBufferSkipPrepare(ReorderBuffer *rb, TransactionId xid)
2791 {
2792  ReorderBufferTXN *txn;
2793 
2794  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false);
2795 
2796  /* unknown transaction, nothing to do */
2797  if (txn == NULL)
2798  return;
2799 
2800  txn->txn_flags |= RBTXN_SKIPPED_PREPARE;
2801 }
2802 
2803 /*
2804  * Prepare a two-phase transaction.
2805  *
2806  * See comments for ReorderBufferReplay().
2807  */
2808 void
2809 ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid,
2810  char *gid)
2811 {
2812  ReorderBufferTXN *txn;
2813 
2814  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2815  false);
2816 
2817  /* unknown transaction, nothing to replay */
2818  if (txn == NULL)
2819  return;
2820 
2821  txn->txn_flags |= RBTXN_PREPARE;
2822  txn->gid = pstrdup(gid);
2823 
2824  /* The prepare info must have been updated in txn by now. */
2825  Assert(txn->final_lsn != InvalidXLogRecPtr);
2826 
2827  ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
2828  txn->xact_time.prepare_time, txn->origin_id, txn->origin_lsn);
2829 
2830  /*
2831  * We send the prepare for concurrently aborted xacts so that later,
2832  * when rollback prepared is decoded and sent, the downstream is
2833  * able to roll back such a xact. See comments atop DecodePrepare.
2834  *
2835  * Note, for the concurrent_abort + streaming case a stream_prepare was
2836  * already sent within the ReorderBufferReplay call above.
2837  */
2838  if (txn->concurrent_abort && !rbtxn_is_streamed(txn))
2839  rb->prepare(rb, txn, txn->final_lsn);
2840 }
2841 
2842 /*
2843  * This is used to handle COMMIT/ROLLBACK PREPARED.
2844  */
2845 void
2846 ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
2847  XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
2848  XLogRecPtr two_phase_at,
2849  TimestampTz commit_time, RepOriginId origin_id,
2850  XLogRecPtr origin_lsn, char *gid, bool is_commit)
2851 {
2852  ReorderBufferTXN *txn;
2853  XLogRecPtr prepare_end_lsn;
2854  TimestampTz prepare_time;
2855 
2856  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, commit_lsn, false);
2857 
2858  /* unknown transaction, nothing to do */
2859  if (txn == NULL)
2860  return;
2861 
2862  /*
2863  * By this time the txn has the prepare record information, remember it to
2864  * be later used for rollback.
2865  */
2866  prepare_end_lsn = txn->end_lsn;
2867  prepare_time = txn->xact_time.prepare_time;
2868 
2869  /* add the gid in the txn */
2870  txn->gid = pstrdup(gid);
2871 
2872  /*
2873  * It is possible that this transaction is not decoded at prepare time
2874  * either because by that time we didn't have a consistent snapshot, or
2875  * two_phase was not enabled, or it was decoded earlier but we have
2876  * restarted. We only need to send the prepare if it was not decoded
2877  * earlier. We don't need to decode the xact for aborts if it is not done
2878  * already.
2879  */
2880  if ((txn->final_lsn < two_phase_at) && is_commit)
2881  {
2882  txn->txn_flags |= RBTXN_PREPARE;
2883 
2884  /*
2885  * The prepare info must have been updated in txn even if we skip
2886  * prepare.
2887  */
2888  Assert(txn->final_lsn != InvalidXLogRecPtr);
2889 
2890  /*
2891  * By this time the txn has the prepare record information and it is
2892  * important to use that so that downstream gets the accurate
2893  * information. If instead we had passed the commit information here,
2894  * then the downstream could behave as if it had already replayed commit
2895  * prepared after the restart.
2896  */
2897  ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
2898  txn->xact_time.prepare_time, txn->origin_id, txn->origin_lsn);
2899  }
2900 
2901  txn->final_lsn = commit_lsn;
2902  txn->end_lsn = end_lsn;
2903  txn->xact_time.commit_time = commit_time;
2904  txn->origin_id = origin_id;
2905  txn->origin_lsn = origin_lsn;
2906 
2907  if (is_commit)
2908  rb->commit_prepared(rb, txn, commit_lsn);
2909  else
2910  rb->rollback_prepared(rb, txn, prepare_end_lsn, prepare_time);
2911 
2912  /* cleanup: make sure there's no cache pollution */
2913  ReorderBufferExecuteInvalidations(txn->ninvalidations,
2914  txn->invalidations);
2915  ReorderBufferCleanupTXN(rb, txn);
2916 }
2917 
2918 /*
2919  * Abort a transaction that possibly has previous changes. Needs to be first
2920  * called for subtransactions and then for the toplevel xid.
2921  *
2922  * NB: Transactions handled here have to have actively aborted (i.e. have
2923  * produced an abort record). Implicitly aborted transactions are handled via
2924  * ReorderBufferAbortOld(); transactions we're just not interested in, but
2925  * which have committed are handled in ReorderBufferForget().
2926  *
2927  * This function purges this transaction and its contents from memory and
2928  * disk.
2929  */
2930 void
2931 ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
2932  TimestampTz abort_time)
2933 {
2934  ReorderBufferTXN *txn;
2935 
2936  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2937  false);
2938 
2939  /* unknown, nothing to remove */
2940  if (txn == NULL)
2941  return;
2942 
2943  txn->xact_time.abort_time = abort_time;
2944 
2945  /* For streamed transactions notify the remote node about the abort. */
2946  if (rbtxn_is_streamed(txn))
2947  {
2948  rb->stream_abort(rb, txn, lsn);
2949 
2950  /*
2951  * We might have decoded changes for this transaction that could load
2952  * the cache as per the current transaction's view (consider DDLs that
2953  * happened in this transaction). We don't want the decoding of future
2954  * transactions to use those cache entries so execute invalidations.
2955  */
2956  if (txn->ninvalidations > 0)
2957  ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
2958  txn->invalidations);
2959  }
2960 
2961  /* cosmetic... */
2962  txn->final_lsn = lsn;
2963 
2964  /* remove potential on-disk data, and deallocate */
2965  ReorderBufferCleanupTXN(rb, txn);
2966 }
2967 
2968 /*
2969  * Abort all transactions that aren't actually running anymore because the
2970  * server restarted.
2971  *
2972  * NB: These really have to be transactions that have aborted due to a server
2973  * crash/immediate restart, as we don't deal with invalidations here.
2974  */
2975 void
2976 ReorderBufferAbortOld(ReorderBuffer *rb, TransactionId oldestRunningXid)
2977 {
2978  dlist_mutable_iter it;
2979 
2980  /*
2981  * Iterate through all (potential) toplevel TXNs and abort all that are
2982  * older than what possibly can be running. Once we've found the first
2983  * that is alive we stop; there might be some that acquired an xid earlier
2984  * but started writing later, but it's unlikely and they will be cleaned
2985  * up in a later call to this function.
2986  */
2987  dlist_foreach_modify(it, &rb->toplevel_by_lsn)
2988  {
2989  ReorderBufferTXN *txn;
2990 
2991  txn = dlist_container(ReorderBufferTXN, node, it.cur);
2992 
2993  if (TransactionIdPrecedes(txn->xid, oldestRunningXid))
2994  {
2995  elog(DEBUG2, "aborting old transaction %u", txn->xid);
2996 
2997  /* Notify the remote node about the crash/immediate restart. */
2998  if (rbtxn_is_streamed(txn))
2999  rb->stream_abort(rb, txn, InvalidXLogRecPtr);
3000 
3001  /* remove potential on-disk data, and deallocate this tx */
3002  ReorderBufferCleanupTXN(rb, txn);
3003  }
3004  else
3005  return;
3006  }
3007 }
3008 
3009 /*
3010  * Forget the contents of a transaction if we aren't interested in its
3011  * contents. Needs to be first called for subtransactions and then for the
3012  * toplevel xid.
3013  *
3014  * This is significantly different from ReorderBufferAbort() because
3015  * transactions that have committed need to be treated differently from aborted
3016  * ones since they may have modified the catalog.
3017  *
3018  * Note that this is only allowed to be called in the moment a transaction
3019  * commit has just been read, not earlier; otherwise later records referring
3020  * to this xid might re-create the transaction incompletely.
3021  */
3022 void
3023 ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
3024 {
3025  ReorderBufferTXN *txn;
3026 
3027  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
3028  false);
3029 
3030  /* unknown, nothing to forget */
3031  if (txn == NULL)
3032  return;
3033 
3034  /* this transaction mustn't be streamed */
3035  Assert(!rbtxn_is_streamed(txn));
3036 
3037  /* cosmetic... */
3038  txn->final_lsn = lsn;
3039 
3040  /*
3041  * Process cache invalidation messages if there are any. Even if we're not
3042  * interested in the transaction's contents, it could have manipulated the
3043  * catalog and we need to update the caches according to that.
3044  */
3045  if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
3046  ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
3047  txn->invalidations);
3048  else
3049  Assert(txn->ninvalidations == 0);
3050 
3051  /* remove potential on-disk data, and deallocate */
3052  ReorderBufferCleanupTXN(rb, txn);
3053 }
3054 
3055 /*
3056  * Invalidate cache for those transactions that need to be skipped just in case
3057  * catalogs were manipulated as part of the transaction.
3058  *
3059  * Note that this is a special-purpose function for prepared transactions where
3060  * we don't want to clean up the TXN even when we decide to skip it. See
3061  * DecodePrepare.
3062  */
3063 void
3064 ReorderBufferInvalidate(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
3065 {
3066  ReorderBufferTXN *txn;
3067 
3068  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
3069  false);
3070 
3071  /* unknown, nothing to do */
3072  if (txn == NULL)
3073  return;
3074 
3075  /*
3076  * Process cache invalidation messages if there are any. Even if we're not
3077  * interested in the transaction's contents, it could have manipulated the
3078  * catalog and we need to update the caches according to that.
3079  */
3080  if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
3081  ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
3082  txn->invalidations);
3083  else
3084  Assert(txn->ninvalidations == 0);
3085 }
3086 
3087 
3088 /*
3089  * Execute invalidations happening outside the context of a decoded
3090  * transaction. That currently happens either for xid-less commits
3091  * (cf. RecordTransactionCommit()) or for invalidations in uninteresting
3092  * transactions (via ReorderBufferForget()).
3093  */
3094 void
3095 ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations,
3096  SharedInvalidationMessage *invalidations)
3097 {
3098  bool use_subtxn = IsTransactionOrTransactionBlock();
3099  int i;
3100 
3101  if (use_subtxn)
3102  BeginInternalSubTransaction("replay");
3103 
3104  /*
3105  * Force invalidations to happen outside of a valid transaction - that way
3106  * entries will just be marked as invalid without accessing the catalog.
3107  * That's advantageous because we don't need to setup the full state
3108  * necessary for catalog access.
3109  */
3110  if (use_subtxn)
3111  AbortCurrentTransaction();
3112 
3113  for (i = 0; i < ninvalidations; i++)
3114  LocalExecuteInvalidationMessage(&invalidations[i]);
3115 
3116  if (use_subtxn)
3117  RollbackAndReleaseCurrentSubTransaction();
3118 }
3119 
3120 /*
3121  * Tell reorderbuffer about an xid seen in the WAL stream. Has to be called at
3122  * least once for every xid in XLogRecord->xl_xid (other places in records
3123  * may, but do not have to be passed through here).
3124  *
3125  * Reorderbuffer keeps some data structures about transactions in LSN order,
3126  * for efficiency. To do that it has to know about when transactions are seen
3127  * first in the WAL. As many types of records are not actually interesting for
3128  * logical decoding, they do not necessarily pass through here.
3129  */
3130 void
3131 ReorderBufferProcessXid(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
3132 {
3133  /* many records won't have an xid assigned, centralize check here */
3134  if (xid != InvalidTransactionId)
3135  ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3136 }
3137 
3138 /*
3139  * Add a new snapshot to this transaction that may only be used after lsn 'lsn'
3140  * because the previous snapshot doesn't describe the catalog correctly for
3141  * following rows.
3142  */
3143 void
3144 ReorderBufferAddSnapshot(ReorderBuffer *rb, TransactionId xid,
3145  XLogRecPtr lsn, Snapshot snap)
3146 {
3147  ReorderBufferChange *change = ReorderBufferGetChange(rb);
3148 
3149  change->data.snapshot = snap;
3150  change->action = REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT;
3151 
3152  ReorderBufferQueueChange(rb, xid, lsn, change, false);
3153 }
3154 
3155 /*
3156  * Set up the transaction's base snapshot.
3157  *
3158  * If we know that xid is a subtransaction, set the base snapshot on the
3159  * top-level transaction instead.
3160  */
3161 void
3162 ReorderBufferSetBaseSnapshot(ReorderBuffer *rb, TransactionId xid,
3163  XLogRecPtr lsn, Snapshot snap)
3164 {
3165  ReorderBufferTXN *txn;
3166  bool is_new;
3167 
3168  Assert(snap != NULL);
3169 
3170  /*
3171  * Fetch the transaction to operate on. If we know it's a subtransaction,
3172  * operate on its top-level transaction instead.
3173  */
3174  txn = ReorderBufferTXNByXid(rb, xid, true, &is_new, lsn, true);
3175  if (rbtxn_is_known_subxact(txn))
3176  txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
3177  NULL, InvalidXLogRecPtr, false);
3178  Assert(txn->base_snapshot == NULL);
3179 
3180  txn->base_snapshot = snap;
3181  txn->base_snapshot_lsn = lsn;
3182  dlist_push_tail(&rb->txns_by_base_snapshot_lsn, &txn->base_snapshot_node);
3183 
3184  AssertTXNLsnOrder(rb);
3185 }
3186 
3187 /*
3188  * Access the catalog with this CommandId at this point in the changestream.
3189  *
3190  * May only be called for command ids > 1
3191  */
3192 void
3193 ReorderBufferAddNewCommandId(ReorderBuffer *rb, TransactionId xid,
3194  XLogRecPtr lsn, CommandId cid)
3195 {
3196  ReorderBufferChange *change = ReorderBufferGetChange(rb);
3197 
3198  change->data.command_id = cid;
3199  change->action = REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID;
3200 
3201  ReorderBufferQueueChange(rb, xid, lsn, change, false);
3202 }
3203 
3204 /*
3205  * Update memory counters to account for the new or removed change.
3206  *
3207  * We update two counters - in the reorder buffer, and in the transaction
3208  * containing the change. The reorder buffer counter allows us to quickly
3209  * decide if we reached the memory limit, the transaction counter allows
3210  * us to quickly pick the largest transaction for eviction.
3211  *
3212  * At least one of txn and change must be non-NULL. We update the memory
3213  * counter of txn if it's non-NULL, otherwise change->txn.
3214  *
3215  * When streaming is enabled, we need to update the toplevel transaction
3216  * counters instead - we don't really care about subtransactions as we
3217  * can't stream them individually anyway, and we only pick toplevel
3218  * transactions for eviction. So only toplevel transactions matter.
3219  */
3220 static void
3221 ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
3222  ReorderBufferChange *change,
3223  ReorderBufferTXN *txn,
3224  bool addition, Size sz)
3225 {
3226  ReorderBufferTXN *toptxn;
3227 
3228  Assert(txn || change);
3229 
3230  /*
3231  * Ignore tuple CID changes, because those are not evicted when reaching
3232  * memory limit. So we just don't count them, because it might easily
3233  * trigger a pointless attempt to spill.
3234  */
3235  if (change && change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID)
3236  return;
3237 
3238  if (sz == 0)
3239  return;
3240 
3241  if (txn == NULL)
3242  txn = change->txn;
3243  Assert(txn != NULL);
3244 
3245  /*
3246  * Update the total size in top level as well. This is later used to
3247  * compute the decoding stats.
3248  */
3249  toptxn = rbtxn_get_toptxn(txn);
3250 
3251  if (addition)
3252  {
3253  Size oldsize = txn->size;
3254 
3255  txn->size += sz;
3256  rb->size += sz;
3257 
3258  /* Update the total size in the top transaction. */
3259  toptxn->total_size += sz;
3260 
3261  /* Update the max-heap */
3262  if (oldsize != 0)
3263  pairingheap_remove(rb->txn_heap, &txn->txn_node);
3264  pairingheap_add(rb->txn_heap, &txn->txn_node);
3265  }
3266  else
3267  {
3268  Assert((rb->size >= sz) && (txn->size >= sz));
3269  txn->size -= sz;
3270  rb->size -= sz;
3271 
3272  /* Update the total size in the top transaction. */
3273  toptxn->total_size -= sz;
3274 
3275  /* Update the max-heap */
3276  pairingheap_remove(rb->txn_heap, &txn->txn_node);
3277  if (txn->size != 0)
3278  pairingheap_add(rb->txn_heap, &txn->txn_node);
3279  }
3280 
3281  Assert(txn->size <= rb->size);
3282 }
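
/*
 * Accounting sketch (hypothetical helper, not part of this file): the
 * invariant the bookkeeping above maintains, assuming every transaction is
 * reachable from rb->toplevel_by_lsn and the per-txn subtxns lists.
 */
#ifdef NOT_USED
static void
AssertReorderBufferSizeInvariant(ReorderBuffer *rb)
{
	Size		sum = 0;
	dlist_iter	txn_i;

	dlist_foreach(txn_i, &rb->toplevel_by_lsn)
	{
		ReorderBufferTXN *txn =
			dlist_container(ReorderBufferTXN, node, txn_i.cur);
		dlist_iter	subtxn_i;

		sum += txn->size;

		/* subtransactions are accounted separately from their parent */
		dlist_foreach(subtxn_i, &txn->subtxns)
			sum += dlist_container(ReorderBufferTXN, node, subtxn_i.cur)->size;
	}

	Assert(sum == rb->size);
}
#endif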
3283 
3284 /*
3285  * Add new (relfilelocator, tid) -> (cmin, cmax) mappings.
3286  *
3287  * We do not include this change type in memory accounting, because we
3288  * keep CIDs in a separate list and do not evict them when reaching
3289  * the memory limit.
3290  */
3291 void
3292 ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid,
3293  XLogRecPtr lsn, RelFileLocator locator,
3294  ItemPointerData tid, CommandId cmin,
3295  CommandId cmax, CommandId combocid)
3296 {
3297  ReorderBufferChange *change = ReorderBufferGetChange(rb);
3298  ReorderBufferTXN *txn;
3299 
3300  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3301 
3302  change->data.tuplecid.locator = locator;
3303  change->data.tuplecid.tid = tid;
3304  change->data.tuplecid.cmin = cmin;
3305  change->data.tuplecid.cmax = cmax;
3306  change->data.tuplecid.combocid = combocid;
3307  change->lsn = lsn;
3308  change->txn = txn;
3309  change->action = REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID;
3310 
3311  dlist_push_tail(&txn->tuplecids, &change->node);
3312  txn->ntuplecids++;
3313 }
3314 
3315 /*
3316  * Accumulate the invalidations for executing them later.
3317  *
3318  * This needs to be called for each XLOG_XACT_INVALIDATIONS message and
3319  * accumulates all the invalidation messages in the toplevel transaction, if
3320  * available, otherwise in the current transaction, as well as in the form of
3321  * a change in the reorder buffer. We need to record it as a change
3322  * so that we can execute only the required invalidations instead of executing
3323  * all the invalidations on each CommandId increment. We also need to
3324  * accumulate these in the txn buffer because in some cases where we skip
3325  * processing the transaction (see ReorderBufferForget), we need to execute
3326  * all the invalidations together.
3327  */
3328 void
3329 ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid,
3330  XLogRecPtr lsn, Size nmsgs,
3331  SharedInvalidationMessage *msgs)
3332 {
3333  ReorderBufferTXN *txn;
3334  MemoryContext oldcontext;
3335  ReorderBufferChange *change;
3336 
3337  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3338 
3339  oldcontext = MemoryContextSwitchTo(rb->context);
3340 
3341  /*
3342  * Collect all the invalidations under the top transaction, if available,
3343  * so that we can execute them all together. See comments atop this
3344  * function.
3345  */
3346  txn = rbtxn_get_toptxn(txn);
3347 
3348  Assert(nmsgs > 0);
3349 
3350  /* Accumulate invalidations. */
3351  if (txn->ninvalidations == 0)
3352  {
3353  txn->ninvalidations = nmsgs;
3354  txn->invalidations = (SharedInvalidationMessage *)
3355  palloc(sizeof(SharedInvalidationMessage) * nmsgs);
3356  memcpy(txn->invalidations, msgs,
3357  sizeof(SharedInvalidationMessage) * nmsgs);
3358  }
3359  else
3360  {
3361  txn->invalidations = (SharedInvalidationMessage *)
3362  repalloc(txn->invalidations, sizeof(SharedInvalidationMessage) *
3363  (txn->ninvalidations + nmsgs));
3364 
3365  memcpy(txn->invalidations + txn->ninvalidations, msgs,
3366  nmsgs * sizeof(SharedInvalidationMessage));
3367  txn->ninvalidations += nmsgs;
3368  }
3369 
3370  change = ReorderBufferGetChange(rb);
3371  change->action = REORDER_BUFFER_CHANGE_INVALIDATION;
3372  change->data.inval.ninvalidations = nmsgs;
3373  change->data.inval.invalidations = (SharedInvalidationMessage *)
3374  palloc(sizeof(SharedInvalidationMessage) * nmsgs);
3375  memcpy(change->data.inval.invalidations, msgs,
3376  sizeof(SharedInvalidationMessage) * nmsgs);
3377 
3378  ReorderBufferQueueChange(rb, xid, lsn, change, false);
3379 
3380  MemoryContextSwitchTo(oldcontext);
3381 }
3382 
3383 /*
3384  * Apply all invalidations we know. Possibly we only need parts at this point
3385  * in the changestream but we don't know which those are.
3386  */
3387 static void
3388 ReorderBufferExecuteInvalidations(uint32 nmsgs, SharedInvalidationMessage *msgs)
3389 {
3390  int i;
3391 
3392  for (i = 0; i < nmsgs; i++)
3393  LocalExecuteInvalidationMessage(&msgs[i]);
3394 }
3395 
3396 /*
3397  * Mark a transaction as containing catalog changes
3398  */
3399 void
3400 ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid,
3401  XLogRecPtr lsn)
3402 {
3403  ReorderBufferTXN *txn;
3404 
3405  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3406 
3407  if (!rbtxn_has_catalog_changes(txn))
3408  {
3409  txn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES;
3410  dclist_push_tail(&rb->catchange_txns, &txn->catchange_node);
3411  }
3412 
3413  /*
3414  * Mark the top-level transaction as having catalog changes too if one of
3415  * its children has, so that ReorderBufferBuildTupleCidHash can
3416  * conveniently check just the top-level transaction and decide whether to
3417  * build the hash table or not.
3418  */
3419  if (rbtxn_is_subtxn(txn))
3420  {
3421  ReorderBufferTXN *toptxn = rbtxn_get_toptxn(txn);
3422 
3423  if (!rbtxn_has_catalog_changes(toptxn))
3424  {
3425  toptxn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES;
3426  dclist_push_tail(&rb->catchange_txns, &toptxn->catchange_node);
3427  }
3428  }
3429 }
3430 
3431 /*
3432  * Return palloc'ed array of the transactions that have changed catalogs.
3433  * The returned array is sorted in xidComparator order.
3434  *
3435  * The caller must free the returned array when done with it.
3436  */
3437 TransactionId *
3438 ReorderBufferGetCatalogChangesXacts(ReorderBuffer *rb)
3439 {
3440  dlist_iter iter;
3441  TransactionId *xids = NULL;
3442  size_t xcnt = 0;
3443 
3444  /* Quick return if the list is empty */
3445  if (dclist_count(&rb->catchange_txns) == 0)
3446  return NULL;
3447 
3448  /* Initialize XID array */
3449  xids = (TransactionId *) palloc(sizeof(TransactionId) *
3450  dclist_count(&rb->catchange_txns));
3451  dclist_foreach(iter, &rb->catchange_txns)
3452  {
3453  ReorderBufferTXN *txn = dclist_container(ReorderBufferTXN,
3454  catchange_node,
3455  iter.cur);
3456 
3457  Assert(rbtxn_has_catalog_changes(txn));
3458 
3459  xids[xcnt++] = txn->xid;
3460  }
3461 
3462  qsort(xids, xcnt, sizeof(TransactionId), xidComparator);
3463 
3464  Assert(xcnt == dclist_count(&rb->catchange_txns));
3465  return xids;
3466 }
3467 
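/*
 * Usage sketch (hypothetical caller, for illustration only): because the
 * array is sorted in xidComparator order, membership checks can use
 * bsearch(), and the caller owns the memory:
 *
 *	TransactionId *xids = ReorderBufferGetCatalogChangesXacts(rb);
 *	size_t	nxids = dclist_count(&rb->catchange_txns);
 *
 *	if (xids != NULL)
 *	{
 *		if (bsearch(&xid, xids, nxids, sizeof(TransactionId),
 *					xidComparator) != NULL)
 *			... xid is known to have changed the catalog ...
 *		pfree(xids);
 *	}
 */
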
3468 /*
3469  * Query whether a transaction is already *known* to contain catalog
3470  * changes. This can be wrong until directly before the commit!
3471  */
3472 bool
3473 ReorderBufferXidHasCatalogChanges(ReorderBuffer *rb, TransactionId xid)
3474 {
3475  ReorderBufferTXN *txn;
3476 
3477  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
3478  false);
3479  if (txn == NULL)
3480  return false;
3481 
3482  return rbtxn_has_catalog_changes(txn);
3483 }
3484 
3485 /*
3486  * ReorderBufferXidHasBaseSnapshot
3487  * Have we already set the base snapshot for the given txn/subtxn?
3488  */
3489 bool
3490 ReorderBufferXidHasBaseSnapshot(ReorderBuffer *rb, TransactionId xid)
3491 {
3492  ReorderBufferTXN *txn;
3493 
3494  txn = ReorderBufferTXNByXid(rb, xid, false,
3495  NULL, InvalidXLogRecPtr, false);
3496 
3497  /* transaction isn't known yet, ergo no snapshot */
3498  if (txn == NULL)
3499  return false;
3500 
3501  /* a known subtxn? operate on top-level txn instead */
3502  if (rbtxn_is_known_subxact(txn))
3503  txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
3504  NULL, InvalidXLogRecPtr, false);
3505 
3506  return txn->base_snapshot != NULL;
3507 }
3508 
3509 
3510 /*
3511  * ---------------------------------------
3512  * Disk serialization support
3513  * ---------------------------------------
3514  */
3515 
3516 /*
3517  * Ensure the IO buffer is >= sz.
3518  */
3519 static void
3520 ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz)
3521 {
3522  if (!rb->outbufsize)
3523  {
3524  rb->outbuf = MemoryContextAlloc(rb->context, sz);
3525  rb->outbufsize = sz;
3526  }
3527  else if (rb->outbufsize < sz)
3528  {
3529  rb->outbuf = repalloc(rb->outbuf, sz);
3530  rb->outbufsize = sz;
3531  }
3532 }
3533 
3534 
3535 /* Compare two transactions by size */
3536 static int
3537 ReorderBufferTXNSizeCompare(const pairingheap_node *a, const pairingheap_node *b, void *arg)
3538 {
3539  const ReorderBufferTXN *ta = pairingheap_const_container(ReorderBufferTXN, txn_node, a);
3540  const ReorderBufferTXN *tb = pairingheap_const_container(ReorderBufferTXN, txn_node, b);
3541 
3542  if (ta->size < tb->size)
3543  return -1;
3544  if (ta->size > tb->size)
3545  return 1;
3546  return 0;
3547 }
3548 
3549 /*
3550  * Find the largest transaction (toplevel or subxact) to evict (spill to disk).
3551  */
3552 static ReorderBufferTXN *
3553 ReorderBufferLargestTXN(ReorderBuffer *rb)
3554 {
3555  ReorderBufferTXN *largest;
3556 
3557  /* Get the largest transaction from the max-heap */
3558  largest = pairingheap_container(ReorderBufferTXN, txn_node,
3559  pairingheap_first(rb->txn_heap));
3560 
3561  Assert(largest);
3562  Assert(largest->size > 0);
3563  Assert(largest->size <= rb->size);
3564 
3565  return largest;
3566 }
3567 
3568 /*
3569  * Find the largest streamable toplevel transaction to evict (by streaming).
3570  *
3571  * This can be seen as an optimized version of ReorderBufferLargestTXN, which
3572  * should give us the same transaction (because we don't update the memory
3573  * accounting for subtransactions with streaming, so it's always 0). But we can simply
3574  * iterate over the limited number of toplevel transactions that have a base
3575  * snapshot. There is no point in selecting a transaction that doesn't have a
3576  * base snapshot, because we don't decode such transactions. Also, we do not
3577  * select a transaction which doesn't have any streamable change.
3578  *
3579  * Note that we skip transactions that contain incomplete changes. There
3580  * is scope for optimization here, in that we could select the largest
3581  * transaction which has incomplete changes. But that would make the code and
3582  * design quite complex and that might not be worth the benefit. If we plan to
3583  * stream the transactions that contain incomplete changes then we need to
3584  * find a way to partially stream/truncate the transaction changes in-memory
3585  * and build a mechanism to partially truncate the spilled files.
3586  * Additionally, whenever we partially stream the transaction we need to
3587  * maintain the last streamed lsn and next time we need to restore from that
3588  * segment and the offset in WAL. As we stream the changes from the top
3589  * transaction and restore them subtransaction wise, we need to even remember
3590  * the subxact from where we streamed the last change.
3591  */
3592 static ReorderBufferTXN *
3593 ReorderBufferLargestStreamableTopTXN(ReorderBuffer *rb)
3594 {
3595  dlist_iter iter;
3596  Size largest_size = 0;
3597  ReorderBufferTXN *largest = NULL;
3598 
3599  /* Find the largest top-level transaction having a base snapshot. */
3600  dlist_foreach(iter, &rb->txns_by_base_snapshot_lsn)
3601  {
3602  ReorderBufferTXN *txn;
3603 
3604  txn = dlist_container(ReorderBufferTXN, base_snapshot_node, iter.cur);
3605 
3606  /* must not be a subtxn */
3607  Assert(!rbtxn_is_known_subxact(txn));
3608  /* base_snapshot must be set */
3609  Assert(txn->base_snapshot != NULL);
3610 
3611  if ((largest == NULL || txn->total_size > largest_size) &&
3612  (txn->total_size > 0) && !(rbtxn_has_partial_change(txn)) &&
3613  rbtxn_has_streamable_change(txn))
3614  {
3615  largest = txn;
3616  largest_size = txn->total_size;
3617  }
3618  }
3619 
3620  return largest;
3621 }
3622 
3623 /*
3624  * Check whether the logical_decoding_work_mem limit was reached, and if yes
3625  * pick the largest (sub)transaction, one at a time, to evict and spill its
3626  * changes to disk or send to the output plugin until we get under the memory limit.
3627  *
3628  * If debug_logical_replication_streaming is set to "immediate", stream or
3629  * serialize the changes immediately.
3630  *
3631  * XXX At this point we select the transactions until we reach under the memory
3632  * limit, but we might also adopt a more elaborate eviction strategy - for example
3633  * evicting enough transactions to free certain fraction (e.g. 50%) of the memory
3634  * limit.
3635  */
3636 static void
3637 ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
3638 {
3639  ReorderBufferTXN *txn;
3640 
3641  /*
3642  * Bail out if debug_logical_replication_streaming is buffered and we
3643  * haven't exceeded the memory limit.
3644  */
3645  if (debug_logical_replication_streaming == DEBUG_LOGICAL_REP_STREAMING_BUFFERED &&
3646  rb->size < logical_decoding_work_mem * 1024L)
3647  return;
3648 
3649  /*
3650  * If debug_logical_replication_streaming is immediate, loop until there's
3651  * no change. Otherwise, loop until we reach under the memory limit. One
3652  * might think that just by evicting the largest (sub)transaction we will
3653  * come under the memory limit, based on the assumption that the selected
3654  * transaction is at least as large as the most recent change (which
3655  * caused us to go over the memory limit). However, that is not true
3656  * because a user can reduce the logical_decoding_work_mem to a smaller
3657  * value before the most recent change.
3658  */
3659  while (rb->size >= logical_decoding_work_mem * 1024L ||
3660  (debug_logical_replication_streaming == DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE &&
3661  rb->size > 0))
3662  {
3663  /*
3664  * Pick the largest transaction and evict it from memory by streaming,
3665  * if possible. Otherwise, spill to disk.
3666  */
3667  if (ReorderBufferCanStartStreaming(rb) &&
3668  (txn = ReorderBufferLargestStreamableTopTXN(rb)) != NULL)
3669  {
3670  /* we know there has to be one, because the size is not zero */
3671  Assert(txn && rbtxn_is_toptxn(txn));
3672  Assert(txn->total_size > 0);
3673  Assert(rb->size >= txn->total_size);
3674 
3675  ReorderBufferStreamTXN(rb, txn);
3676  }
3677  else
3678  {
3679  /*
3680  * Pick the largest transaction (or subtransaction) and evict it
3681  * from memory by serializing it to disk.
3682  */
3683  txn = ReorderBufferLargestTXN(rb);
3684 
3685  /* we know there has to be one, because the size is not zero */
3686  Assert(txn);
3687  Assert(txn->size > 0);
3688  Assert(rb->size >= txn->size);
3689 
3690  ReorderBufferSerializeTXN(rb, txn);
3691  }
3692 
3693  /*
3694  * After eviction, the transaction should have no entries in memory,
3695  * and should use 0 bytes for changes.
3696  */
3697  Assert(txn->size == 0);
3698  Assert(txn->nentries_mem == 0);
3699  }
3700 
3701  /* We must be under the memory limit now. */
3702  Assert(rb->size < logical_decoding_work_mem * 1024L);
3703 
3704 }
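
/*
 * Units note (illustration, not part of the file): logical_decoding_work_mem
 * is a GUC measured in kilobytes, which is why the comparisons above
 * multiply by 1024:
 *
 *	Size	limit_bytes = (Size) logical_decoding_work_mem * 1024;
 *
 * e.g. the default of 64MB is stored as 65536 (kB) and yields a 67108864
 * byte threshold for rb->size.
 */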
3705 
3706 /*
3707  * Spill data of a large transaction (and its subtransactions) to disk.
3708  */
3709 static void
3710 ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
3711 {
3712  dlist_iter subtxn_i;
3713  dlist_mutable_iter change_i;
3714  int fd = -1;
3715  XLogSegNo curOpenSegNo = 0;
3716  Size spilled = 0;
3717  Size size = txn->size;
3718 
3719  elog(DEBUG2, "spill %u changes in XID %u to disk",
3720  (uint32) txn->nentries_mem, txn->xid);
3721 
3722  /* do the same to all child TXs */
3723  dlist_foreach(subtxn_i, &txn->subtxns)
3724  {
3725  ReorderBufferTXN *subtxn;
3726 
3727  subtxn = dlist_container(ReorderBufferTXN, node, subtxn_i.cur);
3728  ReorderBufferSerializeTXN(rb, subtxn);
3729  }
3730 
3731  /* serialize changestream */
3732  dlist_foreach_modify(change_i, &txn->changes)
3733  {
3734  ReorderBufferChange *change;
3735 
3736  change = dlist_container(ReorderBufferChange, node, change_i.cur);
3737 
3738  /*
3739  * Store the change in the segment to which it belongs by start LSN;
3740  * don't split it over multiple segments, though.
3741  */
3742  if (fd == -1 ||
3743  !XLByteInSeg(change->lsn, curOpenSegNo, wal_segment_size))
3744  {
3745  char path[MAXPGPATH];
3746 
3747  if (fd != -1)
3748  CloseTransientFile(fd);
3749 
3750  XLByteToSeg(change->lsn, curOpenSegNo, wal_segment_size);
3751 
3752  /*
3753  * No need to care about TLIs here, only used during a single run,
3754  * so each LSN only maps to a specific WAL record.
3755  */
3756  ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid,
3757  curOpenSegNo);
3758 
3759  /* open segment, create it if necessary */
3760  fd = OpenTransientFile(path,
3761  O_CREAT | O_WRONLY | O_APPEND | PG_BINARY);
3762 
3763  if (fd < 0)
3764  ereport(ERROR,
3765  (errcode_for_file_access(),
3766  errmsg("could not open file \"%s\": %m", path)));
3767  }
3768 
3769  ReorderBufferSerializeChange(rb, txn, fd, change);
3770  dlist_delete(&change->node);
3771  ReorderBufferReturnChange(rb, change, false);
3772 
3773  spilled++;
3774  }
3775 
3776  /* Update the memory counter */
3777  ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, size);
3778 
3779  /* update the statistics iff we have spilled anything */
3780  if (spilled)
3781  {
3782  rb->spillCount += 1;
3783  rb->spillBytes += size;
3784 
3785  /* don't consider already serialized transactions */
3786  rb->spillTxns += (rbtxn_is_serialized(txn) || rbtxn_is_serialized_clear(txn)) ? 0 : 1;
3787 
3788  /* update the decoding stats */
3789  UpdateDecodingStats((LogicalDecodingContext *) rb->private_data);
3790  }
3791 
3792  Assert(spilled == txn->nentries_mem);
3793  Assert(dlist_is_empty(&txn->changes));
3794  txn->nentries_mem = 0;
3795  txn->txn_flags |= RBTXN_IS_SERIALIZED;
3796 
3797  if (fd != -1)
3798  CloseTransientFile(fd);
3799 }
3800 
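/*
 * On-disk record sketch (summarizing the function below, for illustration):
 * every spilled change starts with a ReorderBufferDiskChange header (the
 * total record size plus a verbatim copy of the ReorderBufferChange),
 * followed by an action-specific payload:
 *
 *	INSERT/UPDATE/DELETE/SPEC_INSERT: HeapTupleData + tuple data
 *	                                  (old and/or new tuple)
 *	MESSAGE:                          prefix size, prefix, message size,
 *	                                  message
 *	INVALIDATION:                     array of SharedInvalidationMessage
 *	INTERNAL_SNAPSHOT:                SnapshotData, then xip[] and subxip[]
 *	TRUNCATE:                         array of relation OIDs
 *
 * The remaining change types carry no extra payload.
 */
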
3801 /*
3802  * Serialize individual change to disk.
3803  */
3804 static void
3805 ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
3806  int fd, ReorderBufferChange *change)
3807 {
3808  ReorderBufferDiskChange *ondisk;
3809  Size sz = sizeof(ReorderBufferDiskChange);
3810 
3811  ReorderBufferSerializeReserve(rb, sz);
3812 
3813  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3814  memcpy(&ondisk->change, change, sizeof(ReorderBufferChange));
3815 
3816  switch (change->action)
3817  {
3818  /* fall through these, they're all similar enough */
3819  case REORDER_BUFFER_CHANGE_INSERT:
3820  case REORDER_BUFFER_CHANGE_UPDATE:
3821  case REORDER_BUFFER_CHANGE_DELETE:
3822  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
3823  {
3824  char *data;
3825  HeapTuple oldtup,
3826  newtup;
3827  Size oldlen = 0;
3828  Size newlen = 0;
3829 
3830  oldtup = change->data.tp.oldtuple;
3831  newtup = change->data.tp.newtuple;
3832 
3833  if (oldtup)
3834  {
3835  sz += sizeof(HeapTupleData);
3836  oldlen = oldtup->t_len;
3837  sz += oldlen;
3838  }
3839 
3840  if (newtup)
3841  {
3842  sz += sizeof(HeapTupleData);
3843  newlen = newtup->t_len;
3844  sz += newlen;
3845  }
3846 
3847  /* make sure we have enough space */
3848  ReorderBufferSerializeReserve(rb, sz);
3849 
3850  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3851  /* might have been reallocated above */
3852  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3853 
3854  if (oldlen)
3855  {
3856  memcpy(data, oldtup, sizeof(HeapTupleData));
3857  data += sizeof(HeapTupleData);
3858 
3859  memcpy(data, oldtup->t_data, oldlen);
3860  data += oldlen;
3861  }
3862 
3863  if (newlen)
3864  {
3865  memcpy(data, newtup, sizeof(HeapTupleData));
3866  data += sizeof(HeapTupleData);
3867 
3868  memcpy(data, newtup->t_data, newlen);
3869  data += newlen;
3870  }
3871  break;
3872  }
3873  case REORDER_BUFFER_CHANGE_MESSAGE:
3874  {
3875  char *data;
3876  Size prefix_size = strlen(change->data.msg.prefix) + 1;
3877 
3878  sz += prefix_size + change->data.msg.message_size +
3879  sizeof(Size) + sizeof(Size);
3880  ReorderBufferSerializeReserve(rb, sz);
3881 
3882  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3883 
3884  /* might have been reallocated above */
3885  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3886 
3887  /* write the prefix including the size */
3888  memcpy(data, &prefix_size, sizeof(Size));
3889  data += sizeof(Size);
3890  memcpy(data, change->data.msg.prefix,
3891  prefix_size);
3892  data += prefix_size;
3893 
3894  /* write the message including the size */
3895  memcpy(data, &change->data.msg.message_size, sizeof(Size));
3896  data += sizeof(Size);
3897  memcpy(data, change->data.msg.message,
3898  change->data.msg.message_size);
3899  data += change->data.msg.message_size;
3900 
3901  break;
3902  }
3903  case REORDER_BUFFER_CHANGE_INVALIDATION:
3904  {
3905  char *data;
3906  Size inval_size = sizeof(SharedInvalidationMessage) *
3907  change->data.inval.ninvalidations;
3908 
3909  sz += inval_size;
3910 
3911  ReorderBufferSerializeReserve(rb, sz);
3912  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3913 
3914  /* might have been reallocated above */
3915  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3916  memcpy(data, change->data.inval.invalidations, inval_size);
3917  data += inval_size;
3918 
3919  break;
3920  }
3921  case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
3922  {
3923  Snapshot snap;
3924  char *data;
3925 
3926  snap = change->data.snapshot;
3927 
3928  sz += sizeof(SnapshotData) +
3929  sizeof(TransactionId) * snap->xcnt +
3930  sizeof(TransactionId) * snap->subxcnt;
3931 
3932  /* make sure we have enough space */
3933  ReorderBufferSerializeReserve(rb, sz);
3934  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3935  /* might have been reallocated above */
3936  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3937 
3938  memcpy(data, snap, sizeof(SnapshotData));
3939  data += sizeof(SnapshotData);
3940 
3941  if (snap->xcnt)
3942  {
3943  memcpy(data, snap->xip,
3944  sizeof(TransactionId) * snap->xcnt);
3945  data += sizeof(TransactionId) * snap->xcnt;
3946  }
3947 
3948  if (snap->subxcnt)
3949  {
3950  memcpy(data, snap->subxip,
3951  sizeof(TransactionId) * snap->subxcnt);
3952  data += sizeof(TransactionId) * snap->subxcnt;
3953  }
3954  break;
3955  }
3956  case REORDER_BUFFER_CHANGE_TRUNCATE:
3957  {
3958  Size size;
3959  char *data;
3960 
3961  /* account for the OIDs of truncated relations */
3962  size = sizeof(Oid) * change->data.truncate.nrelids;
3963  sz += size;
3964 
3965  /* make sure we have enough space */
3966  ReorderBufferSerializeReserve(rb, sz);
3967 
3968  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3969  /* might have been reallocated above */
3970  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3971 
3972  memcpy(data, change->data.truncate.relids, size);
3973  data += size;
3974 
3975  break;
3976  }
3977  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
3978  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
3979  case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
3980  case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
3981  /* ReorderBufferChange contains everything important */
3982  break;
3983  }
3984 
3985  ondisk->size = sz;
3986 
3987  errno = 0;
3988  pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_WRITE);
3989  if (write(fd, rb->outbuf, ondisk->size) != ondisk->size)
3990  {
3991  int save_errno = errno;
3992 
3993  CloseTransientFile(fd);
3994 
3995  /* if write didn't set errno, assume problem is no disk space */
3996  errno = save_errno ? save_errno : ENOSPC;
3997  ereport(ERROR,
3998  (errcode_for_file_access(),
3999  errmsg("could not write to data file for XID %u: %m",
4000  txn->xid)));
4001  }
4002  pgstat_report_wait_end();
4003 
4004  /*
4005  * Keep the transaction's final_lsn up to date with each change we send to
4006  * disk, so that ReorderBufferRestoreCleanup works correctly. (We used to
4007  * only do this on commit and abort records, but that doesn't work if a
4008  * system crash leaves a transaction without its abort record).
4009  *
4010  * Make sure not to move it backwards.
4011  */
4012  if (txn->final_lsn < change->lsn)
4013  txn->final_lsn = change->lsn;
4014 
4015  Assert(ondisk->change.action == change->action);
4016 }
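/*
 * Editor's sketch -- not part of reorderbuffer.c. The function above uses
 * length-prefixed framing: a fixed-size header (ReorderBufferDiskChange,
 * whose "size" field records the total record length) followed by
 * action-specific variable data. A minimal stand-alone analogue, with
 * hypothetical names:
 */
#include <string.h>
#include <unistd.h>

typedef struct DiskRecord
{
	size_t		size;			/* total record length, header included */
} DiskRecord;

static int
write_record(int fd, const void *payload, size_t payload_len)
{
	char		buf[8192];		/* stand-in for the rb->outbuf scratch buffer */
	DiskRecord *rec = (DiskRecord *) buf;

	if (payload_len > sizeof(buf) - sizeof(DiskRecord))
		return -1;				/* the real code grows the buffer instead */

	rec->size = sizeof(DiskRecord) + payload_len;
	memcpy(buf + sizeof(DiskRecord), payload, payload_len);

	/* a short write is treated as an error, as in the function above */
	return write(fd, buf, rec->size) == (ssize_t) rec->size ? 0 : -1;
}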
4017 
4018 /* Returns true if the output plugin supports streaming, false otherwise. */
4019 static inline bool
4020 ReorderBufferCanStream(ReorderBuffer *rb)
4021 {
4022  LogicalDecodingContext *ctx = rb->private_data;
4023 
4024  return ctx->streaming;
4025 }
4026 
4027 /* Returns true if streaming can be started now, false otherwise. */
4028 static inline bool
4029 ReorderBufferCanStartStreaming(ReorderBuffer *rb)
4030 {
4031  LogicalDecodingContext *ctx = rb->private_data;
4032  SnapBuild *builder = ctx->snapshot_builder;
4033 
4034  /* We can't start streaming unless a consistent state is reached. */
4035  if (SnapBuildCurrentState(builder) < SNAPBUILD_CONSISTENT)
4036  return false;
4037 
4038  /*
4039  * We can't start streaming immediately even if streaming is enabled,
4040  * because we may previously have decoded this transaction and are now
4041  * just restarting.
4042  */
4043  if (ReorderBufferCanStream(rb) &&
4044  !SnapBuildXactNeedsSkip(builder, ctx->reader->ReadRecPtr))
4045  return true;
4046 
4047  return false;
4048 }
4049 
4050 /*
4051  * Send data of a large transaction (and its subtransactions) to the
4052  * output plugin, but using the stream API.
4053  */
4054 static void
4055 ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
4056 {
4057  Snapshot snapshot_now;
4058  CommandId command_id;
4059  Size stream_bytes;
4060  bool txn_is_streamed;
4061 
4062  /* We can never reach here for a subtransaction. */
4063  Assert(rbtxn_is_toptxn(txn));
4064 
4065  /*
4066  * We can't make any assumptions about base snapshot here, similar to what
4067  * ReorderBufferCommit() does. That relies on base_snapshot getting
4068  * transferred from subxact in ReorderBufferCommitChild(), but that was
4069  * not yet called as the transaction is in-progress.
4070  *
4071  * So just walk the subxacts and use the same logic here. But we only need
4072  * to do that once, when the transaction is streamed for the first time.
4073  * After that we need to reuse the snapshot from the previous run.
4074  *
4075  * Unlike DecodeCommit which adds xids of all the subtransactions in
4076  * snapshot's xip array via SnapBuildCommitTxn, we can't do that here but
4077  * we do add them to subxip array instead via ReorderBufferCopySnap. This
4078  * allows the catalog changes made in subtransactions decoded till now to
4079  * be visible.
4080  */
4081  if (txn->snapshot_now == NULL)
4082  {
4083  dlist_iter subxact_i;
4084 
4085  /* make sure this transaction is streamed for the first time */
4086  Assert(!rbtxn_is_streamed(txn));
4087 
4088  /* at the beginning we should have invalid command ID */
4089  Assert(txn->command_id == InvalidCommandId);
4090 
4091  dlist_foreach(subxact_i, &txn->subtxns)
4092  {
4093  ReorderBufferTXN *subtxn;
4094 
4095  subtxn = dlist_container(ReorderBufferTXN, node, subxact_i.cur);
4096  ReorderBufferTransferSnapToParent(txn, subtxn);
4097  }
4098 
4099  /*
4100  * If this transaction has no snapshot, it didn't make any changes to
4101  * the database till now, so there's nothing to decode.
4102  */
4103  if (txn->base_snapshot == NULL)
4104  {
4105  Assert(txn->ninvalidations == 0);
4106  return;
4107  }
4108 
4109  command_id = FirstCommandId;
4110  snapshot_now = ReorderBufferCopySnap(rb, txn->base_snapshot,
4111  txn, command_id);
4112  }
4113  else
4114  {
4115  /* the transaction must have been already streamed */
4116  Assert(rbtxn_is_streamed(txn));
4117 
4118  /*
4119  * Nah, we already have a snapshot from the previous streaming run. We
4120  * assume new subxacts can't move the LSN backwards, and so can't beat
4121  * the LSN condition in the previous branch (so there is no need to
4122  * walk through the subxacts again). In fact, we must not do that, as
4123  * we may be using the snapshot half-way through the subxact.
4124  */
4125  command_id = txn->command_id;
4126 
4127  /*
4128  * We can't use txn->snapshot_now directly because after the last
4129  * streaming run, we might have got some new sub-transactions. So we
4130  * need to add them to the snapshot.
4131  */
4132  snapshot_now = ReorderBufferCopySnap(rb, txn->snapshot_now,
4133  txn, command_id);
4134 
4135  /* Free the previously copied snapshot. */
4136  Assert(txn->snapshot_now->copied);
4137  ReorderBufferFreeSnap(rb, txn->snapshot_now);
4138  txn->snapshot_now = NULL;
4139  }
4140 
4141  /*
4142  * Remember this information to be used later to update stats. We can't
4143  * update the stats here as an error while processing the changes would
4144  * lead to the accumulation of stats even though we haven't streamed all
4145  * the changes.
4146  */
4147  txn_is_streamed = rbtxn_is_streamed(txn);
4148  stream_bytes = txn->total_size;
4149 
4150  /* Process and send the changes to output plugin. */
4151  ReorderBufferProcessTXN(rb, txn, InvalidXLogRecPtr, snapshot_now,
4152  command_id, true);
4153 
4154  rb->streamCount += 1;
4155  rb->streamBytes += stream_bytes;
4156 
4157  /* Don't double-count a transaction that has already been streamed. */
4158  rb->streamTxns += (txn_is_streamed) ? 0 : 1;
4159 
4160  /* update the decoding stats */
4161  UpdateDecodingStats((LogicalDecodingContext *) rb->private_data);
4162 
4163  Assert(dlist_is_empty(&txn->changes));
4164  Assert(txn->nentries == 0);
4165  Assert(txn->nentries_mem == 0);
4166 }
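/*
 * Editor's sketch -- illustration only. The stats handling above follows a
 * "capture first, account after success" pattern: the values are read
 * before the fallible processing step and folded into the counters only
 * once it completes, so an error and retry cannot double count. With
 * hypothetical names:
 */
#include <stddef.h>

struct stream_stats
{
	long		txns;			/* distinct transactions streamed */
	long		count;			/* streaming runs */
	size_t		bytes;			/* bytes handed to the plugin */
};

static void
stream_and_account(struct stream_stats *st, int txn_already_streamed,
				   size_t pending_bytes, void (*process) (void))
{
	process();					/* may error out; counters untouched so far */

	st->count += 1;
	st->bytes += pending_bytes;
	if (!txn_already_streamed)
		st->txns += 1;			/* count each transaction only once */
}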
4167 
4168 /*
4169  * Size of a change in memory.
4170  */
4171 static Size
4172 ReorderBufferChangeSize(ReorderBufferChange *change)
4173 {
4174  Size sz = sizeof(ReorderBufferChange);
4175 
4176  switch (change->action)
4177  {
4178  /* fall through these, they're all similar enough */
4179  case REORDER_BUFFER_CHANGE_INSERT:
4180  case REORDER_BUFFER_CHANGE_UPDATE:
4181  case REORDER_BUFFER_CHANGE_DELETE:
4182  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
4183  {
4184  HeapTuple oldtup,
4185  newtup;
4186  Size oldlen = 0;
4187  Size newlen = 0;
4188 
4189  oldtup = change->data.tp.oldtuple;
4190  newtup = change->data.tp.newtuple;
4191 
4192  if (oldtup)
4193  {
4194  sz += sizeof(HeapTupleData);
4195  oldlen = oldtup->t_len;
4196  sz += oldlen;
4197  }
4198 
4199  if (newtup)
4200  {
4201  sz += sizeof(HeapTupleData);
4202  newlen = newtup->t_len;
4203  sz += newlen;
4204  }
4205 
4206  break;
4207  }
4208  case REORDER_BUFFER_CHANGE_MESSAGE:
4209  {
4210  Size prefix_size = strlen(change->data.msg.prefix) + 1;
4211 
4212  sz += prefix_size + change->data.msg.message_size +
4213  sizeof(Size) + sizeof(Size);
4214 
4215  break;
4216  }
4217  case REORDER_BUFFER_CHANGE_INVALIDATION:
4218  {
4219  sz += sizeof(SharedInvalidationMessage) *
4220  change->data.inval.ninvalidations;
4221  break;
4222  }
4223  case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
4224  {
4225  Snapshot snap;
4226 
4227  snap = change->data.snapshot;
4228 
4229  sz += sizeof(SnapshotData) +
4230  sizeof(TransactionId) * snap->xcnt +
4231  sizeof(TransactionId) * snap->subxcnt;
4232 
4233  break;
4234  }
4235  case REORDER_BUFFER_CHANGE_TRUNCATE:
4236  {
4237  sz += sizeof(Oid) * change->data.truncate.nrelids;
4238 
4239  break;
4240  }
4241  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
4242  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
4243  case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
4244  case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
4245  /* ReorderBufferChange contains everything important */
4246  break;
4247  }
4248 
4249  return sz;
4250 }
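/*
 * Editor's note -- illustration only. For a message change with prefix
 * "test" and an 8-byte payload, the variable part accounted above on a
 * 64-bit build is
 *
 *   strlen("test") + 1 =  5 bytes (prefix, NUL included)
 * + message_size       =  8 bytes
 * + 2 * sizeof(Size)   = 16 bytes (the two stored lengths)
 *
 * i.e. 29 bytes on top of sizeof(ReorderBufferChange).
 */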
4251 
4252 
4253 /*
4254  * Restore a number of changes spilled to disk back into memory.
4255  */
4256 static Size
4257 ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
4258  TXNEntryFile *file, XLogSegNo *segno)
4259 {
4260  Size restored = 0;
4261  XLogSegNo last_segno;
4262  dlist_mutable_iter cleanup_iter;
4263  File *fd = &file->vfd;
4264 
4265  Assert(txn->first_lsn != InvalidXLogRecPtr);
4266  Assert(txn->final_lsn != InvalidXLogRecPtr);
4264 
4267 
4268  /* free current entries, so we have memory for more */
4269  dlist_foreach_modify(cleanup_iter, &txn->changes)
4270  {
4271  ReorderBufferChange *cleanup =
4272  dlist_container(ReorderBufferChange, node, cleanup_iter.cur);
4273 
4274  dlist_delete(&cleanup->node);
4275  ReorderBufferReturnChange(rb, cleanup, true);
4276  }
4277  txn->nentries_mem = 0;
4278  Assert(dlist_is_empty(&txn->changes));
4279 
4280  XLByteToSeg(txn->final_lsn, last_segno, wal_segment_size);
4281 
4282  while (restored < max_changes_in_memory && *segno <= last_segno)
4283  {
4284  int readBytes;
4285  ReorderBufferDiskChange *ondisk;
4286 
4287  CHECK_FOR_INTERRUPTS();
4288 
4289  if (*fd == -1)
4290  {
4291  char path[MAXPGPATH];
4292 
4293  /* first time in */
4294  if (*segno == 0)
4295  XLByteToSeg(txn->first_lsn, *segno, wal_segment_size);
4296 
4297  Assert(*segno != 0 || dlist_is_empty(&txn->changes));
4298 
4299  /*
4300  * No need to care about TLIs here, only used during a single run,
4301  * so each LSN only maps to a specific WAL record.
4302  */
4303  ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid,
4304  *segno);
4305 
4306  *fd = PathNameOpenFile(path, O_RDONLY | PG_BINARY);
4307 
4308  /* No harm in resetting the offset even in case of failure */
4309  file->curOffset = 0;
4310 
4311  if (*fd < 0 && errno == ENOENT)
4312  {
4313  *fd = -1;
4314  (*segno)++;
4315  continue;
4316  }
4317  else if (*fd < 0)
4318  ereport(ERROR,
4319  (errcode_for_file_access(),
4320  errmsg("could not open file \"%s\": %m",
4321  path)));
4322  }
4323 
4324  /*
4325  * Read the statically sized part of a change which has information
4326  * about the total size. If we couldn't read a record, we're at the
4327  * end of this file.
4328  */
4329  ReorderBufferSerializeReserve(rb, sizeof(ReorderBufferDiskChange));
4330  readBytes = FileRead(file->vfd, rb->outbuf,
4331  sizeof(ReorderBufferDiskChange),
4332  file->curOffset, WAIT_EVENT_REORDER_BUFFER_READ);
4333 
4334  /* eof */
4335  if (readBytes == 0)
4336  {
4337  FileClose(*fd);
4338  *fd = -1;
4339  (*segno)++;
4340  continue;
4341  }
4342  else if (readBytes < 0)
4343  ereport(ERROR,
4344  (errcode_for_file_access(),
4345  errmsg("could not read from reorderbuffer spill file: %m")));
4346  else if (readBytes != sizeof(ReorderBufferDiskChange))
4347  ereport(ERROR,
4348  (errcode_for_file_access(),
4349  errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
4350  readBytes,
4351  (uint32) sizeof(ReorderBufferDiskChange))));
4352 
4353  file->curOffset += readBytes;
4354 
4355  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
4356 
4357  ReorderBufferSerializeReserve(rb,
4356 
4358  sizeof(ReorderBufferDiskChange) + ondisk->size);
4359  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
4360 
4361  readBytes = FileRead(file->vfd,
4362  rb->outbuf + sizeof(ReorderBufferDiskChange),
4363  ondisk->size - sizeof(ReorderBufferDiskChange),
4364  file->curOffset,
4365  WAIT_EVENT_REORDER_BUFFER_READ);
4366 
4367  if (readBytes < 0)
4368  ereport(ERROR,
4369  (errcode_for_file_access(),
4370  errmsg("could not read from reorderbuffer spill file: %m")));
4371  else if (readBytes != ondisk->size - sizeof(ReorderBufferDiskChange))
4372  ereport(ERROR,
4373  (errcode_for_file_access(),
4374  errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
4375  readBytes,
4376  (uint32) (ondisk->size - sizeof(ReorderBufferDiskChange)))));
4377 
4378  file->curOffset += readBytes;
4379 
4380  /*
4381  * ok, read a full change from disk, now restore it into proper
4382  * in-memory format
4383  */
4384  ReorderBufferRestoreChange(rb, txn, rb->outbuf);
4385  restored++;
4386  }
4387 
4388  return restored;
4389 }
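/*
 * Editor's sketch -- not part of reorderbuffer.c. Restoring records framed
 * as above is a two-step read: fetch the fixed header to learn the total
 * size, enlarge the buffer if needed, then read the remainder. A
 * stand-alone analogue using stdio, with hypothetical names:
 */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

typedef struct DiskRecord
{
	size_t		size;			/* total record length, header included */
} DiskRecord;

/* Returns 1 if a record was read, 0 on clean EOF; aborts on a torn record. */
static int
read_record(FILE *fp, char **buf, size_t *buflen)
{
	DiskRecord	hdr;

	if (fread(&hdr, sizeof(hdr), 1, fp) != 1)
		return 0;				/* EOF: caller moves on to the next segment */

	if (*buflen < hdr.size)
	{
		*buf = realloc(*buf, hdr.size);
		if (*buf == NULL)
			abort();
		*buflen = hdr.size;
	}
	memcpy(*buf, &hdr, sizeof(hdr));

	if (hdr.size > sizeof(hdr) &&
		fread(*buf + sizeof(hdr), hdr.size - sizeof(hdr), 1, fp) != 1)
	{
		fprintf(stderr, "torn record\n");	/* short read mid-record */
		abort();
	}
	return 1;
}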
4390 
4391 /*
4392  * Convert change from its on-disk format to in-memory format and queue it onto
4393  * the TXN's ->changes list.
4394  *
4395  * Note: although "data" is declared char*, at entry it points to a
4396  * maxalign'd buffer, making it safe in most of this function to assume
4397  * that the pointed-to data is suitably aligned for direct access.
4398  */
4399 static void
4400 ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
4401  char *data)
4402 {
4403  ReorderBufferDiskChange *ondisk;
4404  ReorderBufferChange *change;
4405 
4406  ondisk = (ReorderBufferDiskChange *) data;
4407 
4408  change = ReorderBufferGetChange(rb);
4409 
4410  /* copy static part */
4411  memcpy(change, &ondisk->change, sizeof(ReorderBufferChange));
4412 
4413  data += sizeof(ReorderBufferDiskChange);
4414 
4415  /* restore individual stuff */
4416  switch (change->action)
4417  {
4418  /* fall through these, they're all similar enough */
4419  case REORDER_BUFFER_CHANGE_INSERT:
4420  case REORDER_BUFFER_CHANGE_UPDATE:
4421  case REORDER_BUFFER_CHANGE_DELETE:
4422  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
4423  if (change->data.tp.oldtuple)
4424  {
4425  uint32 tuplelen = ((HeapTuple) data)->t_len;
4426 
4427  change->data.tp.oldtuple =
4428  ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader);
4429 
4430  /* restore ->tuple */
4431  memcpy(change->data.tp.oldtuple, data,
4432  sizeof(HeapTupleData));
4433  data += sizeof(HeapTupleData);
4434 
4435  /* reset t_data pointer into the new tuplebuf */
4436  change->data.tp.oldtuple->t_data =
4437  (HeapTupleHeader) ((char *) change->data.tp.oldtuple + HEAPTUPLESIZE);
4438 
4439  /* restore tuple data itself */
4440  memcpy(change->data.tp.oldtuple->t_data, data, tuplelen);
4441  data += tuplelen;
4442  }
4443 
4444  if (change->data.tp.newtuple)
4445  {
4446  /* here, data might not be suitably aligned! */
4447  uint32 tuplelen;
4448 
4449  memcpy(&tuplelen, data + offsetof(HeapTupleData, t_len),
4450  sizeof(uint32));
4451 
4452  change->data.tp.newtuple =
4453  ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader);
4454 
4455  /* restore ->tuple */
4456  memcpy(change->data.tp.newtuple, data,
4457  sizeof(HeapTupleData));
4458  data += sizeof(HeapTupleData);
4459 
4460  /* reset t_data pointer into the new tuplebuf */
4461  change->data.tp.newtuple->t_data =
4462  (HeapTupleHeader) ((char *) change->data.tp.newtuple + HEAPTUPLESIZE);
4463 
4464  /* restore tuple data itself */
4465  memcpy(change->data.tp.newtuple->t_data, data, tuplelen);
4466  data += tuplelen;
4467  }
4468 
4469  break;
4470  case REORDER_BUFFER_CHANGE_MESSAGE:
4471  {
4472  Size prefix_size;
4473 
4474  /* read prefix */
4475  memcpy(&prefix_size, data, sizeof(Size));
4476  data += sizeof(Size);
4477  change->data.msg.prefix = MemoryContextAlloc(rb->context,
4478  prefix_size);
4479  memcpy(change->data.msg.prefix, data, prefix_size);
4480  Assert(change->data.msg.prefix[prefix_size - 1] == '\0');
4481  data += prefix_size;
4482 
4483  /* read the message */
4484  memcpy(&change->data.msg.message_size, data, sizeof(Size));
4485  data += sizeof(Size);
4486  change->data.msg.message = MemoryContextAlloc(rb->context,
4487  change->data.msg.message_size);
4488  memcpy(change->data.msg.message, data,
4489  change->data.msg.message_size);
4490  data += change->data.msg.message_size;
4491 
4492  break;
4493  }
4494  case REORDER_BUFFER_CHANGE_INVALIDATION:
4495  {
4496  Size inval_size = sizeof(SharedInvalidationMessage) *
4497  change->data.inval.ninvalidations;
4498 
4499  change->data.inval.invalidations =
4500  MemoryContextAlloc(rb->context, inval_size);
4501 
4502  /* read the message */
4503  memcpy(change->data.inval.invalidations, data, inval_size);
4504 
4505  break;
4506  }
4507  case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
4508  {
4509  Snapshot oldsnap;
4510  Snapshot newsnap;
4511  Size size;
4512 
4513  oldsnap = (Snapshot) data;
4514 
4515  size = sizeof(SnapshotData) +
4516  sizeof(TransactionId) * oldsnap->xcnt +
4517  sizeof(TransactionId) * (oldsnap->subxcnt + 0);
4518 
4519  change->data.snapshot = MemoryContextAllocZero(rb->context, size);
4520 
4521  newsnap = change->data.snapshot;
4522 
4523  memcpy(newsnap, data, size);
4524  newsnap->xip = (TransactionId *)
4525  (((char *) newsnap) + sizeof(SnapshotData));
4526  newsnap->subxip = newsnap->xip + newsnap->xcnt;
4527  newsnap->copied = true;
4528  break;
4529  }
4530  /* the base struct contains all the data, easy peasy */
4531  case REORDER_BUFFER_CHANGE_TRUNCATE:
4532  {
4533  Oid *relids;
4534 
4535  relids = ReorderBufferGetRelids(rb,
4536  change->data.truncate.nrelids);
4537  memcpy(relids, data, change->data.truncate.nrelids * sizeof(Oid));
4538  change->data.truncate.relids = relids;
4539 
4540  break;
4541  }
4542  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
4543  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
4544  case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
4545  case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
4546  break;
4547  }
4548 
4549  dlist_push_tail(&txn->changes, &change->node);
4550  txn->nentries_mem++;
4551 
4552  /*
4553  * Update memory accounting for the restored change. We need to do this
4554  * although we don't check the memory limit when restoring the changes in
4555  * this branch (we only do that when initially queueing the changes after
4556  * decoding), because we will release the changes later, and that will
4557  * update the accounting too (subtracting the size from the counters). And
4558  * we don't want to underflow there.
4559  */
4560  ReorderBufferChangeMemoryUpdate(rb, change, NULL, true,
4561  ReorderBufferChangeSize(change));
4562 }
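/*
 * Editor's note -- illustration only. The newtuple branch above reads
 * t_len with memcpy because "data" may not be suitably aligned at that
 * point; copying through memcpy is the portable idiom for unaligned
 * access:
 */
#include <stdint.h>
#include <string.h>

static uint32_t
read_u32_unaligned(const char *p)
{
	uint32_t	v;

	memcpy(&v, p, sizeof(v));	/* safe regardless of p's alignment */
	return v;					/* *(const uint32_t *) p would not be */
}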
4563 
4564 /*
4565  * Remove all on-disk data stored for the passed-in transaction.
4566  */
4567 static void
4568 ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
4569 {
4570  XLogSegNo first;
4571  XLogSegNo cur;
4572  XLogSegNo last;
4573 
4574  Assert(txn->first_lsn != InvalidXLogRecPtr);
4575  Assert(txn->final_lsn != InvalidXLogRecPtr);
4576 
4577  XLByteToSeg(txn->first_lsn, first, wal_segment_size);
4578  XLByteToSeg(txn->final_lsn, last, wal_segment_size);
4579 
4580  /* iterate over all possible filenames, and delete them */
4581  for (cur = first; cur <= last; cur++)
4582  {
4583  char path[MAXPGPATH];
4584 
4585  ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid, cur);
4586  if (unlink(path) != 0 && errno != ENOENT)
4587  ereport(ERROR,
4588  (errcode_for_file_access(),
4589  errmsg("could not remove file \"%s\": %m", path)));
4590  }
4591 }
4592 
4593 /*
4594  * Remove any leftover serialized reorder buffers from a slot directory after a
4595  * prior crash or decoding session exit.
4596  */
4597 static void
4598 ReorderBufferCleanupSerializedTXNs(const char *slotname)
4599 {
4600  DIR *spill_dir;
4601  struct dirent *spill_de;
4602  struct stat statbuf;
4603  char path[MAXPGPATH * 2 + sizeof(PG_REPLSLOT_DIR)];
4604 
4605  sprintf(path, "%s/%s", PG_REPLSLOT_DIR, slotname);
4606 
4607  /* we're only handling directories here, skip if it's not ours */
4608  if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
4609  return;
4610 
4611  spill_dir = AllocateDir(path);
4612  while ((spill_de = ReadDirExtended(spill_dir, path, INFO)) != NULL)
4613  {
4614  /* only look at names that can be ours */
4615  if (strncmp(spill_de->d_name, "xid", 3) == 0)
4616  {
4617  snprintf(path, sizeof(path),
4618  "%s/%s/%s", PG_REPLSLOT_DIR, slotname,
4619  spill_de->d_name);
4620 
4621  if (unlink(path) != 0)
4622  ereport(ERROR,
4623  (errcode_for_file_access(),
4624  errmsg("could not remove file \"%s\" during removal of %s/%s/xid*: %m",
4625  path, PG_REPLSLOT_DIR, slotname)));
4626  }
4627  }
4628  FreeDir(spill_dir);
4629 }
4630 
4631 /*
4632  * Given a replication slot, transaction ID and segment number, fill in the
4633  * corresponding spill file into 'path', which is a caller-owned buffer of size
4634  * at least MAXPGPATH.
4635  */
4636 static void
4637 ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, TransactionId xid,
4638  XLogSegNo segno)
4639 {
4640  XLogRecPtr recptr;
4641 
4642  XLogSegNoOffsetToRecPtr(segno, 0, wal_segment_size, recptr);
4643 
4644  snprintf(path, MAXPGPATH, "%s/%s/xid-%u-lsn-%X-%X.spill",
4645  PG_REPLSLOT_DIR,
4646  NameStr(slot->data.name),
4647  xid, LSN_FORMAT_ARGS(recptr));
4648 }
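/*
 * Editor's sketch -- illustration only, assuming the default 16MB
 * wal_segment_size. Segment 42 starts at LSN 42 * 16MB = 0/2A000000, so a
 * slot named "myslot" spilling XID 1234 would get the path printed below:
 */
#include <stdio.h>
#include <stdint.h>

int
main(void)
{
	uint64_t	seg_size = 16 * 1024 * 1024;	/* wal_segment_size */
	uint64_t	recptr = 42 * seg_size; /* segment 42, offset 0 */
	char		path[1024];

	snprintf(path, sizeof(path), "pg_replslot/%s/xid-%u-lsn-%X-%X.spill",
			 "myslot", 1234u,
			 (unsigned) (recptr >> 32), (unsigned) recptr);
	puts(path);		/* pg_replslot/myslot/xid-1234-lsn-0-2A000000.spill */
	return 0;
}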
4649 
4650 /*
4651  * Delete all data spilled to disk after we've restarted/crashed. It will be
4652  * recreated when the respective slots are reused.
4653  */
4654 void
4655 StartupReorderBuffer(void)
4656 {
4657  DIR *logical_dir;
4658  struct dirent *logical_de;
4659 
4660  logical_dir = AllocateDir(PG_REPLSLOT_DIR);
4661  while ((logical_de = ReadDir(logical_dir, PG_REPLSLOT_DIR)) != NULL)
4662  {
4663  if (strcmp(logical_de->d_name, ".") == 0 ||
4664  strcmp(logical_de->d_name, "..") == 0)
4665  continue;
4666 
4667  /* if it cannot be a slot, skip the directory */
4668  if (!ReplicationSlotValidateName(logical_de->d_name, DEBUG2))
4669  continue;
4670 
4671  /*
4672  * ok, has to be a surviving logical slot, iterate and delete
4673  * everything starting with xid-*
4674  */
4675  ReorderBufferCleanupSerializedTXNs(logical_de->d_name);
4676  }
4677  FreeDir(logical_dir);
4678 }
4679 
4680 /* ---------------------------------------
4681  * toast reassembly support
4682  * ---------------------------------------
4683  */
4684 
4685 /*
4686  * Initialize per tuple toast reconstruction support.
4687  */
4688 static void
4689 ReorderBufferToastInitHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
4690 {
4691  HASHCTL hash_ctl;
4692 
4693  Assert(txn->toast_hash == NULL);
4694 
4695  hash_ctl.keysize = sizeof(Oid);
4696  hash_ctl.entrysize = sizeof(ReorderBufferToastEnt);
4697  hash_ctl.hcxt = rb->context;
4698  txn->toast_hash = hash_create("ReorderBufferToastHash", 5, &hash_ctl,
4699  HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
4700 }
4701 
4702 /*
4703  * Per toast-chunk handling for toast reconstruction
4704  *
4705  * Appends a toast chunk so we can reconstruct it when the tuple "owning" the
4706  * toasted Datum comes along.
4707  */
4708 static void
4709 ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn,
4710  Relation relation, ReorderBufferChange *change)
4711 {
4712  ReorderBufferToastEnt *ent;
4713  HeapTuple newtup;
4714  bool found;
4715  int32 chunksize;
4716  bool isnull;
4717  Pointer chunk;
4718  TupleDesc desc = RelationGetDescr(relation);
4719  Oid chunk_id;
4720  int32 chunk_seq;
4721 
4722  if (txn->toast_hash == NULL)
4723  ReorderBufferToastInitHash(rb, txn);
4724 
4725  Assert(IsToastRelation(relation));
4726 
4727  newtup = change->data.tp.newtuple;
4728  chunk_id = DatumGetObjectId(fastgetattr(newtup, 1, desc, &isnull));
4729  Assert(!isnull);
4730  chunk_seq = DatumGetInt32(fastgetattr(newtup, 2, desc, &isnull));
4731  Assert(!isnull);
4732 
4733  ent = (ReorderBufferToastEnt *)
4734  hash_search(txn->toast_hash, &chunk_id, HASH_ENTER, &found);
4735 
4736  if (!found)
4737  {
4738  Assert(ent->chunk_id == chunk_id);
4739  ent->num_chunks = 0;
4740  ent->last_chunk_seq = 0;
4741  ent->size = 0;
4742  ent->reconstructed = NULL;
4743  dlist_init(&ent->chunks);
4744 
4745  if (chunk_seq != 0)
4746  elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq 0",
4747  chunk_seq, chunk_id);
4748  }
4749  else if (found && chunk_seq != ent->last_chunk_seq + 1)
4750  elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq %d",
4751  chunk_seq, chunk_id, ent->last_chunk_seq + 1);
4752 
4753  chunk = DatumGetPointer(fastgetattr(newtup, 3, desc, &isnull));
4754  Assert(!isnull);
4755 
4756  /* calculate size so we can allocate the right size at once later */
4757  if (!VARATT_IS_EXTENDED(chunk))
4758  chunksize = VARSIZE(chunk) - VARHDRSZ;
4759  else if (VARATT_IS_SHORT(chunk))
4760  /* could happen due to heap_form_tuple doing its thing */
4761  chunksize = VARSIZE_SHORT(chunk) - VARHDRSZ_SHORT;
4762  else
4763  elog(ERROR, "unexpected type of toast chunk");
4764 
4765  ent->size += chunksize;
4766  ent->last_chunk_seq = chunk_seq;
4767  ent->num_chunks++;
4768  dlist_push_tail(&ent->chunks, &change->node);
4769 }
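/*
 * Editor's sketch -- illustration only. The sequencing rule enforced
 * above: the first chunk of a toast value must carry seq 0, and every
 * further chunk must be exactly last_chunk_seq + 1, which catches missing
 * or reordered chunks as soon as they appear.
 */
static int
chunk_seq_ok(int is_first_chunk, int last_chunk_seq, int chunk_seq)
{
	if (is_first_chunk)
		return chunk_seq == 0;	/* a new value must start at seq 0 */
	return chunk_seq == last_chunk_seq + 1;		/* strictly consecutive */
}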
4770 
4771 /*
4772  * Rejigger change->newtuple to point to in-memory toast tuples instead of
4773  * on-disk toast tuples that may no longer exist (think DROP TABLE or VACUUM).
4774  *
4775  * We cannot replace unchanged toast tuples though, so those will still point
4776  * to on-disk toast data.
4777  *
4778  * While updating the existing change with detoasted tuple data, we need to
4779  * update the memory accounting info, because the change size will differ.
4780  * Otherwise the accounting may get out of sync, triggering serialization
4781  * at unexpected times.
4782  *
4783  * We simply subtract size of the change before rejiggering the tuple, and
4784  * then add the new size. This makes it look like the change was removed
4785  * and then added back, except it only tweaks the accounting info.
4786  *
4787  * In particular it can't trigger serialization, which would be pointless
4788  * anyway as it happens during commit processing right before handing
4789  * the change to the output plugin.
4790  */
4791 static void
4792 ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
4793  Relation relation, ReorderBufferChange *change)
4794 {
4795  TupleDesc desc;
4796  int natt;
4797  Datum *attrs;
4798  bool *isnull;
4799  bool *free;
4800  HeapTuple tmphtup;
4801  Relation toast_rel;
4802  TupleDesc toast_desc;
4803  MemoryContext oldcontext;
4804  HeapTuple newtup;
4805  Size old_size;
4806 
4807  /* no toast tuples changed */
4808  if (txn->toast_hash == NULL)
4809  return;
4810 
4811  /*
4812  * We're going to modify the size of the change. So, to make sure the
4813  * accounting is correct we record the current change size and then after
4814  * re-computing the change we'll subtract the recorded size and then
4815  * re-add the new change size at the end. We don't immediately subtract
4816  * the old size because if there is any error before we add the new size,
4817  * we will release the changes and that will update the accounting info
4818  * (subtracting the size from the counters). And we don't want to
4819  * underflow there.
4820  */
4821  old_size = ReorderBufferChangeSize(change);
4822 
4823  oldcontext = MemoryContextSwitchTo(rb->context);
4824 
4825  /* we should only have toast tuples in an INSERT or UPDATE */
4826  Assert(change->data.tp.newtuple);
4827 
4828  desc = RelationGetDescr(relation);
4829 
4830  toast_rel = RelationIdGetRelation(relation->rd_rel->reltoastrelid);
4831  if (!RelationIsValid(toast_rel))
4832  elog(ERROR, "could not open toast relation with OID %u (base relation \"%s\")",
4833  relation->rd_rel->reltoastrelid, RelationGetRelationName(relation));
4834 
4835  toast_desc = RelationGetDescr(toast_rel);
4836 
4837  /* should we allocate from stack instead? */
4838  attrs = palloc0(sizeof(Datum) * desc->natts);
4839  isnull = palloc0(sizeof(bool) * desc->natts);
4840  free = palloc0(sizeof(bool) * desc->natts);
4841 
4842  newtup = change->data.tp.newtuple;
4843 
4844  heap_deform_tuple(newtup, desc, attrs, isnull);
4845 
4846  for (natt = 0; natt < desc->natts; natt++)
4847  {
4848  Form_pg_attribute attr = TupleDescAttr(desc, natt);
4849  ReorderBufferToastEnt *ent;
4850  struct varlena *varlena;
4851 
4852  /* va_rawsize is the size of the original datum -- including header */
4853  struct varatt_external toast_pointer;
4854  struct varatt_indirect redirect_pointer;
4855  struct varlena *new_datum = NULL;
4856  struct varlena *reconstructed;
4857  dlist_iter it;
4858  Size data_done = 0;
4859 
4860  /* system columns aren't toasted */
4861  if (attr->attnum < 0)
4862  continue;
4863 
4864  if (attr->attisdropped)
4865  continue;
4866 
4867  /* not a varlena datatype */
4868  if (attr->attlen != -1)
4869  continue;
4870 
4871  /* no data */
4872  if (isnull[natt])
4873  continue;
4874 
4875  /* ok, we know we have a toast datum */
4876  varlena = (struct varlena *) DatumGetPointer(attrs[natt]);
4877 
4878  /* no need to do anything if the tuple isn't external */
4879  if (!VARATT_IS_EXTERNAL(varlena))
4880  continue;
4881 
4882  VARATT_EXTERNAL_GET_POINTER(toast_pointer, varlena);
4883 
4884  /*
4885  * Check whether the toast tuple changed, replace if so.
4886  */
4887  ent = (ReorderBufferToastEnt *)
4888  hash_search(txn->toast_hash,
4889  &toast_pointer.va_valueid,
4890  HASH_FIND,
4891  NULL);
4892  if (ent == NULL)
4893  continue;
4894 
4895  new_datum =
4896  (struct varlena *) palloc0(INDIRECT_POINTER_SIZE);
4897 
4898  free[natt] = true;
4899 
4900  reconstructed = palloc0(toast_pointer.va_rawsize);
4901 
4902  ent->reconstructed = reconstructed;
4903 
4904  /* stitch toast tuple back together from its parts */
4905  dlist_foreach(it, &ent->chunks)
4906  {
4907  bool cisnull;
4908  ReorderBufferChange *cchange;
4909  HeapTuple ctup;
4910  Pointer chunk;
4911 
4912  cchange = dlist_container(ReorderBufferChange, node, it.cur);
4913  ctup = cchange->data.tp.newtuple;
4914  chunk = DatumGetPointer(fastgetattr(ctup, 3, toast_desc, &cisnull));
4915 
4916  Assert(!cisnull);
4917  Assert(!VARATT_IS_EXTERNAL(chunk));
4918  Assert(!VARATT_IS_SHORT(chunk));
4919 
4920  memcpy(VARDATA(reconstructed) + data_done,
4921  VARDATA(chunk),
4922  VARSIZE(chunk) - VARHDRSZ);
4923  data_done += VARSIZE(chunk) - VARHDRSZ;
4924  }
4925  Assert(data_done == VARATT_EXTERNAL_GET_EXTSIZE(toast_pointer));
4926 
4927  /* make sure it's marked as compressed or not */
4928  if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
4929  SET_VARSIZE_COMPRESSED(reconstructed, data_done + VARHDRSZ);
4930  else
4931  SET_VARSIZE(reconstructed, data_done + VARHDRSZ);
4932 
4933  memset(&redirect_pointer, 0, sizeof(redirect_pointer));
4934  redirect_pointer.pointer = reconstructed;
4935 
4936  SET_VARTAG_EXTERNAL(new_datum, VARTAG_INDIRECT);
4937  memcpy(VARDATA_EXTERNAL(new_datum), &redirect_pointer,
4938  sizeof(redirect_pointer));
4939 
4940  attrs[natt] = PointerGetDatum(new_datum);
4941  }
4942 
4943  /*
4944  * Build tuple in separate memory & copy tuple back into the tuplebuf
4945  * passed to the output plugin. We can't directly heap_fill_tuple() into
4946  * the tuplebuf because attrs[] will point back into the current content.
4947  */
4948  tmphtup = heap_form_tuple(desc, attrs, isnull);
4949  Assert(newtup->t_len <= MaxHeapTupleSize);
4950  Assert(newtup->t_data == (HeapTupleHeader) ((char *) newtup + HEAPTUPLESIZE));
4951 
4952  memcpy(newtup->t_data, tmphtup->t_data, tmphtup->t_len);
4953  newtup->t_len = tmphtup->t_len;
4954 
4955  /*
4956  * Free resources we no longer need; the more persistent state will be
4957  * freed in ReorderBufferToastReset().
4958  */
4959  RelationClose(toast_rel);
4960  pfree(tmphtup);
4961  for (natt = 0; natt < desc->natts; natt++)
4962  {
4963  if (free[natt])
4964  pfree(DatumGetPointer(attrs[natt]));
4965  }
4966  pfree(attrs);
4967  pfree(free);
4968  pfree(isnull);
4969 
4970  MemoryContextSwitchTo(oldcontext);
4971 
4972  /* subtract the old change size */
4973  ReorderBufferChangeMemoryUpdate(rb, change, NULL, false, old_size);
4974  /* now add the change back, with the correct size */
4975  ReorderBufferChangeMemoryUpdate(rb, change, NULL, true,
4976  ReorderBufferChangeSize(change));
4977 }
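/*
 * Editor's sketch -- illustration only. The reassembly loop above is an
 * append-with-offset copy: each chunk's payload lands right behind the
 * previous one while data_done tracks progress, which is then checked
 * against the size recorded in the toast pointer. Hypothetical names:
 */
#include <string.h>

static size_t
stitch_chunks(char *out, const char *const *chunks, const size_t *lens, int n)
{
	size_t		data_done = 0;

	for (int i = 0; i < n; i++)
	{
		memcpy(out + data_done, chunks[i], lens[i]);
		data_done += lens[i];
	}
	return data_done;			/* caller asserts this equals the expected size */
}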
4978 
4979 /*
4980  * Free all resources allocated for toast reconstruction.
4981  */
4982 static void
4983 ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn)
4984 {
4985  HASH_SEQ_STATUS hstat;
4986  ReorderBufferToastEnt *ent;
4987 
4988  if (txn->toast_hash == NULL)
4989  return;
4990 
4991  /* sequentially walk over the hash and free everything */
4992  hash_seq_init(&hstat, txn->toast_hash);
4993  while ((ent = (ReorderBufferToastEnt *) hash_seq_search(&hstat)) != NULL)
4994  {
4995  dlist_mutable_iter it;
4996 
4997  if (ent->reconstructed != NULL)
4998  pfree(ent->reconstructed);
4999 
5000  dlist_foreach_modify(it, &ent->chunks)
5001  {
5002  ReorderBufferChange *change =
5003  dlist_container(ReorderBufferChange, node, it.cur);
5004 
5005  dlist_delete(&change->node);
5006  ReorderBufferReturnChange(rb, change, true);
5007  }
5008  }
5009 
5010  hash_destroy(txn->toast_hash);
5011  txn->toast_hash = NULL;
5012 }
5013 
5014 
5015 /* ---------------------------------------
5016  * Visibility support for logical decoding
5017  *
5018  *
5019  * Lookup actual cmin/cmax values when using decoding snapshot. We can't
5020  * always rely on stored cmin/cmax values because of two scenarios:
5021  *
5022  * * A tuple got changed multiple times during a single transaction and thus
5023  * has got a combo CID. Combo CIDs are only valid for the duration of a
5024  * single transaction.
5025  * * A tuple with a cmin but no cmax (and thus no combo CID) got
5026  * deleted/updated in another transaction than the one which created it
5027  * which we are looking at right now. As only one of cmin, cmax or combo CID
5028  * is actually stored in the heap we don't have access to the value we
5029  * need anymore.
5030  *
5031  * To resolve those problems we have a per-transaction hash of (cmin,
5032  * cmax) tuples keyed by (relfilelocator, ctid) which contains the actual
5033  * (cmin, cmax) values. That also takes care of combo CIDs by simply
5034  * not caring about them at all. As we have the real cmin/cmax values
5035  * combo CIDs aren't interesting.
5036  *
5037  * As we only care about catalog tuples here the overhead of this
5038  * hashtable should be acceptable.
5039  *
5040  * Heap rewrites complicate this a bit, check rewriteheap.c for
5041  * details.
5042  * -------------------------------------------------------------------------
5043  */
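/*
 * Editor's sketch -- illustration only. The "be careful about padding"
 * notes below exist because the hash key is a struct hashed as raw bytes;
 * padding must be zeroed before filling the fields, or two logically
 * equal keys can hash differently. Hypothetical key layout:
 */
#include <string.h>

typedef struct TupleCidKeySketch
{
	unsigned	dbOid;			/* relfilelocator-style fields */
	unsigned	relNumber;
	unsigned short offnum;		/* ctid-style field; note the trailing
								 * padding the compiler adds after it */
} TupleCidKeySketch;

static void
fill_key(TupleCidKeySketch *key, unsigned db, unsigned rel,
		 unsigned short off)
{
	memset(key, 0, sizeof(*key));	/* zero the padding bytes first */
	key->dbOid = db;
	key->relNumber = rel;
	key->offnum = off;
}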
5044 
5045 /* struct for sorting mapping files by LSN efficiently */
5046 typedef struct RewriteMappingFile
5047 {
5048  XLogRecPtr lsn;
5049  char fname[MAXPGPATH];
5050 } RewriteMappingFile;
5051 
5052 #ifdef NOT_USED
5053 static void
5054 DisplayMapping(HTAB *tuplecid_data)
5055 {
5056  HASH_SEQ_STATUS hstat;
5057  ReorderBufferTupleCidEnt *ent;
5058 
5059  hash_seq_init(&hstat, tuplecid_data);
5060  while ((ent = (ReorderBufferTupleCidEnt *) hash_seq_search(&hstat)) != NULL)
5061  {
5062  elog(DEBUG3, "mapping: node: %u/%u/%u tid: %u/%u cmin: %u, cmax: %u",
5063  ent->key.rlocator.dbOid,
5064  ent->key.rlocator.spcOid,
5065  ent->key.rlocator.relNumber,
5066  ItemPointerGetBlockNumber(&ent->key.tid),
5067  ItemPointerGetOffsetNumber(&ent->key.tid),
5068  ent->cmin,
5069  ent->cmax
5070  );
5071  }
5072 }
5073 #endif
5074 
5075 /*
5076  * Apply a single mapping file to tuplecid_data.
5077  *
5078  * The mapping file has to have been verified to be a) committed b) for our
5079  * transaction c) applied in LSN order.
5080  */
5081 static void
5082 ApplyLogicalMappingFile(HTAB *tuplecid_data, Oid relid, const char *fname)
5083 {
5084  char path[MAXPGPATH];
5085  int fd;
5086  int readBytes;
5087  LogicalRewriteMappingData map;
5088 
5089  sprintf(path, "%s/%s", PG_LOGICAL_MAPPINGS_DIR, fname);
5090  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
5091  if (fd < 0)
5092  ereport(ERROR,
5094  errmsg("could not open file \"%s\": %m", path)));
5095 
5096  while (true)
5097  {
5098  ReorderBufferTupleCidKey key;
5099  ReorderBufferTupleCidEnt *ent;
5100  ReorderBufferTupleCidEnt *new_ent;
5101  bool found;
5102 
5103  /* be careful about padding */
5104  memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
5105 
5106  /* read all mappings till the end of the file */
5107  pgstat_report_wait_start(WAIT_EVENT_REORDER_LOGICAL_MAPPING_READ);
5108  readBytes = read(fd, &map, sizeof(LogicalRewriteMappingData));
5109  pgstat_report_wait_end();
5110 
5111  if (readBytes < 0)
5112  ereport(ERROR,
5113  (errcode_for_file_access(),
5114  errmsg("could not read file \"%s\": %m",
5115  path)));
5116  else if (readBytes == 0) /* EOF */
5117  break;
5118  else if (readBytes != sizeof(LogicalRewriteMappingData))
5119  ereport(ERROR,
5120  (errcode_for_file_access(),
5121  errmsg("could not read from file \"%s\": read %d instead of %d bytes",
5122  path, readBytes,
5123  (int32) sizeof(LogicalRewriteMappingData))));
5124 
5125  key.rlocator = map.old_locator;
5126  ItemPointerCopy(&map.old_tid,
5127  &key.tid);
5128 
5129 
5130  ent = (ReorderBufferTupleCidEnt *)
5131  hash_search(tuplecid_data, &key, HASH_FIND, NULL);
5132 
5133  /* no existing mapping, no need to update */
5134  if (!ent)
5135  continue;
5136 
5137  key.rlocator = map.new_locator;
5138  ItemPointerCopy(&map.new_tid,
5139  &key.tid);
5140 
5141  new_ent = (ReorderBufferTupleCidEnt *)
5142  hash_search(tuplecid_data, &key, HASH_ENTER, &found);
5143 
5144  if (found)
5145  {
5146  /*
5147  * Make sure the existing mapping makes sense. We sometimes update
5148  * old records that did not yet have a cmax (e.g. pg_class' own
5149  * entry while rewriting it) during rewrites, so allow that.
5150  */
5151  Assert(ent->cmin == InvalidCommandId || ent->cmin == new_ent->cmin);
5152  Assert(ent->cmax == InvalidCommandId || ent->cmax == new_ent->cmax);
5153  }
5154  else
5155  {
5156  /* update mapping */
5157  new_ent->cmin = ent->cmin;
5158  new_ent->cmax = ent->cmax;
5159  new_ent->combocid = ent->combocid;
5160  }
5161  }
5162 
5163  if (CloseTransientFile(fd) != 0)
5164  ereport(ERROR,
5165  (errcode_for_file_access(),
5166  errmsg("could not close file \"%s\": %m", path)));
5167 }
5168 
5169 
5170 /*
5171  * Check whether the TransactionId 'xid' is in the pre-sorted array 'xip'.
5172  */
5173 static bool
5174 TransactionIdInArray(TransactionId xid, TransactionId *xip, Size num)
5175 {
5176  return bsearch(&xid, xip, num,
5177  sizeof(TransactionId), xidComparator) != NULL;
5178 }
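/*
 * Editor's sketch -- a stand-alone equivalent of the lookup above, using
 * plain unsigned ints in place of TransactionId; xidComparator likewise
 * compares the values numerically.
 */
#include <stdlib.h>

static int
cmp_u32(const void *a, const void *b)
{
	unsigned	x = *(const unsigned *) a;
	unsigned	y = *(const unsigned *) b;

	return (x > y) - (x < y);	/* overflow-safe -1/0/+1 */
}

static int
in_sorted_array(unsigned xid, const unsigned *xip, size_t num)
{
	return bsearch(&xid, xip, num, sizeof(unsigned), cmp_u32) != NULL;
}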
5179 
5180 /*
5181  * list_sort() comparator for sorting RewriteMappingFiles in LSN order.
5182  */
5183 static int
5184 file_sort_by_lsn(const ListCell *a_p, const ListCell *b_p)
5185 {
5186  RewriteMappingFile *a = (RewriteMappingFile *) lfirst(a_p);
5187  RewriteMappingFile *b = (RewriteMappingFile *) lfirst(b_p);
5188 
5189  return pg_cmp_u64(a->lsn, b->lsn);
5190 }
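/*
 * Editor's note -- illustration only. pg_cmp_u64 exists because the
 * tempting "return a - b" comparator is wrong for 64-bit keys: the
 * difference can overflow the int return type and flip sign. The safe
 * pattern it implements is
 *
 *     return (a > b) - (a < b);
 *
 * which always yields -1, 0 or +1.
 */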
5191 
5192 /*
5193  * Apply any existing logical remapping files if there are any targeted at our
5194  * transaction for relid.
5195  */
5196 static void
5197 UpdateLogicalMappings(HTAB *tuplecid_data, Oid relid, Snapshot snapshot)
5198 {
5199  DIR *mapping_dir;
5200  struct dirent *mapping_de;
5201  List *files = NIL;
5202  ListCell *file;
5203  Oid dboid = IsSharedRelation(relid) ? InvalidOid : MyDatabaseId;
5204 
5205  mapping_dir = AllocateDir(PG_LOGICAL_MAPPINGS_DIR);
5206  while ((mapping_de = ReadDir(mapping_dir, PG_LOGICAL_MAPPINGS_DIR)) != NULL)
5207  {
5208  Oid f_dboid;
5209  Oid f_relid;
5210  TransactionId f_mapped_xid;
5211  TransactionId f_create_xid;
5212  XLogRecPtr f_lsn;
5213  uint32 f_hi,
5214  f_lo;
5215  RewriteMappingFile *f;
5216 
5217  if (strcmp(mapping_de->d_name, ".") == 0 ||
5218  strcmp(mapping_de->d_name, "..") == 0)
5219  continue;
5220 
5221  /* Ignore files that aren't ours */
5222  if (strncmp(mapping_de->d_name, "map-", 4) != 0)
5223  continue;
5224 
5225  if (sscanf(mapping_de->d_name, LOGICAL_REWRITE_FORMAT,
5226  &f_dboid, &f_relid, &f_hi, &f_lo,
5227  &f_mapped_xid, &f_create_xid) != 6)
5228  elog(ERROR, "could not parse filename \"%s\"", mapping_de->d_name);
5229 
5230  f_lsn = ((uint64) f_hi) << 32 | f_lo;
5231 
5232  /* mapping for another database */
5233  if (f_dboid != dboid)
5234  continue;
5235 
5236  /* mapping for another relation */
5237  if (f_relid != relid)
5238  continue;
5239 
5240  /* did the creating transaction abort? */
5241  if (!TransactionIdDidCommit(f_create_xid))
5242  continue;
5243 
5244  /* not for our transaction */
5245  if (!TransactionIdInArray(f_mapped_xid, snapshot->subxip, snapshot->subxcnt))
5246  continue;
5247 
5248  /* ok, relevant, queue for apply */
5249  f = palloc(sizeof(RewriteMappingFile));
5250  f->lsn = f_lsn;
5251  strcpy(f->fname, mapping_de->d_name);
5252  files = lappend(files, f);
5253  }
5254  FreeDir(mapping_dir);
5255 
5256  /* sort files so we apply them in LSN order */
5257  list_sort(files, file_sort_by_lsn);
5258 
5259  foreach(file, files)
5260  {
5261  RewriteMappingFile *f = (RewriteMappingFile *) lfirst(file);
5262 
5263  elog(DEBUG1, "applying mapping: \"%s\" in %u", f->fname,
5264  snapshot->subxip[0]);
5265  ApplyLogicalMappingFile(tuplecid_data, relid, f->fname);
5266  pfree(f);
5267  }
5268 }
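/*
 * Editor's sketch -- illustration only; the exact pattern is
 * LOGICAL_REWRITE_FORMAT from rewriteheap.h, assumed here to be
 * "map-%x-%x-%X_%X-%x-%x". Mapping file names pack the database OID,
 * relation OID, the rewrite LSN split into two 32-bit halves, and the
 * mapped/creating XIDs, recovered with sscanf exactly as above:
 */
#include <stdio.h>
#include <stdint.h>

int
main(void)
{
	const char *fname = "map-5-4000-0_2A000000-30c-30b";
	unsigned	dboid, relid, hi, lo, mapped_xid, create_xid;
	uint64_t	lsn;

	if (sscanf(fname, "map-%x-%x-%X_%X-%x-%x",
			   &dboid, &relid, &hi, &lo, &mapped_xid, &create_xid) != 6)
		return 1;

	lsn = ((uint64_t) hi) << 32 | lo;	/* same combine as f_lsn above */
	printf("db=%u rel=%u lsn=%X/%X mapped_xid=%u create_xid=%u\n",
		   dboid, relid, (unsigned) (lsn >> 32), (unsigned) lsn,
		   mapped_xid, create_xid);
	return 0;
}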
5269 
5270 /*
5271  * Lookup cmin/cmax of a tuple, during logical decoding where we can't rely on
5272  * combo CIDs.
5273  */
5274 bool
5275 ResolveCminCmaxDuringDecoding(HTAB *tuplecid_data,
5276  Snapshot snapshot,
5277  HeapTuple htup, Buffer buffer,
5278  CommandId *cmin, CommandId *cmax)
5279 {
5280  ReorderBufferTupleCidKey key;
5281  ReorderBufferTupleCidEnt *ent;
5282  ForkNumber forkno;
5283  BlockNumber blockno;
5284  bool updated_mapping = false;
5285 
5286  /*
5287  * Return unresolved if tuplecid_data is not valid. That's because when
5288  * streaming in-progress transactions we may run into tuples with the CID
5289  * before actually decoding them. Think e.g. about INSERT followed by
5290  * TRUNCATE, where the TRUNCATE may not be decoded yet when applying the
5291  * INSERT. So in such cases, we assume the CID is from the future
5292  * command.
5293  */
5294  if (tuplecid_data == NULL)
5295  return false;
5296 
5297  /* be careful about padding */
5298  memset(&key, 0, sizeof(key));
5299 
5300  Assert(!BufferIsLocal(buffer));
5301 
5302  /*
5303  * get relfilelocator from the buffer, no convenient way to access it
5304  * other than that.
5305  */
5306  BufferGetTag(buffer, &key.rlocator, &forkno, &blockno);
5307 
5308  /* tuples can only be in the main fork */
5309  Assert(forkno == MAIN_FORKNUM);
5310  Assert(blockno == ItemPointerGetBlockNumber(&htup->t_self));
5311 
5312  ItemPointerCopy(&htup->t_self,
5313  &key.tid);
5314 
5315 restart:
5316  ent = (ReorderBufferTupleCidEnt *)
5317  hash_search(tuplecid_data, &key, HASH_FIND, NULL);
5318 
5319  /*
5320  * failed to find a mapping, check whether the table was rewritten and
5321  * apply mapping if so, but only do that once - there can be no new
5322  * mappings while we are in here since we have to hold a lock on the
5323  * relation.
5324  */
5325  if (ent == NULL && !updated_mapping)
5326  {
5327  UpdateLogicalMappings(tuplecid_data, htup->t_tableOid, snapshot);
5328  /* now check but don't update for a mapping again */
5329  updated_mapping = true;
5330  goto restart;
5331  }
5332  else if (ent == NULL)
5333  return false;
5334 
5335  if (cmin)
5336  *cmin = ent->cmin;
5337  if (cmax)
5338  *cmax = ent->cmax;
5339  return true;
5340 }
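/*
 * Editor's sketch -- illustration only. The restart label above implements
 * "look up; on a miss, refresh at most once and retry", which is safe here
 * because no new mapping files can appear while the relation lock is held.
 * The same shape without goto, with hypothetical names:
 */
static const void *
lookup_with_one_refresh(const void *(*lookup) (void), void (*refresh) (void))
{
	const void *ent = lookup();

	if (ent == NULL)
	{
		refresh();				/* apply any rewrite mapping files */
		ent = lookup();			/* single retry; NULL now means "unresolved" */
	}
	return ent;
}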