PostgreSQL Source Code  git master
reorderbuffer.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * reorderbuffer.c
4  * PostgreSQL logical replay/reorder buffer management
5  *
6  *
7  * Copyright (c) 2012-2020, PostgreSQL Global Development Group
8  *
9  *
10  * IDENTIFICATION
11  * src/backend/replication/reorderbuffer.c
12  *
13  * NOTES
14  * This module gets handed individual pieces of transactions in the order
15  * they are written to the WAL and is responsible to reassemble them into
16  * toplevel transaction sized pieces. When a transaction is completely
17  * reassembled - signaled by reading the transaction commit record - it
18  * will then call the output plugin (cf. ReorderBufferCommit()) with the
19  * individual changes. The output plugins rely on snapshots built by
20  * snapbuild.c which hands them to us.
21  *
22  * Transactions and subtransactions/savepoints in postgres are not
23  * immediately linked to each other from outside the performing
24  * backend. Only at commit/abort (or special xact_assignment records) they
25  * are linked together. Which means that we will have to splice together a
26  * toplevel transaction from its subtransactions. To do that efficiently we
27  * build a binary heap indexed by the smallest current lsn of the individual
28  * subtransactions' changestreams. As the individual streams are inherently
29  * ordered by LSN - since that is where we build them from - the transaction
30  * can easily be reassembled by always using the subtransaction with the
31  * smallest current LSN from the heap.
32  *
33  * In order to cope with large transactions - which can be several times as
34  * big as the available memory - this module supports spooling the contents
35  * of a large transactions to disk. When the transaction is replayed the
36  * contents of individual (sub-)transactions will be read from disk in
37  * chunks.
38  *
39  * This module also has to deal with reassembling toast records from the
40  * individual chunks stored in WAL. When a new (or initial) version of a
41  * tuple is stored in WAL it will always be preceded by the toast chunks
42  * emitted for the columns stored out of line. Within a single toplevel
43  * transaction there will be no other data carrying records between a row's
44  * toast chunks and the row data itself. See ReorderBufferToast* for
45  * details.
46  *
47  * ReorderBuffer uses two special memory context types - SlabContext for
48  * allocations of fixed-length structures (changes and transactions), and
49  * GenerationContext for the variable-length transaction data (allocated
50  * and freed in groups with similar lifespans).
51  *
52  * To limit the amount of memory used by decoded changes, we track memory
53  * used at the reorder buffer level (i.e. total amount of memory), and for
54  * each transaction. When the total amount of used memory exceeds the
55  * limit, the transaction consuming the most memory is then serialized to
56  * disk.
57  *
58  * Only decoded changes are evicted from memory (spilled to disk), not the
59  * transaction records. The number of toplevel transactions is limited,
60  * but a transaction with many subtransactions may still consume significant
61  * amounts of memory. However, the transaction records are fairly small and
62  * are not included in the memory limit.
63  *
64  * The current eviction algorithm is very simple - the transaction is
65  * picked merely by size, while it might be useful to also consider age
66  * (LSN) of the changes for example. With the new Generational memory
67  * allocator, evicting the oldest changes would make it more likely the
68  * memory gets actually freed.
69  *
70  * We still rely on max_changes_in_memory when loading serialized changes
71  * back into memory. At that point we can't use the memory limit directly
72  * as we load the subxacts independently. One option to deal with this
73  * would be to count the subxacts, and allow each to allocate 1/N of the
74  * memory limit. That however does not seem very appealing, because with
75  * many subtransactions it may easily cause thrashing (short cycles of
76  * deserializing and applying very few changes). We probably should give
77  * a bit more memory to the oldest subtransactions, because it's likely
78  * they are the source for the next sequence of changes.
79  *
80  * -------------------------------------------------------------------------
81  */
82 #include "postgres.h"
83 
84 #include <unistd.h>
85 #include <sys/stat.h>
86 
87 #include "access/detoast.h"
88 #include "access/heapam.h"
89 #include "access/rewriteheap.h"
90 #include "access/transam.h"
91 #include "access/xact.h"
92 #include "access/xlog_internal.h"
93 #include "catalog/catalog.h"
94 #include "lib/binaryheap.h"
95 #include "miscadmin.h"
96 #include "pgstat.h"
97 #include "replication/logical.h"
99 #include "replication/slot.h"
100 #include "replication/snapbuild.h" /* just for SnapBuildSnapDecRefcount */
101 #include "storage/bufmgr.h"
102 #include "storage/fd.h"
103 #include "storage/sinval.h"
104 #include "utils/builtins.h"
105 #include "utils/combocid.h"
106 #include "utils/memdebug.h"
107 #include "utils/memutils.h"
108 #include "utils/rel.h"
109 #include "utils/relfilenodemap.h"
110 
111 
112 /* entry for a hash table we use to map from xid to our transaction state */
114 {
118 
119 /* data structures for (relfilenode, ctid) => (cmin, cmax) mapping */
121 {
125 
127 {
131  CommandId combocid; /* just for debugging */
133 
134 /* Virtual file descriptor with file offset tracking */
/*
 * One on-disk file of a serialized transaction being read back.  The read
 * position is carried here explicitly because the vfd may be closed (-1) and
 * later reopened between reads, at which point the offset restarts at 0
 * (cf. ReorderBufferRestoreChanges, which takes a TXNEntryFile *).
 */
135 typedef struct TXNEntryFile
136 {
137  File vfd; /* -1 when the file is closed */
138  off_t curOffset; /* offset for next write or read. Reset to 0
139  * when vfd is opened. */
140 } TXNEntryFile;
141 
142 /* k-way in-order change iteration support structures */
144 {
151 
153 {
159 
160 /* toast datastructures */
/*
 * Per-toast-datum reassembly state: accumulates the chunk list for one
 * chunk_id until the full varlena can be reconstructed (see the
 * ReorderBufferToast* routines referenced in the file header).
 *
 * NOTE(review): the closing "} ReorderBufferToastEnt;" line is missing from
 * this extract; the member list below is kept verbatim.
 */
161 typedef struct ReorderBufferToastEnt
162 {
163  Oid chunk_id; /* toast_table.chunk_id */
164  int32 last_chunk_seq; /* toast_table.chunk_seq of the last chunk we
165  * have seen */
166  Size num_chunks; /* number of chunks we've already seen */
167  Size size; /* combined size of chunks seen */
168  dlist_head chunks; /* linked list of chunks */
169  struct varlena *reconstructed; /* reconstructed varlena now pointed to in
170  * main tup */
172 
173 /* Disk serialization support datastructures */
175 {
178  /* data follows */
180 
/*
 * Predicates classifying a ReorderBufferChange action code.  Used by
 * ReorderBufferProcessPartialChange below to decide when a toast/speculative
 * "partial change" sequence begins and ends.  Arguments are parenthesized and
 * evaluated once, so these are safe on plain expressions.
 */
181 #define IsSpecInsert(action) \
182 ( \
183  ((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT) \
184 )
185 #define IsSpecConfirm(action) \
186 ( \
187  ((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM) \
188 )
189 #define IsInsertOrUpdate(action) \
190 ( \
191  (((action) == REORDER_BUFFER_CHANGE_INSERT) || \
192  ((action) == REORDER_BUFFER_CHANGE_UPDATE) || \
193  ((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT)) \
194 )
195 
196 /*
197  * Maximum number of changes kept in memory, per transaction. After that,
198  * changes are spooled to disk.
199  *
200  * The current value should be sufficient to decode the entire transaction
201  * without hitting disk in OLTP workloads, while starting to spool to disk in
202  * other workloads reasonably fast.
203  *
204  * At some point in the future it probably makes sense to have a more elaborate
205  * resource management here, but it's not entirely clear what that would look
206  * like.
207  */
/*
 * Per the file header, eviction to disk is driven by the memory-accounting
 * limit; this constant only batches how many changes are loaded back at a
 * time when restoring serialized changes (hence the XXX below).
 */
209 static const Size max_changes_in_memory = 4096; /* XXX for restore only */
210 
211 /* ---------------------------------------
212  * primary reorderbuffer support routines
213  * ---------------------------------------
214  */
218  TransactionId xid, bool create, bool *is_new,
219  XLogRecPtr lsn, bool create_as_top);
221  ReorderBufferTXN *subtxn);
222 
223 static void AssertTXNLsnOrder(ReorderBuffer *rb);
224 
225 /* ---------------------------------------
226  * support functions for lsn-order iterating over the ->changes of a
227  * transaction and its subtransactions
228  *
229  * used for iteration over the k-way heap merge of a transaction and its
230  * subtransactions
231  * ---------------------------------------
232  */
234  ReorderBufferIterTXNState *volatile *iter_state);
239 
240 /*
241  * ---------------------------------------
242  * Disk serialization support functions
243  * ---------------------------------------
244  */
248  int fd, ReorderBufferChange *change);
250  TXNEntryFile *file, XLogSegNo *segno);
252  char *change);
255 static void ReorderBufferCleanupSerializedTXNs(const char *slotname);
256 static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot,
257  TransactionId xid, XLogSegNo segno);
258 
259 static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap);
262 
263 /*
264  * ---------------------------------------
265  * Streaming support functions
266  * ---------------------------------------
267  */
268 static inline bool ReorderBufferCanStream(ReorderBuffer *rb);
269 static inline bool ReorderBufferCanStartStreaming(ReorderBuffer *rb);
272 
273 /* ---------------------------------------
274  * toast reassembly support
275  * ---------------------------------------
276  */
280  Relation relation, ReorderBufferChange *change);
282  Relation relation, ReorderBufferChange *change);
283 
284 /*
285  * ---------------------------------------
286  * memory accounting
287  * ---------------------------------------
288  */
291  ReorderBufferChange *change, bool addition);
292 
293 /*
294  * Allocate a new ReorderBuffer and clean out any old serialized state from
295  * prior ReorderBuffer instances for the same slot.
296  */
/*
 * Returns a ReorderBuffer whose entire state hangs off its own memory
 * context (buffer->context), so ReorderBufferFree can release everything by
 * deleting that one context.  Requires MyReplicationSlot to be acquired.
 *
 * NOTE(review): the function-name line and several continuation/argument
 * lines (context-create size args, hash_create flags, the cleanup call) are
 * missing from this extract; all surviving lines are kept verbatim.
 */
299 {
300  ReorderBuffer *buffer;
301  HASHCTL hash_ctl;
302  MemoryContext new_ctx;
303 
304  Assert(MyReplicationSlot != NULL);
305 
306  /* allocate memory in own context, to have better accountability */
308  "ReorderBuffer",
310 
311  buffer =
312  (ReorderBuffer *) MemoryContextAlloc(new_ctx, sizeof(ReorderBuffer));
313 
314  memset(&hash_ctl, 0, sizeof(hash_ctl));
315 
316  buffer->context = new_ctx;
317 
/* slab contexts for the fixed-size structs (see file header notes) */
318  buffer->change_context = SlabContextCreate(new_ctx,
319  "Change",
321  sizeof(ReorderBufferChange));
322 
323  buffer->txn_context = SlabContextCreate(new_ctx,
324  "TXN",
326  sizeof(ReorderBufferTXN));
327 
/* generation context for variable-length tuple data (see file header) */
328  buffer->tup_context = GenerationContextCreate(new_ctx,
329  "Tuples",
331 
/* xid -> ReorderBufferTXN lookup table */
332  hash_ctl.keysize = sizeof(TransactionId);
333  hash_ctl.entrysize = sizeof(ReorderBufferTXNByIdEnt);
334  hash_ctl.hcxt = buffer->context;
335 
336  buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
338 
340  buffer->by_txn_last_txn = NULL;
341 
342  buffer->outbuf = NULL;
343  buffer->outbufsize = 0;
344  buffer->size = 0;
345 
/* spill-to-disk statistics start at zero */
346  buffer->spillTxns = 0;
347  buffer->spillCount = 0;
348  buffer->spillBytes = 0;
349 
351 
352  dlist_init(&buffer->toplevel_by_lsn);
354 
355  /*
356  * Ensure there's no stale data from prior uses of this slot, in case some
357  * prior exit avoided calling ReorderBufferFree. Failure to do this can
358  * produce duplicated txns, and it's very cheap if there's nothing there.
359  */
361 
362  return buffer;
363 }
364 
365 /*
366  * Free a ReorderBuffer
367  */
/*
 * All of rb's state lives under rb->context (see ReorderBufferAllocate), so
 * a single MemoryContextDelete releases every change, TXN and tuple buffer.
 *
 * NOTE(review): the signature line and the on-disk cleanup call (line after
 * the final comment) are missing from this extract.
 */
368 void
370 {
371  MemoryContext context = rb->context;
372 
373  /*
374  * We free separately allocated data by entirely scrapping reorderbuffer's
375  * memory context.
376  */
377  MemoryContextDelete(context);
378 
379  /* Free disk space used by unconsumed reorder buffers */
381 }
382 
383 /*
384  * Get an unused, possibly preallocated, ReorderBufferTXN.
385  */
/*
 * Zero-initializes the struct and its embedded lists; fields whose "invalid"
 * value is nonzero (see the InvalidCommandId comment below) are then set
 * explicitly.  NOTE(review): name/allocation lines missing from this extract
 * (allocation presumably from rb->txn_context — confirm upstream).
 */
386 static ReorderBufferTXN *
388 {
390 
391  txn = (ReorderBufferTXN *)
393 
394  memset(txn, 0, sizeof(ReorderBufferTXN));
395 
396  dlist_init(&txn->changes);
397  dlist_init(&txn->tuplecids);
398  dlist_init(&txn->subtxns);
399 
400  /* InvalidCommandId is not zero, so set it explicitly */
402 
403  return txn;
404 }
405 
406 /*
407  * Free a ReorderBufferTXN.
408  */
/*
 * Invalidate the one-entry xid lookup cache if it points at this txn, free
 * the txn's separately allocated members, then the struct itself.
 * NOTE(review): the cache-reset line and the tuplecid_hash destroy call are
 * missing from this extract.
 */
409 static void
411 {
412  /* clean the lookup cache if we were cached (quite likely) */
413  if (rb->by_txn_last_xid == txn->xid)
414  {
416  rb->by_txn_last_txn = NULL;
417  }
418 
419  /* free data that's contained */
420 
421  if (txn->tuplecid_hash != NULL)
422  {
424  txn->tuplecid_hash = NULL;
425  }
426 
427  if (txn->invalidations)
428  {
429  pfree(txn->invalidations);
430  txn->invalidations = NULL;
431  }
432 
433  pfree(txn);
434 }
435 
436 /*
437  * Get an fresh ReorderBufferChange.
438  */
/*
 * Returns a zeroed change struct; caller fills in action/lsn/data and queues
 * it via ReorderBufferQueueChange.  NOTE(review): signature and allocation
 * lines missing from this extract.
 */
441 {
442  ReorderBufferChange *change;
443 
444  change = (ReorderBufferChange *)
446 
447  memset(change, 0, sizeof(ReorderBufferChange));
448  return change;
449 }
450 
451 /*
452  * Free a ReorderBufferChange and update memory accounting, if requested.
453  */
/*
 * upd_mem=false is used when the change was never added to a queue (see
 * ReorderBufferQueueChange's concurrent-abort path), so no accounting entry
 * exists to subtract.  Frees whatever per-action payload the change carries,
 * then the struct itself.
 *
 * NOTE(review): every "case REORDER_BUFFER_CHANGE_*:" label line in the
 * switch below is missing from this extract; the payload-freeing bodies
 * (tuples, message, invalidations, snapshot, truncate relids) survive.
 */
454 void
456  bool upd_mem)
457 {
458  /* update memory accounting info */
459  if (upd_mem)
460  ReorderBufferChangeMemoryUpdate(rb, change, false);
461 
462  /* free contained data */
463  switch (change->action)
464  {
469  if (change->data.tp.newtuple)
470  {
471  ReorderBufferReturnTupleBuf(rb, change->data.tp.newtuple);
472  change->data.tp.newtuple = NULL;
473  }
474 
475  if (change->data.tp.oldtuple)
476  {
477  ReorderBufferReturnTupleBuf(rb, change->data.tp.oldtuple);
478  change->data.tp.oldtuple = NULL;
479  }
480  break;
482  if (change->data.msg.prefix != NULL)
483  pfree(change->data.msg.prefix);
484  change->data.msg.prefix = NULL;
485  if (change->data.msg.message != NULL)
486  pfree(change->data.msg.message);
487  change->data.msg.message = NULL;
488  break;
490  if (change->data.inval.invalidations)
491  pfree(change->data.inval.invalidations);
492  change->data.inval.invalidations = NULL;
493  break;
495  if (change->data.snapshot)
496  {
497  ReorderBufferFreeSnap(rb, change->data.snapshot);
498  change->data.snapshot = NULL;
499  }
500  break;
501  /* no data in addition to the struct itself */
503  if (change->data.truncate.relids != NULL)
504  {
505  ReorderBufferReturnRelids(rb, change->data.truncate.relids);
506  change->data.truncate.relids = NULL;
507  }
508  break;
512  break;
513  }
514 
515  pfree(change);
516 }
517 
518 /*
519  * Get a fresh ReorderBufferTupleBuf fitting at least a tuple of size
520  * tuple_len (excluding header overhead).
521  */
/*
 * The allocation covers the struct, MAXIMUM_ALIGNOF slack so t_data can be
 * aligned (via ReorderBufferTupleBufData), and header + tuple_len bytes.
 * NOTE(review): signature and allocation-context lines missing from this
 * extract (presumably rb->tup_context — confirm upstream).
 */
524 {
525  ReorderBufferTupleBuf *tuple;
526  Size alloc_len;
527 
528  alloc_len = tuple_len + SizeofHeapTupleHeader;
529 
530  tuple = (ReorderBufferTupleBuf *)
532  sizeof(ReorderBufferTupleBuf) +
533  MAXIMUM_ALIGNOF + alloc_len);
534  tuple->alloc_tuple_size = alloc_len;
535  tuple->tuple.t_data = ReorderBufferTupleBufData(tuple);
536 
537  return tuple;
538 }
539 
540 /*
541  * Free an ReorderBufferTupleBuf.
542  */
/* Struct and tuple data are one allocation, so a single pfree suffices. */
543 void
545 {
546  pfree(tuple);
547 }
548 
549 /*
550  * Get an array for relids of truncated relations.
551  *
552  * We use the global memory context (for the whole reorder buffer), because
553  * none of the existing ones seems like a good match (some are SLAB, so we
554  * can't use those, and tup_context is meant for tuple data, not relids). We
555  * could add yet another context, but it seems like an overkill - TRUNCATE is
556  * not particularly common operation, so it does not seem worth it.
557  */
/* Returns an uninitialized Oid[nrelids]; freed via ReorderBufferReturnRelids. */
558 Oid *
560 {
561  Oid *relids;
562  Size alloc_len;
563 
564  alloc_len = sizeof(Oid) * nrelids;
565 
566  relids = (Oid *) MemoryContextAlloc(rb->context, alloc_len);
567 
568  return relids;
569 }
570 
571 /*
572  * Free an array of relids.
573  */
/* Counterpart of ReorderBufferGetRelids; array is a plain palloc'd chunk. */
574 void
576 {
577  pfree(relids);
578 }
579 
580 /*
581  * Return the ReorderBufferTXN from the given buffer, specified by Xid.
582  * If create is true, and a transaction doesn't already exist, create it
583  * (with the given LSN, and as top transaction if that's specified);
584  * when this happens, is_new is set to true.
585  */
/*
 * Central xid -> txn lookup.  A one-entry cache (by_txn_last_xid/txn) fronts
 * the by_txn hash table; the cache can legitimately record "does not exist"
 * (txn == NULL), which short-circuits HASH_FIND misses.  Newly created
 * top-level txns are appended to toplevel_by_lsn, relying on callers passing
 * monotonically increasing LSNs (checked by AssertTXNLsnOrder).
 *
 * NOTE(review): the name/first-parameter line and the local declarations for
 * 'ent'/'txn' are missing from this extract.
 */
586 static ReorderBufferTXN *
588  bool *is_new, XLogRecPtr lsn, bool create_as_top)
589 {
592  bool found;
593 
595 
596  /*
597  * Check the one-entry lookup cache first
598  */
600  rb->by_txn_last_xid == xid)
601  {
602  txn = rb->by_txn_last_txn;
603 
604  if (txn != NULL)
605  {
606  /* found it, and it's valid */
607  if (is_new)
608  *is_new = false;
609  return txn;
610  }
611 
612  /*
613  * cached as non-existent, and asked not to create? Then nothing else
614  * to do.
615  */
616  if (!create)
617  return NULL;
618  /* otherwise fall through to create it */
619  }
620 
621  /*
622  * If the cache wasn't hit or it yielded an "does-not-exist" and we want
623  * to create an entry.
624  */
625 
626  /* search the lookup table */
627  ent = (ReorderBufferTXNByIdEnt *)
628  hash_search(rb->by_txn,
629  (void *) &xid,
630  create ? HASH_ENTER : HASH_FIND,
631  &found);
632  if (found)
633  txn = ent->txn;
634  else if (create)
635  {
636  /* initialize the new entry, if creation was requested */
637  Assert(ent != NULL);
638  Assert(lsn != InvalidXLogRecPtr);
639 
640  ent->txn = ReorderBufferGetTXN(rb);
641  ent->txn->xid = xid;
642  txn = ent->txn;
643  txn->first_lsn = lsn;
645 
646  if (create_as_top)
647  {
648  dlist_push_tail(&rb->toplevel_by_lsn, &txn->node);
649  AssertTXNLsnOrder(rb);
650  }
651  }
652  else
653  txn = NULL; /* not found and not asked to create */
654 
655  /* update cache */
656  rb->by_txn_last_xid = xid;
657  rb->by_txn_last_txn = txn;
658 
659  if (is_new)
660  *is_new = !found;
661 
662  Assert(!create || txn != NULL);
663  return txn;
664 }
665 
666 /*
667  * Record the partial change for the streaming of in-progress transactions. We
668  * can stream only complete changes so if we have a partial change like toast
669  * table insert or speculative insert then we mark such a 'txn' so that it
670  * can't be streamed. We also ensure that if the changes in such a 'txn' are
671  * above logical_decoding_work_mem threshold then we stream them as soon as we
672  * have a complete change.
673  */
/*
 * Flags are tracked on the TOP-LEVEL txn, even for changes arriving in a
 * subtransaction, since streaming decisions are made per top-level txn.
 *
 * NOTE(review): the name line, the statement setting the toast-insert flag
 * (after "if (toast_insert)"), and the first condition of the final if are
 * missing from this extract.
 */
674 static void
676  ReorderBufferChange *change,
677  bool toast_insert)
678 {
679  ReorderBufferTXN *toptxn;
680 
681  /*
682  * The partial changes need to be processed only while streaming
683  * in-progress transactions.
684  */
685  if (!ReorderBufferCanStream(rb))
686  return;
687 
688  /* Get the top transaction. */
689  if (txn->toptxn != NULL)
690  toptxn = txn->toptxn;
691  else
692  toptxn = txn;
693 
694  /*
695  * Set the toast insert bit whenever we get toast insert to indicate a
696  * partial change and clear it when we get the insert or update on main
697  * table (Both update and insert will do the insert in the toast table).
698  */
699  if (toast_insert)
701  else if (rbtxn_has_toast_insert(toptxn) &&
702  IsInsertOrUpdate(change->action))
703  toptxn->txn_flags &= ~RBTXN_HAS_TOAST_INSERT;
704 
705  /*
706  * Set the spec insert bit whenever we get the speculative insert to
707  * indicate the partial change and clear the same on speculative confirm.
708  */
709  if (IsSpecInsert(change->action))
710  toptxn->txn_flags |= RBTXN_HAS_SPEC_INSERT;
711  else if (IsSpecConfirm(change->action))
712  {
713  /*
714  * Speculative confirm change must be preceded by speculative
715  * insertion.
716  */
717  Assert(rbtxn_has_spec_insert(toptxn));
718  toptxn->txn_flags &= ~RBTXN_HAS_SPEC_INSERT;
719  }
720 
721  /*
722  * Stream the transaction if it is serialized before and the changes are
723  * now complete in the top-level transaction.
724  *
725  * The reason for doing the streaming of such a transaction as soon as we
726  * get the complete change for it is that previously it would have reached
727  * the memory threshold and wouldn't get streamed because of incomplete
728  * changes. Delaying such transactions would increase apply lag for them.
729  */
731  !(rbtxn_has_incomplete_tuple(toptxn)) &&
732  rbtxn_is_serialized(txn))
733  ReorderBufferStreamTXN(rb, toptxn);
734 }
735 
736 /*
737  * Queue a change into a transaction so it can be replayed upon commit or will be
738  * streamed when we reach logical_decoding_work_mem threshold.
739  */
/*
 * Takes ownership of 'change': it is either appended to the txn's change
 * list (with memory accounting updated) or, if the txn was found to be
 * concurrently aborted during earlier streaming, freed immediately without
 * accounting (it was never queued).
 *
 * NOTE(review): the name/first-parameter line, the 'txn' declaration, and
 * the final memory-limit check call are missing from this extract.
 */
740 void
742  ReorderBufferChange *change, bool toast_insert)
743 {
745 
746  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
747 
748  /*
749  * While streaming the previous changes we have detected that the
750  * transaction is aborted. So there is no point in collecting further
751  * changes for it.
752  */
753  if (txn->concurrent_abort)
754  {
755  /*
756  * We don't need to update memory accounting for this change as we
757  * have not added it to the queue yet.
758  */
759  ReorderBufferReturnChange(rb, change, false);
760  return;
761  }
762 
763  change->lsn = lsn;
764  change->txn = txn;
765 
766  Assert(InvalidXLogRecPtr != lsn);
767  dlist_push_tail(&txn->changes, &change->node);
768  txn->nentries++;
769  txn->nentries_mem++;
770 
771  /* update memory accounting information */
772  ReorderBufferChangeMemoryUpdate(rb, change, true);
773 
774  /* process partial change */
775  ReorderBufferProcessPartialChange(rb, txn, change, toast_insert);
776 
777  /* check the memory limits and evict something if needed */
779 }
780 
781 /*
782  * Queue message into a transaction so it can be processed upon commit.
783  */
/*
 * Transactional messages are copied into rb->context and queued like any
 * other change (replayed at commit).  Non-transactional messages are emitted
 * immediately through the rb->message callback under a historic snapshot,
 * with PG_TRY ensuring snapshot teardown on error.
 *
 * NOTE(review): the name line, the change->action assignment, and the
 * snapshot-teardown lines inside PG_TRY/PG_CATCH are missing from this
 * extract.
 */
784 void
786  Snapshot snapshot, XLogRecPtr lsn,
787  bool transactional, const char *prefix,
788  Size message_size, const char *message)
789 {
790  if (transactional)
791  {
792  MemoryContext oldcontext;
793  ReorderBufferChange *change;
794 
796 
797  oldcontext = MemoryContextSwitchTo(rb->context);
798 
799  change = ReorderBufferGetChange(rb);
801  change->data.msg.prefix = pstrdup(prefix);
802  change->data.msg.message_size = message_size;
803  change->data.msg.message = palloc(message_size);
804  memcpy(change->data.msg.message, message, message_size);
805 
806  ReorderBufferQueueChange(rb, xid, lsn, change, false);
807 
808  MemoryContextSwitchTo(oldcontext);
809  }
810  else
811  {
812  ReorderBufferTXN *txn = NULL;
813  volatile Snapshot snapshot_now = snapshot;
814 
815  if (xid != InvalidTransactionId)
816  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
817 
818  /* setup snapshot to allow catalog access */
819  SetupHistoricSnapshot(snapshot_now, NULL);
820  PG_TRY();
821  {
822  rb->message(rb, txn, lsn, false, prefix, message_size, message);
823 
825  }
826  PG_CATCH();
827  {
829  PG_RE_THROW();
830  }
831  PG_END_TRY();
832  }
833 }
834 
835 /*
836  * AssertTXNLsnOrder
837  * Verify LSN ordering of transaction lists in the reorderbuffer
838  *
839  * Other LSN-related invariants are checked too.
840  *
841  * No-op if assertions are not in use.
842  */
/*
 * Walks two lists: toplevel_by_lsn (ordered by first_lsn) and, in the second
 * loop, the list of txns with base snapshots (ordered by base_snapshot_lsn).
 * NOTE(review): the name line, the cur_txn declarations, the second loop's
 * dlist_foreach header, and the base_snapshot_lsn assert are missing from
 * this extract.
 */
843 static void
845 {
846 #ifdef USE_ASSERT_CHECKING
847  dlist_iter iter;
848  XLogRecPtr prev_first_lsn = InvalidXLogRecPtr;
849  XLogRecPtr prev_base_snap_lsn = InvalidXLogRecPtr;
850 
851  dlist_foreach(iter, &rb->toplevel_by_lsn)
852  {
854  iter.cur);
855 
856  /* start LSN must be set */
857  Assert(cur_txn->first_lsn != InvalidXLogRecPtr);
858 
859  /* If there is an end LSN, it must be higher than start LSN */
860  if (cur_txn->end_lsn != InvalidXLogRecPtr)
861  Assert(cur_txn->first_lsn <= cur_txn->end_lsn);
862 
863  /* Current initial LSN must be strictly higher than previous */
864  if (prev_first_lsn != InvalidXLogRecPtr)
865  Assert(prev_first_lsn < cur_txn->first_lsn);
866 
867  /* known-as-subtxn txns must not be listed */
868  Assert(!rbtxn_is_known_subxact(cur_txn));
869 
870  prev_first_lsn = cur_txn->first_lsn;
871  }
872 
874  {
876  base_snapshot_node,
877  iter.cur);
878 
879  /* base snapshot (and its LSN) must be set */
880  Assert(cur_txn->base_snapshot != NULL);
882 
883  /* current LSN must be strictly higher than previous */
884  if (prev_base_snap_lsn != InvalidXLogRecPtr)
885  Assert(prev_base_snap_lsn < cur_txn->base_snapshot_lsn);
886 
887  /* known-as-subtxn txns must not be listed */
888  Assert(!rbtxn_is_known_subxact(cur_txn));
889 
890  prev_base_snap_lsn = cur_txn->base_snapshot_lsn;
891  }
892 #endif
893 }
894 
895 /*
896  * AssertChangeLsnOrder
897  *
898  * Check ordering of changes in the (sub)transaction.
899  */
/*
 * Verifies each change's lsn is valid, within [first_lsn, end_lsn] (when
 * end_lsn is set), and non-decreasing along the change list.  No-op without
 * USE_ASSERT_CHECKING.  NOTE(review): the function-name line is missing from
 * this extract.
 */
900 static void
902 {
903 #ifdef USE_ASSERT_CHECKING
904  dlist_iter iter;
905  XLogRecPtr prev_lsn = txn->first_lsn;
906 
907  dlist_foreach(iter, &txn->changes)
908  {
909  ReorderBufferChange *cur_change;
910 
911  cur_change = dlist_container(ReorderBufferChange, node, iter.cur);
912 
914  Assert(cur_change->lsn != InvalidXLogRecPtr);
915  Assert(txn->first_lsn <= cur_change->lsn);
916 
917  if (txn->end_lsn != InvalidXLogRecPtr)
918  Assert(cur_change->lsn <= txn->end_lsn);
919 
920  Assert(prev_lsn <= cur_change->lsn);
921 
922  prev_lsn = cur_change->lsn;
923  }
924 #endif
925 }
926 
927 /*
928  * ReorderBufferGetOldestTXN
929  * Return oldest transaction in reorderbuffer
930  */
/*
 * Since toplevel_by_lsn is kept sorted by first_lsn (see AssertTXNLsnOrder),
 * the list head is the oldest txn; returns NULL when the list is empty.
 * NOTE(review): signature, emptiness check, and head-element lines are
 * missing from this extract.
 */
933 {
935 
936  AssertTXNLsnOrder(rb);
937 
939  return NULL;
940 
942 
945  return txn;
946 }
947 
948 /*
949  * ReorderBufferGetOldestXmin
950  * Return oldest Xmin in reorderbuffer
951  *
952  * Returns oldest possibly running Xid from the point of view of snapshots
953  * used in the transactions kept by reorderbuffer, or InvalidTransactionId if
954  * there are none.
955  *
956  * Since snapshots are assigned monotonically, this equals the Xmin of the
957  * base snapshot with minimal base_snapshot_lsn.
958  */
/*
 * NOTE(review): signature, emptiness check, and the list argument of
 * dlist_head_element are missing from this extract (the list of txns with
 * base snapshots, per AssertTXNLsnOrder's second loop).
 */
961 {
963 
964  AssertTXNLsnOrder(rb);
965 
967  return InvalidTransactionId;
968 
969  txn = dlist_head_element(ReorderBufferTXN, base_snapshot_node,
971  return txn->base_snapshot->xmin;
972 }
973 
974 void
976 {
978 }
979 
980 /*
981  * ReorderBufferAssignChild
982  *
983  * Make note that we know that subxid is a subtransaction of xid, seen as of
984  * the given lsn.
985  */
/*
 * Both txns are created on demand.  A subxact previously believed to be
 * top-level is unlinked from toplevel_by_lsn before being re-linked under
 * its parent; its base snapshot may then migrate to the parent via
 * ReorderBufferTransferSnapToParent.  Idempotent for already-known subxacts.
 * NOTE(review): the name line and the 'txn' declaration are missing from
 * this extract.
 */
986 void
988  TransactionId subxid, XLogRecPtr lsn)
989 {
991  ReorderBufferTXN *subtxn;
992  bool new_top;
993  bool new_sub;
994 
995  txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true);
996  subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false);
997 
998  if (!new_sub)
999  {
1000  if (rbtxn_is_known_subxact(subtxn))
1001  {
1002  /* already associated, nothing to do */
1003  return;
1004  }
1005  else
1006  {
1007  /*
1008  * We already saw this transaction, but initially added it to the
1009  * list of top-level txns. Now that we know it's not top-level,
1010  * remove it from there.
1011  */
1012  dlist_delete(&subtxn->node);
1013  }
1014  }
1015 
1016  subtxn->txn_flags |= RBTXN_IS_SUBXACT;
1017  subtxn->toplevel_xid = xid;
1018  Assert(subtxn->nsubtxns == 0);
1019 
1020  /* set the reference to top-level transaction */
1021  subtxn->toptxn = txn;
1022 
1023  /* add to subtransaction list */
1024  dlist_push_tail(&txn->subtxns, &subtxn->node);
1025  txn->nsubtxns++;
1026 
1027  /* Possibly transfer the subtxn's snapshot to its top-level txn. */
1028  ReorderBufferTransferSnapToParent(txn, subtxn);
1029 
1030  /* Verify LSN-ordering invariant */
1031  AssertTXNLsnOrder(rb);
1032 }
1033 
1034 /*
1035  * ReorderBufferTransferSnapToParent
1036  * Transfer base snapshot from subtxn to top-level txn, if needed
1037  *
1038  * This is done if the top-level txn doesn't have a base snapshot, or if the
1039  * subtxn's base snapshot has an earlier LSN than the top-level txn's base
1040  * snapshot's LSN. This can happen if there are no changes in the toplevel
1041  * txn but there are some in the subtxn, or the first change in subtxn has
1042  * earlier LSN than first change in the top-level txn and we learned about
1043  * their kinship only now.
1044  *
1045  * The subtransaction's snapshot is cleared regardless of the transfer
1046  * happening, since it's not needed anymore in either case.
1047  *
1048  * We do this as soon as we become aware of their kinship, to avoid queueing
1049  * extra snapshots to txns known-as-subtxns -- only top-level txns will
1050  * receive further snapshots.
1051  */
/*
 * NOTE(review): the name line, the old-snapshot release call inside the
 * purge branch, the dlist insert/replace call for base_snapshot_node, and
 * the base_snapshot_lsn resets after clearing are missing from this extract.
 */
1052 static void
1054  ReorderBufferTXN *subtxn)
1055 {
1056  Assert(subtxn->toplevel_xid == txn->xid);
1057 
1058  if (subtxn->base_snapshot != NULL)
1059  {
1060  if (txn->base_snapshot == NULL ||
1061  subtxn->base_snapshot_lsn < txn->base_snapshot_lsn)
1062  {
1063  /*
1064  * If the toplevel transaction already has a base snapshot but
1065  * it's newer than the subxact's, purge it.
1066  */
1067  if (txn->base_snapshot != NULL)
1068  {
1071  }
1072 
1073  /*
1074  * The snapshot is now the top transaction's; transfer it, and
1075  * adjust the list position of the top transaction in the list by
1076  * moving it to where the subtransaction is.
1077  */
1078  txn->base_snapshot = subtxn->base_snapshot;
1079  txn->base_snapshot_lsn = subtxn->base_snapshot_lsn;
1081  &txn->base_snapshot_node);
1082 
1083  /*
1084  * The subtransaction doesn't have a snapshot anymore (so it
1085  * mustn't be in the list.)
1086  */
1087  subtxn->base_snapshot = NULL;
1089  dlist_delete(&subtxn->base_snapshot_node);
1090  }
1091  else
1092  {
1093  /* Base snap of toplevel is fine, so subxact's is not needed */
1095  dlist_delete(&subtxn->base_snapshot_node);
1096  subtxn->base_snapshot = NULL;
1098  }
1099  }
1100 }
1101 
1102 /*
1103  * Associate a subtransaction with its toplevel transaction at commit
1104  * time. There may be no further changes added after this.
1105  */
/*
 * Looks up the subxact WITHOUT creating it (create=false): a subxact that
 * queued no changes needs no bookkeeping.  Records its final/end LSNs, then
 * delegates linkage to ReorderBufferAssignChild (a no-op if already linked).
 * NOTE(review): the function-name line is missing from this extract.
 */
1106 void
1108  TransactionId subxid, XLogRecPtr commit_lsn,
1109  XLogRecPtr end_lsn)
1110 {
1111  ReorderBufferTXN *subtxn;
1112 
1113  subtxn = ReorderBufferTXNByXid(rb, subxid, false, NULL,
1114  InvalidXLogRecPtr, false);
1115 
1116  /*
1117  * No need to do anything if that subtxn didn't contain any changes
1118  */
1119  if (!subtxn)
1120  return;
1121 
1122  subtxn->final_lsn = commit_lsn;
1123  subtxn->end_lsn = end_lsn;
1124 
1125  /*
1126  * Assign this subxact as a child of the toplevel xact (no-op if already
1127  * done.)
1128  */
1129  ReorderBufferAssignChild(rb, xid, subxid, InvalidXLogRecPtr);
1130 }
1131 
1132 
1133 /*
1134  * Support for efficiently iterating over a transaction's and its
1135  * subtransactions' changes.
1136  *
1137  * We do by doing a k-way merge between transactions/subtransactions. For that
1138  * we model the current heads of the different transactions as a binary heap
1139  * so we easily know which (sub-)transaction has the change with the smallest
1140  * lsn next.
1141  *
1142  * We assume the changes in individual transactions are already sorted by LSN.
1143  */
1144 
1145 /*
1146  * Binary heap comparison function.
1147  */
/*
 * The ordering is deliberately inverted (smaller LSN compares "greater") so
 * the heap surfaces the entry with the smallest LSN first, as the k-way
 * merge described above requires.  NOTE(review): the name/parameter line and
 * the 'state' declaration (from the opaque heap arg) are missing from this
 * extract.
 */
1148 static int
1150 {
1152  XLogRecPtr pos_a = state->entries[DatumGetInt32(a)].lsn;
1153  XLogRecPtr pos_b = state->entries[DatumGetInt32(b)].lsn;
1154 
1155  if (pos_a < pos_b)
1156  return 1;
1157  else if (pos_a == pos_b)
1158  return 0;
1159  return -1;
1160 }
1161 
1162 /*
1163  * Allocate & initialize an iterator which iterates in lsn order over a
1164  * transaction and all its subtransactions.
1165  *
1166  * Note: The iterator state is returned through iter_state parameter rather
1167  * than the function's return value. This is because the state gets cleaned up
1168  * in a PG_CATCH block in the caller, so we want to make sure the caller gets
1169  * back the state even if this function throws an exception.
1170  */
1171 static void
1173  ReorderBufferIterTXNState *volatile *iter_state)
1174 {
1175  Size nr_txns = 0;
1177  dlist_iter cur_txn_i;
1178  int32 off;
1179 
1180  *iter_state = NULL;
1181 
1182  /* Check ordering of changes in the toplevel transaction. */
1183  AssertChangeLsnOrder(txn);
1184 
1185  /*
1186  * Calculate the size of our heap: one element for every transaction that
1187  * contains changes. (Besides the transactions already in the reorder
1188  * buffer, we count the one we were directly passed.)
1189  */
1190  if (txn->nentries > 0)
1191  nr_txns++;
1192 
1193  dlist_foreach(cur_txn_i, &txn->subtxns)
1194  {
1195  ReorderBufferTXN *cur_txn;
1196 
1197  cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
1198 
1199  /* Check ordering of changes in this subtransaction. */
1200  AssertChangeLsnOrder(cur_txn);
1201 
1202  if (cur_txn->nentries > 0)
1203  nr_txns++;
1204  }
1205 
1206  /* allocate iteration state */
1207  state = (ReorderBufferIterTXNState *)
1209  sizeof(ReorderBufferIterTXNState) +
1210  sizeof(ReorderBufferIterTXNEntry) * nr_txns);
1211 
1212  state->nr_txns = nr_txns;
1213  dlist_init(&state->old_change);
1214 
1215  for (off = 0; off < state->nr_txns; off++)
1216  {
1217  state->entries[off].file.vfd = -1;
1218  state->entries[off].segno = 0;
1219  }
1220 
1221  /* allocate heap */
1222  state->heap = binaryheap_allocate(state->nr_txns,
1224  state);
1225 
1226  /* Now that the state fields are initialized, it is safe to return it. */
1227  *iter_state = state;
1228 
1229  /*
1230  * Now insert items into the binary heap, in an unordered fashion. (We
1231  * will run a heap assembly step at the end; this is more efficient.)
1232  */
1233 
1234  off = 0;
1235 
1236  /* add toplevel transaction if it contains changes */
1237  if (txn->nentries > 0)
1238  {
1239  ReorderBufferChange *cur_change;
1240 
1241  if (rbtxn_is_serialized(txn))
1242  {
1243  /* serialize remaining changes */
1244  ReorderBufferSerializeTXN(rb, txn);
1245  ReorderBufferRestoreChanges(rb, txn, &state->entries[off].file,
1246  &state->entries[off].segno);
1247  }
1248 
1249  cur_change = dlist_head_element(ReorderBufferChange, node,
1250  &txn->changes);
1251 
1252  state->entries[off].lsn = cur_change->lsn;
1253  state->entries[off].change = cur_change;
1254  state->entries[off].txn = txn;
1255 
1257  }
1258 
1259  /* add subtransactions if they contain changes */
1260  dlist_foreach(cur_txn_i, &txn->subtxns)
1261  {
1262  ReorderBufferTXN *cur_txn;
1263 
1264  cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
1265 
1266  if (cur_txn->nentries > 0)
1267  {
1268  ReorderBufferChange *cur_change;
1269 
1270  if (rbtxn_is_serialized(cur_txn))
1271  {
1272  /* serialize remaining changes */
1273  ReorderBufferSerializeTXN(rb, cur_txn);
1274  ReorderBufferRestoreChanges(rb, cur_txn,
1275  &state->entries[off].file,
1276  &state->entries[off].segno);
1277  }
1278  cur_change = dlist_head_element(ReorderBufferChange, node,
1279  &cur_txn->changes);
1280 
1281  state->entries[off].lsn = cur_change->lsn;
1282  state->entries[off].change = cur_change;
1283  state->entries[off].txn = cur_txn;
1284 
1286  }
1287  }
1288 
1289  /* assemble a valid binary heap */
1290  binaryheap_build(state->heap);
1291 }
1292 
1293 /*
1294  * Return the next change when iterating over a transaction and its
1295  * subtransactions.
1296  *
1297  * Returns NULL when no further changes exist.
1298  */
1299 static ReorderBufferChange *
1301 {
1302  ReorderBufferChange *change;
1304  int32 off;
1305 
1306  /* nothing there anymore */
1307  if (state->heap->bh_size == 0)
1308  return NULL;
1309 
1310  off = DatumGetInt32(binaryheap_first(state->heap));
1311  entry = &state->entries[off];
1312 
1313  /* free memory we might have "leaked" in the previous *Next call */
1314  if (!dlist_is_empty(&state->old_change))
1315  {
1316  change = dlist_container(ReorderBufferChange, node,
1317  dlist_pop_head_node(&state->old_change));
1318  ReorderBufferReturnChange(rb, change, true);
1319  Assert(dlist_is_empty(&state->old_change));
1320  }
1321 
1322  change = entry->change;
1323 
1324  /*
1325  * update heap with information about which transaction has the next
1326  * relevant change in LSN order
1327  */
1328 
1329  /* there are in-memory changes */
1330  if (dlist_has_next(&entry->txn->changes, &entry->change->node))
1331  {
1332  dlist_node *next = dlist_next_node(&entry->txn->changes, &change->node);
1333  ReorderBufferChange *next_change =
1334  dlist_container(ReorderBufferChange, node, next);
1335 
1336  /* txn stays the same */
1337  state->entries[off].lsn = next_change->lsn;
1338  state->entries[off].change = next_change;
1339 
1341  return change;
1342  }
1343 
1344  /* try to load changes from disk */
1345  if (entry->txn->nentries != entry->txn->nentries_mem)
1346  {
1347  /*
1348  * Ugly: restoring changes will reuse *Change records, thus delete the
1349  * current one from the per-tx list and only free in the next call.
1350  */
1351  dlist_delete(&change->node);
1352  dlist_push_tail(&state->old_change, &change->node);
1353 
1354  if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->file,
1355  &state->entries[off].segno))
1356  {
1357  /* successfully restored changes from disk */
1358  ReorderBufferChange *next_change =
1360  &entry->txn->changes);
1361 
1362  elog(DEBUG2, "restored %u/%u changes from disk",
1363  (uint32) entry->txn->nentries_mem,
1364  (uint32) entry->txn->nentries);
1365 
1366  Assert(entry->txn->nentries_mem);
1367  /* txn stays the same */
1368  state->entries[off].lsn = next_change->lsn;
1369  state->entries[off].change = next_change;
1371 
1372  return change;
1373  }
1374  }
1375 
1376  /* ok, no changes there anymore, remove */
1377  binaryheap_remove_first(state->heap);
1378 
1379  return change;
1380 }
1381 
1382 /*
1383  * Deallocate the iterator
1384  */
1385 static void
1388 {
1389  int32 off;
1390 
1391  for (off = 0; off < state->nr_txns; off++)
1392  {
1393  if (state->entries[off].file.vfd != -1)
1394  FileClose(state->entries[off].file.vfd);
1395  }
1396 
1397  /* free memory we might have "leaked" in the last *Next call */
1398  if (!dlist_is_empty(&state->old_change))
1399  {
1400  ReorderBufferChange *change;
1401 
1402  change = dlist_container(ReorderBufferChange, node,
1403  dlist_pop_head_node(&state->old_change));
1404  ReorderBufferReturnChange(rb, change, true);
1405  Assert(dlist_is_empty(&state->old_change));
1406  }
1407 
1408  binaryheap_free(state->heap);
1409  pfree(state);
1410 }
1411 
1412 /*
1413  * Cleanup the contents of a transaction, usually after the transaction
1414  * committed or aborted.
1415  */
1416 static void
1418 {
1419  bool found;
1420  dlist_mutable_iter iter;
1421 
1422  /* cleanup subtransactions & their changes */
1423  dlist_foreach_modify(iter, &txn->subtxns)
1424  {
1425  ReorderBufferTXN *subtxn;
1426 
1427  subtxn = dlist_container(ReorderBufferTXN, node, iter.cur);
1428 
1429  /*
1430  * Subtransactions are always associated to the toplevel TXN, even if
1431  * they originally were happening inside another subtxn, so we won't
1432  * ever recurse more than one level deep here.
1433  */
1434  Assert(rbtxn_is_known_subxact(subtxn));
1435  Assert(subtxn->nsubtxns == 0);
1436 
1437  ReorderBufferCleanupTXN(rb, subtxn);
1438  }
1439 
1440  /* cleanup changes in the txn */
1441  dlist_foreach_modify(iter, &txn->changes)
1442  {
1443  ReorderBufferChange *change;
1444 
1445  change = dlist_container(ReorderBufferChange, node, iter.cur);
1446 
1447  /* Check we're not mixing changes from different transactions. */
1448  Assert(change->txn == txn);
1449 
1450  ReorderBufferReturnChange(rb, change, true);
1451  }
1452 
1453  /*
1454  * Cleanup the tuplecids we stored for decoding catalog snapshot access.
1455  * They are always stored in the toplevel transaction.
1456  */
1457  dlist_foreach_modify(iter, &txn->tuplecids)
1458  {
1459  ReorderBufferChange *change;
1460 
1461  change = dlist_container(ReorderBufferChange, node, iter.cur);
1462 
1463  /* Check we're not mixing changes from different transactions. */
1464  Assert(change->txn == txn);
1466 
1467  ReorderBufferReturnChange(rb, change, true);
1468  }
1469 
1470  /*
1471  * Cleanup the base snapshot, if set.
1472  */
1473  if (txn->base_snapshot != NULL)
1474  {
1477  }
1478 
1479  /*
1480  * Cleanup the snapshot for the last streamed run.
1481  */
1482  if (txn->snapshot_now != NULL)
1483  {
1484  Assert(rbtxn_is_streamed(txn));
1486  }
1487 
1488  /*
1489  * Remove TXN from its containing list.
1490  *
1491  * Note: if txn is known as subxact, we are deleting the TXN from its
1492  * parent's list of known subxacts; this leaves the parent's nsubxacts
1493  * count too high, but we don't care. Otherwise, we are deleting the TXN
1494  * from the LSN-ordered list of toplevel TXNs.
1495  */
1496  dlist_delete(&txn->node);
1497 
1498  /* now remove reference from buffer */
1499  hash_search(rb->by_txn,
1500  (void *) &txn->xid,
1501  HASH_REMOVE,
1502  &found);
1503  Assert(found);
1504 
1505  /* remove entries spilled to disk */
1506  if (rbtxn_is_serialized(txn))
1507  ReorderBufferRestoreCleanup(rb, txn);
1508 
1509  /* deallocate */
1510  ReorderBufferReturnTXN(rb, txn);
1511 }
1512 
1513 /*
1514  * Discard changes from a transaction (and subtransactions), after streaming
1515  * them. Keep the remaining info - transactions, tuplecids, invalidations and
1516  * snapshots.
1517  */
1518 static void
1520 {
1521  dlist_mutable_iter iter;
1522 
1523  /* cleanup subtransactions & their changes */
1524  dlist_foreach_modify(iter, &txn->subtxns)
1525  {
1526  ReorderBufferTXN *subtxn;
1527 
1528  subtxn = dlist_container(ReorderBufferTXN, node, iter.cur);
1529 
1530  /*
1531  * Subtransactions are always associated to the toplevel TXN, even if
1532  * they originally were happening inside another subtxn, so we won't
1533  * ever recurse more than one level deep here.
1534  */
1535  Assert(rbtxn_is_known_subxact(subtxn));
1536  Assert(subtxn->nsubtxns == 0);
1537 
1538  ReorderBufferTruncateTXN(rb, subtxn);
1539  }
1540 
1541  /* cleanup changes in the txn */
1542  dlist_foreach_modify(iter, &txn->changes)
1543  {
1544  ReorderBufferChange *change;
1545 
1546  change = dlist_container(ReorderBufferChange, node, iter.cur);
1547 
1548  /* Check we're not mixing changes from different transactions. */
1549  Assert(change->txn == txn);
1550 
1551  /* remove the change from it's containing list */
1552  dlist_delete(&change->node);
1553 
1554  ReorderBufferReturnChange(rb, change, true);
1555  }
1556 
1557  /*
1558  * Mark the transaction as streamed.
1559  *
1560  * The toplevel transaction, identified by (toptxn==NULL), is marked as
1561  * streamed always, even if it does not contain any changes (that is, when
1562  * all the changes are in subtransactions).
1563  *
1564  * For subtransactions, we only mark them as streamed when there are
1565  * changes in them.
1566  *
1567  * We do it this way because of aborts - we don't want to send aborts for
1568  * XIDs the downstream is not aware of. And of course, it always knows
1569  * about the toplevel xact (we send the XID in all messages), but we never
1570  * stream XIDs of empty subxacts.
1571  */
1572  if ((!txn->toptxn) || (txn->nentries_mem != 0))
1573  txn->txn_flags |= RBTXN_IS_STREAMED;
1574 
1575  /*
1576  * Destroy the (relfilenode, ctid) hashtable, so that we don't leak any
1577  * memory. We could also keep the hash table and update it with new ctid
1578  * values, but this seems simpler and good enough for now.
1579  */
1580  if (txn->tuplecid_hash != NULL)
1581  {
1583  txn->tuplecid_hash = NULL;
1584  }
1585 
1586  /* If this txn is serialized then clean the disk space. */
1587  if (rbtxn_is_serialized(txn))
1588  {
1589  ReorderBufferRestoreCleanup(rb, txn);
1590  txn->txn_flags &= ~RBTXN_IS_SERIALIZED;
1591 
1592  /*
1593  * We set this flag to indicate if the transaction is ever serialized.
1594  * We need this to accurately update the stats as otherwise the same
1595  * transaction can be counted as serialized multiple times.
1596  */
1598  }
1599 
1600  /* also reset the number of entries in the transaction */
1601  txn->nentries_mem = 0;
1602  txn->nentries = 0;
1603 }
1604 
1605 /*
1606  * Build a hash with a (relfilenode, ctid) -> (cmin, cmax) mapping for use by
1607  * HeapTupleSatisfiesHistoricMVCC.
1608  */
1609 static void
1611 {
1612  dlist_iter iter;
1613  HASHCTL hash_ctl;
1614 
1616  return;
1617 
1618  memset(&hash_ctl, 0, sizeof(hash_ctl));
1619 
1620  hash_ctl.keysize = sizeof(ReorderBufferTupleCidKey);
1621  hash_ctl.entrysize = sizeof(ReorderBufferTupleCidEnt);
1622  hash_ctl.hcxt = rb->context;
1623 
1624  /*
1625  * create the hash with the exact number of to-be-stored tuplecids from
1626  * the start
1627  */
1628  txn->tuplecid_hash =
1629  hash_create("ReorderBufferTupleCid", txn->ntuplecids, &hash_ctl,
1631 
1632  dlist_foreach(iter, &txn->tuplecids)
1633  {
1636  bool found;
1637  ReorderBufferChange *change;
1638 
1639  change = dlist_container(ReorderBufferChange, node, iter.cur);
1640 
1642 
1643  /* be careful about padding */
1644  memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
1645 
1646  key.relnode = change->data.tuplecid.node;
1647 
1648  ItemPointerCopy(&change->data.tuplecid.tid,
1649  &key.tid);
1650 
1651  ent = (ReorderBufferTupleCidEnt *)
1653  (void *) &key,
1655  &found);
1656  if (!found)
1657  {
1658  ent->cmin = change->data.tuplecid.cmin;
1659  ent->cmax = change->data.tuplecid.cmax;
1660  ent->combocid = change->data.tuplecid.combocid;
1661  }
1662  else
1663  {
1664  /*
1665  * Maybe we already saw this tuple before in this transaction, but
1666  * if so it must have the same cmin.
1667  */
1668  Assert(ent->cmin == change->data.tuplecid.cmin);
1669 
1670  /*
1671  * cmax may be initially invalid, but once set it can only grow,
1672  * and never become invalid again.
1673  */
1674  Assert((ent->cmax == InvalidCommandId) ||
1675  ((change->data.tuplecid.cmax != InvalidCommandId) &&
1676  (change->data.tuplecid.cmax > ent->cmax)));
1677  ent->cmax = change->data.tuplecid.cmax;
1678  }
1679  }
1680 }
1681 
1682 /*
1683  * Copy a provided snapshot so we can modify it privately. This is needed so
1684  * that catalog modifying transactions can look into intermediate catalog
1685  * states.
1686  */
1687 static Snapshot
1690 {
1691  Snapshot snap;
1692  dlist_iter iter;
1693  int i = 0;
1694  Size size;
1695 
1696  size = sizeof(SnapshotData) +
1697  sizeof(TransactionId) * orig_snap->xcnt +
1698  sizeof(TransactionId) * (txn->nsubtxns + 1);
1699 
1700  snap = MemoryContextAllocZero(rb->context, size);
1701  memcpy(snap, orig_snap, sizeof(SnapshotData));
1702 
1703  snap->copied = true;
1704  snap->active_count = 1; /* mark as active so nobody frees it */
1705  snap->regd_count = 0;
1706  snap->xip = (TransactionId *) (snap + 1);
1707 
1708  memcpy(snap->xip, orig_snap->xip, sizeof(TransactionId) * snap->xcnt);
1709 
1710  /*
1711  * snap->subxip contains all txids that belong to our transaction which we
1712  * need to check via cmin/cmax. That's why we store the toplevel
1713  * transaction in there as well.
1714  */
1715  snap->subxip = snap->xip + snap->xcnt;
1716  snap->subxip[i++] = txn->xid;
1717 
1718  /*
1719  * subxcnt isn't decreased when subtransactions abort, so count manually.
1720  * Since it's an upper boundary it is safe to use it for the allocation
1721  * above.
1722  */
1723  snap->subxcnt = 1;
1724 
1725  dlist_foreach(iter, &txn->subtxns)
1726  {
1727  ReorderBufferTXN *sub_txn;
1728 
1729  sub_txn = dlist_container(ReorderBufferTXN, node, iter.cur);
1730  snap->subxip[i++] = sub_txn->xid;
1731  snap->subxcnt++;
1732  }
1733 
1734  /* sort so we can bsearch() later */
1735  qsort(snap->subxip, snap->subxcnt, sizeof(TransactionId), xidComparator);
1736 
1737  /* store the specified current CommandId */
1738  snap->curcid = cid;
1739 
1740  return snap;
1741 }
1742 
1743 /*
1744  * Free a previously ReorderBufferCopySnap'ed snapshot
1745  */
1746 static void
1748 {
1749  if (snap->copied)
1750  pfree(snap);
1751  else
1753 }
1754 
1755 /*
1756  * If the transaction was (partially) streamed, we need to commit it in a
1757  * 'streamed' way. That is, we first stream the remaining part of the
1758  * transaction, and then invoke stream_commit message.
1759  */
1760 static void
1762 {
1763  /* we should only call this for previously streamed transactions */
1764  Assert(rbtxn_is_streamed(txn));
1765 
1766  ReorderBufferStreamTXN(rb, txn);
1767 
1768  rb->stream_commit(rb, txn, txn->final_lsn);
1769 
1770  ReorderBufferCleanupTXN(rb, txn);
1771 }
1772 
1773 /*
1774  * Set xid to detect concurrent aborts.
1775  *
1776  * While streaming an in-progress transaction there is a possibility that the
1777  * (sub)transaction might get aborted concurrently. In such case if the
1778  * (sub)transaction has catalog update then we might decode the tuple using
1779  * wrong catalog version. For example, suppose there is one catalog tuple with
1780  * (xmin: 500, xmax: 0). Now, the transaction 501 updates the catalog tuple
1781  * and after that we will have two tuples (xmin: 500, xmax: 501) and
1782  * (xmin: 501, xmax: 0). Now, if 501 is aborted and some other transaction
1783  * say 502 updates the same catalog tuple then the first tuple will be changed
1784  * to (xmin: 500, xmax: 502). So, the problem is that when we try to decode
1785  * the tuple inserted/updated in 501 after the catalog update, we will see the
1786  * catalog tuple with (xmin: 500, xmax: 502) as visible because it will
1787  * consider that the tuple is deleted by xid 502 which is not visible to our
1788  * snapshot. And when we will try to decode with that catalog tuple, it can
1789  * lead to a wrong result or a crash. So, it is necessary to detect
1790  * concurrent aborts to allow streaming of in-progress transactions.
1791  *
1792  * For detecting the concurrent abort we set CheckXidAlive to the current
1793  * (sub)transaction's xid for which this change belongs to. And, during
1794  * catalog scan we can check the status of the xid and if it is aborted we will
1795  * report a specific error so that we can stop streaming current transaction
1796  * and discard the already streamed changes on such an error. We might have
1797  * already streamed some of the changes for the aborted (sub)transaction, but
1798  * that is fine because when we decode the abort we will stream abort message
1799  * to truncate the changes in the subscriber.
1800  */
1801 static inline void
1803 {
1804  /*
1805  * If the input transaction id is already set as a CheckXidAlive then
1806  * nothing to do.
1807  */
1809  return;
1810 
1811  /*
1812  * setup CheckXidAlive if it's not committed yet. We don't check if the
1813  * xid is aborted. That will happen during catalog access.
1814  */
1815  if (!TransactionIdDidCommit(xid))
1816  CheckXidAlive = xid;
1817  else
1819 }
1820 
1821 /*
1822  * Helper function for ReorderBufferProcessTXN for applying change.
1823  */
1824 static inline void
1826  Relation relation, ReorderBufferChange *change,
1827  bool streaming)
1828 {
1829  if (streaming)
1830  rb->stream_change(rb, txn, relation, change);
1831  else
1832  rb->apply_change(rb, txn, relation, change);
1833 }
1834 
1835 /*
1836  * Helper function for ReorderBufferProcessTXN for applying the truncate.
1837  */
1838 static inline void
1840  int nrelations, Relation *relations,
1841  ReorderBufferChange *change, bool streaming)
1842 {
1843  if (streaming)
1844  rb->stream_truncate(rb, txn, nrelations, relations, change);
1845  else
1846  rb->apply_truncate(rb, txn, nrelations, relations, change);
1847 }
1848 
1849 /*
1850  * Helper function for ReorderBufferProcessTXN for applying the message.
1851  */
1852 static inline void
1854  ReorderBufferChange *change, bool streaming)
1855 {
1856  if (streaming)
1857  rb->stream_message(rb, txn, change->lsn, true,
1858  change->data.msg.prefix,
1859  change->data.msg.message_size,
1860  change->data.msg.message);
1861  else
1862  rb->message(rb, txn, change->lsn, true,
1863  change->data.msg.prefix,
1864  change->data.msg.message_size,
1865  change->data.msg.message);
1866 }
1867 
1868 /*
1869  * Function to store the command id and snapshot at the end of the current
1870  * stream so that we can reuse the same while sending the next stream.
1871  */
1872 static inline void
1874  Snapshot snapshot_now, CommandId command_id)
1875 {
1876  txn->command_id = command_id;
1877 
1878  /* Avoid copying if it's already copied. */
1879  if (snapshot_now->copied)
1880  txn->snapshot_now = snapshot_now;
1881  else
1882  txn->snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
1883  txn, command_id);
1884 }
1885 
1886 /*
1887  * Helper function for ReorderBufferProcessTXN to handle the concurrent
1888  * abort of the streaming transaction. This resets the TXN such that it
1889  * can be used to stream the remaining data of transaction being processed.
1890  */
1891 static void
1893  Snapshot snapshot_now,
1894  CommandId command_id,
1895  XLogRecPtr last_lsn,
1896  ReorderBufferChange *specinsert)
1897 {
1898  /* Discard the changes that we just streamed */
1899  ReorderBufferTruncateTXN(rb, txn);
1900 
1901  /* Free all resources allocated for toast reconstruction */
1902  ReorderBufferToastReset(rb, txn);
1903 
1904  /* Return the spec insert change if it is not NULL */
1905  if (specinsert != NULL)
1906  {
1907  ReorderBufferReturnChange(rb, specinsert, true);
1908  specinsert = NULL;
1909  }
1910 
1911  /* Stop the stream. */
1912  rb->stream_stop(rb, txn, last_lsn);
1913 
1914  /* Remember the command ID and snapshot for the streaming run */
1915  ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
1916 }
1917 
1918 /*
1919  * Helper function for ReorderBufferCommit and ReorderBufferStreamTXN.
1920  *
1921  * Send data of a transaction (and its subtransactions) to the
1922  * output plugin. We iterate over the top and subtransactions (using a k-way
1923  * merge) and replay the changes in lsn order.
1924  *
1925  * If streaming is true then data will be sent using stream API.
1926  *
1927  * Note: "volatile" markers on some parameters are to avoid trouble with
1928  * PG_TRY inside the function.
1929  */
1930 static void
1932  XLogRecPtr commit_lsn,
1933  volatile Snapshot snapshot_now,
1934  volatile CommandId command_id,
1935  bool streaming)
1936 {
1937  bool using_subtxn;
1939  ReorderBufferIterTXNState *volatile iterstate = NULL;
1940  volatile XLogRecPtr prev_lsn = InvalidXLogRecPtr;
1941  ReorderBufferChange *volatile specinsert = NULL;
1942  volatile bool stream_started = false;
1943  ReorderBufferTXN *volatile curtxn = NULL;
1944 
1945  /* build data to be able to lookup the CommandIds of catalog tuples */
1947 
1948  /* setup the initial snapshot */
1949  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1950 
1951  /*
1952  * Decoding needs access to syscaches et al., which in turn use
1953  * heavyweight locks and such. Thus we need to have enough state around to
1954  * keep track of those. The easiest way is to simply use a transaction
1955  * internally. That also allows us to easily enforce that nothing writes
1956  * to the database by checking for xid assignments.
1957  *
1958  * When we're called via the SQL SRF there's already a transaction
1959  * started, so start an explicit subtransaction there.
1960  */
1961  using_subtxn = IsTransactionOrTransactionBlock();
1962 
1963  PG_TRY();
1964  {
1965  ReorderBufferChange *change;
1966 
1967  if (using_subtxn)
1968  BeginInternalSubTransaction(streaming ? "stream" : "replay");
1969  else
1971 
1972  /* We only need to send begin/commit for non-streamed transactions. */
1973  if (!streaming)
1974  rb->begin(rb, txn);
1975 
1976  ReorderBufferIterTXNInit(rb, txn, &iterstate);
1977  while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
1978  {
1979  Relation relation = NULL;
1980  Oid reloid;
1981 
1982  /*
1983  * We can't call start stream callback before processing first
1984  * change.
1985  */
1986  if (prev_lsn == InvalidXLogRecPtr)
1987  {
1988  if (streaming)
1989  {
1990  txn->origin_id = change->origin_id;
1991  rb->stream_start(rb, txn, change->lsn);
1992  stream_started = true;
1993  }
1994  }
1995 
1996  /*
1997  * Enforce correct ordering of changes, merged from multiple
1998  * subtransactions. The changes may have the same LSN due to
1999  * MULTI_INSERT xlog records.
2000  */
2001  Assert(prev_lsn == InvalidXLogRecPtr || prev_lsn <= change->lsn);
2002 
2003  prev_lsn = change->lsn;
2004 
2005  /* Set the current xid to detect concurrent aborts. */
2006  if (streaming)
2007  {
2008  curtxn = change->txn;
2009  SetupCheckXidLive(curtxn->xid);
2010  }
2011 
2012  switch (change->action)
2013  {
2015 
2016  /*
2017  * Confirmation for speculative insertion arrived. Simply
2018  * use as a normal record. It'll be cleaned up at the end
2019  * of INSERT processing.
2020  */
2021  if (specinsert == NULL)
2022  elog(ERROR, "invalid ordering of speculative insertion changes");
2023  Assert(specinsert->data.tp.oldtuple == NULL);
2024  change = specinsert;
2026 
2027  /* intentionally fall through */
2031  Assert(snapshot_now);
2032 
2033  reloid = RelidByRelfilenode(change->data.tp.relnode.spcNode,
2034  change->data.tp.relnode.relNode);
2035 
2036  /*
2037  * Mapped catalog tuple without data, emitted while
2038  * catalog table was in the process of being rewritten. We
2039  * can fail to look up the relfilenode, because the
2040  * relmapper has no "historic" view, in contrast to the
2041  * normal catalog during decoding. Thus repeated
2042  * rewrites can cause a lookup failure. That's OK because
2043  * we do not decode catalog changes anyway. Normally such
2044  * tuples would be skipped over below, but we can't
2045  * identify whether the table should be logically logged
2046  * without mapping the relfilenode to the oid.
2047  */
2048  if (reloid == InvalidOid &&
2049  change->data.tp.newtuple == NULL &&
2050  change->data.tp.oldtuple == NULL)
2051  goto change_done;
2052  else if (reloid == InvalidOid)
2053  elog(ERROR, "could not map filenode \"%s\" to relation OID",
2054  relpathperm(change->data.tp.relnode,
2055  MAIN_FORKNUM));
2056 
2057  relation = RelationIdGetRelation(reloid);
2058 
2059  if (!RelationIsValid(relation))
2060  elog(ERROR, "could not open relation with OID %u (for filenode \"%s\")",
2061  reloid,
2062  relpathperm(change->data.tp.relnode,
2063  MAIN_FORKNUM));
2064 
2065  if (!RelationIsLogicallyLogged(relation))
2066  goto change_done;
2067 
2068  /*
2069  * Ignore temporary heaps created during DDL unless the
2070  * plugin has asked for them.
2071  */
2072  if (relation->rd_rel->relrewrite && !rb->output_rewrites)
2073  goto change_done;
2074 
2075  /*
2076  * For now ignore sequence changes entirely. Most of the
2077  * time they don't log changes using records we
2078  * understand, so it doesn't make sense to handle the few
2079  * cases we do.
2080  */
2081  if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
2082  goto change_done;
2083 
2084  /* user-triggered change */
2085  if (!IsToastRelation(relation))
2086  {
2087  ReorderBufferToastReplace(rb, txn, relation, change);
2088  ReorderBufferApplyChange(rb, txn, relation, change,
2089  streaming);
2090 
2091  /*
2092  * Only clear reassembled toast chunks if we're sure
2093  * they're not required anymore. The creator of the
2094  * tuple tells us.
2095  */
2096  if (change->data.tp.clear_toast_afterwards)
2097  ReorderBufferToastReset(rb, txn);
2098  }
2099  /* we're not interested in toast deletions */
2100  else if (change->action == REORDER_BUFFER_CHANGE_INSERT)
2101  {
2102  /*
2103  * Need to reassemble the full toasted Datum in
2104  * memory, to ensure the chunks don't get reused till
2105  * we're done remove it from the list of this
2106  * transaction's changes. Otherwise it will get
2107  * freed/reused while restoring spooled data from
2108  * disk.
2109  */
2110  Assert(change->data.tp.newtuple != NULL);
2111 
2112  dlist_delete(&change->node);
2113  ReorderBufferToastAppendChunk(rb, txn, relation,
2114  change);
2115  }
2116 
2117  change_done:
2118 
2119  /*
2120  * Either speculative insertion was confirmed, or it was
2121  * unsuccessful and the record isn't needed anymore.
2122  */
2123  if (specinsert != NULL)
2124  {
2125  ReorderBufferReturnChange(rb, specinsert, true);
2126  specinsert = NULL;
2127  }
2128 
2129  if (RelationIsValid(relation))
2130  {
2131  RelationClose(relation);
2132  relation = NULL;
2133  }
2134  break;
2135 
2137 
2138  /*
2139  * Speculative insertions are dealt with by delaying the
2140  * processing of the insert until the confirmation record
2141  * arrives. For that we simply unlink the record from the
2142  * chain, so it does not get freed/reused while restoring
2143  * spooled data from disk.
2144  *
2145  * This is safe in the face of concurrent catalog changes
2146  * because the relevant relation can't be changed between
2147  * speculative insertion and confirmation due to
2148  * CheckTableNotInUse() and locking.
2149  */
2150 
2151  /* clear out a pending (and thus failed) speculation */
2152  if (specinsert != NULL)
2153  {
2154  ReorderBufferReturnChange(rb, specinsert, true);
2155  specinsert = NULL;
2156  }
2157 
2158  /* and memorize the pending insertion */
2159  dlist_delete(&change->node);
2160  specinsert = change;
2161  break;
2162 
2164  {
2165  int i;
2166  int nrelids = change->data.truncate.nrelids;
2167  int nrelations = 0;
2168  Relation *relations;
2169 
2170  relations = palloc0(nrelids * sizeof(Relation));
2171  for (i = 0; i < nrelids; i++)
2172  {
2173  Oid relid = change->data.truncate.relids[i];
2174  Relation relation;
2175 
2176  relation = RelationIdGetRelation(relid);
2177 
2178  if (!RelationIsValid(relation))
2179  elog(ERROR, "could not open relation with OID %u", relid);
2180 
2181  if (!RelationIsLogicallyLogged(relation))
2182  continue;
2183 
2184  relations[nrelations++] = relation;
2185  }
2186 
2187  /* Apply the truncate. */
2188  ReorderBufferApplyTruncate(rb, txn, nrelations,
2189  relations, change,
2190  streaming);
2191 
2192  for (i = 0; i < nrelations; i++)
2193  RelationClose(relations[i]);
2194 
2195  break;
2196  }
2197 
2199  ReorderBufferApplyMessage(rb, txn, change, streaming);
2200  break;
2201 
2203  /* Execute the invalidation messages locally */
2205  change->data.inval.ninvalidations,
2206  change->data.inval.invalidations);
2207  break;
2208 
2210  /* get rid of the old */
2211  TeardownHistoricSnapshot(false);
2212 
2213  if (snapshot_now->copied)
2214  {
2215  ReorderBufferFreeSnap(rb, snapshot_now);
2216  snapshot_now =
2217  ReorderBufferCopySnap(rb, change->data.snapshot,
2218  txn, command_id);
2219  }
2220 
2221  /*
2222  * Restored from disk, need to be careful not to double
2223  * free. We could introduce refcounting for that, but for
2224  * now this seems infrequent enough not to care.
2225  */
2226  else if (change->data.snapshot->copied)
2227  {
2228  snapshot_now =
2229  ReorderBufferCopySnap(rb, change->data.snapshot,
2230  txn, command_id);
2231  }
2232  else
2233  {
2234  snapshot_now = change->data.snapshot;
2235  }
2236 
2237  /* and continue with the new one */
2238  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
2239  break;
2240 
2242  Assert(change->data.command_id != InvalidCommandId);
2243 
2244  if (command_id < change->data.command_id)
2245  {
2246  command_id = change->data.command_id;
2247 
2248  if (!snapshot_now->copied)
2249  {
2250  /* we don't use the global one anymore */
2251  snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
2252  txn, command_id);
2253  }
2254 
2255  snapshot_now->curcid = command_id;
2256 
2257  TeardownHistoricSnapshot(false);
2258  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
2259  }
2260 
2261  break;
2262 
2264  elog(ERROR, "tuplecid value in changequeue");
2265  break;
2266  }
2267  }
2268 
2269  /*
2270  * There's a speculative insertion remaining, just clean it up, it
2271  * can't have been successful, otherwise we'd gotten a confirmation
2272  * record.
2273  */
2274  if (specinsert)
2275  {
2276  ReorderBufferReturnChange(rb, specinsert, true);
2277  specinsert = NULL;
2278  }
2279 
2280  /* clean up the iterator */
2281  ReorderBufferIterTXNFinish(rb, iterstate);
2282  iterstate = NULL;
2283 
2284  /*
2285  * Done with current changes, send the last message for this set of
2286  * changes depending upon streaming mode.
2287  */
2288  if (streaming)
2289  {
2290  if (stream_started)
2291  {
2292  rb->stream_stop(rb, txn, prev_lsn);
2293  stream_started = false;
2294  }
2295  }
2296  else
2297  rb->commit(rb, txn, commit_lsn);
2298 
2299  /* this is just a sanity check against bad output plugin behaviour */
2301  elog(ERROR, "output plugin used XID %u",
2303 
2304  /*
2305  * Remember the command ID and snapshot for the next set of changes in
2306  * streaming mode.
2307  */
2308  if (streaming)
2309  ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
2310  else if (snapshot_now->copied)
2311  ReorderBufferFreeSnap(rb, snapshot_now);
2312 
2313  /* cleanup */
2314  TeardownHistoricSnapshot(false);
2315 
2316  /*
2317  * Aborting the current (sub-)transaction as a whole has the right
2318  * semantics. We want all locks acquired in here to be released, not
2319  * reassigned to the parent and we do not want any database access
2320  * have persistent effects.
2321  */
2323 
2324  /* make sure there's no cache pollution */
2326 
2327  if (using_subtxn)
2329 
2330  /*
2331  * If we are streaming the in-progress transaction then discard the
2332  * changes that we just streamed, and mark the transactions as
2333  * streamed (if they contained changes). Otherwise, remove all the
2334  * changes and deallocate the ReorderBufferTXN.
2335  */
2336  if (streaming)
2337  {
2338  ReorderBufferTruncateTXN(rb, txn);
2339 
2340  /* Reset the CheckXidAlive */
2342  }
2343  else
2344  ReorderBufferCleanupTXN(rb, txn);
2345  }
2346  PG_CATCH();
2347  {
2348  MemoryContext ecxt = MemoryContextSwitchTo(ccxt);
2349  ErrorData *errdata = CopyErrorData();
2350 
2351  /* TODO: Encapsulate cleanup from the PG_TRY and PG_CATCH blocks */
2352  if (iterstate)
2353  ReorderBufferIterTXNFinish(rb, iterstate);
2354 
2356 
2357  /*
2358  * Force cache invalidation to happen outside of a valid transaction
2359  * to prevent catalog access as we just caught an error.
2360  */
2362 
2363  /* make sure there's no cache pollution */
2365  txn->invalidations);
2366 
2367  if (using_subtxn)
2369 
2370  /*
2371  * The error code ERRCODE_TRANSACTION_ROLLBACK indicates a concurrent
2372  * abort of the (sub)transaction we are streaming. We need to do the
2373  * cleanup and return gracefully on this error, see SetupCheckXidLive.
2374  */
2375  if (errdata->sqlerrcode == ERRCODE_TRANSACTION_ROLLBACK)
2376  {
2377  /*
2378  * This error can only occur when we are sending the data in
2379  * streaming mode and the streaming is not finished yet.
2380  */
2381  Assert(streaming);
2382  Assert(stream_started);
2383 
2384  /* Cleanup the temporary error state. */
2385  FlushErrorState();
2386  FreeErrorData(errdata);
2387  errdata = NULL;
2388  curtxn->concurrent_abort = true;
2389 
2390  /* Reset the TXN so that it is allowed to stream remaining data. */
2391  ReorderBufferResetTXN(rb, txn, snapshot_now,
2392  command_id, prev_lsn,
2393  specinsert);
2394  }
2395  else
2396  {
2397  ReorderBufferCleanupTXN(rb, txn);
2398  MemoryContextSwitchTo(ecxt);
2399  PG_RE_THROW();
2400  }
2401  }
2402  PG_END_TRY();
2403 }
2404 
2405 /*
2406  * Perform the replay of a transaction and its non-aborted subtransactions.
2407  *
2408  * Subtransactions previously have to be processed by
2409  * ReorderBufferCommitChild(), even if previously assigned to the toplevel
2410  * transaction with ReorderBufferAssignChild.
2411  *
2412  * This interface is called once a toplevel commit is read for both streamed
2413  * as well as non-streamed transactions.
2414  */
2415 void
2417  XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
2418  TimestampTz commit_time,
2419  RepOriginId origin_id, XLogRecPtr origin_lsn)
2420 {
2422  Snapshot snapshot_now;
2423  CommandId command_id = FirstCommandId;
2424 
2425  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2426  false);
2427 
2428  /* unknown transaction, nothing to replay */
2429  if (txn == NULL)
2430  return;
2431 
2432  txn->final_lsn = commit_lsn;
2433  txn->end_lsn = end_lsn;
2434  txn->commit_time = commit_time;
2435  txn->origin_id = origin_id;
2436  txn->origin_lsn = origin_lsn;
2437 
2438  /*
2439  * If the transaction was (partially) streamed, we need to commit it in a
2440  * 'streamed' way. That is, we first stream the remaining part of the
2441  * transaction, and then invoke stream_commit message.
2442  *
2443  * Called after everything (origin ID, LSN, ...) is stored in the
2444  * transaction to avoid passing that information directly.
2445  */
2446  if (rbtxn_is_streamed(txn))
2447  {
2448  ReorderBufferStreamCommit(rb, txn);
2449  return;
2450  }
2451 
2452  /*
2453  * If this transaction has no snapshot, it didn't make any changes to the
2454  * database, so there's nothing to decode. Note that
2455  * ReorderBufferCommitChild will have transferred any snapshots from
2456  * subtransactions if there were any.
2457  */
2458  if (txn->base_snapshot == NULL)
2459  {
2460  Assert(txn->ninvalidations == 0);
2461  ReorderBufferCleanupTXN(rb, txn);
2462  return;
2463  }
2464 
2465  snapshot_now = txn->base_snapshot;
2466 
2467  /* Process and send the changes to output plugin. */
2468  ReorderBufferProcessTXN(rb, txn, commit_lsn, snapshot_now,
2469  command_id, false);
2470 }
2471 
2472 /*
2473  * Abort a transaction that possibly has previous changes. Needs to be first
2474  * called for subtransactions and then for the toplevel xid.
2475  *
2476  * NB: Transactions handled here have to have actively aborted (i.e. have
2477  * produced an abort record). Implicitly aborted transactions are handled via
2478  * ReorderBufferAbortOld(); transactions we're just not interested in, but
2479  * which have committed are handled in ReorderBufferForget().
2480  *
2481  * This function purges this transaction and its contents from memory and
2482  * disk.
2483  */
2484 void
2486 {
2488 
2489  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2490  false);
2491 
2492  /* unknown, nothing to remove */
2493  if (txn == NULL)
2494  return;
2495 
2496  /* For streamed transactions notify the remote node about the abort. */
2497  if (rbtxn_is_streamed(txn))
2498  {
2499  rb->stream_abort(rb, txn, lsn);
2500 
2501  /*
2502  * We might have decoded changes for this transaction that could load
2503  * the cache as per the current transaction's view (consider DDL's
2504  * happened in this transaction). We don't want the decoding of future
2505  * transactions to use those cache entries so execute invalidations.
2506  */
2507  if (txn->ninvalidations > 0)
2509  txn->invalidations);
2510  }
2511 
2512  /* cosmetic... */
2513  txn->final_lsn = lsn;
2514 
2515  /* remove potential on-disk data, and deallocate */
2516  ReorderBufferCleanupTXN(rb, txn);
2517 }
2518 
2519 /*
2520  * Abort all transactions that aren't actually running anymore because the
2521  * server restarted.
2522  *
2523  * NB: These really have to be transactions that have aborted due to a server
2524  * crash/immediate restart, as we don't deal with invalidations here.
2525  */
2526 void
2528 {
2529  dlist_mutable_iter it;
2530 
2531  /*
2532  * Iterate through all (potential) toplevel TXNs and abort all that are
2533  * older than what possibly can be running. Once we've found the first
2534  * that is alive we stop, there might be some that acquired an xid earlier
2535  * but started writing later, but it's unlikely and they will be cleaned
2536  * up in a later call to this function.
2537  */
2539  {
2541 
2542  txn = dlist_container(ReorderBufferTXN, node, it.cur);
2543 
2544  if (TransactionIdPrecedes(txn->xid, oldestRunningXid))
2545  {
2546  elog(DEBUG2, "aborting old transaction %u", txn->xid);
2547 
2548  /* remove potential on-disk data, and deallocate this tx */
2549  ReorderBufferCleanupTXN(rb, txn);
2550  }
2551  else
2552  return;
2553  }
2554 }
2555 
2556 /*
2557  * Forget the contents of a transaction if we aren't interested in its
2558  * contents. Needs to be first called for subtransactions and then for the
2559  * toplevel xid.
2560  *
2561  * This is significantly different to ReorderBufferAbort() because
2562  * transactions that have committed need to be treated differently from aborted
2563  * ones since they may have modified the catalog.
2564  *
2565  * Note that this is only allowed to be called in the moment a transaction
2566  * commit has just been read, not earlier; otherwise later records referring
2567  * to this xid might re-create the transaction incompletely.
2568  */
2569 void
2571 {
2573 
2574  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2575  false);
2576 
2577  /* unknown, nothing to forget */
2578  if (txn == NULL)
2579  return;
2580 
2581  /* For streamed transactions notify the remote node about the abort. */
2582  if (rbtxn_is_streamed(txn))
2583  rb->stream_abort(rb, txn, lsn);
2584 
2585  /* cosmetic... */
2586  txn->final_lsn = lsn;
2587 
2588  /*
2589  * Process cache invalidation messages if there are any. Even if we're not
2590  * interested in the transaction's contents, it could have manipulated the
2591  * catalog and we need to update the caches according to that.
2592  */
2593  if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
2595  txn->invalidations);
2596  else
2597  Assert(txn->ninvalidations == 0);
2598 
2599  /* remove potential on-disk data, and deallocate */
2600  ReorderBufferCleanupTXN(rb, txn);
2601 }
2602 
2603 /*
2604  * Execute invalidations happening outside the context of a decoded
2605  * transaction. That currently happens either for xid-less commits
2606  * (cf. RecordTransactionCommit()) or for invalidations in uninteresting
2607  * transactions (via ReorderBufferForget()).
2608  */
2609 void
2611  SharedInvalidationMessage *invalidations)
2612 {
2613  bool use_subtxn = IsTransactionOrTransactionBlock();
2614  int i;
2615 
2616  if (use_subtxn)
2617  BeginInternalSubTransaction("replay");
2618 
2619  /*
2620  * Force invalidations to happen outside of a valid transaction - that way
2621  * entries will just be marked as invalid without accessing the catalog.
2622  * That's advantageous because we don't need to setup the full state
2623  * necessary for catalog access.
2624  */
2625  if (use_subtxn)
2627 
2628  for (i = 0; i < ninvalidations; i++)
2629  LocalExecuteInvalidationMessage(&invalidations[i]);
2630 
2631  if (use_subtxn)
2633 }
2634 
2635 /*
2636  * Tell reorderbuffer about an xid seen in the WAL stream. Has to be called at
2637  * least once for every xid in XLogRecord->xl_xid (other places in records
2638  * may, but do not have to be passed through here).
2639  *
2640  * Reorderbuffer keeps some datastructures about transactions in LSN order,
2641  * for efficiency. To do that it has to know about when transactions are seen
2642  * first in the WAL. As many types of records are not actually interesting for
2643  * logical decoding, they do not necessarily pass though here.
2644  */
2645 void
2647 {
2648  /* many records won't have an xid assigned, centralize check here */
2649  if (xid != InvalidTransactionId)
2650  ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
2651 }
2652 
2653 /*
 * Add a new snapshot to this transaction that may only be used after lsn 'lsn'
2655  * because the previous snapshot doesn't describe the catalog correctly for
2656  * following rows.
2657  */
2658 void
2660  XLogRecPtr lsn, Snapshot snap)
2661 {
2663 
2664  change->data.snapshot = snap;
2666 
2667  ReorderBufferQueueChange(rb, xid, lsn, change, false);
2668 }
2669 
2670 /*
2671  * Set up the transaction's base snapshot.
2672  *
2673  * If we know that xid is a subtransaction, set the base snapshot on the
2674  * top-level transaction instead.
2675  */
2676 void
2678  XLogRecPtr lsn, Snapshot snap)
2679 {
2681  bool is_new;
2682 
2683  AssertArg(snap != NULL);
2684 
2685  /*
2686  * Fetch the transaction to operate on. If we know it's a subtransaction,
2687  * operate on its top-level transaction instead.
2688  */
2689  txn = ReorderBufferTXNByXid(rb, xid, true, &is_new, lsn, true);
2690  if (rbtxn_is_known_subxact(txn))
2691  txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
2692  NULL, InvalidXLogRecPtr, false);
2693  Assert(txn->base_snapshot == NULL);
2694 
2695  txn->base_snapshot = snap;
2696  txn->base_snapshot_lsn = lsn;
2698 
2699  AssertTXNLsnOrder(rb);
2700 }
2701 
2702 /*
2703  * Access the catalog with this CommandId at this point in the changestream.
2704  *
2705  * May only be called for command ids > 1
2706  */
2707 void
2709  XLogRecPtr lsn, CommandId cid)
2710 {
2712 
2713  change->data.command_id = cid;
2715 
2716  ReorderBufferQueueChange(rb, xid, lsn, change, false);
2717 }
2718 
2719 /*
2720  * Update memory counters to account for the new or removed change.
2721  *
2722  * We update two counters - in the reorder buffer, and in the transaction
2723  * containing the change. The reorder buffer counter allows us to quickly
2724  * decide if we reached the memory limit, the transaction counter allows
2725  * us to quickly pick the largest transaction for eviction.
2726  *
2727  * When streaming is enabled, we need to update the toplevel transaction
2728  * counters instead - we don't really care about subtransactions as we
2729  * can't stream them individually anyway, and we only pick toplevel
2730  * transactions for eviction. So only toplevel transactions matter.
2731  */
2732 static void
2734  ReorderBufferChange *change,
2735  bool addition)
2736 {
2737  Size sz;
2739  ReorderBufferTXN *toptxn = NULL;
2740 
2741  Assert(change->txn);
2742 
2743  /*
2744  * Ignore tuple CID changes, because those are not evicted when reaching
2745  * memory limit. So we just don't count them, because it might easily
2746  * trigger a pointless attempt to spill.
2747  */
2749  return;
2750 
2751  txn = change->txn;
2752 
2753  /* If streaming supported, update the total size in top level as well. */
2754  if (ReorderBufferCanStream(rb))
2755  {
2756  if (txn->toptxn != NULL)
2757  toptxn = txn->toptxn;
2758  else
2759  toptxn = txn;
2760  }
2761 
2762  sz = ReorderBufferChangeSize(change);
2763 
2764  if (addition)
2765  {
2766  txn->size += sz;
2767  rb->size += sz;
2768 
2769  /* Update the total size in the top transaction. */
2770  if (toptxn)
2771  toptxn->total_size += sz;
2772  }
2773  else
2774  {
2775  Assert((rb->size >= sz) && (txn->size >= sz));
2776  txn->size -= sz;
2777  rb->size -= sz;
2778 
2779  /* Update the total size in the top transaction. */
2780  if (toptxn)
2781  toptxn->total_size -= sz;
2782  }
2783 
2784  Assert(txn->size <= rb->size);
2785 }
2786 
2787 /*
2788  * Add new (relfilenode, tid) -> (cmin, cmax) mappings.
2789  *
2790  * We do not include this change type in memory accounting, because we
2791  * keep CIDs in a separate list and do not evict them when reaching
2792  * the memory limit.
2793  */
2794 void
2796  XLogRecPtr lsn, RelFileNode node,
2797  ItemPointerData tid, CommandId cmin,
2798  CommandId cmax, CommandId combocid)
2799 {
2802 
2803  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
2804 
2805  change->data.tuplecid.node = node;
2806  change->data.tuplecid.tid = tid;
2807  change->data.tuplecid.cmin = cmin;
2808  change->data.tuplecid.cmax = cmax;
2809  change->data.tuplecid.combocid = combocid;
2810  change->lsn = lsn;
2811  change->txn = txn;
2813 
2814  dlist_push_tail(&txn->tuplecids, &change->node);
2815  txn->ntuplecids++;
2816 }
2817 
2818 /*
2819  * Setup the invalidation of the toplevel transaction.
2820  *
2821  * This needs to be called for each XLOG_XACT_INVALIDATIONS message and
2822  * accumulates all the invalidation messages in the toplevel transaction as
2823  * well as in the form of change in reorder buffer. We require to record it in
2824  * form of the change so that we can execute only the required invalidations
2825  * instead of executing all the invalidations on each CommandId increment. We
2826  * also need to accumulate these in the toplevel transaction because in some
2827  * cases we skip processing the transaction (see ReorderBufferForget), we need
2828  * to execute all the invalidations together.
2829  */
2830 void
2832  XLogRecPtr lsn, Size nmsgs,
2834 {
2836  MemoryContext oldcontext;
2837  ReorderBufferChange *change;
2838 
2839  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
2840 
2841  oldcontext = MemoryContextSwitchTo(rb->context);
2842 
2843  /*
2844  * Collect all the invalidations under the top transaction so that we can
2845  * execute them all together. See comment atop this function
2846  */
2847  if (txn->toptxn)
2848  txn = txn->toptxn;
2849 
2850  Assert(nmsgs > 0);
2851 
2852  /* Accumulate invalidations. */
2853  if (txn->ninvalidations == 0)
2854  {
2855  txn->ninvalidations = nmsgs;
2857  palloc(sizeof(SharedInvalidationMessage) * nmsgs);
2858  memcpy(txn->invalidations, msgs,
2859  sizeof(SharedInvalidationMessage) * nmsgs);
2860  }
2861  else
2862  {
2865  (txn->ninvalidations + nmsgs));
2866 
2867  memcpy(txn->invalidations + txn->ninvalidations, msgs,
2868  nmsgs * sizeof(SharedInvalidationMessage));
2869  txn->ninvalidations += nmsgs;
2870  }
2871 
2872  change = ReorderBufferGetChange(rb);
2874  change->data.inval.ninvalidations = nmsgs;
2875  change->data.inval.invalidations = (SharedInvalidationMessage *)
2876  palloc(sizeof(SharedInvalidationMessage) * nmsgs);
2877  memcpy(change->data.inval.invalidations, msgs,
2878  sizeof(SharedInvalidationMessage) * nmsgs);
2879 
2880  ReorderBufferQueueChange(rb, xid, lsn, change, false);
2881 
2882  MemoryContextSwitchTo(oldcontext);
2883 }
2884 
2885 /*
2886  * Apply all invalidations we know. Possibly we only need parts at this point
2887  * in the changestream but we don't know which those are.
2888  */
2889 static void
2891 {
2892  int i;
2893 
2894  for (i = 0; i < nmsgs; i++)
2896 }
2897 
2898 /*
2899  * Mark a transaction as containing catalog changes
2900  */
2901 void
2903  XLogRecPtr lsn)
2904 {
2906 
2907  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
2908 
2910 
2911  /*
2912  * Mark top-level transaction as having catalog changes too if one of its
2913  * children has so that the ReorderBufferBuildTupleCidHash can
2914  * conveniently check just top-level transaction and decide whether to
2915  * build the hash table or not.
2916  */
2917  if (txn->toptxn != NULL)
2919 }
2920 
2921 /*
2922  * Query whether a transaction is already *known* to contain catalog
2923  * changes. This can be wrong until directly before the commit!
2924  */
2925 bool
2927 {
2929 
2930  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2931  false);
2932  if (txn == NULL)
2933  return false;
2934 
2935  return rbtxn_has_catalog_changes(txn);
2936 }
2937 
2938 /*
2939  * ReorderBufferXidHasBaseSnapshot
2940  * Have we already set the base snapshot for the given txn/subtxn?
2941  */
2942 bool
2944 {
2946 
2947  txn = ReorderBufferTXNByXid(rb, xid, false,
2948  NULL, InvalidXLogRecPtr, false);
2949 
2950  /* transaction isn't known yet, ergo no snapshot */
2951  if (txn == NULL)
2952  return false;
2953 
2954  /* a known subtxn? operate on top-level txn instead */
2955  if (rbtxn_is_known_subxact(txn))
2956  txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
2957  NULL, InvalidXLogRecPtr, false);
2958 
2959  return txn->base_snapshot != NULL;
2960 }
2961 
2962 
2963 /*
2964  * ---------------------------------------
2965  * Disk serialization support
2966  * ---------------------------------------
2967  */
2968 
2969 /*
2970  * Ensure the IO buffer is >= sz.
2971  */
2972 static void
2974 {
2975  if (!rb->outbufsize)
2976  {
2977  rb->outbuf = MemoryContextAlloc(rb->context, sz);
2978  rb->outbufsize = sz;
2979  }
2980  else if (rb->outbufsize < sz)
2981  {
2982  rb->outbuf = repalloc(rb->outbuf, sz);
2983  rb->outbufsize = sz;
2984  }
2985 }
2986 
2987 /*
2988  * Find the largest transaction (toplevel or subxact) to evict (spill to disk).
2989  *
2990  * XXX With many subtransactions this might be quite slow, because we'll have
2991  * to walk through all of them. There are some options how we could improve
2992  * that: (a) maintain some secondary structure with transactions sorted by
2993  * amount of changes, (b) not looking for the entirely largest transaction,
2994  * but e.g. for transaction using at least some fraction of the memory limit,
2995  * and (c) evicting multiple transactions at once, e.g. to free a given portion
2996  * of the memory limit (e.g. 50%).
2997  */
2998 static ReorderBufferTXN *
3000 {
3001  HASH_SEQ_STATUS hash_seq;
3003  ReorderBufferTXN *largest = NULL;
3004 
3005  hash_seq_init(&hash_seq, rb->by_txn);
3006  while ((ent = hash_seq_search(&hash_seq)) != NULL)
3007  {
3008  ReorderBufferTXN *txn = ent->txn;
3009 
3010  /* if the current transaction is larger, remember it */
3011  if ((!largest) || (txn->size > largest->size))
3012  largest = txn;
3013  }
3014 
3015  Assert(largest);
3016  Assert(largest->size > 0);
3017  Assert(largest->size <= rb->size);
3018 
3019  return largest;
3020 }
3021 
3022 /*
3023  * Find the largest toplevel transaction to evict (by streaming).
3024  *
3025  * This can be seen as an optimized version of ReorderBufferLargestTXN, which
3026  * should give us the same transaction (because we don't update memory account
3027  * for subtransaction with streaming, so it's always 0). But we can simply
3028  * iterate over the limited number of toplevel transactions.
3029  *
3030  * Note that, we skip transactions that contains incomplete changes. There
3031  * is a scope of optimization here such that we can select the largest transaction
3032  * which has complete changes. But that will make the code and design quite complex
3033  * and that might not be worth the benefit. If we plan to stream the transactions
3034  * that contains incomplete changes then we need to find a way to partially
3035  * stream/truncate the transaction changes in-memory and build a mechanism to
3036  * partially truncate the spilled files. Additionally, whenever we partially
3037  * stream the transaction we need to maintain the last streamed lsn and next time
3038  * we need to restore from that segment and the offset in WAL. As we stream the
3039  * changes from the top transaction and restore them subtransaction wise, we need
3040  * to even remember the subxact from where we streamed the last change.
3041  */
3042 static ReorderBufferTXN *
3044 {
3045  dlist_iter iter;
3046  Size largest_size = 0;
3047  ReorderBufferTXN *largest = NULL;
3048 
3049  /* Find the largest top-level transaction. */
3050  dlist_foreach(iter, &rb->toplevel_by_lsn)
3051  {
3053 
3054  txn = dlist_container(ReorderBufferTXN, node, iter.cur);
3055 
3056  if ((largest != NULL || txn->total_size > largest_size) &&
3057  (txn->total_size > 0) && !(rbtxn_has_incomplete_tuple(txn)))
3058  {
3059  largest = txn;
3060  largest_size = txn->total_size;
3061  }
3062  }
3063 
3064  return largest;
3065 }
3066 
3067 /*
3068  * Check whether the logical_decoding_work_mem limit was reached, and if yes
3069  * pick the largest (sub)transaction at-a-time to evict and spill its changes to
3070  * disk until we reach under the memory limit.
3071  *
3072  * XXX At this point we select the transactions until we reach under the memory
3073  * limit, but we might also adapt a more elaborate eviction strategy - for example
3074  * evicting enough transactions to free certain fraction (e.g. 50%) of the memory
3075  * limit.
3076  */
3077 static void
3079 {
3081 
3082  /* bail out if we haven't exceeded the memory limit */
3083  if (rb->size < logical_decoding_work_mem * 1024L)
3084  return;
3085 
3086  /*
3087  * Loop until we reach under the memory limit. One might think that just
3088  * by evicting the largest (sub)transaction we will come under the memory
3089  * limit based on assumption that the selected transaction is at least as
3090  * large as the most recent change (which caused us to go over the memory
3091  * limit). However, that is not true because a user can reduce the
3092  * logical_decoding_work_mem to a smaller value before the most recent
3093  * change.
3094  */
3095  while (rb->size >= logical_decoding_work_mem * 1024L)
3096  {
3097  /*
3098  * Pick the largest transaction (or subtransaction) and evict it from
3099  * memory by streaming, if possible. Otherwise, spill to disk.
3100  */
3102  (txn = ReorderBufferLargestTopTXN(rb)) != NULL)
3103  {
3104  /* we know there has to be one, because the size is not zero */
3105  Assert(txn && !txn->toptxn);
3106  Assert(txn->total_size > 0);
3107  Assert(rb->size >= txn->total_size);
3108 
3109  ReorderBufferStreamTXN(rb, txn);
3110  }
3111  else
3112  {
3113  /*
3114  * Pick the largest transaction (or subtransaction) and evict it
3115  * from memory by serializing it to disk.
3116  */
3117  txn = ReorderBufferLargestTXN(rb);
3118 
3119  /* we know there has to be one, because the size is not zero */
3120  Assert(txn);
3121  Assert(txn->size > 0);
3122  Assert(rb->size >= txn->size);
3123 
3124  ReorderBufferSerializeTXN(rb, txn);
3125  }
3126 
3127  /*
3128  * After eviction, the transaction should have no entries in memory,
3129  * and should use 0 bytes for changes.
3130  */
3131  Assert(txn->size == 0);
3132  Assert(txn->nentries_mem == 0);
3133  }
3134 
3135  /* We must be under the memory limit now. */
3136  Assert(rb->size < logical_decoding_work_mem * 1024L);
3137 }
3138 
3139 /*
3140  * Spill data of a large transaction (and its subtransactions) to disk.
3141  */
3142 static void
3144 {
3145  dlist_iter subtxn_i;
3146  dlist_mutable_iter change_i;
3147  int fd = -1;
3148  XLogSegNo curOpenSegNo = 0;
3149  Size spilled = 0;
3150  Size size = txn->size;
3151 
3152  elog(DEBUG2, "spill %u changes in XID %u to disk",
3153  (uint32) txn->nentries_mem, txn->xid);
3154 
3155  /* do the same to all child TXs */
3156  dlist_foreach(subtxn_i, &txn->subtxns)
3157  {
3158  ReorderBufferTXN *subtxn;
3159 
3160  subtxn = dlist_container(ReorderBufferTXN, node, subtxn_i.cur);
3161  ReorderBufferSerializeTXN(rb, subtxn);
3162  }
3163 
3164  /* serialize changestream */
3165  dlist_foreach_modify(change_i, &txn->changes)
3166  {
3167  ReorderBufferChange *change;
3168 
3169  change = dlist_container(ReorderBufferChange, node, change_i.cur);
3170 
3171  /*
3172  * store in segment in which it belongs by start lsn, don't split over
3173  * multiple segments tho
3174  */
3175  if (fd == -1 ||
3176  !XLByteInSeg(change->lsn, curOpenSegNo, wal_segment_size))
3177  {
3178  char path[MAXPGPATH];
3179 
3180  if (fd != -1)
3181  CloseTransientFile(fd);
3182 
3183  XLByteToSeg(change->lsn, curOpenSegNo, wal_segment_size);
3184 
3185  /*
3186  * No need to care about TLIs here, only used during a single run,
3187  * so each LSN only maps to a specific WAL record.
3188  */
3190  curOpenSegNo);
3191 
3192  /* open segment, create it if necessary */
3193  fd = OpenTransientFile(path,
3194  O_CREAT | O_WRONLY | O_APPEND | PG_BINARY);
3195 
3196  if (fd < 0)
3197  ereport(ERROR,
3199  errmsg("could not open file \"%s\": %m", path)));
3200  }
3201 
3202  ReorderBufferSerializeChange(rb, txn, fd, change);
3203  dlist_delete(&change->node);
3204  ReorderBufferReturnChange(rb, change, true);
3205 
3206  spilled++;
3207  }
3208 
3209  /* update the statistics iff we have spilled anything */
3210  if (spilled)
3211  {
3212  rb->spillCount += 1;
3213  rb->spillBytes += size;
3214 
3215  /* don't consider already serialized transactions */
3216  rb->spillTxns += (rbtxn_is_serialized(txn) || rbtxn_is_serialized_clear(txn)) ? 0 : 1;
3217  }
3218 
3219  Assert(spilled == txn->nentries_mem);
3220  Assert(dlist_is_empty(&txn->changes));
3221  txn->nentries_mem = 0;
3223 
3224  if (fd != -1)
3225  CloseTransientFile(fd);
3226 }
3227 
3228 /*
3229  * Serialize individual change to disk.
3230  */
3231 static void
3233  int fd, ReorderBufferChange *change)
3234 {
3235  ReorderBufferDiskChange *ondisk;
3236  Size sz = sizeof(ReorderBufferDiskChange);
3237 
3239 
3240  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3241  memcpy(&ondisk->change, change, sizeof(ReorderBufferChange));
3242 
3243  switch (change->action)
3244  {
3245  /* fall through these, they're all similar enough */
3250  {
3251  char *data;
3252  ReorderBufferTupleBuf *oldtup,
3253  *newtup;
3254  Size oldlen = 0;
3255  Size newlen = 0;
3256 
3257  oldtup = change->data.tp.oldtuple;
3258  newtup = change->data.tp.newtuple;
3259 
3260  if (oldtup)
3261  {
3262  sz += sizeof(HeapTupleData);
3263  oldlen = oldtup->tuple.t_len;
3264  sz += oldlen;
3265  }
3266 
3267  if (newtup)
3268  {
3269  sz += sizeof(HeapTupleData);
3270  newlen = newtup->tuple.t_len;
3271  sz += newlen;
3272  }
3273 
3274  /* make sure we have enough space */
3276 
3277  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3278  /* might have been reallocated above */
3279  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3280 
3281  if (oldlen)
3282  {
3283  memcpy(data, &oldtup->tuple, sizeof(HeapTupleData));
3284  data += sizeof(HeapTupleData);
3285 
3286  memcpy(data, oldtup->tuple.t_data, oldlen);
3287  data += oldlen;
3288  }
3289 
3290  if (newlen)
3291  {
3292  memcpy(data, &newtup->tuple, sizeof(HeapTupleData));
3293  data += sizeof(HeapTupleData);
3294 
3295  memcpy(data, newtup->tuple.t_data, newlen);
3296  data += newlen;
3297  }
3298  break;
3299  }
3301  {
3302  char *data;
3303  Size prefix_size = strlen(change->data.msg.prefix) + 1;
3304 
3305  sz += prefix_size + change->data.msg.message_size +
3306  sizeof(Size) + sizeof(Size);
3308 
3309  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3310 
3311  /* might have been reallocated above */
3312  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3313 
3314  /* write the prefix including the size */
3315  memcpy(data, &prefix_size, sizeof(Size));
3316  data += sizeof(Size);
3317  memcpy(data, change->data.msg.prefix,
3318  prefix_size);
3319  data += prefix_size;
3320 
3321  /* write the message including the size */
3322  memcpy(data, &change->data.msg.message_size, sizeof(Size));
3323  data += sizeof(Size);
3324  memcpy(data, change->data.msg.message,
3325  change->data.msg.message_size);
3326  data += change->data.msg.message_size;
3327 
3328  break;
3329  }
3331  {
3332  char *data;
3333  Size inval_size = sizeof(SharedInvalidationMessage) *
3334  change->data.inval.ninvalidations;
3335 
3336  sz += inval_size;
3337 
3339  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3340 
3341  /* might have been reallocated above */
3342  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3343  memcpy(data, change->data.inval.invalidations, inval_size);
3344  data += inval_size;
3345 
3346  break;
3347  }
3349  {
3350  Snapshot snap;
3351  char *data;
3352 
3353  snap = change->data.snapshot;
3354 
3355  sz += sizeof(SnapshotData) +
3356  sizeof(TransactionId) * snap->xcnt +
3357  sizeof(TransactionId) * snap->subxcnt;
3358 
3359  /* make sure we have enough space */
3361  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3362  /* might have been reallocated above */
3363  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3364 
3365  memcpy(data, snap, sizeof(SnapshotData));
3366  data += sizeof(SnapshotData);
3367 
3368  if (snap->xcnt)
3369  {
3370  memcpy(data, snap->xip,
3371  sizeof(TransactionId) * snap->xcnt);
3372  data += sizeof(TransactionId) * snap->xcnt;
3373  }
3374 
3375  if (snap->subxcnt)
3376  {
3377  memcpy(data, snap->subxip,
3378  sizeof(TransactionId) * snap->subxcnt);
3379  data += sizeof(TransactionId) * snap->subxcnt;
3380  }
3381  break;
3382  }
3384  {
3385  Size size;
3386  char *data;
3387 
3388  /* account for the OIDs of truncated relations */
3389  size = sizeof(Oid) * change->data.truncate.nrelids;
3390  sz += size;
3391 
3392  /* make sure we have enough space */
3394 
3395  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3396  /* might have been reallocated above */
3397  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3398 
3399  memcpy(data, change->data.truncate.relids, size);
3400  data += size;
3401 
3402  break;
3403  }
3407  /* ReorderBufferChange contains everything important */
3408  break;
3409  }
3410 
3411  ondisk->size = sz;
3412 
3413  errno = 0;
3415  if (write(fd, rb->outbuf, ondisk->size) != ondisk->size)
3416  {
3417  int save_errno = errno;
3418 
3419  CloseTransientFile(fd);
3420 
3421  /* if write didn't set errno, assume problem is no disk space */
3422  errno = save_errno ? save_errno : ENOSPC;
3423  ereport(ERROR,
3425  errmsg("could not write to data file for XID %u: %m",
3426  txn->xid)));
3427  }
3429 
3430  /*
3431  * Keep the transaction's final_lsn up to date with each change we send to
3432  * disk, so that ReorderBufferRestoreCleanup works correctly. (We used to
3433  * only do this on commit and abort records, but that doesn't work if a
3434  * system crash leaves a transaction without its abort record).
3435  *
3436  * Make sure not to move it backwards.
3437  */
3438  if (txn->final_lsn < change->lsn)
3439  txn->final_lsn = change->lsn;
3440 
3441  Assert(ondisk->change.action == change->action);
3442 }
3443 
3444 /* Returns true, if the output plugin supports streaming, false, otherwise. */
3445 static inline bool
3447 {
3449 
3450  return ctx->streaming;
3451 }
3452 
3453 /* Returns true, if the streaming can be started now, false, otherwise. */
3454 static inline bool
3456 {
3458  SnapBuild *builder = ctx->snapshot_builder;
3459 
3460  /*
3461  * We can't start streaming immediately even if the streaming is enabled
3462  * because we previously decoded this transaction and now just are
3463  * restarting.
3464  */
3465  if (ReorderBufferCanStream(rb) &&
3466  !SnapBuildXactNeedsSkip(builder, ctx->reader->EndRecPtr))
3467  {
3468  /* We must have a consistent snapshot by this time */
3470  return true;
3471  }
3472 
3473  return false;
3474 }
3475 
3476 /*
3477  * Send data of a large transaction (and its subtransactions) to the
3478  * output plugin, but using the stream API.
3479  */
3480 static void
3482 {
3483  Snapshot snapshot_now;
3484  CommandId command_id;
3485 
3486  /* We can never reach here for a subtransaction. */
3487  Assert(txn->toptxn == NULL);
3488 
3489  /*
3490  * We can't make any assumptions about base snapshot here, similar to what
3491  * ReorderBufferCommit() does. That relies on base_snapshot getting
3492  * transferred from subxact in ReorderBufferCommitChild(), but that was
3493  * not yet called as the transaction is in-progress.
3494  *
3495  * So just walk the subxacts and use the same logic here. But we only need
3496  * to do that once, when the transaction is streamed for the first time.
3497  * After that we need to reuse the snapshot from the previous run.
3498  *
3499  * Unlike DecodeCommit which adds xids of all the subtransactions in
3500  * snapshot's xip array via SnapBuildCommittedTxn, we can't do that here
3501  * but we do add them to subxip array instead via ReorderBufferCopySnap.
3502  * This allows the catalog changes made in subtransactions decoded till
3503  * now to be visible.
3504  */
3505  if (txn->snapshot_now == NULL)
3506  {
3507  dlist_iter subxact_i;
3508 
3509  /* make sure this transaction is streamed for the first time */
3510  Assert(!rbtxn_is_streamed(txn));
3511 
3512  /* at the beginning we should have invalid command ID */
3514 
3515  dlist_foreach(subxact_i, &txn->subtxns)
3516  {
3517  ReorderBufferTXN *subtxn;
3518 
3519  subtxn = dlist_container(ReorderBufferTXN, node, subxact_i.cur);
3520  ReorderBufferTransferSnapToParent(txn, subtxn);
3521  }
3522 
3523  /*
3524  * If this transaction has no snapshot, it didn't make any changes to
3525  * the database till now, so there's nothing to decode.
3526  */
3527  if (txn->base_snapshot == NULL)
3528  {
3529  Assert(txn->ninvalidations == 0);
3530  return;
3531  }
3532 
3533  command_id = FirstCommandId;
3534  snapshot_now = ReorderBufferCopySnap(rb, txn->base_snapshot,
3535  txn, command_id);
3536  }
3537  else
3538  {
3539  /* the transaction must have been already streamed */
3540  Assert(rbtxn_is_streamed(txn));
3541 
3542  /*
3543  * Nah, we already have snapshot from the previous streaming run. We
3544  * assume new subxacts can't move the LSN backwards, and so can't beat
3545  * the LSN condition in the previous branch (so no need to walk
3546  * through subxacts again). In fact, we must not do that as we may be
3547  * using snapshot half-way through the subxact.
3548  */
3549  command_id = txn->command_id;
3550 
3551  /*
3552  * We can't use txn->snapshot_now directly because after the last
3553  * streaming run, we might have got some new sub-transactions. So we
3554  * need to add them to the snapshot.
3555  */
3556  snapshot_now = ReorderBufferCopySnap(rb, txn->snapshot_now,
3557  txn, command_id);
3558 
3559  /* Free the previously copied snapshot. */
3560  Assert(txn->snapshot_now->copied);
3562  txn->snapshot_now = NULL;
3563  }
3564 
3565  /* Process and send the changes to output plugin. */
3566  ReorderBufferProcessTXN(rb, txn, InvalidXLogRecPtr, snapshot_now,
3567  command_id, true);
3568 
3569  Assert(dlist_is_empty(&txn->changes));
3570  Assert(txn->nentries == 0);
3571  Assert(txn->nentries_mem == 0);
3572 }
3573 
3574 /*
3575  * Size of a change in memory.
3576  */
3577 static Size
3579 {
3580  Size sz = sizeof(ReorderBufferChange);
3581 
3582  switch (change->action)
3583  {
3584  /* fall through these, they're all similar enough */
3589  {
3590  ReorderBufferTupleBuf *oldtup,
3591  *newtup;
3592  Size oldlen = 0;
3593  Size newlen = 0;
3594 
3595  oldtup = change->data.tp.oldtuple;
3596  newtup = change->data.tp.newtuple;
3597 
3598  if (oldtup)
3599  {
3600  sz += sizeof(HeapTupleData);
3601  oldlen = oldtup->tuple.t_len;
3602  sz += oldlen;
3603  }
3604 
3605  if (newtup)
3606  {
3607  sz += sizeof(HeapTupleData);
3608  newlen = newtup->tuple.t_len;
3609  sz += newlen;
3610  }
3611 
3612  break;
3613  }
3615  {
3616  Size prefix_size = strlen(change->data.msg.prefix) + 1;
3617 
3618  sz += prefix_size + change->data.msg.message_size +
3619  sizeof(Size) + sizeof(Size);
3620 
3621  break;
3622  }
3624  {
3625  sz += sizeof(SharedInvalidationMessage) *
3626  change->data.inval.ninvalidations;
3627  break;
3628  }
3630  {
3631  Snapshot snap;
3632 
3633  snap = change->data.snapshot;
3634 
3635  sz += sizeof(SnapshotData) +
3636  sizeof(TransactionId) * snap->xcnt +
3637  sizeof(TransactionId) * snap->subxcnt;
3638 
3639  break;
3640  }
3642  {
3643  sz += sizeof(Oid) * change->data.truncate.nrelids;
3644 
3645  break;
3646  }
3650  /* ReorderBufferChange contains everything important */
3651  break;
3652  }
3653 
3654  return sz;
3655 }
3656 
3657 
3658 /*
3659  * Restore a number of changes spilled to disk back into memory.
3660  */
3661 static Size
3663  TXNEntryFile *file, XLogSegNo *segno)
3664 {
3665  Size restored = 0;
3666  XLogSegNo last_segno;
3667  dlist_mutable_iter cleanup_iter;
3668  File *fd = &file->vfd;
3669 
3672 
3673  /* free current entries, so we have memory for more */
3674  dlist_foreach_modify(cleanup_iter, &txn->changes)
3675  {
3677  dlist_container(ReorderBufferChange, node, cleanup_iter.cur);
3678 
3679  dlist_delete(&cleanup->node);
3680  ReorderBufferReturnChange(rb, cleanup, true);
3681  }
3682  txn->nentries_mem = 0;
3683  Assert(dlist_is_empty(&txn->changes));
3684 
3685  XLByteToSeg(txn->final_lsn, last_segno, wal_segment_size);
3686 
3687  while (restored < max_changes_in_memory && *segno <= last_segno)
3688  {
3689  int readBytes;
3690  ReorderBufferDiskChange *ondisk;
3691 
3692  if (*fd == -1)
3693  {
3694  char path[MAXPGPATH];
3695 
3696  /* first time in */
3697  if (*segno == 0)
3698  XLByteToSeg(txn->first_lsn, *segno, wal_segment_size);
3699 
3700  Assert(*segno != 0 || dlist_is_empty(&txn->changes));
3701 
3702  /*
3703  * No need to care about TLIs here, only used during a single run,
3704  * so each LSN only maps to a specific WAL record.
3705  */
3707  *segno);
3708 
3709  *fd = PathNameOpenFile(path, O_RDONLY | PG_BINARY);
3710 
3711  /* No harm in resetting the offset even in case of failure */
3712  file->curOffset = 0;
3713 
3714  if (*fd < 0 && errno == ENOENT)
3715  {
3716  *fd = -1;
3717  (*segno)++;
3718  continue;
3719  }
3720  else if (*fd < 0)
3721  ereport(ERROR,
3723  errmsg("could not open file \"%s\": %m",
3724  path)));
3725  }
3726 
3727  /*
3728  * Read the statically sized part of a change which has information
3729  * about the total size. If we couldn't read a record, we're at the
3730  * end of this file.
3731  */
3733  readBytes = FileRead(file->vfd, rb->outbuf,
3734  sizeof(ReorderBufferDiskChange),
3736 
3737  /* eof */
3738  if (readBytes == 0)
3739  {
3740  FileClose(*fd);
3741  *fd = -1;
3742  (*segno)++;
3743  continue;
3744  }
3745  else if (readBytes < 0)
3746  ereport(ERROR,
3748  errmsg("could not read from reorderbuffer spill file: %m")));
3749  else if (readBytes != sizeof(ReorderBufferDiskChange))
3750  ereport(ERROR,
3752  errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
3753  readBytes,
3754  (uint32) sizeof(ReorderBufferDiskChange))));
3755 
3756  file->curOffset += readBytes;
3757 
3758  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3759 
3761  sizeof(ReorderBufferDiskChange) + ondisk->size);
3762  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3763 
3764  readBytes = FileRead(file->vfd,
3765  rb->outbuf + sizeof(ReorderBufferDiskChange),
3766  ondisk->size - sizeof(ReorderBufferDiskChange),
3767  file->curOffset,
3769 
3770  if (readBytes < 0)
3771  ereport(ERROR,
3773  errmsg("could not read from reorderbuffer spill file: %m")));
3774  else if (readBytes != ondisk->size - sizeof(ReorderBufferDiskChange))
3775  ereport(ERROR,
3777  errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
3778  readBytes,
3779  (uint32) (ondisk->size - sizeof(ReorderBufferDiskChange)))));
3780 
3781  file->curOffset += readBytes;
3782 
3783  /*
3784  * ok, read a full change from disk, now restore it into proper
3785  * in-memory format
3786  */
3787  ReorderBufferRestoreChange(rb, txn, rb->outbuf);
3788  restored++;
3789  }
3790 
3791  return restored;
3792 }
3793 
3794 /*
3795  * Convert change from its on-disk format to in-memory format and queue it onto
3796  * the TXN's ->changes list.
3797  *
3798  * Note: although "data" is declared char*, at entry it points to a
3799  * maxalign'd buffer, making it safe in most of this function to assume
3800  * that the pointed-to data is suitably aligned for direct access.
3801  */
3802 static void
3804  char *data)
3805 {
3806  ReorderBufferDiskChange *ondisk;
3807  ReorderBufferChange *change;
3808 
3809  ondisk = (ReorderBufferDiskChange *) data;
3810 
3811  change = ReorderBufferGetChange(rb);
3812 
3813  /* copy static part */
3814  memcpy(change, &ondisk->change, sizeof(ReorderBufferChange));
3815 
3816  data += sizeof(ReorderBufferDiskChange);
3817 
3818  /* restore individual stuff */
3819  switch (change->action)
3820  {
3821  /* fall through these, they're all similar enough */
3826  if (change->data.tp.oldtuple)
3827  {
3828  uint32 tuplelen = ((HeapTuple) data)->t_len;
3829 
3830  change->data.tp.oldtuple =
3832 
3833  /* restore ->tuple */
3834  memcpy(&change->data.tp.oldtuple->tuple, data,
3835  sizeof(HeapTupleData));
3836  data += sizeof(HeapTupleData);
3837 
3838  /* reset t_data pointer into the new tuplebuf */
3839  change->data.tp.oldtuple->tuple.t_data =
3840  ReorderBufferTupleBufData(change->data.tp.oldtuple);
3841 
3842  /* restore tuple data itself */
3843  memcpy(change->data.tp.oldtuple->tuple.t_data, data, tuplelen);
3844  data += tuplelen;
3845  }
3846 
3847  if (change->data.tp.newtuple)
3848  {
3849  /* here, data might not be suitably aligned! */
3850  uint32 tuplelen;
3851 
3852  memcpy(&tuplelen, data + offsetof(HeapTupleData, t_len),
3853  sizeof(uint32));
3854 
3855  change->data.tp.newtuple =
3857 
3858  /* restore ->tuple */
3859  memcpy(&change->data.tp.newtuple->tuple, data,
3860  sizeof(HeapTupleData));
3861  data += sizeof(HeapTupleData);
3862 
3863  /* reset t_data pointer into the new tuplebuf */
3864  change->data.tp.newtuple->tuple.t_data =
3865  ReorderBufferTupleBufData(change->data.tp.newtuple);
3866 
3867  /* restore tuple data itself */
3868  memcpy(change->data.tp.newtuple->tuple.t_data, data, tuplelen);
3869  data += tuplelen;
3870  }
3871 
3872  break;
3874  {
3875  Size prefix_size;
3876 
3877  /* read prefix */
3878  memcpy(&prefix_size, data, sizeof(Size));
3879  data += sizeof(Size);
3880  change->data.msg.prefix = MemoryContextAlloc(rb->context,
3881  prefix_size);
3882  memcpy(change->data.msg.prefix, data, prefix_size);
3883  Assert(change->data.msg.prefix[prefix_size - 1] == '\0');
3884  data += prefix_size;
3885 
3886  /* read the message */
3887  memcpy(&change->data.msg.message_size, data, sizeof(Size));
3888  data += sizeof(Size);
3889  change->data.msg.message = MemoryContextAlloc(rb->context,
3890  change->data.msg.message_size);
3891  memcpy(change->data.msg.message, data,
3892  change->data.msg.message_size);
3893  data += change->data.msg.message_size;
3894 
3895  break;
3896  }
3898  {
3899  Size inval_size = sizeof(SharedInvalidationMessage) *
3900  change->data.inval.ninvalidations;
3901 
3902  change->data.inval.invalidations =
3903  MemoryContextAlloc(rb->context, inval_size);
3904 
3905  /* read the message */
3906  memcpy(change->data.inval.invalidations, data, inval_size);
3907 
3908  break;
3909  }
3911  {
3912  Snapshot oldsnap;
3913  Snapshot newsnap;
3914  Size size;
3915 
3916  oldsnap = (Snapshot) data;
3917 
3918  size = sizeof(SnapshotData) +
3919  sizeof(TransactionId) * oldsnap->xcnt +
3920  sizeof(TransactionId) * (oldsnap->subxcnt + 0);
3921 
3922  change->data.snapshot = MemoryContextAllocZero(rb->context, size);
3923 
3924  newsnap = change->data.snapshot;
3925 
3926  memcpy(newsnap, data, size);
3927  newsnap->xip = (TransactionId *)
3928  (((char *) newsnap) + sizeof(SnapshotData));
3929  newsnap->subxip = newsnap->xip + newsnap->xcnt;
3930  newsnap->copied = true;
3931  break;
3932  }
3933  /* the base struct contains all the data, easy peasy */
3935  {
3936  Oid *relids;
3937 
3938  relids = ReorderBufferGetRelids(rb,
3939  change->data.truncate.nrelids);
3940  memcpy(relids, data, change->data.truncate.nrelids * sizeof(Oid));
3941  change->data.truncate.relids = relids;
3942 
3943  break;
3944  }
3948  break;
3949  }
3950 
3951  dlist_push_tail(&txn->changes, &change->node);
3952  txn->nentries_mem++;
3953 
3954  /*
3955  * Update memory accounting for the restored change. We need to do this
3956  * although we don't check the memory limit when restoring the changes in
3957  * this branch (we only do that when initially queueing the changes after
3958  * decoding), because we will release the changes later, and that will
3959  * update the accounting too (subtracting the size from the counters). And
3960  * we don't want to underflow there.
3961  */
3962  ReorderBufferChangeMemoryUpdate(rb, change, true);
3963 }
3964 
3965 /*
3966  * Remove all on-disk stored for the passed in transaction.
3967  */
3968 static void
3970 {
3971  XLogSegNo first;
3972  XLogSegNo cur;
3973  XLogSegNo last;
3974 
3977 
3978  XLByteToSeg(txn->first_lsn, first, wal_segment_size);
3979  XLByteToSeg(txn->final_lsn, last, wal_segment_size);
3980 
3981  /* iterate over all possible filenames, and delete them */
3982  for (cur = first; cur <= last; cur++)
3983  {
3984  char path[MAXPGPATH];
3985 
3987  if (unlink(path) != 0 && errno != ENOENT)
3988  ereport(ERROR,
3990  errmsg("could not remove file \"%s\": %m", path)));
3991  }
3992 }
3993 
3994 /*
3995  * Remove any leftover serialized reorder buffers from a slot directory after a
3996  * prior crash or decoding session exit.
3997  */
3998 static void
4000 {
4001  DIR *spill_dir;
4002  struct dirent *spill_de;
4003  struct stat statbuf;
4004  char path[MAXPGPATH * 2 + 12];
4005 
4006  sprintf(path, "pg_replslot/%s", slotname);
4007 
4008  /* we're only handling directories here, skip if it's not ours */
4009  if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
4010  return;
4011 
4012  spill_dir = AllocateDir(path);
4013  while ((spill_de = ReadDirExtended(spill_dir, path, INFO)) != NULL)
4014  {
4015  /* only look at names that can be ours */
4016  if (strncmp(spill_de->d_name, "xid", 3) == 0)
4017  {
4018  snprintf(path, sizeof(path),
4019  "pg_replslot/%s/%s", slotname,
4020  spill_de->d_name);
4021 
4022  if (unlink(path) != 0)
4023  ereport(ERROR,
4025  errmsg("could not remove file \"%s\" during removal of pg_replslot/%s/xid*: %m",
4026  path, slotname)));
4027  }
4028  }
4029  FreeDir(spill_dir);
4030 }
4031 
4032 /*
4033  * Given a replication slot, transaction ID and segment number, fill in the
4034  * corresponding spill file into 'path', which is a caller-owned buffer of size
4035  * at least MAXPGPATH.
4036  */
4037 static void
4039  XLogSegNo segno)
4040 {
4041  XLogRecPtr recptr;
4042 
4043  XLogSegNoOffsetToRecPtr(segno, 0, wal_segment_size, recptr);
4044 
4045  snprintf(path, MAXPGPATH, "pg_replslot/%s/xid-%u-lsn-%X-%X.spill",
4047  xid,
4048  (uint32) (recptr >> 32), (uint32) recptr);
4049 }
4050 
4051 /*
4052  * Delete all data spilled to disk after we've restarted/crashed. It will be
4053  * recreated when the respective slots are reused.
4054  */
4055 void
4057 {
4058  DIR *logical_dir;
4059  struct dirent *logical_de;
4060 
4061  logical_dir = AllocateDir("pg_replslot");
4062  while ((logical_de = ReadDir(logical_dir, "pg_replslot")) != NULL)
4063  {
4064  if (strcmp(logical_de->d_name, ".") == 0 ||
4065  strcmp(logical_de->d_name, "..") == 0)
4066  continue;
4067 
4068  /* if it cannot be a slot, skip the directory */
4069  if (!ReplicationSlotValidateName(logical_de->d_name, DEBUG2))
4070  continue;
4071 
4072  /*
4073  * ok, has to be a surviving logical slot, iterate and delete
4074  * everything starting with xid-*
4075  */
4077  }
4078  FreeDir(logical_dir);
4079 }
4080 
4081 /* ---------------------------------------
4082  * toast reassembly support
4083  * ---------------------------------------
4084  */
4085 
4086 /*
4087  * Initialize per tuple toast reconstruction support.
4088  */
4089 static void
4091 {
4092  HASHCTL hash_ctl;
4093 
4094  Assert(txn->toast_hash == NULL);
4095 
4096  memset(&hash_ctl, 0, sizeof(hash_ctl));
4097  hash_ctl.keysize = sizeof(Oid);
4098  hash_ctl.entrysize = sizeof(ReorderBufferToastEnt);
4099  hash_ctl.hcxt = rb->context;
4100  txn->toast_hash = hash_create("ReorderBufferToastHash", 5, &hash_ctl,
4102 }
4103 
4104 /*
4105  * Per toast-chunk handling for toast reconstruction
4106  *
4107  * Appends a toast chunk so we can reconstruct it when the tuple "owning" the
4108  * toasted Datum comes along.
4109  */
4110 static void
4112  Relation relation, ReorderBufferChange *change)
4113 {
4114  ReorderBufferToastEnt *ent;
4115  ReorderBufferTupleBuf *newtup;
4116  bool found;
4117  int32 chunksize;
4118  bool isnull;
4119  Pointer chunk;
4120  TupleDesc desc = RelationGetDescr(relation);
4121  Oid chunk_id;
4122  int32 chunk_seq;
4123 
4124  if (txn->toast_hash == NULL)
4125  ReorderBufferToastInitHash(rb, txn);
4126 
4127  Assert(IsToastRelation(relation));
4128 
4129  newtup = change->data.tp.newtuple;
4130  chunk_id = DatumGetObjectId(fastgetattr(&newtup->tuple, 1, desc, &isnull));
4131  Assert(!isnull);
4132  chunk_seq = DatumGetInt32(fastgetattr(&newtup->tuple, 2, desc, &isnull));
4133  Assert(!isnull);
4134 
4135  ent = (ReorderBufferToastEnt *)
4136  hash_search(txn->toast_hash,
4137  (void *) &chunk_id,
4138  HASH_ENTER,
4139  &found);
4140 
4141  if (!found)
4142  {
4143  Assert(ent->chunk_id == chunk_id);
4144  ent->num_chunks = 0;
4145  ent->last_chunk_seq = 0;
4146  ent->size = 0;
4147  ent->reconstructed = NULL;
4148  dlist_init(&ent->chunks);
4149 
4150  if (chunk_seq != 0)
4151  elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq 0",
4152  chunk_seq, chunk_id);
4153  }
4154  else if (found && chunk_seq != ent->last_chunk_seq + 1)
4155  elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq %d",
4156  chunk_seq, chunk_id, ent->last_chunk_seq + 1);
4157 
4158  chunk = DatumGetPointer(fastgetattr(&newtup->tuple, 3, desc, &isnull));
4159  Assert(!isnull);
4160 
4161  /* calculate size so we can allocate the right size at once later */
4162  if (!VARATT_IS_EXTENDED(chunk))
4163  chunksize = VARSIZE(chunk) - VARHDRSZ;
4164  else if (VARATT_IS_SHORT(chunk))
4165  /* could happen due to heap_form_tuple doing its thing */
4166  chunksize = VARSIZE_SHORT(chunk) - VARHDRSZ_SHORT;
4167  else
4168  elog(ERROR, "unexpected type of toast chunk");
4169 
4170  ent->size += chunksize;
4171  ent->last_chunk_seq = chunk_seq;
4172  ent->num_chunks++;
4173  dlist_push_tail(&ent->chunks, &change->node);
4174 }
4175 
4176 /*
4177  * Rejigger change->newtuple to point to in-memory toast tuples instead to
4178  * on-disk toast tuples that may not longer exist (think DROP TABLE or VACUUM).
4179  *
4180  * We cannot replace unchanged toast tuples though, so those will still point
4181  * to on-disk toast data.
4182  *
4183  * While updating the existing change with detoasted tuple data, we need to
4184  * update the memory accounting info, because the change size will differ.
4185  * Otherwise the accounting may get out of sync, triggering serialization
4186  * at unexpected times.
4187  *
4188  * We simply subtract size of the change before rejiggering the tuple, and
4189  * then adding the new size. This makes it look like the change was removed
4190  * and then added back, except it only tweaks the accounting info.
4191  *
4192  * In particular it can't trigger serialization, which would be pointless
4193  * anyway as it happens during commit processing right before handing
4194  * the change to the output plugin.
4195  */
4196 static void
4198  Relation relation, ReorderBufferChange *change)
4199 {
4200  TupleDesc desc;
4201  int natt;
4202  Datum *attrs;
4203  bool *isnull;
4204  bool *free;
4205  HeapTuple tmphtup;
4206  Relation toast_rel;
4207  TupleDesc toast_desc;
4208  MemoryContext oldcontext;
4209  ReorderBufferTupleBuf *newtup;
4210 
4211  /* no toast tuples changed */
4212  if (txn->toast_hash == NULL)
4213  return;
4214 
4215  /*
4216  * We're going to modify the size of the change, so to make sure the
4217  * accounting is correct we'll make it look like we're removing the change
4218  * now (with the old size), and then re-add it at the end.
4219  */
4220  ReorderBufferChangeMemoryUpdate(rb, change, false);
4221 
4222  oldcontext = MemoryContextSwitchTo(rb->context);
4223 
4224  /* we should only have toast tuples in an INSERT or UPDATE */
4225  Assert(change->data.tp.newtuple);
4226 
4227  desc = RelationGetDescr(relation);
4228 
4229  toast_rel = RelationIdGetRelation(relation->rd_rel->reltoastrelid);
4230  if (!RelationIsValid(toast_rel))
4231  elog(ERROR, "could not open relation with OID %u",
4232  relation->rd_rel->reltoastrelid);
4233 
4234  toast_desc = RelationGetDescr(toast_rel);
4235 
4236  /* should we allocate from stack instead? */
4237  attrs = palloc0(sizeof(Datum) * desc->natts);
4238  isnull = palloc0(sizeof(bool) * desc->natts);
4239  free = palloc0(sizeof(bool) * desc->natts);
4240 
4241  newtup = change->data.tp.newtuple;
4242 
4243  heap_deform_tuple(&newtup->tuple, desc, attrs, isnull);
4244 
4245  for (natt = 0; natt < desc->natts; natt++)
4246  {
4247  Form_pg_attribute attr = TupleDescAttr(desc, natt);
4248  ReorderBufferToastEnt *ent;
4249  struct varlena *varlena;
4250 
4251  /* va_rawsize is the size of the original datum -- including header */
4252  struct varatt_external toast_pointer;
4253  struct varatt_indirect redirect_pointer;
4254  struct varlena *new_datum = NULL;
4255  struct varlena *reconstructed;
4256  dlist_iter it;
4257  Size data_done = 0;
4258 
4259  /* system columns aren't toasted */
4260  if (attr->attnum < 0)
4261  continue;
4262 
4263  if (attr->attisdropped)
4264  continue;
4265 
4266  /* not a varlena datatype */
4267  if (attr->attlen != -1)
4268  continue;
4269 
4270  /* no data */
4271  if (isnull[natt])
4272  continue;
4273 
4274  /* ok, we know we have a toast datum */
4275  varlena = (struct varlena *) DatumGetPointer(attrs[natt]);
4276 
4277  /* no need to do anything if the tuple isn't external */
4278  if (!VARATT_IS_EXTERNAL(varlena))
4279  continue;
4280 
4281  VARATT_EXTERNAL_GET_POINTER(toast_pointer, varlena);
4282 
4283  /*
4284  * Check whether the toast tuple changed, replace if so.
4285  */
4286  ent = (ReorderBufferToastEnt *)
4287  hash_search(txn->toast_hash,
4288  (void *) &toast_pointer.va_valueid,
4289  HASH_FIND,
4290  NULL);
4291  if (ent == NULL)
4292  continue;
4293 
4294  new_datum =
4295  (struct varlena *) palloc0(INDIRECT_POINTER_SIZE);
4296 
4297  free[natt] = true;
4298 
4299  reconstructed = palloc0(toast_pointer.va_rawsize);
4300 
4301  ent->reconstructed = reconstructed;
4302 
4303  /* stitch toast tuple back together from its parts */
4304  dlist_foreach(it, &ent->chunks)
4305  {
4306  bool isnull;
4307  ReorderBufferChange *cchange;
4308  ReorderBufferTupleBuf *ctup;
4309  Pointer chunk;
4310 
4311  cchange = dlist_container(ReorderBufferChange, node, it.cur);
4312  ctup = cchange->data.tp.newtuple;
4313  chunk = DatumGetPointer(fastgetattr(&ctup->tuple, 3, toast_desc, &isnull));
4314 
4315  Assert(!isnull);
4316  Assert(!VARATT_IS_EXTERNAL(chunk));
4317  Assert(!VARATT_IS_SHORT(chunk));
4318 
4319  memcpy(VARDATA(reconstructed) + data_done,
4320  VARDATA(chunk),
4321  VARSIZE(chunk) - VARHDRSZ);
4322  data_done += VARSIZE(chunk) - VARHDRSZ;
4323  }
4324  Assert(data_done == toast_pointer.va_extsize);
4325 
4326  /* make sure its marked as compressed or not */
4327  if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
4328  SET_VARSIZE_COMPRESSED(reconstructed, data_done + VARHDRSZ);
4329  else
4330  SET_VARSIZE(reconstructed, data_done + VARHDRSZ);
4331 
4332  memset(&redirect_pointer, 0, sizeof(redirect_pointer));
4333  redirect_pointer.pointer = reconstructed;
4334 
4336  memcpy(VARDATA_EXTERNAL(new_datum), &redirect_pointer,
4337  sizeof(redirect_pointer));
4338 
4339  attrs[natt] = PointerGetDatum(new_datum);
4340  }
4341 
4342  /*
4343  * Build tuple in separate memory & copy tuple back into the tuplebuf
4344  * passed to the output plugin. We can't directly heap_fill_tuple() into
4345  * the tuplebuf because attrs[] will point back into the current content.
4346  */
4347  tmphtup = heap_form_tuple(desc, attrs, isnull);
4348  Assert(newtup->tuple.t_len <= MaxHeapTupleSize);
4349  Assert(ReorderBufferTupleBufData(newtup) == newtup->tuple.t_data);
4350 
4351  memcpy(newtup->tuple.t_data, tmphtup->t_data, tmphtup->t_len);
4352  newtup->tuple.t_len = tmphtup->t_len;
4353 
4354  /*
4355  * free resources we won't further need, more persistent stuff will be
4356  * free'd in ReorderBufferToastReset().
4357  */
4358  RelationClose(toast_rel);
4359  pfree(tmphtup);
4360  for (natt = 0; natt < desc->natts; natt++)
4361  {
4362  if (free[natt])
4363  pfree(DatumGetPointer(attrs[natt]));
4364  }
4365  pfree(attrs);
4366  pfree(free);
4367  pfree(isnull);
4368 
4369  MemoryContextSwitchTo(oldcontext);
4370 
4371  /* now add the change back, with the correct size */
4372  ReorderBufferChangeMemoryUpdate(rb, change, true);
4373 }
4374 
4375 /*
4376  * Free all resources allocated for toast reconstruction.
4377  */
4378 static void
4380 {
4381  HASH_SEQ_STATUS hstat;
4382  ReorderBufferToastEnt *ent;
4383 
4384  if (txn->toast_hash == NULL)
4385  return;
4386 
4387  /* sequentially walk over the hash and free everything */
4388  hash_seq_init(&hstat, txn->toast_hash);
4389  while ((ent = (ReorderBufferToastEnt *) hash_seq_search(&hstat)) != NULL)
4390  {
4391  dlist_mutable_iter it;
4392 
4393  if (ent->reconstructed != NULL)
4394  pfree(ent->reconstructed);
4395 
4396  dlist_foreach_modify(it, &ent->chunks)
4397  {
4398  ReorderBufferChange *change =
4400 
4401  dlist_delete(&change->node);
4402  ReorderBufferReturnChange(rb, change, true);
4403  }
4404  }
4405 
4406  hash_destroy(txn->toast_hash);
4407  txn->toast_hash = NULL;
4408 }
4409 
4410 
4411 /* ---------------------------------------
4412  * Visibility support for logical decoding
4413  *
4414  *
4415  * Lookup actual cmin/cmax values when using decoding snapshot. We can't
4416  * always rely on stored cmin/cmax values because of two scenarios:
4417  *
4418  * * A tuple got changed multiple times during a single transaction and thus
4419  * has got a combocid. Combocid's are only valid for the duration of a
4420  * single transaction.
4421  * * A tuple with a cmin but no cmax (and thus no combocid) got
4422  * deleted/updated in another transaction than the one which created it
4423  * which we are looking at right now. As only one of cmin, cmax or combocid
4424  * is actually stored in the heap we don't have access to the value we
4425  * need anymore.
4426  *
4427  * To resolve those problems we have a per-transaction hash of (cmin,
4428  * cmax) tuples keyed by (relfilenode, ctid) which contains the actual
4429  * (cmin, cmax) values. That also takes care of combocids by simply
4430  * not caring about them at all. As we have the real cmin/cmax values
4431  * combocids aren't interesting.
4432  *
4433  * As we only care about catalog tuples here the overhead of this
4434  * hashtable should be acceptable.
4435  *
4436  * Heap rewrites complicate this a bit, check rewriteheap.c for
4437  * details.
4438  * -------------------------------------------------------------------------
4439  */
4440 
4441 /* struct for sorting mapping files by LSN efficiently */
4442 typedef struct RewriteMappingFile
4443 {
4445  char fname[MAXPGPATH];
4447 
#ifdef NOT_USED
/* Debug helper: dump the whole tuplecid mapping hash at DEBUG3. */
static void
DisplayMapping(HTAB *tuplecid_data)
{
	HASH_SEQ_STATUS hstat;
	ReorderBufferTupleCidEnt *ent;

	hash_seq_init(&hstat, tuplecid_data);
	while ((ent = (ReorderBufferTupleCidEnt *) hash_seq_search(&hstat)) != NULL)
	{
		elog(DEBUG3, "mapping: node: %u/%u/%u tid: %u/%u cmin: %u, cmax: %u",
			 ent->key.relnode.dbNode,
			 ent->key.relnode.spcNode,
			 ent->key.relnode.relNode,
			 ItemPointerGetBlockNumber(&ent->key.tid),
			 ItemPointerGetOffsetNumber(&ent->key.tid),
			 ent->cmin,
			 ent->cmax
			);
	}
}
#endif
4470 
4471 /*
4472  * Apply a single mapping file to tuplecid_data.
4473  *
4474  * The mapping file has to have been verified to be a) committed b) for our
4475  * transaction c) applied in LSN order.
4476  */
4477 static void
4478 ApplyLogicalMappingFile(HTAB *tuplecid_data, Oid relid, const char *fname)
4479 {
4480  char path[MAXPGPATH];
4481  int fd;
4482  int readBytes;
4484 
4485  sprintf(path, "pg_logical/mappings/%s", fname);
4486  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
4487  if (fd < 0)
4488  ereport(ERROR,
4490  errmsg("could not open file \"%s\": %m", path)));
4491 
4492  while (true)
4493  {
4496  ReorderBufferTupleCidEnt *new_ent;
4497  bool found;
4498 
4499  /* be careful about padding */
4500  memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
4501 
4502  /* read all mappings till the end of the file */
4504  readBytes = read(fd, &map, sizeof(LogicalRewriteMappingData));
4506 
4507  if (readBytes < 0)
4508  ereport(ERROR,
4510  errmsg("could not read file \"%s\": %m",
4511  path)));
4512  else if (readBytes == 0) /* EOF */
4513  break;
4514  else if (readBytes != sizeof(LogicalRewriteMappingData))
4515  ereport(ERROR,
4517  errmsg("could not read from file \"%s\": read %d instead of %d bytes",
4518  path, readBytes,
4519  (int32) sizeof(LogicalRewriteMappingData))));
4520 
4521  key.relnode = map.old_node;
4522  ItemPointerCopy(&map.old_tid,
4523  &key.tid);
4524 
4525 
4526  ent = (ReorderBufferTupleCidEnt *)
4527  hash_search(tuplecid_data,
4528  (void *) &key,
4529  HASH_FIND,
4530  NULL);
4531 
4532  /* no existing mapping, no need to update */
4533  if (!ent)
4534  continue;
4535 
4536  key.relnode = map.new_node;
4537  ItemPointerCopy(&map.new_tid,
4538  &key.tid);
4539 
4540  new_ent = (ReorderBufferTupleCidEnt *)
4541  hash_search(tuplecid_data,
4542  (void *) &key,
4543  HASH_ENTER,
4544  &found);
4545 
4546  if (found)
4547  {
4548  /*
4549  * Make sure the existing mapping makes sense. We sometime update
4550  * old records that did not yet have a cmax (e.g. pg_class' own
4551  * entry while rewriting it) during rewrites, so allow that.
4552  */
4553  Assert(ent->cmin == InvalidCommandId || ent->cmin == new_ent->cmin);
4554  Assert(ent->cmax == InvalidCommandId || ent->cmax == new_ent->cmax);
4555  }
4556  else
4557  {
4558  /* update mapping */
4559  new_ent->cmin = ent->cmin;
4560  new_ent->cmax = ent->cmax;
4561  new_ent->combocid = ent->combocid;
4562  }
4563  }
4564 
4565  if (CloseTransientFile(fd) != 0)
4566  ereport(ERROR,
4568  errmsg("could not close file \"%s\": %m", path)));
4569 }
4570 
4571 
4572 /*
4573  * Check whether the TransactionId 'xid' is in the pre-sorted array 'xip'.
4574  */
4575 static bool
4577 {
4578  return bsearch(&xid, xip, num,
4579  sizeof(TransactionId), xidComparator) != NULL;
4580 }
4581 
4582 /*
4583  * list_sort() comparator for sorting RewriteMappingFiles in LSN order.
4584  */
4585 static int
4586 file_sort_by_lsn(const ListCell *a_p, const ListCell *b_p)
4587 {
4590 
4591  if (a->lsn < b->lsn)
4592  return -1;
4593  else if (a->lsn > b->lsn)
4594  return 1;
4595  return 0;
4596 }
4597 
4598 /*
4599  * Apply any existing logical remapping files if there are any targeted at our
4600  * transaction for relid.
4601  */
4602 static void
4604 {
4605  DIR *mapping_dir;
4606  struct dirent *mapping_de;
4607  List *files = NIL;
4608  ListCell *file;
4609  Oid dboid = IsSharedRelation(relid) ? InvalidOid : MyDatabaseId;
4610 
4611  mapping_dir = AllocateDir("pg_logical/mappings");
4612  while ((mapping_de = ReadDir(mapping_dir, "pg_logical/mappings")) != NULL)
4613  {
4614  Oid f_dboid;
4615  Oid f_relid;
4616  TransactionId f_mapped_xid;
4617  TransactionId f_create_xid;
4618  XLogRecPtr f_lsn;
4619  uint32 f_hi,
4620  f_lo;
4621  RewriteMappingFile *f;
4622 
4623  if (strcmp(mapping_de->d_name, ".") == 0 ||
4624  strcmp(mapping_de->d_name, "..") == 0)
4625  continue;
4626 
4627  /* Ignore files that aren't ours */
4628  if (strncmp(mapping_de->d_name, "map-", 4) != 0)
4629  continue;
4630 
4631  if (sscanf(mapping_de->d_name, LOGICAL_REWRITE_FORMAT,
4632  &f_dboid, &f_relid, &f_hi, &f_lo,
4633  &f_mapped_xid, &f_create_xid) != 6)
4634  elog(ERROR, "could not parse filename \"%s\"", mapping_de->d_name);
4635 
4636  f_lsn = ((uint64) f_hi) << 32 | f_lo;
4637 
4638  /* mapping for another database */
4639  if (f_dboid != dboid)
4640  continue;
4641 
4642  /* mapping for another relation */
4643  if (f_relid != relid)
4644  continue;
4645 
4646  /* did the creating transaction abort? */
4647  if (!TransactionIdDidCommit(f_create_xid))
4648  continue;
4649 
4650  /* not for our transaction */
4651  if (!TransactionIdInArray(f_mapped_xid, snapshot->subxip, snapshot->subxcnt))
4652  continue;
4653 
4654  /* ok, relevant, queue for apply */
4655  f = palloc(sizeof(RewriteMappingFile));
4656  f->lsn = f_lsn;
4657  strcpy(f->fname, mapping_de->d_name);
4658  files = lappend(files, f);
4659  }
4660  FreeDir(mapping_dir);
4661 
4662  /* sort files so we apply them in LSN order */
4663  list_sort(files, file_sort_by_lsn);
4664 
4665  foreach(file, files)
4666  {
4668 
4669  elog(DEBUG1, "applying mapping: \"%s\" in %u", f->fname,
4670  snapshot->subxip[0]);
4671  ApplyLogicalMappingFile(tuplecid_data, relid, f->fname);
4672  pfree(f);
4673  }
4674 }
4675 
4676 /*
4677  * Lookup cmin/cmax of a tuple, during logical decoding where we can't rely on
4678  * combocids.
4679  */
4680 bool
4682  Snapshot snapshot,
4683  HeapTuple htup, Buffer buffer,
4684  CommandId *cmin, CommandId *cmax)
4685 {
4688  ForkNumber forkno;
4689  BlockNumber blockno;
4690  bool updated_mapping = false;
4691 
4692  /*
4693  * Return unresolved if tuplecid_data is not valid. That's because when
4694  * streaming in-progress transactions we may run into tuples with the CID
4695  * before actually decoding them. Think e.g. about INSERT followed by
4696  * TRUNCATE, where the TRUNCATE may not be decoded yet when applying the
4697  * INSERT. So in such cases, we assume the CID is from the future
4698  * command.
4699  */
4700  if (tuplecid_data == NULL)
4701  return false;
4702 
4703  /* be careful about padding */
4704  memset(&key, 0, sizeof(key));
4705 
4706  Assert(!BufferIsLocal(buffer));
4707 
4708  /*
4709  * get relfilenode from the buffer, no convenient way to access it other
4710  * than that.
4711  */
4712  BufferGetTag(buffer, &key.relnode, &forkno, &blockno);
4713 
4714  /* tuples can only be in the main fork */
4715  Assert(forkno == MAIN_FORKNUM);
4716  Assert(blockno == ItemPointerGetBlockNumber(&htup->t_self));
4717 
4718  ItemPointerCopy(&htup->t_self,
4719  &key.tid);
4720 
4721 restart:
4722  ent = (ReorderBufferTupleCidEnt *)
4723  hash_search(tuplecid_data,
4724  (void *) &key,
4725  HASH_FIND,
4726  NULL);
4727 
4728  /*
4729  * failed to find a mapping, check whether the table was rewritten and
4730  * apply mapping if so, but only do that once - there can be no new
4731  * mappings while we are in here since we have to hold a lock on the
4732  * relation.
4733  */
4734  if (ent == NULL && !updated_mapping)
4735  {
4736  UpdateLogicalMappings(tuplecid_data, htup->t_tableOid, snapshot);
4737  /* now check but don't update for a mapping again */
4738  updated_mapping = true;
4739  goto restart;
4740  }
4741  else if (ent == NULL)
4742  return false;
4743 
4744  if (cmin)
4745  *cmin = ent->cmin;
4746  if (cmax)
4747  *cmax = ent->cmax;
4748  return true;
4749 }
static void ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
XLogRecPtr first_lsn
bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *rb, TransactionId xid)
#define NIL
Definition: pg_list.h:65
uint32 CommandId
Definition: c.h:535
TimestampTz commit_time
struct ReorderBufferToastEnt ReorderBufferToastEnt
struct TXNEntryFile TXNEntryFile
void AbortCurrentTransaction(void)
Definition: xact.c:3211
ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER]
#define SizeofHeapTupleHeader
Definition: htup_details.h:184
bool IsToastRelation(Relation relation)
Definition: catalog.c:140
void hash_destroy(HTAB *hashp)
Definition: dynahash.c:827
void ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, Snapshot snapshot, XLogRecPtr lsn, bool transactional, const char *prefix, Size message_size, const char *message)
#define relpathperm(rnode, forknum)
Definition: relpath.h:83
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
ReorderBufferApplyChangeCB apply_change
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:212
#define AllocSetContextCreate
Definition: memutils.h:170
HeapTupleData * HeapTuple
Definition: htup.h:71
void * private_data
dlist_node base_snapshot_node
#define DEBUG1
Definition: elog.h:25
dlist_node * cur
Definition: ilist.h:180
static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
RepOriginId origin_id
void StartupReorderBuffer(void)
#define VARDATA(PTR)
Definition: postgres.h:302
static void ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define fastgetattr(tup, attnum, tupleDesc, isnull)
Definition: htup_details.h:712
static int32 next
Definition: blutils.c:219
File PathNameOpenFile(const char *fileName, int fileFlags)
Definition: fd.c:1437
#define RBTXN_HAS_TOAST_INSERT
#define TransactionIdEquals(id1, id2)
Definition: transam.h:43
#define HASH_CONTEXT
Definition: hsearch.h:91
#define HASH_ELEM
Definition: hsearch.h:85
int wal_segment_size
Definition: xlog.c:117
void ReorderBufferAbortOld(ReorderBuffer *rb, TransactionId oldestRunningXid)
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
uint32 TransactionId
Definition: c.h:521
void ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, TXNEntryFile *file, XLogSegNo *segno)
bool copied
Definition: snapshot.h:185
bool SnapBuildXactNeedsSkip(SnapBuild *builder, XLogRecPtr ptr)
Definition: snapbuild.c:404
void ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
#define RBTXN_IS_STREAMED
MemoryContext hcxt
Definition: hsearch.h:77
#define DatumGetInt32(X)
Definition: postgres.h:472
int sqlerrcode
Definition: elog.h:364
#define RelationGetDescr(relation)
Definition: rel.h:482
static ReorderBufferTXN * ReorderBufferLargestTXN(ReorderBuffer *rb)
static void ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn, ReorderBufferTXN *subtxn)
#define DEBUG3
Definition: elog.h:23
SnapBuildState SnapBuildCurrentState(SnapBuild *builder)
Definition: snapbuild.c:395
#define write(a, b, c)
Definition: win32.h:14
#define VARHDRSZ_SHORT
Definition: postgres.h:268
TransactionId by_txn_last_xid
#define VARSIZE(PTR)
Definition: postgres.h:303
ErrorData * CopyErrorData(void)
Definition: elog.c:1471
#define VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr)
Definition: detoast.h:32
int64 TimestampTz
Definition: timestamp.h:39
#define PointerGetDatum(X)
Definition: postgres.h:556
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
ReorderBufferTXN * txn
static bool TransactionIdInArray(TransactionId xid, TransactionId *xip, Size num)
#define VARHDRSZ
Definition: c.h:569
#define dlist_foreach(iter, lhead)
Definition: ilist.h:507
ReorderBufferStreamAbortCB stream_abort
XLogRecPtr current_restart_decoding_lsn
#define DatumGetObjectId(X)
Definition: postgres.h:500
char * pstrdup(const char *in)
Definition: mcxt.c:1187
#define rbtxn_has_incomplete_tuple(txn)
struct dirent * ReadDirExtended(DIR *dir, const char *dirname, int elevel)
Definition: fd.c:2664
static ReorderBufferTXN * ReorderBufferGetTXN(ReorderBuffer *rb)
static void ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id, XLogRecPtr last_lsn, ReorderBufferChange *specinsert)
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
struct ReorderBufferTXN * txn
Definition: reorderbuffer.h:87
Oid RelidByRelfilenode(Oid reltablespace, Oid relfilenode)
void ReorderBufferSetBaseSnapshot(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
union ReorderBufferChange::@98 data
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void ReorderBufferFree(ReorderBuffer *rb)
#define rbtxn_is_serialized_clear(txn)
uint16 RepOriginId
Definition: xlogdefs.h:58
Size entrysize
Definition: hsearch.h:72
CommandId command_id
#define FLEXIBLE_ARRAY_MEMBER
Definition: c.h:284
Snapshot snapshot_now
struct cursor * cur
Definition: ecpg.c:28
char fname[MAXPGPATH]
int32 va_rawsize
Definition: postgres.h:69
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4702
#define INFO
Definition: elog.h:33
MemoryContext SlabContextCreate(MemoryContext parent, const char *name, Size blockSize, Size chunkSize)
Definition: slab.c:174
void binaryheap_replace_first(binaryheap *heap, Datum d)
Definition: binaryheap.c:204
void ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, RelFileNode node, ItemPointerData tid, CommandId cmin, CommandId cmax, CommandId combocid)
static void ReorderBufferIterTXNFinish(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
uint32 BlockNumber
Definition: block.h:31
void TeardownHistoricSnapshot(bool is_error)
Definition: snapmgr.c:2047
ReorderBufferCommitCB commit
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:1020
struct ReorderBufferChange::@98::@103 inval
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
#define RelationIsLogicallyLogged(relation)
Definition: rel.h:635
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:919
static int64 files
Definition: pg_checksums.c:34
bool TransactionIdDidCommit(TransactionId transactionId)
Definition: transam.c:125
ReplicationSlotPersistentData data
Definition: slot.h:143
struct ReorderBufferTupleCidKey ReorderBufferTupleCidKey
struct SnapshotData * Snapshot
Definition: snapshot.h:121
Form_pg_class rd_rel
Definition: rel.h:109
unsigned int Oid
Definition: postgres_ext.h:31
XLogRecPtr base_snapshot_lsn
ReorderBufferStreamCommitCB stream_commit
Definition: dirent.h:9
uint32 regd_count
Definition: snapshot.h:205
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:84
void binaryheap_add_unordered(binaryheap *heap, Datum d)
Definition: binaryheap.c:110
MemoryContext change_context
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define VARDATA_EXTERNAL(PTR)
Definition: postgres.h:310
#define PG_BINARY
Definition: c.h:1213
void FlushErrorState(void)
Definition: elog.c:1565
XLogRecPtr origin_lsn
static void ApplyLogicalMappingFile(HTAB *tuplecid_data, Oid relid, const char *fname)
signed int int32
Definition: c.h:363
#define FirstCommandId
Definition: c.h:537
XLogRecPtr EndRecPtr
Definition: xlogreader.h:176
#define XLByteInSeg(xlrp, logSegNo, wal_segsz_bytes)
void ReorderBufferSetRestartPoint(ReorderBuffer *rb, XLogRecPtr ptr)
HeapTupleHeader t_data
Definition: htup.h:68
#define VARATT_IS_EXTERNAL(PTR)
Definition: postgres.h:313
#define sprintf
Definition: port.h:217
bool ReplicationSlotValidateName(const char *name, int elevel)
Definition: slot.c:174
#define rbtxn_is_streamed(txn)
static dlist_node * dlist_next_node(dlist_head *head, dlist_node *node)
Definition: ilist.h:421
Definition: dynahash.c:218
ReorderBufferStreamMessageCB stream_message
#define RBTXN_HAS_CATALOG_CHANGES
#define ReorderBufferTupleBufData(p)
Definition: reorderbuffer.h:38
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
void pfree(void *pointer)
Definition: mcxt.c:1057
char * Pointer
Definition: c.h:352
void FreeErrorData(ErrorData *edata)
Definition: elog.c:1527
static void ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
Definition: dirent.c:25
#define ERROR
Definition: elog.h:43
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2372
#define rbtxn_has_toast_insert(txn)
#define SLAB_LARGE_BLOCK_SIZE
Definition: memutils.h:222
#define VARATT_IS_SHORT(PTR)
Definition: postgres.h:326
#define RelationIsValid(relation)
Definition: rel.h:429
dlist_head changes
void ReorderBufferReturnRelids(ReorderBuffer *rb, Oid *relids)
dlist_head txns_by_base_snapshot_lsn
Datum binaryheap_first(binaryheap *heap)
Definition: binaryheap.c:159
#define IsSpecInsert(action)
#define MAXPGPATH
ItemPointerData t_self
Definition: htup.h:65
ReorderBufferTupleCidKey key
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:192
#define DEBUG2
Definition: elog.h:24
static bool ReorderBufferCanStream(ReorderBuffer *rb)
TransactionId GetCurrentTransactionId(void)
Definition: xact.c:438
void ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
uint32 t_len
Definition: htup.h:64
#define MaxHeapTupleSize
Definition: htup_details.h:560
struct varlena * reconstructed
void RollbackAndReleaseCurrentSubTransaction(void)
Definition: xact.c:4513
#define SET_VARTAG_EXTERNAL(PTR, tag)
Definition: postgres.h:333
uint64 XLogSegNo
Definition: xlogdefs.h:41
int errcode_for_file_access(void)
Definition: elog.c:633
HeapTupleData tuple
Definition: reorderbuffer.h:29
struct SnapshotData SnapshotData
TransactionId GetCurrentTransactionIdIfAny(void)
Definition: xact.c:455
static ReorderBufferTXN * ReorderBufferLargestTopTXN(ReorderBuffer *rb)
static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, TransactionId xid, XLogSegNo segno)
#define InvalidTransactionId
Definition: transam.h:31
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:193
bool ReorderBufferXidHasCatalogChanges(ReorderBuffer *rb, TransactionId xid)
unsigned int uint32
Definition: c.h:375
XLogRecPtr final_lsn
DIR * AllocateDir(const char *dirname)
Definition: fd.c:2583
#define RBTXN_IS_SUBXACT
Oid t_tableOid
Definition: htup.h:66
static int file_sort_by_lsn(const ListCell *a_p, const ListCell *b_p)
struct ReorderBufferTXN * toptxn
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1460
void RelationClose(Relation relation)
Definition: relcache.c:2110
TransactionId xmin
Definition: snapshot.h:157
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
#define INDIRECT_POINTER_SIZE
Definition: detoast.h:44
ReorderBufferMessageCB message
ReorderBufferStreamChangeCB stream_change
#define AssertArg(condition)
Definition: c.h:748
int bh_size
Definition: binaryheap.h:32
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
TransactionId * xip
Definition: snapshot.h:168
static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap, ReorderBufferTXN *txn, CommandId cid)
#define VARSIZE_SHORT(PTR)
Definition: postgres.h:305
ForkNumber
Definition: relpath.h:40
RepOriginId origin_id
Definition: reorderbuffer.h:89
ReorderBufferStreamTruncateCB stream_truncate
List * lappend(List *list, void *datum)
Definition: list.c:321
#define rbtxn_has_spec_insert(txn)
static HTAB * tuplecid_data
Definition: snapmgr.c:116
int CloseTransientFile(int fd)
Definition: fd.c:2549
static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
TransactionId CheckXidAlive
Definition: xact.c:95
struct ReorderBufferTupleCidEnt ReorderBufferTupleCidEnt
void ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr lsn)
static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, char *change)
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:487
#define HASH_BLOBS
Definition: hsearch.h:86
TransactionId ReorderBufferGetOldestXmin(ReorderBuffer *rb)
ReorderBufferChange * change
MemoryContext GenerationContextCreate(MemoryContext parent, const char *name, Size blockSize)
Definition: generation.c:196
static int ReorderBufferIterCompare(Datum a, Datum b, void *arg)
MemoryContext context
static void ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferIterTXNState *volatile *iter_state)
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change, bool toast_insert)
void ReorderBufferCommitChild(ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn)
static void ReorderBufferApplyChange(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change, bool streaming)
void * palloc0(Size size)
Definition: mcxt.c:981
static void ReorderBufferApplyTruncate(ReorderBuffer *rb, ReorderBufferTXN *txn, int nrelations, Relation *relations, ReorderBufferChange *change, bool streaming)
static void dlist_insert_before(dlist_node *before, dlist_node *node)
Definition: ilist.h:346
static bool dlist_has_next(dlist_head *head, dlist_node *node)
Definition: ilist.h:402
#define InvalidCommandId
Definition: c.h:538
uintptr_t Datum
Definition: postgres.h:367
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
Definition: dynahash.c:326
ReorderBufferTXN * by_txn_last_txn
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
unsigned short st_mode
Definition: win32_port.h:260
static void cleanup(void)
Definition: bootstrap.c:862
dlist_head toplevel_by_lsn
struct RewriteMappingFile RewriteMappingFile
Oid MyDatabaseId
Definition: globals.c:85
TransactionId xid
static void ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, ReorderBufferChange *change, bool addition)
bool IsSharedRelation(Oid relationId)
Definition: catalog.c:238
#define RBTXN_IS_SERIALIZED_CLEAR
Size keysize
Definition: hsearch.h:71
dlist_node * cur
Definition: ilist.h:161
static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
static Size ReorderBufferChangeSize(ReorderBufferChange *change)
static void AssertChangeLsnOrder(ReorderBufferTXN *txn)
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:840
static void AssertTXNLsnOrder(ReorderBuffer *rb)
void ReorderBufferAddSnapshot(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
#define InvalidOid
Definition: postgres_ext.h:36
ReorderBufferStreamStartCB stream_start
static void dlist_init(dlist_head *head)
Definition: ilist.h:278
CommandId curcid
Definition: snapshot.h:187
static void ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
struct ReorderBufferIterTXNState ReorderBufferIterTXNState
#define ereport(elevel,...)
Definition: elog.h:144
struct ReorderBufferDiskChange ReorderBufferDiskChange
void binaryheap_build(binaryheap *heap)
Definition: binaryheap.c:126
#define free(a)
Definition: header.h:65
struct ReorderBufferIterTXNEntry ReorderBufferIterTXNEntry
struct SnapBuild * snapshot_builder
Definition: logical.h:43
#define PG_CATCH()
Definition: elog.h:305
void FileClose(File file)
Definition: fd.c:1826
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
ItemPointerData new_tid
Definition: rewriteheap.h:40
static void ReorderBufferProcessPartialChange(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool toast_insert)
#define XLogSegNoOffsetToRecPtr(segno, offset, wal_segsz_bytes, dest)
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:746
#define lfirst(lc)
Definition: pg_list.h:169
ReorderBufferTXN * ReorderBufferGetOldestTXN(ReorderBuffer *rb)
static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, int fd, ReorderBufferChange *change)
static void ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn)
Definition: regguts.h:298
#define IsSpecConfirm(action)
struct dirent * ReadDir(DIR *dir, const char *dirname)
Definition: fd.c:2649
static ReorderBufferChange * ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
int32 va_extsize
Definition: postgres.h:70
XLogRecPtr end_lsn
void StartTransactionCommand(void)
Definition: xact.c:2846
void BeginInternalSubTransaction(const char *name)
Definition: xact.c:4408
void ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs)
int logical_decoding_work_mem
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
size_t Size
Definition: c.h:474
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: pgstat.h:1436
void binaryheap_free(binaryheap *heap)
Definition: binaryheap.c:69
#define IsInsertOrUpdate(action)
SharedInvalidationMessage * invalidations
static bool ReorderBufferCanStartStreaming(ReorderBuffer *rb)
#define BufferIsLocal(buffer)
Definition: buf.h:37
#define RBTXN_HAS_SPEC_INSERT
#define SLAB_DEFAULT_BLOCK_SIZE
Definition: memutils.h:221
#define ItemPointerGetOffsetNumber(pointer)
Definition: itemptr.h:117
void ReorderBufferReturnTupleBuf(ReorderBuffer *rb, ReorderBufferTupleBuf *tuple)
#define PG_RE_THROW()
Definition: elog.h:336
ReorderBuffer * ReorderBufferAllocate(void)
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1401
void * repalloc(void *pointer, Size size)
Definition: mcxt.c:1070
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1391
static void ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz)
struct varlena * pointer
Definition: postgres.h:86
void SnapBuildSnapDecRefcount(Snapshot snap)
Definition: snapbuild.c:428
#define LOGICAL_REWRITE_FORMAT
Definition: rewriteheap.h:54
dlist_head subtxns
binaryheap * binaryheap_allocate(int capacity, binaryheap_comparator compare, void *arg)
Definition: binaryheap.c:33
#define VARATT_IS_EXTENDED(PTR)
Definition: postgres.h:327
#define S_ISDIR(m)
Definition: win32_port.h:316
#define DatumGetPointer(X)
Definition: postgres.h:549
#define lstat(path, sb)
Definition: win32_port.h:276
void heap_deform_tuple(HeapTuple tuple, TupleDesc tupleDesc, Datum *values, bool *isnull)
Definition: heaptuple.c:1249
void LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
Definition: inval.c:559
#define Int32GetDatum(X)
Definition: postgres.h:479
static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define rbtxn_has_catalog_changes(txn)
static void ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn, volatile Snapshot snapshot_now, volatile CommandId command_id, bool streaming)
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
uint32 xcnt
Definition: snapshot.h:169
void * palloc(Size size)
Definition: mcxt.c:950
void list_sort(List *list, list_sort_comparator cmp)
Definition: list.c:1481
int errmsg(const char *fmt,...)
Definition: elog.c:821
struct ReorderBufferChange::@98::@102 tuplecid
XLogReaderState * reader
Definition: logical.h:41
static void UpdateLogicalMappings(HTAB *tuplecid_data, Oid relid, Snapshot snapshot)
ReorderBufferApplyTruncateCB apply_truncate
#define RBTXN_IS_SERIALIZED
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:797
ReorderBufferTXN * txn
void ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
static dlist_node * dlist_pop_head_node(dlist_head *head)
Definition: ilist.h:368
void SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
Definition: snapmgr.c:2031
struct ReorderBufferChange::@98::@100 truncate
#define elog(elevel,...)
Definition: elog.h:214
Oid * ReorderBufferGetRelids(ReorderBuffer *rb, int nrelids)
struct ReorderBufferChange::@98::@99 tp
int i
XLogRecPtr restart_decoding_lsn
#define NameStr(name)
Definition: c.h:623
#define SET_VARSIZE_COMPRESSED(PTR, len)
Definition: postgres.h:331
#define VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer)
Definition: detoast.h:22
static void ReorderBufferExecuteInvalidations(uint32 nmsgs, SharedInvalidationMessage *msgs)
void * arg
Datum binaryheap_remove_first(binaryheap *heap)
Definition: binaryheap.c:174
static const Size max_changes_in_memory
Definition: c.h:563
static void ReorderBufferSaveTXNSnapshot(ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id)
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)
CommandId cmax
Definition: