PostgreSQL Source Code  git master
reorderbuffer.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * reorderbuffer.c
4  * PostgreSQL logical replay/reorder buffer management
5  *
6  *
7  * Copyright (c) 2012-2024, PostgreSQL Global Development Group
8  *
9  *
10  * IDENTIFICATION
11  * src/backend/replication/logical/reorderbuffer.c
12  *
13  * NOTES
14  * This module gets handed individual pieces of transactions in the order
15  * they are written to the WAL and is responsible for reassembling them into
16  * toplevel transaction sized pieces. When a transaction is completely
17  * reassembled - signaled by reading the transaction commit record - it
18  * will then call the output plugin (cf. ReorderBufferCommit()) with the
19  * individual changes. The output plugins rely on snapshots built by
20  * snapbuild.c which hands them to us.
21  *
22  * Transactions and subtransactions/savepoints in postgres are not
23  * immediately linked to each other from outside the performing
24  * backend. They are linked together only at commit/abort (or via special
25  * xact_assignment records), which means that we have to splice together a
26  * toplevel transaction from its subtransactions. To do that efficiently we
27  * build a binary heap indexed by the smallest current lsn of the individual
28  * subtransactions' changestreams. As the individual streams are inherently
29  * ordered by LSN - since that is where we build them from - the transaction
30  * can easily be reassembled by always using the subtransaction with the
31  * smallest current LSN from the heap.
32  *
33  * In order to cope with large transactions - which can be several times as
34  * big as the available memory - this module supports spooling the contents
35  * of large transactions to disk. When the transaction is replayed the
36  * contents of individual (sub-)transactions will be read from disk in
37  * chunks.
38  *
39  * This module also has to deal with reassembling toast records from the
40  * individual chunks stored in WAL. When a new (or initial) version of a
41  * tuple is stored in WAL it will always be preceded by the toast chunks
42  * emitted for the columns stored out of line. Within a single toplevel
43  * transaction there will be no other data carrying records between a row's
44  * toast chunks and the row data itself. See ReorderBufferToast* for
45  * details.
46  *
47  * ReorderBuffer uses two special memory context types - SlabContext for
48  * allocations of fixed-length structures (changes and transactions), and
49  * GenerationContext for the variable-length transaction data (allocated
50  * and freed in groups with similar lifespans).
51  *
52  * To limit the amount of memory used by decoded changes, we track memory
53  * used at the reorder buffer level (i.e. total amount of memory), and for
54  * each transaction. When the total amount of used memory exceeds the
55  * limit, the transaction consuming the most memory is then serialized to
56  * disk.
57  *
58  * Only decoded changes are evicted from memory (spilled to disk), not the
59  * transaction records. The number of toplevel transactions is limited,
60  * but a transaction with many subtransactions may still consume significant
61  * amounts of memory. However, the transaction records are fairly small and
62  * are not included in the memory limit.
63  *
64  * The current eviction algorithm is very simple - the transaction is
65  * picked merely by size, while it might be useful to also consider age
66  * (LSN) of the changes for example. With the new Generational memory
67  * allocator, evicting the oldest changes would make it more likely the
68  * memory gets actually freed.
69  *
70  * We still rely on max_changes_in_memory when loading serialized changes
71  * back into memory. At that point we can't use the memory limit directly
72  * as we load the subxacts independently. One option to deal with this
73  * would be to count the subxacts, and allow each to allocate 1/N of the
74  * memory limit. That however does not seem very appealing, because with
75  * many subtransactions it may easily cause thrashing (short cycles of
76  * deserializing and applying very few changes). We probably should give
77  * a bit more memory to the oldest subtransactions, because it's likely
78  * they are the source for the next sequence of changes.
79  *
80  * -------------------------------------------------------------------------
81  */
82 #include "postgres.h"
83 
84 #include <unistd.h>
85 #include <sys/stat.h>
86 
87 #include "access/detoast.h"
88 #include "access/heapam.h"
89 #include "access/rewriteheap.h"
90 #include "access/transam.h"
91 #include "access/xact.h"
92 #include "access/xlog_internal.h"
93 #include "catalog/catalog.h"
94 #include "common/int.h"
95 #include "lib/binaryheap.h"
96 #include "miscadmin.h"
97 #include "pgstat.h"
98 #include "replication/logical.h"
100 #include "replication/slot.h"
101 #include "replication/snapbuild.h" /* just for SnapBuildSnapDecRefcount */
102 #include "storage/bufmgr.h"
103 #include "storage/fd.h"
104 #include "storage/sinval.h"
105 #include "utils/builtins.h"
106 #include "utils/memutils.h"
107 #include "utils/rel.h"
108 #include "utils/relfilenumbermap.h"
109 
110 
111 /* entry for a hash table we use to map from xid to our transaction state */
113 {
117 
118 /* data structures for (relfilelocator, ctid) => (cmin, cmax) mapping */
120 {
124 
126 {
130  CommandId combocid; /* just for debugging */
132 
133 /* Virtual file descriptor with file offset tracking */
134 typedef struct TXNEntryFile
135 {
136  File vfd; /* -1 when the file is closed */
137  off_t curOffset; /* offset for next write or read. Reset to 0
138  * when vfd is opened. */
140 
141 /* k-way in-order change iteration support structures */
143 {
150 
152 {
158 
159 /* toast datastructures */
160 typedef struct ReorderBufferToastEnt
161 {
162  Oid chunk_id; /* toast_table.chunk_id */
163  int32 last_chunk_seq; /* toast_table.chunk_seq of the last chunk we
164  * have seen */
165  Size num_chunks; /* number of chunks we've already seen */
166  Size size; /* combined size of chunks seen */
167  dlist_head chunks; /* linked list of chunks */
168  struct varlena *reconstructed; /* reconstructed varlena now pointed to in
169  * main tup */
171 
172 /* Disk serialization support datastructures */
174 {
177  /* data follows */
179 
180 #define IsSpecInsert(action) \
181 ( \
182  ((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT) \
183 )
184 #define IsSpecConfirmOrAbort(action) \
185 ( \
186  (((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM) || \
187  ((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT)) \
188 )
189 #define IsInsertOrUpdate(action) \
190 ( \
191  (((action) == REORDER_BUFFER_CHANGE_INSERT) || \
192  ((action) == REORDER_BUFFER_CHANGE_UPDATE) || \
193  ((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT)) \
194 )
195 
196 /*
197  * Maximum number of changes kept in memory, per transaction. After that,
198  * changes are spooled to disk.
199  *
200  * The current value should be sufficient to decode the entire transaction
201  * without hitting disk in OLTP workloads, while starting to spool to disk in
202  * other workloads reasonably fast.
203  *
204  * At some point in the future it probably makes sense to have a more elaborate
205  * resource management here, but it's not entirely clear what that would look
206  * like.
207  */
209 static const Size max_changes_in_memory = 4096; /* XXX for restore only */
210 
211 /* GUC variable */
213 
214 /* ---------------------------------------
215  * primary reorderbuffer support routines
216  * ---------------------------------------
217  */
221  TransactionId xid, bool create, bool *is_new,
222  XLogRecPtr lsn, bool create_as_top);
224  ReorderBufferTXN *subtxn);
225 
226 static void AssertTXNLsnOrder(ReorderBuffer *rb);
227 
228 /* ---------------------------------------
229  * support functions for lsn-order iterating over the ->changes of a
230  * transaction and its subtransactions
231  *
232  * used for iteration over the k-way heap merge of a transaction and its
233  * subtransactions
234  * ---------------------------------------
235  */
237  ReorderBufferIterTXNState *volatile *iter_state);
242 
243 /*
244  * ---------------------------------------
245  * Disk serialization support functions
246  * ---------------------------------------
247  */
251  int fd, ReorderBufferChange *change);
253  TXNEntryFile *file, XLogSegNo *segno);
255  char *data);
258  bool txn_prepared);
259 static void ReorderBufferCleanupSerializedTXNs(const char *slotname);
260 static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot,
261  TransactionId xid, XLogSegNo segno);
262 
263 static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap);
265  ReorderBufferTXN *txn, CommandId cid);
266 
267 /*
268  * ---------------------------------------
269  * Streaming support functions
270  * ---------------------------------------
271  */
272 static inline bool ReorderBufferCanStream(ReorderBuffer *rb);
273 static inline bool ReorderBufferCanStartStreaming(ReorderBuffer *rb);
276 
277 /* ---------------------------------------
278  * toast reassembly support
279  * ---------------------------------------
280  */
284  Relation relation, ReorderBufferChange *change);
286  Relation relation, ReorderBufferChange *change);
287 
288 /*
289  * ---------------------------------------
290  * memory accounting
291  * ---------------------------------------
292  */
295  ReorderBufferChange *change,
296  bool addition, Size sz);
297 
298 /*
299  * Allocate a new ReorderBuffer and clean out any old serialized state from
300  * prior ReorderBuffer instances for the same slot.
301  */
304 {
305  ReorderBuffer *buffer;
306  HASHCTL hash_ctl;
307  MemoryContext new_ctx;
308 
309  Assert(MyReplicationSlot != NULL);
310 
311  /* allocate memory in own context, to have better accountability */
313  "ReorderBuffer",
315 
316  buffer =
317  (ReorderBuffer *) MemoryContextAlloc(new_ctx, sizeof(ReorderBuffer));
318 
319  memset(&hash_ctl, 0, sizeof(hash_ctl));
320 
321  buffer->context = new_ctx;
322 
323  buffer->change_context = SlabContextCreate(new_ctx,
324  "Change",
326  sizeof(ReorderBufferChange));
327 
328  buffer->txn_context = SlabContextCreate(new_ctx,
329  "TXN",
331  sizeof(ReorderBufferTXN));
332 
333  /*
334  * XXX the allocation sizes used below pre-date generation context's block
335  * growing code. These values should likely be benchmarked and set to
336  * more suitable values.
337  */
338  buffer->tup_context = GenerationContextCreate(new_ctx,
339  "Tuples",
343 
344  hash_ctl.keysize = sizeof(TransactionId);
345  hash_ctl.entrysize = sizeof(ReorderBufferTXNByIdEnt);
346  hash_ctl.hcxt = buffer->context;
347 
348  buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
350 
352  buffer->by_txn_last_txn = NULL;
353 
354  buffer->outbuf = NULL;
355  buffer->outbufsize = 0;
356  buffer->size = 0;
357 
358  buffer->spillTxns = 0;
359  buffer->spillCount = 0;
360  buffer->spillBytes = 0;
361  buffer->streamTxns = 0;
362  buffer->streamCount = 0;
363  buffer->streamBytes = 0;
364  buffer->totalTxns = 0;
365  buffer->totalBytes = 0;
366 
368 
369  dlist_init(&buffer->toplevel_by_lsn);
371  dclist_init(&buffer->catchange_txns);
372 
373  /*
374  * Ensure there's no stale data from prior uses of this slot, in case some
375  * prior exit avoided calling ReorderBufferFree. Failure to do this can
376  * produce duplicated txns, and it's very cheap if there's nothing there.
377  */
379 
380  return buffer;
381 }
382 
383 /*
384  * Free a ReorderBuffer
385  */
386 void
388 {
390 
391  /*
392  * We free separately allocated data by entirely scrapping reorderbuffer's
393  * memory context.
394  */
396 
397  /* Free disk space used by unconsumed reorder buffers */
399 }
400 
401 /*
402  * Get an unused, possibly preallocated, ReorderBufferTXN.
403  */
404 static ReorderBufferTXN *
406 {
407  ReorderBufferTXN *txn;
408 
409  txn = (ReorderBufferTXN *)
411 
412  memset(txn, 0, sizeof(ReorderBufferTXN));
413 
414  dlist_init(&txn->changes);
415  dlist_init(&txn->tuplecids);
416  dlist_init(&txn->subtxns);
417 
418  /* InvalidCommandId is not zero, so set it explicitly */
420  txn->output_plugin_private = NULL;
421 
422  return txn;
423 }
424 
425 /*
426  * Free a ReorderBufferTXN.
427  */
428 static void
430 {
431  /* clean the lookup cache if we were cached (quite likely) */
432  if (rb->by_txn_last_xid == txn->xid)
433  {
435  rb->by_txn_last_txn = NULL;
436  }
437 
438  /* free data that's contained */
439 
440  if (txn->gid != NULL)
441  {
442  pfree(txn->gid);
443  txn->gid = NULL;
444  }
445 
446  if (txn->tuplecid_hash != NULL)
447  {
449  txn->tuplecid_hash = NULL;
450  }
451 
452  if (txn->invalidations)
453  {
454  pfree(txn->invalidations);
455  txn->invalidations = NULL;
456  }
457 
458  /* Reset the toast hash */
459  ReorderBufferToastReset(rb, txn);
460 
461  pfree(txn);
462 }
463 
464 /*
465  * Get a fresh ReorderBufferChange.
466  */
469 {
470  ReorderBufferChange *change;
471 
472  change = (ReorderBufferChange *)
474 
475  memset(change, 0, sizeof(ReorderBufferChange));
476  return change;
477 }
478 
479 /*
480  * Free a ReorderBufferChange and update memory accounting, if requested.
481  */
482 void
484  bool upd_mem)
485 {
486  /* update memory accounting info */
487  if (upd_mem)
488  ReorderBufferChangeMemoryUpdate(rb, change, false,
489  ReorderBufferChangeSize(change));
490 
491  /* free contained data */
492  switch (change->action)
493  {
498  if (change->data.tp.newtuple)
499  {
500  ReorderBufferReturnTupleBuf(change->data.tp.newtuple);
501  change->data.tp.newtuple = NULL;
502  }
503 
504  if (change->data.tp.oldtuple)
505  {
506  ReorderBufferReturnTupleBuf(change->data.tp.oldtuple);
507  change->data.tp.oldtuple = NULL;
508  }
509  break;
511  if (change->data.msg.prefix != NULL)
512  pfree(change->data.msg.prefix);
513  change->data.msg.prefix = NULL;
514  if (change->data.msg.message != NULL)
515  pfree(change->data.msg.message);
516  change->data.msg.message = NULL;
517  break;
519  if (change->data.inval.invalidations)
520  pfree(change->data.inval.invalidations);
521  change->data.inval.invalidations = NULL;
522  break;
524  if (change->data.snapshot)
525  {
526  ReorderBufferFreeSnap(rb, change->data.snapshot);
527  change->data.snapshot = NULL;
528  }
529  break;
530  /* no data in addition to the struct itself */
532  if (change->data.truncate.relids != NULL)
533  {
534  ReorderBufferReturnRelids(rb, change->data.truncate.relids);
535  change->data.truncate.relids = NULL;
536  }
537  break;
542  break;
543  }
544 
545  pfree(change);
546 }
547 
548 /*
549  * Get a fresh HeapTuple fitting a tuple of size tuple_len (excluding header
550  * overhead).
551  */
552 HeapTuple
554 {
555  HeapTuple tuple;
556  Size alloc_len;
557 
558  alloc_len = tuple_len + SizeofHeapTupleHeader;
559 
561  HEAPTUPLESIZE + alloc_len);
562  tuple->t_data = (HeapTupleHeader) ((char *) tuple + HEAPTUPLESIZE);
563 
564  return tuple;
565 }
566 
567 /*
568  * Free a HeapTuple returned by ReorderBufferGetTupleBuf().
569  */
570 void
572 {
573  pfree(tuple);
574 }
575 
576 /*
577  * Get an array for relids of truncated relations.
578  *
579  * We use the global memory context (for the whole reorder buffer), because
580  * none of the existing ones seems like a good match (some are SLAB, so we
581  * can't use those, and tup_context is meant for tuple data, not relids). We
582  * could add yet another context, but it seems like an overkill - TRUNCATE is
583  * not particularly common operation, so it does not seem worth it.
584  */
585 Oid *
587 {
588  Oid *relids;
589  Size alloc_len;
590 
591  alloc_len = sizeof(Oid) * nrelids;
592 
593  relids = (Oid *) MemoryContextAlloc(rb->context, alloc_len);
594 
595  return relids;
596 }
597 
598 /*
599  * Free an array of relids.
600  */
601 void
603 {
604  pfree(relids);
605 }
606 
607 /*
608  * Return the ReorderBufferTXN from the given buffer, specified by Xid.
609  * If create is true, and a transaction doesn't already exist, create it
610  * (with the given LSN, and as top transaction if that's specified);
611  * when this happens, is_new is set to true.
612  */
613 static ReorderBufferTXN *
615  bool *is_new, XLogRecPtr lsn, bool create_as_top)
616 {
617  ReorderBufferTXN *txn;
619  bool found;
620 
622 
623  /*
624  * Check the one-entry lookup cache first
625  */
627  rb->by_txn_last_xid == xid)
628  {
629  txn = rb->by_txn_last_txn;
630 
631  if (txn != NULL)
632  {
633  /* found it, and it's valid */
634  if (is_new)
635  *is_new = false;
636  return txn;
637  }
638 
639  /*
640  * cached as non-existent, and asked not to create? Then nothing else
641  * to do.
642  */
643  if (!create)
644  return NULL;
645  /* otherwise fall through to create it */
646  }
647 
648  /*
649  * If the cache wasn't hit or it yielded a "does-not-exist" and we want to
650  * create an entry.
651  */
652 
653  /* search the lookup table */
654  ent = (ReorderBufferTXNByIdEnt *)
655  hash_search(rb->by_txn,
656  &xid,
657  create ? HASH_ENTER : HASH_FIND,
658  &found);
659  if (found)
660  txn = ent->txn;
661  else if (create)
662  {
663  /* initialize the new entry, if creation was requested */
664  Assert(ent != NULL);
665  Assert(lsn != InvalidXLogRecPtr);
666 
667  ent->txn = ReorderBufferGetTXN(rb);
668  ent->txn->xid = xid;
669  txn = ent->txn;
670  txn->first_lsn = lsn;
672 
673  if (create_as_top)
674  {
675  dlist_push_tail(&rb->toplevel_by_lsn, &txn->node);
676  AssertTXNLsnOrder(rb);
677  }
678  }
679  else
680  txn = NULL; /* not found and not asked to create */
681 
682  /* update cache */
683  rb->by_txn_last_xid = xid;
684  rb->by_txn_last_txn = txn;
685 
686  if (is_new)
687  *is_new = !found;
688 
689  Assert(!create || txn != NULL);
690  return txn;
691 }
692 
693 /*
694  * Record the partial change for the streaming of in-progress transactions. We
695  * can stream only complete changes so if we have a partial change like toast
696  * table insert or speculative insert then we mark such a 'txn' so that it
697  * can't be streamed. We also ensure that if the changes in such a 'txn' can
698  * be streamed and are above logical_decoding_work_mem threshold then we stream
699  * them as soon as we have a complete change.
700  */
701 static void
703  ReorderBufferChange *change,
704  bool toast_insert)
705 {
706  ReorderBufferTXN *toptxn;
707 
708  /*
709  * The partial changes need to be processed only while streaming
710  * in-progress transactions.
711  */
712  if (!ReorderBufferCanStream(rb))
713  return;
714 
715  /* Get the top transaction. */
716  toptxn = rbtxn_get_toptxn(txn);
717 
718  /*
719  * Indicate a partial change for toast inserts. The change will be
720  * considered as complete once we get the insert or update on the main
721  * table and we are sure that the pending toast chunks are not required
722  * anymore.
723  *
724  * If we allow streaming when there are pending toast chunks then such
725  * chunks won't be released till the insert (multi_insert) is complete and
726  * we expect the txn to have streamed all changes after streaming. This
727  * restriction is mainly to ensure the correctness of streamed
728  * transactions and it doesn't seem worth uplifting such a restriction
729  * just to allow this case because anyway we will stream the transaction
730  * once such an insert is complete.
731  */
732  if (toast_insert)
734  else if (rbtxn_has_partial_change(toptxn) &&
735  IsInsertOrUpdate(change->action) &&
736  change->data.tp.clear_toast_afterwards)
738 
739  /*
740  * Indicate a partial change for speculative inserts. The change will be
741  * considered as complete once we get the speculative confirm or abort
742  * token.
743  */
744  if (IsSpecInsert(change->action))
746  else if (rbtxn_has_partial_change(toptxn) &&
747  IsSpecConfirmOrAbort(change->action))
749 
750  /*
751  * Stream the transaction if it is serialized before and the changes are
752  * now complete in the top-level transaction.
753  *
754  * The reason for doing the streaming of such a transaction as soon as we
755  * get the complete change for it is that previously it would have reached
756  * the memory threshold and wouldn't get streamed because of incomplete
757  * changes. Delaying such transactions would increase apply lag for them.
758  */
760  !(rbtxn_has_partial_change(toptxn)) &&
761  rbtxn_is_serialized(txn) &&
763  ReorderBufferStreamTXN(rb, toptxn);
764 }
765 
766 /*
767  * Queue a change into a transaction so it can be replayed upon commit or will be
768  * streamed when we reach logical_decoding_work_mem threshold.
769  */
770 void
772  ReorderBufferChange *change, bool toast_insert)
773 {
774  ReorderBufferTXN *txn;
775 
776  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
777 
778  /*
779  * While streaming the previous changes we have detected that the
780  * transaction is aborted. So there is no point in collecting further
781  * changes for it.
782  */
783  if (txn->concurrent_abort)
784  {
785  /*
786  * We don't need to update memory accounting for this change as we
787  * have not added it to the queue yet.
788  */
789  ReorderBufferReturnChange(rb, change, false);
790  return;
791  }
792 
793  /*
794  * The changes that are sent downstream are considered streamable. We
795  * remember such transactions so that only those will later be considered
796  * for streaming.
797  */
798  if (change->action == REORDER_BUFFER_CHANGE_INSERT ||
804  {
805  ReorderBufferTXN *toptxn = rbtxn_get_toptxn(txn);
806 
808  }
809 
810  change->lsn = lsn;
811  change->txn = txn;
812 
813  Assert(InvalidXLogRecPtr != lsn);
814  dlist_push_tail(&txn->changes, &change->node);
815  txn->nentries++;
816  txn->nentries_mem++;
817 
818  /* update memory accounting information */
819  ReorderBufferChangeMemoryUpdate(rb, change, true,
820  ReorderBufferChangeSize(change));
821 
822  /* process partial change */
823  ReorderBufferProcessPartialChange(rb, txn, change, toast_insert);
824 
825  /* check the memory limits and evict something if needed */
827 }
828 
829 /*
830  * A transactional message is queued to be processed upon commit and a
831  * non-transactional message gets processed immediately.
832  */
833 void
835  Snapshot snap, XLogRecPtr lsn,
836  bool transactional, const char *prefix,
837  Size message_size, const char *message)
838 {
839  if (transactional)
840  {
841  MemoryContext oldcontext;
842  ReorderBufferChange *change;
843 
845 
846  /*
847  * We don't expect snapshots for transactional changes - we'll use the
848  * snapshot derived later during apply (unless the change gets
849  * skipped).
850  */
851  Assert(!snap);
852 
853  oldcontext = MemoryContextSwitchTo(rb->context);
854 
855  change = ReorderBufferGetChange(rb);
857  change->data.msg.prefix = pstrdup(prefix);
858  change->data.msg.message_size = message_size;
859  change->data.msg.message = palloc(message_size);
860  memcpy(change->data.msg.message, message, message_size);
861 
862  ReorderBufferQueueChange(rb, xid, lsn, change, false);
863 
864  MemoryContextSwitchTo(oldcontext);
865  }
866  else
867  {
868  ReorderBufferTXN *txn = NULL;
869  volatile Snapshot snapshot_now = snap;
870 
871  /* Non-transactional changes require a valid snapshot. */
872  Assert(snapshot_now);
873 
874  if (xid != InvalidTransactionId)
875  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
876 
877  /* setup snapshot to allow catalog access */
878  SetupHistoricSnapshot(snapshot_now, NULL);
879  PG_TRY();
880  {
881  rb->message(rb, txn, lsn, false, prefix, message_size, message);
882 
884  }
885  PG_CATCH();
886  {
888  PG_RE_THROW();
889  }
890  PG_END_TRY();
891  }
892 }
893 
894 /*
895  * AssertTXNLsnOrder
896  * Verify LSN ordering of transaction lists in the reorderbuffer
897  *
898  * Other LSN-related invariants are checked too.
899  *
900  * No-op if assertions are not in use.
901  */
902 static void
904 {
905 #ifdef USE_ASSERT_CHECKING
907  dlist_iter iter;
908  XLogRecPtr prev_first_lsn = InvalidXLogRecPtr;
909  XLogRecPtr prev_base_snap_lsn = InvalidXLogRecPtr;
910 
911  /*
912  * Skip the verification if we don't reach the LSN at which we start
913  * decoding the contents of transactions yet because until we reach the
914  * LSN, we could have transactions that don't have the association between
915  * the top-level transaction and subtransaction yet and consequently have
916  * the same LSN. We don't guarantee this association until we try to
917  * decode the actual contents of transaction. The ordering of the records
918  * prior to the start_decoding_at LSN should have been checked before the
919  * restart.
920  */
922  return;
923 
924  dlist_foreach(iter, &rb->toplevel_by_lsn)
925  {
927  iter.cur);
928 
929  /* start LSN must be set */
930  Assert(cur_txn->first_lsn != InvalidXLogRecPtr);
931 
932  /* If there is an end LSN, it must be higher than start LSN */
933  if (cur_txn->end_lsn != InvalidXLogRecPtr)
934  Assert(cur_txn->first_lsn <= cur_txn->end_lsn);
935 
936  /* Current initial LSN must be strictly higher than previous */
937  if (prev_first_lsn != InvalidXLogRecPtr)
938  Assert(prev_first_lsn < cur_txn->first_lsn);
939 
940  /* known-as-subtxn txns must not be listed */
941  Assert(!rbtxn_is_known_subxact(cur_txn));
942 
943  prev_first_lsn = cur_txn->first_lsn;
944  }
945 
947  {
949  base_snapshot_node,
950  iter.cur);
951 
952  /* base snapshot (and its LSN) must be set */
953  Assert(cur_txn->base_snapshot != NULL);
955 
956  /* current LSN must be strictly higher than previous */
957  if (prev_base_snap_lsn != InvalidXLogRecPtr)
958  Assert(prev_base_snap_lsn < cur_txn->base_snapshot_lsn);
959 
960  /* known-as-subtxn txns must not be listed */
961  Assert(!rbtxn_is_known_subxact(cur_txn));
962 
963  prev_base_snap_lsn = cur_txn->base_snapshot_lsn;
964  }
965 #endif
966 }
967 
968 /*
969  * AssertChangeLsnOrder
970  *
971  * Check ordering of changes in the (sub)transaction.
972  */
973 static void
975 {
976 #ifdef USE_ASSERT_CHECKING
977  dlist_iter iter;
978  XLogRecPtr prev_lsn = txn->first_lsn;
979 
980  dlist_foreach(iter, &txn->changes)
981  {
982  ReorderBufferChange *cur_change;
983 
984  cur_change = dlist_container(ReorderBufferChange, node, iter.cur);
985 
987  Assert(cur_change->lsn != InvalidXLogRecPtr);
988  Assert(txn->first_lsn <= cur_change->lsn);
989 
990  if (txn->end_lsn != InvalidXLogRecPtr)
991  Assert(cur_change->lsn <= txn->end_lsn);
992 
993  Assert(prev_lsn <= cur_change->lsn);
994 
995  prev_lsn = cur_change->lsn;
996  }
997 #endif
998 }
999 
1000 /*
1001  * ReorderBufferGetOldestTXN
1002  * Return oldest transaction in reorderbuffer
1003  */
1006 {
1007  ReorderBufferTXN *txn;
1008 
1009  AssertTXNLsnOrder(rb);
1010 
1011  if (dlist_is_empty(&rb->toplevel_by_lsn))
1012  return NULL;
1013 
1015 
1018  return txn;
1019 }
1020 
1021 /*
1022  * ReorderBufferGetOldestXmin
1023  * Return oldest Xmin in reorderbuffer
1024  *
1025  * Returns oldest possibly running Xid from the point of view of snapshots
1026  * used in the transactions kept by reorderbuffer, or InvalidTransactionId if
1027  * there are none.
1028  *
1029  * Since snapshots are assigned monotonically, this equals the Xmin of the
1030  * base snapshot with minimal base_snapshot_lsn.
1031  */
1034 {
1035  ReorderBufferTXN *txn;
1036 
1037  AssertTXNLsnOrder(rb);
1038 
1040  return InvalidTransactionId;
1041 
1042  txn = dlist_head_element(ReorderBufferTXN, base_snapshot_node,
1044  return txn->base_snapshot->xmin;
1045 }
1046 
1047 void
1049 {
1050  rb->current_restart_decoding_lsn = ptr;
1051 }
1052 
1053 /*
1054  * ReorderBufferAssignChild
1055  *
1056  * Make note that we know that subxid is a subtransaction of xid, seen as of
1057  * the given lsn.
1058  */
1059 void
1061  TransactionId subxid, XLogRecPtr lsn)
1062 {
1063  ReorderBufferTXN *txn;
1064  ReorderBufferTXN *subtxn;
1065  bool new_top;
1066  bool new_sub;
1067 
1068  txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true);
1069  subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false);
1070 
1071  if (!new_sub)
1072  {
1073  if (rbtxn_is_known_subxact(subtxn))
1074  {
1075  /* already associated, nothing to do */
1076  return;
1077  }
1078  else
1079  {
1080  /*
1081  * We already saw this transaction, but initially added it to the
1082  * list of top-level txns. Now that we know it's not top-level,
1083  * remove it from there.
1084  */
1085  dlist_delete(&subtxn->node);
1086  }
1087  }
1088 
1089  subtxn->txn_flags |= RBTXN_IS_SUBXACT;
1090  subtxn->toplevel_xid = xid;
1091  Assert(subtxn->nsubtxns == 0);
1092 
1093  /* set the reference to top-level transaction */
1094  subtxn->toptxn = txn;
1095 
1096  /* add to subtransaction list */
1097  dlist_push_tail(&txn->subtxns, &subtxn->node);
1098  txn->nsubtxns++;
1099 
1100  /* Possibly transfer the subtxn's snapshot to its top-level txn. */
1101  ReorderBufferTransferSnapToParent(txn, subtxn);
1102 
1103  /* Verify LSN-ordering invariant */
1104  AssertTXNLsnOrder(rb);
1105 }
1106 
1107 /*
1108  * ReorderBufferTransferSnapToParent
1109  * Transfer base snapshot from subtxn to top-level txn, if needed
1110  *
1111  * This is done if the top-level txn doesn't have a base snapshot, or if the
1112  * subtxn's base snapshot has an earlier LSN than the top-level txn's base
1113  * snapshot's LSN. This can happen if there are no changes in the toplevel
1114  * txn but there are some in the subtxn, or the first change in subtxn has
1115  * earlier LSN than first change in the top-level txn and we learned about
1116  * their kinship only now.
1117  *
1118  * The subtransaction's snapshot is cleared regardless of the transfer
1119  * happening, since it's not needed anymore in either case.
1120  *
1121  * We do this as soon as we become aware of their kinship, to avoid queueing
1122  * extra snapshots to txns known-as-subtxns -- only top-level txns will
1123  * receive further snapshots.
1124  */
1125 static void
1127  ReorderBufferTXN *subtxn)
1128 {
1129  Assert(subtxn->toplevel_xid == txn->xid);
1130 
1131  if (subtxn->base_snapshot != NULL)
1132  {
1133  if (txn->base_snapshot == NULL ||
1134  subtxn->base_snapshot_lsn < txn->base_snapshot_lsn)
1135  {
1136  /*
1137  * If the toplevel transaction already has a base snapshot but
1138  * it's newer than the subxact's, purge it.
1139  */
1140  if (txn->base_snapshot != NULL)
1141  {
1144  }
1145 
1146  /*
1147  * The snapshot is now the top transaction's; transfer it, and
1148  * adjust the list position of the top transaction in the list by
1149  * moving it to where the subtransaction is.
1150  */
1151  txn->base_snapshot = subtxn->base_snapshot;
1152  txn->base_snapshot_lsn = subtxn->base_snapshot_lsn;
1154  &txn->base_snapshot_node);
1155 
1156  /*
1157  * The subtransaction doesn't have a snapshot anymore (so it
1158  * mustn't be in the list.)
1159  */
1160  subtxn->base_snapshot = NULL;
1162  dlist_delete(&subtxn->base_snapshot_node);
1163  }
1164  else
1165  {
1166  /* Base snap of toplevel is fine, so subxact's is not needed */
1168  dlist_delete(&subtxn->base_snapshot_node);
1169  subtxn->base_snapshot = NULL;
1171  }
1172  }
1173 }
1174 
1175 /*
1176  * Associate a subtransaction with its toplevel transaction at commit
1177  * time. There may be no further changes added after this.
1178  */
1179 void
1181  TransactionId subxid, XLogRecPtr commit_lsn,
1182  XLogRecPtr end_lsn)
1183 {
1184  ReorderBufferTXN *subtxn;
1185 
1186  subtxn = ReorderBufferTXNByXid(rb, subxid, false, NULL,
1187  InvalidXLogRecPtr, false);
1188 
1189  /*
1190  * No need to do anything if that subtxn didn't contain any changes
1191  */
1192  if (!subtxn)
1193  return;
1194 
1195  subtxn->final_lsn = commit_lsn;
1196  subtxn->end_lsn = end_lsn;
1197 
1198  /*
1199  * Assign this subxact as a child of the toplevel xact (no-op if already
1200  * done.)
1201  */
1202  ReorderBufferAssignChild(rb, xid, subxid, InvalidXLogRecPtr);
1203 }
1204 
1205 
/*
 * Support for efficiently iterating over a transaction's and its
 * subtransactions' changes.
 *
 * We do this by doing a k-way merge between transactions/subtransactions.
 * For that we model the current heads of the different transactions as a
 * binary heap, so we easily know which (sub-)transaction has the change with
 * the smallest lsn next.
 *
 * We assume the changes in individual transactions are already sorted by LSN.
 */
1217 
1218 /*
1219  * Binary heap comparison function.
1220  */
1221 static int
1223 {
1225  XLogRecPtr pos_a = state->entries[DatumGetInt32(a)].lsn;
1226  XLogRecPtr pos_b = state->entries[DatumGetInt32(b)].lsn;
1227 
1228  if (pos_a < pos_b)
1229  return 1;
1230  else if (pos_a == pos_b)
1231  return 0;
1232  return -1;
1233 }
1234 
1235 /*
1236  * Allocate & initialize an iterator which iterates in lsn order over a
1237  * transaction and all its subtransactions.
1238  *
1239  * Note: The iterator state is returned through iter_state parameter rather
1240  * than the function's return value. This is because the state gets cleaned up
1241  * in a PG_CATCH block in the caller, so we want to make sure the caller gets
1242  * back the state even if this function throws an exception.
1243  */
1244 static void
1246  ReorderBufferIterTXNState *volatile *iter_state)
1247 {
1248  Size nr_txns = 0;
1250  dlist_iter cur_txn_i;
1251  int32 off;
1252 
1253  *iter_state = NULL;
1254 
1255  /* Check ordering of changes in the toplevel transaction. */
1256  AssertChangeLsnOrder(txn);
1257 
1258  /*
1259  * Calculate the size of our heap: one element for every transaction that
1260  * contains changes. (Besides the transactions already in the reorder
1261  * buffer, we count the one we were directly passed.)
1262  */
1263  if (txn->nentries > 0)
1264  nr_txns++;
1265 
1266  dlist_foreach(cur_txn_i, &txn->subtxns)
1267  {
1268  ReorderBufferTXN *cur_txn;
1269 
1270  cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
1271 
1272  /* Check ordering of changes in this subtransaction. */
1273  AssertChangeLsnOrder(cur_txn);
1274 
1275  if (cur_txn->nentries > 0)
1276  nr_txns++;
1277  }
1278 
1279  /* allocate iteration state */
1282  sizeof(ReorderBufferIterTXNState) +
1283  sizeof(ReorderBufferIterTXNEntry) * nr_txns);
1284 
1285  state->nr_txns = nr_txns;
1286  dlist_init(&state->old_change);
1287 
1288  for (off = 0; off < state->nr_txns; off++)
1289  {
1290  state->entries[off].file.vfd = -1;
1291  state->entries[off].segno = 0;
1292  }
1293 
1294  /* allocate heap */
1295  state->heap = binaryheap_allocate(state->nr_txns,
1297  state);
1298 
1299  /* Now that the state fields are initialized, it is safe to return it. */
1300  *iter_state = state;
1301 
1302  /*
1303  * Now insert items into the binary heap, in an unordered fashion. (We
1304  * will run a heap assembly step at the end; this is more efficient.)
1305  */
1306 
1307  off = 0;
1308 
1309  /* add toplevel transaction if it contains changes */
1310  if (txn->nentries > 0)
1311  {
1312  ReorderBufferChange *cur_change;
1313 
1314  if (rbtxn_is_serialized(txn))
1315  {
1316  /* serialize remaining changes */
1317  ReorderBufferSerializeTXN(rb, txn);
1318  ReorderBufferRestoreChanges(rb, txn, &state->entries[off].file,
1319  &state->entries[off].segno);
1320  }
1321 
1322  cur_change = dlist_head_element(ReorderBufferChange, node,
1323  &txn->changes);
1324 
1325  state->entries[off].lsn = cur_change->lsn;
1326  state->entries[off].change = cur_change;
1327  state->entries[off].txn = txn;
1328 
1330  }
1331 
1332  /* add subtransactions if they contain changes */
1333  dlist_foreach(cur_txn_i, &txn->subtxns)
1334  {
1335  ReorderBufferTXN *cur_txn;
1336 
1337  cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
1338 
1339  if (cur_txn->nentries > 0)
1340  {
1341  ReorderBufferChange *cur_change;
1342 
1343  if (rbtxn_is_serialized(cur_txn))
1344  {
1345  /* serialize remaining changes */
1346  ReorderBufferSerializeTXN(rb, cur_txn);
1347  ReorderBufferRestoreChanges(rb, cur_txn,
1348  &state->entries[off].file,
1349  &state->entries[off].segno);
1350  }
1351  cur_change = dlist_head_element(ReorderBufferChange, node,
1352  &cur_txn->changes);
1353 
1354  state->entries[off].lsn = cur_change->lsn;
1355  state->entries[off].change = cur_change;
1356  state->entries[off].txn = cur_txn;
1357 
1359  }
1360  }
1361 
1362  /* assemble a valid binary heap */
1363  binaryheap_build(state->heap);
1364 }
1365 
1366 /*
1367  * Return the next change when iterating over a transaction and its
1368  * subtransactions.
1369  *
1370  * Returns NULL when no further changes exist.
1371  */
1372 static ReorderBufferChange *
1374 {
1375  ReorderBufferChange *change;
1377  int32 off;
1378 
1379  /* nothing there anymore */
1380  if (state->heap->bh_size == 0)
1381  return NULL;
1382 
1383  off = DatumGetInt32(binaryheap_first(state->heap));
1384  entry = &state->entries[off];
1385 
1386  /* free memory we might have "leaked" in the previous *Next call */
1387  if (!dlist_is_empty(&state->old_change))
1388  {
1389  change = dlist_container(ReorderBufferChange, node,
1390  dlist_pop_head_node(&state->old_change));
1391  ReorderBufferReturnChange(rb, change, true);
1392  Assert(dlist_is_empty(&state->old_change));
1393  }
1394 
1395  change = entry->change;
1396 
1397  /*
1398  * update heap with information about which transaction has the next
1399  * relevant change in LSN order
1400  */
1401 
1402  /* there are in-memory changes */
1403  if (dlist_has_next(&entry->txn->changes, &entry->change->node))
1404  {
1405  dlist_node *next = dlist_next_node(&entry->txn->changes, &change->node);
1406  ReorderBufferChange *next_change =
1408 
1409  /* txn stays the same */
1410  state->entries[off].lsn = next_change->lsn;
1411  state->entries[off].change = next_change;
1412 
1414  return change;
1415  }
1416 
1417  /* try to load changes from disk */
1418  if (entry->txn->nentries != entry->txn->nentries_mem)
1419  {
1420  /*
1421  * Ugly: restoring changes will reuse *Change records, thus delete the
1422  * current one from the per-tx list and only free in the next call.
1423  */
1424  dlist_delete(&change->node);
1425  dlist_push_tail(&state->old_change, &change->node);
1426 
1427  /*
1428  * Update the total bytes processed by the txn for which we are
1429  * releasing the current set of changes and restoring the new set of
1430  * changes.
1431  */
1432  rb->totalBytes += entry->txn->size;
1433  if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->file,
1434  &state->entries[off].segno))
1435  {
1436  /* successfully restored changes from disk */
1437  ReorderBufferChange *next_change =
1439  &entry->txn->changes);
1440 
1441  elog(DEBUG2, "restored %u/%u changes from disk",
1442  (uint32) entry->txn->nentries_mem,
1443  (uint32) entry->txn->nentries);
1444 
1445  Assert(entry->txn->nentries_mem);
1446  /* txn stays the same */
1447  state->entries[off].lsn = next_change->lsn;
1448  state->entries[off].change = next_change;
1450 
1451  return change;
1452  }
1453  }
1454 
1455  /* ok, no changes there anymore, remove */
1457 
1458  return change;
1459 }
1460 
1461 /*
1462  * Deallocate the iterator
1463  */
1464 static void
1467 {
1468  int32 off;
1469 
1470  for (off = 0; off < state->nr_txns; off++)
1471  {
1472  if (state->entries[off].file.vfd != -1)
1473  FileClose(state->entries[off].file.vfd);
1474  }
1475 
1476  /* free memory we might have "leaked" in the last *Next call */
1477  if (!dlist_is_empty(&state->old_change))
1478  {
1479  ReorderBufferChange *change;
1480 
1481  change = dlist_container(ReorderBufferChange, node,
1482  dlist_pop_head_node(&state->old_change));
1483  ReorderBufferReturnChange(rb, change, true);
1484  Assert(dlist_is_empty(&state->old_change));
1485  }
1486 
1487  binaryheap_free(state->heap);
1488  pfree(state);
1489 }
1490 
1491 /*
1492  * Cleanup the contents of a transaction, usually after the transaction
1493  * committed or aborted.
1494  */
1495 static void
1497 {
1498  bool found;
1499  dlist_mutable_iter iter;
1500 
1501  /* cleanup subtransactions & their changes */
1502  dlist_foreach_modify(iter, &txn->subtxns)
1503  {
1504  ReorderBufferTXN *subtxn;
1505 
1506  subtxn = dlist_container(ReorderBufferTXN, node, iter.cur);
1507 
1508  /*
1509  * Subtransactions are always associated to the toplevel TXN, even if
1510  * they originally were happening inside another subtxn, so we won't
1511  * ever recurse more than one level deep here.
1512  */
1513  Assert(rbtxn_is_known_subxact(subtxn));
1514  Assert(subtxn->nsubtxns == 0);
1515 
1516  ReorderBufferCleanupTXN(rb, subtxn);
1517  }
1518 
1519  /* cleanup changes in the txn */
1520  dlist_foreach_modify(iter, &txn->changes)
1521  {
1522  ReorderBufferChange *change;
1523 
1524  change = dlist_container(ReorderBufferChange, node, iter.cur);
1525 
1526  /* Check we're not mixing changes from different transactions. */
1527  Assert(change->txn == txn);
1528 
1529  ReorderBufferReturnChange(rb, change, true);
1530  }
1531 
1532  /*
1533  * Cleanup the tuplecids we stored for decoding catalog snapshot access.
1534  * They are always stored in the toplevel transaction.
1535  */
1536  dlist_foreach_modify(iter, &txn->tuplecids)
1537  {
1538  ReorderBufferChange *change;
1539 
1540  change = dlist_container(ReorderBufferChange, node, iter.cur);
1541 
1542  /* Check we're not mixing changes from different transactions. */
1543  Assert(change->txn == txn);
1545 
1546  ReorderBufferReturnChange(rb, change, true);
1547  }
1548 
1549  /*
1550  * Cleanup the base snapshot, if set.
1551  */
1552  if (txn->base_snapshot != NULL)
1553  {
1556  }
1557 
1558  /*
1559  * Cleanup the snapshot for the last streamed run.
1560  */
1561  if (txn->snapshot_now != NULL)
1562  {
1563  Assert(rbtxn_is_streamed(txn));
1565  }
1566 
1567  /*
1568  * Remove TXN from its containing lists.
1569  *
1570  * Note: if txn is known as subxact, we are deleting the TXN from its
1571  * parent's list of known subxacts; this leaves the parent's nsubxacts
1572  * count too high, but we don't care. Otherwise, we are deleting the TXN
1573  * from the LSN-ordered list of toplevel TXNs. We remove the TXN from the
1574  * list of catalog modifying transactions as well.
1575  */
1576  dlist_delete(&txn->node);
1577  if (rbtxn_has_catalog_changes(txn))
1579 
1580  /* now remove reference from buffer */
1581  hash_search(rb->by_txn, &txn->xid, HASH_REMOVE, &found);
1582  Assert(found);
1583 
1584  /* remove entries spilled to disk */
1585  if (rbtxn_is_serialized(txn))
1586  ReorderBufferRestoreCleanup(rb, txn);
1587 
1588  /* deallocate */
1589  ReorderBufferReturnTXN(rb, txn);
1590 }
1591 
1592 /*
1593  * Discard changes from a transaction (and subtransactions), either after
1594  * streaming or decoding them at PREPARE. Keep the remaining info -
1595  * transactions, tuplecids, invalidations and snapshots.
1596  *
1597  * We additionally remove tuplecids after decoding the transaction at prepare
1598  * time as we only need to perform invalidation at rollback or commit prepared.
1599  *
1600  * 'txn_prepared' indicates that we have decoded the transaction at prepare
1601  * time.
1602  */
1603 static void
1605 {
1606  dlist_mutable_iter iter;
1607 
1608  /* cleanup subtransactions & their changes */
1609  dlist_foreach_modify(iter, &txn->subtxns)
1610  {
1611  ReorderBufferTXN *subtxn;
1612 
1613  subtxn = dlist_container(ReorderBufferTXN, node, iter.cur);
1614 
1615  /*
1616  * Subtransactions are always associated to the toplevel TXN, even if
1617  * they originally were happening inside another subtxn, so we won't
1618  * ever recurse more than one level deep here.
1619  */
1620  Assert(rbtxn_is_known_subxact(subtxn));
1621  Assert(subtxn->nsubtxns == 0);
1622 
1623  ReorderBufferTruncateTXN(rb, subtxn, txn_prepared);
1624  }
1625 
1626  /* cleanup changes in the txn */
1627  dlist_foreach_modify(iter, &txn->changes)
1628  {
1629  ReorderBufferChange *change;
1630 
1631  change = dlist_container(ReorderBufferChange, node, iter.cur);
1632 
1633  /* Check we're not mixing changes from different transactions. */
1634  Assert(change->txn == txn);
1635 
1636  /* remove the change from it's containing list */
1637  dlist_delete(&change->node);
1638 
1639  ReorderBufferReturnChange(rb, change, true);
1640  }
1641 
1642  /*
1643  * Mark the transaction as streamed.
1644  *
1645  * The top-level transaction, is marked as streamed always, even if it
1646  * does not contain any changes (that is, when all the changes are in
1647  * subtransactions).
1648  *
1649  * For subtransactions, we only mark them as streamed when there are
1650  * changes in them.
1651  *
1652  * We do it this way because of aborts - we don't want to send aborts for
1653  * XIDs the downstream is not aware of. And of course, it always knows
1654  * about the toplevel xact (we send the XID in all messages), but we never
1655  * stream XIDs of empty subxacts.
1656  */
1657  if ((!txn_prepared) && (rbtxn_is_toptxn(txn) || (txn->nentries_mem != 0)))
1658  txn->txn_flags |= RBTXN_IS_STREAMED;
1659 
1660  if (txn_prepared)
1661  {
1662  /*
1663  * If this is a prepared txn, cleanup the tuplecids we stored for
1664  * decoding catalog snapshot access. They are always stored in the
1665  * toplevel transaction.
1666  */
1667  dlist_foreach_modify(iter, &txn->tuplecids)
1668  {
1669  ReorderBufferChange *change;
1670 
1671  change = dlist_container(ReorderBufferChange, node, iter.cur);
1672 
1673  /* Check we're not mixing changes from different transactions. */
1674  Assert(change->txn == txn);
1676 
1677  /* Remove the change from its containing list. */
1678  dlist_delete(&change->node);
1679 
1680  ReorderBufferReturnChange(rb, change, true);
1681  }
1682  }
1683 
1684  /*
1685  * Destroy the (relfilelocator, ctid) hashtable, so that we don't leak any
1686  * memory. We could also keep the hash table and update it with new ctid
1687  * values, but this seems simpler and good enough for now.
1688  */
1689  if (txn->tuplecid_hash != NULL)
1690  {
1692  txn->tuplecid_hash = NULL;
1693  }
1694 
1695  /* If this txn is serialized then clean the disk space. */
1696  if (rbtxn_is_serialized(txn))
1697  {
1698  ReorderBufferRestoreCleanup(rb, txn);
1699  txn->txn_flags &= ~RBTXN_IS_SERIALIZED;
1700 
1701  /*
1702  * We set this flag to indicate if the transaction is ever serialized.
1703  * We need this to accurately update the stats as otherwise the same
1704  * transaction can be counted as serialized multiple times.
1705  */
1707  }
1708 
1709  /* also reset the number of entries in the transaction */
1710  txn->nentries_mem = 0;
1711  txn->nentries = 0;
1712 }
1713 
1714 /*
1715  * Build a hash with a (relfilelocator, ctid) -> (cmin, cmax) mapping for use by
1716  * HeapTupleSatisfiesHistoricMVCC.
1717  */
1718 static void
1720 {
1721  dlist_iter iter;
1722  HASHCTL hash_ctl;
1723 
1725  return;
1726 
1727  hash_ctl.keysize = sizeof(ReorderBufferTupleCidKey);
1728  hash_ctl.entrysize = sizeof(ReorderBufferTupleCidEnt);
1729  hash_ctl.hcxt = rb->context;
1730 
1731  /*
1732  * create the hash with the exact number of to-be-stored tuplecids from
1733  * the start
1734  */
1735  txn->tuplecid_hash =
1736  hash_create("ReorderBufferTupleCid", txn->ntuplecids, &hash_ctl,
1738 
1739  dlist_foreach(iter, &txn->tuplecids)
1740  {
1743  bool found;
1744  ReorderBufferChange *change;
1745 
1746  change = dlist_container(ReorderBufferChange, node, iter.cur);
1747 
1749 
1750  /* be careful about padding */
1751  memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
1752 
1753  key.rlocator = change->data.tuplecid.locator;
1754 
1755  ItemPointerCopy(&change->data.tuplecid.tid,
1756  &key.tid);
1757 
1758  ent = (ReorderBufferTupleCidEnt *)
1759  hash_search(txn->tuplecid_hash, &key, HASH_ENTER, &found);
1760  if (!found)
1761  {
1762  ent->cmin = change->data.tuplecid.cmin;
1763  ent->cmax = change->data.tuplecid.cmax;
1764  ent->combocid = change->data.tuplecid.combocid;
1765  }
1766  else
1767  {
1768  /*
1769  * Maybe we already saw this tuple before in this transaction, but
1770  * if so it must have the same cmin.
1771  */
1772  Assert(ent->cmin == change->data.tuplecid.cmin);
1773 
1774  /*
1775  * cmax may be initially invalid, but once set it can only grow,
1776  * and never become invalid again.
1777  */
1778  Assert((ent->cmax == InvalidCommandId) ||
1779  ((change->data.tuplecid.cmax != InvalidCommandId) &&
1780  (change->data.tuplecid.cmax > ent->cmax)));
1781  ent->cmax = change->data.tuplecid.cmax;
1782  }
1783  }
1784 }
1785 
1786 /*
1787  * Copy a provided snapshot so we can modify it privately. This is needed so
1788  * that catalog modifying transactions can look into intermediate catalog
1789  * states.
1790  */
1791 static Snapshot
1793  ReorderBufferTXN *txn, CommandId cid)
1794 {
1795  Snapshot snap;
1796  dlist_iter iter;
1797  int i = 0;
1798  Size size;
1799 
1800  size = sizeof(SnapshotData) +
1801  sizeof(TransactionId) * orig_snap->xcnt +
1802  sizeof(TransactionId) * (txn->nsubtxns + 1);
1803 
1804  snap = MemoryContextAllocZero(rb->context, size);
1805  memcpy(snap, orig_snap, sizeof(SnapshotData));
1806 
1807  snap->copied = true;
1808  snap->active_count = 1; /* mark as active so nobody frees it */
1809  snap->regd_count = 0;
1810  snap->xip = (TransactionId *) (snap + 1);
1811 
1812  memcpy(snap->xip, orig_snap->xip, sizeof(TransactionId) * snap->xcnt);
1813 
1814  /*
1815  * snap->subxip contains all txids that belong to our transaction which we
1816  * need to check via cmin/cmax. That's why we store the toplevel
1817  * transaction in there as well.
1818  */
1819  snap->subxip = snap->xip + snap->xcnt;
1820  snap->subxip[i++] = txn->xid;
1821 
1822  /*
1823  * subxcnt isn't decreased when subtransactions abort, so count manually.
1824  * Since it's an upper boundary it is safe to use it for the allocation
1825  * above.
1826  */
1827  snap->subxcnt = 1;
1828 
1829  dlist_foreach(iter, &txn->subtxns)
1830  {
1831  ReorderBufferTXN *sub_txn;
1832 
1833  sub_txn = dlist_container(ReorderBufferTXN, node, iter.cur);
1834  snap->subxip[i++] = sub_txn->xid;
1835  snap->subxcnt++;
1836  }
1837 
1838  /* sort so we can bsearch() later */
1839  qsort(snap->subxip, snap->subxcnt, sizeof(TransactionId), xidComparator);
1840 
1841  /* store the specified current CommandId */
1842  snap->curcid = cid;
1843 
1844  return snap;
1845 }
1846 
1847 /*
1848  * Free a previously ReorderBufferCopySnap'ed snapshot
1849  */
1850 static void
1852 {
1853  if (snap->copied)
1854  pfree(snap);
1855  else
1857 }
1858 
1859 /*
1860  * If the transaction was (partially) streamed, we need to prepare or commit
1861  * it in a 'streamed' way. That is, we first stream the remaining part of the
1862  * transaction, and then invoke stream_prepare or stream_commit message as per
1863  * the case.
1864  */
1865 static void
1867 {
1868  /* we should only call this for previously streamed transactions */
1869  Assert(rbtxn_is_streamed(txn));
1870 
1871  ReorderBufferStreamTXN(rb, txn);
1872 
1873  if (rbtxn_prepared(txn))
1874  {
1875  /*
1876  * Note, we send stream prepare even if a concurrent abort is
1877  * detected. See DecodePrepare for more information.
1878  */
1879  rb->stream_prepare(rb, txn, txn->final_lsn);
1880 
1881  /*
1882  * This is a PREPARED transaction, part of a two-phase commit. The
1883  * full cleanup will happen as part of the COMMIT PREPAREDs, so now
1884  * just truncate txn by removing changes and tuplecids.
1885  */
1886  ReorderBufferTruncateTXN(rb, txn, true);
1887  /* Reset the CheckXidAlive */
1889  }
1890  else
1891  {
1892  rb->stream_commit(rb, txn, txn->final_lsn);
1893  ReorderBufferCleanupTXN(rb, txn);
1894  }
1895 }
1896 
1897 /*
1898  * Set xid to detect concurrent aborts.
1899  *
1900  * While streaming an in-progress transaction or decoding a prepared
1901  * transaction there is a possibility that the (sub)transaction might get
1902  * aborted concurrently. In such case if the (sub)transaction has catalog
1903  * update then we might decode the tuple using wrong catalog version. For
1904  * example, suppose there is one catalog tuple with (xmin: 500, xmax: 0). Now,
1905  * the transaction 501 updates the catalog tuple and after that we will have
1906  * two tuples (xmin: 500, xmax: 501) and (xmin: 501, xmax: 0). Now, if 501 is
1907  * aborted and some other transaction say 502 updates the same catalog tuple
1908  * then the first tuple will be changed to (xmin: 500, xmax: 502). So, the
1909  * problem is that when we try to decode the tuple inserted/updated in 501
1910  * after the catalog update, we will see the catalog tuple with (xmin: 500,
1911  * xmax: 502) as visible because it will consider that the tuple is deleted by
1912  * xid 502 which is not visible to our snapshot. And when we will try to
1913  * decode with that catalog tuple, it can lead to a wrong result or a crash.
1914  * So, it is necessary to detect concurrent aborts to allow streaming of
1915  * in-progress transactions or decoding of prepared transactions.
1916  *
1917  * For detecting the concurrent abort we set CheckXidAlive to the current
1918  * (sub)transaction's xid for which this change belongs to. And, during
1919  * catalog scan we can check the status of the xid and if it is aborted we will
1920  * report a specific error so that we can stop streaming current transaction
1921  * and discard the already streamed changes on such an error. We might have
1922  * already streamed some of the changes for the aborted (sub)transaction, but
1923  * that is fine because when we decode the abort we will stream abort message
1924  * to truncate the changes in the subscriber. Similarly, for prepared
1925  * transactions, we stop decoding if concurrent abort is detected and then
1926  * rollback the changes when rollback prepared is encountered. See
1927  * DecodePrepare.
1928  */
1929 static inline void
1931 {
1932  /*
1933  * If the input transaction id is already set as a CheckXidAlive then
1934  * nothing to do.
1935  */
1937  return;
1938 
1939  /*
1940  * setup CheckXidAlive if it's not committed yet. We don't check if the
1941  * xid is aborted. That will happen during catalog access.
1942  */
1943  if (!TransactionIdDidCommit(xid))
1944  CheckXidAlive = xid;
1945  else
1947 }
1948 
1949 /*
1950  * Helper function for ReorderBufferProcessTXN for applying change.
1951  */
1952 static inline void
1954  Relation relation, ReorderBufferChange *change,
1955  bool streaming)
1956 {
1957  if (streaming)
1958  rb->stream_change(rb, txn, relation, change);
1959  else
1960  rb->apply_change(rb, txn, relation, change);
1961 }
1962 
1963 /*
1964  * Helper function for ReorderBufferProcessTXN for applying the truncate.
1965  */
1966 static inline void
1968  int nrelations, Relation *relations,
1969  ReorderBufferChange *change, bool streaming)
1970 {
1971  if (streaming)
1972  rb->stream_truncate(rb, txn, nrelations, relations, change);
1973  else
1974  rb->apply_truncate(rb, txn, nrelations, relations, change);
1975 }
1976 
1977 /*
1978  * Helper function for ReorderBufferProcessTXN for applying the message.
1979  */
1980 static inline void
1982  ReorderBufferChange *change, bool streaming)
1983 {
1984  if (streaming)
1985  rb->stream_message(rb, txn, change->lsn, true,
1986  change->data.msg.prefix,
1987  change->data.msg.message_size,
1988  change->data.msg.message);
1989  else
1990  rb->message(rb, txn, change->lsn, true,
1991  change->data.msg.prefix,
1992  change->data.msg.message_size,
1993  change->data.msg.message);
1994 }
1995 
1996 /*
1997  * Function to store the command id and snapshot at the end of the current
1998  * stream so that we can reuse the same while sending the next stream.
1999  */
2000 static inline void
2002  Snapshot snapshot_now, CommandId command_id)
2003 {
2004  txn->command_id = command_id;
2005 
2006  /* Avoid copying if it's already copied. */
2007  if (snapshot_now->copied)
2008  txn->snapshot_now = snapshot_now;
2009  else
2010  txn->snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
2011  txn, command_id);
2012 }
2013 
2014 /*
2015  * Helper function for ReorderBufferProcessTXN to handle the concurrent
2016  * abort of the streaming transaction. This resets the TXN such that it
2017  * can be used to stream the remaining data of transaction being processed.
2018  * This can happen when the subtransaction is aborted and we still want to
2019  * continue processing the main or other subtransactions data.
2020  */
2021 static void
2023  Snapshot snapshot_now,
2024  CommandId command_id,
2025  XLogRecPtr last_lsn,
2026  ReorderBufferChange *specinsert)
2027 {
2028  /* Discard the changes that we just streamed */
2030 
2031  /* Free all resources allocated for toast reconstruction */
2032  ReorderBufferToastReset(rb, txn);
2033 
2034  /* Return the spec insert change if it is not NULL */
2035  if (specinsert != NULL)
2036  {
2037  ReorderBufferReturnChange(rb, specinsert, true);
2038  specinsert = NULL;
2039  }
2040 
2041  /*
2042  * For the streaming case, stop the stream and remember the command ID and
2043  * snapshot for the streaming run.
2044  */
2045  if (rbtxn_is_streamed(txn))
2046  {
2047  rb->stream_stop(rb, txn, last_lsn);
2048  ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
2049  }
2050 }
2051 
2052 /*
2053  * Helper function for ReorderBufferReplay and ReorderBufferStreamTXN.
2054  *
2055  * Send data of a transaction (and its subtransactions) to the
2056  * output plugin. We iterate over the top and subtransactions (using a k-way
2057  * merge) and replay the changes in lsn order.
2058  *
2059  * If streaming is true then data will be sent using stream API.
2060  *
2061  * Note: "volatile" markers on some parameters are to avoid trouble with
2062  * PG_TRY inside the function.
2063  */
2064 static void
2066  XLogRecPtr commit_lsn,
2067  volatile Snapshot snapshot_now,
2068  volatile CommandId command_id,
2069  bool streaming)
2070 {
2071  bool using_subtxn;
2073  ReorderBufferIterTXNState *volatile iterstate = NULL;
2074  volatile XLogRecPtr prev_lsn = InvalidXLogRecPtr;
2075  ReorderBufferChange *volatile specinsert = NULL;
2076  volatile bool stream_started = false;
2077  ReorderBufferTXN *volatile curtxn = NULL;
2078 
2079  /* build data to be able to lookup the CommandIds of catalog tuples */
2081 
2082  /* setup the initial snapshot */
2083  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
2084 
2085  /*
2086  * Decoding needs access to syscaches et al., which in turn use
2087  * heavyweight locks and such. Thus we need to have enough state around to
2088  * keep track of those. The easiest way is to simply use a transaction
2089  * internally. That also allows us to easily enforce that nothing writes
2090  * to the database by checking for xid assignments.
2091  *
2092  * When we're called via the SQL SRF there's already a transaction
2093  * started, so start an explicit subtransaction there.
2094  */
2095  using_subtxn = IsTransactionOrTransactionBlock();
2096 
2097  PG_TRY();
2098  {
2099  ReorderBufferChange *change;
2100  int changes_count = 0; /* used to accumulate the number of
2101  * changes */
2102 
2103  if (using_subtxn)
2104  BeginInternalSubTransaction(streaming ? "stream" : "replay");
2105  else
2107 
2108  /*
2109  * We only need to send begin/begin-prepare for non-streamed
2110  * transactions.
2111  */
2112  if (!streaming)
2113  {
2114  if (rbtxn_prepared(txn))
2115  rb->begin_prepare(rb, txn);
2116  else
2117  rb->begin(rb, txn);
2118  }
2119 
2120  ReorderBufferIterTXNInit(rb, txn, &iterstate);
2121  while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
2122  {
2123  Relation relation = NULL;
2124  Oid reloid;
2125 
2127 
2128  /*
2129  * We can't call start stream callback before processing first
2130  * change.
2131  */
2132  if (prev_lsn == InvalidXLogRecPtr)
2133  {
2134  if (streaming)
2135  {
2136  txn->origin_id = change->origin_id;
2137  rb->stream_start(rb, txn, change->lsn);
2138  stream_started = true;
2139  }
2140  }
2141 
2142  /*
2143  * Enforce correct ordering of changes, merged from multiple
2144  * subtransactions. The changes may have the same LSN due to
2145  * MULTI_INSERT xlog records.
2146  */
2147  Assert(prev_lsn == InvalidXLogRecPtr || prev_lsn <= change->lsn);
2148 
2149  prev_lsn = change->lsn;
2150 
2151  /*
2152  * Set the current xid to detect concurrent aborts. This is
2153  * required for the cases when we decode the changes before the
2154  * COMMIT record is processed.
2155  */
2156  if (streaming || rbtxn_prepared(change->txn))
2157  {
2158  curtxn = change->txn;
2159  SetupCheckXidLive(curtxn->xid);
2160  }
2161 
2162  switch (change->action)
2163  {
2165 
2166  /*
2167  * Confirmation for speculative insertion arrived. Simply
2168  * use as a normal record. It'll be cleaned up at the end
2169  * of INSERT processing.
2170  */
2171  if (specinsert == NULL)
2172  elog(ERROR, "invalid ordering of speculative insertion changes");
2173  Assert(specinsert->data.tp.oldtuple == NULL);
2174  change = specinsert;
2176 
2177  /* intentionally fall through */
2181  Assert(snapshot_now);
2182 
2183  reloid = RelidByRelfilenumber(change->data.tp.rlocator.spcOid,
2184  change->data.tp.rlocator.relNumber);
2185 
2186  /*
2187  * Mapped catalog tuple without data, emitted while
2188  * catalog table was in the process of being rewritten. We
2189  * can fail to look up the relfilenumber, because the
2190  * relmapper has no "historic" view, in contrast to the
2191  * normal catalog during decoding. Thus repeated rewrites
2192  * can cause a lookup failure. That's OK because we do not
2193  * decode catalog changes anyway. Normally such tuples
2194  * would be skipped over below, but we can't identify
2195  * whether the table should be logically logged without
2196  * mapping the relfilenumber to the oid.
2197  */
2198  if (reloid == InvalidOid &&
2199  change->data.tp.newtuple == NULL &&
2200  change->data.tp.oldtuple == NULL)
2201  goto change_done;
2202  else if (reloid == InvalidOid)
2203  elog(ERROR, "could not map filenumber \"%s\" to relation OID",
2204  relpathperm(change->data.tp.rlocator,
2205  MAIN_FORKNUM));
2206 
2207  relation = RelationIdGetRelation(reloid);
2208 
2209  if (!RelationIsValid(relation))
2210  elog(ERROR, "could not open relation with OID %u (for filenumber \"%s\")",
2211  reloid,
2212  relpathperm(change->data.tp.rlocator,
2213  MAIN_FORKNUM));
2214 
2215  if (!RelationIsLogicallyLogged(relation))
2216  goto change_done;
2217 
2218  /*
2219  * Ignore temporary heaps created during DDL unless the
2220  * plugin has asked for them.
2221  */
2222  if (relation->rd_rel->relrewrite && !rb->output_rewrites)
2223  goto change_done;
2224 
2225  /*
2226  * For now ignore sequence changes entirely. Most of the
2227  * time they don't log changes using records we
2228  * understand, so it doesn't make sense to handle the few
2229  * cases we do.
2230  */
2231  if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
2232  goto change_done;
2233 
2234  /* user-triggered change */
2235  if (!IsToastRelation(relation))
2236  {
2237  ReorderBufferToastReplace(rb, txn, relation, change);
2238  ReorderBufferApplyChange(rb, txn, relation, change,
2239  streaming);
2240 
2241  /*
2242  * Only clear reassembled toast chunks if we're sure
2243  * they're not required anymore. The creator of the
2244  * tuple tells us.
2245  */
2246  if (change->data.tp.clear_toast_afterwards)
2247  ReorderBufferToastReset(rb, txn);
2248  }
2249  /* we're not interested in toast deletions */
2250  else if (change->action == REORDER_BUFFER_CHANGE_INSERT)
2251  {
2252  /*
2253  * Need to reassemble the full toasted Datum in
2254  * memory, to ensure the chunks don't get reused till
2255  * we're done remove it from the list of this
2256  * transaction's changes. Otherwise it will get
2257  * freed/reused while restoring spooled data from
2258  * disk.
2259  */
2260  Assert(change->data.tp.newtuple != NULL);
2261 
2262  dlist_delete(&change->node);
2263  ReorderBufferToastAppendChunk(rb, txn, relation,
2264  change);
2265  }
2266 
2267  change_done:
2268 
2269  /*
2270  * If speculative insertion was confirmed, the record
2271  * isn't needed anymore.
2272  */
2273  if (specinsert != NULL)
2274  {
2275  ReorderBufferReturnChange(rb, specinsert, true);
2276  specinsert = NULL;
2277  }
2278 
2279  if (RelationIsValid(relation))
2280  {
2281  RelationClose(relation);
2282  relation = NULL;
2283  }
2284  break;
2285 
2287 
2288  /*
2289  * Speculative insertions are dealt with by delaying the
2290  * processing of the insert until the confirmation record
2291  * arrives. For that we simply unlink the record from the
2292  * chain, so it does not get freed/reused while restoring
2293  * spooled data from disk.
2294  *
2295  * This is safe in the face of concurrent catalog changes
2296  * because the relevant relation can't be changed between
2297  * speculative insertion and confirmation due to
2298  * CheckTableNotInUse() and locking.
2299  */
2300 
2301  /* clear out a pending (and thus failed) speculation */
2302  if (specinsert != NULL)
2303  {
2304  ReorderBufferReturnChange(rb, specinsert, true);
2305  specinsert = NULL;
2306  }
2307 
2308  /* and memorize the pending insertion */
2309  dlist_delete(&change->node);
2310  specinsert = change;
2311  break;
2312 
2314 
2315  /*
2316  * Abort for speculative insertion arrived. So cleanup the
2317  * specinsert tuple and toast hash.
2318  *
2319  * Note that we get the spec abort change for each toast
2320  * entry but we need to perform the cleanup only the first
2321  * time we get it for the main table.
2322  */
2323  if (specinsert != NULL)
2324  {
2325  /*
2326  * We must clean the toast hash before processing a
2327  * completely new tuple to avoid confusion about the
2328  * previous tuple's toast chunks.
2329  */
2330  Assert(change->data.tp.clear_toast_afterwards);
2331  ReorderBufferToastReset(rb, txn);
2332 
2333  /* We don't need this record anymore. */
2334  ReorderBufferReturnChange(rb, specinsert, true);
2335  specinsert = NULL;
2336  }
2337  break;
2338 
2340  {
2341  int i;
2342  int nrelids = change->data.truncate.nrelids;
2343  int nrelations = 0;
2344  Relation *relations;
2345 
2346  relations = palloc0(nrelids * sizeof(Relation));
2347  for (i = 0; i < nrelids; i++)
2348  {
2349  Oid relid = change->data.truncate.relids[i];
2350  Relation rel;
2351 
2352  rel = RelationIdGetRelation(relid);
2353 
2354  if (!RelationIsValid(rel))
2355  elog(ERROR, "could not open relation with OID %u", relid);
2356 
2357  if (!RelationIsLogicallyLogged(rel))
2358  continue;
2359 
2360  relations[nrelations++] = rel;
2361  }
2362 
2363  /* Apply the truncate. */
2364  ReorderBufferApplyTruncate(rb, txn, nrelations,
2365  relations, change,
2366  streaming);
2367 
2368  for (i = 0; i < nrelations; i++)
2369  RelationClose(relations[i]);
2370 
2371  break;
2372  }
2373 
2375  ReorderBufferApplyMessage(rb, txn, change, streaming);
2376  break;
2377 
2379  /* Execute the invalidation messages locally */
2380  ReorderBufferExecuteInvalidations(change->data.inval.ninvalidations,
2381  change->data.inval.invalidations);
2382  break;
2383 
2385  /* get rid of the old */
2386  TeardownHistoricSnapshot(false);
2387 
2388  if (snapshot_now->copied)
2389  {
2390  ReorderBufferFreeSnap(rb, snapshot_now);
2391  snapshot_now =
2392  ReorderBufferCopySnap(rb, change->data.snapshot,
2393  txn, command_id);
2394  }
2395 
2396  /*
2397  * Restored from disk, need to be careful not to double
2398  * free. We could introduce refcounting for that, but for
2399  * now this seems infrequent enough not to care.
2400  */
2401  else if (change->data.snapshot->copied)
2402  {
2403  snapshot_now =
2404  ReorderBufferCopySnap(rb, change->data.snapshot,
2405  txn, command_id);
2406  }
2407  else
2408  {
2409  snapshot_now = change->data.snapshot;
2410  }
2411 
2412  /* and continue with the new one */
2413  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
2414  break;
2415 
2417  Assert(change->data.command_id != InvalidCommandId);
2418 
2419  if (command_id < change->data.command_id)
2420  {
2421  command_id = change->data.command_id;
2422 
2423  if (!snapshot_now->copied)
2424  {
2425  /* we don't use the global one anymore */
2426  snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
2427  txn, command_id);
2428  }
2429 
2430  snapshot_now->curcid = command_id;
2431 
2432  TeardownHistoricSnapshot(false);
2433  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
2434  }
2435 
2436  break;
2437 
2439  elog(ERROR, "tuplecid value in changequeue");
2440  break;
2441  }
2442 
2443  /*
2444  * It is possible that the data is not sent to downstream for a
2445  * long time either because the output plugin filtered it or there
2446  * is a DDL that generates a lot of data that is not processed by
2447  * the plugin. So, in such cases, the downstream can timeout. To
2448  * avoid that we try to send a keepalive message if required.
2449  * Trying to send a keepalive message after every change has some
2450  * overhead, but testing showed there is no noticeable overhead if
2451  * we do it after every ~100 changes.
2452  */
2453 #define CHANGES_THRESHOLD 100
2454 
2455  if (++changes_count >= CHANGES_THRESHOLD)
2456  {
2457  rb->update_progress_txn(rb, txn, change->lsn);
2458  changes_count = 0;
2459  }
2460  }
2461 
2462  /* speculative insertion record must be freed by now */
2463  Assert(!specinsert);
2464 
2465  /* clean up the iterator */
2466  ReorderBufferIterTXNFinish(rb, iterstate);
2467  iterstate = NULL;
2468 
2469  /*
2470  * Update total transaction count and total bytes processed by the
2471  * transaction and its subtransactions. Ensure to not count the
2472  * streamed transaction multiple times.
2473  *
2474  * Note that the statistics computation has to be done after
2475  * ReorderBufferIterTXNFinish as it releases the serialized change
2476  * which we have already accounted in ReorderBufferIterTXNNext.
2477  */
2478  if (!rbtxn_is_streamed(txn))
2479  rb->totalTxns++;
2480 
2481  rb->totalBytes += txn->total_size;
2482 
2483  /*
2484  * Done with current changes, send the last message for this set of
2485  * changes depending upon streaming mode.
2486  */
2487  if (streaming)
2488  {
2489  if (stream_started)
2490  {
2491  rb->stream_stop(rb, txn, prev_lsn);
2492  stream_started = false;
2493  }
2494  }
2495  else
2496  {
2497  /*
2498  * Call either PREPARE (for two-phase transactions) or COMMIT (for
2499  * regular ones).
2500  */
2501  if (rbtxn_prepared(txn))
2502  rb->prepare(rb, txn, commit_lsn);
2503  else
2504  rb->commit(rb, txn, commit_lsn);
2505  }
2506 
2507  /* this is just a sanity check against bad output plugin behaviour */
2509  elog(ERROR, "output plugin used XID %u",
2511 
2512  /*
2513  * Remember the command ID and snapshot for the next set of changes in
2514  * streaming mode.
2515  */
2516  if (streaming)
2517  ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
2518  else if (snapshot_now->copied)
2519  ReorderBufferFreeSnap(rb, snapshot_now);
2520 
2521  /* cleanup */
2522  TeardownHistoricSnapshot(false);
2523 
2524  /*
2525  * Aborting the current (sub-)transaction as a whole has the right
2526  * semantics. We want all locks acquired in here to be released, not
2527  * reassigned to the parent and we do not want any database access
2528  * have persistent effects.
2529  */
2531 
2532  /* make sure there's no cache pollution */
2534 
2535  if (using_subtxn)
2537 
2538  /*
2539  * We are here due to one of the four reasons: 1. Decoding an
2540  * in-progress txn. 2. Decoding a prepared txn. 3. Decoding of a
2541  * prepared txn that was (partially) streamed. 4. Decoding a committed
2542  * txn.
2543  *
2544  * For 1, we allow truncation of txn data by removing the changes
2545  * already streamed but still keeping other things like invalidations,
2546  * snapshot, and tuplecids. For 2 and 3, we indicate
2547  * ReorderBufferTruncateTXN to do more elaborate truncation of txn
2548  * data as the entire transaction has been decoded except for commit.
2549  * For 4, as the entire txn has been decoded, we can fully clean up
2550  * the TXN reorder buffer.
2551  */
2552  if (streaming || rbtxn_prepared(txn))
2553  {
2555  /* Reset the CheckXidAlive */
2557  }
2558  else
2559  ReorderBufferCleanupTXN(rb, txn);
2560  }
2561  PG_CATCH();
2562  {
2563  MemoryContext ecxt = MemoryContextSwitchTo(ccxt);
2564  ErrorData *errdata = CopyErrorData();
2565 
2566  /* TODO: Encapsulate cleanup from the PG_TRY and PG_CATCH blocks */
2567  if (iterstate)
2568  ReorderBufferIterTXNFinish(rb, iterstate);
2569 
2571 
2572  /*
2573  * Force cache invalidation to happen outside of a valid transaction
2574  * to prevent catalog access as we just caught an error.
2575  */
2577 
2578  /* make sure there's no cache pollution */
2580  txn->invalidations);
2581 
2582  if (using_subtxn)
2584 
2585  /*
2586  * The error code ERRCODE_TRANSACTION_ROLLBACK indicates a concurrent
2587  * abort of the (sub)transaction we are streaming or preparing. We
2588  * need to do the cleanup and return gracefully on this error, see
2589  * SetupCheckXidLive.
2590  *
2591  * This error code can be thrown by one of the callbacks we call
2592  * during decoding so we need to ensure that we return gracefully only
2593  * when we are sending the data in streaming mode and the streaming is
2594  * not finished yet or when we are sending the data out on a PREPARE
2595  * during a two-phase commit.
2596  */
2597  if (errdata->sqlerrcode == ERRCODE_TRANSACTION_ROLLBACK &&
2598  (stream_started || rbtxn_prepared(txn)))
2599  {
2600  /* curtxn must be set for streaming or prepared transactions */
2601  Assert(curtxn);
2602 
2603  /* Cleanup the temporary error state. */
2604  FlushErrorState();
2605  FreeErrorData(errdata);
2606  errdata = NULL;
2607  curtxn->concurrent_abort = true;
2608 
2609  /* Reset the TXN so that it is allowed to stream remaining data. */
2610  ReorderBufferResetTXN(rb, txn, snapshot_now,
2611  command_id, prev_lsn,
2612  specinsert);
2613  }
2614  else
2615  {
2616  ReorderBufferCleanupTXN(rb, txn);
2617  MemoryContextSwitchTo(ecxt);
2618  PG_RE_THROW();
2619  }
2620  }
2621  PG_END_TRY();
2622 }
2623 
2624 /*
2625  * Perform the replay of a transaction and its non-aborted subtransactions.
2626  *
2627  * Subtransactions previously have to be processed by
2628  * ReorderBufferCommitChild(), even if previously assigned to the toplevel
2629  * transaction with ReorderBufferAssignChild.
2630  *
2631  * This interface is called once a prepare or toplevel commit is read for both
2632  * streamed as well as non-streamed transactions.
 *
 * The commit/prepare metadata (final LSN, end LSN, timestamp, origin) is
 * stored in the txn first. After that there are three outcomes:
 *  - a (partially) streamed txn is finished via ReorderBufferStreamCommit();
 *  - a txn without a base snapshot has nothing to decode and is cleaned up
 *    here unless it is prepared;
 *  - otherwise the changes are decoded by ReorderBufferProcessTXN(), which
 *    is passed false as its final argument (presumably the streaming flag).
2633  */
2634 static void
2636  ReorderBuffer *rb, TransactionId xid,
2637  XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
2638  TimestampTz commit_time,
2639  RepOriginId origin_id, XLogRecPtr origin_lsn)
2640 {
2641  Snapshot snapshot_now;
2642  CommandId command_id = FirstCommandId;
2643 
2644  txn->final_lsn = commit_lsn;
2645  txn->end_lsn = end_lsn;
2646  txn->xact_time.commit_time = commit_time;
2647  txn->origin_id = origin_id;
2648  txn->origin_lsn = origin_lsn;
2649 
2650  /*
2651  * If the transaction was (partially) streamed, we need to commit it in a
2652  * 'streamed' way. That is, we first stream the remaining part of the
2653  * transaction, and then invoke stream_commit message.
2654  *
2655  * Called after everything (origin ID, LSN, ...) is stored in the
2656  * transaction to avoid passing that information directly.
2657  */
2658  if (rbtxn_is_streamed(txn))
2659  {
2660  ReorderBufferStreamCommit(rb, txn);
2661  return;
2662  }
2663 
2664  /*
2665  * If this transaction has no snapshot, it didn't make any changes to the
2666  * database, so there's nothing to decode. Note that
2667  * ReorderBufferCommitChild will have transferred any snapshots from
2668  * subtransactions if there were any.
2669  */
2670  if (txn->base_snapshot == NULL)
2671  {
2672  Assert(txn->ninvalidations == 0);
2673 
2674  /*
2675  * Removing this txn before a commit might result in the computation
2676  * of an incorrect restart_lsn. See SnapBuildProcessRunningXacts.
2677  */
2678  if (!rbtxn_prepared(txn))
2679  ReorderBufferCleanupTXN(rb, txn);
2680  return;
2681  }
2682 
 /* Decoding starts from the txn's base snapshot. */
2683  snapshot_now = txn->base_snapshot;
2684 
2685  /* Process and send the changes to output plugin. */
2686  ReorderBufferProcessTXN(rb, txn, commit_lsn, snapshot_now,
2687  command_id, false);
2688 }
2689 
2690 /*
2691  * Commit a transaction.
2692  *
2693  * See comments for ReorderBufferReplay().
 *
 * The txn is looked up by xid without being created. If the reorder buffer
 * does not know the xid, there is nothing to replay and the function
 * returns immediately.
2694  */
2695 void
2697  XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
2698  TimestampTz commit_time,
2699  RepOriginId origin_id, XLogRecPtr origin_lsn)
2700 {
2701  ReorderBufferTXN *txn;
2702 
2703  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2704  false);
2705 
2706  /* unknown transaction, nothing to replay */
2707  if (txn == NULL)
2708  return;
2709 
2710  ReorderBufferReplay(txn, rb, xid, commit_lsn, end_lsn, commit_time,
2711  origin_id, origin_lsn);
2712 }
2713 
2714 /*
2715  * Record the prepare information for a transaction.
 *
 * Only stores the prepare LSN, end LSN, prepare time and origin in the txn.
 * Nothing is decoded here.
 *
 * Returns false if the xid is unknown to the reorder buffer, true otherwise.
2716  */
2717 bool
2719  XLogRecPtr prepare_lsn, XLogRecPtr end_lsn,
2720  TimestampTz prepare_time,
2721  RepOriginId origin_id, XLogRecPtr origin_lsn)
2722 {
2723  ReorderBufferTXN *txn;
2724 
2725  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false);
2726 
2727  /* unknown transaction, nothing to do */
2728  if (txn == NULL)
2729  return false;
2730 
2731  /*
2732  * Remember the prepare information to be later used by commit prepared in
2733  * case we skip doing prepare.
2734  */
2735  txn->final_lsn = prepare_lsn;
2736  txn->end_lsn = end_lsn;
2737  txn->xact_time.prepare_time = prepare_time;
2738  txn->origin_id = origin_id;
2739  txn->origin_lsn = origin_lsn;
2740 
2741  return true;
2742 }
2743 
2744 /* Remember that we have skipped prepare */
/*
 * Unknown xids are silently ignored.
 *
 * NOTE(review): this listing does not show the statement after the NULL
 * check that marks the txn as having skipped its prepare. Confirm it
 * against the real source.
 */
2745 void
2747 {
2748  ReorderBufferTXN *txn;
2749 
2750  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false);
2751 
2752  /* unknown transaction, nothing to do */
2753  if (txn == NULL)
2754  return;
2755 
2757 }
2758 
2759 /*
2760  * Prepare a two-phase transaction.
2761  *
2762  * See comments for ReorderBufferReplay().
 *
 * The function:
 *  - marks the txn RBTXN_PREPARE;
 *  - stores a pstrdup'd copy of the GID;
 *  - replays the txn using the prepare information saved earlier in the
 *    txn (final/end LSN, prepare time, origin).
 *
 * If that replay ran into a concurrent abort of a non-streamed txn, the
 * prepare callback is invoked here explicitly.
2763  */
2764 void
2766  char *gid)
2767 {
2768  ReorderBufferTXN *txn;
2769 
2770  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2771  false);
2772 
2773  /* unknown transaction, nothing to replay */
2774  if (txn == NULL)
2775  return;
2776 
2777  txn->txn_flags |= RBTXN_PREPARE;
2778  txn->gid = pstrdup(gid);
2779 
2780  /* The prepare info must have been updated in txn by now. */
2782 
2783  ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
2784  txn->xact_time.prepare_time, txn->origin_id, txn->origin_lsn);
2785 
2786  /*
2787  * We send the prepare for the concurrently aborted xacts so that later
2788  * when rollback prepared is decoded and sent, the downstream should be
2789  * able to rollback such a xact. See comments atop DecodePrepare.
2790  *
2791  * Note, for the concurrent_abort + streaming case a stream_prepare was
2792  * already sent within the ReorderBufferReplay call above.
2793  */
2794  if (txn->concurrent_abort && !rbtxn_is_streamed(txn))
2795  rb->prepare(rb, txn, txn->final_lsn);
2796 }
2797 
2798 /*
2799  * This is used to handle COMMIT/ROLLBACK PREPARED.
 *
 * Steps:
 *  - The prepare's end LSN and prepare time are captured before anything
 *    else. The rollback callback is given these values, and the txn's
 *    fields are overwritten with the commit metadata further down.
 *  - For a commit whose prepare was not decoded earlier
 *    (final_lsn < two_phase_at), the txn is first replayed as a prepare
 *    using the prepare information.
 *  - commit_prepared or rollback_prepared is invoked.
 *  - The txn is cleaned up.
2800  */
2801 void
2803  XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
2804  XLogRecPtr two_phase_at,
2805  TimestampTz commit_time, RepOriginId origin_id,
2806  XLogRecPtr origin_lsn, char *gid, bool is_commit)
2807 {
2808  ReorderBufferTXN *txn;
2809  XLogRecPtr prepare_end_lsn;
2810  TimestampTz prepare_time;
2811 
2812  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, commit_lsn, false);
2813 
2814  /* unknown transaction, nothing to do */
2815  if (txn == NULL)
2816  return;
2817 
2818  /*
2819  * By this time the txn has the prepare record information, remember it to
2820  * be later used for rollback.
2821  */
2822  prepare_end_lsn = txn->end_lsn;
2823  prepare_time = txn->xact_time.prepare_time;
2824 
2825  /* add the gid in the txn */
2826  txn->gid = pstrdup(gid);
2827 
2828  /*
2829  * It is possible that this transaction is not decoded at prepare time
2830  * either because by that time we didn't have a consistent snapshot, or
2831  * two_phase was not enabled, or it was decoded earlier but we have
2832  * restarted. We only need to send the prepare if it was not decoded
2833  * earlier. We don't need to decode the xact for aborts if it is not done
2834  * already.
2835  */
2836  if ((txn->final_lsn < two_phase_at) && is_commit)
2837  {
2838  txn->txn_flags |= RBTXN_PREPARE;
2839 
2840  /*
2841  * The prepare info must have been updated in txn even if we skip
2842  * prepare.
2843  */
2845 
2846  /*
2847  * By this time the txn has the prepare record information and it is
2848  * important to use that so that downstream gets the accurate
2849  * information. If instead, we have passed commit information here
2850  * then downstream can behave as it has already replayed commit
2851  * prepared after the restart.
2852  */
2853  ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
2854  txn->xact_time.prepare_time, txn->origin_id, txn->origin_lsn);
2855  }
2856 
 /* From here on the txn describes the COMMIT/ROLLBACK PREPARED record. */
2857  txn->final_lsn = commit_lsn;
2858  txn->end_lsn = end_lsn;
2859  txn->xact_time.commit_time = commit_time;
2860  txn->origin_id = origin_id;
2861  txn->origin_lsn = origin_lsn;
2862 
2863  if (is_commit)
2864  rb->commit_prepared(rb, txn, commit_lsn);
2865  else
2866  rb->rollback_prepared(rb, txn, prepare_end_lsn, prepare_time);
2867 
2868  /* cleanup: make sure there's no cache pollution */
2870  txn->invalidations);
2871  ReorderBufferCleanupTXN(rb, txn);
2872 }
2873 
2874 /*
2875  * Abort a transaction that possibly has previous changes. Needs to be first
2876  * called for subtransactions and then for the toplevel xid.
2877  *
2878  * NB: Transactions handled here have to have actively aborted (i.e. have
2879  * produced an abort record). Implicitly aborted transactions are handled via
2880  * ReorderBufferAbortOld(); transactions we're just not interested in, but
2881  * which have committed are handled in ReorderBufferForget().
2882  *
2883  * This function purges this transaction and its contents from memory and
2884  * disk.
 *
 * The abort time is stored in the txn before the stream_abort callback is
 * invoked for a streamed txn, so it is already set when that callback runs.
 * Unknown xids are ignored.
2885  */
2886 void
2888  TimestampTz abort_time)
2889 {
2890  ReorderBufferTXN *txn;
2891 
2892  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2893  false);
2894 
2895  /* unknown, nothing to remove */
2896  if (txn == NULL)
2897  return;
2898 
2899  txn->xact_time.abort_time = abort_time;
2900 
2901  /* For streamed transactions notify the remote node about the abort. */
2902  if (rbtxn_is_streamed(txn))
2903  {
2904  rb->stream_abort(rb, txn, lsn);
2905 
2906  /*
2907  * We might have decoded changes for this transaction that could load
2908  * the cache as per the current transaction's view (consider DDL's
2909  * happened in this transaction). We don't want the decoding of future
2910  * transactions to use those cache entries so execute invalidations.
2911  */
2912  if (txn->ninvalidations > 0)
2914  txn->invalidations);
2915  }
2916 
2917  /* cosmetic... */
2918  txn->final_lsn = lsn;
2919 
2920  /* remove potential on-disk data, and deallocate */
2921  ReorderBufferCleanupTXN(rb, txn);
2922 }
2923 
2924 /*
2925  * Abort all transactions that aren't actually running anymore because the
2926  * server restarted.
2927  *
2928  * NB: These really have to be transactions that have aborted due to a server
2929  * crash/immediate restart, as we don't deal with invalidations here.
 *
 * A streamed txn is reported to the output plugin via stream_abort with
 * InvalidXLogRecPtr, because no abort record (and so no abort LSN) exists
 * for it. The walk stops at the first toplevel txn whose xid does not
 * precede oldestRunningXid.
2930  */
2931 void
2933 {
2934  dlist_mutable_iter it;
2935 
2936  /*
2937  * Iterate through all (potential) toplevel TXNs and abort all that are
2938  * older than what possibly can be running. Once we've found the first
2939  * that is alive we stop, there might be some that acquired an xid earlier
2940  * but started writing later, but it's unlikely and they will be cleaned
2941  * up in a later call to this function.
2942  */
2944  {
2945  ReorderBufferTXN *txn;
2946 
2947  txn = dlist_container(ReorderBufferTXN, node, it.cur);
2948 
2949  if (TransactionIdPrecedes(txn->xid, oldestRunningXid))
2950  {
2951  elog(DEBUG2, "aborting old transaction %u", txn->xid);
2952 
2953  /* Notify the remote node about the crash/immediate restart. */
2954  if (rbtxn_is_streamed(txn))
2955  rb->stream_abort(rb, txn, InvalidXLogRecPtr);
2956 
2957  /* remove potential on-disk data, and deallocate this tx */
2958  ReorderBufferCleanupTXN(rb, txn);
2959  }
2960  else
2961  return;
2962  }
2963 }
2964 
2965 /*
2966  * Forget the contents of a transaction if we aren't interested in its
2967  * contents. Needs to be first called for subtransactions and then for the
2968  * toplevel xid.
2969  *
2970  * This is significantly different from ReorderBufferAbort() because
2971  * transactions that have committed need to be treated differently from aborted
2972  * ones since they may have modified the catalog.
2973  *
2974  * Note that this is only allowed to be called in the moment a transaction
2975  * commit has just been read, not earlier; otherwise later records referring
2976  * to this xid might re-create the transaction incompletely.
 *
 * The txn must not have been streamed (asserted). Invalidations are only
 * executed when the txn has a base snapshot; a txn without one is asserted
 * to carry no invalidations.
2977  */
2978 void
2980 {
2981  ReorderBufferTXN *txn;
2982 
2983  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2984  false);
2985 
2986  /* unknown, nothing to forget */
2987  if (txn == NULL)
2988  return;
2989 
2990  /* this transaction mustn't be streamed */
2991  Assert(!rbtxn_is_streamed(txn));
2992 
2993  /* cosmetic... */
2994  txn->final_lsn = lsn;
2995 
2996  /*
2997  * Process cache invalidation messages if there are any. Even if we're not
2998  * interested in the transaction's contents, it could have manipulated the
2999  * catalog and we need to update the caches according to that.
3000  */
3001  if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
3003  txn->invalidations);
3004  else
3005  Assert(txn->ninvalidations == 0);
3006 
3007  /* remove potential on-disk data, and deallocate */
3008  ReorderBufferCleanupTXN(rb, txn);
3009 }
3010 
3011 /*
3012  * Invalidate cache for those transactions that need to be skipped just in case
3013  * catalogs were manipulated as part of the transaction.
3014  *
3015  * Note that this is a special-purpose function for prepared transactions where
3016  * we don't want to clean up the TXN even when we decide to skip it. See
3017  * DecodePrepare.
 *
 * Unlike ReorderBufferForget(), this neither cleans up the txn nor changes
 * its final_lsn. Unknown xids are ignored.
3018  */
3019 void
3021 {
3022  ReorderBufferTXN *txn;
3023 
3024  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
3025  false);
3026 
3027  /* unknown, nothing to do */
3028  if (txn == NULL)
3029  return;
3030 
3031  /*
3032  * Process cache invalidation messages if there are any. Even if we're not
3033  * interested in the transaction's contents, it could have manipulated the
3034  * catalog and we need to update the caches according to that.
3035  */
3036  if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
3038  txn->invalidations);
3039  else
3040  Assert(txn->ninvalidations == 0);
3041 }
3042 
3043 
3044 /*
3045  * Execute invalidations happening outside the context of a decoded
3046  * transaction. That currently happens either for xid-less commits
3047  * (cf. RecordTransactionCommit()) or for invalidations in uninteresting
3048  * transactions (via ReorderBufferForget()).
 *
 * The internal subtransaction named "replay" is only started when
 * IsTransactionOrTransactionBlock() reports that we are already inside a
 * transaction. The messages are applied in array order with
 * LocalExecuteInvalidationMessage().
3049  */
3050 void
3052  SharedInvalidationMessage *invalidations)
3053 {
3054  bool use_subtxn = IsTransactionOrTransactionBlock();
3055  int i;
3056 
3057  if (use_subtxn)
3058  BeginInternalSubTransaction("replay");
3059 
3060  /*
3061  * Force invalidations to happen outside of a valid transaction - that way
3062  * entries will just be marked as invalid without accessing the catalog.
3063  * That's advantageous because we don't need to setup the full state
3064  * necessary for catalog access.
3065  */
3066  if (use_subtxn)
3068 
3069  for (i = 0; i < ninvalidations; i++)
3070  LocalExecuteInvalidationMessage(&invalidations[i]);
3071 
3072  if (use_subtxn)
3074 }
3075 
3076 /*
3077  * Tell reorderbuffer about an xid seen in the WAL stream. Has to be called at
3078  * least once for every xid in XLogRecord->xl_xid (other places in records
3079  * may, but do not have to be passed through here).
3080  *
3081  * Reorderbuffer keeps some data structures about transactions in LSN order,
3082  * for efficiency. To do that it has to know about when transactions are seen
3083  * first in the WAL. As many types of records are not actually interesting for
3084  * logical decoding, they do not necessarily pass through here.
 *
 * InvalidTransactionId is ignored.
 *
 * NOTE(review): the lookup passes true as its third argument, unlike the
 * lookup-only callers such as ReorderBufferCommit(), which pass false. It
 * presumably requests creation of an unknown txn at 'lsn'. Confirm against
 * ReorderBufferTXNByXid().
3085  */
3086 void
3088 {
3089  /* many records won't have an xid assigned, centralize check here */
3090  if (xid != InvalidTransactionId)
3091  ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3092 }
3093 
3094 /*
3095  * Add a new snapshot to this transaction that may only be used after lsn 'lsn'
3096  * because the previous snapshot doesn't describe the catalog correctly for
3097  * following rows.
 *
 * The snapshot is queued as an ordinary change at 'lsn', so it takes effect
 * in LSN order while the transaction's changes are replayed.
3098  */
3099 void
3101  XLogRecPtr lsn, Snapshot snap)
3102 {
3104 
3105  change->data.snapshot = snap;
3107 
3108  ReorderBufferQueueChange(rb, xid, lsn, change, false);
3109 }
3110 
3111 /*
3112  * Set up the transaction's base snapshot.
3113  *
3114  * If we know that xid is a subtransaction, set the base snapshot on the
3115  * top-level transaction instead.
 *
 * 'snap' must be non-NULL, and the target txn must not already have a base
 * snapshot (both asserted). 'lsn' is recorded as base_snapshot_lsn.
3116  */
3117 void
3119  XLogRecPtr lsn, Snapshot snap)
3120 {
3121  ReorderBufferTXN *txn;
3122  bool is_new;
3123 
3124  Assert(snap != NULL);
3125 
3126  /*
3127  * Fetch the transaction to operate on. If we know it's a subtransaction,
3128  * operate on its top-level transaction instead.
3129  */
3130  txn = ReorderBufferTXNByXid(rb, xid, true, &is_new, lsn, true);
3131  if (rbtxn_is_known_subxact(txn))
3132  txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
3133  NULL, InvalidXLogRecPtr, false);
3134  Assert(txn->base_snapshot == NULL);
3135 
3136  txn->base_snapshot = snap;
3137  txn->base_snapshot_lsn = lsn;
3139 
3140  AssertTXNLsnOrder(rb);
3141 }
3142 
3143 /*
3144  * Access the catalog with this CommandId at this point in the changestream.
3145  *
3146  * May only be called for command ids > 1
 *
 * The command id is queued as an ordinary change at 'lsn'. When that change
 * is replayed, the decoder only advances its current command id if the
 * queued one is larger.
3147  */
3148 void
3150  XLogRecPtr lsn, CommandId cid)
3151 {
3153 
3154  change->data.command_id = cid;
3156 
3157  ReorderBufferQueueChange(rb, xid, lsn, change, false);
3158 }
3159 
3160 /*
3161  * Update memory counters to account for the new or removed change.
3162  *
3163  * We update two counters - in the reorder buffer, and in the transaction
3164  * containing the change. The reorder buffer counter allows us to quickly
3165  * decide if we reached the memory limit, the transaction counter allows
3166  * us to quickly pick the largest transaction for eviction.
3167  *
3168  * When streaming is enabled, we need to update the toplevel transaction
3169  * counters instead - we don't really care about subtransactions as we
3170  * can't stream them individually anyway, and we only pick toplevel
3171  * transactions for eviction. So only toplevel transactions matter.
 *
 * 'addition' selects whether 'sz' is added to or subtracted from the
 * counters.
 *
 * NOTE(review): the code below always adjusts the size of change->txn
 * itself, even when that is a subtransaction. Only total_size (used for
 * decoding stats) is adjusted on the toplevel txn. The streaming paragraph
 * above may be stale.
3172  */
3173 static void
3175  ReorderBufferChange *change,
3176  bool addition, Size sz)
3177 {
3178  ReorderBufferTXN *txn;
3179  ReorderBufferTXN *toptxn;
3180 
3181  Assert(change->txn);
3182 
3183  /*
3184  * Ignore tuple CID changes, because those are not evicted when reaching
3185  * memory limit. So we just don't count them, because it might easily
3186  * trigger a pointless attempt to spill.
3187  */
3189  return;
3190 
3191  txn = change->txn;
3192 
3193  /*
3194  * Update the total size in top level as well. This is later used to
3195  * compute the decoding stats.
3196  */
3197  toptxn = rbtxn_get_toptxn(txn);
3198 
3199  if (addition)
3200  {
3201  txn->size += sz;
3202  rb->size += sz;
3203 
3204  /* Update the total size in the top transaction. */
3205  toptxn->total_size += sz;
3206  }
3207  else
3208  {
3209  Assert((rb->size >= sz) && (txn->size >= sz));
3210  txn->size -= sz;
3211  rb->size -= sz;
3212 
3213  /* Update the total size in the top transaction. */
3214  toptxn->total_size -= sz;
3215  }
3216 
3217  Assert(txn->size <= rb->size);
3218 }
3219 
3220 /*
3221  * Add new (relfilelocator, tid) -> (cmin, cmax) mappings.
3222  *
3223  * We do not include this change type in memory accounting, because we
3224  * keep CIDs in a separate list and do not evict them when reaching
3225  * the memory limit.
 *
 * The change is appended directly to txn->tuplecids, rather than being
 * queued with ReorderBufferQueueChange(), and txn->ntuplecids is
 * incremented.
3226  */
3227 void
3229  XLogRecPtr lsn, RelFileLocator locator,
3230  ItemPointerData tid, CommandId cmin,
3231  CommandId cmax, CommandId combocid)
3232 {
3234  ReorderBufferTXN *txn;
3235 
3236  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3237 
3238  change->data.tuplecid.locator = locator;
3239  change->data.tuplecid.tid = tid;
3240  change->data.tuplecid.cmin = cmin;
3241  change->data.tuplecid.cmax = cmax;
3242  change->data.tuplecid.combocid = combocid;
3243  change->lsn = lsn;
3244  change->txn = txn;
3246 
3247  dlist_push_tail(&txn->tuplecids, &change->node);
3248  txn->ntuplecids++;
3249 }
3250 
3251 /*
3252  * Accumulate the invalidations for executing them later.
3253  *
3254  * This needs to be called for each XLOG_XACT_INVALIDATIONS message and
3255  * accumulates all the invalidation messages in the toplevel transaction, if
3256  * available, otherwise in the current transaction, as well as in the form of
3257  * change in reorder buffer. We require to record it in form of the change
3258  * so that we can execute only the required invalidations instead of executing
3259  * all the invalidations on each CommandId increment. We also need to
3260  * accumulate these in the txn buffer because in some cases where we skip
3261  * processing the transaction (see ReorderBufferForget), we need to execute
3262  * all the invalidations together.
 *
 * The messages are therefore copied twice, and all allocations are made in
 * rb->context:
 *  - appended to the toplevel txn's invalidations array;
 *  - copied into a new change that is queued under the original 'xid' at
 *    'lsn'.
 * 'nmsgs' must be greater than zero (asserted).
3263  */
3264 void
3266  XLogRecPtr lsn, Size nmsgs,
3268 {
3269  ReorderBufferTXN *txn;
3270  MemoryContext oldcontext;
3271  ReorderBufferChange *change;
3272 
3273  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3274 
3275  oldcontext = MemoryContextSwitchTo(rb->context);
3276 
3277  /*
3278  * Collect all the invalidations under the top transaction, if available,
3279  * so that we can execute them all together. See comments atop this
3280  * function.
3281  */
3282  txn = rbtxn_get_toptxn(txn);
3283 
3284  Assert(nmsgs > 0);
3285 
3286  /* Accumulate invalidations. */
3287  if (txn->ninvalidations == 0)
3288  {
3289  txn->ninvalidations = nmsgs;
3291  palloc(sizeof(SharedInvalidationMessage) * nmsgs);
3292  memcpy(txn->invalidations, msgs,
3293  sizeof(SharedInvalidationMessage) * nmsgs);
3294  }
3295  else
3296  {
3299  (txn->ninvalidations + nmsgs));
3300 
3301  memcpy(txn->invalidations + txn->ninvalidations, msgs,
3302  nmsgs * sizeof(SharedInvalidationMessage));
3303  txn->ninvalidations += nmsgs;
3304  }
3305 
 /* Also record this batch as a change, so it is replayed in LSN order. */
3306  change = ReorderBufferGetChange(rb);
3308  change->data.inval.ninvalidations = nmsgs;
3309  change->data.inval.invalidations = (SharedInvalidationMessage *)
3310  palloc(sizeof(SharedInvalidationMessage) * nmsgs);
3311  memcpy(change->data.inval.invalidations, msgs,
3312  sizeof(SharedInvalidationMessage) * nmsgs);
3313 
3314  ReorderBufferQueueChange(rb, xid, lsn, change, false);
3315 
3316  MemoryContextSwitchTo(oldcontext);
3317 }
3318 
3319 /*
3320  * Apply all invalidations we know. Possibly we only need parts at this point
3321  * in the changestream but we don't know which those are.
 *
 * Loops i over 0..nmsgs-1, presumably applying each message of the array
 * argument in order.
 * NOTE(review): the signature (source line 3324) and the per-message call
 * in the loop body (3329) are not shown in this listing.
3322  */
3323 static void
3325 {
3326  int i;
3327 
3328  for (i = 0; i < nmsgs; i++)
3330 }
3331 
3332 /*
3333  * Mark a transaction as containing catalog changes
 *
 * 'xid' is looked up with true as the third argument (presumably "create the
 * entry if it is not yet known"). If the transaction is a subtransaction,
 * its top-level transaction is marked as well.
 * NOTE(review): the parameter line (3336) and the statements that set the
 * flag (3345-3346, 3361-3362) are not shown in this listing.
3334  */
3335 void
3337  XLogRecPtr lsn)
3338 {
3339  ReorderBufferTXN *txn;
3340 
3341  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3342 
3343  if (!rbtxn_has_catalog_changes(txn))
3344  {
3347  }
3348 
3349  /*
3350  * Mark top-level transaction as having catalog changes too if one of its
3351  * children has so that the ReorderBufferBuildTupleCidHash can
3352  * conveniently check just top-level transaction and decide whether to
3353  * build the hash table or not.
3354  */
3355  if (rbtxn_is_subtxn(txn))
3356  {
3357  ReorderBufferTXN *toptxn = rbtxn_get_toptxn(txn);
3358 
3359  if (!rbtxn_has_catalog_changes(toptxn))
3360  {
3363  }
3364  }
3365 }
3366 
3367 /*
3368  * Return palloc'ed array of the transactions that have changed catalogs.
3369  * The returned array is sorted in xidComparator order.
3370  *
 * Returns NULL, not an empty array, when rb->catchange_txns is empty.
 *
3371  * The caller must free the returned array when done with it.
3372  */
3373 TransactionId *
3375 {
3376  dlist_iter iter;
3377  TransactionId *xids = NULL;
3378  size_t xcnt = 0;
3379 
3380  /* Quick return if the list is empty */
3381  if (dclist_count(&rb->catchange_txns) == 0)
3382  return NULL;
3383 
 /*
  * The array size continues on source line 3386, which this listing omits;
  * it is presumably the list length, as the Assert below expects.
  */
3384  /* Initialize XID array */
3385  xids = (TransactionId *) palloc(sizeof(TransactionId) *
3387  dclist_foreach(iter, &rb->catchange_txns)
3388  {
3390  catchange_node,
3391  iter.cur);
3392 
3394 
3395  xids[xcnt++] = txn->xid;
3396  }
3397 
3398  qsort(xids, xcnt, sizeof(TransactionId), xidComparator);
3399 
3400  Assert(xcnt == dclist_count(&rb->catchange_txns));
3401  return xids;
3402 }
3403 
3404 /*
3405  * Query whether a transaction is already *known* to contain catalog
3406  * changes. This can be wrong until directly before the commit!
 *
 * Returns false for an xid the reorder buffer does not know. The lookup
 * passes false as the third argument, and a NULL result is handled.
3407  */
3408 bool
3410 {
3411  ReorderBufferTXN *txn;
3412 
3413  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
3414  false);
3415  if (txn == NULL)
3416  return false;
3417 
3418  return rbtxn_has_catalog_changes(txn);
3419 }
3420 
3421 /*
3422  * ReorderBufferXidHasBaseSnapshot
3423  * Have we already set the base snapshot for the given txn/subtxn?
 *
 * Returns false for unknown xids. For a known subtransaction, the answer is
 * taken from its top-level transaction.
3424  */
3425 bool
3427 {
3428  ReorderBufferTXN *txn;
3429 
3430  txn = ReorderBufferTXNByXid(rb, xid, false,
3431  NULL, InvalidXLogRecPtr, false);
3432 
3433  /* transaction isn't known yet, ergo no snapshot */
3434  if (txn == NULL)
3435  return false;
3436 
3437  /* a known subtxn? operate on top-level txn instead */
3438  if (rbtxn_is_known_subxact(txn))
3439  txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
3440  NULL, InvalidXLogRecPtr, false);
3441 
 /*
  * NOTE(review): the top-level lookup above passes false as the third
  * argument, and its result is not NULL-checked before this dereference.
  * This assumes a known subxact's top-level transaction is always present;
  * confirm.
  */
3442  return txn->base_snapshot != NULL;
3443 }
3444 
3445 
3446 /*
3447  * ---------------------------------------
3448  * Disk serialization support
3449  * ---------------------------------------
3450  */
3451 
3452 /*
3453  * Ensure the IO buffer is >= sz.
 *
 * rb->outbuf is allocated in rb->context on first use and afterwards only
 * grows; it is never shrunk. Because repalloc() may move it, callers must
 * recompute any pointers into rb->outbuf after calling this.
3454  */
3455 static void
3457 {
3458  if (!rb->outbufsize)
3459  {
3460  rb->outbuf = MemoryContextAlloc(rb->context, sz);
3461  rb->outbufsize = sz;
3462  }
3463  else if (rb->outbufsize < sz)
3464  {
3465  rb->outbuf = repalloc(rb->outbuf, sz);
3466  rb->outbufsize = sz;
3467  }
3468 }
3469 
3470 /*
3471  * Find the largest transaction (toplevel or subxact) to evict (spill to disk).
3472  *
3473  * XXX With many subtransactions this might be quite slow, because we'll have
3474  * to walk through all of them. There are some options how we could improve
3475  * that: (a) maintain some secondary structure with transactions sorted by
3476  * amount of changes, (b) not looking for the entirely largest transaction,
3477  * but e.g. for transaction using at least some fraction of the memory limit,
3478  * and (c) evicting multiple transactions at once, e.g. to free a given portion
3479  * of the memory limit (e.g. 50%).
 *
 * Ties go to the first entry returned by the hash scan, because the
 * comparison is a strict '>'. The asserts require that at least one
 * transaction with non-zero size exists, so call this only while there is
 * something to evict.
 * NOTE(review): the signature (3482) and the declaration of 'ent' (3485)
 * are not shown in this listing.
3480  */
3481 static ReorderBufferTXN *
3483 {
3484  HASH_SEQ_STATUS hash_seq;
3486  ReorderBufferTXN *largest = NULL;
3487 
3488  hash_seq_init(&hash_seq, rb->by_txn);
3489  while ((ent = hash_seq_search(&hash_seq)) != NULL)
3490  {
3491  ReorderBufferTXN *txn = ent->txn;
3492 
3493  /* if the current transaction is larger, remember it */
3494  if ((!largest) || (txn->size > largest->size))
3495  largest = txn;
3496  }
3497 
3498  Assert(largest);
3499  Assert(largest->size > 0);
3500  Assert(largest->size <= rb->size);
3501 
3502  return largest;
3503 }
3504 
3505 /*
3506  * Find the largest streamable toplevel transaction to evict (by streaming).
3507  *
3508  * This can be seen as an optimized version of ReorderBufferLargestTXN, which
3509  * should give us the same transaction (because we don't update memory account
3510  * for subtransaction with streaming, so it's always 0). But we can simply
3511  * iterate over the limited number of toplevel transactions that have a base
3512  * snapshot. There is no use of selecting a transaction that doesn't have base
3513  * snapshot because we don't decode such transactions. Also, we do not select
3514  * the transaction which doesn't have any streamable change.
3515  *
3516  * Note that, we skip transactions that contain incomplete changes. There
3517  * is a scope of optimization here such that we can select the largest
3518  * transaction which has incomplete changes. But that will make the code and
3519  * design quite complex and that might not be worth the benefit. If we plan to
3520  * stream the transactions that contain incomplete changes then we need to
3521  * find a way to partially stream/truncate the transaction changes in-memory
3522  * and build a mechanism to partially truncate the spilled files.
3523  * Additionally, whenever we partially stream the transaction we need to
3524  * maintain the last streamed lsn and next time we need to restore from that
3525  * segment and the offset in WAL. As we stream the changes from the top
3526  * transaction and restore them subtransaction wise, we need to even remember
3527  * the subxact from where we streamed the last change.
 *
 * Returns NULL when no transaction qualifies. Selection is by total_size,
 * which must be non-zero.
 * NOTE(review): the signature (3530), the loop header (3537), the
 * not-a-subtxn assertion (3544) and the last term of the selection
 * condition (3550) are not shown in this listing.
3528  */
3529 static ReorderBufferTXN *
3531 {
3532  dlist_iter iter;
3533  Size largest_size = 0;
3534  ReorderBufferTXN *largest = NULL;
3535 
3536  /* Find the largest top-level transaction having a base snapshot. */
3538  {
3539  ReorderBufferTXN *txn;
3540 
3541  txn = dlist_container(ReorderBufferTXN, base_snapshot_node, iter.cur);
3542 
3543  /* must not be a subtxn */
3545  /* base_snapshot must be set */
3546  Assert(txn->base_snapshot != NULL);
3547 
3548  if ((largest == NULL || txn->total_size > largest_size) &&
3549  (txn->total_size > 0) && !(rbtxn_has_partial_change(txn)) &&
3551  {
3552  largest = txn;
3553  largest_size = txn->total_size;
3554  }
3555  }
3556 
3557  return largest;
3558 }
3559 
3560 /*
3561  * Check whether the logical_decoding_work_mem limit was reached, and if yes
3562  * pick the largest (sub)transaction at-a-time to evict and spill its changes to
3563  * disk or send to the output plugin until we reach under the memory limit.
3564  *
3565  * If debug_logical_replication_streaming is set to "immediate", stream or
3566  * serialize the changes immediately.
3567  *
3568  * XXX At this point we select the transactions until we reach under the memory
3569  * limit, but we might also adapt a more elaborate eviction strategy - for example
3570  * evicting enough transactions to free certain fraction (e.g. 50%) of the memory
3571  * limit.
 *
 * logical_decoding_work_mem is compared in bytes here (multiplied by 1024).
 * Each iteration evicts one transaction, which must end up with size 0 and
 * no in-memory entries.
 * NOTE(review): the signature (3574), the first half of the bail-out test
 * (3582), the "immediate" term of the loop condition (3597) and the first
 * half of the streaming-eligibility test (3604) are not shown in this
 * listing.
3572  */
3573 static void
3575 {
3576  ReorderBufferTXN *txn;
3577 
3578  /*
3579  * Bail out if debug_logical_replication_streaming is buffered and we
3580  * haven't exceeded the memory limit.
3581  */
3583  rb->size < logical_decoding_work_mem * 1024L)
3584  return;
3585 
3586  /*
3587  * If debug_logical_replication_streaming is immediate, loop until there's
3588  * no change. Otherwise, loop until we reach under the memory limit. One
3589  * might think that just by evicting the largest (sub)transaction we will
3590  * come under the memory limit based on assumption that the selected
3591  * transaction is at least as large as the most recent change (which
3592  * caused us to go over the memory limit). However, that is not true
3593  * because a user can reduce the logical_decoding_work_mem to a smaller
3594  * value before the most recent change.
3595  */
3596  while (rb->size >= logical_decoding_work_mem * 1024L ||
3598  rb->size > 0))
3599  {
3600  /*
3601  * Pick the largest transaction and evict it from memory by streaming,
3602  * if possible. Otherwise, spill to disk.
3603  */
3605  (txn = ReorderBufferLargestStreamableTopTXN(rb)) != NULL)
3606  {
3607  /* we know there has to be one, because the size is not zero */
3608  Assert(txn && rbtxn_is_toptxn(txn));
3609  Assert(txn->total_size > 0);
3610  Assert(rb->size >= txn->total_size);
3611 
3612  ReorderBufferStreamTXN(rb, txn);
3613  }
3614  else
3615  {
3616  /*
3617  * Pick the largest transaction (or subtransaction) and evict it
3618  * from memory by serializing it to disk.
3619  */
3620  txn = ReorderBufferLargestTXN(rb);
3621 
3622  /* we know there has to be one, because the size is not zero */
3623  Assert(txn);
3624  Assert(txn->size > 0);
3625  Assert(rb->size >= txn->size);
3626 
3627  ReorderBufferSerializeTXN(rb, txn);
3628  }
3629 
3630  /*
3631  * After eviction, the transaction should have no entries in memory,
3632  * and should use 0 bytes for changes.
3633  */
3634  Assert(txn->size == 0);
3635  Assert(txn->nentries_mem == 0);
3636  }
3637 
3638  /* We must be under the memory limit now. */
3639  Assert(rb->size < logical_decoding_work_mem * 1024L);
3640 }
3641 
3642 /*
3643  * Spill data of a large transaction (and its subtransactions) to disk.
 *
 * Subtransactions are serialized first, by recursive calls. Each change of
 * 'txn' is then appended (O_APPEND) to a file chosen by the WAL segment that
 * contains the change's LSN. A new file is opened whenever a change falls
 * outside the currently open segment. Each change is unlinked from
 * txn->changes and returned once it has been written.
 *
 * 'spilled' counts changes, not bytes. The byte statistic uses txn->size as
 * captured on entry.
 * NOTE(review): the signature (3646), the close of the previous file (3684),
 * the path-building call (3692), the errcode line (3701), the stats-update
 * call (3722), the size reset (3728) and the final close (3731) are not shown
 * in this listing.
3644  */
3645 static void
3647 {
3648  dlist_iter subtxn_i;
3649  dlist_mutable_iter change_i;
3650  int fd = -1;
3651  XLogSegNo curOpenSegNo = 0;
3652  Size spilled = 0;
3653  Size size = txn->size;
3654 
3655  elog(DEBUG2, "spill %u changes in XID %u to disk",
3656  (uint32) txn->nentries_mem, txn->xid);
3657 
3658  /* do the same to all child TXs */
3659  dlist_foreach(subtxn_i, &txn->subtxns)
3660  {
3661  ReorderBufferTXN *subtxn;
3662 
3663  subtxn = dlist_container(ReorderBufferTXN, node, subtxn_i.cur);
3664  ReorderBufferSerializeTXN(rb, subtxn);
3665  }
3666 
3667  /* serialize changestream */
3668  dlist_foreach_modify(change_i, &txn->changes)
3669  {
3670  ReorderBufferChange *change;
3671 
3672  change = dlist_container(ReorderBufferChange, node, change_i.cur);
3673 
3674  /*
3675  * store in segment in which it belongs by start lsn, don't split over
3676  * multiple segments tho
3677  */
3678  if (fd == -1 ||
3679  !XLByteInSeg(change->lsn, curOpenSegNo, wal_segment_size))
3680  {
3681  char path[MAXPGPATH];
3682 
3683  if (fd != -1)
3685 
3686  XLByteToSeg(change->lsn, curOpenSegNo, wal_segment_size);
3687 
3688  /*
3689  * No need to care about TLIs here, only used during a single run,
3690  * so each LSN only maps to a specific WAL record.
3691  */
3693  curOpenSegNo);
3694 
3695  /* open segment, create it if necessary */
3696  fd = OpenTransientFile(path,
3697  O_CREAT | O_WRONLY | O_APPEND | PG_BINARY);
3698 
3699  if (fd < 0)
3700  ereport(ERROR,
3702  errmsg("could not open file \"%s\": %m", path)));
3703  }
3704 
3705  ReorderBufferSerializeChange(rb, txn, fd, change);
3706  dlist_delete(&change->node);
3707  ReorderBufferReturnChange(rb, change, true);
3708 
3709  spilled++;
3710  }
3711 
3712  /* update the statistics iff we have spilled anything */
3713  if (spilled)
3714  {
3715  rb->spillCount += 1;
3716  rb->spillBytes += size;
3717 
3718  /* don't consider already serialized transactions */
3719  rb->spillTxns += (rbtxn_is_serialized(txn) || rbtxn_is_serialized_clear(txn)) ? 0 : 1;
3720 
3721  /* update the decoding stats */
3723  }
3724 
3725  Assert(spilled == txn->nentries_mem);
3726  Assert(dlist_is_empty(&txn->changes));
3727  txn->nentries_mem = 0;
3729 
3730  if (fd != -1)
3732 }
3733 
3734 /*
3735  * Serialize individual change to disk.
 *
 * Writes one record to 'fd'. The record is a ReorderBufferDiskChange header
 * (a copy of the in-memory ReorderBufferChange) followed by action-specific
 * variable-length data, and ondisk->size holds the total record length with
 * the header included. The layout must match what
 * ReorderBufferRestoreChange() reads back.
 *
 * A failed or short write raises ERROR; ENOSPC is assumed when write() did
 * not set errno. The function also advances txn->final_lsn to this change's
 * LSN when that LSN is larger.
 * NOTE(review): the signature line, the case labels and the buffer
 * reservation calls (one per branch, including the one following each "make
 * sure we have enough space" comment) are among the source lines not shown
 * in this listing.
3736  */
3737 static void
3739  int fd, ReorderBufferChange *change)
3740 {
3741  ReorderBufferDiskChange *ondisk;
3742  Size sz = sizeof(ReorderBufferDiskChange);
3743 
3745 
3746  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3747  memcpy(&ondisk->change, change, sizeof(ReorderBufferChange));
3748 
3749  switch (change->action)
3750  {
3751  /* fall through these, they're all similar enough */
3756  {
3757  char *data;
3758  HeapTuple oldtup,
3759  newtup;
3760  Size oldlen = 0;
3761  Size newlen = 0;
3762 
3763  oldtup = change->data.tp.oldtuple;
3764  newtup = change->data.tp.newtuple;
3765 
3766  if (oldtup)
3767  {
3768  sz += sizeof(HeapTupleData);
3769  oldlen = oldtup->t_len;
3770  sz += oldlen;
3771  }
3772 
3773  if (newtup)
3774  {
3775  sz += sizeof(HeapTupleData);
3776  newlen = newtup->t_len;
3777  sz += newlen;
3778  }
3779 
3780  /* make sure we have enough space */
3782 
3783  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3784  /* might have been reallocated above */
3785  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3786 
 /* Each tuple: the HeapTupleData struct, then t_len bytes of t_data. */
3787  if (oldlen)
3788  {
3789  memcpy(data, oldtup, sizeof(HeapTupleData));
3790  data += sizeof(HeapTupleData);
3791 
3792  memcpy(data, oldtup->t_data, oldlen);
3793  data += oldlen;
3794  }
3795 
3796  if (newlen)
3797  {
3798  memcpy(data, newtup, sizeof(HeapTupleData));
3799  data += sizeof(HeapTupleData);
3800 
3801  memcpy(data, newtup->t_data, newlen);
3802  data += newlen;
3803  }
3804  break;
3805  }
3807  {
3808  char *data;
3809  Size prefix_size = strlen(change->data.msg.prefix) + 1;
3810 
 /* Layout: prefix size, prefix (with NUL), message size, message. */
3811  sz += prefix_size + change->data.msg.message_size +
3812  sizeof(Size) + sizeof(Size);
3814 
3815  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3816 
3817  /* might have been reallocated above */
3818  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3819 
3820  /* write the prefix including the size */
3821  memcpy(data, &prefix_size, sizeof(Size));
3822  data += sizeof(Size);
3823  memcpy(data, change->data.msg.prefix,
3824  prefix_size);
3825  data += prefix_size;
3826 
3827  /* write the message including the size */
3828  memcpy(data, &change->data.msg.message_size, sizeof(Size));
3829  data += sizeof(Size);
3830  memcpy(data, change->data.msg.message,
3831  change->data.msg.message_size);
3832  data += change->data.msg.message_size;
3833 
3834  break;
3835  }
3837  {
3838  char *data;
3839  Size inval_size = sizeof(SharedInvalidationMessage) *
3840  change->data.inval.ninvalidations;
3841 
3842  sz += inval_size;
3843 
3845  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3846 
3847  /* might have been reallocated above */
3848  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3849  memcpy(data, change->data.inval.invalidations, inval_size);
3850  data += inval_size;
3851 
3852  break;
3853  }
3855  {
3856  Snapshot snap;
3857  char *data;
3858 
3859  snap = change->data.snapshot;
3860 
 /* Layout: SnapshotData, then the xip array, then the subxip array. */
3861  sz += sizeof(SnapshotData) +
3862  sizeof(TransactionId) * snap->xcnt +
3863  sizeof(TransactionId) * snap->subxcnt;
3864 
3865  /* make sure we have enough space */
3867  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3868  /* might have been reallocated above */
3869  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3870 
3871  memcpy(data, snap, sizeof(SnapshotData));
3872  data += sizeof(SnapshotData);
3873 
3874  if (snap->xcnt)
3875  {
3876  memcpy(data, snap->xip,
3877  sizeof(TransactionId) * snap->xcnt);
3878  data += sizeof(TransactionId) * snap->xcnt;
3879  }
3880 
3881  if (snap->subxcnt)
3882  {
3883  memcpy(data, snap->subxip,
3884  sizeof(TransactionId) * snap->subxcnt);
3885  data += sizeof(TransactionId) * snap->subxcnt;
3886  }
3887  break;
3888  }
3890  {
3891  Size size;
3892  char *data;
3893 
3894  /* account for the OIDs of truncated relations */
3895  size = sizeof(Oid) * change->data.truncate.nrelids;
3896  sz += size;
3897 
3898  /* make sure we have enough space */
3900 
3901  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3902  /* might have been reallocated above */
3903  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3904 
3905  memcpy(data, change->data.truncate.relids, size);
3906  data += size;
3907 
3908  break;
3909  }
3914  /* ReorderBufferChange contains everything important */
3915  break;
3916  }
3917 
3918  ondisk->size = sz;
3919 
3920  errno = 0;
3921  pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_WRITE);
3922  if (write(fd, rb->outbuf, ondisk->size) != ondisk->size)
3923  {
3924  int save_errno = errno;
3925 
3927 
3928  /* if write didn't set errno, assume problem is no disk space */
3929  errno = save_errno ? save_errno : ENOSPC;
3930  ereport(ERROR,
3932  errmsg("could not write to data file for XID %u: %m",
3933  txn->xid)));
3934  }
3936 
3937  /*
3938  * Keep the transaction's final_lsn up to date with each change we send to
3939  * disk, so that ReorderBufferRestoreCleanup works correctly. (We used to
3940  * only do this on commit and abort records, but that doesn't work if a
3941  * system crash leaves a transaction without its abort record).
3942  *
3943  * Make sure not to move it backwards.
3944  */
3945  if (txn->final_lsn < change->lsn)
3946  txn->final_lsn = change->lsn;
3947 
3948  Assert(ondisk->change.action == change->action);
3949 }
3950 
3951 /* Returns true, if the output plugin supports streaming, false, otherwise. */
/*
 * The result is ctx->streaming. NOTE(review): the signature (3953) and the
 * line defining 'ctx' (3955) are not shown in this listing.
 */
3952 static inline bool
3954 {
3956 
3957  return ctx->streaming;
3958 }
3959 
3960 /* Returns true, if the streaming can be started now, false, otherwise. */
/*
 * Requires a consistent snapshot-builder state, output-plugin streaming
 * support (ReorderBufferCanStream), and that the record currently being read
 * (ctx->reader->ReadRecPtr) is not one the snapshot builder says to skip.
 * NOTE(review): the signature (3962), the line defining 'ctx' (3964) and the
 * consistency test itself (3968) are not shown in this listing.
 */
3961 static inline bool
3963 {
3965  SnapBuild *builder = ctx->snapshot_builder;
3966 
3967  /* We can't start streaming unless a consistent state is reached. */
3969  return false;
3970 
3971  /*
3972  * We can't start streaming immediately even if the streaming is enabled
3973  * because we previously decoded this transaction and now just are
3974  * restarting.
3975  */
3976  if (ReorderBufferCanStream(rb) &&
3977  !SnapBuildXactNeedsSkip(builder, ctx->reader->ReadRecPtr))
3978  return true;
3979 
3980  return false;
3981 }
3982 
3983 /*
3984  * Send data of a large transaction (and its subtransactions) to the
3985  * output plugin, but using the stream API.
 *
 * Only top-level transactions are accepted. On the first streaming run, the
 * subtransactions' snapshots are transferred to 'txn' and a copy of the base
 * snapshot is taken with FirstCommandId; if there is still no base snapshot,
 * the function returns without streaming anything. On later runs, the
 * snapshot saved from the previous run is copied with its command id and the
 * old copy is freed.
 *
 * Stream statistics are updated only after ReorderBufferProcessTXN()
 * returns, and on return all in-memory changes of 'txn' are gone.
 * NOTE(review): the signature (3988), the command-id assertion (4022), the
 * snapshot free call (4070) and the stats-update call (4094) are not shown
 * in this listing.
3986  */
3987 static void
3989 {
3990  Snapshot snapshot_now;
3991  CommandId command_id;
3992  Size stream_bytes;
3993  bool txn_is_streamed;
3994 
3995  /* We can never reach here for a subtransaction. */
3996  Assert(rbtxn_is_toptxn(txn));
3997 
3998  /*
3999  * We can't make any assumptions about base snapshot here, similar to what
4000  * ReorderBufferCommit() does. That relies on base_snapshot getting
4001  * transferred from subxact in ReorderBufferCommitChild(), but that was
4002  * not yet called as the transaction is in-progress.
4003  *
4004  * So just walk the subxacts and use the same logic here. But we only need
4005  * to do that once, when the transaction is streamed for the first time.
4006  * After that we need to reuse the snapshot from the previous run.
4007  *
4008  * Unlike DecodeCommit which adds xids of all the subtransactions in
4009  * snapshot's xip array via SnapBuildCommitTxn, we can't do that here but
4010  * we do add them to subxip array instead via ReorderBufferCopySnap. This
4011  * allows the catalog changes made in subtransactions decoded till now to
4012  * be visible.
4013  */
4014  if (txn->snapshot_now == NULL)
4015  {
4016  dlist_iter subxact_i;
4017 
4018  /* make sure this transaction is streamed for the first time */
4019  Assert(!rbtxn_is_streamed(txn));
4020 
4021  /* at the beginning we should have invalid command ID */
4023 
4024  dlist_foreach(subxact_i, &txn->subtxns)
4025  {
4026  ReorderBufferTXN *subtxn;
4027 
4028  subtxn = dlist_container(ReorderBufferTXN, node, subxact_i.cur);
4029  ReorderBufferTransferSnapToParent(txn, subtxn);
4030  }
4031 
4032  /*
4033  * If this transaction has no snapshot, it didn't make any changes to
4034  * the database till now, so there's nothing to decode.
4035  */
4036  if (txn->base_snapshot == NULL)
4037  {
4038  Assert(txn->ninvalidations == 0);
4039  return;
4040  }
4041 
4042  command_id = FirstCommandId;
4043  snapshot_now = ReorderBufferCopySnap(rb, txn->base_snapshot,
4044  txn, command_id);
4045  }
4046  else
4047  {
4048  /* the transaction must have been already streamed */
4049  Assert(rbtxn_is_streamed(txn));
4050 
4051  /*
4052  * Nah, we already have snapshot from the previous streaming run. We
4053  * assume new subxacts can't move the LSN backwards, and so can't beat
4054  * the LSN condition in the previous branch (so no need to walk
4055  * through subxacts again). In fact, we must not do that as we may be
4056  * using snapshot half-way through the subxact.
4057  */
4058  command_id = txn->command_id;
4059 
4060  /*
4061  * We can't use txn->snapshot_now directly because after the last
4062  * streaming run, we might have got some new sub-transactions. So we
4063  * need to add them to the snapshot.
4064  */
4065  snapshot_now = ReorderBufferCopySnap(rb, txn->snapshot_now,
4066  txn, command_id);
4067 
4068  /* Free the previously copied snapshot. */
4069  Assert(txn->snapshot_now->copied);
4071  txn->snapshot_now = NULL;
4072  }
4073 
4074  /*
4075  * Remember this information to be used later to update stats. We can't
4076  * update the stats here as an error while processing the changes would
4077  * lead to the accumulation of stats even though we haven't streamed all
4078  * the changes.
4079  */
4080  txn_is_streamed = rbtxn_is_streamed(txn);
4081  stream_bytes = txn->total_size;
4082 
4083  /* Process and send the changes to output plugin. */
4084  ReorderBufferProcessTXN(rb, txn, InvalidXLogRecPtr, snapshot_now,
4085  command_id, true);
4086 
4087  rb->streamCount += 1;
4088  rb->streamBytes += stream_bytes;
4089 
4090  /* Don't consider already streamed transaction. */
4091  rb->streamTxns += (txn_is_streamed) ? 0 : 1;
4092 
4093  /* update the decoding stats */
4095 
4096  Assert(dlist_is_empty(&txn->changes));
4097  Assert(txn->nentries == 0);
4098  Assert(txn->nentries_mem == 0);
4099 }
4100 
4101 /*
4102  * Size of a change in memory.
 *
 * Returns sizeof(ReorderBufferChange) plus the variable-length data owned by
 * the change. The per-action terms mirror the data written by
 * ReorderBufferSerializeChange(), so keep the two in sync.
 * NOTE(review): the signature (4105) and the case labels are not shown in
 * this listing.
4103  */
4104 static Size
4106 {
4107  Size sz = sizeof(ReorderBufferChange);
4108 
4109  switch (change->action)
4110  {
4111  /* fall through these, they're all similar enough */
4116  {
4117  HeapTuple oldtup,
4118  newtup;
4119  Size oldlen = 0;
4120  Size newlen = 0;
4121 
4122  oldtup = change->data.tp.oldtuple;
4123  newtup = change->data.tp.newtuple;
4124 
4125  if (oldtup)
4126  {
4127  sz += sizeof(HeapTupleData);
4128  oldlen = oldtup->t_len;
4129  sz += oldlen;
4130  }
4131 
4132  if (newtup)
4133  {
4134  sz += sizeof(HeapTupleData);
4135  newlen = newtup->t_len;
4136  sz += newlen;
4137  }
4138 
4139  break;
4140  }
4142  {
4143  Size prefix_size = strlen(change->data.msg.prefix) + 1;
4144 
4145  sz += prefix_size + change->data.msg.message_size +
4146  sizeof(Size) + sizeof(Size);
4147 
4148  break;
4149  }
4151  {
4152  sz += sizeof(SharedInvalidationMessage) *
4153  change->data.inval.ninvalidations;
4154  break;
4155  }
4157  {
4158  Snapshot snap;
4159 
4160  snap = change->data.snapshot;
4161 
4162  sz += sizeof(SnapshotData) +
4163  sizeof(TransactionId) * snap->xcnt +
4164  sizeof(TransactionId) * snap->subxcnt;
4165 
4166  break;
4167  }
4169  {
4170  sz += sizeof(Oid) * change->data.truncate.nrelids;
4171 
4172  break;
4173  }
4178  /* ReorderBufferChange contains everything important */
4179  break;
4180  }
4181 
4182  return sz;
4183 }
4184 
4185 
4186 /*
4187  * Restore a number of changes spilled to disk back into memory.
 *
 * First frees the changes currently held in memory for 'txn'. It then reads
 * spilled records, starting at segment *segno, until either
 * max_changes_in_memory changes have been restored or the segment containing
 * txn->final_lsn has been passed. Missing segment files (ENOENT) are
 * skipped, and other open/read failures raise ERROR.
 *
 * *segno, file->vfd and file->curOffset are updated in place, presumably so
 * that the next call resumes where this one stopped. Returns the number of
 * changes restored by this call.
 * NOTE(review): several source lines (e.g. 4190, 4198-4199, 4204, 4208,
 * 4220, 4236, 4262, 4290) are not shown in this listing.
4188  */
4189 static Size
4191  TXNEntryFile *file, XLogSegNo *segno)
4192 {
4193  Size restored = 0;
4194  XLogSegNo last_segno;
4195  dlist_mutable_iter cleanup_iter;
4196  File *fd = &file->vfd;
4197 
4200 
4201  /* free current entries, so we have memory for more */
4202  dlist_foreach_modify(cleanup_iter, &txn->changes)
4203  {
4205  dlist_container(ReorderBufferChange, node, cleanup_iter.cur);
4206 
4207  dlist_delete(&cleanup->node);
4209  }
4210  txn->nentries_mem = 0;
4211  Assert(dlist_is_empty(&txn->changes));
4212 
4213  XLByteToSeg(txn->final_lsn, last_segno, wal_segment_size);
4214 
4215  while (restored < max_changes_in_memory && *segno <= last_segno)
4216  {
4217  int readBytes;
4218  ReorderBufferDiskChange *ondisk;
4219 
4221 
4222  if (*fd == -1)
4223  {
4224  char path[MAXPGPATH];
4225 
4226  /* first time in */
4227  if (*segno == 0)
4228  XLByteToSeg(txn->first_lsn, *segno, wal_segment_size);
4229 
4230  Assert(*segno != 0 || dlist_is_empty(&txn->changes));
4231 
4232  /*
4233  * No need to care about TLIs here, only used during a single run,
4234  * so each LSN only maps to a specific WAL record.
4235  */
4237  *segno);
4238 
4239  *fd = PathNameOpenFile(path, O_RDONLY | PG_BINARY);
4240 
4241  /* No harm in resetting the offset even in case of failure */
4242  file->curOffset = 0;
4243 
4244  if (*fd < 0 && errno == ENOENT)
4245  {
4246  *fd = -1;
4247  (*segno)++;
4248  continue;
4249  }
4250  else if (*fd < 0)
4251  ereport(ERROR,
4253  errmsg("could not open file \"%s\": %m",
4254  path)));
4255  }
4256 
4257  /*
4258  * Read the statically sized part of a change which has information
4259  * about the total size. If we couldn't read a record, we're at the
4260  * end of this file.
4261  */
4263  readBytes = FileRead(file->vfd, rb->outbuf,
4264  sizeof(ReorderBufferDiskChange),
4265  file->curOffset, WAIT_EVENT_REORDER_BUFFER_READ);
4266 
4267  /* eof */
4268  if (readBytes == 0)
4269  {
4270  FileClose(*fd);
4271  *fd = -1;
4272  (*segno)++;
4273  continue;
4274  }
4275  else if (readBytes < 0)
4276  ereport(ERROR,
4278  errmsg("could not read from reorderbuffer spill file: %m")));
4279  else if (readBytes != sizeof(ReorderBufferDiskChange))
4280  ereport(ERROR,
4282  errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
4283  readBytes,
4284  (uint32) sizeof(ReorderBufferDiskChange))));
4285 
4286  file->curOffset += readBytes;
4287 
4288  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
 /*
  * ondisk->size already counts the header, because the serializer starts
  * its size at sizeof(ReorderBufferDiskChange). The reservation below
  * (call line not shown) therefore requests one header more than needed,
  * which is harmless. The buffer may move, so ondisk is recomputed.
  */
4289 
4291  sizeof(ReorderBufferDiskChange) + ondisk->size);
4292  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
4293 
4294  readBytes = FileRead(file->vfd,
4295  rb->outbuf + sizeof(ReorderBufferDiskChange),
4296  ondisk->size - sizeof(ReorderBufferDiskChange),
4297  file->curOffset,
4298  WAIT_EVENT_REORDER_BUFFER_READ);
4299 
4300  if (readBytes < 0)
4301  ereport(ERROR,
4303  errmsg("could not read from reorderbuffer spill file: %m")));
4304  else if (readBytes != ondisk->size - sizeof(ReorderBufferDiskChange))
4305  ereport(ERROR,
4307  errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
4308  readBytes,
4309  (uint32) (ondisk->size - sizeof(ReorderBufferDiskChange)))));
4310 
4311  file->curOffset += readBytes;
4312 
4313  /*
4314  * ok, read a full change from disk, now restore it into proper
4315  * in-memory format
4316  */
4317  ReorderBufferRestoreChange(rb, txn, rb->outbuf);
4318  restored++;
4319  }
4320 
4321  return restored;
4322 }
4323 
4324 /*
4325  * Convert change from its on-disk format to in-memory format and queue it onto
4326  * the TXN's ->changes list.
4327  *
4328  * Note: although "data" is declared char*, at entry it points to a
4329  * maxalign'd buffer, making it safe in most of this function to assume
4330  * that the pointed-to data is suitably aligned for direct access.
 *
 * The static part is copied verbatim, so any pointers it holds are stale.
 * Variable-length data is copied into fresh allocations, and the pointers in
 * the restored change (tuple t_data, snapshot xip/subxip, message buffers,
 * relids) are set to those allocations. The change is then appended to
 * txn->changes and the memory accounting is updated.
 * NOTE(review): the signature line, the case labels and the tuple/snapshot
 * allocation call heads are not shown in this listing.
4331  */
4332 static void
4334  char *data)
4335 {
4336  ReorderBufferDiskChange *ondisk;
4337  ReorderBufferChange *change;
4338 
4339  ondisk = (ReorderBufferDiskChange *) data;
4340 
4341  change = ReorderBufferGetChange(rb);
4342 
4343  /* copy static part */
4344  memcpy(change, &ondisk->change, sizeof(ReorderBufferChange));
4345 
4346  data += sizeof(ReorderBufferDiskChange);
4347 
4348  /* restore individual stuff */
4349  switch (change->action)
4350  {
4351  /* fall through these, they're all similar enough */
4356  if (change->data.tp.oldtuple)
4357  {
4358  uint32 tuplelen = ((HeapTuple) data)->t_len;
4359 
4360  change->data.tp.oldtuple =
4362 
4363  /* restore ->tuple */
4364  memcpy(change->data.tp.oldtuple, data,
4365  sizeof(HeapTupleData));
4366  data += sizeof(HeapTupleData);
4367 
4368  /* reset t_data pointer into the new tuplebuf */
4369  change->data.tp.oldtuple->t_data =
4370  (HeapTupleHeader) ((char *) change->data.tp.oldtuple + HEAPTUPLESIZE);
4371 
4372  /* restore tuple data itself */
4373  memcpy(change->data.tp.oldtuple->t_data, data, tuplelen);
4374  data += tuplelen;
4375  }
4376 
4377  if (change->data.tp.newtuple)
4378  {
4379  /* here, data might not be suitably aligned! */
4380  uint32 tuplelen;
4381 
4382  memcpy(&tuplelen, data + offsetof(HeapTupleData, t_len),
4383  sizeof(uint32));
4384 
4385  change->data.tp.newtuple =
4387 
4388  /* restore ->tuple */
4389  memcpy(change->data.tp.newtuple, data,
4390  sizeof(HeapTupleData));
4391  data += sizeof(HeapTupleData);
4392 
4393  /* reset t_data pointer into the new tuplebuf */
4394  change->data.tp.newtuple->t_data =
4395  (HeapTupleHeader) ((char *) change->data.tp.newtuple + HEAPTUPLESIZE);
4396 
4397  /* restore tuple data itself */
4398  memcpy(change->data.tp.newtuple->t_data, data, tuplelen);
4399  data += tuplelen;
4400  }
4401 
4402  break;
4404  {
4405  Size prefix_size;
4406 
4407  /* read prefix */
4408  memcpy(&prefix_size, data, sizeof(Size));
4409  data += sizeof(Size);
4410  change->data.msg.prefix = MemoryContextAlloc(rb->context,
4411  prefix_size);
4412  memcpy(change->data.msg.prefix, data, prefix_size);
4413  Assert(change->data.msg.prefix[prefix_size - 1] == '\0');
4414  data += prefix_size;
4415 
4416  /* read the message */
4417  memcpy(&change->data.msg.message_size, data, sizeof(Size));
4418  data += sizeof(Size);
4419  change->data.msg.message = MemoryContextAlloc(rb->context,
4420  change->data.msg.message_size);
4421  memcpy(change->data.msg.message, data,
4422  change->data.msg.message_size);
4423  data += change->data.msg.message_size;
4424 
4425  break;
4426  }
4428  {
4429  Size inval_size = sizeof(SharedInvalidationMessage) *
4430  change->data.inval.ninvalidations;
4431 
4432  change->data.inval.invalidations =
4433  MemoryContextAlloc(rb->context, inval_size);
4434 
4435  /* read the message */
4436  memcpy(change->data.inval.invalidations, data, inval_size);
4437 
4438  break;
4439  }
4441  {
4442  Snapshot oldsnap;
4443  Snapshot newsnap;
4444  Size size;
4445 
4446  oldsnap = (Snapshot) data;
4447 
 /* NOTE(review): the "+ 0" in the subxcnt term below has no effect. */
4448  size = sizeof(SnapshotData) +
4449  sizeof(TransactionId) * oldsnap->xcnt +
4450  sizeof(TransactionId) * (oldsnap->subxcnt + 0);
4451 
4453 
4454  newsnap = change->data.snapshot;
4455 
 /* xip and subxip are laid out right after SnapshotData in one block. */
4456  memcpy(newsnap, data, size);
4457  newsnap->xip = (TransactionId *)
4458  (((char *) newsnap) + sizeof(SnapshotData));
4459  newsnap->subxip = newsnap->xip + newsnap->xcnt;
4460  newsnap->copied = true;
4461  break;
4462  }
4463  /* the base struct contains all the data, easy peasy */
4465  {
4466  Oid *relids;
4467 
4468  relids = ReorderBufferGetRelids(rb,
4469  change->data.truncate.nrelids);
4470  memcpy(relids, data, change->data.truncate.nrelids * sizeof(Oid));
4471  change->data.truncate.relids = relids;
4472 
4473  break;
4474  }
4479  break;
4480  }
4481 
4482  dlist_push_tail(&txn->changes, &change->node);
4483  txn->nentries_mem++;
4484 
4485  /*
4486  * Update memory accounting for the restored change. We need to do this
4487  * although we don't check the memory limit when restoring the changes in
4488  * this branch (we only do that when initially queueing the changes after
4489  * decoding), because we will release the changes later, and that will
4490  * update the accounting too (subtracting the size from the counters). And
4491  * we don't want to underflow there.
4492  */
4493  ReorderBufferChangeMemoryUpdate(rb, change, true,
4494  ReorderBufferChangeSize(change));
4495 }
4496 
4497 /*
4498  * Remove all on-disk stored for the passed in transaction.
4499  */
4500 static void
4502 {
4503  XLogSegNo first;
4504  XLogSegNo cur;
4505  XLogSegNo last;
4506 
4509 
4510  XLByteToSeg(txn->first_lsn, first, wal_segment_size);
4511  XLByteToSeg(txn->final_lsn, last, wal_segment_size);
4512 
4513  /* iterate over all possible filenames, and delete them */
4514  for (cur = first; cur <= last; cur++)
4515  {
4516  char path[MAXPGPATH];
4517 
4519  if (unlink(path) != 0 && errno != ENOENT)
4520  ereport(ERROR,
4522  errmsg("could not remove file \"%s\": %m", path)));
4523  }
4524 }
4525 
4526 /*
4527  * Remove any leftover serialized reorder buffers from a slot directory after a
4528  * prior crash or decoding session exit.
4529  */
4530 static void
4532 {
4533  DIR *spill_dir;
4534  struct dirent *spill_de;
4535  struct stat statbuf;
4536  char path[MAXPGPATH * 2 + 12];
4537 
4538  sprintf(path, "pg_replslot/%s", slotname);
4539 
4540  /* we're only handling directories here, skip if it's not ours */
4541  if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
4542  return;
4543 
4544  spill_dir = AllocateDir(path);
4545  while ((spill_de = ReadDirExtended(spill_dir, path, INFO)) != NULL)
4546  {
4547  /* only look at names that can be ours */
4548  if (strncmp(spill_de->d_name, "xid", 3) == 0)
4549  {
4550  snprintf(path, sizeof(path),
4551  "pg_replslot/%s/%s", slotname,
4552  spill_de->d_name);
4553 
4554  if (unlink(path) != 0)
4555  ereport(ERROR,
4557  errmsg("could not remove file \"%s\" during removal of pg_replslot/%s/xid*: %m",
4558  path, slotname)));
4559  }
4560  }
4561  FreeDir(spill_dir);
4562 }
4563 
4564 /*
4565  * Given a replication slot, transaction ID and segment number, fill in the
4566  * corresponding spill file into 'path', which is a caller-owned buffer of size
4567  * at least MAXPGPATH.
4568  */
4569 static void
4571  XLogSegNo segno)
4572 {
4573  XLogRecPtr recptr;
4574 
4575  XLogSegNoOffsetToRecPtr(segno, 0, wal_segment_size, recptr);
4576 
4577  snprintf(path, MAXPGPATH, "pg_replslot/%s/xid-%u-lsn-%X-%X.spill",
4579  xid, LSN_FORMAT_ARGS(recptr));
4580 }
4581 
4582 /*
4583  * Delete all data spilled to disk after we've restarted/crashed. It will be
4584  * recreated when the respective slots are reused.
4585  */
4586 void
4588 {
4589  DIR *logical_dir;
4590  struct dirent *logical_de;
4591 
4592  logical_dir = AllocateDir("pg_replslot");
4593  while ((logical_de = ReadDir(logical_dir, "pg_replslot")) != NULL)
4594  {
4595  if (strcmp(logical_de->d_name, ".") == 0 ||
4596  strcmp(logical_de->d_name, "..") == 0)
4597  continue;
4598 
4599  /* if it cannot be a slot, skip the directory */
4600  if (!ReplicationSlotValidateName(logical_de->d_name, DEBUG2))
4601  continue;
4602 
4603  /*
4604  * ok, has to be a surviving logical slot, iterate and delete
4605  * everything starting with xid-*
4606  */
4608  }
4609  FreeDir(logical_dir);
4610 }
4611 
4612 /* ---------------------------------------
4613  * toast reassembly support
4614  * ---------------------------------------
4615  */
4616 
4617 /*
4618  * Initialize per tuple toast reconstruction support.
4619  */
4620 static void
4622 {
4623  HASHCTL hash_ctl;
4624 
4625  Assert(txn->toast_hash == NULL);
4626 
4627  hash_ctl.keysize = sizeof(Oid);
4628  hash_ctl.entrysize = sizeof(ReorderBufferToastEnt);
4629  hash_ctl.hcxt = rb->context;
4630  txn->toast_hash = hash_create("ReorderBufferToastHash", 5, &hash_ctl,
4632 }
4633 
4634 /*
4635  * Per toast-chunk handling for toast reconstruction
4636  *
4637  * Appends a toast chunk so we can reconstruct it when the tuple "owning" the
4638  * toasted Datum comes along.
4639  */
4640 static void
4642  Relation relation, ReorderBufferChange *change)
4643 {
4644  ReorderBufferToastEnt *ent;
4645  HeapTuple newtup;
4646  bool found;
4647  int32 chunksize;
4648  bool isnull;
4649  Pointer chunk;
4650  TupleDesc desc = RelationGetDescr(relation);
4651  Oid chunk_id;
4652  int32 chunk_seq;
4653 
4654  if (txn->toast_hash == NULL)
4655  ReorderBufferToastInitHash(rb, txn);
4656 
4657  Assert(IsToastRelation(relation));
4658 
4659  newtup = change->data.tp.newtuple;
4660  chunk_id = DatumGetObjectId(fastgetattr(newtup, 1, desc, &isnull));
4661  Assert(!isnull);
4662  chunk_seq = DatumGetInt32(fastgetattr(newtup, 2, desc, &isnull));
4663  Assert(!isnull);
4664 
4665  ent = (ReorderBufferToastEnt *)
4666  hash_search(txn->toast_hash, &chunk_id, HASH_ENTER, &found);
4667 
4668  if (!found)
4669  {
4670  Assert(ent->chunk_id == chunk_id);
4671  ent->num_chunks = 0;
4672  ent->last_chunk_seq = 0;
4673  ent->size = 0;
4674  ent->reconstructed = NULL;
4675  dlist_init(&ent->chunks);
4676 
4677  if (chunk_seq != 0)
4678  elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq 0",
4679  chunk_seq, chunk_id);
4680  }
4681  else if (found && chunk_seq != ent->last_chunk_seq + 1)
4682  elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq %d",
4683  chunk_seq, chunk_id, ent->last_chunk_seq + 1);
4684 
4685  chunk = DatumGetPointer(fastgetattr(newtup, 3, desc, &isnull));
4686  Assert(!isnull);
4687 
4688  /* calculate size so we can allocate the right size at once later */
4689  if (!VARATT_IS_EXTENDED(chunk))
4690  chunksize = VARSIZE(chunk) - VARHDRSZ;
4691  else if (VARATT_IS_SHORT(chunk))
4692  /* could happen due to heap_form_tuple doing its thing */
4693  chunksize = VARSIZE_SHORT(chunk) - VARHDRSZ_SHORT;
4694  else
4695  elog(ERROR, "unexpected type of toast chunk");
4696 
4697  ent->size += chunksize;
4698  ent->last_chunk_seq = chunk_seq;
4699  ent->num_chunks++;
4700  dlist_push_tail(&ent->chunks, &change->node);
4701 }
4702 
4703 /*
4704  * Rejigger change->newtuple to point to in-memory toast tuples instead of
4705  * on-disk toast tuples that may no longer exist (think DROP TABLE or VACUUM).
4706  *
4707  * We cannot replace unchanged toast tuples though, so those will still point
4708  * to on-disk toast data.
4709  *
4710  * While updating the existing change with detoasted tuple data, we need to
4711  * update the memory accounting info, because the change size will differ.
4712  * Otherwise the accounting may get out of sync, triggering serialization
4713  * at unexpected times.
4714  *
4715  * We simply subtract size of the change before rejiggering the tuple, and
4716  * then add the new size. This makes it look like the change was removed
4717  * and then added back, except it only tweaks the accounting info.
4718  *
4719  * In particular it can't trigger serialization, which would be pointless
4720  * anyway as it happens during commit processing right before handing
4721  * the change to the output plugin.
4722  */
4723 static void
4725  Relation relation, ReorderBufferChange *change)
4726 {
4727  TupleDesc desc;
4728  int natt;
4729  Datum *attrs;
4730  bool *isnull;
4731  bool *free;
4732  HeapTuple tmphtup;
4733  Relation toast_rel;
4734  TupleDesc toast_desc;
4735  MemoryContext oldcontext;
4736  HeapTuple newtup;
4737  Size old_size;
4738 
4739  /* no toast tuples changed */
4740  if (txn->toast_hash == NULL)
4741  return;
4742 
4743  /*
4744  * We're going to modify the size of the change. So, to make sure the
4745  * accounting is correct we record the current change size and then after
4746  * re-computing the change we'll subtract the recorded size and then
4747  * re-add the new change size at the end. We don't immediately subtract
4748  * the old size because if there is any error before we add the new size,
4749  * we will release the changes and that will update the accounting info
4750  * (subtracting the size from the counters). And we don't want to
4751  * underflow there.
4752  */
4753  old_size = ReorderBufferChangeSize(change);
4754 
4755  oldcontext = MemoryContextSwitchTo(rb->context);
4756 
4757  /* we should only have toast tuples in an INSERT or UPDATE */
4758  Assert(change->data.tp.newtuple);
4759 
4760  desc = RelationGetDescr(relation);
4761 
4762  toast_rel = RelationIdGetRelation(relation->rd_rel->reltoastrelid);
4763  if (!RelationIsValid(toast_rel))
4764  elog(ERROR, "could not open toast relation with OID %u (base relation \"%s\")",
4765  relation->rd_rel->reltoastrelid, RelationGetRelationName(relation));
4766 
4767  toast_desc = RelationGetDescr(toast_rel);
4768 
4769  /* should we allocate from stack instead? */
4770  attrs = palloc0(sizeof(Datum) * desc->natts);
4771  isnull = palloc0(sizeof(bool) * desc->natts);
4772  free = palloc0(sizeof(bool) * desc->natts);
4773 
4774  newtup = change->data.tp.newtuple;
4775 
4776  heap_deform_tuple(newtup, desc, attrs, isnull);
4777 
4778  for (natt = 0; natt < desc->natts; natt++)
4779  {
4780  Form_pg_attribute attr = TupleDescAttr(desc, natt);
4781  ReorderBufferToastEnt *ent;
4782  struct varlena *varlena;
4783 
4784  /* va_rawsize is the size of the original datum -- including header */
4785  struct varatt_external toast_pointer;
4786  struct varatt_indirect redirect_pointer;
4787  struct varlena *new_datum = NULL;
4788  struct varlena *reconstructed;
4789  dlist_iter it;
4790  Size data_done = 0;
4791 
4792  /* system columns aren't toasted */
4793  if (attr->attnum < 0)
4794  continue;
4795 
4796  if (attr->attisdropped)
4797  continue;
4798 
4799  /* not a varlena datatype */
4800  if (attr->attlen != -1)
4801  continue;
4802 
4803  /* no data */
4804  if (isnull[natt])
4805  continue;
4806 
4807  /* ok, we know we have a toast datum */
4808  varlena = (struct varlena *) DatumGetPointer(attrs[natt]);
4809 
4810  /* no need to do anything if the tuple isn't external */
4812  continue;
4813 
4814  VARATT_EXTERNAL_GET_POINTER(toast_pointer, varlena);
4815 
4816  /*
4817  * Check whether the toast tuple changed, replace if so.
4818  */
4819  ent = (ReorderBufferToastEnt *)
4820  hash_search(txn->toast_hash,
4821  &toast_pointer.va_valueid,
4822  HASH_FIND,
4823  NULL);
4824  if (ent == NULL)
4825  continue;
4826 
4827  new_datum =
4829 
4830  free[natt] = true;
4831 
4832  reconstructed = palloc0(toast_pointer.va_rawsize);
4833 
4834  ent->reconstructed = reconstructed;
4835 
4836  /* stitch toast tuple back together from its parts */
4837  dlist_foreach(it, &ent->chunks)
4838  {
4839  bool cisnull;
4840  ReorderBufferChange *cchange;
4841  HeapTuple ctup;
4842  Pointer chunk;
4843 
4844  cchange = dlist_container(ReorderBufferChange, node, it.cur);
4845  ctup = cchange->data.tp.newtuple;
4846  chunk = DatumGetPointer(fastgetattr(ctup, 3, toast_desc, &cisnull));
4847 
4848  Assert(!cisnull);
4849  Assert(!VARATT_IS_EXTERNAL(chunk));
4850  Assert(!VARATT_IS_SHORT(chunk));
4851 
4852  memcpy(VARDATA(reconstructed) + data_done,
4853  VARDATA(chunk),
4854  VARSIZE(chunk) - VARHDRSZ);
4855  data_done += VARSIZE(chunk) - VARHDRSZ;
4856  }
4857  Assert(data_done == VARATT_EXTERNAL_GET_EXTSIZE(toast_pointer));
4858 
4859  /* make sure its marked as compressed or not */
4860  if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
4861  SET_VARSIZE_COMPRESSED(reconstructed, data_done + VARHDRSZ);
4862  else
4863  SET_VARSIZE(reconstructed, data_done + VARHDRSZ);
4864 
4865  memset(&redirect_pointer, 0, sizeof(redirect_pointer));
4866  redirect_pointer.pointer = reconstructed;
4867 
4869  memcpy(VARDATA_EXTERNAL(new_datum), &redirect_pointer,
4870  sizeof(redirect_pointer));
4871 
4872  attrs[natt] = PointerGetDatum(new_datum);
4873  }
4874 
4875  /*
4876  * Build tuple in separate memory & copy tuple back into the tuplebuf
4877  * passed to the output plugin. We can't directly heap_fill_tuple() into
4878  * the tuplebuf because attrs[] will point back into the current content.
4879  */
4880  tmphtup = heap_form_tuple(desc, attrs, isnull);
4881  Assert(newtup->t_len <= MaxHeapTupleSize);
4882  Assert(newtup->t_data == (HeapTupleHeader) ((char *) newtup + HEAPTUPLESIZE));
4883 
4884  memcpy(newtup->t_data, tmphtup->t_data, tmphtup->t_len);
4885  newtup->t_len = tmphtup->t_len;
4886 
4887  /*
4888  * free resources we won't further need, more persistent stuff will be
4889  * free'd in ReorderBufferToastReset().
4890  */
4891  RelationClose(toast_rel);
4892  pfree(tmphtup);
4893  for (natt = 0; natt < desc->natts; natt++)
4894  {
4895  if (free[natt])
4896  pfree(DatumGetPointer(attrs[natt]));
4897  }
4898  pfree(attrs);
4899  pfree(free);
4900  pfree(isnull);
4901 
4902  MemoryContextSwitchTo(oldcontext);
4903 
4904  /* subtract the old change size */
4905  ReorderBufferChangeMemoryUpdate(rb, change, false, old_size);
4906  /* now add the change back, with the correct size */
4907  ReorderBufferChangeMemoryUpdate(rb, change, true,
4908  ReorderBufferChangeSize(change));
4909 }
4910 
4911 /*
4912  * Free all resources allocated for toast reconstruction.
4913  */
4914 static void
4916 {
4917  HASH_SEQ_STATUS hstat;
4918  ReorderBufferToastEnt *ent;
4919 
4920  if (txn->toast_hash == NULL)
4921  return;
4922 
4923  /* sequentially walk over the hash and free everything */
4924  hash_seq_init(&hstat, txn->toast_hash);
4925  while ((ent = (ReorderBufferToastEnt *) hash_seq_search(&hstat)) != NULL)
4926  {
4927  dlist_mutable_iter it;
4928 
4929  if (ent->reconstructed != NULL)
4930  pfree(ent->reconstructed);
4931 
4932  dlist_foreach_modify(it, &ent->chunks)
4933  {
4934  ReorderBufferChange *change =
4936 
4937  dlist_delete(&change->node);
4938  ReorderBufferReturnChange(rb, change, true);
4939  }
4940  }
4941 
4942  hash_destroy(txn->toast_hash);
4943  txn->toast_hash = NULL;
4944 }
4945 
4946 
4947 /* ---------------------------------------
4948  * Visibility support for logical decoding
4949  *
4950  *
4951  * Lookup actual cmin/cmax values when using decoding snapshot. We can't
4952  * always rely on stored cmin/cmax values because of two scenarios:
4953  *
4954  * * A tuple got changed multiple times during a single transaction and thus
4955  * has got a combo CID. Combo CIDs are only valid for the duration of a
4956  * single transaction.
4957  * * A tuple with a cmin but no cmax (and thus no combo CID) got
4958  * deleted/updated in another transaction than the one which created it
4959  * which we are looking at right now. As only one of cmin, cmax or combo CID
4960  * is actually stored in the heap we don't have access to the value we
4961  * need anymore.
4962  *
4963  * To resolve those problems we have a per-transaction hash of (cmin,
4964  * cmax) tuples keyed by (relfilelocator, ctid) which contains the actual
4965  * (cmin, cmax) values. That also takes care of combo CIDs by simply
4966  * not caring about them at all. As we have the real cmin/cmax values
4967  * combo CIDs aren't interesting.
4968  *
4969  * As we only care about catalog tuples here the overhead of this
4970  * hashtable should be acceptable.
4971  *
4972  * Heap rewrites complicate this a bit, check rewriteheap.c for
4973  * details.
4974  * -------------------------------------------------------------------------
4975  */
4976 
4977 /* struct for sorting mapping files by LSN efficiently */
4978 typedef struct RewriteMappingFile
4979 {
4983 
#ifdef NOT_USED
/*
 * Debugging aid: emit every (relfilelocator, tid) -> (cmin, cmax) mapping in
 * tuplecid_data at DEBUG3.
 */
static void
DisplayMapping(HTAB *tuplecid_data)
{
	HASH_SEQ_STATUS hstat;
	ReorderBufferTupleCidEnt *ent;

	hash_seq_init(&hstat, tuplecid_data);
	while ((ent = (ReorderBufferTupleCidEnt *) hash_seq_search(&hstat)) != NULL)
	{
		elog(DEBUG3, "mapping: node: %u/%u/%u tid: %u/%u cmin: %u, cmax: %u",
			 ent->key.rlocator.dbOid,
			 ent->key.rlocator.spcOid,
			 ent->key.rlocator.relNumber,
			 ItemPointerGetBlockNumber(&ent->key.tid),
			 ItemPointerGetOffsetNumber(&ent->key.tid),
			 ent->cmin,
			 ent->cmax
			);
	}
}
#endif
5006 
5007 /*
5008  * Apply a single mapping file to tuplecid_data.
5009  *
5010  * The mapping file has to have been verified to be a) committed b) for our
5011  * transaction c) applied in LSN order.
5012  */
5013 static void
5014 ApplyLogicalMappingFile(HTAB *tuplecid_data, Oid relid, const char *fname)
5015 {
5016  char path[MAXPGPATH];
5017  int fd;
5018  int readBytes;
5020 
5021  sprintf(path, "pg_logical/mappings/%s", fname);
5022  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
5023  if (fd < 0)
5024  ereport(ERROR,
5026  errmsg("could not open file \"%s\": %m", path)));
5027 
5028  while (true)
5029  {
5032  ReorderBufferTupleCidEnt *new_ent;
5033  bool found;
5034 
5035  /* be careful about padding */
5036  memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
5037 
5038  /* read all mappings till the end of the file */
5039  pgstat_report_wait_start(WAIT_EVENT_REORDER_LOGICAL_MAPPING_READ);
5040  readBytes = read(fd, &map, sizeof(LogicalRewriteMappingData));
5042 
5043  if (readBytes < 0)
5044  ereport(ERROR,
5046  errmsg("could not read file \"%s\": %m",
5047  path)));
5048  else if (readBytes == 0) /* EOF */
5049  break;
5050  else if (readBytes != sizeof(LogicalRewriteMappingData))
5051  ereport(ERROR,
5053  errmsg("could not read from file \"%s\": read %d instead of %d bytes",
5054  path, readBytes,
5055  (int32) sizeof(LogicalRewriteMappingData))));
5056 
5057  key.rlocator = map.old_locator;
5058  ItemPointerCopy(&map.old_tid,
5059  &key.tid);
5060 
5061 
5062  ent = (ReorderBufferTupleCidEnt *)
5064 
5065  /* no existing mapping, no need to update */
5066  if (!ent)
5067  continue;
5068 
5069  key.rlocator = map.new_locator;
5070  ItemPointerCopy(&map.new_tid,
5071  &key.tid);
5072 
5073  new_ent = (ReorderBufferTupleCidEnt *)
5075 
5076  if (found)
5077  {
5078  /*
5079  * Make sure the existing mapping makes sense. We sometime update
5080  * old records that did not yet have a cmax (e.g. pg_class' own
5081  * entry while rewriting it) during rewrites, so allow that.
5082  */
5083  Assert(ent->cmin == InvalidCommandId || ent->cmin == new_ent->cmin);
5084  Assert(ent->cmax == InvalidCommandId || ent->cmax == new_ent->cmax);
5085  }
5086  else
5087  {
5088  /* update mapping */
5089  new_ent->cmin = ent->cmin;
5090  new_ent->cmax = ent->cmax;
5091  new_ent->combocid = ent->combocid;
5092  }
5093  }
5094 
5095  if (CloseTransientFile(fd) != 0)
5096  ereport(ERROR,
5098  errmsg("could not close file \"%s\": %m", path)));
5099 }
5100 
5101 
5102 /*
5103  * Check whether the TransactionId 'xid' is in the pre-sorted array 'xip'.
5104  */
5105 static bool
5107 {
5108  return bsearch(&xid, xip, num,
5109  sizeof(TransactionId), xidComparator) != NULL;
5110 }
5111 
5112 /*
5113  * list_sort() comparator for sorting RewriteMappingFiles in LSN order.
5114  */
5115 static int
5116 file_sort_by_lsn(const ListCell *a_p, const ListCell *b_p)
5117 {
5120 
5121  return pg_cmp_u64(a->lsn, b->lsn);
5122 }
5123 
5124 /*
5125  * Apply any existing logical remapping files if there are any targeted at our
5126  * transaction for relid.
5127  */
5128 static void
5130 {
5131  DIR *mapping_dir;
5132  struct dirent *mapping_de;
5133  List *files = NIL;
5134  ListCell *file;
5135  Oid dboid = IsSharedRelation(relid) ? InvalidOid : MyDatabaseId;
5136 
5137  mapping_dir = AllocateDir("pg_logical/mappings");
5138  while ((mapping_de = ReadDir(mapping_dir, "pg_logical/mappings")) != NULL)
5139  {
5140  Oid f_dboid;
5141  Oid f_relid;
5142  TransactionId f_mapped_xid;
5143  TransactionId f_create_xid;
5144  XLogRecPtr f_lsn;
5145  uint32 f_hi,
5146  f_lo;
5147  RewriteMappingFile *f;
5148 
5149  if (strcmp(mapping_de->d_name, ".") == 0 ||
5150  strcmp(mapping_de->d_name, "..") == 0)
5151  continue;
5152 
5153  /* Ignore files that aren't ours */
5154  if (strncmp(mapping_de->d_name, "map-", 4) != 0)
5155  continue;
5156 
5157  if (sscanf(mapping_de->d_name, LOGICAL_REWRITE_FORMAT,
5158  &f_dboid, &f_relid, &f_hi, &f_lo,
5159  &f_mapped_xid, &f_create_xid) != 6)
5160  elog(ERROR, "could not parse filename \"%s\"", mapping_de->d_name);
5161 
5162  f_lsn = ((uint64) f_hi) << 32 | f_lo;
5163 
5164  /* mapping for another database */
5165  if (f_dboid != dboid)
5166  continue;
5167 
5168  /* mapping for another relation */
5169  if (f_relid != relid)
5170  continue;
5171 
5172  /* did the creating transaction abort? */
5173  if (!TransactionIdDidCommit(f_create_xid))
5174  continue;
5175 
5176  /* not for our transaction */
5177  if (!TransactionIdInArray(f_mapped_xid, snapshot->subxip, snapshot->subxcnt))
5178  continue;
5179 
5180  /* ok, relevant, queue for apply */
5181  f = palloc(sizeof(RewriteMappingFile));
5182  f->lsn = f_lsn;
5183  strcpy(f->fname, mapping_de->d_name);
5184  files = lappend(files, f);
5185  }
5186  FreeDir(mapping_dir);
5187 
5188  /* sort files so we apply them in LSN order */
5189  list_sort(files, file_sort_by_lsn);
5190 
5191  foreach(file, files)
5192  {
5194 
5195  elog(DEBUG1, "applying mapping: \"%s\" in %u", f->fname,
5196  snapshot->subxip[0]);
5198  pfree(f);
5199  }
5200 }
5201 
5202 /*
5203  * Lookup cmin/cmax of a tuple, during logical decoding where we can't rely on
5204  * combo CIDs.
5205  */
5206 bool
5208  Snapshot snapshot,
5209  HeapTuple htup, Buffer buffer,
5210  CommandId *cmin, CommandId *cmax)
5211 {
5214  ForkNumber forkno;
5215  BlockNumber blockno;
5216  bool updated_mapping = false;
5217 
5218  /*
5219  * Return unresolved if tuplecid_data is not valid. That's because when
5220  * streaming in-progress transactions we may run into tuples with the CID
5221  * before actually decoding them. Think e.g. about INSERT followed by
5222  * TRUNCATE, where the TRUNCATE may not be decoded yet when applying the
5223  * INSERT. So in such cases, we assume the CID is from the future
5224  * command.
5225  */
5226  if (tuplecid_data == NULL)
5227  return false;
5228 
5229  /* be careful about padding */
5230  memset(&key, 0, sizeof(key));
5231 
5232  Assert(!BufferIsLocal(buffer));
5233 
5234  /*
5235  * get relfilelocator from the buffer, no convenient way to access it
5236  * other than that.
5237  */
5238  BufferGetTag(buffer, &key.rlocator, &forkno, &blockno);
5239 
5240  /* tuples can only be in the main fork */
5241  Assert(forkno == MAIN_FORKNUM);
5242  Assert(blockno == ItemPointerGetBlockNumber(&htup->t_self));
5243 
5244  ItemPointerCopy(&htup->t_self,
5245  &key.tid);
5246 
5247 restart:
5248  ent = (ReorderBufferTupleCidEnt *)
5250 
5251  /*
5252  * failed to find a mapping, check whether the table was rewritten and
5253  * apply mapping if so, but only do that once - there can be no new
5254  * mappings while we are in here since we have to hold a lock on the
5255  * relation.
5256  */
5257  if (ent == NULL && !updated_mapping)
5258  {
5260  /* now check but don't update for a mapping again */
5261  updated_mapping = true;
5262  goto restart;
5263  }
5264  else if (ent == NULL)
5265  return false;
5266 
5267  if (cmin)
5268  *cmin = ent->cmin;
5269  if (cmax)
5270  *cmax = ent->cmax;
5271  return true;
5272 }
void binaryheap_build(binaryheap *heap)
Definition: binaryheap.c:138
void binaryheap_replace_first(binaryheap *heap, bh_node_type d)
Definition: binaryheap.c:255
bh_node_type binaryheap_first(binaryheap *heap)
Definition: binaryheap.c:177
bh_node_type binaryheap_remove_first(binaryheap *heap)
Definition: binaryheap.c:192
binaryheap * binaryheap_allocate(int capacity, binaryheap_comparator compare, void *arg)
Definition: binaryheap.c:39
void binaryheap_free(binaryheap *heap)
Definition: binaryheap.c:75
void binaryheap_add_unordered(binaryheap *heap, bh_node_type d)
Definition: binaryheap.c:116
uint32 BlockNumber
Definition: block.h:31
static int32 next
Definition: blutils.c:221
static void cleanup(void)
Definition: bootstrap.c:682
int Buffer
Definition: buf.h:23
#define BufferIsLocal(buffer)
Definition: buf.h:37
void BufferGetTag(Buffer buffer, RelFileLocator *rlocator, ForkNumber *forknum, BlockNumber *blknum)
Definition: bufmgr.c:3398
#define NameStr(name)
Definition: c.h:733
#define InvalidCommandId
Definition: c.h:656
unsigned int uint32
Definition: c.h:493
signed int int32
Definition: c.h:481
char * Pointer
Definition: c.h:470
#define VARHDRSZ
Definition: c.h:679
#define PG_BINARY
Definition: c.h:1260
#define FLEXIBLE_ARRAY_MEMBER
Definition: c.h:385
#define FirstCommandId
Definition: c.h:655
uint32 CommandId
Definition: c.h:653
uint32 TransactionId
Definition: c.h:639
size_t Size
Definition: c.h:592
bool IsToastRelation(Relation relation)
Definition: catalog.c:145
bool IsSharedRelation(Oid relationId)
Definition: catalog.c:243
int64 TimestampTz
Definition: timestamp.h:39
#define INDIRECT_POINTER_SIZE
Definition: detoast.h:34
#define VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr)
Definition: detoast.h:22
void hash_destroy(HTAB *hashp)
Definition: dynahash.c:865
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:955
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:352
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1395
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1385
struct cursor * cur
Definition: ecpg.c:28
void FreeErrorData(ErrorData *edata)
Definition: elog.c:1779
int errcode_for_file_access(void)
Definition: elog.c:882
void FlushErrorState(void)
Definition: elog.c:1828
int errmsg(const char *fmt,...)
Definition: elog.c:1072
ErrorData * CopyErrorData(void)
Definition: elog.c:1723
#define PG_RE_THROW()
Definition: elog.h:411
#define DEBUG3
Definition: elog.h:28
#define PG_TRY(...)
Definition: elog.h:370
#define DEBUG2
Definition: elog.h:29
#define PG_END_TRY(...)
Definition: elog.h:395
#define DEBUG1
Definition: elog.h:30
#define ERROR
Definition: elog.h:39
#define PG_CATCH(...)
Definition: elog.h:380
#define elog(elevel,...)
Definition: elog.h:224
#define INFO
Definition: elog.h:34
#define ereport(elevel,...)
Definition: elog.h:149
struct dirent * ReadDir(DIR *dir, const char *dirname)
Definition: fd.c:2909
int FreeDir(DIR *dir)
Definition: fd.c:2961
int CloseTransientFile(int fd)
Definition: fd.c:2809
void FileClose(File file)
Definition: fd.c:1978
File PathNameOpenFile(const char *fileName, int fileFlags)
Definition: fd.c:1575
struct dirent * ReadDirExtended(DIR *dir, const char *dirname, int elevel)
Definition: fd.c:2924
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2633
DIR * AllocateDir(const char *dirname)
Definition: fd.c:2843
static ssize_t FileRead(File file, void *buffer, size_t amount, off_t offset, uint32 wait_event_info)
Definition: fd.h:196
int File
Definition: fd.h:51
MemoryContext GenerationContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Definition: generation.c:160
Oid MyDatabaseId
Definition: globals.c:91
#define free(a)
Definition: header.h:65
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
Definition: heaptuple.c:1116
void heap_deform_tuple(HeapTuple tuple, TupleDesc tupleDesc, Datum *values, bool *isnull)
Definition: heaptuple.c:1345
@ HASH_FIND
Definition: hsearch.h:113
@ HASH_REMOVE
Definition: hsearch.h:115
@ HASH_ENTER
Definition: hsearch.h:114
#define HASH_CONTEXT
Definition: hsearch.h:102
#define HASH_ELEM
Definition: hsearch.h:95
#define HASH_BLOBS
Definition: hsearch.h:97
#define HEAPTUPLESIZE
Definition: htup.h:73
HeapTupleData * HeapTuple
Definition: htup.h:71
struct HeapTupleData HeapTupleData
HeapTupleHeaderData * HeapTupleHeader
Definition: htup.h:23
#define SizeofHeapTupleHeader
Definition: htup_details.h:185
#define MaxHeapTupleSize
Definition: htup_details.h:558
static Datum fastgetattr(HeapTuple tup, int attnum, TupleDesc tupleDesc, bool *isnull)
Definition: htup_details.h:749
#define dlist_foreach(iter, lhead)
Definition: ilist.h:623
static void dlist_init(dlist_head *head)
Definition: ilist.h:314
#define dclist_container(type, membername, ptr)
Definition: ilist.h:947
static bool dlist_has_next(const dlist_head *head, const dlist_node *node)
Definition: ilist.h:503
static void dclist_push_tail(dclist_head *head, dlist_node *node)
Definition: ilist.h:709
static void dlist_insert_before(dlist_node *before, dlist_node *node)
Definition: ilist.h:393
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:603
static void dlist_delete(dlist_node *node)
Definition: ilist.h:405
static uint32 dclist_count(const dclist_head *head)
Definition: ilist.h:932
static dlist_node * dlist_next_node(dlist_head *head, dlist_node *node)
Definition: ilist.h:537
static dlist_node * dlist_pop_head_node(dlist_head *head)
Definition: ilist.h:450
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:640
static bool dlist_is_empty(const dlist_head *head)
Definition: ilist.h:336
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:364
static void dclist_delete_from(dclist_head *head, dlist_node *node)
Definition: ilist.h:763
static void dclist_init(dclist_head *head)
Definition: ilist.h:671
#define dlist_container(type, membername, ptr)
Definition: ilist.h:593
#define dclist_foreach(iter, lhead)
Definition: ilist.h:970
static int pg_cmp_u64(uint64 a, uint64 b)
Definition: int.h:501
#define write(a, b, c)
Definition: win32.h:14
#define read(a, b, c)
Definition: win32.h:13
void LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
Definition: inval.c:705
int b
Definition: isn.c:70
int a
Definition: isn.c:69
int i
Definition: isn.c:73
if(TABLE==NULL||TABLE_index==NULL)
Definition: isn.c:77
static OffsetNumber ItemPointerGetOffsetNumber(const ItemPointerData *pointer)
Definition: itemptr.h:124
static BlockNumber ItemPointerGetBlockNumber(const ItemPointerData *pointer)
Definition: itemptr.h:103
static void ItemPointerCopy(const ItemPointerData *fromPointer, ItemPointerData *toPointer)
Definition: itemptr.h:172
Assert(fmt[strlen(fmt) - 1] !='\n')
void list_sort(List *list, list_sort_comparator cmp)
Definition: list.c:1674
List * lappend(List *list, void *datum)
Definition: list.c:339
void UpdateDecodingStats(LogicalDecodingContext *ctx)
Definition: logical.c:1924
char * pstrdup(const char *in)
Definition: mcxt.c:1683
void pfree(void *pointer)
Definition: mcxt.c:1508
void * palloc0(Size size)
Definition: mcxt.c:1334
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:1202
MemoryContext CurrentMemoryContext
Definition: mcxt.c:131
void * repalloc(void *pointer, Size size)
Definition: mcxt.c:1528
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:1168
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:442
void * palloc(Size size)
Definition: mcxt.c:1304
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:153
#define SLAB_DEFAULT_BLOCK_SIZE
Definition: memutils.h:182
#define SLAB_LARGE_BLOCK_SIZE
Definition: memutils.h:183
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:122
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:209
void * arg
#define MAXPGPATH
const void * data
#define lfirst(lc)
Definition: pg_list.h:172
#define NIL
Definition: pg_list.h:68
#define sprintf
Definition: port.h:240
#define snprintf
Definition: port.h:238
#define qsort(a, b, c, d)
Definition: port.h:449
static Datum PointerGetDatum(const void *X)
Definition: postgres.h:322
uintptr_t Datum
Definition: postgres.h:64
static Oid DatumGetObjectId(Datum X)
Definition: postgres.h:242
static Pointer DatumGetPointer(Datum X)
Definition: postgres.h:312
static Datum Int32GetDatum(int32 X)
Definition: postgres.h:212
static int32 DatumGetInt32(Datum X)
Definition: postgres.h:202
#define InvalidOid
Definition: postgres_ext.h:36
unsigned int Oid
Definition: postgres_ext.h:31
static int fd(const char *x, int i)
Definition: preproc-init.c:105
tree context
Definition: radixtree.h:1797
MemoryContextSwitchTo(old_ctx)
#define RelationIsLogicallyLogged(relation)
Definition: rel.h:703
#define RelationGetDescr(relation)
Definition: rel.h:533
#define RelationGetRelationName(relation)
Definition: rel.h:541
#define RelationIsValid(relation)
Definition: rel.h:480
Relation RelationIdGetRelation(Oid relationId)
Definition: relcache.c:2062
void RelationClose(Relation relation)
Definition: relcache.c:2193
Oid RelidByRelfilenumber(Oid reltablespace, RelFileNumber relfilenumber)
ForkNumber
Definition: relpath.h:48
@ MAIN_FORKNUM
Definition: relpath.h:50
#define relpathperm(rlocator, forknum)
Definition: relpath.h:90
static int file_sort_by_lsn(const ListCell *a_p, const ListCell *b_p)
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)
static void ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
TransactionId * ReorderBufferGetCatalogChangesXacts(ReorderBuffer *rb)
void ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
static void ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn)
void ReorderBufferAddNewCommandId(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, CommandId cid)
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
void ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, RelFileLocator locator, ItemPointerData tid, CommandId cmin, CommandId cmax, CommandId combocid)
static void ApplyLogicalMappingFile(HTAB *tuplecid_data, Oid relid, const char *fname)
void ReorderBufferSetBaseSnapshot(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferToastInitHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
void ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, TimestampTz abort_time)
static bool ReorderBufferCanStartStreaming(ReorderBuffer *rb)
static void ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id, XLogRecPtr last_lsn, ReorderBufferChange *specinsert)
struct ReorderBufferDiskChange ReorderBufferDiskChange
HeapTuple ReorderBufferGetTupleBuf(ReorderBuffer *rb, Size tuple_len)
bool ReorderBufferXidHasCatalogChanges(ReorderBuffer *rb, TransactionId xid)
void ReorderBufferInvalidate(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
TransactionId ReorderBufferGetOldestXmin(ReorderBuffer *rb)
static int ReorderBufferIterCompare(Datum a, Datum b, void *arg)
static void ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferIterTXNState *volatile *iter_state)
bool ResolveCminCmaxDuringDecoding(HTAB *tuplecid_data, Snapshot snapshot, HeapTuple htup, Buffer buffer, CommandId *cmin, CommandId *cmax)
static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change, bool toast_insert)
static void ReorderBufferReplay(ReorderBufferTXN *txn, ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
void ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid, char *gid)
ReorderBuffer * ReorderBufferAllocate(void)
static ReorderBufferTXN * ReorderBufferLargestStreamableTopTXN(ReorderBuffer *rb)
void ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
void ReorderBufferCommitChild(ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn)
static void ReorderBufferSaveTXNSnapshot(ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id)
#define IsSpecInsert(action)
static Size ReorderBufferChangeSize(ReorderBufferChange *change)
int logical_decoding_work_mem
static void AssertChangeLsnOrder(ReorderBufferTXN *txn)
static bool ReorderBufferCanStream(ReorderBuffer *rb)
static void ReorderBufferApplyChange(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change, bool streaming)
void ReorderBufferSkipPrepare(ReorderBuffer *rb, TransactionId xid)
Oid * ReorderBufferGetRelids(ReorderBuffer *rb, int nrelids)
static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, int fd, ReorderBufferChange *change)
struct ReorderBufferIterTXNState ReorderBufferIterTXNState
void ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs)
ReorderBufferTXN * ReorderBufferGetOldestTXN(ReorderBuffer *rb)
struct ReorderBufferTXNByIdEnt ReorderBufferTXNByIdEnt
int debug_logical_replication_streaming
struct ReorderBufferIterTXNEntry ReorderBufferIterTXNEntry
static void ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define IsInsertOrUpdate(action)
static void ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz)
void ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, Snapshot snap, XLogRecPtr lsn, bool transactional, const char *prefix, Size message_size, const char *message)
bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *rb, TransactionId xid)
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
static void ReorderBufferExecuteInvalidations(uint32 nmsgs, SharedInvalidationMessage *msgs)
static void ReorderBufferIterTXNFinish(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
struct RewriteMappingFile RewriteMappingFile
void ReorderBufferAddSnapshot(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared)
#define CHANGES_THRESHOLD
static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, char *data)
void ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, XLogRecPtr two_phase_at, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn, char *gid, bool is_commit)
static void AssertTXNLsnOrder(ReorderBuffer *rb)
static void ReorderBufferApplyMessage(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool streaming)
void ReorderBufferReturnTupleBuf(HeapTuple tuple)
static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
static void ReorderBufferCleanupSerializedTXNs(const char *slotname)
void ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
void ReorderBufferSetRestartPoint(ReorderBuffer *rb, XLogRecPtr ptr)
static void SetupCheckXidLive(TransactionId xid)
static bool TransactionIdInArray(TransactionId xid, TransactionId *xip, Size num)
static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap, ReorderBufferTXN *txn, CommandId cid)
static void ReorderBufferApplyTruncate(ReorderBuffer *rb, ReorderBufferTXN *txn, int nrelations, Relation *relations, ReorderBufferChange *change, bool streaming)
static void ReorderBufferProcessPartialChange(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool toast_insert)
static void ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void UpdateLogicalMappings(HTAB *tuplecid_data, Oid relid, Snapshot snapshot)
bool ReorderBufferRememberPrepareInfo(ReorderBuffer *rb, TransactionId xid, XLogRecPtr prepare_lsn, XLogRecPtr end_lsn, TimestampTz prepare_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
void ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
static void ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn, ReorderBufferTXN *subtxn)
static ReorderBufferTXN * ReorderBufferLargestTXN(ReorderBuffer *rb)
struct TXNEntryFile TXNEntryFile
static void ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
void ReorderBufferReturnRelids(ReorderBuffer *rb, Oid *relids)
static ReorderBufferChange * ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
static void ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
struct ReorderBufferToastEnt ReorderBufferToastEnt
static void ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
void ReorderBufferProcessXid(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, TXNEntryFile *file, XLogSegNo *segno)
void ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr lsn)
void ReorderBufferFree(ReorderBuffer *rb)
static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, TransactionId xid, XLogSegNo segno)
#define IsSpecConfirmOrAbort(action)
struct ReorderBufferTupleCidEnt ReorderBufferTupleCidEnt
struct ReorderBufferTupleCidKey ReorderBufferTupleCidKey
static const Size max_changes_in_memory
void StartupReorderBuffer(void)
void ReorderBufferAbortOld(ReorderBuffer *rb, TransactionId oldestRunningXid)
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, ReorderBufferChange *change, bool addition, Size sz)
static ReorderBufferTXN * ReorderBufferGetTXN(ReorderBuffer *rb)
static void ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn, volatile Snapshot snapshot_now, volatile CommandId command_id, bool streaming)
#define rbtxn_prepared(txn)
#define rbtxn_has_streamable_change(txn)
#define rbtxn_has_catalog_changes(txn)
@ DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE
Definition: reorderbuffer.h:28
@ DEBUG_LOGICAL_REP_STREAMING_BUFFERED
Definition: reorderbuffer.h:27
#define rbtxn_is_serialized_clear(txn)
#define RBTXN_IS_STREAMED
#define RBTXN_HAS_PARTIAL_CHANGE
#define rbtxn_is_streamed(txn)
struct ReorderBufferChange ReorderBufferChange
#define rbtxn_is_toptxn(txn)
#define rbtxn_get_toptxn(txn)
#define rbtxn_is_known_subxact(txn)
#define rbtxn_is_subtxn(txn)
#define RBTXN_HAS_CATALOG_CHANGES
#define RBTXN_IS_SERIALIZED_CLEAR
#define RBTXN_PREPARE
#define RBTXN_SKIPPED_PREPARE
#define RBTXN_HAS_STREAMABLE_CHANGE
@ REORDER_BUFFER_CHANGE_INVALIDATION
Definition: reorderbuffer.h:50
@ REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM
Definition: reorderbuffer.h:55
@ REORDER_BUFFER_CHANGE_INSERT
Definition: reorderbuffer.h:46
@ REORDER_BUFFER_CHANGE_MESSAGE
Definition: reorderbuffer.h:49
@ REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT
Definition: reorderbuffer.h:56
@ REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID
Definition: reorderbuffer.h:52
@ REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID
Definition: reorderbuffer.h:53
@ REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT
Definition: reorderbuffer.h:54
@ REORDER_BUFFER_CHANGE_TRUNCATE
Definition: reorderbuffer.h:57
@ REORDER_BUFFER_CHANGE_DELETE
Definition: reorderbuffer.h:48
@ REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT
Definition: reorderbuffer.h:51
@ REORDER_BUFFER_CHANGE_UPDATE
Definition: reorderbuffer.h:47
#define RBTXN_IS_SERIALIZED
#define rbtxn_is_serialized(txn)
#define RBTXN_IS_SUBXACT
#define rbtxn_has_partial_change(txn)
#define LOGICAL_REWRITE_FORMAT
Definition: rewriteheap.h:54
static pg_noinline void Size size
Definition: slab.c:607
MemoryContext SlabContextCreate(MemoryContext parent, const char *name, Size blockSize, Size chunkSize)
Definition: slab.c:322
ReplicationSlot * MyReplicationSlot
Definition: slot.c:138
bool ReplicationSlotValidateName(const char *name, int elevel)
Definition: slot.c:252
void SnapBuildSnapDecRefcount(Snapshot snap)
Definition: snapbuild.c:457
bool SnapBuildXactNeedsSkip(SnapBuild *builder, XLogRecPtr ptr)
Definition: snapbuild.c:433
SnapBuildState SnapBuildCurrentState(SnapBuild *builder)
Definition: snapbuild.c:406
@ SNAPBUILD_CONSISTENT
Definition: snapbuild.h:46
void TeardownHistoricSnapshot(bool is_error)
Definition: snapmgr.c:1665
void SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
Definition: snapmgr.c:1649
static HTAB * tuplecid_data
Definition: snapmgr.c:102
struct SnapshotData * Snapshot
Definition: snapshot.h:121
struct SnapshotData SnapshotData
Definition: dirent.c:26
int sqlerrcode
Definition: elog.h:438
Size keysize
Definition: hsearch.h:75
Size entrysize
Definition: hsearch.h:76
MemoryContext hcxt
Definition: hsearch.h:86
Definition: dynahash.c:220
ItemPointerData t_self
Definition: htup.h:65
uint32 t_len
Definition: htup.h:64
HeapTupleHeader t_data
Definition: htup.h:68
Oid t_tableOid
Definition: htup.h:66
Definition: pg_list.h:54
XLogReaderState * reader
Definition: logical.h:42
struct SnapBuild * snapshot_builder
Definition: logical.h:44
ItemPointerData new_tid
Definition: rewriteheap.h:40
RelFileLocator old_locator
Definition: rewriteheap.h:37
ItemPointerData old_tid
Definition: rewriteheap.h:39
RelFileLocator new_locator
Definition: rewriteheap.h:38
RelFileNumber relNumber
Form_pg_class rd_rel
Definition: rel.h:111
ReorderBufferChangeType action
Definition: reorderbuffer.h:75
struct ReorderBufferChange::@104::@109 inval
union ReorderBufferChange::@104 data
struct ReorderBufferChange::@104::@108 tuplecid
struct ReorderBufferTXN * txn
Definition: reorderbuffer.h:78
RepOriginId origin_id
Definition: reorderbuffer.h:80
struct ReorderBufferChange::@104::@105 tp
struct ReorderBufferChange::@104::@107 msg
struct ReorderBufferChange::@104::@106 truncate
ReorderBufferChange change
ReorderBufferChange * change
ReorderBufferTXN * txn
ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER]
ReorderBufferTXN * txn
CommandId command_id
XLogRecPtr restart_decoding_lsn
TimestampTz commit_time
XLogRecPtr base_snapshot_lsn
Snapshot snapshot_now
TransactionId toplevel_xid
dlist_node catchange_node
Snapshot base_snapshot
SharedInvalidationMessage * invalidations
RepOriginId origin_id
struct ReorderBufferTXN * toptxn
dlist_head tuplecids
XLogRecPtr first_lsn
TimestampTz abort_time
XLogRecPtr final_lsn
void * output_plugin_private
XLogRecPtr end_lsn
XLogRecPtr origin_lsn
TimestampTz prepare_time
TransactionId xid
union ReorderBufferTXN::@110 xact_time
dlist_node base_snapshot_node
dlist_head changes
dlist_head subtxns
struct varlena * reconstructed
ReorderBufferTupleCidKey key
ReorderBufferStreamMessageCB stream_message
ReorderBufferStreamChangeCB stream_change
ReorderBufferBeginCB begin_prepare
ReorderBufferStreamTruncateCB stream_truncate
ReorderBufferCommitPreparedCB commit_prepared
ReorderBufferUpdateProgressTxnCB update_progress_txn
ReorderBufferMessageCB message
dlist_head txns_by_base_snapshot_lsn
MemoryContext context
dclist_head catchange_txns
ReorderBufferRollbackPreparedCB rollback_prepared
ReorderBufferPrepareCB prepare
ReorderBufferStreamStopCB stream_stop
ReorderBufferApplyChangeCB apply_change
MemoryContext change_context
ReorderBufferTXN * by_txn_last_txn
TransactionId by_txn_last_xid
ReorderBufferStreamPrepareCB stream_prepare
ReorderBufferStreamAbortCB stream_abort
MemoryContext tup_context
ReorderBufferCommitCB commit
ReorderBufferStreamStartCB stream_start
ReorderBufferStreamCommitCB stream_commit
ReorderBufferApplyTruncateCB apply_truncate
dlist_head toplevel_by_lsn
ReorderBufferBeginCB begin
MemoryContext txn_context
XLogRecPtr current_restart_decoding_lsn
void * private_data
ReplicationSlotPersistentData data
Definition: slot.h:178
char fname[MAXPGPATH]
TransactionId xmin
Definition: snapshot.h:157
int32 subxcnt
Definition: snapshot.h:181
bool copied
Definition: snapshot.h:185
uint32 regd_count
Definition: snapshot.h:205
uint32 active_count
Definition: snapshot.h:204
CommandId curcid
Definition: snapshot.h:187
uint32 xcnt
Definition: snapshot.h:169
TransactionId * subxip
Definition: snapshot.h:180
TransactionId * xip
Definition: snapshot.h:168
XLogRecPtr EndRecPtr
Definition: xlogreader.h:207
XLogRecPtr ReadRecPtr
Definition: xlogreader.h:206
Definition: dirent.h:10
char d_name[MAX_PATH]
Definition: dirent.h:15
dlist_node * cur
Definition: ilist.h:179
dlist_node * cur
Definition: ilist.h:200
unsigned short st_mode
Definition: win32_port.h:268
Definition: regguts.h:323
int32 va_rawsize
Definition: varatt.h:34
Oid va_valueid
Definition: varatt.h:37
struct varlena * pointer
Definition: varatt.h:59
Definition: c.h:674
bool TransactionIdDidCommit(TransactionId transactionId)
Definition: transam.c:126
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:280
#define InvalidTransactionId
Definition: transam.h:31
#define TransactionIdEquals(id1, id2)
Definition: transam.h:43
#define TransactionIdIsValid(xid)
Definition: transam.h:41
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
#define VARHDRSZ_SHORT
Definition: varatt.h:255
#define VARSIZE_SHORT(PTR)
Definition: varatt.h:281
#define VARATT_IS_EXTENDED(PTR)
Definition: varatt.h:303
#define VARATT_IS_SHORT(PTR)
Definition: varatt.h:302
#define SET_VARSIZE_COMPRESSED(PTR, len)
Definition: varatt.h:307
#define VARDATA(PTR)
Definition: varatt.h:278
#define SET_VARTAG_EXTERNAL(PTR, tag)
Definition: varatt.h:309
#define VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer)
Definition: varatt.h:354
#define VARDATA_EXTERNAL(PTR)
Definition: varatt.h:286
#define SET_VARSIZE(PTR, len)
Definition: varatt.h:305
#define VARSIZE(PTR)
Definition: varatt.h:279
#define VARATT_EXTERNAL_GET_EXTSIZE(toast_pointer)
Definition: varatt.h:334
#define VARATT_IS_EXTERNAL(PTR)
Definition: varatt.h:289
@ VARTAG_INDIRECT
Definition: varatt.h:86
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: wait_event.h:88
static void pgstat_report_wait_end(void)
Definition: wait_event.h:104
#define lstat(path, sb)
Definition: win32_port.h:285
#define S_ISDIR(m)
Definition: win32_port.h:325
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4946
void BeginInternalSubTransaction(const char *name)
Definition: xact.c:4656
TransactionId CheckXidAlive
Definition: xact.c:97
void RollbackAndReleaseCurrentSubTransaction(void)
Definition: xact.c:4758
void StartTransactionCommand(void)
Definition: xact.c:2995
TransactionId GetCurrentTransactionIdIfAny(void)
Definition: xact.c:468
TransactionId GetCurrentTransactionId(void)
Definition: xact.c:451
void AbortCurrentTransaction(void)
Definition: xact.c:3391
int xidComparator(const void *arg1, const void *arg2)
Definition: xid.c:139
int wal_segment_size
Definition: xlog.c:143
#define XLogSegNoOffsetToRecPtr(segno, offset, wal_segsz_bytes, dest)
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
#define XLByteInSeg(xlrp, logSegNo, wal_segsz_bytes)
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
uint16 RepOriginId
Definition: xlogdefs.h:65
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
uint64 XLogSegNo
Definition: xlogdefs.h:48