PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
reorderbuffer.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * reorderbuffer.c
4  * PostgreSQL logical replay/reorder buffer management
5  *
6  *
7  * Copyright (c) 2012-2017, PostgreSQL Global Development Group
8  *
9  *
10  * IDENTIFICATION
11  * src/backend/replication/reorderbuffer.c
12  *
13  * NOTES
14  * This module gets handed individual pieces of transactions in the order
15  * they are written to the WAL and is responsible to reassemble them into
16  * toplevel transaction sized pieces. When a transaction is completely
17  * reassembled - signalled by reading the transaction commit record - it
18  * will then call the output plugin (c.f. ReorderBufferCommit()) with the
19  * individual changes. The output plugins rely on snapshots built by
20  * snapbuild.c which hands them to us.
21  *
22  * Transactions and subtransactions/savepoints in postgres are not
23  * immediately linked to each other from outside the performing
24  * backend. Only at commit/abort (or special xact_assignment records) they
25  * are linked together. Which means that we will have to splice together a
26  * toplevel transaction from its subtransactions. To do that efficiently we
27  * build a binary heap indexed by the smallest current lsn of the individual
28  * subtransactions' changestreams. As the individual streams are inherently
29  * ordered by LSN - since that is where we build them from - the transaction
30  * can easily be reassembled by always using the subtransaction with the
31  * smallest current LSN from the heap.
32  *
33  * In order to cope with large transactions - which can be several times as
34  * big as the available memory - this module supports spooling the contents
35  * of large transactions to disk. When the transaction is replayed the
36  * contents of individual (sub-)transactions will be read from disk in
37  * chunks.
38  *
39  * This module also has to deal with reassembling toast records from the
40  * individual chunks stored in WAL. When a new (or initial) version of a
41  * tuple is stored in WAL it will always be preceded by the toast chunks
42  * emitted for the columns stored out of line. Within a single toplevel
43  * transaction there will be no other data carrying records between a row's
44  * toast chunks and the row data itself. See ReorderBufferToast* for
45  * details.
46  * -------------------------------------------------------------------------
47  */
48 #include "postgres.h"
49 
50 #include <unistd.h>
51 #include <sys/stat.h>
52 
53 #include "access/rewriteheap.h"
54 #include "access/transam.h"
55 #include "access/tuptoaster.h"
56 #include "access/xact.h"
57 #include "access/xlog_internal.h"
58 #include "catalog/catalog.h"
59 #include "lib/binaryheap.h"
60 #include "miscadmin.h"
61 #include "pgstat.h"
62 #include "replication/logical.h"
64 #include "replication/slot.h"
65 #include "replication/snapbuild.h" /* just for SnapBuildSnapDecRefcount */
66 #include "storage/bufmgr.h"
67 #include "storage/fd.h"
68 #include "storage/sinval.h"
69 #include "utils/builtins.h"
70 #include "utils/combocid.h"
71 #include "utils/memdebug.h"
72 #include "utils/memutils.h"
73 #include "utils/rel.h"
74 #include "utils/relfilenodemap.h"
75 #include "utils/tqual.h"
76 
77 
78 /* entry for a hash table we use to map from xid to our transaction state */
80 {
84 
85 /* data structures for (relfilenode, ctid) => (cmin, cmax) mapping */
87 {
91 
93 {
97  CommandId combocid; /* just for debugging */
99 
100 /* k-way in-order change iteration support structures */
102 {
106  int fd;
109 
111 {
115  ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER];
117 
118 /* toast data structures */
119 typedef struct ReorderBufferToastEnt
120 {
121  Oid chunk_id; /* toast_table.chunk_id */
122  int32 last_chunk_seq; /* toast_table.chunk_seq of the last chunk we
123  * have seen */
124  Size num_chunks; /* number of chunks we've already seen */
125  Size size; /* combined size of chunks seen */
126  dlist_head chunks; /* linked list of chunks */
127  struct varlena *reconstructed; /* reconstructed varlena now pointed
128  * to in main tup */
130 
131 /* Disk serialization support data structures */
133 {
136  /* data follows */
138 
139 /*
140  * Maximum number of changes kept in memory, per transaction. After that,
141  * changes are spooled to disk.
142  *
143  * The current value should be sufficient to decode the entire transaction
144  * without hitting disk in OLTP workloads, while starting to spool to disk in
145  * other workloads reasonably fast.
146  *
147  * At some point in the future it probably makes sense to have a more elaborate
148  * resource management here, but it's not entirely clear what that would look
149  * like.
150  */
151 static const Size max_changes_in_memory = 4096;
152 
153 /*
154  * We use a very simple form of a slab allocator for frequently allocated
155  * objects, simply keeping a fixed number in a linked list when unused,
156  * instead of pfree()ing them. Without that, in many workloads aset.c becomes a
157  * major bottleneck, especially when spilling to disk while decoding batch
158  * workloads.
159  */
160 static const Size max_cached_tuplebufs = 4096 * 2; /* ~8MB */
161 
162 /* ---------------------------------------
163  * primary reorderbuffer support routines
164  * ---------------------------------------
165  */
169  TransactionId xid, bool create, bool *is_new,
170  XLogRecPtr lsn, bool create_as_top);
171 
172 static void AssertTXNLsnOrder(ReorderBuffer *rb);
173 
174 /* ---------------------------------------
175  * support functions for lsn-order iterating over the ->changes of a
176  * transaction and its subtransactions
177  *
178  * used for iteration over the k-way heap merge of a transaction and its
179  * subtransactions
180  * ---------------------------------------
181  */
183 static ReorderBufferChange *
188 
189 /*
190  * ---------------------------------------
191  * Disk serialization support functions
192  * ---------------------------------------
193  */
197  int fd, ReorderBufferChange *change);
199  int *fd, XLogSegNo *segno);
201  char *change);
203 
204 static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap);
206  ReorderBufferTXN *txn, CommandId cid);
207 
208 /* ---------------------------------------
209  * toast reassembly support
210  * ---------------------------------------
211  */
215  Relation relation, ReorderBufferChange *change);
217  Relation relation, ReorderBufferChange *change);
218 
219 
220 /*
221  * Allocate a new ReorderBuffer
222  */
225 {
227  HASHCTL hash_ctl;
228  MemoryContext new_ctx;
229 
230  /* allocate memory in own context, to have better accountability */
232  "ReorderBuffer",
234 
235  buffer =
236  (ReorderBuffer *) MemoryContextAlloc(new_ctx, sizeof(ReorderBuffer));
237 
238  memset(&hash_ctl, 0, sizeof(hash_ctl));
239 
240  buffer->context = new_ctx;
241 
242  buffer->change_context = SlabContextCreate(new_ctx,
243  "Change",
245  sizeof(ReorderBufferChange));
246 
247  buffer->txn_context = SlabContextCreate(new_ctx,
248  "TXN",
250  sizeof(ReorderBufferTXN));
251 
252  hash_ctl.keysize = sizeof(TransactionId);
253  hash_ctl.entrysize = sizeof(ReorderBufferTXNByIdEnt);
254  hash_ctl.hcxt = buffer->context;
255 
256  buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
258 
260  buffer->by_txn_last_txn = NULL;
261 
262  buffer->nr_cached_tuplebufs = 0;
263 
264  buffer->outbuf = NULL;
265  buffer->outbufsize = 0;
266 
268 
269  dlist_init(&buffer->toplevel_by_lsn);
270  slist_init(&buffer->cached_tuplebufs);
271 
272  return buffer;
273 }
274 
275 /*
276  * Free a ReorderBuffer
277  */
278 void
280 {
281  MemoryContext context = rb->context;
282 
283  /*
284  * We free separately allocated data by entirely scrapping reorderbuffer's
285  * memory context.
286  */
287  MemoryContextDelete(context);
288 }
289 
290 /*
291  * Get an unused, possibly preallocated, ReorderBufferTXN.
292  */
293 static ReorderBufferTXN *
295 {
296  ReorderBufferTXN *txn;
297 
298  txn = (ReorderBufferTXN *)
300 
301  memset(txn, 0, sizeof(ReorderBufferTXN));
302 
303  dlist_init(&txn->changes);
304  dlist_init(&txn->tuplecids);
305  dlist_init(&txn->subtxns);
306 
307  return txn;
308 }
309 
310 /*
311  * Free a ReorderBufferTXN.
312  *
313  * Deallocation might be delayed for efficiency purposes, for details check
314  * the comments above max_cached_changes's definition.
315  */
316 static void
318 {
319  /* clean the lookup cache if we were cached (quite likely) */
320  if (rb->by_txn_last_xid == txn->xid)
321  {
323  rb->by_txn_last_txn = NULL;
324  }
325 
326  /* free data that's contained */
327 
328  if (txn->tuplecid_hash != NULL)
329  {
331  txn->tuplecid_hash = NULL;
332  }
333 
334  if (txn->invalidations)
335  {
336  pfree(txn->invalidations);
337  txn->invalidations = NULL;
338  }
339 
340  pfree(txn);
341 }
342 
343 /*
344  * Get an unused, possibly preallocated, ReorderBufferChange.
345  */
348 {
349  ReorderBufferChange *change;
350 
351  change = (ReorderBufferChange *)
353 
354  memset(change, 0, sizeof(ReorderBufferChange));
355  return change;
356 }
357 
358 /*
359  * Free a ReorderBufferChange.
360  *
361  * Deallocation might be delayed for efficiency purposes, for details check
362  * the comments above max_cached_changes's definition.
363  */
364 void
366 {
367  /* free contained data */
368  switch (change->action)
369  {
374  if (change->data.tp.newtuple)
375  {
376  ReorderBufferReturnTupleBuf(rb, change->data.tp.newtuple);
377  change->data.tp.newtuple = NULL;
378  }
379 
380  if (change->data.tp.oldtuple)
381  {
382  ReorderBufferReturnTupleBuf(rb, change->data.tp.oldtuple);
383  change->data.tp.oldtuple = NULL;
384  }
385  break;
387  if (change->data.msg.prefix != NULL)
388  pfree(change->data.msg.prefix);
389  change->data.msg.prefix = NULL;
390  if (change->data.msg.message != NULL)
391  pfree(change->data.msg.message);
392  change->data.msg.message = NULL;
393  break;
395  if (change->data.snapshot)
396  {
397  ReorderBufferFreeSnap(rb, change->data.snapshot);
398  change->data.snapshot = NULL;
399  }
400  break;
401  /* no data in addition to the struct itself */
405  break;
406  }
407 
408  pfree(change);
409 }
410 
411 /*
412  * Get an unused, possibly preallocated, ReorderBufferTupleBuf fitting at
413  * least a tuple of size tuple_len (excluding header overhead).
414  */
417 {
418  ReorderBufferTupleBuf *tuple;
419  Size alloc_len;
420 
421  alloc_len = tuple_len + SizeofHeapTupleHeader;
422 
423  /*
424  * Most tuples are below MaxHeapTupleSize, so we use a slab allocator for
425  * those. Thus always allocate at least MaxHeapTupleSize. Note that tuples
426  * generated for oldtuples can be bigger, as they don't have out-of-line
427  * toast columns.
428  */
429  if (alloc_len < MaxHeapTupleSize)
430  alloc_len = MaxHeapTupleSize;
431 
432 
433  /* if small enough, check the slab cache */
434  if (alloc_len <= MaxHeapTupleSize && rb->nr_cached_tuplebufs)
435  {
436  rb->nr_cached_tuplebufs--;
440 #ifdef USE_ASSERT_CHECKING
441  memset(&tuple->tuple, 0xa9, sizeof(HeapTupleData));
443 #endif
444  tuple->tuple.t_data = ReorderBufferTupleBufData(tuple);
445 #ifdef USE_ASSERT_CHECKING
446  memset(tuple->tuple.t_data, 0xa8, tuple->alloc_tuple_size);
448 #endif
449  }
450  else
451  {
452  tuple = (ReorderBufferTupleBuf *)
454  sizeof(ReorderBufferTupleBuf) +
455  MAXIMUM_ALIGNOF + alloc_len);
456  tuple->alloc_tuple_size = alloc_len;
457  tuple->tuple.t_data = ReorderBufferTupleBufData(tuple);
458  }
459 
460  return tuple;
461 }
462 
463 /*
464  * Free a ReorderBufferTupleBuf.
465  *
466  * Deallocation might be delayed for efficiency purposes, for details check
467  * the comments above max_cached_tuplebufs's definition.
468  */
469 void
471 {
472  /* check whether to put into the slab cache, oversized tuples never are */
473  if (tuple->alloc_tuple_size == MaxHeapTupleSize &&
475  {
476  rb->nr_cached_tuplebufs++;
477  slist_push_head(&rb->cached_tuplebufs, &tuple->node);
480  VALGRIND_MAKE_MEM_DEFINED(&tuple->node, sizeof(tuple->node));
482  }
483  else
484  {
485  pfree(tuple);
486  }
487 }
488 
489 /*
490  * Return the ReorderBufferTXN from the given buffer, specified by Xid.
491  * If create is true, and a transaction doesn't already exist, create it
492  * (with the given LSN, and as top transaction if that's specified);
493  * when this happens, is_new is set to true.
494  */
495 static ReorderBufferTXN *
497  bool *is_new, XLogRecPtr lsn, bool create_as_top)
498 {
499  ReorderBufferTXN *txn;
501  bool found;
502 
504  Assert(!create || lsn != InvalidXLogRecPtr);
505 
506  /*
507  * Check the one-entry lookup cache first
508  */
510  rb->by_txn_last_xid == xid)
511  {
512  txn = rb->by_txn_last_txn;
513 
514  if (txn != NULL)
515  {
516  /* found it, and it's valid */
517  if (is_new)
518  *is_new = false;
519  return txn;
520  }
521 
522  /*
523  * cached as non-existent, and asked not to create? Then nothing else
524  * to do.
525  */
526  if (!create)
527  return NULL;
528  /* otherwise fall through to create it */
529  }
530 
531  /*
532  * Either the cache wasn't hit, or it yielded a "does-not-exist" and we
533  * want to create an entry.
534  */
535 
536  /* search the lookup table */
537  ent = (ReorderBufferTXNByIdEnt *)
538  hash_search(rb->by_txn,
539  (void *) &xid,
540  create ? HASH_ENTER : HASH_FIND,
541  &found);
542  if (found)
543  txn = ent->txn;
544  else if (create)
545  {
546  /* initialize the new entry, if creation was requested */
547  Assert(ent != NULL);
548 
549  ent->txn = ReorderBufferGetTXN(rb);
550  ent->txn->xid = xid;
551  txn = ent->txn;
552  txn->first_lsn = lsn;
554 
555  if (create_as_top)
556  {
557  dlist_push_tail(&rb->toplevel_by_lsn, &txn->node);
558  AssertTXNLsnOrder(rb);
559  }
560  }
561  else
562  txn = NULL; /* not found and not asked to create */
563 
564  /* update cache */
565  rb->by_txn_last_xid = xid;
566  rb->by_txn_last_txn = txn;
567 
568  if (is_new)
569  *is_new = !found;
570 
571  Assert(!create || txn != NULL);
572  return txn;
573 }
574 
575 /*
576  * Queue a change into a transaction so it can be replayed upon commit.
577  */
578 void
580  ReorderBufferChange *change)
581 {
582  ReorderBufferTXN *txn;
583 
584  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
585 
586  change->lsn = lsn;
587  Assert(InvalidXLogRecPtr != lsn);
588  dlist_push_tail(&txn->changes, &change->node);
589  txn->nentries++;
590  txn->nentries_mem++;
591 
593 }
594 
595 /*
596  * Queue message into a transaction so it can be processed upon commit.
597  */
598 void
600  Snapshot snapshot, XLogRecPtr lsn,
601  bool transactional, const char *prefix,
602  Size message_size, const char *message)
603 {
604  if (transactional)
605  {
606  MemoryContext oldcontext;
607  ReorderBufferChange *change;
608 
610 
611  oldcontext = MemoryContextSwitchTo(rb->context);
612 
613  change = ReorderBufferGetChange(rb);
615  change->data.msg.prefix = pstrdup(prefix);
616  change->data.msg.message_size = message_size;
617  change->data.msg.message = palloc(message_size);
618  memcpy(change->data.msg.message, message, message_size);
619 
620  ReorderBufferQueueChange(rb, xid, lsn, change);
621 
622  MemoryContextSwitchTo(oldcontext);
623  }
624  else
625  {
626  ReorderBufferTXN *txn = NULL;
627  volatile Snapshot snapshot_now = snapshot;
628 
629  if (xid != InvalidTransactionId)
630  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
631 
632  /* setup snapshot to allow catalog access */
633  SetupHistoricSnapshot(snapshot_now, NULL);
634  PG_TRY();
635  {
636  rb->message(rb, txn, lsn, false, prefix, message_size, message);
637 
639  }
640  PG_CATCH();
641  {
643  PG_RE_THROW();
644  }
645  PG_END_TRY();
646  }
647 }
648 
649 
650 static void
652 {
653 #ifdef USE_ASSERT_CHECKING
654  dlist_iter iter;
655  XLogRecPtr prev_first_lsn = InvalidXLogRecPtr;
656 
657  dlist_foreach(iter, &rb->toplevel_by_lsn)
658  {
659  ReorderBufferTXN *cur_txn;
660 
661  cur_txn = dlist_container(ReorderBufferTXN, node, iter.cur);
662  Assert(cur_txn->first_lsn != InvalidXLogRecPtr);
663 
664  if (cur_txn->end_lsn != InvalidXLogRecPtr)
665  Assert(cur_txn->first_lsn <= cur_txn->end_lsn);
666 
667  if (prev_first_lsn != InvalidXLogRecPtr)
668  Assert(prev_first_lsn < cur_txn->first_lsn);
669 
670  Assert(!cur_txn->is_known_as_subxact);
671  prev_first_lsn = cur_txn->first_lsn;
672  }
673 #endif
674 }
675 
678 {
679  ReorderBufferTXN *txn;
680 
682  return NULL;
683 
684  AssertTXNLsnOrder(rb);
685 
687 
690  return txn;
691 }
692 
693 void
695 {
697 }
698 
699 void
701  TransactionId subxid, XLogRecPtr lsn)
702 {
703  ReorderBufferTXN *txn;
704  ReorderBufferTXN *subtxn;
705  bool new_top;
706  bool new_sub;
707 
708  txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true);
709  subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false);
710 
711  if (new_sub)
712  {
713  /*
714  * we assign subtransactions to top level transaction even if we don't
715  * have data for it yet, assignment records frequently reference xids
716  * that have not yet produced any records. Knowing those aren't top
717  * level xids allows us to make processing cheaper in some places.
718  */
719  dlist_push_tail(&txn->subtxns, &subtxn->node);
720  txn->nsubtxns++;
721  }
722  else if (!subtxn->is_known_as_subxact)
723  {
724  subtxn->is_known_as_subxact = true;
725  Assert(subtxn->nsubtxns == 0);
726 
727  /* remove from lsn order list of top-level transactions */
728  dlist_delete(&subtxn->node);
729 
730  /* add to toplevel transaction */
731  dlist_push_tail(&txn->subtxns, &subtxn->node);
732  txn->nsubtxns++;
733  }
734  else if (new_top)
735  {
736  elog(ERROR, "existing subxact assigned to unknown toplevel xact");
737  }
738 }
739 
740 /*
741  * Associate a subtransaction with its toplevel transaction at commit
742  * time. There may be no further changes added after this.
743  */
744 void
746  TransactionId subxid, XLogRecPtr commit_lsn,
747  XLogRecPtr end_lsn)
748 {
749  ReorderBufferTXN *txn;
750  ReorderBufferTXN *subtxn;
751 
752  subtxn = ReorderBufferTXNByXid(rb, subxid, false, NULL,
753  InvalidXLogRecPtr, false);
754 
755  /*
756  * No need to do anything if that subtxn didn't contain any changes
757  */
758  if (!subtxn)
759  return;
760 
761  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, commit_lsn, true);
762 
763  if (txn == NULL)
764  elog(ERROR, "subxact logged without previous toplevel record");
765 
766  /*
767  * Pass our base snapshot to the parent transaction if it doesn't have
768  * one, or ours is older. That can happen if there are no changes in the
769  * toplevel transaction but in one of the child transactions. This allows
770  * the parent to simply use its base snapshot initially.
771  */
772  if (subtxn->base_snapshot != NULL &&
773  (txn->base_snapshot == NULL ||
774  txn->base_snapshot_lsn > subtxn->base_snapshot_lsn))
775  {
776  txn->base_snapshot = subtxn->base_snapshot;
777  txn->base_snapshot_lsn = subtxn->base_snapshot_lsn;
778  subtxn->base_snapshot = NULL;
780  }
781 
782  subtxn->final_lsn = commit_lsn;
783  subtxn->end_lsn = end_lsn;
784 
785  if (!subtxn->is_known_as_subxact)
786  {
787  subtxn->is_known_as_subxact = true;
788  Assert(subtxn->nsubtxns == 0);
789 
790  /* remove from lsn order list of top-level transactions */
791  dlist_delete(&subtxn->node);
792 
793  /* add to subtransaction list */
794  dlist_push_tail(&txn->subtxns, &subtxn->node);
795  txn->nsubtxns++;
796  }
797 }
798 
799 
800 /*
801  * Support for efficiently iterating over a transaction's and its
802  * subtransactions' changes.
803  *
804  * We do this by doing a k-way merge between transactions/subtransactions. For that
805  * we model the current heads of the different transactions as a binary heap
806  * so we easily know which (sub-)transaction has the change with the smallest
807  * lsn next.
808  *
809  * We assume the changes in individual transactions are already sorted by LSN.
810  */
811 
812 /*
813  * Binary heap comparison function.
814  */
815 static int
817 {
819  XLogRecPtr pos_a = state->entries[DatumGetInt32(a)].lsn;
820  XLogRecPtr pos_b = state->entries[DatumGetInt32(b)].lsn;
821 
822  if (pos_a < pos_b)
823  return 1;
824  else if (pos_a == pos_b)
825  return 0;
826  return -1;
827 }
828 
829 /*
830  * Allocate & initialize an iterator which iterates in lsn order over a
831  * transaction and all its subtransactions.
832  */
835 {
836  Size nr_txns = 0;
838  dlist_iter cur_txn_i;
839  int32 off;
840 
841  /*
842  * Calculate the size of our heap: one element for every transaction that
843  * contains changes. (Besides the transactions already in the reorder
844  * buffer, we count the one we were directly passed.)
845  */
846  if (txn->nentries > 0)
847  nr_txns++;
848 
849  dlist_foreach(cur_txn_i, &txn->subtxns)
850  {
851  ReorderBufferTXN *cur_txn;
852 
853  cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
854 
855  if (cur_txn->nentries > 0)
856  nr_txns++;
857  }
858 
859  /*
860  * TODO: Consider adding fastpath for the rather common nr_txns=1 case, no
861  * need to allocate/build a heap then.
862  */
863 
864  /* allocate iteration state */
865  state = (ReorderBufferIterTXNState *)
867  sizeof(ReorderBufferIterTXNState) +
868  sizeof(ReorderBufferIterTXNEntry) * nr_txns);
869 
870  state->nr_txns = nr_txns;
871  dlist_init(&state->old_change);
872 
873  for (off = 0; off < state->nr_txns; off++)
874  {
875  state->entries[off].fd = -1;
876  state->entries[off].segno = 0;
877  }
878 
879  /* allocate heap */
880  state->heap = binaryheap_allocate(state->nr_txns,
882  state);
883 
884  /*
885  * Now insert items into the binary heap, in an unordered fashion. (We
886  * will run a heap assembly step at the end; this is more efficient.)
887  */
888 
889  off = 0;
890 
891  /* add toplevel transaction if it contains changes */
892  if (txn->nentries > 0)
893  {
894  ReorderBufferChange *cur_change;
895 
896  if (txn->nentries != txn->nentries_mem)
897  {
898  /* serialize remaining changes */
899  ReorderBufferSerializeTXN(rb, txn);
900  ReorderBufferRestoreChanges(rb, txn, &state->entries[off].fd,
901  &state->entries[off].segno);
902  }
903 
904  cur_change = dlist_head_element(ReorderBufferChange, node,
905  &txn->changes);
906 
907  state->entries[off].lsn = cur_change->lsn;
908  state->entries[off].change = cur_change;
909  state->entries[off].txn = txn;
910 
912  }
913 
914  /* add subtransactions if they contain changes */
915  dlist_foreach(cur_txn_i, &txn->subtxns)
916  {
917  ReorderBufferTXN *cur_txn;
918 
919  cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
920 
921  if (cur_txn->nentries > 0)
922  {
923  ReorderBufferChange *cur_change;
924 
925  if (cur_txn->nentries != cur_txn->nentries_mem)
926  {
927  /* serialize remaining changes */
928  ReorderBufferSerializeTXN(rb, cur_txn);
929  ReorderBufferRestoreChanges(rb, cur_txn,
930  &state->entries[off].fd,
931  &state->entries[off].segno);
932  }
933  cur_change = dlist_head_element(ReorderBufferChange, node,
934  &cur_txn->changes);
935 
936  state->entries[off].lsn = cur_change->lsn;
937  state->entries[off].change = cur_change;
938  state->entries[off].txn = cur_txn;
939 
941  }
942  }
943 
944  /* assemble a valid binary heap */
945  binaryheap_build(state->heap);
946 
947  return state;
948 }
949 
950 /*
951  * Return the next change when iterating over a transaction and its
952  * subtransactions.
953  *
954  * Returns NULL when no further changes exist.
955  */
956 static ReorderBufferChange *
958 {
959  ReorderBufferChange *change;
961  int32 off;
962 
963  /* nothing there anymore */
964  if (state->heap->bh_size == 0)
965  return NULL;
966 
967  off = DatumGetInt32(binaryheap_first(state->heap));
968  entry = &state->entries[off];
969 
970  /* free memory we might have "leaked" in the previous *Next call */
971  if (!dlist_is_empty(&state->old_change))
972  {
973  change = dlist_container(ReorderBufferChange, node,
975  ReorderBufferReturnChange(rb, change);
976  Assert(dlist_is_empty(&state->old_change));
977  }
978 
979  change = entry->change;
980 
981  /*
982  * update heap with information about which transaction has the next
983  * relevant change in LSN order
984  */
985 
986  /* there are in-memory changes */
987  if (dlist_has_next(&entry->txn->changes, &entry->change->node))
988  {
989  dlist_node *next = dlist_next_node(&entry->txn->changes, &change->node);
990  ReorderBufferChange *next_change =
992 
993  /* txn stays the same */
994  state->entries[off].lsn = next_change->lsn;
995  state->entries[off].change = next_change;
996 
998  return change;
999  }
1000 
1001  /* try to load changes from disk */
1002  if (entry->txn->nentries != entry->txn->nentries_mem)
1003  {
1004  /*
1005  * Ugly: restoring changes will reuse *Change records, thus delete the
1006  * current one from the per-tx list and only free in the next call.
1007  */
1008  dlist_delete(&change->node);
1009  dlist_push_tail(&state->old_change, &change->node);
1010 
1011  if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->fd,
1012  &state->entries[off].segno))
1013  {
1014  /* successfully restored changes from disk */
1015  ReorderBufferChange *next_change =
1017  &entry->txn->changes);
1018 
1019  elog(DEBUG2, "restored %u/%u changes from disk",
1020  (uint32) entry->txn->nentries_mem,
1021  (uint32) entry->txn->nentries);
1022 
1023  Assert(entry->txn->nentries_mem);
1024  /* txn stays the same */
1025  state->entries[off].lsn = next_change->lsn;
1026  state->entries[off].change = next_change;
1028 
1029  return change;
1030  }
1031  }
1032 
1033  /* ok, no changes there anymore, remove */
1034  binaryheap_remove_first(state->heap);
1035 
1036  return change;
1037 }
1038 
1039 /*
1040  * Deallocate the iterator
1041  */
1042 static void
1045 {
1046  int32 off;
1047 
1048  for (off = 0; off < state->nr_txns; off++)
1049  {
1050  if (state->entries[off].fd != -1)
1051  CloseTransientFile(state->entries[off].fd);
1052  }
1053 
1054  /* free memory we might have "leaked" in the last *Next call */
1055  if (!dlist_is_empty(&state->old_change))
1056  {
1057  ReorderBufferChange *change;
1058 
1059  change = dlist_container(ReorderBufferChange, node,
1060  dlist_pop_head_node(&state->old_change));
1061  ReorderBufferReturnChange(rb, change);
1062  Assert(dlist_is_empty(&state->old_change));
1063  }
1064 
1065  binaryheap_free(state->heap);
1066  pfree(state);
1067 }
1068 
1069 /*
1070  * Cleanup the contents of a transaction, usually after the transaction
1071  * committed or aborted.
1072  */
1073 static void
1075 {
1076  bool found;
1077  dlist_mutable_iter iter;
1078 
1079  /* cleanup subtransactions & their changes */
1080  dlist_foreach_modify(iter, &txn->subtxns)
1081  {
1082  ReorderBufferTXN *subtxn;
1083 
1084  subtxn = dlist_container(ReorderBufferTXN, node, iter.cur);
1085 
1086  /*
1087  * Subtransactions are always associated to the toplevel TXN, even if
1088  * they originally were happening inside another subtxn, so we won't
1089  * ever recurse more than one level deep here.
1090  */
1091  Assert(subtxn->is_known_as_subxact);
1092  Assert(subtxn->nsubtxns == 0);
1093 
1094  ReorderBufferCleanupTXN(rb, subtxn);
1095  }
1096 
1097  /* cleanup changes in the toplevel txn */
1098  dlist_foreach_modify(iter, &txn->changes)
1099  {
1100  ReorderBufferChange *change;
1101 
1102  change = dlist_container(ReorderBufferChange, node, iter.cur);
1103 
1104  ReorderBufferReturnChange(rb, change);
1105  }
1106 
1107  /*
1108  * Cleanup the tuplecids we stored for decoding catalog snapshot access.
1109  * They are always stored in the toplevel transaction.
1110  */
1111  dlist_foreach_modify(iter, &txn->tuplecids)
1112  {
1113  ReorderBufferChange *change;
1114 
1115  change = dlist_container(ReorderBufferChange, node, iter.cur);
1117  ReorderBufferReturnChange(rb, change);
1118  }
1119 
1120  if (txn->base_snapshot != NULL)
1121  {
1123  txn->base_snapshot = NULL;
1125  }
1126 
1127  /*
1128  * Remove TXN from its containing list.
1129  *
1130  * Note: if txn->is_known_as_subxact, we are deleting the TXN from its
1131  * parent's list of known subxacts; this leaves the parent's nsubtxns
1132  * count too high, but we don't care. Otherwise, we are deleting the TXN
1133  * from the LSN-ordered list of toplevel TXNs.
1134  */
1135  dlist_delete(&txn->node);
1136 
1137  /* now remove reference from buffer */
1138  hash_search(rb->by_txn,
1139  (void *) &txn->xid,
1140  HASH_REMOVE,
1141  &found);
1142  Assert(found);
1143 
1144  /* remove entries spilled to disk */
1145  if (txn->nentries != txn->nentries_mem)
1146  ReorderBufferRestoreCleanup(rb, txn);
1147 
1148  /* deallocate */
1149  ReorderBufferReturnTXN(rb, txn);
1150 }
1151 
1152 /*
1153  * Build a hash with a (relfilenode, ctid) -> (cmin, cmax) mapping for use by
1154  * tqual.c's HeapTupleSatisfiesHistoricMVCC.
1155  */
1156 static void
1158 {
1159  dlist_iter iter;
1160  HASHCTL hash_ctl;
1161 
1162  if (!txn->has_catalog_changes || dlist_is_empty(&txn->tuplecids))
1163  return;
1164 
1165  memset(&hash_ctl, 0, sizeof(hash_ctl));
1166 
1167  hash_ctl.keysize = sizeof(ReorderBufferTupleCidKey);
1168  hash_ctl.entrysize = sizeof(ReorderBufferTupleCidEnt);
1169  hash_ctl.hcxt = rb->context;
1170 
1171  /*
1172  * create the hash with the exact number of to-be-stored tuplecids from
1173  * the start
1174  */
1175  txn->tuplecid_hash =
1176  hash_create("ReorderBufferTupleCid", txn->ntuplecids, &hash_ctl,
1178 
1179  dlist_foreach(iter, &txn->tuplecids)
1180  {
1183  bool found;
1184  ReorderBufferChange *change;
1185 
1186  change = dlist_container(ReorderBufferChange, node, iter.cur);
1187 
1189 
1190  /* be careful about padding */
1191  memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
1192 
1193  key.relnode = change->data.tuplecid.node;
1194 
1195  ItemPointerCopy(&change->data.tuplecid.tid,
1196  &key.tid);
1197 
1198  ent = (ReorderBufferTupleCidEnt *)
1200  (void *) &key,
1202  &found);
1203  if (!found)
1204  {
1205  ent->cmin = change->data.tuplecid.cmin;
1206  ent->cmax = change->data.tuplecid.cmax;
1207  ent->combocid = change->data.tuplecid.combocid;
1208  }
1209  else
1210  {
1211  Assert(ent->cmin == change->data.tuplecid.cmin);
1212  Assert(ent->cmax == InvalidCommandId ||
1213  ent->cmax == change->data.tuplecid.cmax);
1214 
1215  /*
1216  * if the tuple got valid in this transaction and now got deleted
1217  * we already have a valid cmin stored. The cmax will be
1218  * InvalidCommandId though.
1219  */
1220  ent->cmax = change->data.tuplecid.cmax;
1221  }
1222  }
1223 }
1224 
1225 /*
1226  * Copy a provided snapshot so we can modify it privately. This is needed so
1227  * that catalog modifying transactions can look into intermediate catalog
1228  * states.
1229  */
1230 static Snapshot
1232  ReorderBufferTXN *txn, CommandId cid)
1233 {
1234  Snapshot snap;
1235  dlist_iter iter;
1236  int i = 0;
1237  Size size;
1238 
1239  size = sizeof(SnapshotData) +
1240  sizeof(TransactionId) * orig_snap->xcnt +
1241  sizeof(TransactionId) * (txn->nsubtxns + 1);
1242 
1243  snap = MemoryContextAllocZero(rb->context, size);
1244  memcpy(snap, orig_snap, sizeof(SnapshotData));
1245 
1246  snap->copied = true;
1247  snap->active_count = 1; /* mark as active so nobody frees it */
1248  snap->regd_count = 0;
1249  snap->xip = (TransactionId *) (snap + 1);
1250 
1251  memcpy(snap->xip, orig_snap->xip, sizeof(TransactionId) * snap->xcnt);
1252 
1253  /*
1254  * snap->subxip contains all txids that belong to our transaction which we
1255  * need to check via cmin/cmax. That's why we store the toplevel
1256  * transaction in there as well.
1257  */
1258  snap->subxip = snap->xip + snap->xcnt;
1259  snap->subxip[i++] = txn->xid;
1260 
1261  /*
1262  * nsubxcnt isn't decreased when subtransactions abort, so count manually.
1263  * Since it's an upper boundary it is safe to use it for the allocation
1264  * above.
1265  */
1266  snap->subxcnt = 1;
1267 
1268  dlist_foreach(iter, &txn->subtxns)
1269  {
1270  ReorderBufferTXN *sub_txn;
1271 
1272  sub_txn = dlist_container(ReorderBufferTXN, node, iter.cur);
1273  snap->subxip[i++] = sub_txn->xid;
1274  snap->subxcnt++;
1275  }
1276 
1277  /* sort so we can bsearch() later */
1278  qsort(snap->subxip, snap->subxcnt, sizeof(TransactionId), xidComparator);
1279 
1280  /* store the specified current CommandId */
1281  snap->curcid = cid;
1282 
1283  return snap;
1284 }
1285 
1286 /*
1287  * Free a previously ReorderBufferCopySnap'ed snapshot.
 *
 * Snapshots flagged as copied are a single allocation and are pfree'd
 * directly. NOTE(review): the statement of the else branch is not visible in
 * this listing; presumably non-copied snapshots are released through their
 * owner instead - confirm against the full source.
1288  */
1289 static void
1291 {
1292  if (snap->copied)
1293  pfree(snap);
1294  else
1296 }
1297 
1298 /*
1299  * Perform the replay of a transaction and its non-aborted subtransactions.
1300  *
1301  * Subtransactions previously have to be processed by
1302  * ReorderBufferCommitChild(), even if previously assigned to the toplevel
1303  * transaction with ReorderBufferAssignChild.
1304  *
1305  * We currently can only decode a transaction's contents when its commit
1306  * record is read because that's currently the only place where we know about
1307  * cache invalidations. Thus, once a toplevel commit is read, we iterate over
1308  * the top and subtransactions (using a k-way merge) and replay the changes in
1309  * lsn order.
1310  */
1311 void
1313  XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
1314  TimestampTz commit_time,
1315  RepOriginId origin_id, XLogRecPtr origin_lsn)
1316 {
1317  ReorderBufferTXN *txn;
1318  volatile Snapshot snapshot_now;
1319  volatile CommandId command_id = FirstCommandId;
1320  bool using_subtxn;
1321  ReorderBufferIterTXNState *volatile iterstate = NULL;
1322
1323  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1324  false);
1325
1326  /* unknown transaction, nothing to replay */
1327  if (txn == NULL)
1328  return;
1329
 /* record the commit information on the transaction before replaying */
1330  txn->final_lsn = commit_lsn;
1331  txn->end_lsn = end_lsn;
1332  txn->commit_time = commit_time;
1333  txn->origin_id = origin_id;
1334  txn->origin_lsn = origin_lsn;
1335
1336  /*
1337  * If this transaction didn't have any real changes in our database, it's
1338  * OK not to have a snapshot. Note that ReorderBufferCommitChild will have
1339  * transferred its snapshot to this transaction if it had one and the
1340  * toplevel tx didn't.
1341  */
1342  if (txn->base_snapshot == NULL)
1343  {
1344  Assert(txn->ninvalidations == 0);
1345  ReorderBufferCleanupTXN(rb, txn);
1346  return;
1347  }
1348
1349  snapshot_now = txn->base_snapshot;
1350
1351  /* build data to be able to lookup the CommandIds of catalog tuples */
1353
1354  /* setup the initial snapshot */
1355  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1356
1357  /*
1358  * Decoding needs access to syscaches et al., which in turn use
1359  * heavyweight locks and such. Thus we need to have enough state around to
1360  * keep track of those. The easiest way is to simply use a transaction
1361  * internally. That also allows us to easily enforce that nothing writes
1362  * to the database by checking for xid assignments.
1363  *
1364  * When we're called via the SQL SRF there's already a transaction
1365  * started, so start an explicit subtransaction there.
1366  */
1367  using_subtxn = IsTransactionOrTransactionBlock();
1368
1369  PG_TRY();
1370  {
1371  ReorderBufferChange *change;
1372  ReorderBufferChange *specinsert = NULL; /* pending speculative insert, if any */
1373
1374  if (using_subtxn)
1375  BeginInternalSubTransaction("replay");
1376  else
1378
1379  rb->begin(rb, txn);
1380
1381  iterstate = ReorderBufferIterTXNInit(rb, txn);
1382  while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
1383  {
1384  Relation relation = NULL;
1385  Oid reloid;
1386
1387  switch (change->action)
1388  {
1390
1391  /*
1392  * Confirmation for speculative insertion arrived. Simply
1393  * use as a normal record. It'll be cleaned up at the end
1394  * of INSERT processing.
 *
 * NOTE(review): specinsert is dereferenced right below
 * without a NULL check; confirm that a confirmation can
 * never arrive without a pending speculative insertion.
1395  */
1396  Assert(specinsert->data.tp.oldtuple == NULL);
1397  change = specinsert;
1399
1400  /* intentionally fall through */
1404  Assert(snapshot_now);
1405
1406  reloid = RelidByRelfilenode(change->data.tp.relnode.spcNode,
1407  change->data.tp.relnode.relNode);
1408
1409  /*
1410  * Catalog tuple without data, emitted while catalog was
1411  * in the process of being rewritten.
1412  */
1413  if (reloid == InvalidOid &&
1414  change->data.tp.newtuple == NULL &&
1415  change->data.tp.oldtuple == NULL)
1416  goto change_done;
1417  else if (reloid == InvalidOid)
1418  elog(ERROR, "could not map filenode \"%s\" to relation OID",
1419  relpathperm(change->data.tp.relnode,
1420  MAIN_FORKNUM));
1421
1422  relation = RelationIdGetRelation(reloid);
1423
1424  if (relation == NULL)
1425  elog(ERROR, "could not open relation with OID %u (for filenode \"%s\")",
1426  reloid,
1427  relpathperm(change->data.tp.relnode,
1428  MAIN_FORKNUM));
1429
1430  if (!RelationIsLogicallyLogged(relation))
1431  goto change_done;
1432
1433  /*
1434  * For now ignore sequence changes entirely. Most of the
1435  * time they don't log changes using records we
1436  * understand, so it doesn't make sense to handle the few
1437  * cases we do.
1438  */
1439  if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
1440  goto change_done;
1441
1442  /* user-triggered change */
1443  if (!IsToastRelation(relation))
1444  {
1445  ReorderBufferToastReplace(rb, txn, relation, change);
1446  rb->apply_change(rb, txn, relation, change);
1447
1448  /*
1449  * Only clear reassembled toast chunks if we're sure
1450  * they're not required anymore. The creator of the
1451  * tuple tells us.
1452  */
1453  if (change->data.tp.clear_toast_afterwards)
1454  ReorderBufferToastReset(rb, txn);
1455  }
1456  /* we're not interested in toast deletions */
1457  else if (change->action == REORDER_BUFFER_CHANGE_INSERT)
1458  {
1459  /*
1460  * Need to reassemble the full toasted Datum in
1461  * memory. To ensure the chunks don't get reused till
1462  * we're done, remove it from the list of this
1463  * transaction's changes. Otherwise it will get
1464  * freed/reused while restoring spooled data from
1465  * disk.
1466  */
1467  dlist_delete(&change->node);
1468  ReorderBufferToastAppendChunk(rb, txn, relation,
1469  change);
1470  }
1471
1472  change_done:
1473
1474  /*
1475  * Either speculative insertion was confirmed, or it was
1476  * unsuccessful and the record isn't needed anymore.
1477  */
1478  if (specinsert != NULL)
1479  {
1480  ReorderBufferReturnChange(rb, specinsert);
1481  specinsert = NULL;
1482  }
1483
 /* drop the relation reference taken above, on every exit path */
1484  if (relation != NULL)
1485  {
1486  RelationClose(relation);
1487  relation = NULL;
1488  }
1489  break;
1490
1492
1493  /*
1494  * Speculative insertions are dealt with by delaying the
1495  * processing of the insert until the confirmation record
1496  * arrives. For that we simply unlink the record from the
1497  * chain, so it does not get freed/reused while restoring
1498  * spooled data from disk.
1499  *
1500  * This is safe in the face of concurrent catalog changes
1501  * because the relevant relation can't be changed between
1502  * speculative insertion and confirmation due to
1503  * CheckTableNotInUse() and locking.
1504  */
1505
1506  /* clear out a pending (and thus failed) speculation */
1507  if (specinsert != NULL)
1508  {
1509  ReorderBufferReturnChange(rb, specinsert);
1510  specinsert = NULL;
1511  }
1512
1513  /* and memorize the pending insertion */
1514  dlist_delete(&change->node);
1515  specinsert = change;
1516  break;
1517
1519  rb->message(rb, txn, change->lsn, true,
1520  change->data.msg.prefix,
1521  change->data.msg.message_size,
1522  change->data.msg.message);
1523  break;
1524
1526  /* get rid of the old */
1527  TeardownHistoricSnapshot(false);
1528
 /* a private copy is ours to free; replace it with a fresh copy */
1529  if (snapshot_now->copied)
1530  {
1531  ReorderBufferFreeSnap(rb, snapshot_now);
1532  snapshot_now =
1533  ReorderBufferCopySnap(rb, change->data.snapshot,
1534  txn, command_id);
1535  }
1536
1537  /*
1538  * Restored from disk, need to be careful not to double
1539  * free. We could introduce refcounting for that, but for
1540  * now this seems infrequent enough not to care.
1541  */
1542  else if (change->data.snapshot->copied)
1543  {
1544  snapshot_now =
1545  ReorderBufferCopySnap(rb, change->data.snapshot,
1546  txn, command_id);
1547  }
1548  else
1549  {
1550  snapshot_now = change->data.snapshot;
1551  }
1552
1553
1554  /* and continue with the new one */
1555  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1556  break;
1557
1559  Assert(change->data.command_id != InvalidCommandId);
1560
 /* only ever advance the command id, never move it backwards */
1561  if (command_id < change->data.command_id)
1562  {
1563  command_id = change->data.command_id;
1564
1565  if (!snapshot_now->copied)
1566  {
1567  /* we don't use the global one anymore */
1568  snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
1569  txn, command_id);
1570  }
1571
1572  snapshot_now->curcid = command_id;
1573
1574  TeardownHistoricSnapshot(false);
1575  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1576
1577  /*
1578  * Every time the CommandId is incremented, we could
1579  * see new catalog contents, so execute all
1580  * invalidations.
1581  */
1583  }
1584
1585  break;
1586
1588  elog(ERROR, "tuplecid value in changequeue");
1589  break;
1590  }
1591  }
1592
1593  /*
1594  * There's a speculative insertion remaining, just clean it up; it
1595  * can't have been successful, otherwise we'd have gotten a
1596  * confirmation record.
1597  */
1598  if (specinsert)
1599  {
1600  ReorderBufferReturnChange(rb, specinsert);
1601  specinsert = NULL;
1602  }
1603
1604  /* clean up the iterator */
1605  ReorderBufferIterTXNFinish(rb, iterstate);
1606  iterstate = NULL;
1607
1608  /* call commit callback */
1609  rb->commit(rb, txn, commit_lsn);
1610
1611  /* this is just a sanity check against bad output plugin behaviour */
1613  elog(ERROR, "output plugin used XID %u",
1615
1616  /* cleanup */
1617  TeardownHistoricSnapshot(false);
1618
1619  /*
1620  * Aborting the current (sub-)transaction as a whole has the right
1621  * semantics. We want all locks acquired in here to be released, not
1622  * reassigned to the parent and we do not want any database access
1623  * have persistent effects.
1624  */
1626
1627  /* make sure there's no cache pollution */
1629
1630  if (using_subtxn)
1632
1633  if (snapshot_now->copied)
1634  ReorderBufferFreeSnap(rb, snapshot_now);
1635
1636  /* remove potential on-disk data, and deallocate */
1637  ReorderBufferCleanupTXN(rb, txn);
1638  }
1639  PG_CATCH();
1640  {
1641  /* TODO: Encapsulate cleanup from the PG_TRY and PG_CATCH blocks */
1642  if (iterstate)
1643  ReorderBufferIterTXNFinish(rb, iterstate);
1644
1646
1647  /*
1648  * Force cache invalidation to happen outside of a valid transaction
1649  * to prevent catalog access as we just caught an error.
1650  */
1652
1653  /* make sure there's no cache pollution */
1655
1656  if (using_subtxn)
1658
1659  if (snapshot_now->copied)
1660  ReorderBufferFreeSnap(rb, snapshot_now);
1661
1662  /* remove potential on-disk data, and deallocate */
1663  ReorderBufferCleanupTXN(rb, txn);
1664
 /* cleanup is done, hand the original error on to the caller */
1665  PG_RE_THROW();
1666  }
1667  PG_END_TRY();
1668 }
1669 
1670 /*
1671  * Abort a transaction that possibly has previous changes. Needs to be first
1672  * called for subtransactions and then for the toplevel xid.
1673  *
1674  * NB: Transactions handled here have to have actively aborted (i.e. have
1675  * produced an abort record). Implicitly aborted transactions are handled via
1676  * ReorderBufferAbortOld(); transactions we're just not interested in, but
1677  * which have committed are handled in ReorderBufferForget().
1678  *
1679  * This function purges this transaction and its contents from memory and
1680  * disk. An xid that is not known to the reorder buffer is silently ignored.
1681  */
1682 void
1684 {
1685  ReorderBufferTXN *txn;
1686
 /* lookup only: do not create an entry for an unknown xid */
1687  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1688  false);
1689
1690  /* unknown, nothing to remove */
1691  if (txn == NULL)
1692  return;
1693
1694  /* cosmetic... */
1695  txn->final_lsn = lsn;
1696
1697  /* remove potential on-disk data, and deallocate */
1698  ReorderBufferCleanupTXN(rb, txn);
1699 }
1700 
1701 /*
1702  * Abort all transactions that aren't actually running anymore because the
1703  * server restarted.
1704  *
1705  * NB: These really have to be transactions that have aborted due to a server
1706  * crash/immediate restart, as we don't deal with invalidations here.
1707  */
1708 void
1710 {
1711  dlist_mutable_iter it;
1712
1713  /*
1714  * Iterate through all (potential) toplevel TXNs and abort all that are
1715  * older than what possibly can be running. Once we've found the first
1716  * that is alive we stop. There might be some that acquired an xid earlier
1717  * but started writing later, but it's unlikely and they will be cleaned up
1718  * in a later call to ReorderBufferAbortOld().
1719  */
1721  {
1722  ReorderBufferTXN *txn;
1723
1724  txn = dlist_container(ReorderBufferTXN, node, it.cur);
1725
1726  if (TransactionIdPrecedes(txn->xid, oldestRunningXid))
1727  {
1728  elog(DEBUG1, "aborting old transaction %u", txn->xid);
1729
1730  /* remove potential on-disk data, and deallocate this tx */
1731  ReorderBufferCleanupTXN(rb, txn);
1732  }
1733  else
1734  return; /* first transaction that may still be running: stop here */
1735  }
1736 }
1737 
1738 /*
1739  * Forget the contents of a transaction if we aren't interested in its
1740  * contents. Needs to be first called for subtransactions and then for the
1741  * toplevel xid.
1742  *
1743  * This is significantly different to ReorderBufferAbort() because
1744  * transactions that have committed need to be treated differently from aborted
1745  * ones since they may have modified the catalog.
1746  *
1747  * Note that this is only allowed to be called in the moment a transaction
1748  * commit has just been read, not earlier; otherwise later records referring
1749  * to this xid might re-create the transaction incompletely.
1750  */
1751 void
1753 {
1754  ReorderBufferTXN *txn;
1755
 /* lookup only: do not create an entry for an unknown xid */
1756  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1757  false);
1758
1759  /* unknown, nothing to forget */
1760  if (txn == NULL)
1761  return;
1762
1763  /* cosmetic... */
1764  txn->final_lsn = lsn;
1765
1766  /*
1767  * Process cache invalidation messages if there are any. Even if we're not
1768  * interested in the transaction's contents, it could have manipulated the
1769  * catalog and we need to update the caches according to that.
1770  */
1771  if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
1773  txn->invalidations);
1774  else
1775  Assert(txn->ninvalidations == 0);
1776
1777  /* remove potential on-disk data, and deallocate */
1778  ReorderBufferCleanupTXN(rb, txn);
1779 }
1780 
1781 /*
1782  * Execute invalidations happening outside the context of a decoded
1783  * transaction. That currently happens either for xid-less commits
1784  * (c.f. RecordTransactionCommit()) or for invalidations in uninteresting
1785  * transactions (via ReorderBufferForget()).
 *
 * Each of the 'ninvalidations' entries of 'invalidations' is passed to
 * LocalExecuteInvalidationMessage() in array order.
1786  */
1787 void
1789  SharedInvalidationMessage *invalidations)
1790 {
 /* when already inside a transaction, do the work in a subtransaction */
1791  bool use_subtxn = IsTransactionOrTransactionBlock();
1792  int i;
1793
1794  if (use_subtxn)
1795  BeginInternalSubTransaction("replay");
1796
1797  /*
1798  * Force invalidations to happen outside of a valid transaction - that way
1799  * entries will just be marked as invalid without accessing the catalog.
1800  * That's advantageous because we don't need to setup the full state
1801  * necessary for catalog access.
1802  */
1803  if (use_subtxn)
1805
1806  for (i = 0; i < ninvalidations; i++)
1807  LocalExecuteInvalidationMessage(&invalidations[i]);
1808
1809  if (use_subtxn)
1811 }
1812 
1813 /*
1814  * Tell reorderbuffer about an xid seen in the WAL stream. Has to be called at
1815  * least once for every xid in XLogRecord->xl_xid (other places in records
1816  * may, but do not have to be passed through here).
1817  *
1818  * Reorderbuffer keeps some datastructures about transactions in LSN order,
1819  * for efficiency. To do that it has to know about when transactions are seen
1820  * first in the WAL. As many types of records are not actually interesting for
1821  * logical decoding, they do not necessarily pass through here.
1822  */
1823 void
1825 {
1826  /* many records won't have an xid assigned, centralize check here */
1827  if (xid != InvalidTransactionId)
1828  ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); /* result unused; called for its side effect */
1829 }
1830 
1831 /*
1832  * Add a new snapshot to this transaction that may only be used after lsn
1833  * 'lsn' because the previous snapshot doesn't describe the catalog correctly
1834  * for following rows.
 *
 * The snapshot is attached to a change record that is queued for 'xid' at
 * 'lsn' via ReorderBufferQueueChange().
1835  */
1836 void
1838  XLogRecPtr lsn, Snapshot snap)
1839 {
1841
1842  change->data.snapshot = snap;
1844
1845  ReorderBufferQueueChange(rb, xid, lsn, change);
1846 }
1847 
1848 /*
1849  * Setup the base snapshot of a transaction. The base snapshot is the snapshot
1850  * that is used to decode all changes until either this transaction modifies
1851  * the catalog or another catalog modifying transaction commits.
1852  *
1853  * Needs to be called before any changes are added with
1854  * ReorderBufferQueueChange().
 *
 * The transaction must not have a base snapshot yet and 'snap' must not be
 * NULL; both conditions are only checked by assertions.
1855  */
1856 void
1858  XLogRecPtr lsn, Snapshot snap)
1859 {
1860  ReorderBufferTXN *txn;
1861  bool is_new; /* filled in by the lookup below, but not read afterwards */
1862
1863  txn = ReorderBufferTXNByXid(rb, xid, true, &is_new, lsn, true);
1864  Assert(txn->base_snapshot == NULL);
1865  Assert(snap != NULL);
1866
1867  txn->base_snapshot = snap;
1868  txn->base_snapshot_lsn = lsn;
1869 }
1870 
1871 /*
1872  * Access the catalog with this CommandId at this point in the changestream.
1873  *
1874  * May only be called for command ids > 1
 *
 * The command id is stored in a change record that is queued for 'xid' at
 * 'lsn' via ReorderBufferQueueChange().
1875  */
1876 void
1878  XLogRecPtr lsn, CommandId cid)
1879 {
1881
1882  change->data.command_id = cid;
1884
1885  ReorderBufferQueueChange(rb, xid, lsn, change);
1886 }
1887 
1888 
1889 /*
1890  * Add new (relfilenode, tid) -> (cmin, cmax) mappings.
 *
 * The mapping is stored in a change record appended to the transaction's
 * separate tuplecids list (not the regular change queue), and
 * txn->ntuplecids is incremented accordingly.
1891  */
1892 void
1894  XLogRecPtr lsn, RelFileNode node,
1895  ItemPointerData tid, CommandId cmin,
1896  CommandId cmax, CommandId combocid)
1897 {
1899  ReorderBufferTXN *txn;
1900
1901  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
1902
1903  change->data.tuplecid.node = node;
1904  change->data.tuplecid.tid = tid;
1905  change->data.tuplecid.cmin = cmin;
1906  change->data.tuplecid.cmax = cmax;
1907  change->data.tuplecid.combocid = combocid;
1908  change->lsn = lsn;
1910
1911  dlist_push_tail(&txn->tuplecids, &change->node);
1912  txn->ntuplecids++;
1913 }
1914 
1915 /*
1916  * Setup the invalidation of the toplevel transaction.
1917  *
1918  * This needs to be done before ReorderBufferCommit is called!
 *
 * Only one set of invalidations may be registered per transaction; a second
 * call for the same transaction raises an ERROR. The 'nmsgs' messages in
 * 'msgs' are copied, so the caller keeps ownership of its array.
1919  */
1920 void
1922  XLogRecPtr lsn, Size nmsgs,
1924 {
1925  ReorderBufferTXN *txn;
1926
1927  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
1928
1929  if (txn->ninvalidations != 0)
1930  elog(ERROR, "only ever add one set of invalidations");
1931
1932  Assert(nmsgs > 0);
1933
1934  txn->ninvalidations = nmsgs;
1937  sizeof(SharedInvalidationMessage) * nmsgs);
1938  memcpy(txn->invalidations, msgs,
1939  sizeof(SharedInvalidationMessage) * nmsgs);
1940 }
1941 
1942 /*
1943  * Apply all invalidations we know. Possibly we only need parts at this point
1944  * in the changestream but we don't know which those are.
 *
 * Iterates over all txn->ninvalidations entries of the transaction.
1945  */
1946 static void
1948 {
1949  int i;
1950
1951  for (i = 0; i < txn->ninvalidations; i++)
1953 }
1954 
1955 /*
1956  * Mark a transaction as containing catalog changes.
 *
 * The transaction entry is looked up with the create flag set, so an xid not
 * seen before gets an entry here.
1957  */
1958 void
1960  XLogRecPtr lsn)
1961 {
1962  ReorderBufferTXN *txn;
1963
1964  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
1965
1966  txn->has_catalog_changes = true;
1967 }
1968 
1969 /*
1970  * Query whether a transaction is already *known* to contain catalog
1971  * changes. This can be wrong until directly before the commit!
 *
 * Returns false for an xid the reorder buffer does not know about.
1972  */
1973 bool
1975 {
1976  ReorderBufferTXN *txn;
1977
 /* lookup only: do not create an entry for an unknown xid */
1978  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1979  false);
1980  if (txn == NULL)
1981  return false;
1982
1983  return txn->has_catalog_changes;
1984 }
1985 
1986 /*
1987  * Have we already added the first snapshot?
 *
 * Returns true only if the transaction is known and its base_snapshot is set.
1988  */
1989 bool
1991 {
1992  ReorderBufferTXN *txn;
1993
 /* lookup only: do not create an entry for an unknown xid */
1994  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1995  false);
1996
1997  /* transaction isn't known yet, ergo no snapshot */
1998  if (txn == NULL)
1999  return false;
2000
2001  /*
2002  * TODO: It would be a nice improvement if we would check the toplevel
2003  * transaction in subtransactions, but we'd need to keep track of a bit
2004  * more state.
2005  */
2006  return txn->base_snapshot != NULL;
2007 }
2008 
2009 
2010 /*
2011  * ---------------------------------------
2012  * Disk serialization support
2013  * ---------------------------------------
2014  */
2015 
2016 /*
2017  * Ensure the IO buffer is >= sz.
 *
 * rb->outbuf is allocated in rb->context on first use and only ever grown,
 * never shrunk. Growing uses repalloc(), so pointers into the old buffer must
 * be re-derived from rb->outbuf after calling this.
2018  */
2019 static void
2021 {
2022  if (!rb->outbufsize)
2023  {
 /* first use: no buffer allocated yet */
2024  rb->outbuf = MemoryContextAlloc(rb->context, sz);
2025  rb->outbufsize = sz;
2026  }
2027  else if (rb->outbufsize < sz)
2028  {
 /* existing buffer is too small: enlarge it */
2029  rb->outbuf = repalloc(rb->outbuf, sz);
2030  rb->outbufsize = sz;
2031  }
2032 }
2033 
2034 /*
2035  * Check whether the transaction txn should spill its data to disk, and do so
 * once its number of in-memory changes reaches max_changes_in_memory.
2036  */
2037 static void
2039 {
2040  /*
2041  * TODO: improve accounting so we cheaply can take subtransactions into
2042  * account here.
2043  */
2044  if (txn->nentries_mem >= max_changes_in_memory)
2045  {
2046  ReorderBufferSerializeTXN(rb, txn);
2047  Assert(txn->nentries_mem == 0); /* everything was spilled */
2048  }
2049 }
2050 
2051 /*
2052  * Spill data of a large transaction (and its subtransactions) to disk.
 *
 * Subtransactions are serialized first via ReorderBufferSerializeTXN(). Then
 * every change on txn->changes is written to a spill file chosen by the WAL
 * segment its LSN falls into, unlinked from the list and returned to the
 * reorder buffer. On return txn->nentries_mem is 0.
2053  */
2054 static void
2056 {
2057  dlist_iter subtxn_i;
2058  dlist_mutable_iter change_i;
2059  int fd = -1;
2060  XLogSegNo curOpenSegNo = 0;
2061  Size spilled = 0;
2062  char path[MAXPGPATH];
2063
2064  elog(DEBUG2, "spill %u changes in XID %u to disk",
2065  (uint32) txn->nentries_mem, txn->xid);
2066
2067  /* do the same to all child TXs */
2068  dlist_foreach(subtxn_i, &txn->subtxns)
2069  {
2070  ReorderBufferTXN *subtxn;
2071
2072  subtxn = dlist_container(ReorderBufferTXN, node, subtxn_i.cur);
2073  ReorderBufferSerializeTXN(rb, subtxn);
2074  }
2075
2076  /* serialize changestream */
2077  dlist_foreach_modify(change_i, &txn->changes)
2078  {
2079  ReorderBufferChange *change;
2080
2081  change = dlist_container(ReorderBufferChange, node, change_i.cur);
2082
2083  /*
2084  * store in segment in which it belongs by start lsn, don't split over
2085  * multiple segments though
2086  */
2087  if (fd == -1 || !XLByteInSeg(change->lsn, curOpenSegNo))
2088  {
2089  XLogRecPtr recptr;
2090
 /* switching segments: close the previous spill file first */
2091  if (fd != -1)
2092  CloseTransientFile(fd);
2093
2094  XLByteToSeg(change->lsn, curOpenSegNo);
2095  XLogSegNoOffsetToRecPtr(curOpenSegNo, 0, recptr);
2096
2097  /*
2098  * No need to care about TLIs here, only used during a single run,
2099  * so each LSN only maps to a specific WAL record.
 *
 * NOTE(review): sprintf() into path[MAXPGPATH] is unbounded; a
 * bounded formatter would be safer - confirm the length of the
 * %s argument is limited elsewhere.
2100  */
2101  sprintf(path, "pg_replslot/%s/xid-%u-lsn-%X-%X.snap",
2103  (uint32) (recptr >> 32), (uint32) recptr);
2104
2105  /* open segment, create it if necessary */
2106  fd = OpenTransientFile(path,
2107  O_CREAT | O_WRONLY | O_APPEND | PG_BINARY,
2108  S_IRUSR | S_IWUSR);
2109
2110  if (fd < 0)
2111  ereport(ERROR,
2113  errmsg("could not open file \"%s\": %m",
2114  path)));
2115  }
2116
 /* write the change out, then release its in-memory representation */
2117  ReorderBufferSerializeChange(rb, txn, fd, change);
2118  dlist_delete(&change->node);
2119  ReorderBufferReturnChange(rb, change);
2120
2121  spilled++;
2122  }
2123
2124  Assert(spilled == txn->nentries_mem);
2125  Assert(dlist_is_empty(&txn->changes));
2126  txn->nentries_mem = 0;
2127
2128  if (fd != -1)
2129  CloseTransientFile(fd);
2130 }
2131 
2132 /*
2133  * Serialize individual change to disk.
 *
 * The record is assembled in rb->outbuf as a ReorderBufferDiskChange header
 * followed by an action-specific payload, and then written to 'fd' with a
 * single write(). A failed or short write closes the file and raises an
 * ERROR.
2134  */
2135 static void
2137  int fd, ReorderBufferChange *change)
2138 {
2139  ReorderBufferDiskChange *ondisk;
2140  Size sz = sizeof(ReorderBufferDiskChange); /* grows as payload is added */
2141
2143
2144  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2145  memcpy(&ondisk->change, change, sizeof(ReorderBufferChange));
2146
2147  switch (change->action)
2148  {
2149  /* fall through these, they're all similar enough */
2154  {
2155  char *data;
2156  ReorderBufferTupleBuf *oldtup,
2157  *newtup;
2158  Size oldlen = 0;
2159  Size newlen = 0;
2160
2161  oldtup = change->data.tp.oldtuple;
2162  newtup = change->data.tp.newtuple;
2163
 /* each tuple present adds its HeapTupleData header plus t_len bytes */
2164  if (oldtup)
2165  {
2166  sz += sizeof(HeapTupleData);
2167  oldlen = oldtup->tuple.t_len;
2168  sz += oldlen;
2169  }
2170
2171  if (newtup)
2172  {
2173  sz += sizeof(HeapTupleData);
2174  newlen = newtup->tuple.t_len;
2175  sz += newlen;
2176  }
2177
2178  /* make sure we have enough space */
2180
2181  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
2182  /* might have been reallocated above */
2183  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2184
2185  if (oldlen)
2186  {
2187  memcpy(data, &oldtup->tuple, sizeof(HeapTupleData));
2188  data += sizeof(HeapTupleData);
2189
2190  memcpy(data, oldtup->tuple.t_data, oldlen);
2191  data += oldlen;
2192  }
2193
2194  if (newlen)
2195  {
2196  memcpy(data, &newtup->tuple, sizeof(HeapTupleData));
2197  data += sizeof(HeapTupleData);
2198
2199  memcpy(data, newtup->tuple.t_data, newlen);
2200  data += newlen;
2201  }
2202  break;
2203  }
2205  {
2206  char *data;
2207  Size prefix_size = strlen(change->data.msg.prefix) + 1; /* includes the terminating NUL */
2208
2209  sz += prefix_size + change->data.msg.message_size +
2210  sizeof(Size) + sizeof(Size);
2212
2213  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
2214
2215  /* might have been reallocated above */
2216  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2217
2218  /* write the prefix including the size */
2219  memcpy(data, &prefix_size, sizeof(Size));
2220  data += sizeof(Size);
2221  memcpy(data, change->data.msg.prefix,
2222  prefix_size);
2223  data += prefix_size;
2224
2225  /* write the message including the size */
2226  memcpy(data, &change->data.msg.message_size, sizeof(Size));
2227  data += sizeof(Size);
2228  memcpy(data, change->data.msg.message,
2229  change->data.msg.message_size);
2230  data += change->data.msg.message_size;
2231
2232  break;
2233  }
2235  {
2236  Snapshot snap;
2237  char *data;
2238
2239  snap = change->data.snapshot;
2240
 /* snapshot header followed by its xip and subxip arrays */
2241  sz += sizeof(SnapshotData) +
2242  sizeof(TransactionId) * snap->xcnt +
2243  sizeof(TransactionId) * snap->subxcnt
2244  ;
2245
2246  /* make sure we have enough space */
2248  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
2249  /* might have been reallocated above */
2250  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2251
2252  memcpy(data, snap, sizeof(SnapshotData));
2253  data += sizeof(SnapshotData);
2254
2255  if (snap->xcnt)
2256  {
2257  memcpy(data, snap->xip,
2258  sizeof(TransactionId) * snap->xcnt);
2259  data += sizeof(TransactionId) * snap->xcnt;
2260  }
2261
2262  if (snap->subxcnt)
2263  {
2264  memcpy(data, snap->subxip,
2265  sizeof(TransactionId) * snap->subxcnt);
2266  data += sizeof(TransactionId) * snap->subxcnt;
2267  }
2268  break;
2269  }
2273  /* ReorderBufferChange contains everything important */
2274  break;
2275  }
2276
 /* total record length, header included */
2277  ondisk->size = sz;
2278
 /*
  * NOTE(review): on a short write, write() need not have set errno, so
  * the %m below could report a stale or meaningless error - consider
  * whether errno should default to ENOSPC in that case.
  */
2280  if (write(fd, rb->outbuf, ondisk->size) != ondisk->size)
2281  {
2282  int save_errno = errno; /* CloseTransientFile() may clobber errno */
2283
2284  CloseTransientFile(fd);
2285  errno = save_errno;
2286  ereport(ERROR,
2288  errmsg("could not write to data file for XID %u: %m",
2289  txn->xid)));
2290  }
2292
2293  Assert(ondisk->change.action == change->action);
2294 }
2295 
2296 /*
2297  * Restore a number of changes spilled to disk back into memory.
 *
 * First releases every change currently on txn->changes, then reads records
 * until max_changes_in_memory have been restored or the segment containing
 * txn->final_lsn has been passed. Segments without a spill file are skipped.
 * *fd and *segno are updated in place (presumably so that a later call can
 * resume where this one stopped - confirm against the caller).
 *
 * Returns the number of changes restored.
2298  */
2299 static Size
2301  int *fd, XLogSegNo *segno)
2302 {
2303  Size restored = 0;
2304  XLogSegNo last_segno;
2305  dlist_mutable_iter cleanup_iter;
2306
2309
2310  /* free current entries, so we have memory for more */
2311  dlist_foreach_modify(cleanup_iter, &txn->changes)
2312  {
2314  dlist_container(ReorderBufferChange, node, cleanup_iter.cur);
2315
2316  dlist_delete(&cleanup->node);
2317  ReorderBufferReturnChange(rb, cleanup);
2318  }
2319  txn->nentries_mem = 0;
2320  Assert(dlist_is_empty(&txn->changes));
2321
 /* no spill file can exist beyond the segment holding final_lsn */
2322  XLByteToSeg(txn->final_lsn, last_segno);
2323
2324  while (restored < max_changes_in_memory && *segno <= last_segno)
2325  {
 /* NOTE(review): int, while read() returns a wider signed type - confirm sizes stay in range */
2326  int readBytes;
2327  ReorderBufferDiskChange *ondisk;
2328
2329  if (*fd == -1)
2330  {
2331  XLogRecPtr recptr;
2332  char path[MAXPGPATH];
2333
2334  /* first time in */
2335  if (*segno == 0)
2336  {
2337  XLByteToSeg(txn->first_lsn, *segno);
2338  }
2339
2340  Assert(*segno != 0 || dlist_is_empty(&txn->changes));
2341  XLogSegNoOffsetToRecPtr(*segno, 0, recptr);
2342
2343  /*
2344  * No need to care about TLIs here, only used during a single run,
2345  * so each LSN only maps to a specific WAL record.
2346  */
2347  sprintf(path, "pg_replslot/%s/xid-%u-lsn-%X-%X.snap",
2349  (uint32) (recptr >> 32), (uint32) recptr);
2350
2351  *fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0);
2352  if (*fd < 0 && errno == ENOENT)
2353  {
 /* nothing was spilled for this segment, try the next one */
2354  *fd = -1;
2355  (*segno)++;
2356  continue;
2357  }
2358  else if (*fd < 0)
2359  ereport(ERROR,
2361  errmsg("could not open file \"%s\": %m",
2362  path)));
2363
2364  }
2365
2366  /*
2367  * Read the statically sized part of a change which has information
2368  * about the total size. If we couldn't read a record, we're at the
2369  * end of this file.
2370  */
2373  readBytes = read(*fd, rb->outbuf, sizeof(ReorderBufferDiskChange));
2375
2376  /* eof */
2377  if (readBytes == 0)
2378  {
2379  CloseTransientFile(*fd);
2380  *fd = -1;
2381  (*segno)++;
2382  continue;
2383  }
2384  else if (readBytes < 0)
2385  ereport(ERROR,
2387  errmsg("could not read from reorderbuffer spill file: %m")));
2388  else if (readBytes != sizeof(ReorderBufferDiskChange))
2389  ereport(ERROR,
2391  errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
2392  readBytes,
2393  (uint32) sizeof(ReorderBufferDiskChange))));
2394
2395  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2396
2398  sizeof(ReorderBufferDiskChange) + ondisk->size);
 /* re-derive the pointer from rb->outbuf before using it again */
2399  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2400
 /* ondisk->size counts the header too, so read only the remainder */
2402  readBytes = read(*fd, rb->outbuf + sizeof(ReorderBufferDiskChange),
2403  ondisk->size - sizeof(ReorderBufferDiskChange));
2405
2406  if (readBytes < 0)
2407  ereport(ERROR,
2409  errmsg("could not read from reorderbuffer spill file: %m")));
2410  else if (readBytes != ondisk->size - sizeof(ReorderBufferDiskChange))
2411  ereport(ERROR,
2413  errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
2414  readBytes,
2415  (uint32) (ondisk->size - sizeof(ReorderBufferDiskChange)))));
2416
2417  /*
2418  * ok, read a full change from disk, now restore it into proper
2419  * in-memory format
2420  */
2421  ReorderBufferRestoreChange(rb, txn, rb->outbuf);
2422  restored++;
2423  }
2424
2425  return restored;
2426 }
2427 
2428 /*
2429  * Convert change from its on-disk format to in-memory format and queue it onto
2430  * the TXN's ->changes list.
2431  *
2432  * Note: although "data" is declared char*, at entry it points to a
2433  * maxalign'd buffer, making it safe in most of this function to assume
2434  * that the pointed-to data is suitably aligned for direct access.
2435  */
2436 static void
2438  char *data)
2439 {
2440  ReorderBufferDiskChange *ondisk;
2441  ReorderBufferChange *change;
2442 
2443  ondisk = (ReorderBufferDiskChange *) data;
2444 
2445  change = ReorderBufferGetChange(rb);
2446 
2447  /* copy static part */
2448  memcpy(change, &ondisk->change, sizeof(ReorderBufferChange));
2449 
2450  data += sizeof(ReorderBufferDiskChange);
2451 
2452  /* restore individual stuff */
2453  switch (change->action)
2454  {
2455  /* fall through these, they're all similar enough */
2460  if (change->data.tp.oldtuple)
2461  {
2462  uint32 tuplelen = ((HeapTuple) data)->t_len;
2463 
2464  change->data.tp.oldtuple =
2466 
2467  /* restore ->tuple */
2468  memcpy(&change->data.tp.oldtuple->tuple, data,
2469  sizeof(HeapTupleData));
2470  data += sizeof(HeapTupleData);
2471 
2472  /* reset t_data pointer into the new tuplebuf */
2473  change->data.tp.oldtuple->tuple.t_data =
2474  ReorderBufferTupleBufData(change->data.tp.oldtuple);
2475 
2476  /* restore tuple data itself */
2477  memcpy(change->data.tp.oldtuple->tuple.t_data, data, tuplelen);
2478  data += tuplelen;
2479  }
2480 
2481  if (change->data.tp.newtuple)
2482  {
2483  /* here, data might not be suitably aligned! */
2484  uint32 tuplelen;
2485 
2486  memcpy(&tuplelen, data + offsetof(HeapTupleData, t_len),
2487  sizeof(uint32));
2488 
2489  change->data.tp.newtuple =
2491 
2492  /* restore ->tuple */
2493  memcpy(&change->data.tp.newtuple->tuple, data,
2494  sizeof(HeapTupleData));
2495  data += sizeof(HeapTupleData);
2496 
2497  /* reset t_data pointer into the new tuplebuf */
2498  change->data.tp.newtuple->tuple.t_data =
2499  ReorderBufferTupleBufData(change->data.tp.newtuple);
2500 
2501  /* restore tuple data itself */
2502  memcpy(change->data.tp.newtuple->tuple.t_data, data, tuplelen);
2503  data += tuplelen;
2504  }
2505 
2506  break;
2508  {
2509  Size prefix_size;
2510 
2511  /* read prefix */
2512  memcpy(&prefix_size, data, sizeof(Size));
2513  data += sizeof(Size);
2514  change->data.msg.prefix = MemoryContextAlloc(rb->context,
2515  prefix_size);
2516  memcpy(change->data.msg.prefix, data, prefix_size);
2517  Assert(change->data.msg.prefix[prefix_size - 1] == '\0');
2518  data += prefix_size;
2519 
2520  /* read the message */
2521  memcpy(&change->data.msg.message_size, data, sizeof(Size));
2522  data += sizeof(Size);
2523  change->data.msg.message = MemoryContextAlloc(rb->context,
2524  change->data.msg.message_size);
2525  memcpy(change->data.msg.message, data,
2526  change->data.msg.message_size);
2527  data += change->data.msg.message_size;
2528 
2529  break;
2530  }
2532  {
2533  Snapshot oldsnap;
2534  Snapshot newsnap;
2535  Size size;
2536 
2537  oldsnap = (Snapshot) data;
2538 
2539  size = sizeof(SnapshotData) +
2540  sizeof(TransactionId) * oldsnap->xcnt +
2541  sizeof(TransactionId) * (oldsnap->subxcnt + 0);
2542 
2543  change->data.snapshot = MemoryContextAllocZero(rb->context, size);
2544 
2545  newsnap = change->data.snapshot;
2546 
2547  memcpy(newsnap, data, size);
2548  newsnap->xip = (TransactionId *)
2549  (((char *) newsnap) + sizeof(SnapshotData));
2550  newsnap->subxip = newsnap->xip + newsnap->xcnt;
2551  newsnap->copied = true;
2552  break;
2553  }
2554  /* the base struct contains all the data, easy peasy */
2558  break;
2559  }
2560 
2561  dlist_push_tail(&txn->changes, &change->node);
2562  txn->nentries_mem++;
2563 }
2564 
2565 /*
2566  * Remove all on-disk stored data for the passed-in transaction.
2567  */
2568 static void
2570 {
2571  XLogSegNo first;
2572  XLogSegNo cur;
2573  XLogSegNo last;
2574 
2577 
2578  XLByteToSeg(txn->first_lsn, first);
2579  XLByteToSeg(txn->final_lsn, last);
2580 
2581  /* iterate over all possible filenames, and delete them */
2582  for (cur = first; cur <= last; cur++)
2583  {
2584  char path[MAXPGPATH];
2585  XLogRecPtr recptr;
2586 
2587  XLogSegNoOffsetToRecPtr(cur, 0, recptr);
2588 
2589  sprintf(path, "pg_replslot/%s/xid-%u-lsn-%X-%X.snap",
2591  (uint32) (recptr >> 32), (uint32) recptr);
2592  if (unlink(path) != 0 && errno != ENOENT)
2593  ereport(ERROR,
2595  errmsg("could not remove file \"%s\": %m", path)));
2596  }
2597 }
2598 
2599 /*
2600  * Delete all data spilled to disk after we've restarted/crashed. It will be
2601  * recreated when the respective slots are reused.
2602  */
2603 void
2605 {
2606  DIR *logical_dir;
2607  struct dirent *logical_de;
2608 
2609  DIR *spill_dir;
2610  struct dirent *spill_de;
2611 
2612  logical_dir = AllocateDir("pg_replslot");
2613  while ((logical_de = ReadDir(logical_dir, "pg_replslot")) != NULL)
2614  {
2615  struct stat statbuf;
2616  char path[MAXPGPATH * 2 + 12];
2617 
2618  if (strcmp(logical_de->d_name, ".") == 0 ||
2619  strcmp(logical_de->d_name, "..") == 0)
2620  continue;
2621 
2622  /* if it cannot be a slot, skip the directory */
2623  if (!ReplicationSlotValidateName(logical_de->d_name, DEBUG2))
2624  continue;
2625 
2626  /*
2627  * ok, has to be a surviving logical slot, iterate and delete
2628  * everything starting with xid-*
2629  */
2630  sprintf(path, "pg_replslot/%s", logical_de->d_name);
2631 
2632  /* we're only creating directories here, skip if it's not our's */
2633  if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
2634  continue;
2635 
2636  spill_dir = AllocateDir(path);
2637  while ((spill_de = ReadDir(spill_dir, path)) != NULL)
2638  {
2639  if (strcmp(spill_de->d_name, ".") == 0 ||
2640  strcmp(spill_de->d_name, "..") == 0)
2641  continue;
2642 
2643  /* only look at names that can be ours */
2644  if (strncmp(spill_de->d_name, "xid", 3) == 0)
2645  {
2646  sprintf(path, "pg_replslot/%s/%s", logical_de->d_name,
2647  spill_de->d_name);
2648 
2649  if (unlink(path) != 0)
2650  ereport(PANIC,
2652  errmsg("could not remove file \"%s\": %m",
2653  path)));
2654  }
2655  }
2656  FreeDir(spill_dir);
2657  }
2658  FreeDir(logical_dir);
2659 }
2660 
2661 /* ---------------------------------------
2662  * toast reassembly support
2663  * ---------------------------------------
2664  */
2665 
2666 /*
2667  * Initialize per tuple toast reconstruction support.
2668  */
2669 static void
2671 {
2672  HASHCTL hash_ctl;
2673 
2674  Assert(txn->toast_hash == NULL);
2675 
2676  memset(&hash_ctl, 0, sizeof(hash_ctl));
2677  hash_ctl.keysize = sizeof(Oid);
2678  hash_ctl.entrysize = sizeof(ReorderBufferToastEnt);
2679  hash_ctl.hcxt = rb->context;
2680  txn->toast_hash = hash_create("ReorderBufferToastHash", 5, &hash_ctl,
2682 }
2683 
2684 /*
2685  * Per toast-chunk handling for toast reconstruction
2686  *
2687  * Appends a toast chunk so we can reconstruct it when the tuple "owning" the
2688  * toasted Datum comes along.
2689  */
2690 static void
2692  Relation relation, ReorderBufferChange *change)
2693 {
2694  ReorderBufferToastEnt *ent;
2695  ReorderBufferTupleBuf *newtup;
2696  bool found;
2697  int32 chunksize;
2698  bool isnull;
2699  Pointer chunk;
2700  TupleDesc desc = RelationGetDescr(relation);
2701  Oid chunk_id;
2702  int32 chunk_seq;
2703 
2704  if (txn->toast_hash == NULL)
2705  ReorderBufferToastInitHash(rb, txn);
2706 
2707  Assert(IsToastRelation(relation));
2708 
2709  newtup = change->data.tp.newtuple;
2710  chunk_id = DatumGetObjectId(fastgetattr(&newtup->tuple, 1, desc, &isnull));
2711  Assert(!isnull);
2712  chunk_seq = DatumGetInt32(fastgetattr(&newtup->tuple, 2, desc, &isnull));
2713  Assert(!isnull);
2714 
2715  ent = (ReorderBufferToastEnt *)
2716  hash_search(txn->toast_hash,
2717  (void *) &chunk_id,
2718  HASH_ENTER,
2719  &found);
2720 
2721  if (!found)
2722  {
2723  Assert(ent->chunk_id == chunk_id);
2724  ent->num_chunks = 0;
2725  ent->last_chunk_seq = 0;
2726  ent->size = 0;
2727  ent->reconstructed = NULL;
2728  dlist_init(&ent->chunks);
2729 
2730  if (chunk_seq != 0)
2731  elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq 0",
2732  chunk_seq, chunk_id);
2733  }
2734  else if (found && chunk_seq != ent->last_chunk_seq + 1)
2735  elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq %d",
2736  chunk_seq, chunk_id, ent->last_chunk_seq + 1);
2737 
2738  chunk = DatumGetPointer(fastgetattr(&newtup->tuple, 3, desc, &isnull));
2739  Assert(!isnull);
2740 
2741  /* calculate size so we can allocate the right size at once later */
2742  if (!VARATT_IS_EXTENDED(chunk))
2743  chunksize = VARSIZE(chunk) - VARHDRSZ;
2744  else if (VARATT_IS_SHORT(chunk))
2745  /* could happen due to heap_form_tuple doing its thing */
2746  chunksize = VARSIZE_SHORT(chunk) - VARHDRSZ_SHORT;
2747  else
2748  elog(ERROR, "unexpected type of toast chunk");
2749 
2750  ent->size += chunksize;
2751  ent->last_chunk_seq = chunk_seq;
2752  ent->num_chunks++;
2753  dlist_push_tail(&ent->chunks, &change->node);
2754 }
2755 
2756 /*
2757  * Rejigger change->newtuple to point to in-memory toast tuples instead of
2758  * on-disk toast tuples that may no longer exist (think DROP TABLE or VACUUM).
2759  *
2760  * We cannot replace unchanged toast tuples though, so those will still point
2761  * to on-disk toast data.
2762  */
2763 static void
2765  Relation relation, ReorderBufferChange *change)
2766 {
2767  TupleDesc desc;
2768  int natt;
2769  Datum *attrs;
2770  bool *isnull;
2771  bool *free;
2772  HeapTuple tmphtup;
2773  Relation toast_rel;
2774  TupleDesc toast_desc;
2775  MemoryContext oldcontext;
2776  ReorderBufferTupleBuf *newtup;
2777 
2778  /* no toast tuples changed */
2779  if (txn->toast_hash == NULL)
2780  return;
2781 
2782  oldcontext = MemoryContextSwitchTo(rb->context);
2783 
2784  /* we should only have toast tuples in an INSERT or UPDATE */
2785  Assert(change->data.tp.newtuple);
2786 
2787  desc = RelationGetDescr(relation);
2788 
2789  toast_rel = RelationIdGetRelation(relation->rd_rel->reltoastrelid);
2790  toast_desc = RelationGetDescr(toast_rel);
2791 
2792  /* should we allocate from stack instead? */
2793  attrs = palloc0(sizeof(Datum) * desc->natts);
2794  isnull = palloc0(sizeof(bool) * desc->natts);
2795  free = palloc0(sizeof(bool) * desc->natts);
2796 
2797  newtup = change->data.tp.newtuple;
2798 
2799  heap_deform_tuple(&newtup->tuple, desc, attrs, isnull);
2800 
2801  for (natt = 0; natt < desc->natts; natt++)
2802  {
2803  Form_pg_attribute attr = desc->attrs[natt];
2804  ReorderBufferToastEnt *ent;
2805  struct varlena *varlena;
2806 
2807  /* va_rawsize is the size of the original datum -- including header */
2808  struct varatt_external toast_pointer;
2809  struct varatt_indirect redirect_pointer;
2810  struct varlena *new_datum = NULL;
2811  struct varlena *reconstructed;
2812  dlist_iter it;
2813  Size data_done = 0;
2814 
2815  /* system columns aren't toasted */
2816  if (attr->attnum < 0)
2817  continue;
2818 
2819  if (attr->attisdropped)
2820  continue;
2821 
2822  /* not a varlena datatype */
2823  if (attr->attlen != -1)
2824  continue;
2825 
2826  /* no data */
2827  if (isnull[natt])
2828  continue;
2829 
2830  /* ok, we know we have a toast datum */
2831  varlena = (struct varlena *) DatumGetPointer(attrs[natt]);
2832 
2833  /* no need to do anything if the tuple isn't external */
2834  if (!VARATT_IS_EXTERNAL(varlena))
2835  continue;
2836 
2837  VARATT_EXTERNAL_GET_POINTER(toast_pointer, varlena);
2838 
2839  /*
2840  * Check whether the toast tuple changed, replace if so.
2841  */
2842  ent = (ReorderBufferToastEnt *)
2843  hash_search(txn->toast_hash,
2844  (void *) &toast_pointer.va_valueid,
2845  HASH_FIND,
2846  NULL);
2847  if (ent == NULL)
2848  continue;
2849 
2850  new_datum =
2851  (struct varlena *) palloc0(INDIRECT_POINTER_SIZE);
2852 
2853  free[natt] = true;
2854 
2855  reconstructed = palloc0(toast_pointer.va_rawsize);
2856 
2857  ent->reconstructed = reconstructed;
2858 
2859  /* stitch toast tuple back together from its parts */
2860  dlist_foreach(it, &ent->chunks)
2861  {
2862  bool isnull;
2863  ReorderBufferChange *cchange;
2864  ReorderBufferTupleBuf *ctup;
2865  Pointer chunk;
2866 
2867  cchange = dlist_container(ReorderBufferChange, node, it.cur);
2868  ctup = cchange->data.tp.newtuple;
2869  chunk = DatumGetPointer(
2870  fastgetattr(&ctup->tuple, 3, toast_desc, &isnull));
2871 
2872  Assert(!isnull);
2873  Assert(!VARATT_IS_EXTERNAL(chunk));
2874  Assert(!VARATT_IS_SHORT(chunk));
2875 
2876  memcpy(VARDATA(reconstructed) + data_done,
2877  VARDATA(chunk),
2878  VARSIZE(chunk) - VARHDRSZ);
2879  data_done += VARSIZE(chunk) - VARHDRSZ;
2880  }
2881  Assert(data_done == toast_pointer.va_extsize);
2882 
2883  /* make sure it's marked as compressed or not */
2884  if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
2885  SET_VARSIZE_COMPRESSED(reconstructed, data_done + VARHDRSZ);
2886  else
2887  SET_VARSIZE(reconstructed, data_done + VARHDRSZ);
2888 
2889  memset(&redirect_pointer, 0, sizeof(redirect_pointer));
2890  redirect_pointer.pointer = reconstructed;
2891 
2893  memcpy(VARDATA_EXTERNAL(new_datum), &redirect_pointer,
2894  sizeof(redirect_pointer));
2895 
2896  attrs[natt] = PointerGetDatum(new_datum);
2897  }
2898 
2899  /*
2900  * Build tuple in separate memory & copy tuple back into the tuplebuf
2901  * passed to the output plugin. We can't directly heap_fill_tuple() into
2902  * the tuplebuf because attrs[] will point back into the current content.
2903  */
2904  tmphtup = heap_form_tuple(desc, attrs, isnull);
2905  Assert(newtup->tuple.t_len <= MaxHeapTupleSize);
2906  Assert(ReorderBufferTupleBufData(newtup) == newtup->tuple.t_data);
2907 
2908  memcpy(newtup->tuple.t_data, tmphtup->t_data, tmphtup->t_len);
2909  newtup->tuple.t_len = tmphtup->t_len;
2910 
2911  /*
2912  * free resources we won't further need, more persistent stuff will be
2913  * free'd in ReorderBufferToastReset().
2914  */
2915  RelationClose(toast_rel);
2916  pfree(tmphtup);
2917  for (natt = 0; natt < desc->natts; natt++)
2918  {
2919  if (free[natt])
2920  pfree(DatumGetPointer(attrs[natt]));
2921  }
2922  pfree(attrs);
2923  pfree(free);
2924  pfree(isnull);
2925 
2926  MemoryContextSwitchTo(oldcontext);
2927 }
2928 
2929 /*
2930  * Free all resources allocated for toast reconstruction.
2931  */
2932 static void
2934 {
2935  HASH_SEQ_STATUS hstat;
2936  ReorderBufferToastEnt *ent;
2937 
2938  if (txn->toast_hash == NULL)
2939  return;
2940 
2941  /* sequentially walk over the hash and free everything */
2942  hash_seq_init(&hstat, txn->toast_hash);
2943  while ((ent = (ReorderBufferToastEnt *) hash_seq_search(&hstat)) != NULL)
2944  {
2945  dlist_mutable_iter it;
2946 
2947  if (ent->reconstructed != NULL)
2948  pfree(ent->reconstructed);
2949 
2950  dlist_foreach_modify(it, &ent->chunks)
2951  {
2952  ReorderBufferChange *change =
2954 
2955  dlist_delete(&change->node);
2956  ReorderBufferReturnChange(rb, change);
2957  }
2958  }
2959 
2960  hash_destroy(txn->toast_hash);
2961  txn->toast_hash = NULL;
2962 }
2963 
2964 
2965 /* ---------------------------------------
2966  * Visibility support for logical decoding
2967  *
2968  *
2969  * Lookup actual cmin/cmax values when using decoding snapshot. We can't
2970  * always rely on stored cmin/cmax values because of two scenarios:
2971  *
2972  * * A tuple got changed multiple times during a single transaction and thus
2973  * has got a combocid. Combocid's are only valid for the duration of a
2974  * single transaction.
2975  * * A tuple with a cmin but no cmax (and thus no combocid) got
2976  * deleted/updated in another transaction than the one which created it
2977  * which we are looking at right now. As only one of cmin, cmax or combocid
2978  * is actually stored in the heap we don't have access to the value we
2979  * need anymore.
2980  *
2981  * To resolve those problems we have a per-transaction hash of (cmin,
2982  * cmax) tuples keyed by (relfilenode, ctid) which contains the actual
2983  * (cmin, cmax) values. That also takes care of combocids by simply
2984  * not caring about them at all. As we have the real cmin/cmax values
2985  * combocids aren't interesting.
2986  *
2987  * As we only care about catalog tuples here the overhead of this
2988  * hashtable should be acceptable.
2989  *
2990  * Heap rewrites complicate this a bit, check rewriteheap.c for
2991  * details.
2992  * -------------------------------------------------------------------------
2993  */
2994 
2995 /* struct for qsort()ing mapping files by lsn somewhat efficiently */
2996 typedef struct RewriteMappingFile
2997 {
3001 
3002 #if NOT_USED
3003 static void
3004 DisplayMapping(HTAB *tuplecid_data)
3005 {
3006  HASH_SEQ_STATUS hstat;
3008 
3009  hash_seq_init(&hstat, tuplecid_data);
3010  while ((ent = (ReorderBufferTupleCidEnt *) hash_seq_search(&hstat)) != NULL)
3011  {
3012  elog(DEBUG3, "mapping: node: %u/%u/%u tid: %u/%u cmin: %u, cmax: %u",
3013  ent->key.relnode.dbNode,
3014  ent->key.relnode.spcNode,
3015  ent->key.relnode.relNode,
3018  ent->cmin,
3019  ent->cmax
3020  );
3021  }
3022 }
3023 #endif
3024 
3025 /*
3026  * Apply a single mapping file to tuplecid_data.
3027  *
3028  * The mapping file has to have been verified to be a) committed b) for our
3029  * transaction c) applied in LSN order.
 *
 * 'fname' is a file name relative to "pg_logical/mappings".  The file is
 * read as a sequence of fixed-size LogicalRewriteMappingData records until
 * EOF.  For every record whose (old_node, old_tid) already has an entry in
 * tuplecid_data, an entry keyed by (new_node, new_tid) is created (if not
 * already present) and receives the old entry's cmin, cmax and combocid.
 * An open failure, read error or short read raises an ERROR.
 *
 * NOTE(review): 'relid' is not referenced in the lines visible here.
3030  */
3031 static void
3032 ApplyLogicalMappingFile(HTAB *tuplecid_data, Oid relid, const char *fname)
3033 {
3034  char path[MAXPGPATH];
3035  int fd;
3036  int readBytes;
3038 
3039  sprintf(path, "pg_logical/mappings/%s", fname);
3040  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0);
3041  if (fd < 0)
3042  ereport(ERROR,
3044  errmsg("could not open file \"%s\": %m", path)));
3045 
3046  while (true)
3047  {
3050  ReorderBufferTupleCidEnt *new_ent;
3051  bool found;
3052 
3053  /* be careful about padding */
3054  memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
3055 
3056  /* read all mappings till the end of the file */
3058  readBytes = read(fd, &map, sizeof(LogicalRewriteMappingData));
3060 
3061  if (readBytes < 0)
3062  ereport(ERROR,
3064  errmsg("could not read file \"%s\": %m",
3065  path)));
3066  else if (readBytes == 0) /* EOF */
3067  break;
3068  else if (readBytes != sizeof(LogicalRewriteMappingData))
3069  ereport(ERROR,
3071  errmsg("could not read from file \"%s\": read %d instead of %d bytes",
3072  path, readBytes,
3073  (int32) sizeof(LogicalRewriteMappingData))));
3074 
 /* look up the entry for the tuple's location before the rewrite */
3075  key.relnode = map.old_node;
3076  ItemPointerCopy(&map.old_tid,
3077  &key.tid);
3078 
3079 
3080  ent = (ReorderBufferTupleCidEnt *)
3081  hash_search(tuplecid_data,
3082  (void *) &key,
3083  HASH_FIND,
3084  NULL);
3085 
3086  /* no existing mapping, no need to update */
3087  if (!ent)
3088  continue;
3089 
 /* reuse the same key struct for the tuple's location after the rewrite */
3090  key.relnode = map.new_node;
3091  ItemPointerCopy(&map.new_tid,
3092  &key.tid);
3093 
3094  new_ent = (ReorderBufferTupleCidEnt *)
3095  hash_search(tuplecid_data,
3096  (void *) &key,
3097  HASH_ENTER,
3098  &found);
3099 
3100  if (found)
3101  {
3102  /*
3103  * Make sure the existing mapping makes sense. We sometimes update
3104  * old records that did not yet have a cmax (e.g. pg_class' own
3105  * entry while rewriting it) during rewrites, so allow that.
3106  */
3107  Assert(ent->cmin == InvalidCommandId || ent->cmin == new_ent->cmin);
3108  Assert(ent->cmax == InvalidCommandId || ent->cmax == new_ent->cmax);
3109  }
3110  else
3111  {
3112  /* update mapping */
3113  new_ent->cmin = ent->cmin;
3114  new_ent->cmax = ent->cmax;
3115  new_ent->combocid = ent->combocid;
3116  }
3117  }
 /*
  * NOTE(review): the descriptor obtained from OpenTransientFile() above is
  * not closed before returning on this (normal, EOF) path; no
  * CloseTransientFile(fd) call is visible.  Confirm and add one here.
  */
3118 }
3119 
3120 
3121 /*
3122  * Check whether the TransactionOId 'xid' is in the pre-sorted array 'xip'.
3123  */
3124 static bool
3126 {
3127  return bsearch(&xid, xip, num,
3128  sizeof(TransactionId), xidComparator) != NULL;
3129 }
3130 
3131 /*
3132  * qsort() comparator for sorting RewriteMappingFiles in LSN order.
3133  */
3134 static int
3135 file_sort_by_lsn(const void *a_p, const void *b_p)
3136 {
3137  RewriteMappingFile *a = *(RewriteMappingFile **) a_p;
3138  RewriteMappingFile *b = *(RewriteMappingFile **) b_p;
3139 
3140  if (a->lsn < b->lsn)
3141  return -1;
3142  else if (a->lsn > b->lsn)
3143  return 1;
3144  return 0;
3145 }
3146 
3147 /*
3148  * Apply any existing logical remapping files if there are any targeted at our
3149  * transaction for relid.
3150  */
3151 static void
3153 {
3154  DIR *mapping_dir;
3155  struct dirent *mapping_de;
3156  List *files = NIL;
3157  ListCell *file;
3158  RewriteMappingFile **files_a;
3159  size_t off;
3160  Oid dboid = IsSharedRelation(relid) ? InvalidOid : MyDatabaseId;
3161 
3162  mapping_dir = AllocateDir("pg_logical/mappings");
3163  while ((mapping_de = ReadDir(mapping_dir, "pg_logical/mappings")) != NULL)
3164  {
3165  Oid f_dboid;
3166  Oid f_relid;
3167  TransactionId f_mapped_xid;
3168  TransactionId f_create_xid;
3169  XLogRecPtr f_lsn;
3170  uint32 f_hi,
3171  f_lo;
3172  RewriteMappingFile *f;
3173 
3174  if (strcmp(mapping_de->d_name, ".") == 0 ||
3175  strcmp(mapping_de->d_name, "..") == 0)
3176  continue;
3177 
3178  /* Ignore files that aren't ours */
3179  if (strncmp(mapping_de->d_name, "map-", 4) != 0)
3180  continue;
3181 
3182  if (sscanf(mapping_de->d_name, LOGICAL_REWRITE_FORMAT,
3183  &f_dboid, &f_relid, &f_hi, &f_lo,
3184  &f_mapped_xid, &f_create_xid) != 6)
3185  elog(ERROR, "could not parse filename \"%s\"", mapping_de->d_name);
3186 
3187  f_lsn = ((uint64) f_hi) << 32 | f_lo;
3188 
3189  /* mapping for another database */
3190  if (f_dboid != dboid)
3191  continue;
3192 
3193  /* mapping for another relation */
3194  if (f_relid != relid)
3195  continue;
3196 
3197  /* did the creating transaction abort? */
3198  if (!TransactionIdDidCommit(f_create_xid))
3199  continue;
3200 
3201  /* not for our transaction */
3202  if (!TransactionIdInArray(f_mapped_xid, snapshot->subxip, snapshot->subxcnt))
3203  continue;
3204 
3205  /* ok, relevant, queue for apply */
3206  f = palloc(sizeof(RewriteMappingFile));
3207  f->lsn = f_lsn;
3208  strcpy(f->fname, mapping_de->d_name);
3209  files = lappend(files, f);
3210  }
3211  FreeDir(mapping_dir);
3212 
3213  /* build array we can easily sort */
3214  files_a = palloc(list_length(files) * sizeof(RewriteMappingFile *));
3215  off = 0;
3216  foreach(file, files)
3217  {
3218  files_a[off++] = lfirst(file);
3219  }
3220 
3221  /* sort files so we apply them in LSN order */
3222  qsort(files_a, list_length(files), sizeof(RewriteMappingFile *),
3224 
3225  for (off = 0; off < list_length(files); off++)
3226  {
3227  RewriteMappingFile *f = files_a[off];
3228 
3229  elog(DEBUG1, "applying mapping: \"%s\" in %u", f->fname,
3230  snapshot->subxip[0]);
3231  ApplyLogicalMappingFile(tuplecid_data, relid, f->fname);
3232  pfree(f);
3233  }
3234 }
3235 
3236 /*
3237  * Lookup cmin/cmax of a tuple, during logical decoding where we can't rely on
3238  * combocids.
3239  */
3240 bool
3242  Snapshot snapshot,
3243  HeapTuple htup, Buffer buffer,
3244  CommandId *cmin, CommandId *cmax)
3245 {
3248  ForkNumber forkno;
3249  BlockNumber blockno;
3250  bool updated_mapping = false;
3251 
3252  /* be careful about padding */
3253  memset(&key, 0, sizeof(key));
3254 
3255  Assert(!BufferIsLocal(buffer));
3256 
3257  /*
3258  * get relfilenode from the buffer, no convenient way to access it other
3259  * than that.
3260  */
3261  BufferGetTag(buffer, &key.relnode, &forkno, &blockno);
3262 
3263  /* tuples can only be in the main fork */
3264  Assert(forkno == MAIN_FORKNUM);
3265  Assert(blockno == ItemPointerGetBlockNumber(&htup->t_self));
3266 
3267  ItemPointerCopy(&htup->t_self,
3268  &key.tid);
3269 
3270 restart:
3271  ent = (ReorderBufferTupleCidEnt *)
3272  hash_search(tuplecid_data,
3273  (void *) &key,
3274  HASH_FIND,
3275  NULL);
3276 
3277  /*
3278  * failed to find a mapping, check whether the table was rewritten and
3279  * apply mapping if so, but only do that once - there can be no new
3280  * mappings while we are in here since we have to hold a lock on the
3281  * relation.
3282  */
3283  if (ent == NULL && !updated_mapping)
3284  {
3285  UpdateLogicalMappings(tuplecid_data, htup->t_tableOid, snapshot);
3286  /* now check but don't update for a mapping again */
3287  updated_mapping = true;
3288  goto restart;
3289  }
3290  else if (ent == NULL)
3291  return false;
3292 
3293  if (cmin)
3294  *cmin = ent->cmin;
3295  if (cmax)
3296  *cmax = ent->cmax;
3297  return true;
3298 }
static void ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
XLogRecPtr first_lsn
bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *rb, TransactionId xid)
#define NIL
Definition: pg_list.h:69
uint32 CommandId
Definition: c.h:411
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
TimestampTz commit_time
struct ReorderBufferToastEnt ReorderBufferToastEnt
void AbortCurrentTransaction(void)
Definition: xact.c:2986
ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER]
#define SizeofHeapTupleHeader
Definition: htup_details.h:170
bool IsToastRelation(Relation relation)
Definition: catalog.c:135
void hash_destroy(HTAB *hashp)
Definition: dynahash.c:793
void ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, Snapshot snapshot, XLogRecPtr lsn, bool transactional, const char *prefix, Size message_size, const char *message)
#define relpathperm(rnode, forknum)
Definition: relpath.h:67
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
ReorderBufferApplyChangeCB apply_change
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:200
HeapTupleData * HeapTuple
Definition: htup.h:70
#define DEBUG1
Definition: elog.h:25
#define VALGRIND_MAKE_MEM_DEFINED(addr, size)
Definition: memdebug.h:26
dlist_node * cur
Definition: ilist.h:180
static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
RepOriginId origin_id
void StartupReorderBuffer(void)
#define VARDATA(PTR)
Definition: postgres.h:303
#define fastgetattr(tup, attnum, tupleDesc, isnull)
Definition: htup_details.h:719
static int32 next
Definition: blutils.c:210
#define HASH_CONTEXT
Definition: hsearch.h:93
#define HASH_ELEM
Definition: hsearch.h:87
static const Size max_cached_tuplebufs
void ReorderBufferAbortOld(ReorderBuffer *rb, TransactionId oldestRunningXid)
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
uint32 TransactionId
Definition: c.h:397
void ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
bool copied
Definition: snapshot.h:94
void ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
MemoryContext hcxt
Definition: hsearch.h:78
#define DatumGetInt32(X)
Definition: postgres.h:478
#define RelationGetDescr(relation)
Definition: rel.h:429
#define VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer)
Definition: tuptoaster.h:111
#define DEBUG3
Definition: elog.h:23
#define write(a, b, c)
Definition: win32.h:14
#define VARHDRSZ_SHORT
Definition: postgres.h:269
TransactionId by_txn_last_xid
#define VARSIZE(PTR)
Definition: postgres.h:304
#define VALGRIND_MAKE_MEM_UNDEFINED(addr, size)
Definition: memdebug.h:28
int64 TimestampTz
Definition: timestamp.h:39
#define PointerGetDatum(X)
Definition: postgres.h:562
ReorderBufferTXN * txn
static bool TransactionIdInArray(TransactionId xid, TransactionId *xip, Size num)
#define VARHDRSZ
Definition: c.h:445
#define dlist_foreach(iter, lhead)
Definition: ilist.h:507
XLogRecPtr current_restart_decoding_lsn
#define DatumGetObjectId(X)
Definition: postgres.h:506
char * pstrdup(const char *in)
Definition: mcxt.c:1077
static ReorderBufferTXN * ReorderBufferGetTXN(ReorderBuffer *rb)
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
Oid RelidByRelfilenode(Oid reltablespace, Oid relfilenode)
union ReorderBufferChange::@93 data
Form_pg_attribute * attrs
Definition: tupdesc.h:74
void ReorderBufferSetBaseSnapshot(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void ReorderBufferFree(ReorderBuffer *rb)
static void slist_push_head(slist_head *head, slist_node *node)
Definition: ilist.h:574
uint16 RepOriginId
Definition: xlogdefs.h:51
Size entrysize
Definition: hsearch.h:73
struct cursor * cur
Definition: ecpg.c:28
char fname[MAXPGPATH]
int32 va_rawsize
Definition: postgres.h:70
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4322
MemoryContext SlabContextCreate(MemoryContext parent, const char *name, Size blockSize, Size chunkSize)
Definition: slab.c:183
void binaryheap_replace_first(binaryheap *heap, Datum d)
Definition: binaryheap.c:204
void ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, RelFileNode node, ItemPointerData tid, CommandId cmin, CommandId cmax, CommandId combocid)
static void ReorderBufferIterTXNFinish(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
uint32 BlockNumber
Definition: block.h:31
void TeardownHistoricSnapshot(bool is_error)
Definition: snapmgr.c:1974
ReorderBufferCommitCB commit
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:692
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
#define RelationIsLogicallyLogged(relation)
Definition: rel.h:576
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:885
bool TransactionIdDidCommit(TransactionId transactionId)
Definition: transam.c:125
ReplicationSlotPersistentData data
Definition: slot.h:115
static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, int *fd, XLogSegNo *segno)
struct ReorderBufferTupleCidKey ReorderBufferTupleCidKey
struct SnapshotData * Snapshot
Definition: snapshot.h:23
Form_pg_class rd_rel
Definition: rel.h:114
unsigned int Oid
Definition: postgres_ext.h:31
XLogRecPtr base_snapshot_lsn
Definition: dirent.h:9
uint32 regd_count
Definition: snapshot.h:108
#define PANIC
Definition: elog.h:53
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:77
void binaryheap_add_unordered(binaryheap *heap, Datum d)
Definition: binaryheap.c:110
MemoryContext change_context
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define VARDATA_EXTERNAL(PTR)
Definition: postgres.h:311
#define PG_BINARY
Definition: c.h:1038
int natts
Definition: tupdesc.h:73
XLogRecPtr origin_lsn
static void ApplyLogicalMappingFile(HTAB *tuplecid_data, Oid relid, const char *fname)
signed int int32
Definition: c.h:256
static int file_sort_by_lsn(const void *a_p, const void *b_p)
#define XLogSegNoOffsetToRecPtr(segno, offset, dest)
Definition: xlog_internal.h:95
#define FirstCommandId
Definition: c.h:413
void ReorderBufferSetRestartPoint(ReorderBuffer *rb, XLogRecPtr ptr)
HeapTupleHeader t_data
Definition: htup.h:67
#define VARATT_IS_EXTERNAL(PTR)
Definition: postgres.h:314
bool ReplicationSlotValidateName(const char *name, int elevel)
Definition: slot.c:173
static dlist_node * dlist_next_node(dlist_head *head, dlist_node *node)
Definition: ilist.h:421
Definition: dynahash.c:193
#define ReorderBufferTupleBufData(p)
Definition: reorderbuffer.h:36
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
void pfree(void *pointer)
Definition: mcxt.c:950
static void slist_init(slist_head *head)
Definition: ilist.h:554
char * Pointer
Definition: c.h:245
Definition: dirent.c:25
#define ERROR
Definition: elog.h:43
#define VARATT_IS_SHORT(PTR)
Definition: postgres.h:325
dlist_head changes
Datum binaryheap_first(binaryheap *heap)
Definition: binaryheap.c:159
#define MAXPGPATH
ItemPointerData t_self
Definition: htup.h:65
ReorderBufferTupleCidKey key
Definition: reorderbuffer.c:94
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:165
#define DEBUG2
Definition: elog.h:24
TransactionId GetCurrentTransactionId(void)
Definition: xact.c:417
void ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
struct ReorderBufferChange::@93::@94 tp
uint32 t_len
Definition: htup.h:64
#define MaxHeapTupleSize
Definition: htup_details.h:561
struct varlena * reconstructed
void RollbackAndReleaseCurrentSubTransaction(void)
Definition: xact.c:4157
#define SET_VARTAG_EXTERNAL(PTR, tag)
Definition: postgres.h:332
uint64 XLogSegNo
Definition: xlogdefs.h:34
int OpenTransientFile(FileName fileName, int fileFlags, int fileMode)
Definition: fd.c:2144
int errcode_for_file_access(void)
Definition: elog.c:598
HeapTupleData tuple
Definition: reorderbuffer.h:27
struct SnapshotData SnapshotData
TransactionId GetCurrentTransactionIdIfAny(void)
Definition: xact.c:434
#define InvalidTransactionId
Definition: transam.h:31
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:187
bool ReorderBufferXidHasCatalogChanges(ReorderBuffer *rb, TransactionId xid)
unsigned int uint32
Definition: c.h:268
XLogRecPtr final_lsn
DIR * AllocateDir(const char *dirname)
Definition: fd.c:2335
Oid t_tableOid
Definition: htup.h:66
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1232
void RelationClose(Relation relation)
Definition: relcache.c:2156
MemoryContext CurrentMemoryContext
Definition: mcxt.c:37
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
ReorderBufferMessageCB message
int unlink(const char *filename)
#define ereport(elevel, rest)
Definition: elog.h:122
int bh_size
Definition: binaryheap.h:32
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
TransactionId * xip
Definition: snapshot.h:77
static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap, ReorderBufferTXN *txn, CommandId cid)
#define VARSIZE_SHORT(PTR)
Definition: postgres.h:306
ForkNumber
Definition: relpath.h:24
static slist_node * slist_pop_head_node(slist_head *head)
Definition: ilist.h:596
List * lappend(List *list, void *datum)
Definition: list.c:128
static HTAB * tuplecid_data
Definition: snapmgr.c:170
int CloseTransientFile(int fd)
Definition: fd.c:2305
static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
struct ReorderBufferTupleCidEnt ReorderBufferTupleCidEnt
void ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr lsn)
#define INDIRECT_POINTER_SIZE
Definition: tuptoaster.h:102
static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, char *change)
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:487
#define HASH_BLOBS
Definition: hsearch.h:88
ReorderBufferChange * change
static int ReorderBufferIterCompare(Datum a, Datum b, void *arg)
MemoryContext context
#define slist_container(type, membername, ptr)
Definition: ilist.h:674
void ReorderBufferCommitChild(ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn)
MemoryContext AllocSetContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Definition: aset.c:322
void * palloc0(Size size)
Definition: mcxt.c:878
static bool dlist_has_next(dlist_head *head, dlist_node *node)
Definition: ilist.h:402
#define InvalidCommandId
Definition: c.h:414
uintptr_t Datum
Definition: postgres.h:372
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
Definition: dynahash.c:301
ReorderBufferTXN * by_txn_last_txn
struct ReorderBufferChange::@93::@96 tuplecid
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void cleanup(void)
Definition: bootstrap.c:855
dlist_head toplevel_by_lsn
struct RewriteMappingFile RewriteMappingFile
Oid MyDatabaseId
Definition: globals.c:76
TransactionId xid
static void ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
bool IsSharedRelation(Oid relationId)
Definition: catalog.c:219
Size keysize
Definition: hsearch.h:72
dlist_node * cur
Definition: ilist.h:161
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:742
static void AssertTXNLsnOrder(ReorderBuffer *rb)
void ReorderBufferAddSnapshot(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
#define InvalidOid
Definition: postgres_ext.h:36
static void dlist_init(dlist_head *head)
Definition: ilist.h:278
CommandId curcid
Definition: snapshot.h:96
struct ReorderBufferIterTXNState ReorderBufferIterTXNState
static void ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTXN *txn)
struct ReorderBufferDiskChange ReorderBufferDiskChange
void binaryheap_build(binaryheap *heap)
Definition: binaryheap.c:126
#define free(a)
Definition: header.h:65
struct ReorderBufferIterTXNEntry ReorderBufferIterTXNEntry
#define PG_CATCH()
Definition: elog.h:293
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define XLByteToSeg(xlrp, logSegNo)
ItemPointerData new_tid
Definition: rewriteheap.h:40
#define NULL
Definition: c.h:229
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:675
#define lfirst(lc)
Definition: pg_list.h:106
ReorderBufferTXN * ReorderBufferGetOldestTXN(ReorderBuffer *rb)
static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, int fd, ReorderBufferChange *change)
static void ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn)
Definition: regguts.h:298
struct dirent * ReadDir(DIR *dir, const char *dirname)
Definition: fd.c:2401
static ReorderBufferChange * ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
#define VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr)
Definition: tuptoaster.h:121
int32 va_extsize
Definition: postgres.h:71
XLogRecPtr end_lsn
void StartTransactionCommand(void)
Definition: xact.c:2677
WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]
Definition: walsender.c:207
void ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs)
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
size_t Size
Definition: c.h:356
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: pgstat.h:1208
void binaryheap_free(binaryheap *heap)
Definition: binaryheap.c:69
SharedInvalidationMessage * invalidations
static int list_length(const List *l)
Definition: pg_list.h:89
void BeginInternalSubTransaction(char *name)
Definition: xact.c:4053
#define BufferIsLocal(buffer)
Definition: buf.h:37
#define SLAB_DEFAULT_BLOCK_SIZE
Definition: memutils.h:194
#define ItemPointerGetOffsetNumber(pointer)
Definition: itemptr.h:94
void ReorderBufferReturnTupleBuf(ReorderBuffer *rb, ReorderBufferTupleBuf *tuple)
#define PG_RE_THROW()
Definition: elog.h:314
ReorderBuffer * ReorderBufferAllocate(void)
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1353
void * repalloc(void *pointer, Size size)
Definition: mcxt.c:963
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1343
static void ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz)
struct varlena * pointer
Definition: postgres.h:87
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change)
void SnapBuildSnapDecRefcount(Snapshot snap)
Definition: snapbuild.c:398
#define LOGICAL_REWRITE_FORMAT
Definition: rewriteheap.h:54
dlist_head subtxns
binaryheap * binaryheap_allocate(int capacity, binaryheap_comparator compare, void *arg)
Definition: binaryheap.c:33
#define VARATT_IS_EXTENDED(PTR)
Definition: postgres.h:326
#define DatumGetPointer(X)
Definition: postgres.h:555
void heap_deform_tuple(HeapTuple tuple, TupleDesc tupleDesc, Datum *values, bool *isnull)
Definition: heaptuple.c:935
Size nr_cached_tuplebufs
void LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
Definition: inval.c:547
#define Int32GetDatum(X)
Definition: postgres.h:485
static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
uint32 xcnt
Definition: snapshot.h:78
void * palloc(Size size)
Definition: mcxt.c:849
int errmsg(const char *fmt,...)
Definition: elog.c:797
static void UpdateLogicalMappings(HTAB *tuplecid_data, Oid relid, Snapshot snapshot)
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:707
static void ReorderBufferCheckSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
ReorderBufferTXN * txn
Definition: reorderbuffer.c:82
void ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
static dlist_node * dlist_pop_head_node(dlist_head *head)
Definition: ilist.h:368
void SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
Definition: snapmgr.c:1958
int i
XLogRecPtr restart_decoding_lsn
#define NameStr(name)
Definition: c.h:499
#define SET_VARSIZE_COMPRESSED(PTR, len)
Definition: postgres.h:330
void * arg
Datum binaryheap_remove_first(binaryheap *heap)
Definition: binaryheap.c:174
static const Size max_changes_in_memory
Definition: c.h:439
struct ReorderBufferChange::@93::@95 msg
struct ReorderBufferTXNByIdEnt ReorderBufferTXNByIdEnt
void ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
#define SET_VARSIZE(PTR, len)
Definition: postgres.h:328
char d_name[MAX_PATH]
Definition: dirent.h:14
#define elog
Definition: elog.h:219
#define ItemPointerGetBlockNumber(pointer)
Definition: itemptr.h:75
static void ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define qsort(a, b, c, d)
Definition: port.h:440
#define TransactionIdIsValid(xid)
Definition: transam.h:41
ReorderBufferChange change
ReorderBufferBeginCB begin
static void ReorderBufferToastInitHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
void BufferGetTag(Buffer buffer, RelFileNode *rnode, ForkNumber *forknum, BlockNumber *blknum)
Definition: bufmgr.c:2626
#define PG_TRY()
Definition: elog.h:284
#define XLByteInSeg(xlrp, logSegNo)
static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
void ReorderBufferProcessXid(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
#define RELKIND_SEQUENCE
Definition: pg_class.h:162
#define lstat(path, sb)
Definition: win32.h:262
Definition: pg_list.h:45
int Buffer
Definition: buf.h:23
static ReorderBufferIterTXNState * ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
Relation RelationIdGetRelation(Oid relationId)
Definition: relcache.c:2067
#define PG_END_TRY()
Definition: elog.h:300
#define read(a, b, c)
Definition: win32.h:13
int FreeDir(DIR *dir)
Definition: fd.c:2444
struct HeapTupleData HeapTupleData
#define offsetof(type, field)
Definition: c.h:555
MemoryContext txn_context
dlist_head tuplecids
slist_head cached_tuplebufs
#define ItemPointerCopy(fromPointer, toPointer)
Definition: itemptr.h:138
TransactionId * subxip
Definition: snapshot.h:89
uint32 active_count
Definition: snapshot.h:107
int xidComparator(const void *arg1, const void *arg2)
Definition: xid.c:138
int32 subxcnt
Definition: snapshot.h:90
void ReorderBufferAddNewCommandId(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, CommandId cid)
ReorderBufferTupleBuf * ReorderBufferGetTupleBuf(ReorderBuffer *rb, Size tuple_len)
bool ResolveCminCmaxDuringDecoding(HTAB *tuplecid_data, Snapshot snapshot, HeapTuple htup, Buffer buffer, CommandId *cmin, CommandId *cmax)
ItemPointerData old_tid
Definition: rewriteheap.h:39