PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
reorderbuffer.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * reorderbuffer.c
4  * PostgreSQL logical replay/reorder buffer management
5  *
6  *
7  * Copyright (c) 2012-2017, PostgreSQL Global Development Group
8  *
9  *
10  * IDENTIFICATION
11  * src/backend/replication/reorderbuffer.c
12  *
13  * NOTES
14  * This module gets handed individual pieces of transactions in the order
15  * they are written to the WAL and is responsible for reassembling them into
16  * toplevel transaction sized pieces. When a transaction is completely
17  * reassembled - signalled by reading the transaction commit record - it
18  * will then call the output plugin (c.f. ReorderBufferCommit()) with the
19  * individual changes. The output plugins rely on snapshots built by
20  * snapbuild.c which hands them to us.
21  *
22  * Transactions and subtransactions/savepoints in postgres are not
23  * immediately linked to each other from outside the performing
24  * backend. Only at commit/abort (or special xact_assignment records) they
25  * are linked together. Which means that we will have to splice together a
26  * toplevel transaction from its subtransactions. To do that efficiently we
27  * build a binary heap indexed by the smallest current lsn of the individual
28  * subtransactions' changestreams. As the individual streams are inherently
29  * ordered by LSN - since that is where we build them from - the transaction
30  * can easily be reassembled by always using the subtransaction with the
31  * smallest current LSN from the heap.
32  *
33  * In order to cope with large transactions - which can be several times as
34  * big as the available memory - this module supports spooling the contents
35  * of large transactions to disk. When the transaction is replayed the
36  * contents of individual (sub-)transactions will be read from disk in
37  * chunks.
38  *
39  * This module also has to deal with reassembling toast records from the
40  * individual chunks stored in WAL. When a new (or initial) version of a
41  * tuple is stored in WAL it will always be preceded by the toast chunks
42  * emitted for the columns stored out of line. Within a single toplevel
43  * transaction there will be no other data carrying records between a row's
44  * toast chunks and the row data itself. See ReorderBufferToast* for
45  * details.
46  * -------------------------------------------------------------------------
47  */
48 #include "postgres.h"
49 
50 #include <unistd.h>
51 #include <sys/stat.h>
52 
53 #include "access/rewriteheap.h"
54 #include "access/transam.h"
55 #include "access/tuptoaster.h"
56 #include "access/xact.h"
57 #include "access/xlog_internal.h"
58 #include "catalog/catalog.h"
59 #include "lib/binaryheap.h"
60 #include "miscadmin.h"
61 #include "replication/logical.h"
63 #include "replication/slot.h"
64 #include "replication/snapbuild.h" /* just for SnapBuildSnapDecRefcount */
65 #include "storage/bufmgr.h"
66 #include "storage/fd.h"
67 #include "storage/sinval.h"
68 #include "utils/builtins.h"
69 #include "utils/combocid.h"
70 #include "utils/memdebug.h"
71 #include "utils/memutils.h"
72 #include "utils/rel.h"
73 #include "utils/relfilenodemap.h"
74 #include "utils/tqual.h"
75 
76 
77 /* entry for a hash table we use to map from xid to our transaction state */
79 {
83 
84 /* data structures for (relfilenode, ctid) => (cmin, cmax) mapping */
86 {
90 
92 {
96  CommandId combocid; /* just for debugging */
98 
99 /* k-way in-order change iteration support structures */
101 {
105  int fd;
108 
110 {
114  ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER];
116 
117 /* toast datastructures */
118 typedef struct ReorderBufferToastEnt
119 {
120  Oid chunk_id; /* toast_table.chunk_id */
121  int32 last_chunk_seq; /* toast_table.chunk_seq of the last chunk we
122  * have seen */
123  Size num_chunks; /* number of chunks we've already seen */
124  Size size; /* combined size of chunks seen */
125  dlist_head chunks; /* linked list of chunks */
126  struct varlena *reconstructed; /* reconstructed varlena now pointed
127  * to in main tup */
129 
130 /* Disk serialization support datastructures */
132 {
135  /* data follows */
137 
138 /*
139  * Maximum number of changes kept in memory, per transaction. After that,
140  * changes are spooled to disk.
141  *
142  * The current value should be sufficient to decode the entire transaction
143  * without hitting disk in OLTP workloads, while starting to spool to disk in
144  * other workloads reasonably fast.
145  *
146  * At some point in the future it probably makes sense to have a more elaborate
147  * resource management here, but it's not entirely clear what that would look
148  * like.
149  */
150 static const Size max_changes_in_memory = 4096;
151 
152 /*
153  * We use a very simple form of a slab allocator for frequently allocated
154  * objects, simply keeping a fixed number in a linked list when unused,
155  * instead pfree()ing them. Without that in many workloads aset.c becomes a
156  * major bottleneck, especially when spilling to disk while decoding batch
157  * workloads.
158  */
159 static const Size max_cached_changes = 4096 * 2;
160 static const Size max_cached_tuplebufs = 4096 * 2; /* ~8MB */
161 static const Size max_cached_transactions = 512;
162 
163 
164 /* ---------------------------------------
165  * primary reorderbuffer support routines
166  * ---------------------------------------
167  */
171  TransactionId xid, bool create, bool *is_new,
172  XLogRecPtr lsn, bool create_as_top);
173 
174 static void AssertTXNLsnOrder(ReorderBuffer *rb);
175 
176 /* ---------------------------------------
177  * support functions for lsn-order iterating over the ->changes of a
178  * transaction and its subtransactions
179  *
180  * used for iteration over the k-way heap merge of a transaction and its
181  * subtransactions
182  * ---------------------------------------
183  */
185 static ReorderBufferChange *
190 
191 /*
192  * ---------------------------------------
193  * Disk serialization support functions
194  * ---------------------------------------
195  */
199  int fd, ReorderBufferChange *change);
201  int *fd, XLogSegNo *segno);
203  char *change);
205 
206 static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap);
208  ReorderBufferTXN *txn, CommandId cid);
209 
210 /* ---------------------------------------
211  * toast reassembly support
212  * ---------------------------------------
213  */
217  Relation relation, ReorderBufferChange *change);
219  Relation relation, ReorderBufferChange *change);
220 
221 
222 /*
223  * Allocate a new ReorderBuffer
224  */
227 {
228  ReorderBuffer *buffer;
229  HASHCTL hash_ctl;
230  MemoryContext new_ctx;
231 
232  /* allocate memory in own context, to have better accountability */
234  "ReorderBuffer",
236 
237  buffer =
238  (ReorderBuffer *) MemoryContextAlloc(new_ctx, sizeof(ReorderBuffer));
239 
240  memset(&hash_ctl, 0, sizeof(hash_ctl));
241 
242  buffer->context = new_ctx;
243 
244  hash_ctl.keysize = sizeof(TransactionId);
245  hash_ctl.entrysize = sizeof(ReorderBufferTXNByIdEnt);
246  hash_ctl.hcxt = buffer->context;
247 
248  buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
250 
252  buffer->by_txn_last_txn = NULL;
253 
254  buffer->nr_cached_transactions = 0;
255  buffer->nr_cached_changes = 0;
256  buffer->nr_cached_tuplebufs = 0;
257 
258  buffer->outbuf = NULL;
259  buffer->outbufsize = 0;
260 
262 
263  dlist_init(&buffer->toplevel_by_lsn);
265  dlist_init(&buffer->cached_changes);
266  slist_init(&buffer->cached_tuplebufs);
267 
268  return buffer;
269 }
270 
271 /*
272  * Free a ReorderBuffer
273  */
274 void
276 {
277  MemoryContext context = rb->context;
278 
279  /*
280  * We free separately allocated data by entirely scrapping reorderbuffer's
281  * memory context.
282  */
283  MemoryContextDelete(context);
284 }
285 
286 /*
287  * Get an unused, possibly preallocated, ReorderBufferTXN.
288  */
289 static ReorderBufferTXN *
291 {
292  ReorderBufferTXN *txn;
293 
294  /* check the slab cache */
295  if (rb->nr_cached_transactions > 0)
296  {
298  txn = (ReorderBufferTXN *)
301  }
302  else
303  {
304  txn = (ReorderBufferTXN *)
306  }
307 
308  memset(txn, 0, sizeof(ReorderBufferTXN));
309 
310  dlist_init(&txn->changes);
311  dlist_init(&txn->tuplecids);
312  dlist_init(&txn->subtxns);
313 
314  return txn;
315 }
316 
317 /*
318  * Free a ReorderBufferTXN.
319  *
320  * Deallocation might be delayed for efficiency purposes, for details check
321  * the comments above max_cached_changes's definition.
322  */
323 static void
325 {
326  /* clean the lookup cache if we were cached (quite likely) */
327  if (rb->by_txn_last_xid == txn->xid)
328  {
330  rb->by_txn_last_txn = NULL;
331  }
332 
333  /* free data that's contained */
334 
335  if (txn->tuplecid_hash != NULL)
336  {
338  txn->tuplecid_hash = NULL;
339  }
340 
341  if (txn->invalidations)
342  {
343  pfree(txn->invalidations);
344  txn->invalidations = NULL;
345  }
346 
347  /* check whether to put into the slab cache */
349  {
353  VALGRIND_MAKE_MEM_DEFINED(&txn->node, sizeof(txn->node));
354  }
355  else
356  {
357  pfree(txn);
358  }
359 }
360 
361 /*
362  * Get an unused, possibly preallocated, ReorderBufferChange.
363  */
366 {
367  ReorderBufferChange *change;
368 
369  /* check the slab cache */
370  if (rb->nr_cached_changes)
371  {
372  rb->nr_cached_changes--;
373  change = (ReorderBufferChange *)
376  }
377  else
378  {
379  change = (ReorderBufferChange *)
381  }
382 
383  memset(change, 0, sizeof(ReorderBufferChange));
384  return change;
385 }
386 
387 /*
388  * Free an ReorderBufferChange.
389  *
390  * Deallocation might be delayed for efficiency purposes, for details check
391  * the comments above max_cached_changes's definition.
392  */
393 void
395 {
396  /* free contained data */
397  switch (change->action)
398  {
403  if (change->data.tp.newtuple)
404  {
405  ReorderBufferReturnTupleBuf(rb, change->data.tp.newtuple);
406  change->data.tp.newtuple = NULL;
407  }
408 
409  if (change->data.tp.oldtuple)
410  {
411  ReorderBufferReturnTupleBuf(rb, change->data.tp.oldtuple);
412  change->data.tp.oldtuple = NULL;
413  }
414  break;
416  if (change->data.msg.prefix != NULL)
417  pfree(change->data.msg.prefix);
418  change->data.msg.prefix = NULL;
419  if (change->data.msg.message != NULL)
420  pfree(change->data.msg.message);
421  change->data.msg.message = NULL;
422  break;
424  if (change->data.snapshot)
425  {
426  ReorderBufferFreeSnap(rb, change->data.snapshot);
427  change->data.snapshot = NULL;
428  }
429  break;
430  /* no data in addition to the struct itself */
434  break;
435  }
436 
437  /* check whether to put into the slab cache */
439  {
440  rb->nr_cached_changes++;
441  dlist_push_head(&rb->cached_changes, &change->node);
443  VALGRIND_MAKE_MEM_DEFINED(&change->node, sizeof(change->node));
444  }
445  else
446  {
447  pfree(change);
448  }
449 }
450 
451 
452 /*
453  * Get an unused, possibly preallocated, ReorderBufferTupleBuf fitting at
454  * least a tuple of size tuple_len (excluding header overhead).
455  */
458 {
459  ReorderBufferTupleBuf *tuple;
460  Size alloc_len;
461 
462  alloc_len = tuple_len + SizeofHeapTupleHeader;
463 
464  /*
465  * Most tuples are below MaxHeapTupleSize, so we use a slab allocator for
466  * those. Thus always allocate at least MaxHeapTupleSize. Note that tuples
467  * generated for oldtuples can be bigger, as they don't have out-of-line
468  * toast columns.
469  */
470  if (alloc_len < MaxHeapTupleSize)
471  alloc_len = MaxHeapTupleSize;
472 
473 
474  /* if small enough, check the slab cache */
475  if (alloc_len <= MaxHeapTupleSize && rb->nr_cached_tuplebufs)
476  {
477  rb->nr_cached_tuplebufs--;
481 #ifdef USE_ASSERT_CHECKING
482  memset(&tuple->tuple, 0xa9, sizeof(HeapTupleData));
484 #endif
485  tuple->tuple.t_data = ReorderBufferTupleBufData(tuple);
486 #ifdef USE_ASSERT_CHECKING
487  memset(tuple->tuple.t_data, 0xa8, tuple->alloc_tuple_size);
489 #endif
490  }
491  else
492  {
493  tuple = (ReorderBufferTupleBuf *)
495  sizeof(ReorderBufferTupleBuf) +
496  MAXIMUM_ALIGNOF + alloc_len);
497  tuple->alloc_tuple_size = alloc_len;
498  tuple->tuple.t_data = ReorderBufferTupleBufData(tuple);
499  }
500 
501  return tuple;
502 }
503 
504 /*
505  * Free an ReorderBufferTupleBuf.
506  *
507  * Deallocation might be delayed for efficiency purposes, for details check
508  * the comments above max_cached_changes's definition.
509  */
510 void
512 {
513  /* check whether to put into the slab cache, oversized tuples never are */
514  if (tuple->alloc_tuple_size == MaxHeapTupleSize &&
516  {
517  rb->nr_cached_tuplebufs++;
518  slist_push_head(&rb->cached_tuplebufs, &tuple->node);
521  VALGRIND_MAKE_MEM_DEFINED(&tuple->node, sizeof(tuple->node));
523  }
524  else
525  {
526  pfree(tuple);
527  }
528 }
529 
530 /*
531  * Return the ReorderBufferTXN from the given buffer, specified by Xid.
532  * If create is true, and a transaction doesn't already exist, create it
533  * (with the given LSN, and as top transaction if that's specified);
534  * when this happens, is_new is set to true.
535  */
536 static ReorderBufferTXN *
538  bool *is_new, XLogRecPtr lsn, bool create_as_top)
539 {
540  ReorderBufferTXN *txn;
542  bool found;
543 
545  Assert(!create || lsn != InvalidXLogRecPtr);
546 
547  /*
548  * Check the one-entry lookup cache first
549  */
551  rb->by_txn_last_xid == xid)
552  {
553  txn = rb->by_txn_last_txn;
554 
555  if (txn != NULL)
556  {
557  /* found it, and it's valid */
558  if (is_new)
559  *is_new = false;
560  return txn;
561  }
562 
563  /*
564  * cached as non-existent, and asked not to create? Then nothing else
565  * to do.
566  */
567  if (!create)
568  return NULL;
569  /* otherwise fall through to create it */
570  }
571 
572  /*
573  * If the cache wasn't hit or it yielded an "does-not-exist" and we want
574  * to create an entry.
575  */
576 
577  /* search the lookup table */
578  ent = (ReorderBufferTXNByIdEnt *)
579  hash_search(rb->by_txn,
580  (void *) &xid,
581  create ? HASH_ENTER : HASH_FIND,
582  &found);
583  if (found)
584  txn = ent->txn;
585  else if (create)
586  {
587  /* initialize the new entry, if creation was requested */
588  Assert(ent != NULL);
589 
590  ent->txn = ReorderBufferGetTXN(rb);
591  ent->txn->xid = xid;
592  txn = ent->txn;
593  txn->first_lsn = lsn;
595 
596  if (create_as_top)
597  {
598  dlist_push_tail(&rb->toplevel_by_lsn, &txn->node);
599  AssertTXNLsnOrder(rb);
600  }
601  }
602  else
603  txn = NULL; /* not found and not asked to create */
604 
605  /* update cache */
606  rb->by_txn_last_xid = xid;
607  rb->by_txn_last_txn = txn;
608 
609  if (is_new)
610  *is_new = !found;
611 
612  Assert(!create || txn != NULL);
613  return txn;
614 }
615 
616 /*
617  * Queue a change into a transaction so it can be replayed upon commit.
618  */
619 void
621  ReorderBufferChange *change)
622 {
623  ReorderBufferTXN *txn;
624 
625  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
626 
627  change->lsn = lsn;
628  Assert(InvalidXLogRecPtr != lsn);
629  dlist_push_tail(&txn->changes, &change->node);
630  txn->nentries++;
631  txn->nentries_mem++;
632 
634 }
635 
636 /*
637  * Queue message into a transaction so it can be processed upon commit.
638  */
639 void
641  Snapshot snapshot, XLogRecPtr lsn,
642  bool transactional, const char *prefix,
643  Size message_size, const char *message)
644 {
645  if (transactional)
646  {
647  MemoryContext oldcontext;
648  ReorderBufferChange *change;
649 
651 
652  oldcontext = MemoryContextSwitchTo(rb->context);
653 
654  change = ReorderBufferGetChange(rb);
656  change->data.msg.prefix = pstrdup(prefix);
657  change->data.msg.message_size = message_size;
658  change->data.msg.message = palloc(message_size);
659  memcpy(change->data.msg.message, message, message_size);
660 
661  ReorderBufferQueueChange(rb, xid, lsn, change);
662 
663  MemoryContextSwitchTo(oldcontext);
664  }
665  else
666  {
667  ReorderBufferTXN *txn = NULL;
668  volatile Snapshot snapshot_now = snapshot;
669 
670  if (xid != InvalidTransactionId)
671  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
672 
673  /* setup snapshot to allow catalog access */
674  SetupHistoricSnapshot(snapshot_now, NULL);
675  PG_TRY();
676  {
677  rb->message(rb, txn, lsn, false, prefix, message_size, message);
678 
680  }
681  PG_CATCH();
682  {
684  PG_RE_THROW();
685  }
686  PG_END_TRY();
687  }
688 }
689 
690 
691 static void
693 {
694 #ifdef USE_ASSERT_CHECKING
695  dlist_iter iter;
696  XLogRecPtr prev_first_lsn = InvalidXLogRecPtr;
697 
698  dlist_foreach(iter, &rb->toplevel_by_lsn)
699  {
700  ReorderBufferTXN *cur_txn;
701 
702  cur_txn = dlist_container(ReorderBufferTXN, node, iter.cur);
703  Assert(cur_txn->first_lsn != InvalidXLogRecPtr);
704 
705  if (cur_txn->end_lsn != InvalidXLogRecPtr)
706  Assert(cur_txn->first_lsn <= cur_txn->end_lsn);
707 
708  if (prev_first_lsn != InvalidXLogRecPtr)
709  Assert(prev_first_lsn < cur_txn->first_lsn);
710 
711  Assert(!cur_txn->is_known_as_subxact);
712  prev_first_lsn = cur_txn->first_lsn;
713  }
714 #endif
715 }
716 
719 {
720  ReorderBufferTXN *txn;
721 
723  return NULL;
724 
725  AssertTXNLsnOrder(rb);
726 
728 
731  return txn;
732 }
733 
734 void
736 {
738 }
739 
740 void
742  TransactionId subxid, XLogRecPtr lsn)
743 {
744  ReorderBufferTXN *txn;
745  ReorderBufferTXN *subtxn;
746  bool new_top;
747  bool new_sub;
748 
749  txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true);
750  subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false);
751 
752  if (new_sub)
753  {
754  /*
755  * we assign subtransactions to top level transaction even if we don't
756  * have data for it yet, assignment records frequently reference xids
757  * that have not yet produced any records. Knowing those aren't top
758  * level xids allows us to make processing cheaper in some places.
759  */
760  dlist_push_tail(&txn->subtxns, &subtxn->node);
761  txn->nsubtxns++;
762  }
763  else if (!subtxn->is_known_as_subxact)
764  {
765  subtxn->is_known_as_subxact = true;
766  Assert(subtxn->nsubtxns == 0);
767 
768  /* remove from lsn order list of top-level transactions */
769  dlist_delete(&subtxn->node);
770 
771  /* add to toplevel transaction */
772  dlist_push_tail(&txn->subtxns, &subtxn->node);
773  txn->nsubtxns++;
774  }
775  else if (new_top)
776  {
777  elog(ERROR, "existing subxact assigned to unknown toplevel xact");
778  }
779 }
780 
781 /*
782  * Associate a subtransaction with its toplevel transaction at commit
783  * time. There may be no further changes added after this.
784  */
785 void
787  TransactionId subxid, XLogRecPtr commit_lsn,
788  XLogRecPtr end_lsn)
789 {
790  ReorderBufferTXN *txn;
791  ReorderBufferTXN *subtxn;
792 
793  subtxn = ReorderBufferTXNByXid(rb, subxid, false, NULL,
794  InvalidXLogRecPtr, false);
795 
796  /*
797  * No need to do anything if that subtxn didn't contain any changes
798  */
799  if (!subtxn)
800  return;
801 
802  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, commit_lsn, true);
803 
804  if (txn == NULL)
805  elog(ERROR, "subxact logged without previous toplevel record");
806 
807  /*
808  * Pass our base snapshot to the parent transaction if it doesn't have
809  * one, or ours is older. That can happen if there are no changes in the
810  * toplevel transaction but in one of the child transactions. This allows
811  * the parent to simply use its base snapshot initially.
812  */
813  if (subtxn->base_snapshot != NULL &&
814  (txn->base_snapshot == NULL ||
815  txn->base_snapshot_lsn > subtxn->base_snapshot_lsn))
816  {
817  txn->base_snapshot = subtxn->base_snapshot;
818  txn->base_snapshot_lsn = subtxn->base_snapshot_lsn;
819  subtxn->base_snapshot = NULL;
821  }
822 
823  subtxn->final_lsn = commit_lsn;
824  subtxn->end_lsn = end_lsn;
825 
826  if (!subtxn->is_known_as_subxact)
827  {
828  subtxn->is_known_as_subxact = true;
829  Assert(subtxn->nsubtxns == 0);
830 
831  /* remove from lsn order list of top-level transactions */
832  dlist_delete(&subtxn->node);
833 
834  /* add to subtransaction list */
835  dlist_push_tail(&txn->subtxns, &subtxn->node);
836  txn->nsubtxns++;
837  }
838 }
839 
840 
841 /*
842  * Support for efficiently iterating over a transaction's and its
843  * subtransactions' changes.
844  *
845  * We do by doing a k-way merge between transactions/subtransactions. For that
846  * we model the current heads of the different transactions as a binary heap
847  * so we easily know which (sub-)transaction has the change with the smallest
848  * lsn next.
849  *
850  * We assume the changes in individual transactions are already sorted by LSN.
851  */
852 
853 /*
854  * Binary heap comparison function.
855  */
856 static int
858 {
860  XLogRecPtr pos_a = state->entries[DatumGetInt32(a)].lsn;
861  XLogRecPtr pos_b = state->entries[DatumGetInt32(b)].lsn;
862 
863  if (pos_a < pos_b)
864  return 1;
865  else if (pos_a == pos_b)
866  return 0;
867  return -1;
868 }
869 
870 /*
871  * Allocate & initialize an iterator which iterates in lsn order over a
872  * transaction and all its subtransactions.
873  */
876 {
877  Size nr_txns = 0;
879  dlist_iter cur_txn_i;
880  int32 off;
881 
882  /*
883  * Calculate the size of our heap: one element for every transaction that
884  * contains changes. (Besides the transactions already in the reorder
885  * buffer, we count the one we were directly passed.)
886  */
887  if (txn->nentries > 0)
888  nr_txns++;
889 
890  dlist_foreach(cur_txn_i, &txn->subtxns)
891  {
892  ReorderBufferTXN *cur_txn;
893 
894  cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
895 
896  if (cur_txn->nentries > 0)
897  nr_txns++;
898  }
899 
900  /*
901  * TODO: Consider adding fastpath for the rather common nr_txns=1 case, no
902  * need to allocate/build a heap then.
903  */
904 
905  /* allocate iteration state */
906  state = (ReorderBufferIterTXNState *)
908  sizeof(ReorderBufferIterTXNState) +
909  sizeof(ReorderBufferIterTXNEntry) * nr_txns);
910 
911  state->nr_txns = nr_txns;
912  dlist_init(&state->old_change);
913 
914  for (off = 0; off < state->nr_txns; off++)
915  {
916  state->entries[off].fd = -1;
917  state->entries[off].segno = 0;
918  }
919 
920  /* allocate heap */
921  state->heap = binaryheap_allocate(state->nr_txns,
923  state);
924 
925  /*
926  * Now insert items into the binary heap, in an unordered fashion. (We
927  * will run a heap assembly step at the end; this is more efficient.)
928  */
929 
930  off = 0;
931 
932  /* add toplevel transaction if it contains changes */
933  if (txn->nentries > 0)
934  {
935  ReorderBufferChange *cur_change;
936 
937  if (txn->nentries != txn->nentries_mem)
938  {
939  /* serialize remaining changes */
940  ReorderBufferSerializeTXN(rb, txn);
941  ReorderBufferRestoreChanges(rb, txn, &state->entries[off].fd,
942  &state->entries[off].segno);
943  }
944 
945  cur_change = dlist_head_element(ReorderBufferChange, node,
946  &txn->changes);
947 
948  state->entries[off].lsn = cur_change->lsn;
949  state->entries[off].change = cur_change;
950  state->entries[off].txn = txn;
951 
953  }
954 
955  /* add subtransactions if they contain changes */
956  dlist_foreach(cur_txn_i, &txn->subtxns)
957  {
958  ReorderBufferTXN *cur_txn;
959 
960  cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
961 
962  if (cur_txn->nentries > 0)
963  {
964  ReorderBufferChange *cur_change;
965 
966  if (cur_txn->nentries != cur_txn->nentries_mem)
967  {
968  /* serialize remaining changes */
969  ReorderBufferSerializeTXN(rb, cur_txn);
970  ReorderBufferRestoreChanges(rb, cur_txn,
971  &state->entries[off].fd,
972  &state->entries[off].segno);
973  }
974  cur_change = dlist_head_element(ReorderBufferChange, node,
975  &cur_txn->changes);
976 
977  state->entries[off].lsn = cur_change->lsn;
978  state->entries[off].change = cur_change;
979  state->entries[off].txn = cur_txn;
980 
982  }
983  }
984 
985  /* assemble a valid binary heap */
986  binaryheap_build(state->heap);
987 
988  return state;
989 }
990 
991 /*
992  * Return the next change when iterating over a transaction and its
993  * subtransactions.
994  *
995  * Returns NULL when no further changes exist.
996  */
997 static ReorderBufferChange *
999 {
1000  ReorderBufferChange *change;
1002  int32 off;
1003 
1004  /* nothing there anymore */
1005  if (state->heap->bh_size == 0)
1006  return NULL;
1007 
1008  off = DatumGetInt32(binaryheap_first(state->heap));
1009  entry = &state->entries[off];
1010 
1011  /* free memory we might have "leaked" in the previous *Next call */
1012  if (!dlist_is_empty(&state->old_change))
1013  {
1014  change = dlist_container(ReorderBufferChange, node,
1015  dlist_pop_head_node(&state->old_change));
1016  ReorderBufferReturnChange(rb, change);
1017  Assert(dlist_is_empty(&state->old_change));
1018  }
1019 
1020  change = entry->change;
1021 
1022  /*
1023  * update heap with information about which transaction has the next
1024  * relevant change in LSN order
1025  */
1026 
1027  /* there are in-memory changes */
1028  if (dlist_has_next(&entry->txn->changes, &entry->change->node))
1029  {
1030  dlist_node *next = dlist_next_node(&entry->txn->changes, &change->node);
1031  ReorderBufferChange *next_change =
1032  dlist_container(ReorderBufferChange, node, next);
1033 
1034  /* txn stays the same */
1035  state->entries[off].lsn = next_change->lsn;
1036  state->entries[off].change = next_change;
1037 
1039  return change;
1040  }
1041 
1042  /* try to load changes from disk */
1043  if (entry->txn->nentries != entry->txn->nentries_mem)
1044  {
1045  /*
1046  * Ugly: restoring changes will reuse *Change records, thus delete the
1047  * current one from the per-tx list and only free in the next call.
1048  */
1049  dlist_delete(&change->node);
1050  dlist_push_tail(&state->old_change, &change->node);
1051 
1052  if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->fd,
1053  &state->entries[off].segno))
1054  {
1055  /* successfully restored changes from disk */
1056  ReorderBufferChange *next_change =
1058  &entry->txn->changes);
1059 
1060  elog(DEBUG2, "restored %u/%u changes from disk",
1061  (uint32) entry->txn->nentries_mem,
1062  (uint32) entry->txn->nentries);
1063 
1064  Assert(entry->txn->nentries_mem);
1065  /* txn stays the same */
1066  state->entries[off].lsn = next_change->lsn;
1067  state->entries[off].change = next_change;
1069 
1070  return change;
1071  }
1072  }
1073 
1074  /* ok, no changes there anymore, remove */
1075  binaryheap_remove_first(state->heap);
1076 
1077  return change;
1078 }
1079 
1080 /*
1081  * Deallocate the iterator
1082  */
1083 static void
1086 {
1087  int32 off;
1088 
1089  for (off = 0; off < state->nr_txns; off++)
1090  {
1091  if (state->entries[off].fd != -1)
1092  CloseTransientFile(state->entries[off].fd);
1093  }
1094 
1095  /* free memory we might have "leaked" in the last *Next call */
1096  if (!dlist_is_empty(&state->old_change))
1097  {
1098  ReorderBufferChange *change;
1099 
1100  change = dlist_container(ReorderBufferChange, node,
1101  dlist_pop_head_node(&state->old_change));
1102  ReorderBufferReturnChange(rb, change);
1103  Assert(dlist_is_empty(&state->old_change));
1104  }
1105 
1106  binaryheap_free(state->heap);
1107  pfree(state);
1108 }
1109 
1110 /*
1111  * Cleanup the contents of a transaction, usually after the transaction
1112  * committed or aborted.
1113  */
1114 static void
1116 {
1117  bool found;
1118  dlist_mutable_iter iter;
1119 
1120  /* cleanup subtransactions & their changes */
1121  dlist_foreach_modify(iter, &txn->subtxns)
1122  {
1123  ReorderBufferTXN *subtxn;
1124 
1125  subtxn = dlist_container(ReorderBufferTXN, node, iter.cur);
1126 
1127  /*
1128  * Subtransactions are always associated to the toplevel TXN, even if
1129  * they originally were happening inside another subtxn, so we won't
1130  * ever recurse more than one level deep here.
1131  */
1132  Assert(subtxn->is_known_as_subxact);
1133  Assert(subtxn->nsubtxns == 0);
1134 
1135  ReorderBufferCleanupTXN(rb, subtxn);
1136  }
1137 
1138  /* cleanup changes in the toplevel txn */
1139  dlist_foreach_modify(iter, &txn->changes)
1140  {
1141  ReorderBufferChange *change;
1142 
1143  change = dlist_container(ReorderBufferChange, node, iter.cur);
1144 
1145  ReorderBufferReturnChange(rb, change);
1146  }
1147 
1148  /*
1149  * Cleanup the tuplecids we stored for decoding catalog snapshot access.
1150  * They are always stored in the toplevel transaction.
1151  */
1152  dlist_foreach_modify(iter, &txn->tuplecids)
1153  {
1154  ReorderBufferChange *change;
1155 
1156  change = dlist_container(ReorderBufferChange, node, iter.cur);
1158  ReorderBufferReturnChange(rb, change);
1159  }
1160 
1161  if (txn->base_snapshot != NULL)
1162  {
1164  txn->base_snapshot = NULL;
1166  }
1167 
1168  /*
1169  * Remove TXN from its containing list.
1170  *
1171  * Note: if txn->is_known_as_subxact, we are deleting the TXN from its
1172  * parent's list of known subxacts; this leaves the parent's nsubxacts
1173  * count too high, but we don't care. Otherwise, we are deleting the TXN
1174  * from the LSN-ordered list of toplevel TXNs.
1175  */
1176  dlist_delete(&txn->node);
1177 
1178  /* now remove reference from buffer */
1179  hash_search(rb->by_txn,
1180  (void *) &txn->xid,
1181  HASH_REMOVE,
1182  &found);
1183  Assert(found);
1184 
1185  /* remove entries spilled to disk */
1186  if (txn->nentries != txn->nentries_mem)
1187  ReorderBufferRestoreCleanup(rb, txn);
1188 
1189  /* deallocate */
1190  ReorderBufferReturnTXN(rb, txn);
1191 }
1192 
1193 /*
1194  * Build a hash with a (relfilenode, ctid) -> (cmin, cmax) mapping for use by
1195  * tqual.c's HeapTupleSatisfiesHistoricMVCC.
1196  */
1197 static void
1199 {
1200  dlist_iter iter;
1201  HASHCTL hash_ctl;
1202 
1203  if (!txn->has_catalog_changes || dlist_is_empty(&txn->tuplecids))
1204  return;
1205 
1206  memset(&hash_ctl, 0, sizeof(hash_ctl));
1207 
1208  hash_ctl.keysize = sizeof(ReorderBufferTupleCidKey);
1209  hash_ctl.entrysize = sizeof(ReorderBufferTupleCidEnt);
1210  hash_ctl.hcxt = rb->context;
1211 
1212  /*
1213  * create the hash with the exact number of to-be-stored tuplecids from
1214  * the start
1215  */
1216  txn->tuplecid_hash =
1217  hash_create("ReorderBufferTupleCid", txn->ntuplecids, &hash_ctl,
1219 
1220  dlist_foreach(iter, &txn->tuplecids)
1221  {
1224  bool found;
1225  ReorderBufferChange *change;
1226 
1227  change = dlist_container(ReorderBufferChange, node, iter.cur);
1228 
1230 
1231  /* be careful about padding */
1232  memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
1233 
1234  key.relnode = change->data.tuplecid.node;
1235 
1236  ItemPointerCopy(&change->data.tuplecid.tid,
1237  &key.tid);
1238 
1239  ent = (ReorderBufferTupleCidEnt *)
1241  (void *) &key,
1243  &found);
1244  if (!found)
1245  {
1246  ent->cmin = change->data.tuplecid.cmin;
1247  ent->cmax = change->data.tuplecid.cmax;
1248  ent->combocid = change->data.tuplecid.combocid;
1249  }
1250  else
1251  {
1252  Assert(ent->cmin == change->data.tuplecid.cmin);
1253  Assert(ent->cmax == InvalidCommandId ||
1254  ent->cmax == change->data.tuplecid.cmax);
1255 
1256  /*
1257  * if the tuple got valid in this transaction and now got deleted
1258  * we already have a valid cmin stored. The cmax will be
1259  * InvalidCommandId though.
1260  */
1261  ent->cmax = change->data.tuplecid.cmax;
1262  }
1263  }
1264 }
1265 
1266 /*
1267  * Copy a provided snapshot so we can modify it privately. This is needed so
1268  * that catalog modifying transactions can look into intermediate catalog
1269  * states.
1270  */
/*
 * NOTE(review): the first signature line (presumably
 * ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap, ...)) was
 * dropped by the doxygen extraction.
 */
1271 static Snapshot
1273  ReorderBufferTXN *txn, CommandId cid)
1274 {
1275  Snapshot snap;
1276  dlist_iter iter;
1277  int i = 0;
1278  Size size;
1279 
 /* allocate header plus xip[] and subxip[] arrays in a single chunk */
1280  size = sizeof(SnapshotData) +
1281  sizeof(TransactionId) * orig_snap->xcnt +
1282  sizeof(TransactionId) * (txn->nsubtxns + 1);
1283 
1284  snap = MemoryContextAllocZero(rb->context, size);
1285  memcpy(snap, orig_snap, sizeof(SnapshotData));
1286 
1287  snap->copied = true;
1288  snap->active_count = 1; /* mark as active so nobody frees it */
1289  snap->regd_count = 0;
 /* xip[] lives directly after the SnapshotData header */
1290  snap->xip = (TransactionId *) (snap + 1);
1291 
1292  memcpy(snap->xip, orig_snap->xip, sizeof(TransactionId) * snap->xcnt);
1293 
1294  /*
1295  * snap->subxip contains all txids that belong to our transaction which we
1296  * need to check via cmin/cmax. That's why we store the toplevel
1297  * transaction in there as well.
1298  */
1299  snap->subxip = snap->xip + snap->xcnt;
1300  snap->subxip[i++] = txn->xid;
1301 
1302  /*
1303  * nsubxcnt isn't decreased when subtransactions abort, so count manually.
1304  * Since it's an upper boundary it is safe to use it for the allocation
1305  * above.
1306  */
1307  snap->subxcnt = 1;
1308 
1309  dlist_foreach(iter, &txn->subtxns)
1310  {
1311  ReorderBufferTXN *sub_txn;
1312 
1313  sub_txn = dlist_container(ReorderBufferTXN, node, iter.cur);
1314  snap->subxip[i++] = sub_txn->xid;
1315  snap->subxcnt++;
1316  }
1317 
1318  /* sort so we can bsearch() later */
1319  qsort(snap->subxip, snap->subxcnt, sizeof(TransactionId), xidComparator);
1320 
1321  /* store the specified current CommandId */
1322  snap->curcid = cid;
1323 
1324  return snap;
1325 }
1326 
1327 /*
1328  * Free a previously ReorderBufferCopySnap'ed snapshot
1329  */
/*
 * NOTE(review): the signature line and the else-branch statement (which
 * presumably decrements the snapbuild refcount for a non-copied snapshot --
 * TODO confirm against the real source) were dropped by the extraction.
 * Only snapshots we copied privately are pfree'd here.
 */
1330 static void
1332 {
1333  if (snap->copied)
1334  pfree(snap);
1335  else
1337 }
1338 
1339 /*
1340  * Perform the replay of a transaction and its non-aborted subtransactions.
1341  *
1342  * Subtransactions previously have to be processed by
1343  * ReorderBufferCommitChild(), even if previously assigned to the toplevel
1344  * transaction with ReorderBufferAssignChild.
1345  *
1346  * We currently can only decode a transaction's contents when its commit
1347  * record is read because that's currently the only place where we know about
1348  * cache invalidations. Thus, once a toplevel commit is read, we iterate over
1349  * the top and subtransactions (using a k-way merge) and replay the changes in
1350  * lsn order.
1351  */
/*
 * NOTE(review): doxygen-extracted text; the first signature line
 * (presumably ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, ...)),
 * every switch `case` label, and several single statements (e.g. the
 * tuplecid-hash build call, StartTransactionCommand, the invalidation
 * execution and abort/cleanup calls in the TRY/CATCH epilogues) were
 * dropped by the extraction -- verify against the real source tree.
 */
1352 void
1354  XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
1355  TimestampTz commit_time,
1356  RepOriginId origin_id, XLogRecPtr origin_lsn)
1357 {
1358  ReorderBufferTXN *txn;
1359  volatile Snapshot snapshot_now;
1360  volatile CommandId command_id = FirstCommandId;
1361  bool using_subtxn;
1362  ReorderBufferIterTXNState *volatile iterstate = NULL;
1363 
1364  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1365  false);
1366 
1367  /* unknown transaction, nothing to replay */
1368  if (txn == NULL)
1369  return;
1370 
 /* record commit metadata on the toplevel txn before replaying */
1371  txn->final_lsn = commit_lsn;
1372  txn->end_lsn = end_lsn;
1373  txn->commit_time = commit_time;
1374  txn->origin_id = origin_id;
1375  txn->origin_lsn = origin_lsn;
1376 
1377  /*
1378  * If this transaction didn't have any real changes in our database, it's
1379  * OK not to have a snapshot. Note that ReorderBufferCommitChild will have
1380  * transferred its snapshot to this transaction if it had one and the
1381  * toplevel tx didn't.
1382  */
1383  if (txn->base_snapshot == NULL)
1384  {
1385  Assert(txn->ninvalidations == 0);
1386  ReorderBufferCleanupTXN(rb, txn);
1387  return;
1388  }
1389 
1390  snapshot_now = txn->base_snapshot;
1391 
1392  /* build data to be able to lookup the CommandIds of catalog tuples */
1394 
1395  /* setup the initial snapshot */
1396  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1397 
1398  /*
1399  * Decoding needs access to syscaches et al., which in turn use
1400  * heavyweight locks and such. Thus we need to have enough state around to
1401  * keep track of those. The easiest way is to simply use a transaction
1402  * internally. That also allows us to easily enforce that nothing writes
1403  * to the database by checking for xid assignments.
1404  *
1405  * When we're called via the SQL SRF there's already a transaction
1406  * started, so start an explicit subtransaction there.
1407  */
1408  using_subtxn = IsTransactionOrTransactionBlock();
1409 
1410  PG_TRY();
1411  {
1412  ReorderBufferChange *change;
1413  ReorderBufferChange *specinsert = NULL;
1414 
1415  if (using_subtxn)
1416  BeginInternalSubTransaction("replay");
1417  else
1419 
 /* output plugin callbacks: begin, then one apply per change, then commit */
1420  rb->begin(rb, txn);
1421 
1422  iterstate = ReorderBufferIterTXNInit(rb, txn);
1423  while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
1424  {
1425  Relation relation = NULL;
1426  Oid reloid;
1427 
1428  switch (change->action)
1429  {
1431 
1432  /*
1433  * Confirmation for speculative insertion arrived. Simply
1434  * use as a normal record. It'll be cleaned up at the end
1435  * of INSERT processing.
1436  */
1437  Assert(specinsert->data.tp.oldtuple == NULL);
1438  change = specinsert;
1440 
1441  /* intentionally fall through */
1445  Assert(snapshot_now);
1446 
1447  reloid = RelidByRelfilenode(change->data.tp.relnode.spcNode,
1448  change->data.tp.relnode.relNode);
1449 
1450  /*
1451  * Catalog tuple without data, emitted while catalog was
1452  * in the process of being rewritten.
1453  */
1454  if (reloid == InvalidOid &&
1455  change->data.tp.newtuple == NULL &&
1456  change->data.tp.oldtuple == NULL)
1457  goto change_done;
1458  else if (reloid == InvalidOid)
1459  elog(ERROR, "could not map filenode \"%s\" to relation OID",
1460  relpathperm(change->data.tp.relnode,
1461  MAIN_FORKNUM));
1462 
1463  relation = RelationIdGetRelation(reloid);
1464 
1465  if (relation == NULL)
1466  elog(ERROR, "could not open relation with OID %u (for filenode \"%s\")",
1467  reloid,
1468  relpathperm(change->data.tp.relnode,
1469  MAIN_FORKNUM));
1470 
1471  if (!RelationIsLogicallyLogged(relation))
1472  goto change_done;
1473 
1474  /*
1475  * For now ignore sequence changes entirely. Most of the
1476  * time they don't log changes using records we
1477  * understand, so it doesn't make sense to handle the few
1478  * cases we do.
1479  */
1480  if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
1481  goto change_done;
1482 
1483  /* user-triggered change */
1484  if (!IsToastRelation(relation))
1485  {
1486  ReorderBufferToastReplace(rb, txn, relation, change);
1487  rb->apply_change(rb, txn, relation, change);
1488 
1489  /*
1490  * Only clear reassembled toast chunks if we're sure
1491  * they're not required anymore. The creator of the
1492  * tuple tells us.
1493  */
1494  if (change->data.tp.clear_toast_afterwards)
1495  ReorderBufferToastReset(rb, txn);
1496  }
1497  /* we're not interested in toast deletions */
1498  else if (change->action == REORDER_BUFFER_CHANGE_INSERT)
1499  {
1500  /*
1501  * Need to reassemble the full toasted Datum in
1502  * memory, to ensure the chunks don't get reused till
1503  * we're done remove it from the list of this
1504  * transaction's changes. Otherwise it will get
1505  * freed/reused while restoring spooled data from
1506  * disk.
1507  */
1508  dlist_delete(&change->node);
1509  ReorderBufferToastAppendChunk(rb, txn, relation,
1510  change);
1511  }
1512 
1513  change_done:
1514 
1515  /*
1516  * Either speculative insertion was confirmed, or it was
1517  * unsuccessful and the record isn't needed anymore.
1518  */
1519  if (specinsert != NULL)
1520  {
1521  ReorderBufferReturnChange(rb, specinsert);
1522  specinsert = NULL;
1523  }
1524 
1525  if (relation != NULL)
1526  {
1527  RelationClose(relation);
1528  relation = NULL;
1529  }
1530  break;
1531 
1533 
1534  /*
1535  * Speculative insertions are dealt with by delaying the
1536  * processing of the insert until the confirmation record
1537  * arrives. For that we simply unlink the record from the
1538  * chain, so it does not get freed/reused while restoring
1539  * spooled data from disk.
1540  *
1541  * This is safe in the face of concurrent catalog changes
1542  * because the relevant relation can't be changed between
1543  * speculative insertion and confirmation due to
1544  * CheckTableNotInUse() and locking.
1545  */
1546 
1547  /* clear out a pending (and thus failed) speculation */
1548  if (specinsert != NULL)
1549  {
1550  ReorderBufferReturnChange(rb, specinsert);
1551  specinsert = NULL;
1552  }
1553 
1554  /* and memorize the pending insertion */
1555  dlist_delete(&change->node);
1556  specinsert = change;
1557  break;
1558 
 /* logical decoding message: forward directly to the plugin */
1560  rb->message(rb, txn, change->lsn, true,
1561  change->data.msg.prefix,
1562  change->data.msg.message_size,
1563  change->data.msg.message)
1564  break;
1565 
1567  /* get rid of the old */
1568  TeardownHistoricSnapshot(false);
1569 
1570  if (snapshot_now->copied)
1571  {
1572  ReorderBufferFreeSnap(rb, snapshot_now);
1573  snapshot_now =
1574  ReorderBufferCopySnap(rb, change->data.snapshot,
1575  txn, command_id);
1576  }
1577 
1578  /*
1579  * Restored from disk, need to be careful not to double
1580  * free. We could introduce refcounting for that, but for
1581  * now this seems infrequent enough not to care.
1582  */
1583  else if (change->data.snapshot->copied)
1584  {
1585  snapshot_now =
1586  ReorderBufferCopySnap(rb, change->data.snapshot,
1587  txn, command_id);
1588  }
1589  else
1590  {
1591  snapshot_now = change->data.snapshot;
1592  }
1593 
1594 
1595  /* and continue with the new one */
1596  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1597  break;
1598 
1600  Assert(change->data.command_id != InvalidCommandId);
1601 
1602  if (command_id < change->data.command_id)
1603  {
1604  command_id = change->data.command_id;
1605 
1606  if (!snapshot_now->copied)
1607  {
1608  /* we don't use the global one anymore */
1609  snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
1610  txn, command_id);
1611  }
1612 
1613  snapshot_now->curcid = command_id;
1614 
1615  TeardownHistoricSnapshot(false);
1616  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1617 
1618  /*
1619  * Every time the CommandId is incremented, we could
1620  * see new catalog contents, so execute all
1621  * invalidations.
1622  */
1624  }
1625 
1626  break;
1627 
 /* tuplecid changes live on txn->tuplecids, never in the queue */
1629  elog(ERROR, "tuplecid value in changequeue");
1630  break;
1631  }
1632  }
1633 
1634  /*
1635  * There's a speculative insertion remaining, just clean it up, it
1636  * can't have been successful, otherwise we'd have gotten a confirmation
1637  * record.
1638  */
1639  if (specinsert)
1640  {
1641  ReorderBufferReturnChange(rb, specinsert);
1642  specinsert = NULL;
1643  }
1644 
1645  /* clean up the iterator */
1646  ReorderBufferIterTXNFinish(rb, iterstate);
1647  iterstate = NULL;
1648 
1649  /* call commit callback */
1650  rb->commit(rb, txn, commit_lsn);
1651 
1652  /* this is just a sanity check against bad output plugin behaviour */
1654  elog(ERROR, "output plugin used XID %u",
1656 
1657  /* cleanup */
1658  TeardownHistoricSnapshot(false);
1659 
1660  /*
1661  * Aborting the current (sub-)transaction as a whole has the right
1662  * semantics. We want all locks acquired in here to be released, not
1663  * reassigned to the parent and we do not want any database access
1664  * have persistent effects.
1665  */
1667 
1668  /* make sure there's no cache pollution */
1670 
1671  if (using_subtxn)
1673 
1674  if (snapshot_now->copied)
1675  ReorderBufferFreeSnap(rb, snapshot_now);
1676 
1677  /* remove potential on-disk data, and deallocate */
1678  ReorderBufferCleanupTXN(rb, txn);
1679  }
1680  PG_CATCH();
1681  {
1682  /* TODO: Encapsulate cleanup from the PG_TRY and PG_CATCH blocks */
1683  if (iterstate)
1684  ReorderBufferIterTXNFinish(rb, iterstate);
1685 
1687 
1688  /*
1689  * Force cache invalidation to happen outside of a valid transaction
1690  * to prevent catalog access as we just caught an error.
1691  */
1693 
1694  /* make sure there's no cache pollution */
1696 
1697  if (using_subtxn)
1699 
1700  if (snapshot_now->copied)
1701  ReorderBufferFreeSnap(rb, snapshot_now);
1702 
1703  /* remove potential on-disk data, and deallocate */
1704  ReorderBufferCleanupTXN(rb, txn);
1705 
1706  PG_RE_THROW();
1707  }
1708  PG_END_TRY();
1709 }
1710 
1711 /*
1712  * Abort a transaction that possibly has previous changes. Needs to be first
1713  * called for subtransactions and then for the toplevel xid.
1714  *
1715  * NB: Transactions handled here have to have actively aborted (i.e. have
1716  * produced an abort record). Implicitly aborted transactions are handled via
1717  * ReorderBufferAbortOld(); transactions we're just not interested in, but
1718  * which have committed are handled in ReorderBufferForget().
1719  *
1720  * This function purges this transaction and its contents from memory and
1721  * disk.
1722  */
/*
 * NOTE(review): the signature line (presumably
 * ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn))
 * was dropped by the doxygen extraction.
 */
1723 void
1725 {
1726  ReorderBufferTXN *txn;
1727 
1728  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1729  false);
1730 
1731  /* unknown, nothing to remove */
1732  if (txn == NULL)
1733  return;
1734 
1735  /* cosmetic... */
1736  txn->final_lsn = lsn;
1737 
1738  /* remove potential on-disk data, and deallocate */
1739  ReorderBufferCleanupTXN(rb, txn);
1740 }
1741 
1742 /*
1743  * Abort all transactions that aren't actually running anymore because the
1744  * server restarted.
1745  *
1746  * NB: These really have to be transactions that have aborted due to a server
1747  * crash/immediate restart, as we don't deal with invalidations here.
1748  */
/*
 * NOTE(review): the signature line and the loop header (presumably a
 * dlist_foreach_modify over the LSN-ordered toplevel-txn list) were dropped
 * by the doxygen extraction.
 */
1749 void
1751 {
1752  dlist_mutable_iter it;
1753 
1754  /*
1755  * Iterate through all (potential) toplevel TXNs and abort all that are
1756  * older than what possibly can be running. Once we've found the first
1757  * that is alive we stop, there might be some that acquired an xid earlier
1758  * but started writing later, but it's unlikely and they will be cleaned up
1759  * in a later call to ReorderBufferAbortOld().
1760  */
1762  {
1763  ReorderBufferTXN *txn;
1764 
1765  txn = dlist_container(ReorderBufferTXN, node, it.cur);
1766 
1767  if (TransactionIdPrecedes(txn->xid, oldestRunningXid))
1768  {
1769  elog(DEBUG1, "aborting old transaction %u", txn->xid);
1770 
1771  /* remove potential on-disk data, and deallocate this tx */
1772  ReorderBufferCleanupTXN(rb, txn);
1773  }
1774  else
1775  return;
1776  }
1777 }
1778 
1779 /*
1780  * Forget the contents of a transaction if we aren't interested in its
1781  * contents. Needs to be first called for subtransactions and then for the
1782  * toplevel xid.
1783  *
1784  * This is significantly different to ReorderBufferAbort() because
1785  * transactions that have committed need to be treated differently from aborted
1786  * ones since they may have modified the catalog.
1787  *
1788  * Note that this is only allowed to be called in the moment a transaction
1789  * commit has just been read, not earlier; otherwise later records referring
1790  * to this xid might re-create the transaction incompletely.
1791  */
/*
 * NOTE(review): the signature line and the invalidation-execution call
 * (which takes txn->invalidations as its visible last argument) were
 * dropped by the doxygen extraction.
 */
1792 void
1794 {
1795  ReorderBufferTXN *txn;
1796 
1797  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1798  false);
1799 
1800  /* unknown, nothing to forget */
1801  if (txn == NULL)
1802  return;
1803 
1804  /* cosmetic... */
1805  txn->final_lsn = lsn;
1806 
1807  /*
1808  * Process cache invalidation messages if there are any. Even if we're not
1809  * interested in the transaction's contents, it could have manipulated the
1810  * catalog and we need to update the caches according to that.
1811  */
1812  if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
1814  txn->invalidations);
1815  else
1816  Assert(txn->ninvalidations == 0);
1817 
1818  /* remove potential on-disk data, and deallocate */
1819  ReorderBufferCleanupTXN(rb, txn);
1820 }
1821 
1822 /*
1823  * Execute invalidations happening outside the context of a decoded
1824  * transaction. That currently happens either for xid-less commits
1825  * (c.f. RecordTransactionCommit()) or for invalidations in uninteresting
1826  * transactions (via ReorderBufferForget()).
1827  */
/*
 * NOTE(review): the first signature line and the two statements executed
 * under "if (use_subtxn)" before/after the loop (presumably aborting the
 * subtransaction so invalidations run outside a valid transaction, then
 * releasing it -- TODO confirm) were dropped by the doxygen extraction.
 */
1828 void
1830  SharedInvalidationMessage *invalidations)
1831 {
1832  bool use_subtxn = IsTransactionOrTransactionBlock();
1833  int i;
1834 
1835  if (use_subtxn)
1836  BeginInternalSubTransaction("replay");
1837 
1838  /*
1839  * Force invalidations to happen outside of a valid transaction - that way
1840  * entries will just be marked as invalid without accessing the catalog.
1841  * That's advantageous because we don't need to setup the full state
1842  * necessary for catalog access.
1843  */
1844  if (use_subtxn)
1846 
1847  for (i = 0; i < ninvalidations; i++)
1848  LocalExecuteInvalidationMessage(&invalidations[i]);
1849 
1850  if (use_subtxn)
1852 }
1853 
1854 /*
1855  * Tell reorderbuffer about an xid seen in the WAL stream. Has to be called at
1856  * least once for every xid in XLogRecord->xl_xid (other places in records
1857  * may, but do not have to be passed through here).
1858  *
1859  * Reorderbuffer keeps some datastructures about transactions in LSN order,
1860  * for efficiency. To do that it has to know about when transactions are seen
1861  * first in the WAL. As many types of records are not actually interesting for
1862  * logical decoding, they do not necessarily pass though here.
1863  */
/*
 * NOTE(review): the signature line (presumably
 * ReorderBufferProcessXid(ReorderBuffer *rb, TransactionId xid,
 * XLogRecPtr lsn)) was dropped by the doxygen extraction. The create=true
 * call below ensures the txn exists, keyed by its first LSN.
 */
1864 void
1866 {
1867  /* many records won't have an xid assigned, centralize check here */
1868  if (xid != InvalidTransactionId)
1869  ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
1870 }
1871 
1872 /*
1873  * Add a new snapshot to this transaction that may only be used after lsn 'lsn'
1874  * because the previous snapshot doesn't describe the catalog correctly for
1875  * following rows.
1876  */
/*
 * NOTE(review): the first signature line, the change-allocation statement
 * and the change->action assignment (presumably the internal-snapshot
 * action -- TODO confirm) were dropped by the doxygen extraction.
 */
1877 void
1879  XLogRecPtr lsn, Snapshot snap)
1880 {
1882 
1883  change->data.snapshot = snap;
1885 
 /* enqueue onto the txn identified by xid, ordered by lsn */
1886  ReorderBufferQueueChange(rb, xid, lsn, change);
1887 }
1888 
1889 /*
1890  * Setup the base snapshot of a transaction. The base snapshot is the snapshot
1891  * that is used to decode all changes until either this transaction modifies
1892  * the catalog or another catalog modifying transaction commits.
1893  *
1894  * Needs to be called before any changes are added with
1895  * ReorderBufferQueueChange().
1896  */
/*
 * NOTE(review): the first signature line was dropped by the doxygen
 * extraction; the visible second line carries (lsn, snap).
 */
1897 void
1899  XLogRecPtr lsn, Snapshot snap)
1900 {
1901  ReorderBufferTXN *txn;
1902  bool is_new;
1903 
 /* create the txn if needed; a base snapshot may only be set once */
1904  txn = ReorderBufferTXNByXid(rb, xid, true, &is_new, lsn, true);
1905  Assert(txn->base_snapshot == NULL);
1906  Assert(snap != NULL);
1907 
1908  txn->base_snapshot = snap;
1909  txn->base_snapshot_lsn = lsn;
1910 }
1911 
1912 /*
1913  * Access the catalog with this CommandId at this point in the changestream.
1914  *
1915  * May only be called for command ids > 1
1916  */
/*
 * NOTE(review): the first signature line, the change-allocation statement
 * and the change->action assignment (presumably the new-command-id action
 * -- TODO confirm) were dropped by the doxygen extraction.
 */
1917 void
1919  XLogRecPtr lsn, CommandId cid)
1920 {
1922 
1923  change->data.command_id = cid;
1925 
1926  ReorderBufferQueueChange(rb, xid, lsn, change);
1927 }
1928 
1929 
1930 /*
1931  * Add new (relfilenode, tid) -> (cmin, cmax) mappings.
1932  */
/*
 * NOTE(review): the first signature line, the change-allocation statement
 * and the change->action assignment were dropped by the doxygen
 * extraction. Tuplecid changes go on txn->tuplecids, not the main change
 * queue (cf. the elog in ReorderBufferCommit's switch).
 */
1933 void
1935  XLogRecPtr lsn, RelFileNode node,
1936  ItemPointerData tid, CommandId cmin,
1937  CommandId cmax, CommandId combocid)
1938 {
1940  ReorderBufferTXN *txn;
1941 
1942  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
1943 
1944  change->data.tuplecid.node = node;
1945  change->data.tuplecid.tid = tid;
1946  change->data.tuplecid.cmin = cmin;
1947  change->data.tuplecid.cmax = cmax;
1948  change->data.tuplecid.combocid = combocid;
1949  change->lsn = lsn;
1951 
1952  dlist_push_tail(&txn->tuplecids, &change->node);
1953  txn->ntuplecids++;
1954 }
1955 
1956 /*
1957  * Setup the invalidation of the toplevel transaction.
1958  *
1959  * This needs to be done before ReorderBufferCommit is called!
1960  */
/*
 * NOTE(review): the first signature line, the msgs parameter line and the
 * allocation statement for txn->invalidations (an rb->context allocation of
 * nmsgs messages, per the visible size argument) were dropped by the
 * doxygen extraction.
 */
1961 void
1963  XLogRecPtr lsn, Size nmsgs,
1965 {
1966  ReorderBufferTXN *txn;
1967 
1968  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
1969 
 /* invalidations may be attached to a txn only once */
1970  if (txn->ninvalidations != 0)
1971  elog(ERROR, "only ever add one set of invalidations");
1972 
1973  Assert(nmsgs > 0);
1974 
1975  txn->ninvalidations = nmsgs;
1978  sizeof(SharedInvalidationMessage) * nmsgs);
1979  memcpy(txn->invalidations, msgs,
1980  sizeof(SharedInvalidationMessage) * nmsgs);
1981 }
1982 
1983 /*
1984  * Apply all invalidations we know. Possibly we only need parts at this point
1985  * in the changestream but we don't know which those are.
1986  */
/*
 * NOTE(review): the signature line and the loop body (presumably a
 * LocalExecuteInvalidationMessage(&txn->invalidations[i]) call, matching
 * the loop in the immediate-invalidation function above -- TODO confirm)
 * were dropped by the doxygen extraction.
 */
1987 static void
1989 {
1990  int i;
1991 
1992  for (i = 0; i < txn->ninvalidations; i++)
1994 }
1995 
1996 /*
1997  * Mark a transaction as containing catalog changes
1998  */
/*
 * NOTE(review): the first signature line was dropped by the doxygen
 * extraction. Creates the txn if not yet known, then flags it as having
 * made catalog changes.
 */
1999 void
2001  XLogRecPtr lsn)
2002 {
2003  ReorderBufferTXN *txn;
2004 
2005  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
2006 
2007  txn->has_catalog_changes = true;
2008 }
2009 
2010 /*
2011  * Query whether a transaction is already *known* to contain catalog
2012  * changes. This can be wrong until directly before the commit!
2013  */
/*
 * NOTE(review): the signature line was dropped by the doxygen extraction.
 * Returns false for unknown xids (lookup without create).
 */
2014 bool
2016 {
2017  ReorderBufferTXN *txn;
2018 
2019  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2020  false);
2021  if (txn == NULL)
2022  return false;
2023 
2024  return txn->has_catalog_changes;
2025 }
2026 
2027 /*
2028  * Have we already added the first snapshot?
2029  */
/*
 * NOTE(review): the signature line was dropped by the doxygen extraction.
 * Returns false for unknown xids; otherwise reports whether a base
 * snapshot has been attached to this (sub)transaction.
 */
2030 bool
2032 {
2033  ReorderBufferTXN *txn;
2034 
2035  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2036  false);
2037 
2038  /* transaction isn't known yet, ergo no snapshot */
2039  if (txn == NULL)
2040  return false;
2041 
2042  /*
2043  * TODO: It would be a nice improvement if we would check the toplevel
2044  * transaction in subtransactions, but we'd need to keep track of a bit
2045  * more state.
2046  */
2047  return txn->base_snapshot != NULL;
2048 }
2049 
2050 
2051 /*
2052  * ---------------------------------------
2053  * Disk serialization support
2054  * ---------------------------------------
2055  */
2056 
2057 /*
2058  * Ensure the IO buffer is >= sz.
2059  */
/*
 * NOTE(review): the signature line (presumably
 * ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz)) was dropped by
 * the doxygen extraction. Grows rb->outbuf so it can hold at least sz
 * bytes; never shrinks it.
 */
2060 static void
2062 {
2063  if (!rb->outbufsize)
2064  {
 /* first use: allocate in the reorderbuffer's long-lived context */
2065  rb->outbuf = MemoryContextAlloc(rb->context, sz);
2066  rb->outbufsize = sz;
2067  }
2068  else if (rb->outbufsize < sz)
2069  {
2070  rb->outbuf = repalloc(rb->outbuf, sz);
2071  rb->outbufsize = sz;
2072  }
2073 }
2074 
2075 /*
2076  * Check whether the transaction tx should spill its data to disk.
2077  */
/*
 * NOTE(review): the signature line was dropped by the doxygen extraction.
 * Spills the txn to disk once its in-memory change count reaches the
 * max_changes_in_memory threshold.
 */
2078 static void
2080 {
2081  /*
2082  * TODO: improve accounting so we cheaply can take subtransactions into
2083  * account here.
2084  */
2085  if (txn->nentries_mem >= max_changes_in_memory)
2086  {
2087  ReorderBufferSerializeTXN(rb, txn);
2088  Assert(txn->nentries_mem == 0);
2089  }
2090 }
2091 
2092 /*
2093  * Spill data of a large transaction (and its subtransactions) to disk.
2094  */
/*
 * NOTE(review): the signature line and the slot-name argument of the
 * sprintf() path construction (line 2143 in the original) were dropped by
 * the doxygen extraction.
 */
2095 static void
2097 {
2098  dlist_iter subtxn_i;
2099  dlist_mutable_iter change_i;
2100  int fd = -1;
2101  XLogSegNo curOpenSegNo = 0;
2102  Size spilled = 0;
2103  char path[MAXPGPATH];
2104 
2105  elog(DEBUG2, "spill %u changes in XID %u to disk",
2106  (uint32) txn->nentries_mem, txn->xid);
2107 
2108  /* do the same to all child TXs */
2109  dlist_foreach(subtxn_i, &txn->subtxns)
2110  {
2111  ReorderBufferTXN *subtxn;
2112 
2113  subtxn = dlist_container(ReorderBufferTXN, node, subtxn_i.cur);
2114  ReorderBufferSerializeTXN(rb, subtxn);
2115  }
2116 
2117  /* serialize changestream */
2118  dlist_foreach_modify(change_i, &txn->changes)
2119  {
2120  ReorderBufferChange *change;
2121 
2122  change = dlist_container(ReorderBufferChange, node, change_i.cur);
2123 
2124  /*
2125  * store in segment in which it belongs by start lsn, don't split over
2126  * multiple segments though
2127  */
2128  if (fd == -1 || !XLByteInSeg(change->lsn, curOpenSegNo))
2129  {
2130  XLogRecPtr recptr;
2131 
2132  if (fd != -1)
2133  CloseTransientFile(fd);
2134 
2135  XLByteToSeg(change->lsn, curOpenSegNo);
2136  XLogSegNoOffsetToRecPtr(curOpenSegNo, 0, recptr);
2137 
2138  /*
2139  * No need to care about TLIs here, only used during a single run,
2140  * so each LSN only maps to a specific WAL record.
2141  */
2142  sprintf(path, "pg_replslot/%s/xid-%u-lsn-%X-%X.snap",
2144  (uint32) (recptr >> 32), (uint32) recptr);
2145 
2146  /* open segment, create it if necessary */
2147  fd = OpenTransientFile(path,
2148  O_CREAT | O_WRONLY | O_APPEND | PG_BINARY,
2149  S_IRUSR | S_IWUSR);
2150 
2151  if (fd < 0)
2152  ereport(ERROR,
2154  errmsg("could not open file \"%s\": %m",
2155  path)));
2156  }
2157 
 /* write the change out, then recycle its in-memory representation */
2158  ReorderBufferSerializeChange(rb, txn, fd, change);
2159  dlist_delete(&change->node);
2160  ReorderBufferReturnChange(rb, change);
2161 
2162  spilled++;
2163  }
2164 
2165  Assert(spilled == txn->nentries_mem);
2166  Assert(dlist_is_empty(&txn->changes));
2167  txn->nentries_mem = 0;
2168 
2169  if (fd != -1)
2170  CloseTransientFile(fd);
2171 }
2172 
2173 /*
2174  * Serialize individual change to disk.
2175  */
/*
 * NOTE(review): the first signature line, the switch `case` labels, and the
 * ReorderBufferSerializeReserve() calls that grow rb->outbuf before each
 * variable-sized copy were dropped by the doxygen extraction.
 */
2176 static void
2178  int fd, ReorderBufferChange *change)
2179 {
2180  ReorderBufferDiskChange *ondisk;
2181  Size sz = sizeof(ReorderBufferDiskChange);
2182 
2184 
 /* fixed-size header first; variable-length payload is appended below */
2185  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2186  memcpy(&ondisk->change, change, sizeof(ReorderBufferChange));
2187 
2188  switch (change->action)
2189  {
2190  /* fall through these, they're all similar enough */
2195  {
2196  char *data;
2197  ReorderBufferTupleBuf *oldtup,
2198  *newtup;
2199  Size oldlen = 0;
2200  Size newlen = 0;
2201 
2202  oldtup = change->data.tp.oldtuple;
2203  newtup = change->data.tp.newtuple;
2204 
2205  if (oldtup)
2206  {
2207  sz += sizeof(HeapTupleData);
2208  oldlen = oldtup->tuple.t_len;
2209  sz += oldlen;
2210  }
2211 
2212  if (newtup)
2213  {
2214  sz += sizeof(HeapTupleData);
2215  newlen = newtup->tuple.t_len;
2216  sz += newlen;
2217  }
2218 
2219  /* make sure we have enough space */
2221 
2222  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
2223  /* might have been reallocated above */
2224  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2225 
2226  if (oldlen)
2227  {
2228  memcpy(data, &oldtup->tuple, sizeof(HeapTupleData));
2229  data += sizeof(HeapTupleData);
2230 
2231  memcpy(data, oldtup->tuple.t_data, oldlen);
2232  data += oldlen;
2233  }
2234 
2235  if (newlen)
2236  {
2237  memcpy(data, &newtup->tuple, sizeof(HeapTupleData));
2238  data += sizeof(HeapTupleData);
2239 
2240  memcpy(data, newtup->tuple.t_data, newlen);
2241  data += newlen;
2242  }
2243  break;
2244  }
2246  {
2247  char *data;
2248  Size prefix_size = strlen(change->data.msg.prefix) + 1;
2249 
2250  sz += prefix_size + change->data.msg.message_size +
2251  sizeof(Size) + sizeof(Size);
2253 
2254  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
2255 
2256  /* might have been reallocated above */
2257  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2258 
2259  /* write the prefix including the size */
2260  memcpy(data, &prefix_size, sizeof(Size));
2261  data += sizeof(Size);
2262  memcpy(data, change->data.msg.prefix,
2263  prefix_size);
2264  data += prefix_size;
2265 
2266  /* write the message including the size */
2267  memcpy(data, &change->data.msg.message_size, sizeof(Size));
2268  data += sizeof(Size);
2269  memcpy(data, change->data.msg.message,
2270  change->data.msg.message_size);
2271  data += change->data.msg.message_size;
2272 
2273  break;
2274  }
2276  {
2277  Snapshot snap;
2278  char *data;
2279 
2280  snap = change->data.snapshot;
2281 
2282  sz += sizeof(SnapshotData) +
2283  sizeof(TransactionId) * snap->xcnt +
2284  sizeof(TransactionId) * snap->subxcnt
2285  ;
2286 
2287  /* make sure we have enough space */
2289  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
2290  /* might have been reallocated above */
2291  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2292 
2293  memcpy(data, snap, sizeof(SnapshotData));
2294  data += sizeof(SnapshotData);
2295 
2296  if (snap->xcnt)
2297  {
2298  memcpy(data, snap->xip,
2299  sizeof(TransactionId) * snap->xcnt);
2300  data += sizeof(TransactionId) * snap->xcnt;
2301  }
2302 
2303  if (snap->subxcnt)
2304  {
2305  memcpy(data, snap->subxip,
2306  sizeof(TransactionId) * snap->subxcnt);
2307  data += sizeof(TransactionId) * snap->subxcnt;
2308  }
2309  break;
2310  }
2314  /* ReorderBufferChange contains everything important */
2315  break;
2316  }
2317 
2318  ondisk->size = sz;
2319 
 /* a short write is an error; restore errno after closing the fd */
2320  if (write(fd, rb->outbuf, ondisk->size) != ondisk->size)
2321  {
2322  int save_errno = errno;
2323 
2324  CloseTransientFile(fd);
2325  errno = save_errno;
2326  ereport(ERROR,
2328  errmsg("could not write to data file for XID %u: %m",
2329  txn->xid)));
2330  }
2331 
2332  Assert(ondisk->change.action == change->action);
2333 }
2334 
2335 /*
2336  * Restore a number of changes spilled to disk back into memory.
2337  */
/*
 * NOTE(review): the first signature line, an initial assertion/setup line,
 * the cleanup-change declaration, the slot-name argument of sprintf(), the
 * ReorderBufferSerializeReserve() calls, and the errcode arguments of the
 * ereport()s were dropped by the doxygen extraction.
 */
2338 static Size
2340  int *fd, XLogSegNo *segno)
2341 {
2342  Size restored = 0;
2343  XLogSegNo last_segno;
2344  dlist_mutable_iter cleanup_iter;
2345 
2348 
2349  /* free current entries, so we have memory for more */
2350  dlist_foreach_modify(cleanup_iter, &txn->changes)
2351  {
2353  dlist_container(ReorderBufferChange, node, cleanup_iter.cur);
2354 
2355  dlist_delete(&cleanup->node);
2356  ReorderBufferReturnChange(rb, cleanup);
2357  }
2358  txn->nentries_mem = 0;
2359  Assert(dlist_is_empty(&txn->changes));
2360 
 /* the txn's final_lsn bounds which spill segments can contain its data */
2361  XLByteToSeg(txn->final_lsn, last_segno);
2362 
2363  while (restored < max_changes_in_memory && *segno <= last_segno)
2364  {
2365  int readBytes;
2366  ReorderBufferDiskChange *ondisk;
2367 
2368  if (*fd == -1)
2369  {
2370  XLogRecPtr recptr;
2371  char path[MAXPGPATH];
2372 
2373  /* first time in */
2374  if (*segno == 0)
2375  {
2376  XLByteToSeg(txn->first_lsn, *segno);
2377  }
2378 
2379  Assert(*segno != 0 || dlist_is_empty(&txn->changes));
2380  XLogSegNoOffsetToRecPtr(*segno, 0, recptr);
2381 
2382  /*
2383  * No need to care about TLIs here, only used during a single run,
2384  * so each LSN only maps to a specific WAL record.
2385  */
2386  sprintf(path, "pg_replslot/%s/xid-%u-lsn-%X-%X.snap",
2388  (uint32) (recptr >> 32), (uint32) recptr);
2389 
 /* a missing segment is expected: nothing was spilled for it */
2390  *fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0);
2391  if (*fd < 0 && errno == ENOENT)
2392  {
2393  *fd = -1;
2394  (*segno)++;
2395  continue;
2396  }
2397  else if (*fd < 0)
2398  ereport(ERROR,
2400  errmsg("could not open file \"%s\": %m",
2401  path)));
2402 
2403  }
2404 
2405  /*
2406  * Read the statically sized part of a change which has information
2407  * about the total size. If we couldn't read a record, we're at the
2408  * end of this file.
2409  */
2411  readBytes = read(*fd, rb->outbuf, sizeof(ReorderBufferDiskChange));
2412 
2413  /* eof */
2414  if (readBytes == 0)
2415  {
2416  CloseTransientFile(*fd);
2417  *fd = -1;
2418  (*segno)++;
2419  continue;
2420  }
2421  else if (readBytes < 0)
2422  ereport(ERROR,
2424  errmsg("could not read from reorderbuffer spill file: %m")));
2425  else if (readBytes != sizeof(ReorderBufferDiskChange))
2426  ereport(ERROR,
2428  errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
2429  readBytes,
2430  (uint32) sizeof(ReorderBufferDiskChange))));
2431 
2432  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2433 
2435  sizeof(ReorderBufferDiskChange) + ondisk->size);
2436  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2437 
 /* now read the variable-length remainder of the record */
2438  readBytes = read(*fd, rb->outbuf + sizeof(ReorderBufferDiskChange),
2439  ondisk->size - sizeof(ReorderBufferDiskChange));
2440 
2441  if (readBytes < 0)
2442  ereport(ERROR,
2444  errmsg("could not read from reorderbuffer spill file: %m")));
2445  else if (readBytes != ondisk->size - sizeof(ReorderBufferDiskChange))
2446  ereport(ERROR,
2448  errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
2449  readBytes,
2450  (uint32) (ondisk->size - sizeof(ReorderBufferDiskChange)))));
2451 
2452  /*
2453  * ok, read a full change from disk, now restore it into proper
2454  * in-memory format
2455  */
2456  ReorderBufferRestoreChange(rb, txn, rb->outbuf);
2457  restored++;
2458  }
2459 
2460  return restored;
2461 }
2462 
2463 /*
2464  * Convert change from its on-disk format to in-memory format and queue it onto
2465  * the TXN's ->changes list.
2466  *
2467  * Note: although "data" is declared char*, at entry it points to a
2468  * maxalign'd buffer, making it safe in most of this function to assume
2469  * that the pointed-to data is suitably aligned for direct access.
2470  */
2471 static void
2473  char *data)
2474 {
2475  ReorderBufferDiskChange *ondisk;
2476  ReorderBufferChange *change;
2477 
2478  ondisk = (ReorderBufferDiskChange *) data;
2479 
2480  change = ReorderBufferGetChange(rb);
2481 
2482  /* copy static part */
2483  memcpy(change, &ondisk->change, sizeof(ReorderBufferChange));
2484 
2485  data += sizeof(ReorderBufferDiskChange);
2486 
2487  /* restore individual stuff */
2488  switch (change->action)
2489  {
2490  /* fall through these, they're all similar enough */
2495  if (change->data.tp.oldtuple)
2496  {
2497  uint32 tuplelen = ((HeapTuple) data)->t_len;
2498 
2499  change->data.tp.oldtuple =
2501 
2502  /* restore ->tuple */
2503  memcpy(&change->data.tp.oldtuple->tuple, data,
2504  sizeof(HeapTupleData));
2505  data += sizeof(HeapTupleData);
2506 
2507  /* reset t_data pointer into the new tuplebuf */
2508  change->data.tp.oldtuple->tuple.t_data =
2509  ReorderBufferTupleBufData(change->data.tp.oldtuple);
2510 
2511  /* restore tuple data itself */
2512  memcpy(change->data.tp.oldtuple->tuple.t_data, data, tuplelen);
2513  data += tuplelen;
2514  }
2515 
2516  if (change->data.tp.newtuple)
2517  {
2518  /* here, data might not be suitably aligned! */
2519  uint32 tuplelen;
2520 
2521  memcpy(&tuplelen, data + offsetof(HeapTupleData, t_len),
2522  sizeof(uint32));
2523 
2524  change->data.tp.newtuple =
2526 
2527  /* restore ->tuple */
2528  memcpy(&change->data.tp.newtuple->tuple, data,
2529  sizeof(HeapTupleData));
2530  data += sizeof(HeapTupleData);
2531 
2532  /* reset t_data pointer into the new tuplebuf */
2533  change->data.tp.newtuple->tuple.t_data =
2534  ReorderBufferTupleBufData(change->data.tp.newtuple);
2535 
2536  /* restore tuple data itself */
2537  memcpy(change->data.tp.newtuple->tuple.t_data, data, tuplelen);
2538  data += tuplelen;
2539  }
2540 
2541  break;
2543  {
2544  Size prefix_size;
2545 
2546  /* read prefix */
2547  memcpy(&prefix_size, data, sizeof(Size));
2548  data += sizeof(Size);
2549  change->data.msg.prefix = MemoryContextAlloc(rb->context,
2550  prefix_size);
2551  memcpy(change->data.msg.prefix, data, prefix_size);
2552  Assert(change->data.msg.prefix[prefix_size - 1] == '\0');
2553  data += prefix_size;
2554 
2555  /* read the message */
2556  memcpy(&change->data.msg.message_size, data, sizeof(Size));
2557  data += sizeof(Size);
2558  change->data.msg.message = MemoryContextAlloc(rb->context,
2559  change->data.msg.message_size);
2560  memcpy(change->data.msg.message, data,
2561  change->data.msg.message_size);
2562  data += change->data.msg.message_size;
2563 
2564  break;
2565  }
2567  {
2568  Snapshot oldsnap;
2569  Snapshot newsnap;
2570  Size size;
2571 
2572  oldsnap = (Snapshot) data;
2573 
2574  size = sizeof(SnapshotData) +
2575  sizeof(TransactionId) * oldsnap->xcnt +
2576  sizeof(TransactionId) * (oldsnap->subxcnt + 0);
2577 
2578  change->data.snapshot = MemoryContextAllocZero(rb->context, size);
2579 
2580  newsnap = change->data.snapshot;
2581 
2582  memcpy(newsnap, data, size);
2583  newsnap->xip = (TransactionId *)
2584  (((char *) newsnap) + sizeof(SnapshotData));
2585  newsnap->subxip = newsnap->xip + newsnap->xcnt;
2586  newsnap->copied = true;
2587  break;
2588  }
2589  /* the base struct contains all the data, easy peasy */
2593  break;
2594  }
2595 
2596  dlist_push_tail(&txn->changes, &change->node);
2597  txn->nentries_mem++;
2598 }
2599 
2600 /*
2601  * Remove all on-disk data stored for the passed-in transaction.
2602  */
2603 static void
2605 {
2606  XLogSegNo first;
2607  XLogSegNo cur;
2608  XLogSegNo last;
2609 
2612 
2613  XLByteToSeg(txn->first_lsn, first);
2614  XLByteToSeg(txn->final_lsn, last);
2615 
2616  /* iterate over all possible filenames, and delete them */
2617  for (cur = first; cur <= last; cur++)
2618  {
2619  char path[MAXPGPATH];
2620  XLogRecPtr recptr;
2621 
2622  XLogSegNoOffsetToRecPtr(cur, 0, recptr);
2623 
2624  sprintf(path, "pg_replslot/%s/xid-%u-lsn-%X-%X.snap",
2626  (uint32) (recptr >> 32), (uint32) recptr);
2627  if (unlink(path) != 0 && errno != ENOENT)
2628  ereport(ERROR,
2630  errmsg("could not remove file \"%s\": %m", path)));
2631  }
2632 }
2633 
2634 /*
2635  * Delete all data spilled to disk after we've restarted/crashed. It will be
2636  * recreated when the respective slots are reused.
2637  */
2638 void
2640 {
2641  DIR *logical_dir;
2642  struct dirent *logical_de;
2643 
2644  DIR *spill_dir;
2645  struct dirent *spill_de;
2646 
2647  logical_dir = AllocateDir("pg_replslot");
2648  while ((logical_de = ReadDir(logical_dir, "pg_replslot")) != NULL)
2649  {
2650  struct stat statbuf;
2651  char path[MAXPGPATH];
2652 
2653  if (strcmp(logical_de->d_name, ".") == 0 ||
2654  strcmp(logical_de->d_name, "..") == 0)
2655  continue;
2656 
2657  /* if it cannot be a slot, skip the directory */
2658  if (!ReplicationSlotValidateName(logical_de->d_name, DEBUG2))
2659  continue;
2660 
2661  /*
2662  * ok, has to be a surviving logical slot, iterate and delete
2663  * everything starting with xid-*
2664  */
2665  sprintf(path, "pg_replslot/%s", logical_de->d_name);
2666 
2667  /* we're only creating directories here, skip if it's not our's */
2668  if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
2669  continue;
2670 
2671  spill_dir = AllocateDir(path);
2672  while ((spill_de = ReadDir(spill_dir, path)) != NULL)
2673  {
2674  if (strcmp(spill_de->d_name, ".") == 0 ||
2675  strcmp(spill_de->d_name, "..") == 0)
2676  continue;
2677 
2678  /* only look at names that can be ours */
2679  if (strncmp(spill_de->d_name, "xid", 3) == 0)
2680  {
2681  sprintf(path, "pg_replslot/%s/%s", logical_de->d_name,
2682  spill_de->d_name);
2683 
2684  if (unlink(path) != 0)
2685  ereport(PANIC,
2687  errmsg("could not remove file \"%s\": %m",
2688  path)));
2689  }
2690  }
2691  FreeDir(spill_dir);
2692  }
2693  FreeDir(logical_dir);
2694 }
2695 
2696 /* ---------------------------------------
2697  * toast reassembly support
2698  * ---------------------------------------
2699  */
2700 
2701 /*
2702  * Initialize per tuple toast reconstruction support.
2703  */
2704 static void
2706 {
2707  HASHCTL hash_ctl;
2708 
2709  Assert(txn->toast_hash == NULL);
2710 
2711  memset(&hash_ctl, 0, sizeof(hash_ctl));
2712  hash_ctl.keysize = sizeof(Oid);
2713  hash_ctl.entrysize = sizeof(ReorderBufferToastEnt);
2714  hash_ctl.hcxt = rb->context;
2715  txn->toast_hash = hash_create("ReorderBufferToastHash", 5, &hash_ctl,
2717 }
2718 
2719 /*
2720  * Per toast-chunk handling for toast reconstruction
2721  *
2722  * Appends a toast chunk so we can reconstruct it when the tuple "owning" the
2723  * toasted Datum comes along.
2724  */
2725 static void
2727  Relation relation, ReorderBufferChange *change)
2728 {
2729  ReorderBufferToastEnt *ent;
2730  ReorderBufferTupleBuf *newtup;
2731  bool found;
2732  int32 chunksize;
2733  bool isnull;
2734  Pointer chunk;
2735  TupleDesc desc = RelationGetDescr(relation);
2736  Oid chunk_id;
2737  int32 chunk_seq;
2738 
2739  if (txn->toast_hash == NULL)
2740  ReorderBufferToastInitHash(rb, txn);
2741 
2742  Assert(IsToastRelation(relation));
2743 
2744  newtup = change->data.tp.newtuple;
2745  chunk_id = DatumGetObjectId(fastgetattr(&newtup->tuple, 1, desc, &isnull));
2746  Assert(!isnull);
2747  chunk_seq = DatumGetInt32(fastgetattr(&newtup->tuple, 2, desc, &isnull));
2748  Assert(!isnull);
2749 
2750  ent = (ReorderBufferToastEnt *)
2751  hash_search(txn->toast_hash,
2752  (void *) &chunk_id,
2753  HASH_ENTER,
2754  &found);
2755 
2756  if (!found)
2757  {
2758  Assert(ent->chunk_id == chunk_id);
2759  ent->num_chunks = 0;
2760  ent->last_chunk_seq = 0;
2761  ent->size = 0;
2762  ent->reconstructed = NULL;
2763  dlist_init(&ent->chunks);
2764 
2765  if (chunk_seq != 0)
2766  elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq 0",
2767  chunk_seq, chunk_id);
2768  }
2769  else if (found && chunk_seq != ent->last_chunk_seq + 1)
2770  elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq %d",
2771  chunk_seq, chunk_id, ent->last_chunk_seq + 1);
2772 
2773  chunk = DatumGetPointer(fastgetattr(&newtup->tuple, 3, desc, &isnull));
2774  Assert(!isnull);
2775 
2776  /* calculate size so we can allocate the right size at once later */
2777  if (!VARATT_IS_EXTENDED(chunk))
2778  chunksize = VARSIZE(chunk) - VARHDRSZ;
2779  else if (VARATT_IS_SHORT(chunk))
2780  /* could happen due to heap_form_tuple doing its thing */
2781  chunksize = VARSIZE_SHORT(chunk) - VARHDRSZ_SHORT;
2782  else
2783  elog(ERROR, "unexpected type of toast chunk");
2784 
2785  ent->size += chunksize;
2786  ent->last_chunk_seq = chunk_seq;
2787  ent->num_chunks++;
2788  dlist_push_tail(&ent->chunks, &change->node);
2789 }
2790 
2791 /*
2792  * Rejigger change->newtuple to point to in-memory toast tuples instead to
2793  * on-disk toast tuples that may no longer exist (think DROP TABLE or VACUUM).
2794  *
2795  * We cannot replace unchanged toast tuples though, so those will still point
2796  * to on-disk toast data.
2797  */
2798 static void
2800  Relation relation, ReorderBufferChange *change)
2801 {
2802  TupleDesc desc;
2803  int natt;
2804  Datum *attrs;
2805  bool *isnull;
2806  bool *free;
2807  HeapTuple tmphtup;
2808  Relation toast_rel;
2809  TupleDesc toast_desc;
2810  MemoryContext oldcontext;
2811  ReorderBufferTupleBuf *newtup;
2812 
2813  /* no toast tuples changed */
2814  if (txn->toast_hash == NULL)
2815  return;
2816 
2817  oldcontext = MemoryContextSwitchTo(rb->context);
2818 
2819  /* we should only have toast tuples in an INSERT or UPDATE */
2820  Assert(change->data.tp.newtuple);
2821 
2822  desc = RelationGetDescr(relation);
2823 
2824  toast_rel = RelationIdGetRelation(relation->rd_rel->reltoastrelid);
2825  toast_desc = RelationGetDescr(toast_rel);
2826 
2827  /* should we allocate from stack instead? */
2828  attrs = palloc0(sizeof(Datum) * desc->natts);
2829  isnull = palloc0(sizeof(bool) * desc->natts);
2830  free = palloc0(sizeof(bool) * desc->natts);
2831 
2832  newtup = change->data.tp.newtuple;
2833 
2834  heap_deform_tuple(&newtup->tuple, desc, attrs, isnull);
2835 
2836  for (natt = 0; natt < desc->natts; natt++)
2837  {
2838  Form_pg_attribute attr = desc->attrs[natt];
2839  ReorderBufferToastEnt *ent;
2840  struct varlena *varlena;
2841 
2842  /* va_rawsize is the size of the original datum -- including header */
2843  struct varatt_external toast_pointer;
2844  struct varatt_indirect redirect_pointer;
2845  struct varlena *new_datum = NULL;
2846  struct varlena *reconstructed;
2847  dlist_iter it;
2848  Size data_done = 0;
2849 
2850  /* system columns aren't toasted */
2851  if (attr->attnum < 0)
2852  continue;
2853 
2854  if (attr->attisdropped)
2855  continue;
2856 
2857  /* not a varlena datatype */
2858  if (attr->attlen != -1)
2859  continue;
2860 
2861  /* no data */
2862  if (isnull[natt])
2863  continue;
2864 
2865  /* ok, we know we have a toast datum */
2866  varlena = (struct varlena *) DatumGetPointer(attrs[natt]);
2867 
2868  /* no need to do anything if the tuple isn't external */
2869  if (!VARATT_IS_EXTERNAL(varlena))
2870  continue;
2871 
2872  VARATT_EXTERNAL_GET_POINTER(toast_pointer, varlena);
2873 
2874  /*
2875  * Check whether the toast tuple changed, replace if so.
2876  */
2877  ent = (ReorderBufferToastEnt *)
2878  hash_search(txn->toast_hash,
2879  (void *) &toast_pointer.va_valueid,
2880  HASH_FIND,
2881  NULL);
2882  if (ent == NULL)
2883  continue;
2884 
2885  new_datum =
2886  (struct varlena *) palloc0(INDIRECT_POINTER_SIZE);
2887 
2888  free[natt] = true;
2889 
2890  reconstructed = palloc0(toast_pointer.va_rawsize);
2891 
2892  ent->reconstructed = reconstructed;
2893 
2894  /* stitch toast tuple back together from its parts */
2895  dlist_foreach(it, &ent->chunks)
2896  {
2897  bool isnull;
2898  ReorderBufferChange *cchange;
2899  ReorderBufferTupleBuf *ctup;
2900  Pointer chunk;
2901 
2902  cchange = dlist_container(ReorderBufferChange, node, it.cur);
2903  ctup = cchange->data.tp.newtuple;
2904  chunk = DatumGetPointer(
2905  fastgetattr(&ctup->tuple, 3, toast_desc, &isnull));
2906 
2907  Assert(!isnull);
2908  Assert(!VARATT_IS_EXTERNAL(chunk));
2909  Assert(!VARATT_IS_SHORT(chunk));
2910 
2911  memcpy(VARDATA(reconstructed) + data_done,
2912  VARDATA(chunk),
2913  VARSIZE(chunk) - VARHDRSZ);
2914  data_done += VARSIZE(chunk) - VARHDRSZ;
2915  }
2916  Assert(data_done == toast_pointer.va_extsize);
2917 
2918  /* make sure its marked as compressed or not */
2919  if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
2920  SET_VARSIZE_COMPRESSED(reconstructed, data_done + VARHDRSZ);
2921  else
2922  SET_VARSIZE(reconstructed, data_done + VARHDRSZ);
2923 
2924  memset(&redirect_pointer, 0, sizeof(redirect_pointer));
2925  redirect_pointer.pointer = reconstructed;
2926 
2928  memcpy(VARDATA_EXTERNAL(new_datum), &redirect_pointer,
2929  sizeof(redirect_pointer));
2930 
2931  attrs[natt] = PointerGetDatum(new_datum);
2932  }
2933 
2934  /*
2935  * Build tuple in separate memory & copy tuple back into the tuplebuf
2936  * passed to the output plugin. We can't directly heap_fill_tuple() into
2937  * the tuplebuf because attrs[] will point back into the current content.
2938  */
2939  tmphtup = heap_form_tuple(desc, attrs, isnull);
2940  Assert(newtup->tuple.t_len <= MaxHeapTupleSize);
2941  Assert(ReorderBufferTupleBufData(newtup) == newtup->tuple.t_data);
2942 
2943  memcpy(newtup->tuple.t_data, tmphtup->t_data, tmphtup->t_len);
2944  newtup->tuple.t_len = tmphtup->t_len;
2945 
2946  /*
2947  * free resources we won't further need, more persistent stuff will be
2948  * free'd in ReorderBufferToastReset().
2949  */
2950  RelationClose(toast_rel);
2951  pfree(tmphtup);
2952  for (natt = 0; natt < desc->natts; natt++)
2953  {
2954  if (free[natt])
2955  pfree(DatumGetPointer(attrs[natt]));
2956  }
2957  pfree(attrs);
2958  pfree(free);
2959  pfree(isnull);
2960 
2961  MemoryContextSwitchTo(oldcontext);
2962 }
2963 
2964 /*
2965  * Free all resources allocated for toast reconstruction.
2966  */
2967 static void
2969 {
2970  HASH_SEQ_STATUS hstat;
2971  ReorderBufferToastEnt *ent;
2972 
2973  if (txn->toast_hash == NULL)
2974  return;
2975 
2976  /* sequentially walk over the hash and free everything */
2977  hash_seq_init(&hstat, txn->toast_hash);
2978  while ((ent = (ReorderBufferToastEnt *) hash_seq_search(&hstat)) != NULL)
2979  {
2980  dlist_mutable_iter it;
2981 
2982  if (ent->reconstructed != NULL)
2983  pfree(ent->reconstructed);
2984 
2985  dlist_foreach_modify(it, &ent->chunks)
2986  {
2987  ReorderBufferChange *change =
2989 
2990  dlist_delete(&change->node);
2991  ReorderBufferReturnChange(rb, change);
2992  }
2993  }
2994 
2995  hash_destroy(txn->toast_hash);
2996  txn->toast_hash = NULL;
2997 }
2998 
2999 
3000 /* ---------------------------------------
3001  * Visibility support for logical decoding
3002  *
3003  *
3004  * Lookup actual cmin/cmax values when using decoding snapshot. We can't
3005  * always rely on stored cmin/cmax values because of two scenarios:
3006  *
3007  * * A tuple got changed multiple times during a single transaction and thus
3008  * has got a combocid. Combocid's are only valid for the duration of a
3009  * single transaction.
3010  * * A tuple with a cmin but no cmax (and thus no combocid) got
3011  * deleted/updated in another transaction than the one which created it
3012  * which we are looking at right now. As only one of cmin, cmax or combocid
3013  * is actually stored in the heap we don't have access to the value we
3014  * need anymore.
3015  *
3016  * To resolve those problems we have a per-transaction hash of (cmin,
3017  * cmax) tuples keyed by (relfilenode, ctid) which contains the actual
3018  * (cmin, cmax) values. That also takes care of combocids by simply
3019  * not caring about them at all. As we have the real cmin/cmax values
3020  * combocids aren't interesting.
3021  *
3022  * As we only care about catalog tuples here the overhead of this
3023  * hashtable should be acceptable.
3024  *
3025  * Heap rewrites complicate this a bit, check rewriteheap.c for
3026  * details.
3027  * -------------------------------------------------------------------------
3028  */
3029 
3030 /* struct for qsort()ing mapping files by lsn somewhat efficiently */
3031 typedef struct RewriteMappingFile
3032 {
3036 
#if NOT_USED
/* Debug helper: dump the whole tuplecid mapping hash at DEBUG3. */
static void
DisplayMapping(HTAB *tuplecid_data)
{
	HASH_SEQ_STATUS hstat;
	ReorderBufferTupleCidEnt *ent;

	hash_seq_init(&hstat, tuplecid_data);
	while ((ent = (ReorderBufferTupleCidEnt *) hash_seq_search(&hstat)) != NULL)
	{
		elog(DEBUG3, "mapping: node: %u/%u/%u tid: %u/%u cmin: %u, cmax: %u",
			 ent->key.relnode.dbNode,
			 ent->key.relnode.spcNode,
			 ent->key.relnode.relNode,
			 BlockIdGetBlockNumber(&ent->key.tid.ip_blkid),
			 ent->key.tid.ip_posid,
			 ent->cmin,
			 ent->cmax
			);
	}
}
#endif
3059 
3060 /*
3061  * Apply a single mapping file to tuplecid_data.
3062  *
3063  * The mapping file has to have been verified to be a) committed b) for our
3064  * transaction c) applied in LSN order.
3065  */
3066 static void
3067 ApplyLogicalMappingFile(HTAB *tuplecid_data, Oid relid, const char *fname)
3068 {
3069  char path[MAXPGPATH];
3070  int fd;
3071  int readBytes;
3073 
3074  sprintf(path, "pg_logical/mappings/%s", fname);
3075  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0);
3076  if (fd < 0)
3077  ereport(ERROR,
3079  errmsg("could not open file \"%s\": %m", path)));
3080 
3081  while (true)
3082  {
3085  ReorderBufferTupleCidEnt *new_ent;
3086  bool found;
3087 
3088  /* be careful about padding */
3089  memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
3090 
3091  /* read all mappings till the end of the file */
3092  readBytes = read(fd, &map, sizeof(LogicalRewriteMappingData));
3093 
3094  if (readBytes < 0)
3095  ereport(ERROR,
3097  errmsg("could not read file \"%s\": %m",
3098  path)));
3099  else if (readBytes == 0) /* EOF */
3100  break;
3101  else if (readBytes != sizeof(LogicalRewriteMappingData))
3102  ereport(ERROR,
3104  errmsg("could not read from file \"%s\": read %d instead of %d bytes",
3105  path, readBytes,
3106  (int32) sizeof(LogicalRewriteMappingData))));
3107 
3108  key.relnode = map.old_node;
3109  ItemPointerCopy(&map.old_tid,
3110  &key.tid);
3111 
3112 
3113  ent = (ReorderBufferTupleCidEnt *)
3114  hash_search(tuplecid_data,
3115  (void *) &key,
3116  HASH_FIND,
3117  NULL);
3118 
3119  /* no existing mapping, no need to update */
3120  if (!ent)
3121  continue;
3122 
3123  key.relnode = map.new_node;
3124  ItemPointerCopy(&map.new_tid,
3125  &key.tid);
3126 
3127  new_ent = (ReorderBufferTupleCidEnt *)
3128  hash_search(tuplecid_data,
3129  (void *) &key,
3130  HASH_ENTER,
3131  &found);
3132 
3133  if (found)
3134  {
3135  /*
3136  * Make sure the existing mapping makes sense. We sometime update
3137  * old records that did not yet have a cmax (e.g. pg_class' own
3138  * entry while rewriting it) during rewrites, so allow that.
3139  */
3140  Assert(ent->cmin == InvalidCommandId || ent->cmin == new_ent->cmin);
3141  Assert(ent->cmax == InvalidCommandId || ent->cmax == new_ent->cmax);
3142  }
3143  else
3144  {
3145  /* update mapping */
3146  new_ent->cmin = ent->cmin;
3147  new_ent->cmax = ent->cmax;
3148  new_ent->combocid = ent->combocid;
3149  }
3150  }
3151 }
3152 
3153 
3154 /*
3155  * Check whether the TransactionId 'xid' is in the pre-sorted array 'xip'.
3156  */
3157 static bool
3159 {
3160  return bsearch(&xid, xip, num,
3161  sizeof(TransactionId), xidComparator) != NULL;
3162 }
3163 
3164 /*
3165  * qsort() comparator for sorting RewriteMappingFiles in LSN order.
3166  */
3167 static int
3168 file_sort_by_lsn(const void *a_p, const void *b_p)
3169 {
3170  RewriteMappingFile *a = *(RewriteMappingFile **) a_p;
3171  RewriteMappingFile *b = *(RewriteMappingFile **) b_p;
3172 
3173  if (a->lsn < b->lsn)
3174  return -1;
3175  else if (a->lsn > b->lsn)
3176  return 1;
3177  return 0;
3178 }
3179 
3180 /*
3181  * Apply any existing logical remapping files if there are any targeted at our
3182  * transaction for relid.
3183  */
3184 static void
3186 {
3187  DIR *mapping_dir;
3188  struct dirent *mapping_de;
3189  List *files = NIL;
3190  ListCell *file;
3191  RewriteMappingFile **files_a;
3192  size_t off;
3193  Oid dboid = IsSharedRelation(relid) ? InvalidOid : MyDatabaseId;
3194 
3195  mapping_dir = AllocateDir("pg_logical/mappings");
3196  while ((mapping_de = ReadDir(mapping_dir, "pg_logical/mappings")) != NULL)
3197  {
3198  Oid f_dboid;
3199  Oid f_relid;
3200  TransactionId f_mapped_xid;
3201  TransactionId f_create_xid;
3202  XLogRecPtr f_lsn;
3203  uint32 f_hi,
3204  f_lo;
3205  RewriteMappingFile *f;
3206 
3207  if (strcmp(mapping_de->d_name, ".") == 0 ||
3208  strcmp(mapping_de->d_name, "..") == 0)
3209  continue;
3210 
3211  /* Ignore files that aren't ours */
3212  if (strncmp(mapping_de->d_name, "map-", 4) != 0)
3213  continue;
3214 
3215  if (sscanf(mapping_de->d_name, LOGICAL_REWRITE_FORMAT,
3216  &f_dboid, &f_relid, &f_hi, &f_lo,
3217  &f_mapped_xid, &f_create_xid) != 6)
3218  elog(ERROR, "could not parse filename \"%s\"", mapping_de->d_name);
3219 
3220  f_lsn = ((uint64) f_hi) << 32 | f_lo;
3221 
3222  /* mapping for another database */
3223  if (f_dboid != dboid)
3224  continue;
3225 
3226  /* mapping for another relation */
3227  if (f_relid != relid)
3228  continue;
3229 
3230  /* did the creating transaction abort? */
3231  if (!TransactionIdDidCommit(f_create_xid))
3232  continue;
3233 
3234  /* not for our transaction */
3235  if (!TransactionIdInArray(f_mapped_xid, snapshot->subxip, snapshot->subxcnt))
3236  continue;
3237 
3238  /* ok, relevant, queue for apply */
3239  f = palloc(sizeof(RewriteMappingFile));
3240  f->lsn = f_lsn;
3241  strcpy(f->fname, mapping_de->d_name);
3242  files = lappend(files, f);
3243  }
3244  FreeDir(mapping_dir);
3245 
3246  /* build array we can easily sort */
3247  files_a = palloc(list_length(files) * sizeof(RewriteMappingFile *));
3248  off = 0;
3249  foreach(file, files)
3250  {
3251  files_a[off++] = lfirst(file);
3252  }
3253 
3254  /* sort files so we apply them in LSN order */
3255  qsort(files_a, list_length(files), sizeof(RewriteMappingFile *),
3257 
3258  for (off = 0; off < list_length(files); off++)
3259  {
3260  RewriteMappingFile *f = files_a[off];
3261 
3262  elog(DEBUG1, "applying mapping: \"%s\" in %u", f->fname,
3263  snapshot->subxip[0]);
3264  ApplyLogicalMappingFile(tuplecid_data, relid, f->fname);
3265  pfree(f);
3266  }
3267 }
3268 
3269 /*
3270  * Lookup cmin/cmax of a tuple, during logical decoding where we can't rely on
3271  * combocids.
3272  */
3273 bool
3275  Snapshot snapshot,
3276  HeapTuple htup, Buffer buffer,
3277  CommandId *cmin, CommandId *cmax)
3278 {
3281  ForkNumber forkno;
3282  BlockNumber blockno;
3283  bool updated_mapping = false;
3284 
3285  /* be careful about padding */
3286  memset(&key, 0, sizeof(key));
3287 
3288  Assert(!BufferIsLocal(buffer));
3289 
3290  /*
3291  * get relfilenode from the buffer, no convenient way to access it other
3292  * than that.
3293  */
3294  BufferGetTag(buffer, &key.relnode, &forkno, &blockno);
3295 
3296  /* tuples can only be in the main fork */
3297  Assert(forkno == MAIN_FORKNUM);
3298  Assert(blockno == ItemPointerGetBlockNumber(&htup->t_self));
3299 
3300  ItemPointerCopy(&htup->t_self,
3301  &key.tid);
3302 
3303 restart:
3304  ent = (ReorderBufferTupleCidEnt *)
3305  hash_search(tuplecid_data,
3306  (void *) &key,
3307  HASH_FIND,
3308  NULL);
3309 
3310  /*
3311  * failed to find a mapping, check whether the table was rewritten and
3312  * apply mapping if so, but only do that once - there can be no new
3313  * mappings while we are in here since we have to hold a lock on the
3314  * relation.
3315  */
3316  if (ent == NULL && !updated_mapping)
3317  {
3318  UpdateLogicalMappings(tuplecid_data, htup->t_tableOid, snapshot);
3319  /* now check but don't update for a mapping again */
3320  updated_mapping = true;
3321  goto restart;
3322  }
3323  else if (ent == NULL)
3324  return false;
3325 
3326  if (cmin)
3327  *cmin = ent->cmin;
3328  if (cmax)
3329  *cmax = ent->cmax;
3330  return true;
3331 }
static void ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
XLogRecPtr first_lsn
bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *rb, TransactionId xid)
#define NIL
Definition: pg_list.h:69
uint32 CommandId
Definition: c.h:408
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
TimestampTz commit_time
#define BlockIdGetBlockNumber(blockId)
Definition: block.h:115
struct ReorderBufferToastEnt ReorderBufferToastEnt
void AbortCurrentTransaction(void)
Definition: xact.c:2984
ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER]
#define SizeofHeapTupleHeader
Definition: htup_details.h:170
bool IsToastRelation(Relation relation)
Definition: catalog.c:135
void hash_destroy(HTAB *hashp)
Definition: dynahash.c:793
dlist_head cached_changes
void ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, Snapshot snapshot, XLogRecPtr lsn, bool transactional, const char *prefix, Size message_size, const char *message)
#define relpathperm(rnode, forknum)
Definition: relpath.h:67
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
ReorderBufferApplyChangeCB apply_change
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:200
HeapTupleData * HeapTuple
Definition: htup.h:70
#define DEBUG1
Definition: elog.h:25
#define VALGRIND_MAKE_MEM_DEFINED(addr, size)
Definition: memdebug.h:26
dlist_node * cur
Definition: ilist.h:180
static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
RepOriginId origin_id
void StartupReorderBuffer(void)
#define VARDATA(PTR)
Definition: postgres.h:305
#define fastgetattr(tup, attnum, tupleDesc, isnull)
Definition: htup_details.h:719
static int32 next
Definition: blutils.c:210
#define HASH_CONTEXT
Definition: hsearch.h:93
#define HASH_ELEM
Definition: hsearch.h:87
static const Size max_cached_tuplebufs
void ReorderBufferAbortOld(ReorderBuffer *rb, TransactionId oldestRunningXid)
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
uint32 TransactionId
Definition: c.h:394
void ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
bool copied
Definition: snapshot.h:94
void ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
MemoryContext hcxt
Definition: hsearch.h:78
#define DatumGetInt32(X)
Definition: postgres.h:480
#define RelationGetDescr(relation)
Definition: rel.h:425
static void dlist_push_head(dlist_head *head, dlist_node *node)
Definition: ilist.h:300
#define VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer)
Definition: tuptoaster.h:111
#define DEBUG3
Definition: elog.h:23
#define write(a, b, c)
Definition: win32.h:19
#define VARHDRSZ_SHORT
Definition: postgres.h:269
TransactionId by_txn_last_xid
#define VARSIZE(PTR)
Definition: postgres.h:306
#define VALGRIND_MAKE_MEM_UNDEFINED(addr, size)
Definition: memdebug.h:28
int64 TimestampTz
Definition: timestamp.h:39
#define PointerGetDatum(X)
Definition: postgres.h:564
ReorderBufferTXN * txn
static bool TransactionIdInArray(TransactionId xid, TransactionId *xip, Size num)
#define VARHDRSZ
Definition: c.h:441
#define dlist_foreach(iter, lhead)
Definition: ilist.h:507
XLogRecPtr current_restart_decoding_lsn
#define DatumGetObjectId(X)
Definition: postgres.h:508
char * pstrdup(const char *in)
Definition: mcxt.c:1165
static ReorderBufferTXN * ReorderBufferGetTXN(ReorderBuffer *rb)
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
Oid RelidByRelfilenode(Oid reltablespace, Oid relfilenode)
Form_pg_attribute * attrs
Definition: tupdesc.h:74
void ReorderBufferSetBaseSnapshot(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void ReorderBufferFree(ReorderBuffer *rb)
static void slist_push_head(slist_head *head, slist_node *node)
Definition: ilist.h:574
uint16 RepOriginId
Definition: xlogdefs.h:51
Size entrysize
Definition: hsearch.h:73
union ReorderBufferChange::@49 data
struct cursor * cur
Definition: ecpg.c:28
char fname[MAXPGPATH]
int32 va_rawsize
Definition: postgres.h:70
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4320
void binaryheap_replace_first(binaryheap *heap, Datum d)
Definition: binaryheap.c:204
void ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, RelFileNode node, ItemPointerData tid, CommandId cmin, CommandId cmax, CommandId combocid)
static void ReorderBufferIterTXNFinish(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
uint32 BlockNumber
Definition: block.h:31
void TeardownHistoricSnapshot(bool is_error)
Definition: snapmgr.c:1965
ReorderBufferCommitCB commit
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:692
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
#define RelationIsLogicallyLogged(relation)
Definition: rel.h:572
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:885
bool TransactionIdDidCommit(TransactionId transactionId)
Definition: transam.c:125
ReplicationSlotPersistentData data
Definition: slot.h:115
static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, int *fd, XLogSegNo *segno)
struct ReorderBufferTupleCidKey ReorderBufferTupleCidKey
struct SnapshotData * Snapshot
Definition: snapshot.h:23
Form_pg_class rd_rel
Definition: rel.h:113
unsigned int Oid
Definition: postgres_ext.h:31
XLogRecPtr base_snapshot_lsn
Definition: dirent.h:9
uint32 regd_count
Definition: snapshot.h:108
#define PANIC
Definition: elog.h:53
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:77
void binaryheap_add_unordered(binaryheap *heap, Datum d)
Definition: binaryheap.c:110
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define VARDATA_EXTERNAL(PTR)
Definition: postgres.h:313
#define PG_BINARY
Definition: c.h:1038
int natts
Definition: tupdesc.h:73
XLogRecPtr origin_lsn
static void ApplyLogicalMappingFile(HTAB *tuplecid_data, Oid relid, const char *fname)
signed int int32
Definition: c.h:253
static int file_sort_by_lsn(const void *a_p, const void *b_p)
#define XLogSegNoOffsetToRecPtr(segno, offset, dest)
Definition: xlog_internal.h:95
#define FirstCommandId
Definition: c.h:410
void ReorderBufferSetRestartPoint(ReorderBuffer *rb, XLogRecPtr ptr)
HeapTupleHeader t_data
Definition: htup.h:67
struct ReorderBufferChange::@49::@51 msg
#define VARATT_IS_EXTERNAL(PTR)
Definition: postgres.h:316
bool ReplicationSlotValidateName(const char *name, int elevel)
Definition: slot.c:172
static dlist_node * dlist_next_node(dlist_head *head, dlist_node *node)
Definition: ilist.h:421
Definition: dynahash.c:193
#define ReorderBufferTupleBufData(p)
Definition: reorderbuffer.h:36
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
void pfree(void *pointer)
Definition: mcxt.c:992
static void slist_init(slist_head *head)
Definition: ilist.h:554
char * Pointer
Definition: c.h:242
struct ReorderBufferChange::@49::@50 tp
Size nr_cached_transactions
Definition: dirent.c:25
#define ERROR
Definition: elog.h:43
Size nr_cached_changes
BlockIdData ip_blkid
Definition: itemptr.h:38
#define VARATT_IS_SHORT(PTR)
Definition: postgres.h:327
dlist_head changes
Datum binaryheap_first(binaryheap *heap)
Definition: binaryheap.c:159
#define MAXPGPATH
ItemPointerData t_self
Definition: htup.h:65
ReorderBufferTupleCidKey key
Definition: reorderbuffer.c:93
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:145
#define DEBUG2
Definition: elog.h:24
TransactionId GetCurrentTransactionId(void)
Definition: xact.c:416
void ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
uint32 t_len
Definition: htup.h:64
#define MaxHeapTupleSize
Definition: htup_details.h:561
struct varlena * reconstructed
void RollbackAndReleaseCurrentSubTransaction(void)
Definition: xact.c:4155
#define SET_VARTAG_EXTERNAL(PTR, tag)
Definition: postgres.h:334
uint64 XLogSegNo
Definition: xlogdefs.h:34
static const Size max_cached_changes
int OpenTransientFile(FileName fileName, int fileFlags, int fileMode)
Definition: fd.c:2093
struct ReorderBufferChange::@49::@52 tuplecid
int errcode_for_file_access(void)
Definition: elog.c:598
HeapTupleData tuple
Definition: reorderbuffer.h:27
struct SnapshotData SnapshotData
dlist_head cached_transactions
TransactionId GetCurrentTransactionIdIfAny(void)
Definition: xact.c:433
#define InvalidTransactionId
Definition: transam.h:31
static const Size max_cached_transactions
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:184
bool ReorderBufferXidHasCatalogChanges(ReorderBuffer *rb, TransactionId xid)
unsigned int uint32
Definition: c.h:265
XLogRecPtr final_lsn
DIR * AllocateDir(const char *dirname)
Definition: fd.c:2284
Oid t_tableOid
Definition: htup.h:66
void RelationClose(Relation relation)
Definition: relcache.c:2155
MemoryContext CurrentMemoryContext
Definition: mcxt.c:37
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
ReorderBufferMessageCB message
int unlink(const char *filename)
#define ereport(elevel, rest)
Definition: elog.h:122
int bh_size
Definition: binaryheap.h:32
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
TransactionId * xip
Definition: snapshot.h:77
static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap, ReorderBufferTXN *txn, CommandId cid)
#define VARSIZE_SHORT(PTR)
Definition: postgres.h:308
ForkNumber
Definition: relpath.h:24
static slist_node * slist_pop_head_node(slist_head *head)
Definition: ilist.h:596
List * lappend(List *list, void *datum)
Definition: list.c:128
static HTAB * tuplecid_data
Definition: snapmgr.c:170
int CloseTransientFile(int fd)
Definition: fd.c:2254
static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
struct ReorderBufferTupleCidEnt ReorderBufferTupleCidEnt
void ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr lsn)
#define INDIRECT_POINTER_SIZE
Definition: tuptoaster.h:102
static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, char *change)
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:487
#define HASH_BLOBS
Definition: hsearch.h:88
ReorderBufferChange * change
static int ReorderBufferIterCompare(Datum a, Datum b, void *arg)
MemoryContext context
#define slist_container(type, membername, ptr)
Definition: ilist.h:674
void ReorderBufferCommitChild(ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn)
MemoryContext AllocSetContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Definition: aset.c:440
void * palloc0(Size size)
Definition: mcxt.c:920
static bool dlist_has_next(dlist_head *head, dlist_node *node)
Definition: ilist.h:402
#define InvalidCommandId
Definition: c.h:411
uintptr_t Datum
Definition: postgres.h:374
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
Definition: dynahash.c:301
ReorderBufferTXN * by_txn_last_txn
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void cleanup(void)
Definition: bootstrap.c:848
dlist_head toplevel_by_lsn
struct RewriteMappingFile RewriteMappingFile
Oid MyDatabaseId
Definition: globals.c:76
TransactionId xid
static void ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
bool IsSharedRelation(Oid relationId)
Definition: catalog.c:219
Size keysize
Definition: hsearch.h:72
dlist_node * cur
Definition: ilist.h:161
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:784
static void AssertTXNLsnOrder(ReorderBuffer *rb)
void ReorderBufferAddSnapshot(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
#define InvalidOid
Definition: postgres_ext.h:36
static void dlist_init(dlist_head *head)
Definition: ilist.h:278
CommandId curcid
Definition: snapshot.h:96
struct ReorderBufferIterTXNState ReorderBufferIterTXNState
static void ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTXN *txn)
struct ReorderBufferDiskChange ReorderBufferDiskChange
void binaryheap_build(binaryheap *heap)
Definition: binaryheap.c:126
#define free(a)
Definition: header.h:60
struct ReorderBufferIterTXNEntry ReorderBufferIterTXNEntry
#define PG_CATCH()
Definition: elog.h:293
ReplicationSlot * MyReplicationSlot
Definition: slot.c:95
#define XLByteToSeg(xlrp, logSegNo)
ItemPointerData new_tid
Definition: rewriteheap.h:40
#define NULL
Definition: c.h:226
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:671
#define lfirst(lc)
Definition: pg_list.h:106
ReorderBufferTXN * ReorderBufferGetOldestTXN(ReorderBuffer *rb)
static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, int fd, ReorderBufferChange *change)
static void ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn)
Definition: regguts.h:298
struct dirent * ReadDir(DIR *dir, const char *dirname)
Definition: fd.c:2350
static ReorderBufferChange * ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
#define VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr)
Definition: tuptoaster.h:121
int32 va_extsize
Definition: postgres.h:71
XLogRecPtr end_lsn
void StartTransactionCommand(void)
Definition: xact.c:2675
void ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs)
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
size_t Size
Definition: c.h:353
void binaryheap_free(binaryheap *heap)
Definition: binaryheap.c:69
SharedInvalidationMessage * invalidations
static int list_length(const List *l)
Definition: pg_list.h:89
void BeginInternalSubTransaction(char *name)
Definition: xact.c:4051
#define BufferIsLocal(buffer)
Definition: buf.h:37
void ReorderBufferReturnTupleBuf(ReorderBuffer *rb, ReorderBufferTupleBuf *tuple)
#define PG_RE_THROW()
Definition: elog.h:314
ReorderBuffer * ReorderBufferAllocate(void)
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1353
void * repalloc(void *pointer, Size size)
Definition: mcxt.c:1021
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1343
static void ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz)
struct varlena * pointer
Definition: postgres.h:87
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change)
void SnapBuildSnapDecRefcount(Snapshot snap)
Definition: snapbuild.c:397
#define LOGICAL_REWRITE_FORMAT
Definition: rewriteheap.h:54
dlist_head subtxns
binaryheap * binaryheap_allocate(int capacity, binaryheap_comparator compare, void *arg)
Definition: binaryheap.c:33
#define VARATT_IS_EXTENDED(PTR)
Definition: postgres.h:328
#define DatumGetPointer(X)
Definition: postgres.h:557
void heap_deform_tuple(HeapTuple tuple, TupleDesc tupleDesc, Datum *values, bool *isnull)
Definition: heaptuple.c:935
Size nr_cached_tuplebufs
void LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
Definition: inval.c:547
#define Int32GetDatum(X)
Definition: postgres.h:487
static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
uint32 xcnt
Definition: snapshot.h:78
void * palloc(Size size)
Definition: mcxt.c:891
int errmsg(const char *fmt,...)
Definition: elog.c:797
static void UpdateLogicalMappings(HTAB *tuplecid_data, Oid relid, Snapshot snapshot)
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:749
static void ReorderBufferCheckSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
ReorderBufferTXN * txn
Definition: reorderbuffer.c:81
void ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
static dlist_node * dlist_pop_head_node(dlist_head *head)
Definition: ilist.h:368
void SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
Definition: snapmgr.c:1949
int i
XLogRecPtr restart_decoding_lsn
#define NameStr(name)
Definition: c.h:495
#define SET_VARSIZE_COMPRESSED(PTR, len)
Definition: postgres.h:332
void * arg
Datum binaryheap_remove_first(binaryheap *heap)
Definition: binaryheap.c:174
static const Size max_changes_in_memory
Definition: c.h:435
struct ReorderBufferTXNByIdEnt ReorderBufferTXNByIdEnt
void ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
#define SET_VARSIZE(PTR, len)
Definition: postgres.h:330
char d_name[MAX_PATH]
Definition: dirent.h:14
#define elog
Definition: elog.h:219
#define ItemPointerGetBlockNumber(pointer)
Definition: itemptr.h:66
static void ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define qsort(a, b, c, d)
Definition: port.h:440
#define TransactionIdIsValid(xid)
Definition: transam.h:41
ReorderBufferChange change
ReorderBufferBeginCB begin
static void ReorderBufferToastInitHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
void BufferGetTag(Buffer buffer, RelFileNode *rnode, ForkNumber *forknum, BlockNumber *blknum)
Definition: bufmgr.c:2609
#define PG_TRY()
Definition: elog.h:284
#define XLByteInSeg(xlrp, logSegNo)
static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
void ReorderBufferProcessXid(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
#define RELKIND_SEQUENCE
Definition: pg_class.h:162
#define lstat(path, sb)
Definition: win32.h:272
Definition: pg_list.h:45
int Buffer
Definition: buf.h:23
OffsetNumber ip_posid
Definition: itemptr.h:39
static ReorderBufferIterTXNState * ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
Relation RelationIdGetRelation(Oid relationId)
Definition: relcache.c:2066
#define PG_END_TRY()
Definition: elog.h:300
#define read(a, b, c)
Definition: win32.h:18
int FreeDir(DIR *dir)
Definition: fd.c:2393
struct HeapTupleData HeapTupleData
#define offsetof(type, field)
Definition: c.h:551
dlist_head tuplecids
slist_head cached_tuplebufs
#define ItemPointerCopy(fromPointer, toPointer)
Definition: itemptr.h:120
TransactionId * subxip
Definition: snapshot.h:89
uint32 active_count
Definition: snapshot.h:107
int xidComparator(const void *arg1, const void *arg2)
Definition: xid.c:138
int32 subxcnt
Definition: snapshot.h:90
void ReorderBufferAddNewCommandId(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, CommandId cid)
ReorderBufferTupleBuf * ReorderBufferGetTupleBuf(ReorderBuffer *rb, Size tuple_len)
bool ResolveCminCmaxDuringDecoding(HTAB *tuplecid_data, Snapshot snapshot, HeapTuple htup, Buffer buffer, CommandId *cmin, CommandId *cmax)
ItemPointerData old_tid
Definition: rewriteheap.h:39