PostgreSQL Source Code  git master
reorderbuffer.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * reorderbuffer.c
4  * PostgreSQL logical replay/reorder buffer management
5  *
6  *
7  * Copyright (c) 2012-2024, PostgreSQL Global Development Group
8  *
9  *
10  * IDENTIFICATION
11  * src/backend/replication/logical/reorderbuffer.c
12  *
13  * NOTES
14  * This module gets handed individual pieces of transactions in the order
15  * they are written to the WAL and is responsible for reassembling them into
16  * toplevel transaction sized pieces. When a transaction is completely
17  * reassembled - signaled by reading the transaction commit record - it
18  * will then call the output plugin (cf. ReorderBufferCommit()) with the
19  * individual changes. The output plugins rely on snapshots built by
20  * snapbuild.c which hands them to us.
21  *
22  * Transactions and subtransactions/savepoints in postgres are not
23  * immediately linked to each other from outside the performing
24  * backend. Only at commit/abort (or special xact_assignment records) are they
25  * linked together, which means that we have to splice together a
26  * toplevel transaction from its subtransactions. To do that efficiently we
27  * build a binary heap indexed by the smallest current lsn of the individual
28  * subtransactions' changestreams. As the individual streams are inherently
29  * ordered by LSN - since that is where we build them from - the transaction
30  * can easily be reassembled by always using the subtransaction with the
31  * smallest current LSN from the heap.
32  *
33  * In order to cope with large transactions - which can be several times as
34  * big as the available memory - this module supports spooling the contents
35  * of large transactions to disk. When the transaction is replayed the
36  * contents of individual (sub-)transactions will be read from disk in
37  * chunks.
38  *
39  * This module also has to deal with reassembling toast records from the
40  * individual chunks stored in WAL. When a new (or initial) version of a
41  * tuple is stored in WAL it will always be preceded by the toast chunks
42  * emitted for the columns stored out of line. Within a single toplevel
43  * transaction there will be no other data-carrying records between a row's
44  * toast chunks and the row data itself. See ReorderBufferToast* for
45  * details.
46  *
47  * ReorderBuffer uses two special memory context types - SlabContext for
48  * allocations of fixed-length structures (changes and transactions), and
49  * GenerationContext for the variable-length transaction data (allocated
50  * and freed in groups with similar lifespans).
51  *
52  * To limit the amount of memory used by decoded changes, we track memory
53  * used at the reorder buffer level (i.e. total amount of memory), and for
54  * each transaction. When the total amount of used memory exceeds the
55  * limit, the transaction consuming the most memory is then serialized to
56  * disk.
57  *
58  * Only decoded changes are evicted from memory (spilled to disk), not the
59  * transaction records. The number of toplevel transactions is limited,
60  * but a transaction with many subtransactions may still consume significant
61  * amounts of memory. However, the transaction records are fairly small and
62  * are not included in the memory limit.
63  *
64  * The current eviction algorithm is very simple - the transaction is
65  * picked merely by size, while it might be useful to also consider age
66  * (LSN) of the changes for example. With the new Generational memory
67  * allocator, evicting the oldest changes would make it more likely the
68  * memory gets actually freed.
69  *
70  * We use a max-heap with transaction size as the key to efficiently find
71  * the largest transaction. We update the max-heap whenever the memory
72  * counter is updated; however transactions with size 0 are not stored in
73  * the heap, because they have no changes to evict.
74  *
75  * We still rely on max_changes_in_memory when loading serialized changes
76  * back into memory. At that point we can't use the memory limit directly
77  * as we load the subxacts independently. One option to deal with this
78  * would be to count the subxacts, and allow each to allocate 1/N of the
79  * memory limit. That however does not seem very appealing, because with
80  * many subtransactions it may easily cause thrashing (short cycles of
81  * deserializing and applying very few changes). We probably should give
82  * a bit more memory to the oldest subtransactions, because it's likely
83  * they are the source for the next sequence of changes.
84  *
85  * -------------------------------------------------------------------------
86  */
87 #include "postgres.h"
88 
89 #include <unistd.h>
90 #include <sys/stat.h>
91 
92 #include "access/detoast.h"
93 #include "access/heapam.h"
94 #include "access/rewriteheap.h"
95 #include "access/transam.h"
96 #include "access/xact.h"
97 #include "access/xlog_internal.h"
98 #include "catalog/catalog.h"
99 #include "common/int.h"
100 #include "lib/binaryheap.h"
101 #include "miscadmin.h"
102 #include "pgstat.h"
103 #include "replication/logical.h"
105 #include "replication/slot.h"
106 #include "replication/snapbuild.h" /* just for SnapBuildSnapDecRefcount */
107 #include "storage/bufmgr.h"
108 #include "storage/fd.h"
109 #include "storage/sinval.h"
110 #include "utils/builtins.h"
111 #include "utils/memutils.h"
112 #include "utils/rel.h"
113 #include "utils/relfilenumbermap.h"
114 
115 /* entry for a hash table we use to map from xid to our transaction state */
117 {
121 
122 /* data structures for (relfilelocator, ctid) => (cmin, cmax) mapping */
124 {
128 
130 {
134  CommandId combocid; /* just for debugging */
136 
137 /* Virtual file descriptor with file offset tracking */
138 typedef struct TXNEntryFile
139 {
140  File vfd; /* -1 when the file is closed */
141  off_t curOffset; /* offset for next write or read. Reset to 0
142  * when vfd is opened. */
144 
145 /* k-way in-order change iteration support structures */
147 {
154 
156 {
162 
163 /* toast datastructures */
164 typedef struct ReorderBufferToastEnt
165 {
166  Oid chunk_id; /* toast_table.chunk_id */
167  int32 last_chunk_seq; /* toast_table.chunk_seq of the last chunk we
168  * have seen */
169  Size num_chunks; /* number of chunks we've already seen */
170  Size size; /* combined size of chunks seen */
171  dlist_head chunks; /* linked list of chunks */
172  struct varlena *reconstructed; /* reconstructed varlena now pointed to in
173  * main tup */
175 
176 /* Disk serialization support datastructures */
178 {
181  /* data follows */
183 
/* Is this change a speculative insertion? */
#define IsSpecInsert(action) \
	((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT)

/* Does this change confirm or abort a previous speculative insertion? */
#define IsSpecConfirmOrAbort(action) \
	((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM || \
	 (action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT)

/*
 * Is this change a plain insert, an update, or a speculative insert?
 * Note: every macro above evaluates its argument more than once, so callers
 * must pass a side-effect-free expression.
 */
#define IsInsertOrUpdate(action) \
	((action) == REORDER_BUFFER_CHANGE_INSERT || \
	 (action) == REORDER_BUFFER_CHANGE_UPDATE || \
	 (action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT)
199 
200 /*
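201  * Maximum number of changes kept in memory, per (sub)transaction, when
202  * restoring changes that were previously spooled to disk. When changes get
203  * spooled to disk in the first place is decided by the memory limit
204  * described in the file header comment, not by this constant.
205  *
206  * The current value should be sufficient to restore an entire transaction
207  * from OLTP workloads in one batch, while limiting memory use when
208  * restoring larger transactions.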
207  *
208  * At some point in the future it probably makes sense to have a more elaborate
209  * resource management here, but it's not entirely clear what that would look
210  * like.
211  */
213 static const Size max_changes_in_memory = 4096; /* XXX for restore only */
214 
215 /* GUC variable */
217 
218 /* ---------------------------------------
219  * primary reorderbuffer support routines
220  * ---------------------------------------
221  */
225  TransactionId xid, bool create, bool *is_new,
226  XLogRecPtr lsn, bool create_as_top);
228  ReorderBufferTXN *subtxn);
229 
230 static void AssertTXNLsnOrder(ReorderBuffer *rb);
231 
232 /* ---------------------------------------
233  * support functions for lsn-order iterating over the ->changes of a
234  * transaction and its subtransactions
235  *
236  * used for iteration over the k-way heap merge of a transaction and its
237  * subtransactions
238  * ---------------------------------------
239  */
241  ReorderBufferIterTXNState *volatile *iter_state);
246 
247 /*
248  * ---------------------------------------
249  * Disk serialization support functions
250  * ---------------------------------------
251  */
255  int fd, ReorderBufferChange *change);
257  TXNEntryFile *file, XLogSegNo *segno);
259  char *data);
262  bool txn_prepared);
263 static void ReorderBufferCleanupSerializedTXNs(const char *slotname);
264 static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot,
265  TransactionId xid, XLogSegNo segno);
266 static int ReorderBufferTXNSizeCompare(const pairingheap_node *a, const pairingheap_node *b, void *arg);
267 
268 static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap);
270  ReorderBufferTXN *txn, CommandId cid);
271 
272 /*
273  * ---------------------------------------
274  * Streaming support functions
275  * ---------------------------------------
276  */
277 static inline bool ReorderBufferCanStream(ReorderBuffer *rb);
278 static inline bool ReorderBufferCanStartStreaming(ReorderBuffer *rb);
281 
282 /* ---------------------------------------
283  * toast reassembly support
284  * ---------------------------------------
285  */
289  Relation relation, ReorderBufferChange *change);
291  Relation relation, ReorderBufferChange *change);
292 
293 /*
294  * ---------------------------------------
295  * memory accounting
296  * ---------------------------------------
297  */
300  ReorderBufferChange *change,
301  ReorderBufferTXN *txn,
302  bool addition, Size sz);
303 
304 /*
305  * Allocate a new ReorderBuffer and clean out any old serialized state from
306  * prior ReorderBuffer instances for the same slot.
307  */
310 {
311  ReorderBuffer *buffer;
312  HASHCTL hash_ctl;
313  MemoryContext new_ctx;
314 
315  Assert(MyReplicationSlot != NULL);
316 
317  /* allocate memory in own context, to have better accountability */
319  "ReorderBuffer",
321 
322  buffer =
323  (ReorderBuffer *) MemoryContextAlloc(new_ctx, sizeof(ReorderBuffer));
324 
325  memset(&hash_ctl, 0, sizeof(hash_ctl));
326 
327  buffer->context = new_ctx;
328 
329  buffer->change_context = SlabContextCreate(new_ctx,
330  "Change",
332  sizeof(ReorderBufferChange));
333 
334  buffer->txn_context = SlabContextCreate(new_ctx,
335  "TXN",
337  sizeof(ReorderBufferTXN));
338 
339  /*
340  * XXX the allocation sizes used below pre-date generation context's block
341  * growing code. These values should likely be benchmarked and set to
342  * more suitable values.
343  */
344  buffer->tup_context = GenerationContextCreate(new_ctx,
345  "Tuples",
349 
350  hash_ctl.keysize = sizeof(TransactionId);
351  hash_ctl.entrysize = sizeof(ReorderBufferTXNByIdEnt);
352  hash_ctl.hcxt = buffer->context;
353 
354  buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
356 
358  buffer->by_txn_last_txn = NULL;
359 
360  buffer->outbuf = NULL;
361  buffer->outbufsize = 0;
362  buffer->size = 0;
363 
364  /* txn_heap is ordered by transaction size */
366 
367  buffer->spillTxns = 0;
368  buffer->spillCount = 0;
369  buffer->spillBytes = 0;
370  buffer->streamTxns = 0;
371  buffer->streamCount = 0;
372  buffer->streamBytes = 0;
373  buffer->totalTxns = 0;
374  buffer->totalBytes = 0;
375 
377 
378  dlist_init(&buffer->toplevel_by_lsn);
380  dclist_init(&buffer->catchange_txns);
381 
382  /*
383  * Ensure there's no stale data from prior uses of this slot, in case some
384  * prior exit avoided calling ReorderBufferFree. Failure to do this can
385  * produce duplicated txns, and it's very cheap if there's nothing there.
386  */
388 
389  return buffer;
390 }
391 
392 /*
393  * Free a ReorderBuffer
394  */
395 void
397 {
399 
400  /*
401  * We free separately allocated data by entirely scrapping reorderbuffer's
402  * memory context.
403  */
405 
406  /* Free disk space used by unconsumed reorder buffers */
408 }
409 
410 /*
411  * Get an unused, possibly preallocated, ReorderBufferTXN.
412  */
413 static ReorderBufferTXN *
415 {
416  ReorderBufferTXN *txn;
417 
418  txn = (ReorderBufferTXN *)
420 
421  memset(txn, 0, sizeof(ReorderBufferTXN));
422 
423  dlist_init(&txn->changes);
424  dlist_init(&txn->tuplecids);
425  dlist_init(&txn->subtxns);
426 
427  /* InvalidCommandId is not zero, so set it explicitly */
429  txn->output_plugin_private = NULL;
430 
431  return txn;
432 }
433 
434 /*
435  * Free a ReorderBufferTXN.
436  */
437 static void
439 {
440  /* clean the lookup cache if we were cached (quite likely) */
441  if (rb->by_txn_last_xid == txn->xid)
442  {
444  rb->by_txn_last_txn = NULL;
445  }
446 
447  /* free data that's contained */
448 
449  if (txn->gid != NULL)
450  {
451  pfree(txn->gid);
452  txn->gid = NULL;
453  }
454 
455  if (txn->tuplecid_hash != NULL)
456  {
458  txn->tuplecid_hash = NULL;
459  }
460 
461  if (txn->invalidations)
462  {
463  pfree(txn->invalidations);
464  txn->invalidations = NULL;
465  }
466 
467  /* Reset the toast hash */
468  ReorderBufferToastReset(rb, txn);
469 
470  /* All changes must be deallocated */
471  Assert(txn->size == 0);
472 
473  pfree(txn);
474 }
475 
476 /*
477  * Get a fresh ReorderBufferChange.
478  */
481 {
482  ReorderBufferChange *change;
483 
484  change = (ReorderBufferChange *)
486 
487  memset(change, 0, sizeof(ReorderBufferChange));
488  return change;
489 }
490 
491 /*
492  * Free a ReorderBufferChange and update memory accounting, if requested.
493  */
494 void
496  bool upd_mem)
497 {
498  /* update memory accounting info */
499  if (upd_mem)
500  ReorderBufferChangeMemoryUpdate(rb, change, NULL, false,
501  ReorderBufferChangeSize(change));
502 
503  /* free contained data */
504  switch (change->action)
505  {
510  if (change->data.tp.newtuple)
511  {
512  ReorderBufferReturnTupleBuf(change->data.tp.newtuple);
513  change->data.tp.newtuple = NULL;
514  }
515 
516  if (change->data.tp.oldtuple)
517  {
518  ReorderBufferReturnTupleBuf(change->data.tp.oldtuple);
519  change->data.tp.oldtuple = NULL;
520  }
521  break;
523  if (change->data.msg.prefix != NULL)
524  pfree(change->data.msg.prefix);
525  change->data.msg.prefix = NULL;
526  if (change->data.msg.message != NULL)
527  pfree(change->data.msg.message);
528  change->data.msg.message = NULL;
529  break;
531  if (change->data.inval.invalidations)
532  pfree(change->data.inval.invalidations);
533  change->data.inval.invalidations = NULL;
534  break;
536  if (change->data.snapshot)
537  {
538  ReorderBufferFreeSnap(rb, change->data.snapshot);
539  change->data.snapshot = NULL;
540  }
541  break;
542  /* no data in addition to the struct itself */
544  if (change->data.truncate.relids != NULL)
545  {
546  ReorderBufferReturnRelids(rb, change->data.truncate.relids);
547  change->data.truncate.relids = NULL;
548  }
549  break;
554  break;
555  }
556 
557  pfree(change);
558 }
559 
560 /*
561  * Get a fresh HeapTuple fitting a tuple of size tuple_len (excluding header
562  * overhead).
563  */
564 HeapTuple
566 {
567  HeapTuple tuple;
568  Size alloc_len;
569 
570  alloc_len = tuple_len + SizeofHeapTupleHeader;
571 
573  HEAPTUPLESIZE + alloc_len);
574  tuple->t_data = (HeapTupleHeader) ((char *) tuple + HEAPTUPLESIZE);
575 
576  return tuple;
577 }
578 
579 /*
580  * Free a HeapTuple returned by ReorderBufferGetTupleBuf().
581  */
582 void
584 {
585  pfree(tuple);
586 }
587 
588 /*
589  * Get an array for relids of truncated relations.
590  *
591  * We use the global memory context (for the whole reorder buffer), because
592  * none of the existing ones seems like a good match (some are SLAB, so we
593  * can't use those, and tup_context is meant for tuple data, not relids). We
594  * could add yet another context, but that seems like overkill - TRUNCATE is
595  * not a particularly common operation, so it does not seem worth it.
596  */
597 Oid *
599 {
600  Oid *relids;
601  Size alloc_len;
602 
603  alloc_len = sizeof(Oid) * nrelids;
604 
605  relids = (Oid *) MemoryContextAlloc(rb->context, alloc_len);
606 
607  return relids;
608 }
609 
610 /*
611  * Free an array of relids.
612  */
613 void
615 {
616  pfree(relids);
617 }
618 
619 /*
620  * Return the ReorderBufferTXN from the given buffer, specified by Xid.
621  * If create is true, and a transaction doesn't already exist, create it
622  * (with the given LSN, and as top transaction if that's specified);
623  * when this happens, is_new is set to true.
624  */
625 static ReorderBufferTXN *
627  bool *is_new, XLogRecPtr lsn, bool create_as_top)
628 {
629  ReorderBufferTXN *txn;
631  bool found;
632 
634 
635  /*
636  * Check the one-entry lookup cache first
637  */
639  rb->by_txn_last_xid == xid)
640  {
641  txn = rb->by_txn_last_txn;
642 
643  if (txn != NULL)
644  {
645  /* found it, and it's valid */
646  if (is_new)
647  *is_new = false;
648  return txn;
649  }
650 
651  /*
652  * cached as non-existent, and asked not to create? Then nothing else
653  * to do.
654  */
655  if (!create)
656  return NULL;
657  /* otherwise fall through to create it */
658  }
659 
660  /*
661  * We get here if the cache wasn't hit, or if it yielded a "does-not-exist"
662  * and we want to create an entry.
663  */
664 
665  /* search the lookup table */
666  ent = (ReorderBufferTXNByIdEnt *)
667  hash_search(rb->by_txn,
668  &xid,
669  create ? HASH_ENTER : HASH_FIND,
670  &found);
671  if (found)
672  txn = ent->txn;
673  else if (create)
674  {
675  /* initialize the new entry, if creation was requested */
676  Assert(ent != NULL);
677  Assert(lsn != InvalidXLogRecPtr);
678 
679  ent->txn = ReorderBufferGetTXN(rb);
680  ent->txn->xid = xid;
681  txn = ent->txn;
682  txn->first_lsn = lsn;
684 
685  if (create_as_top)
686  {
687  dlist_push_tail(&rb->toplevel_by_lsn, &txn->node);
688  AssertTXNLsnOrder(rb);
689  }
690  }
691  else
692  txn = NULL; /* not found and not asked to create */
693 
694  /* update cache */
695  rb->by_txn_last_xid = xid;
696  rb->by_txn_last_txn = txn;
697 
698  if (is_new)
699  *is_new = !found;
700 
701  Assert(!create || txn != NULL);
702  return txn;
703 }
704 
705 /*
706  * Record the partial change for the streaming of in-progress transactions. We
707  * can stream only complete changes so if we have a partial change like toast
708  * table insert or speculative insert then we mark such a 'txn' so that it
709  * can't be streamed. We also ensure that if the changes in such a 'txn' can
710  * be streamed and are above logical_decoding_work_mem threshold then we stream
711  * them as soon as we have a complete change.
712  */
713 static void
715  ReorderBufferChange *change,
716  bool toast_insert)
717 {
718  ReorderBufferTXN *toptxn;
719 
720  /*
721  * The partial changes need to be processed only while streaming
722  * in-progress transactions.
723  */
724  if (!ReorderBufferCanStream(rb))
725  return;
726 
727  /* Get the top transaction. */
728  toptxn = rbtxn_get_toptxn(txn);
729 
730  /*
731  * Indicate a partial change for toast inserts. The change will be
732  * considered as complete once we get the insert or update on the main
733  * table and we are sure that the pending toast chunks are not required
734  * anymore.
735  *
736  * If we allow streaming when there are pending toast chunks then such
737  * chunks won't be released till the insert (multi_insert) is complete and
738  * we expect the txn to have streamed all changes after streaming. This
739  * restriction is mainly to ensure the correctness of streamed
740  * transactions and it doesn't seem worth uplifting such a restriction
741  * just to allow this case because anyway we will stream the transaction
742  * once such an insert is complete.
743  */
744  if (toast_insert)
746  else if (rbtxn_has_partial_change(toptxn) &&
747  IsInsertOrUpdate(change->action) &&
748  change->data.tp.clear_toast_afterwards)
750 
751  /*
752  * Indicate a partial change for speculative inserts. The change will be
753  * considered as complete once we get the speculative confirm or abort
754  * token.
755  */
756  if (IsSpecInsert(change->action))
758  else if (rbtxn_has_partial_change(toptxn) &&
759  IsSpecConfirmOrAbort(change->action))
761 
762  /*
763  * Stream the transaction if it is serialized before and the changes are
764  * now complete in the top-level transaction.
765  *
766  * The reason for doing the streaming of such a transaction as soon as we
767  * get the complete change for it is that previously it would have reached
768  * the memory threshold and wouldn't get streamed because of incomplete
769  * changes. Delaying such transactions would increase apply lag for them.
770  */
772  !(rbtxn_has_partial_change(toptxn)) &&
773  rbtxn_is_serialized(txn) &&
775  ReorderBufferStreamTXN(rb, toptxn);
776 }
777 
778 /*
779  * Queue a change into a transaction so it can be replayed upon commit or will be
780  * streamed when we reach logical_decoding_work_mem threshold.
781  */
782 void
784  ReorderBufferChange *change, bool toast_insert)
785 {
786  ReorderBufferTXN *txn;
787 
788  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
789 
790  /*
791  * While streaming the previous changes we have detected that the
792  * transaction is aborted. So there is no point in collecting further
793  * changes for it.
794  */
795  if (txn->concurrent_abort)
796  {
797  /*
798  * We don't need to update memory accounting for this change as we
799  * have not added it to the queue yet.
800  */
801  ReorderBufferReturnChange(rb, change, false);
802  return;
803  }
804 
805  /*
806  * The changes that are sent downstream are considered streamable. We
807  * remember such transactions so that only those will later be considered
808  * for streaming.
809  */
810  if (change->action == REORDER_BUFFER_CHANGE_INSERT ||
816  {
817  ReorderBufferTXN *toptxn = rbtxn_get_toptxn(txn);
818 
820  }
821 
822  change->lsn = lsn;
823  change->txn = txn;
824 
825  Assert(InvalidXLogRecPtr != lsn);
826  dlist_push_tail(&txn->changes, &change->node);
827  txn->nentries++;
828  txn->nentries_mem++;
829 
830  /* update memory accounting information */
831  ReorderBufferChangeMemoryUpdate(rb, change, NULL, true,
832  ReorderBufferChangeSize(change));
833 
834  /* process partial change */
835  ReorderBufferProcessPartialChange(rb, txn, change, toast_insert);
836 
837  /* check the memory limits and evict something if needed */
839 }
840 
841 /*
842  * A transactional message is queued to be processed upon commit and a
843  * non-transactional message gets processed immediately.
844  */
845 void
847  Snapshot snap, XLogRecPtr lsn,
848  bool transactional, const char *prefix,
849  Size message_size, const char *message)
850 {
851  if (transactional)
852  {
853  MemoryContext oldcontext;
854  ReorderBufferChange *change;
855 
857 
858  /*
859  * We don't expect snapshots for transactional changes - we'll use the
860  * snapshot derived later during apply (unless the change gets
861  * skipped).
862  */
863  Assert(!snap);
864 
865  oldcontext = MemoryContextSwitchTo(rb->context);
866 
867  change = ReorderBufferGetChange(rb);
869  change->data.msg.prefix = pstrdup(prefix);
870  change->data.msg.message_size = message_size;
871  change->data.msg.message = palloc(message_size);
872  memcpy(change->data.msg.message, message, message_size);
873 
874  ReorderBufferQueueChange(rb, xid, lsn, change, false);
875 
876  MemoryContextSwitchTo(oldcontext);
877  }
878  else
879  {
880  ReorderBufferTXN *txn = NULL;
881  volatile Snapshot snapshot_now = snap;
882 
883  /* Non-transactional changes require a valid snapshot. */
884  Assert(snapshot_now);
885 
886  if (xid != InvalidTransactionId)
887  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
888 
889  /* setup snapshot to allow catalog access */
890  SetupHistoricSnapshot(snapshot_now, NULL);
891  PG_TRY();
892  {
893  rb->message(rb, txn, lsn, false, prefix, message_size, message);
894 
896  }
897  PG_CATCH();
898  {
900  PG_RE_THROW();
901  }
902  PG_END_TRY();
903  }
904 }
905 
906 /*
907  * AssertTXNLsnOrder
908  * Verify LSN ordering of transaction lists in the reorderbuffer
909  *
910  * Other LSN-related invariants are checked too.
911  *
912  * No-op if assertions are not in use.
913  */
914 static void
916 {
917 #ifdef USE_ASSERT_CHECKING
919  dlist_iter iter;
920  XLogRecPtr prev_first_lsn = InvalidXLogRecPtr;
921  XLogRecPtr prev_base_snap_lsn = InvalidXLogRecPtr;
922 
923  /*
924  * Skip the verification if we haven't yet reached the LSN at which we start
925  * decoding the contents of transactions, because until we reach that
926  * LSN, we could have transactions that don't have the association between
927  * the top-level transaction and subtransaction yet and consequently have
928  * the same LSN. We don't guarantee this association until we try to
929  * decode the actual contents of transaction. The ordering of the records
930  * prior to the start_decoding_at LSN should have been checked before the
931  * restart.
932  */
934  return;
935 
936  dlist_foreach(iter, &rb->toplevel_by_lsn)
937  {
939  iter.cur);
940 
941  /* start LSN must be set */
942  Assert(cur_txn->first_lsn != InvalidXLogRecPtr);
943 
944  /* If there is an end LSN, it must not be lower than the start LSN */
945  if (cur_txn->end_lsn != InvalidXLogRecPtr)
946  Assert(cur_txn->first_lsn <= cur_txn->end_lsn);
947 
948  /* Current initial LSN must be strictly higher than previous */
949  if (prev_first_lsn != InvalidXLogRecPtr)
950  Assert(prev_first_lsn < cur_txn->first_lsn);
951 
952  /* known-as-subtxn txns must not be listed */
953  Assert(!rbtxn_is_known_subxact(cur_txn));
954 
955  prev_first_lsn = cur_txn->first_lsn;
956  }
957 
959  {
961  base_snapshot_node,
962  iter.cur);
963 
964  /* base snapshot (and its LSN) must be set */
965  Assert(cur_txn->base_snapshot != NULL);
967 
968  /* current LSN must be strictly higher than previous */
969  if (prev_base_snap_lsn != InvalidXLogRecPtr)
970  Assert(prev_base_snap_lsn < cur_txn->base_snapshot_lsn);
971 
972  /* known-as-subtxn txns must not be listed */
973  Assert(!rbtxn_is_known_subxact(cur_txn));
974 
975  prev_base_snap_lsn = cur_txn->base_snapshot_lsn;
976  }
977 #endif
978 }
979 
980 /*
981  * AssertChangeLsnOrder
982  *
983  * Check ordering of changes in the (sub)transaction.
984  */
985 static void
987 {
988 #ifdef USE_ASSERT_CHECKING
989  dlist_iter iter;
990  XLogRecPtr prev_lsn = txn->first_lsn;
991 
992  dlist_foreach(iter, &txn->changes)
993  {
994  ReorderBufferChange *cur_change;
995 
996  cur_change = dlist_container(ReorderBufferChange, node, iter.cur);
997 
999  Assert(cur_change->lsn != InvalidXLogRecPtr);
1000  Assert(txn->first_lsn <= cur_change->lsn);
1001 
1002  if (txn->end_lsn != InvalidXLogRecPtr)
1003  Assert(cur_change->lsn <= txn->end_lsn);
1004 
1005  Assert(prev_lsn <= cur_change->lsn);
1006 
1007  prev_lsn = cur_change->lsn;
1008  }
1009 #endif
1010 }
1011 
1012 /*
1013  * ReorderBufferGetOldestTXN
1014  * Return oldest transaction in reorderbuffer
1015  */
1018 {
1019  ReorderBufferTXN *txn;
1020 
1021  AssertTXNLsnOrder(rb);
1022 
1023  if (dlist_is_empty(&rb->toplevel_by_lsn))
1024  return NULL;
1025 
1027 
1030  return txn;
1031 }
1032 
1033 /*
1034  * ReorderBufferGetOldestXmin
1035  * Return oldest Xmin in reorderbuffer
1036  *
1037  * Returns oldest possibly running Xid from the point of view of snapshots
1038  * used in the transactions kept by reorderbuffer, or InvalidTransactionId if
1039  * there are none.
1040  *
1041  * Since snapshots are assigned monotonically, this equals the Xmin of the
1042  * base snapshot with minimal base_snapshot_lsn.
1043  */
1046 {
1047  ReorderBufferTXN *txn;
1048 
1049  AssertTXNLsnOrder(rb);
1050 
1052  return InvalidTransactionId;
1053 
1054  txn = dlist_head_element(ReorderBufferTXN, base_snapshot_node,
1056  return txn->base_snapshot->xmin;
1057 }
1058 
1059 void
1061 {
1062  rb->current_restart_decoding_lsn = ptr;
1063 }
1064 
1065 /*
1066  * ReorderBufferAssignChild
1067  *
1068  * Make note that we know that subxid is a subtransaction of xid, seen as of
1069  * the given lsn.
1070  */
1071 void
1073  TransactionId subxid, XLogRecPtr lsn)
1074 {
1075  ReorderBufferTXN *txn;
1076  ReorderBufferTXN *subtxn;
1077  bool new_top;
1078  bool new_sub;
1079 
1080  txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true);
1081  subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false);
1082 
1083  if (!new_sub)
1084  {
1085  if (rbtxn_is_known_subxact(subtxn))
1086  {
1087  /* already associated, nothing to do */
1088  return;
1089  }
1090  else
1091  {
1092  /*
1093  * We already saw this transaction, but initially added it to the
1094  * list of top-level txns. Now that we know it's not top-level,
1095  * remove it from there.
1096  */
1097  dlist_delete(&subtxn->node);
1098  }
1099  }
1100 
1101  subtxn->txn_flags |= RBTXN_IS_SUBXACT;
1102  subtxn->toplevel_xid = xid;
1103  Assert(subtxn->nsubtxns == 0);
1104 
1105  /* set the reference to top-level transaction */
1106  subtxn->toptxn = txn;
1107 
1108  /* add to subtransaction list */
1109  dlist_push_tail(&txn->subtxns, &subtxn->node);
1110  txn->nsubtxns++;
1111 
1112  /* Possibly transfer the subtxn's snapshot to its top-level txn. */
1113  ReorderBufferTransferSnapToParent(txn, subtxn);
1114 
1115  /* Verify LSN-ordering invariant */
1116  AssertTXNLsnOrder(rb);
1117 }
1118 
1119 /*
1120  * ReorderBufferTransferSnapToParent
1121  * Transfer base snapshot from subtxn to top-level txn, if needed
1122  *
1123  * This is done if the top-level txn doesn't have a base snapshot, or if the
1124  * subtxn's base snapshot has an earlier LSN than the top-level txn's base
1125  * snapshot's LSN. This can happen if there are no changes in the toplevel
1126  * txn but there are some in the subtxn, or the first change in subtxn has
1127  * earlier LSN than first change in the top-level txn and we learned about
1128  * their kinship only now.
1129  *
1130  * The subtransaction's snapshot is cleared regardless of the transfer
1131  * happening, since it's not needed anymore in either case.
1132  *
1133  * We do this as soon as we become aware of their kinship, to avoid queueing
1134  * extra snapshots to txns known-as-subtxns -- only top-level txns will
1135  * receive further snapshots.
1136  */
/*
 * NOTE(review): this block comes from a rendered source listing in which
 * numbered lines 1138, 1154-1155, 1165, 1173, 1179 and 1182 are absent, so it
 * does not compile as shown. Line 1138 presumably carries the function name
 * and the leading 'txn' parameter (used below); the other gaps fall inside
 * the snapshot hand-off logic -- restore them from the upstream source before
 * relying on this code.
 */
1137 static void
1139  ReorderBufferTXN *subtxn)
1140 {
1141  Assert(subtxn->toplevel_xid == txn->xid);
1142 
1143  if (subtxn->base_snapshot != NULL)
1144  {
/* Take over the subtxn's snapshot if the toplevel has none, or a later one. */
1145  if (txn->base_snapshot == NULL ||
1146  subtxn->base_snapshot_lsn < txn->base_snapshot_lsn)
1147  {
1148  /*
1149  * If the toplevel transaction already has a base snapshot but
1150  * it's newer than the subxact's, purge it.
1151  */
1152  if (txn->base_snapshot != NULL)
1153  {
1156  }
1157 
1158  /*
1159  * The snapshot is now the top transaction's; transfer it, and
1160  * adjust the list position of the top transaction in the list by
1161  * moving it to where the subtransaction is.
1162  */
1163  txn->base_snapshot = subtxn->base_snapshot;
1164  txn->base_snapshot_lsn = subtxn->base_snapshot_lsn;
1166  &txn->base_snapshot_node);
1167 
1168  /*
1169  * The subtransaction doesn't have a snapshot anymore (so it
1170  * mustn't be in the list.)
1171  */
1172  subtxn->base_snapshot = NULL;
1174  dlist_delete(&subtxn->base_snapshot_node);
1175  }
1176  else
1177  {
1178  /* Base snap of toplevel is fine, so subxact's is not needed */
1180  dlist_delete(&subtxn->base_snapshot_node);
1181  subtxn->base_snapshot = NULL;
1183  }
1184  }
1185 }
1186 
1187 /*
1188  * Associate a subtransaction with its toplevel transaction at commit
1189  * time. There may be no further changes added after this.
 *
 * xid identifies the toplevel transaction and subxid the subtransaction;
 * commit_lsn and end_lsn are recorded as the subtransaction's final_lsn and
 * end_lsn before it is assigned as a child of xid.
 *
 * NOTE(review): numbered line 1192 (the function name and the leading
 * parameters, including 'rb' and 'xid' used below) is missing from this
 * listing.
1190  */
1191 void
1193  TransactionId subxid, XLogRecPtr commit_lsn,
1194  XLogRecPtr end_lsn)
1195 {
1196  ReorderBufferTXN *subtxn;
1197 
1198  subtxn = ReorderBufferTXNByXid(rb, subxid, false, NULL,
1199  InvalidXLogRecPtr, false);
1200 
1201  /*
1202  * No need to do anything if that subtxn didn't contain any changes
1203  */
1204  if (!subtxn)
1205  return;
1206 
1207  subtxn->final_lsn = commit_lsn;
1208  subtxn->end_lsn = end_lsn;
1209 
1210  /*
1211  * Assign this subxact as a child of the toplevel xact (no-op if already
1212  * done.)
1213  */
1214  ReorderBufferAssignChild(rb, xid, subxid, InvalidXLogRecPtr);
1215 }
1216 
1217 
1218 /*
1219  * Support for efficiently iterating over a transaction's and its
1220  * subtransactions' changes.
1221  *
1222  * We do this with a k-way merge between transactions/subtransactions. For that
1223  * we model the current heads of the different transactions as a binary heap
1224  * so we can easily tell which (sub-)transaction has the change with the smallest
1225  * lsn next.
1226  *
1227  * We assume the changes in individual transactions are already sorted by LSN.
1228  */
1229 
1230 /*
1231  * Binary heap comparison function.
 *
 * a and b are Datum-encoded indexes into state->entries. The result is
 * inverted relative to a plain LSN comparison (1 when a's LSN is smaller),
 * so the entry with the smallest LSN compares as the "largest". Presumably
 * binaryheap keeps the largest element at its top, which makes this a
 * min-heap on LSN -- confirm in binaryheap.h.
 *
 * NOTE(review): numbered lines 1234 (the signature) and 1236 (the
 * declaration of 'state', presumably taken from the heap's callback
 * argument) are missing from this listing.
1232  */
1233 static int
1235 {
1237  XLogRecPtr pos_a = state->entries[DatumGetInt32(a)].lsn;
1238  XLogRecPtr pos_b = state->entries[DatumGetInt32(b)].lsn;
1239 
1240  if (pos_a < pos_b)
1241  return 1;
1242  else if (pos_a == pos_b)
1243  return 0;
1244  return -1;
1245 }
1246 
1247 /*
1248  * Allocate & initialize an iterator which iterates in lsn order over a
1249  * transaction and all its subtransactions.
1250  *
1251  * Note: The iterator state is returned through iter_state parameter rather
1252  * than the function's return value. This is because the state gets cleaned up
1253  * in a PG_CATCH block in the caller, so we want to make sure the caller gets
1254  * back the state even if this function throws an exception.
 *
 * One heap entry is created for the toplevel transaction and for each
 * subtransaction that has entries (nentries > 0). Every entry starts with
 * no spill file open (vfd = -1, segno = 0).
 *
 * NOTE(review): numbered lines 1257, 1261, 1292-1293, 1308, 1341 and 1370
 * are missing from this listing. As shown, nothing advances 'off' after it
 * is reset to 0. The missing lines 1341 and 1370 presumably add each entry
 * to the heap and advance 'off' -- confirm upstream.
1255  */
1256 static void
1258  ReorderBufferIterTXNState *volatile *iter_state)
1259 {
1260  Size nr_txns = 0;
1262  dlist_iter cur_txn_i;
1263  int32 off;
1264 
1265  *iter_state = NULL;
1266 
1267  /* Check ordering of changes in the toplevel transaction. */
1268  AssertChangeLsnOrder(txn);
1269 
1270  /*
1271  * Calculate the size of our heap: one element for every transaction that
1272  * contains changes. (Besides the transactions already in the reorder
1273  * buffer, we count the one we were directly passed.)
1274  */
1275  if (txn->nentries > 0)
1276  nr_txns++;
1277 
1278  dlist_foreach(cur_txn_i, &txn->subtxns)
1279  {
1280  ReorderBufferTXN *cur_txn;
1281 
1282  cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
1283 
1284  /* Check ordering of changes in this subtransaction. */
1285  AssertChangeLsnOrder(cur_txn);
1286 
1287  if (cur_txn->nentries > 0)
1288  nr_txns++;
1289  }
1290 
1291  /* allocate iteration state */
1294  sizeof(ReorderBufferIterTXNState) +
1295  sizeof(ReorderBufferIterTXNEntry) * nr_txns);
1296 
1297  state->nr_txns = nr_txns;
1298  dlist_init(&state->old_change);
1299 
/* Mark every entry as having no spill file open yet. */
1300  for (off = 0; off < state->nr_txns; off++)
1301  {
1302  state->entries[off].file.vfd = -1;
1303  state->entries[off].segno = 0;
1304  }
1305 
1306  /* allocate heap */
1307  state->heap = binaryheap_allocate(state->nr_txns,
1309  state);
1310 
1311  /* Now that the state fields are initialized, it is safe to return it. */
1312  *iter_state = state;
1313 
1314  /*
1315  * Now insert items into the binary heap, in an unordered fashion. (We
1316  * will run a heap assembly step at the end; this is more efficient.)
1317  */
1318 
1319  off = 0;
1320 
1321  /* add toplevel transaction if it contains changes */
1322  if (txn->nentries > 0)
1323  {
1324  ReorderBufferChange *cur_change;
1325 
1326  if (rbtxn_is_serialized(txn))
1327  {
1328  /* serialize remaining changes */
1329  ReorderBufferSerializeTXN(rb, txn);
1330  ReorderBufferRestoreChanges(rb, txn, &state->entries[off].file,
1331  &state->entries[off].segno);
1332  }
1333 
1334  cur_change = dlist_head_element(ReorderBufferChange, node,
1335  &txn->changes);
1336 
1337  state->entries[off].lsn = cur_change->lsn;
1338  state->entries[off].change = cur_change;
1339  state->entries[off].txn = txn;
1340 
1342  }
1343 
1344  /* add subtransactions if they contain changes */
1345  dlist_foreach(cur_txn_i, &txn->subtxns)
1346  {
1347  ReorderBufferTXN *cur_txn;
1348 
1349  cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
1350 
1351  if (cur_txn->nentries > 0)
1352  {
1353  ReorderBufferChange *cur_change;
1354 
1355  if (rbtxn_is_serialized(cur_txn))
1356  {
1357  /* serialize remaining changes */
1358  ReorderBufferSerializeTXN(rb, cur_txn);
1359  ReorderBufferRestoreChanges(rb, cur_txn,
1360  &state->entries[off].file,
1361  &state->entries[off].segno);
1362  }
1363  cur_change = dlist_head_element(ReorderBufferChange, node,
1364  &cur_txn->changes);
1365 
1366  state->entries[off].lsn = cur_change->lsn;
1367  state->entries[off].change = cur_change;
1368  state->entries[off].txn = cur_txn;
1369 
1371  }
1372  }
1373 
1374  /* assemble a valid binary heap */
1375  binaryheap_build(state->heap);
1376 }
1377 
1378 /*
1379  * Return the next change when iterating over a transaction and its
1380  * subtransactions.
1381  *
1382  * Returns NULL when no further changes exist.
 *
 * When the change comes from a transaction that still has changes on disk,
 * it is moved onto state->old_change and only returned to the buffer on the
 * next call, or when the iterator is deallocated. This is needed because
 * restoring changes from disk reuses change records.
 *
 * NOTE(review): numbered lines 1385, 1388, 1419, 1425, 1450, 1461 and 1468
 * are missing from this listing. They presumably hold the signature, the
 * declaration of 'entry', and the heap updates that reposition or remove
 * the current entry -- confirm upstream.
1383  */
1384 static ReorderBufferChange *
1386 {
1387  ReorderBufferChange *change;
1389  int32 off;
1390 
1391  /* nothing there anymore */
1392  if (state->heap->bh_size == 0)
1393  return NULL;
1394 
1395  off = DatumGetInt32(binaryheap_first(state->heap));
1396  entry = &state->entries[off];
1397 
1398  /* free memory we might have "leaked" in the previous *Next call */
1399  if (!dlist_is_empty(&state->old_change))
1400  {
1401  change = dlist_container(ReorderBufferChange, node,
1402  dlist_pop_head_node(&state->old_change));
1403  ReorderBufferReturnChange(rb, change, true);
1404  Assert(dlist_is_empty(&state->old_change));
1405  }
1406 
1407  change = entry->change;
1408 
1409  /*
1410  * update heap with information about which transaction has the next
1411  * relevant change in LSN order
1412  */
1413 
1414  /* there are in-memory changes */
1415  if (dlist_has_next(&entry->txn->changes, &entry->change->node))
1416  {
1417  dlist_node *next = dlist_next_node(&entry->txn->changes, &change->node);
1418  ReorderBufferChange *next_change =
1420 
1421  /* txn stays the same */
1422  state->entries[off].lsn = next_change->lsn;
1423  state->entries[off].change = next_change;
1424 
1426  return change;
1427  }
1428 
1429  /* try to load changes from disk */
1430  if (entry->txn->nentries != entry->txn->nentries_mem)
1431  {
1432  /*
1433  * Ugly: restoring changes will reuse *Change records, thus delete the
1434  * current one from the per-tx list and only free in the next call.
1435  */
1436  dlist_delete(&change->node);
1437  dlist_push_tail(&state->old_change, &change->node);
1438 
1439  /*
1440  * Update the total bytes processed by the txn for which we are
1441  * releasing the current set of changes and restoring the new set of
1442  * changes.
1443  */
1444  rb->totalBytes += entry->txn->size;
1445  if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->file,
1446  &state->entries[off].segno))
1447  {
1448  /* successfully restored changes from disk */
1449  ReorderBufferChange *next_change =
1451  &entry->txn->changes);
1452 
1453  elog(DEBUG2, "restored %u/%u changes from disk",
1454  (uint32) entry->txn->nentries_mem,
1455  (uint32) entry->txn->nentries);
1456 
1457  Assert(entry->txn->nentries_mem);
1458  /* txn stays the same */
1459  state->entries[off].lsn = next_change->lsn;
1460  state->entries[off].change = next_change;
1462 
1463  return change;
1464  }
1465  }
1466 
1467  /* ok, no changes there anymore, remove */
1469 
1470  return change;
1471 }
1472 
1473 /*
1474  * Deallocate the iterator
 *
 * Closes any spill files that are still open, returns the change parked on
 * old_change by the last *Next call, then frees the heap and the state.
 *
 * NOTE(review): numbered lines 1477-1478 (the signature, which supplies
 * 'rb' and 'state') are missing from this listing.
1475  */
1476 static void
1479 {
1480  int32 off;
1481 
1482  for (off = 0; off < state->nr_txns; off++)
1483  {
1484  if (state->entries[off].file.vfd != -1)
1485  FileClose(state->entries[off].file.vfd);
1486  }
1487 
1488  /* free memory we might have "leaked" in the last *Next call */
1489  if (!dlist_is_empty(&state->old_change))
1490  {
1491  ReorderBufferChange *change;
1492 
1493  change = dlist_container(ReorderBufferChange, node,
1494  dlist_pop_head_node(&state->old_change));
1495  ReorderBufferReturnChange(rb, change, true);
1496  Assert(dlist_is_empty(&state->old_change));
1497  }
1498 
1499  binaryheap_free(state->heap);
1500  pfree(state);
1501 }
1502 
1503 /*
1504  * Cleanup the contents of a transaction, usually after the transaction
1505  * committed or aborted.
 *
 * Subtransactions are cleaned up first. Then the txn's changes and tuplecids
 * are returned, its base snapshot and last streamed snapshot are cleaned up,
 * and the txn is unlinked from its lists and removed from rb->by_txn. Any
 * spilled files are removed and the txn itself is returned to the buffer.
 *
 * NOTE(review): numbered lines 1508, 1568, 1578-1579, 1588 and 1602 are
 * missing from this listing. In particular, 1602 is the body of the 'if' on
 * line 1601; as listed, that 'if' would wrongly govern the hash_search()
 * call below.
1506  */
1507 static void
1509 {
1510  bool found;
1511  dlist_mutable_iter iter;
1512  Size mem_freed = 0;
1513 
1514  /* cleanup subtransactions & their changes */
1515  dlist_foreach_modify(iter, &txn->subtxns)
1516  {
1517  ReorderBufferTXN *subtxn;
1518 
1519  subtxn = dlist_container(ReorderBufferTXN, node, iter.cur);
1520 
1521  /*
1522  * Subtransactions are always associated with the toplevel TXN, even if
1523  * they originally were happening inside another subtxn, so we won't
1524  * ever recurse more than one level deep here.
1525  */
1526  Assert(rbtxn_is_known_subxact(subtxn));
1527  Assert(subtxn->nsubtxns == 0);
1528 
1529  ReorderBufferCleanupTXN(rb, subtxn);
1530  }
1531 
1532  /* cleanup changes in the txn */
1533  dlist_foreach_modify(iter, &txn->changes)
1534  {
1535  ReorderBufferChange *change;
1536 
1537  change = dlist_container(ReorderBufferChange, node, iter.cur);
1538 
1539  /* Check we're not mixing changes from different transactions. */
1540  Assert(change->txn == txn);
1541 
1542  /*
1543  * Instead of updating the memory counter for individual changes, we
1544  * sum up the size of memory to free so we can update the memory
1545  * counter all together below. This saves costs of maintaining the
1546  * max-heap.
1547  */
1548  mem_freed += ReorderBufferChangeSize(change);
1549 
1550  ReorderBufferReturnChange(rb, change, false);
1551  }
1552 
1553  /* Update the memory counter */
1554  ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, mem_freed);
1555 
1556  /*
1557  * Cleanup the tuplecids we stored for decoding catalog snapshot access.
1558  * They are always stored in the toplevel transaction.
1559  */
1560  dlist_foreach_modify(iter, &txn->tuplecids)
1561  {
1562  ReorderBufferChange *change;
1563 
1564  change = dlist_container(ReorderBufferChange, node, iter.cur);
1565 
1566  /* Check we're not mixing changes from different transactions. */
1567  Assert(change->txn == txn);
1569 
1570  ReorderBufferReturnChange(rb, change, true);
1571  }
1572 
1573  /*
1574  * Cleanup the base snapshot, if set.
1575  */
1576  if (txn->base_snapshot != NULL)
1577  {
1580  }
1581 
1582  /*
1583  * Cleanup the snapshot for the last streamed run.
1584  */
1585  if (txn->snapshot_now != NULL)
1586  {
1587  Assert(rbtxn_is_streamed(txn));
1589  }
1590 
1591  /*
1592  * Remove TXN from its containing lists.
1593  *
1594  * Note: if txn is known as subxact, we are deleting the TXN from its
1595  * parent's list of known subxacts; this leaves the parent's nsubtxns
1596  * count too high, but we don't care. Otherwise, we are deleting the TXN
1597  * from the LSN-ordered list of toplevel TXNs. We remove the TXN from the
1598  * list of catalog modifying transactions as well.
1599  */
1600  dlist_delete(&txn->node);
1601  if (rbtxn_has_catalog_changes(txn))
1603 
1604  /* now remove reference from buffer */
1605  hash_search(rb->by_txn, &txn->xid, HASH_REMOVE, &found);
1606  Assert(found);
1607 
1608  /* remove entries spilled to disk */
1609  if (rbtxn_is_serialized(txn))
1610  ReorderBufferRestoreCleanup(rb, txn);
1611 
1612  /* deallocate */
1613  ReorderBufferReturnTXN(rb, txn);
1614 }
1615 
1616 /*
1617  * Discard changes from a transaction (and subtransactions), either after
1618  * streaming or decoding them at PREPARE. Keep the remaining info -
1619  * transactions, tuplecids, invalidations and snapshots.
1620  *
1621  * We additionally remove tuplecids after decoding the transaction at prepare
1622  * time as we only need to perform invalidation at rollback or commit prepared.
1623  *
1624  * 'txn_prepared' indicates that we have decoded the transaction at prepare
1625  * time.
 *
 * NOTE(review): numbered lines 1628 (the signature, which supplies 'rb',
 * 'txn' and 'txn_prepared'), 1711, 1727 and 1742 are missing from this
 * listing.
1626  */
1627 static void
1629 {
1630  dlist_mutable_iter iter;
1631  Size mem_freed = 0;
1632 
1633  /* cleanup subtransactions & their changes */
1634  dlist_foreach_modify(iter, &txn->subtxns)
1635  {
1636  ReorderBufferTXN *subtxn;
1637 
1638  subtxn = dlist_container(ReorderBufferTXN, node, iter.cur);
1639 
1640  /*
1641  * Subtransactions are always associated with the toplevel TXN, even if
1642  * they originally were happening inside another subtxn, so we won't
1643  * ever recurse more than one level deep here.
1644  */
1645  Assert(rbtxn_is_known_subxact(subtxn));
1646  Assert(subtxn->nsubtxns == 0);
1647 
1648  ReorderBufferTruncateTXN(rb, subtxn, txn_prepared);
1649  }
1650 
1651  /* cleanup changes in the txn */
1652  dlist_foreach_modify(iter, &txn->changes)
1653  {
1654  ReorderBufferChange *change;
1655 
1656  change = dlist_container(ReorderBufferChange, node, iter.cur);
1657 
1658  /* Check we're not mixing changes from different transactions. */
1659  Assert(change->txn == txn);
1660 
1661  /* remove the change from its containing list */
1662  dlist_delete(&change->node);
1663 
1664  /*
1665  * Instead of updating the memory counter for individual changes, we
1666  * sum up the size of memory to free so we can update the memory
1667  * counter all together below. This saves costs of maintaining the
1668  * max-heap.
1669  */
1670  mem_freed += ReorderBufferChangeSize(change);
1671 
1672  ReorderBufferReturnChange(rb, change, false);
1673  }
1674 
1675  /* Update the memory counter */
1676  ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, mem_freed);
1677 
1678  /*
1679  * Mark the transaction as streamed.
1680  *
1681  * The top-level transaction is always marked as streamed, even if it
1682  * does not contain any changes (that is, when all the changes are in
1683  * subtransactions).
1684  *
1685  * For subtransactions, we only mark them as streamed when there are
1686  * changes in them.
1687  *
1688  * We do it this way because of aborts - we don't want to send aborts for
1689  * XIDs the downstream is not aware of. And of course, it always knows
1690  * about the toplevel xact (we send the XID in all messages), but we never
1691  * stream XIDs of empty subxacts.
1692  */
1693  if ((!txn_prepared) && (rbtxn_is_toptxn(txn) || (txn->nentries_mem != 0)))
1694  txn->txn_flags |= RBTXN_IS_STREAMED;
1695 
1696  if (txn_prepared)
1697  {
1698  /*
1699  * If this is a prepared txn, cleanup the tuplecids we stored for
1700  * decoding catalog snapshot access. They are always stored in the
1701  * toplevel transaction.
1702  */
1703  dlist_foreach_modify(iter, &txn->tuplecids)
1704  {
1705  ReorderBufferChange *change;
1706 
1707  change = dlist_container(ReorderBufferChange, node, iter.cur);
1708 
1709  /* Check we're not mixing changes from different transactions. */
1710  Assert(change->txn == txn);
1712 
1713  /* Remove the change from its containing list. */
1714  dlist_delete(&change->node);
1715 
1716  ReorderBufferReturnChange(rb, change, true);
1717  }
1718  }
1719 
1720  /*
1721  * Destroy the (relfilelocator, ctid) hashtable, so that we don't leak any
1722  * memory. We could also keep the hash table and update it with new ctid
1723  * values, but this seems simpler and good enough for now.
1724  */
1725  if (txn->tuplecid_hash != NULL)
1726  {
1728  txn->tuplecid_hash = NULL;
1729  }
1730 
1731  /* If this txn is serialized then clean the disk space. */
1732  if (rbtxn_is_serialized(txn))
1733  {
1734  ReorderBufferRestoreCleanup(rb, txn);
1735  txn->txn_flags &= ~RBTXN_IS_SERIALIZED;
1736 
1737  /*
1738  * We set this flag to indicate if the transaction is ever serialized.
1739  * We need this to accurately update the stats as otherwise the same
1740  * transaction can be counted as serialized multiple times.
1741  */
1743  }
1744 
1745  /* also reset the number of entries in the transaction */
1746  txn->nentries_mem = 0;
1747  txn->nentries = 0;
1748 }
1749 
1750 /*
1751  * Build a hash with a (relfilelocator, ctid) -> (cmin, cmax, combocid) mapping for use by
1752  * HeapTupleSatisfiesHistoricMVCC.
 *
 * The table is allocated in rb->context and stored in txn->tuplecid_hash.
 * It is sized from txn->ntuplecids and filled from txn->tuplecids.
 *
 * NOTE(review): numbered lines 1755, 1760, 1773, 1777-1778 and 1784 are
 * missing from this listing. Line 1760 is the condition that guards
 * 'return;' on 1761 (presumably an early exit when there is nothing to
 * hash); as listed, that return would be unconditional. Lines 1777-1778
 * presumably declare 'key' and 'ent'.
1753  */
1754 static void
1756 {
1757  dlist_iter iter;
1758  HASHCTL hash_ctl;
1759 
1761  return;
1762 
1763  hash_ctl.keysize = sizeof(ReorderBufferTupleCidKey);
1764  hash_ctl.entrysize = sizeof(ReorderBufferTupleCidEnt);
1765  hash_ctl.hcxt = rb->context;
1766 
1767  /*
1768  * create the hash with the exact number of to-be-stored tuplecids from
1769  * the start
1770  */
1771  txn->tuplecid_hash =
1772  hash_create("ReorderBufferTupleCid", txn->ntuplecids, &hash_ctl,
1774 
1775  dlist_foreach(iter, &txn->tuplecids)
1776  {
1779  bool found;
1780  ReorderBufferChange *change;
1781 
1782  change = dlist_container(ReorderBufferChange, node, iter.cur);
1783 
1785 
1786  /* be careful about padding */
/*
 * Zero the whole key so uninitialized padding bytes cannot make equal keys
 * hash or compare differently (assumes the hash treats keys as raw bytes --
 * confirm against the missing hash_create flags on line 1773).
 */
1787  memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
1788 
1789  key.rlocator = change->data.tuplecid.locator;
1790 
1791  ItemPointerCopy(&change->data.tuplecid.tid,
1792  &key.tid);
1793 
1794  ent = (ReorderBufferTupleCidEnt *)
1795  hash_search(txn->tuplecid_hash, &key, HASH_ENTER, &found);
1796  if (!found)
1797  {
1798  ent->cmin = change->data.tuplecid.cmin;
1799  ent->cmax = change->data.tuplecid.cmax;
1800  ent->combocid = change->data.tuplecid.combocid;
1801  }
1802  else
1803  {
1804  /*
1805  * Maybe we already saw this tuple before in this transaction, but
1806  * if so it must have the same cmin.
1807  */
1808  Assert(ent->cmin == change->data.tuplecid.cmin);
1809 
1810  /*
1811  * cmax may be initially invalid, but once set it can only grow,
1812  * and never become invalid again.
1813  */
1814  Assert((ent->cmax == InvalidCommandId) ||
1815  ((change->data.tuplecid.cmax != InvalidCommandId) &&
1816  (change->data.tuplecid.cmax > ent->cmax)));
1817  ent->cmax = change->data.tuplecid.cmax;
1818  }
1819  }
1820 }
1821 
1822 /*
1823  * Copy a provided snapshot so we can modify it privately. This is needed so
1824  * that catalog modifying transactions can look into intermediate catalog
1825  * states.
 *
 * The copy is a single zeroed allocation in rb->context. It holds the
 * SnapshotData, then orig_snap's xip array, then a sorted subxip array with
 * txn's own xid and the xids of its known subtransactions. curcid is set to
 * 'cid'.
 *
 * NOTE(review): numbered line 1828 (the function name and the leading 'rb'
 * and 'orig_snap' parameters) is missing from this listing.
1826  */
1827 static Snapshot
1829  ReorderBufferTXN *txn, CommandId cid)
1830 {
1831  Snapshot snap;
1832  dlist_iter iter;
1833  int i = 0;
1834  Size size;
1835 
1836  size = sizeof(SnapshotData) +
1837  sizeof(TransactionId) * orig_snap->xcnt +
1838  sizeof(TransactionId) * (txn->nsubtxns + 1);
1839 
1840  snap = MemoryContextAllocZero(rb->context, size);
1841  memcpy(snap, orig_snap, sizeof(SnapshotData));
1842 
1843  snap->copied = true;
1844  snap->active_count = 1; /* mark as active so nobody frees it */
1845  snap->regd_count = 0;
/* xip (and subxip after it) live in the same allocation, right after the struct. */
1846  snap->xip = (TransactionId *) (snap + 1);
1847 
1848  memcpy(snap->xip, orig_snap->xip, sizeof(TransactionId) * snap->xcnt);
1849 
1850  /*
1851  * snap->subxip contains all txids that belong to our transaction which we
1852  * need to check via cmin/cmax. That's why we store the toplevel
1853  * transaction in there as well.
1854  */
1855  snap->subxip = snap->xip + snap->xcnt;
1856  snap->subxip[i++] = txn->xid;
1857 
1858  /*
1859  * txn->nsubtxns isn't decreased when subtransactions abort, so count manually.
1860  * Since it's an upper boundary it is safe to use it for the allocation
1861  * above.
1862  */
1863  snap->subxcnt = 1;
1864 
1865  dlist_foreach(iter, &txn->subtxns)
1866  {
1867  ReorderBufferTXN *sub_txn;
1868 
1869  sub_txn = dlist_container(ReorderBufferTXN, node, iter.cur);
1870  snap->subxip[i++] = sub_txn->xid;
1871  snap->subxcnt++;
1872  }
1873 
1874  /* sort so we can bsearch() later */
1875  qsort(snap->subxip, snap->subxcnt, sizeof(TransactionId), xidComparator);
1876 
1877  /* store the specified current CommandId */
1878  snap->curcid = cid;
1879 
1880  return snap;
1881 }
1882 
1883 /*
1884  * Free a previously ReorderBufferCopySnap'ed snapshot
 *
 * Copied snapshots (snap->copied) are pfree'd directly. Any other snapshot
 * is handled by the else branch.
 *
 * NOTE(review): numbered lines 1887 (the signature, which supplies 'snap')
 * and 1892 (the statement of the else branch) are missing from this
 * listing. As shown, the 'else' has no body.
1885  */
1886 static void
1888 {
1889  if (snap->copied)
1890  pfree(snap);
1891  else
1893 }
1894 
1895 /*
1896  * If the transaction was (partially) streamed, we need to prepare or commit
1897  * it in a 'streamed' way. That is, we first stream the remaining part of the
1898  * transaction, and then invoke stream_prepare or stream_commit message as per
1899  * the case.
 *
 * For a prepared txn only the changes and tuplecids are discarded
 * (ReorderBufferTruncateTXN with txn_prepared = true). Otherwise the txn is
 * fully cleaned up after stream_commit.
 *
 * NOTE(review): numbered lines 1902 (the signature, which supplies 'rb' and
 * 'txn') and 1924 (presumably the statement that resets CheckXidAlive, per
 * the comment on 1923) are missing from this listing.
1900  */
1901 static void
1903 {
1904  /* we should only call this for previously streamed transactions */
1905  Assert(rbtxn_is_streamed(txn));
1906 
1907  ReorderBufferStreamTXN(rb, txn);
1908 
1909  if (rbtxn_prepared(txn))
1910  {
1911  /*
1912  * Note, we send stream prepare even if a concurrent abort is
1913  * detected. See DecodePrepare for more information.
1914  */
1915  rb->stream_prepare(rb, txn, txn->final_lsn);
1916 
1917  /*
1918  * This is a PREPARED transaction, part of a two-phase commit. The
1919  * full cleanup will happen as part of the COMMIT PREPAREDs, so now
1920  * just truncate txn by removing changes and tuplecids.
1921  */
1922  ReorderBufferTruncateTXN(rb, txn, true);
1923  /* Reset the CheckXidAlive */
1925  }
1926  else
1927  {
1928  rb->stream_commit(rb, txn, txn->final_lsn);
1929  ReorderBufferCleanupTXN(rb, txn);
1930  }
1931 }
1932 
1933 /*
1934  * Set xid to detect concurrent aborts.
1935  *
1936  * While streaming an in-progress transaction or decoding a prepared
1937  * transaction there is a possibility that the (sub)transaction might get
1938  * aborted concurrently. In such case if the (sub)transaction has catalog
1939  * update then we might decode the tuple using wrong catalog version. For
1940  * example, suppose there is one catalog tuple with (xmin: 500, xmax: 0). Now,
1941  * the transaction 501 updates the catalog tuple and after that we will have
1942  * two tuples (xmin: 500, xmax: 501) and (xmin: 501, xmax: 0). Now, if 501 is
1943  * aborted and some other transaction say 502 updates the same catalog tuple
1944  * then the first tuple will be changed to (xmin: 500, xmax: 502). So, the
1945  * problem is that when we try to decode the tuple inserted/updated in 501
1946  * after the catalog update, we will see the catalog tuple with (xmin: 500,
1947  * xmax: 502) as visible because it will consider that the tuple is deleted by
1948  * xid 502 which is not visible to our snapshot. And when we will try to
1949  * decode with that catalog tuple, it can lead to a wrong result or a crash.
1950  * So, it is necessary to detect concurrent aborts to allow streaming of
1951  * in-progress transactions or decoding of prepared transactions.
1952  *
1953  * For detecting the concurrent abort we set CheckXidAlive to the current
1954  * (sub)transaction's xid to which this change belongs. And, during
1955  * catalog scan we can check the status of the xid and if it is aborted we will
1956  * report a specific error so that we can stop streaming current transaction
1957  * and discard the already streamed changes on such an error. We might have
1958  * already streamed some of the changes for the aborted (sub)transaction, but
1959  * that is fine because when we decode the abort we will stream abort message
1960  * to truncate the changes in the subscriber. Similarly, for prepared
1961  * transactions, we stop decoding if concurrent abort is detected and then
1962  * rollback the changes when rollback prepared is encountered. See
1963  * DecodePrepare.
 *
 * NOTE(review): numbered lines 1966 (the signature, which supplies 'xid'),
 * 1972 (the condition guarding the early 'return;') and 1982 (the statement
 * of the else branch) are missing from this listing.
1964  */
1965 static inline void
1967 {
1968  /*
1969  * If the input transaction id is already set as a CheckXidAlive then
1970  * nothing to do.
1971  */
1973  return;
1974 
1975  /*
1976  * setup CheckXidAlive if it's not committed yet. We don't check if the
1977  * xid is aborted. That will happen during catalog access.
1978  */
1979  if (!TransactionIdDidCommit(xid))
1980  CheckXidAlive = xid;
1981  else
1983 }
1984 
1985 /*
1986  * Helper function for ReorderBufferProcessTXN for applying change.
 *
 * Calls rb->stream_change when 'streaming' is true and rb->apply_change
 * otherwise, passing the same arguments to either one.
 *
 * NOTE(review): numbered line 1989 (the function name and the leading 'rb'
 * and 'txn' parameters) is missing from this listing.
1987  */
1988 static inline void
1990  Relation relation, ReorderBufferChange *change,
1991  bool streaming)
1992 {
1993  if (streaming)
1994  rb->stream_change(rb, txn, relation, change);
1995  else
1996  rb->apply_change(rb, txn, relation, change);
1997 }
1998 
1999 /*
2000  * Helper function for ReorderBufferProcessTXN for applying the truncate.
 *
 * Calls rb->stream_truncate when 'streaming' is true and rb->apply_truncate
 * otherwise. 'relations' holds 'nrelations' entries.
 *
 * NOTE(review): numbered line 2003 (the function name and the leading 'rb'
 * and 'txn' parameters) is missing from this listing.
2001  */
2002 static inline void
2004  int nrelations, Relation *relations,
2005  ReorderBufferChange *change, bool streaming)
2006 {
2007  if (streaming)
2008  rb->stream_truncate(rb, txn, nrelations, relations, change);
2009  else
2010  rb->apply_truncate(rb, txn, nrelations, relations, change);
2011 }
2012 
2013 /*
2014  * Helper function for ReorderBufferProcessTXN for applying the message.
 *
 * Calls rb->stream_message when 'streaming' is true and rb->message
 * otherwise. Both get the change's LSN and its message prefix, size and
 * payload. The literal 'true' argument presumably marks the message as
 * transactional -- confirm against the callback signature.
 *
 * NOTE(review): numbered line 2017 (the function name and the leading 'rb'
 * and 'txn' parameters) is missing from this listing.
2015  */
2016 static inline void
2018  ReorderBufferChange *change, bool streaming)
2019 {
2020  if (streaming)
2021  rb->stream_message(rb, txn, change->lsn, true,
2022  change->data.msg.prefix,
2023  change->data.msg.message_size,
2024  change->data.msg.message);
2025  else
2026  rb->message(rb, txn, change->lsn, true,
2027  change->data.msg.prefix,
2028  change->data.msg.message_size,
2029  change->data.msg.message);
2030 }
2031 
2032 /*
2033  * Function to store the command id and snapshot at the end of the current
2034  * stream so that we can reuse the same while sending the next stream.
 *
 * A snapshot that is already a private copy (snapshot_now->copied) is stored
 * as-is. Otherwise a copy is made with ReorderBufferCopySnap using
 * 'command_id' as its curcid.
 *
 * NOTE(review): numbered line 2037 (the function name and the leading 'rb'
 * and 'txn' parameters) is missing from this listing.
2035  */
2036 static inline void
2038  Snapshot snapshot_now, CommandId command_id)
2039 {
2040  txn->command_id = command_id;
2041 
2042  /* Avoid copying if it's already copied. */
2043  if (snapshot_now->copied)
2044  txn->snapshot_now = snapshot_now;
2045  else
2046  txn->snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
2047  txn, command_id);
2048 }
2049 
2050 /*
2051  * Helper function for ReorderBufferProcessTXN to handle the concurrent
2052  * abort of the streaming transaction. This resets the TXN such that it
2053  * can be used to stream the remaining data of transaction being processed.
2054  * This can happen when the subtransaction is aborted and we still want to
2055  * continue processing the main or other subtransactions data.
 *
 * NOTE(review): numbered lines 2058 (the function name and the leading 'rb'
 * and 'txn' parameters) and 2065 (presumably the call that discards the
 * streamed changes, per the comment on 2064) are missing from this listing.
2056  */
2057 static void
2059  Snapshot snapshot_now,
2060  CommandId command_id,
2061  XLogRecPtr last_lsn,
2062  ReorderBufferChange *specinsert)
2063 {
2064  /* Discard the changes that we just streamed */
2066 
2067  /* Free all resources allocated for toast reconstruction */
2068  ReorderBufferToastReset(rb, txn);
2069 
2070  /* Return the spec insert change if it is not NULL */
2071  if (specinsert != NULL)
2072  {
2073  ReorderBufferReturnChange(rb, specinsert, true);
/*
 * NOTE(review): 'specinsert' is a by-value parameter, so this assignment only
 * clears the local copy. The caller's pointer still refers to the returned
 * change and must not be used afterwards.
 */
2074  specinsert = NULL;
2075  }
2076 
2077  /*
2078  * For the streaming case, stop the stream and remember the command ID and
2079  * snapshot for the streaming run.
2080  */
2081  if (rbtxn_is_streamed(txn))
2082  {
2083  rb->stream_stop(rb, txn, last_lsn);
2084  ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
2085  }
2086 
2087  /* All changes must be deallocated */
2088  Assert(txn->size == 0);
2089 }
2090 
2091 /*
2092  * Helper function for ReorderBufferReplay and ReorderBufferStreamTXN.
2093  *
2094  * Send data of a transaction (and its subtransactions) to the
2095  * output plugin. We iterate over the top and subtransactions (using a k-way
2096  * merge) and replay the changes in lsn order.
2097  *
2098  * If streaming is true then data will be sent using stream API.
2099  *
2100  * Note: "volatile" markers on some parameters are to avoid trouble with
2101  * PG_TRY inside the function.
2102  */
2103 static void
2105  XLogRecPtr commit_lsn,
2106  volatile Snapshot snapshot_now,
2107  volatile CommandId command_id,
2108  bool streaming)
2109 {
2110  bool using_subtxn;
2112  ReorderBufferIterTXNState *volatile iterstate = NULL;
2113  volatile XLogRecPtr prev_lsn = InvalidXLogRecPtr;
2114  ReorderBufferChange *volatile specinsert = NULL;
2115  volatile bool stream_started = false;
2116  ReorderBufferTXN *volatile curtxn = NULL;
2117 
2118  /* build data to be able to lookup the CommandIds of catalog tuples */
2120 
2121  /* setup the initial snapshot */
2122  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
2123 
2124  /*
2125  * Decoding needs access to syscaches et al., which in turn use
2126  * heavyweight locks and such. Thus we need to have enough state around to
2127  * keep track of those. The easiest way is to simply use a transaction
2128  * internally. That also allows us to easily enforce that nothing writes
2129  * to the database by checking for xid assignments.
2130  *
2131  * When we're called via the SQL SRF there's already a transaction
2132  * started, so start an explicit subtransaction there.
2133  */
2134  using_subtxn = IsTransactionOrTransactionBlock();
2135 
2136  PG_TRY();
2137  {
2138  ReorderBufferChange *change;
2139  int changes_count = 0; /* used to accumulate the number of
2140  * changes */
2141 
2142  if (using_subtxn)
2143  BeginInternalSubTransaction(streaming ? "stream" : "replay");
2144  else
2146 
2147  /*
2148  * We only need to send begin/begin-prepare for non-streamed
2149  * transactions.
2150  */
2151  if (!streaming)
2152  {
2153  if (rbtxn_prepared(txn))
2154  rb->begin_prepare(rb, txn);
2155  else
2156  rb->begin(rb, txn);
2157  }
2158 
2159  ReorderBufferIterTXNInit(rb, txn, &iterstate);
2160  while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
2161  {
2162  Relation relation = NULL;
2163  Oid reloid;
2164 
2166 
2167  /*
2168  * We can't call start stream callback before processing first
2169  * change.
2170  */
2171  if (prev_lsn == InvalidXLogRecPtr)
2172  {
2173  if (streaming)
2174  {
2175  txn->origin_id = change->origin_id;
2176  rb->stream_start(rb, txn, change->lsn);
2177  stream_started = true;
2178  }
2179  }
2180 
2181  /*
2182  * Enforce correct ordering of changes, merged from multiple
2183  * subtransactions. The changes may have the same LSN due to
2184  * MULTI_INSERT xlog records.
2185  */
2186  Assert(prev_lsn == InvalidXLogRecPtr || prev_lsn <= change->lsn);
2187 
2188  prev_lsn = change->lsn;
2189 
2190  /*
2191  * Set the current xid to detect concurrent aborts. This is
2192  * required for the cases when we decode the changes before the
2193  * COMMIT record is processed.
2194  */
2195  if (streaming || rbtxn_prepared(change->txn))
2196  {
2197  curtxn = change->txn;
2198  SetupCheckXidLive(curtxn->xid);
2199  }
2200 
2201  switch (change->action)
2202  {
2204 
2205  /*
2206  * Confirmation for speculative insertion arrived. Simply
2207  * use as a normal record. It'll be cleaned up at the end
2208  * of INSERT processing.
2209  */
2210  if (specinsert == NULL)
2211  elog(ERROR, "invalid ordering of speculative insertion changes");
2212  Assert(specinsert->data.tp.oldtuple == NULL);
2213  change = specinsert;
2215 
2216  /* intentionally fall through */
2220  Assert(snapshot_now);
2221 
2222  reloid = RelidByRelfilenumber(change->data.tp.rlocator.spcOid,
2223  change->data.tp.rlocator.relNumber);
2224 
2225  /*
2226  * Mapped catalog tuple without data, emitted while
2227  * catalog table was in the process of being rewritten. We
2228  * can fail to look up the relfilenumber, because the
2229  * relmapper has no "historic" view, in contrast to the
2230  * normal catalog during decoding. Thus repeated rewrites
2231  * can cause a lookup failure. That's OK because we do not
2232  * decode catalog changes anyway. Normally such tuples
2233  * would be skipped over below, but we can't identify
2234  * whether the table should be logically logged without
2235  * mapping the relfilenumber to the oid.
2236  */
2237  if (reloid == InvalidOid &&
2238  change->data.tp.newtuple == NULL &&
2239  change->data.tp.oldtuple == NULL)
2240  goto change_done;
2241  else if (reloid == InvalidOid)
2242  elog(ERROR, "could not map filenumber \"%s\" to relation OID",
2243  relpathperm(change->data.tp.rlocator,
2244  MAIN_FORKNUM));
2245 
2246  relation = RelationIdGetRelation(reloid);
2247 
2248  if (!RelationIsValid(relation))
2249  elog(ERROR, "could not open relation with OID %u (for filenumber \"%s\")",
2250  reloid,
2251  relpathperm(change->data.tp.rlocator,
2252  MAIN_FORKNUM));
2253 
2254  if (!RelationIsLogicallyLogged(relation))
2255  goto change_done;
2256 
2257  /*
2258  * Ignore temporary heaps created during DDL unless the
2259  * plugin has asked for them.
2260  */
2261  if (relation->rd_rel->relrewrite && !rb->output_rewrites)
2262  goto change_done;
2263 
2264  /*
2265  * For now ignore sequence changes entirely. Most of the
2266  * time they don't log changes using records we
2267  * understand, so it doesn't make sense to handle the few
2268  * cases we do.
2269  */
2270  if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
2271  goto change_done;
2272 
2273  /* user-triggered change */
2274  if (!IsToastRelation(relation))
2275  {
2276  ReorderBufferToastReplace(rb, txn, relation, change);
2277  ReorderBufferApplyChange(rb, txn, relation, change,
2278  streaming);
2279 
2280  /*
2281  * Only clear reassembled toast chunks if we're sure
2282  * they're not required anymore. The creator of the
2283  * tuple tells us.
2284  */
2285  if (change->data.tp.clear_toast_afterwards)
2286  ReorderBufferToastReset(rb, txn);
2287  }
2288  /* we're not interested in toast deletions */
2289  else if (change->action == REORDER_BUFFER_CHANGE_INSERT)
2290  {
2291  /*
2292  * Need to reassemble the full toasted Datum in
2293  * memory, to ensure the chunks don't get reused till
2294  * we're done remove it from the list of this
2295  * transaction's changes. Otherwise it will get
2296  * freed/reused while restoring spooled data from
2297  * disk.
2298  */
2299  Assert(change->data.tp.newtuple != NULL);
2300 
2301  dlist_delete(&change->node);
2302  ReorderBufferToastAppendChunk(rb, txn, relation,
2303  change);
2304  }
2305 
2306  change_done:
2307 
2308  /*
2309  * If speculative insertion was confirmed, the record
2310  * isn't needed anymore.
2311  */
2312  if (specinsert != NULL)
2313  {
2314  ReorderBufferReturnChange(rb, specinsert, true);
2315  specinsert = NULL;
2316  }
2317 
2318  if (RelationIsValid(relation))
2319  {
2320  RelationClose(relation);
2321  relation = NULL;
2322  }
2323  break;
2324 
2326 
2327  /*
2328  * Speculative insertions are dealt with by delaying the
2329  * processing of the insert until the confirmation record
2330  * arrives. For that we simply unlink the record from the
2331  * chain, so it does not get freed/reused while restoring
2332  * spooled data from disk.
2333  *
2334  * This is safe in the face of concurrent catalog changes
2335  * because the relevant relation can't be changed between
2336  * speculative insertion and confirmation due to
2337  * CheckTableNotInUse() and locking.
2338  */
2339 
2340  /* clear out a pending (and thus failed) speculation */
2341  if (specinsert != NULL)
2342  {
2343  ReorderBufferReturnChange(rb, specinsert, true);
2344  specinsert = NULL;
2345  }
2346 
2347  /* and memorize the pending insertion */
2348  dlist_delete(&change->node);
2349  specinsert = change;
2350  break;
2351 
2353 
2354  /*
2355  * Abort for speculative insertion arrived. So cleanup the
2356  * specinsert tuple and toast hash.
2357  *
2358  * Note that we get the spec abort change for each toast
2359  * entry but we need to perform the cleanup only the first
2360  * time we get it for the main table.
2361  */
2362  if (specinsert != NULL)
2363  {
2364  /*
2365  * We must clean the toast hash before processing a
2366  * completely new tuple to avoid confusion about the
2367  * previous tuple's toast chunks.
2368  */
2369  Assert(change->data.tp.clear_toast_afterwards);
2370  ReorderBufferToastReset(rb, txn);
2371 
2372  /* We don't need this record anymore. */
2373  ReorderBufferReturnChange(rb, specinsert, true);
2374  specinsert = NULL;
2375  }
2376  break;
2377 
2379  {
2380  int i;
2381  int nrelids = change->data.truncate.nrelids;
2382  int nrelations = 0;
2383  Relation *relations;
2384 
2385  relations = palloc0(nrelids * sizeof(Relation));
2386  for (i = 0; i < nrelids; i++)
2387  {
2388  Oid relid = change->data.truncate.relids[i];
2389  Relation rel;
2390 
2391  rel = RelationIdGetRelation(relid);
2392 
2393  if (!RelationIsValid(rel))
2394  elog(ERROR, "could not open relation with OID %u", relid);
2395 
2396  if (!RelationIsLogicallyLogged(rel))
2397  continue;
2398 
2399  relations[nrelations++] = rel;
2400  }
2401 
2402  /* Apply the truncate. */
2403  ReorderBufferApplyTruncate(rb, txn, nrelations,
2404  relations, change,
2405  streaming);
2406 
2407  for (i = 0; i < nrelations; i++)
2408  RelationClose(relations[i]);
2409 
2410  break;
2411  }
2412 
2414  ReorderBufferApplyMessage(rb, txn, change, streaming);
2415  break;
2416 
2418  /* Execute the invalidation messages locally */
2419  ReorderBufferExecuteInvalidations(change->data.inval.ninvalidations,
2420  change->data.inval.invalidations);
2421  break;
2422 
2424  /* get rid of the old */
2425  TeardownHistoricSnapshot(false);
2426 
2427  if (snapshot_now->copied)
2428  {
2429  ReorderBufferFreeSnap(rb, snapshot_now);
2430  snapshot_now =
2431  ReorderBufferCopySnap(rb, change->data.snapshot,
2432  txn, command_id);
2433  }
2434 
2435  /*
2436  * Restored from disk, need to be careful not to double
2437  * free. We could introduce refcounting for that, but for
2438  * now this seems infrequent enough not to care.
2439  */
2440  else if (change->data.snapshot->copied)
2441  {
2442  snapshot_now =
2443  ReorderBufferCopySnap(rb, change->data.snapshot,
2444  txn, command_id);
2445  }
2446  else
2447  {
2448  snapshot_now = change->data.snapshot;
2449  }
2450 
2451  /* and continue with the new one */
2452  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
2453  break;
2454 
2456  Assert(change->data.command_id != InvalidCommandId);
2457 
2458  if (command_id < change->data.command_id)
2459  {
2460  command_id = change->data.command_id;
2461 
2462  if (!snapshot_now->copied)
2463  {
2464  /* we don't use the global one anymore */
2465  snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
2466  txn, command_id);
2467  }
2468 
2469  snapshot_now->curcid = command_id;
2470 
2471  TeardownHistoricSnapshot(false);
2472  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
2473  }
2474 
2475  break;
2476 
2478  elog(ERROR, "tuplecid value in changequeue");
2479  break;
2480  }
2481 
2482  /*
2483  * It is possible that the data is not sent to downstream for a
2484  * long time either because the output plugin filtered it or there
2485  * is a DDL that generates a lot of data that is not processed by
2486  * the plugin. So, in such cases, the downstream can timeout. To
2487  * avoid that we try to send a keepalive message if required.
2488  * Trying to send a keepalive message after every change has some
2489  * overhead, but testing showed there is no noticeable overhead if
2490  * we do it after every ~100 changes.
2491  */
2492 #define CHANGES_THRESHOLD 100
2493 
2494  if (++changes_count >= CHANGES_THRESHOLD)
2495  {
2496  rb->update_progress_txn(rb, txn, change->lsn);
2497  changes_count = 0;
2498  }
2499  }
2500 
2501  /* speculative insertion record must be freed by now */
2502  Assert(!specinsert);
2503 
2504  /* clean up the iterator */
2505  ReorderBufferIterTXNFinish(rb, iterstate);
2506  iterstate = NULL;
2507 
2508  /*
2509  * Update total transaction count and total bytes processed by the
2510  * transaction and its subtransactions. Ensure to not count the
2511  * streamed transaction multiple times.
2512  *
2513  * Note that the statistics computation has to be done after
2514  * ReorderBufferIterTXNFinish as it releases the serialized change
2515  * which we have already accounted in ReorderBufferIterTXNNext.
2516  */
2517  if (!rbtxn_is_streamed(txn))
2518  rb->totalTxns++;
2519 
2520  rb->totalBytes += txn->total_size;
2521 
2522  /*
2523  * Done with current changes, send the last message for this set of
2524  * changes depending upon streaming mode.
2525  */
2526  if (streaming)
2527  {
2528  if (stream_started)
2529  {
2530  rb->stream_stop(rb, txn, prev_lsn);
2531  stream_started = false;
2532  }
2533  }
2534  else
2535  {
2536  /*
2537  * Call either PREPARE (for two-phase transactions) or COMMIT (for
2538  * regular ones).
2539  */
2540  if (rbtxn_prepared(txn))
2541  rb->prepare(rb, txn, commit_lsn);
2542  else
2543  rb->commit(rb, txn, commit_lsn);
2544  }
2545 
2546  /* this is just a sanity check against bad output plugin behaviour */
2548  elog(ERROR, "output plugin used XID %u",
2550 
2551  /*
2552  * Remember the command ID and snapshot for the next set of changes in
2553  * streaming mode.
2554  */
2555  if (streaming)
2556  ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
2557  else if (snapshot_now->copied)
2558  ReorderBufferFreeSnap(rb, snapshot_now);
2559 
2560  /* cleanup */
2561  TeardownHistoricSnapshot(false);
2562 
2563  /*
2564  * Aborting the current (sub-)transaction as a whole has the right
2565  * semantics. We want all locks acquired in here to be released, not
2566  * reassigned to the parent and we do not want any database access
2567  * have persistent effects.
2568  */
2570 
2571  /* make sure there's no cache pollution */
2573 
2574  if (using_subtxn)
2576 
2577  /*
2578  * We are here due to one of the four reasons: 1. Decoding an
2579  * in-progress txn. 2. Decoding a prepared txn. 3. Decoding of a
2580  * prepared txn that was (partially) streamed. 4. Decoding a committed
2581  * txn.
2582  *
2583  * For 1, we allow truncation of txn data by removing the changes
2584  * already streamed but still keeping other things like invalidations,
2585  * snapshot, and tuplecids. For 2 and 3, we indicate
2586  * ReorderBufferTruncateTXN to do more elaborate truncation of txn
2587  * data as the entire transaction has been decoded except for commit.
2588  * For 4, as the entire txn has been decoded, we can fully clean up
2589  * the TXN reorder buffer.
2590  */
2591  if (streaming || rbtxn_prepared(txn))
2592  {
2594  /* Reset the CheckXidAlive */
2596  }
2597  else
2598  ReorderBufferCleanupTXN(rb, txn);
2599  }
2600  PG_CATCH();
2601  {
2602  MemoryContext ecxt = MemoryContextSwitchTo(ccxt);
2603  ErrorData *errdata = CopyErrorData();
2604 
2605  /* TODO: Encapsulate cleanup from the PG_TRY and PG_CATCH blocks */
2606  if (iterstate)
2607  ReorderBufferIterTXNFinish(rb, iterstate);
2608 
2610 
2611  /*
2612  * Force cache invalidation to happen outside of a valid transaction
2613  * to prevent catalog access as we just caught an error.
2614  */
2616 
2617  /* make sure there's no cache pollution */
2619  txn->invalidations);
2620 
2621  if (using_subtxn)
2623 
2624  /*
2625  * The error code ERRCODE_TRANSACTION_ROLLBACK indicates a concurrent
2626  * abort of the (sub)transaction we are streaming or preparing. We
2627  * need to do the cleanup and return gracefully on this error, see
2628  * SetupCheckXidLive.
2629  *
2630  * This error code can be thrown by one of the callbacks we call
2631  * during decoding so we need to ensure that we return gracefully only
2632  * when we are sending the data in streaming mode and the streaming is
2633  * not finished yet or when we are sending the data out on a PREPARE
2634  * during a two-phase commit.
2635  */
2636  if (errdata->sqlerrcode == ERRCODE_TRANSACTION_ROLLBACK &&
2637  (stream_started || rbtxn_prepared(txn)))
2638  {
2639  /* curtxn must be set for streaming or prepared transactions */
2640  Assert(curtxn);
2641 
2642  /* Cleanup the temporary error state. */
2643  FlushErrorState();
2644  FreeErrorData(errdata);
2645  errdata = NULL;
2646  curtxn->concurrent_abort = true;
2647 
2648  /* Reset the TXN so that it is allowed to stream remaining data. */
2649  ReorderBufferResetTXN(rb, txn, snapshot_now,
2650  command_id, prev_lsn,
2651  specinsert);
2652  }
2653  else
2654  {
2655  ReorderBufferCleanupTXN(rb, txn);
2656  MemoryContextSwitchTo(ecxt);
2657  PG_RE_THROW();
2658  }
2659  }
2660  PG_END_TRY();
2661 }
2662 
2663 /*
2664  * Perform the replay of a transaction and its non-aborted subtransactions.
2665  *
2666  * Subtransactions must already have been processed by
2667  * ReorderBufferCommitChild(), even if they were previously assigned to the
2668  * toplevel transaction with ReorderBufferAssignChild.
2669  *
2670  * This interface is called once a prepare or toplevel commit is read for both
2671  * streamed as well as non-streamed transactions.
2672  */
2673 static void
2675  ReorderBuffer *rb, TransactionId xid,
2676  XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
2677  TimestampTz commit_time,
2678  RepOriginId origin_id, XLogRecPtr origin_lsn)
2679 {
2680  Snapshot snapshot_now;
2681  CommandId command_id = FirstCommandId;
2682 
	/*
	 * Store the final LSNs, timestamp and origin in the transaction itself;
	 * the streamed path below relies on finding them there rather than
	 * receiving them as arguments.
	 */
2683  txn->final_lsn = commit_lsn;
2684  txn->end_lsn = end_lsn;
2685  txn->xact_time.commit_time = commit_time;
2686  txn->origin_id = origin_id;
2687  txn->origin_lsn = origin_lsn;
2688 
2689  /*
2690  * If the transaction was (partially) streamed, we need to commit it in a
2691  * 'streamed' way. That is, we first stream the remaining part of the
2692  * transaction, and then invoke stream_commit message.
2693  *
2694  * Called after everything (origin ID, LSN, ...) is stored in the
2695  * transaction to avoid passing that information directly.
2696  */
2697  if (rbtxn_is_streamed(txn))
2698  {
2699  ReorderBufferStreamCommit(rb, txn);
2700  return;
2701  }
2702 
2703  /*
2704  * If this transaction has no snapshot, it didn't make any changes to the
2705  * database, so there's nothing to decode. Note that
2706  * ReorderBufferCommitChild will have transferred any snapshots from
2707  * subtransactions if there were any.
2708  */
2709  if (txn->base_snapshot == NULL)
2710  {
2711  Assert(txn->ninvalidations == 0);
2712 
2713  /*
2714  * Removing this txn before a commit might result in the computation
2715  * of an incorrect restart_lsn. See SnapBuildProcessRunningXacts.
2716  */
2717  if (!rbtxn_prepared(txn))
2718  ReorderBufferCleanupTXN(rb, txn);
2719  return;
2720  }
2721 
	/*
	 * Non-streamed decoding starts from the transaction's base snapshot and
	 * the first command id; the trailing 'false' passed below presumably
	 * selects non-streaming mode in ReorderBufferProcessTXN.
	 */
2722  snapshot_now = txn->base_snapshot;
2723 
2724  /* Process and send the changes to output plugin. */
2725  ReorderBufferProcessTXN(rb, txn, commit_lsn, snapshot_now,
2726  command_id, false);
2727 }
2728 
2729 /*
2730  * Commit a transaction.
2731  *
2732  * See comments for ReorderBufferReplay().
2733  */
2734 void
2736  XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
2737  TimestampTz commit_time,
2738  RepOriginId origin_id, XLogRecPtr origin_lsn)
2739 {
2740  ReorderBufferTXN *txn;
2741 
2742  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2743  false);
2744 
2745  /* unknown transaction, nothing to replay */
2746  if (txn == NULL)
2747  return;
2748 
	/*
	 * Replay using the commit record's LSNs, timestamp and origin. The same
	 * replay path is also used by ReorderBufferPrepare() and
	 * ReorderBufferFinishPrepared().
	 */
2749  ReorderBufferReplay(txn, rb, xid, commit_lsn, end_lsn, commit_time,
2750  origin_id, origin_lsn);
2751 }
2752 
2753 /*
2754  * Record the prepare information for a transaction.
2755  */
2756 bool
2758  XLogRecPtr prepare_lsn, XLogRecPtr end_lsn,
2759  TimestampTz prepare_time,
2760  RepOriginId origin_id, XLogRecPtr origin_lsn)
2761 {
2762  ReorderBufferTXN *txn;
2763 
2764  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false);
2765 
2766  /* unknown transaction, nothing to do */
2767  if (txn == NULL)
2768  return false;
2769 
2770  /*
2771  * Remember the prepare information so that COMMIT PREPARED can use it
2772  * later (see ReorderBufferFinishPrepared) in case we skip the prepare.
2773  */
2774  txn->final_lsn = prepare_lsn;
2775  txn->end_lsn = end_lsn;
2776  txn->xact_time.prepare_time = prepare_time;
2777  txn->origin_id = origin_id;
2778  txn->origin_lsn = origin_lsn;
2779 
	/* The transaction is known and its prepare info is now recorded. */
2780  return true;
2781 }
2782 
2783 /* Remember that we have skipped prepare */
2784 void
2786 {
2787  ReorderBufferTXN *txn;
2788 
2789  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false);
2790 
2791  /* unknown transaction, nothing to do */
2792  if (txn == NULL)
2793  return;
2794 
	/*
	 * Per the header comment, this is where the txn records that its
	 * PREPARE was skipped.  NOTE(review): the statement doing so is absent
	 * from this listing (original line 2795) -- confirm against the full
	 * source.
	 */
2796 }
2797 
2798 /*
2799  * Prepare a two-phase transaction.
2800  *
2801  * See comments for ReorderBufferReplay().
2802  */
2803 void
2805  char *gid)
2806 {
2807  ReorderBufferTXN *txn;
2808 
2809  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2810  false);
2811 
2812  /* unknown transaction, nothing to replay */
2813  if (txn == NULL)
2814  return;
2815 
	/*
	 * Flag the transaction as prepared. Replay then ends with the output
	 * plugin's prepare callback instead of commit, and the txn is kept
	 * (not cleaned up) for the later COMMIT/ROLLBACK PREPARED. Keep our own
	 * copy of the caller's gid string.
	 */
2816  txn->txn_flags |= RBTXN_PREPARE;
2817  txn->gid = pstrdup(gid);
2818 
2819  /* The prepare info must have been updated in txn by now. */
2821 
	/* Replay using the prepare information already stored in txn. */
2822  ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
2823  txn->xact_time.prepare_time, txn->origin_id, txn->origin_lsn);
2824 
2825  /*
2826  * We send the prepare for the concurrently aborted xacts so that later
2827  * when rollback prepared is decoded and sent, the downstream should be
2828  * able to rollback such a xact. See comments atop DecodePrepare.
2829  *
2830  * Note, for the concurrent_abort + streaming case a stream_prepare was
2831  * already sent within the ReorderBufferReplay call above.
2832  */
2833  if (txn->concurrent_abort && !rbtxn_is_streamed(txn))
2834  rb->prepare(rb, txn, txn->final_lsn);
2835 }
2836 
2837 /*
2838  * This is used to handle COMMIT/ROLLBACK PREPARED.
2839  */
2840 void
2842  XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
2843  XLogRecPtr two_phase_at,
2844  TimestampTz commit_time, RepOriginId origin_id,
2845  XLogRecPtr origin_lsn, char *gid, bool is_commit)
2846 {
2847  ReorderBufferTXN *txn;
2848  XLogRecPtr prepare_end_lsn;
2849  TimestampTz prepare_time;
2850 
2851  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, commit_lsn, false);
2852 
2853  /* unknown transaction, nothing to do */
2854  if (txn == NULL)
2855  return;
2856 
2857  /*
2858  * By this time the txn has the prepare record information; remember it
2859  * for later use by the rollback.
2860  */
2861  prepare_end_lsn = txn->end_lsn;
2862  prepare_time = txn->xact_time.prepare_time;
2863 
2864  /* add the gid in the txn */
2865  txn->gid = pstrdup(gid);
2866 
2867  /*
2868  * It is possible that this transaction is not decoded at prepare time
2869  * either because by that time we didn't have a consistent snapshot, or
2870  * two_phase was not enabled, or it was decoded earlier but we have
2871  * restarted. We only need to send the prepare if it was not decoded
2872  * earlier. We don't need to decode the xact for aborts if it is not done
2873  * already.
2874  */
	/* Note: txn->final_lsn still holds the PREPARE record's LSN here. */
2875  if ((txn->final_lsn < two_phase_at) && is_commit)
2876  {
2877  txn->txn_flags |= RBTXN_PREPARE;
2878 
2879  /*
2880  * The prepare info must have been updated in txn even if we skip
2881  * prepare.
2882  */
2884 
2885  /*
2886  * By this time the txn has the prepare record information and it is
2887  * important to use that so that downstream gets the accurate
2888  * information. If we passed the commit information here instead, the
2889  * downstream could behave as if it had already replayed the commit
2890  * prepared after the restart.
2891  */
2892  ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
2893  txn->xact_time.prepare_time, txn->origin_id, txn->origin_lsn);
2894  }
2895 
	/* From here on, txn describes the COMMIT/ROLLBACK PREPARED record. */
2896  txn->final_lsn = commit_lsn;
2897  txn->end_lsn = end_lsn;
2898  txn->xact_time.commit_time = commit_time;
2899  txn->origin_id = origin_id;
2900  txn->origin_lsn = origin_lsn;
2901 
	/*
	 * A rollback is reported with the PREPARE's end LSN and time saved
	 * above, since txn->end_lsn has just been overwritten.
	 */
2902  if (is_commit)
2903  rb->commit_prepared(rb, txn, commit_lsn);
2904  else
2905  rb->rollback_prepared(rb, txn, prepare_end_lsn, prepare_time);
2906 
2907  /* cleanup: make sure there's no cache pollution */
2909  txn->invalidations);
2910  ReorderBufferCleanupTXN(rb, txn);
2911 }
2912 
2913 /*
2914  * Abort a transaction that possibly has previous changes. Needs to be first
2915  * called for subtransactions and then for the toplevel xid.
2916  *
2917  * NB: Transactions handled here have to have actively aborted (i.e. have
2918  * produced an abort record). Implicitly aborted transactions are handled via
2919  * ReorderBufferAbortOld(); transactions we're just not interested in, but
2920  * which have committed are handled in ReorderBufferForget().
2921  *
2922  * This function purges this transaction and its contents from memory and
2923  * disk.
2924  */
2925 void
2927  TimestampTz abort_time)
2928 {
2929  ReorderBufferTXN *txn;
2930 
2931  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2932  false);
2933 
2934  /* unknown, nothing to remove */
2935  if (txn == NULL)
2936  return;
2937 
	/* Record the abort time on txn before the stream_abort callback below. */
2938  txn->xact_time.abort_time = abort_time;
2939 
2940  /* For streamed transactions notify the remote node about the abort. */
2941  if (rbtxn_is_streamed(txn))
2942  {
2943  rb->stream_abort(rb, txn, lsn);
2944 
2945  /*
2946  * We might have decoded changes for this transaction that could load
2947  * the cache as per the current transaction's view (consider DDLs that
2948  * happened in this transaction). We don't want the decoding of future
2949  * transactions to use those cache entries so execute invalidations.
2950  */
2951  if (txn->ninvalidations > 0)
2953  txn->invalidations);
2954  }
2955 
2956  /* cosmetic... */
2957  txn->final_lsn = lsn;
2958 
2959  /* remove potential on-disk data, and deallocate */
2960  ReorderBufferCleanupTXN(rb, txn);
2961 }
2962 
2963 /*
2964  * Abort all transactions that aren't actually running anymore because the
2965  * server restarted.
2966  *
2967  * NB: These really have to be transactions that have aborted due to a server
2968  * crash/immediate restart, as we don't deal with invalidations here.
2969  */
2970 void
2972 {
	/* A mutable iterator is required: entries are freed while iterating. */
2973  dlist_mutable_iter it;
2974 
2975  /*
2976  * Iterate through all (potential) toplevel TXNs and abort all that are
2977  * older than what possibly can be running. Once we've found the first
2978  * that is alive we stop; there might be some that acquired an xid earlier
2979  * but started writing later, but it's unlikely and they will be cleaned
2980  * up in a later call to this function.
2981  */
2983  {
2984  ReorderBufferTXN *txn;
2985 
2986  txn = dlist_container(ReorderBufferTXN, node, it.cur);
2987 
2988  if (TransactionIdPrecedes(txn->xid, oldestRunningXid))
2989  {
2990  elog(DEBUG2, "aborting old transaction %u", txn->xid);
2991 
	/*
	 * Notify the remote node about the crash/immediate restart. No abort
	 * record exists for such transactions, hence no LSN is passed.
	 */
2993  if (rbtxn_is_streamed(txn))
2994  rb->stream_abort(rb, txn, InvalidXLogRecPtr);
2995 
2996  /* remove potential on-disk data, and deallocate this tx */
2997  ReorderBufferCleanupTXN(rb, txn);
2998  }
	/* This txn may still be running: stop scanning (see comment above). */
2999  else
3000  return;
3001  }
3002 }
3003 
3004 /*
3005  * Forget the contents of a transaction if we aren't interested in its
3006  * contents. Needs to be first called for subtransactions and then for the
3007  * toplevel xid.
3008  *
3009  * This is significantly different to ReorderBufferAbort() because
3010  * transactions that have committed need to be treated differently from aborted
3011  * ones since they may have modified the catalog.
3012  *
3013  * Note that this is only allowed to be called at the moment a transaction
3014  * commit has just been read, not earlier; otherwise later records referring
3015  * to this xid might re-create the transaction incompletely.
3016  */
3017 void
3019 {
3020  ReorderBufferTXN *txn;
3021 
3022  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
3023  false);
3024 
3025  /* unknown, nothing to forget */
3026  if (txn == NULL)
3027  return;
3028 
3029  /* this transaction mustn't be streamed */
3030  Assert(!rbtxn_is_streamed(txn));
3031 
3032  /* cosmetic... */
3033  txn->final_lsn = lsn;
3034 
3035  /*
3036  * Process cache invalidation messages if there are any. Even if we're not
3037  * interested in the transaction's contents, it could have manipulated the
3038  * catalog and we need to update the caches according to that.
3039  */
3040  if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
3042  txn->invalidations);
3043  else
	/* A txn without a base snapshot is not expected to carry invalidations. */
3044  Assert(txn->ninvalidations == 0);
3045 
3046  /* remove potential on-disk data, and deallocate */
3047  ReorderBufferCleanupTXN(rb, txn);
3048 }
3049 
3050 /*
3051  * Invalidate cache for those transactions that need to be skipped just in case
3052  * catalogs were manipulated as part of the transaction.
3053  *
3054  * Note that this is a special-purpose function for prepared transactions where
3055  * we don't want to clean up the TXN even when we decide to skip it. See
3056  * DecodePrepare.
3057  */
3058 void
3060 {
3061  ReorderBufferTXN *txn;
3062 
3063  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
3064  false);
3065 
3066  /* unknown, nothing to do */
3067  if (txn == NULL)
3068  return;
3069 
3070  /*
3071  * Process cache invalidation messages if there are any. Even if we're not
3072  * interested in the transaction's contents, it could have manipulated the
3073  * catalog and we need to update the caches according to that.
3074  */
3075  if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
3077  txn->invalidations);
3078  else
3079  Assert(txn->ninvalidations == 0);

	/*
	 * Unlike ReorderBufferForget(), the txn is deliberately not cleaned up
	 * here, as required for prepared transactions (see header comment).
	 */
3080 }
3081 
3082 
3083 /*
3084  * Execute invalidations happening outside the context of a decoded
3085  * transaction. That currently happens either for xid-less commits
3086  * (cf. RecordTransactionCommit()) or for invalidations in uninteresting
3087  * transactions (via ReorderBufferForget()).
3088  */
3089 void
3091  SharedInvalidationMessage *invalidations)
3092 {
3093  bool use_subtxn = IsTransactionOrTransactionBlock();
3094  int i;
3095 
	/*
	 * When already inside a transaction, do the work inside an internal
	 * subtransaction named "replay".
	 *
	 * NOTE(review): the statements guarded by the two later
	 * "if (use_subtxn)" tests are absent from this listing (original lines
	 * 3106 and 3112); as shown, the first of them would guard the for loop.
	 * Confirm against the full source.
	 */
3096  if (use_subtxn)
3097  BeginInternalSubTransaction("replay");
3098 
3099  /*
3100  * Force invalidations to happen outside of a valid transaction - that way
3101  * entries will just be marked as invalid without accessing the catalog.
3102  * That's advantageous because we don't need to set up the full state
3103  * necessary for catalog access.
3104  */
3105  if (use_subtxn)
3107 
3108  for (i = 0; i < ninvalidations; i++)
3109  LocalExecuteInvalidationMessage(&invalidations[i]);
3110 
3111  if (use_subtxn)
3113 }
3114 
3115 /*
3116  * Tell reorderbuffer about an xid seen in the WAL stream. Has to be called at
3117  * least once for every xid in XLogRecord->xl_xid (other places in records
3118  * may, but do not have to be passed through here).
3119  *
3120  * Reorderbuffer keeps some data structures about transactions in LSN order,
3121  * for efficiency. To do that it has to know about when transactions are seen
3122  * first in the WAL. As many types of records are not actually interesting for
3123  * logical decoding, they do not necessarily pass through here.
3124  */
3125 void
3127 {
	/*
	 * Only the side effect of the lookup matters here: making reorderbuffer
	 * aware of the xid as seen at 'lsn' (the 'true' arguments presumably ask
	 * for the entry to be created if needed). The returned txn is unused.
	 */
3128  /* many records won't have an xid assigned, centralize check here */
3129  if (xid != InvalidTransactionId)
3130  ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3131 }
3132 
3133 /*
3134  * Add a new snapshot to this transaction that may only be used after lsn 'lsn'
3135  * because the previous snapshot doesn't describe the catalog correctly for
3136  * following rows.
3137  */
3138 void
3140  XLogRecPtr lsn, Snapshot snap)
3141 {
3143 
3144  change->data.snapshot = snap;
3146 
	/*
	 * Queue the snapshot as an ordinary change so that replay switches to it
	 * at its position in the change stream.
	 */
3147  ReorderBufferQueueChange(rb, xid, lsn, change, false);
3148 }
3149 
3150 /*
3151  * Set up the transaction's base snapshot.
3152  *
3153  * If we know that xid is a subtransaction, set the base snapshot on the
3154  * top-level transaction instead.
3155  */
3156 void
3158  XLogRecPtr lsn, Snapshot snap)
3159 {
3160  ReorderBufferTXN *txn;
3161  bool is_new;	/* filled in by the lookup, not consulted here */
3162 
3163  Assert(snap != NULL);
3164 
3165  /*
3166  * Fetch the transaction to operate on. If we know it's a subtransaction,
3167  * operate on its top-level transaction instead.
3168  */
3169  txn = ReorderBufferTXNByXid(rb, xid, true, &is_new, lsn, true);
3170  if (rbtxn_is_known_subxact(txn))
3171  txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
3172  NULL, InvalidXLogRecPtr, false);
	/* The base snapshot may only be set once per transaction. */
3173  Assert(txn->base_snapshot == NULL);
3174 
3175  txn->base_snapshot = snap;
3176  txn->base_snapshot_lsn = lsn;
3178 
	/* Sanity-check the buffer's LSN ordering (presumably assert builds only). */
3179  AssertTXNLsnOrder(rb);
3180 }
3181 
3182 /*
3183  * Access the catalog with this CommandId at this point in the changestream.
3184  *
3185  * May only be called for command ids > 1
3186  */
3187 void
3189  XLogRecPtr lsn, CommandId cid)
3190 {
3192 
3193  change->data.command_id = cid;
3195 
	/*
	 * Queue as a change: when replay reaches it, the snapshot's command id
	 * is advanced if this one is newer (see ReorderBufferProcessTXN).
	 */
3196  ReorderBufferQueueChange(rb, xid, lsn, change, false);
3197 }
3198 
3199 /*
3200  * Update memory counters to account for the new or removed change.
3201  *
3202  * We update two counters - in the reorder buffer, and in the transaction
3203  * containing the change. The reorder buffer counter allows us to quickly
3204  * decide if we reached the memory limit, the transaction counter allows
3205  * us to quickly pick the largest transaction for eviction.
3206  *
3207  * Either txn or change must be non-NULL at least. We update the memory
3208  * counter of txn if it's non-NULL, otherwise change->txn.
3209  *
3210  * In addition, the toplevel transaction's total_size is updated: when
3211  * streaming is enabled we don't really care about subtransactions, as we
3212  * can't stream them individually anyway, and only toplevel transactions
3213  * are picked for streaming, so the toplevel total is what matters there.
3214  */
3215 static void
3217  ReorderBufferChange *change,
3218  ReorderBufferTXN *txn,
3219  bool addition, Size sz)
3220 {
3221  ReorderBufferTXN *toptxn;
3222 
3223  Assert(txn || change);
3224 
3225  /*
3226  * Ignore tuple CID changes, because those are not evicted when reaching
3227  * the memory limit. We just don't count them, because counting them might
3228  * easily trigger a pointless attempt to spill.
3229  */
3230  if (change && change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID)
3231  return;
3232 
	/* A zero-sized update changes nothing, including heap membership. */
3233  if (sz == 0)
3234  return;
3235 
3236  if (txn == NULL)
3237  txn = change->txn;
3238  Assert(txn != NULL);
3239 
3240  /*
3241  * Update the total size in top level as well. This is later used to
3242  * compute the decoding stats.
3243  */
3244  toptxn = rbtxn_get_toptxn(txn);
3245 
3246  if (addition)
3247  {
3248  Size oldsize = txn->size;
3249 
3250  txn->size += sz;
3251  rb->size += sz;
3252 
3253  /* Update the total size in the top transaction. */
3254  toptxn->total_size += sz;
3255 
3256  /* Re-insert txn in the max-heap; it was a member only if oldsize != 0 */
3257  if (oldsize != 0)
3258  pairingheap_remove(rb->txn_heap, &txn->txn_node);
3259  pairingheap_add(rb->txn_heap, &txn->txn_node);
3260  }
3261  else
3262  {
3263  Assert((rb->size >= sz) && (txn->size >= sz));
3264  txn->size -= sz;
3265  rb->size -= sz;
3266 
3267  /* Update the total size in the top transaction. */
3268  toptxn->total_size -= sz;
3269 
3270  /* Re-insert txn in the max-heap, unless its size dropped to zero */
3271  pairingheap_remove(rb->txn_heap, &txn->txn_node);
3272  if (txn->size != 0)
3273  pairingheap_add(rb->txn_heap, &txn->txn_node);
3274  }
3275 
3276  Assert(txn->size <= rb->size);
3277 }
3278 
3279 /*
3280  * Add new (relfilelocator, tid) -> (cmin, cmax) mappings.
3281  *
3282  * We do not include this change type in memory accounting, because we
3283  * keep CIDs in a separate list and do not evict them when reaching
3284  * the memory limit.
3285  */
3286 void
3288  XLogRecPtr lsn, RelFileLocator locator,
3289  ItemPointerData tid, CommandId cmin,
3290  CommandId cmax, CommandId combocid)
3291 {
3293  ReorderBufferTXN *txn;
3294 
3295  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3296 
3297  change->data.tuplecid.locator = locator;
3298  change->data.tuplecid.tid = tid;
3299  change->data.tuplecid.cmin = cmin;
3300  change->data.tuplecid.cmax = cmax;
3301  change->data.tuplecid.combocid = combocid;
3302  change->lsn = lsn;
3303  change->txn = txn;
3305 
	/*
	 * Append to the txn's dedicated tuplecids list rather than queueing it
	 * with the regular changes (replay errors out on a tuplecid found in
	 * the change queue), and do not count it in memory accounting.
	 */
3306  dlist_push_tail(&txn->tuplecids, &change->node);
3307  txn->ntuplecids++;
3308 }
3309 
3310 /*
3311  * Accumulate the invalidations for executing them later.
3312  *
3313  * This needs to be called for each XLOG_XACT_INVALIDATIONS message and
3314  * accumulates all the invalidation messages in the toplevel transaction, if
3315  * available, otherwise in the current transaction, as well as in the form of
3316  * change in reorder buffer. We require to record it in form of the change
3317  * so that we can execute only the required invalidations instead of executing
3318  * all the invalidations on each CommandId increment. We also need to
3319  * accumulate these in the txn buffer because in some cases where we skip
3320  * processing the transaction (see ReorderBufferForget), we need to execute
3321  * all the invalidations together.
3322  */
3323 void
3325  XLogRecPtr lsn, Size nmsgs,
3327 {
3328  ReorderBufferTXN *txn;
3329  MemoryContext oldcontext;
3330  ReorderBufferChange *change;
3331 
3332  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3333 
3334  oldcontext = MemoryContextSwitchTo(rb->context);
3335 
3336  /*
3337  * Collect all the invalidations under the top transaction, if available,
3338  * so that we can execute them all together. See comments atop this
3339  * function.
3340  */
3341  txn = rbtxn_get_toptxn(txn);
3342 
3343  Assert(nmsgs > 0);
3344 
3345  /* Accumulate invalidations. */
3346  if (txn->ninvalidations == 0)
3347  {
3348  txn->ninvalidations = nmsgs;
3350  palloc(sizeof(SharedInvalidationMessage) * nmsgs);
3351  memcpy(txn->invalidations, msgs,
3352  sizeof(SharedInvalidationMessage) * nmsgs);
3353  }
3354  else
3355  {
3358  (txn->ninvalidations + nmsgs));
3359 
3360  memcpy(txn->invalidations + txn->ninvalidations, msgs,
3361  nmsgs * sizeof(SharedInvalidationMessage));
3362  txn->ninvalidations += nmsgs;
3363  }
3364 
3365  change = ReorderBufferGetChange(rb);
3367  change->data.inval.ninvalidations = nmsgs;
3368  change->data.inval.invalidations = (SharedInvalidationMessage *)
3369  palloc(sizeof(SharedInvalidationMessage) * nmsgs);
3370  memcpy(change->data.inval.invalidations, msgs,
3371  sizeof(SharedInvalidationMessage) * nmsgs);
3372 
3373  ReorderBufferQueueChange(rb, xid, lsn, change, false);
3374 
3375  MemoryContextSwitchTo(oldcontext);
3376 }
3377 
3378 /*
3379  * Apply all invalidations we know. Possibly we only need parts at this point
3380  * in the changestream but we don't know which those are.
3381  */
3382 static void
3384 {
3385  int i;
3386 
3387  for (i = 0; i < nmsgs; i++)
3389 }
3390 
3391 /*
3392  * Mark a transaction as containing catalog changes
3393  */
3394 void
3396  XLogRecPtr lsn)
3397 {
3398  ReorderBufferTXN *txn;
3399 
3400  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3401 
3402  if (!rbtxn_has_catalog_changes(txn))
3403  {
3406  }
3407 
3408  /*
3409  * Mark top-level transaction as having catalog changes too if one of its
3410  * children has so that the ReorderBufferBuildTupleCidHash can
3411  * conveniently check just top-level transaction and decide whether to
3412  * build the hash table or not.
3413  */
3414  if (rbtxn_is_subtxn(txn))
3415  {
3416  ReorderBufferTXN *toptxn = rbtxn_get_toptxn(txn);
3417 
3418  if (!rbtxn_has_catalog_changes(toptxn))
3419  {
3422  }
3423  }
3424 }
3425 
3426 /*
3427  * Return palloc'ed array of the transactions that have changed catalogs.
3428  * The returned array is sorted in xidComparator order.
3429  *
3430  * The caller must free the returned array when done with it.
3431  */
3432 TransactionId *
3434 {
3435  dlist_iter iter;
3436  TransactionId *xids = NULL;
3437  size_t xcnt = 0;
3438 
3439  /* Quick return if the list is empty */
3440  if (dclist_count(&rb->catchange_txns) == 0)
3441  return NULL;
3442 
3443  /* Initialize XID array */
3444  xids = (TransactionId *) palloc(sizeof(TransactionId) *
3446  dclist_foreach(iter, &rb->catchange_txns)
3447  {
3449  catchange_node,
3450  iter.cur);
3451 
3453 
3454  xids[xcnt++] = txn->xid;
3455  }
3456 
3457  qsort(xids, xcnt, sizeof(TransactionId), xidComparator);
3458 
3459  Assert(xcnt == dclist_count(&rb->catchange_txns));
3460  return xids;
3461 }
3462 
3463 /*
3464  * Query whether a transaction is already *known* to contain catalog
3465  * changes. This can be wrong until directly before the commit!
3466  */
3467 bool
3469 {
3470  ReorderBufferTXN *txn;
3471 
3472  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
3473  false);
3474  if (txn == NULL)
3475  return false;
3476 
3477  return rbtxn_has_catalog_changes(txn);
3478 }
3479 
3480 /*
3481  * ReorderBufferXidHasBaseSnapshot
3482  * Have we already set the base snapshot for the given txn/subtxn?
3483  */
3484 bool
3486 {
3487  ReorderBufferTXN *txn;
3488 
3489  txn = ReorderBufferTXNByXid(rb, xid, false,
3490  NULL, InvalidXLogRecPtr, false);
3491 
3492  /* transaction isn't known yet, ergo no snapshot */
3493  if (txn == NULL)
3494  return false;
3495 
3496  /* a known subtxn? operate on top-level txn instead */
3497  if (rbtxn_is_known_subxact(txn))
3498  txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
3499  NULL, InvalidXLogRecPtr, false);
3500 
3501  return txn->base_snapshot != NULL;
3502 }
3503 
3504 
3505 /*
3506  * ---------------------------------------
3507  * Disk serialization support
3508  * ---------------------------------------
3509  */
3510 
3511 /*
3512  * Ensure the IO buffer is >= sz.
3513  */
3514 static void
3516 {
3517  if (!rb->outbufsize)
3518  {
3519  rb->outbuf = MemoryContextAlloc(rb->context, sz);
3520  rb->outbufsize = sz;
3521  }
3522  else if (rb->outbufsize < sz)
3523  {
3524  rb->outbuf = repalloc(rb->outbuf, sz);
3525  rb->outbufsize = sz;
3526  }
3527 }
3528 
3529 
3530 /* Compare two transactions by size */
3531 static int
3533 {
3536 
3537  if (ta->size < tb->size)
3538  return -1;
3539  if (ta->size > tb->size)
3540  return 1;
3541  return 0;
3542 }
3543 
3544 /*
3545  * Find the largest transaction (toplevel or subxact) to evict (spill to disk).
3546  */
3547 static ReorderBufferTXN *
3549 {
3550  ReorderBufferTXN *largest;
3551 
3552  /* Get the largest transaction from the max-heap */
3553  largest = pairingheap_container(ReorderBufferTXN, txn_node,
3555 
3556  Assert(largest);
3557  Assert(largest->size > 0);
3558  Assert(largest->size <= rb->size);
3559 
3560  return largest;
3561 }
3562 
3563 /*
3564  * Find the largest streamable toplevel transaction to evict (by streaming).
3565  *
3566  * This can be seen as an optimized version of ReorderBufferLargestTXN, which
3567  * should give us the same transaction (because we don't update memory account
3568  * for subtransaction with streaming, so it's always 0). But we can simply
3569  * iterate over the limited number of toplevel transactions that have a base
3570  * snapshot. There is no use of selecting a transaction that doesn't have base
3571  * snapshot because we don't decode such transactions. Also, we do not select
3572  * the transaction which doesn't have any streamable change.
3573  *
3574  * Note that, we skip transactions that contain incomplete changes. There
3575  * is a scope of optimization here such that we can select the largest
3576  * transaction which has incomplete changes. But that will make the code and
3577  * design quite complex and that might not be worth the benefit. If we plan to
3578  * stream the transactions that contain incomplete changes then we need to
3579  * find a way to partially stream/truncate the transaction changes in-memory
3580  * and build a mechanism to partially truncate the spilled files.
3581  * Additionally, whenever we partially stream the transaction we need to
3582  * maintain the last streamed lsn and next time we need to restore from that
3583  * segment and the offset in WAL. As we stream the changes from the top
3584  * transaction and restore them subtransaction wise, we need to even remember
3585  * the subxact from where we streamed the last change.
3586  */
3587 static ReorderBufferTXN *
3589 {
3590  dlist_iter iter;
3591  Size largest_size = 0;
3592  ReorderBufferTXN *largest = NULL;
3593 
3594  /* Find the largest top-level transaction having a base snapshot. */
3596  {
3597  ReorderBufferTXN *txn;
3598 
3599  txn = dlist_container(ReorderBufferTXN, base_snapshot_node, iter.cur);
3600 
3601  /* must not be a subtxn */
3603  /* base_snapshot must be set */
3604  Assert(txn->base_snapshot != NULL);
3605 
3606  if ((largest == NULL || txn->total_size > largest_size) &&
3607  (txn->total_size > 0) && !(rbtxn_has_partial_change(txn)) &&
3609  {
3610  largest = txn;
3611  largest_size = txn->total_size;
3612  }
3613  }
3614 
3615  return largest;
3616 }
3617 
3618 /*
3619  * Check whether the logical_decoding_work_mem limit was reached, and if yes
3620  * pick the largest (sub)transaction at-a-time to evict and spill its changes to
3621  * disk or send to the output plugin until we reach under the memory limit.
3622  *
3623  * If debug_logical_replication_streaming is set to "immediate", stream or
3624  * serialize the changes immediately.
3625  *
3626  * XXX At this point we select the transactions until we reach under the memory
3627  * limit, but we might also adapt a more elaborate eviction strategy - for example
3628  * evicting enough transactions to free certain fraction (e.g. 50%) of the memory
3629  * limit.
3630  */
3631 static void
3633 {
3634  ReorderBufferTXN *txn;
3635 
3636  /*
3637  * Bail out if debug_logical_replication_streaming is buffered and we
3638  * haven't exceeded the memory limit.
3639  */
3641  rb->size < logical_decoding_work_mem * 1024L)
3642  return;
3643 
3644  /*
3645  * If debug_logical_replication_streaming is immediate, loop until there's
3646  * no change. Otherwise, loop until we reach under the memory limit. One
3647  * might think that just by evicting the largest (sub)transaction we will
3648  * come under the memory limit based on assumption that the selected
3649  * transaction is at least as large as the most recent change (which
3650  * caused us to go over the memory limit). However, that is not true
3651  * because a user can reduce the logical_decoding_work_mem to a smaller
3652  * value before the most recent change.
3653  */
3654  while (rb->size >= logical_decoding_work_mem * 1024L ||
3656  rb->size > 0))
3657  {
3658  /*
3659  * Pick the largest transaction and evict it from memory by streaming,
3660  * if possible. Otherwise, spill to disk.
3661  */
3663  (txn = ReorderBufferLargestStreamableTopTXN(rb)) != NULL)
3664  {
3665  /* we know there has to be one, because the size is not zero */
3666  Assert(txn && rbtxn_is_toptxn(txn));
3667  Assert(txn->total_size > 0);
3668  Assert(rb->size >= txn->total_size);
3669 
3670  ReorderBufferStreamTXN(rb, txn);
3671  }
3672  else
3673  {
3674  /*
3675  * Pick the largest transaction (or subtransaction) and evict it
3676  * from memory by serializing it to disk.
3677  */
3678  txn = ReorderBufferLargestTXN(rb);
3679 
3680  /* we know there has to be one, because the size is not zero */
3681  Assert(txn);
3682  Assert(txn->size > 0);
3683  Assert(rb->size >= txn->size);
3684 
3685  ReorderBufferSerializeTXN(rb, txn);
3686  }
3687 
3688  /*
3689  * After eviction, the transaction should have no entries in memory,
3690  * and should use 0 bytes for changes.
3691  */
3692  Assert(txn->size == 0);
3693  Assert(txn->nentries_mem == 0);
3694  }
3695 
3696  /* We must be under the memory limit now. */
3697  Assert(rb->size < logical_decoding_work_mem * 1024L);
3698 
3699 }
3700 
3701 /*
3702  * Spill data of a large transaction (and its subtransactions) to disk.
3703  */
3704 static void
3706 {
3707  dlist_iter subtxn_i;
3708  dlist_mutable_iter change_i;
3709  int fd = -1;
3710  XLogSegNo curOpenSegNo = 0;
3711  Size spilled = 0;
3712  Size size = txn->size;
3713 
3714  elog(DEBUG2, "spill %u changes in XID %u to disk",
3715  (uint32) txn->nentries_mem, txn->xid);
3716 
3717  /* do the same to all child TXs */
3718  dlist_foreach(subtxn_i, &txn->subtxns)
3719  {
3720  ReorderBufferTXN *subtxn;
3721 
3722  subtxn = dlist_container(ReorderBufferTXN, node, subtxn_i.cur);
3723  ReorderBufferSerializeTXN(rb, subtxn);
3724  }
3725 
3726  /* serialize changestream */
3727  dlist_foreach_modify(change_i, &txn->changes)
3728  {
3729  ReorderBufferChange *change;
3730 
3731  change = dlist_container(ReorderBufferChange, node, change_i.cur);
3732 
3733  /*
3734  * store in segment in which it belongs by start lsn, don't split over
3735  * multiple segments tho
3736  */
3737  if (fd == -1 ||
3738  !XLByteInSeg(change->lsn, curOpenSegNo, wal_segment_size))
3739  {
3740  char path[MAXPGPATH];
3741 
3742  if (fd != -1)
3744 
3745  XLByteToSeg(change->lsn, curOpenSegNo, wal_segment_size);
3746 
3747  /*
3748  * No need to care about TLIs here, only used during a single run,
3749  * so each LSN only maps to a specific WAL record.
3750  */
3752  curOpenSegNo);
3753 
3754  /* open segment, create it if necessary */
3755  fd = OpenTransientFile(path,
3756  O_CREAT | O_WRONLY | O_APPEND | PG_BINARY);
3757 
3758  if (fd < 0)
3759  ereport(ERROR,
3761  errmsg("could not open file \"%s\": %m", path)));
3762  }
3763 
3764  ReorderBufferSerializeChange(rb, txn, fd, change);
3765  dlist_delete(&change->node);
3766  ReorderBufferReturnChange(rb, change, false);
3767 
3768  spilled++;
3769  }
3770 
3771  /* Update the memory counter */
3772  ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, size);
3773 
3774  /* update the statistics iff we have spilled anything */
3775  if (spilled)
3776  {
3777  rb->spillCount += 1;
3778  rb->spillBytes += size;
3779 
3780  /* don't consider already serialized transactions */
3781  rb->spillTxns += (rbtxn_is_serialized(txn) || rbtxn_is_serialized_clear(txn)) ? 0 : 1;
3782 
3783  /* update the decoding stats */
3785  }
3786 
3787  Assert(spilled == txn->nentries_mem);
3788  Assert(dlist_is_empty(&txn->changes));
3789  txn->nentries_mem = 0;
3791 
3792  if (fd != -1)
3794 }
3795 
3796 /*
3797  * Serialize individual change to disk.
3798  */
3799 static void
3801  int fd, ReorderBufferChange *change)
3802 {
3803  ReorderBufferDiskChange *ondisk;
3804  Size sz = sizeof(ReorderBufferDiskChange);
3805 
3807 
3808  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3809  memcpy(&ondisk->change, change, sizeof(ReorderBufferChange));
3810 
3811  switch (change->action)
3812  {
3813  /* fall through these, they're all similar enough */
3818  {
3819  char *data;
3820  HeapTuple oldtup,
3821  newtup;
3822  Size oldlen = 0;
3823  Size newlen = 0;
3824 
3825  oldtup = change->data.tp.oldtuple;
3826  newtup = change->data.tp.newtuple;
3827 
3828  if (oldtup)
3829  {
3830  sz += sizeof(HeapTupleData);
3831  oldlen = oldtup->t_len;
3832  sz += oldlen;
3833  }
3834 
3835  if (newtup)
3836  {
3837  sz += sizeof(HeapTupleData);
3838  newlen = newtup->t_len;
3839  sz += newlen;
3840  }
3841 
3842  /* make sure we have enough space */
3844 
3845  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3846  /* might have been reallocated above */
3847  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3848 
3849  if (oldlen)
3850  {
3851  memcpy(data, oldtup, sizeof(HeapTupleData));
3852  data += sizeof(HeapTupleData);
3853 
3854  memcpy(data, oldtup->t_data, oldlen);
3855  data += oldlen;
3856  }
3857 
3858  if (newlen)
3859  {
3860  memcpy(data, newtup, sizeof(HeapTupleData));
3861  data += sizeof(HeapTupleData);
3862 
3863  memcpy(data, newtup->t_data, newlen);
3864  data += newlen;
3865  }
3866  break;
3867  }
3869  {
3870  char *data;
3871  Size prefix_size = strlen(change->data.msg.prefix) + 1;
3872 
3873  sz += prefix_size + change->data.msg.message_size +
3874  sizeof(Size) + sizeof(Size);
3876 
3877  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3878 
3879  /* might have been reallocated above */
3880  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3881 
3882  /* write the prefix including the size */
3883  memcpy(data, &prefix_size, sizeof(Size));
3884  data += sizeof(Size);
3885  memcpy(data, change->data.msg.prefix,
3886  prefix_size);
3887  data += prefix_size;
3888 
3889  /* write the message including the size */
3890  memcpy(data, &change->data.msg.message_size, sizeof(Size));
3891  data += sizeof(Size);
3892  memcpy(data, change->data.msg.message,
3893  change->data.msg.message_size);
3894  data += change->data.msg.message_size;
3895 
3896  break;
3897  }
3899  {
3900  char *data;
3901  Size inval_size = sizeof(SharedInvalidationMessage) *
3902  change->data.inval.ninvalidations;
3903 
3904  sz += inval_size;
3905 
3907  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3908 
3909  /* might have been reallocated above */
3910  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3911  memcpy(data, change->data.inval.invalidations, inval_size);
3912  data += inval_size;
3913 
3914  break;
3915  }
3917  {
3918  Snapshot snap;
3919  char *data;
3920 
3921  snap = change->data.snapshot;
3922 
3923  sz += sizeof(SnapshotData) +
3924  sizeof(TransactionId) * snap->xcnt +
3925  sizeof(TransactionId) * snap->subxcnt;
3926 
3927  /* make sure we have enough space */
3929  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3930  /* might have been reallocated above */
3931  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3932 
3933  memcpy(data, snap, sizeof(SnapshotData));
3934  data += sizeof(SnapshotData);
3935 
3936  if (snap->xcnt)
3937  {
3938  memcpy(data, snap->xip,
3939  sizeof(TransactionId) * snap->xcnt);
3940  data += sizeof(TransactionId) * snap->xcnt;
3941  }
3942 
3943  if (snap->subxcnt)
3944  {
3945  memcpy(data, snap->subxip,
3946  sizeof(TransactionId) * snap->subxcnt);
3947  data += sizeof(TransactionId) * snap->subxcnt;
3948  }
3949  break;
3950  }
3952  {
3953  Size size;
3954  char *data;
3955 
3956  /* account for the OIDs of truncated relations */
3957  size = sizeof(Oid) * change->data.truncate.nrelids;
3958  sz += size;
3959 
3960  /* make sure we have enough space */
3962 
3963  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3964  /* might have been reallocated above */
3965  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3966 
3967  memcpy(data, change->data.truncate.relids, size);
3968  data += size;
3969 
3970  break;
3971  }
3976  /* ReorderBufferChange contains everything important */
3977  break;
3978  }
3979 
3980  ondisk->size = sz;
3981 
3982  errno = 0;
3983  pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_WRITE);
3984  if (write(fd, rb->outbuf, ondisk->size) != ondisk->size)
3985  {
3986  int save_errno = errno;
3987 
3989 
3990  /* if write didn't set errno, assume problem is no disk space */
3991  errno = save_errno ? save_errno : ENOSPC;
3992  ereport(ERROR,
3994  errmsg("could not write to data file for XID %u: %m",
3995  txn->xid)));
3996  }
3998 
3999  /*
4000  * Keep the transaction's final_lsn up to date with each change we send to
4001  * disk, so that ReorderBufferRestoreCleanup works correctly. (We used to
4002  * only do this on commit and abort records, but that doesn't work if a
4003  * system crash leaves a transaction without its abort record).
4004  *
4005  * Make sure not to move it backwards.
4006  */
4007  if (txn->final_lsn < change->lsn)
4008  txn->final_lsn = change->lsn;
4009 
4010  Assert(ondisk->change.action == change->action);
4011 }
4012 
4013 /* Returns true, if the output plugin supports streaming, false, otherwise. */
4014 static inline bool
4016 {
4018 
4019  return ctx->streaming;
4020 }
4021 
4022 /* Returns true, if the streaming can be started now, false, otherwise. */
4023 static inline bool
4025 {
4027  SnapBuild *builder = ctx->snapshot_builder;
4028 
4029  /* We can't start streaming unless a consistent state is reached. */
4031  return false;
4032 
4033  /*
4034  * We can't start streaming immediately even if the streaming is enabled
4035  * because we previously decoded this transaction and now just are
4036  * restarting.
4037  */
4038  if (ReorderBufferCanStream(rb) &&
4039  !SnapBuildXactNeedsSkip(builder, ctx->reader->ReadRecPtr))
4040  return true;
4041 
4042  return false;
4043 }
4044 
4045 /*
4046  * Send data of a large transaction (and its subtransactions) to the
4047  * output plugin, but using the stream API.
4048  */
4049 static void
4051 {
4052  Snapshot snapshot_now;
4053  CommandId command_id;
4054  Size stream_bytes;
4055  bool txn_is_streamed;
4056 
4057  /* We can never reach here for a subtransaction. */
4058  Assert(rbtxn_is_toptxn(txn));
4059 
4060  /*
4061  * We can't make any assumptions about base snapshot here, similar to what
4062  * ReorderBufferCommit() does. That relies on base_snapshot getting
4063  * transferred from subxact in ReorderBufferCommitChild(), but that was
4064  * not yet called as the transaction is in-progress.
4065  *
4066  * So just walk the subxacts and use the same logic here. But we only need
4067  * to do that once, when the transaction is streamed for the first time.
4068  * After that we need to reuse the snapshot from the previous run.
4069  *
4070  * Unlike DecodeCommit which adds xids of all the subtransactions in
4071  * snapshot's xip array via SnapBuildCommitTxn, we can't do that here but
4072  * we do add them to subxip array instead via ReorderBufferCopySnap. This
4073  * allows the catalog changes made in subtransactions decoded till now to
4074  * be visible.
4075  */
4076  if (txn->snapshot_now == NULL)
4077  {
4078  dlist_iter subxact_i;
4079 
4080  /* make sure this transaction is streamed for the first time */
4081  Assert(!rbtxn_is_streamed(txn));
4082 
4083  /* at the beginning we should have invalid command ID */
4085 
4086  dlist_foreach(subxact_i, &txn->subtxns)
4087  {
4088  ReorderBufferTXN *subtxn;
4089 
4090  subtxn = dlist_container(ReorderBufferTXN, node, subxact_i.cur);
4091  ReorderBufferTransferSnapToParent(txn, subtxn);
4092  }
4093 
4094  /*
4095  * If this transaction has no snapshot, it didn't make any changes to
4096  * the database till now, so there's nothing to decode.
4097  */
4098  if (txn->base_snapshot == NULL)
4099  {
4100  Assert(txn->ninvalidations == 0);
4101  return;
4102  }
4103 
4104  command_id = FirstCommandId;
4105  snapshot_now = ReorderBufferCopySnap(rb, txn->base_snapshot,
4106  txn, command_id);
4107  }
4108  else
4109  {
4110  /* the transaction must have been already streamed */
4111  Assert(rbtxn_is_streamed(txn));
4112 
4113  /*
4114  * Nah, we already have snapshot from the previous streaming run. We
4115  * assume new subxacts can't move the LSN backwards, and so can't beat
4116  * the LSN condition in the previous branch (so no need to walk
4117  * through subxacts again). In fact, we must not do that as we may be
4118  * using snapshot half-way through the subxact.
4119  */
4120  command_id = txn->command_id;
4121 
4122  /*
4123  * We can't use txn->snapshot_now directly because after the last
4124  * streaming run, we might have got some new sub-transactions. So we
4125  * need to add them to the snapshot.
4126  */
4127  snapshot_now = ReorderBufferCopySnap(rb, txn->snapshot_now,
4128  txn, command_id);
4129 
4130  /* Free the previously copied snapshot. */
4131  Assert(txn->snapshot_now->copied);
4133  txn->snapshot_now = NULL;
4134  }
4135 
4136  /*
4137  * Remember this information to be used later to update stats. We can't
4138  * update the stats here as an error while processing the changes would
4139  * lead to the accumulation of stats even though we haven't streamed all
4140  * the changes.
4141  */
4142  txn_is_streamed = rbtxn_is_streamed(txn);
4143  stream_bytes = txn->total_size;
4144 
4145  /* Process and send the changes to output plugin. */
4146  ReorderBufferProcessTXN(rb, txn, InvalidXLogRecPtr, snapshot_now,
4147  command_id, true);
4148 
4149  rb->streamCount += 1;
4150  rb->streamBytes += stream_bytes;
4151 
4152  /* Don't consider already streamed transaction. */
4153  rb->streamTxns += (txn_is_streamed) ? 0 : 1;
4154 
4155  /* update the decoding stats */
4157 
4158  Assert(dlist_is_empty(&txn->changes));
4159  Assert(txn->nentries == 0);
4160  Assert(txn->nentries_mem == 0);
4161 }
4162 
4163 /*
4164  * Size of a change in memory.
4165  */
4166 static Size
4168 {
4169  Size sz = sizeof(ReorderBufferChange);
4170 
4171  switch (change->action)
4172  {
4173  /* fall through these, they're all similar enough */
4178  {
4179  HeapTuple oldtup,
4180  newtup;
4181  Size oldlen = 0;
4182  Size newlen = 0;
4183 
4184  oldtup = change->data.tp.oldtuple;
4185  newtup = change->data.tp.newtuple;
4186 
4187  if (oldtup)
4188  {
4189  sz += sizeof(HeapTupleData);
4190  oldlen = oldtup->t_len;
4191  sz += oldlen;
4192  }
4193 
4194  if (newtup)
4195  {
4196  sz += sizeof(HeapTupleData);
4197  newlen = newtup->t_len;
4198  sz += newlen;
4199  }
4200 
4201  break;
4202  }
4204  {
4205  Size prefix_size = strlen(change->data.msg.prefix) + 1;
4206 
4207  sz += prefix_size + change->data.msg.message_size +
4208  sizeof(Size) + sizeof(Size);
4209 
4210  break;
4211  }
4213  {
4214  sz += sizeof(SharedInvalidationMessage) *
4215  change->data.inval.ninvalidations;
4216  break;
4217  }
4219  {
4220  Snapshot snap;
4221 
4222  snap = change->data.snapshot;
4223 
4224  sz += sizeof(SnapshotData) +
4225  sizeof(TransactionId) * snap->xcnt +
4226  sizeof(TransactionId) * snap->subxcnt;
4227 
4228  break;
4229  }
4231  {
4232  sz += sizeof(Oid) * change->data.truncate.nrelids;
4233 
4234  break;
4235  }
4240  /* ReorderBufferChange contains everything important */
4241  break;
4242  }
4243 
4244  return sz;
4245 }
4246 
4247 
4248 /*
4249  * Restore a number of changes spilled to disk back into memory.
4250  */
4251 static Size
4253  TXNEntryFile *file, XLogSegNo *segno)
4254 {
4255  Size restored = 0;
4256  XLogSegNo last_segno;
4257  dlist_mutable_iter cleanup_iter;
4258  File *fd = &file->vfd;
4259 
4262 
4263  /* free current entries, so we have memory for more */
4264  dlist_foreach_modify(cleanup_iter, &txn->changes)
4265  {
4267  dlist_container(ReorderBufferChange, node, cleanup_iter.cur);
4268 
4269  dlist_delete(&cleanup->node);
4271  }
4272  txn->nentries_mem = 0;
4273  Assert(dlist_is_empty(&txn->changes));
4274 
4275  XLByteToSeg(txn->final_lsn, last_segno, wal_segment_size);
4276 
4277  while (restored < max_changes_in_memory && *segno <= last_segno)
4278  {
4279  int readBytes;
4280  ReorderBufferDiskChange *ondisk;
4281 
4283 
4284  if (*fd == -1)
4285  {
4286  char path[MAXPGPATH];
4287 
4288  /* first time in */
4289  if (*segno == 0)
4290  XLByteToSeg(txn->first_lsn, *segno, wal_segment_size);
4291 
4292  Assert(*segno != 0 || dlist_is_empty(&txn->changes));
4293 
4294  /*
4295  * No need to care about TLIs here, only used during a single run,
4296  * so each LSN only maps to a specific WAL record.
4297  */
4299  *segno);
4300 
4301  *fd = PathNameOpenFile(path, O_RDONLY | PG_BINARY);
4302 
4303  /* No harm in resetting the offset even in case of failure */
4304  file->curOffset = 0;
4305 
4306  if (*fd < 0 && errno == ENOENT)
4307  {
4308  *fd = -1;
4309  (*segno)++;
4310  continue;
4311  }
4312  else if (*fd < 0)
4313  ereport(ERROR,
4315  errmsg("could not open file \"%s\": %m",
4316  path)));
4317  }
4318 
4319  /*
4320  * Read the statically sized part of a change which has information
4321  * about the total size. If we couldn't read a record, we're at the
4322  * end of this file.
4323  */
4325  readBytes = FileRead(file->vfd, rb->outbuf,
4326  sizeof(ReorderBufferDiskChange),
4327  file->curOffset, WAIT_EVENT_REORDER_BUFFER_READ);
4328 
4329  /* eof */
4330  if (readBytes == 0)
4331  {
4332  FileClose(*fd);
4333  *fd = -1;
4334  (*segno)++;
4335  continue;
4336  }
4337  else if (readBytes < 0)
4338  ereport(ERROR,
4340  errmsg("could not read from reorderbuffer spill file: %m")));
4341  else if (readBytes != sizeof(ReorderBufferDiskChange))
4342  ereport(ERROR,
4344  errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
4345  readBytes,
4346  (uint32) sizeof(ReorderBufferDiskChange))));
4347 
4348  file->curOffset += readBytes;
4349 
4350  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
4351 
4353  sizeof(ReorderBufferDiskChange) + ondisk->size);
4354  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
4355 
4356  readBytes = FileRead(file->vfd,
4357  rb->outbuf + sizeof(ReorderBufferDiskChange),
4358  ondisk->size - sizeof(ReorderBufferDiskChange),
4359  file->curOffset,
4360  WAIT_EVENT_REORDER_BUFFER_READ);
4361 
4362  if (readBytes < 0)
4363  ereport(ERROR,
4365  errmsg("could not read from reorderbuffer spill file: %m")));
4366  else if (readBytes != ondisk->size - sizeof(ReorderBufferDiskChange))
4367  ereport(ERROR,
4369  errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
4370  readBytes,
4371  (uint32) (ondisk->size - sizeof(ReorderBufferDiskChange)))));
4372 
4373  file->curOffset += readBytes;
4374 
4375  /*
4376  * ok, read a full change from disk, now restore it into proper
4377  * in-memory format
4378  */
4379  ReorderBufferRestoreChange(rb, txn, rb->outbuf);
4380  restored++;
4381  }
4382 
4383  return restored;
4384 }
4385 
4386 /*
4387  * Convert change from its on-disk format to in-memory format and queue it onto
4388  * the TXN's ->changes list.
4389  *
4390  * Note: although "data" is declared char*, at entry it points to a
4391  * maxalign'd buffer, making it safe in most of this function to assume
4392  * that the pointed-to data is suitably aligned for direct access.
4393  */
4394 static void
4396  char *data)
4397 {
4398  ReorderBufferDiskChange *ondisk;
4399  ReorderBufferChange *change;
4400 
4401  ondisk = (ReorderBufferDiskChange *) data;
4402 
4403  change = ReorderBufferGetChange(rb);
4404 
4405  /* copy static part */
4406  memcpy(change, &ondisk->change, sizeof(ReorderBufferChange));
4407 
4408  data += sizeof(ReorderBufferDiskChange);
4409 
4410  /* restore individual stuff */
4411  switch (change->action)
4412  {
4413  /* fall through these, they're all similar enough */
4418  if (change->data.tp.oldtuple)
4419  {
4420  uint32 tuplelen = ((HeapTuple) data)->t_len;
4421 
4422  change->data.tp.oldtuple =
4424 
4425  /* restore ->tuple */
4426  memcpy(change->data.tp.oldtuple, data,
4427  sizeof(HeapTupleData));
4428  data += sizeof(HeapTupleData);
4429 
4430  /* reset t_data pointer into the new tuplebuf */
4431  change->data.tp.oldtuple->t_data =
4432  (HeapTupleHeader) ((char *) change->data.tp.oldtuple + HEAPTUPLESIZE);
4433 
4434  /* restore tuple data itself */
4435  memcpy(change->data.tp.oldtuple->t_data, data, tuplelen);
4436  data += tuplelen;
4437  }
4438 
4439  if (change->data.tp.newtuple)
4440  {
4441  /* here, data might not be suitably aligned! */
4442  uint32 tuplelen;
4443 
4444  memcpy(&tuplelen, data + offsetof(HeapTupleData, t_len),
4445  sizeof(uint32));
4446 
4447  change->data.tp.newtuple =
4449 
4450  /* restore ->tuple */
4451  memcpy(change->data.tp.newtuple, data,
4452  sizeof(HeapTupleData));
4453  data += sizeof(HeapTupleData);
4454 
4455  /* reset t_data pointer into the new tuplebuf */
4456  change->data.tp.newtuple->t_data =
4457  (HeapTupleHeader) ((char *) change->data.tp.newtuple + HEAPTUPLESIZE);
4458 
4459  /* restore tuple data itself */
4460  memcpy(change->data.tp.newtuple->t_data, data, tuplelen);
4461  data += tuplelen;
4462  }
4463 
4464  break;
4466  {
4467  Size prefix_size;
4468 
4469  /* read prefix */
4470  memcpy(&prefix_size, data, sizeof(Size));
4471  data += sizeof(Size);
4472  change->data.msg.prefix = MemoryContextAlloc(rb->context,
4473  prefix_size);
4474  memcpy(change->data.msg.prefix, data, prefix_size);
4475  Assert(change->data.msg.prefix[prefix_size - 1] == '\0');
4476  data += prefix_size;
4477 
4478  /* read the message */
4479  memcpy(&change->data.msg.message_size, data, sizeof(Size));
4480  data += sizeof(Size);
4481  change->data.msg.message = MemoryContextAlloc(rb->context,
4482  change->data.msg.message_size);
4483  memcpy(change->data.msg.message, data,
4484  change->data.msg.message_size);
4485  data += change->data.msg.message_size;
4486 
4487  break;
4488  }
4490  {
4491  Size inval_size = sizeof(SharedInvalidationMessage) *
4492  change->data.inval.ninvalidations;
4493 
4494  change->data.inval.invalidations =
4495  MemoryContextAlloc(rb->context, inval_size);
4496 
4497  /* read the message */
4498  memcpy(change->data.inval.invalidations, data, inval_size);
4499 
4500  break;
4501  }
4503  {
4504  Snapshot oldsnap;
4505  Snapshot newsnap;
4506  Size size;
4507 
4508  oldsnap = (Snapshot) data;
4509 
4510  size = sizeof(SnapshotData) +
4511  sizeof(TransactionId) * oldsnap->xcnt +
4512  sizeof(TransactionId) * (oldsnap->subxcnt + 0);
4513 
4515 
4516  newsnap = change->data.snapshot;
4517 
4518  memcpy(newsnap, data, size);
4519  newsnap->xip = (TransactionId *)
4520  (((char *) newsnap) + sizeof(SnapshotData));
4521  newsnap->subxip = newsnap->xip + newsnap->xcnt;
4522  newsnap->copied = true;
4523  break;
4524  }
4525  /* the base struct contains all the data, easy peasy */
4527  {
4528  Oid *relids;
4529 
4530  relids = ReorderBufferGetRelids(rb,
4531  change->data.truncate.nrelids);
4532  memcpy(relids, data, change->data.truncate.nrelids * sizeof(Oid));
4533  change->data.truncate.relids = relids;
4534 
4535  break;
4536  }
4541  break;
4542  }
4543 
4544  dlist_push_tail(&txn->changes, &change->node);
4545  txn->nentries_mem++;
4546 
4547  /*
4548  * Update memory accounting for the restored change. We need to do this
4549  * although we don't check the memory limit when restoring the changes in
4550  * this branch (we only do that when initially queueing the changes after
4551  * decoding), because we will release the changes later, and that will
4552  * update the accounting too (subtracting the size from the counters). And
4553  * we don't want to underflow there.
4554  */
4555  ReorderBufferChangeMemoryUpdate(rb, change, NULL, true,
4556  ReorderBufferChangeSize(change));
4557 }
4558 
4559 /*
4560  * Remove all on-disk spill data stored for the passed-in transaction.
 *
 * Iterates over every WAL segment number from the segment containing
 * txn->first_lsn through the segment containing txn->final_lsn (inclusive)
 * and unlinks the spill file for each one. A file that does not exist
 * (ENOENT) is skipped silently; any other unlink() failure raises ERROR.
4561  */
4562 static void
4564 {
4565  XLogSegNo first;
4566  XLogSegNo cur;
4567  XLogSegNo last;
4568 
4571 
4572  XLByteToSeg(txn->first_lsn, first, wal_segment_size);
4573  XLByteToSeg(txn->final_lsn, last, wal_segment_size);
4574 
4575  /* iterate over all possible filenames, and delete them */
4576  for (cur = first; cur <= last; cur++)
4577  {
4578  char path[MAXPGPATH];
4579 
4581  if (unlink(path) != 0 && errno != ENOENT)
4582  ereport(ERROR,
4584  errmsg("could not remove file \"%s\": %m", path)));
4585  }
4586 }
4587 
4588 /*
4589  * Remove any leftover serialized reorder buffers from a slot directory after a
4590  * prior crash or decoding session exit.
 *
 * Deletes every entry whose name starts with "xid" from the directory
 * PG_REPLSLOT_DIR/slotname. If that path exists but is not a directory,
 * nothing is done. The directory is read with ReadDirExtended() at elevel
 * INFO, while failure to unlink a matching file raises ERROR.
4591  */
4592 static void
4594 {
4595  DIR *spill_dir;
4596  struct dirent *spill_de;
4597  struct stat statbuf;
4598  char path[MAXPGPATH * 2 + sizeof(PG_REPLSLOT_DIR)];
4599 
4600  sprintf(path, "%s/%s", PG_REPLSLOT_DIR, slotname);
4601 
4602  /* we're only handling directories here, skip if it's not ours */
4603  if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
4604  return;
4605 
4606  spill_dir = AllocateDir(path);
 /*
  * NOTE(review): 'path' is passed below as the directory name, but the loop
  * body overwrites it with individual file names, so after the first match
  * it no longer names the directory. Presumably ReadDirExtended() only uses
  * that argument in error messages -- confirm.
  */
4607  while ((spill_de = ReadDirExtended(spill_dir, path, INFO)) != NULL)
4608  {
4609  /* only look at names that can be ours */
4610  if (strncmp(spill_de->d_name, "xid", 3) == 0)
4611  {
4612  snprintf(path, sizeof(path),
4613  "%s/%s/%s", PG_REPLSLOT_DIR, slotname,
4614  spill_de->d_name);
4615 
4616  if (unlink(path) != 0)
4617  ereport(ERROR,
4619  errmsg("could not remove file \"%s\" during removal of %s/%s/xid*: %m",
4620  path, PG_REPLSLOT_DIR, slotname)));
4621  }
4622  }
4623  FreeDir(spill_dir);
4624 }
4625 
4626 /*
4627  * Given a replication slot, transaction ID and segment number, fill in the
4628  * corresponding spill file into 'path', which is a caller-owned buffer of size
4629  * at least MAXPGPATH.
 *
 * The last path component has the form "xid-<xid>-lsn-<hi>-<lo>.spill",
 * where <hi>-<lo> is the LSN at offset 0 of segment 'segno' (the segment's
 * start), printed in hex via LSN_FORMAT_ARGS. snprintf() bounds the result
 * to MAXPGPATH bytes.
4630  */
4631 static void
4633  XLogSegNo segno)
4634 {
4635  XLogRecPtr recptr;
4636 
4637  XLogSegNoOffsetToRecPtr(segno, 0, wal_segment_size, recptr);
4638 
4639  snprintf(path, MAXPGPATH, "%s/%s/xid-%u-lsn-%X-%X.spill",
4642  xid, LSN_FORMAT_ARGS(recptr));
4643 }
4644 
4645 /*
4646  * Delete all data spilled to disk after we've restarted/crashed. It will be
4647  * recreated when the respective slots are reused.
 *
 * Walks the entries of PG_REPLSLOT_DIR, ignoring "." and ".." and any name
 * that ReplicationSlotValidateName() rejects (validated with elevel DEBUG2),
 * and runs the per-slot spill-file cleanup for every remaining entry.
4648  */
4649 void
4651 {
4652  DIR *logical_dir;
4653  struct dirent *logical_de;
4654 
4655  logical_dir = AllocateDir(PG_REPLSLOT_DIR);
4656  while ((logical_de = ReadDir(logical_dir, PG_REPLSLOT_DIR)) != NULL)
4657  {
4658  if (strcmp(logical_de->d_name, ".") == 0 ||
4659  strcmp(logical_de->d_name, "..") == 0)
4660  continue;
4661 
4662  /* if it cannot be a slot, skip the directory */
4663  if (!ReplicationSlotValidateName(logical_de->d_name, DEBUG2))
4664  continue;
4665 
4666  /*
4667  * ok, has to be a surviving logical slot, iterate and delete
4668  * everything starting with xid-*
4669  */
4671  }
4672  FreeDir(logical_dir);
4673 }
4674 
4675 /* ---------------------------------------
4676  * toast reassembly support
4677  * ---------------------------------------
4678  */
4679 
4680 /*
4681  * Initialize per tuple toast reconstruction support.
 *
 * Creates txn->toast_hash, keyed by an Oid and holding ReorderBufferToastEnt
 * entries, allocated in rb->context. The 5 passed to hash_create() is only
 * an initial element-count hint. Must not be called while the hash already
 * exists (asserted).
4682  */
4683 static void
4685 {
4686  HASHCTL hash_ctl;
4687 
4688  Assert(txn->toast_hash == NULL);
4689 
4690  hash_ctl.keysize = sizeof(Oid);
4691  hash_ctl.entrysize = sizeof(ReorderBufferToastEnt);
4692  hash_ctl.hcxt = rb->context;
4693  txn->toast_hash = hash_create("ReorderBufferToastHash", 5, &hash_ctl,
4695 }
4696 
4697 /*
4698  * Per toast-chunk handling for toast reconstruction
4699  *
4700  * Appends a toast chunk so we can reconstruct it when the tuple "owning" the
4701  * toasted Datum comes along.
 *
 * The change's new tuple must come from a toast relation. Its attribute 1 is
 * read as the chunk_id (Oid), attribute 2 as the chunk sequence number and
 * attribute 3 as the chunk data. The toast hash is created on first use.
 * For a given chunk_id, sequence numbers must be 0, 1, 2, ... without gaps;
 * anything else raises ERROR. The chunk's payload size (without varlena
 * header) is added to the entry's running size, and the change itself is
 * linked into the entry's chunk list rather than copied.
4702  */
4703 static void
4705  Relation relation, ReorderBufferChange *change)
4706 {
4707  ReorderBufferToastEnt *ent;
4708  HeapTuple newtup;
4709  bool found;
4710  int32 chunksize;
4711  bool isnull;
4712  Pointer chunk;
4713  TupleDesc desc = RelationGetDescr(relation);
4714  Oid chunk_id;
4715  int32 chunk_seq;
4716 
4717  if (txn->toast_hash == NULL)
4718  ReorderBufferToastInitHash(rb, txn);
4719 
4720  Assert(IsToastRelation(relation));
4721 
4722  newtup = change->data.tp.newtuple;
4723  chunk_id = DatumGetObjectId(fastgetattr(newtup, 1, desc, &isnull));
4724  Assert(!isnull);
4725  chunk_seq = DatumGetInt32(fastgetattr(newtup, 2, desc, &isnull));
4726  Assert(!isnull);
4727 
4728  ent = (ReorderBufferToastEnt *)
4729  hash_search(txn->toast_hash, &chunk_id, HASH_ENTER, &found);
4730 
4731  if (!found)
4732  {
4733  Assert(ent->chunk_id == chunk_id);
4734  ent->num_chunks = 0;
4735  ent->last_chunk_seq = 0;
4736  ent->size = 0;
4737  ent->reconstructed = NULL;
4738  dlist_init(&ent->chunks);
4739 
4740  if (chunk_seq != 0)
4741  elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq 0",
4742  chunk_seq, chunk_id);
4743  }
4744  else if (found && chunk_seq != ent->last_chunk_seq + 1)
4745  elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq %d",
4746  chunk_seq, chunk_id, ent->last_chunk_seq + 1);
4747 
4748  chunk = DatumGetPointer(fastgetattr(newtup, 3, desc, &isnull));
4749  Assert(!isnull);
4750 
4751  /* calculate size so we can allocate the right size at once later */
 /* only plain 4-byte-header or short-header chunks are accepted */
4752  if (!VARATT_IS_EXTENDED(chunk))
4753  chunksize = VARSIZE(chunk) - VARHDRSZ;
4754  else if (VARATT_IS_SHORT(chunk))
4755  /* could happen due to heap_form_tuple doing its thing */
4756  chunksize = VARSIZE_SHORT(chunk) - VARHDRSZ_SHORT;
4757  else
4758  elog(ERROR, "unexpected type of toast chunk");
4759 
4760  ent->size += chunksize;
4761  ent->last_chunk_seq = chunk_seq;
4762  ent->num_chunks++;
4763  dlist_push_tail(&ent->chunks, &change->node);
4764 }
4765 
4766 /*
4767  * Rejigger change->newtuple to point to in-memory toast tuples instead of
4768  * on-disk toast tuples that may no longer exist (think DROP TABLE or VACUUM).
4769  *
4770  * We cannot replace unchanged toast tuples though, so those will still point
4771  * to on-disk toast data.
4772  *
4773  * While updating the existing change with detoasted tuple data, we need to
4774  * update the memory accounting info, because the change size will differ.
4775  * Otherwise the accounting may get out of sync, triggering serialization
4776  * at unexpected times.
4777  *
4778  * We record the size of the change before rejiggering the tuple, then
4779  * subtract it and add the new size afterwards. This makes it look like the
4780  * change was removed and then added back, only tweaking the accounting info.
4781  *
4782  * In particular it can't trigger serialization, which would be pointless
4783  * anyway as it happens during commit processing right before handing
4784  * the change to the output plugin.
 *
 * Only attributes that are non-null, non-dropped varlenas (attlen == -1)
 * holding external toast pointers whose va_valueid has an entry in
 * txn->toast_hash are replaced. Each replacement is an indirect pointer to
 * a freshly reassembled copy of the value, which is stored in that hash
 * entry. Returns immediately if the transaction has no toast hash.
4785  */
4786 static void
4788  Relation relation, ReorderBufferChange *change)
4789 {
4790  TupleDesc desc;
4791  int natt;
4792  Datum *attrs;
4793  bool *isnull;
4794  bool *free;
4795  HeapTuple tmphtup;
4796  Relation toast_rel;
4797  TupleDesc toast_desc;
4798  MemoryContext oldcontext;
4799  HeapTuple newtup;
4800  Size old_size;
4801 
4802  /* no toast tuples changed */
4803  if (txn->toast_hash == NULL)
4804  return;
4805 
4806  /*
4807  * We're going to modify the size of the change. So, to make sure the
4808  * accounting is correct we record the current change size and then after
4809  * re-computing the change we'll subtract the recorded size and then
4810  * re-add the new change size at the end. We don't immediately subtract
4811  * the old size because if there is any error before we add the new size,
4812  * we will release the changes and that will update the accounting info
4813  * (subtracting the size from the counters). And we don't want to
4814  * underflow there.
4815  */
4816  old_size = ReorderBufferChangeSize(change);
4817 
4818  oldcontext = MemoryContextSwitchTo(rb->context);
4819 
4820  /* we should only have toast tuples in an INSERT or UPDATE */
4821  Assert(change->data.tp.newtuple);
4822 
4823  desc = RelationGetDescr(relation);
4824 
4825  toast_rel = RelationIdGetRelation(relation->rd_rel->reltoastrelid);
4826  if (!RelationIsValid(toast_rel))
4827  elog(ERROR, "could not open toast relation with OID %u (base relation \"%s\")",
4828  relation->rd_rel->reltoastrelid, RelationGetRelationName(relation));
4829 
4830  toast_desc = RelationGetDescr(toast_rel);
4831 
4832  /* should we allocate from stack instead? */
4833  attrs = palloc0(sizeof(Datum) * desc->natts);
4834  isnull = palloc0(sizeof(bool) * desc->natts);
4835  free = palloc0(sizeof(bool) * desc->natts);
4836 
4837  newtup = change->data.tp.newtuple;
4838 
4839  heap_deform_tuple(newtup, desc, attrs, isnull);
4840 
4841  for (natt = 0; natt < desc->natts; natt++)
4842  {
4843  Form_pg_attribute attr = TupleDescAttr(desc, natt);
4844  ReorderBufferToastEnt *ent;
4845  struct varlena *varlena;
4846 
4847  /* va_rawsize is the size of the original datum -- including header */
4848  struct varatt_external toast_pointer;
4849  struct varatt_indirect redirect_pointer;
4850  struct varlena *new_datum = NULL;
4851  struct varlena *reconstructed;
4852  dlist_iter it;
4853  Size data_done = 0;
4854 
4855  /* system columns aren't toasted */
4856  if (attr->attnum < 0)
4857  continue;
4858 
4859  if (attr->attisdropped)
4860  continue;
4861 
4862  /* not a varlena datatype */
4863  if (attr->attlen != -1)
4864  continue;
4865 
4866  /* no data */
4867  if (isnull[natt])
4868  continue;
4869 
4870  /* ok, we know we have a toast datum */
4871  varlena = (struct varlena *) DatumGetPointer(attrs[natt]);
4872 
4873  /* no need to do anything if the tuple isn't external */
4875  continue;
4876 
4877  VARATT_EXTERNAL_GET_POINTER(toast_pointer, varlena);
4878 
4879  /*
4880  * Check whether the toast tuple changed, replace if so.
4881  */
4882  ent = (ReorderBufferToastEnt *)
4883  hash_search(txn->toast_hash,
4884  &toast_pointer.va_valueid,
4885  HASH_FIND,
4886  NULL);
4887  if (ent == NULL)
4888  continue;
4889 
4890  new_datum =
4892 
 /* attrs[natt] will point at new_datum; pfree it after the tuple is rebuilt */
4893  free[natt] = true;
4894 
4895  reconstructed = palloc0(toast_pointer.va_rawsize);
4896 
 /*
  * Kept in the hash entry: it is released when the toast hash is reset,
  * not at the end of this function.
  */
4897  ent->reconstructed = reconstructed;
4898 
4899  /* stitch toast tuple back together from its parts */
4900  dlist_foreach(it, &ent->chunks)
4901  {
4902  bool cisnull;
4903  ReorderBufferChange *cchange;
4904  HeapTuple ctup;
4905  Pointer chunk;
4906 
4907  cchange = dlist_container(ReorderBufferChange, node, it.cur);
4908  ctup = cchange->data.tp.newtuple;
4909  chunk = DatumGetPointer(fastgetattr(ctup, 3, toast_desc, &cisnull));
4910 
4911  Assert(!cisnull);
4914 
4915  memcpy(VARDATA(reconstructed) + data_done,
4916  VARDATA(chunk),
4917  VARSIZE(chunk) - VARHDRSZ);
4918  data_done += VARSIZE(chunk) - VARHDRSZ;
4919  }
4920  Assert(data_done == VARATT_EXTERNAL_GET_EXTSIZE(toast_pointer));
4921 
4922  /* make sure it's marked as compressed or not */
4923  if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
4924  SET_VARSIZE_COMPRESSED(reconstructed, data_done + VARHDRSZ);
4925  else
4926  SET_VARSIZE(reconstructed, data_done + VARHDRSZ);
4927 
4928  memset(&redirect_pointer, 0, sizeof(redirect_pointer));
4929  redirect_pointer.pointer = reconstructed;
4930 
4932  memcpy(VARDATA_EXTERNAL(new_datum), &redirect_pointer,
4933  sizeof(redirect_pointer));
4934 
4935  attrs[natt] = PointerGetDatum(new_datum);
4936  }
4937 
4938  /*
4939  * Build tuple in separate memory & copy tuple back into the tuplebuf
4940  * passed to the output plugin. We can't directly heap_fill_tuple() into
4941  * the tuplebuf because attrs[] will point back into the current content.
4942  */
4943  tmphtup = heap_form_tuple(desc, attrs, isnull);
 /*
  * NOTE(review): the copy below writes tmphtup->t_len bytes into newtup's
  * existing buffer, but the Assert checks newtup's old length, not the
  * rebuilt one. Confirm the rebuilt tuple can never be larger than the
  * space allocated for newtup.
  */
4944  Assert(newtup->t_len <= MaxHeapTupleSize);
4945  Assert(newtup->t_data == (HeapTupleHeader) ((char *) newtup + HEAPTUPLESIZE));
4946 
4947  memcpy(newtup->t_data, tmphtup->t_data, tmphtup->t_len);
4948  newtup->t_len = tmphtup->t_len;
4949 
4950  /*
4951  * free resources we won't further need, more persistent stuff will be
4952  * free'd in ReorderBufferToastReset().
4953  */
4954  RelationClose(toast_rel);
4955  pfree(tmphtup);
4956  for (natt = 0; natt < desc->natts; natt++)
4957  {
4958  if (free[natt])
4959  pfree(DatumGetPointer(attrs[natt]));
4960  }
4961  pfree(attrs);
4962  pfree(free);
4963  pfree(isnull);
4964 
4965  MemoryContextSwitchTo(oldcontext);
4966 
4967  /* subtract the old change size */
4968  ReorderBufferChangeMemoryUpdate(rb, change, NULL, false, old_size);
4969  /* now add the change back, with the correct size */
4970  ReorderBufferChangeMemoryUpdate(rb, change, NULL, true,
4971  ReorderBufferChangeSize(change));
4972 }
4973 
4974 /*
4975  * Free all resources allocated for toast reconstruction.
 *
 * For every toast hash entry: pfree the reassembled datum (if one was
 * built), then unlink each queued chunk change and hand it back via
 * ReorderBufferReturnChange(). Finally destroy the hash and reset
 * txn->toast_hash to NULL, so the next appended chunk creates a fresh hash.
 * Does nothing if the transaction has no toast hash.
4976  */
4977 static void
4979 {
4980  HASH_SEQ_STATUS hstat;
4981  ReorderBufferToastEnt *ent;
4982 
4983  if (txn->toast_hash == NULL)
4984  return;
4985 
4986  /* sequentially walk over the hash and free everything */
4987  hash_seq_init(&hstat, txn->toast_hash);
4988  while ((ent = (ReorderBufferToastEnt *) hash_seq_search(&hstat)) != NULL)
4989  {
4990  dlist_mutable_iter it;
4991 
4992  if (ent->reconstructed != NULL)
4993  pfree(ent->reconstructed);
4994 
 /* _modify variant: nodes are deleted from the list while iterating */
4995  dlist_foreach_modify(it, &ent->chunks)
4996  {
4997  ReorderBufferChange *change =
4999 
5000  dlist_delete(&change->node);
5001  ReorderBufferReturnChange(rb, change, true);
5002  }
5003  }
5004 
5005  hash_destroy(txn->toast_hash);
5006  txn->toast_hash = NULL;
5007 }
5008 
5009 
5010 /* ---------------------------------------
5011  * Visibility support for logical decoding
5012  *
5013  *
5014  * Lookup actual cmin/cmax values when using decoding snapshot. We can't
5015  * always rely on stored cmin/cmax values because of two scenarios:
5016  *
5017  * * A tuple got changed multiple times during a single transaction and thus
5018  * has got a combo CID. Combo CIDs are only valid for the duration of a
5019  * single transaction.
5020  * * A tuple with a cmin but no cmax (and thus no combo CID) got
5021  * deleted/updated in another transaction than the one which created it
5022  * which we are looking at right now. As only one of cmin, cmax or combo CID
5023  * is actually stored in the heap we don't have access to the value we
5024  * need anymore.
5025  *
5026  * To resolve those problems we have a per-transaction hash of (cmin,
5027  * cmax) tuples keyed by (relfilelocator, ctid) which contains the actual
5028  * (cmin, cmax) values. That also takes care of combo CIDs by simply
5029  * not caring about them at all. As we have the real cmin/cmax values
5030  * combo CIDs aren't interesting.
5031  *
5032  * As we only care about catalog tuples here the overhead of this
5033  * hashtable should be acceptable.
5034  *
5035  * Heap rewrites complicate this a bit, check rewriteheap.c for
5036  * details.
5037  * -------------------------------------------------------------------------
5038  */
5039 
5040 /* struct for sorting mapping files by LSN efficiently */
5041 typedef struct RewriteMappingFile
5042 {
 /*
  * NOTE(review): the member declarations and the end of this typedef are
  * not present in this extract; the visible users read 'lsn' (sort key)
  * and 'fname' (mapping file name).
  */
5046 
5047 #ifdef NOT_USED
/*
 * Debugging aid, compiled only if NOT_USED is defined: log every entry of
 * the given tuple-CID hash at DEBUG3, showing the relfilelocator, the TID
 * and the cmin/cmax recorded for it.
 */
5048 static void
5049 DisplayMapping(HTAB *tuplecid_data)
5050 {
5051  HASH_SEQ_STATUS hstat;
5053 
5054  hash_seq_init(&hstat, tuplecid_data);
5055  while ((ent = (ReorderBufferTupleCidEnt *) hash_seq_search(&hstat)) != NULL)
5056  {
5057  elog(DEBUG3, "mapping: node: %u/%u/%u tid: %u/%u cmin: %u, cmax: %u",
5058  ent->key.rlocator.dbOid,
5059  ent->key.rlocator.spcOid,
5060  ent->key.rlocator.relNumber,
5063  ent->cmin,
5064  ent->cmax
5065  );
5066  }
5067 }
5068 #endif
5069 
5070 /*
5071  * Apply a single mapping file to tuplecid_data.
5072  *
5073  * The mapping file has to have been verified to be a) committed b) for our
5074  * transaction c) applied in LSN order.
 *
 * Reads PG_LOGICAL_MAPPINGS_DIR/fname as a sequence of fixed-size
 * LogicalRewriteMappingData records. For each record whose old
 * (relfilelocator, tid) has an entry in tuplecid_data, the cmin/cmax/combocid
 * of that entry are copied to the entry for the new (relfilelocator, tid).
 * Judging by the 'found' handling, that entry is created if missing. Records
 * without an old entry are skipped. Open/read/close failures and short
 * reads raise ERROR.
 *
 * NOTE(review): 'relid' is not referenced in the visible body.
5075  */
5076 static void
5077 ApplyLogicalMappingFile(HTAB *tuplecid_data, Oid relid, const char *fname)
5078 {
5079  char path[MAXPGPATH];
5080  int fd;
5081  int readBytes;
5083 
5084  sprintf(path, "%s/%s", PG_LOGICAL_MAPPINGS_DIR, fname);
5085  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
5086  if (fd < 0)
5087  ereport(ERROR,
5089  errmsg("could not open file \"%s\": %m", path)));
5090 
5091  while (true)
5092  {
5095  ReorderBufferTupleCidEnt *new_ent;
5096  bool found;
5097 
5098  /* be careful about padding */
5099  memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
5100 
5101  /* read all mappings till the end of the file */
5102  pgstat_report_wait_start(WAIT_EVENT_REORDER_LOGICAL_MAPPING_READ);
5103  readBytes = read(fd, &map, sizeof(LogicalRewriteMappingData));
5105 
5106  if (readBytes < 0)
5107  ereport(ERROR,
5109  errmsg("could not read file \"%s\": %m",
5110  path)));
5111  else if (readBytes == 0) /* EOF */
5112  break;
5113  else if (readBytes != sizeof(LogicalRewriteMappingData))
5114  ereport(ERROR,
5116  errmsg("could not read from file \"%s\": read %d instead of %d bytes",
5117  path, readBytes,
5118  (int32) sizeof(LogicalRewriteMappingData))));
5119 
 /* first look up the tuple's pre-rewrite location */
5120  key.rlocator = map.old_locator;
5121  ItemPointerCopy(&map.old_tid,
5122  &key.tid);
5123 
5124 
5125  ent = (ReorderBufferTupleCidEnt *)
5127 
5128  /* no existing mapping, no need to update */
5129  if (!ent)
5130  continue;
5131 
 /* then address the same tuple at its post-rewrite location */
5132  key.rlocator = map.new_locator;
5133  ItemPointerCopy(&map.new_tid,
5134  &key.tid);
5135 
5136  new_ent = (ReorderBufferTupleCidEnt *)
5138 
5139  if (found)
5140  {
5141  /*
5142  * Make sure the existing mapping makes sense. We sometimes update
5143  * old records that did not yet have a cmax (e.g. pg_class' own
5144  * entry while rewriting it) during rewrites, so allow that.
5145  */
5146  Assert(ent->cmin == InvalidCommandId || ent->cmin == new_ent->cmin);
5147  Assert(ent->cmax == InvalidCommandId || ent->cmax == new_ent->cmax);
5148  }
5149  else
5150  {
5151  /* update mapping */
5152  new_ent->cmin = ent->cmin;
5153  new_ent->cmax = ent->cmax;
5154  new_ent->combocid = ent->combocid;
5155  }
5156  }
5157 
5158  if (CloseTransientFile(fd) != 0)
5159  ereport(ERROR,
5161  errmsg("could not close file \"%s\": %m", path)));
5162 }
5163 
5164 
5165 /*
5166  * Check whether the TransactionId 'xid' is in the pre-sorted array 'xip'.
 *
 * 'xip' holds 'num' elements. Because the lookup is a bsearch() using
 * xidComparator, the array must be sorted in that comparator's order.
5167  */
5168 static bool
5170 {
5171  return bsearch(&xid, xip, num,
5172  sizeof(TransactionId), xidComparator) != NULL;
5173 }
5174 
5175 /*
5176  * list_sort() comparator for sorting RewriteMappingFiles in LSN order.
 *
 * Returns the pg_cmp_u64() result of the two entries' 'lsn' fields, so
 * list_sort() orders the files by ascending LSN.
5177  */
5178 static int
5179 file_sort_by_lsn(const ListCell *a_p, const ListCell *b_p)
5180 {
5183 
5184  return pg_cmp_u64(a->lsn, b->lsn);
5185 }
5186 
5187 /*
5188  * Apply any existing logical remapping files if there are any targeted at our
5189  * transaction for relid.
 *
 * Scans PG_LOGICAL_MAPPINGS_DIR for entries named "map-..." and parses each
 * name with LOGICAL_REWRITE_FORMAT; a name that does not yield all six
 * fields raises ERROR. A file is kept only if all of the following hold:
 *  - it is for this relation's database (InvalidOid for shared relations);
 *  - it is for 'relid';
 *  - its creating transaction committed;
 *  - its mapped xid is in snapshot->subxip.
 * Kept files are applied in ascending LSN order, and each queued
 * RewriteMappingFile is pfree'd after it is processed.
5190  */
5191 static void
5193 {
5194  DIR *mapping_dir;
5195  struct dirent *mapping_de;
5196  List *files = NIL;
5197  ListCell *file;
5198  Oid dboid = IsSharedRelation(relid) ? InvalidOid : MyDatabaseId;
5199 
5200  mapping_dir = AllocateDir(PG_LOGICAL_MAPPINGS_DIR);
5201  while ((mapping_de = ReadDir(mapping_dir, PG_LOGICAL_MAPPINGS_DIR)) != NULL)
5202  {
5203  Oid f_dboid;
5204  Oid f_relid;
5205  TransactionId f_mapped_xid;
5206  TransactionId f_create_xid;
5207  XLogRecPtr f_lsn;
5208  uint32 f_hi,
5209  f_lo;
5210  RewriteMappingFile *f;
5211 
5212  if (strcmp(mapping_de->d_name, ".") == 0 ||
5213  strcmp(mapping_de->d_name, "..") == 0)
5214  continue;
5215 
5216  /* Ignore files that aren't ours */
5217  if (strncmp(mapping_de->d_name, "map-", 4) != 0)
5218  continue;
5219 
5220  if (sscanf(mapping_de->d_name, LOGICAL_REWRITE_FORMAT,
5221  &f_dboid, &f_relid, &f_hi, &f_lo,
5222  &f_mapped_xid, &f_create_xid) != 6)
5223  elog(ERROR, "could not parse filename \"%s\"", mapping_de->d_name);
5224 
 /* the LSN is encoded in the file name as two 32-bit halves */
5225  f_lsn = ((uint64) f_hi) << 32 | f_lo;
5226 
5227  /* mapping for another database */
5228  if (f_dboid != dboid)
5229  continue;
5230 
5231  /* mapping for another relation */
5232  if (f_relid != relid)
5233  continue;
5234 
5235  /* did the creating transaction abort? */
5236  if (!TransactionIdDidCommit(f_create_xid))
5237  continue;
5238 
5239  /* not for our transaction */
5240  if (!TransactionIdInArray(f_mapped_xid, snapshot->subxip, snapshot->subxcnt))
5241  continue;
5242 
5243  /* ok, relevant, queue for apply */
5244  f = palloc(sizeof(RewriteMappingFile));
5245  f->lsn = f_lsn;
5246  strcpy(f->fname, mapping_de->d_name);
5247  files = lappend(files, f);
5248  }
5249  FreeDir(mapping_dir);
5250 
5251  /* sort files so we apply them in LSN order */
5252  list_sort(files, file_sort_by_lsn);
5253 
5254  foreach(file, files)
5255  {
5257 
5258  elog(DEBUG1, "applying mapping: \"%s\" in %u", f->fname,
5259  snapshot->subxip[0]);
5261  pfree(f);
5262  }
5263 }
5264 
5265 /*
5266  * Lookup cmin/cmax of a tuple, during logical decoding where we can't rely on
5267  * combo CIDs.
 *
 * The lookup key is (relfilelocator of 'buffer', htup->t_self). 'buffer'
 * must be a shared (non-local) buffer in the main fork that contains htup
 * (asserted).
 *
 * On success, returns true and stores the recorded values through 'cmin'
 * and/or 'cmax'; either pointer may be NULL to skip it. Returns false when
 * tuplecid_data is NULL, or when no entry is found even after one attempt to
 * apply pending rewrite mappings.
5268  */
5269 bool
5271  Snapshot snapshot,
5272  HeapTuple htup, Buffer buffer,
5273  CommandId *cmin, CommandId *cmax)
5274 {
5277  ForkNumber forkno;
5278  BlockNumber blockno;
5279  bool updated_mapping = false;
5280 
5281  /*
5282  * Return unresolved if tuplecid_data is not valid. That's because when
5283  * streaming in-progress transactions we may run into tuples with the CID
5284  * before actually decoding them. Think e.g. about INSERT followed by
5285  * TRUNCATE, where the TRUNCATE may not be decoded yet when applying the
5286  * INSERT. So in such cases, we assume the CID is from the future
5287  * command.
5288  */
5289  if (tuplecid_data == NULL)
5290  return false;
5291 
5292  /* be careful about padding */
5293  memset(&key, 0, sizeof(key));
5294 
5295  Assert(!BufferIsLocal(buffer));
5296 
5297  /*
5298  * get relfilelocator from the buffer, no convenient way to access it
5299  * other than that.
5300  */
5301  BufferGetTag(buffer, &key.rlocator, &forkno, &blockno);
5302 
5303  /* tuples can only be in the main fork */
5304  Assert(forkno == MAIN_FORKNUM);
5305  Assert(blockno == ItemPointerGetBlockNumber(&htup->t_self));
5306 
5307  ItemPointerCopy(&htup->t_self,
5308  &key.tid);
5309 
5310 restart:
5311  ent = (ReorderBufferTupleCidEnt *)
5313 
5314  /*
5315  * failed to find a mapping, check whether the table was rewritten and
5316  * apply mapping if so, but only do that once - there can be no new
5317  * mappings while we are in here since we have to hold a lock on the
5318  * relation.
5319  */
5320  if (ent == NULL && !updated_mapping)
5321  {
5323  /* now check but don't update for a mapping again */
5324  updated_mapping = true;
5325  goto restart;
5326  }
5327  else if (ent == NULL)
5328  return false;
5329 
5330  if (cmin)
5331  *cmin = ent->cmin;
5332  if (cmax)
5333  *cmax = ent->cmax;
5334  return true;
5335 }
void binaryheap_build(binaryheap *heap)
Definition: binaryheap.c:138
void binaryheap_replace_first(binaryheap *heap, bh_node_type d)
Definition: binaryheap.c:255
bh_node_type binaryheap_first(binaryheap *heap)
Definition: binaryheap.c:177
bh_node_type binaryheap_remove_first(binaryheap *heap)
Definition: binaryheap.c:192
binaryheap * binaryheap_allocate(int capacity, binaryheap_comparator compare, void *arg)
Definition: binaryheap.c:39
void binaryheap_free(binaryheap *heap)
Definition: binaryheap.c:75
void binaryheap_add_unordered(binaryheap *heap, bh_node_type d)
Definition: binaryheap.c:116
uint32 BlockNumber
Definition: block.h:31
static int32 next
Definition: blutils.c:222
static void cleanup(void)
Definition: bootstrap.c:681
int Buffer
Definition: buf.h:23
#define BufferIsLocal(buffer)
Definition: buf.h:37
void BufferGetTag(Buffer buffer, RelFileLocator *rlocator, ForkNumber *forknum, BlockNumber *blknum)
Definition: bufmgr.c:3727
#define NameStr(name)
Definition: c.h:746
#define InvalidCommandId
Definition: c.h:669
unsigned int uint32
Definition: c.h:506
signed int int32
Definition: c.h:494
char * Pointer
Definition: c.h:483
#define VARHDRSZ
Definition: c.h:692
#define Assert(condition)
Definition: c.h:858
#define PG_BINARY
Definition: c.h:1273
#define FLEXIBLE_ARRAY_MEMBER
Definition: c.h:398
#define FirstCommandId
Definition: c.h:668
uint32 CommandId
Definition: c.h:666
uint32 TransactionId
Definition: c.h:652
size_t Size
Definition: c.h:605
bool IsToastRelation(Relation relation)
Definition: catalog.c:166
bool IsSharedRelation(Oid relationId)
Definition: catalog.c:264
int64 TimestampTz
Definition: timestamp.h:39
#define INDIRECT_POINTER_SIZE
Definition: detoast.h:34
#define VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr)
Definition: detoast.h:22
void hash_destroy(HTAB *hashp)
Definition: dynahash.c:865
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:955
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:352
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1420
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1385
struct cursor * cur
Definition: ecpg.c:28
void FreeErrorData(ErrorData *edata)
Definition: elog.c:1818
int errcode_for_file_access(void)
Definition: elog.c:876
void FlushErrorState(void)
Definition: elog.c:1867
int errmsg(const char *fmt,...)
Definition: elog.c:1070
ErrorData * CopyErrorData(void)
Definition: elog.c:1746
#define PG_RE_THROW()
Definition: elog.h:412
#define DEBUG3
Definition: elog.h:28
#define PG_TRY(...)
Definition: elog.h:371
#define DEBUG2
Definition: elog.h:29
#define PG_END_TRY(...)
Definition: elog.h:396
#define DEBUG1
Definition: elog.h:30
#define ERROR
Definition: elog.h:39
#define PG_CATCH(...)
Definition: elog.h:381
#define elog(elevel,...)
Definition: elog.h:225
#define INFO
Definition: elog.h:34
#define ereport(elevel,...)
Definition: elog.h:149
struct dirent * ReadDir(DIR *dir, const char *dirname)
Definition: fd.c:2932
int FreeDir(DIR *dir)
Definition: fd.c:2984
int CloseTransientFile(int fd)
Definition: fd.c:2832
void FileClose(File file)
Definition: fd.c:1978
File PathNameOpenFile(const char *fileName, int fileFlags)
Definition: fd.c:1575
struct dirent * ReadDirExtended(DIR *dir, const char *dirname, int elevel)
Definition: fd.c:2947
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2656
DIR * AllocateDir(const char *dirname)
Definition: fd.c:2866
static ssize_t FileRead(File file, void *buffer, size_t amount, off_t offset, uint32 wait_event_info)
Definition: fd.h:196
int File
Definition: fd.h:51
MemoryContext GenerationContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Definition: generation.c:160
Oid MyDatabaseId
Definition: globals.c:93
uint64 chunk
#define free(a)
Definition: header.h:65
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
Definition: heaptuple.c:1116
void heap_deform_tuple(HeapTuple tuple, TupleDesc tupleDesc, Datum *values, bool *isnull)
Definition: heaptuple.c:1345
@ HASH_FIND
Definition: hsearch.h:113
@ HASH_REMOVE
Definition: hsearch.h:115
@ HASH_ENTER
Definition: hsearch.h:114
#define HASH_CONTEXT
Definition: hsearch.h:102
#define HASH_ELEM
Definition: hsearch.h:95
#define HASH_BLOBS
Definition: hsearch.h:97
#define HEAPTUPLESIZE
Definition: htup.h:73
HeapTupleData * HeapTuple
Definition: htup.h:71
struct HeapTupleData HeapTupleData
HeapTupleHeaderData * HeapTupleHeader
Definition: htup.h:23
#define SizeofHeapTupleHeader
Definition: htup_details.h:185
#define MaxHeapTupleSize
Definition: htup_details.h:558
static Datum fastgetattr(HeapTuple tup, int attnum, TupleDesc tupleDesc, bool *isnull)
Definition: htup_details.h:749
#define dlist_foreach(iter, lhead)
Definition: ilist.h:623
static void dlist_init(dlist_head *head)
Definition: ilist.h:314
#define dclist_container(type, membername, ptr)
Definition: ilist.h:947
static bool dlist_has_next(const dlist_head *head, const dlist_node *node)
Definition: ilist.h:503
static void dclist_push_tail(dclist_head *head, dlist_node *node)
Definition: ilist.h:709
static void dlist_insert_before(dlist_node *before, dlist_node *node)
Definition: ilist.h:393
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:603
static void dlist_delete(dlist_node *node)
Definition: ilist.h:405
static uint32 dclist_count(const dclist_head *head)
Definition: ilist.h:932
static dlist_node * dlist_next_node(dlist_head *head, dlist_node *node)
Definition: ilist.h:537
static dlist_node * dlist_pop_head_node(dlist_head *head)
Definition: ilist.h:450
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:640
static bool dlist_is_empty(const dlist_head *head)
Definition: ilist.h:336
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:364
static void dclist_delete_from(dclist_head *head, dlist_node *node)
Definition: ilist.h:763
static void dclist_init(dclist_head *head)
Definition: ilist.h:671
#define dlist_container(type, membername, ptr)
Definition: ilist.h:593
#define dclist_foreach(iter, lhead)
Definition: ilist.h:970
static int pg_cmp_u64(uint64 a, uint64 b)
Definition: int.h:616
#define write(a, b, c)
Definition: win32.h:14
#define read(a, b, c)
Definition: win32.h:13
void LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
Definition: inval.c:705
int b
Definition: isn.c:70
int a
Definition: isn.c:69
int i
Definition: isn.c:73
if(TABLE==NULL||TABLE_index==NULL)
Definition: isn.c:77
static OffsetNumber ItemPointerGetOffsetNumber(const ItemPointerData *pointer)
Definition: itemptr.h:124
static BlockNumber ItemPointerGetBlockNumber(const ItemPointerData *pointer)
Definition: itemptr.h:103
static void ItemPointerCopy(const ItemPointerData *fromPointer, ItemPointerData *toPointer)
Definition: itemptr.h:172
void list_sort(List *list, list_sort_comparator cmp)
Definition: list.c:1674
List * lappend(List *list, void *datum)
Definition: list.c:339
void UpdateDecodingStats(LogicalDecodingContext *ctx)
Definition: logical.c:1932
char * pstrdup(const char *in)
Definition: mcxt.c:1696
void pfree(void *pointer)
Definition: mcxt.c:1521
void * palloc0(Size size)
Definition: mcxt.c:1347
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:1215
MemoryContext CurrentMemoryContext
Definition: mcxt.c:143
void * repalloc(void *pointer, Size size)
Definition: mcxt.c:1541
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:1181
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:454
void * palloc(Size size)
Definition: mcxt.c:1317
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:160
#define SLAB_DEFAULT_BLOCK_SIZE
Definition: memutils.h:189
#define SLAB_LARGE_BLOCK_SIZE
Definition: memutils.h:190
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:122
void pairingheap_remove(pairingheap *heap, pairingheap_node *node)
Definition: pairingheap.c:184
void pairingheap_add(pairingheap *heap, pairingheap_node *node)
Definition: pairingheap.c:126
pairingheap_node * pairingheap_first(pairingheap *heap)
Definition: pairingheap.c:144
pairingheap * pairingheap_allocate(pairingheap_comparator compare, void *arg)
Definition: pairingheap.c:42
#define pairingheap_container(type, membername, ptr)
Definition: pairingheap.h:43
#define pairingheap_const_container(type, membername, ptr)
Definition: pairingheap.h:51
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:209
void * arg
#define MAXPGPATH
const void * data
#define lfirst(lc)
Definition: pg_list.h:172
#define NIL
Definition: pg_list.h:68
#define sprintf
Definition: port.h:240
#define snprintf
Definition: port.h:238
#define qsort(a, b, c, d)