PostgreSQL Source Code  git master
reorderbuffer.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * reorderbuffer.c
4  * PostgreSQL logical replay/reorder buffer management
5  *
6  *
7  * Copyright (c) 2012-2019, PostgreSQL Global Development Group
8  *
9  *
10  * IDENTIFICATION
11  * src/backend/replication/reorderbuffer.c
12  *
13  * NOTES
14  * This module gets handed individual pieces of transactions in the order
15  * they are written to the WAL and is responsible for reassembling them into
16  * toplevel transaction sized pieces. When a transaction is completely
17  * reassembled - signalled by reading the transaction commit record - it
18  * will then call the output plugin (cf. ReorderBufferCommit()) with the
19  * individual changes. The output plugins rely on snapshots built by
20  * snapbuild.c which hands them to us.
21  *
22  * Transactions and subtransactions/savepoints in postgres are not
23  * immediately linked to each other from outside the performing
24  * backend. Only at commit/abort (or special xact_assignment records) they
25  * are linked together. Which means that we will have to splice together a
26  * toplevel transaction from its subtransactions. To do that efficiently we
27  * build a binary heap indexed by the smallest current lsn of the individual
28  * subtransactions' changestreams. As the individual streams are inherently
29  * ordered by LSN - since that is where we build them from - the transaction
30  * can easily be reassembled by always using the subtransaction with the
31  * smallest current LSN from the heap.
32  *
33  * In order to cope with large transactions - which can be several times as
34  * big as the available memory - this module supports spooling the contents
35  * of large transactions to disk. When the transaction is replayed the
36  * contents of individual (sub-)transactions will be read from disk in
37  * chunks.
38  *
39  * This module also has to deal with reassembling toast records from the
40  * individual chunks stored in WAL. When a new (or initial) version of a
41  * tuple is stored in WAL it will always be preceded by the toast chunks
42  * emitted for the columns stored out of line. Within a single toplevel
43  * transaction there will be no other data carrying records between a row's
44  * toast chunks and the row data itself. See ReorderBufferToast* for
45  * details.
46  *
47  * ReorderBuffer uses two special memory context types - SlabContext for
48  * allocations of fixed-length structures (changes and transactions), and
49  * GenerationContext for the variable-length transaction data (allocated
50  * and freed in groups with similar lifespan).
51  *
52  * To limit the amount of memory used by decoded changes, we track memory
53  * used at the reorder buffer level (i.e. total amount of memory), and for
54  * each transaction. When the total amount of used memory exceeds the
55  * limit, the transaction consuming the most memory is then serialized to
56  * disk.
57  *
58  * Only decoded changes are evicted from memory (spilled to disk), not the
59  * transaction records. The number of toplevel transactions is limited,
60  * but a transaction with many subtransactions may still consume significant
61  * amounts of memory. The transaction records are fairly small, though, and
62  * are not included in the memory limit.
63  *
64  * The current eviction algorithm is very simple - the transaction is
65  * picked merely by size, while it might be useful to also consider age
66  * (LSN) of the changes for example. With the new Generational memory
67  * allocator, evicting the oldest changes would make it more likely the
68  * memory gets actually freed.
69  *
70  * We still rely on max_changes_in_memory when loading serialized changes
71  * back into memory. At that point we can't use the memory limit directly
72  * as we load the subxacts independently. One option to deal with this
73  * would be to count the subxacts, and allow each to allocate 1/N of the
74  * memory limit. That however does not seem very appealing, because with
75  * many subtransactions it may easily cause thrashing (short cycles of
76  * deserializing and applying very few changes). We probably should give
77  * a bit more memory to the oldest subtransactions, because it's likely
78  * the source for the next sequence of changes.
79  *
80  * -------------------------------------------------------------------------
81  */
82 #include "postgres.h"
83 
84 #include <unistd.h>
85 #include <sys/stat.h>
86 
87 #include "access/detoast.h"
88 #include "access/heapam.h"
89 #include "access/rewriteheap.h"
90 #include "access/transam.h"
91 #include "access/xact.h"
92 #include "access/xlog_internal.h"
93 #include "catalog/catalog.h"
94 #include "lib/binaryheap.h"
95 #include "miscadmin.h"
96 #include "pgstat.h"
97 #include "replication/logical.h"
99 #include "replication/slot.h"
100 #include "replication/snapbuild.h" /* just for SnapBuildSnapDecRefcount */
101 #include "storage/bufmgr.h"
102 #include "storage/fd.h"
103 #include "storage/sinval.h"
104 #include "utils/builtins.h"
105 #include "utils/combocid.h"
106 #include "utils/memdebug.h"
107 #include "utils/memutils.h"
108 #include "utils/rel.h"
109 #include "utils/relfilenodemap.h"
110 
111 
112 /* entry for a hash table we use to map from xid to our transaction state */
114 {
118 
119 /* data structures for (relfilenode, ctid) => (cmin, cmax) mapping */
121 {
125 
127 {
131  CommandId combocid; /* just for debugging */
133 
134 /* k-way in-order change iteration support structures */
136 {
140  int fd;
143 
145 {
149  ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER];
151 
152 /* toast datastructures */
153 typedef struct ReorderBufferToastEnt
154 {
155  Oid chunk_id; /* toast_table.chunk_id */
156  int32 last_chunk_seq; /* toast_table.chunk_seq of the last chunk we
157  * have seen */
158  Size num_chunks; /* number of chunks we've already seen */
159  Size size; /* combined size of chunks seen */
160  dlist_head chunks; /* linked list of chunks */
161  struct varlena *reconstructed; /* reconstructed varlena now pointed to in
162  * main tup */
164 
165 /* Disk serialization support datastructures */
167 {
170  /* data follows */
172 
173 /*
174  * Maximum number of changes kept in memory, per transaction. After that,
175  * changes are spooled to disk.
176  *
177  * The current value should be sufficient to decode the entire transaction
178  * without hitting disk in OLTP workloads, while starting to spool to disk in
179  * other workloads reasonably fast.
180  *
181  * At some point in the future it probably makes sense to have a more elaborate
182  * resource management here, but it's not entirely clear what that would look
183  * like.
184  */
186 static const Size max_changes_in_memory = 4096; /* XXX for restore only */
187 
188 /* ---------------------------------------
189  * primary reorderbuffer support routines
190  * ---------------------------------------
191  */
195  TransactionId xid, bool create, bool *is_new,
196  XLogRecPtr lsn, bool create_as_top);
198  ReorderBufferTXN *subtxn);
199 
200 static void AssertTXNLsnOrder(ReorderBuffer *rb);
201 
202 /* ---------------------------------------
203  * support functions for lsn-order iterating over the ->changes of a
204  * transaction and its subtransactions
205  *
206  * used for iteration over the k-way heap merge of a transaction and its
207  * subtransactions
208  * ---------------------------------------
209  */
215 
216 /*
217  * ---------------------------------------
218  * Disk serialization support functions
219  * ---------------------------------------
220  */
224  int fd, ReorderBufferChange *change);
226  int *fd, XLogSegNo *segno);
228  char *change);
230 static void ReorderBufferCleanupSerializedTXNs(const char *slotname);
231 static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot,
232  TransactionId xid, XLogSegNo segno);
233 
234 static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap);
237 
238 /* ---------------------------------------
239  * toast reassembly support
240  * ---------------------------------------
241  */
245  Relation relation, ReorderBufferChange *change);
247  Relation relation, ReorderBufferChange *change);
248 
249 /*
250  * ---------------------------------------
251  * memory accounting
252  * ---------------------------------------
253  */
256  ReorderBufferChange *change, bool addition);
257 
258 /*
259  * Allocate a new ReorderBuffer and clean out any old serialized state from
260  * prior ReorderBuffer instances for the same slot.
261  */
264 {
265  ReorderBuffer *buffer;
266  HASHCTL hash_ctl;
267  MemoryContext new_ctx;
268 
269  Assert(MyReplicationSlot != NULL);
270 
271  /* allocate memory in own context, to have better accountability */
273  "ReorderBuffer",
275 
276  buffer =
277  (ReorderBuffer *) MemoryContextAlloc(new_ctx, sizeof(ReorderBuffer));
278 
279  memset(&hash_ctl, 0, sizeof(hash_ctl));
280 
281  buffer->context = new_ctx;
282 
283  buffer->change_context = SlabContextCreate(new_ctx,
284  "Change",
286  sizeof(ReorderBufferChange));
287 
288  buffer->txn_context = SlabContextCreate(new_ctx,
289  "TXN",
291  sizeof(ReorderBufferTXN));
292 
293  buffer->tup_context = GenerationContextCreate(new_ctx,
294  "Tuples",
296 
297  hash_ctl.keysize = sizeof(TransactionId);
298  hash_ctl.entrysize = sizeof(ReorderBufferTXNByIdEnt);
299  hash_ctl.hcxt = buffer->context;
300 
301  buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
303 
305  buffer->by_txn_last_txn = NULL;
306 
307  buffer->outbuf = NULL;
308  buffer->outbufsize = 0;
309  buffer->size = 0;
310 
311  buffer->spillCount = 0;
312  buffer->spillTxns = 0;
313  buffer->spillBytes = 0;
314 
316 
317  dlist_init(&buffer->toplevel_by_lsn);
319 
320  /*
321  * Ensure there's no stale data from prior uses of this slot, in case some
322  * prior exit avoided calling ReorderBufferFree. Failure to do this can
323  * produce duplicated txns, and it's very cheap if there's nothing there.
324  */
326 
327  return buffer;
328 }
329 
330 /*
331  * Free a ReorderBuffer
332  */
333 void
335 {
336  MemoryContext context = rb->context;
337 
338  /*
339  * We free separately allocated data by entirely scrapping reorderbuffer's
340  * memory context.
341  */
342  MemoryContextDelete(context);
343 
344  /* Free disk space used by unconsumed reorder buffers */
346 }
347 
348 /*
349  * Get an unused, possibly preallocated, ReorderBufferTXN.
350  */
351 static ReorderBufferTXN *
353 {
355 
356  txn = (ReorderBufferTXN *)
358 
359  memset(txn, 0, sizeof(ReorderBufferTXN));
360 
361  dlist_init(&txn->changes);
362  dlist_init(&txn->tuplecids);
363  dlist_init(&txn->subtxns);
364 
365  return txn;
366 }
367 
368 /*
369  * Free a ReorderBufferTXN.
370  */
371 static void
373 {
374  /* clean the lookup cache if we were cached (quite likely) */
375  if (rb->by_txn_last_xid == txn->xid)
376  {
378  rb->by_txn_last_txn = NULL;
379  }
380 
381  /* free data that's contained */
382 
383  if (txn->tuplecid_hash != NULL)
384  {
386  txn->tuplecid_hash = NULL;
387  }
388 
389  if (txn->invalidations)
390  {
391  pfree(txn->invalidations);
392  txn->invalidations = NULL;
393  }
394 
395  pfree(txn);
396 }
397 
398 /*
399  * Get a fresh ReorderBufferChange.
400  */
403 {
404  ReorderBufferChange *change;
405 
406  change = (ReorderBufferChange *)
408 
409  memset(change, 0, sizeof(ReorderBufferChange));
410  return change;
411 }
412 
413 /*
414  * Free a ReorderBufferChange.
415  */
416 void
418 {
419  /* update memory accounting info */
420  ReorderBufferChangeMemoryUpdate(rb, change, false);
421 
422  /* free contained data */
423  switch (change->action)
424  {
429  if (change->data.tp.newtuple)
430  {
431  ReorderBufferReturnTupleBuf(rb, change->data.tp.newtuple);
432  change->data.tp.newtuple = NULL;
433  }
434 
435  if (change->data.tp.oldtuple)
436  {
437  ReorderBufferReturnTupleBuf(rb, change->data.tp.oldtuple);
438  change->data.tp.oldtuple = NULL;
439  }
440  break;
442  if (change->data.msg.prefix != NULL)
443  pfree(change->data.msg.prefix);
444  change->data.msg.prefix = NULL;
445  if (change->data.msg.message != NULL)
446  pfree(change->data.msg.message);
447  change->data.msg.message = NULL;
448  break;
450  if (change->data.snapshot)
451  {
452  ReorderBufferFreeSnap(rb, change->data.snapshot);
453  change->data.snapshot = NULL;
454  }
455  break;
456  /* no data in addition to the struct itself */
458  if (change->data.truncate.relids != NULL)
459  {
460  ReorderBufferReturnRelids(rb, change->data.truncate.relids);
461  change->data.truncate.relids = NULL;
462  }
463  break;
467  break;
468  }
469 
470  pfree(change);
471 }
472 
473 /*
474  * Get a fresh ReorderBufferTupleBuf fitting at least a tuple of size
475  * tuple_len (excluding header overhead).
476  */
479 {
480  ReorderBufferTupleBuf *tuple;
481  Size alloc_len;
482 
483  alloc_len = tuple_len + SizeofHeapTupleHeader;
484 
485  tuple = (ReorderBufferTupleBuf *)
487  sizeof(ReorderBufferTupleBuf) +
488  MAXIMUM_ALIGNOF + alloc_len);
489  tuple->alloc_tuple_size = alloc_len;
490  tuple->tuple.t_data = ReorderBufferTupleBufData(tuple);
491 
492  return tuple;
493 }
494 
495 /*
496  * Free a ReorderBufferTupleBuf.
497  */
498 void
500 {
501  pfree(tuple);
502 }
503 
504 /*
505  * Get an array for relids of truncated relations.
506  *
507  * We use the global memory context (for the whole reorder buffer), because
508  * none of the existing ones seems like a good match (some are SLAB, so we
509  * can't use those, and tup_context is meant for tuple data, not relids). We
510  * could add yet another context, but it seems like an overkill - TRUNCATE is
511  * not a particularly common operation, so it does not seem worth it.
512  */
513 Oid *
515 {
516  Oid *relids;
517  Size alloc_len;
518 
519  alloc_len = sizeof(Oid) * nrelids;
520 
521  relids = (Oid *) MemoryContextAlloc(rb->context, alloc_len);
522 
523  return relids;
524 }
525 
526 /*
527  * Free an array of relids.
528  */
529 void
531 {
532  pfree(relids);
533 }
534 
535 /*
536  * Return the ReorderBufferTXN from the given buffer, specified by Xid.
537  * If create is true, and a transaction doesn't already exist, create it
538  * (with the given LSN, and as top transaction if that's specified);
539  * when this happens, is_new is set to true.
540  */
541 static ReorderBufferTXN *
543  bool *is_new, XLogRecPtr lsn, bool create_as_top)
544 {
547  bool found;
548 
550 
551  /*
552  * Check the one-entry lookup cache first
553  */
555  rb->by_txn_last_xid == xid)
556  {
557  txn = rb->by_txn_last_txn;
558 
559  if (txn != NULL)
560  {
561  /* found it, and it's valid */
562  if (is_new)
563  *is_new = false;
564  return txn;
565  }
566 
567  /*
568  * cached as non-existent, and asked not to create? Then nothing else
569  * to do.
570  */
571  if (!create)
572  return NULL;
573  /* otherwise fall through to create it */
574  }
575 
576  /*
577  * If the cache wasn't hit or it yielded a "does-not-exist" and we want
578  * to create an entry.
579  */
580 
581  /* search the lookup table */
582  ent = (ReorderBufferTXNByIdEnt *)
583  hash_search(rb->by_txn,
584  (void *) &xid,
585  create ? HASH_ENTER : HASH_FIND,
586  &found);
587  if (found)
588  txn = ent->txn;
589  else if (create)
590  {
591  /* initialize the new entry, if creation was requested */
592  Assert(ent != NULL);
593  Assert(lsn != InvalidXLogRecPtr);
594 
595  ent->txn = ReorderBufferGetTXN(rb);
596  ent->txn->xid = xid;
597  txn = ent->txn;
598  txn->first_lsn = lsn;
600 
601  if (create_as_top)
602  {
603  dlist_push_tail(&rb->toplevel_by_lsn, &txn->node);
604  AssertTXNLsnOrder(rb);
605  }
606  }
607  else
608  txn = NULL; /* not found and not asked to create */
609 
610  /* update cache */
611  rb->by_txn_last_xid = xid;
612  rb->by_txn_last_txn = txn;
613 
614  if (is_new)
615  *is_new = !found;
616 
617  Assert(!create || txn != NULL);
618  return txn;
619 }
620 
621 /*
622  * Queue a change into a transaction so it can be replayed upon commit.
623  */
624 void
626  ReorderBufferChange *change)
627 {
629 
630  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
631 
632  change->lsn = lsn;
633  change->txn = txn;
634 
635  Assert(InvalidXLogRecPtr != lsn);
636  dlist_push_tail(&txn->changes, &change->node);
637  txn->nentries++;
638  txn->nentries_mem++;
639 
640  /* update memory accounting information */
641  ReorderBufferChangeMemoryUpdate(rb, change, true);
642 
643  /* check the memory limits and evict something if needed */
645 }
646 
647 /*
648  * Queue message into a transaction so it can be processed upon commit.
649  */
650 void
652  Snapshot snapshot, XLogRecPtr lsn,
653  bool transactional, const char *prefix,
654  Size message_size, const char *message)
655 {
656  if (transactional)
657  {
658  MemoryContext oldcontext;
659  ReorderBufferChange *change;
660 
662 
663  oldcontext = MemoryContextSwitchTo(rb->context);
664 
665  change = ReorderBufferGetChange(rb);
667  change->data.msg.prefix = pstrdup(prefix);
668  change->data.msg.message_size = message_size;
669  change->data.msg.message = palloc(message_size);
670  memcpy(change->data.msg.message, message, message_size);
671 
672  ReorderBufferQueueChange(rb, xid, lsn, change);
673 
674  MemoryContextSwitchTo(oldcontext);
675  }
676  else
677  {
678  ReorderBufferTXN *txn = NULL;
679  volatile Snapshot snapshot_now = snapshot;
680 
681  if (xid != InvalidTransactionId)
682  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
683 
684  /* setup snapshot to allow catalog access */
685  SetupHistoricSnapshot(snapshot_now, NULL);
686  PG_TRY();
687  {
688  rb->message(rb, txn, lsn, false, prefix, message_size, message);
689 
691  }
692  PG_CATCH();
693  {
695  PG_RE_THROW();
696  }
697  PG_END_TRY();
698  }
699 }
700 
701 /*
702  * AssertTXNLsnOrder
703  * Verify LSN ordering of transaction lists in the reorderbuffer
704  *
705  * Other LSN-related invariants are checked too.
706  *
707  * No-op if assertions are not in use.
708  */
709 static void
711 {
712 #ifdef USE_ASSERT_CHECKING
713  dlist_iter iter;
714  XLogRecPtr prev_first_lsn = InvalidXLogRecPtr;
715  XLogRecPtr prev_base_snap_lsn = InvalidXLogRecPtr;
716 
717  dlist_foreach(iter, &rb->toplevel_by_lsn)
718  {
720  iter.cur);
721 
722  /* start LSN must be set */
723  Assert(cur_txn->first_lsn != InvalidXLogRecPtr);
724 
725  /* If there is an end LSN, it must be higher than start LSN */
726  if (cur_txn->end_lsn != InvalidXLogRecPtr)
727  Assert(cur_txn->first_lsn <= cur_txn->end_lsn);
728 
729  /* Current initial LSN must be strictly higher than previous */
730  if (prev_first_lsn != InvalidXLogRecPtr)
731  Assert(prev_first_lsn < cur_txn->first_lsn);
732 
733  /* known-as-subtxn txns must not be listed */
734  Assert(!cur_txn->is_known_as_subxact);
735 
736  prev_first_lsn = cur_txn->first_lsn;
737  }
738 
740  {
742  base_snapshot_node,
743  iter.cur);
744 
745  /* base snapshot (and its LSN) must be set */
746  Assert(cur_txn->base_snapshot != NULL);
748 
749  /* current LSN must be strictly higher than previous */
750  if (prev_base_snap_lsn != InvalidXLogRecPtr)
751  Assert(prev_base_snap_lsn < cur_txn->base_snapshot_lsn);
752 
753  /* known-as-subtxn txns must not be listed */
754  Assert(!cur_txn->is_known_as_subxact);
755 
756  prev_base_snap_lsn = cur_txn->base_snapshot_lsn;
757  }
758 #endif
759 }
760 
761 /*
762  * ReorderBufferGetOldestTXN
763  * Return oldest transaction in reorderbuffer
764  */
767 {
769 
770  AssertTXNLsnOrder(rb);
771 
773  return NULL;
774 
776 
779  return txn;
780 }
781 
782 /*
783  * ReorderBufferGetOldestXmin
784  * Return oldest Xmin in reorderbuffer
785  *
786  * Returns oldest possibly running Xid from the point of view of snapshots
787  * used in the transactions kept by reorderbuffer, or InvalidTransactionId if
788  * there are none.
789  *
790  * Since snapshots are assigned monotonically, this equals the Xmin of the
791  * base snapshot with minimal base_snapshot_lsn.
792  */
795 {
797 
798  AssertTXNLsnOrder(rb);
799 
801  return InvalidTransactionId;
802 
803  txn = dlist_head_element(ReorderBufferTXN, base_snapshot_node,
805  return txn->base_snapshot->xmin;
806 }
807 
808 void
810 {
812 }
813 
814 /*
815  * ReorderBufferAssignChild
816  *
817  * Make note that we know that subxid is a subtransaction of xid, seen as of
818  * the given lsn.
819  */
820 void
822  TransactionId subxid, XLogRecPtr lsn)
823 {
825  ReorderBufferTXN *subtxn;
826  bool new_top;
827  bool new_sub;
828 
829  txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true);
830  subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false);
831 
832  if (new_top && !new_sub)
833  elog(ERROR, "subtransaction logged without previous top-level txn record");
834 
835  if (!new_sub)
836  {
837  if (subtxn->is_known_as_subxact)
838  {
839  /* already associated, nothing to do */
840  return;
841  }
842  else
843  {
844  /*
845  * We already saw this transaction, but initially added it to the
846  * list of top-level txns. Now that we know it's not top-level,
847  * remove it from there.
848  */
849  dlist_delete(&subtxn->node);
850  }
851  }
852 
853  subtxn->is_known_as_subxact = true;
854  subtxn->toplevel_xid = xid;
855  Assert(subtxn->nsubtxns == 0);
856 
857  /* add to subtransaction list */
858  dlist_push_tail(&txn->subtxns, &subtxn->node);
859  txn->nsubtxns++;
860 
861  /* Possibly transfer the subtxn's snapshot to its top-level txn. */
863 
864  /* Verify LSN-ordering invariant */
865  AssertTXNLsnOrder(rb);
866 }
867 
868 /*
869  * ReorderBufferTransferSnapToParent
870  * Transfer base snapshot from subtxn to top-level txn, if needed
871  *
872  * This is done if the top-level txn doesn't have a base snapshot, or if the
873  * subtxn's base snapshot has an earlier LSN than the top-level txn's base
874  * snapshot's LSN. This can happen if there are no changes in the toplevel
875  * txn but there are some in the subtxn, or the first change in subtxn has
876  * earlier LSN than first change in the top-level txn and we learned about
877  * their kinship only now.
878  *
879  * The subtransaction's snapshot is cleared regardless of the transfer
880  * happening, since it's not needed anymore in either case.
881  *
882  * We do this as soon as we become aware of their kinship, to avoid queueing
883  * extra snapshots to txns known-as-subtxns -- only top-level txns will
884  * receive further snapshots.
885  */
886 static void
888  ReorderBufferTXN *subtxn)
889 {
890  Assert(subtxn->toplevel_xid == txn->xid);
891 
892  if (subtxn->base_snapshot != NULL)
893  {
894  if (txn->base_snapshot == NULL ||
895  subtxn->base_snapshot_lsn < txn->base_snapshot_lsn)
896  {
897  /*
898  * If the toplevel transaction already has a base snapshot but
899  * it's newer than the subxact's, purge it.
900  */
901  if (txn->base_snapshot != NULL)
902  {
905  }
906 
907  /*
908  * The snapshot is now the top transaction's; transfer it, and
909  * adjust the list position of the top transaction in the list by
910  * moving it to where the subtransaction is.
911  */
912  txn->base_snapshot = subtxn->base_snapshot;
913  txn->base_snapshot_lsn = subtxn->base_snapshot_lsn;
915  &txn->base_snapshot_node);
916 
917  /*
918  * The subtransaction doesn't have a snapshot anymore (so it
919  * mustn't be in the list.)
920  */
921  subtxn->base_snapshot = NULL;
924  }
925  else
926  {
927  /* Base snap of toplevel is fine, so subxact's is not needed */
930  subtxn->base_snapshot = NULL;
932  }
933  }
934 }
935 
936 /*
937  * Associate a subtransaction with its toplevel transaction at commit
938  * time. There may be no further changes added after this.
939  */
940 void
942  TransactionId subxid, XLogRecPtr commit_lsn,
943  XLogRecPtr end_lsn)
944 {
945  ReorderBufferTXN *subtxn;
946 
947  subtxn = ReorderBufferTXNByXid(rb, subxid, false, NULL,
948  InvalidXLogRecPtr, false);
949 
950  /*
951  * No need to do anything if that subtxn didn't contain any changes
952  */
953  if (!subtxn)
954  return;
955 
956  subtxn->final_lsn = commit_lsn;
957  subtxn->end_lsn = end_lsn;
958 
959  /*
960  * Assign this subxact as a child of the toplevel xact (no-op if already
961  * done.)
962  */
964 }
965 
966 
967 /*
968  * Support for efficiently iterating over a transaction's and its
969  * subtransactions' changes.
970  *
971  * We do this by doing a k-way merge between transactions/subtransactions. For that
972  * we model the current heads of the different transactions as a binary heap
973  * so we easily know which (sub-)transaction has the change with the smallest
974  * lsn next.
975  *
976  * We assume the changes in individual transactions are already sorted by LSN.
977  */
978 
979 /*
980  * Binary heap comparison function.
981  */
982 static int
984 {
986  XLogRecPtr pos_a = state->entries[DatumGetInt32(a)].lsn;
987  XLogRecPtr pos_b = state->entries[DatumGetInt32(b)].lsn;
988 
989  if (pos_a < pos_b)
990  return 1;
991  else if (pos_a == pos_b)
992  return 0;
993  return -1;
994 }
995 
996 /*
997  * Allocate & initialize an iterator which iterates in lsn order over a
998  * transaction and all its subtransactions.
999  */
1002 {
1003  Size nr_txns = 0;
1005  dlist_iter cur_txn_i;
1006  int32 off;
1007 
1008  /*
1009  * Calculate the size of our heap: one element for every transaction that
1010  * contains changes. (Besides the transactions already in the reorder
1011  * buffer, we count the one we were directly passed.)
1012  */
1013  if (txn->nentries > 0)
1014  nr_txns++;
1015 
1016  dlist_foreach(cur_txn_i, &txn->subtxns)
1017  {
1018  ReorderBufferTXN *cur_txn;
1019 
1020  cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
1021 
1022  if (cur_txn->nentries > 0)
1023  nr_txns++;
1024  }
1025 
1026  /*
1027  * TODO: Consider adding fastpath for the rather common nr_txns=1 case, no
1028  * need to allocate/build a heap then.
1029  */
1030 
1031  /* allocate iteration state */
1032  state = (ReorderBufferIterTXNState *)
1034  sizeof(ReorderBufferIterTXNState) +
1035  sizeof(ReorderBufferIterTXNEntry) * nr_txns);
1036 
1037  state->nr_txns = nr_txns;
1038  dlist_init(&state->old_change);
1039 
1040  for (off = 0; off < state->nr_txns; off++)
1041  {
1042  state->entries[off].fd = -1;
1043  state->entries[off].segno = 0;
1044  }
1045 
1046  /* allocate heap */
1047  state->heap = binaryheap_allocate(state->nr_txns,
1049  state);
1050 
1051  /*
1052  * Now insert items into the binary heap, in an unordered fashion. (We
1053  * will run a heap assembly step at the end; this is more efficient.)
1054  */
1055 
1056  off = 0;
1057 
1058  /* add toplevel transaction if it contains changes */
1059  if (txn->nentries > 0)
1060  {
1061  ReorderBufferChange *cur_change;
1062 
1063  if (txn->serialized)
1064  {
1065  /* serialize remaining changes */
1066  ReorderBufferSerializeTXN(rb, txn);
1067  ReorderBufferRestoreChanges(rb, txn, &state->entries[off].fd,
1068  &state->entries[off].segno);
1069  }
1070 
1071  cur_change = dlist_head_element(ReorderBufferChange, node,
1072  &txn->changes);
1073 
1074  state->entries[off].lsn = cur_change->lsn;
1075  state->entries[off].change = cur_change;
1076  state->entries[off].txn = txn;
1077 
1079  }
1080 
1081  /* add subtransactions if they contain changes */
1082  dlist_foreach(cur_txn_i, &txn->subtxns)
1083  {
1084  ReorderBufferTXN *cur_txn;
1085 
1086  cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
1087 
1088  if (cur_txn->nentries > 0)
1089  {
1090  ReorderBufferChange *cur_change;
1091 
1092  if (cur_txn->serialized)
1093  {
1094  /* serialize remaining changes */
1095  ReorderBufferSerializeTXN(rb, cur_txn);
1096  ReorderBufferRestoreChanges(rb, cur_txn,
1097  &state->entries[off].fd,
1098  &state->entries[off].segno);
1099  }
1100  cur_change = dlist_head_element(ReorderBufferChange, node,
1101  &cur_txn->changes);
1102 
1103  state->entries[off].lsn = cur_change->lsn;
1104  state->entries[off].change = cur_change;
1105  state->entries[off].txn = cur_txn;
1106 
1108  }
1109  }
1110 
1111  /* assemble a valid binary heap */
1112  binaryheap_build(state->heap);
1113 
1114  return state;
1115 }
1116 
1117 /*
1118  * Return the next change when iterating over a transaction and its
1119  * subtransactions.
1120  *
1121  * Returns NULL when no further changes exist.
1122  */
1123 static ReorderBufferChange *
1125 {
1126  ReorderBufferChange *change;
1128  int32 off;
1129 
1130  /* nothing there anymore */
1131  if (state->heap->bh_size == 0)
1132  return NULL;
1133 
1134  off = DatumGetInt32(binaryheap_first(state->heap));
1135  entry = &state->entries[off];
1136 
1137  /* free memory we might have "leaked" in the previous *Next call */
1138  if (!dlist_is_empty(&state->old_change))
1139  {
1140  change = dlist_container(ReorderBufferChange, node,
1141  dlist_pop_head_node(&state->old_change));
1142  ReorderBufferReturnChange(rb, change);
1143  Assert(dlist_is_empty(&state->old_change));
1144  }
1145 
1146  change = entry->change;
1147 
1148  /*
1149  * update heap with information about which transaction has the next
1150  * relevant change in LSN order
1151  */
1152 
1153  /* there are in-memory changes */
1154  if (dlist_has_next(&entry->txn->changes, &entry->change->node))
1155  {
1156  dlist_node *next = dlist_next_node(&entry->txn->changes, &change->node);
1157  ReorderBufferChange *next_change =
1158  dlist_container(ReorderBufferChange, node, next);
1159 
1160  /* txn stays the same */
1161  state->entries[off].lsn = next_change->lsn;
1162  state->entries[off].change = next_change;
1163 
1165  return change;
1166  }
1167 
1168  /* try to load changes from disk */
1169  if (entry->txn->nentries != entry->txn->nentries_mem)
1170  {
1171  /*
1172  * Ugly: restoring changes will reuse *Change records, thus delete the
1173  * current one from the per-tx list and only free in the next call.
1174  */
1175  dlist_delete(&change->node);
1176  dlist_push_tail(&state->old_change, &change->node);
1177 
1178  if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->fd,
1179  &state->entries[off].segno))
1180  {
1181  /* successfully restored changes from disk */
1182  ReorderBufferChange *next_change =
1184  &entry->txn->changes);
1185 
1186  elog(DEBUG2, "restored %u/%u changes from disk",
1187  (uint32) entry->txn->nentries_mem,
1188  (uint32) entry->txn->nentries);
1189 
1190  Assert(entry->txn->nentries_mem);
1191  /* txn stays the same */
1192  state->entries[off].lsn = next_change->lsn;
1193  state->entries[off].change = next_change;
1195 
1196  return change;
1197  }
1198  }
1199 
1200  /* ok, no changes there anymore, remove */
1201  binaryheap_remove_first(state->heap);
1202 
1203  return change;
1204 }
1205 
1206 /*
1207  * Deallocate the iterator
1208  */
1209 static void
1212 {
1213  int32 off;
1214 
1215  for (off = 0; off < state->nr_txns; off++)
1216  {
1217  if (state->entries[off].fd != -1)
1218  CloseTransientFile(state->entries[off].fd);
1219  }
1220 
1221  /* free memory we might have "leaked" in the last *Next call */
1222  if (!dlist_is_empty(&state->old_change))
1223  {
1224  ReorderBufferChange *change;
1225 
1226  change = dlist_container(ReorderBufferChange, node,
1227  dlist_pop_head_node(&state->old_change));
1228  ReorderBufferReturnChange(rb, change);
1229  Assert(dlist_is_empty(&state->old_change));
1230  }
1231 
1232  binaryheap_free(state->heap);
1233  pfree(state);
1234 }
1235 
1236 /*
1237  * Cleanup the contents of a transaction, usually after the transaction
1238  * committed or aborted.
1239  */
/*
 * NOTE(review): extraction artifact — the signature line (upstream ~1241,
 * presumably "ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN
 * *txn)") is missing, as is the body of the base-snapshot branch at upstream
 * ~1299-1300 (presumably the SnapBuildSnapDecRefcount/unlink calls — TODO
 * confirm against the canonical source).
 *
 * Frees everything a TXN owns: recurses once into subtxns, returns all
 * changes and tuplecids to the buffer's pools, releases the base snapshot,
 * unlinks the TXN, drops it from the by-xid hash, removes any spill files,
 * and finally returns the TXN struct itself.
 */
1240 static void
1242 {
1243  bool found;
1244  dlist_mutable_iter iter;
1245 
1246  /* cleanup subtransactions & their changes */
1247  dlist_foreach_modify(iter, &txn->subtxns)
1248  {
1249  ReorderBufferTXN *subtxn;
1250 
1251  subtxn = dlist_container(ReorderBufferTXN, node, iter.cur);
1252 
1253  /*
1254  * Subtransactions are always associated to the toplevel TXN, even if
1255  * they originally were happening inside another subtxn, so we won't
1256  * ever recurse more than one level deep here.
1257  */
1258  Assert(subtxn->is_known_as_subxact);
1259  Assert(subtxn->nsubtxns == 0);
1260 
1261  ReorderBufferCleanupTXN(rb, subtxn);
1262  }
1263 
1264  /* cleanup changes in the toplevel txn */
1265  dlist_foreach_modify(iter, &txn->changes)
1266  {
1267  ReorderBufferChange *change;
1268 
1269  change = dlist_container(ReorderBufferChange, node, iter.cur);
1270 
1271  /* Check we're not mixing changes from different transactions. */
1272  Assert(change->txn == txn);
1273 
1274  ReorderBufferReturnChange(rb, change);
1275  }
1276 
1277  /*
1278  * Cleanup the tuplecids we stored for decoding catalog snapshot access.
1279  * They are always stored in the toplevel transaction.
1280  */
1281  dlist_foreach_modify(iter, &txn->tuplecids)
1282  {
1283  ReorderBufferChange *change;
1284 
1285  change = dlist_container(ReorderBufferChange, node, iter.cur);
1286 
1287  /* Check we're not mixing changes from different transactions. */
1288  Assert(change->txn == txn);
1290 
1291  ReorderBufferReturnChange(rb, change);
1292  }
1293 
1294  /*
1295  * Cleanup the base snapshot, if set.
1296  */
1297  if (txn->base_snapshot != NULL)
1298  {
     /* NOTE(review): branch body dropped by extraction — see header note */
1301  }
1302 
1303  /*
1304  * Remove TXN from its containing list.
1305  *
1306  * Note: if txn->is_known_as_subxact, we are deleting the TXN from its
1307  * parent's list of known subxacts; this leaves the parent's nsubxacts
1308  * count too high, but we don't care. Otherwise, we are deleting the TXN
1309  * from the LSN-ordered list of toplevel TXNs.
1310  */
1311  dlist_delete(&txn->node);
1312 
1313  /* now remove reference from buffer */
1314  hash_search(rb->by_txn,
1315  (void *) &txn->xid,
1316  HASH_REMOVE,
1317  &found);
1318  Assert(found);
1319 
1320  /* remove entries spilled to disk */
1321  if (txn->serialized)
1322  ReorderBufferRestoreCleanup(rb, txn);
1323 
1324  /* deallocate */
1325  ReorderBufferReturnTXN(rb, txn);
1326 }
1327 
1328 /*
1329  * Build a hash with a (relfilenode, ctid) -> (cmin, cmax) mapping for use by
1330  * HeapTupleSatisfiesHistoricMVCC.
1331  */
/*
 * NOTE(review): extraction artifact — missing here: the signature line
 * (upstream ~1333, presumably "ReorderBufferBuildTupleCidHash(ReorderBuffer
 * *rb, ReorderBufferTXN *txn)"), the hash_create flags argument (~1353,
 * presumably HASH_ELEM | HASH_BLOBS | HASH_CONTEXT), the local key/ent
 * declarations (~1357-1358), an Assert (~1364), and the hash_search call head
 * (~1375/1377, presumably hash_search(txn->tuplecid_hash, ..., HASH_ENTER,
 * ...)). Confirm all against the canonical source.
 *
 * Populates txn->tuplecid_hash from the txn's tuplecid change list so that
 * historic MVCC checks on catalog tuples can resolve cmin/cmax/combocid.
 * No-op unless the txn has catalog changes with recorded tuplecids.
 */
1332 static void
1334 {
1335  dlist_iter iter;
1336  HASHCTL hash_ctl;
1337 
1338  if (!txn->has_catalog_changes || dlist_is_empty(&txn->tuplecids))
1339  return;
1340 
1341  memset(&hash_ctl, 0, sizeof(hash_ctl));
1342 
1343  hash_ctl.keysize = sizeof(ReorderBufferTupleCidKey);
1344  hash_ctl.entrysize = sizeof(ReorderBufferTupleCidEnt);
1345  hash_ctl.hcxt = rb->context;
1346 
1347  /*
1348  * create the hash with the exact number of to-be-stored tuplecids from
1349  * the start
1350  */
1351  txn->tuplecid_hash =
1352  hash_create("ReorderBufferTupleCid", txn->ntuplecids, &hash_ctl,
1354 
1355  dlist_foreach(iter, &txn->tuplecids)
1356  {
1359  bool found;
1360  ReorderBufferChange *change;
1361 
1362  change = dlist_container(ReorderBufferChange, node, iter.cur);
1363 
1365 
1366  /* be careful about padding */
1367  memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
1368 
1369  key.relnode = change->data.tuplecid.node;
1370 
1371  ItemPointerCopy(&change->data.tuplecid.tid,
1372  &key.tid);
1373 
1374  ent = (ReorderBufferTupleCidEnt *)
1376  (void *) &key,
1378  &found);
1379  if (!found)
1380  {
     /* first sighting of this tuple: record all three command ids */
1381  ent->cmin = change->data.tuplecid.cmin;
1382  ent->cmax = change->data.tuplecid.cmax;
1383  ent->combocid = change->data.tuplecid.combocid;
1384  }
1385  else
1386  {
1387  /*
1388  * Maybe we already saw this tuple before in this transaction, but
1389  * if so it must have the same cmin.
1390  */
1391  Assert(ent->cmin == change->data.tuplecid.cmin);
1392 
1393  /*
1394  * cmax may be initially invalid, but once set it can only grow,
1395  * and never become invalid again.
1396  */
1397  Assert((ent->cmax == InvalidCommandId) ||
1398  ((change->data.tuplecid.cmax != InvalidCommandId) &&
1399  (change->data.tuplecid.cmax > ent->cmax)));
1400  ent->cmax = change->data.tuplecid.cmax;
1401  }
1402  }
1403 }
1404 
1405 /*
1406  * Copy a provided snapshot so we can modify it privately. This is needed so
1407  * that catalog modifying transactions can look into intermediate catalog
1408  * states.
1409  */
/*
 * NOTE(review): extraction artifact — the signature line (upstream
 * ~1411-1412, presumably "ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot
 * orig_snap, ReorderBufferTXN *txn, CommandId cid)") was dropped; confirm
 * against the canonical source.
 *
 * Allocates one contiguous block holding SnapshotData plus the xip array plus
 * a subxip array sized for the txn and all its subtxns, copies the original
 * snapshot into it, fills subxip with the toplevel xid and all subtxn xids
 * (sorted for later bsearch), and stamps the given CommandId. The returned
 * snapshot is marked copied so ReorderBufferFreeSnap pfree's it.
 */
1410 static Snapshot
1413 {
1414  Snapshot snap;
1415  dlist_iter iter;
1416  int i = 0;
1417  Size size;
1418 
     /* header + xip[] + subxip[] (txn itself plus nsubtxns upper bound) */
1419  size = sizeof(SnapshotData) +
1420  sizeof(TransactionId) * orig_snap->xcnt +
1421  sizeof(TransactionId) * (txn->nsubtxns + 1);
1422 
1423  snap = MemoryContextAllocZero(rb->context, size);
1424  memcpy(snap, orig_snap, sizeof(SnapshotData));
1425 
1426  snap->copied = true;
1427  snap->active_count = 1; /* mark as active so nobody frees it */
1428  snap->regd_count = 0;
     /* arrays live directly after the header in the same allocation */
1429  snap->xip = (TransactionId *) (snap + 1);
1430 
1431  memcpy(snap->xip, orig_snap->xip, sizeof(TransactionId) * snap->xcnt);
1432 
1433  /*
1434  * snap->subxip contains all txids that belong to our transaction which we
1435  * need to check via cmin/cmax. That's why we store the toplevel
1436  * transaction in there as well.
1437  */
1438  snap->subxip = snap->xip + snap->xcnt;
1439  snap->subxip[i++] = txn->xid;
1440 
1441  /*
1442  * subxcnt isn't decreased when subtransactions abort, so count manually.
1443  * Since it's an upper boundary it is safe to use it for the allocation
1444  * above.
1445  */
1446  snap->subxcnt = 1;
1447 
1448  dlist_foreach(iter, &txn->subtxns)
1449  {
1450  ReorderBufferTXN *sub_txn;
1451 
1452  sub_txn = dlist_container(ReorderBufferTXN, node, iter.cur);
1453  snap->subxip[i++] = sub_txn->xid;
1454  snap->subxcnt++;
1455  }
1456 
1457  /* sort so we can bsearch() later */
1458  qsort(snap->subxip, snap->subxcnt, sizeof(TransactionId), xidComparator);
1459 
1460  /* store the specified current CommandId */
1461  snap->curcid = cid;
1462 
1463  return snap;
1464 }
1465 
1466 /*
1467  * Free a previously ReorderBufferCopySnap'ed snapshot
1468  */
/*
 * NOTE(review): extraction artifact — the signature line (upstream ~1470,
 * presumably "ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)") and
 * the else-branch statement (~1475, presumably
 * SnapBuildSnapDecRefcount(snap);) were dropped; confirm against the
 * canonical source.
 *
 * Copied snapshots live in a single palloc'd block (see
 * ReorderBufferCopySnap), so a plain pfree suffices; non-copied snapshots
 * are owned elsewhere and handled by the (missing) else branch.
 */
1469 static void
1471 {
1472  if (snap->copied)
1473  pfree(snap);
1474  else
1476 }
1477 
1478 /*
1479  * Perform the replay of a transaction and its non-aborted subtransactions.
1480  *
1481  * Subtransactions previously have to be processed by
1482  * ReorderBufferCommitChild(), even if previously assigned to the toplevel
1483  * transaction with ReorderBufferAssignChild.
1484  *
1485  * We currently can only decode a transaction's contents when its commit
1486  * record is read because that's the only place where we know about cache
1487  * invalidations. Thus, once a toplevel commit is read, we iterate over the top
1488  * and subtransactions (using a k-way merge) and replay the changes in lsn
1489  * order.
1490  */
/*
 * NOTE(review): extraction artifact — this function (presumably
 * ReorderBufferCommit) lost many hyperlinked lines: the signature line
 * (~1492), the txn declaration (~1497), the ReorderBufferBuildTupleCidHash
 * call (~1532), the StartTransactionCommand else-branch (~1557), every
 * "case REORDER_BUFFER_CHANGE_*:" label in the switch (~1569, 1583-1585,
 * 1690, 1717, 1749, 1756, 1789, 1818), and several cleanup statements in the
 * commit/abort paths (~1813, 1843/1845, 1856, 1859, 1862, 1876, 1882, 1885,
 * 1888). All gap markers below are inferences — restore from the canonical
 * reorderbuffer.c, do not compile this as-is.
 *
 * Control flow: look up the txn, stamp commit metadata, bail if it never
 * took a base snapshot, then replay all changes in LSN order inside an
 * internal (sub)transaction with a historic snapshot installed, invoking the
 * output plugin's begin/apply/commit callbacks, and finally clean the txn up
 * both on success and in the PG_CATCH error path.
 */
1491 void
1493  XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
1494  TimestampTz commit_time,
1495  RepOriginId origin_id, XLogRecPtr origin_lsn)
1496 {
1498  volatile Snapshot snapshot_now;
1499  volatile CommandId command_id = FirstCommandId;
1500  bool using_subtxn;
1501  ReorderBufferIterTXNState *volatile iterstate = NULL;
1502 
1503  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1504  false);
1505 
1506  /* unknown transaction, nothing to replay */
1507  if (txn == NULL)
1508  return;
1509 
 /* remember commit metadata for the callbacks and for cleanup */
1510  txn->final_lsn = commit_lsn;
1511  txn->end_lsn = end_lsn;
1512  txn->commit_time = commit_time;
1513  txn->origin_id = origin_id;
1514  txn->origin_lsn = origin_lsn;
1515 
1516  /*
1517  * If this transaction has no snapshot, it didn't make any changes to the
1518  * database, so there's nothing to decode. Note that
1519  * ReorderBufferCommitChild will have transferred any snapshots from
1520  * subtransactions if there were any.
1521  */
1522  if (txn->base_snapshot == NULL)
1523  {
1524  Assert(txn->ninvalidations == 0);
1525  ReorderBufferCleanupTXN(rb, txn);
1526  return;
1527  }
1528 
1529  snapshot_now = txn->base_snapshot;
1530 
1531  /* build data to be able to lookup the CommandIds of catalog tuples */
 /* NOTE(review): call dropped by extraction (~1532) */
1533 
1534  /* setup the initial snapshot */
1535  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1536 
1537  /*
1538  * Decoding needs access to syscaches et al., which in turn use
1539  * heavyweight locks and such. Thus we need to have enough state around to
1540  * keep track of those. The easiest way is to simply use a transaction
1541  * internally. That also allows us to easily enforce that nothing writes
1542  * to the database by checking for xid assignments.
1543  *
1544  * When we're called via the SQL SRF there's already a transaction
1545  * started, so start an explicit subtransaction there.
1546  */
1547  using_subtxn = IsTransactionOrTransactionBlock();
1548 
1549  PG_TRY();
1550  {
1551  ReorderBufferChange *change;
1552  ReorderBufferChange *specinsert = NULL;
1553 
1554  if (using_subtxn)
1555  BeginInternalSubTransaction("replay");
1556  else
 /* NOTE(review): else-branch statement dropped by extraction (~1557) */
1558 
1559  rb->begin(rb, txn);
1560 
1561  iterstate = ReorderBufferIterTXNInit(rb, txn);
1562  while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
1563  {
1564  Relation relation = NULL;
1565  Oid reloid;
1566 
1567  switch (change->action)
1568  {
 /* NOTE(review): case label dropped by extraction (~1569) */
1570 
1571  /*
1572  * Confirmation for speculative insertion arrived. Simply
1573  * use as a normal record. It'll be cleaned up at the end
1574  * of INSERT processing.
1575  */
1576  if (specinsert == NULL)
1577  elog(ERROR, "invalid ordering of speculative insertion changes");
1578  Assert(specinsert->data.tp.oldtuple == NULL);
1579  change = specinsert;
1581 
1582  /* intentionally fall through */
 /* NOTE(review): INSERT/UPDATE/DELETE case labels dropped (~1583-1585) */
1586  Assert(snapshot_now);
1587 
1588  reloid = RelidByRelfilenode(change->data.tp.relnode.spcNode,
1589  change->data.tp.relnode.relNode);
1590 
1591  /*
1592  * Mapped catalog tuple without data, emitted while
1593  * catalog table was in the process of being rewritten. We
1594  * can fail to look up the relfilenode, because the
1595  * relmapper has no "historic" view, in contrast to normal
1596  * the normal catalog during decoding. Thus repeated
1597  * rewrites can cause a lookup failure. That's OK because
1598  * we do not decode catalog changes anyway. Normally such
1599  * tuples would be skipped over below, but we can't
1600  * identify whether the table should be logically logged
1601  * without mapping the relfilenode to the oid.
1602  */
1603  if (reloid == InvalidOid &&
1604  change->data.tp.newtuple == NULL &&
1605  change->data.tp.oldtuple == NULL)
1606  goto change_done;
1607  else if (reloid == InvalidOid)
1608  elog(ERROR, "could not map filenode \"%s\" to relation OID",
1609  relpathperm(change->data.tp.relnode,
1610  MAIN_FORKNUM));
1611 
1612  relation = RelationIdGetRelation(reloid);
1613 
1614  if (!RelationIsValid(relation))
1615  elog(ERROR, "could not open relation with OID %u (for filenode \"%s\")",
1616  reloid,
1617  relpathperm(change->data.tp.relnode,
1618  MAIN_FORKNUM));
1619 
1620  if (!RelationIsLogicallyLogged(relation))
1621  goto change_done;
1622 
1623  /*
1624  * Ignore temporary heaps created during DDL unless the
1625  * plugin has asked for them.
1626  */
1627  if (relation->rd_rel->relrewrite && !rb->output_rewrites)
1628  goto change_done;
1629 
1630  /*
1631  * For now ignore sequence changes entirely. Most of the
1632  * time they don't log changes using records we
1633  * understand, so it doesn't make sense to handle the few
1634  * cases we do.
1635  */
1636  if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
1637  goto change_done;
1638 
1639  /* user-triggered change */
1640  if (!IsToastRelation(relation))
1641  {
1642  ReorderBufferToastReplace(rb, txn, relation, change);
1643  rb->apply_change(rb, txn, relation, change);
1644 
1645  /*
1646  * Only clear reassembled toast chunks if we're sure
1647  * they're not required anymore. The creator of the
1648  * tuple tells us.
1649  */
1650  if (change->data.tp.clear_toast_afterwards)
1651  ReorderBufferToastReset(rb, txn);
1652  }
1653  /* we're not interested in toast deletions */
1654  else if (change->action == REORDER_BUFFER_CHANGE_INSERT)
1655  {
1656  /*
1657  * Need to reassemble the full toasted Datum in
1658  * memory, to ensure the chunks don't get reused till
1659  * we're done remove it from the list of this
1660  * transaction's changes. Otherwise it will get
1661  * freed/reused while restoring spooled data from
1662  * disk.
1663  */
1664  Assert(change->data.tp.newtuple != NULL);
1665 
1666  dlist_delete(&change->node);
1667  ReorderBufferToastAppendChunk(rb, txn, relation,
1668  change);
1669  }
1670 
1671  change_done:
1672 
1673  /*
1674  * Either speculative insertion was confirmed, or it was
1675  * unsuccessful and the record isn't needed anymore.
1676  */
1677  if (specinsert != NULL)
1678  {
1679  ReorderBufferReturnChange(rb, specinsert);
1680  specinsert = NULL;
1681  }
1682 
1683  if (relation != NULL)
1684  {
1685  RelationClose(relation);
1686  relation = NULL;
1687  }
1688  break;
1689 
 /* NOTE(review): speculative-insert case label dropped (~1690) */
1691 
1692  /*
1693  * Speculative insertions are dealt with by delaying the
1694  * processing of the insert until the confirmation record
1695  * arrives. For that we simply unlink the record from the
1696  * chain, so it does not get freed/reused while restoring
1697  * spooled data from disk.
1698  *
1699  * This is safe in the face of concurrent catalog changes
1700  * because the relevant relation can't be changed between
1701  * speculative insertion and confirmation due to
1702  * CheckTableNotInUse() and locking.
1703  */
1704 
1705  /* clear out a pending (and thus failed) speculation */
1706  if (specinsert != NULL)
1707  {
1708  ReorderBufferReturnChange(rb, specinsert);
1709  specinsert = NULL;
1710  }
1711 
1712  /* and memorize the pending insertion */
1713  dlist_delete(&change->node);
1714  specinsert = change;
1715  break;
1716 
 /* NOTE(review): TRUNCATE case label dropped (~1717) */
1718  {
1719  int i;
1720  int nrelids = change->data.truncate.nrelids;
1721  int nrelations = 0;
1722  Relation *relations;
1723 
1724  relations = palloc0(nrelids * sizeof(Relation));
1725  for (i = 0; i < nrelids; i++)
1726  {
1727  Oid relid = change->data.truncate.relids[i];
1728  Relation relation;
1729 
1730  relation = RelationIdGetRelation(relid);
1731 
1732  if (!RelationIsValid(relation))
1733  elog(ERROR, "could not open relation with OID %u", relid);
1734 
1735  if (!RelationIsLogicallyLogged(relation))
1736  continue;
1737 
1738  relations[nrelations++] = relation;
1739  }
1740 
1741  rb->apply_truncate(rb, txn, nrelations, relations, change);
1742 
1743  for (i = 0; i < nrelations; i++)
1744  RelationClose(relations[i]);
1745 
1746  break;
1747  }
1748 
 /* NOTE(review): MESSAGE case label dropped (~1749) */
1750  rb->message(rb, txn, change->lsn, true,
1751  change->data.msg.prefix,
1752  change->data.msg.message_size,
1753  change->data.msg.message);
1754  break;
1755 
 /* NOTE(review): internal-snapshot case label dropped (~1756) */
1757  /* get rid of the old */
1758  TeardownHistoricSnapshot(false);
1759 
1760  if (snapshot_now->copied)
1761  {
1762  ReorderBufferFreeSnap(rb, snapshot_now);
1763  snapshot_now =
1764  ReorderBufferCopySnap(rb, change->data.snapshot,
1765  txn, command_id);
1766  }
1767 
1768  /*
1769  * Restored from disk, need to be careful not to double
1770  * free. We could introduce refcounting for that, but for
1771  * now this seems infrequent enough not to care.
1772  */
1773  else if (change->data.snapshot->copied)
1774  {
1775  snapshot_now =
1776  ReorderBufferCopySnap(rb, change->data.snapshot,
1777  txn, command_id);
1778  }
1779  else
1780  {
1781  snapshot_now = change->data.snapshot;
1782  }
1783 
1784 
1785  /* and continue with the new one */
1786  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1787  break;
1788 
 /* NOTE(review): command-id case label dropped (~1789) */
1790  Assert(change->data.command_id != InvalidCommandId);
1791 
1792  if (command_id < change->data.command_id)
1793  {
1794  command_id = change->data.command_id;
1795 
1796  if (!snapshot_now->copied)
1797  {
1798  /* we don't use the global one anymore */
1799  snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
1800  txn, command_id);
1801  }
1802 
1803  snapshot_now->curcid = command_id;
1804 
1805  TeardownHistoricSnapshot(false);
1806  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1807 
1808  /*
1809  * Every time the CommandId is incremented, we could
1810  * see new catalog contents, so execute all
1811  * invalidations.
1812  */
 /* NOTE(review): invalidation-execute call dropped (~1813) */
1814  }
1815 
1816  break;
1817 
 /* NOTE(review): tuplecid case label dropped (~1818); this case is a can't-happen */
1819  elog(ERROR, "tuplecid value in changequeue");
1820  break;
1821  }
1822  }
1823 
1824  /*
1825  * There's a speculative insertion remaining, just clean in up, it
1826  * can't have been successful, otherwise we'd gotten a confirmation
1827  * record.
1828  */
1829  if (specinsert)
1830  {
1831  ReorderBufferReturnChange(rb, specinsert);
1832  specinsert = NULL;
1833  }
1834 
1835  /* clean up the iterator */
1836  ReorderBufferIterTXNFinish(rb, iterstate);
1837  iterstate = NULL;
1838 
1839  /* call commit callback */
1840  rb->commit(rb, txn, commit_lsn);
1841 
1842  /* this is just a sanity check against bad output plugin behaviour */
 /* NOTE(review): guarding if-condition and elog argument dropped (~1843/1845) */
1844  elog(ERROR, "output plugin used XID %u",
1846 
1847  /* cleanup */
1848  TeardownHistoricSnapshot(false);
1849 
1850  /*
1851  * Aborting the current (sub-)transaction as a whole has the right
1852  * semantics. We want all locks acquired in here to be released, not
1853  * reassigned to the parent and we do not want any database access
1854  * have persistent effects.
1855  */
 /* NOTE(review): abort call dropped (~1856) */
1857 
1858  /* make sure there's no cache pollution */
 /* NOTE(review): invalidation-execute call dropped (~1859) */
1860 
1861  if (using_subtxn)
 /* NOTE(review): subtransaction-rollback call dropped (~1862) */
1863 
1864  if (snapshot_now->copied)
1865  ReorderBufferFreeSnap(rb, snapshot_now);
1866 
1867  /* remove potential on-disk data, and deallocate */
1868  ReorderBufferCleanupTXN(rb, txn);
1869  }
1870  PG_CATCH();
1871  {
1872  /* TODO: Encapsulate cleanup from the PG_TRY and PG_CATCH blocks */
1873  if (iterstate)
1874  ReorderBufferIterTXNFinish(rb, iterstate);
1875 
 /* NOTE(review): historic-snapshot teardown call dropped (~1876) */
1877 
1878  /*
1879  * Force cache invalidation to happen outside of a valid transaction
1880  * to prevent catalog access as we just caught an error.
1881  */
 /* NOTE(review): abort call dropped (~1882) */
1883 
1884  /* make sure there's no cache pollution */
 /* NOTE(review): invalidation-execute call dropped (~1885) */
1886 
1887  if (using_subtxn)
 /* NOTE(review): subtransaction-rollback call dropped (~1888) */
1889 
1890  if (snapshot_now->copied)
1891  ReorderBufferFreeSnap(rb, snapshot_now);
1892 
1893  /* remove potential on-disk data, and deallocate */
1894  ReorderBufferCleanupTXN(rb, txn);
1895 
1896  PG_RE_THROW();
1897  }
1898  PG_END_TRY();
1899 }
1900 
1901 /*
1902  * Abort a transaction that possibly has previous changes. Needs to be first
1903  * called for subtransactions and then for the toplevel xid.
1904  *
1905  * NB: Transactions handled here have to have actively aborted (i.e. have
1906  * produced an abort record). Implicitly aborted transactions are handled via
1907  * ReorderBufferAbortOld(); transactions we're just not interested in, but
1908  * which have committed are handled in ReorderBufferForget().
1909  *
1910  * This function purges this transaction and its contents from memory and
1911  * disk.
1912  */
/*
 * NOTE(review): extraction dropped the signature line (~1914, presumably
 * "ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr
 * lsn)") and the txn declaration (~1916); confirm against the canonical
 * source.
 */
1913 void
1915 {
1917 
1918  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1919  false);
1920 
1921  /* unknown, nothing to remove */
1922  if (txn == NULL)
1923  return;
1924 
1925  /* cosmetic... */
1926  txn->final_lsn = lsn;
1927 
1928  /* remove potential on-disk data, and deallocate */
1929  ReorderBufferCleanupTXN(rb, txn);
1930 }
1931 
1932 /*
1933  * Abort all transactions that aren't actually running anymore because the
1934  * server restarted.
1935  *
1936  * NB: These really have to be transactions that have aborted due to a server
1937  * crash/immediate restart, as we don't deal with invalidations here.
1938  */
/*
 * NOTE(review): extraction dropped the signature line (~1940, presumably
 * "ReorderBufferAbortOld(ReorderBuffer *rb, TransactionId oldestRunningXid)"),
 * the dlist_foreach_modify head over rb->toplevel_by_lsn (~1951), the txn
 * declaration (~1953), and the dlist_tail_element expression initializing
 * 'last' (~1969). Confirm against the canonical source.
 */
1939 void
1941 {
1942  dlist_mutable_iter it;
1943 
1944  /*
1945  * Iterate through all (potential) toplevel TXNs and abort all that are
1946  * older than what possibly can be running. Once we've found the first
1947  * that is alive we stop, there might be some that acquired an xid earlier
1948  * but started writing later, but it's unlikely and they will be cleaned
1949  * up in a later call to this function.
1950  */
1952  {
1954 
1955  txn = dlist_container(ReorderBufferTXN, node, it.cur);
1956 
1957  if (TransactionIdPrecedes(txn->xid, oldestRunningXid))
1958  {
1959  /*
1960  * We set final_lsn on a transaction when we decode its commit or
1961  * abort record, but we never see those records for crashed
1962  * transactions. To ensure cleanup of these transactions, set
1963  * final_lsn to that of their last change; this causes
1964  * ReorderBufferRestoreCleanup to do the right thing.
1965  */
1966  if (txn->serialized && txn->final_lsn == 0)
1967  {
1968  ReorderBufferChange *last =
 /* NOTE(review): initializer expression dropped by extraction (~1969) */
1970 
1971  txn->final_lsn = last->lsn;
1972  }
1973 
1974  elog(DEBUG2, "aborting old transaction %u", txn->xid);
1975 
1976  /* remove potential on-disk data, and deallocate this tx */
1977  ReorderBufferCleanupTXN(rb, txn);
1978  }
1979  else
 /* list is LSN-ordered: first live txn means no older ones follow */
1980  return;
1981  }
1982 }
1983 
1984 /*
1985  * Forget the contents of a transaction if we aren't interested in its
1986  * contents. Needs to be first called for subtransactions and then for the
1987  * toplevel xid.
1988  *
1989  * This is significantly different to ReorderBufferAbort() because
1990  * transactions that have committed need to be treated differently from aborted
1991  * ones since they may have modified the catalog.
1992  *
1993  * Note that this is only allowed to be called in the moment a transaction
1994  * commit has just been read, not earlier; otherwise later records referring
1995  * to this xid might re-create the transaction incompletely.
1996  */
/*
 * NOTE(review): extraction dropped the signature line (~1998, presumably
 * "ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr
 * lsn)"), the txn declaration (~2000), and the head of the invalidation call
 * (~2018, presumably ReorderBufferImmediateInvalidation(rb,
 * txn->ninvalidations, ...)). Confirm against the canonical source.
 */
1997 void
1999 {
2001 
2002  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2003  false);
2004 
2005  /* unknown, nothing to forget */
2006  if (txn == NULL)
2007  return;
2008 
2009  /* cosmetic... */
2010  txn->final_lsn = lsn;
2011 
2012  /*
2013  * Process cache invalidation messages if there are any. Even if we're not
2014  * interested in the transaction's contents, it could have manipulated the
2015  * catalog and we need to update the caches according to that.
2016  */
2017  if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
2019  txn->invalidations);
2020  else
2021  Assert(txn->ninvalidations == 0);
2022 
2023  /* remove potential on-disk data, and deallocate */
2024  ReorderBufferCleanupTXN(rb, txn);
2025 }
2026 
2027 /*
2028  * Execute invalidations happening outside the context of a decoded
2029  * transaction. That currently happens either for xid-less commits
2030  * (cf. RecordTransactionCommit()) or for invalidations in uninteresting
2031  * transactions (via ReorderBufferForget()).
2032  */
/*
 * NOTE(review): extraction dropped the signature line (~2034, presumably
 * "ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32
 * ninvalidations, ...)") and the abort statement at ~2050 (presumably
 * AbortCurrentTransaction();). Confirm against the canonical source.
 */
2033 void
2035  SharedInvalidationMessage *invalidations)
2036 {
2037  bool use_subtxn = IsTransactionOrTransactionBlock();
2038  int i;
2039 
2040  if (use_subtxn)
2041  BeginInternalSubTransaction("replay");
2042 
2043  /*
2044  * Force invalidations to happen outside of a valid transaction - that way
2045  * entries will just be marked as invalid without accessing the catalog.
2046  * That's advantageous because we don't need to setup the full state
2047  * necessary for catalog access.
2048  */
2049  if (use_subtxn)
 /* NOTE(review): statement dropped by extraction (~2050) */
2051 
2052  for (i = 0; i < ninvalidations; i++)
2053  LocalExecuteInvalidationMessage(&invalidations[i]);
2054 
2055  if (use_subtxn)
 /* NOTE(review): subtransaction-rollback statement dropped (~2056) */
2057 }
2058 
2059 /*
2060  * Tell reorderbuffer about an xid seen in the WAL stream. Has to be called at
2061  * least once for every xid in XLogRecord->xl_xid (other places in records
2062  * may, but do not have to be passed through here).
2063  *
2064  * Reorderbuffer keeps some datastructures about transactions in LSN order,
2065  * for efficiency. To do that it has to know about when transactions are seen
2066  * first in the WAL. As many types of records are not actually interesting for
2067  * logical decoding, they do not necessarily pass though here.
2068  */
/*
 * NOTE(review): signature line dropped by extraction (~2070, presumably
 * "ReorderBufferProcessXid(ReorderBuffer *rb, TransactionId xid, XLogRecPtr
 * lsn)"). The lookup below creates the TXN entry if it doesn't exist yet
 * (create = true).
 */
2069 void
2071 {
2072  /* many records won't have an xid assigned, centralize check here */
2073  if (xid != InvalidTransactionId)
2074  ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
2075 }
2076 
2077 /*
2078  * Add a new snapshot to this transaction that may only used after lsn 'lsn'
2079  * because the previous snapshot doesn't describe the catalog correctly for
2080  * following rows.
2081  */
/*
 * NOTE(review): extraction dropped the signature line (~2083, presumably
 * "ReorderBufferAddSnapshot(ReorderBuffer *rb, TransactionId xid, ...)"),
 * the change allocation (~2086, presumably ReorderBufferGetChange(rb);) and
 * the change->action assignment (~2089). Confirm against the canonical
 * source. Queues the snapshot as an INTERNAL_SNAPSHOT change at 'lsn'.
 */
2082 void
2084  XLogRecPtr lsn, Snapshot snap)
2085 {
2087 
2088  change->data.snapshot = snap;
2090 
2091  ReorderBufferQueueChange(rb, xid, lsn, change);
2092 }
2093 
2094 /*
2095  * Set up the transaction's base snapshot.
2096  *
2097  * If we know that xid is a subtransaction, set the base snapshot on the
2098  * top-level transaction instead.
2099  */
/*
 * NOTE(review): extraction dropped the signature line (~2101, presumably
 * "ReorderBufferSetBaseSnapshot(ReorderBuffer *rb, TransactionId xid, ...)"),
 * the txn declaration (~2104), and the txns_by_base_snapshot_lsn push at
 * ~2121. Confirm against the canonical source.
 */
2100 void
2102  XLogRecPtr lsn, Snapshot snap)
2103 {
2105  bool is_new;
2106 
2107  AssertArg(snap != NULL);
2108 
2109  /*
2110  * Fetch the transaction to operate on. If we know it's a subtransaction,
2111  * operate on its top-level transaction instead.
2112  */
2113  txn = ReorderBufferTXNByXid(rb, xid, true, &is_new, lsn, true);
2114  if (txn->is_known_as_subxact)
2115  txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
2116  NULL, InvalidXLogRecPtr, false);
 /* a base snapshot may only be installed once per (toplevel) txn */
2117  Assert(txn->base_snapshot == NULL);
2118 
2119  txn->base_snapshot = snap;
2120  txn->base_snapshot_lsn = lsn;
2122 
 /* sanity: toplevel TXNs must stay LSN-ordered */
2123  AssertTXNLsnOrder(rb);
2124 }
2125 
2126 /*
2127  * Access the catalog with this CommandId at this point in the changestream.
2128  *
2129  * May only be called for command ids > 1
2130  */
/*
 * NOTE(review): extraction dropped the signature line (~2132, presumably
 * "ReorderBufferAddNewCommandId(ReorderBuffer *rb, TransactionId xid, ...)"),
 * the change allocation (~2135) and the change->action assignment (~2138).
 * Confirm against the canonical source. Queues an INTERNAL_COMMAND_ID
 * change so replay can bump the historic snapshot's curcid.
 */
2131 void
2133  XLogRecPtr lsn, CommandId cid)
2134 {
2136 
2137  change->data.command_id = cid;
2139 
2140  ReorderBufferQueueChange(rb, xid, lsn, change);
2141 }
2142 
2143 /*
2144  * Update the memory accounting info. We track memory used by the whole
2145  * reorder buffer and the transaction containing the change.
2146  */
/*
 * NOTE(review): extraction dropped the signature head (~2148, presumably
 * "ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb," …) and the early-exit
 * condition (~2161, presumably the INTERNAL_TUPLECID action check). Confirm
 * against the canonical source.
 *
 * Adds (addition == true) or subtracts the change's size from both the
 * owning txn's and the whole buffer's accounting counters.
 */
2147 static void
2149  ReorderBufferChange *change,
2150  bool addition)
2151 {
2152  Size sz;
2153 
2154  Assert(change->txn);
2155 
2156  /*
2157  * Ignore tuple CID changes, because those are not evicted when reaching
2158  * memory limit. So we just don't count them, because it might easily
2159  * trigger a pointless attempt to spill.
2160  */
 /* NOTE(review): guarding condition dropped by extraction (~2161) */
2162  return;
2163 
2164  sz = ReorderBufferChangeSize(change);
2165 
2166  if (addition)
2167  {
2168  change->txn->size += sz;
2169  rb->size += sz;
2170  }
2171  else
2172  {
     /* never let the accounting underflow */
2173  Assert((rb->size >= sz) && (change->txn->size >= sz));
2174  change->txn->size -= sz;
2175  rb->size -= sz;
2176  }
2177 }
2178 
2179 /*
2180  * Add new (relfilenode, tid) -> (cmin, cmax) mappings.
2181  *
2182  * We do not include this change type in memory accounting, because we
2183  * keep CIDs in a separate list and do not evict them when reaching
2184  * the memory limit.
2185  */
/*
 * NOTE(review): extraction dropped the signature line (~2187, presumably
 * "ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid," …),
 * the change/txn declarations (~2192-2193) and the change->action
 * assignment (~2204). Confirm against the canonical source. Note the change
 * goes onto txn->tuplecids, not txn->changes, so it is never spilled.
 */
2186 void
2188  XLogRecPtr lsn, RelFileNode node,
2189  ItemPointerData tid, CommandId cmin,
2190  CommandId cmax, CommandId combocid)
2191 {
2194 
2195  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
2196 
2197  change->data.tuplecid.node = node;
2198  change->data.tuplecid.tid = tid;
2199  change->data.tuplecid.cmin = cmin;
2200  change->data.tuplecid.cmax = cmax;
2201  change->data.tuplecid.combocid = combocid;
2202  change->lsn = lsn;
2203  change->txn = txn;
2205 
2206  dlist_push_tail(&txn->tuplecids, &change->node);
2207  txn->ntuplecids++;
2208 }
2209 
2210 /*
2211  * Setup the invalidation of the toplevel transaction.
2212  *
2213  * This needs to be done before ReorderBufferCommit is called!
2214  */
/*
 * NOTE(review): extraction dropped the signature line (~2216, presumably
 * "ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid," …),
 * the msgs parameter line (~2218), the txn declaration (~2220), and the
 * allocation head at ~2230-2231 (presumably txn->invalidations =
 * MemoryContextAlloc(rb->context, …)). Confirm against the canonical source.
 *
 * Copies the caller's invalidation messages into the txn; only one set may
 * ever be attached per transaction.
 */
2215 void
2217  XLogRecPtr lsn, Size nmsgs,
2219 {
2221 
2222  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
2223 
2224  if (txn->ninvalidations != 0)
2225  elog(ERROR, "only ever add one set of invalidations");
2226 
2227  Assert(nmsgs > 0);
2228 
2229  txn->ninvalidations = nmsgs;
2232  sizeof(SharedInvalidationMessage) * nmsgs);
2233  memcpy(txn->invalidations, msgs,
2234  sizeof(SharedInvalidationMessage) * nmsgs);
2235 }
2236 
2237 /*
2238  * Apply all invalidations we know. Possibly we only need parts at this point
2239  * in the changestream but we don't know which those are.
2240  */
/*
 * NOTE(review): extraction dropped the signature line (~2242, presumably
 * "ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTXN
 * *txn)") and the loop body (~2247, presumably
 * LocalExecuteInvalidationMessage(&txn->invalidations[i]);). Confirm against
 * the canonical source.
 */
2241 static void
2243 {
2244  int i;
2245 
2246  for (i = 0; i < txn->ninvalidations; i++)
2248 }
2249 
2250 /*
2251  * Mark a transaction as containing catalog changes
2252  */
/*
 * NOTE(review): extraction dropped the signature line (~2254, presumably
 * "ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid,")
 * and the txn declaration (~2257). Creates the TXN entry if needed, then
 * flags it so tuplecid tracking / historic catalog access kicks in.
 */
2253 void
2255  XLogRecPtr lsn)
2256 {
2258 
2259  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
2260 
2261  txn->has_catalog_changes = true;
2262 }
2263 
2264 /*
2265  * Query whether a transaction is already *known* to contain catalog
2266  * changes. This can be wrong until directly before the commit!
2267  */
/*
 * NOTE(review): extraction dropped the signature line (~2269, presumably
 * "ReorderBufferXidHasCatalogChanges(ReorderBuffer *rb, TransactionId xid)")
 * and the txn declaration (~2271). Pure lookup — never creates the TXN
 * (create = false); unknown xids report false.
 */
2268 bool
2270 {
2272 
2273  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2274  false);
2275  if (txn == NULL)
2276  return false;
2277 
2278  return txn->has_catalog_changes;
2279 }
2280 
2281 /*
2282  * ReorderBufferXidHasBaseSnapshot
2283  * Have we already set the base snapshot for the given txn/subtxn?
2284  */
/*
 * NOTE(review): extraction dropped the signature line (~2286, presumably
 * "ReorderBufferXidHasBaseSnapshot(ReorderBuffer *rb, TransactionId xid)")
 * and the txn declaration (~2288). Base snapshots live on the toplevel
 * transaction (cf. ReorderBufferSetBaseSnapshot), hence the redirect below.
 */
2285 bool
2287 {
2289 
2290  txn = ReorderBufferTXNByXid(rb, xid, false,
2291  NULL, InvalidXLogRecPtr, false);
2292 
2293  /* transaction isn't known yet, ergo no snapshot */
2294  if (txn == NULL)
2295  return false;
2296 
2297  /* a known subtxn? operate on top-level txn instead */
2298  if (txn->is_known_as_subxact)
2299  txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
2300  NULL, InvalidXLogRecPtr, false);
2301 
2302  return txn->base_snapshot != NULL;
2303 }
2304 
2305 
2306 /*
2307  * ---------------------------------------
2308  * Disk serialization support
2309  * ---------------------------------------
2310  */
2311 
2312 /*
2313  * Ensure the IO buffer is >= sz.
2314  */
/*
 * NOTE(review): extraction dropped the signature line (~2316, presumably
 * "ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz)"). Grow-only
 * reservation: first call allocates in rb->context, later calls repalloc
 * when the current buffer is too small; the buffer is never shrunk.
 */
2315 static void
2317 {
2318  if (!rb->outbufsize)
2319  {
2320  rb->outbuf = MemoryContextAlloc(rb->context, sz);
2321  rb->outbufsize = sz;
2322  }
2323  else if (rb->outbufsize < sz)
2324  {
2325  rb->outbuf = repalloc(rb->outbuf, sz);
2326  rb->outbufsize = sz;
2327  }
2328 }
2329 
2330 /*
2331  * Find the largest transaction (toplevel or subxact) to evict (spill to disk).
2332  *
2333  * XXX With many subtransactions this might be quite slow, because we'll have
2334  * to walk through all of them. There are some options how we could improve
2335  * that: (a) maintain some secondary structure with transactions sorted by
2336  * amount of changes, (b) not looking for the entirely largest transaction,
2337  * but e.g. for transaction using at least some fraction of the memory limit,
2338  * and (c) evicting multiple transactions at once, e.g. to free a given portion
2339  * of the memory limit (e.g. 50%).
2340  */
/*
 * NOTE(review): extraction dropped the signature line (~2342, presumably
 * "ReorderBufferLargestTXN(ReorderBuffer *rb)") and the hash-entry
 * declaration (~2345, presumably ReorderBufferTXNByIdEnt *ent;). Confirm
 * against the canonical source.
 *
 * Linear scan of the by-xid hash for the txn with the largest accounted
 * size; callers only invoke this while over the memory limit, so a non-NULL,
 * non-empty winner is asserted.
 */
2341 static ReorderBufferTXN *
2343 {
2344  HASH_SEQ_STATUS hash_seq;
2346  ReorderBufferTXN *largest = NULL;
2347 
2348  hash_seq_init(&hash_seq, rb->by_txn);
2349  while ((ent = hash_seq_search(&hash_seq)) != NULL)
2350  {
2351  ReorderBufferTXN *txn = ent->txn;
2352 
2353  /* if the current transaction is larger, remember it */
2354  if ((!largest) || (txn->size > largest->size))
2355  largest = txn;
2356  }
2357 
2358  Assert(largest);
2359  Assert(largest->size > 0);
2360  Assert(largest->size <= rb->size);
2361 
2362  return largest;
2363 }
2364 
2365 /*
2366  * Check whether the logical_decoding_work_mem limit was reached, and if yes
2367  * pick the transaction to evict and spill the changes to disk.
2368  *
2369  * XXX At this point we select just a single (largest) transaction, but
2370  * we might also adapt a more elaborate eviction strategy - for example
2371  * evicting enough transactions to free certain fraction (e.g. 50%) of
2372  * the memory limit.
2373  */
/*
 * NOTE(review): extraction dropped the signature line (~2375, presumably
 * "ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)") and the txn
 * declaration (~2377). Confirm against the canonical source.
 * logical_decoding_work_mem is in kB, hence the * 1024L scaling below.
 */
2374 static void
2376 {
2378 
2379  /* bail out if we haven't exceeded the memory limit */
2380  if (rb->size < logical_decoding_work_mem * 1024L)
2381  return;
2382 
2383  /*
2384  * Pick the largest transaction (or subtransaction) and evict it from
2385  * memory by serializing it to disk.
2386  */
2387  txn = ReorderBufferLargestTXN(rb);
2388 
2389  ReorderBufferSerializeTXN(rb, txn);
2390 
2391  /*
2392  * After eviction, the transaction should have no entries in memory, and
2393  * should use 0 bytes for changes.
2394  */
2395  Assert(txn->size == 0);
2396  Assert(txn->nentries_mem == 0);
2397 
2398  /*
2399  * And furthermore, evicting the transaction should get us below the
2400  * memory limit again - it is not possible that we're still exceeding the
2401  * memory limit after evicting the transaction.
2402  *
2403  * This follows from the simple fact that the selected transaction is at
2404  * least as large as the most recent change (which caused us to go over
2405  * the memory limit). So by evicting it we're definitely back below the
2406  * memory limit.
2407  */
2408  Assert(rb->size < logical_decoding_work_mem * 1024L);
2409 }
2410 
2411 /*
2412  * Spill data of a large transaction (and its subtransactions) to disk.
2413  */
2414 static void
2416 {
2417  dlist_iter subtxn_i;
2418  dlist_mutable_iter change_i;
2419  int fd = -1;
2420  XLogSegNo curOpenSegNo = 0;
2421  Size spilled = 0;
2422  Size size = txn->size;
2423 
2424  elog(DEBUG2, "spill %u changes in XID %u to disk",
2425  (uint32) txn->nentries_mem, txn->xid);
2426 
2427  /* do the same to all child TXs */
2428  dlist_foreach(subtxn_i, &txn->subtxns)
2429  {
2430  ReorderBufferTXN *subtxn;
2431 
2432  subtxn = dlist_container(ReorderBufferTXN, node, subtxn_i.cur);
2433  ReorderBufferSerializeTXN(rb, subtxn);
2434  }
2435 
2436  /* serialize changestream */
2437  dlist_foreach_modify(change_i, &txn->changes)
2438  {
2439  ReorderBufferChange *change;
2440 
2441  change = dlist_container(ReorderBufferChange, node, change_i.cur);
2442 
2443  /*
2444  * store in segment in which it belongs by start lsn, don't split over
2445  * multiple segments tho
2446  */
2447  if (fd == -1 ||
2448  !XLByteInSeg(change->lsn, curOpenSegNo, wal_segment_size))
2449  {
2450  char path[MAXPGPATH];
2451 
2452  if (fd != -1)
2453  CloseTransientFile(fd);
2454 
2455  XLByteToSeg(change->lsn, curOpenSegNo, wal_segment_size);
2456 
2457  /*
2458  * No need to care about TLIs here, only used during a single run,
2459  * so each LSN only maps to a specific WAL record.
2460  */
2462  curOpenSegNo);
2463 
2464  /* open segment, create it if necessary */
2465  fd = OpenTransientFile(path,
2466  O_CREAT | O_WRONLY | O_APPEND | PG_BINARY);
2467 
2468  if (fd < 0)
2469  ereport(ERROR,
2471  errmsg("could not open file \"%s\": %m", path)));
2472  }
2473 
2474  ReorderBufferSerializeChange(rb, txn, fd, change);
2475  dlist_delete(&change->node);
2476  ReorderBufferReturnChange(rb, change);
2477 
2478  spilled++;
2479  }
2480 
2481  /* update the statistics */
2482  rb->spillCount += 1;
2483  rb->spillBytes += size;
2484 
2485  /* Don't consider already serialized transaction. */
2486  rb->spillTxns += txn->serialized ? 0 : 1;
2487 
2488  Assert(spilled == txn->nentries_mem);
2489  Assert(dlist_is_empty(&txn->changes));
2490  txn->nentries_mem = 0;
2491  txn->serialized = true;
2492 
2493  if (fd != -1)
2494  CloseTransientFile(fd);
2495 }
2496 
2497 /*
2498  * Serialize individual change to disk.
2499  */
2500 static void
2502  int fd, ReorderBufferChange *change)
2503 {
2504  ReorderBufferDiskChange *ondisk;
2505  Size sz = sizeof(ReorderBufferDiskChange);
2506 
2508 
2509  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2510  memcpy(&ondisk->change, change, sizeof(ReorderBufferChange));
2511 
2512  switch (change->action)
2513  {
2514  /* fall through these, they're all similar enough */
2519  {
2520  char *data;
2521  ReorderBufferTupleBuf *oldtup,
2522  *newtup;
2523  Size oldlen = 0;
2524  Size newlen = 0;
2525 
2526  oldtup = change->data.tp.oldtuple;
2527  newtup = change->data.tp.newtuple;
2528 
2529  if (oldtup)
2530  {
2531  sz += sizeof(HeapTupleData);
2532  oldlen = oldtup->tuple.t_len;
2533  sz += oldlen;
2534  }
2535 
2536  if (newtup)
2537  {
2538  sz += sizeof(HeapTupleData);
2539  newlen = newtup->tuple.t_len;
2540  sz += newlen;
2541  }
2542 
2543  /* make sure we have enough space */
2545 
2546  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
2547  /* might have been reallocated above */
2548  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2549 
2550  if (oldlen)
2551  {
2552  memcpy(data, &oldtup->tuple, sizeof(HeapTupleData));
2553  data += sizeof(HeapTupleData);
2554 
2555  memcpy(data, oldtup->tuple.t_data, oldlen);
2556  data += oldlen;
2557  }
2558 
2559  if (newlen)
2560  {
2561  memcpy(data, &newtup->tuple, sizeof(HeapTupleData));
2562  data += sizeof(HeapTupleData);
2563 
2564  memcpy(data, newtup->tuple.t_data, newlen);
2565  data += newlen;
2566  }
2567  break;
2568  }
2570  {
2571  char *data;
2572  Size prefix_size = strlen(change->data.msg.prefix) + 1;
2573 
2574  sz += prefix_size + change->data.msg.message_size +
2575  sizeof(Size) + sizeof(Size);
2577 
2578  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
2579 
2580  /* might have been reallocated above */
2581  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2582 
2583  /* write the prefix including the size */
2584  memcpy(data, &prefix_size, sizeof(Size));
2585  data += sizeof(Size);
2586  memcpy(data, change->data.msg.prefix,
2587  prefix_size);
2588  data += prefix_size;
2589 
2590  /* write the message including the size */
2591  memcpy(data, &change->data.msg.message_size, sizeof(Size));
2592  data += sizeof(Size);
2593  memcpy(data, change->data.msg.message,
2594  change->data.msg.message_size);
2595  data += change->data.msg.message_size;
2596 
2597  break;
2598  }
2600  {
2601  Snapshot snap;
2602  char *data;
2603 
2604  snap = change->data.snapshot;
2605 
2606  sz += sizeof(SnapshotData) +
2607  sizeof(TransactionId) * snap->xcnt +
2608  sizeof(TransactionId) * snap->subxcnt
2609  ;
2610 
2611  /* make sure we have enough space */
2613  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
2614  /* might have been reallocated above */
2615  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2616 
2617  memcpy(data, snap, sizeof(SnapshotData));
2618  data += sizeof(SnapshotData);
2619 
2620  if (snap->xcnt)
2621  {
2622  memcpy(data, snap->xip,
2623  sizeof(TransactionId) * snap->xcnt);
2624  data += sizeof(TransactionId) * snap->xcnt;
2625  }
2626 
2627  if (snap->subxcnt)
2628  {
2629  memcpy(data, snap->subxip,
2630  sizeof(TransactionId) * snap->subxcnt);
2631  data += sizeof(TransactionId) * snap->subxcnt;
2632  }
2633  break;
2634  }
2636  {
2637  Size size;
2638  char *data;
2639 
2640  /* account for the OIDs of truncated relations */
2641  size = sizeof(Oid) * change->data.truncate.nrelids;
2642  sz += size;
2643 
2644  /* make sure we have enough space */
2646 
2647  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
2648  /* might have been reallocated above */
2649  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2650 
2651  memcpy(data, change->data.truncate.relids, size);
2652  data += size;
2653 
2654  break;
2655  }
2659  /* ReorderBufferChange contains everything important */
2660  break;
2661  }
2662 
2663  ondisk->size = sz;
2664 
2665  errno = 0;
2667  if (write(fd, rb->outbuf, ondisk->size) != ondisk->size)
2668  {
2669  int save_errno = errno;
2670 
2671  CloseTransientFile(fd);
2672 
2673  /* if write didn't set errno, assume problem is no disk space */
2674  errno = save_errno ? save_errno : ENOSPC;
2675  ereport(ERROR,
2677  errmsg("could not write to data file for XID %u: %m",
2678  txn->xid)));
2679  }
2681 
2682  Assert(ondisk->change.action == change->action);
2683 }
2684 
2685 /*
2686  * Size of a change in memory.
2687  */
2688 static Size
2690 {
2691  Size sz = sizeof(ReorderBufferChange);
2692 
2693  switch (change->action)
2694  {
2695  /* fall through these, they're all similar enough */
2700  {
2701  ReorderBufferTupleBuf *oldtup,
2702  *newtup;
2703  Size oldlen = 0;
2704  Size newlen = 0;
2705 
2706  oldtup = change->data.tp.oldtuple;
2707  newtup = change->data.tp.newtuple;
2708 
2709  if (oldtup)
2710  {
2711  sz += sizeof(HeapTupleData);
2712  oldlen = oldtup->tuple.t_len;
2713  sz += oldlen;
2714  }
2715 
2716  if (newtup)
2717  {
2718  sz += sizeof(HeapTupleData);
2719  newlen = newtup->tuple.t_len;
2720  sz += newlen;
2721  }
2722 
2723  break;
2724  }
2726  {
2727  Size prefix_size = strlen(change->data.msg.prefix) + 1;
2728 
2729  sz += prefix_size + change->data.msg.message_size +
2730  sizeof(Size) + sizeof(Size);
2731 
2732  break;
2733  }
2735  {
2736  Snapshot snap;
2737 
2738  snap = change->data.snapshot;
2739 
2740  sz += sizeof(SnapshotData) +
2741  sizeof(TransactionId) * snap->xcnt +
2742  sizeof(TransactionId) * snap->subxcnt;
2743 
2744  break;
2745  }
2747  {
2748  sz += sizeof(Oid) * change->data.truncate.nrelids;
2749 
2750  break;
2751  }
2755  /* ReorderBufferChange contains everything important */
2756  break;
2757  }
2758 
2759  return sz;
2760 }
2761 
2762 
2763 /*
2764  * Restore a number of changes spilled to disk back into memory.
2765  */
2766 static Size
2768  int *fd, XLogSegNo *segno)
2769 {
2770  Size restored = 0;
2771  XLogSegNo last_segno;
2772  dlist_mutable_iter cleanup_iter;
2773 
2776 
2777  /* free current entries, so we have memory for more */
2778  dlist_foreach_modify(cleanup_iter, &txn->changes)
2779  {
2781  dlist_container(ReorderBufferChange, node, cleanup_iter.cur);
2782 
2783  dlist_delete(&cleanup->node);
2784  ReorderBufferReturnChange(rb, cleanup);
2785  }
2786  txn->nentries_mem = 0;
2787  Assert(dlist_is_empty(&txn->changes));
2788 
2789  XLByteToSeg(txn->final_lsn, last_segno, wal_segment_size);
2790 
2791  while (restored < max_changes_in_memory && *segno <= last_segno)
2792  {
2793  int readBytes;
2794  ReorderBufferDiskChange *ondisk;
2795 
2796  if (*fd == -1)
2797  {
2798  char path[MAXPGPATH];
2799 
2800  /* first time in */
2801  if (*segno == 0)
2802  XLByteToSeg(txn->first_lsn, *segno, wal_segment_size);
2803 
2804  Assert(*segno != 0 || dlist_is_empty(&txn->changes));
2805 
2806  /*
2807  * No need to care about TLIs here, only used during a single run,
2808  * so each LSN only maps to a specific WAL record.
2809  */
2811  *segno);
2812 
2813  *fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
2814  if (*fd < 0 && errno == ENOENT)
2815  {
2816  *fd = -1;
2817  (*segno)++;
2818  continue;
2819  }
2820  else if (*fd < 0)
2821  ereport(ERROR,
2823  errmsg("could not open file \"%s\": %m",
2824  path)));
2825  }
2826 
2827  /*
2828  * Read the statically sized part of a change which has information
2829  * about the total size. If we couldn't read a record, we're at the
2830  * end of this file.
2831  */
2834  readBytes = read(*fd, rb->outbuf, sizeof(ReorderBufferDiskChange));
2836 
2837  /* eof */
2838  if (readBytes == 0)
2839  {
2840  CloseTransientFile(*fd);
2841  *fd = -1;
2842  (*segno)++;
2843  continue;
2844  }
2845  else if (readBytes < 0)
2846  ereport(ERROR,
2848  errmsg("could not read from reorderbuffer spill file: %m")));
2849  else if (readBytes != sizeof(ReorderBufferDiskChange))
2850  ereport(ERROR,
2852  errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
2853  readBytes,
2854  (uint32) sizeof(ReorderBufferDiskChange))));
2855 
2856  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2857 
2859  sizeof(ReorderBufferDiskChange) + ondisk->size);
2860  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2861 
2863  readBytes = read(*fd, rb->outbuf + sizeof(ReorderBufferDiskChange),
2864  ondisk->size - sizeof(ReorderBufferDiskChange));
2866 
2867  if (readBytes < 0)
2868  ereport(ERROR,
2870  errmsg("could not read from reorderbuffer spill file: %m")));
2871  else if (readBytes != ondisk->size - sizeof(ReorderBufferDiskChange))
2872  ereport(ERROR,
2874  errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
2875  readBytes,
2876  (uint32) (ondisk->size - sizeof(ReorderBufferDiskChange)))));
2877 
2878  /*
2879  * ok, read a full change from disk, now restore it into proper
2880  * in-memory format
2881  */
2882  ReorderBufferRestoreChange(rb, txn, rb->outbuf);
2883  restored++;
2884  }
2885 
2886  return restored;
2887 }
2888 
2889 /*
2890  * Convert change from its on-disk format to in-memory format and queue it onto
2891  * the TXN's ->changes list.
2892  *
2893  * Note: although "data" is declared char*, at entry it points to a
2894  * maxalign'd buffer, making it safe in most of this function to assume
2895  * that the pointed-to data is suitably aligned for direct access.
2896  */
2897 static void
2899  char *data)
2900 {
2901  ReorderBufferDiskChange *ondisk;
2902  ReorderBufferChange *change;
2903 
2904  ondisk = (ReorderBufferDiskChange *) data;
2905 
2906  change = ReorderBufferGetChange(rb);
2907 
2908  /* copy static part */
2909  memcpy(change, &ondisk->change, sizeof(ReorderBufferChange));
2910 
2911  data += sizeof(ReorderBufferDiskChange);
2912 
2913  /* restore individual stuff */
2914  switch (change->action)
2915  {
2916  /* fall through these, they're all similar enough */
2921  if (change->data.tp.oldtuple)
2922  {
2923  uint32 tuplelen = ((HeapTuple) data)->t_len;
2924 
2925  change->data.tp.oldtuple =
2927 
2928  /* restore ->tuple */
2929  memcpy(&change->data.tp.oldtuple->tuple, data,
2930  sizeof(HeapTupleData));
2931  data += sizeof(HeapTupleData);
2932 
2933  /* reset t_data pointer into the new tuplebuf */
2934  change->data.tp.oldtuple->tuple.t_data =
2935  ReorderBufferTupleBufData(change->data.tp.oldtuple);
2936 
2937  /* restore tuple data itself */
2938  memcpy(change->data.tp.oldtuple->tuple.t_data, data, tuplelen);
2939  data += tuplelen;
2940  }
2941 
2942  if (change->data.tp.newtuple)
2943  {
2944  /* here, data might not be suitably aligned! */
2945  uint32 tuplelen;
2946 
2947  memcpy(&tuplelen, data + offsetof(HeapTupleData, t_len),
2948  sizeof(uint32));
2949 
2950  change->data.tp.newtuple =
2952 
2953  /* restore ->tuple */
2954  memcpy(&change->data.tp.newtuple->tuple, data,
2955  sizeof(HeapTupleData));
2956  data += sizeof(HeapTupleData);
2957 
2958  /* reset t_data pointer into the new tuplebuf */
2959  change->data.tp.newtuple->tuple.t_data =
2960  ReorderBufferTupleBufData(change->data.tp.newtuple);
2961 
2962  /* restore tuple data itself */
2963  memcpy(change->data.tp.newtuple->tuple.t_data, data, tuplelen);
2964  data += tuplelen;
2965  }
2966 
2967  break;
2969  {
2970  Size prefix_size;
2971 
2972  /* read prefix */
2973  memcpy(&prefix_size, data, sizeof(Size));
2974  data += sizeof(Size);
2975  change->data.msg.prefix = MemoryContextAlloc(rb->context,
2976  prefix_size);
2977  memcpy(change->data.msg.prefix, data, prefix_size);
2978  Assert(change->data.msg.prefix[prefix_size - 1] == '\0');
2979  data += prefix_size;
2980 
2981  /* read the message */
2982  memcpy(&change->data.msg.message_size, data, sizeof(Size));
2983  data += sizeof(Size);
2984  change->data.msg.message = MemoryContextAlloc(rb->context,
2985  change->data.msg.message_size);
2986  memcpy(change->data.msg.message, data,
2987  change->data.msg.message_size);
2988  data += change->data.msg.message_size;
2989 
2990  break;
2991  }
2993  {
2994  Snapshot oldsnap;
2995  Snapshot newsnap;
2996  Size size;
2997 
2998  oldsnap = (Snapshot) data;
2999 
3000  size = sizeof(SnapshotData) +
3001  sizeof(TransactionId) * oldsnap->xcnt +
3002  sizeof(TransactionId) * (oldsnap->subxcnt + 0);
3003 
3004  change->data.snapshot = MemoryContextAllocZero(rb->context, size);
3005 
3006  newsnap = change->data.snapshot;
3007 
3008  memcpy(newsnap, data, size);
3009  newsnap->xip = (TransactionId *)
3010  (((char *) newsnap) + sizeof(SnapshotData));
3011  newsnap->subxip = newsnap->xip + newsnap->xcnt;
3012  newsnap->copied = true;
3013  break;
3014  }
3015  /* the base struct contains all the data, easy peasy */
3017  {
3018  Oid *relids;
3019 
3020  relids = ReorderBufferGetRelids(rb,
3021  change->data.truncate.nrelids);
3022  memcpy(relids, data, change->data.truncate.nrelids * sizeof(Oid));
3023  change->data.truncate.relids = relids;
3024 
3025  break;
3026  }
3030  break;
3031  }
3032 
3033  dlist_push_tail(&txn->changes, &change->node);
3034  txn->nentries_mem++;
3035 
3036  /*
3037  * Update memory accounting for the restored change. We need to do this
3038  * although we don't check the memory limit when restoring the changes in
3039  * this branch (we only do that when initially queueing the changes after
3040  * decoding), because we will release the changes later, and that will
3041  * update the accounting too (subtracting the size from the counters). And
3042  * we don't want to underflow there.
3043  */
3044  ReorderBufferChangeMemoryUpdate(rb, change, true);
3045 }
3046 
3047 /*
3048  * Remove all on-disk stored for the passed in transaction.
3049  */
3050 static void
3052 {
3053  XLogSegNo first;
3054  XLogSegNo cur;
3055  XLogSegNo last;
3056 
3059 
3060  XLByteToSeg(txn->first_lsn, first, wal_segment_size);
3061  XLByteToSeg(txn->final_lsn, last, wal_segment_size);
3062 
3063  /* iterate over all possible filenames, and delete them */
3064  for (cur = first; cur <= last; cur++)
3065  {
3066  char path[MAXPGPATH];
3067 
3069  if (unlink(path) != 0 && errno != ENOENT)
3070  ereport(ERROR,
3072  errmsg("could not remove file \"%s\": %m", path)));
3073  }
3074 }
3075 
3076 /*
3077  * Remove any leftover serialized reorder buffers from a slot directory after a
3078  * prior crash or decoding session exit.
3079  */
3080 static void
3082 {
3083  DIR *spill_dir;
3084  struct dirent *spill_de;
3085  struct stat statbuf;
3086  char path[MAXPGPATH * 2 + 12];
3087 
3088  sprintf(path, "pg_replslot/%s", slotname);
3089 
3090  /* we're only handling directories here, skip if it's not ours */
3091  if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
3092  return;
3093 
3094  spill_dir = AllocateDir(path);
3095  while ((spill_de = ReadDirExtended(spill_dir, path, INFO)) != NULL)
3096  {
3097  /* only look at names that can be ours */
3098  if (strncmp(spill_de->d_name, "xid", 3) == 0)
3099  {
3100  snprintf(path, sizeof(path),
3101  "pg_replslot/%s/%s", slotname,
3102  spill_de->d_name);
3103 
3104  if (unlink(path) != 0)
3105  ereport(ERROR,
3107  errmsg("could not remove file \"%s\" during removal of pg_replslot/%s/xid*: %m",
3108  path, slotname)));
3109  }
3110  }
3111  FreeDir(spill_dir);
3112 }
3113 
3114 /*
3115  * Given a replication slot, transaction ID and segment number, fill in the
3116  * corresponding spill file into 'path', which is a caller-owned buffer of size
3117  * at least MAXPGPATH.
3118  */
3119 static void
3121  XLogSegNo segno)
3122 {
3123  XLogRecPtr recptr;
3124 
3125  XLogSegNoOffsetToRecPtr(segno, 0, wal_segment_size, recptr);
3126 
3127  snprintf(path, MAXPGPATH, "pg_replslot/%s/xid-%u-lsn-%X-%X.spill",
3129  xid,
3130  (uint32) (recptr >> 32), (uint32) recptr);
3131 }
3132 
3133 /*
3134  * Delete all data spilled to disk after we've restarted/crashed. It will be
3135  * recreated when the respective slots are reused.
3136  */
3137 void
3139 {
3140  DIR *logical_dir;
3141  struct dirent *logical_de;
3142 
3143  logical_dir = AllocateDir("pg_replslot");
3144  while ((logical_de = ReadDir(logical_dir, "pg_replslot")) != NULL)
3145  {
3146  if (strcmp(logical_de->d_name, ".") == 0 ||
3147  strcmp(logical_de->d_name, "..") == 0)
3148  continue;
3149 
3150  /* if it cannot be a slot, skip the directory */
3151  if (!ReplicationSlotValidateName(logical_de->d_name, DEBUG2))
3152  continue;
3153 
3154  /*
3155  * ok, has to be a surviving logical slot, iterate and delete
3156  * everything starting with xid-*
3157  */
3159  }
3160  FreeDir(logical_dir);
3161 }
3162 
3163 /* ---------------------------------------
3164  * toast reassembly support
3165  * ---------------------------------------
3166  */
3167 
3168 /*
3169  * Initialize per tuple toast reconstruction support.
3170  */
3171 static void
3173 {
3174  HASHCTL hash_ctl;
3175 
3176  Assert(txn->toast_hash == NULL);
3177 
3178  memset(&hash_ctl, 0, sizeof(hash_ctl));
3179  hash_ctl.keysize = sizeof(Oid);
3180  hash_ctl.entrysize = sizeof(ReorderBufferToastEnt);
3181  hash_ctl.hcxt = rb->context;
3182  txn->toast_hash = hash_create("ReorderBufferToastHash", 5, &hash_ctl,
3184 }
3185 
3186 /*
3187  * Per toast-chunk handling for toast reconstruction
3188  *
3189  * Appends a toast chunk so we can reconstruct it when the tuple "owning" the
3190  * toasted Datum comes along.
3191  */
3192 static void
3194  Relation relation, ReorderBufferChange *change)
3195 {
3196  ReorderBufferToastEnt *ent;
3197  ReorderBufferTupleBuf *newtup;
3198  bool found;
3199  int32 chunksize;
3200  bool isnull;
3201  Pointer chunk;
3202  TupleDesc desc = RelationGetDescr(relation);
3203  Oid chunk_id;
3204  int32 chunk_seq;
3205 
3206  if (txn->toast_hash == NULL)
3207  ReorderBufferToastInitHash(rb, txn);
3208 
3209  Assert(IsToastRelation(relation));
3210 
3211  newtup = change->data.tp.newtuple;
3212  chunk_id = DatumGetObjectId(fastgetattr(&newtup->tuple, 1, desc, &isnull));
3213  Assert(!isnull);
3214  chunk_seq = DatumGetInt32(fastgetattr(&newtup->tuple, 2, desc, &isnull));
3215  Assert(!isnull);
3216 
3217  ent = (ReorderBufferToastEnt *)
3218  hash_search(txn->toast_hash,
3219  (void *) &chunk_id,
3220  HASH_ENTER,
3221  &found);
3222 
3223  if (!found)
3224  {
3225  Assert(ent->chunk_id == chunk_id);
3226  ent->num_chunks = 0;
3227  ent->last_chunk_seq = 0;
3228  ent->size = 0;
3229  ent->reconstructed = NULL;
3230  dlist_init(&ent->chunks);
3231 
3232  if (chunk_seq != 0)
3233  elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq 0",
3234  chunk_seq, chunk_id);
3235  }
3236  else if (found && chunk_seq != ent->last_chunk_seq + 1)
3237  elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq %d",
3238  chunk_seq, chunk_id, ent->last_chunk_seq + 1);
3239 
3240  chunk = DatumGetPointer(fastgetattr(&newtup->tuple, 3, desc, &isnull));
3241  Assert(!isnull);
3242 
3243  /* calculate size so we can allocate the right size at once later */
3244  if (!VARATT_IS_EXTENDED(chunk))
3245  chunksize = VARSIZE(chunk) - VARHDRSZ;
3246  else if (VARATT_IS_SHORT(chunk))
3247  /* could happen due to heap_form_tuple doing its thing */
3248  chunksize = VARSIZE_SHORT(chunk) - VARHDRSZ_SHORT;
3249  else
3250  elog(ERROR, "unexpected type of toast chunk");
3251 
3252  ent->size += chunksize;
3253  ent->last_chunk_seq = chunk_seq;
3254  ent->num_chunks++;
3255  dlist_push_tail(&ent->chunks, &change->node);
3256 }
3257 
3258 /*
3259  * Rejigger change->newtuple to point to in-memory toast tuples instead to
3260  * on-disk toast tuples that may not longer exist (think DROP TABLE or VACUUM).
3261  *
3262  * We cannot replace unchanged toast tuples though, so those will still point
3263  * to on-disk toast data.
3264  *
3265  * While updating the existing change with detoasted tuple data, we need to
3266  * update the memory accounting info, because the change size will differ.
3267  * Otherwise the accounting may get out of sync, triggering serialization
3268  * at unexpected times.
3269  *
3270  * We simply subtract size of the change before rejiggering the tuple, and
3271  * then adding the new size. This makes it look like the change was removed
3272  * and then added back, except it only tweaks the accounting info.
3273  *
3274  * In particular it can't trigger serialization, which would be pointless
3275  * anyway as it happens during commit processing right before handing
3276  * the change to the output plugin.
3277  */
3278 static void
3280  Relation relation, ReorderBufferChange *change)
3281 {
3282  TupleDesc desc;
3283  int natt;
3284  Datum *attrs;
3285  bool *isnull;
3286  bool *free;
3287  HeapTuple tmphtup;
3288  Relation toast_rel;
3289  TupleDesc toast_desc;
3290  MemoryContext oldcontext;
3291  ReorderBufferTupleBuf *newtup;
3292 
3293  /* no toast tuples changed */
3294  if (txn->toast_hash == NULL)
3295  return;
3296 
3297  /*
3298  * We're going to modify the size of the change, so to make sure the
3299  * accounting is correct we'll make it look like we're removing the change
3300  * now (with the old size), and then re-add it at the end.
3301  */
3302  ReorderBufferChangeMemoryUpdate(rb, change, false);
3303 
3304  oldcontext = MemoryContextSwitchTo(rb->context);
3305 
3306  /* we should only have toast tuples in an INSERT or UPDATE */
3307  Assert(change->data.tp.newtuple);
3308 
3309  desc = RelationGetDescr(relation);
3310 
3311  toast_rel = RelationIdGetRelation(relation->rd_rel->reltoastrelid);
3312  if (!RelationIsValid(toast_rel))
3313  elog(ERROR, "could not open relation with OID %u",
3314  relation->rd_rel->reltoastrelid);
3315 
3316  toast_desc = RelationGetDescr(toast_rel);
3317 
3318  /* should we allocate from stack instead? */
3319  attrs = palloc0(sizeof(Datum) * desc->natts);
3320  isnull = palloc0(sizeof(bool) * desc->natts);
3321  free = palloc0(sizeof(bool) * desc->natts);
3322 
3323  newtup = change->data.tp.newtuple;
3324 
3325  heap_deform_tuple(&newtup->tuple, desc, attrs, isnull);
3326 
3327  for (natt = 0; natt < desc->natts; natt++)
3328  {
3329  Form_pg_attribute attr = TupleDescAttr(desc, natt);
3330  ReorderBufferToastEnt *ent;
3331  struct varlena *varlena;
3332 
3333  /* va_rawsize is the size of the original datum -- including header */
3334  struct varatt_external toast_pointer;
3335  struct varatt_indirect redirect_pointer;
3336  struct varlena *new_datum = NULL;
3337  struct varlena *reconstructed;
3338  dlist_iter it;
3339  Size data_done = 0;
3340 
3341  /* system columns aren't toasted */
3342  if (attr->attnum < 0)
3343  continue;
3344 
3345  if (attr->attisdropped)
3346  continue;
3347 
3348  /* not a varlena datatype */
3349  if (attr->attlen != -1)
3350  continue;
3351 
3352  /* no data */
3353  if (isnull[natt])
3354  continue;
3355 
3356  /* ok, we know we have a toast datum */
3357  varlena = (struct varlena *) DatumGetPointer(attrs[natt]);
3358 
3359  /* no need to do anything if the tuple isn't external */
3360  if (!VARATT_IS_EXTERNAL(varlena))
3361  continue;
3362 
3363  VARATT_EXTERNAL_GET_POINTER(toast_pointer, varlena);
3364 
3365  /*
3366  * Check whether the toast tuple changed, replace if so.
3367  */
3368  ent = (ReorderBufferToastEnt *)
3369  hash_search(txn->toast_hash,
3370  (void *) &toast_pointer.va_valueid,
3371  HASH_FIND,
3372  NULL);
3373  if (ent == NULL)
3374  continue;
3375 
3376  new_datum =
3377  (struct varlena *) palloc0(INDIRECT_POINTER_SIZE);
3378 
3379  free[natt] = true;
3380 
3381  reconstructed = palloc0(toast_pointer.va_rawsize);
3382 
3383  ent->reconstructed = reconstructed;
3384 
3385  /* stitch toast tuple back together from its parts */
3386  dlist_foreach(it, &ent->chunks)
3387  {
3388  bool isnull;
3389  ReorderBufferChange *cchange;
3390  ReorderBufferTupleBuf *ctup;
3391  Pointer chunk;
3392 
3393  cchange = dlist_container(ReorderBufferChange, node, it.cur);
3394  ctup = cchange->data.tp.newtuple;
3395  chunk = DatumGetPointer(
3396  fastgetattr(&ctup->tuple, 3, toast_desc, &isnull));
3397 
3398  Assert(!isnull);
3399  Assert(!VARATT_IS_EXTERNAL(chunk));
3400  Assert(!VARATT_IS_SHORT(chunk));
3401 
3402  memcpy(VARDATA(reconstructed) + data_done,
3403  VARDATA(chunk),
3404  VARSIZE(chunk) - VARHDRSZ);
3405  data_done += VARSIZE(chunk) - VARHDRSZ;
3406  }
3407  Assert(data_done == toast_pointer.va_extsize);
3408 
3409  /* make sure its marked as compressed or not */
3410  if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
3411  SET_VARSIZE_COMPRESSED(reconstructed, data_done + VARHDRSZ);
3412  else
3413  SET_VARSIZE(reconstructed, data_done + VARHDRSZ);
3414 
3415  memset(&redirect_pointer, 0, sizeof(redirect_pointer));
3416  redirect_pointer.pointer = reconstructed;
3417 
3419  memcpy(VARDATA_EXTERNAL(new_datum), &redirect_pointer,
3420  sizeof(redirect_pointer));
3421 
3422  attrs[natt] = PointerGetDatum(new_datum);
3423  }
3424 
3425  /*
3426  * Build tuple in separate memory & copy tuple back into the tuplebuf
3427  * passed to the output plugin. We can't directly heap_fill_tuple() into
3428  * the tuplebuf because attrs[] will point back into the current content.
3429  */
3430  tmphtup = heap_form_tuple(desc, attrs, isnull);
3431  Assert(newtup->tuple.t_len <= MaxHeapTupleSize);
3432  Assert(ReorderBufferTupleBufData(newtup) == newtup->tuple.t_data);
3433 
3434  memcpy(newtup->tuple.t_data, tmphtup->t_data, tmphtup->t_len);
3435  newtup->tuple.t_len = tmphtup->t_len;
3436 
3437  /*
3438  * free resources we won't further need, more persistent stuff will be
3439  * free'd in ReorderBufferToastReset().
3440  */
3441  RelationClose(toast_rel);
3442  pfree(tmphtup);
3443  for (natt = 0; natt < desc->natts; natt++)
3444  {
3445  if (free[natt])
3446  pfree(DatumGetPointer(attrs[natt]));
3447  }
3448  pfree(attrs);
3449  pfree(free);
3450  pfree(isnull);
3451 
3452  MemoryContextSwitchTo(oldcontext);
3453 
3454  /* now add the change back, with the correct size */
3455  ReorderBufferChangeMemoryUpdate(rb, change, true);
3456 }
3457 
3458 /*
3459  * Free all resources allocated for toast reconstruction.
3460  */
3461 static void
3463 {
3464  HASH_SEQ_STATUS hstat;
3465  ReorderBufferToastEnt *ent;
3466 
3467  if (txn->toast_hash == NULL)
3468  return;
3469 
3470  /* sequentially walk over the hash and free everything */
3471  hash_seq_init(&hstat, txn->toast_hash);
3472  while ((ent = (ReorderBufferToastEnt *) hash_seq_search(&hstat)) != NULL)
3473  {
3474  dlist_mutable_iter it;
3475 
3476  if (ent->reconstructed != NULL)
3477  pfree(ent->reconstructed);
3478 
3479  dlist_foreach_modify(it, &ent->chunks)
3480  {
3481  ReorderBufferChange *change =
3483 
3484  dlist_delete(&change->node);
3485  ReorderBufferReturnChange(rb, change);
3486  }
3487  }
3488 
3489  hash_destroy(txn->toast_hash);
3490  txn->toast_hash = NULL;
3491 }
3492 
3493 
3494 /* ---------------------------------------
3495  * Visibility support for logical decoding
3496  *
3497  *
3498  * Lookup actual cmin/cmax values when using decoding snapshot. We can't
3499  * always rely on stored cmin/cmax values because of two scenarios:
3500  *
3501  * * A tuple got changed multiple times during a single transaction and thus
3502  * has got a combocid. Combocid's are only valid for the duration of a
3503  * single transaction.
3504  * * A tuple with a cmin but no cmax (and thus no combocid) got
3505  * deleted/updated in another transaction than the one which created it
3506  * which we are looking at right now. As only one of cmin, cmax or combocid
3507  * is actually stored in the heap we don't have access to the value we
3508  * need anymore.
3509  *
3510  * To resolve those problems we have a per-transaction hash of (cmin,
3511  * cmax) tuples keyed by (relfilenode, ctid) which contains the actual
3512  * (cmin, cmax) values. That also takes care of combocids by simply
3513  * not caring about them at all. As we have the real cmin/cmax values
3514  * combocids aren't interesting.
3515  *
3516  * As we only care about catalog tuples here the overhead of this
3517  * hashtable should be acceptable.
3518  *
3519  * Heap rewrites complicate this a bit, check rewriteheap.c for
3520  * details.
3521  * -------------------------------------------------------------------------
3522  */
3523 
3524 /* struct for sorting mapping files by LSN efficiently */
3525 typedef struct RewriteMappingFile
3526 {
3528  char fname[MAXPGPATH];
3530 
#ifdef NOT_USED
/* debugging aid: dump the contents of a tuplecid mapping hash */
static void
DisplayMapping(HTAB *tuplecid_data)
{
	HASH_SEQ_STATUS hstat;
	ReorderBufferTupleCidEnt *ent;

	hash_seq_init(&hstat, tuplecid_data);
	while ((ent = (ReorderBufferTupleCidEnt *) hash_seq_search(&hstat)) != NULL)
	{
		elog(DEBUG3, "mapping: node: %u/%u/%u tid: %u/%u cmin: %u, cmax: %u",
			 ent->key.relnode.dbNode,
			 ent->key.relnode.spcNode,
			 ent->key.relnode.relNode,
			 ItemPointerGetBlockNumber(&ent->key.tid),
			 ItemPointerGetOffsetNumber(&ent->key.tid),
			 ent->cmin,
			 ent->cmax
			);
	}
}
#endif
3553 
3554 /*
3555  * Apply a single mapping file to tuplecid_data.
3556  *
3557  * The mapping file has to have been verified to be a) committed b) for our
3558  * transaction c) applied in LSN order.
3559  */
3560 static void
3561 ApplyLogicalMappingFile(HTAB *tuplecid_data, Oid relid, const char *fname)
3562 {
3563  char path[MAXPGPATH];
3564  int fd;
3565  int readBytes;
3567 
3568  sprintf(path, "pg_logical/mappings/%s", fname);
3569  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
3570  if (fd < 0)
3571  ereport(ERROR,
3573  errmsg("could not open file \"%s\": %m", path)));
3574 
3575  while (true)
3576  {
3579  ReorderBufferTupleCidEnt *new_ent;
3580  bool found;
3581 
3582  /* be careful about padding */
3583  memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
3584 
3585  /* read all mappings till the end of the file */
3587  readBytes = read(fd, &map, sizeof(LogicalRewriteMappingData));
3589 
3590  if (readBytes < 0)
3591  ereport(ERROR,
3593  errmsg("could not read file \"%s\": %m",
3594  path)));
3595  else if (readBytes == 0) /* EOF */
3596  break;
3597  else if (readBytes != sizeof(LogicalRewriteMappingData))
3598  ereport(ERROR,
3600  errmsg("could not read from file \"%s\": read %d instead of %d bytes",
3601  path, readBytes,
3602  (int32) sizeof(LogicalRewriteMappingData))));
3603 
3604  key.relnode = map.old_node;
3605  ItemPointerCopy(&map.old_tid,
3606  &key.tid);
3607 
3608 
3609  ent = (ReorderBufferTupleCidEnt *)
3610  hash_search(tuplecid_data,
3611  (void *) &key,
3612  HASH_FIND,
3613  NULL);
3614 
3615  /* no existing mapping, no need to update */
3616  if (!ent)
3617  continue;
3618 
3619  key.relnode = map.new_node;
3620  ItemPointerCopy(&map.new_tid,
3621  &key.tid);
3622 
3623  new_ent = (ReorderBufferTupleCidEnt *)
3624  hash_search(tuplecid_data,
3625  (void *) &key,
3626  HASH_ENTER,
3627  &found);
3628 
3629  if (found)
3630  {
3631  /*
3632  * Make sure the existing mapping makes sense. We sometime update
3633  * old records that did not yet have a cmax (e.g. pg_class' own
3634  * entry while rewriting it) during rewrites, so allow that.
3635  */
3636  Assert(ent->cmin == InvalidCommandId || ent->cmin == new_ent->cmin);
3637  Assert(ent->cmax == InvalidCommandId || ent->cmax == new_ent->cmax);
3638  }
3639  else
3640  {
3641  /* update mapping */
3642  new_ent->cmin = ent->cmin;
3643  new_ent->cmax = ent->cmax;
3644  new_ent->combocid = ent->combocid;
3645  }
3646  }
3647 
3648  if (CloseTransientFile(fd) != 0)
3649  ereport(ERROR,
3651  errmsg("could not close file \"%s\": %m", path)));
3652 }
3653 
3654 
3655 /*
3656  * Check whether the TransactionId 'xid' is in the pre-sorted array 'xip'.
3657  */
3658 static bool
3660 {
3661  return bsearch(&xid, xip, num,
3662  sizeof(TransactionId), xidComparator) != NULL;
3663 }
3664 
3665 /*
3666  * list_sort() comparator for sorting RewriteMappingFiles in LSN order.
3667  */
3668 static int
3669 file_sort_by_lsn(const ListCell *a_p, const ListCell *b_p)
3670 {
3673 
3674  if (a->lsn < b->lsn)
3675  return -1;
3676  else if (a->lsn > b->lsn)
3677  return 1;
3678  return 0;
3679 }
3680 
3681 /*
3682  * Apply any existing logical remapping files if there are any targeted at our
3683  * transaction for relid.
3684  */
3685 static void
3687 {
3688  DIR *mapping_dir;
3689  struct dirent *mapping_de;
3690  List *files = NIL;
3691  ListCell *file;
3692  Oid dboid = IsSharedRelation(relid) ? InvalidOid : MyDatabaseId;
3693 
3694  mapping_dir = AllocateDir("pg_logical/mappings");
3695  while ((mapping_de = ReadDir(mapping_dir, "pg_logical/mappings")) != NULL)
3696  {
3697  Oid f_dboid;
3698  Oid f_relid;
3699  TransactionId f_mapped_xid;
3700  TransactionId f_create_xid;
3701  XLogRecPtr f_lsn;
3702  uint32 f_hi,
3703  f_lo;
3704  RewriteMappingFile *f;
3705 
3706  if (strcmp(mapping_de->d_name, ".") == 0 ||
3707  strcmp(mapping_de->d_name, "..") == 0)
3708  continue;
3709 
3710  /* Ignore files that aren't ours */
3711  if (strncmp(mapping_de->d_name, "map-", 4) != 0)
3712  continue;
3713 
3714  if (sscanf(mapping_de->d_name, LOGICAL_REWRITE_FORMAT,
3715  &f_dboid, &f_relid, &f_hi, &f_lo,
3716  &f_mapped_xid, &f_create_xid) != 6)
3717  elog(ERROR, "could not parse filename \"%s\"", mapping_de->d_name);
3718 
3719  f_lsn = ((uint64) f_hi) << 32 | f_lo;
3720 
3721  /* mapping for another database */
3722  if (f_dboid != dboid)
3723  continue;
3724 
3725  /* mapping for another relation */
3726  if (f_relid != relid)
3727  continue;
3728 
3729  /* did the creating transaction abort? */
3730  if (!TransactionIdDidCommit(f_create_xid))
3731  continue;
3732 
3733  /* not for our transaction */
3734  if (!TransactionIdInArray(f_mapped_xid, snapshot->subxip, snapshot->subxcnt))
3735  continue;
3736 
3737  /* ok, relevant, queue for apply */
3738  f = palloc(sizeof(RewriteMappingFile));
3739  f->lsn = f_lsn;
3740  strcpy(f->fname, mapping_de->d_name);
3741  files = lappend(files, f);
3742  }
3743  FreeDir(mapping_dir);
3744 
3745  /* sort files so we apply them in LSN order */
3746  list_sort(files, file_sort_by_lsn);
3747 
3748  foreach(file, files)
3749  {
3751 
3752  elog(DEBUG1, "applying mapping: \"%s\" in %u", f->fname,
3753  snapshot->subxip[0]);
3754  ApplyLogicalMappingFile(tuplecid_data, relid, f->fname);
3755  pfree(f);
3756  }
3757 }
3758 
3759 /*
3760  * Lookup cmin/cmax of a tuple, during logical decoding where we can't rely on
3761  * combocids.
3762  */
3763 bool
3765  Snapshot snapshot,
3766  HeapTuple htup, Buffer buffer,
3767  CommandId *cmin, CommandId *cmax)
3768 {
3771  ForkNumber forkno;
3772  BlockNumber blockno;
3773  bool updated_mapping = false;
3774 
3775  /* be careful about padding */
3776  memset(&key, 0, sizeof(key));
3777 
3778  Assert(!BufferIsLocal(buffer));
3779 
3780  /*
3781  * get relfilenode from the buffer, no convenient way to access it other
3782  * than that.
3783  */
3784  BufferGetTag(buffer, &key.relnode, &forkno, &blockno);
3785 
3786  /* tuples can only be in the main fork */
3787  Assert(forkno == MAIN_FORKNUM);
3788  Assert(blockno == ItemPointerGetBlockNumber(&htup->t_self));
3789 
3790  ItemPointerCopy(&htup->t_self,
3791  &key.tid);
3792 
3793 restart:
3794  ent = (ReorderBufferTupleCidEnt *)
3795  hash_search(tuplecid_data,
3796  (void *) &key,
3797  HASH_FIND,
3798  NULL);
3799 
3800  /*
3801  * failed to find a mapping, check whether the table was rewritten and
3802  * apply mapping if so, but only do that once - there can be no new
3803  * mappings while we are in here since we have to hold a lock on the
3804  * relation.
3805  */
3806  if (ent == NULL && !updated_mapping)
3807  {
3808  UpdateLogicalMappings(tuplecid_data, htup->t_tableOid, snapshot);
3809  /* now check but don't update for a mapping again */
3810  updated_mapping = true;
3811  goto restart;
3812  }
3813  else if (ent == NULL)
3814  return false;
3815 
3816  if (cmin)
3817  *cmin = ent->cmin;
3818  if (cmax)
3819  *cmax = ent->cmax;
3820  return true;
3821 }
static void ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
XLogRecPtr first_lsn
bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *rb, TransactionId xid)
#define NIL
Definition: pg_list.h:65
uint32 CommandId
Definition: c.h:528
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
TimestampTz commit_time
struct ReorderBufferToastEnt ReorderBufferToastEnt
void AbortCurrentTransaction(void)
Definition: xact.c:3162
ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER]
#define SizeofHeapTupleHeader
Definition: htup_details.h:184
bool IsToastRelation(Relation relation)
Definition: catalog.c:141
void hash_destroy(HTAB *hashp)
Definition: dynahash.c:814
void ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, Snapshot snapshot, XLogRecPtr lsn, bool transactional, const char *prefix, Size message_size, const char *message)
#define relpathperm(rnode, forknum)
Definition: relpath.h:83
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
ReorderBufferApplyChangeCB apply_change
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:211
#define AllocSetContextCreate
Definition: memutils.h:170
HeapTupleData * HeapTuple
Definition: htup.h:71
dlist_node base_snapshot_node
#define DEBUG1
Definition: elog.h:25
dlist_node * cur
Definition: ilist.h:180
static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
RepOriginId origin_id
void StartupReorderBuffer(void)
#define VARDATA(PTR)
Definition: postgres.h:302
#define fastgetattr(tup, attnum, tupleDesc, isnull)
Definition: htup_details.h:712
static int32 next
Definition: blutils.c:213
#define HASH_CONTEXT
Definition: hsearch.h:93
#define HASH_ELEM
Definition: hsearch.h:87
int wal_segment_size
Definition: xlog.c:112
void ReorderBufferAbortOld(ReorderBuffer *rb, TransactionId oldestRunningXid)
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
uint32 TransactionId
Definition: c.h:514
void ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
bool copied
Definition: snapshot.h:185
void ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
MemoryContext hcxt
Definition: hsearch.h:78
#define DatumGetInt32(X)
Definition: postgres.h:472
#define RelationGetDescr(relation)
Definition: rel.h:448
static ReorderBufferTXN * ReorderBufferLargestTXN(ReorderBuffer *rb)
static void ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn, ReorderBufferTXN *subtxn)
#define DEBUG3
Definition: elog.h:23
struct ReorderBufferChange::@101::@102 tp
#define write(a, b, c)
Definition: win32.h:14
#define VARHDRSZ_SHORT
Definition: postgres.h:268
TransactionId by_txn_last_xid
#define VARSIZE(PTR)
Definition: postgres.h:303
#define VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr)
Definition: detoast.h:32
int64 TimestampTz
Definition: timestamp.h:39
#define PointerGetDatum(X)
Definition: postgres.h:556
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
ReorderBufferTXN * txn
static bool TransactionIdInArray(TransactionId xid, TransactionId *xip, Size num)
#define VARHDRSZ
Definition: c.h:562
#define dlist_foreach(iter, lhead)
Definition: ilist.h:507
struct ReorderBufferChange::@101::@104 msg
XLogRecPtr current_restart_decoding_lsn
#define DatumGetObjectId(X)
Definition: postgres.h:500
char * pstrdup(const char *in)
Definition: mcxt.c:1186
struct dirent * ReadDirExtended(DIR *dir, const char *dirname, int elevel)
Definition: fd.c:2584
static ReorderBufferTXN * ReorderBufferGetTXN(ReorderBuffer *rb)
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
struct ReorderBufferTXN * txn
Definition: reorderbuffer.h:86
Oid RelidByRelfilenode(Oid reltablespace, Oid relfilenode)
void ReorderBufferSetBaseSnapshot(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void ReorderBufferFree(ReorderBuffer *rb)
uint16 RepOriginId
Definition: xlogdefs.h:58
Size entrysize
Definition: hsearch.h:73
struct cursor * cur
Definition: ecpg.c:28
char fname[MAXPGPATH]
int32 va_rawsize
Definition: postgres.h:69
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4653
#define INFO
Definition: elog.h:33
MemoryContext SlabContextCreate(MemoryContext parent, const char *name, Size blockSize, Size chunkSize)
Definition: slab.c:187
void binaryheap_replace_first(binaryheap *heap, Datum d)
Definition: binaryheap.c:204
void ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, RelFileNode node, ItemPointerData tid, CommandId cmin, CommandId cmax, CommandId combocid)
static void ReorderBufferIterTXNFinish(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
uint32 BlockNumber
Definition: block.h:31
struct ReorderBufferChange::@101::@105 tuplecid
void TeardownHistoricSnapshot(bool is_error)
Definition: snapmgr.c:2051
ReorderBufferCommitCB commit
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:1020
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
#define RelationIsLogicallyLogged(relation)
Definition: rel.h:594
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:906
static int64 files
Definition: pg_checksums.c:34
bool TransactionIdDidCommit(TransactionId transactionId)
Definition: transam.c:125
ReplicationSlotPersistentData data
Definition: slot.h:132
static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, int *fd, XLogSegNo *segno)
struct ReorderBufferTupleCidKey ReorderBufferTupleCidKey
struct SnapshotData * Snapshot
Definition: snapshot.h:121
Form_pg_class rd_rel
Definition: rel.h:83
unsigned int Oid
Definition: postgres_ext.h:31
XLogRecPtr base_snapshot_lsn
Definition: dirent.h:9
uint32 regd_count
Definition: snapshot.h:199
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:83
void binaryheap_add_unordered(binaryheap *heap, Datum d)
Definition: binaryheap.c:110
MemoryContext change_context
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define VARDATA_EXTERNAL(PTR)
Definition: postgres.h:310
#define PG_BINARY
Definition: c.h:1222
XLogRecPtr origin_lsn
static void ApplyLogicalMappingFile(HTAB *tuplecid_data, Oid relid, const char *fname)
signed int int32
Definition: c.h:347
#define FirstCommandId
Definition: c.h:530
#define XLByteInSeg(xlrp, logSegNo, wal_segsz_bytes)
void ReorderBufferSetRestartPoint(ReorderBuffer *rb, XLogRecPtr ptr)
HeapTupleHeader t_data
Definition: htup.h:68
#define VARATT_IS_EXTERNAL(PTR)
Definition: postgres.h:313
#define sprintf
Definition: port.h:194
bool ReplicationSlotValidateName(const char *name, int elevel)
Definition: slot.c:174
static dlist_node * dlist_next_node(dlist_head *head, dlist_node *node)
Definition: ilist.h:421
Definition: dynahash.c:208
#define ReorderBufferTupleBufData(p)
Definition: reorderbuffer.h:38
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
void pfree(void *pointer)
Definition: mcxt.c:1056
char * Pointer
Definition: c.h:336
#define dlist_tail_element(type, membername, lhead)
Definition: ilist.h:496
Definition: dirent.c:25
#define ERROR
Definition: elog.h:43
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2292
#define SLAB_LARGE_BLOCK_SIZE
Definition: memutils.h:222
#define VARATT_IS_SHORT(PTR)
Definition: postgres.h:326
#define RelationIsValid(relation)
Definition: rel.h:395
dlist_head changes
void ReorderBufferReturnRelids(ReorderBuffer *rb, Oid *relids)
dlist_head txns_by_base_snapshot_lsn
Datum binaryheap_first(binaryheap *heap)
Definition: binaryheap.c:159
#define MAXPGPATH
ItemPointerData t_self
Definition: htup.h:65
ReorderBufferTupleCidKey key
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:192
#define DEBUG2
Definition: elog.h:24
TransactionId GetCurrentTransactionId(void)
Definition: xact.c:422
void ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
uint32 t_len
Definition: htup.h:64
#define MaxHeapTupleSize
Definition: htup_details.h:560
struct varlena * reconstructed
void RollbackAndReleaseCurrentSubTransaction(void)
Definition: xact.c:4464
#define SET_VARTAG_EXTERNAL(PTR, tag)
Definition: postgres.h:333
uint64 XLogSegNo
Definition: xlogdefs.h:41
int errcode_for_file_access(void)
Definition: elog.c:631
HeapTupleData tuple
Definition: reorderbuffer.h:29
struct SnapshotData SnapshotData
TransactionId GetCurrentTransactionIdIfAny(void)
Definition: xact.c:439
static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, TransactionId xid, XLogSegNo segno)
#define InvalidTransactionId
Definition: transam.h:31
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:200
bool ReorderBufferXidHasCatalogChanges(ReorderBuffer *rb, TransactionId xid)
unsigned int uint32
Definition: c.h:359
XLogRecPtr final_lsn
DIR * AllocateDir(const char *dirname)
Definition: fd.c:2503
Oid t_tableOid
Definition: htup.h:66
static int file_sort_by_lsn(const ListCell *a_p, const ListCell *b_p)
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1342
void RelationClose(Relation relation)
Definition: relcache.c:2088
TransactionId xmin
Definition: snapshot.h:157
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
#define INDIRECT_POINTER_SIZE
Definition: detoast.h:44
ReorderBufferMessageCB message
#define ereport(elevel, rest)
Definition: elog.h:141
#define AssertArg(condition)
Definition: c.h:741
int bh_size
Definition: binaryheap.h:32
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
TransactionId * xip
Definition: snapshot.h:168
static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap, ReorderBufferTXN *txn, CommandId cid)
#define VARSIZE_SHORT(PTR)
Definition: postgres.h:305
ForkNumber
Definition: relpath.h:40
List * lappend(List *list, void *datum)
Definition: list.c:322
static HTAB * tuplecid_data
Definition: snapmgr.c:172
int CloseTransientFile(int fd)
Definition: fd.c:2469
static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define stat(a, b)
Definition: win32_port.h:255
struct ReorderBufferTupleCidEnt ReorderBufferTupleCidEnt
void ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr lsn)
static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, char *change)
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:487
#define HASH_BLOBS
Definition: hsearch.h:88
TransactionId ReorderBufferGetOldestXmin(ReorderBuffer *rb)
ReorderBufferChange * change
MemoryContext GenerationContextCreate(MemoryContext parent, const char *name, Size blockSize)
Definition: generation.c:212
static int ReorderBufferIterCompare(Datum a, Datum b, void *arg)
MemoryContext context
void ReorderBufferCommitChild(ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn)
void * palloc0(Size size)
Definition: mcxt.c:980
static void dlist_insert_before(dlist_node *before, dlist_node *node)
Definition: ilist.h:346
static bool dlist_has_next(dlist_head *head, dlist_node *node)
Definition: ilist.h:402
#define InvalidCommandId
Definition: c.h:531
uintptr_t Datum
Definition: postgres.h:367
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
Definition: dynahash.c:316
ReorderBufferTXN * by_txn_last_txn
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void cleanup(void)
Definition: bootstrap.c:901
dlist_head toplevel_by_lsn
struct RewriteMappingFile RewriteMappingFile
Oid MyDatabaseId
Definition: globals.c:85
TransactionId xid
static void ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, ReorderBufferChange *change, bool addition)
bool IsSharedRelation(Oid relationId)
Definition: catalog.c:239
Size keysize
Definition: hsearch.h:72
dlist_node * cur
Definition: ilist.h:161
static Size ReorderBufferChangeSize(ReorderBufferChange *change)
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:839
static void AssertTXNLsnOrder(ReorderBuffer *rb)
void ReorderBufferAddSnapshot(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
#define InvalidOid
Definition: postgres_ext.h:36
static void dlist_init(dlist_head *head)
Definition: ilist.h:278
CommandId curcid
Definition: snapshot.h:187
static void ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
struct ReorderBufferIterTXNState ReorderBufferIterTXNState
static void ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTXN *txn)
struct ReorderBufferDiskChange ReorderBufferDiskChange
void binaryheap_build(binaryheap *heap)
Definition: binaryheap.c:126
#define free(a)
Definition: header.h:65
struct ReorderBufferIterTXNEntry ReorderBufferIterTXNEntry
#define PG_CATCH()
Definition: elog.h:332
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
ItemPointerData new_tid
Definition: rewriteheap.h:40
#define XLogSegNoOffsetToRecPtr(segno, offset, wal_segsz_bytes, dest)
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:739
#define lfirst(lc)
Definition: pg_list.h:190
ReorderBufferTXN * ReorderBufferGetOldestTXN(ReorderBuffer *rb)
union ReorderBufferChange::@101 data
static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, int fd, ReorderBufferChange *change)
static void ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn)
Definition: regguts.h:298
struct dirent * ReadDir(DIR *dir, const char *dirname)
Definition: fd.c:2569
static ReorderBufferChange * ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
int32 va_extsize
Definition: postgres.h:70
XLogRecPtr end_lsn
void StartTransactionCommand(void)
Definition: xact.c:2797
void BeginInternalSubTransaction(const char *name)
Definition: xact.c:4359
void ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs)
int logical_decoding_work_mem
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
size_t Size
Definition: c.h:467
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: pgstat.h:1318
void binaryheap_free(binaryheap *heap)
Definition: binaryheap.c:69
SharedInvalidationMessage * invalidations
#define BufferIsLocal(buffer)
Definition: buf.h:37
#define SLAB_DEFAULT_BLOCK_SIZE
Definition: memutils.h:221
#define ItemPointerGetOffsetNumber(pointer)
Definition: itemptr.h:117
void ReorderBufferReturnTupleBuf(ReorderBuffer *rb, ReorderBufferTupleBuf *tuple)
#define PG_RE_THROW()
Definition: elog.h:363
ReorderBuffer * ReorderBufferAllocate(void)
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1389
void * repalloc(void *pointer, Size size)
Definition: mcxt.c:1069
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1379
static void ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz)
struct varlena * pointer
Definition: postgres.h:86
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change)
void SnapBuildSnapDecRefcount(Snapshot snap)
Definition: snapbuild.c:428
#define LOGICAL_REWRITE_FORMAT
Definition: rewriteheap.h:54
dlist_head subtxns
binaryheap * binaryheap_allocate(int capacity, binaryheap_comparator compare, void *arg)
Definition: binaryheap.c:33
#define VARATT_IS_EXTENDED(PTR)
Definition: postgres.h:327
#define S_ISDIR(m)
Definition: win32_port.h:296
#define DatumGetPointer(X)
Definition: postgres.h:549
#define lstat(path, sb)
Definition: win32_port.h:244
void heap_deform_tuple(HeapTuple tuple, TupleDesc tupleDesc, Datum *values, bool *isnull)
Definition: heaptuple.c:1249
void LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
Definition: inval.c:556
#define Int32GetDatum(X)
Definition: postgres.h:479
static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
uint32 xcnt
Definition: snapshot.h:169
void * palloc(Size size)
Definition: mcxt.c:949
void list_sort(List *list, list_sort_comparator cmp)
Definition: list.c:1482
int errmsg(const char *fmt,...)
Definition: elog.c:822
static void UpdateLogicalMappings(HTAB *tuplecid_data, Oid relid, Snapshot snapshot)
ReorderBufferApplyTruncateCB apply_truncate
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:796
ReorderBufferTXN * txn
void ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
static dlist_node * dlist_pop_head_node(dlist_head *head)
Definition: ilist.h:368
void SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
Definition: snapmgr.c:2035
#define elog(elevel,...)
Definition: elog.h:228
Oid * ReorderBufferGetRelids(ReorderBuffer *rb, int nrelids)
int i
XLogRecPtr restart_decoding_lsn
#define NameStr(name)
Definition: c.h:616
#define SET_VARSIZE_COMPRESSED(PTR, len)
Definition: postgres.h:331
#define VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer)
Definition: detoast.h:22
void * arg
Datum binaryheap_remove_first(binaryheap *heap)
Definition: binaryheap.c:174
static const Size max_changes_in_memory
Definition: c.h:556
struct ReorderBufferTXNByIdEnt ReorderBufferTXNByIdEnt
void ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
MemoryContext tup_context
#define SET_VARSIZE(PTR, len)
Definition: postgres.h:329
char d_name[MAX_PATH]
Definition: dirent.h:14
#define ItemPointerGetBlockNumber(pointer)
Definition: itemptr.h:98
static void ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define qsort(a, b, c, d)
Definition: port.h:488
#define TransactionIdIsValid(xid)
Definition: transam.h:41
ReorderBufferChange change
ReorderBufferBeginCB begin
static void ReorderBufferToastInitHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
void BufferGetTag(Buffer buffer, RelFileNode *rnode, ForkNumber *forknum, BlockNumber *blknum)
Definition: bufmgr.c:2634
#define PG_TRY()
Definition: elog.h:322
static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
void ReorderBufferProcessXid(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
static void ReorderBufferCleanupSerializedTXNs(const char *slotname)
Definition: pg_list.h:50
#define snprintf
Definition: port.h:192
int Buffer
Definition: buf.h:23
static ReorderBufferIterTXNState * ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
Relation RelationIdGetRelation(Oid relationId)
Definition: relcache.c:1989
#define PG_END_TRY()
Definition: elog.h:347
#define read(a, b, c)
Definition: win32.h:13
int FreeDir(DIR *dir)
Definition: fd.c:2621
struct HeapTupleData HeapTupleData
TransactionId toplevel_xid
#define offsetof(type, field)
Definition: c.h:662
MemoryContext txn_context
dlist_head tuplecids
#define ItemPointerCopy(fromPointer, toPointer)
Definition: itemptr.h:161
TransactionId * subxip
Definition: snapshot.h:180
uint32 active_count
Definition: snapshot.h:198
struct ReorderBufferChange::@101::@103 truncate
int xidComparator(const void *arg1, const void *arg2)
Definition: xid.c:138
struct ReorderBufferChange ReorderBufferChange
int32 subxcnt
Definition: snapshot.h:181
void ReorderBufferAddNewCommandId(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, CommandId cid)
ReorderBufferTupleBuf * ReorderBufferGetTupleBuf(ReorderBuffer *rb, Size tuple_len)
bool ResolveCminCmaxDuringDecoding(HTAB *tuplecid_data, Snapshot snapshot, HeapTuple htup, Buffer buffer, CommandId *cmin, CommandId *cmax)
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
ItemPointerData old_tid
Definition: rewriteheap.h:39