1 /*-------------------------------------------------------------------------
2  *
3  * reorderbuffer.c
4  * PostgreSQL logical replay/reorder buffer management
5  *
6  *
7  * Copyright (c) 2012-2024, PostgreSQL Global Development Group
8  *
9  *
10  * IDENTIFICATION
11  * src/backend/replication/logical/reorderbuffer.c
12  *
13  * NOTES
14  * This module gets handed individual pieces of transactions in the order
15  * they are written to the WAL and is responsible for reassembling them into
16  * toplevel transaction sized pieces. When a transaction is completely
17  * reassembled - signaled by reading the transaction commit record - it
18  * will then call the output plugin (cf. ReorderBufferCommit()) with the
19  * individual changes. The output plugins rely on snapshots built by
20  * snapbuild.c which hands them to us.
21  *
22  * Transactions and subtransactions/savepoints in postgres are not
23  * immediately linked to each other from outside the performing
24  * backend. Only at commit/abort (or special xact_assignment records) are
25  * they linked together, which means that we have to splice together a
26  * toplevel transaction from its subtransactions. To do that efficiently we
27  * build a binary heap indexed by the smallest current lsn of the individual
28  * subtransactions' changestreams. As the individual streams are inherently
29  * ordered by LSN - since that is where we build them from - the transaction
30  * can easily be reassembled by always using the subtransaction with the
31  * smallest current LSN from the heap.
32  *
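 * (Editor's illustration, not part of the original comment: with the
 * binaryheap machinery used below, one merge step in
 * ReorderBufferIterTXNNext() conceptually is
 *
 *     off = DatumGetInt32(binaryheap_first(state->heap));
 *     change = state->entries[off].change;    <- smallest LSN of any stream
 *     ...advance entries[off] to its next change...
 *     binaryheap_replace_first(state->heap, Int32GetDatum(off));
 *
 * so each pop/replace yields the globally next change in LSN order.)
 *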
33  * In order to cope with large transactions - which can be several times as
34  * big as the available memory - this module supports spooling the contents
35  * of large transactions to disk. When the transaction is replayed, the
36  * contents of individual (sub-)transactions will be read from disk in
37  * chunks.
38  *
39  * This module also has to deal with reassembling toast records from the
40  * individual chunks stored in WAL. When a new (or initial) version of a
41  * tuple is stored in WAL it will always be preceded by the toast chunks
42  * emitted for the columns stored out of line. Within a single toplevel
43  * transaction there will be no other data carrying records between a row's
44  * toast chunks and the row data itself. See ReorderBufferToast* for
45  * details.
46  *
47  * ReorderBuffer uses two special memory context types - SlabContext for
48  * allocations of fixed-length structures (changes and transactions), and
49  * GenerationContext for the variable-length transaction data (allocated
50  * and freed in groups with similar lifespans).
51  *
52  * To limit the amount of memory used by decoded changes, we track memory
53  * used at the reorder buffer level (i.e. total amount of memory), and for
54  * each transaction. When the total amount of used memory exceeds the
55  * limit, the transaction consuming the most memory is then serialized to
56  * disk.
57  *
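 * (Editor's illustration, not part of the original comment: the eviction
 * loop in ReorderBufferCheckMemoryLimit() is conceptually
 *
 *     while (rb->size >= logical_decoding_work_mem * 1024L)
 *     {
 *         txn = the transaction using the most memory;
 *         ReorderBufferSerializeTXN(rb, txn);  <- spill its changes to disk
 *     }
 *
 * with streaming-enabled slots optionally streaming the largest toplevel
 * transaction instead of spilling it.)
 *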
58  * Only decoded changes are evicted from memory (spilled to disk), not the
59  * transaction records. The number of toplevel transactions is limited,
60  * but a transaction with many subtransactions may still consume significant
61  * amounts of memory. However, the transaction records are fairly small and
62  * are not included in the memory limit.
63  *
64  * The current eviction algorithm is very simple - the transaction is
65  * picked merely by size, while it might be useful to also consider age
66  * (LSN) of the changes for example. With the new Generational memory
67  * allocator, evicting the oldest changes would make it more likely that the
68  * memory actually gets freed.
69  *
70  * We still rely on max_changes_in_memory when loading serialized changes
71  * back into memory. At that point we can't use the memory limit directly
72  * as we load the subxacts independently. One option to deal with this
73  * would be to count the subxacts, and allow each to allocate 1/N of the
74  * memory limit. That however does not seem very appealing, because with
75  * many subtransactions it may easily cause thrashing (short cycles of
76  * deserializing and applying very few changes). We probably should give
77  * a bit more memory to the oldest subtransactions, because it's likely
78  * they are the source for the next sequence of changes.
79  *
80  * -------------------------------------------------------------------------
81  */
82 #include "postgres.h"
83 
84 #include <unistd.h>
85 #include <sys/stat.h>
86 
87 #include "access/detoast.h"
88 #include "access/heapam.h"
89 #include "access/rewriteheap.h"
90 #include "access/transam.h"
91 #include "access/xact.h"
92 #include "access/xlog_internal.h"
93 #include "catalog/catalog.h"
94 #include "common/int.h"
95 #include "lib/binaryheap.h"
96 #include "miscadmin.h"
97 #include "pgstat.h"
98 #include "replication/logical.h"
99 #include "replication/reorderbuffer.h"
100 #include "replication/slot.h"
101 #include "replication/snapbuild.h" /* just for SnapBuildSnapDecRefcount */
102 #include "storage/bufmgr.h"
103 #include "storage/fd.h"
104 #include "storage/sinval.h"
105 #include "utils/builtins.h"
106 #include "utils/combocid.h"
107 #include "utils/memdebug.h"
108 #include "utils/memutils.h"
109 #include "utils/rel.h"
110 #include "utils/relfilenumbermap.h"
111 
112 
113 /* entry for a hash table we use to map from xid to our transaction state */
114 typedef struct ReorderBufferTXNByIdEnt
115 {
116  TransactionId xid;
117  ReorderBufferTXN *txn;
118 } ReorderBufferTXNByIdEnt;
119 
120 /* data structures for (relfilelocator, ctid) => (cmin, cmax) mapping */
121 typedef struct ReorderBufferTupleCidKey
122 {
123  RelFileLocator rlocator;
124  ItemPointerData tid;
125 } ReorderBufferTupleCidKey;
126 
127 typedef struct ReorderBufferTupleCidEnt
128 {
129  ReorderBufferTupleCidKey key;
130  CommandId cmin;
131  CommandId cmax;
132  CommandId combocid; /* just for debugging */
133 } ReorderBufferTupleCidEnt;
134 
135 /* Virtual file descriptor with file offset tracking */
136 typedef struct TXNEntryFile
137 {
138  File vfd; /* -1 when the file is closed */
139  off_t curOffset; /* offset for next write or read. Reset to 0
140  * when vfd is opened. */
141 } TXNEntryFile;
142 
143 /* k-way in-order change iteration support structures */
144 typedef struct ReorderBufferIterTXNEntry
145 {
146  XLogRecPtr lsn;
147  ReorderBufferChange *change;
148  ReorderBufferTXN *txn;
149  TXNEntryFile file;
150  XLogSegNo segno;
151 } ReorderBufferIterTXNEntry;
152 
153 typedef struct ReorderBufferIterTXNState
154 {
155  binaryheap *heap;
156  Size nr_txns;
157  dlist_head old_change;
158  ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER];
159 } ReorderBufferIterTXNState;
160 
161 /* toast datastructures */
162 typedef struct ReorderBufferToastEnt
163 {
164  Oid chunk_id; /* toast_table.chunk_id */
165  int32 last_chunk_seq; /* toast_table.chunk_seq of the last chunk we
166  * have seen */
167  Size num_chunks; /* number of chunks we've already seen */
168  Size size; /* combined size of chunks seen */
169  dlist_head chunks; /* linked list of chunks */
170  struct varlena *reconstructed; /* reconstructed varlena now pointed to in
171  * main tup */
172 } ReorderBufferToastEnt;
173 
174 /* Disk serialization support datastructures */
175 typedef struct ReorderBufferDiskChange
176 {
177  Size size;
178  ReorderBufferChange change;
179  /* data follows */
180 } ReorderBufferDiskChange;
181 
182 #define IsSpecInsert(action) \
183 ( \
184  ((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT) \
185 )
186 #define IsSpecConfirmOrAbort(action) \
187 ( \
188  (((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM) || \
189  ((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT)) \
190 )
191 #define IsInsertOrUpdate(action) \
192 ( \
193  (((action) == REORDER_BUFFER_CHANGE_INSERT) || \
194  ((action) == REORDER_BUFFER_CHANGE_UPDATE) || \
195  ((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT)) \
196 )
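/*
 * Editor's note (illustrative, not in the original): these predicates feed
 * the partial-change tracking in ReorderBufferProcessPartialChange(), e.g.
 *
 *     if (IsSpecInsert(change->action))
 *         toptxn->txn_flags |= RBTXN_HAS_PARTIAL_CHANGE;
 *
 * i.e. a speculative insert leaves the toplevel transaction in a "partial
 * change" state until the matching confirm or abort record is seen.
 */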
197 
198 /*
199  * Maximum number of changes kept in memory, per transaction. After that,
200  * changes are spooled to disk.
201  *
202  * The current value should be sufficient to decode the entire transaction
203  * without hitting disk in OLTP workloads, while starting to spool to disk in
204  * other workloads reasonably fast.
205  *
206  * At some point in the future it probably makes sense to have more elaborate
207  * resource management here, but it's not entirely clear what that would look
208  * like.
209  */
211 static const Size max_changes_in_memory = 4096; /* XXX for restore only */
212 
213 /* GUC variable */
214 int logical_decoding_work_mem;
215 
216 /* ---------------------------------------
217  * primary reorderbuffer support routines
218  * ---------------------------------------
219  */
220 static ReorderBufferTXN *ReorderBufferGetTXN(ReorderBuffer *rb);
221 static void ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
222 static ReorderBufferTXN *ReorderBufferTXNByXid(ReorderBuffer *rb,
223  TransactionId xid, bool create, bool *is_new,
224  XLogRecPtr lsn, bool create_as_top);
225 static void ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn,
226  ReorderBufferTXN *subtxn);
227 
228 static void AssertTXNLsnOrder(ReorderBuffer *rb);
229 
230 /* ---------------------------------------
231  * support functions for lsn-order iterating over the ->changes of a
232  * transaction and its subtransactions
233  *
234  * used for iteration over the k-way heap merge of a transaction and its
235  * subtransactions
236  * ---------------------------------------
237  */
238 static void ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
239  ReorderBufferIterTXNState *volatile *iter_state);
240 static ReorderBufferChange *ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state);
241 static void ReorderBufferIterTXNFinish(ReorderBuffer *rb,
242  ReorderBufferIterTXNState *state);
243 static void ReorderBufferExecuteInvalidations(uint32 nmsgs, SharedInvalidationMessage *msgs);
244 
245 /*
246  * ---------------------------------------
247  * Disk serialization support functions
248  * ---------------------------------------
249  */
250 static void ReorderBufferCheckMemoryLimit(ReorderBuffer *rb);
251 static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
252 static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
253  int fd, ReorderBufferChange *change);
254 static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
255  TXNEntryFile *file, XLogSegNo *segno);
256 static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
257  char *data);
258 static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn);
259 static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
260  bool txn_prepared);
261 static void ReorderBufferCleanupSerializedTXNs(const char *slotname);
262 static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot,
263  TransactionId xid, XLogSegNo segno);
264 
265 static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap);
266 static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap,
267  ReorderBufferTXN *txn, CommandId cid);
268 
269 /*
270  * ---------------------------------------
271  * Streaming support functions
272  * ---------------------------------------
273  */
274 static inline bool ReorderBufferCanStream(ReorderBuffer *rb);
275 static inline bool ReorderBufferCanStartStreaming(ReorderBuffer *rb);
276 static void ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
277 static void ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn);
278 
279 /* ---------------------------------------
280  * toast reassembly support
281  * ---------------------------------------
282  */
283 static void ReorderBufferToastInitHash(ReorderBuffer *rb, ReorderBufferTXN *txn);
284 static void ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn);
285 static void ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
286  Relation relation, ReorderBufferChange *change);
287 static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn,
288  Relation relation, ReorderBufferChange *change);
289 
290 /*
291  * ---------------------------------------
292  * memory accounting
293  * ---------------------------------------
294  */
295 static Size ReorderBufferChangeSize(ReorderBufferChange *change);
296 static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
297  ReorderBufferChange *change,
298  bool addition, Size sz);
299 
300 /*
301  * Allocate a new ReorderBuffer and clean out any old serialized state from
302  * prior ReorderBuffer instances for the same slot.
303  */
304 ReorderBuffer *
305 ReorderBufferAllocate(void)
306 {
307  ReorderBuffer *buffer;
308  HASHCTL hash_ctl;
309  MemoryContext new_ctx;
310 
311  Assert(MyReplicationSlot != NULL);
312 
313  /* allocate memory in own context, to have better accountability */
314  new_ctx = AllocSetContextCreate(CurrentMemoryContext,
315  "ReorderBuffer",
316  ALLOCSET_DEFAULT_SIZES);
317 
318  buffer =
319  (ReorderBuffer *) MemoryContextAlloc(new_ctx, sizeof(ReorderBuffer));
320 
321  memset(&hash_ctl, 0, sizeof(hash_ctl));
322 
323  buffer->context = new_ctx;
324 
325  buffer->change_context = SlabContextCreate(new_ctx,
326  "Change",
327  SLAB_DEFAULT_BLOCK_SIZE,
328  sizeof(ReorderBufferChange));
329 
330  buffer->txn_context = SlabContextCreate(new_ctx,
331  "TXN",
332  SLAB_DEFAULT_BLOCK_SIZE,
333  sizeof(ReorderBufferTXN));
334 
335  /*
336  * XXX the allocation sizes used below pre-date generation context's block
337  * growing code. These values should likely be benchmarked and set to
338  * more suitable values.
339  */
340  buffer->tup_context = GenerationContextCreate(new_ctx,
341  "Tuples",
342  SLAB_LARGE_BLOCK_SIZE,
343  SLAB_LARGE_BLOCK_SIZE,
344  SLAB_LARGE_BLOCK_SIZE);
345 
346  hash_ctl.keysize = sizeof(TransactionId);
347  hash_ctl.entrysize = sizeof(ReorderBufferTXNByIdEnt);
348  hash_ctl.hcxt = buffer->context;
349 
350  buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
351  HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
352 
353  buffer->by_txn_last_xid = InvalidTransactionId;
354  buffer->by_txn_last_txn = NULL;
355 
356  buffer->outbuf = NULL;
357  buffer->outbufsize = 0;
358  buffer->size = 0;
359 
360  buffer->spillTxns = 0;
361  buffer->spillCount = 0;
362  buffer->spillBytes = 0;
363  buffer->streamTxns = 0;
364  buffer->streamCount = 0;
365  buffer->streamBytes = 0;
366  buffer->totalTxns = 0;
367  buffer->totalBytes = 0;
368 
369  buffer->current_restart_decoding_lsn = InvalidXLogRecPtr;
370 
371  dlist_init(&buffer->toplevel_by_lsn);
372  dlist_init(&buffer->txns_by_base_snapshot_lsn);
373  dclist_init(&buffer->catchange_txns);
374 
375  /*
376  * Ensure there's no stale data from prior uses of this slot, in case some
377  * prior exit avoided calling ReorderBufferFree. Failure to do this can
378  * produce duplicated txns, and it's very cheap if there's nothing there.
379  */
380  ReorderBufferCleanupSerializedTXNs(NameStr(MyReplicationSlot->data.name));
381 
382  return buffer;
383 }
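/*
 * Editor's sketch (hypothetical caller, not part of this file): the buffer
 * is created per logical decoding context and freed with it, roughly
 *
 *     ReorderBuffer *rb = ReorderBufferAllocate();
 *     ...decode WAL, queueing changes into rb...
 *     ReorderBufferFree(rb);
 *
 * The real lifecycle management lives in the decoding context code in
 * logical.c.
 */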
384 
385 /*
386  * Free a ReorderBuffer
387  */
388 void
389 ReorderBufferFree(ReorderBuffer *rb)
390 {
391  MemoryContext context = rb->context;
392 
393  /*
394  * We free separately allocated data by entirely scrapping reorderbuffer's
395  * memory context.
396  */
397  MemoryContextDelete(context);
398 
399  /* Free disk space used by unconsumed reorder buffers */
400  ReorderBufferCleanupSerializedTXNs(NameStr(MyReplicationSlot->data.name));
401 }
402 
403 /*
404  * Get an unused, possibly preallocated, ReorderBufferTXN.
405  */
406 static ReorderBufferTXN *
407 ReorderBufferGetTXN(ReorderBuffer *rb)
408 {
409  ReorderBufferTXN *txn;
410 
411  txn = (ReorderBufferTXN *)
412  MemoryContextAlloc(rb->txn_context, sizeof(ReorderBufferTXN));
413 
414  memset(txn, 0, sizeof(ReorderBufferTXN));
415 
416  dlist_init(&txn->changes);
417  dlist_init(&txn->tuplecids);
418  dlist_init(&txn->subtxns);
419 
420  /* InvalidCommandId is not zero, so set it explicitly */
421  txn->command_id = InvalidCommandId;
422  txn->output_plugin_private = NULL;
423 
424  return txn;
425 }
426 
427 /*
428  * Free a ReorderBufferTXN.
429  */
430 static void
431 ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
432 {
433  /* clean the lookup cache if we were cached (quite likely) */
434  if (rb->by_txn_last_xid == txn->xid)
435  {
436  rb->by_txn_last_xid = InvalidTransactionId;
437  rb->by_txn_last_txn = NULL;
438  }
439 
440  /* free data that's contained */
441 
442  if (txn->gid != NULL)
443  {
444  pfree(txn->gid);
445  txn->gid = NULL;
446  }
447 
448  if (txn->tuplecid_hash != NULL)
449  {
450  hash_destroy(txn->tuplecid_hash);
451  txn->tuplecid_hash = NULL;
452  }
453 
454  if (txn->invalidations)
455  {
456  pfree(txn->invalidations);
457  txn->invalidations = NULL;
458  }
459 
460  /* Reset the toast hash */
461  ReorderBufferToastReset(rb, txn);
462 
463  pfree(txn);
464 }
465 
466 /*
467  * Get a fresh ReorderBufferChange.
468  */
469 ReorderBufferChange *
470 ReorderBufferGetChange(ReorderBuffer *rb)
471 {
472  ReorderBufferChange *change;
473 
474  change = (ReorderBufferChange *)
475  MemoryContextAlloc(rb->change_context, sizeof(ReorderBufferChange));
476 
477  memset(change, 0, sizeof(ReorderBufferChange));
478  return change;
479 }
480 
481 /*
482  * Free a ReorderBufferChange and update memory accounting, if requested.
483  */
484 void
485 ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change,
486  bool upd_mem)
487 {
488  /* update memory accounting info */
489  if (upd_mem)
490  ReorderBufferChangeMemoryUpdate(rb, change, false,
491  ReorderBufferChangeSize(change));
492 
493  /* free contained data */
494  switch (change->action)
495  {
496  case REORDER_BUFFER_CHANGE_INSERT:
497  case REORDER_BUFFER_CHANGE_UPDATE:
498  case REORDER_BUFFER_CHANGE_DELETE:
499  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
500  if (change->data.tp.newtuple)
501  {
502  ReorderBufferReturnTupleBuf(change->data.tp.newtuple);
503  change->data.tp.newtuple = NULL;
504  }
505 
506  if (change->data.tp.oldtuple)
507  {
508  ReorderBufferReturnTupleBuf(change->data.tp.oldtuple);
509  change->data.tp.oldtuple = NULL;
510  }
511  break;
512  case REORDER_BUFFER_CHANGE_MESSAGE:
513  if (change->data.msg.prefix != NULL)
514  pfree(change->data.msg.prefix);
515  change->data.msg.prefix = NULL;
516  if (change->data.msg.message != NULL)
517  pfree(change->data.msg.message);
518  change->data.msg.message = NULL;
519  break;
520  case REORDER_BUFFER_CHANGE_INVALIDATION:
521  if (change->data.inval.invalidations)
522  pfree(change->data.inval.invalidations);
523  change->data.inval.invalidations = NULL;
524  break;
525  case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
526  if (change->data.snapshot)
527  {
528  ReorderBufferFreeSnap(rb, change->data.snapshot);
529  change->data.snapshot = NULL;
530  }
531  break;
532  /* no data in addition to the struct itself */
533  case REORDER_BUFFER_CHANGE_TRUNCATE:
534  if (change->data.truncate.relids != NULL)
535  {
536  ReorderBufferReturnRelids(rb, change->data.truncate.relids);
537  change->data.truncate.relids = NULL;
538  }
539  break;
540  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
541  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
542  case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
543  case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
544  break;
545  }
546 
547  pfree(change);
548 }
549 
550 /*
551  * Get a fresh HeapTuple fitting a tuple of size tuple_len (excluding header
552  * overhead).
553  */
554 HeapTuple
555 ReorderBufferGetTupleBuf(ReorderBuffer *rb, Size tuple_len)
556 {
557  HeapTuple tuple;
558  Size alloc_len;
559 
560  alloc_len = tuple_len + SizeofHeapTupleHeader;
561 
562  tuple = (HeapTuple) MemoryContextAlloc(rb->tup_context,
563  HEAPTUPLESIZE + alloc_len);
564  tuple->t_data = (HeapTupleHeader) ((char *) tuple + HEAPTUPLESIZE);
565 
566  return tuple;
567 }
568 
569 /*
570  * Free a HeapTuple returned by ReorderBufferGetTupleBuf().
571  */
572 void
573 ReorderBufferReturnTupleBuf(HeapTuple tuple)
574 {
575  pfree(tuple);
576 }
577 
578 /*
579  * Get an array for relids of truncated relations.
580  *
581  * We use the global memory context (for the whole reorder buffer), because
582  * none of the existing ones seems like a good match (some are SLAB, so we
583  * can't use those, and tup_context is meant for tuple data, not relids). We
584  * could add yet another context, but it seems like overkill - TRUNCATE is
585  * not a particularly common operation, so it does not seem worth it.
586  */
587 Oid *
588 ReorderBufferGetRelids(ReorderBuffer *rb, int nrelids)
589 {
590  Oid *relids;
591  Size alloc_len;
592 
593  alloc_len = sizeof(Oid) * nrelids;
594 
595  relids = (Oid *) MemoryContextAlloc(rb->context, alloc_len);
596 
597  return relids;
598 }
599 
600 /*
601  * Free an array of relids.
602  */
603 void
604 ReorderBufferReturnRelids(ReorderBuffer *rb, Oid *relids)
605 {
606  pfree(relids);
607 }
608 
609 /*
610  * Return the ReorderBufferTXN from the given buffer, specified by Xid.
611  * If create is true, and a transaction doesn't already exist, create it
612  * (with the given LSN, and as top transaction if that's specified);
613  * when this happens, is_new is set to true.
614  */
615 static ReorderBufferTXN *
616 ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create,
617  bool *is_new, XLogRecPtr lsn, bool create_as_top)
618 {
619  ReorderBufferTXN *txn;
620  ReorderBufferTXNByIdEnt *ent;
621  bool found;
622 
623  Assert(TransactionIdIsValid(xid));
624 
625  /*
626  * Check the one-entry lookup cache first
627  */
628  if (TransactionIdIsValid(rb->by_txn_last_xid) &&
629  rb->by_txn_last_xid == xid)
630  {
631  txn = rb->by_txn_last_txn;
632 
633  if (txn != NULL)
634  {
635  /* found it, and it's valid */
636  if (is_new)
637  *is_new = false;
638  return txn;
639  }
640 
641  /*
642  * cached as non-existent, and asked not to create? Then nothing else
643  * to do.
644  */
645  if (!create)
646  return NULL;
647  /* otherwise fall through to create it */
648  }
649 
650  /*
651  * If the cache wasn't hit or it yielded a "does-not-exist" and we want to
652  * create an entry.
653  */
654 
655  /* search the lookup table */
656  ent = (ReorderBufferTXNByIdEnt *)
657  hash_search(rb->by_txn,
658  &xid,
659  create ? HASH_ENTER : HASH_FIND,
660  &found);
661  if (found)
662  txn = ent->txn;
663  else if (create)
664  {
665  /* initialize the new entry, if creation was requested */
666  Assert(ent != NULL);
667  Assert(lsn != InvalidXLogRecPtr);
668 
669  ent->txn = ReorderBufferGetTXN(rb);
670  ent->txn->xid = xid;
671  txn = ent->txn;
672  txn->first_lsn = lsn;
673  txn->restart_decoding_lsn = rb->current_restart_decoding_lsn;
674 
675  if (create_as_top)
676  {
677  dlist_push_tail(&rb->toplevel_by_lsn, &txn->node);
678  AssertTXNLsnOrder(rb);
679  }
680  }
681  else
682  txn = NULL; /* not found and not asked to create */
683 
684  /* update cache */
685  rb->by_txn_last_xid = xid;
686  rb->by_txn_last_txn = txn;
687 
688  if (is_new)
689  *is_new = !found;
690 
691  Assert(!create || txn != NULL);
692  return txn;
693 }
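/*
 * Editor's sketch (illustrative only): the two typical calling patterns for
 * ReorderBufferTXNByXid(), as used elsewhere in this file:
 *
 *     // find-or-create, e.g. when queueing a change
 *     txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
 *
 *     // pure lookup that must not create, e.g. at subxact commit
 *     txn = ReorderBufferTXNByXid(rb, xid, false, NULL,
 *                                 InvalidXLogRecPtr, false);
 *     if (txn == NULL)
 *         return;    // the xid never produced any changes
 */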
694 
695 /*
696  * Record the partial change for the streaming of in-progress transactions. We
697  * can stream only complete changes, so if we have a partial change, like a
698  * toast table insert or a speculative insert, we mark such a 'txn' so that it
699  * can't be streamed. We also ensure that if the changes in such a 'txn' can
700  * be streamed and are above the logical_decoding_work_mem threshold, we stream
701  * them as soon as we have a complete change.
702  */
703 static void
705  ReorderBufferChange *change,
706  bool toast_insert)
707 {
708  ReorderBufferTXN *toptxn;
709 
710  /*
711  * The partial changes need to be processed only while streaming
712  * in-progress transactions.
713  */
714  if (!ReorderBufferCanStream(rb))
715  return;
716 
717  /* Get the top transaction. */
718  toptxn = rbtxn_get_toptxn(txn);
719 
720  /*
721  * Indicate a partial change for toast inserts. The change will be
722  * considered as complete once we get the insert or update on the main
723  * table and we are sure that the pending toast chunks are not required
724  * anymore.
725  *
726  * If we allow streaming when there are pending toast chunks, then such
727  * chunks won't be released until the insert (multi_insert) is complete, and
728  * we expect the txn to have streamed all changes after streaming. This
729  * restriction is mainly to ensure the correctness of streamed
730  * transactions and it doesn't seem worth uplifting such a restriction
731  * just to allow this case because anyway we will stream the transaction
732  * once such an insert is complete.
733  */
734  if (toast_insert)
735  toptxn->txn_flags |= RBTXN_HAS_PARTIAL_CHANGE;
736  else if (rbtxn_has_partial_change(toptxn) &&
737  IsInsertOrUpdate(change->action) &&
738  change->data.tp.clear_toast_afterwards)
739  toptxn->txn_flags &= ~RBTXN_HAS_PARTIAL_CHANGE;
740 
741  /*
742  * Indicate a partial change for speculative inserts. The change will be
743  * considered as complete once we get the speculative confirm or abort
744  * token.
745  */
746  if (IsSpecInsert(change->action))
747  toptxn->txn_flags |= RBTXN_HAS_PARTIAL_CHANGE;
748  else if (rbtxn_has_partial_change(toptxn) &&
749  IsSpecConfirmOrAbort(change->action))
750  toptxn->txn_flags &= ~RBTXN_HAS_PARTIAL_CHANGE;
751 
752  /*
753  * Stream the transaction if it is serialized before and the changes are
754  * now complete in the top-level transaction.
755  *
756  * The reason for doing the streaming of such a transaction as soon as we
757  * get the complete change for it is that previously it would have reached
758  * the memory threshold and wouldn't get streamed because of incomplete
759  * changes. Delaying such transactions would increase apply lag for them.
760  */
761  if (ReorderBufferCanStartStreaming(rb) &&
762  !(rbtxn_has_partial_change(toptxn)) &&
763  rbtxn_is_serialized(txn) &&
764  rbtxn_has_streamable_change(toptxn))
765  ReorderBufferStreamTXN(rb, toptxn);
766 }
767 
768 /*
769  * Queue a change into a transaction so it can be replayed upon commit or
770  * streamed when we reach the logical_decoding_work_mem threshold.
771  */
772 void
774  ReorderBufferChange *change, bool toast_insert)
775 {
776  ReorderBufferTXN *txn;
777 
778  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
779 
780  /*
781  * While streaming the previous changes we have detected that the
782  * transaction is aborted. So there is no point in collecting further
783  * changes for it.
784  */
785  if (txn->concurrent_abort)
786  {
787  /*
788  * We don't need to update memory accounting for this change as we
789  * have not added it to the queue yet.
790  */
791  ReorderBufferReturnChange(rb, change, false);
792  return;
793  }
794 
795  /*
796  * The changes that are sent downstream are considered streamable. We
797  * remember such transactions so that only those will later be considered
798  * for streaming.
799  */
800  if (change->action == REORDER_BUFFER_CHANGE_INSERT ||
801  change->action == REORDER_BUFFER_CHANGE_UPDATE ||
802  change->action == REORDER_BUFFER_CHANGE_DELETE ||
803  change->action == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT ||
804  change->action == REORDER_BUFFER_CHANGE_TRUNCATE ||
805  change->action == REORDER_BUFFER_CHANGE_MESSAGE)
806  {
807  ReorderBufferTXN *toptxn = rbtxn_get_toptxn(txn);
808 
809  toptxn->txn_flags |= RBTXN_HAS_STREAMABLE_CHANGE;
810  }
811 
812  change->lsn = lsn;
813  change->txn = txn;
814 
815  Assert(InvalidXLogRecPtr != lsn);
816  dlist_push_tail(&txn->changes, &change->node);
817  txn->nentries++;
818  txn->nentries_mem++;
819 
820  /* update memory accounting information */
821  ReorderBufferChangeMemoryUpdate(rb, change, true,
822  ReorderBufferChangeSize(change));
823 
824  /* process partial change */
825  ReorderBufferProcessPartialChange(rb, txn, change, toast_insert);
826 
827  /* check the memory limits and evict something if needed */
828  ReorderBufferCheckMemoryLimit(rb);
829 }
830 
831 /*
832  * A transactional message is queued to be processed upon commit and a
833  * non-transactional message gets processed immediately.
834  */
835 void
837  Snapshot snap, XLogRecPtr lsn,
838  bool transactional, const char *prefix,
839  Size message_size, const char *message)
840 {
841  if (transactional)
842  {
843  MemoryContext oldcontext;
844  ReorderBufferChange *change;
845 
846  Assert(xid != InvalidTransactionId);
847 
848  /*
849  * We don't expect snapshots for transactional changes - we'll use the
850  * snapshot derived later during apply (unless the change gets
851  * skipped).
852  */
853  Assert(!snap);
854 
855  oldcontext = MemoryContextSwitchTo(rb->context);
856 
857  change = ReorderBufferGetChange(rb);
858  change->action = REORDER_BUFFER_CHANGE_MESSAGE;
859  change->data.msg.prefix = pstrdup(prefix);
860  change->data.msg.message_size = message_size;
861  change->data.msg.message = palloc(message_size);
862  memcpy(change->data.msg.message, message, message_size);
863 
864  ReorderBufferQueueChange(rb, xid, lsn, change, false);
865 
866  MemoryContextSwitchTo(oldcontext);
867  }
868  else
869  {
870  ReorderBufferTXN *txn = NULL;
871  volatile Snapshot snapshot_now = snap;
872 
873  /* Non-transactional changes require a valid snapshot. */
874  Assert(snapshot_now);
875 
876  if (xid != InvalidTransactionId)
877  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
878 
879  /* setup snapshot to allow catalog access */
880  SetupHistoricSnapshot(snapshot_now, NULL);
881  PG_TRY();
882  {
883  rb->message(rb, txn, lsn, false, prefix, message_size, message);
884 
885  TeardownHistoricSnapshot(false);
886  }
887  PG_CATCH();
888  {
889  TeardownHistoricSnapshot(true);
890  PG_RE_THROW();
891  }
892  PG_END_TRY();
893  }
894 }
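/*
 * Editor's sketch (hypothetical caller, not in this file): a decoder of
 * logical WAL message records would hand them over roughly as
 *
 *     ReorderBufferQueueMessage(rb, xid,
 *                               transactional ? NULL : snapshot_now,
 *                               lsn, transactional, prefix, sz, msg);
 *
 * Only the non-transactional path supplies a snapshot; the transactional
 * path derives one later during apply (see the Assert above).
 */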
895 
896 /*
897  * AssertTXNLsnOrder
898  * Verify LSN ordering of transaction lists in the reorderbuffer
899  *
900  * Other LSN-related invariants are checked too.
901  *
902  * No-op if assertions are not in use.
903  */
904 static void
906 {
907 #ifdef USE_ASSERT_CHECKING
908  LogicalDecodingContext *ctx = rb->private_data;
909  dlist_iter iter;
910  XLogRecPtr prev_first_lsn = InvalidXLogRecPtr;
911  XLogRecPtr prev_base_snap_lsn = InvalidXLogRecPtr;
912 
913  /*
914  * Skip the verification if we haven't yet reached the LSN at which we start
915  * decoding the contents of transactions, because until we reach that
916  * LSN, we could have transactions that lack the association between
917  * the top-level transaction and subtransaction and consequently have
918  * the same LSN. We don't guarantee this association until we try to
919  * decode the actual contents of the transaction. The ordering of the records
920  * prior to the start_decoding_at LSN should have been checked before the
921  * restart.
922  */
923  if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, ctx->reader->EndRecPtr))
924  return;
925 
926  dlist_foreach(iter, &rb->toplevel_by_lsn)
927  {
928  ReorderBufferTXN *cur_txn = dlist_container(ReorderBufferTXN, node,
929  iter.cur);
930 
931  /* start LSN must be set */
932  Assert(cur_txn->first_lsn != InvalidXLogRecPtr);
933 
934  /* If there is an end LSN, it must be higher than start LSN */
935  if (cur_txn->end_lsn != InvalidXLogRecPtr)
936  Assert(cur_txn->first_lsn <= cur_txn->end_lsn);
937 
938  /* Current initial LSN must be strictly higher than previous */
939  if (prev_first_lsn != InvalidXLogRecPtr)
940  Assert(prev_first_lsn < cur_txn->first_lsn);
941 
942  /* known-as-subtxn txns must not be listed */
943  Assert(!rbtxn_is_known_subxact(cur_txn));
944 
945  prev_first_lsn = cur_txn->first_lsn;
946  }
947 
948  dlist_foreach(iter, &rb->txns_by_base_snapshot_lsn)
949  {
950  ReorderBufferTXN *cur_txn = dlist_container(ReorderBufferTXN,
951  base_snapshot_node,
952  iter.cur);
953 
954  /* base snapshot (and its LSN) must be set */
955  Assert(cur_txn->base_snapshot != NULL);
956  Assert(cur_txn->base_snapshot_lsn != InvalidXLogRecPtr);
957 
958  /* current LSN must be strictly higher than previous */
959  if (prev_base_snap_lsn != InvalidXLogRecPtr)
960  Assert(prev_base_snap_lsn < cur_txn->base_snapshot_lsn);
961 
962  /* known-as-subtxn txns must not be listed */
963  Assert(!rbtxn_is_known_subxact(cur_txn));
964 
965  prev_base_snap_lsn = cur_txn->base_snapshot_lsn;
966  }
967 #endif
968 }
969 
970 /*
971  * AssertChangeLsnOrder
972  *
973  * Check ordering of changes in the (sub)transaction.
974  */
975 static void
976 AssertChangeLsnOrder(ReorderBufferTXN *txn)
977 {
978 #ifdef USE_ASSERT_CHECKING
979  dlist_iter iter;
980  XLogRecPtr prev_lsn = txn->first_lsn;
981 
982  dlist_foreach(iter, &txn->changes)
983  {
984  ReorderBufferChange *cur_change;
985 
986  cur_change = dlist_container(ReorderBufferChange, node, iter.cur);
987 
988  Assert(txn->first_lsn != InvalidXLogRecPtr);
989  Assert(cur_change->lsn != InvalidXLogRecPtr);
990  Assert(txn->first_lsn <= cur_change->lsn);
991 
992  if (txn->end_lsn != InvalidXLogRecPtr)
993  Assert(cur_change->lsn <= txn->end_lsn);
994 
995  Assert(prev_lsn <= cur_change->lsn);
996 
997  prev_lsn = cur_change->lsn;
998  }
999 #endif
1000 }
1001 
1002 /*
1003  * ReorderBufferGetOldestTXN
1004  * Return oldest transaction in reorderbuffer
1005  */
1006 ReorderBufferTXN *
1007 ReorderBufferGetOldestTXN(ReorderBuffer *rb)
1008 {
1009  ReorderBufferTXN *txn;
1010 
1011  AssertTXNLsnOrder(rb);
1012 
1013  if (dlist_is_empty(&rb->toplevel_by_lsn))
1014  return NULL;
1015 
1016  txn = dlist_head_element(ReorderBufferTXN, node, &rb->toplevel_by_lsn);
1017 
1018  Assert(!rbtxn_is_known_subxact(txn));
1019  Assert(txn->first_lsn != InvalidXLogRecPtr);
1020  return txn;
1021 }
1022 
1023 /*
1024  * ReorderBufferGetOldestXmin
1025  * Return oldest Xmin in reorderbuffer
1026  *
1027  * Returns oldest possibly running Xid from the point of view of snapshots
1028  * used in the transactions kept by reorderbuffer, or InvalidTransactionId if
1029  * there are none.
1030  *
1031  * Since snapshots are assigned monotonically, this equals the Xmin of the
1032  * base snapshot with minimal base_snapshot_lsn.
1033  */
1034 TransactionId
1035 ReorderBufferGetOldestXmin(ReorderBuffer *rb)
1036 {
1037  ReorderBufferTXN *txn;
1038 
1039  AssertTXNLsnOrder(rb);
1040 
1041  if (dlist_is_empty(&rb->txns_by_base_snapshot_lsn))
1042  return InvalidTransactionId;
1043 
1044  txn = dlist_head_element(ReorderBufferTXN, base_snapshot_node,
1045  &rb->txns_by_base_snapshot_lsn);
1046  return txn->base_snapshot->xmin;
1047 }
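/*
 * Editor's sketch (illustrative, simplified): a caller maintaining the
 * slot's catalog_xmin might combine this with the snapshot builder:
 *
 *     TransactionId xmin = ReorderBufferGetOldestXmin(rb);
 *
 *     if (!TransactionIdIsValid(xmin))
 *         xmin = ...fall back to the snapshot builder's xmin...;
 *
 * The authoritative logic is in snapbuild.c and logical.c.
 */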
1048 
1049 void
1050 ReorderBufferSetRestartPoint(ReorderBuffer *rb, XLogRecPtr ptr)
1051 {
1052  rb->current_restart_decoding_lsn = ptr;
1053 }
1054 
1055 /*
1056  * ReorderBufferAssignChild
1057  *
1058  * Make note that we know that subxid is a subtransaction of xid, seen as of
1059  * the given lsn.
1060  */
1061 void
1062 ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid,
1063  TransactionId subxid, XLogRecPtr lsn)
1064 {
1065  ReorderBufferTXN *txn;
1066  ReorderBufferTXN *subtxn;
1067  bool new_top;
1068  bool new_sub;
1069 
1070  txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true);
1071  subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false);
1072 
1073  if (!new_sub)
1074  {
1075  if (rbtxn_is_known_subxact(subtxn))
1076  {
1077  /* already associated, nothing to do */
1078  return;
1079  }
1080  else
1081  {
1082  /*
1083  * We already saw this transaction, but initially added it to the
1084  * list of top-level txns. Now that we know it's not top-level,
1085  * remove it from there.
1086  */
1087  dlist_delete(&subtxn->node);
1088  }
1089  }
1090 
1091  subtxn->txn_flags |= RBTXN_IS_SUBXACT;
1092  subtxn->toplevel_xid = xid;
1093  Assert(subtxn->nsubtxns == 0);
1094 
1095  /* set the reference to top-level transaction */
1096  subtxn->toptxn = txn;
1097 
1098  /* add to subtransaction list */
1099  dlist_push_tail(&txn->subtxns, &subtxn->node);
1100  txn->nsubtxns++;
1101 
1102  /* Possibly transfer the subtxn's snapshot to its top-level txn. */
1103  ReorderBufferTransferSnapToParent(txn, subtxn);
1104 
1105  /* Verify LSN-ordering invariant */
1106  AssertTXNLsnOrder(rb);
1107 }
1108 
1109 /*
1110  * ReorderBufferTransferSnapToParent
1111  * Transfer base snapshot from subtxn to top-level txn, if needed
1112  *
1113  * This is done if the top-level txn doesn't have a base snapshot, or if the
1114  * subtxn's base snapshot has an earlier LSN than the top-level txn's base
1115  * snapshot's LSN. This can happen if there are no changes in the toplevel
1116  * txn but there are some in the subtxn, or the first change in subtxn has
1117  * earlier LSN than first change in the top-level txn and we learned about
1118  * their kinship only now.
1119  *
1120  * The subtransaction's snapshot is cleared regardless of the transfer
1121  * happening, since it's not needed anymore in either case.
1122  *
1123  * We do this as soon as we become aware of their kinship, to avoid queueing
1124  * extra snapshots to txns known-as-subtxns -- only top-level txns will
1125  * receive further snapshots.
1126  */
1127 static void
1128 ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn,
1129  ReorderBufferTXN *subtxn)
1130 {
1131  Assert(subtxn->toplevel_xid == txn->xid);
1132 
1133  if (subtxn->base_snapshot != NULL)
1134  {
1135  if (txn->base_snapshot == NULL ||
1136  subtxn->base_snapshot_lsn < txn->base_snapshot_lsn)
1137  {
1138  /*
1139  * If the toplevel transaction already has a base snapshot but
1140  * it's newer than the subxact's, purge it.
1141  */
1142  if (txn->base_snapshot != NULL)
1143  {
1144  SnapBuildSnapDecRefcount(txn->base_snapshot);
1145  dlist_delete(&txn->base_snapshot_node);
1146  }
1147 
1148  /*
1149  * The snapshot is now the top transaction's; transfer it, and
1150  * adjust the list position of the top transaction in the list by
1151  * moving it to where the subtransaction is.
1152  */
1153  txn->base_snapshot = subtxn->base_snapshot;
1154  txn->base_snapshot_lsn = subtxn->base_snapshot_lsn;
1155  dlist_insert_before(&subtxn->base_snapshot_node,
1156  &txn->base_snapshot_node);
1157 
1158  /*
1159  * The subtransaction doesn't have a snapshot anymore (so it
1160  * mustn't be in the list.)
1161  */
1162  subtxn->base_snapshot = NULL;
1163  subtxn->base_snapshot_lsn = InvalidXLogRecPtr;
1164  dlist_delete(&subtxn->base_snapshot_node);
1165  }
1166  else
1167  {
1168  /* Base snap of toplevel is fine, so subxact's is not needed */
1169  SnapBuildSnapDecRefcount(subtxn->base_snapshot);
1170  dlist_delete(&subtxn->base_snapshot_node);
1171  subtxn->base_snapshot = NULL;
1172  subtxn->base_snapshot_lsn = InvalidXLogRecPtr;
1173  }
1174  }
1175 }
1176 
1177 /*
1178  * Associate a subtransaction with its toplevel transaction at commit
1179  * time. There may be no further changes added after this.
1180  */
1181 void
1182 ReorderBufferCommitChild(ReorderBuffer *rb, TransactionId xid,
1183  TransactionId subxid, XLogRecPtr commit_lsn,
1184  XLogRecPtr end_lsn)
1185 {
1186  ReorderBufferTXN *subtxn;
1187 
1188  subtxn = ReorderBufferTXNByXid(rb, subxid, false, NULL,
1189  InvalidXLogRecPtr, false);
1190 
1191  /*
1192  * No need to do anything if that subtxn didn't contain any changes
1193  */
1194  if (!subtxn)
1195  return;
1196 
1197  subtxn->final_lsn = commit_lsn;
1198  subtxn->end_lsn = end_lsn;
1199 
1200  /*
1201  * Assign this subxact as a child of the toplevel xact (no-op if already
1202  * done.)
1203  */
1204  ReorderBufferAssignChild(rb, xid, subxid, InvalidXLogRecPtr);
1205 }
1206 
1207 
1208 /*
1209  * Support for efficiently iterating over a transaction's and its
1210  * subtransactions' changes.
1211  *
1212  * We do this with a k-way merge between transactions/subtransactions. For that
1213  * we model the current heads of the different transactions as a binary heap
1214  * so we easily know which (sub-)transaction has the change with the smallest
1215  * lsn next.
1216  *
1217  * We assume the changes in individual transactions are already sorted by LSN.
1218  */
1219 
1220 /*
1221  * Binary heap comparison function.
1222  */
1223 static int
1224 ReorderBufferIterCompare(Datum a, Datum b, void *arg)
1225 {
1226  ReorderBufferIterTXNState *state = (ReorderBufferIterTXNState *) arg;
1227  XLogRecPtr pos_a = state->entries[DatumGetInt32(a)].lsn;
1228  XLogRecPtr pos_b = state->entries[DatumGetInt32(b)].lsn;
1229 
1230  if (pos_a < pos_b)
1231  return 1;
1232  else if (pos_a == pos_b)
1233  return 0;
1234  return -1;
1235 }
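/*
 * Editor's note (illustrative): the comparison is intentionally inverted -
 * returning 1 when pos_a < pos_b makes the binaryheap's "maximum" element
 * the entry with the *smallest* LSN. For example, with
 *
 *     state->entries[0].lsn = 200;
 *     state->entries[1].lsn = 100;
 *
 * binaryheap_first() yields Int32GetDatum(1), which is exactly what the
 * LSN-ordered merge needs.
 */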
1236 
1237 /*
1238  * Allocate & initialize an iterator which iterates in lsn order over a
1239  * transaction and all its subtransactions.
1240  *
1241  * Note: The iterator state is returned through iter_state parameter rather
1242  * than the function's return value. This is because the state gets cleaned up
1243  * in a PG_CATCH block in the caller, so we want to make sure the caller gets
1244  * back the state even if this function throws an exception.
1245  */
1246 static void
1247 ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
1248  ReorderBufferIterTXNState *volatile *iter_state)
1249 {
1250  Size nr_txns = 0;
1251  ReorderBufferIterTXNState *state;
1252  dlist_iter cur_txn_i;
1253  int32 off;
1254 
1255  *iter_state = NULL;
1256 
1257  /* Check ordering of changes in the toplevel transaction. */
1258  AssertChangeLsnOrder(txn);
1259 
1260  /*
1261  * Calculate the size of our heap: one element for every transaction that
1262  * contains changes. (Besides the transactions already in the reorder
1263  * buffer, we count the one we were directly passed.)
1264  */
1265  if (txn->nentries > 0)
1266  nr_txns++;
1267 
1268  dlist_foreach(cur_txn_i, &txn->subtxns)
1269  {
1270  ReorderBufferTXN *cur_txn;
1271 
1272  cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
1273 
1274  /* Check ordering of changes in this subtransaction. */
1275  AssertChangeLsnOrder(cur_txn);
1276 
1277  if (cur_txn->nentries > 0)
1278  nr_txns++;
1279  }
1280 
1281  /* allocate iteration state */
1282  state = (ReorderBufferIterTXNState *)
1283  MemoryContextAllocZero(rb->context,
1284  sizeof(ReorderBufferIterTXNState) +
1285  sizeof(ReorderBufferIterTXNEntry) * nr_txns);
1286 
1287  state->nr_txns = nr_txns;
1288  dlist_init(&state->old_change);
1289 
1290  for (off = 0; off < state->nr_txns; off++)
1291  {
1292  state->entries[off].file.vfd = -1;
1293  state->entries[off].segno = 0;
1294  }
1295 
1296  /* allocate heap */
1297  state->heap = binaryheap_allocate(state->nr_txns,
1298  ReorderBufferIterCompare,
1299  state);
1300 
1301  /* Now that the state fields are initialized, it is safe to return it. */
1302  *iter_state = state;
1303 
1304  /*
1305  * Now insert items into the binary heap, in an unordered fashion. (We
1306  * will run a heap assembly step at the end; this is more efficient.)
1307  */
1308 
1309  off = 0;
1310 
1311  /* add toplevel transaction if it contains changes */
1312  if (txn->nentries > 0)
1313  {
1314  ReorderBufferChange *cur_change;
1315 
1316  if (rbtxn_is_serialized(txn))
1317  {
1318  /* serialize remaining changes */
1319  ReorderBufferSerializeTXN(rb, txn);
1320  ReorderBufferRestoreChanges(rb, txn, &state->entries[off].file,
1321  &state->entries[off].segno);
1322  }
1323 
1324  cur_change = dlist_head_element(ReorderBufferChange, node,
1325  &txn->changes);
1326 
1327  state->entries[off].lsn = cur_change->lsn;
1328  state->entries[off].change = cur_change;
1329  state->entries[off].txn = txn;
1330 
1331  binaryheap_add_unordered(state->heap, Int32GetDatum(off++));
1332  }
1333 
1334  /* add subtransactions if they contain changes */
1335  dlist_foreach(cur_txn_i, &txn->subtxns)
1336  {
1337  ReorderBufferTXN *cur_txn;
1338 
1339  cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
1340 
1341  if (cur_txn->nentries > 0)
1342  {
1343  ReorderBufferChange *cur_change;
1344 
1345  if (rbtxn_is_serialized(cur_txn))
1346  {
1347  /* serialize remaining changes */
1348  ReorderBufferSerializeTXN(rb, cur_txn);
1349  ReorderBufferRestoreChanges(rb, cur_txn,
1350  &state->entries[off].file,
1351  &state->entries[off].segno);
1352  }
1353  cur_change = dlist_head_element(ReorderBufferChange, node,
1354  &cur_txn->changes);
1355 
1356  state->entries[off].lsn = cur_change->lsn;
1357  state->entries[off].change = cur_change;
1358  state->entries[off].txn = cur_txn;
1359 
1361  }
1362  }
1363 
1364  /* assemble a valid binary heap */
1365  binaryheap_build(state->heap);
1366 }
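/*
 * Editor's sketch (mirrors the real usage in ReorderBufferProcessTXN()
 * below): the iterator is driven as
 *
 *     ReorderBufferIterTXNState *volatile iterstate = NULL;
 *
 *     PG_TRY();
 *     {
 *         ReorderBufferIterTXNInit(rb, txn, &iterstate);
 *         while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
 *             ...apply the change...
 *         ReorderBufferIterTXNFinish(rb, iterstate);
 *         iterstate = NULL;
 *     }
 *     PG_CATCH();
 *     {
 *         if (iterstate)
 *             ReorderBufferIterTXNFinish(rb, iterstate);
 *         PG_RE_THROW();
 *     }
 *     PG_END_TRY();
 */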
1367 
1368 /*
1369  * Return the next change when iterating over a transaction and its
1370  * subtransactions.
1371  *
1372  * Returns NULL when no further changes exist.
1373  */
1374 static ReorderBufferChange *
1375 ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
1376 {
1377  ReorderBufferChange *change;
1378  ReorderBufferIterTXNEntry *entry;
1379  int32 off;
1380 
1381  /* nothing there anymore */
1382  if (state->heap->bh_size == 0)
1383  return NULL;
1384 
1385  off = DatumGetInt32(binaryheap_first(state->heap));
1386  entry = &state->entries[off];
1387 
1388  /* free memory we might have "leaked" in the previous *Next call */
1389  if (!dlist_is_empty(&state->old_change))
1390  {
1391  change = dlist_container(ReorderBufferChange, node,
1392  dlist_pop_head_node(&state->old_change));
1393  ReorderBufferReturnChange(rb, change, true);
1394  Assert(dlist_is_empty(&state->old_change));
1395  }
1396 
1397  change = entry->change;
1398 
1399  /*
1400  * update heap with information about which transaction has the next
1401  * relevant change in LSN order
1402  */
1403 
1404  /* there are in-memory changes */
1405  if (dlist_has_next(&entry->txn->changes, &entry->change->node))
1406  {
1407  dlist_node *next = dlist_next_node(&entry->txn->changes, &change->node);
1408  ReorderBufferChange *next_change =
1409  dlist_container(ReorderBufferChange, node, next);
1410 
1411  /* txn stays the same */
1412  state->entries[off].lsn = next_change->lsn;
1413  state->entries[off].change = next_change;
1414 
1415  binaryheap_replace_first(state->heap, Int32GetDatum(off));
1416  return change;
1417  }
1418 
1419  /* try to load changes from disk */
1420  if (entry->txn->nentries != entry->txn->nentries_mem)
1421  {
1422  /*
1423  * Ugly: restoring changes will reuse *Change records, thus delete the
1424  * current one from the per-tx list and only free in the next call.
1425  */
1426  dlist_delete(&change->node);
1427  dlist_push_tail(&state->old_change, &change->node);
1428 
1429  /*
1430  * Update the total bytes processed by the txn for which we are
1431  * releasing the current set of changes and restoring the new set of
1432  * changes.
1433  */
1434  rb->totalBytes += entry->txn->size;
1435  if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->file,
1436  &state->entries[off].segno))
1437  {
1438  /* successfully restored changes from disk */
1439  ReorderBufferChange *next_change =
1440  dlist_head_element(ReorderBufferChange, node,
1441  &entry->txn->changes);
1442 
1443  elog(DEBUG2, "restored %u/%u changes from disk",
1444  (uint32) entry->txn->nentries_mem,
1445  (uint32) entry->txn->nentries);
1446 
1447  Assert(entry->txn->nentries_mem);
1448  /* txn stays the same */
1449  state->entries[off].lsn = next_change->lsn;
1450  state->entries[off].change = next_change;
1451  binaryheap_replace_first(state->heap, Int32GetDatum(off));
1452 
1453  return change;
1454  }
1455  }
1456 
1457  /* ok, no changes there anymore, remove */
1458  binaryheap_remove_first(state->heap);
1459 
1460  return change;
1461 }
1462 
1463 /*
1464  * Deallocate the iterator
1465  */
1466 static void
1467 ReorderBufferIterTXNFinish(ReorderBuffer *rb,
1468  ReorderBufferIterTXNState *state)
1469 {
1470  int32 off;
1471 
1472  for (off = 0; off < state->nr_txns; off++)
1473  {
1474  if (state->entries[off].file.vfd != -1)
1475  FileClose(state->entries[off].file.vfd);
1476  }
1477 
1478  /* free memory we might have "leaked" in the last *Next call */
1479  if (!dlist_is_empty(&state->old_change))
1480  {
1481  ReorderBufferChange *change;
1482 
1483  change = dlist_container(ReorderBufferChange, node,
1484  dlist_pop_head_node(&state->old_change));
1485  ReorderBufferReturnChange(rb, change, true);
1486  Assert(dlist_is_empty(&state->old_change));
1487  }
1488 
1489  binaryheap_free(state->heap);
1490  pfree(state);
1491 }
1492 
1493 /*
1494  * Cleanup the contents of a transaction, usually after the transaction
1495  * committed or aborted.
1496  */
1497 static void
1498 ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
1499 {
1500  bool found;
1501  dlist_mutable_iter iter;
1502 
1503  /* cleanup subtransactions & their changes */
1504  dlist_foreach_modify(iter, &txn->subtxns)
1505  {
1506  ReorderBufferTXN *subtxn;
1507 
1508  subtxn = dlist_container(ReorderBufferTXN, node, iter.cur);
1509 
1510  /*
1511  * Subtransactions are always associated to the toplevel TXN, even if
1512  * they originally were happening inside another subtxn, so we won't
1513  * ever recurse more than one level deep here.
1514  */
1515  Assert(rbtxn_is_known_subxact(subtxn));
1516  Assert(subtxn->nsubtxns == 0);
1517 
1518  ReorderBufferCleanupTXN(rb, subtxn);
1519  }
1520 
1521  /* cleanup changes in the txn */
1522  dlist_foreach_modify(iter, &txn->changes)
1523  {
1524  ReorderBufferChange *change;
1525 
1526  change = dlist_container(ReorderBufferChange, node, iter.cur);
1527 
1528  /* Check we're not mixing changes from different transactions. */
1529  Assert(change->txn == txn);
1530 
1531  ReorderBufferReturnChange(rb, change, true);
1532  }
1533 
1534  /*
1535  * Cleanup the tuplecids we stored for decoding catalog snapshot access.
1536  * They are always stored in the toplevel transaction.
1537  */
1538  dlist_foreach_modify(iter, &txn->tuplecids)
1539  {
1540  ReorderBufferChange *change;
1541 
1542  change = dlist_container(ReorderBufferChange, node, iter.cur);
1543 
1544  /* Check we're not mixing changes from different transactions. */
1545  Assert(change->txn == txn);
1546  Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID);
1547 
1548  ReorderBufferReturnChange(rb, change, true);
1549  }
1550 
1551  /*
1552  * Cleanup the base snapshot, if set.
1553  */
1554  if (txn->base_snapshot != NULL)
1555  {
1556  SnapBuildSnapDecRefcount(txn->base_snapshot);
1557  dlist_delete(&txn->base_snapshot_node);
1558  }
1559 
1560  /*
1561  * Cleanup the snapshot for the last streamed run.
1562  */
1563  if (txn->snapshot_now != NULL)
1564  {
1565  Assert(rbtxn_is_streamed(txn));
1566  ReorderBufferFreeSnap(rb, txn->snapshot_now);
1567  }
1568 
1569  /*
1570  * Remove TXN from its containing lists.
1571  *
1572  * Note: if txn is known as subxact, we are deleting the TXN from its
1573  * parent's list of known subxacts; this leaves the parent's nsubxacts
1574  * count too high, but we don't care. Otherwise, we are deleting the TXN
1575  * from the LSN-ordered list of toplevel TXNs. We remove the TXN from the
1576  * list of catalog modifying transactions as well.
1577  */
1578  dlist_delete(&txn->node);
1579  if (rbtxn_has_catalog_changes(txn))
1580  dclist_delete_from(&rb->catchange_txns, &txn->catchange_node);
1581 
1582  /* now remove reference from buffer */
1583  hash_search(rb->by_txn, &txn->xid, HASH_REMOVE, &found);
1584  Assert(found);
1585 
1586  /* remove entries spilled to disk */
1587  if (rbtxn_is_serialized(txn))
1588  ReorderBufferRestoreCleanup(rb, txn);
1589 
1590  /* deallocate */
1591  ReorderBufferReturnTXN(rb, txn);
1592 }
1593 
1594 /*
1595  * Discard changes from a transaction (and subtransactions), either after
1596  * streaming or decoding them at PREPARE. Keep the remaining info -
1597  * transactions, tuplecids, invalidations and snapshots.
1598  *
1599  * We additionally remove tuplecids after decoding the transaction at prepare
1600  * time as we only need to perform invalidation at rollback or commit prepared.
1601  *
1602  * 'txn_prepared' indicates that we have decoded the transaction at prepare
1603  * time.
1604  */
1605 static void
1606 ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared)
1607 {
1608  dlist_mutable_iter iter;
1609 
1610  /* cleanup subtransactions & their changes */
1611  dlist_foreach_modify(iter, &txn->subtxns)
1612  {
1613  ReorderBufferTXN *subtxn;
1614 
1615  subtxn = dlist_container(ReorderBufferTXN, node, iter.cur);
1616 
1617  /*
1618  * Subtransactions are always associated to the toplevel TXN, even if
1619  * they originally were happening inside another subtxn, so we won't
1620  * ever recurse more than one level deep here.
1621  */
1622  Assert(rbtxn_is_known_subxact(subtxn));
1623  Assert(subtxn->nsubtxns == 0);
1624 
1625  ReorderBufferTruncateTXN(rb, subtxn, txn_prepared);
1626  }
1627 
1628  /* cleanup changes in the txn */
1629  dlist_foreach_modify(iter, &txn->changes)
1630  {
1631  ReorderBufferChange *change;
1632 
1633  change = dlist_container(ReorderBufferChange, node, iter.cur);
1634 
1635  /* Check we're not mixing changes from different transactions. */
1636  Assert(change->txn == txn);
1637 
1638  /* remove the change from it's containing list */
1639  dlist_delete(&change->node);
1640 
1641  ReorderBufferReturnChange(rb, change, true);
1642  }
1643 
1644  /*
1645  * Mark the transaction as streamed.
1646  *
1647  * The top-level transaction is always marked as streamed, even if it
1648  * does not contain any changes (that is, when all the changes are in
1649  * subtransactions).
1650  *
1651  * For subtransactions, we only mark them as streamed when there are
1652  * changes in them.
1653  *
1654  * We do it this way because of aborts - we don't want to send aborts for
1655  * XIDs the downstream is not aware of. And of course, it always knows
1656  * about the toplevel xact (we send the XID in all messages), but we never
1657  * stream XIDs of empty subxacts.
1658  */
1659  if ((!txn_prepared) && (rbtxn_is_toptxn(txn) || (txn->nentries_mem != 0)))
1660  txn->txn_flags |= RBTXN_IS_STREAMED;
1661 
1662  if (txn_prepared)
1663  {
1664  /*
1665  * If this is a prepared txn, cleanup the tuplecids we stored for
1666  * decoding catalog snapshot access. They are always stored in the
1667  * toplevel transaction.
1668  */
1669  dlist_foreach_modify(iter, &txn->tuplecids)
1670  {
1671  ReorderBufferChange *change;
1672 
1673  change = dlist_container(ReorderBufferChange, node, iter.cur);
1674 
1675  /* Check we're not mixing changes from different transactions. */
1676  Assert(change->txn == txn);
1677  Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID);
1678 
1679  /* Remove the change from its containing list. */
1680  dlist_delete(&change->node);
1681 
1682  ReorderBufferReturnChange(rb, change, true);
1683  }
1684  }
1685 
1686  /*
1687  * Destroy the (relfilelocator, ctid) hashtable, so that we don't leak any
1688  * memory. We could also keep the hash table and update it with new ctid
1689  * values, but this seems simpler and good enough for now.
1690  */
1691  if (txn->tuplecid_hash != NULL)
1692  {
1693  hash_destroy(txn->tuplecid_hash);
1694  txn->tuplecid_hash = NULL;
1695  }
1696 
1697  /* If this txn is serialized then clean the disk space. */
1698  if (rbtxn_is_serialized(txn))
1699  {
1700  ReorderBufferRestoreCleanup(rb, txn);
1701  txn->txn_flags &= ~RBTXN_IS_SERIALIZED;
1702 
1703  /*
1704  * We set this flag to indicate if the transaction is ever serialized.
1705  * We need this to accurately update the stats as otherwise the same
1706  * transaction can be counted as serialized multiple times.
1707  */
1708  txn->txn_flags |= RBTXN_IS_SERIALIZED_CLEAR;
1709  }
1710 
1711  /* also reset the number of entries in the transaction */
1712  txn->nentries_mem = 0;
1713  txn->nentries = 0;
1714 }
1715 
1716 /*
1717  * Build a hash with a (relfilelocator, ctid) -> (cmin, cmax) mapping for use by
1718  * HeapTupleSatisfiesHistoricMVCC.
1719  */
1720 static void
1721 ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
1722 {
1723  dlist_iter iter;
1724  HASHCTL hash_ctl;
1725 
1726  if (!rbtxn_has_catalog_changes(txn) || dlist_is_empty(&txn->tuplecids))
1727  return;
1728 
1729  hash_ctl.keysize = sizeof(ReorderBufferTupleCidKey);
1730  hash_ctl.entrysize = sizeof(ReorderBufferTupleCidEnt);
1731  hash_ctl.hcxt = rb->context;
1732 
1733  /*
1734  * create the hash with the exact number of to-be-stored tuplecids from
1735  * the start
1736  */
1737  txn->tuplecid_hash =
1738  hash_create("ReorderBufferTupleCid", txn->ntuplecids, &hash_ctl,
1739  HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
1740 
1741  dlist_foreach(iter, &txn->tuplecids)
1742  {
1743  ReorderBufferTupleCidKey key;
1744  ReorderBufferTupleCidEnt *ent;
1745  bool found;
1746  ReorderBufferChange *change;
1747 
1748  change = dlist_container(ReorderBufferChange, node, iter.cur);
1749 
1751 
1752  /* be careful about padding */
1753  memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
1754 
1755  key.rlocator = change->data.tuplecid.locator;
1756 
1757  ItemPointerCopy(&change->data.tuplecid.tid,
1758  &key.tid);
1759 
1760  ent = (ReorderBufferTupleCidEnt *)
1761  hash_search(txn->tuplecid_hash, &key, HASH_ENTER, &found);
1762  if (!found)
1763  {
1764  ent->cmin = change->data.tuplecid.cmin;
1765  ent->cmax = change->data.tuplecid.cmax;
1766  ent->combocid = change->data.tuplecid.combocid;
1767  }
1768  else
1769  {
1770  /*
1771  * Maybe we already saw this tuple before in this transaction, but
1772  * if so it must have the same cmin.
1773  */
1774  Assert(ent->cmin == change->data.tuplecid.cmin);
1775 
1776  /*
1777  * cmax may be initially invalid, but once set it can only grow,
1778  * and never become invalid again.
1779  */
1780  Assert((ent->cmax == InvalidCommandId) ||
1781  ((change->data.tuplecid.cmax != InvalidCommandId) &&
1782  (change->data.tuplecid.cmax > ent->cmax)));
1783  ent->cmax = change->data.tuplecid.cmax;
1784  }
1785  }
1786 }
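/*
 * Editor's sketch (simplified): during historic catalog scans the hash
 * built above is consulted roughly as
 *
 *     ReorderBufferTupleCidKey key;
 *     ReorderBufferTupleCidEnt *ent;
 *
 *     key.rlocator = ...;  key.tid = ...;   // from the catalog tuple
 *     ent = hash_search(tuplecid_hash, &key, HASH_FIND, NULL);
 *     if (ent)
 *         ...use ent->cmin / ent->cmax instead of the combo CID...
 *
 * See ResolveCminCmaxDuringDecoding() in heapam_visibility.c for the real
 * lookup.
 */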
1787 
1788 /*
1789  * Copy a provided snapshot so we can modify it privately. This is needed so
1790  * that catalog modifying transactions can look into intermediate catalog
1791  * states.
1792  */
1793 static Snapshot
1794 ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap,
1795  ReorderBufferTXN *txn, CommandId cid)
1796 {
1797  Snapshot snap;
1798  dlist_iter iter;
1799  int i = 0;
1800  Size size;
1801 
1802  size = sizeof(SnapshotData) +
1803  sizeof(TransactionId) * orig_snap->xcnt +
1804  sizeof(TransactionId) * (txn->nsubtxns + 1);
1805 
1806  snap = MemoryContextAllocZero(rb->context, size);
1807  memcpy(snap, orig_snap, sizeof(SnapshotData));
1808 
1809  snap->copied = true;
1810  snap->active_count = 1; /* mark as active so nobody frees it */
1811  snap->regd_count = 0;
1812  snap->xip = (TransactionId *) (snap + 1);
1813 
1814  memcpy(snap->xip, orig_snap->xip, sizeof(TransactionId) * snap->xcnt);
1815 
1816  /*
1817  * snap->subxip contains all txids that belong to our transaction which we
1818  * need to check via cmin/cmax. That's why we store the toplevel
1819  * transaction in there as well.
1820  */
1821  snap->subxip = snap->xip + snap->xcnt;
1822  snap->subxip[i++] = txn->xid;
1823 
1824  /*
1825  * subxcnt isn't decreased when subtransactions abort, so count manually.
1826  * Since it's an upper boundary it is safe to use it for the allocation
1827  * above.
1828  */
1829  snap->subxcnt = 1;
1830 
1831  dlist_foreach(iter, &txn->subtxns)
1832  {
1833  ReorderBufferTXN *sub_txn;
1834 
1835  sub_txn = dlist_container(ReorderBufferTXN, node, iter.cur);
1836  snap->subxip[i++] = sub_txn->xid;
1837  snap->subxcnt++;
1838  }
1839 
1840  /* sort so we can bsearch() later */
1841  qsort(snap->subxip, snap->subxcnt, sizeof(TransactionId), xidComparator);
1842 
1843  /* store the specified current CommandId */
1844  snap->curcid = cid;
1845 
1846  return snap;
1847 }
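/*
 * Editor's note (illustrative): because subxip is sorted above, visibility
 * checks can find "one of our (sub)transactions" with a bsearch:
 *
 *     bool ours = bsearch(&xid, snap->subxip, snap->subxcnt,
 *                         sizeof(TransactionId), xidComparator) != NULL;
 *
 * which is effectively what the historic MVCC code does with this snapshot.
 */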
1848 
1849 /*
1850  * Free a previously ReorderBufferCopySnap'ed snapshot
1851  */
1852 static void
1853 ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
1854 {
1855  if (snap->copied)
1856  pfree(snap);
1857  else
1858  SnapBuildSnapDecRefcount(snap);
1859 }
1860 
1861 /*
1862  * If the transaction was (partially) streamed, we need to prepare or commit
1863  * it in a 'streamed' way. That is, we first stream the remaining part of the
1864  * transaction, and then invoke the stream_prepare or stream_commit
1865  * callback, as the case requires.
1866  */
1867 static void
1868 ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn)
1869 {
1870  /* we should only call this for previously streamed transactions */
1871  Assert(rbtxn_is_streamed(txn));
1872 
1873  ReorderBufferStreamTXN(rb, txn);
1874 
1875  if (rbtxn_prepared(txn))
1876  {
1877  /*
1878  * Note, we send stream prepare even if a concurrent abort is
1879  * detected. See DecodePrepare for more information.
1880  */
1881  rb->stream_prepare(rb, txn, txn->final_lsn);
1882 
1883  /*
1884  * This is a PREPARED transaction, part of a two-phase commit. The
1885  * full cleanup will happen as part of the COMMIT PREPAREDs, so now
1886  * just truncate txn by removing changes and tuplecids.
1887  */
1888  ReorderBufferTruncateTXN(rb, txn, true);
1889  /* Reset the CheckXidAlive */
1890  CheckXidAlive = InvalidTransactionId;
1891  }
1892  else
1893  {
1894  rb->stream_commit(rb, txn, txn->final_lsn);
1895  ReorderBufferCleanupTXN(rb, txn);
1896  }
1897 }
1898 
1899 /*
1900  * Set xid to detect concurrent aborts.
1901  *
1902  * While streaming an in-progress transaction or decoding a prepared
1903  * transaction there is a possibility that the (sub)transaction might get
1904  * aborted concurrently. In such a case, if the (sub)transaction has a catalog
1905  * update, we might decode the tuple using the wrong catalog version. For
1906  * example, suppose there is one catalog tuple with (xmin: 500, xmax: 0). Now,
1907  * the transaction 501 updates the catalog tuple and after that we will have
1908  * two tuples (xmin: 500, xmax: 501) and (xmin: 501, xmax: 0). Now, if 501 is
1909  * aborted and some other transaction say 502 updates the same catalog tuple
1910  * then the first tuple will be changed to (xmin: 500, xmax: 502). So, the
1911  * problem is that when we try to decode the tuple inserted/updated in 501
1912  * after the catalog update, we will see the catalog tuple with (xmin: 500,
1913  * xmax: 502) as visible because it will consider that the tuple is deleted by
1914  * xid 502 which is not visible to our snapshot. And when we will try to
1915  * decode with that catalog tuple, it can lead to a wrong result or a crash.
1916  * So, it is necessary to detect concurrent aborts to allow streaming of
1917  * in-progress transactions or decoding of prepared transactions.
1918  *
1919  * For detecting the concurrent abort we set CheckXidAlive to the current
1920  * (sub)transaction's xid for which this change belongs to. And, during
1921  * catalog scan we can check the status of the xid and if it is aborted we will
1922  * report a specific error so that we can stop streaming current transaction
1923  * and discard the already streamed changes on such an error. We might have
1924  * already streamed some of the changes for the aborted (sub)transaction, but
1925  * that is fine because when we decode the abort we will stream abort message
1926  * to truncate the changes in the subscriber. Similarly, for prepared
1927  * transactions, we stop decoding if concurrent abort is detected and then
1928  * rollback the changes when rollback prepared is encountered. See
1929  * DecodePrepare.
1930  */
1931 static inline void
1932 SetupCheckXidLive(TransactionId xid)
1933 {
1934  /*
1935  * If the input transaction id is already set as a CheckXidAlive then
1936  * nothing to do.
1937  */
1938  if (TransactionIdEquals(CheckXidAlive, xid))
1939  return;
1940 
1941  /*
1942  * setup CheckXidAlive if it's not committed yet. We don't check if the
1943  * xid is aborted. That will happen during catalog access.
1944  */
1945  if (!TransactionIdDidCommit(xid))
1946  CheckXidAlive = xid;
1947  else
1948  CheckXidAlive = InvalidTransactionId;
1949 }
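/*
 * Editor's sketch (simplified): catalog access errors out if the xid set
 * above turns out to have aborted mid-decode, conceptually
 *
 *     if (TransactionIdIsValid(CheckXidAlive) &&
 *         !TransactionIdIsInProgress(CheckXidAlive) &&
 *         !TransactionIdDidCommit(CheckXidAlive))
 *         ereport(ERROR,
 *                 (errcode(ERRCODE_TRANSACTION_ROLLBACK),
 *                  errmsg("transaction aborted during system catalog scan")));
 *
 * See the systable scan code in genam.c for the real check.
 */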
1950 
1951 /*
1952  * Helper function for ReorderBufferProcessTXN for applying change.
1953  */
1954 static inline void
1955 ReorderBufferApplyChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
1956  Relation relation, ReorderBufferChange *change,
1957  bool streaming)
1958 {
1959  if (streaming)
1960  rb->stream_change(rb, txn, relation, change);
1961  else
1962  rb->apply_change(rb, txn, relation, change);
1963 }
1964 
1965 /*
1966  * Helper function for ReorderBufferProcessTXN for applying the truncate.
1967  */
1968 static inline void
1969 ReorderBufferApplyTruncate(ReorderBuffer *rb, ReorderBufferTXN *txn,
1970  int nrelations, Relation *relations,
1971  ReorderBufferChange *change, bool streaming)
1972 {
1973  if (streaming)
1974  rb->stream_truncate(rb, txn, nrelations, relations, change);
1975  else
1976  rb->apply_truncate(rb, txn, nrelations, relations, change);
1977 }
1978 
1979 /*
1980  * Helper function for ReorderBufferProcessTXN for applying the message.
1981  */
1982 static inline void
1983 ReorderBufferApplyMessage(ReorderBuffer *rb, ReorderBufferTXN *txn,
1984  ReorderBufferChange *change, bool streaming)
1985 {
1986  if (streaming)
1987  rb->stream_message(rb, txn, change->lsn, true,
1988  change->data.msg.prefix,
1989  change->data.msg.message_size,
1990  change->data.msg.message);
1991  else
1992  rb->message(rb, txn, change->lsn, true,
1993  change->data.msg.prefix,
1994  change->data.msg.message_size,
1995  change->data.msg.message);
1996 }
1997 
1998 /*
1999  * Function to store the command id and snapshot at the end of the current
2000  * stream so that we can reuse the same while sending the next stream.
2001  */
2002 static inline void
2003 ReorderBufferSaveTXNSnapshot(ReorderBuffer *rb, ReorderBufferTXN *txn,
2004  Snapshot snapshot_now, CommandId command_id)
2005 {
2006  txn->command_id = command_id;
2007 
2008  /* Avoid copying if it's already copied. */
2009  if (snapshot_now->copied)
2010  txn->snapshot_now = snapshot_now;
2011  else
2012  txn->snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
2013  txn, command_id);
2014 }
2015 
2016 /*
2017  * Helper function for ReorderBufferProcessTXN to handle the concurrent
2018  * abort of the streaming transaction. This resets the TXN such that it
2019  * can be used to stream the remaining data of the transaction being
2020  * processed. This can happen when a subtransaction is aborted and we still
2021  * want to continue processing the main transaction's or other subtransactions' data.
2022  */
2023 static void
2024 ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
2025  Snapshot snapshot_now,
2026  CommandId command_id,
2027  XLogRecPtr last_lsn,
2028  ReorderBufferChange *specinsert)
2029 {
2030  /* Discard the changes that we just streamed */
2031  ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn));
2032 
2033  /* Free all resources allocated for toast reconstruction */
2034  ReorderBufferToastReset(rb, txn);
2035 
2036  /* Return the spec insert change if it is not NULL */
2037  if (specinsert != NULL)
2038  {
2039  ReorderBufferReturnChange(rb, specinsert, true);
2040  specinsert = NULL;
2041  }
2042 
2043  /*
2044  * For the streaming case, stop the stream and remember the command ID and
2045  * snapshot for the streaming run.
2046  */
2047  if (rbtxn_is_streamed(txn))
2048  {
2049  rb->stream_stop(rb, txn, last_lsn);
2050  ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
2051  }
2052 }
2053 
2054 /*
2055  * Helper function for ReorderBufferReplay and ReorderBufferStreamTXN.
2056  *
2057  * Send data of a transaction (and its subtransactions) to the
2058  * output plugin. We iterate over the top and subtransactions (using a k-way
2059  * merge) and replay the changes in lsn order.
2060  *
2061  * If streaming is true then data will be sent using stream API.
2062  *
2063  * Note: "volatile" markers on some parameters are to avoid trouble with
2064  * PG_TRY inside the function.
2065  */
2066 static void
2067 ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
2068  XLogRecPtr commit_lsn,
2069  volatile Snapshot snapshot_now,
2070  volatile CommandId command_id,
2071  bool streaming)
2072 {
2073  bool using_subtxn;
2074  MemoryContext ccxt = CurrentMemoryContext;
2075  ReorderBufferIterTXNState *volatile iterstate = NULL;
2076  volatile XLogRecPtr prev_lsn = InvalidXLogRecPtr;
2077  ReorderBufferChange *volatile specinsert = NULL;
2078  volatile bool stream_started = false;
2079  ReorderBufferTXN *volatile curtxn = NULL;
2080 
2081  /* build data to be able to lookup the CommandIds of catalog tuples */
2082  ReorderBufferBuildTupleCidHash(rb, txn);
2083 
2084  /* setup the initial snapshot */
2085  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
2086 
2087  /*
2088  * Decoding needs access to syscaches et al., which in turn use
2089  * heavyweight locks and such. Thus we need to have enough state around to
2090  * keep track of those. The easiest way is to simply use a transaction
2091  * internally. That also allows us to easily enforce that nothing writes
2092  * to the database by checking for xid assignments.
2093  *
2094  * When we're called via the SQL SRF there's already a transaction
2095  * started, so start an explicit subtransaction there.
2096  */
2097  using_subtxn = IsTransactionOrTransactionBlock();
2098 
2099  PG_TRY();
2100  {
2101  ReorderBufferChange *change;
2102  int changes_count = 0; /* used to accumulate the number of
2103  * changes */
2104 
2105  if (using_subtxn)
2106  BeginInternalSubTransaction(streaming ? "stream" : "replay");
2107  else
2108  StartTransactionCommand();
2109 
2110  /*
2111  * We only need to send begin/begin-prepare for non-streamed
2112  * transactions.
2113  */
2114  if (!streaming)
2115  {
2116  if (rbtxn_prepared(txn))
2117  rb->begin_prepare(rb, txn);
2118  else
2119  rb->begin(rb, txn);
2120  }
2121 
2122  ReorderBufferIterTXNInit(rb, txn, &iterstate);
2123  while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
2124  {
2125  Relation relation = NULL;
2126  Oid reloid;
2127 
2128  CHECK_FOR_INTERRUPTS();
2129 
2130  /*
2131  * We can't call the stream start callback before processing the first
2132  * change.
2133  */
2134  if (prev_lsn == InvalidXLogRecPtr)
2135  {
2136  if (streaming)
2137  {
2138  txn->origin_id = change->origin_id;
2139  rb->stream_start(rb, txn, change->lsn);
2140  stream_started = true;
2141  }
2142  }
2143 
2144  /*
2145  * Enforce correct ordering of changes, merged from multiple
2146  * subtransactions. The changes may have the same LSN due to
2147  * MULTI_INSERT xlog records.
2148  */
2149  Assert(prev_lsn == InvalidXLogRecPtr || prev_lsn <= change->lsn);
2150 
2151  prev_lsn = change->lsn;
2152 
2153  /*
2154  * Set the current xid to detect concurrent aborts. This is
2155  * required for the cases when we decode the changes before the
2156  * COMMIT record is processed.
2157  */
2158  if (streaming || rbtxn_prepared(change->txn))
2159  {
2160  curtxn = change->txn;
2161  SetupCheckXidLive(curtxn->xid);
2162  }
2163 
2164  switch (change->action)
2165  {
2166  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
2167 
2168  /*
2169  * Confirmation for speculative insertion arrived. Simply
2170  * use it as a normal record. It'll be cleaned up at the end
2171  * of INSERT processing.
2172  */
2173  if (specinsert == NULL)
2174  elog(ERROR, "invalid ordering of speculative insertion changes");
2175  Assert(specinsert->data.tp.oldtuple == NULL);
2176  change = specinsert;
2177  change->action = REORDER_BUFFER_CHANGE_INSERT;
2178 
2179  /* intentionally fall through */
2180  case REORDER_BUFFER_CHANGE_INSERT:
2181  case REORDER_BUFFER_CHANGE_UPDATE:
2182  case REORDER_BUFFER_CHANGE_DELETE:
2183  Assert(snapshot_now);
2184 
2185  reloid = RelidByRelfilenumber(change->data.tp.rlocator.spcOid,
2186  change->data.tp.rlocator.relNumber);
2187 
2188  /*
2189  * Mapped catalog tuple without data, emitted while
2190  * catalog table was in the process of being rewritten. We
2191  * can fail to look up the relfilenumber, because the
2192  * relmapper has no "historic" view, in contrast to the
2193  * normal catalog during decoding. Thus repeated rewrites
2194  * can cause a lookup failure. That's OK because we do not
2195  * decode catalog changes anyway. Normally such tuples
2196  * would be skipped over below, but we can't identify
2197  * whether the table should be logically logged without
2198  * mapping the relfilenumber to the oid.
2199  */
2200  if (reloid == InvalidOid &&
2201  change->data.tp.newtuple == NULL &&
2202  change->data.tp.oldtuple == NULL)
2203  goto change_done;
2204  else if (reloid == InvalidOid)
2205  elog(ERROR, "could not map filenumber \"%s\" to relation OID",
2206  relpathperm(change->data.tp.rlocator,
2207  MAIN_FORKNUM));
2208 
2209  relation = RelationIdGetRelation(reloid);
2210 
2211  if (!RelationIsValid(relation))
2212  elog(ERROR, "could not open relation with OID %u (for filenumber \"%s\")",
2213  reloid,
2214  relpathperm(change->data.tp.rlocator,
2215  MAIN_FORKNUM));
2216 
2217  if (!RelationIsLogicallyLogged(relation))
2218  goto change_done;
2219 
2220  /*
2221  * Ignore temporary heaps created during DDL unless the
2222  * plugin has asked for them.
2223  */
2224  if (relation->rd_rel->relrewrite && !rb->output_rewrites)
2225  goto change_done;
2226 
2227  /*
2228  * For now ignore sequence changes entirely. Most of the
2229  * time they don't log changes using records we
2230  * understand, so it doesn't make sense to handle the few
2231  * cases we do.
2232  */
2233  if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
2234  goto change_done;
2235 
2236  /* user-triggered change */
2237  if (!IsToastRelation(relation))
2238  {
2239  ReorderBufferToastReplace(rb, txn, relation, change);
2240  ReorderBufferApplyChange(rb, txn, relation, change,
2241  streaming);
2242 
2243  /*
2244  * Only clear reassembled toast chunks if we're sure
2245  * they're not required anymore. The creator of the
2246  * tuple tells us.
2247  */
2248  if (change->data.tp.clear_toast_afterwards)
2249  ReorderBufferToastReset(rb, txn);
2250  }
2251  /* we're not interested in toast deletions */
2252  else if (change->action == REORDER_BUFFER_CHANGE_INSERT)
2253  {
2254  /*
2255  * Need to reassemble the full toasted Datum in
2256  * memory, to ensure the chunks don't get reused till
2257  * we're done, remove it from the list of this
2258  * transaction's changes. Otherwise it will get
2259  * freed/reused while restoring spooled data from
2260  * disk.
2261  */
2262  Assert(change->data.tp.newtuple != NULL);
2263 
2264  dlist_delete(&change->node);
2265  ReorderBufferToastAppendChunk(rb, txn, relation,
2266  change);
2267  }
2268 
2269  change_done:
2270 
2271  /*
2272  * If speculative insertion was confirmed, the record
2273  * isn't needed anymore.
2274  */
2275  if (specinsert != NULL)
2276  {
2277  ReorderBufferReturnChange(rb, specinsert, true);
2278  specinsert = NULL;
2279  }
2280 
2281  if (RelationIsValid(relation))
2282  {
2283  RelationClose(relation);
2284  relation = NULL;
2285  }
2286  break;
2287 
2288  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
2289 
2290  /*
2291  * Speculative insertions are dealt with by delaying the
2292  * processing of the insert until the confirmation record
2293  * arrives. For that we simply unlink the record from the
2294  * chain, so it does not get freed/reused while restoring
2295  * spooled data from disk.
2296  *
2297  * This is safe in the face of concurrent catalog changes
2298  * because the relevant relation can't be changed between
2299  * speculative insertion and confirmation due to
2300  * CheckTableNotInUse() and locking.
2301  */
2302 
2303  /* clear out a pending (and thus failed) speculation */
2304  if (specinsert != NULL)
2305  {
2306  ReorderBufferReturnChange(rb, specinsert, true);
2307  specinsert = NULL;
2308  }
2309 
2310  /* and memorize the pending insertion */
2311  dlist_delete(&change->node);
2312  specinsert = change;
2313  break;
2314 
2315  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
2316 
2317  /*
2318  * Abort for speculative insertion arrived. So clean up the
2319  * specinsert tuple and toast hash.
2320  *
2321  * Note that we get the spec abort change for each toast
2322  * entry but we need to perform the cleanup only the first
2323  * time we get it for the main table.
2324  */
2325  if (specinsert != NULL)
2326  {
2327  /*
2328  * We must clean the toast hash before processing a
2329  * completely new tuple to avoid confusion about the
2330  * previous tuple's toast chunks.
2331  */
2332  Assert(change->data.tp.clear_toast_afterwards);
2333  ReorderBufferToastReset(rb, txn);
2334 
2335  /* We don't need this record anymore. */
2336  ReorderBufferReturnChange(rb, specinsert, true);
2337  specinsert = NULL;
2338  }
2339  break;
2340 
2341  case REORDER_BUFFER_CHANGE_TRUNCATE:
2342  {
2343  int i;
2344  int nrelids = change->data.truncate.nrelids;
2345  int nrelations = 0;
2346  Relation *relations;
2347 
2348  relations = palloc0(nrelids * sizeof(Relation));
2349  for (i = 0; i < nrelids; i++)
2350  {
2351  Oid relid = change->data.truncate.relids[i];
2352  Relation rel;
2353 
2354  rel = RelationIdGetRelation(relid);
2355 
2356  if (!RelationIsValid(rel))
2357  elog(ERROR, "could not open relation with OID %u", relid);
2358 
2359  if (!RelationIsLogicallyLogged(rel))
2360  continue;
2361 
2362  relations[nrelations++] = rel;
2363  }
2364 
2365  /* Apply the truncate. */
2366  ReorderBufferApplyTruncate(rb, txn, nrelations,
2367  relations, change,
2368  streaming);
2369 
2370  for (i = 0; i < nrelations; i++)
2371  RelationClose(relations[i]);
2372 
2373  break;
2374  }
2375 
2376  case REORDER_BUFFER_CHANGE_MESSAGE:
2377  ReorderBufferApplyMessage(rb, txn, change, streaming);
2378  break;
2379 
2380  case REORDER_BUFFER_CHANGE_INVALIDATION:
2381  /* Execute the invalidation messages locally */
2382  ReorderBufferExecuteInvalidations(change->data.inval.ninvalidations,
2383  change->data.inval.invalidations);
2384  break;
2385 
2386  case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
2387  /* get rid of the old */
2388  TeardownHistoricSnapshot(false);
2389 
2390  if (snapshot_now->copied)
2391  {
2392  ReorderBufferFreeSnap(rb, snapshot_now);
2393  snapshot_now =
2394  ReorderBufferCopySnap(rb, change->data.snapshot,
2395  txn, command_id);
2396  }
2397 
2398  /*
2399  * Restored from disk, need to be careful not to double
2400  * free. We could introduce refcounting for that, but for
2401  * now this seems infrequent enough not to care.
2402  */
2403  else if (change->data.snapshot->copied)
2404  {
2405  snapshot_now =
2406  ReorderBufferCopySnap(rb, change->data.snapshot,
2407  txn, command_id);
2408  }
2409  else
2410  {
2411  snapshot_now = change->data.snapshot;
2412  }
2413 
2414  /* and continue with the new one */
2415  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
2416  break;
2417 
2418  case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
2419  Assert(change->data.command_id != InvalidCommandId);
2420 
2421  if (command_id < change->data.command_id)
2422  {
2423  command_id = change->data.command_id;
2424 
2425  if (!snapshot_now->copied)
2426  {
2427  /* we don't use the global one anymore */
2428  snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
2429  txn, command_id);
2430  }
2431 
2432  snapshot_now->curcid = command_id;
2433 
2434  TeardownHistoricSnapshot(false);
2435  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
2436  }
2437 
2438  break;
2439 
2440  case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
2441  elog(ERROR, "tuplecid value in changequeue");
2442  break;
2443  }
2444 
2445  /*
2446  * It is possible that no data is sent to the downstream for a
2447  * long time, either because the output plugin filtered it or a
2448  * DDL generates a lot of data that is not processed by the
2449  * plugin. In such cases the downstream can time out. To
2450  * avoid that we try to send a keepalive message if required.
2451  * Trying to send a keepalive message after every change has some
2452  * overhead, but testing showed there is no noticeable overhead if
2453  * we do it after every ~100 changes.
2454  */
2455 #define CHANGES_THRESHOLD 100
2456 
2457  if (++changes_count >= CHANGES_THRESHOLD)
2458  {
2459  rb->update_progress_txn(rb, txn, change->lsn);
2460  changes_count = 0;
2461  }
2462  }
2463 
2464  /* speculative insertion record must be freed by now */
2465  Assert(!specinsert);
2466 
2467  /* clean up the iterator */
2468  ReorderBufferIterTXNFinish(rb, iterstate);
2469  iterstate = NULL;
2470 
2471  /*
2472  * Update total transaction count and total bytes processed by the
2473  * transaction and its subtransactions. Make sure not to count a
2474  * streamed transaction multiple times.
2475  *
2476  * Note that the statistics computation has to be done after
2477  * ReorderBufferIterTXNFinish as it releases the serialized change,
2478  * which we have already accounted for in ReorderBufferIterTXNNext.
2479  */
2480  if (!rbtxn_is_streamed(txn))
2481  rb->totalTxns++;
2482 
2483  rb->totalBytes += txn->total_size;
2484 
2485  /*
2486  * Done with current changes, send the last message for this set of
2487  * changes depending upon streaming mode.
2488  */
2489  if (streaming)
2490  {
2491  if (stream_started)
2492  {
2493  rb->stream_stop(rb, txn, prev_lsn);
2494  stream_started = false;
2495  }
2496  }
2497  else
2498  {
2499  /*
2500  * Call either PREPARE (for two-phase transactions) or COMMIT (for
2501  * regular ones).
2502  */
2503  if (rbtxn_prepared(txn))
2504  rb->prepare(rb, txn, commit_lsn);
2505  else
2506  rb->commit(rb, txn, commit_lsn);
2507  }
2508 
2509  /* this is just a sanity check against bad output plugin behaviour */
2510  if (GetCurrentTransactionIdIfAny() != InvalidTransactionId)
2511  elog(ERROR, "output plugin used XID %u",
2512  GetCurrentTransactionId());
2513 
2514  /*
2515  * Remember the command ID and snapshot for the next set of changes in
2516  * streaming mode.
2517  */
2518  if (streaming)
2519  ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
2520  else if (snapshot_now->copied)
2521  ReorderBufferFreeSnap(rb, snapshot_now);
2522 
2523  /* cleanup */
2524  TeardownHistoricSnapshot(false);
2525 
2526  /*
2527  * Aborting the current (sub-)transaction as a whole has the right
2528  * semantics. We want all locks acquired in here to be released, not
2529  * reassigned to the parent, and we do not want any database access
2530  * to have persistent effects.
2531  */
2532  AbortCurrentTransaction();
2533 
2534  /* make sure there's no cache pollution */
2535  ReorderBufferExecuteInvalidations(txn->ninvalidations, txn->invalidations);
2536 
2537  if (using_subtxn)
2538  RollbackAndReleaseCurrentSubTransaction();
2539 
2540  /*
2541  * We are here due to one of four reasons: 1. Decoding an
2542  * in-progress txn. 2. Decoding a prepared txn. 3. Decoding of a
2543  * prepared txn that was (partially) streamed. 4. Decoding a committed
2544  * txn.
2545  *
2546  * For 1, we allow truncation of txn data by removing the changes
2547  * already streamed but still keeping other things like invalidations,
2548  * snapshot, and tuplecids. For 2 and 3, we indicate
2549  * ReorderBufferTruncateTXN to do more elaborate truncation of txn
2550  * data as the entire transaction has been decoded except for commit.
2551  * For 4, as the entire txn has been decoded, we can fully clean up
2552  * the TXN reorder buffer.
2553  */
2554  if (streaming || rbtxn_prepared(txn))
2555  {
2556  ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn));
2557  /* Reset the CheckXidAlive */
2558  CheckXidAlive = InvalidTransactionId;
2559  }
2560  else
2561  ReorderBufferCleanupTXN(rb, txn);
2562  }
2563  PG_CATCH();
2564  {
2565  MemoryContext ecxt = MemoryContextSwitchTo(ccxt);
2566  ErrorData *errdata = CopyErrorData();
2567 
2568  /* TODO: Encapsulate cleanup from the PG_TRY and PG_CATCH blocks */
2569  if (iterstate)
2570  ReorderBufferIterTXNFinish(rb, iterstate);
2571 
2572  TeardownHistoricSnapshot(true);
2573 
2574  /*
2575  * Force cache invalidation to happen outside of a valid transaction
2576  * to prevent catalog access as we just caught an error.
2577  */
2578  AbortCurrentTransaction();
2579 
2580  /* make sure there's no cache pollution */
2581  ReorderBufferExecuteInvalidations(txn->ninvalidations,
2582  txn->invalidations);
2583 
2584  if (using_subtxn)
2585  RollbackAndReleaseCurrentSubTransaction();
2586 
2587  /*
2588  * The error code ERRCODE_TRANSACTION_ROLLBACK indicates a concurrent
2589  * abort of the (sub)transaction we are streaming or preparing. We
2590  * need to do the cleanup and return gracefully on this error, see
2591  * SetupCheckXidLive.
2592  *
2593  * This error code can be thrown by one of the callbacks we call
2594  * during decoding so we need to ensure that we return gracefully only
2595  * when we are sending the data in streaming mode and the streaming is
2596  * not finished yet or when we are sending the data out on a PREPARE
2597  * during a two-phase commit.
2598  */
2599  if (errdata->sqlerrcode == ERRCODE_TRANSACTION_ROLLBACK &&
2600  (stream_started || rbtxn_prepared(txn)))
2601  {
2602  /* curtxn must be set for streaming or prepared transactions */
2603  Assert(curtxn);
2604 
2605  /* Cleanup the temporary error state. */
2606  FlushErrorState();
2607  FreeErrorData(errdata);
2608  errdata = NULL;
2609  curtxn->concurrent_abort = true;
2610 
2611  /* Reset the TXN so that it is allowed to stream remaining data. */
2612  ReorderBufferResetTXN(rb, txn, snapshot_now,
2613  command_id, prev_lsn,
2614  specinsert);
2615  }
2616  else
2617  {
2618  ReorderBufferCleanupTXN(rb, txn);
2619  MemoryContextSwitchTo(ecxt);
2620  PG_RE_THROW();
2621  }
2622  }
2623  PG_END_TRY();
2624 }
2625 
2626 /*
2627  * Perform the replay of a transaction and its non-aborted subtransactions.
2628  *
2629  * Subtransactions previously have to be processed by
2630  * ReorderBufferCommitChild(), even if previously assigned to the toplevel
2631  * transaction with ReorderBufferAssignChild.
2632  *
2633  * This interface is called once a prepare or toplevel commit is read for both
2634  * streamed as well as non-streamed transactions.
2635  */
2636 static void
2637 ReorderBufferReplay(ReorderBufferTXN *txn,
2638  ReorderBuffer *rb, TransactionId xid,
2639  XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
2640  TimestampTz commit_time,
2641  RepOriginId origin_id, XLogRecPtr origin_lsn)
2642 {
2643  Snapshot snapshot_now;
2644  CommandId command_id = FirstCommandId;
2645 
2646  txn->final_lsn = commit_lsn;
2647  txn->end_lsn = end_lsn;
2648  txn->xact_time.commit_time = commit_time;
2649  txn->origin_id = origin_id;
2650  txn->origin_lsn = origin_lsn;
2651 
2652  /*
2653  * If the transaction was (partially) streamed, we need to commit it in a
2654  * 'streamed' way. That is, we first stream the remaining part of the
2655  * transaction, and then invoke the stream_commit callback.
2656  *
2657  * Called after everything (origin ID, LSN, ...) is stored in the
2658  * transaction to avoid passing that information directly.
2659  */
2660  if (rbtxn_is_streamed(txn))
2661  {
2662  ReorderBufferStreamCommit(rb, txn);
2663  return;
2664  }
2665 
2666  /*
2667  * If this transaction has no snapshot, it didn't make any changes to the
2668  * database, so there's nothing to decode. Note that
2669  * ReorderBufferCommitChild will have transferred any snapshots from
2670  * subtransactions if there were any.
2671  */
2672  if (txn->base_snapshot == NULL)
2673  {
2674  Assert(txn->ninvalidations == 0);
2675 
2676  /*
2677  * Removing this txn before a commit might result in the computation
2678  * of an incorrect restart_lsn. See SnapBuildProcessRunningXacts.
2679  */
2680  if (!rbtxn_prepared(txn))
2681  ReorderBufferCleanupTXN(rb, txn);
2682  return;
2683  }
2684 
2685  snapshot_now = txn->base_snapshot;
2686 
2687  /* Process and send the changes to output plugin. */
2688  ReorderBufferProcessTXN(rb, txn, commit_lsn, snapshot_now,
2689  command_id, false);
2690 }
2691 
2692 /*
2693  * Commit a transaction.
2694  *
2695  * See comments for ReorderBufferReplay().
2696  */
2697 void
2698 ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
2699  XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
2700  TimestampTz commit_time,
2701  RepOriginId origin_id, XLogRecPtr origin_lsn)
2702 {
2703  ReorderBufferTXN *txn;
2704 
2705  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2706  false);
2707 
2708  /* unknown transaction, nothing to replay */
2709  if (txn == NULL)
2710  return;
2711 
2712  ReorderBufferReplay(txn, rb, xid, commit_lsn, end_lsn, commit_time,
2713  origin_id, origin_lsn);
2714 }
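For illustration only, a hedged sketch of how the decode layer might hand a commit record to this function (cf. DecodeCommit in decode.c; the helper name, variable names, and the elided subtransaction handling are assumptions here):

static void
example_decode_commit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
					  TransactionId xid, xl_xact_parsed_commit *parsed)
{
	/*
	 * Subtransactions would first be attached to the toplevel xact via
	 * ReorderBufferCommitChild() (elided); then the toplevel is replayed.
	 */
	ReorderBufferCommit(ctx->reorder, xid,
						buf->origptr,	/* commit_lsn */
						buf->endptr,	/* end_lsn */
						parsed->xact_time,	/* commit_time */
						XLogRecGetOrigin(buf->record),
						parsed->origin_lsn);
}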
2715 
2716 /*
2717  * Record the prepare information for a transaction.
2718  */
2719 bool
2720 ReorderBufferRememberPrepareInfo(ReorderBuffer *rb, TransactionId xid,
2721  XLogRecPtr prepare_lsn, XLogRecPtr end_lsn,
2722  TimestampTz prepare_time,
2723  RepOriginId origin_id, XLogRecPtr origin_lsn)
2724 {
2725  ReorderBufferTXN *txn;
2726 
2727  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false);
2728 
2729  /* unknown transaction, nothing to do */
2730  if (txn == NULL)
2731  return false;
2732 
2733  /*
2734  * Remember the prepare information to be later used by commit prepared in
2735  * case we skip doing prepare.
2736  */
2737  txn->final_lsn = prepare_lsn;
2738  txn->end_lsn = end_lsn;
2739  txn->xact_time.prepare_time = prepare_time;
2740  txn->origin_id = origin_id;
2741  txn->origin_lsn = origin_lsn;
2742 
2743  return true;
2744 }
2745 
2746 /* Remember that we have skipped prepare */
2747 void
2748 ReorderBufferSkipPrepare(ReorderBuffer *rb, TransactionId xid)
2749 {
2750  ReorderBufferTXN *txn;
2751 
2752  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false);
2753 
2754  /* unknown transaction, nothing to do */
2755  if (txn == NULL)
2756  return;
2757 
2758  txn->txn_flags |= RBTXN_SKIPPED_PREPARE;
2759 }
2760 
2761 /*
2762  * Prepare a two-phase transaction.
2763  *
2764  * See comments for ReorderBufferReplay().
2765  */
2766 void
2767 ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid,
2768  char *gid)
2769 {
2770  ReorderBufferTXN *txn;
2771 
2772  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2773  false);
2774 
2775  /* unknown transaction, nothing to replay */
2776  if (txn == NULL)
2777  return;
2778 
2779  txn->txn_flags |= RBTXN_PREPARE;
2780  txn->gid = pstrdup(gid);
2781 
2782  /* The prepare info must have been updated in txn by now. */
2783  Assert(txn->final_lsn != InvalidXLogRecPtr);
2784 
2785  ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
2786  txn->xact_time.prepare_time, txn->origin_id, txn->origin_lsn);
2787 
2788  /*
2789  * We send the prepare for concurrently aborted xacts so that when the
2790  * rollback prepared is later decoded and sent, the downstream is able to
2791  * roll back such a xact. See comments atop DecodePrepare.
2792  *
2793  * Note, for the concurrent_abort + streaming case a stream_prepare was
2794  * already sent within the ReorderBufferReplay call above.
2795  */
2796  if (txn->concurrent_abort && !rbtxn_is_streamed(txn))
2797  rb->prepare(rb, txn, txn->final_lsn);
2798 }
2799 
2800 /*
2801  * This is used to handle COMMIT/ROLLBACK PREPARED.
2802  */
2803 void
2804 ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
2805  XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
2806  XLogRecPtr two_phase_at,
2807  TimestampTz commit_time, RepOriginId origin_id,
2808  XLogRecPtr origin_lsn, char *gid, bool is_commit)
2809 {
2810  ReorderBufferTXN *txn;
2811  XLogRecPtr prepare_end_lsn;
2812  TimestampTz prepare_time;
2813 
2814  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, commit_lsn, false);
2815 
2816  /* unknown transaction, nothing to do */
2817  if (txn == NULL)
2818  return;
2819 
2820  /*
2821  * By this time the txn has the prepare record information, remember it to
2822  * be later used for rollback.
2823  */
2824  prepare_end_lsn = txn->end_lsn;
2825  prepare_time = txn->xact_time.prepare_time;
2826 
2827  /* add the gid in the txn */
2828  txn->gid = pstrdup(gid);
2829 
2830  /*
2831  * It is possible that this transaction is not decoded at prepare time
2832  * either because by that time we didn't have a consistent snapshot, or
2833  * two_phase was not enabled, or it was decoded earlier but we have
2834  * restarted. We only need to send the prepare if it was not decoded
2835  * earlier. We don't need to decode the xact for a rollback if it was not
2836  * decoded already.
2837  */
2838  if ((txn->final_lsn < two_phase_at) && is_commit)
2839  {
2840  txn->txn_flags |= RBTXN_PREPARE;
2841 
2842  /*
2843  * The prepare info must have been updated in txn even if we skip
2844  * prepare.
2845  */
2846  Assert(txn->final_lsn != InvalidXLogRecPtr);
2847 
2848  /*
2849  * By this time the txn has the prepare record information and it is
2850  * important to use that so that downstream gets the accurate
2851  * information. If we passed the commit information here instead, the
2852  * downstream could behave as if it had already replayed commit
2853  * prepared after the restart.
2854  */
2855  ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
2856  txn->xact_time.prepare_time, txn->origin_id, txn->origin_lsn);
2857  }
2858 
2859  txn->final_lsn = commit_lsn;
2860  txn->end_lsn = end_lsn;
2861  txn->xact_time.commit_time = commit_time;
2862  txn->origin_id = origin_id;
2863  txn->origin_lsn = origin_lsn;
2864 
2865  if (is_commit)
2866  rb->commit_prepared(rb, txn, commit_lsn);
2867  else
2868  rb->rollback_prepared(rb, txn, prepare_end_lsn, prepare_time);
2869 
2870  /* cleanup: make sure there's no cache pollution */
2871  ReorderBufferExecuteInvalidations(txn->ninvalidations,
2872  txn->invalidations);
2873  ReorderBufferCleanupTXN(rb, txn);
2874 }
2875 
2876 /*
2877  * Abort a transaction that possibly has previous changes. Needs to be
2878  * called first for subtransactions and then for the toplevel xid.
2879  *
2880  * NB: Transactions handled here have to have actively aborted (i.e. have
2881  * produced an abort record). Implicitly aborted transactions are handled via
2882  * ReorderBufferAbortOld(); transactions we're just not interested in, but
2883  * which have committed are handled in ReorderBufferForget().
2884  *
2885  * This function purges this transaction and its contents from memory and
2886  * disk.
2887  */
2888 void
2889 ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
2890  TimestampTz abort_time)
2891 {
2892  ReorderBufferTXN *txn;
2893 
2894  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2895  false);
2896 
2897  /* unknown, nothing to remove */
2898  if (txn == NULL)
2899  return;
2900 
2901  txn->xact_time.abort_time = abort_time;
2902 
2903  /* For streamed transactions notify the remote node about the abort. */
2904  if (rbtxn_is_streamed(txn))
2905  {
2906  rb->stream_abort(rb, txn, lsn);
2907 
2908  /*
2909  * We might have decoded changes for this transaction that could load
2910  * the cache as per the current transaction's view (consider DDLs that
2911  * happened in this transaction). We don't want the decoding of future
2912  * transactions to use those cache entries, so we execute invalidations.
2913  */
2914  if (txn->ninvalidations > 0)
2915  ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
2916  txn->invalidations);
2917  }
2918 
2919  /* cosmetic... */
2920  txn->final_lsn = lsn;
2921 
2922  /* remove potential on-disk data, and deallocate */
2923  ReorderBufferCleanupTXN(rb, txn);
2924 }
2925 
2926 /*
2927  * Abort all transactions that aren't actually running anymore because the
2928  * server restarted.
2929  *
2930  * NB: These really have to be transactions that have aborted due to a server
2931  * crash/immediate restart, as we don't deal with invalidations here.
2932  */
2933 void
2934 ReorderBufferAbortOld(ReorderBuffer *rb, TransactionId oldestRunningXid)
2935 {
2936  dlist_mutable_iter it;
2937 
2938  /*
2939  * Iterate through all (potential) toplevel TXNs and abort all that are
2940  * older than what possibly can be running. Once we've found the first
2941  * one that is alive, we stop; there might be some that acquired an xid
2942  * earlier but started writing later, but that's unlikely, and they will
2943  * be cleaned up in a later call to this function.
2944  */
2945  dlist_foreach_modify(it, &rb->toplevel_by_lsn)
2946  {
2947  ReorderBufferTXN *txn;
2948 
2949  txn = dlist_container(ReorderBufferTXN, node, it.cur);
2950 
2951  if (TransactionIdPrecedes(txn->xid, oldestRunningXid))
2952  {
2953  elog(DEBUG2, "aborting old transaction %u", txn->xid);
2954 
2955  /* Notify the remote node about the crash/immediate restart. */
2956  if (rbtxn_is_streamed(txn))
2957  rb->stream_abort(rb, txn, InvalidXLogRecPtr);
2958 
2959  /* remove potential on-disk data, and deallocate this tx */
2960  ReorderBufferCleanupTXN(rb, txn);
2961  }
2962  else
2963  return;
2964  }
2965 }
2966 
2967 /*
2968  * Forget the contents of a transaction if we aren't interested in it.
2969  * Needs to be called first for subtransactions and then for the
2970  * toplevel xid.
2971  *
2972  * This is significantly different from ReorderBufferAbort() because
2973  * transactions that have committed need to be treated differently from aborted
2974  * ones since they may have modified the catalog.
2975  *
2976  * Note that this is only allowed to be called in the moment a transaction
2977  * commit has just been read, not earlier; otherwise later records referring
2978  * to this xid might re-create the transaction incompletely.
2979  */
2980 void
2981 ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
2982 {
2983  ReorderBufferTXN *txn;
2984 
2985  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2986  false);
2987 
2988  /* unknown, nothing to forget */
2989  if (txn == NULL)
2990  return;
2991 
2992  /* this transaction mustn't be streamed */
2993  Assert(!rbtxn_is_streamed(txn));
2994 
2995  /* cosmetic... */
2996  txn->final_lsn = lsn;
2997 
2998  /*
2999  * Process cache invalidation messages if there are any. Even if we're not
3000  * interested in the transaction's contents, it could have manipulated the
3001  * catalog and we need to update the caches according to that.
3002  */
3003  if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
3004  ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
3005  txn->invalidations);
3006  else
3007  Assert(txn->ninvalidations == 0);
3008 
3009  /* remove potential on-disk data, and deallocate */
3010  ReorderBufferCleanupTXN(rb, txn);
3011 }
3012 
3013 /*
3014  * Invalidate cache for those transactions that need to be skipped just in case
3015  * catalogs were manipulated as part of the transaction.
3016  *
3017  * Note that this is a special-purpose function for prepared transactions where
3018  * we don't want to clean up the TXN even when we decide to skip it. See
3019  * DecodePrepare.
3020  */
3021 void
3022 ReorderBufferInvalidate(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
3023 {
3024  ReorderBufferTXN *txn;
3025 
3026  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
3027  false);
3028 
3029  /* unknown, nothing to do */
3030  if (txn == NULL)
3031  return;
3032 
3033  /*
3034  * Process cache invalidation messages if there are any. Even if we're not
3035  * interested in the transaction's contents, it could have manipulated the
3036  * catalog and we need to update the caches according to that.
3037  */
3038  if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
3039  ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
3040  txn->invalidations);
3041  else
3042  Assert(txn->ninvalidations == 0);
3043 }
3044 
3045 
3046 /*
3047  * Execute invalidations happening outside the context of a decoded
3048  * transaction. That currently happens either for xid-less commits
3049  * (cf. RecordTransactionCommit()) or for invalidations in uninteresting
3050  * transactions (via ReorderBufferForget()).
3051  */
3052 void
3053 ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations,
3054  SharedInvalidationMessage *invalidations)
3055 {
3056  bool use_subtxn = IsTransactionOrTransactionBlock();
3057  int i;
3058 
3059  if (use_subtxn)
3060  BeginInternalSubTransaction("replay");
3061 
3062  /*
3063  * Force invalidations to happen outside of a valid transaction - that way
3064  * entries will just be marked as invalid without accessing the catalog.
3065  * That's advantageous because we don't need to setup the full state
3066  * necessary for catalog access.
3067  */
3068  if (use_subtxn)
3069  AbortCurrentTransaction();
3070 
3071  for (i = 0; i < ninvalidations; i++)
3072  LocalExecuteInvalidationMessage(&invalidations[i]);
3073 
3074  if (use_subtxn)
3075  RollbackAndReleaseCurrentSubTransaction();
3076 }
3077 
3078 /*
3079  * Tell reorderbuffer about an xid seen in the WAL stream. Has to be called at
3080  * least once for every xid in XLogRecord->xl_xid (other places in records
3081  * may, but do not have to be passed through here).
3082  *
3083  * Reorderbuffer keeps some data structures about transactions in LSN order,
3084  * for efficiency. To do that it has to know when transactions are first
3085  * seen in the WAL. As many types of records are not actually interesting for
3086  * logical decoding, they do not necessarily pass through here.
3087  */
3088 void
3089 ReorderBufferProcessXid(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
3090 {
3091  /* many records won't have an xid assigned, centralize check here */
3092  if (xid != InvalidTransactionId)
3093  ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3094 }
3095 
3096 /*
3097  * Add a new snapshot to this transaction that may only be used after lsn 'lsn'
3098  * because the previous snapshot doesn't describe the catalog correctly for
3099  * following rows.
3100  */
3101 void
3102 ReorderBufferAddSnapshot(ReorderBuffer *rb, TransactionId xid,
3103  XLogRecPtr lsn, Snapshot snap)
3104 {
3105  ReorderBufferChange *change = ReorderBufferGetChange(rb);
3106 
3107  change->data.snapshot = snap;
3108  change->action = REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT;
3109 
3110  ReorderBufferQueueChange(rb, xid, lsn, change, false);
3111 }
3112 
3113 /*
3114  * Set up the transaction's base snapshot.
3115  *
3116  * If we know that xid is a subtransaction, set the base snapshot on the
3117  * top-level transaction instead.
3118  */
3119 void
3120 ReorderBufferSetBaseSnapshot(ReorderBuffer *rb, TransactionId xid,
3121  XLogRecPtr lsn, Snapshot snap)
3122 {
3123  ReorderBufferTXN *txn;
3124  bool is_new;
3125 
3126  Assert(snap != NULL);
3127 
3128  /*
3129  * Fetch the transaction to operate on. If we know it's a subtransaction,
3130  * operate on its top-level transaction instead.
3131  */
3132  txn = ReorderBufferTXNByXid(rb, xid, true, &is_new, lsn, true);
3133  if (rbtxn_is_known_subxact(txn))
3134  txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
3135  NULL, InvalidXLogRecPtr, false);
3136  Assert(txn->base_snapshot == NULL);
3137 
3138  txn->base_snapshot = snap;
3139  txn->base_snapshot_lsn = lsn;
3140  dlist_push_tail(&rb->txns_by_base_snapshot_lsn, &txn->base_snapshot_node);
3141 
3142  AssertTXNLsnOrder(rb);
3143 }
3144 
3145 /*
3146  * Access the catalog with this CommandId at this point in the changestream.
3147  *
3148  * May only be called for command ids > 1
3149  */
3150 void
3151 ReorderBufferAddNewCommandId(ReorderBuffer *rb, TransactionId xid,
3152  XLogRecPtr lsn, CommandId cid)
3153 {
3154  ReorderBufferChange *change = ReorderBufferGetChange(rb);
3155 
3156  change->data.command_id = cid;
3157  change->action = REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID;
3158 
3159  ReorderBufferQueueChange(rb, xid, lsn, change, false);
3160 }
3161 
3162 /*
3163  * Update memory counters to account for the new or removed change.
3164  *
3165  * We update two counters - in the reorder buffer, and in the transaction
3166  * containing the change. The reorder buffer counter allows us to quickly
3167  * decide if we reached the memory limit, the transaction counter allows
3168  * us to quickly pick the largest transaction for eviction.
3169  *
3170  * When streaming is enabled, we need to update the toplevel transaction
3171  * counters instead - we don't really care about subtransactions as we
3172  * can't stream them individually anyway, and we only pick toplevel
3173  * transactions for eviction. So only toplevel transactions matter.
3174  */
3175 static void
3176 ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
3177  ReorderBufferChange *change,
3178  bool addition, Size sz)
3179 {
3180  ReorderBufferTXN *txn;
3181  ReorderBufferTXN *toptxn;
3182 
3183  Assert(change->txn);
3184 
3185  /*
3186  * Ignore tuple CID changes, because those are not evicted when reaching
3187  * the memory limit. So we just don't count them, because counting them
3188  * might easily trigger a pointless attempt to spill.
3189  */
3190  if (change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID)
3191  return;
3192 
3193  txn = change->txn;
3194 
3195  /*
3196  * Update the total size in the top level as well. This is later used to
3197  * compute the decoding stats.
3198  */
3199  toptxn = rbtxn_get_toptxn(txn);
3200 
3201  if (addition)
3202  {
3203  txn->size += sz;
3204  rb->size += sz;
3205 
3206  /* Update the total size in the top transaction. */
3207  toptxn->total_size += sz;
3208  }
3209  else
3210  {
3211  Assert((rb->size >= sz) && (txn->size >= sz));
3212  txn->size -= sz;
3213  rb->size -= sz;
3214 
3215  /* Update the total size in the top transaction. */
3216  toptxn->total_size -= sz;
3217  }
3218 
3219  Assert(txn->size <= rb->size);
3220 }
3221 
3222 /*
3223  * Add new (relfilelocator, tid) -> (cmin, cmax) mappings.
3224  *
3225  * We do not include this change type in memory accounting, because we
3226  * keep CIDs in a separate list and do not evict them when reaching
3227  * the memory limit.
3228  */
3229 void
3230 ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid,
3231  XLogRecPtr lsn, RelFileLocator locator,
3232  ItemPointerData tid, CommandId cmin,
3233  CommandId cmax, CommandId combocid)
3234 {
3235  ReorderBufferChange *change = ReorderBufferGetChange(rb);
3236  ReorderBufferTXN *txn;
3237 
3238  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3239 
3240  change->data.tuplecid.locator = locator;
3241  change->data.tuplecid.tid = tid;
3242  change->data.tuplecid.cmin = cmin;
3243  change->data.tuplecid.cmax = cmax;
3244  change->data.tuplecid.combocid = combocid;
3245  change->lsn = lsn;
3246  change->txn = txn;
3247  change->action = REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID;
3248 
3249  dlist_push_tail(&txn->tuplecids, &change->node);
3250  txn->ntuplecids++;
3251 }
3252 
3253 /*
3254  * Accumulate the invalidations for executing them later.
3255  *
3256  * This needs to be called for each XLOG_XACT_INVALIDATIONS message and
3257  * accumulates all the invalidation messages in the toplevel transaction, if
3258  * available, otherwise in the current transaction, as well as in the form
3259  * of a change in the reorder buffer. We need to record it as a change so
3260  * that we can execute only the required invalidations instead of executing
3261  * all the invalidations on each CommandId increment. We also need to
3262  * accumulate these in the txn buffer because in some cases where we skip
3263  * processing the transaction (see ReorderBufferForget), we need to execute
3264  * all the invalidations together.
3265  */
3266 void
3267 ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid,
3268  XLogRecPtr lsn, Size nmsgs,
3269  SharedInvalidationMessage *msgs)
3270 {
3271  ReorderBufferTXN *txn;
3272  MemoryContext oldcontext;
3273  ReorderBufferChange *change;
3274 
3275  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3276 
3277  oldcontext = MemoryContextSwitchTo(rb->context);
3278 
3279  /*
3280  * Collect all the invalidations under the top transaction, if available,
3281  * so that we can execute them all together. See comments atop this
3282  * function.
3283  */
3284  txn = rbtxn_get_toptxn(txn);
3285 
3286  Assert(nmsgs > 0);
3287 
3288  /* Accumulate invalidations. */
3289  if (txn->ninvalidations == 0)
3290  {
3291  txn->ninvalidations = nmsgs;
3292  txn->invalidations = (SharedInvalidationMessage *)
3293  palloc(sizeof(SharedInvalidationMessage) * nmsgs);
3294  memcpy(txn->invalidations, msgs,
3295  sizeof(SharedInvalidationMessage) * nmsgs);
3296  }
3297  else
3298  {
3299  txn->invalidations = (SharedInvalidationMessage *)
3300  repalloc(txn->invalidations, sizeof(SharedInvalidationMessage) *
3301  (txn->ninvalidations + nmsgs));
3302 
3303  memcpy(txn->invalidations + txn->ninvalidations, msgs,
3304  nmsgs * sizeof(SharedInvalidationMessage));
3305  txn->ninvalidations += nmsgs;
3306  }
3307 
3308  change = ReorderBufferGetChange(rb);
3309  change->action = REORDER_BUFFER_CHANGE_INVALIDATION;
3310  change->data.inval.ninvalidations = nmsgs;
3311  change->data.inval.invalidations = (SharedInvalidationMessage *)
3312  palloc(sizeof(SharedInvalidationMessage) * nmsgs);
3313  memcpy(change->data.inval.invalidations, msgs,
3314  sizeof(SharedInvalidationMessage) * nmsgs);
3315 
3316  ReorderBufferQueueChange(rb, xid, lsn, change, false);
3317 
3318  MemoryContextSwitchTo(oldcontext);
3319 }
3320 
3321 /*
3322  * Apply all invalidations we know. Possibly we only need parts at this point
3323  * in the changestream but we don't know which those are.
3324  */
3325 static void
3326 ReorderBufferExecuteInvalidations(uint32 nmsgs, SharedInvalidationMessage *msgs)
3327 {
3328  int i;
3329 
3330  for (i = 0; i < nmsgs; i++)
3331  LocalExecuteInvalidationMessage(&msgs[i]);
3332 }
3333 
3334 /*
3335  * Mark a transaction as containing catalog changes
3336  */
3337 void
3338 ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid,
3339  XLogRecPtr lsn)
3340 {
3341  ReorderBufferTXN *txn;
3342 
3343  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3344 
3345  if (!rbtxn_has_catalog_changes(txn))
3346  {
3347  txn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES;
3348  dclist_push_tail(&rb->catchange_txns, &txn->catchange_node);
3349  }
3350 
3351  /*
3352  * Mark the top-level transaction as having catalog changes too if one of
3353  * its children has them, so that ReorderBufferBuildTupleCidHash can
3354  * conveniently check just the top-level transaction and decide whether to
3355  * build the hash table or not.
3356  */
3357  if (rbtxn_is_subtxn(txn))
3358  {
3359  ReorderBufferTXN *toptxn = rbtxn_get_toptxn(txn);
3360 
3361  if (!rbtxn_has_catalog_changes(toptxn))
3362  {
3363  toptxn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES;
3364  dclist_push_tail(&rb->catchange_txns, &toptxn->catchange_node);
3365  }
3366  }
3367 }
3368 
3369 /*
3370  * Return palloc'ed array of the transactions that have changed catalogs.
3371  * The returned array is sorted in xidComparator order.
3372  *
3373  * The caller must free the returned array when done with it.
3374  */
3375 TransactionId *
3376 ReorderBufferGetCatalogChangesXacts(ReorderBuffer *rb)
3377 {
3378  dlist_iter iter;
3379  TransactionId *xids = NULL;
3380  size_t xcnt = 0;
3381 
3382  /* Quick return if the list is empty */
3383  if (dclist_count(&rb->catchange_txns) == 0)
3384  return NULL;
3385 
3386  /* Initialize XID array */
3387  xids = (TransactionId *) palloc(sizeof(TransactionId) *
3388  dclist_count(&rb->catchange_txns));
3389  dclist_foreach(iter, &rb->catchange_txns)
3390  {
3391  ReorderBufferTXN *txn = dclist_container(ReorderBufferTXN,
3392  catchange_node,
3393  iter.cur);
3394 
3395  Assert(rbtxn_has_catalog_changes(txn));
3396 
3397  xids[xcnt++] = txn->xid;
3398  }
3399 
3400  qsort(xids, xcnt, sizeof(TransactionId), xidComparator);
3401 
3402  Assert(xcnt == dclist_count(&rb->catchange_txns));
3403  return xids;
3404 }
3405 
3406 /*
3407  * Query whether a transaction is already *known* to contain catalog
3408  * changes. This can be wrong until directly before the commit!
3409  */
3410 bool
3411 ReorderBufferXidHasCatalogChanges(ReorderBuffer *rb, TransactionId xid)
3412 {
3413  ReorderBufferTXN *txn;
3414 
3415  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
3416  false);
3417  if (txn == NULL)
3418  return false;
3419 
3420  return rbtxn_has_catalog_changes(txn);
3421 }
3422 
3423 /*
3424  * ReorderBufferXidHasBaseSnapshot
3425  * Have we already set the base snapshot for the given txn/subtxn?
3426  */
3427 bool
3428 ReorderBufferXidHasBaseSnapshot(ReorderBuffer *rb, TransactionId xid)
3429 {
3430  ReorderBufferTXN *txn;
3431 
3432  txn = ReorderBufferTXNByXid(rb, xid, false,
3433  NULL, InvalidXLogRecPtr, false);
3434 
3435  /* transaction isn't known yet, ergo no snapshot */
3436  if (txn == NULL)
3437  return false;
3438 
3439  /* a known subtxn? operate on top-level txn instead */
3440  if (rbtxn_is_known_subxact(txn))
3441  txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
3442  NULL, InvalidXLogRecPtr, false);
3443 
3444  return txn->base_snapshot != NULL;
3445 }
3446 
3447 
3448 /*
3449  * ---------------------------------------
3450  * Disk serialization support
3451  * ---------------------------------------
3452  */
3453 
3454 /*
3455  * Ensure the IO buffer is >= sz.
3456  */
3457 static void
3458 ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz)
3459 {
3460  if (!rb->outbufsize)
3461  {
3462  rb->outbuf = MemoryContextAlloc(rb->context, sz);
3463  rb->outbufsize = sz;
3464  }
3465  else if (rb->outbufsize < sz)
3466  {
3467  rb->outbuf = repalloc(rb->outbuf, sz);
3468  rb->outbufsize = sz;
3469  }
3470 }
3471 
3472 /*
3473  * Find the largest transaction (toplevel or subxact) to evict (spill to disk).
3474  *
3475  * XXX With many subtransactions this might be quite slow, because we'll have
3476  * to walk through all of them. There are some options for how we could
3477  * improve that: (a) maintain a secondary structure with transactions sorted
3478  * by amount of changes, (b) look not for the single largest transaction but
3479  * e.g. for a transaction using at least some fraction of the memory limit,
3480  * and (c) evict multiple transactions at once, e.g. to free a given portion
3481  * of the memory limit (e.g. 50%).
3482  */
3483 static ReorderBufferTXN *
3484 ReorderBufferLargestTXN(ReorderBuffer *rb)
3485 {
3486  HASH_SEQ_STATUS hash_seq;
3487  ReorderBufferTXNByIdEnt *ent;
3488  ReorderBufferTXN *largest = NULL;
3489 
3490  hash_seq_init(&hash_seq, rb->by_txn);
3491  while ((ent = hash_seq_search(&hash_seq)) != NULL)
3492  {
3493  ReorderBufferTXN *txn = ent->txn;
3494 
3495  /* if the current transaction is larger, remember it */
3496  if ((!largest) || (txn->size > largest->size))
3497  largest = txn;
3498  }
3499 
3500  Assert(largest);
3501  Assert(largest->size > 0);
3502  Assert(largest->size <= rb->size);
3503 
3504  return largest;
3505 }
3506 
3507 /*
3508  * Find the largest streamable toplevel transaction to evict (by streaming).
3509  *
3510  * This can be seen as an optimized version of ReorderBufferLargestTXN, which
3511  * should give us the same transaction (because we don't update memory
3512  * accounting for subtransactions with streaming, so it's always 0). But we can
3513  * simply iterate over the limited number of toplevel transactions that have a
3514  * base snapshot. There is no use in selecting a transaction that doesn't have
3515  * a base snapshot because we don't decode such transactions. Also, we do not
3516  * select a transaction that doesn't have any streamable change.
3517  *
3518  * Note that we skip transactions that contain incomplete changes. There is
3519  * scope for optimization here, in that we could select the largest transaction
3520  * that has incomplete changes. But that would make the code and design quite
3521  * complex, and it might not be worth the benefit. If we plan to stream
3522  * transactions that contain incomplete changes then we need to find a way to
3523  * partially stream/truncate the transaction changes in memory and build a
3524  * mechanism to partially truncate the spilled files. Additionally, whenever we
3525  * partially stream a transaction we need to maintain the last streamed lsn,
3526  * and next time we need to restore from that segment and offset in WAL. As we
3527  * stream the changes from the top transaction and restore them
3528  * subtransaction-wise, we would even need to remember the subxact from which
3529  * we streamed the last change.
3530  */
3531 static ReorderBufferTXN *
3532 ReorderBufferLargestStreamableTopTXN(ReorderBuffer *rb)
3533 {
3534  dlist_iter iter;
3535  Size largest_size = 0;
3536  ReorderBufferTXN *largest = NULL;
3537 
3538  /* Find the largest top-level transaction having a base snapshot. */
3539  dlist_foreach(iter, &rb->txns_by_base_snapshot_lsn)
3540  {
3541  ReorderBufferTXN *txn;
3542 
3543  txn = dlist_container(ReorderBufferTXN, base_snapshot_node, iter.cur);
3544 
3545  /* must not be a subtxn */
3546  Assert(!rbtxn_is_known_subxact(txn));
3547  /* base_snapshot must be set */
3548  Assert(txn->base_snapshot != NULL);
3549 
3550  if ((largest == NULL || txn->total_size > largest_size) &&
3551  (txn->total_size > 0) && !(rbtxn_has_partial_change(txn)) &&
3552  rbtxn_has_streamable_change(txn))
3553  {
3554  largest = txn;
3555  largest_size = txn->total_size;
3556  }
3557  }
3558 
3559  return largest;
3560 }
3561 
3562 /*
3563  * Check whether the logical_decoding_work_mem limit was reached, and if yes
3564  * pick the largest (sub)transaction, one at a time, to evict and spill its
3565  * changes to disk or send to the output plugin until we get under the memory limit.
3566  *
3567  * If debug_logical_replication_streaming is set to "immediate", stream or
3568  * serialize the changes immediately.
3569  *
3570  * XXX At this point we select transactions until we get under the memory
3571  * limit, but we might also adopt a more elaborate eviction strategy - for
3572  * example, evicting enough transactions to free a certain fraction (e.g. 50%)
3573  * of the memory limit.
3574  */
3575 static void
3576 ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
3577 {
3578  ReorderBufferTXN *txn;
3579 
3580  /*
3581  * Bail out if debug_logical_replication_streaming is buffered and we
3582  * haven't exceeded the memory limit.
3583  */
3584  if (debug_logical_replication_streaming == DEBUG_LOGICAL_REP_STREAMING_BUFFERED &&
3585  rb->size < logical_decoding_work_mem * 1024L)
3586  return;
3587 
3588  /*
3589  * If debug_logical_replication_streaming is immediate, loop until there's
3590  * no change. Otherwise, loop until we reach under the memory limit. One
3591  * might think that just by evicting the largest (sub)transaction we will
3592  * come under the memory limit based on assumption that the selected
3593  * transaction is at least as large as the most recent change (which
3594  * caused us to go over the memory limit). However, that is not true
3595  * because a user can reduce the logical_decoding_work_mem to a smaller
3596  * value before the most recent change.
3597  */
3598  while (rb->size >= logical_decoding_work_mem * 1024L ||
3599  (debug_logical_replication_streaming == DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE &&
3600  rb->size > 0))
3601  {
3602  /*
3603  * Pick the largest transaction and evict it from memory by streaming,
3604  * if possible. Otherwise, spill to disk.
3605  */
3606  if (ReorderBufferCanStartStreaming(rb) &&
3607  (txn = ReorderBufferLargestStreamableTopTXN(rb)) != NULL)
3608  {
3609  /* we know there has to be one, because the size is not zero */
3610  Assert(txn && rbtxn_is_toptxn(txn));
3611  Assert(txn->total_size > 0);
3612  Assert(rb->size >= txn->total_size);
3613 
3614  ReorderBufferStreamTXN(rb, txn);
3615  }
3616  else
3617  {
3618  /*
3619  * Pick the largest transaction (or subtransaction) and evict it
3620  * from memory by serializing it to disk.
3621  */
3622  txn = ReorderBufferLargestTXN(rb);
3623 
3624  /* we know there has to be one, because the size is not zero */
3625  Assert(txn);
3626  Assert(txn->size > 0);
3627  Assert(rb->size >= txn->size);
3628 
3629  ReorderBufferSerializeTXN(rb, txn);
3630  }
3631 
3632  /*
3633  * After eviction, the transaction should have no entries in memory,
3634  * and should use 0 bytes for changes.
3635  */
3636  Assert(txn->size == 0);
3637  Assert(txn->nentries_mem == 0);
3638  }
3639 
3640  /* We must be under the memory limit now. */
3641  Assert(rb->size < logical_decoding_work_mem * 1024L);
3642 }
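A worked example of the threshold arithmetic used above (an illustrative helper, not part of this file): logical_decoding_work_mem is a GUC expressed in kilobytes, so with its default of 65536 the eviction loop starts once rb->size reaches 65536 * 1024 bytes, i.e. 64 MiB.

static inline bool
ExampleOverMemoryLimit(ReorderBuffer *rb)
{
	/* same comparison as the loop condition in ReorderBufferCheckMemoryLimit */
	return rb->size >= (Size) logical_decoding_work_mem * 1024L;
}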
3643 
3644 /*
3645  * Spill data of a large transaction (and its subtransactions) to disk.
3646  */
3647 static void
3648 ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
3649 {
3650  dlist_iter subtxn_i;
3651  dlist_mutable_iter change_i;
3652  int fd = -1;
3653  XLogSegNo curOpenSegNo = 0;
3654  Size spilled = 0;
3655  Size size = txn->size;
3656 
3657  elog(DEBUG2, "spill %u changes in XID %u to disk",
3658  (uint32) txn->nentries_mem, txn->xid);
3659 
3660  /* do the same to all child TXs */
3661  dlist_foreach(subtxn_i, &txn->subtxns)
3662  {
3663  ReorderBufferTXN *subtxn;
3664 
3665  subtxn = dlist_container(ReorderBufferTXN, node, subtxn_i.cur);
3666  ReorderBufferSerializeTXN(rb, subtxn);
3667  }
3668 
3669  /* serialize changestream */
3670  dlist_foreach_modify(change_i, &txn->changes)
3671  {
3672  ReorderBufferChange *change;
3673 
3674  change = dlist_container(ReorderBufferChange, node, change_i.cur);
3675 
3676  /*
3677  * Store the change in the segment to which it belongs by start lsn;
3678  * don't split changes over multiple segments, though.
3679  */
3680  if (fd == -1 ||
3681  !XLByteInSeg(change->lsn, curOpenSegNo, wal_segment_size))
3682  {
3683  char path[MAXPGPATH];
3684 
3685  if (fd != -1)
3686  CloseTransientFile(fd);
3687 
3688  XLByteToSeg(change->lsn, curOpenSegNo, wal_segment_size);
3689 
3690  /*
3691  * No need to care about TLIs here, only used during a single run,
3692  * so each LSN only maps to a specific WAL record.
3693  */
3694  ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid,
3695  curOpenSegNo);
3696 
3697  /* open segment, create it if necessary */
3698  fd = OpenTransientFile(path,
3699  O_CREAT | O_WRONLY | O_APPEND | PG_BINARY);
3700 
3701  if (fd < 0)
3702  ereport(ERROR,
3703  (errcode_for_file_access(),
3704  errmsg("could not open file \"%s\": %m", path)));
3705  }
3706 
3707  ReorderBufferSerializeChange(rb, txn, fd, change);
3708  dlist_delete(&change->node);
3709  ReorderBufferReturnChange(rb, change, true);
3710 
3711  spilled++;
3712  }
3713 
3714  /* update the statistics iff we have spilled anything */
3715  if (spilled)
3716  {
3717  rb->spillCount += 1;
3718  rb->spillBytes += size;
3719 
3720  /* don't consider already serialized transactions */
3721  rb->spillTxns += (rbtxn_is_serialized(txn) || rbtxn_is_serialized_clear(txn)) ? 0 : 1;
3722 
3723  /* update the decoding stats */
3724  UpdateDecodingStats((LogicalDecodingContext *) rb->private_data);
3725  }
3726 
3727  Assert(spilled == txn->nentries_mem);
3728  Assert(dlist_is_empty(&txn->changes));
3729  txn->nentries_mem = 0;
3730  txn->txn_flags |= RBTXN_IS_SERIALIZED;
3731 
3732  if (fd != -1)
3733  CloseTransientFile(fd);
3734 }
3735 
3736 /*
3737  * Serialize individual change to disk.
3738  */
3739 static void
3740 ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
3741  int fd, ReorderBufferChange *change)
3742 {
3743  ReorderBufferDiskChange *ondisk;
3744  Size sz = sizeof(ReorderBufferDiskChange);
3745 
3746  ReorderBufferSerializeReserve(rb, sz);
3747 
3748  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3749  memcpy(&ondisk->change, change, sizeof(ReorderBufferChange));
3750 
3751  switch (change->action)
3752  {
3753  /* fall through these, they're all similar enough */
3754  case REORDER_BUFFER_CHANGE_INSERT:
3755  case REORDER_BUFFER_CHANGE_UPDATE:
3756  case REORDER_BUFFER_CHANGE_DELETE:
3757  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
3758  {
3759  char *data;
3760  HeapTuple oldtup,
3761  newtup;
3762  Size oldlen = 0;
3763  Size newlen = 0;
3764 
3765  oldtup = change->data.tp.oldtuple;
3766  newtup = change->data.tp.newtuple;
3767 
3768  if (oldtup)
3769  {
3770  sz += sizeof(HeapTupleData);
3771  oldlen = oldtup->t_len;
3772  sz += oldlen;
3773  }
3774 
3775  if (newtup)
3776  {
3777  sz += sizeof(HeapTupleData);
3778  newlen = newtup->t_len;
3779  sz += newlen;
3780  }
3781 
3782  /* make sure we have enough space */
3783  ReorderBufferSerializeReserve(rb, sz);
3784 
3785  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3786  /* might have been reallocated above */
3787  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3788 
3789  if (oldlen)
3790  {
3791  memcpy(data, oldtup, sizeof(HeapTupleData));
3792  data += sizeof(HeapTupleData);
3793 
3794  memcpy(data, oldtup->t_data, oldlen);
3795  data += oldlen;
3796  }
3797 
3798  if (newlen)
3799  {
3800  memcpy(data, newtup, sizeof(HeapTupleData));
3801  data += sizeof(HeapTupleData);
3802 
3803  memcpy(data, newtup->t_data, newlen);
3804  data += newlen;
3805  }
3806  break;
3807  }
3808  case REORDER_BUFFER_CHANGE_MESSAGE:
3809  {
3810  char *data;
3811  Size prefix_size = strlen(change->data.msg.prefix) + 1;
3812 
3813  sz += prefix_size + change->data.msg.message_size +
3814  sizeof(Size) + sizeof(Size);
3815  ReorderBufferSerializeReserve(rb, sz);
3816 
3817  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3818 
3819  /* might have been reallocated above */
3820  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3821 
3822  /* write the prefix including the size */
3823  memcpy(data, &prefix_size, sizeof(Size));
3824  data += sizeof(Size);
3825  memcpy(data, change->data.msg.prefix,
3826  prefix_size);
3827  data += prefix_size;
3828 
3829  /* write the message including the size */
3830  memcpy(data, &change->data.msg.message_size, sizeof(Size));
3831  data += sizeof(Size);
3832  memcpy(data, change->data.msg.message,
3833  change->data.msg.message_size);
3834  data += change->data.msg.message_size;
3835 
3836  break;
3837  }
3838  case REORDER_BUFFER_CHANGE_INVALIDATION:
3839  {
3840  char *data;
3841  Size inval_size = sizeof(SharedInvalidationMessage) *
3842  change->data.inval.ninvalidations;
3843 
3844  sz += inval_size;
3845 
3846  ReorderBufferSerializeReserve(rb, sz);
3847  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3848 
3849  /* might have been reallocated above */
3850  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3851  memcpy(data, change->data.inval.invalidations, inval_size);
3852  data += inval_size;
3853 
3854  break;
3855  }
3856  case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
3857  {
3858  Snapshot snap;
3859  char *data;
3860 
3861  snap = change->data.snapshot;
3862 
3863  sz += sizeof(SnapshotData) +
3864  sizeof(TransactionId) * snap->xcnt +
3865  sizeof(TransactionId) * snap->subxcnt;
3866 
3867  /* make sure we have enough space */
3868  ReorderBufferSerializeReserve(rb, sz);
3869  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3870  /* might have been reallocated above */
3871  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3872 
3873  memcpy(data, snap, sizeof(SnapshotData));
3874  data += sizeof(SnapshotData);
3875 
3876  if (snap->xcnt)
3877  {
3878  memcpy(data, snap->xip,
3879  sizeof(TransactionId) * snap->xcnt);
3880  data += sizeof(TransactionId) * snap->xcnt;
3881  }
3882 
3883  if (snap->subxcnt)
3884  {
3885  memcpy(data, snap->subxip,
3886  sizeof(TransactionId) * snap->subxcnt);
3887  data += sizeof(TransactionId) * snap->subxcnt;
3888  }
3889  break;
3890  }
3891  case REORDER_BUFFER_CHANGE_TRUNCATE:
3892  {
3893  Size size;
3894  char *data;
3895 
3896  /* account for the OIDs of truncated relations */
3897  size = sizeof(Oid) * change->data.truncate.nrelids;
3898  sz += size;
3899 
3900  /* make sure we have enough space */
3901  ReorderBufferSerializeReserve(rb, sz);
3902 
3903  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3904  /* might have been reallocated above */
3905  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3906 
3907  memcpy(data, change->data.truncate.relids, size);
3908  data += size;
3909 
3910  break;
3911  }
3912  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
3913  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
3914  case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
3915  case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
3916  /* ReorderBufferChange contains everything important */
3917  break;
3918  }
3919 
3920  ondisk->size = sz;
3921 
3922  errno = 0;
3923  pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_WRITE);
3924  if (write(fd, rb->outbuf, ondisk->size) != ondisk->size)
3925  {
3926  int save_errno = errno;
3927 
3928  CloseTransientFile(fd);
3929 
3930  /* if write didn't set errno, assume problem is no disk space */
3931  errno = save_errno ? save_errno : ENOSPC;
3932  ereport(ERROR,
3933  (errcode_for_file_access(),
3934  errmsg("could not write to data file for XID %u: %m",
3935  txn->xid)));
3936  }
3937  pgstat_report_wait_end();
3938 
3939  /*
3940  * Keep the transaction's final_lsn up to date with each change we send to
3941  * disk, so that ReorderBufferRestoreCleanup works correctly. (We used to
3942  * only do this on commit and abort records, but that doesn't work if a
3943  * system crash leaves a transaction without its abort record).
3944  *
3945  * Make sure not to move it backwards.
3946  */
3947  if (txn->final_lsn < change->lsn)
3948  txn->final_lsn = change->lsn;
3949 
3950  Assert(ondisk->change.action == change->action);
3951 }
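To make the on-disk format above concrete, here is a hedged sketch of reading one record back: each record is a ReorderBufferDiskChange header whose 'size' field covers the header plus the variable-length data following it. This mirrors the logic of the restore path later in this file; the function name is an assumption and error handling is elided:

static void
example_read_one_change(ReorderBuffer *rb, int fd)
{
	ReorderBufferDiskChange *ondisk;

	/* read the fixed-size header first */
	ReorderBufferSerializeReserve(rb, sizeof(ReorderBufferDiskChange));
	if (read(fd, rb->outbuf, sizeof(ReorderBufferDiskChange)) !=
		sizeof(ReorderBufferDiskChange))
		return;					/* EOF or error; the real code ereport()s */

	ondisk = (ReorderBufferDiskChange *) rb->outbuf;

	/* then grow the buffer and read the remaining ondisk->size bytes */
	ReorderBufferSerializeReserve(rb, ondisk->size);
	ondisk = (ReorderBufferDiskChange *) rb->outbuf;	/* may have moved */
	if (read(fd, rb->outbuf + sizeof(ReorderBufferDiskChange),
			 ondisk->size - sizeof(ReorderBufferDiskChange)) !=
		(ssize_t) (ondisk->size - sizeof(ReorderBufferDiskChange)))
		return;
}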
3952 
3953 /* Returns true if the output plugin supports streaming, false otherwise. */
3954 static inline bool
3955 ReorderBufferCanStream(ReorderBuffer *rb)
3956 {
3957  LogicalDecodingContext *ctx = rb->private_data;
3958 
3959  return ctx->streaming;
3960 }
3961 
3962 /* Returns true if streaming can be started now, false otherwise. */
3963 static inline bool
3964  ReorderBufferCanStartStreaming(ReorderBuffer *rb)
3965 {
3966  LogicalDecodingContext *ctx = rb->private_data;
3967  SnapBuild *builder = ctx->snapshot_builder;
3968 
3969  /* We can't start streaming unless a consistent state is reached. */
3970  if (SnapBuildCurrentState(builder) < SNAPBUILD_CONSISTENT)
3971  return false;
3972 
3973  /*
3974  * We can't start streaming immediately even if streaming is enabled,
3975  * because we may have previously decoded this transaction and are now
3976  * just restarting.
3977  */
3978  if (ReorderBufferCanStream(rb) &&
3979  !SnapBuildXactNeedsSkip(builder, ctx->reader->ReadRecPtr))
3980  return true;
3981 
3982  return false;
3983 }
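/*
 * Hypothetical caller sketch (names below only illustrate the usual
 * pattern when the memory limit is exceeded, they are not a verbatim
 * quote of the caller):
 *
 *   if (ReorderBufferCanStartStreaming(rb))
 *       ReorderBufferStreamTXN(rb, txn);     stream the transaction
 *   else
 *       ReorderBufferSerializeTXN(rb, txn);  otherwise spill it to disk
 *
 * i.e. streaming requires both plugin support and a consistent snapshot;
 * until then, eviction falls back to serialization.
 */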
3984 
3985 /*
3986  * Send data of a large transaction (and its subtransactions) to the
3987  * output plugin, but using the stream API.
3988  * output plugin, using the stream API.
3989 static void
3990  ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
3991 {
3992  Snapshot snapshot_now;
3993  CommandId command_id;
3994  Size stream_bytes;
3995  bool txn_is_streamed;
3996 
3997  /* We can never reach here for a subtransaction. */
3998  Assert(rbtxn_is_toptxn(txn));
3999 
4000  /*
4001  * We can't make any assumptions about base snapshot here, similar to what
4002  * ReorderBufferCommit() does. That relies on base_snapshot getting
4003  * transferred from subxact in ReorderBufferCommitChild(), but that was
4004  * not yet called as the transaction is in-progress.
4005  *
4006  * So just walk the subxacts and use the same logic here. But we only need
4007  * to do that once, when the transaction is streamed for the first time.
4008  * After that we need to reuse the snapshot from the previous run.
4009  *
4010  * Unlike DecodeCommit which adds xids of all the subtransactions in
4011  * snapshot's xip array via SnapBuildCommitTxn, we can't do that here but
4012  * we do add them to subxip array instead via ReorderBufferCopySnap. This
4013  * allows the catalog changes made in subtransactions decoded till now to
4014  * be visible.
4015  */
4016  if (txn->snapshot_now == NULL)
4017  {
4018  dlist_iter subxact_i;
4019 
4020  /* make sure this transaction is streamed for the first time */
4021  Assert(!rbtxn_is_streamed(txn));
4022 
4023  /* at the beginning we should have invalid command ID */
4024  Assert(txn->command_id == InvalidCommandId);
4025 
4026  dlist_foreach(subxact_i, &txn->subtxns)
4027  {
4028  ReorderBufferTXN *subtxn;
4029 
4030  subtxn = dlist_container(ReorderBufferTXN, node, subxact_i.cur);
4031  ReorderBufferTransferSnapToParent(txn, subtxn);
4032  }
4033 
4034  /*
4035  * If this transaction has no snapshot, it didn't make any changes to
4036  * the database till now, so there's nothing to decode.
4037  */
4038  if (txn->base_snapshot == NULL)
4039  {
4040  Assert(txn->ninvalidations == 0);
4041  return;
4042  }
4043 
4044  command_id = FirstCommandId;
4045  snapshot_now = ReorderBufferCopySnap(rb, txn->base_snapshot,
4046  txn, command_id);
4047  }
4048  else
4049  {
4050  /* the transaction must have been already streamed */
4051  Assert(rbtxn_is_streamed(txn));
4052 
4053  /*
4054  * We already have a snapshot from the previous streaming run. We assume
4055  * new subxacts can't move the LSN backwards, and so can't beat the LSN
4056  * condition in the previous branch (so there is no need to walk through
4057  * the subxacts again). In fact, we must not do that, as we may be using
4058  * the snapshot half-way through the subxact.
4059  */
4060  command_id = txn->command_id;
4061 
4062  /*
4063  * We can't use txn->snapshot_now directly because after the last
4064  * streaming run, we might have got some new sub-transactions. So we
4065  * need to add them to the snapshot.
4066  */
4067  snapshot_now = ReorderBufferCopySnap(rb, txn->snapshot_now,
4068  txn, command_id);
4069 
4070  /* Free the previously copied snapshot. */
4071  Assert(txn->snapshot_now->copied);
4072  ReorderBufferFreeSnap(rb, txn->snapshot_now);
4073  txn->snapshot_now = NULL;
4074  }
4075 
4076  /*
4077  * Remember this information to be used later to update stats. We can't
4078  * update the stats here as an error while processing the changes would
4079  * lead to the accumulation of stats even though we haven't streamed all
4080  * the changes.
4081  */
4082  txn_is_streamed = rbtxn_is_streamed(txn);
4083  stream_bytes = txn->total_size;
4084 
4085  /* Process and send the changes to output plugin. */
4086  ReorderBufferProcessTXN(rb, txn, InvalidXLogRecPtr, snapshot_now,
4087  command_id, true);
4088 
4089  rb->streamCount += 1;
4090  rb->streamBytes += stream_bytes;
4091 
4092  /* Don't double-count a transaction that was already streamed. */
4093  rb->streamTxns += (txn_is_streamed) ? 0 : 1;
4094 
4095  /* update the decoding stats */
4096  UpdateDecodingStats((LogicalDecodingContext *) rb->private_data);
4097 
4098  Assert(dlist_is_empty(&txn->changes));
4099  Assert(txn->nentries == 0);
4100  Assert(txn->nentries_mem == 0);
4101 }
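/*
 * Lifecycle sketch for one large transaction that is streamed twice before
 * it commits:
 *
 *   1st call: txn->snapshot_now == NULL, so a snapshot is built from
 *             base_snapshot, command_id starts at FirstCommandId, and
 *             rb->streamTxns is incremented (first stream of this txn).
 *   2nd call: txn->snapshot_now != NULL, so it is copied (picking up any
 *             new subxacts) and txn->command_id is reused; only
 *             rb->streamCount and rb->streamBytes grow.
 *
 * Each call drains txn->changes via ReorderBufferProcessTXN(), which is
 * what the Asserts above verify.
 */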
4102 
4103 /*
4104  * Size of a change in memory.
4105  */
4106 static Size
4107  ReorderBufferChangeSize(ReorderBufferChange *change)
4108 {
4109  Size sz = sizeof(ReorderBufferChange);
4110 
4111  switch (change->action)
4112  {
4113  /* fall through these, they're all similar enough */
4114  case REORDER_BUFFER_CHANGE_INSERT:
4115  case REORDER_BUFFER_CHANGE_UPDATE:
4116  case REORDER_BUFFER_CHANGE_DELETE:
4117  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
4118  {
4119  HeapTuple oldtup,
4120  newtup;
4121  Size oldlen = 0;
4122  Size newlen = 0;
4123 
4124  oldtup = change->data.tp.oldtuple;
4125  newtup = change->data.tp.newtuple;
4126 
4127  if (oldtup)
4128  {
4129  sz += sizeof(HeapTupleData);
4130  oldlen = oldtup->t_len;
4131  sz += oldlen;
4132  }
4133 
4134  if (newtup)
4135  {
4136  sz += sizeof(HeapTupleData);
4137  newlen = newtup->t_len;
4138  sz += newlen;
4139  }
4140 
4141  break;
4142  }
4143  case REORDER_BUFFER_CHANGE_MESSAGE:
4144  {
4145  Size prefix_size = strlen(change->data.msg.prefix) + 1;
4146 
4147  sz += prefix_size + change->data.msg.message_size +
4148  sizeof(Size) + sizeof(Size);
4149 
4150  break;
4151  }
4152  case REORDER_BUFFER_CHANGE_INVALIDATION:
4153  {
4154  sz += sizeof(SharedInvalidationMessage) *
4155  change->data.inval.ninvalidations;
4156  break;
4157  }
4158  case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
4159  {
4160  Snapshot snap;
4161 
4162  snap = change->data.snapshot;
4163 
4164  sz += sizeof(SnapshotData) +
4165  sizeof(TransactionId) * snap->xcnt +
4166  sizeof(TransactionId) * snap->subxcnt;
4167 
4168  break;
4169  }
4170  case REORDER_BUFFER_CHANGE_TRUNCATE:
4171  {
4172  sz += sizeof(Oid) * change->data.truncate.nrelids;
4173 
4174  break;
4175  }
4176  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
4177  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
4178  case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
4179  case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
4180  /* ReorderBufferChange contains everything important */
4181  break;
4182  }
4183 
4184  return sz;
4185 }
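/*
 * Worked example: an UPDATE whose old and new tuple versions carry 100 and
 * 120 bytes of tuple data is accounted as
 *
 *   sizeof(ReorderBufferChange)
 *       + sizeof(HeapTupleData) + 100      (old version)
 *       + sizeof(HeapTupleData) + 120      (new version)
 *
 * The same quantity is passed to ReorderBufferChangeMemoryUpdate() when the
 * change is queued and when it is freed, so the memory accounting stays
 * balanced.
 */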
4186 
4187 
4188 /*
4189  * Restore a number of changes spilled to disk back into memory.
4190  */
4191 static Size
4192  ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
4193  TXNEntryFile *file, XLogSegNo *segno)
4194 {
4195  Size restored = 0;
4196  XLogSegNo last_segno;
4197  dlist_mutable_iter cleanup_iter;
4198  File *fd = &file->vfd;
4199 
4200  Assert(txn->first_lsn != InvalidXLogRecPtr);
4201  Assert(txn->final_lsn != InvalidXLogRecPtr);
4202 
4203  /* free current entries, so we have memory for more */
4204  dlist_foreach_modify(cleanup_iter, &txn->changes)
4205  {
4206  ReorderBufferChange *cleanup =
4207  dlist_container(ReorderBufferChange, node, cleanup_iter.cur);
4208 
4209  dlist_delete(&cleanup->node);
4210  ReorderBufferReturnChange(rb, cleanup, true);
4211  }
4212  txn->nentries_mem = 0;
4213  Assert(dlist_is_empty(&txn->changes));
4214 
4215  XLByteToSeg(txn->final_lsn, last_segno, wal_segment_size);
4216 
4217  while (restored < max_changes_in_memory && *segno <= last_segno)
4218  {
4219  int readBytes;
4220  ReorderBufferDiskChange *ondisk;
4221 
4222  CHECK_FOR_INTERRUPTS();
4223 
4224  if (*fd == -1)
4225  {
4226  char path[MAXPGPATH];
4227 
4228  /* first time in */
4229  if (*segno == 0)
4230  XLByteToSeg(txn->first_lsn, *segno, wal_segment_size);
4231 
4232  Assert(*segno != 0 || dlist_is_empty(&txn->changes));
4233 
4234  /*
4235  * No need to care about TLIs here, only used during a single run,
4236  * so each LSN only maps to a specific WAL record.
4237  */
4238  ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid,
4239  *segno);
4240 
4241  *fd = PathNameOpenFile(path, O_RDONLY | PG_BINARY);
4242 
4243  /* No harm in resetting the offset even in case of failure */
4244  file->curOffset = 0;
4245 
4246  if (*fd < 0 && errno == ENOENT)
4247  {
4248  *fd = -1;
4249  (*segno)++;
4250  continue;
4251  }
4252  else if (*fd < 0)
4253  ereport(ERROR,
4254  (errcode_for_file_access(),
4255  errmsg("could not open file \"%s\": %m",
4256  path)));
4257  }
4258 
4259  /*
4260  * Read the statically sized part of a change which has information
4261  * about the total size. If we couldn't read a record, we're at the
4262  * end of this file.
4263  */
4264  ReorderBufferSerializeReserve(rb, sizeof(ReorderBufferDiskChange));
4265  readBytes = FileRead(file->vfd, rb->outbuf,
4266  sizeof(ReorderBufferDiskChange),
4267  file->curOffset, WAIT_EVENT_REORDER_BUFFER_READ);
4268 
4269  /* eof */
4270  if (readBytes == 0)
4271  {
4272  FileClose(*fd);
4273  *fd = -1;
4274  (*segno)++;
4275  continue;
4276  }
4277  else if (readBytes < 0)
4278  ereport(ERROR,
4279  (errcode_for_file_access(),
4280  errmsg("could not read from reorderbuffer spill file: %m")));
4281  else if (readBytes != sizeof(ReorderBufferDiskChange))
4282  ereport(ERROR,
4283  (errcode_for_file_access(),
4284  errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
4285  readBytes,
4286  (uint32) sizeof(ReorderBufferDiskChange))));
4287 
4288  file->curOffset += readBytes;
4289 
4290  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
4291 
4292  ReorderBufferSerializeReserve(rb,
4293  sizeof(ReorderBufferDiskChange) + ondisk->size);
4294  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
4295 
4296  readBytes = FileRead(file->vfd,
4297  rb->outbuf + sizeof(ReorderBufferDiskChange),
4298  ondisk->size - sizeof(ReorderBufferDiskChange),
4299  file->curOffset,
4300  WAIT_EVENT_REORDER_BUFFER_READ);
4301 
4302  if (readBytes < 0)
4303  ereport(ERROR,
4304  (errcode_for_file_access(),
4305  errmsg("could not read from reorderbuffer spill file: %m")));
4306  else if (readBytes != ondisk->size - sizeof(ReorderBufferDiskChange))
4307  ereport(ERROR,
4308  (errcode_for_file_access(),
4309  errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
4310  readBytes,
4311  (uint32) (ondisk->size - sizeof(ReorderBufferDiskChange)))));
4312 
4313  file->curOffset += readBytes;
4314 
4315  /*
4316  * ok, read a full change from disk, now restore it into proper
4317  * in-memory format
4318  */
4319  ReorderBufferRestoreChange(rb, txn, rb->outbuf);
4320  restored++;
4321  }
4322 
4323  return restored;
4324 }
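/*
 * Note the two-step read pattern in the loop above: one FileRead() of
 * sizeof(ReorderBufferDiskChange) to learn ondisk->size, then a second
 * FileRead() for the remaining payload, with rb->outbuf grown in between.
 * Restoration stops once max_changes_in_memory changes are in memory or
 * the segment containing txn->final_lsn has been exhausted.
 */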
4325 
4326 /*
4327  * Convert change from its on-disk format to in-memory format and queue it onto
4328  * the TXN's ->changes list.
4329  *
4330  * Note: although "data" is declared char*, at entry it points to a
4331  * maxalign'd buffer, making it safe in most of this function to assume
4332  * that the pointed-to data is suitably aligned for direct access.
4333  */
4334 static void
4335  ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
4336  char *data)
4337 {
4338  ReorderBufferDiskChange *ondisk;
4339  ReorderBufferChange *change;
4340 
4341  ondisk = (ReorderBufferDiskChange *) data;
4342 
4343  change = ReorderBufferGetChange(rb);
4344 
4345  /* copy static part */
4346  memcpy(change, &ondisk->change, sizeof(ReorderBufferChange));
4347 
4348  data += sizeof(ReorderBufferDiskChange);
4349 
4350  /* restore individual stuff */
4351  switch (change->action)
4352  {
4353  /* fall through these, they're all similar enough */
4354  case REORDER_BUFFER_CHANGE_INSERT:
4355  case REORDER_BUFFER_CHANGE_UPDATE:
4356  case REORDER_BUFFER_CHANGE_DELETE:
4357  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
4358  if (change->data.tp.oldtuple)
4359  {
4360  uint32 tuplelen = ((HeapTuple) data)->t_len;
4361 
4362  change->data.tp.oldtuple =
4363  ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader);
4364 
4365  /* restore ->tuple */
4366  memcpy(change->data.tp.oldtuple, data,
4367  sizeof(HeapTupleData));
4368  data += sizeof(HeapTupleData);
4369 
4370  /* reset t_data pointer into the new tuplebuf */
4371  change->data.tp.oldtuple->t_data =
4372  (HeapTupleHeader) ((char *) change->data.tp.oldtuple + HEAPTUPLESIZE);
4373 
4374  /* restore tuple data itself */
4375  memcpy(change->data.tp.oldtuple->t_data, data, tuplelen);
4376  data += tuplelen;
4377  }
4378 
4379  if (change->data.tp.newtuple)
4380  {
4381  /* here, data might not be suitably aligned! */
4382  uint32 tuplelen;
4383 
4384  memcpy(&tuplelen, data + offsetof(HeapTupleData, t_len),
4385  sizeof(uint32));
4386 
4387  change->data.tp.newtuple =
4388  ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader);
4389 
4390  /* restore ->tuple */
4391  memcpy(change->data.tp.newtuple, data,
4392  sizeof(HeapTupleData));
4393  data += sizeof(HeapTupleData);
4394 
4395  /* reset t_data pointer into the new tuplebuf */
4396  change->data.tp.newtuple->t_data =
4397  (HeapTupleHeader) ((char *) change->data.tp.newtuple + HEAPTUPLESIZE);
4398 
4399  /* restore tuple data itself */
4400  memcpy(change->data.tp.newtuple->t_data, data, tuplelen);
4401  data += tuplelen;
4402  }
4403 
4404  break;
4405  case REORDER_BUFFER_CHANGE_MESSAGE:
4406  {
4407  Size prefix_size;
4408 
4409  /* read prefix */
4410  memcpy(&prefix_size, data, sizeof(Size));
4411  data += sizeof(Size);
4412  change->data.msg.prefix = MemoryContextAlloc(rb->context,
4413  prefix_size);
4414  memcpy(change->data.msg.prefix, data, prefix_size);
4415  Assert(change->data.msg.prefix[prefix_size - 1] == '\0');
4416  data += prefix_size;
4417 
4418  /* read the message */
4419  memcpy(&change->data.msg.message_size, data, sizeof(Size));
4420  data += sizeof(Size);
4421  change->data.msg.message = MemoryContextAlloc(rb->context,
4422  change->data.msg.message_size);
4423  memcpy(change->data.msg.message, data,
4424  change->data.msg.message_size);
4425  data += change->data.msg.message_size;
4426 
4427  break;
4428  }
4429  case REORDER_BUFFER_CHANGE_INVALIDATION:
4430  {
4431  Size inval_size = sizeof(SharedInvalidationMessage) *
4432  change->data.inval.ninvalidations;
4433 
4434  change->data.inval.invalidations =
4435  MemoryContextAlloc(rb->context, inval_size);
4436 
4437  /* read the message */
4438  memcpy(change->data.inval.invalidations, data, inval_size);
4439 
4440  break;
4441  }
4442  case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
4443  {
4444  Snapshot oldsnap;
4445  Snapshot newsnap;
4446  Size size;
4447 
4448  oldsnap = (Snapshot) data;
4449 
4450  size = sizeof(SnapshotData) +
4451  sizeof(TransactionId) * oldsnap->xcnt +
4452  sizeof(TransactionId) * (oldsnap->subxcnt + 0);
4453 
4454  change->data.snapshot = MemoryContextAllocZero(rb->context, size);
4455 
4456  newsnap = change->data.snapshot;
4457 
4458  memcpy(newsnap, data, size);
4459  newsnap->xip = (TransactionId *)
4460  (((char *) newsnap) + sizeof(SnapshotData));
4461  newsnap->subxip = newsnap->xip + newsnap->xcnt;
4462  newsnap->copied = true;
4463  break;
4464  }
4465  /* the base struct contains all the data, easy peasy */
4466  case REORDER_BUFFER_CHANGE_TRUNCATE:
4467  {
4468  Oid *relids;
4469 
4470  relids = ReorderBufferGetRelids(rb,
4471  change->data.truncate.nrelids);
4472  memcpy(relids, data, change->data.truncate.nrelids * sizeof(Oid));
4473  change->data.truncate.relids = relids;
4474 
4475  break;
4476  }
4477  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
4478  case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
4479  case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
4480  case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
4481  break;
4482  }
4483 
4484  dlist_push_tail(&txn->changes, &change->node);
4485  txn->nentries_mem++;
4486 
4487  /*
4488  * Update memory accounting for the restored change. We need to do this
4489  * although we don't check the memory limit when restoring the changes in
4490  * this branch (we only do that when initially queueing the changes after
4491  * decoding), because we will release the changes later, and that will
4492  * update the accounting too (subtracting the size from the counters). And
4493  * we don't want to underflow there.
4494  */
4495  ReorderBufferChangeMemoryUpdate(rb, change, true,
4496  ReorderBufferChangeSize(change));
4497 }
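/*
 * Alignment subtlety in the tuple cases above: the old tuple starts right
 * after the maxalign'd header, so ((HeapTuple) data)->t_len can be read
 * directly; the new tuple follows the variable-length old tuple and may be
 * unaligned, which is why its t_len is fetched with memcpy() from
 * offsetof(HeapTupleData, t_len) rather than by dereferencing.
 */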
4498 
4499 /*
4500  * Remove all on-disk data stored for the passed-in transaction.
4501  */
4502 static void
4503  ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
4504 {
4505  XLogSegNo first;
4506  XLogSegNo cur;
4507  XLogSegNo last;
4508 
4509  Assert(txn->first_lsn != InvalidXLogRecPtr);
4510  Assert(txn->final_lsn != InvalidXLogRecPtr);
4511 
4512  XLByteToSeg(txn->first_lsn, first, wal_segment_size);
4513  XLByteToSeg(txn->final_lsn, last, wal_segment_size);
4514 
4515  /* iterate over all possible filenames, and delete them */
4516  for (cur = first; cur <= last; cur++)
4517  {
4518  char path[MAXPGPATH];
4519 
4520  ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid, cur);
4521  if (unlink(path) != 0 && errno != ENOENT)
4522  ereport(ERROR,
4523  (errcode_for_file_access(),
4524  errmsg("could not remove file \"%s\": %m", path)));
4525  }
4526 }
4527 
4528 /*
4529  * Remove any leftover serialized reorder buffers from a slot directory after a
4530  * prior crash or decoding session exit.
4531  */
4532 static void
4533  ReorderBufferCleanupSerializedTXNs(const char *slotname)
4534 {
4535  DIR *spill_dir;
4536  struct dirent *spill_de;
4537  struct stat statbuf;
4538  char path[MAXPGPATH * 2 + 12];
4539 
4540  sprintf(path, "pg_replslot/%s", slotname);
4541 
4542  /* we're only handling directories here, skip if it's not ours */
4543  if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
4544  return;
4545 
4546  spill_dir = AllocateDir(path);
4547  while ((spill_de = ReadDirExtended(spill_dir, path, INFO)) != NULL)
4548  {
4549  /* only look at names that can be ours */
4550  if (strncmp(spill_de->d_name, "xid", 3) == 0)
4551  {
4552  snprintf(path, sizeof(path),
4553  "pg_replslot/%s/%s", slotname,
4554  spill_de->d_name);
4555 
4556  if (unlink(path) != 0)
4557  ereport(ERROR,
4558  (errcode_for_file_access(),
4559  errmsg("could not remove file \"%s\" during removal of pg_replslot/%s/xid*: %m",
4560  path, slotname)));
4561  }
4562  }
4563  FreeDir(spill_dir);
4564 }
4565 
4566 /*
4567  * Given a replication slot, transaction ID and segment number, fill in the
4568  * name of the corresponding spill file into 'path', which is a caller-owned buffer of size
4569  * at least MAXPGPATH.
4570  */
4571 static void
4572  ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, TransactionId xid,
4573  XLogSegNo segno)
4574 {
4575  XLogRecPtr recptr;
4576 
4577  XLogSegNoOffsetToRecPtr(segno, 0, wal_segment_size, recptr);
4578 
4579  snprintf(path, MAXPGPATH, "pg_replslot/%s/xid-%u-lsn-%X-%X.spill",
4580  NameStr(slot->data.name),
4581  xid, LSN_FORMAT_ARGS(recptr));
4582 }
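/*
 * Example (assuming 16MB WAL segments and a slot named "myslot", both
 * chosen purely for illustration): segment number 1 corresponds to LSN
 * 0/1000000, so a transaction with XID 1234 spills to
 *
 *   pg_replslot/myslot/xid-1234-lsn-0-1000000.spill
 *
 * matching the "xid" prefix that ReorderBufferCleanupSerializedTXNs()
 * scans for when removing leftovers.
 */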
4583 
4584 /*
4585  * Delete all data spilled to disk after we've restarted/crashed. It will be
4586  * recreated when the respective slots are reused.
4587  */
4588 void
4589  StartupReorderBuffer(void)
4590 {
4591  DIR *logical_dir;
4592  struct dirent *logical_de;
4593 
4594  logical_dir = AllocateDir("pg_replslot");
4595  while ((logical_de = ReadDir(logical_dir, "pg_replslot")) != NULL)
4596  {
4597  if (strcmp(logical_de->d_name, ".") == 0 ||
4598  strcmp(logical_de->d_name, "..") == 0)
4599  continue;
4600 
4601  /* if it cannot be a slot, skip the directory */
4602  if (!ReplicationSlotValidateName(logical_de->d_name, DEBUG2))
4603  continue;
4604 
4605  /*
4606  * ok, has to be a surviving logical slot, iterate and delete
4607  * everything starting with xid-*
4608  */
4609  ReorderBufferCleanupSerializedTXNs(logical_de->d_name);
4610  }
4611  FreeDir(logical_dir);
4612 }
4613 
4614 /* ---------------------------------------
4615  * toast reassembly support
4616  * ---------------------------------------
4617  */
4618 
4619 /*
4620  * Initialize per tuple toast reconstruction support.
4621  */
4622 static void
4623  ReorderBufferToastInitHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
4624 {
4625  HASHCTL hash_ctl;
4626 
4627  Assert(txn->toast_hash == NULL);
4628 
4629  hash_ctl.keysize = sizeof(Oid);
4630  hash_ctl.entrysize = sizeof(ReorderBufferToastEnt);
4631  hash_ctl.hcxt = rb->context;
4632  txn->toast_hash = hash_create("ReorderBufferToastHash", 5, &hash_ctl,
4633  HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
4634 }
4635 
4636 /*
4637  * Per toast-chunk handling for toast reconstruction
4638  *
4639  * Appends a toast chunk so we can reconstruct it when the tuple "owning" the
4640  * toasted Datum comes along.
4641  */
4642 static void
4643  ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn,
4644  Relation relation, ReorderBufferChange *change)
4645 {
4646  ReorderBufferToastEnt *ent;
4647  HeapTuple newtup;
4648  bool found;
4649  int32 chunksize;
4650  bool isnull;
4651  Pointer chunk;
4652  TupleDesc desc = RelationGetDescr(relation);
4653  Oid chunk_id;
4654  int32 chunk_seq;
4655 
4656  if (txn->toast_hash == NULL)
4657  ReorderBufferToastInitHash(rb, txn);
4658 
4659  Assert(IsToastRelation(relation));
4660 
4661  newtup = change->data.tp.newtuple;
4662  chunk_id = DatumGetObjectId(fastgetattr(newtup, 1, desc, &isnull));
4663  Assert(!isnull);
4664  chunk_seq = DatumGetInt32(fastgetattr(newtup, 2, desc, &isnull));
4665  Assert(!isnull);
4666 
4667  ent = (ReorderBufferToastEnt *)
4668  hash_search(txn->toast_hash, &chunk_id, HASH_ENTER, &found);
4669 
4670  if (!found)
4671  {
4672  Assert(ent->chunk_id == chunk_id);
4673  ent->num_chunks = 0;
4674  ent->last_chunk_seq = 0;
4675  ent->size = 0;
4676  ent->reconstructed = NULL;
4677  dlist_init(&ent->chunks);
4678 
4679  if (chunk_seq != 0)
4680  elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq 0",
4681  chunk_seq, chunk_id);
4682  }
4683  else if (chunk_seq != ent->last_chunk_seq + 1)
4684  elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq %d",
4685  chunk_seq, chunk_id, ent->last_chunk_seq + 1);
4686 
4687  chunk = DatumGetPointer(fastgetattr(newtup, 3, desc, &isnull));
4688  Assert(!isnull);
4689 
4690  /* calculate size so we can allocate the right size at once later */
4691  if (!VARATT_IS_EXTENDED(chunk))
4692  chunksize = VARSIZE(chunk) - VARHDRSZ;
4693  else if (VARATT_IS_SHORT(chunk))
4694  /* could happen due to heap_form_tuple doing its thing */
4695  chunksize = VARSIZE_SHORT(chunk) - VARHDRSZ_SHORT;
4696  else
4697  elog(ERROR, "unexpected type of toast chunk");
4698 
4699  ent->size += chunksize;
4700  ent->last_chunk_seq = chunk_seq;
4701  ent->num_chunks++;
4702  dlist_push_tail(&ent->chunks, &change->node);
4703 }
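/*
 * Example: a 2500-byte external datum split into two toast chunks arrives
 * here as chunk_seq 0 followed by chunk_seq 1 for the same chunk_id. The
 * checks above enforce that strict ordering, the chunk sizes accumulate in
 * ent->size, and the chunk-carrying changes queue up on ent->chunks until
 * ReorderBufferToastReplace() reassembles the datum.
 */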
4704 
4705 /*
4706  * Rejigger change->newtuple to point to in-memory toast tuples instead of
4707  * on-disk toast tuples that may no longer exist (think DROP TABLE or VACUUM).
4708  *
4709  * We cannot replace unchanged toast tuples though, so those will still point
4710  * to on-disk toast data.
4711  *
4712  * While updating the existing change with detoasted tuple data, we need to
4713  * update the memory accounting info, because the change size will differ.
4714  * Otherwise the accounting may get out of sync, triggering serialization
4715  * at unexpected times.
4716  *
4717  * We simply subtract the size of the change before rejiggering the tuple,
4718  * and then add the new size. This makes it look like the change was removed
4719  * and then added back, except it only tweaks the accounting info.
4720  *
4721  * In particular it can't trigger serialization, which would be pointless
4722  * anyway as it happens during commit processing right before handing
4723  * the change to the output plugin.
4724  */
4725 static void
4726  ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
4727  Relation relation, ReorderBufferChange *change)
4728 {
4729  TupleDesc desc;
4730  int natt;
4731  Datum *attrs;
4732  bool *isnull;
4733  bool *free;
4734  HeapTuple tmphtup;
4735  Relation toast_rel;
4736  TupleDesc toast_desc;
4737  MemoryContext oldcontext;
4738  HeapTuple newtup;
4739  Size old_size;
4740 
4741  /* no toast tuples changed */
4742  if (txn->toast_hash == NULL)
4743  return;
4744 
4745  /*
4746  * We're going to modify the size of the change. So, to make sure the
4747  * accounting is correct we record the current change size and then after
4748  * re-computing the change we'll subtract the recorded size and then
4749  * re-add the new change size at the end. We don't immediately subtract
4750  * the old size because if there is any error before we add the new size,
4751  * we will release the changes and that will update the accounting info
4752  * (subtracting the size from the counters). And we don't want to
4753  * underflow there.
4754  */
4755  old_size = ReorderBufferChangeSize(change);
4756 
4757  oldcontext = MemoryContextSwitchTo(rb->context);
4758 
4759  /* we should only have toast tuples in an INSERT or UPDATE */
4760  Assert(change->data.tp.newtuple);
4761 
4762  desc = RelationGetDescr(relation);
4763 
4764  toast_rel = RelationIdGetRelation(relation->rd_rel->reltoastrelid);
4765  if (!RelationIsValid(toast_rel))
4766  elog(ERROR, "could not open toast relation with OID %u (base relation \"%s\")",
4767  relation->rd_rel->reltoastrelid, RelationGetRelationName(relation));
4768 
4769  toast_desc = RelationGetDescr(toast_rel);
4770 
4771  /* should we allocate from stack instead? */
4772  attrs = palloc0(sizeof(Datum) * desc->natts);
4773  isnull = palloc0(sizeof(bool) * desc->natts);
4774  free = palloc0(sizeof(bool) * desc->natts);
4775 
4776  newtup = change->data.tp.newtuple;
4777 
4778  heap_deform_tuple(newtup, desc, attrs, isnull);
4779 
4780  for (natt = 0; natt < desc->natts; natt++)
4781  {
4782  Form_pg_attribute attr = TupleDescAttr(desc, natt);
4783  ReorderBufferToastEnt *ent;
4784  struct varlena *varlena;
4785 
4786  /* va_rawsize is the size of the original datum -- including header */
4787  struct varatt_external toast_pointer;
4788  struct varatt_indirect redirect_pointer;
4789  struct varlena *new_datum = NULL;
4790  struct varlena *reconstructed;
4791  dlist_iter it;
4792  Size data_done = 0;
4793 
4794  /* system columns aren't toasted */
4795  if (attr->attnum < 0)
4796  continue;
4797 
4798  if (attr->attisdropped)
4799  continue;
4800 
4801  /* not a varlena datatype */
4802  if (attr->attlen != -1)
4803  continue;
4804 
4805  /* no data */
4806  if (isnull[natt])
4807  continue;
4808 
4809  /* ok, we know we have a toast datum */
4810  varlena = (struct varlena *) DatumGetPointer(attrs[natt]);
4811 
4812  /* no need to do anything if the tuple isn't external */
4813  if (!VARATT_IS_EXTERNAL_ONDISK(varlena))
4814  continue;
4815 
4816  VARATT_EXTERNAL_GET_POINTER(toast_pointer, varlena);
4817 
4818  /*
4819  * Check whether the toast tuple changed, replace if so.
4820  */
4821  ent = (ReorderBufferToastEnt *)
4822  hash_search(txn->toast_hash,
4823  &toast_pointer.va_valueid,
4824  HASH_FIND,
4825  NULL);
4826  if (ent == NULL)
4827  continue;
4828 
4829  new_datum =
4830  (struct varlena *) palloc0(INDIRECT_POINTER_SIZE);
4831 
4832  free[natt] = true;
4833 
4834  reconstructed = palloc0(toast_pointer.va_rawsize);
4835 
4836  ent->reconstructed = reconstructed;
4837 
4838  /* stitch toast tuple back together from its parts */
4839  dlist_foreach(it, &ent->chunks)
4840  {
4841  bool cisnull;
4842  ReorderBufferChange *cchange;
4843  HeapTuple ctup;
4844  Pointer chunk;
4845 
4846  cchange = dlist_container(ReorderBufferChange, node, it.cur);
4847  ctup = cchange->data.tp.newtuple;
4848  chunk = DatumGetPointer(fastgetattr(ctup, 3, toast_desc, &cisnull));
4849 
4850  Assert(!cisnull);
4851  Assert(!VARATT_IS_EXTERNAL(chunk));
4852  Assert(!VARATT_IS_SHORT(chunk));
4853 
4854  memcpy(VARDATA(reconstructed) + data_done,
4855  VARDATA(chunk),
4856  VARSIZE(chunk) - VARHDRSZ);
4857  data_done += VARSIZE(chunk) - VARHDRSZ;
4858  }
4859  Assert(data_done == VARATT_EXTERNAL_GET_EXTSIZE(toast_pointer));
4860 
4861  /* make sure it's marked as compressed or not */
4862  if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
4863  SET_VARSIZE_COMPRESSED(reconstructed, data_done + VARHDRSZ);
4864  else
4865  SET_VARSIZE(reconstructed, data_done + VARHDRSZ);
4866 
4867  memset(&redirect_pointer, 0, sizeof(redirect_pointer));
4868  redirect_pointer.pointer = reconstructed;
4869 
4870  SET_VARTAG_EXTERNAL(new_datum, VARTAG_INDIRECT);
4871  memcpy(VARDATA_EXTERNAL(new_datum), &redirect_pointer,
4872  sizeof(redirect_pointer));
4873 
4874  attrs[natt] = PointerGetDatum(new_datum);
4875  }
4876 
4877  /*
4878  * Build tuple in separate memory & copy tuple back into the tuplebuf
4879  * passed to the output plugin. We can't directly heap_fill_tuple() into
4880  * the tuplebuf because attrs[] will point back into the current content.
4881  */
4882  tmphtup = heap_form_tuple(desc, attrs, isnull);
4883  Assert(newtup->t_len <= MaxHeapTupleSize);
4884  Assert(newtup->t_data == (HeapTupleHeader) ((char *) newtup + HEAPTUPLESIZE));
4885 
4886  memcpy(newtup->t_data, tmphtup->t_data, tmphtup->t_len);
4887  newtup->t_len = tmphtup->t_len;
4888 
4889  /*
4890  * free resources we won't further need, more persistent stuff will be
4891  * free'd in ReorderBufferToastReset().
4892  */
4893  RelationClose(toast_rel);
4894  pfree(tmphtup);
4895  for (natt = 0; natt < desc->natts; natt++)
4896  {
4897  if (free[natt])
4898  pfree(DatumGetPointer(attrs[natt]));
4899  }
4900  pfree(attrs);
4901  pfree(free);
4902  pfree(isnull);
4903 
4904  MemoryContextSwitchTo(oldcontext);
4905 
4906  /* subtract the old change size */
4907  ReorderBufferChangeMemoryUpdate(rb, change, false, old_size);
4908  /* now add the change back, with the correct size */
4909  ReorderBufferChangeMemoryUpdate(rb, change, true,
4910  ReorderBufferChangeSize(change));
4911 }
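/*
 * Net effect, sketched for a single toasted column: the on-disk toast
 * pointer (varatt_external) in the new tuple is replaced by an indirect
 * pointer (tagged VARTAG_INDIRECT) whose target is the ent->reconstructed
 * datum stitched together above, so the output plugin can detoast the
 * column without touching the possibly-gone toast relation.
 */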
4912 
4913 /*
4914  * Free all resources allocated for toast reconstruction.
4915  */
4916 static void
4917  ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn)
4918 {
4919  HASH_SEQ_STATUS hstat;
4920  ReorderBufferToastEnt *ent;
4921 
4922  if (txn->toast_hash == NULL)
4923  return;
4924 
4925  /* sequentially walk over the hash and free everything */
4926  hash_seq_init(&hstat, txn->toast_hash);
4927  while ((ent = (ReorderBufferToastEnt *) hash_seq_search(&hstat)) != NULL)
4928  {
4929  dlist_mutable_iter it;
4930 
4931  if (ent->reconstructed != NULL)
4932  pfree(ent->reconstructed);
4933 
4934  dlist_foreach_modify(it, &ent->chunks)
4935  {
4936  ReorderBufferChange *change =
4937  dlist_container(ReorderBufferChange, node, it.cur);
4938 
4939  dlist_delete(&change->node);
4940  ReorderBufferReturnChange(rb, change, true);
4941  }
4942  }
4943 
4944  hash_destroy(txn->toast_hash);
4945  txn->toast_hash = NULL;
4946 }
4947 
4948 
4949 /* ---------------------------------------
4950  * Visibility support for logical decoding
4951  *
4952  *
4953  * Lookup actual cmin/cmax values when using decoding snapshot. We can't
4954  * always rely on stored cmin/cmax values because of two scenarios:
4955  *
4956  * * A tuple got changed multiple times during a single transaction and thus
4957  * has got a combo CID. Combo CIDs are only valid for the duration of a
4958  * single transaction.
4959  * * A tuple with a cmin but no cmax (and thus no combo CID) got
4960  * deleted/updated in another transaction than the one which created it
4961  * which we are looking at right now. As only one of cmin, cmax or combo CID
4962  * is actually stored in the heap we don't have access to the value we
4963  * need anymore.
4964  *
4965  * To resolve those problems we have a per-transaction hash of (cmin,
4966  * cmax) tuples keyed by (relfilelocator, ctid) which contains the actual
4967  * (cmin, cmax) values. That also takes care of combo CIDs by simply
4968  * not caring about them at all. As we have the real cmin/cmax values
4969  * combo CIDs aren't interesting.
4970  *
4971  * As we only care about catalog tuples here the overhead of this
4972  * hashtable should be acceptable.
4973  *
4974  * Heap rewrites complicate this a bit, check rewriteheap.c for
4975  * details.
4976  * -------------------------------------------------------------------------
4977  */
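/*
 * Sketch of the hash described above (cf. ReorderBufferTupleCidKey and
 * ReorderBufferTupleCidEnt declared earlier in this file):
 *
 *   key:   (RelFileLocator rlocator, ItemPointerData tid)   catalog tuple
 *   entry: cmin, cmax, combocid                             real values
 *
 * ResolveCminCmaxDuringDecoding() below performs lookups against this
 * table (the transaction's tuplecid_hash, passed in as tuplecid_data).
 */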
4978 
4979 /* struct for sorting mapping files by LSN efficiently */
4980 typedef struct RewriteMappingFile
4981 {
4982  XLogRecPtr lsn;
4983  char fname[MAXPGPATH];
4984  } RewriteMappingFile;
4985 
4986 #ifdef NOT_USED
4987 static void
4988 DisplayMapping(HTAB *tuplecid_data)
4989 {
4990  HASH_SEQ_STATUS hstat;
4991  ReorderBufferTupleCidEnt *ent;
4992 
4993  hash_seq_init(&hstat, tuplecid_data);
4994  while ((ent = (ReorderBufferTupleCidEnt *) hash_seq_search(&hstat)) != NULL)
4995  {
4996  elog(DEBUG3, "mapping: node: %u/%u/%u tid: %u/%u cmin: %u, cmax: %u",
4997  ent->key.rlocator.dbOid,
4998  ent->key.rlocator.spcOid,
4999  ent->key.rlocator.relNumber,
5000  ItemPointerGetBlockNumber(&ent->key.tid),
5001  ItemPointerGetOffsetNumber(&ent->key.tid),
5002  ent->cmin,
5003  ent->cmax
5004  );
5005  }
5006 }
5007 #endif
5008 
5009 /*
5010  * Apply a single mapping file to tuplecid_data.
5011  *
5012  * The mapping file has to have been verified to be a) committed b) for our
5013  * transaction c) applied in LSN order.
5014  */
5015 static void
5016 ApplyLogicalMappingFile(HTAB *tuplecid_data, Oid relid, const char *fname)
5017 {
5018  char path[MAXPGPATH];
5019  int fd;
5020  int readBytes;
5021  LogicalRewriteMappingData map;
5022 
5023  sprintf(path, "pg_logical/mappings/%s", fname);
5024  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
5025  if (fd < 0)
5026  ereport(ERROR,
5027  (errcode_for_file_access(),
5028  errmsg("could not open file \"%s\": %m", path)));
5029 
5030  while (true)
5031  {
5032  ReorderBufferTupleCidKey key;
5033  ReorderBufferTupleCidEnt *ent;
5034  ReorderBufferTupleCidEnt *new_ent;
5035  bool found;
5036 
5037  /* be careful about padding */
5038  memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
5039 
5040  /* read all mappings till the end of the file */
5041  pgstat_report_wait_start(WAIT_EVENT_REORDER_LOGICAL_MAPPING_READ);
5042  readBytes = read(fd, &map, sizeof(LogicalRewriteMappingData));
5043  pgstat_report_wait_end();
5044 
5045  if (readBytes < 0)
5046  ereport(ERROR,
5047  (errcode_for_file_access(),
5048  errmsg("could not read file \"%s\": %m",
5049  path)));
5050  else if (readBytes == 0) /* EOF */
5051  break;
5052  else if (readBytes != sizeof(LogicalRewriteMappingData))
5053  ereport(ERROR,
5054  (errcode_for_file_access(),
5055  errmsg("could not read from file \"%s\": read %d instead of %d bytes",
5056  path, readBytes,
5057  (int32) sizeof(LogicalRewriteMappingData))));
5058 
5059  key.rlocator = map.old_locator;
5060  ItemPointerCopy(&map.old_tid,
5061  &key.tid);
5062 
5063 
5064  ent = (ReorderBufferTupleCidEnt *)
5065  hash_search(tuplecid_data, &key, HASH_FIND, NULL);
5066 
5067  /* no existing mapping, no need to update */
5068  if (!ent)
5069  continue;
5070 
5071  key.rlocator = map.new_locator;
5072  ItemPointerCopy(&map.new_tid,
5073  &key.tid);
5074 
5075  new_ent = (ReorderBufferTupleCidEnt *)
5076  hash_search(tuplecid_data, &key, HASH_ENTER, &found);
5077 
5078  if (found)
5079  {
5080  /*
5081  * Make sure the existing mapping makes sense. We sometime update
5082  * old records that did not yet have a cmax (e.g. pg_class' own
5083  * entry while rewriting it) during rewrites, so allow that.
5084  */
5085  Assert(ent->cmin == InvalidCommandId || ent->cmin == new_ent->cmin);
5086  Assert(ent->cmax == InvalidCommandId || ent->cmax == new_ent->cmax);
5087  }
5088  else
5089  {
5090  /* update mapping */
5091  new_ent->cmin = ent->cmin;
5092  new_ent->cmax = ent->cmax;
5093  new_ent->combocid = ent->combocid;
5094  }
5095  }
5096 
5097  if (CloseTransientFile(fd) != 0)
5098  ereport(ERROR,
5099  (errcode_for_file_access(),
5100  errmsg("could not close file \"%s\": %m", path)));
5101 }
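/*
 * Each LogicalRewriteMappingData record processed above carries an
 * (old_locator, old_tid) -> (new_locator, new_tid) pair: when the old
 * tuple position has a (cmin, cmax) entry in tuplecid_data, the same
 * values are entered under the new position, keeping visibility lookups
 * working after a heap rewrite (e.g. VACUUM FULL or CLUSTER of a catalog).
 */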
5102 
5103 
5104 /*
5105  * Check whether the TransactionId 'xid' is in the pre-sorted array 'xip'.
5106  */
5107 static bool
5108  TransactionIdInArray(TransactionId xid, TransactionId *xip, Size num)
5109 {
5110  return bsearch(&xid, xip, num,
5111  sizeof(TransactionId), xidComparator) != NULL;
5112 }
5113 
5114 /*
5115  * list_sort() comparator for sorting RewriteMappingFiles in LSN order.
5116  */
5117 static int
5118 file_sort_by_lsn(const ListCell *a_p, const ListCell *b_p)
5119 {
5120  RewriteMappingFile *a = (RewriteMappingFile *) lfirst(a_p);
5121  RewriteMappingFile *b = (RewriteMappingFile *) lfirst(b_p);
5122 
5123  return pg_cmp_u64(a->lsn, b->lsn);
5124 }
5125 
5126 /*
5127  * Apply any existing logical remapping files if there are any targeted at our
5128  * transaction for relid.
5129  */
5130 static void
5131  UpdateLogicalMappings(HTAB *tuplecid_data, Oid relid, Snapshot snapshot)
5132 {
5133  DIR *mapping_dir;
5134  struct dirent *mapping_de;
5135  List *files = NIL;
5136  ListCell *file;
5137  Oid dboid = IsSharedRelation(relid) ? InvalidOid : MyDatabaseId;
5138 
5139  mapping_dir = AllocateDir("pg_logical/mappings");
5140  while ((mapping_de = ReadDir(mapping_dir, "pg_logical/mappings")) != NULL)
5141  {
5142  Oid f_dboid;
5143  Oid f_relid;
5144  TransactionId f_mapped_xid;
5145  TransactionId f_create_xid;
5146  XLogRecPtr f_lsn;
5147  uint32 f_hi,
5148  f_lo;
5149  RewriteMappingFile *f;
5150 
5151  if (strcmp(mapping_de->d_name, ".") == 0 ||
5152  strcmp(mapping_de->d_name, "..") == 0)
5153  continue;
5154 
5155  /* Ignore files that aren't ours */
5156  if (strncmp(mapping_de->d_name, "map-", 4) != 0)
5157  continue;
5158 
5159  if (sscanf(mapping_de->d_name, LOGICAL_REWRITE_FORMAT,
5160  &f_dboid, &f_relid, &f_hi, &f_lo,
5161  &f_mapped_xid, &f_create_xid) != 6)
5162  elog(ERROR, "could not parse filename \"%s\"", mapping_de->d_name);
5163 
5164  f_lsn = ((uint64) f_hi) << 32 | f_lo;
5165 
5166  /* mapping for another database */
5167  if (f_dboid != dboid)
5168  continue;
5169 
5170  /* mapping for another relation */
5171  if (f_relid != relid)
5172  continue;
5173 
5174  /* did the creating transaction abort? */
5175  if (!TransactionIdDidCommit(f_create_xid))
5176  continue;
5177 
5178  /* not for our transaction */
5179  if (!TransactionIdInArray(f_mapped_xid, snapshot->subxip, snapshot->subxcnt))
5180  continue;
5181 
5182  /* ok, relevant, queue for apply */
5183  f = palloc(sizeof(RewriteMappingFile));
5184  f->lsn = f_lsn;
5185  strcpy(f->fname, mapping_de->d_name);
5186  files = lappend(files, f);
5187  }
5188  FreeDir(mapping_dir);
5189 
5190  /* sort files so we apply them in LSN order */
5191  list_sort(files, file_sort_by_lsn);
5192 
5193  foreach(file, files)
5194  {
5195  RewriteMappingFile *f = (RewriteMappingFile *) lfirst(file);
5196 
5197  elog(DEBUG1, "applying mapping: \"%s\" in %u", f->fname,
5198  snapshot->subxip[0]);
5199  ApplyLogicalMappingFile(tuplecid_data, relid, f->fname);
5200  pfree(f);
5201  }
5202 }
5203 
5204 /*
5205  * Lookup cmin/cmax of a tuple, during logical decoding where we can't rely on
5206  * combo CIDs.
5207  */
5208 bool
5209  ResolveCminCmaxDuringDecoding(HTAB *tuplecid_data,
5210  Snapshot snapshot,
5211  HeapTuple htup, Buffer buffer,
5212  CommandId *cmin, CommandId *cmax)
5213 {
5214  ReorderBufferTupleCidKey key;
5215  ReorderBufferTupleCidEnt *ent;
5216  ForkNumber forkno;
5217  BlockNumber blockno;
5218  bool updated_mapping = false;
5219 
5220  /*
5221  * Return unresolved if tuplecid_data is not valid. That's because when
5222  * streaming in-progress transactions we may run into tuples with the CID
5223  * before actually decoding them. Think e.g. about INSERT followed by
5224  * TRUNCATE, where the TRUNCATE may not be decoded yet when applying the
5225  * INSERT. So in such cases, we assume the CID is from the future
5226  * command.
5227  */
5228  if (tuplecid_data == NULL)
5229  return false;
5230 
5231  /* be careful about padding */
5232  memset(&key, 0, sizeof(key));
5233 
5234  Assert(!BufferIsLocal(buffer));
5235 
5236  /*
5237  * get relfilelocator from the buffer, no convenient way to access it
5238  * other than that.
5239  */
5240  BufferGetTag(buffer, &key.rlocator, &forkno, &blockno);
5241 
5242  /* tuples can only be in the main fork */
5243  Assert(forkno == MAIN_FORKNUM);
5244  Assert(blockno == ItemPointerGetBlockNumber(&htup->t_self));
5245 
5246  ItemPointerCopy(&htup->t_self,
5247  &key.tid);
5248 
5249 restart:
5250  ent = (ReorderBufferTupleCidEnt *)
5251  hash_search(tuplecid_data, &key, HASH_FIND, NULL);
5252 
5253  /*
5254  * failed to find a mapping, check whether the table was rewritten and
5255  * apply mapping if so, but only do that once - there can be no new
5256  * mappings while we are in here since we have to hold a lock on the
5257  * relation.
5258  */
5259  if (ent == NULL && !updated_mapping)
5260  {
5261  UpdateLogicalMappings(tuplecid_data, htup->t_tableOid, snapshot);
5262  /* now check but don't update for a mapping again */
5263  updated_mapping = true;
5264  goto restart;
5265  }
5266  else if (ent == NULL)
5267  return false;
5268 
5269  if (cmin)
5270  *cmin = ent->cmin;
5271  if (cmax)
5272  *cmax = ent->cmax;
5273  return true;
5274 }
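/*
 * Hypothetical caller sketch, mirroring how a historic-MVCC visibility
 * check might consult this function (the exact caller logic may differ):
 *
 *   CommandId   cmin, cmax;
 *
 *   if (!ResolveCminCmaxDuringDecoding(tuplecid_data, snapshot,
 *                                      htup, buffer, &cmin, &cmax))
 *       return false;            no mapping yet: treat as not visible
 *   return cmin < snapshot->curcid;
 *
 * i.e. callers are expected to fall back to "not visible" when the lookup
 * fails, matching the streaming caveat in the header comment above.
 */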