PostgreSQL Source Code  git master
reorderbuffer.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * reorderbuffer.c
4  * PostgreSQL logical replay/reorder buffer management
5  *
6  *
7  * Copyright (c) 2012-2019, PostgreSQL Global Development Group
8  *
9  *
10  * IDENTIFICATION
11  * src/backend/replication/reorderbuffer.c
12  *
13  * NOTES
14  * This module gets handed individual pieces of transactions in the order
15  * they are written to the WAL and is responsible for reassembling them into
16  * toplevel transaction sized pieces. When a transaction is completely
17  * reassembled - signalled by reading the transaction commit record - it
18  * will then call the output plugin (cf. ReorderBufferCommit()) with the
19  * individual changes. The output plugins rely on snapshots built by
20  * snapbuild.c which hands them to us.
21  *
22  * Transactions and subtransactions/savepoints in postgres are not
23  * immediately linked to each other from outside the performing
24  * backend. Only at commit/abort (or special xact_assignment records) they
25  * are linked together. Which means that we will have to splice together a
26  * toplevel transaction from its subtransactions. To do that efficiently we
27  * build a binary heap indexed by the smallest current lsn of the individual
28  * subtransactions' changestreams. As the individual streams are inherently
29  * ordered by LSN - since that is where we build them from - the transaction
30  * can easily be reassembled by always using the subtransaction with the
31  * smallest current LSN from the heap.
32  *
33  * In order to cope with large transactions - which can be several times as
34  * big as the available memory - this module supports spooling the contents
35  * of a large transactions to disk. When the transaction is replayed the
36  * contents of individual (sub-)transactions will be read from disk in
37  * chunks.
38  *
39  * This module also has to deal with reassembling toast records from the
40  * individual chunks stored in WAL. When a new (or initial) version of a
41  * tuple is stored in WAL it will always be preceded by the toast chunks
42  * emitted for the columns stored out of line. Within a single toplevel
43  * transaction there will be no other data carrying records between a row's
44  * toast chunks and the row data itself. See ReorderBufferToast* for
45  * details.
46  *
47  * ReorderBuffer uses two special memory context types - SlabContext for
48  * allocations of fixed-length structures (changes and transactions), and
49  * GenerationContext for the variable-length transaction data (allocated
50  * and freed in groups with similar lifespan).
51  *
52  * -------------------------------------------------------------------------
53  */
#include "postgres.h"

#include <unistd.h>
#include <sys/stat.h>

#include "access/heapam.h"
#include "access/rewriteheap.h"
#include "access/transam.h"
#include "access/tuptoaster.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "catalog/catalog.h"
#include "lib/binaryheap.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "replication/logical.h"
#include "replication/reorderbuffer.h"
#include "replication/slot.h"
#include "replication/snapbuild.h"	/* just for SnapBuildSnapDecRefcount */
#include "storage/bufmgr.h"
#include "storage/fd.h"
#include "storage/sinval.h"
#include "utils/builtins.h"
#include "utils/combocid.h"
#include "utils/memdebug.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/relfilenodemap.h"

83 
84 /* entry for a hash table we use to map from xid to our transaction state */
86 {
90 
91 /* data structures for (relfilenode, ctid) => (cmin, cmax) mapping */
93 {
97 
99 {
103  CommandId combocid; /* just for debugging */
105 
106 /* k-way in-order change iteration support structures */
108 {
112  int fd;
115 
117 {
121  ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER];
123 
124 /* toast datastructures */
125 typedef struct ReorderBufferToastEnt
126 {
127  Oid chunk_id; /* toast_table.chunk_id */
128  int32 last_chunk_seq; /* toast_table.chunk_seq of the last chunk we
129  * have seen */
130  Size num_chunks; /* number of chunks we've already seen */
131  Size size; /* combined size of chunks seen */
132  dlist_head chunks; /* linked list of chunks */
133  struct varlena *reconstructed; /* reconstructed varlena now pointed to in
134  * main tup */
136 
137 /* Disk serialization support datastructures */
139 {
142  /* data follows */
144 
145 /*
146  * Maximum number of changes kept in memory, per transaction. After that,
147  * changes are spooled to disk.
148  *
149  * The current value should be sufficient to decode the entire transaction
150  * without hitting disk in OLTP workloads, while starting to spool to disk in
151  * other workloads reasonably fast.
152  *
153  * At some point in the future it probably makes sense to have a more elaborate
154  * resource management here, but it's not entirely clear what that would look
155  * like.
156  */
157 static const Size max_changes_in_memory = 4096;
158 
159 /* ---------------------------------------
160  * primary reorderbuffer support routines
161  * ---------------------------------------
162  */
166  TransactionId xid, bool create, bool *is_new,
167  XLogRecPtr lsn, bool create_as_top);
169  ReorderBufferTXN *subtxn);
170 
171 static void AssertTXNLsnOrder(ReorderBuffer *rb);
172 
173 /* ---------------------------------------
174  * support functions for lsn-order iterating over the ->changes of a
175  * transaction and its subtransactions
176  *
177  * used for iteration over the k-way heap merge of a transaction and its
178  * subtransactions
179  * ---------------------------------------
180  */
186 
187 /*
188  * ---------------------------------------
189  * Disk serialization support functions
190  * ---------------------------------------
191  */
195  int fd, ReorderBufferChange *change);
197  int *fd, XLogSegNo *segno);
199  char *change);
201 static void ReorderBufferCleanupSerializedTXNs(const char *slotname);
202 static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot,
203  TransactionId xid, XLogSegNo segno);
204 
205 static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap);
208 
209 /* ---------------------------------------
210  * toast reassembly support
211  * ---------------------------------------
212  */
216  Relation relation, ReorderBufferChange *change);
218  Relation relation, ReorderBufferChange *change);
219 
220 
221 /*
222  * Allocate a new ReorderBuffer and clean out any old serialized state from
223  * prior ReorderBuffer instances for the same slot.
224  */
227 {
228  ReorderBuffer *buffer;
229  HASHCTL hash_ctl;
230  MemoryContext new_ctx;
231 
232  Assert(MyReplicationSlot != NULL);
233 
234  /* allocate memory in own context, to have better accountability */
236  "ReorderBuffer",
238 
239  buffer =
240  (ReorderBuffer *) MemoryContextAlloc(new_ctx, sizeof(ReorderBuffer));
241 
242  memset(&hash_ctl, 0, sizeof(hash_ctl));
243 
244  buffer->context = new_ctx;
245 
246  buffer->change_context = SlabContextCreate(new_ctx,
247  "Change",
249  sizeof(ReorderBufferChange));
250 
251  buffer->txn_context = SlabContextCreate(new_ctx,
252  "TXN",
254  sizeof(ReorderBufferTXN));
255 
256  buffer->tup_context = GenerationContextCreate(new_ctx,
257  "Tuples",
259 
260  hash_ctl.keysize = sizeof(TransactionId);
261  hash_ctl.entrysize = sizeof(ReorderBufferTXNByIdEnt);
262  hash_ctl.hcxt = buffer->context;
263 
264  buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
266 
268  buffer->by_txn_last_txn = NULL;
269 
270  buffer->outbuf = NULL;
271  buffer->outbufsize = 0;
272 
274 
275  dlist_init(&buffer->toplevel_by_lsn);
277 
278  /*
279  * Ensure there's no stale data from prior uses of this slot, in case some
280  * prior exit avoided calling ReorderBufferFree. Failure to do this can
281  * produce duplicated txns, and it's very cheap if there's nothing there.
282  */
284 
285  return buffer;
286 }
287 
288 /*
289  * Free a ReorderBuffer
290  */
291 void
293 {
294  MemoryContext context = rb->context;
295 
296  /*
297  * We free separately allocated data by entirely scrapping reorderbuffer's
298  * memory context.
299  */
300  MemoryContextDelete(context);
301 
302  /* Free disk space used by unconsumed reorder buffers */
304 }
305 
306 /*
307  * Get an unused, possibly preallocated, ReorderBufferTXN.
308  */
309 static ReorderBufferTXN *
311 {
313 
314  txn = (ReorderBufferTXN *)
316 
317  memset(txn, 0, sizeof(ReorderBufferTXN));
318 
319  dlist_init(&txn->changes);
320  dlist_init(&txn->tuplecids);
321  dlist_init(&txn->subtxns);
322 
323  return txn;
324 }
325 
326 /*
327  * Free a ReorderBufferTXN.
328  */
329 static void
331 {
332  /* clean the lookup cache if we were cached (quite likely) */
333  if (rb->by_txn_last_xid == txn->xid)
334  {
336  rb->by_txn_last_txn = NULL;
337  }
338 
339  /* free data that's contained */
340 
341  if (txn->tuplecid_hash != NULL)
342  {
344  txn->tuplecid_hash = NULL;
345  }
346 
347  if (txn->invalidations)
348  {
349  pfree(txn->invalidations);
350  txn->invalidations = NULL;
351  }
352 
353  pfree(txn);
354 }
355 
356 /*
357  * Get an fresh ReorderBufferChange.
358  */
361 {
362  ReorderBufferChange *change;
363 
364  change = (ReorderBufferChange *)
366 
367  memset(change, 0, sizeof(ReorderBufferChange));
368  return change;
369 }
370 
371 /*
372  * Free an ReorderBufferChange.
373  */
374 void
376 {
377  /* free contained data */
378  switch (change->action)
379  {
384  if (change->data.tp.newtuple)
385  {
386  ReorderBufferReturnTupleBuf(rb, change->data.tp.newtuple);
387  change->data.tp.newtuple = NULL;
388  }
389 
390  if (change->data.tp.oldtuple)
391  {
392  ReorderBufferReturnTupleBuf(rb, change->data.tp.oldtuple);
393  change->data.tp.oldtuple = NULL;
394  }
395  break;
397  if (change->data.msg.prefix != NULL)
398  pfree(change->data.msg.prefix);
399  change->data.msg.prefix = NULL;
400  if (change->data.msg.message != NULL)
401  pfree(change->data.msg.message);
402  change->data.msg.message = NULL;
403  break;
405  if (change->data.snapshot)
406  {
407  ReorderBufferFreeSnap(rb, change->data.snapshot);
408  change->data.snapshot = NULL;
409  }
410  break;
411  /* no data in addition to the struct itself */
413  if (change->data.truncate.relids != NULL)
414  {
415  ReorderBufferReturnRelids(rb, change->data.truncate.relids);
416  change->data.truncate.relids = NULL;
417  }
418  break;
422  break;
423  }
424 
425  pfree(change);
426 }
427 
428 /*
429  * Get a fresh ReorderBufferTupleBuf fitting at least a tuple of size
430  * tuple_len (excluding header overhead).
431  */
434 {
435  ReorderBufferTupleBuf *tuple;
436  Size alloc_len;
437 
438  alloc_len = tuple_len + SizeofHeapTupleHeader;
439 
440  tuple = (ReorderBufferTupleBuf *)
442  sizeof(ReorderBufferTupleBuf) +
443  MAXIMUM_ALIGNOF + alloc_len);
444  tuple->alloc_tuple_size = alloc_len;
445  tuple->tuple.t_data = ReorderBufferTupleBufData(tuple);
446 
447  return tuple;
448 }
449 
450 /*
451  * Free an ReorderBufferTupleBuf.
452  */
453 void
455 {
456  pfree(tuple);
457 }
458 
459 /*
460  * Get an array for relids of truncated relations.
461  *
462  * We use the global memory context (for the whole reorder buffer), because
463  * none of the existing ones seems like a good match (some are SLAB, so we
464  * can't use those, and tup_context is meant for tuple data, not relids). We
465  * could add yet another context, but it seems like an overkill - TRUNCATE is
466  * not particularly common operation, so it does not seem worth it.
467  */
468 Oid *
470 {
471  Oid *relids;
472  Size alloc_len;
473 
474  alloc_len = sizeof(Oid) * nrelids;
475 
476  relids = (Oid *) MemoryContextAlloc(rb->context, alloc_len);
477 
478  return relids;
479 }
480 
481 /*
482  * Free an array of relids.
483  */
484 void
486 {
487  pfree(relids);
488 }
489 
490 /*
491  * Return the ReorderBufferTXN from the given buffer, specified by Xid.
492  * If create is true, and a transaction doesn't already exist, create it
493  * (with the given LSN, and as top transaction if that's specified);
494  * when this happens, is_new is set to true.
495  */
496 static ReorderBufferTXN *
498  bool *is_new, XLogRecPtr lsn, bool create_as_top)
499 {
502  bool found;
503 
505 
506  /*
507  * Check the one-entry lookup cache first
508  */
510  rb->by_txn_last_xid == xid)
511  {
512  txn = rb->by_txn_last_txn;
513 
514  if (txn != NULL)
515  {
516  /* found it, and it's valid */
517  if (is_new)
518  *is_new = false;
519  return txn;
520  }
521 
522  /*
523  * cached as non-existent, and asked not to create? Then nothing else
524  * to do.
525  */
526  if (!create)
527  return NULL;
528  /* otherwise fall through to create it */
529  }
530 
531  /*
532  * If the cache wasn't hit or it yielded an "does-not-exist" and we want
533  * to create an entry.
534  */
535 
536  /* search the lookup table */
537  ent = (ReorderBufferTXNByIdEnt *)
538  hash_search(rb->by_txn,
539  (void *) &xid,
540  create ? HASH_ENTER : HASH_FIND,
541  &found);
542  if (found)
543  txn = ent->txn;
544  else if (create)
545  {
546  /* initialize the new entry, if creation was requested */
547  Assert(ent != NULL);
548  Assert(lsn != InvalidXLogRecPtr);
549 
550  ent->txn = ReorderBufferGetTXN(rb);
551  ent->txn->xid = xid;
552  txn = ent->txn;
553  txn->first_lsn = lsn;
555 
556  if (create_as_top)
557  {
558  dlist_push_tail(&rb->toplevel_by_lsn, &txn->node);
559  AssertTXNLsnOrder(rb);
560  }
561  }
562  else
563  txn = NULL; /* not found and not asked to create */
564 
565  /* update cache */
566  rb->by_txn_last_xid = xid;
567  rb->by_txn_last_txn = txn;
568 
569  if (is_new)
570  *is_new = !found;
571 
572  Assert(!create || txn != NULL);
573  return txn;
574 }
575 
576 /*
577  * Queue a change into a transaction so it can be replayed upon commit.
578  */
579 void
581  ReorderBufferChange *change)
582 {
584 
585  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
586 
587  change->lsn = lsn;
588  Assert(InvalidXLogRecPtr != lsn);
589  dlist_push_tail(&txn->changes, &change->node);
590  txn->nentries++;
591  txn->nentries_mem++;
592 
594 }
595 
596 /*
597  * Queue message into a transaction so it can be processed upon commit.
598  */
599 void
601  Snapshot snapshot, XLogRecPtr lsn,
602  bool transactional, const char *prefix,
603  Size message_size, const char *message)
604 {
605  if (transactional)
606  {
607  MemoryContext oldcontext;
608  ReorderBufferChange *change;
609 
611 
612  oldcontext = MemoryContextSwitchTo(rb->context);
613 
614  change = ReorderBufferGetChange(rb);
616  change->data.msg.prefix = pstrdup(prefix);
617  change->data.msg.message_size = message_size;
618  change->data.msg.message = palloc(message_size);
619  memcpy(change->data.msg.message, message, message_size);
620 
621  ReorderBufferQueueChange(rb, xid, lsn, change);
622 
623  MemoryContextSwitchTo(oldcontext);
624  }
625  else
626  {
627  ReorderBufferTXN *txn = NULL;
628  volatile Snapshot snapshot_now = snapshot;
629 
630  if (xid != InvalidTransactionId)
631  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
632 
633  /* setup snapshot to allow catalog access */
634  SetupHistoricSnapshot(snapshot_now, NULL);
635  PG_TRY();
636  {
637  rb->message(rb, txn, lsn, false, prefix, message_size, message);
638 
640  }
641  PG_CATCH();
642  {
644  PG_RE_THROW();
645  }
646  PG_END_TRY();
647  }
648 }
649 
650 /*
651  * AssertTXNLsnOrder
652  * Verify LSN ordering of transaction lists in the reorderbuffer
653  *
654  * Other LSN-related invariants are checked too.
655  *
656  * No-op if assertions are not in use.
657  */
658 static void
660 {
661 #ifdef USE_ASSERT_CHECKING
662  dlist_iter iter;
663  XLogRecPtr prev_first_lsn = InvalidXLogRecPtr;
664  XLogRecPtr prev_base_snap_lsn = InvalidXLogRecPtr;
665 
666  dlist_foreach(iter, &rb->toplevel_by_lsn)
667  {
669  iter.cur);
670 
671  /* start LSN must be set */
672  Assert(cur_txn->first_lsn != InvalidXLogRecPtr);
673 
674  /* If there is an end LSN, it must be higher than start LSN */
675  if (cur_txn->end_lsn != InvalidXLogRecPtr)
676  Assert(cur_txn->first_lsn <= cur_txn->end_lsn);
677 
678  /* Current initial LSN must be strictly higher than previous */
679  if (prev_first_lsn != InvalidXLogRecPtr)
680  Assert(prev_first_lsn < cur_txn->first_lsn);
681 
682  /* known-as-subtxn txns must not be listed */
683  Assert(!cur_txn->is_known_as_subxact);
684 
685  prev_first_lsn = cur_txn->first_lsn;
686  }
687 
689  {
691  base_snapshot_node,
692  iter.cur);
693 
694  /* base snapshot (and its LSN) must be set */
695  Assert(cur_txn->base_snapshot != NULL);
697 
698  /* current LSN must be strictly higher than previous */
699  if (prev_base_snap_lsn != InvalidXLogRecPtr)
700  Assert(prev_base_snap_lsn < cur_txn->base_snapshot_lsn);
701 
702  /* known-as-subtxn txns must not be listed */
703  Assert(!cur_txn->is_known_as_subxact);
704 
705  prev_base_snap_lsn = cur_txn->base_snapshot_lsn;
706  }
707 #endif
708 }
709 
710 /*
711  * ReorderBufferGetOldestTXN
712  * Return oldest transaction in reorderbuffer
713  */
716 {
718 
719  AssertTXNLsnOrder(rb);
720 
722  return NULL;
723 
725 
728  return txn;
729 }
730 
731 /*
732  * ReorderBufferGetOldestXmin
733  * Return oldest Xmin in reorderbuffer
734  *
735  * Returns oldest possibly running Xid from the point of view of snapshots
736  * used in the transactions kept by reorderbuffer, or InvalidTransactionId if
737  * there are none.
738  *
739  * Since snapshots are assigned monotonically, this equals the Xmin of the
740  * base snapshot with minimal base_snapshot_lsn.
741  */
744 {
746 
747  AssertTXNLsnOrder(rb);
748 
750  return InvalidTransactionId;
751 
752  txn = dlist_head_element(ReorderBufferTXN, base_snapshot_node,
754  return txn->base_snapshot->xmin;
755 }
756 
757 void
759 {
761 }
762 
763 /*
764  * ReorderBufferAssignChild
765  *
766  * Make note that we know that subxid is a subtransaction of xid, seen as of
767  * the given lsn.
768  */
769 void
771  TransactionId subxid, XLogRecPtr lsn)
772 {
774  ReorderBufferTXN *subtxn;
775  bool new_top;
776  bool new_sub;
777 
778  txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true);
779  subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false);
780 
781  if (new_top && !new_sub)
782  elog(ERROR, "subtransaction logged without previous top-level txn record");
783 
784  if (!new_sub)
785  {
786  if (subtxn->is_known_as_subxact)
787  {
788  /* already associated, nothing to do */
789  return;
790  }
791  else
792  {
793  /*
794  * We already saw this transaction, but initially added it to the
795  * list of top-level txns. Now that we know it's not top-level,
796  * remove it from there.
797  */
798  dlist_delete(&subtxn->node);
799  }
800  }
801 
802  subtxn->is_known_as_subxact = true;
803  subtxn->toplevel_xid = xid;
804  Assert(subtxn->nsubtxns == 0);
805 
806  /* add to subtransaction list */
807  dlist_push_tail(&txn->subtxns, &subtxn->node);
808  txn->nsubtxns++;
809 
810  /* Possibly transfer the subtxn's snapshot to its top-level txn. */
812 
813  /* Verify LSN-ordering invariant */
814  AssertTXNLsnOrder(rb);
815 }
816 
817 /*
818  * ReorderBufferTransferSnapToParent
819  * Transfer base snapshot from subtxn to top-level txn, if needed
820  *
821  * This is done if the top-level txn doesn't have a base snapshot, or if the
822  * subtxn's base snapshot has an earlier LSN than the top-level txn's base
823  * snapshot's LSN. This can happen if there are no changes in the toplevel
824  * txn but there are some in the subtxn, or the first change in subtxn has
825  * earlier LSN than first change in the top-level txn and we learned about
826  * their kinship only now.
827  *
828  * The subtransaction's snapshot is cleared regardless of the transfer
829  * happening, since it's not needed anymore in either case.
830  *
831  * We do this as soon as we become aware of their kinship, to avoid queueing
832  * extra snapshots to txns known-as-subtxns -- only top-level txns will
833  * receive further snapshots.
834  */
835 static void
837  ReorderBufferTXN *subtxn)
838 {
839  Assert(subtxn->toplevel_xid == txn->xid);
840 
841  if (subtxn->base_snapshot != NULL)
842  {
843  if (txn->base_snapshot == NULL ||
844  subtxn->base_snapshot_lsn < txn->base_snapshot_lsn)
845  {
846  /*
847  * If the toplevel transaction already has a base snapshot but
848  * it's newer than the subxact's, purge it.
849  */
850  if (txn->base_snapshot != NULL)
851  {
854  }
855 
856  /*
857  * The snapshot is now the top transaction's; transfer it, and
858  * adjust the list position of the top transaction in the list by
859  * moving it to where the subtransaction is.
860  */
861  txn->base_snapshot = subtxn->base_snapshot;
862  txn->base_snapshot_lsn = subtxn->base_snapshot_lsn;
864  &txn->base_snapshot_node);
865 
866  /*
867  * The subtransaction doesn't have a snapshot anymore (so it
868  * mustn't be in the list.)
869  */
870  subtxn->base_snapshot = NULL;
873  }
874  else
875  {
876  /* Base snap of toplevel is fine, so subxact's is not needed */
879  subtxn->base_snapshot = NULL;
881  }
882  }
883 }
884 
885 /*
886  * Associate a subtransaction with its toplevel transaction at commit
887  * time. There may be no further changes added after this.
888  */
889 void
891  TransactionId subxid, XLogRecPtr commit_lsn,
892  XLogRecPtr end_lsn)
893 {
894  ReorderBufferTXN *subtxn;
895 
896  subtxn = ReorderBufferTXNByXid(rb, subxid, false, NULL,
897  InvalidXLogRecPtr, false);
898 
899  /*
900  * No need to do anything if that subtxn didn't contain any changes
901  */
902  if (!subtxn)
903  return;
904 
905  subtxn->final_lsn = commit_lsn;
906  subtxn->end_lsn = end_lsn;
907 
908  /*
909  * Assign this subxact as a child of the toplevel xact (no-op if already
910  * done.)
911  */
913 }
914 
915 
916 /*
917  * Support for efficiently iterating over a transaction's and its
918  * subtransactions' changes.
919  *
920  * We do by doing a k-way merge between transactions/subtransactions. For that
921  * we model the current heads of the different transactions as a binary heap
922  * so we easily know which (sub-)transaction has the change with the smallest
923  * lsn next.
924  *
925  * We assume the changes in individual transactions are already sorted by LSN.
926  */
927 
928 /*
929  * Binary heap comparison function.
930  */
931 static int
933 {
935  XLogRecPtr pos_a = state->entries[DatumGetInt32(a)].lsn;
936  XLogRecPtr pos_b = state->entries[DatumGetInt32(b)].lsn;
937 
938  if (pos_a < pos_b)
939  return 1;
940  else if (pos_a == pos_b)
941  return 0;
942  return -1;
943 }
944 
945 /*
946  * Allocate & initialize an iterator which iterates in lsn order over a
947  * transaction and all its subtransactions.
948  */
951 {
952  Size nr_txns = 0;
954  dlist_iter cur_txn_i;
955  int32 off;
956 
957  /*
958  * Calculate the size of our heap: one element for every transaction that
959  * contains changes. (Besides the transactions already in the reorder
960  * buffer, we count the one we were directly passed.)
961  */
962  if (txn->nentries > 0)
963  nr_txns++;
964 
965  dlist_foreach(cur_txn_i, &txn->subtxns)
966  {
967  ReorderBufferTXN *cur_txn;
968 
969  cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
970 
971  if (cur_txn->nentries > 0)
972  nr_txns++;
973  }
974 
975  /*
976  * TODO: Consider adding fastpath for the rather common nr_txns=1 case, no
977  * need to allocate/build a heap then.
978  */
979 
980  /* allocate iteration state */
981  state = (ReorderBufferIterTXNState *)
983  sizeof(ReorderBufferIterTXNState) +
984  sizeof(ReorderBufferIterTXNEntry) * nr_txns);
985 
986  state->nr_txns = nr_txns;
987  dlist_init(&state->old_change);
988 
989  for (off = 0; off < state->nr_txns; off++)
990  {
991  state->entries[off].fd = -1;
992  state->entries[off].segno = 0;
993  }
994 
995  /* allocate heap */
996  state->heap = binaryheap_allocate(state->nr_txns,
998  state);
999 
1000  /*
1001  * Now insert items into the binary heap, in an unordered fashion. (We
1002  * will run a heap assembly step at the end; this is more efficient.)
1003  */
1004 
1005  off = 0;
1006 
1007  /* add toplevel transaction if it contains changes */
1008  if (txn->nentries > 0)
1009  {
1010  ReorderBufferChange *cur_change;
1011 
1012  if (txn->serialized)
1013  {
1014  /* serialize remaining changes */
1015  ReorderBufferSerializeTXN(rb, txn);
1016  ReorderBufferRestoreChanges(rb, txn, &state->entries[off].fd,
1017  &state->entries[off].segno);
1018  }
1019 
1020  cur_change = dlist_head_element(ReorderBufferChange, node,
1021  &txn->changes);
1022 
1023  state->entries[off].lsn = cur_change->lsn;
1024  state->entries[off].change = cur_change;
1025  state->entries[off].txn = txn;
1026 
1028  }
1029 
1030  /* add subtransactions if they contain changes */
1031  dlist_foreach(cur_txn_i, &txn->subtxns)
1032  {
1033  ReorderBufferTXN *cur_txn;
1034 
1035  cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
1036 
1037  if (cur_txn->nentries > 0)
1038  {
1039  ReorderBufferChange *cur_change;
1040 
1041  if (cur_txn->serialized)
1042  {
1043  /* serialize remaining changes */
1044  ReorderBufferSerializeTXN(rb, cur_txn);
1045  ReorderBufferRestoreChanges(rb, cur_txn,
1046  &state->entries[off].fd,
1047  &state->entries[off].segno);
1048  }
1049  cur_change = dlist_head_element(ReorderBufferChange, node,
1050  &cur_txn->changes);
1051 
1052  state->entries[off].lsn = cur_change->lsn;
1053  state->entries[off].change = cur_change;
1054  state->entries[off].txn = cur_txn;
1055 
1057  }
1058  }
1059 
1060  /* assemble a valid binary heap */
1061  binaryheap_build(state->heap);
1062 
1063  return state;
1064 }
1065 
1066 /*
1067  * Return the next change when iterating over a transaction and its
1068  * subtransactions.
1069  *
1070  * Returns NULL when no further changes exist.
1071  */
1072 static ReorderBufferChange *
1074 {
1075  ReorderBufferChange *change;
1077  int32 off;
1078 
1079  /* nothing there anymore */
1080  if (state->heap->bh_size == 0)
1081  return NULL;
1082 
1083  off = DatumGetInt32(binaryheap_first(state->heap));
1084  entry = &state->entries[off];
1085 
1086  /* free memory we might have "leaked" in the previous *Next call */
1087  if (!dlist_is_empty(&state->old_change))
1088  {
1089  change = dlist_container(ReorderBufferChange, node,
1090  dlist_pop_head_node(&state->old_change));
1091  ReorderBufferReturnChange(rb, change);
1092  Assert(dlist_is_empty(&state->old_change));
1093  }
1094 
1095  change = entry->change;
1096 
1097  /*
1098  * update heap with information about which transaction has the next
1099  * relevant change in LSN order
1100  */
1101 
1102  /* there are in-memory changes */
1103  if (dlist_has_next(&entry->txn->changes, &entry->change->node))
1104  {
1105  dlist_node *next = dlist_next_node(&entry->txn->changes, &change->node);
1106  ReorderBufferChange *next_change =
1107  dlist_container(ReorderBufferChange, node, next);
1108 
1109  /* txn stays the same */
1110  state->entries[off].lsn = next_change->lsn;
1111  state->entries[off].change = next_change;
1112 
1114  return change;
1115  }
1116 
1117  /* try to load changes from disk */
1118  if (entry->txn->nentries != entry->txn->nentries_mem)
1119  {
1120  /*
1121  * Ugly: restoring changes will reuse *Change records, thus delete the
1122  * current one from the per-tx list and only free in the next call.
1123  */
1124  dlist_delete(&change->node);
1125  dlist_push_tail(&state->old_change, &change->node);
1126 
1127  if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->fd,
1128  &state->entries[off].segno))
1129  {
1130  /* successfully restored changes from disk */
1131  ReorderBufferChange *next_change =
1133  &entry->txn->changes);
1134 
1135  elog(DEBUG2, "restored %u/%u changes from disk",
1136  (uint32) entry->txn->nentries_mem,
1137  (uint32) entry->txn->nentries);
1138 
1139  Assert(entry->txn->nentries_mem);
1140  /* txn stays the same */
1141  state->entries[off].lsn = next_change->lsn;
1142  state->entries[off].change = next_change;
1144 
1145  return change;
1146  }
1147  }
1148 
1149  /* ok, no changes there anymore, remove */
1150  binaryheap_remove_first(state->heap);
1151 
1152  return change;
1153 }
1154 
1155 /*
1156  * Deallocate the iterator
1157  */
1158 static void
1161 {
1162  int32 off;
1163 
1164  for (off = 0; off < state->nr_txns; off++)
1165  {
1166  if (state->entries[off].fd != -1)
1167  CloseTransientFile(state->entries[off].fd);
1168  }
1169 
1170  /* free memory we might have "leaked" in the last *Next call */
1171  if (!dlist_is_empty(&state->old_change))
1172  {
1173  ReorderBufferChange *change;
1174 
1175  change = dlist_container(ReorderBufferChange, node,
1176  dlist_pop_head_node(&state->old_change));
1177  ReorderBufferReturnChange(rb, change);
1178  Assert(dlist_is_empty(&state->old_change));
1179  }
1180 
1181  binaryheap_free(state->heap);
1182  pfree(state);
1183 }
1184 
1185 /*
1186  * Cleanup the contents of a transaction, usually after the transaction
1187  * committed or aborted.
1188  */
1189 static void
1191 {
1192  bool found;
1193  dlist_mutable_iter iter;
1194 
1195  /* cleanup subtransactions & their changes */
1196  dlist_foreach_modify(iter, &txn->subtxns)
1197  {
1198  ReorderBufferTXN *subtxn;
1199 
1200  subtxn = dlist_container(ReorderBufferTXN, node, iter.cur);
1201 
1202  /*
1203  * Subtransactions are always associated to the toplevel TXN, even if
1204  * they originally were happening inside another subtxn, so we won't
1205  * ever recurse more than one level deep here.
1206  */
1207  Assert(subtxn->is_known_as_subxact);
1208  Assert(subtxn->nsubtxns == 0);
1209 
1210  ReorderBufferCleanupTXN(rb, subtxn);
1211  }
1212 
1213  /* cleanup changes in the toplevel txn */
1214  dlist_foreach_modify(iter, &txn->changes)
1215  {
1216  ReorderBufferChange *change;
1217 
1218  change = dlist_container(ReorderBufferChange, node, iter.cur);
1219 
1220  ReorderBufferReturnChange(rb, change);
1221  }
1222 
1223  /*
1224  * Cleanup the tuplecids we stored for decoding catalog snapshot access.
1225  * They are always stored in the toplevel transaction.
1226  */
1227  dlist_foreach_modify(iter, &txn->tuplecids)
1228  {
1229  ReorderBufferChange *change;
1230 
1231  change = dlist_container(ReorderBufferChange, node, iter.cur);
1233  ReorderBufferReturnChange(rb, change);
1234  }
1235 
1236  /*
1237  * Cleanup the base snapshot, if set.
1238  */
1239  if (txn->base_snapshot != NULL)
1240  {
1243  }
1244 
1245  /*
1246  * Remove TXN from its containing list.
1247  *
1248  * Note: if txn->is_known_as_subxact, we are deleting the TXN from its
1249  * parent's list of known subxacts; this leaves the parent's nsubxacts
1250  * count too high, but we don't care. Otherwise, we are deleting the TXN
1251  * from the LSN-ordered list of toplevel TXNs.
1252  */
1253  dlist_delete(&txn->node);
1254 
1255  /* now remove reference from buffer */
1256  hash_search(rb->by_txn,
1257  (void *) &txn->xid,
1258  HASH_REMOVE,
1259  &found);
1260  Assert(found);
1261 
1262  /* remove entries spilled to disk */
1263  if (txn->serialized)
1264  ReorderBufferRestoreCleanup(rb, txn);
1265 
1266  /* deallocate */
1267  ReorderBufferReturnTXN(rb, txn);
1268 }
1269 
1270 /*
1271  * Build a hash with a (relfilenode, ctid) -> (cmin, cmax) mapping for use by
1272  * HeapTupleSatisfiesHistoricMVCC.
1273  */
1274 static void
1276 {
1277  dlist_iter iter;
1278  HASHCTL hash_ctl;
1279 
1280  if (!txn->has_catalog_changes || dlist_is_empty(&txn->tuplecids))
1281  return;
1282 
1283  memset(&hash_ctl, 0, sizeof(hash_ctl));
1284 
1285  hash_ctl.keysize = sizeof(ReorderBufferTupleCidKey);
1286  hash_ctl.entrysize = sizeof(ReorderBufferTupleCidEnt);
1287  hash_ctl.hcxt = rb->context;
1288 
1289  /*
1290  * create the hash with the exact number of to-be-stored tuplecids from
1291  * the start
1292  */
1293  txn->tuplecid_hash =
1294  hash_create("ReorderBufferTupleCid", txn->ntuplecids, &hash_ctl,
1296 
1297  dlist_foreach(iter, &txn->tuplecids)
1298  {
1301  bool found;
1302  ReorderBufferChange *change;
1303 
1304  change = dlist_container(ReorderBufferChange, node, iter.cur);
1305 
1307 
1308  /* be careful about padding */
1309  memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
1310 
1311  key.relnode = change->data.tuplecid.node;
1312 
1313  ItemPointerCopy(&change->data.tuplecid.tid,
1314  &key.tid);
1315 
1316  ent = (ReorderBufferTupleCidEnt *)
1318  (void *) &key,
1320  &found);
1321  if (!found)
1322  {
1323  ent->cmin = change->data.tuplecid.cmin;
1324  ent->cmax = change->data.tuplecid.cmax;
1325  ent->combocid = change->data.tuplecid.combocid;
1326  }
1327  else
1328  {
1329  /*
1330  * Maybe we already saw this tuple before in this transaction, but
1331  * if so it must have the same cmin.
1332  */
1333  Assert(ent->cmin == change->data.tuplecid.cmin);
1334 
1335  /*
1336  * cmax may be initially invalid, but once set it can only grow,
1337  * and never become invalid again.
1338  */
1339  Assert((ent->cmax == InvalidCommandId) ||
1340  ((change->data.tuplecid.cmax != InvalidCommandId) &&
1341  (change->data.tuplecid.cmax > ent->cmax)));
1342  ent->cmax = change->data.tuplecid.cmax;
1343  }
1344  }
1345 }
1346 
1347 /*
1348  * Copy a provided snapshot so we can modify it privately. This is needed so
1349  * that catalog modifying transactions can look into intermediate catalog
1350  * states.
1351  */
1352 static Snapshot
1355 {
1356  Snapshot snap;
1357  dlist_iter iter;
1358  int i = 0;
1359  Size size;
1360 
1361  size = sizeof(SnapshotData) +
1362  sizeof(TransactionId) * orig_snap->xcnt +
1363  sizeof(TransactionId) * (txn->nsubtxns + 1);
1364 
1365  snap = MemoryContextAllocZero(rb->context, size);
1366  memcpy(snap, orig_snap, sizeof(SnapshotData));
1367 
1368  snap->copied = true;
1369  snap->active_count = 1; /* mark as active so nobody frees it */
1370  snap->regd_count = 0;
1371  snap->xip = (TransactionId *) (snap + 1);
1372 
1373  memcpy(snap->xip, orig_snap->xip, sizeof(TransactionId) * snap->xcnt);
1374 
1375  /*
1376  * snap->subxip contains all txids that belong to our transaction which we
1377  * need to check via cmin/cmax. That's why we store the toplevel
1378  * transaction in there as well.
1379  */
1380  snap->subxip = snap->xip + snap->xcnt;
1381  snap->subxip[i++] = txn->xid;
1382 
1383  /*
1384  * subxcnt isn't decreased when subtransactions abort, so count manually.
1385  * Since it's an upper boundary it is safe to use it for the allocation
1386  * above.
1387  */
1388  snap->subxcnt = 1;
1389 
1390  dlist_foreach(iter, &txn->subtxns)
1391  {
1392  ReorderBufferTXN *sub_txn;
1393 
1394  sub_txn = dlist_container(ReorderBufferTXN, node, iter.cur);
1395  snap->subxip[i++] = sub_txn->xid;
1396  snap->subxcnt++;
1397  }
1398 
1399  /* sort so we can bsearch() later */
1400  qsort(snap->subxip, snap->subxcnt, sizeof(TransactionId), xidComparator);
1401 
1402  /* store the specified current CommandId */
1403  snap->curcid = cid;
1404 
1405  return snap;
1406 }
1407 
1408 /*
1409  * Free a previously ReorderBufferCopySnap'ed snapshot
1410  */
1411 static void
1413 {
1414  if (snap->copied)
1415  pfree(snap);
1416  else
1418 }
1419 
1420 /*
1421  * Perform the replay of a transaction and its non-aborted subtransactions.
1422  *
1423  * Subtransactions previously have to be processed by
1424  * ReorderBufferCommitChild(), even if previously assigned to the toplevel
1425  * transaction with ReorderBufferAssignChild.
1426  *
1427  * We currently can only decode a transaction's contents when its commit
1428  * record is read because that's the only place where we know about cache
1429  * invalidations. Thus, once a toplevel commit is read, we iterate over the top
1430  * and subtransactions (using a k-way merge) and replay the changes in lsn
1431  * order.
1432  */
1433 void
1435  XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
1436  TimestampTz commit_time,
1437  RepOriginId origin_id, XLogRecPtr origin_lsn)
1438 {
1440  volatile Snapshot snapshot_now;
1441  volatile CommandId command_id = FirstCommandId;
1442  bool using_subtxn;
1443  ReorderBufferIterTXNState *volatile iterstate = NULL;
1444 
1445  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1446  false);
1447 
1448  /* unknown transaction, nothing to replay */
1449  if (txn == NULL)
1450  return;
1451 
1452  txn->final_lsn = commit_lsn;
1453  txn->end_lsn = end_lsn;
1454  txn->commit_time = commit_time;
1455  txn->origin_id = origin_id;
1456  txn->origin_lsn = origin_lsn;
1457 
1458  /*
1459  * If this transaction has no snapshot, it didn't make any changes to the
1460  * database, so there's nothing to decode. Note that
1461  * ReorderBufferCommitChild will have transferred any snapshots from
1462  * subtransactions if there were any.
1463  */
1464  if (txn->base_snapshot == NULL)
1465  {
1466  Assert(txn->ninvalidations == 0);
1467  ReorderBufferCleanupTXN(rb, txn);
1468  return;
1469  }
1470 
1471  snapshot_now = txn->base_snapshot;
1472 
1473  /* build data to be able to lookup the CommandIds of catalog tuples */
1475 
1476  /* setup the initial snapshot */
1477  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1478 
1479  /*
1480  * Decoding needs access to syscaches et al., which in turn use
1481  * heavyweight locks and such. Thus we need to have enough state around to
1482  * keep track of those. The easiest way is to simply use a transaction
1483  * internally. That also allows us to easily enforce that nothing writes
1484  * to the database by checking for xid assignments.
1485  *
1486  * When we're called via the SQL SRF there's already a transaction
1487  * started, so start an explicit subtransaction there.
1488  */
1489  using_subtxn = IsTransactionOrTransactionBlock();
1490 
1491  PG_TRY();
1492  {
1493  ReorderBufferChange *change;
1494  ReorderBufferChange *specinsert = NULL;
1495 
1496  if (using_subtxn)
1497  BeginInternalSubTransaction("replay");
1498  else
1500 
1501  rb->begin(rb, txn);
1502 
1503  iterstate = ReorderBufferIterTXNInit(rb, txn);
1504  while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
1505  {
1506  Relation relation = NULL;
1507  Oid reloid;
1508 
1509  switch (change->action)
1510  {
1512 
1513  /*
1514  * Confirmation for speculative insertion arrived. Simply
1515  * use as a normal record. It'll be cleaned up at the end
1516  * of INSERT processing.
1517  */
1518  if (specinsert == NULL)
1519  elog(ERROR, "invalid ordering of speculative insertion changes");
1520  Assert(specinsert->data.tp.oldtuple == NULL);
1521  change = specinsert;
1523 
1524  /* intentionally fall through */
1528  Assert(snapshot_now);
1529 
1530  reloid = RelidByRelfilenode(change->data.tp.relnode.spcNode,
1531  change->data.tp.relnode.relNode);
1532 
1533  /*
1534  * Mapped catalog tuple without data, emitted while
1535  * catalog table was in the process of being rewritten. We
1536  * can fail to look up the relfilenode, because the
1537  * relmapper has no "historic" view, in contrast to normal
1538  * the normal catalog during decoding. Thus repeated
1539  * rewrites can cause a lookup failure. That's OK because
1540  * we do not decode catalog changes anyway. Normally such
1541  * tuples would be skipped over below, but we can't
1542  * identify whether the table should be logically logged
1543  * without mapping the relfilenode to the oid.
1544  */
1545  if (reloid == InvalidOid &&
1546  change->data.tp.newtuple == NULL &&
1547  change->data.tp.oldtuple == NULL)
1548  goto change_done;
1549  else if (reloid == InvalidOid)
1550  elog(ERROR, "could not map filenode \"%s\" to relation OID",
1551  relpathperm(change->data.tp.relnode,
1552  MAIN_FORKNUM));
1553 
1554  relation = RelationIdGetRelation(reloid);
1555 
1556  if (relation == NULL)
1557  elog(ERROR, "could not open relation with OID %u (for filenode \"%s\")",
1558  reloid,
1559  relpathperm(change->data.tp.relnode,
1560  MAIN_FORKNUM));
1561 
1562  if (!RelationIsLogicallyLogged(relation))
1563  goto change_done;
1564 
1565  /*
1566  * Ignore temporary heaps created during DDL unless the
1567  * plugin has asked for them.
1568  */
1569  if (relation->rd_rel->relrewrite && !rb->output_rewrites)
1570  goto change_done;
1571 
1572  /*
1573  * For now ignore sequence changes entirely. Most of the
1574  * time they don't log changes using records we
1575  * understand, so it doesn't make sense to handle the few
1576  * cases we do.
1577  */
1578  if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
1579  goto change_done;
1580 
1581  /* user-triggered change */
1582  if (!IsToastRelation(relation))
1583  {
1584  ReorderBufferToastReplace(rb, txn, relation, change);
1585  rb->apply_change(rb, txn, relation, change);
1586 
1587  /*
1588  * Only clear reassembled toast chunks if we're sure
1589  * they're not required anymore. The creator of the
1590  * tuple tells us.
1591  */
1592  if (change->data.tp.clear_toast_afterwards)
1593  ReorderBufferToastReset(rb, txn);
1594  }
1595  /* we're not interested in toast deletions */
1596  else if (change->action == REORDER_BUFFER_CHANGE_INSERT)
1597  {
1598  /*
1599  * Need to reassemble the full toasted Datum in
1600  * memory, to ensure the chunks don't get reused till
1601  * we're done remove it from the list of this
1602  * transaction's changes. Otherwise it will get
1603  * freed/reused while restoring spooled data from
1604  * disk.
1605  */
1606  Assert(change->data.tp.newtuple != NULL);
1607 
1608  dlist_delete(&change->node);
1609  ReorderBufferToastAppendChunk(rb, txn, relation,
1610  change);
1611  }
1612 
1613  change_done:
1614 
1615  /*
1616  * Either speculative insertion was confirmed, or it was
1617  * unsuccessful and the record isn't needed anymore.
1618  */
1619  if (specinsert != NULL)
1620  {
1621  ReorderBufferReturnChange(rb, specinsert);
1622  specinsert = NULL;
1623  }
1624 
1625  if (relation != NULL)
1626  {
1627  RelationClose(relation);
1628  relation = NULL;
1629  }
1630  break;
1631 
1633 
1634  /*
1635  * Speculative insertions are dealt with by delaying the
1636  * processing of the insert until the confirmation record
1637  * arrives. For that we simply unlink the record from the
1638  * chain, so it does not get freed/reused while restoring
1639  * spooled data from disk.
1640  *
1641  * This is safe in the face of concurrent catalog changes
1642  * because the relevant relation can't be changed between
1643  * speculative insertion and confirmation due to
1644  * CheckTableNotInUse() and locking.
1645  */
1646 
1647  /* clear out a pending (and thus failed) speculation */
1648  if (specinsert != NULL)
1649  {
1650  ReorderBufferReturnChange(rb, specinsert);
1651  specinsert = NULL;
1652  }
1653 
1654  /* and memorize the pending insertion */
1655  dlist_delete(&change->node);
1656  specinsert = change;
1657  break;
1658 
1660  {
1661  int i;
1662  int nrelids = change->data.truncate.nrelids;
1663  int nrelations = 0;
1664  Relation *relations;
1665 
1666  relations = palloc0(nrelids * sizeof(Relation));
1667  for (i = 0; i < nrelids; i++)
1668  {
1669  Oid relid = change->data.truncate.relids[i];
1670  Relation relation;
1671 
1672  relation = RelationIdGetRelation(relid);
1673 
1674  if (relation == NULL)
1675  elog(ERROR, "could not open relation with OID %u", relid);
1676 
1677  if (!RelationIsLogicallyLogged(relation))
1678  continue;
1679 
1680  relations[nrelations++] = relation;
1681  }
1682 
1683  rb->apply_truncate(rb, txn, nrelations, relations, change);
1684 
1685  for (i = 0; i < nrelations; i++)
1686  RelationClose(relations[i]);
1687 
1688  break;
1689  }
1690 
1692  rb->message(rb, txn, change->lsn, true,
1693  change->data.msg.prefix,
1694  change->data.msg.message_size,
1695  change->data.msg.message);
1696  break;
1697 
1699  /* get rid of the old */
1700  TeardownHistoricSnapshot(false);
1701 
1702  if (snapshot_now->copied)
1703  {
1704  ReorderBufferFreeSnap(rb, snapshot_now);
1705  snapshot_now =
1706  ReorderBufferCopySnap(rb, change->data.snapshot,
1707  txn, command_id);
1708  }
1709 
1710  /*
1711  * Restored from disk, need to be careful not to double
1712  * free. We could introduce refcounting for that, but for
1713  * now this seems infrequent enough not to care.
1714  */
1715  else if (change->data.snapshot->copied)
1716  {
1717  snapshot_now =
1718  ReorderBufferCopySnap(rb, change->data.snapshot,
1719  txn, command_id);
1720  }
1721  else
1722  {
1723  snapshot_now = change->data.snapshot;
1724  }
1725 
1726 
1727  /* and continue with the new one */
1728  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1729  break;
1730 
1732  Assert(change->data.command_id != InvalidCommandId);
1733 
1734  if (command_id < change->data.command_id)
1735  {
1736  command_id = change->data.command_id;
1737 
1738  if (!snapshot_now->copied)
1739  {
1740  /* we don't use the global one anymore */
1741  snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
1742  txn, command_id);
1743  }
1744 
1745  snapshot_now->curcid = command_id;
1746 
1747  TeardownHistoricSnapshot(false);
1748  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1749 
1750  /*
1751  * Every time the CommandId is incremented, we could
1752  * see new catalog contents, so execute all
1753  * invalidations.
1754  */
1756  }
1757 
1758  break;
1759 
1761  elog(ERROR, "tuplecid value in changequeue");
1762  break;
1763  }
1764  }
1765 
1766  /*
1767  * There's a speculative insertion remaining, just clean in up, it
1768  * can't have been successful, otherwise we'd gotten a confirmation
1769  * record.
1770  */
1771  if (specinsert)
1772  {
1773  ReorderBufferReturnChange(rb, specinsert);
1774  specinsert = NULL;
1775  }
1776 
1777  /* clean up the iterator */
1778  ReorderBufferIterTXNFinish(rb, iterstate);
1779  iterstate = NULL;
1780 
1781  /* call commit callback */
1782  rb->commit(rb, txn, commit_lsn);
1783 
1784  /* this is just a sanity check against bad output plugin behaviour */
1786  elog(ERROR, "output plugin used XID %u",
1788 
1789  /* cleanup */
1790  TeardownHistoricSnapshot(false);
1791 
1792  /*
1793  * Aborting the current (sub-)transaction as a whole has the right
1794  * semantics. We want all locks acquired in here to be released, not
1795  * reassigned to the parent and we do not want any database access
1796  * have persistent effects.
1797  */
1799 
1800  /* make sure there's no cache pollution */
1802 
1803  if (using_subtxn)
1805 
1806  if (snapshot_now->copied)
1807  ReorderBufferFreeSnap(rb, snapshot_now);
1808 
1809  /* remove potential on-disk data, and deallocate */
1810  ReorderBufferCleanupTXN(rb, txn);
1811  }
1812  PG_CATCH();
1813  {
1814  /* TODO: Encapsulate cleanup from the PG_TRY and PG_CATCH blocks */
1815  if (iterstate)
1816  ReorderBufferIterTXNFinish(rb, iterstate);
1817 
1819 
1820  /*
1821  * Force cache invalidation to happen outside of a valid transaction
1822  * to prevent catalog access as we just caught an error.
1823  */
1825 
1826  /* make sure there's no cache pollution */
1828 
1829  if (using_subtxn)
1831 
1832  if (snapshot_now->copied)
1833  ReorderBufferFreeSnap(rb, snapshot_now);
1834 
1835  /* remove potential on-disk data, and deallocate */
1836  ReorderBufferCleanupTXN(rb, txn);
1837 
1838  PG_RE_THROW();
1839  }
1840  PG_END_TRY();
1841 }
1842 
1843 /*
1844  * Abort a transaction that possibly has previous changes. Needs to be first
1845  * called for subtransactions and then for the toplevel xid.
1846  *
1847  * NB: Transactions handled here have to have actively aborted (i.e. have
1848  * produced an abort record). Implicitly aborted transactions are handled via
1849  * ReorderBufferAbortOld(); transactions we're just not interested in, but
1850  * which have committed are handled in ReorderBufferForget().
1851  *
1852  * This function purges this transaction and its contents from memory and
1853  * disk.
1854  */
1855 void
1857 {
1859 
1860  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1861  false);
1862 
1863  /* unknown, nothing to remove */
1864  if (txn == NULL)
1865  return;
1866 
1867  /* cosmetic... */
1868  txn->final_lsn = lsn;
1869 
1870  /* remove potential on-disk data, and deallocate */
1871  ReorderBufferCleanupTXN(rb, txn);
1872 }
1873 
1874 /*
1875  * Abort all transactions that aren't actually running anymore because the
1876  * server restarted.
1877  *
1878  * NB: These really have to be transactions that have aborted due to a server
1879  * crash/immediate restart, as we don't deal with invalidations here.
1880  */
1881 void
1883 {
1884  dlist_mutable_iter it;
1885 
1886  /*
1887  * Iterate through all (potential) toplevel TXNs and abort all that are
1888  * older than what possibly can be running. Once we've found the first
1889  * that is alive we stop, there might be some that acquired an xid earlier
1890  * but started writing later, but it's unlikely and they will be cleaned
1891  * up in a later call to this function.
1892  */
1894  {
1896 
1897  txn = dlist_container(ReorderBufferTXN, node, it.cur);
1898 
1899  if (TransactionIdPrecedes(txn->xid, oldestRunningXid))
1900  {
1901  /*
1902  * We set final_lsn on a transaction when we decode its commit or
1903  * abort record, but we never see those records for crashed
1904  * transactions. To ensure cleanup of these transactions, set
1905  * final_lsn to that of their last change; this causes
1906  * ReorderBufferRestoreCleanup to do the right thing.
1907  */
1908  if (txn->serialized && txn->final_lsn == 0)
1909  {
1910  ReorderBufferChange *last =
1912 
1913  txn->final_lsn = last->lsn;
1914  }
1915 
1916  elog(DEBUG2, "aborting old transaction %u", txn->xid);
1917 
1918  /* remove potential on-disk data, and deallocate this tx */
1919  ReorderBufferCleanupTXN(rb, txn);
1920  }
1921  else
1922  return;
1923  }
1924 }
1925 
1926 /*
1927  * Forget the contents of a transaction if we aren't interested in its
1928  * contents. Needs to be first called for subtransactions and then for the
1929  * toplevel xid.
1930  *
1931  * This is significantly different to ReorderBufferAbort() because
1932  * transactions that have committed need to be treated differently from aborted
1933  * ones since they may have modified the catalog.
1934  *
1935  * Note that this is only allowed to be called in the moment a transaction
1936  * commit has just been read, not earlier; otherwise later records referring
1937  * to this xid might re-create the transaction incompletely.
1938  */
1939 void
1941 {
1943 
1944  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1945  false);
1946 
1947  /* unknown, nothing to forget */
1948  if (txn == NULL)
1949  return;
1950 
1951  /* cosmetic... */
1952  txn->final_lsn = lsn;
1953 
1954  /*
1955  * Process cache invalidation messages if there are any. Even if we're not
1956  * interested in the transaction's contents, it could have manipulated the
1957  * catalog and we need to update the caches according to that.
1958  */
1959  if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
1961  txn->invalidations);
1962  else
1963  Assert(txn->ninvalidations == 0);
1964 
1965  /* remove potential on-disk data, and deallocate */
1966  ReorderBufferCleanupTXN(rb, txn);
1967 }
1968 
1969 /*
1970  * Execute invalidations happening outside the context of a decoded
1971  * transaction. That currently happens either for xid-less commits
1972  * (cf. RecordTransactionCommit()) or for invalidations in uninteresting
1973  * transactions (via ReorderBufferForget()).
1974  */
1975 void
1977  SharedInvalidationMessage *invalidations)
1978 {
1979  bool use_subtxn = IsTransactionOrTransactionBlock();
1980  int i;
1981 
1982  if (use_subtxn)
1983  BeginInternalSubTransaction("replay");
1984 
1985  /*
1986  * Force invalidations to happen outside of a valid transaction - that way
1987  * entries will just be marked as invalid without accessing the catalog.
1988  * That's advantageous because we don't need to setup the full state
1989  * necessary for catalog access.
1990  */
1991  if (use_subtxn)
1993 
1994  for (i = 0; i < ninvalidations; i++)
1995  LocalExecuteInvalidationMessage(&invalidations[i]);
1996 
1997  if (use_subtxn)
1999 }
2000 
2001 /*
2002  * Tell reorderbuffer about an xid seen in the WAL stream. Has to be called at
2003  * least once for every xid in XLogRecord->xl_xid (other places in records
2004  * may, but do not have to be passed through here).
2005  *
2006  * Reorderbuffer keeps some datastructures about transactions in LSN order,
2007  * for efficiency. To do that it has to know about when transactions are seen
2008  * first in the WAL. As many types of records are not actually interesting for
2009  * logical decoding, they do not necessarily pass though here.
2010  */
2011 void
2013 {
2014  /* many records won't have an xid assigned, centralize check here */
2015  if (xid != InvalidTransactionId)
2016  ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
2017 }
2018 
2019 /*
2020  * Add a new snapshot to this transaction that may only used after lsn 'lsn'
2021  * because the previous snapshot doesn't describe the catalog correctly for
2022  * following rows.
2023  */
2024 void
2026  XLogRecPtr lsn, Snapshot snap)
2027 {
2029 
2030  change->data.snapshot = snap;
2032 
2033  ReorderBufferQueueChange(rb, xid, lsn, change);
2034 }
2035 
2036 /*
2037  * Set up the transaction's base snapshot.
2038  *
2039  * If we know that xid is a subtransaction, set the base snapshot on the
2040  * top-level transaction instead.
2041  */
2042 void
2044  XLogRecPtr lsn, Snapshot snap)
2045 {
2047  bool is_new;
2048 
2049  AssertArg(snap != NULL);
2050 
2051  /*
2052  * Fetch the transaction to operate on. If we know it's a subtransaction,
2053  * operate on its top-level transaction instead.
2054  */
2055  txn = ReorderBufferTXNByXid(rb, xid, true, &is_new, lsn, true);
2056  if (txn->is_known_as_subxact)
2057  txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
2058  NULL, InvalidXLogRecPtr, false);
2059  Assert(txn->base_snapshot == NULL);
2060 
2061  txn->base_snapshot = snap;
2062  txn->base_snapshot_lsn = lsn;
2064 
2065  AssertTXNLsnOrder(rb);
2066 }
2067 
2068 /*
2069  * Access the catalog with this CommandId at this point in the changestream.
2070  *
2071  * May only be called for command ids > 1
2072  */
2073 void
2075  XLogRecPtr lsn, CommandId cid)
2076 {
2078 
2079  change->data.command_id = cid;
2081 
2082  ReorderBufferQueueChange(rb, xid, lsn, change);
2083 }
2084 
2085 
2086 /*
2087  * Add new (relfilenode, tid) -> (cmin, cmax) mappings.
2088  */
2089 void
2091  XLogRecPtr lsn, RelFileNode node,
2092  ItemPointerData tid, CommandId cmin,
2093  CommandId cmax, CommandId combocid)
2094 {
2097 
2098  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
2099 
2100  change->data.tuplecid.node = node;
2101  change->data.tuplecid.tid = tid;
2102  change->data.tuplecid.cmin = cmin;
2103  change->data.tuplecid.cmax = cmax;
2104  change->data.tuplecid.combocid = combocid;
2105  change->lsn = lsn;
2107 
2108  dlist_push_tail(&txn->tuplecids, &change->node);
2109  txn->ntuplecids++;
2110 }
2111 
2112 /*
2113  * Setup the invalidation of the toplevel transaction.
2114  *
2115  * This needs to be done before ReorderBufferCommit is called!
2116  */
2117 void
2119  XLogRecPtr lsn, Size nmsgs,
2121 {
2123 
2124  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
2125 
2126  if (txn->ninvalidations != 0)
2127  elog(ERROR, "only ever add one set of invalidations");
2128 
2129  Assert(nmsgs > 0);
2130 
2131  txn->ninvalidations = nmsgs;
2134  sizeof(SharedInvalidationMessage) * nmsgs);
2135  memcpy(txn->invalidations, msgs,
2136  sizeof(SharedInvalidationMessage) * nmsgs);
2137 }
2138 
2139 /*
2140  * Apply all invalidations we know. Possibly we only need parts at this point
2141  * in the changestream but we don't know which those are.
2142  */
2143 static void
2145 {
2146  int i;
2147 
2148  for (i = 0; i < txn->ninvalidations; i++)
2150 }
2151 
2152 /*
2153  * Mark a transaction as containing catalog changes
2154  */
2155 void
2157  XLogRecPtr lsn)
2158 {
2160 
2161  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
2162 
2163  txn->has_catalog_changes = true;
2164 }
2165 
2166 /*
2167  * Query whether a transaction is already *known* to contain catalog
2168  * changes. This can be wrong until directly before the commit!
2169  */
2170 bool
2172 {
2174 
2175  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2176  false);
2177  if (txn == NULL)
2178  return false;
2179 
2180  return txn->has_catalog_changes;
2181 }
2182 
2183 /*
2184  * ReorderBufferXidHasBaseSnapshot
2185  * Have we already set the base snapshot for the given txn/subtxn?
2186  */
2187 bool
2189 {
2191 
2192  txn = ReorderBufferTXNByXid(rb, xid, false,
2193  NULL, InvalidXLogRecPtr, false);
2194 
2195  /* transaction isn't known yet, ergo no snapshot */
2196  if (txn == NULL)
2197  return false;
2198 
2199  /* a known subtxn? operate on top-level txn instead */
2200  if (txn->is_known_as_subxact)
2201  txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
2202  NULL, InvalidXLogRecPtr, false);
2203 
2204  return txn->base_snapshot != NULL;
2205 }
2206 
2207 
2208 /*
2209  * ---------------------------------------
2210  * Disk serialization support
2211  * ---------------------------------------
2212  */
2213 
2214 /*
2215  * Ensure the IO buffer is >= sz.
2216  */
2217 static void
2219 {
2220  if (!rb->outbufsize)
2221  {
2222  rb->outbuf = MemoryContextAlloc(rb->context, sz);
2223  rb->outbufsize = sz;
2224  }
2225  else if (rb->outbufsize < sz)
2226  {
2227  rb->outbuf = repalloc(rb->outbuf, sz);
2228  rb->outbufsize = sz;
2229  }
2230 }
2231 
2232 /*
2233  * Check whether the transaction tx should spill its data to disk.
2234  */
2235 static void
2237 {
2238  /*
2239  * TODO: improve accounting so we cheaply can take subtransactions into
2240  * account here.
2241  */
2242  if (txn->nentries_mem >= max_changes_in_memory)
2243  {
2244  ReorderBufferSerializeTXN(rb, txn);
2245  Assert(txn->nentries_mem == 0);
2246  }
2247 }
2248 
2249 /*
2250  * Spill data of a large transaction (and its subtransactions) to disk.
2251  */
2252 static void
2254 {
2255  dlist_iter subtxn_i;
2256  dlist_mutable_iter change_i;
2257  int fd = -1;
2258  XLogSegNo curOpenSegNo = 0;
2259  Size spilled = 0;
2260 
2261  elog(DEBUG2, "spill %u changes in XID %u to disk",
2262  (uint32) txn->nentries_mem, txn->xid);
2263 
2264  /* do the same to all child TXs */
2265  dlist_foreach(subtxn_i, &txn->subtxns)
2266  {
2267  ReorderBufferTXN *subtxn;
2268 
2269  subtxn = dlist_container(ReorderBufferTXN, node, subtxn_i.cur);
2270  ReorderBufferSerializeTXN(rb, subtxn);
2271  }
2272 
2273  /* serialize changestream */
2274  dlist_foreach_modify(change_i, &txn->changes)
2275  {
2276  ReorderBufferChange *change;
2277 
2278  change = dlist_container(ReorderBufferChange, node, change_i.cur);
2279 
2280  /*
2281  * store in segment in which it belongs by start lsn, don't split over
2282  * multiple segments tho
2283  */
2284  if (fd == -1 ||
2285  !XLByteInSeg(change->lsn, curOpenSegNo, wal_segment_size))
2286  {
2287  char path[MAXPGPATH];
2288 
2289  if (fd != -1)
2290  CloseTransientFile(fd);
2291 
2292  XLByteToSeg(change->lsn, curOpenSegNo, wal_segment_size);
2293 
2294  /*
2295  * No need to care about TLIs here, only used during a single run,
2296  * so each LSN only maps to a specific WAL record.
2297  */
2299  curOpenSegNo);
2300 
2301  /* open segment, create it if necessary */
2302  fd = OpenTransientFile(path,
2303  O_CREAT | O_WRONLY | O_APPEND | PG_BINARY);
2304 
2305  if (fd < 0)
2306  ereport(ERROR,
2308  errmsg("could not open file \"%s\": %m", path)));
2309  }
2310 
2311  ReorderBufferSerializeChange(rb, txn, fd, change);
2312  dlist_delete(&change->node);
2313  ReorderBufferReturnChange(rb, change);
2314 
2315  spilled++;
2316  }
2317 
2318  Assert(spilled == txn->nentries_mem);
2319  Assert(dlist_is_empty(&txn->changes));
2320  txn->nentries_mem = 0;
2321  txn->serialized = true;
2322 
2323  if (fd != -1)
2324  CloseTransientFile(fd);
2325 }
2326 
2327 /*
2328  * Serialize individual change to disk.
2329  */
2330 static void
2332  int fd, ReorderBufferChange *change)
2333 {
2334  ReorderBufferDiskChange *ondisk;
2335  Size sz = sizeof(ReorderBufferDiskChange);
2336 
2338 
2339  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2340  memcpy(&ondisk->change, change, sizeof(ReorderBufferChange));
2341 
2342  switch (change->action)
2343  {
2344  /* fall through these, they're all similar enough */
2349  {
2350  char *data;
2351  ReorderBufferTupleBuf *oldtup,
2352  *newtup;
2353  Size oldlen = 0;
2354  Size newlen = 0;
2355 
2356  oldtup = change->data.tp.oldtuple;
2357  newtup = change->data.tp.newtuple;
2358 
2359  if (oldtup)
2360  {
2361  sz += sizeof(HeapTupleData);
2362  oldlen = oldtup->tuple.t_len;
2363  sz += oldlen;
2364  }
2365 
2366  if (newtup)
2367  {
2368  sz += sizeof(HeapTupleData);
2369  newlen = newtup->tuple.t_len;
2370  sz += newlen;
2371  }
2372 
2373  /* make sure we have enough space */
2375 
2376  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
2377  /* might have been reallocated above */
2378  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2379 
2380  if (oldlen)
2381  {
2382  memcpy(data, &oldtup->tuple, sizeof(HeapTupleData));
2383  data += sizeof(HeapTupleData);
2384 
2385  memcpy(data, oldtup->tuple.t_data, oldlen);
2386  data += oldlen;
2387  }
2388 
2389  if (newlen)
2390  {
2391  memcpy(data, &newtup->tuple, sizeof(HeapTupleData));
2392  data += sizeof(HeapTupleData);
2393 
2394  memcpy(data, newtup->tuple.t_data, newlen);
2395  data += newlen;
2396  }
2397  break;
2398  }
2400  {
2401  char *data;
2402  Size prefix_size = strlen(change->data.msg.prefix) + 1;
2403 
2404  sz += prefix_size + change->data.msg.message_size +
2405  sizeof(Size) + sizeof(Size);
2407 
2408  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
2409 
2410  /* might have been reallocated above */
2411  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2412 
2413  /* write the prefix including the size */
2414  memcpy(data, &prefix_size, sizeof(Size));
2415  data += sizeof(Size);
2416  memcpy(data, change->data.msg.prefix,
2417  prefix_size);
2418  data += prefix_size;
2419 
2420  /* write the message including the size */
2421  memcpy(data, &change->data.msg.message_size, sizeof(Size));
2422  data += sizeof(Size);
2423  memcpy(data, change->data.msg.message,
2424  change->data.msg.message_size);
2425  data += change->data.msg.message_size;
2426 
2427  break;
2428  }
2430  {
2431  Snapshot snap;
2432  char *data;
2433 
2434  snap = change->data.snapshot;
2435 
2436  sz += sizeof(SnapshotData) +
2437  sizeof(TransactionId) * snap->xcnt +
2438  sizeof(TransactionId) * snap->subxcnt
2439  ;
2440 
2441  /* make sure we have enough space */
2443  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
2444  /* might have been reallocated above */
2445  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2446 
2447  memcpy(data, snap, sizeof(SnapshotData));
2448  data += sizeof(SnapshotData);
2449 
2450  if (snap->xcnt)
2451  {
2452  memcpy(data, snap->xip,
2453  sizeof(TransactionId) * snap->xcnt);
2454  data += sizeof(TransactionId) * snap->xcnt;
2455  }
2456 
2457  if (snap->subxcnt)
2458  {
2459  memcpy(data, snap->subxip,
2460  sizeof(TransactionId) * snap->subxcnt);
2461  data += sizeof(TransactionId) * snap->subxcnt;
2462  }
2463  break;
2464  }
2466  {
2467  Size size;
2468  char *data;
2469 
2470  /* account for the OIDs of truncated relations */
2471  size = sizeof(Oid) * change->data.truncate.nrelids;
2472  sz += size;
2473 
2474  /* make sure we have enough space */
2476 
2477  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
2478  /* might have been reallocated above */
2479  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2480 
2481  memcpy(data, change->data.truncate.relids, size);
2482  data += size;
2483 
2484  break;
2485  }
2489  /* ReorderBufferChange contains everything important */
2490  break;
2491  }
2492 
2493  ondisk->size = sz;
2494 
2495  errno = 0;
2497  if (write(fd, rb->outbuf, ondisk->size) != ondisk->size)
2498  {
2499  int save_errno = errno;
2500 
2501  CloseTransientFile(fd);
2502 
2503  /* if write didn't set errno, assume problem is no disk space */
2504  errno = save_errno ? save_errno : ENOSPC;
2505  ereport(ERROR,
2507  errmsg("could not write to data file for XID %u: %m",
2508  txn->xid)));
2509  }
2511 
2512  Assert(ondisk->change.action == change->action);
2513 }
2514 
2515 /*
2516  * Restore a number of changes spilled to disk back into memory.
2517  */
/*
 * Returns the number of changes restored.  Reads at most
 * max_changes_in_memory changes per call, and stops once the segment
 * containing txn->final_lsn has been fully consumed.  *fd and *segno
 * carry the read position across successive calls.
 *
 * NOTE(review): extraction lost the function-name line and a few interior
 * lines (e.g. the spill-path construction before *segno at 2563) — confirm
 * against the upstream source before relying on exact statement order.
 */
2518 static Size
2520  int *fd, XLogSegNo *segno)
2521 {
2522  Size restored = 0;
2523  XLogSegNo last_segno;
2524  dlist_mutable_iter cleanup_iter;
2525 
2528 
2529  /* free current entries, so we have memory for more */
2530  dlist_foreach_modify(cleanup_iter, &txn->changes)
2531  {
2533  dlist_container(ReorderBufferChange, node, cleanup_iter.cur);
2534 
2535  dlist_delete(&cleanup->node);
2536  ReorderBufferReturnChange(rb, cleanup);
2537  }
2538  txn->nentries_mem = 0;
2539  Assert(dlist_is_empty(&txn->changes));
2540 
2541  XLByteToSeg(txn->final_lsn, last_segno, wal_segment_size);
2542 
2543  while (restored < max_changes_in_memory && *segno <= last_segno)
2544  {
2545  int readBytes;
2546  ReorderBufferDiskChange *ondisk;
2547 
 /* open the next spill segment lazily if none is currently open */
2548  if (*fd == -1)
2549  {
2550  char path[MAXPGPATH];
2551 
2552  /* first time in */
2553  if (*segno == 0)
2554  XLByteToSeg(txn->first_lsn, *segno, wal_segment_size);
2555 
2556  Assert(*segno != 0 || dlist_is_empty(&txn->changes));
2557 
2558  /*
2559  * No need to care about TLIs here, only used during a single run,
2560  * so each LSN only maps to a specific WAL record.
2561  */
2563  *segno);
2564 
2565  *fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
 /* a segment may simply never have been spilled: skip it */
2566  if (*fd < 0 && errno == ENOENT)
2567  {
2568  *fd = -1;
2569  (*segno)++;
2570  continue;
2571  }
2572  else if (*fd < 0)
2573  ereport(ERROR,
2575  errmsg("could not open file \"%s\": %m",
2576  path)));
2577  }
2578 
2579  /*
2580  * Read the statically sized part of a change which has information
2581  * about the total size. If we couldn't read a record, we're at the
2582  * end of this file.
2583  */
2586  readBytes = read(*fd, rb->outbuf, sizeof(ReorderBufferDiskChange));
2588 
2589  /* eof */
2590  if (readBytes == 0)
2591  {
2592  CloseTransientFile(*fd);
2593  *fd = -1;
2594  (*segno)++;
2595  continue;
2596  }
2597  else if (readBytes < 0)
2598  ereport(ERROR,
2600  errmsg("could not read from reorderbuffer spill file: %m")));
2601  else if (readBytes != sizeof(ReorderBufferDiskChange))
2602  ereport(ERROR,
2604  errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
2605  readBytes,
2606  (uint32) sizeof(ReorderBufferDiskChange))));
2607 
2608  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2609 
 /* grow rb->outbuf so the variable-size remainder of the change fits;
  * re-fetch ondisk since the buffer may have been reallocated */
2611  sizeof(ReorderBufferDiskChange) + ondisk->size);
2612  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2613 
2615  readBytes = read(*fd, rb->outbuf + sizeof(ReorderBufferDiskChange),
2616  ondisk->size - sizeof(ReorderBufferDiskChange));
2618 
2619  if (readBytes < 0)
2620  ereport(ERROR,
2622  errmsg("could not read from reorderbuffer spill file: %m")));
2623  else if (readBytes != ondisk->size - sizeof(ReorderBufferDiskChange))
2624  ereport(ERROR,
2626  errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
2627  readBytes,
2628  (uint32) (ondisk->size - sizeof(ReorderBufferDiskChange)))));
2629 
2630  /*
2631  * ok, read a full change from disk, now restore it into proper
2632  * in-memory format
2633  */
2634  ReorderBufferRestoreChange(rb, txn, rb->outbuf);
2635  restored++;
2636  }
2637 
2638  return restored;
2639 }
2640 
2641 /*
2642  * Convert change from its on-disk format to in-memory format and queue it onto
2643  * the TXN's ->changes list.
2644  *
2645  * Note: although "data" is declared char*, at entry it points to a
2646  * maxalign'd buffer, making it safe in most of this function to assume
2647  * that the pointed-to data is suitably aligned for direct access.
2648  */
/*
 * Any pointers embedded in the serialized image are stale; each case below
 * allocates fresh memory (tuple bufs, message strings, snapshot, relid array)
 * and re-points change->data before the change is queued.
 *
 * NOTE(review): the switch's case labels were lost in extraction — the
 * branches correspond to tuple changes, MESSAGE, SNAPSHOT and TRUNCATE.
 */
2649 static void
2651  char *data)
2652 {
2653  ReorderBufferDiskChange *ondisk;
2654  ReorderBufferChange *change;
2655 
2656  ondisk = (ReorderBufferDiskChange *) data;
2657 
2658  change = ReorderBufferGetChange(rb);
2659 
2660  /* copy static part */
2661  memcpy(change, &ondisk->change, sizeof(ReorderBufferChange));
2662 
2663  data += sizeof(ReorderBufferDiskChange);
2664 
2665  /* restore individual stuff */
2666  switch (change->action)
2667  {
2668  /* fall through these, they're all similar enough */
2673  if (change->data.tp.oldtuple)
2674  {
 /* "data" is still maxaligned here, so t_len can be read directly */
2675  uint32 tuplelen = ((HeapTuple) data)->t_len;
2676 
2677  change->data.tp.oldtuple =
2679 
2680  /* restore ->tuple */
2681  memcpy(&change->data.tp.oldtuple->tuple, data,
2682  sizeof(HeapTupleData));
2683  data += sizeof(HeapTupleData);
2684 
2685  /* reset t_data pointer into the new tuplebuf */
2686  change->data.tp.oldtuple->tuple.t_data =
2687  ReorderBufferTupleBufData(change->data.tp.oldtuple);
2688 
2689  /* restore tuple data itself */
2690  memcpy(change->data.tp.oldtuple->tuple.t_data, data, tuplelen);
2691  data += tuplelen;
2692  }
2693 
2694  if (change->data.tp.newtuple)
2695  {
2696  /* here, data might not be suitably aligned! */
2697  uint32 tuplelen;
2698 
 /* hence: fetch t_len with memcpy instead of dereferencing */
2699  memcpy(&tuplelen, data + offsetof(HeapTupleData, t_len),
2700  sizeof(uint32));
2701 
2702  change->data.tp.newtuple =
2704 
2705  /* restore ->tuple */
2706  memcpy(&change->data.tp.newtuple->tuple, data,
2707  sizeof(HeapTupleData));
2708  data += sizeof(HeapTupleData);
2709 
2710  /* reset t_data pointer into the new tuplebuf */
2711  change->data.tp.newtuple->tuple.t_data =
2712  ReorderBufferTupleBufData(change->data.tp.newtuple);
2713 
2714  /* restore tuple data itself */
2715  memcpy(change->data.tp.newtuple->tuple.t_data, data, tuplelen);
2716  data += tuplelen;
2717  }
2718 
2719  break;
2721  {
2722  Size prefix_size;
2723 
2724  /* read prefix */
2725  memcpy(&prefix_size, data, sizeof(Size));
2726  data += sizeof(Size);
2727  change->data.msg.prefix = MemoryContextAlloc(rb->context,
2728  prefix_size);
2729  memcpy(change->data.msg.prefix, data, prefix_size);
2730  Assert(change->data.msg.prefix[prefix_size - 1] == '\0');
2731  data += prefix_size;
2732 
2733  /* read the message */
2734  memcpy(&change->data.msg.message_size, data, sizeof(Size));
2735  data += sizeof(Size);
2736  change->data.msg.message = MemoryContextAlloc(rb->context,
2737  change->data.msg.message_size);
2738  memcpy(change->data.msg.message, data,
2739  change->data.msg.message_size);
2740  data += change->data.msg.message_size;
2741 
2742  break;
2743  }
2745  {
2746  Snapshot oldsnap;
2747  Snapshot newsnap;
2748  Size size;
2749 
2750  oldsnap = (Snapshot) data;
2751 
 /* body plus the xip and subxip arrays serialized right behind it */
2752  size = sizeof(SnapshotData) +
2753  sizeof(TransactionId) * oldsnap->xcnt +
2754  sizeof(TransactionId) * (oldsnap->subxcnt + 0);
2755 
2756  change->data.snapshot = MemoryContextAllocZero(rb->context, size);
2757 
2758  newsnap = change->data.snapshot;
2759 
2760  memcpy(newsnap, data, size);
 /* re-point xip/subxip into the trailing space of the copied snapshot */
2761  newsnap->xip = (TransactionId *)
2762  (((char *) newsnap) + sizeof(SnapshotData));
2763  newsnap->subxip = newsnap->xip + newsnap->xcnt;
2764  newsnap->copied = true;
2765  break;
2766  }
2767  /* the base struct contains all the data, easy peasy */
2769  {
2770  Oid *relids;
2771 
2772  relids = ReorderBufferGetRelids(rb,
2773  change->data.truncate.nrelids);
2774  memcpy(relids, data, change->data.truncate.nrelids * sizeof(Oid));
2775  change->data.truncate.relids = relids;
2776 
2777  break;
2778  }
2782  break;
2783  }
2784 
2785  dlist_push_tail(&txn->changes, &change->node);
2786  txn->nentries_mem++;
2787 }
2788 
2789 /*
2790  * Remove all on-disk data stored for the passed-in transaction.
2791  */
/*
 * Unlinks every possible spill-segment file between the segments covering
 * txn->first_lsn and txn->final_lsn.  NOTE(review): the path-construction
 * call before the unlink (line 2810) was lost in extraction — presumably
 * ReorderBufferSerializedPath(); confirm upstream.
 */
2792 static void
2794 {
2795  XLogSegNo first;
2796  XLogSegNo cur;
2797  XLogSegNo last;
2798 
2801 
2802  XLByteToSeg(txn->first_lsn, first, wal_segment_size);
2803  XLByteToSeg(txn->final_lsn, last, wal_segment_size);
2804 
2805  /* iterate over all possible filenames, and delete them */
2806  for (cur = first; cur <= last; cur++)
2807  {
2808  char path[MAXPGPATH];
2809 
 /* a missing file (ENOENT) is fine: that segment was never spilled */
2811  if (unlink(path) != 0 && errno != ENOENT)
2812  ereport(ERROR,
2814  errmsg("could not remove file \"%s\": %m", path)));
2815  }
2816 }
2817 
2818 /*
2819  * Remove any leftover serialized reorder buffers from a slot directory after a
2820  * prior crash or decoding session exit.
2821  */
2822 static void
2824 {
2825  DIR *spill_dir;
2826  struct dirent *spill_de;
2827  struct stat statbuf;
2828  char path[MAXPGPATH * 2 + 12];
2829 
2830  sprintf(path, "pg_replslot/%s", slotname);
2831 
2832  /* we're only handling directories here, skip if it's not ours */
2833  if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
2834  return;
2835 
2836  spill_dir = AllocateDir(path);
 /* NOTE(review): INFO elevel makes directory-read problems non-fatal
  * here — confirm ReadDirExtended() semantics */
2837  while ((spill_de = ReadDirExtended(spill_dir, path, INFO)) != NULL)
2838  {
2839  /* only look at names that can be ours */
2840  if (strncmp(spill_de->d_name, "xid", 3) == 0)
2841  {
 /* "path" is reused here to hold the full file path */
2842  snprintf(path, sizeof(path),
2843  "pg_replslot/%s/%s", slotname,
2844  spill_de->d_name);
2845 
2846  if (unlink(path) != 0)
2847  ereport(ERROR,
2849  errmsg("could not remove file \"%s\" during removal of pg_replslot/%s/xid*: %m",
2850  path, slotname)));
2851  }
2852  }
2853  FreeDir(spill_dir);
2854 }
2855 
2856 /*
2857  * Given a replication slot, transaction ID and segment number, fill in the
2858  * corresponding spill file into 'path', which is a caller-owned buffer of size
2859  * at least MAXPGPATH.
2860  */
2861 static void
2863  XLogSegNo segno)
2864 {
2865  XLogRecPtr recptr;
2866 
 /* derive the starting LSN of the segment; offset 0 within it */
2867  XLogSegNoOffsetToRecPtr(segno, 0, wal_segment_size, recptr);
2868 
 /* file name encodes the xid and the segment's start LSN (hi/lo 32 bits) */
2869  snprintf(path, MAXPGPATH, "pg_replslot/%s/xid-%u-lsn-%X-%X.spill",
2871  xid,
2872  (uint32) (recptr >> 32), (uint32) recptr);
2873 }
2874 
2875 /*
2876  * Delete all data spilled to disk after we've restarted/crashed. It will be
2877  * recreated when the respective slots are reused.
2878  */
/* Called at startup; scans pg_replslot for surviving slot directories. */
2879 void
2881 {
2882  DIR *logical_dir;
2883  struct dirent *logical_de;
2884 
2885  logical_dir = AllocateDir("pg_replslot");
2886  while ((logical_de = ReadDir(logical_dir, "pg_replslot")) != NULL)
2887  {
2888  if (strcmp(logical_de->d_name, ".") == 0 ||
2889  strcmp(logical_de->d_name, "..") == 0)
2890  continue;
2891 
2892  /* if it cannot be a slot, skip the directory */
2893  if (!ReplicationSlotValidateName(logical_de->d_name, DEBUG2))
2894  continue;
2895 
2896  /*
2897  * ok, has to be a surviving logical slot, iterate and delete
2898  * everything starting with xid-*
2899  */
 /* NOTE(review): the per-slot cleanup call (line 2900) was lost in
  * extraction — presumably ReorderBufferCleanupSerializedTXNs(); confirm */
2901  }
2902  FreeDir(logical_dir);
2903 }
2904 
2905 /* ---------------------------------------
2906  * toast reassembly support
2907  * ---------------------------------------
2908  */
2909 
2910 /*
2911  * Initialize per tuple toast reconstruction support.
2912  */
/* Creates txn->toast_hash, keyed by toast chunk_id (an Oid); each entry
 * accumulates the chunks of one toasted datum. */
2913 static void
2915 {
2916  HASHCTL hash_ctl;
2917 
 /* must not already exist for this transaction */
2918  Assert(txn->toast_hash == NULL);
2919 
2920  memset(&hash_ctl, 0, sizeof(hash_ctl));
2921  hash_ctl.keysize = sizeof(Oid);
2922  hash_ctl.entrysize = sizeof(ReorderBufferToastEnt);
 /* allocate in the reorder buffer's memory context */
2923  hash_ctl.hcxt = rb->context;
2924  txn->toast_hash = hash_create("ReorderBufferToastHash", 5, &hash_ctl,
2926 }
2927 
2928 /*
2929  * Per toast-chunk handling for toast reconstruction
2930  *
2931  * Appends a toast chunk so we can reconstruct it when the tuple "owning" the
2932  * toasted Datum comes along.
2933  */
2934 static void
2936  Relation relation, ReorderBufferChange *change)
2937 {
2938  ReorderBufferToastEnt *ent;
2939  ReorderBufferTupleBuf *newtup;
2940  bool found;
2941  int32 chunksize;
2942  bool isnull;
2943  Pointer chunk;
2944  TupleDesc desc = RelationGetDescr(relation);
2945  Oid chunk_id;
2946  int32 chunk_seq;
2947 
 /* create the per-transaction toast hash lazily, on the first chunk */
2948  if (txn->toast_hash == NULL)
2949  ReorderBufferToastInitHash(rb, txn);
2950 
2951  Assert(IsToastRelation(relation));
2952 
 /* toast-table columns 1..3 are chunk_id, chunk_seq, chunk_data */
2953  newtup = change->data.tp.newtuple;
2954  chunk_id = DatumGetObjectId(fastgetattr(&newtup->tuple, 1, desc, &isnull));
2955  Assert(!isnull);
2956  chunk_seq = DatumGetInt32(fastgetattr(&newtup->tuple, 2, desc, &isnull));
2957  Assert(!isnull);
2958 
2959  ent = (ReorderBufferToastEnt *)
2960  hash_search(txn->toast_hash,
2961  (void *) &chunk_id,
2962  HASH_ENTER,
2963  &found);
2964 
 /* new datum: initialize; chunks must start at seq 0 and arrive in order */
2965  if (!found)
2966  {
2967  Assert(ent->chunk_id == chunk_id);
2968  ent->num_chunks = 0;
2969  ent->last_chunk_seq = 0;
2970  ent->size = 0;
2971  ent->reconstructed = NULL;
2972  dlist_init(&ent->chunks);
2973 
2974  if (chunk_seq != 0)
2975  elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq 0",
2976  chunk_seq, chunk_id);
2977  }
2978  else if (found && chunk_seq != ent->last_chunk_seq + 1)
2979  elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq %d",
2980  chunk_seq, chunk_id, ent->last_chunk_seq + 1);
2981 
2982  chunk = DatumGetPointer(fastgetattr(&newtup->tuple, 3, desc, &isnull));
2983  Assert(!isnull);
2984 
2985  /* calculate size so we can allocate the right size at once later */
2986  if (!VARATT_IS_EXTENDED(chunk))
2987  chunksize = VARSIZE(chunk) - VARHDRSZ;
2988  else if (VARATT_IS_SHORT(chunk))
2989  /* could happen due to heap_form_tuple doing its thing */
2990  chunksize = VARSIZE_SHORT(chunk) - VARHDRSZ_SHORT;
2991  else
2992  elog(ERROR, "unexpected type of toast chunk");
2993 
2994  ent->size += chunksize;
2995  ent->last_chunk_seq = chunk_seq;
2996  ent->num_chunks++;
 /* keep the change queued; it is stitched together in ToastReplace */
2997  dlist_push_tail(&ent->chunks, &change->node);
2998 }
2999 
3000 /*
3001  * Rejigger change->newtuple to point to in-memory toast tuples instead of
3002  * on-disk toast tuples that may no longer exist (think DROP TABLE or VACUUM).
3003  *
3004  * We cannot replace unchanged toast tuples though, so those will still point
3005  * to on-disk toast data.
3006  */
3007 static void
3009  Relation relation, ReorderBufferChange *change)
3010 {
3011  TupleDesc desc;
3012  int natt;
3013  Datum *attrs;
3014  bool *isnull;
3015  bool *free;
3016  HeapTuple tmphtup;
3017  Relation toast_rel;
3018  TupleDesc toast_desc;
3019  MemoryContext oldcontext;
3020  ReorderBufferTupleBuf *newtup;
3021 
3022  /* no toast tuples changed */
3023  if (txn->toast_hash == NULL)
3024  return;
3025 
 /* all allocations below live in the reorder buffer's context */
3026  oldcontext = MemoryContextSwitchTo(rb->context);
3027 
3028  /* we should only have toast tuples in an INSERT or UPDATE */
3029  Assert(change->data.tp.newtuple);
3030 
3031  desc = RelationGetDescr(relation);
3032 
3033  toast_rel = RelationIdGetRelation(relation->rd_rel->reltoastrelid);
3034  toast_desc = RelationGetDescr(toast_rel);
3035 
3036  /* should we allocate from stack instead? */
3037  attrs = palloc0(sizeof(Datum) * desc->natts);
3038  isnull = palloc0(sizeof(bool) * desc->natts);
3039  free = palloc0(sizeof(bool) * desc->natts);
3040 
3041  newtup = change->data.tp.newtuple;
3042 
3043  heap_deform_tuple(&newtup->tuple, desc, attrs, isnull);
3044 
3045  for (natt = 0; natt < desc->natts; natt++)
3046  {
3047  Form_pg_attribute attr = TupleDescAttr(desc, natt);
3048  ReorderBufferToastEnt *ent;
3049  struct varlena *varlena;
3050 
3051  /* va_rawsize is the size of the original datum -- including header */
3052  struct varatt_external toast_pointer;
3053  struct varatt_indirect redirect_pointer;
3054  struct varlena *new_datum = NULL;
3055  struct varlena *reconstructed;
3056  dlist_iter it;
3057  Size data_done = 0;
3058 
3059  /* system columns aren't toasted */
3060  if (attr->attnum < 0)
3061  continue;
3062 
3063  if (attr->attisdropped)
3064  continue;
3065 
3066  /* not a varlena datatype */
3067  if (attr->attlen != -1)
3068  continue;
3069 
3070  /* no data */
3071  if (isnull[natt])
3072  continue;
3073 
3074  /* ok, we know we have a toast datum */
3075  varlena = (struct varlena *) DatumGetPointer(attrs[natt]);
3076 
3077  /* no need to do anything if the tuple isn't external */
3078  if (!VARATT_IS_EXTERNAL(varlena))
3079  continue;
3080 
3081  VARATT_EXTERNAL_GET_POINTER(toast_pointer, varlena);
3082 
3083  /*
3084  * Check whether the toast tuple changed, replace if so.
3085  */
3086  ent = (ReorderBufferToastEnt *)
3087  hash_search(txn->toast_hash,
3088  (void *) &toast_pointer.va_valueid,
3089  HASH_FIND,
3090  NULL);
3091  if (ent == NULL)
3092  continue;
3093 
 /* build an indirect (in-memory) datum pointing at the rebuilt value */
3094  new_datum =
3095  (struct varlena *) palloc0(INDIRECT_POINTER_SIZE);
3096 
3097  free[natt] = true;
3098 
3099  reconstructed = palloc0(toast_pointer.va_rawsize);
3100 
 /* remember it in the hash entry so ToastReset() can free it */
3101  ent->reconstructed = reconstructed;
3102 
3103  /* stitch toast tuple back together from its parts */
3104  dlist_foreach(it, &ent->chunks)
3105  {
3106  bool isnull;
3107  ReorderBufferChange *cchange;
3108  ReorderBufferTupleBuf *ctup;
3109  Pointer chunk;
3110 
3111  cchange = dlist_container(ReorderBufferChange, node, it.cur);
3112  ctup = cchange->data.tp.newtuple;
 /* column 3 of a toast tuple is the chunk payload */
3113  chunk = DatumGetPointer(
3114  fastgetattr(&ctup->tuple, 3, toast_desc, &isnull));
3115 
3116  Assert(!isnull);
3117  Assert(!VARATT_IS_EXTERNAL(chunk));
3118  Assert(!VARATT_IS_SHORT(chunk));
3119 
3120  memcpy(VARDATA(reconstructed) + data_done,
3121  VARDATA(chunk),
3122  VARSIZE(chunk) - VARHDRSZ);
3123  data_done += VARSIZE(chunk) - VARHDRSZ;
3124  }
3125  Assert(data_done == toast_pointer.va_extsize);
3126 
3127  /* make sure its marked as compressed or not */
3128  if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
3129  SET_VARSIZE_COMPRESSED(reconstructed, data_done + VARHDRSZ);
3130  else
3131  SET_VARSIZE(reconstructed, data_done + VARHDRSZ);
3132 
3133  memset(&redirect_pointer, 0, sizeof(redirect_pointer));
3134  redirect_pointer.pointer = reconstructed;
3135 
 /* NOTE(review): the SET_VARTAG_EXTERNAL call (line 3136) was lost in
  * extraction — confirm against upstream */
3137  memcpy(VARDATA_EXTERNAL(new_datum), &redirect_pointer,
3138  sizeof(redirect_pointer));
3139 
3140  attrs[natt] = PointerGetDatum(new_datum);
3141  }
3142 
3143  /*
3144  * Build tuple in separate memory & copy tuple back into the tuplebuf
3145  * passed to the output plugin. We can't directly heap_fill_tuple() into
3146  * the tuplebuf because attrs[] will point back into the current content.
3147  */
3148  tmphtup = heap_form_tuple(desc, attrs, isnull);
3149  Assert(newtup->tuple.t_len <= MaxHeapTupleSize);
3150  Assert(ReorderBufferTupleBufData(newtup) == newtup->tuple.t_data);
3151 
3152  memcpy(newtup->tuple.t_data, tmphtup->t_data, tmphtup->t_len);
3153  newtup->tuple.t_len = tmphtup->t_len;
3154 
3155  /*
3156  * free resources we won't further need, more persistent stuff will be
3157  * free'd in ReorderBufferToastReset().
3158  */
3159  RelationClose(toast_rel);
3160  pfree(tmphtup);
3161  for (natt = 0; natt < desc->natts; natt++)
3162  {
3163  if (free[natt])
3164  pfree(DatumGetPointer(attrs[natt]));
3165  }
3166  pfree(attrs);
3167  pfree(free);
3168  pfree(isnull);
3169 
3170  MemoryContextSwitchTo(oldcontext);
3171 }
3172 
3173 /*
3174  * Free all resources allocated for toast reconstruction.
3175  */
3176 static void
3178 {
3179  HASH_SEQ_STATUS hstat;
3180  ReorderBufferToastEnt *ent;
3181 
 /* nothing to do if no toast chunks were ever queued for this txn */
3182  if (txn->toast_hash == NULL)
3183  return;
3184 
3185  /* sequentially walk over the hash and free everything */
3186  hash_seq_init(&hstat, txn->toast_hash)
3187  while ((ent = (ReorderBufferToastEnt *) hash_seq_search(&hstat)) != NULL)
3188  {
3189  dlist_mutable_iter it;
3190 
3191  if (ent->reconstructed != NULL)
3192  pfree(ent->reconstructed);
3193 
 /* return each queued chunk change to the reorder buffer's free pool */
3194  dlist_foreach_modify(it, &ent->chunks)
3195  {
3196  ReorderBufferChange *change =
3198 
3199  dlist_delete(&change->node);
3200  ReorderBufferReturnChange(rb, change);
3201  }
3202  }
3203 
3204  hash_destroy(txn->toast_hash);
3205  txn->toast_hash = NULL;
3206 }
3207 
3208 
3209 /* ---------------------------------------
3210  * Visibility support for logical decoding
3211  *
3212  *
3213  * Lookup actual cmin/cmax values when using decoding snapshot. We can't
3214  * always rely on stored cmin/cmax values because of two scenarios:
3215  *
3216  * * A tuple got changed multiple times during a single transaction and thus
3217  * has got a combocid. Combocids are only valid for the duration of a
3218  * single transaction.
3219  * * A tuple with a cmin but no cmax (and thus no combocid) got
3220  * deleted/updated in another transaction than the one which created it
3221  * which we are looking at right now. As only one of cmin, cmax or combocid
3222  * is actually stored in the heap we don't have access to the value we
3223  * need anymore.
3224  *
3225  * To resolve those problems we have a per-transaction hash of (cmin,
3226  * cmax) tuples keyed by (relfilenode, ctid) which contains the actual
3227  * (cmin, cmax) values. That also takes care of combocids by simply
3228  * not caring about them at all. As we have the real cmin/cmax values
3229  * combocids aren't interesting.
3230  *
3231  * As we only care about catalog tuples here the overhead of this
3232  * hashtable should be acceptable.
3233  *
3234  * Heap rewrites complicate this a bit, check rewriteheap.c for
3235  * details.
3236  * -------------------------------------------------------------------------
3237  */
3238 
3239 /* struct for sorting mapping files by LSN efficiently */
/* fname is the file's name within pg_logical/mappings.  NOTE(review): the
 * lsn member and the typedef's closing line were lost in extraction — an
 * XLogRecPtr lsn field is referenced by file_sort_by_lsn(); confirm. */
3240 typedef struct RewriteMappingFile
3241 {
3243  char fname[MAXPGPATH];
3245 
3246 #if NOT_USED
/* Debugging aid: dump the (relfilenode, ctid) -> (cmin, cmax) map at
 * DEBUG3; compiled out by default. */
3247 static void
3248 DisplayMapping(HTAB *tuplecid_data)
3249 {
3250  HASH_SEQ_STATUS hstat;
3252 
3253  hash_seq_init(&hstat, tuplecid_data);
3254  while ((ent = (ReorderBufferTupleCidEnt *) hash_seq_search(&hstat)) != NULL)
3255  {
3256  elog(DEBUG3, "mapping: node: %u/%u/%u tid: %u/%u cmin: %u, cmax: %u",
3257  ent->key.relnode.dbNode,
3258  ent->key.relnode.spcNode,
3259  ent->key.relnode.relNode,
3262  ent->cmin,
3263  ent->cmax
3264  );
3265  }
3266 }
3267 #endif
3268 
3269 /*
3270  * Apply a single mapping file to tuplecid_data.
3271  *
3272  * The mapping file has to have been verified to be a) committed b) for our
3273  * transaction c) applied in LSN order.
3274  */
3275 static void
3276 ApplyLogicalMappingFile(HTAB *tuplecid_data, Oid relid, const char *fname)
3277 {
3278  char path[MAXPGPATH];
3279  int fd;
3280  int readBytes;
3282 
3283  sprintf(path, "pg_logical/mappings/%s", fname);
3284  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
3285  if (fd < 0)
3286  ereport(ERROR,
3288  errmsg("could not open file \"%s\": %m", path)));
3289 
3290  while (true)
3291  {
3294  ReorderBufferTupleCidEnt *new_ent;
3295  bool found;
3296 
3297  /* be careful about padding */
3298  memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
3299 
3300  /* read all mappings till the end of the file */
3302  readBytes = read(fd, &map, sizeof(LogicalRewriteMappingData));
3304 
3305  if (readBytes < 0)
3306  ereport(ERROR,
3308  errmsg("could not read file \"%s\": %m",
3309  path)));
3310  else if (readBytes == 0) /* EOF */
3311  break;
3312  else if (readBytes != sizeof(LogicalRewriteMappingData))
3313  ereport(ERROR,
3315  errmsg("could not read from file \"%s\": read %d instead of %d bytes",
3316  path, readBytes,
3317  (int32) sizeof(LogicalRewriteMappingData))));
3318 
 /* look up the entry under the tuple's pre-rewrite identity */
3319  key.relnode = map.old_node;
3320  ItemPointerCopy(&map.old_tid,
3321  &key.tid);
3322 
3323 
3324  ent = (ReorderBufferTupleCidEnt *)
3325  hash_search(tuplecid_data,
3326  (void *) &key,
3327  HASH_FIND,
3328  NULL);
3329 
3330  /* no existing mapping, no need to update */
3331  if (!ent)
3332  continue;
3333 
 /* re-enter it under the post-rewrite identity */
3334  key.relnode = map.new_node;
3335  ItemPointerCopy(&map.new_tid,
3336  &key.tid);
3337 
3338  new_ent = (ReorderBufferTupleCidEnt *)
3339  hash_search(tuplecid_data,
3340  (void *) &key,
3341  HASH_ENTER,
3342  &found);
3343 
3344  if (found)
3345  {
3346  /*
3347  * Make sure the existing mapping makes sense. We sometimes update
3348  * old records that did not yet have a cmax (e.g. pg_class' own
3349  * entry while rewriting it) during rewrites, so allow that.
3350  */
3351  Assert(ent->cmin == InvalidCommandId || ent->cmin == new_ent->cmin);
3352  Assert(ent->cmax == InvalidCommandId || ent->cmax == new_ent->cmax);
3353  }
3354  else
3355  {
3356  /* update mapping */
3357  new_ent->cmin = ent->cmin;
3358  new_ent->cmax = ent->cmax;
3359  new_ent->combocid = ent->combocid;
3360  }
3361  }
3362 
3363  if (CloseTransientFile(fd) != 0)
3364  ereport(ERROR,
3366  errmsg("could not close file \"%s\": %m", path)));
3367 }
3368 
3369 
3370 /*
3371  * Check whether the TransactionOid 'xid' is in the pre-sorted array 'xip'.
3372  */
/* 'xip' must be sorted per xidComparator for bsearch to be valid. */
3373 static bool
3375 {
3376  return bsearch(&xid, xip, num,
3377  sizeof(TransactionId), xidComparator) != NULL;
3378 }
3379 
3380 /*
3381  * list_sort() comparator for sorting RewriteMappingFiles in LSN order.
3382  */
/* Returns -1/0/1 so mapping files are applied oldest LSN first. */
3383 static int
3384 file_sort_by_lsn(const ListCell *a_p, const ListCell *b_p)
3385 {
 /* NOTE(review): the declarations of 'a' and 'b' (lines 3386-3387,
  * extracting RewriteMappingFile* from the ListCells) were lost in
  * extraction — confirm against upstream */
3388 
3389  if (a->lsn < b->lsn)
3390  return -1;
3391  else if (a->lsn > b->lsn)
3392  return 1;
3393  return 0;
3394 }
3395 
3396 /*
3397  * Apply any existing logical remapping files if there are any targeted at our
3398  * transaction for relid.
3399  */
/*
 * Scans pg_logical/mappings for committed map files whose mapped xid is in
 * snapshot->subxip, then applies them in LSN order.
 */
3400 static void
3402 {
3403  DIR *mapping_dir;
3404  struct dirent *mapping_de;
3405  List *files = NIL;
3406  ListCell *file;
 /* shared catalogs are recorded under database InvalidOid */
3407  Oid dboid = IsSharedRelation(relid) ? InvalidOid : MyDatabaseId;
3408 
3409  mapping_dir = AllocateDir("pg_logical/mappings");
3410  while ((mapping_de = ReadDir(mapping_dir, "pg_logical/mappings")) != NULL)
3411  {
3412  Oid f_dboid;
3413  Oid f_relid;
3414  TransactionId f_mapped_xid;
3415  TransactionId f_create_xid;
3416  XLogRecPtr f_lsn;
3417  uint32 f_hi,
3418  f_lo;
3419  RewriteMappingFile *f;
3420 
3421  if (strcmp(mapping_de->d_name, ".") == 0 ||
3422  strcmp(mapping_de->d_name, "..") == 0)
3423  continue;
3424 
3425  /* Ignore files that aren't ours */
3426  if (strncmp(mapping_de->d_name, "map-", 4) != 0)
3427  continue;
3428 
 /* parse dboid, relid, LSN halves and both xids out of the file name */
3429  if (sscanf(mapping_de->d_name, LOGICAL_REWRITE_FORMAT,
3430  &f_dboid, &f_relid, &f_hi, &f_lo,
3431  &f_mapped_xid, &f_create_xid) != 6)
3432  elog(ERROR, "could not parse filename \"%s\"", mapping_de->d_name);
3433 
3434  f_lsn = ((uint64) f_hi) << 32 | f_lo;
3435 
3436  /* mapping for another database */
3437  if (f_dboid != dboid)
3438  continue;
3439 
3440  /* mapping for another relation */
3441  if (f_relid != relid)
3442  continue;
3443 
3444  /* did the creating transaction abort? */
3445  if (!TransactionIdDidCommit(f_create_xid))
3446  continue;
3447 
3448  /* not for our transaction */
3449  if (!TransactionIdInArray(f_mapped_xid, snapshot->subxip, snapshot->subxcnt))
3450  continue;
3451 
3452  /* ok, relevant, queue for apply */
3453  f = palloc(sizeof(RewriteMappingFile));
3454  f->lsn = f_lsn;
3455  strcpy(f->fname, mapping_de->d_name);
3456  files = lappend(files, f);
3457  }
3458  FreeDir(mapping_dir);
3459 
3460  /* sort files so we apply them in LSN order */
3461  list_sort(files, file_sort_by_lsn);
3462 
3463  foreach(file, files)
3464  {
 /* NOTE(review): the loop-local declaration of 'f' (line 3465) was
  * lost in extraction — confirm against upstream */
3466 
3467  elog(DEBUG1, "applying mapping: \"%s\" in %u", f->fname,
3468  snapshot->subxip[0]);
3469  ApplyLogicalMappingFile(tuplecid_data, relid, f->fname);
3470  pfree(f);
3471  }
3472 }
3473 
3474 /*
3475  * Lookup cmin/cmax of a tuple, during logical decoding where we can't rely on
3476  * combocids.
3477  */
/*
 * Returns true and fills *cmin / *cmax (either pointer may be NULL) when a
 * mapping exists for the tuple; returns false when none is found even after
 * applying any pending rewrite-mapping files.
 */
3478 bool
3480  Snapshot snapshot,
3481  HeapTuple htup, Buffer buffer,
3482  CommandId *cmin, CommandId *cmax)
3483 {
3486  ForkNumber forkno;
3487  BlockNumber blockno;
3488  bool updated_mapping = false;
3489 
3490  /* be careful about padding */
3491  memset(&key, 0, sizeof(key));
3492 
3493  Assert(!BufferIsLocal(buffer));
3494 
3495  /*
3496  * get relfilenode from the buffer, no convenient way to access it other
3497  * than that.
3498  */
3499  BufferGetTag(buffer, &key.relnode, &forkno, &blockno);
3500 
3501  /* tuples can only be in the main fork */
3502  Assert(forkno == MAIN_FORKNUM);
3503  Assert(blockno == ItemPointerGetBlockNumber(&htup->t_self));
3504 
3505  ItemPointerCopy(&htup->t_self,
3506  &key.tid);
3507 
3508 restart:
3509  ent = (ReorderBufferTupleCidEnt *)
3510  hash_search(tuplecid_data,
3511  (void *) &key,
3512  HASH_FIND,
3513  NULL);
3514 
3515  /*
3516  * failed to find a mapping, check whether the table was rewritten and
3517  * apply mapping if so, but only do that once - there can be no new
3518  * mappings while we are in here since we have to hold a lock on the
3519  * relation.
3520  */
3521  if (ent == NULL && !updated_mapping)
3522  {
3523  UpdateLogicalMappings(tuplecid_data, htup->t_tableOid, snapshot);
3524  /* now check but don't update for a mapping again */
3525  updated_mapping = true;
3526  goto restart;
3527  }
3528  else if (ent == NULL)
3529  return false;
3530 
3531  if (cmin)
3532  *cmin = ent->cmin;
3533  if (cmax)
3534  *cmax = ent->cmax;
3535  return true;
3536 }
static void ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
XLogRecPtr first_lsn
bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *rb, TransactionId xid)
#define NIL
Definition: pg_list.h:65
uint32 CommandId
Definition: c.h:521
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
TimestampTz commit_time
struct ReorderBufferToastEnt ReorderBufferToastEnt
void AbortCurrentTransaction(void)
Definition: xact.c:3159
ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER]
#define SizeofHeapTupleHeader
Definition: htup_details.h:184
bool IsToastRelation(Relation relation)
Definition: catalog.c:142
void hash_destroy(HTAB *hashp)
Definition: dynahash.c:814
void ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, Snapshot snapshot, XLogRecPtr lsn, bool transactional, const char *prefix, Size message_size, const char *message)
#define relpathperm(rnode, forknum)
Definition: relpath.h:83
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
ReorderBufferApplyChangeCB apply_change
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:211
#define AllocSetContextCreate
Definition: memutils.h:169
HeapTupleData * HeapTuple
Definition: htup.h:71
dlist_node base_snapshot_node
#define DEBUG1
Definition: elog.h:25
dlist_node * cur
Definition: ilist.h:180
static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
RepOriginId origin_id
void StartupReorderBuffer(void)
#define VARDATA(PTR)
Definition: postgres.h:302
#define fastgetattr(tup, attnum, tupleDesc, isnull)
Definition: htup_details.h:712
static int32 next
Definition: blutils.c:213
#define HASH_CONTEXT
Definition: hsearch.h:93
#define HASH_ELEM
Definition: hsearch.h:87
int wal_segment_size
Definition: xlog.c:112
void ReorderBufferAbortOld(ReorderBuffer *rb, TransactionId oldestRunningXid)
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
uint32 TransactionId
Definition: c.h:507
void ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
bool copied
Definition: snapshot.h:185
void ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
MemoryContext hcxt
Definition: hsearch.h:78
#define DatumGetInt32(X)
Definition: postgres.h:472
#define RelationGetDescr(relation)
Definition: rel.h:442
static void ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn, ReorderBufferTXN *subtxn)
#define VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer)
Definition: tuptoaster.h:111
#define DEBUG3
Definition: elog.h:23
struct ReorderBufferChange::@101::@102 tp
#define write(a, b, c)
Definition: win32.h:14
#define VARHDRSZ_SHORT
Definition: postgres.h:268
TransactionId by_txn_last_xid
#define VARSIZE(PTR)
Definition: postgres.h:303
int64 TimestampTz
Definition: timestamp.h:39
#define PointerGetDatum(X)
Definition: postgres.h:556
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
ReorderBufferTXN * txn
static bool TransactionIdInArray(TransactionId xid, TransactionId *xip, Size num)
#define VARHDRSZ
Definition: c.h:555
#define dlist_foreach(iter, lhead)
Definition: ilist.h:507
struct ReorderBufferChange::@101::@104 msg
XLogRecPtr current_restart_decoding_lsn
#define DatumGetObjectId(X)
Definition: postgres.h:500
char * pstrdup(const char *in)
Definition: mcxt.c:1161
struct dirent * ReadDirExtended(DIR *dir, const char *dirname, int elevel)
Definition: fd.c:2549
static ReorderBufferTXN * ReorderBufferGetTXN(ReorderBuffer *rb)
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
Oid RelidByRelfilenode(Oid reltablespace, Oid relfilenode)
void ReorderBufferSetBaseSnapshot(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void ReorderBufferFree(ReorderBuffer *rb)
uint16 RepOriginId
Definition: xlogdefs.h:58
Size entrysize
Definition: hsearch.h:73
struct cursor * cur
Definition: ecpg.c:28
char fname[MAXPGPATH]
int32 va_rawsize
Definition: postgres.h:69
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4627
#define INFO
Definition: elog.h:33
MemoryContext SlabContextCreate(MemoryContext parent, const char *name, Size blockSize, Size chunkSize)
Definition: slab.c:188
void binaryheap_replace_first(binaryheap *heap, Datum d)
Definition: binaryheap.c:204
void ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, RelFileNode node, ItemPointerData tid, CommandId cmin, CommandId cmax, CommandId combocid)
static void ReorderBufferIterTXNFinish(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
uint32 BlockNumber
Definition: block.h:31
struct ReorderBufferChange::@101::@105 tuplecid
void TeardownHistoricSnapshot(bool is_error)
Definition: snapmgr.c:2051
ReorderBufferCommitCB commit
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:1020
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
#define RelationIsLogicallyLogged(relation)
Definition: rel.h:588
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:906
static int64 files
Definition: pg_checksums.c:34
bool TransactionIdDidCommit(TransactionId transactionId)
Definition: transam.c:125
ReplicationSlotPersistentData data
Definition: slot.h:132
static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, int *fd, XLogSegNo *segno)
struct ReorderBufferTupleCidKey ReorderBufferTupleCidKey
struct SnapshotData * Snapshot
Definition: snapshot.h:121
Form_pg_class rd_rel
Definition: rel.h:83
unsigned int Oid
Definition: postgres_ext.h:31
XLogRecPtr base_snapshot_lsn
Definition: dirent.h:9
uint32 regd_count
Definition: snapshot.h:199
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:78
void binaryheap_add_unordered(binaryheap *heap, Datum d)
Definition: binaryheap.c:110
MemoryContext change_context
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define VARDATA_EXTERNAL(PTR)
Definition: postgres.h:310
#define PG_BINARY
Definition: c.h:1191
XLogRecPtr origin_lsn
static void ApplyLogicalMappingFile(HTAB *tuplecid_data, Oid relid, const char *fname)
signed int int32
Definition: c.h:346
#define FirstCommandId
Definition: c.h:523
#define XLByteInSeg(xlrp, logSegNo, wal_segsz_bytes)
void ReorderBufferSetRestartPoint(ReorderBuffer *rb, XLogRecPtr ptr)
HeapTupleHeader t_data
Definition: htup.h:68
#define VARATT_IS_EXTERNAL(PTR)
Definition: postgres.h:313
#define sprintf
Definition: port.h:194
bool ReplicationSlotValidateName(const char *name, int elevel)
Definition: slot.c:174
static dlist_node * dlist_next_node(dlist_head *head, dlist_node *node)
Definition: ilist.h:421
Definition: dynahash.c:208
#define ReorderBufferTupleBufData(p)
Definition: reorderbuffer.h:36
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
void pfree(void *pointer)
Definition: mcxt.c:1031
char * Pointer
Definition: c.h:335
#define dlist_tail_element(type, membername, lhead)
Definition: ilist.h:496
Definition: dirent.c:25
#define ERROR
Definition: elog.h:43
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2257
#define SLAB_LARGE_BLOCK_SIZE
Definition: memutils.h:221
#define VARATT_IS_SHORT(PTR)
Definition: postgres.h:326
dlist_head changes
void ReorderBufferReturnRelids(ReorderBuffer *rb, Oid *relids)
dlist_head txns_by_base_snapshot_lsn
Datum binaryheap_first(binaryheap *heap)
Definition: binaryheap.c:159
#define MAXPGPATH
ItemPointerData t_self
Definition: htup.h:65
ReorderBufferTupleCidKey key
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:191
#define DEBUG2
Definition: elog.h:24
TransactionId GetCurrentTransactionId(void)
Definition: xact.c:423
void ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
uint32 t_len
Definition: htup.h:64
#define MaxHeapTupleSize
Definition: htup_details.h:560
struct varlena * reconstructed
void RollbackAndReleaseCurrentSubTransaction(void)
Definition: xact.c:4438
#define SET_VARTAG_EXTERNAL(PTR, tag)
Definition: postgres.h:333
uint64 XLogSegNo
Definition: xlogdefs.h:41
int errcode_for_file_access(void)
Definition: elog.c:593
HeapTupleData tuple
Definition: reorderbuffer.h:27
struct SnapshotData SnapshotData
TransactionId GetCurrentTransactionIdIfAny(void)
Definition: xact.c:440
static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, TransactionId xid, XLogSegNo segno)
#define InvalidTransactionId
Definition: transam.h:31
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:200
bool ReorderBufferXidHasCatalogChanges(ReorderBuffer *rb, TransactionId xid)
unsigned int uint32
Definition: c.h:358
XLogRecPtr final_lsn
DIR * AllocateDir(const char *dirname)
Definition: fd.c:2468
Oid t_tableOid
Definition: htup.h:66
static int file_sort_by_lsn(const ListCell *a_p, const ListCell *b_p)
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1342
void RelationClose(Relation relation)
Definition: relcache.c:2089
TransactionId xmin
Definition: snapshot.h:157
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
ReorderBufferMessageCB message
#define ereport(elevel, rest)
Definition: elog.h:141
#define AssertArg(condition)
Definition: c.h:734
int bh_size
Definition: binaryheap.h:32
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
TransactionId * xip
Definition: snapshot.h:168
static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap, ReorderBufferTXN *txn, CommandId cid)
#define VARSIZE_SHORT(PTR)
Definition: postgres.h:305
ForkNumber
Definition: relpath.h:40
List * lappend(List *list, void *datum)
Definition: list.c:321
static HTAB * tuplecid_data
Definition: snapmgr.c:172
int CloseTransientFile(int fd)
Definition: fd.c:2434
static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define stat(a, b)
Definition: win32_port.h:264
struct ReorderBufferTupleCidEnt ReorderBufferTupleCidEnt
void ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr lsn)
#define INDIRECT_POINTER_SIZE
Definition: tuptoaster.h:102
static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, char *change)
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:487
#define HASH_BLOBS
Definition: hsearch.h:88
TransactionId ReorderBufferGetOldestXmin(ReorderBuffer *rb)
ReorderBufferChange * change
MemoryContext GenerationContextCreate(MemoryContext parent, const char *name, Size blockSize)
Definition: generation.c:212
static int ReorderBufferIterCompare(Datum a, Datum b, void *arg)
MemoryContext context
void ReorderBufferCommitChild(ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn)
void * palloc0(Size size)
Definition: mcxt.c:955
static void dlist_insert_before(dlist_node *before, dlist_node *node)
Definition: ilist.h:346
static bool dlist_has_next(dlist_head *head, dlist_node *node)
Definition: ilist.h:402
#define InvalidCommandId
Definition: c.h:524
uintptr_t Datum
Definition: postgres.h:367
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
Definition: dynahash.c:316
ReorderBufferTXN * by_txn_last_txn
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void cleanup(void)
Definition: bootstrap.c:901
dlist_head toplevel_by_lsn
struct RewriteMappingFile RewriteMappingFile
Oid MyDatabaseId
Definition: globals.c:85
TransactionId xid
static void ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
bool IsSharedRelation(Oid relationId)
Definition: catalog.c:240
Size keysize
Definition: hsearch.h:72
dlist_node * cur
Definition: ilist.h:161
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:814
static void AssertTXNLsnOrder(ReorderBuffer *rb)
void ReorderBufferAddSnapshot(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
#define InvalidOid
Definition: postgres_ext.h:36
static void dlist_init(dlist_head *head)
Definition: ilist.h:278
CommandId curcid
Definition: snapshot.h:187
struct ReorderBufferIterTXNState ReorderBufferIterTXNState
static void ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTXN *txn)
struct ReorderBufferDiskChange ReorderBufferDiskChange
void binaryheap_build(binaryheap *heap)
Definition: binaryheap.c:126
#define free(a)
Definition: header.h:65
struct ReorderBufferIterTXNEntry ReorderBufferIterTXNEntry
#define PG_CATCH()
Definition: elog.h:310
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
ItemPointerData new_tid
Definition: rewriteheap.h:40
#define XLogSegNoOffsetToRecPtr(segno, offset, wal_segsz_bytes, dest)
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:732
#define lfirst(lc)
Definition: pg_list.h:190
ReorderBufferTXN * ReorderBufferGetOldestTXN(ReorderBuffer *rb)
union ReorderBufferChange::@101 data
static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, int fd, ReorderBufferChange *change)
static void ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn)
Definition: regguts.h:298
struct dirent * ReadDir(DIR *dir, const char *dirname)
Definition: fd.c:2534
static ReorderBufferChange * ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
#define VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr)
Definition: tuptoaster.h:121
int32 va_extsize
Definition: postgres.h:70
XLogRecPtr end_lsn
void StartTransactionCommand(void)
Definition: xact.c:2794
void BeginInternalSubTransaction(const char *name)
Definition: xact.c:4333
void ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs)
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
size_t Size
Definition: c.h:466
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: pgstat.h:1318
void binaryheap_free(binaryheap *heap)
Definition: binaryheap.c:69
SharedInvalidationMessage * invalidations
#define BufferIsLocal(buffer)
Definition: buf.h:37
#define SLAB_DEFAULT_BLOCK_SIZE
Definition: memutils.h:220
#define ItemPointerGetOffsetNumber(pointer)
Definition: itemptr.h:117
void ReorderBufferReturnTupleBuf(ReorderBuffer *rb, ReorderBufferTupleBuf *tuple)
#define PG_RE_THROW()
Definition: elog.h:331
ReorderBuffer * ReorderBufferAllocate(void)
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1389
void * repalloc(void *pointer, Size size)
Definition: mcxt.c:1044
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1379
static void ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz)
struct varlena * pointer
Definition: postgres.h:86
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change)
void SnapBuildSnapDecRefcount(Snapshot snap)
Definition: snapbuild.c:433
#define LOGICAL_REWRITE_FORMAT
Definition: rewriteheap.h:54
dlist_head subtxns
binaryheap * binaryheap_allocate(int capacity, binaryheap_comparator compare, void *arg)
Definition: binaryheap.c:33
#define VARATT_IS_EXTENDED(PTR)
Definition: postgres.h:327
#define S_ISDIR(m)
Definition: win32_port.h:305
#define DatumGetPointer(X)
Definition: postgres.h:549
#define lstat(path, sb)
Definition: win32_port.h:253
void heap_deform_tuple(HeapTuple tuple, TupleDesc tupleDesc, Datum *values, bool *isnull)
Definition: heaptuple.c:1249
void LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
Definition: inval.c:556
#define Int32GetDatum(X)
Definition: postgres.h:479
static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
uint32 xcnt
Definition: snapshot.h:169
void * palloc(Size size)
Definition: mcxt.c:924
void list_sort(List *list, list_sort_comparator cmp)
Definition: list.c:1478
int errmsg(const char *fmt,...)
Definition: elog.c:784
static void UpdateLogicalMappings(HTAB *tuplecid_data, Oid relid, Snapshot snapshot)
ReorderBufferApplyTruncateCB apply_truncate
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:771
static void ReorderBufferCheckSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
ReorderBufferTXN * txn
Definition: reorderbuffer.c:88
void ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
static dlist_node * dlist_pop_head_node(dlist_head *head)
Definition: ilist.h:368
void SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
Definition: snapmgr.c:2035
#define elog(elevel,...)
Definition: elog.h:226
Oid * ReorderBufferGetRelids(ReorderBuffer *rb, int nrelids)
int i
XLogRecPtr restart_decoding_lsn
#define NameStr(name)
Definition: c.h:609
#define SET_VARSIZE_COMPRESSED(PTR, len)
Definition: postgres.h:331
void * arg
Datum binaryheap_remove_first(binaryheap *heap)
Definition: binaryheap.c:174
static const Size max_changes_in_memory
Definition: c.h:549
struct ReorderBufferTXNByIdEnt ReorderBufferTXNByIdEnt
void ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
MemoryContext tup_context
#define SET_VARSIZE(PTR, len)
Definition: postgres.h:329
char d_name[MAX_PATH]
Definition: dirent.h:14
#define ItemPointerGetBlockNumber(pointer)
Definition: itemptr.h:98
static void ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define qsort(a, b, c, d)
Definition: port.h:491
#define TransactionIdIsValid(xid)
Definition: transam.h:41
ReorderBufferChange change
ReorderBufferBeginCB begin
static void ReorderBufferToastInitHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
void BufferGetTag(Buffer buffer, RelFileNode *rnode, ForkNumber *forknum, BlockNumber *blknum)
Definition: bufmgr.c:2634
#define PG_TRY()
Definition: elog.h:301
static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
void ReorderBufferProcessXid(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
static void ReorderBufferCleanupSerializedTXNs(const char *slotname)
Definition: pg_list.h:50
#define snprintf
Definition: port.h:192
int Buffer
Definition: buf.h:23
static ReorderBufferIterTXNState * ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
Relation RelationIdGetRelation(Oid relationId)
Definition: relcache.c:1990
#define PG_END_TRY()
Definition: elog.h:317
#define read(a, b, c)
Definition: win32.h:13
int FreeDir(DIR *dir)
Definition: fd.c:2586
struct HeapTupleData HeapTupleData
TransactionId toplevel_xid
#define offsetof(type, field)
Definition: c.h:655
MemoryContext txn_context
dlist_head tuplecids
#define ItemPointerCopy(fromPointer, toPointer)
Definition: itemptr.h:161
TransactionId * subxip
Definition: snapshot.h:180
uint32 active_count
Definition: snapshot.h:198
struct ReorderBufferChange::@101::@103 truncate
int xidComparator(const void *arg1, const void *arg2)
Definition: xid.c:138
int32 subxcnt
Definition: snapshot.h:181
void ReorderBufferAddNewCommandId(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, CommandId cid)
ReorderBufferTupleBuf * ReorderBufferGetTupleBuf(ReorderBuffer *rb, Size tuple_len)
bool ResolveCminCmaxDuringDecoding(HTAB *tuplecid_data, Snapshot snapshot, HeapTuple htup, Buffer buffer, CommandId *cmin, CommandId *cmax)
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
ItemPointerData old_tid
Definition: rewriteheap.h:39