PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
reorderbuffer.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * reorderbuffer.c
4  * PostgreSQL logical replay/reorder buffer management
5  *
6  *
7  * Copyright (c) 2012-2017, PostgreSQL Global Development Group
8  *
9  *
10  * IDENTIFICATION
11  * src/backend/replication/reorderbuffer.c
12  *
13  * NOTES
14  * This module gets handed individual pieces of transactions in the order
15  * they are written to the WAL and is responsible to reassemble them into
16  * toplevel transaction sized pieces. When a transaction is completely
17  * reassembled - signalled by reading the transaction commit record - it
18  * will then call the output plugin (c.f. ReorderBufferCommit()) with the
19  * individual changes. The output plugins rely on snapshots built by
20  * snapbuild.c which hands them to us.
21  *
22  * Transactions and subtransactions/savepoints in postgres are not
23  * immediately linked to each other from outside the performing
24  * backend. Only at commit/abort (or special xact_assignment records) they
25  * are linked together. Which means that we will have to splice together a
26  * toplevel transaction from its subtransactions. To do that efficiently we
27  * build a binary heap indexed by the smallest current lsn of the individual
28  * subtransactions' changestreams. As the individual streams are inherently
29  * ordered by LSN - since that is where we build them from - the transaction
30  * can easily be reassembled by always using the subtransaction with the
31  * smallest current LSN from the heap.
32  *
33  * In order to cope with large transactions - which can be several times as
34  * big as the available memory - this module supports spooling the contents
35  * of a large transaction to disk. When the transaction is replayed the
36  * contents of individual (sub-)transactions will be read from disk in
37  * chunks.
38  *
39  * This module also has to deal with reassembling toast records from the
40  * individual chunks stored in WAL. When a new (or initial) version of a
41  * tuple is stored in WAL it will always be preceded by the toast chunks
42  * emitted for the columns stored out of line. Within a single toplevel
43  * transaction there will be no other data carrying records between a row's
44  * toast chunks and the row data itself. See ReorderBufferToast* for
45  * details.
46  * -------------------------------------------------------------------------
47  */
48 #include "postgres.h"
49 
50 #include <unistd.h>
51 #include <sys/stat.h>
52 
53 #include "access/rewriteheap.h"
54 #include "access/transam.h"
55 #include "access/tuptoaster.h"
56 #include "access/xact.h"
57 #include "access/xlog_internal.h"
58 #include "catalog/catalog.h"
59 #include "lib/binaryheap.h"
60 #include "miscadmin.h"
61 #include "pgstat.h"
62 #include "replication/logical.h"
64 #include "replication/slot.h"
65 #include "replication/snapbuild.h" /* just for SnapBuildSnapDecRefcount */
66 #include "storage/bufmgr.h"
67 #include "storage/fd.h"
68 #include "storage/sinval.h"
69 #include "utils/builtins.h"
70 #include "utils/combocid.h"
71 #include "utils/memdebug.h"
72 #include "utils/memutils.h"
73 #include "utils/rel.h"
74 #include "utils/relfilenodemap.h"
75 #include "utils/tqual.h"
76 
77 
78 /* entry for a hash table we use to map from xid to our transaction state */
80 {
84 
85 /* data structures for (relfilenode, ctid) => (cmin, cmax) mapping */
87 {
91 
93 {
97  CommandId combocid; /* just for debugging */
99 
100 /* k-way in-order change iteration support structures */
102 {
106  int fd;
109 
111 {
115  ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER];
117 
118 /* toast datastructures */
119 typedef struct ReorderBufferToastEnt
120 {
121  Oid chunk_id; /* toast_table.chunk_id */
122  int32 last_chunk_seq; /* toast_table.chunk_seq of the last chunk we
123  * have seen */
124  Size num_chunks; /* number of chunks we've already seen */
125  Size size; /* combined size of chunks seen */
126  dlist_head chunks; /* linked list of chunks */
127  struct varlena *reconstructed; /* reconstructed varlena now pointed to in
128  * main tup */
130 
131 /* Disk serialization support datastructures */
133 {
136  /* data follows */
138 
139 /*
140  * Maximum number of changes kept in memory, per transaction. After that,
141  * changes are spooled to disk.
142  *
143  * The current value should be sufficient to decode the entire transaction
144  * without hitting disk in OLTP workloads, while starting to spool to disk in
145  * other workloads reasonably fast.
146  *
147  * At some point in the future it probably makes sense to have a more elaborate
148  * resource management here, but it's not entirely clear what that would look
149  * like.
150  */
151 static const Size max_changes_in_memory = 4096;
152 
153 /*
154  * We use a very simple form of a slab allocator for frequently allocated
155  * objects, simply keeping a fixed number in a linked list when unused,
156  * instead of pfree()ing them. Without that in many workloads aset.c becomes a
157  * major bottleneck, especially when spilling to disk while decoding batch
158  * workloads.
159  */
160 static const Size max_cached_tuplebufs = 4096 * 2; /* ~8MB */
161 
162 /* ---------------------------------------
163  * primary reorderbuffer support routines
164  * ---------------------------------------
165  */
169  TransactionId xid, bool create, bool *is_new,
170  XLogRecPtr lsn, bool create_as_top);
171 
172 static void AssertTXNLsnOrder(ReorderBuffer *rb);
173 
174 /* ---------------------------------------
175  * support functions for lsn-order iterating over the ->changes of a
176  * transaction and its subtransactions
177  *
178  * used for iteration over the k-way heap merge of a transaction and its
179  * subtransactions
180  * ---------------------------------------
181  */
187 
188 /*
189  * ---------------------------------------
190  * Disk serialization support functions
191  * ---------------------------------------
192  */
196  int fd, ReorderBufferChange *change);
198  int *fd, XLogSegNo *segno);
200  char *change);
202 
203 static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap);
205  ReorderBufferTXN *txn, CommandId cid);
206 
207 /* ---------------------------------------
208  * toast reassembly support
209  * ---------------------------------------
210  */
214  Relation relation, ReorderBufferChange *change);
216  Relation relation, ReorderBufferChange *change);
217 
218 
219 /*
220  * Allocate a new ReorderBuffer
221  */
224 {
226  HASHCTL hash_ctl;
227  MemoryContext new_ctx;
228 
229  /* allocate memory in own context, to have better accountability */
231  "ReorderBuffer",
233 
234  buffer =
235  (ReorderBuffer *) MemoryContextAlloc(new_ctx, sizeof(ReorderBuffer));
236 
237  memset(&hash_ctl, 0, sizeof(hash_ctl));
238 
239  buffer->context = new_ctx;
240 
241  buffer->change_context = SlabContextCreate(new_ctx,
242  "Change",
244  sizeof(ReorderBufferChange));
245 
246  buffer->txn_context = SlabContextCreate(new_ctx,
247  "TXN",
249  sizeof(ReorderBufferTXN));
250 
251  hash_ctl.keysize = sizeof(TransactionId);
252  hash_ctl.entrysize = sizeof(ReorderBufferTXNByIdEnt);
253  hash_ctl.hcxt = buffer->context;
254 
255  buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
257 
259  buffer->by_txn_last_txn = NULL;
260 
261  buffer->nr_cached_tuplebufs = 0;
262 
263  buffer->outbuf = NULL;
264  buffer->outbufsize = 0;
265 
267 
268  dlist_init(&buffer->toplevel_by_lsn);
269  slist_init(&buffer->cached_tuplebufs);
270 
271  return buffer;
272 }
273 
274 /*
275  * Free a ReorderBuffer
276  */
277 void
279 {
280  MemoryContext context = rb->context;
281 
282  /*
283  * We free separately allocated data by entirely scrapping reorderbuffer's
284  * memory context.
285  */
286  MemoryContextDelete(context);
287 }
288 
289 /*
290  * Get an unused, possibly preallocated, ReorderBufferTXN.
291  */
292 static ReorderBufferTXN *
294 {
295  ReorderBufferTXN *txn;
296 
297  txn = (ReorderBufferTXN *)
299 
300  memset(txn, 0, sizeof(ReorderBufferTXN));
301 
302  dlist_init(&txn->changes);
303  dlist_init(&txn->tuplecids);
304  dlist_init(&txn->subtxns);
305 
306  return txn;
307 }
308 
309 /*
310  * Free a ReorderBufferTXN.
311  *
312  * Data contained in the transaction is freed first, then the struct itself
313  * is released with pfree().
314  */
315 static void
317 {
318  /* clean the lookup cache if we were cached (quite likely) */
319  if (rb->by_txn_last_xid == txn->xid)
320  {
322  rb->by_txn_last_txn = NULL;
323  }
324 
325  /* free data that's contained */
326 
327  if (txn->tuplecid_hash != NULL)
328  {
330  txn->tuplecid_hash = NULL;
331  }
332 
333  if (txn->invalidations)
334  {
335  pfree(txn->invalidations);
336  txn->invalidations = NULL;
337  }
338 
339  pfree(txn);
340 }
341 
342 /*
343  * Get an unused, possibly preallocated, ReorderBufferChange.
344  */
347 {
348  ReorderBufferChange *change;
349 
350  change = (ReorderBufferChange *)
352 
353  memset(change, 0, sizeof(ReorderBufferChange));
354  return change;
355 }
356 
357 /*
358  * Free a ReorderBufferChange.
359  *
360  * Any tuple buffers, message strings or snapshot the change still owns are
361  * returned or freed first; the struct itself is then released with pfree().
362  */
363 void
365 {
366  /* free contained data */
367  switch (change->action)
368  {
373  if (change->data.tp.newtuple)
374  {
375  ReorderBufferReturnTupleBuf(rb, change->data.tp.newtuple);
376  change->data.tp.newtuple = NULL;
377  }
378 
379  if (change->data.tp.oldtuple)
380  {
381  ReorderBufferReturnTupleBuf(rb, change->data.tp.oldtuple);
382  change->data.tp.oldtuple = NULL;
383  }
384  break;
386  if (change->data.msg.prefix != NULL)
387  pfree(change->data.msg.prefix);
388  change->data.msg.prefix = NULL;
389  if (change->data.msg.message != NULL)
390  pfree(change->data.msg.message);
391  change->data.msg.message = NULL;
392  break;
394  if (change->data.snapshot)
395  {
396  ReorderBufferFreeSnap(rb, change->data.snapshot);
397  change->data.snapshot = NULL;
398  }
399  break;
400  /* no data in addition to the struct itself */
404  break;
405  }
406 
407  pfree(change);
408 }
409 
410 /*
411  * Get an unused, possibly preallocated, ReorderBufferTupleBuf fitting at
412  * least a tuple of size tuple_len (excluding header overhead).
413  */
416 {
417  ReorderBufferTupleBuf *tuple;
418  Size alloc_len;
419 
420  alloc_len = tuple_len + SizeofHeapTupleHeader;
421 
422  /*
423  * Most tuples are below MaxHeapTupleSize, so we use a slab allocator for
424  * those. Thus always allocate at least MaxHeapTupleSize. Note that tuples
425  * generated for oldtuples can be bigger, as they don't have out-of-line
426  * toast columns.
427  */
428  if (alloc_len < MaxHeapTupleSize)
429  alloc_len = MaxHeapTupleSize;
430 
431 
432  /* if small enough, check the slab cache */
433  if (alloc_len <= MaxHeapTupleSize && rb->nr_cached_tuplebufs)
434  {
435  rb->nr_cached_tuplebufs--;
439 #ifdef USE_ASSERT_CHECKING
440  memset(&tuple->tuple, 0xa9, sizeof(HeapTupleData));
442 #endif
443  tuple->tuple.t_data = ReorderBufferTupleBufData(tuple);
444 #ifdef USE_ASSERT_CHECKING
445  memset(tuple->tuple.t_data, 0xa8, tuple->alloc_tuple_size);
447 #endif
448  }
449  else
450  {
451  tuple = (ReorderBufferTupleBuf *)
453  sizeof(ReorderBufferTupleBuf) +
454  MAXIMUM_ALIGNOF + alloc_len);
455  tuple->alloc_tuple_size = alloc_len;
456  tuple->tuple.t_data = ReorderBufferTupleBufData(tuple);
457  }
458 
459  return tuple;
460 }
461 
462 /*
463  * Free a ReorderBufferTupleBuf.
464  *
465  * Deallocation might be delayed for efficiency purposes, for details check
466  * the comments above max_cached_tuplebufs's definition.
467  */
468 void
470 {
471  /* check whether to put into the slab cache, oversized tuples never are */
472  if (tuple->alloc_tuple_size == MaxHeapTupleSize &&
474  {
475  rb->nr_cached_tuplebufs++;
476  slist_push_head(&rb->cached_tuplebufs, &tuple->node);
479  VALGRIND_MAKE_MEM_DEFINED(&tuple->node, sizeof(tuple->node));
481  }
482  else
483  {
484  pfree(tuple);
485  }
486 }
487 
488 /*
489  * Return the ReorderBufferTXN from the given buffer, specified by Xid.
490  * If create is true, and a transaction doesn't already exist, create it
491  * (with the given LSN, and as top transaction if that's specified);
492  * when this happens, is_new is set to true.
493  */
494 static ReorderBufferTXN *
496  bool *is_new, XLogRecPtr lsn, bool create_as_top)
497 {
498  ReorderBufferTXN *txn;
500  bool found;
501 
503  Assert(!create || lsn != InvalidXLogRecPtr);
504 
505  /*
506  * Check the one-entry lookup cache first
507  */
509  rb->by_txn_last_xid == xid)
510  {
511  txn = rb->by_txn_last_txn;
512 
513  if (txn != NULL)
514  {
515  /* found it, and it's valid */
516  if (is_new)
517  *is_new = false;
518  return txn;
519  }
520 
521  /*
522  * cached as non-existent, and asked not to create? Then nothing else
523  * to do.
524  */
525  if (!create)
526  return NULL;
527  /* otherwise fall through to create it */
528  }
529 
530  /*
531  * We get here if the cache wasn't hit, or if it yielded a "does-not-exist"
532  * and we want to create an entry.
533  */
534 
535  /* search the lookup table */
536  ent = (ReorderBufferTXNByIdEnt *)
537  hash_search(rb->by_txn,
538  (void *) &xid,
539  create ? HASH_ENTER : HASH_FIND,
540  &found);
541  if (found)
542  txn = ent->txn;
543  else if (create)
544  {
545  /* initialize the new entry, if creation was requested */
546  Assert(ent != NULL);
547 
548  ent->txn = ReorderBufferGetTXN(rb);
549  ent->txn->xid = xid;
550  txn = ent->txn;
551  txn->first_lsn = lsn;
553 
554  if (create_as_top)
555  {
556  dlist_push_tail(&rb->toplevel_by_lsn, &txn->node);
557  AssertTXNLsnOrder(rb);
558  }
559  }
560  else
561  txn = NULL; /* not found and not asked to create */
562 
563  /* update cache */
564  rb->by_txn_last_xid = xid;
565  rb->by_txn_last_txn = txn;
566 
567  if (is_new)
568  *is_new = !found;
569 
570  Assert(!create || txn != NULL);
571  return txn;
572 }
573 
574 /*
575  * Queue a change into a transaction so it can be replayed upon commit.
576  */
577 void
579  ReorderBufferChange *change)
580 {
581  ReorderBufferTXN *txn;
582 
583  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
584 
585  change->lsn = lsn;
586  Assert(InvalidXLogRecPtr != lsn);
587  dlist_push_tail(&txn->changes, &change->node);
588  txn->nentries++;
589  txn->nentries_mem++;
590 
592 }
593 
594 /*
595  * Queue message into a transaction so it can be processed upon commit.
596  */
597 void
599  Snapshot snapshot, XLogRecPtr lsn,
600  bool transactional, const char *prefix,
601  Size message_size, const char *message)
602 {
603  if (transactional)
604  {
605  MemoryContext oldcontext;
606  ReorderBufferChange *change;
607 
609 
610  oldcontext = MemoryContextSwitchTo(rb->context);
611 
612  change = ReorderBufferGetChange(rb);
614  change->data.msg.prefix = pstrdup(prefix);
615  change->data.msg.message_size = message_size;
616  change->data.msg.message = palloc(message_size);
617  memcpy(change->data.msg.message, message, message_size);
618 
619  ReorderBufferQueueChange(rb, xid, lsn, change);
620 
621  MemoryContextSwitchTo(oldcontext);
622  }
623  else
624  {
625  ReorderBufferTXN *txn = NULL;
626  volatile Snapshot snapshot_now = snapshot;
627 
628  if (xid != InvalidTransactionId)
629  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
630 
631  /* setup snapshot to allow catalog access */
632  SetupHistoricSnapshot(snapshot_now, NULL);
633  PG_TRY();
634  {
635  rb->message(rb, txn, lsn, false, prefix, message_size, message);
636 
638  }
639  PG_CATCH();
640  {
642  PG_RE_THROW();
643  }
644  PG_END_TRY();
645  }
646 }
647 
648 
649 static void
651 {
652 #ifdef USE_ASSERT_CHECKING
653  dlist_iter iter;
654  XLogRecPtr prev_first_lsn = InvalidXLogRecPtr;
655 
656  dlist_foreach(iter, &rb->toplevel_by_lsn)
657  {
658  ReorderBufferTXN *cur_txn;
659 
660  cur_txn = dlist_container(ReorderBufferTXN, node, iter.cur);
661  Assert(cur_txn->first_lsn != InvalidXLogRecPtr);
662 
663  if (cur_txn->end_lsn != InvalidXLogRecPtr)
664  Assert(cur_txn->first_lsn <= cur_txn->end_lsn);
665 
666  if (prev_first_lsn != InvalidXLogRecPtr)
667  Assert(prev_first_lsn < cur_txn->first_lsn);
668 
669  Assert(!cur_txn->is_known_as_subxact);
670  prev_first_lsn = cur_txn->first_lsn;
671  }
672 #endif
673 }
674 
677 {
678  ReorderBufferTXN *txn;
679 
681  return NULL;
682 
683  AssertTXNLsnOrder(rb);
684 
686 
689  return txn;
690 }
691 
692 void
694 {
696 }
697 
698 void
700  TransactionId subxid, XLogRecPtr lsn)
701 {
702  ReorderBufferTXN *txn;
703  ReorderBufferTXN *subtxn;
704  bool new_top;
705  bool new_sub;
706 
707  txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true);
708  subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false);
709 
710  if (new_sub)
711  {
712  /*
713  * we assign subtransactions to top level transaction even if we don't
714  * have data for it yet, assignment records frequently reference xids
715  * that have not yet produced any records. Knowing those aren't top
716  * level xids allows us to make processing cheaper in some places.
717  */
718  dlist_push_tail(&txn->subtxns, &subtxn->node);
719  txn->nsubtxns++;
720  }
721  else if (!subtxn->is_known_as_subxact)
722  {
723  subtxn->is_known_as_subxact = true;
724  Assert(subtxn->nsubtxns == 0);
725 
726  /* remove from lsn order list of top-level transactions */
727  dlist_delete(&subtxn->node);
728 
729  /* add to toplevel transaction */
730  dlist_push_tail(&txn->subtxns, &subtxn->node);
731  txn->nsubtxns++;
732  }
733  else if (new_top)
734  {
735  elog(ERROR, "existing subxact assigned to unknown toplevel xact");
736  }
737 }
738 
739 /*
740  * Associate a subtransaction with its toplevel transaction at commit
741  * time. There may be no further changes added after this.
742  */
743 void
745  TransactionId subxid, XLogRecPtr commit_lsn,
746  XLogRecPtr end_lsn)
747 {
748  ReorderBufferTXN *txn;
749  ReorderBufferTXN *subtxn;
750 
751  subtxn = ReorderBufferTXNByXid(rb, subxid, false, NULL,
752  InvalidXLogRecPtr, false);
753 
754  /*
755  * No need to do anything if that subtxn didn't contain any changes
756  */
757  if (!subtxn)
758  return;
759 
760  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, commit_lsn, true);
761 
762  if (txn == NULL)
763  elog(ERROR, "subxact logged without previous toplevel record");
764 
765  /*
766  * Pass our base snapshot to the parent transaction if it doesn't have
767  * one, or ours is older. That can happen if there are no changes in the
768  * toplevel transaction but in one of the child transactions. This allows
769  * the parent to simply use its base snapshot initially.
770  */
771  if (subtxn->base_snapshot != NULL &&
772  (txn->base_snapshot == NULL ||
773  txn->base_snapshot_lsn > subtxn->base_snapshot_lsn))
774  {
775  txn->base_snapshot = subtxn->base_snapshot;
776  txn->base_snapshot_lsn = subtxn->base_snapshot_lsn;
777  subtxn->base_snapshot = NULL;
779  }
780 
781  subtxn->final_lsn = commit_lsn;
782  subtxn->end_lsn = end_lsn;
783 
784  if (!subtxn->is_known_as_subxact)
785  {
786  subtxn->is_known_as_subxact = true;
787  Assert(subtxn->nsubtxns == 0);
788 
789  /* remove from lsn order list of top-level transactions */
790  dlist_delete(&subtxn->node);
791 
792  /* add to subtransaction list */
793  dlist_push_tail(&txn->subtxns, &subtxn->node);
794  txn->nsubtxns++;
795  }
796 }
797 
798 
799 /*
800  * Support for efficiently iterating over a transaction's and its
801  * subtransactions' changes.
802  *
803  * We do so by doing a k-way merge between transactions/subtransactions. For that
804  * we model the current heads of the different transactions as a binary heap
805  * so we easily know which (sub-)transaction has the change with the smallest
806  * lsn next.
807  *
808  * We assume the changes in individual transactions are already sorted by LSN.
809  */
810 
811 /*
812  * Binary heap comparison function.
813  */
814 static int
816 {
818  XLogRecPtr pos_a = state->entries[DatumGetInt32(a)].lsn;
819  XLogRecPtr pos_b = state->entries[DatumGetInt32(b)].lsn;
820 
821  if (pos_a < pos_b)
822  return 1;
823  else if (pos_a == pos_b)
824  return 0;
825  return -1;
826 }
827 
828 /*
829  * Allocate & initialize an iterator which iterates in lsn order over a
830  * transaction and all its subtransactions.
831  */
834 {
835  Size nr_txns = 0;
837  dlist_iter cur_txn_i;
838  int32 off;
839 
840  /*
841  * Calculate the size of our heap: one element for every transaction that
842  * contains changes. (Besides the transactions already in the reorder
843  * buffer, we count the one we were directly passed.)
844  */
845  if (txn->nentries > 0)
846  nr_txns++;
847 
848  dlist_foreach(cur_txn_i, &txn->subtxns)
849  {
850  ReorderBufferTXN *cur_txn;
851 
852  cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
853 
854  if (cur_txn->nentries > 0)
855  nr_txns++;
856  }
857 
858  /*
859  * TODO: Consider adding fastpath for the rather common nr_txns=1 case, no
860  * need to allocate/build a heap then.
861  */
862 
863  /* allocate iteration state */
864  state = (ReorderBufferIterTXNState *)
866  sizeof(ReorderBufferIterTXNState) +
867  sizeof(ReorderBufferIterTXNEntry) * nr_txns);
868 
869  state->nr_txns = nr_txns;
870  dlist_init(&state->old_change);
871 
872  for (off = 0; off < state->nr_txns; off++)
873  {
874  state->entries[off].fd = -1;
875  state->entries[off].segno = 0;
876  }
877 
878  /* allocate heap */
879  state->heap = binaryheap_allocate(state->nr_txns,
881  state);
882 
883  /*
884  * Now insert items into the binary heap, in an unordered fashion. (We
885  * will run a heap assembly step at the end; this is more efficient.)
886  */
887 
888  off = 0;
889 
890  /* add toplevel transaction if it contains changes */
891  if (txn->nentries > 0)
892  {
893  ReorderBufferChange *cur_change;
894 
895  if (txn->serialized)
896  {
897  /* serialize remaining changes */
898  ReorderBufferSerializeTXN(rb, txn);
899  ReorderBufferRestoreChanges(rb, txn, &state->entries[off].fd,
900  &state->entries[off].segno);
901  }
902 
903  cur_change = dlist_head_element(ReorderBufferChange, node,
904  &txn->changes);
905 
906  state->entries[off].lsn = cur_change->lsn;
907  state->entries[off].change = cur_change;
908  state->entries[off].txn = txn;
909 
911  }
912 
913  /* add subtransactions if they contain changes */
914  dlist_foreach(cur_txn_i, &txn->subtxns)
915  {
916  ReorderBufferTXN *cur_txn;
917 
918  cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
919 
920  if (cur_txn->nentries > 0)
921  {
922  ReorderBufferChange *cur_change;
923 
924  if (cur_txn->serialized)
925  {
926  /* serialize remaining changes */
927  ReorderBufferSerializeTXN(rb, cur_txn);
928  ReorderBufferRestoreChanges(rb, cur_txn,
929  &state->entries[off].fd,
930  &state->entries[off].segno);
931  }
932  cur_change = dlist_head_element(ReorderBufferChange, node,
933  &cur_txn->changes);
934 
935  state->entries[off].lsn = cur_change->lsn;
936  state->entries[off].change = cur_change;
937  state->entries[off].txn = cur_txn;
938 
940  }
941  }
942 
943  /* assemble a valid binary heap */
944  binaryheap_build(state->heap);
945 
946  return state;
947 }
948 
949 /*
950  * Return the next change when iterating over a transaction and its
951  * subtransactions.
952  *
953  * Returns NULL when no further changes exist.
954  */
955 static ReorderBufferChange *
957 {
958  ReorderBufferChange *change;
960  int32 off;
961 
962  /* nothing there anymore */
963  if (state->heap->bh_size == 0)
964  return NULL;
965 
966  off = DatumGetInt32(binaryheap_first(state->heap));
967  entry = &state->entries[off];
968 
969  /* free memory we might have "leaked" in the previous *Next call */
970  if (!dlist_is_empty(&state->old_change))
971  {
972  change = dlist_container(ReorderBufferChange, node,
974  ReorderBufferReturnChange(rb, change);
975  Assert(dlist_is_empty(&state->old_change));
976  }
977 
978  change = entry->change;
979 
980  /*
981  * update heap with information about which transaction has the next
982  * relevant change in LSN order
983  */
984 
985  /* there are in-memory changes */
986  if (dlist_has_next(&entry->txn->changes, &entry->change->node))
987  {
988  dlist_node *next = dlist_next_node(&entry->txn->changes, &change->node);
989  ReorderBufferChange *next_change =
991 
992  /* txn stays the same */
993  state->entries[off].lsn = next_change->lsn;
994  state->entries[off].change = next_change;
995 
997  return change;
998  }
999 
1000  /* try to load changes from disk */
1001  if (entry->txn->nentries != entry->txn->nentries_mem)
1002  {
1003  /*
1004  * Ugly: restoring changes will reuse *Change records, thus delete the
1005  * current one from the per-tx list and only free in the next call.
1006  */
1007  dlist_delete(&change->node);
1008  dlist_push_tail(&state->old_change, &change->node);
1009 
1010  if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->fd,
1011  &state->entries[off].segno))
1012  {
1013  /* successfully restored changes from disk */
1014  ReorderBufferChange *next_change =
1016  &entry->txn->changes);
1017 
1018  elog(DEBUG2, "restored %u/%u changes from disk",
1019  (uint32) entry->txn->nentries_mem,
1020  (uint32) entry->txn->nentries);
1021 
1022  Assert(entry->txn->nentries_mem);
1023  /* txn stays the same */
1024  state->entries[off].lsn = next_change->lsn;
1025  state->entries[off].change = next_change;
1027 
1028  return change;
1029  }
1030  }
1031 
1032  /* ok, no changes there anymore, remove */
1033  binaryheap_remove_first(state->heap);
1034 
1035  return change;
1036 }
1037 
1038 /*
1039  * Deallocate the iterator
1040  */
1041 static void
1044 {
1045  int32 off;
1046 
1047  for (off = 0; off < state->nr_txns; off++)
1048  {
1049  if (state->entries[off].fd != -1)
1050  CloseTransientFile(state->entries[off].fd);
1051  }
1052 
1053  /* free memory we might have "leaked" in the last *Next call */
1054  if (!dlist_is_empty(&state->old_change))
1055  {
1056  ReorderBufferChange *change;
1057 
1058  change = dlist_container(ReorderBufferChange, node,
1059  dlist_pop_head_node(&state->old_change));
1060  ReorderBufferReturnChange(rb, change);
1061  Assert(dlist_is_empty(&state->old_change));
1062  }
1063 
1064  binaryheap_free(state->heap);
1065  pfree(state);
1066 }
1067 
1068 /*
1069  * Cleanup the contents of a transaction, usually after the transaction
1070  * committed or aborted.
1071  */
1072 static void
1074 {
1075  bool found;
1076  dlist_mutable_iter iter;
1077 
1078  /* cleanup subtransactions & their changes */
1079  dlist_foreach_modify(iter, &txn->subtxns)
1080  {
1081  ReorderBufferTXN *subtxn;
1082 
1083  subtxn = dlist_container(ReorderBufferTXN, node, iter.cur);
1084 
1085  /*
1086  * Subtransactions are always associated to the toplevel TXN, even if
1087  * they originally were happening inside another subtxn, so we won't
1088  * ever recurse more than one level deep here.
1089  */
1090  Assert(subtxn->is_known_as_subxact);
1091  Assert(subtxn->nsubtxns == 0);
1092 
1093  ReorderBufferCleanupTXN(rb, subtxn);
1094  }
1095 
1096  /* cleanup changes in the toplevel txn */
1097  dlist_foreach_modify(iter, &txn->changes)
1098  {
1099  ReorderBufferChange *change;
1100 
1101  change = dlist_container(ReorderBufferChange, node, iter.cur);
1102 
1103  ReorderBufferReturnChange(rb, change);
1104  }
1105 
1106  /*
1107  * Cleanup the tuplecids we stored for decoding catalog snapshot access.
1108  * They are always stored in the toplevel transaction.
1109  */
1110  dlist_foreach_modify(iter, &txn->tuplecids)
1111  {
1112  ReorderBufferChange *change;
1113 
1114  change = dlist_container(ReorderBufferChange, node, iter.cur);
1116  ReorderBufferReturnChange(rb, change);
1117  }
1118 
1119  if (txn->base_snapshot != NULL)
1120  {
1122  txn->base_snapshot = NULL;
1124  }
1125 
1126  /*
1127  * Remove TXN from its containing list.
1128  *
1129  * Note: if txn->is_known_as_subxact, we are deleting the TXN from its
1130  * parent's list of known subxacts; this leaves the parent's nsubtxns
1131  * count too high, but we don't care. Otherwise, we are deleting the TXN
1132  * from the LSN-ordered list of toplevel TXNs.
1133  */
1134  dlist_delete(&txn->node);
1135 
1136  /* now remove reference from buffer */
1137  hash_search(rb->by_txn,
1138  (void *) &txn->xid,
1139  HASH_REMOVE,
1140  &found);
1141  Assert(found);
1142 
1143  /* remove entries spilled to disk */
1144  if (txn->serialized)
1145  ReorderBufferRestoreCleanup(rb, txn);
1146 
1147  /* deallocate */
1148  ReorderBufferReturnTXN(rb, txn);
1149 }
1150 
1151 /*
1152  * Build a hash with a (relfilenode, ctid) -> (cmin, cmax) mapping for use by
1153  * tqual.c's HeapTupleSatisfiesHistoricMVCC.
1154  */
1155 static void
1157 {
1158  dlist_iter iter;
1159  HASHCTL hash_ctl;
1160 
1161  if (!txn->has_catalog_changes || dlist_is_empty(&txn->tuplecids))
1162  return;
1163 
1164  memset(&hash_ctl, 0, sizeof(hash_ctl));
1165 
1166  hash_ctl.keysize = sizeof(ReorderBufferTupleCidKey);
1167  hash_ctl.entrysize = sizeof(ReorderBufferTupleCidEnt);
1168  hash_ctl.hcxt = rb->context;
1169 
1170  /*
1171  * create the hash with the exact number of to-be-stored tuplecids from
1172  * the start
1173  */
1174  txn->tuplecid_hash =
1175  hash_create("ReorderBufferTupleCid", txn->ntuplecids, &hash_ctl,
1177 
1178  dlist_foreach(iter, &txn->tuplecids)
1179  {
1182  bool found;
1183  ReorderBufferChange *change;
1184 
1185  change = dlist_container(ReorderBufferChange, node, iter.cur);
1186 
1188 
1189  /* be careful about padding */
1190  memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
1191 
1192  key.relnode = change->data.tuplecid.node;
1193 
1194  ItemPointerCopy(&change->data.tuplecid.tid,
1195  &key.tid);
1196 
1197  ent = (ReorderBufferTupleCidEnt *)
1199  (void *) &key,
1201  &found);
1202  if (!found)
1203  {
1204  ent->cmin = change->data.tuplecid.cmin;
1205  ent->cmax = change->data.tuplecid.cmax;
1206  ent->combocid = change->data.tuplecid.combocid;
1207  }
1208  else
1209  {
1210  Assert(ent->cmin == change->data.tuplecid.cmin);
1211  Assert(ent->cmax == InvalidCommandId ||
1212  ent->cmax == change->data.tuplecid.cmax);
1213 
1214  /*
1215  * if the tuple got valid in this transaction and now got deleted
1216  * we already have a valid cmin stored. The cmax will be
1217  * InvalidCommandId though.
1218  */
1219  ent->cmax = change->data.tuplecid.cmax;
1220  }
1221  }
1222 }
1223 
1224 /*
1225  * Copy a provided snapshot so we can modify it privately. This is needed so
1226  * that catalog modifying transactions can look into intermediate catalog
1227  * states.
 *
 * The copy is a single zeroed allocation in rb->context that holds the
 * SnapshotData header, followed by the xip array, followed by the subxip
 * array. It is returned with copied = true, active_count = 1,
 * regd_count = 0 and curcid = cid.
 *
 * NOTE(review): the line carrying the function name and the leading
 * parameters is absent from this listing (the embedded numbering jumps from
 * 1229 to 1231).
1228  */
1229 static Snapshot
1231  ReorderBufferTXN *txn, CommandId cid)
1232 {
1233  Snapshot snap;
1234  dlist_iter iter;
1235  int i = 0;
1236  Size size;
1237 
 /* header + copied xip entries + room for the toplevel xid and all subxids */
1238  size = sizeof(SnapshotData) +
1239  sizeof(TransactionId) * orig_snap->xcnt +
1240  sizeof(TransactionId) * (txn->nsubtxns + 1);
1241 
1242  snap = MemoryContextAllocZero(rb->context, size);
1243  memcpy(snap, orig_snap, sizeof(SnapshotData));
1244 
1245  snap->copied = true;
1246  snap->active_count = 1; /* mark as active so nobody frees it */
1247  snap->regd_count = 0;
 /* the xip array lives directly behind the struct in the same allocation */
1248  snap->xip = (TransactionId *) (snap + 1);
1249 
1250  memcpy(snap->xip, orig_snap->xip, sizeof(TransactionId) * snap->xcnt);
1251 
1252  /*
1253  * snap->subxip contains all txids that belong to our transaction which we
1254  * need to check via cmin/cmax. That's why we store the toplevel
1255  * transaction in there as well.
1256  */
1257  snap->subxip = snap->xip + snap->xcnt;
1258  snap->subxip[i++] = txn->xid;
1259 
1260  /*
1261  * txn->nsubtxns isn't decreased when subtransactions abort, so count
1262  * manually. Since it's an upper boundary it is safe to use it for the
1263  * allocation above.
1264  */
1265  snap->subxcnt = 1;
1266 
1267  dlist_foreach(iter, &txn->subtxns)
1268  {
1269  ReorderBufferTXN *sub_txn;
1270 
1271  sub_txn = dlist_container(ReorderBufferTXN, node, iter.cur);
1272  snap->subxip[i++] = sub_txn->xid;
1273  snap->subxcnt++;
1274  }
1275 
1276  /* sort so we can bsearch() later */
1277  qsort(snap->subxip, snap->subxcnt, sizeof(TransactionId), xidComparator);
1278 
1279  /* store the specified current CommandId */
1280  snap->curcid = cid;
1281 
1282  return snap;
1283 }
1284 
1285 /*
1286  * Free a previously ReorderBufferCopySnap'ed snapshot
 *
 * A snapshot with snap->copied set is released with pfree().
 *
 * NOTE(review): the signature line (1289) and the statement of the else
 * branch (1294) are absent from this listing, so the handling of non-copied
 * snapshots cannot be seen here.
1287  */
1288 static void
1290 {
1291  if (snap->copied)
1292  pfree(snap);
1293  else
1295 }
1296 
1297 /*
1298  * Perform the replay of a transaction and its non-aborted subtransactions.
1299  *
1300  * Subtransactions previously have to be processed by
1301  * ReorderBufferCommitChild(), even if previously assigned to the toplevel
1302  * transaction with ReorderBufferAssignChild.
1303  *
1304  * We currently can only decode a transaction's contents when their commit
1305  * record is read because that's currently the only place where we know about
1306  * cache invalidations. Thus, once a toplevel commit is read, we iterate over
1307  * the top and subtransactions (using a k-way merge) and replay the changes in
1308  * lsn order.
 *
 * If the xid is unknown the function returns without doing anything; if the
 * transaction has no base snapshot it is only cleaned up. Otherwise the
 * changes are handed to the rb->begin, rb->apply_change, rb->message and
 * rb->commit callbacks, and the transaction is cleaned up afterwards, both
 * on success and when an error is caught and re-thrown.
 *
 * NOTE(review): many lines are absent from this listing (the embedded
 * numbering has gaps), among them the signature line, every "case" label of
 * the switch and several cleanup statements. Comments that precede a gap
 * describe a statement that is not visible here.
1309  */
1310 void
1312  XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
1313  TimestampTz commit_time,
1314  RepOriginId origin_id, XLogRecPtr origin_lsn)
1315 {
1316  ReorderBufferTXN *txn;
1317  volatile Snapshot snapshot_now;
1318  volatile CommandId command_id = FirstCommandId;
1319  bool using_subtxn;
1320  ReorderBufferIterTXNState *volatile iterstate = NULL;
1321 
1322  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1323  false);
1324 
1325  /* unknown transaction, nothing to replay */
1326  if (txn == NULL)
1327  return;
1328 
 /* remember the commit information on the transaction */
1329  txn->final_lsn = commit_lsn;
1330  txn->end_lsn = end_lsn;
1331  txn->commit_time = commit_time;
1332  txn->origin_id = origin_id;
1333  txn->origin_lsn = origin_lsn;
1334 
1335  /*
1336  * If this transaction didn't have any real changes in our database, it's
1337  * OK not to have a snapshot. Note that ReorderBufferCommitChild will have
1338  * transferred its snapshot to this transaction if it had one and the
1339  * toplevel tx didn't.
1340  */
1341  if (txn->base_snapshot == NULL)
1342  {
1343  Assert(txn->ninvalidations == 0);
1344  ReorderBufferCleanupTXN(rb, txn);
1345  return;
1346  }
1347 
1348  snapshot_now = txn->base_snapshot;
1349 
1350  /* build data to be able to lookup the CommandIds of catalog tuples */
1352 
1353  /* setup the initial snapshot */
1354  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1355 
1356  /*
1357  * Decoding needs access to syscaches et al., which in turn use
1358  * heavyweight locks and such. Thus we need to have enough state around to
1359  * keep track of those. The easiest way is to simply use a transaction
1360  * internally. That also allows us to easily enforce that nothing writes
1361  * to the database by checking for xid assignments.
1362  *
1363  * When we're called via the SQL SRF there's already a transaction
1364  * started, so start an explicit subtransaction there.
1365  */
1366  using_subtxn = IsTransactionOrTransactionBlock();
1367 
1368  PG_TRY();
1369  {
1370  ReorderBufferChange *change;
 /* pending speculative insertion, unlinked from the change list */
1371  ReorderBufferChange *specinsert = NULL;
1372 
1373  if (using_subtxn)
1374  BeginInternalSubTransaction("replay");
1375  else
1377 
1378  rb->begin(rb, txn);
1379 
1380  iterstate = ReorderBufferIterTXNInit(rb, txn);
1381  while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
1382  {
1383  Relation relation = NULL;
1384  Oid reloid;
1385 
1386  switch (change->action)
1387  {
1389 
1390  /*
1391  * Confirmation for speculative insertion arrived. Simply
1392  * use as a normal record. It'll be cleaned up at the end
1393  * of INSERT processing.
1394  */
1395  Assert(specinsert->data.tp.oldtuple == NULL);
1396  change = specinsert;
1398 
1399  /* intentionally fall through */
1403  Assert(snapshot_now);
1404 
1405  reloid = RelidByRelfilenode(change->data.tp.relnode.spcNode,
1406  change->data.tp.relnode.relNode);
1407 
1408  /*
1409  * Catalog tuple without data, emitted while catalog was
1410  * in the process of being rewritten.
1411  */
1412  if (reloid == InvalidOid &&
1413  change->data.tp.newtuple == NULL &&
1414  change->data.tp.oldtuple == NULL)
1415  goto change_done;
1416  else if (reloid == InvalidOid)
1417  elog(ERROR, "could not map filenode \"%s\" to relation OID",
1418  relpathperm(change->data.tp.relnode,
1419  MAIN_FORKNUM));
1420 
1421  relation = RelationIdGetRelation(reloid);
1422 
1423  if (relation == NULL)
1424  elog(ERROR, "could not open relation with OID %u (for filenode \"%s\")",
1425  reloid,
1426  relpathperm(change->data.tp.relnode,
1427  MAIN_FORKNUM));
1428 
 /* skip relations whose changes are not logically logged */
1429  if (!RelationIsLogicallyLogged(relation))
1430  goto change_done;
1431 
1432  /*
1433  * For now ignore sequence changes entirely. Most of the
1434  * time they don't log changes using records we
1435  * understand, so it doesn't make sense to handle the few
1436  * cases we do.
1437  */
1438  if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
1439  goto change_done;
1440 
1441  /* user-triggered change */
1442  if (!IsToastRelation(relation))
1443  {
1444  ReorderBufferToastReplace(rb, txn, relation, change);
1445  rb->apply_change(rb, txn, relation, change);
1446 
1447  /*
1448  * Only clear reassembled toast chunks if we're sure
1449  * they're not required anymore. The creator of the
1450  * tuple tells us.
1451  */
1452  if (change->data.tp.clear_toast_afterwards)
1453  ReorderBufferToastReset(rb, txn);
1454  }
1455  /* we're not interested in toast deletions */
1456  else if (change->action == REORDER_BUFFER_CHANGE_INSERT)
1457  {
1458  /*
1459  * Need to reassemble the full toasted Datum in
1460  * memory. To ensure the chunks don't get reused till
1461  * we're done, remove it from the list of this
1462  * transaction's changes. Otherwise it will get
1463  * freed/reused while restoring spooled data from
1464  * disk.
1465  */
1466  dlist_delete(&change->node);
1467  ReorderBufferToastAppendChunk(rb, txn, relation,
1468  change);
1469  }
1470 
1471  change_done:
1472 
1473  /*
1474  * Either speculative insertion was confirmed, or it was
1475  * unsuccessful and the record isn't needed anymore.
1476  */
1477  if (specinsert != NULL)
1478  {
1479  ReorderBufferReturnChange(rb, specinsert);
1480  specinsert = NULL;
1481  }
1482 
1483  if (relation != NULL)
1484  {
1485  RelationClose(relation);
1486  relation = NULL;
1487  }
1488  break;
1489 
1491 
1492  /*
1493  * Speculative insertions are dealt with by delaying the
1494  * processing of the insert until the confirmation record
1495  * arrives. For that we simply unlink the record from the
1496  * chain, so it does not get freed/reused while restoring
1497  * spooled data from disk.
1498  *
1499  * This is safe in the face of concurrent catalog changes
1500  * because the relevant relation can't be changed between
1501  * speculative insertion and confirmation due to
1502  * CheckTableNotInUse() and locking.
1503  */
1504 
1505  /* clear out a pending (and thus failed) speculation */
1506  if (specinsert != NULL)
1507  {
1508  ReorderBufferReturnChange(rb, specinsert);
1509  specinsert = NULL;
1510  }
1511 
1512  /* and memorize the pending insertion */
1513  dlist_delete(&change->node);
1514  specinsert = change;
1515  break;
1516 
1518  rb->message(rb, txn, change->lsn, true,
1519  change->data.msg.prefix,
1520  change->data.msg.message_size,
1521  change->data.msg.message);
1522  break;
1523 
1525  /* get rid of the old */
1526  TeardownHistoricSnapshot(false);
1527 
 /* our private copy is replaced by a copy of the new snapshot */
1528  if (snapshot_now->copied)
1529  {
1530  ReorderBufferFreeSnap(rb, snapshot_now);
1531  snapshot_now =
1532  ReorderBufferCopySnap(rb, change->data.snapshot,
1533  txn, command_id);
1534  }
1535 
1536  /*
1537  * Restored from disk, need to be careful not to double
1538  * free. We could introduce refcounting for that, but for
1539  * now this seems infrequent enough not to care.
1540  */
1541  else if (change->data.snapshot->copied)
1542  {
1543  snapshot_now =
1544  ReorderBufferCopySnap(rb, change->data.snapshot,
1545  txn, command_id);
1546  }
1547  else
1548  {
1549  snapshot_now = change->data.snapshot;
1550  }
1551 
1552 
1553  /* and continue with the new one */
1554  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1555  break;
1556 
1558  Assert(change->data.command_id != InvalidCommandId);
1559 
 /* only act when the command id actually advances */
1560  if (command_id < change->data.command_id)
1561  {
1562  command_id = change->data.command_id;
1563 
1564  if (!snapshot_now->copied)
1565  {
1566  /* we don't use the global one anymore */
1567  snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
1568  txn, command_id);
1569  }
1570 
1571  snapshot_now->curcid = command_id;
1572 
1573  TeardownHistoricSnapshot(false);
1574  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
1575 
1576  /*
1577  * Every time the CommandId is incremented, we could
1578  * see new catalog contents, so execute all
1579  * invalidations.
1580  */
1582  }
1583 
1584  break;
1585 
1587  elog(ERROR, "tuplecid value in changequeue");
1588  break;
1589  }
1590  }
1591 
1592  /*
1593  * There's a speculative insertion remaining, just clean it up, it
1594  * can't have been successful, otherwise we'd have gotten a
1595  * confirmation record.
1596  */
1597  if (specinsert)
1598  {
1599  ReorderBufferReturnChange(rb, specinsert);
1600  specinsert = NULL;
1601  }
1602 
1603  /* clean up the iterator */
1604  ReorderBufferIterTXNFinish(rb, iterstate);
1605  iterstate = NULL;
1606 
1607  /* call commit callback */
1608  rb->commit(rb, txn, commit_lsn);
1609 
1610  /* this is just a sanity check against bad output plugin behaviour */
1612  elog(ERROR, "output plugin used XID %u",
1614 
1615  /* cleanup */
1616  TeardownHistoricSnapshot(false);
1617 
1618  /*
1619  * Aborting the current (sub-)transaction as a whole has the right
1620  * semantics. We want all locks acquired in here to be released, not
1621  * reassigned to the parent and we do not want any database access
1622  * have persistent effects.
1623  */
1625 
1626  /* make sure there's no cache pollution */
1628 
1629  if (using_subtxn)
1631 
1632  if (snapshot_now->copied)
1633  ReorderBufferFreeSnap(rb, snapshot_now);
1634 
1635  /* remove potential on-disk data, and deallocate */
1636  ReorderBufferCleanupTXN(rb, txn);
1637  }
1638  PG_CATCH();
1639  {
1640  /* TODO: Encapsulate cleanup from the PG_TRY and PG_CATCH blocks */
1641  if (iterstate)
1642  ReorderBufferIterTXNFinish(rb, iterstate);
1643 
1645 
1646  /*
1647  * Force cache invalidation to happen outside of a valid transaction
1648  * to prevent catalog access as we just caught an error.
1649  */
1651 
1652  /* make sure there's no cache pollution */
1654 
1655  if (using_subtxn)
1657 
1658  if (snapshot_now->copied)
1659  ReorderBufferFreeSnap(rb, snapshot_now);
1660 
1661  /* remove potential on-disk data, and deallocate */
1662  ReorderBufferCleanupTXN(rb, txn);
1663 
1664  PG_RE_THROW();
1665  }
1666  PG_END_TRY();
1667 }
1668 
1669 /*
1670  * Abort a transaction that possibly has previous changes. Needs to be first
1671  * called for subtransactions and then for the toplevel xid.
1672  *
1673  * NB: Transactions handled here have to have actively aborted (i.e. have
1674  * produced an abort record). Implicitly aborted transactions are handled via
1675  * ReorderBufferAbortOld(); transactions we're just not interested in, but
1676  * which have committed are handled in ReorderBufferForget().
1677  *
1678  * This function purges this transaction and its contents from memory and
1679  * disk.
 *
 * An xid that is not known to the reorder buffer is silently ignored.
 *
 * NOTE(review): the signature line (1682) is absent from this listing.
1680  */
1681 void
1683 {
1684  ReorderBufferTXN *txn;
1685 
 /* look the transaction up without creating it */
1686  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1687  false);
1688 
1689  /* unknown, nothing to remove */
1690  if (txn == NULL)
1691  return;
1692 
1693  /* cosmetic... */
1694  txn->final_lsn = lsn;
1695 
1696  /* remove potential on-disk data, and deallocate */
1697  ReorderBufferCleanupTXN(rb, txn);
1698 }
1699 
1700 /*
1701  * Abort all transactions that aren't actually running anymore because the
1702  * server restarted.
1703  *
1704  * NB: These really have to be transactions that have aborted due to a server
1705  * crash/immediate restart, as we don't deal with invalidations here.
 *
 * NOTE(review): the signature line (1708) and the loop header (1719) are
 * absent from this listing.
1706  */
1707 void
1709 {
1710  dlist_mutable_iter it;
1711 
1712  /*
1713  * Iterate through all (potential) toplevel TXNs and abort all that are
1714  * older than what possibly can be running. Once we've found the first
1715  * that is alive we stop, there might be some that acquired an xid earlier
1716  * but started writing later, but it's unlikely and they will be cleaned
1717  * up in a later call to ReorderBufferAbortOld().
1718  */
1720  {
1721  ReorderBufferTXN *txn;
1722 
1723  txn = dlist_container(ReorderBufferTXN, node, it.cur);
1724 
1725  if (TransactionIdPrecedes(txn->xid, oldestRunningXid))
1726  {
1727  elog(DEBUG2, "aborting old transaction %u", txn->xid);
1728 
1729  /* remove potential on-disk data, and deallocate this tx */
1730  ReorderBufferCleanupTXN(rb, txn);
1731  }
1732  else
1733  return;
1734  }
1735 }
1736 
1737 /*
1738  * Forget the contents of a transaction if we aren't interested in its
1739  * contents. Needs to be first called for subtransactions and then for the
1740  * toplevel xid.
1741  *
1742  * This is significantly different to ReorderBufferAbort() because
1743  * transactions that have committed need to be treated differently from aborted
1744  * ones since they may have modified the catalog.
1745  *
1746  * Note that this is only allowed to be called in the moment a transaction
1747  * commit has just been read, not earlier; otherwise later records referring
1748  * to this xid might re-create the transaction incompletely.
 *
 * NOTE(review): the signature line (1751) and the first line of the call
 * made when invalidations are present (1771) are absent from this listing.
1749  */
1750 void
1752 {
1753  ReorderBufferTXN *txn;
1754 
 /* look the transaction up without creating it */
1755  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1756  false);
1757 
1758  /* unknown, nothing to forget */
1759  if (txn == NULL)
1760  return;
1761 
1762  /* cosmetic... */
1763  txn->final_lsn = lsn;
1764 
1765  /*
1766  * Process cache invalidation messages if there are any. Even if we're not
1767  * interested in the transaction's contents, it could have manipulated the
1768  * catalog and we need to update the caches according to that.
1769  */
1770  if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
1772  txn->invalidations);
1773  else
1774  Assert(txn->ninvalidations == 0);
1775 
1776  /* remove potential on-disk data, and deallocate */
1777  ReorderBufferCleanupTXN(rb, txn);
1778 }
1779 
1780 /*
1781  * Execute invalidations happening outside the context of a decoded
1782  * transaction. That currently happens either for xid-less commits
1783  * (c.f. RecordTransactionCommit()) or for invalidations in uninteresting
1784  * transactions (via ReorderBufferForget()).
 *
 * Each of the 'ninvalidations' messages in 'invalidations' is passed to
 * LocalExecuteInvalidationMessage(). When called inside a transaction, the
 * work is wrapped in an internal subtransaction.
 *
 * NOTE(review): the signature line (1787) and the statements guarded by the
 * second and third "if (use_subtxn)" (1803, 1809) are absent from this
 * listing.
1785  */
1786 void
1788  SharedInvalidationMessage *invalidations)
1789 {
1790  bool use_subtxn = IsTransactionOrTransactionBlock();
1791  int i;
1792 
1793  if (use_subtxn)
1794  BeginInternalSubTransaction("replay");
1795 
1796  /*
1797  * Force invalidations to happen outside of a valid transaction - that way
1798  * entries will just be marked as invalid without accessing the catalog.
1799  * That's advantageous because we don't need to setup the full state
1800  * necessary for catalog access.
1801  */
1802  if (use_subtxn)
1804 
1805  for (i = 0; i < ninvalidations; i++)
1806  LocalExecuteInvalidationMessage(&invalidations[i]);
1807 
1808  if (use_subtxn)
1810 }
1811 
1812 /*
1813  * Tell reorderbuffer about an xid seen in the WAL stream. Has to be called at
1814  * least once for every xid in XLogRecord->xl_xid (other places in records
1815  * may, but do not have to be passed through here).
1816  *
1817  * Reorderbuffer keeps some datastructures about transactions in LSN order,
1818  * for efficiency. To do that it has to know about when transactions are seen
1819  * first in the WAL. As many types of records are not actually interesting for
1820  * logical decoding, they do not necessarily pass through here.
 *
 * NOTE(review): the signature line (1823) is absent from this listing.
1821  */
1822 void
1824 {
1825  /* many records won't have an xid assigned, centralize check here */
1826  if (xid != InvalidTransactionId)
1827  ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
1828 }
1829 
1830 /*
1831  * Add a new snapshot to this transaction that may only be used after lsn
1832  * 'lsn' because the previous snapshot doesn't describe the catalog correctly
1833  * for following rows.
 *
 * The snapshot is attached to a change that is queued for 'xid' at 'lsn' via
 * ReorderBufferQueueChange().
 *
 * NOTE(review): the signature line (1836) and the statements at 1839 and
 * 1842 (presumably obtaining the change and setting its action - confirm)
 * are absent from this listing.
1834  */
1835 void
1837  XLogRecPtr lsn, Snapshot snap)
1838 {
1840 
1841  change->data.snapshot = snap;
1843 
1844  ReorderBufferQueueChange(rb, xid, lsn, change);
1845 }
1846 
1847 /*
1848  * Setup the base snapshot of a transaction. The base snapshot is the snapshot
1849  * that is used to decode all changes until either this transaction modifies
1850  * the catalog or another catalog modifying transaction commits.
1851  *
1852  * Needs to be called before any changes are added with
1853  * ReorderBufferQueueChange().
 *
 * The transaction must not already have a base snapshot and 'snap' must not
 * be NULL (both are asserted). The snapshot and 'lsn' are stored in
 * txn->base_snapshot and txn->base_snapshot_lsn.
 *
 * NOTE(review): the signature line (1856) is absent from this listing.
1854  */
1855 void
1857  XLogRecPtr lsn, Snapshot snap)
1858 {
1859  ReorderBufferTXN *txn;
 /* filled in by the lookup below but not otherwise used here */
1860  bool is_new;
1861 
1862  txn = ReorderBufferTXNByXid(rb, xid, true, &is_new, lsn, true);
1863  Assert(txn->base_snapshot == NULL);
1864  Assert(snap != NULL);
1865 
1866  txn->base_snapshot = snap;
1867  txn->base_snapshot_lsn = lsn;
1868 }
1869 
1870 /*
1871  * Access the catalog with this CommandId at this point in the changestream.
1872  *
1873  * May only be called for command ids > 1
 *
 * The command id is attached to a change that is queued for 'xid' at 'lsn'
 * via ReorderBufferQueueChange().
 *
 * NOTE(review): the signature line (1876) and the statements at 1879 and
 * 1882 (presumably obtaining the change and setting its action - confirm)
 * are absent from this listing.
1874  */
1875 void
1877  XLogRecPtr lsn, CommandId cid)
1878 {
1880 
1881  change->data.command_id = cid;
1883 
1884  ReorderBufferQueueChange(rb, xid, lsn, change);
1885 }
1886 
1887 
1888 /*
1889  * Add new (relfilenode, tid) -> (cmin, cmax) mappings.
 *
 * The mapping is stored in a change that is appended to txn->tuplecids (not
 * to the regular change queue), and txn->ntuplecids is incremented.
 *
 * NOTE(review): the signature line (1892) and the statements at 1897 and
 * 1908 are absent from this listing.
1890  */
1891 void
1893  XLogRecPtr lsn, RelFileNode node,
1894  ItemPointerData tid, CommandId cmin,
1895  CommandId cmax, CommandId combocid)
1896 {
1898  ReorderBufferTXN *txn;
1899 
1900  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
1901 
1902  change->data.tuplecid.node = node;
1903  change->data.tuplecid.tid = tid;
1904  change->data.tuplecid.cmin = cmin;
1905  change->data.tuplecid.cmax = cmax;
1906  change->data.tuplecid.combocid = combocid;
1907  change->lsn = lsn;
1909 
1910  dlist_push_tail(&txn->tuplecids, &change->node);
1911  txn->ntuplecids++;
1912 }
1913 
1914 /*
1915  * Setup the invalidation of the toplevel transaction.
1916  *
1917  * This needs to be done before ReorderBufferCommit is called!
 *
 * Raises an error if the transaction already has invalidations recorded;
 * 'nmsgs' must be greater than zero (asserted). The 'nmsgs' messages in
 * 'msgs' are copied into txn->invalidations.
 *
 * NOTE(review): the signature lines (1920, 1922) and the start of the
 * statement that allocates txn->invalidations (1934-1935) are absent from
 * this listing.
1918  */
1919 void
1921  XLogRecPtr lsn, Size nmsgs,
1923 {
1924  ReorderBufferTXN *txn;
1925 
1926  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
1927 
1928  if (txn->ninvalidations != 0)
1929  elog(ERROR, "only ever add one set of invalidations");
1930 
1931  Assert(nmsgs > 0);
1932 
1933  txn->ninvalidations = nmsgs;
1936  sizeof(SharedInvalidationMessage) * nmsgs);
1937  memcpy(txn->invalidations, msgs,
1938  sizeof(SharedInvalidationMessage) * nmsgs);
1939 }
1940 
1941 /*
1942  * Apply all invalidations we know. Possibly we only need parts at this point
1943  * in the changestream but we don't know which those are.
 *
 * NOTE(review): the signature line (1946) and the loop body (1951) are
 * absent from this listing; only the iteration over txn->ninvalidations
 * entries is visible.
1944  */
1945 static void
1947 {
1948  int i;
1949 
1950  for (i = 0; i < txn->ninvalidations; i++)
1952 }
1953 
1954 /*
1955  * Mark a transaction as containing catalog changes
 *
 * Sets txn->has_catalog_changes on the transaction returned by the lookup
 * for 'xid'.
 *
 * NOTE(review): the signature line (1958) is absent from this listing.
1956  */
1957 void
1959  XLogRecPtr lsn)
1960 {
1961  ReorderBufferTXN *txn;
1962 
1963  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
1964 
1965  txn->has_catalog_changes = true;
1966 }
1967 
1968 /*
1969  * Query whether a transaction is already *known* to contain catalog
1970  * changes. This can be wrong until directly before the commit!
 *
 * Returns false for an xid the reorder buffer does not know about.
 *
 * NOTE(review): the signature line (1973) is absent from this listing.
1971  */
1972 bool
1974 {
1975  ReorderBufferTXN *txn;
1976 
 /* look the transaction up without creating it */
1977  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1978  false);
1979  if (txn == NULL)
1980  return false;
1981 
1982  return txn->has_catalog_changes;
1983 }
1984 
1985 /*
1986  * Have we already added the first snapshot?
 *
 * Returns true only if the transaction is known and its base_snapshot is
 * set.
 *
 * NOTE(review): the signature line (1989) is absent from this listing.
1987  */
1988 bool
1990 {
1991  ReorderBufferTXN *txn;
1992 
 /* look the transaction up without creating it */
1993  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
1994  false);
1995 
1996  /* transaction isn't known yet, ergo no snapshot */
1997  if (txn == NULL)
1998  return false;
1999 
2000  /*
2001  * TODO: It would be a nice improvement if we would check the toplevel
2002  * transaction in subtransactions, but we'd need to keep track of a bit
2003  * more state.
2004  */
2005  return txn->base_snapshot != NULL;
2006 }
2007 
2008 
2009 /*
2010  * ---------------------------------------
2011  * Disk serialization support
2012  * ---------------------------------------
2013  */
2014 
2015 /*
2016  * Ensure the IO buffer is >= sz.
 *
 * rb->outbuf is allocated in rb->context on first use and enlarged with
 * repalloc() when it is too small; it is never shrunk. The buffer may move
 * when it is enlarged, so callers must re-read rb->outbuf afterwards.
 *
 * NOTE(review): the signature line (2019) is absent from this listing.
2017  */
2018 static void
2020 {
 /* nothing allocated yet */
2021  if (!rb->outbufsize)
2022  {
2023  rb->outbuf = MemoryContextAlloc(rb->context, sz);
2024  rb->outbufsize = sz;
2025  }
 /* existing buffer is too small, grow it */
2026  else if (rb->outbufsize < sz)
2027  {
2028  rb->outbuf = repalloc(rb->outbuf, sz);
2029  rb->outbufsize = sz;
2030  }
2031 }
2032 
2033 /*
2034  * Check whether the transaction tx should spill its data to disk.
 *
 * The transaction is serialized once the number of its in-memory changes
 * (txn->nentries_mem) reaches max_changes_in_memory.
 *
 * NOTE(review): the signature line (2037) is absent from this listing.
2035  */
2036 static void
2038 {
2039  /*
2040  * TODO: improve accounting so we cheaply can take subtransactions into
2041  * account here.
2042  */
2043  if (txn->nentries_mem >= max_changes_in_memory)
2044  {
2045  ReorderBufferSerializeTXN(rb, txn);
2046  Assert(txn->nentries_mem == 0);
2047  }
2048 }
2049 
2050 /*
2051  * Spill data of a large transaction (and its subtransactions) to disk.
 *
 * Subtransactions are serialized first (recursively). Each in-memory change
 * is then written to a per-xid, per-WAL-segment file below pg_replslot/,
 * removed from txn->changes and returned to the reorder buffer. Afterwards
 * txn->nentries_mem is 0 and txn->serialized is true.
 *
 * NOTE(review): the signature line (2054), one line of the sprintf()
 * argument list (2101) and one line of the ereport() call (2111) are absent
 * from this listing.
2052  */
2053 static void
2055 {
2056  dlist_iter subtxn_i;
2057  dlist_mutable_iter change_i;
 /* currently open spill file, -1 if none */
2058  int fd = -1;
2059  XLogSegNo curOpenSegNo = 0;
2060  Size spilled = 0;
2061  char path[MAXPGPATH];
2062 
2063  elog(DEBUG2, "spill %u changes in XID %u to disk",
2064  (uint32) txn->nentries_mem, txn->xid);
2065 
2066  /* do the same to all child TXs */
2067  dlist_foreach(subtxn_i, &txn->subtxns)
2068  {
2069  ReorderBufferTXN *subtxn;
2070 
2071  subtxn = dlist_container(ReorderBufferTXN, node, subtxn_i.cur);
2072  ReorderBufferSerializeTXN(rb, subtxn);
2073  }
2074 
2075  /* serialize changestream */
2076  dlist_foreach_modify(change_i, &txn->changes)
2077  {
2078  ReorderBufferChange *change;
2079 
2080  change = dlist_container(ReorderBufferChange, node, change_i.cur);
2081 
2082  /*
2083  * store in segment in which it belongs by start lsn, don't split over
2084  * multiple segments though
2085  */
2086  if (fd == -1 || !XLByteInSeg(change->lsn, curOpenSegNo))
2087  {
2088  XLogRecPtr recptr;
2089 
2090  if (fd != -1)
2091  CloseTransientFile(fd);
2092 
2093  XLByteToSeg(change->lsn, curOpenSegNo);
2094  XLogSegNoOffsetToRecPtr(curOpenSegNo, 0, recptr);
2095 
2096  /*
2097  * No need to care about TLIs here, only used during a single run,
2098  * so each LSN only maps to a specific WAL record.
2099  */
2100  sprintf(path, "pg_replslot/%s/xid-%u-lsn-%X-%X.snap",
2102  (uint32) (recptr >> 32), (uint32) recptr);
2103 
2104  /* open segment, create it if necessary */
2105  fd = OpenTransientFile(path,
2106  O_CREAT | O_WRONLY | O_APPEND | PG_BINARY,
2107  S_IRUSR | S_IWUSR);
2108 
2109  if (fd < 0)
2110  ereport(ERROR,
2112  errmsg("could not open file \"%s\": %m",
2113  path)));
2114  }
2115 
 /* write the change out, then drop the in-memory copy */
2116  ReorderBufferSerializeChange(rb, txn, fd, change);
2117  dlist_delete(&change->node);
2118  ReorderBufferReturnChange(rb, change);
2119 
2120  spilled++;
2121  }
2122 
2123  Assert(spilled == txn->nentries_mem);
2124  Assert(dlist_is_empty(&txn->changes));
2125  txn->nentries_mem = 0;
2126  txn->serialized = true;
2127 
2128  if (fd != -1)
2129  CloseTransientFile(fd);
2130 }
2131 
2132 /*
2133  * Serialize individual change to disk.
 *
 * The on-disk record is assembled in rb->outbuf: a ReorderBufferDiskChange
 * holding a copy of the change struct, followed by the variable-length data
 * belonging to the change (tuples, message prefix and body, or snapshot
 * arrays). The total length is stored in ondisk->size and the record is
 * written to 'fd' with a single write(); a failed or short write closes the
 * file and raises an error.
 *
 * NOTE(review): the signature line (2136), every "case" label and every
 * statement at a gap in the embedded numbering are absent from this listing.
 * The "make sure we have enough space" comments refer to such missing
 * statements (presumably calls that grow rb->outbuf - confirm).
2134  */
2135 static void
2137  int fd, ReorderBufferChange *change)
2138 {
2139  ReorderBufferDiskChange *ondisk;
 /* running total of the on-disk record size */
2140  Size sz = sizeof(ReorderBufferDiskChange);
2141 
2143 
2144  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2145  memcpy(&ondisk->change, change, sizeof(ReorderBufferChange));
2146 
2147  switch (change->action)
2148  {
2149  /* fall through these, they're all similar enough */
2154  {
2155  char *data;
2156  ReorderBufferTupleBuf *oldtup,
2157  *newtup;
2158  Size oldlen = 0;
2159  Size newlen = 0;
2160 
2161  oldtup = change->data.tp.oldtuple;
2162  newtup = change->data.tp.newtuple;
2163 
 /* each present tuple adds its HeapTupleData header plus t_len bytes */
2164  if (oldtup)
2165  {
2166  sz += sizeof(HeapTupleData);
2167  oldlen = oldtup->tuple.t_len;
2168  sz += oldlen;
2169  }
2170 
2171  if (newtup)
2172  {
2173  sz += sizeof(HeapTupleData);
2174  newlen = newtup->tuple.t_len;
2175  sz += newlen;
2176  }
2177 
2178  /* make sure we have enough space */
2180 
2181  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
2182  /* might have been reallocated above */
2183  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2184 
2185  if (oldlen)
2186  {
2187  memcpy(data, &oldtup->tuple, sizeof(HeapTupleData));
2188  data += sizeof(HeapTupleData);
2189 
2190  memcpy(data, oldtup->tuple.t_data, oldlen);
2191  data += oldlen;
2192  }
2193 
2194  if (newlen)
2195  {
2196  memcpy(data, &newtup->tuple, sizeof(HeapTupleData));
2197  data += sizeof(HeapTupleData);
2198 
2199  memcpy(data, newtup->tuple.t_data, newlen);
2200  data += newlen;
2201  }
2202  break;
2203  }
2205  {
2206  char *data;
 /* prefix length including the terminating NUL */
2207  Size prefix_size = strlen(change->data.msg.prefix) + 1;
2208 
2209  sz += prefix_size + change->data.msg.message_size +
2210  sizeof(Size) + sizeof(Size);
2212 
2213  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
2214 
2215  /* might have been reallocated above */
2216  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2217 
2218  /* write the prefix including the size */
2219  memcpy(data, &prefix_size, sizeof(Size));
2220  data += sizeof(Size);
2221  memcpy(data, change->data.msg.prefix,
2222  prefix_size);
2223  data += prefix_size;
2224 
2225  /* write the message including the size */
2226  memcpy(data, &change->data.msg.message_size, sizeof(Size));
2227  data += sizeof(Size);
2228  memcpy(data, change->data.msg.message,
2229  change->data.msg.message_size);
2230  data += change->data.msg.message_size;
2231 
2232  break;
2233  }
2235  {
2236  Snapshot snap;
2237  char *data;
2238 
2239  snap = change->data.snapshot;
2240 
 /* snapshot struct followed by its xip and subxip arrays */
2241  sz += sizeof(SnapshotData) +
2242  sizeof(TransactionId) * snap->xcnt +
2243  sizeof(TransactionId) * snap->subxcnt
2244  ;
2245 
2246  /* make sure we have enough space */
2248  data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
2249  /* might have been reallocated above */
2250  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2251 
2252  memcpy(data, snap, sizeof(SnapshotData));
2253  data += sizeof(SnapshotData);
2254 
2255  if (snap->xcnt)
2256  {
2257  memcpy(data, snap->xip,
2258  sizeof(TransactionId) * snap->xcnt);
2259  data += sizeof(TransactionId) * snap->xcnt;
2260  }
2261 
2262  if (snap->subxcnt)
2263  {
2264  memcpy(data, snap->subxip,
2265  sizeof(TransactionId) * snap->subxcnt);
2266  data += sizeof(TransactionId) * snap->subxcnt;
2267  }
2268  break;
2269  }
2273  /* ReorderBufferChange contains everything important */
2274  break;
2275  }
2276 
2277  ondisk->size = sz;
2278 
 /* a short write is treated as an error as well */
2280  if (write(fd, rb->outbuf, ondisk->size) != ondisk->size)
2281  {
 /* keep errno for %m, in case closing the file changes it */
2282  int save_errno = errno;
2283 
2284  CloseTransientFile(fd);
2285  errno = save_errno;
2286  ereport(ERROR,
2288  errmsg("could not write to data file for XID %u: %m",
2289  txn->xid)));
2290  }
2292 
2293  Assert(ondisk->change.action == change->action);
2294 }
2295 
2296 /*
2297  * Restore a number of changes spilled to disk back into memory.
 *
 * All changes currently in txn->changes are released first. Records are then
 * read from the spill files, segment by segment, until either
 * max_changes_in_memory changes have been restored or the segment containing
 * txn->final_lsn has been passed. '*fd' and '*segno' carry the read position
 * across calls; a '*segno' of 0 means reading starts at the segment of
 * txn->first_lsn. Segments without a spill file are skipped.
 *
 * Returns the number of changes restored.
 *
 * NOTE(review): the signature line (2300) and a number of statements at gaps
 * in the embedded numbering (among them parts of the ereport() calls and the
 * calls surrounding the two read()s) are absent from this listing.
2298  */
2299 static Size
2301  int *fd, XLogSegNo *segno)
2302 {
2303  Size restored = 0;
2304  XLogSegNo last_segno;
2305  dlist_mutable_iter cleanup_iter;
2306 
2309 
2310  /* free current entries, so we have memory for more */
2311  dlist_foreach_modify(cleanup_iter, &txn->changes)
2312  {
2314  dlist_container(ReorderBufferChange, node, cleanup_iter.cur);
2315 
2316  dlist_delete(&cleanup->node);
2317  ReorderBufferReturnChange(rb, cleanup);
2318  }
2319  txn->nentries_mem = 0;
2320  Assert(dlist_is_empty(&txn->changes));
2321 
 /* no spill file can exist past the segment of the final lsn */
2322  XLByteToSeg(txn->final_lsn, last_segno);
2323 
2324  while (restored < max_changes_in_memory && *segno <= last_segno)
2325  {
2326  int readBytes;
2327  ReorderBufferDiskChange *ondisk;
2328 
 /* no file open: open the spill file for the current segment */
2329  if (*fd == -1)
2330  {
2331  XLogRecPtr recptr;
2332  char path[MAXPGPATH];
2333 
2334  /* first time in */
2335  if (*segno == 0)
2336  {
2337  XLByteToSeg(txn->first_lsn, *segno);
2338  }
2339 
2340  Assert(*segno != 0 || dlist_is_empty(&txn->changes));
2341  XLogSegNoOffsetToRecPtr(*segno, 0, recptr);
2342 
2343  /*
2344  * No need to care about TLIs here, only used during a single run,
2345  * so each LSN only maps to a specific WAL record.
2346  */
2347  sprintf(path, "pg_replslot/%s/xid-%u-lsn-%X-%X.snap",
2349  (uint32) (recptr >> 32), (uint32) recptr);
2350 
2351  *fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0);
 /* nothing was spilled for this segment, try the next one */
2352  if (*fd < 0 && errno == ENOENT)
2353  {
2354  *fd = -1;
2355  (*segno)++;
2356  continue;
2357  }
2358  else if (*fd < 0)
2359  ereport(ERROR,
2361  errmsg("could not open file \"%s\": %m",
2362  path)));
2363 
2364  }
2365 
2366  /*
2367  * Read the statically sized part of a change which has information
2368  * about the total size. If we couldn't read a record, we're at the
2369  * end of this file.
2370  */
2373  readBytes = read(*fd, rb->outbuf, sizeof(ReorderBufferDiskChange));
2375 
2376  /* eof */
2377  if (readBytes == 0)
2378  {
2379  CloseTransientFile(*fd);
2380  *fd = -1;
2381  (*segno)++;
2382  continue;
2383  }
2384  else if (readBytes < 0)
2385  ereport(ERROR,
2387  errmsg("could not read from reorderbuffer spill file: %m")));
2388  else if (readBytes != sizeof(ReorderBufferDiskChange))
2389  ereport(ERROR,
2391  errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
2392  readBytes,
2393  (uint32) sizeof(ReorderBufferDiskChange))));
2394 
2395  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2396 
2398  sizeof(ReorderBufferDiskChange) + ondisk->size);
 /* re-fetch: rb->outbuf is read again after the statement above */
2399  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
2400 
 /* read the variable-length remainder of the record */
2402  readBytes = read(*fd, rb->outbuf + sizeof(ReorderBufferDiskChange),
2403  ondisk->size - sizeof(ReorderBufferDiskChange));
2405 
2406  if (readBytes < 0)
2407  ereport(ERROR,
2409  errmsg("could not read from reorderbuffer spill file: %m")));
2410  else if (readBytes != ondisk->size - sizeof(ReorderBufferDiskChange))
2411  ereport(ERROR,
2413  errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
2414  readBytes,
2415  (uint32) (ondisk->size - sizeof(ReorderBufferDiskChange)))));
2416 
2417  /*
2418  * ok, read a full change from disk, now restore it into proper
2419  * in-memory format
2420  */
2421  ReorderBufferRestoreChange(rb, txn, rb->outbuf);
2422  restored++;
2423  }
2424 
2425  return restored;
2426 }
2427 
2428 /*
2429  * Convert change from its on-disk format to in-memory format and queue it onto
2430  * the TXN's ->changes list.
2431  *
      * "data" begins with a ReorderBufferDiskChange header; the
      * ReorderBufferChange embedded in it is copied verbatim into a newly
      * obtained change.  The action-specific payload (tuples, message
      * prefix/body, or snapshot) follows directly after the header and is copied
      * into freshly allocated memory.  The tuple pointers copied from disk are
      * only tested for being non-NULL before they are overwritten with the new
      * allocations.
      *
2432  * Note: although "data" is declared char*, at entry it points to a
2433  * maxalign'd buffer, making it safe in most of this function to assume
2434  * that the pointed-to data is suitably aligned for direct access.
2435  */
2436 static void
2438  char *data)
2439 {
2440  ReorderBufferDiskChange *ondisk;
2441  ReorderBufferChange *change;
2442 
2443  ondisk = (ReorderBufferDiskChange *) data;
2444 
2445  change = ReorderBufferGetChange(rb);
2446 
2447  /* copy static part */
2448  memcpy(change, &ondisk->change, sizeof(ReorderBufferChange));
2449 
      /* step over the fixed-size header; data now points at the payload */
2450  data += sizeof(ReorderBufferDiskChange);
2451 
2452  /* restore individual stuff */
2453  switch (change->action)
2454  {
2455  /* fall through these, they're all similar enough */
      /*
       * The old tuple, if any, comes first in the payload, so data is still
       * aligned as described in the header comment and t_len can be read
       * directly.
       */
2460  if (change->data.tp.oldtuple)
2461  {
2462  uint32 tuplelen = ((HeapTuple) data)->t_len;
2463 
2464  change->data.tp.oldtuple =
2466 
2467  /* restore ->tuple */
2468  memcpy(&change->data.tp.oldtuple->tuple, data,
2469  sizeof(HeapTupleData));
2470  data += sizeof(HeapTupleData);
2471 
2472  /* reset t_data pointer into the new tuplebuf */
2473  change->data.tp.oldtuple->tuple.t_data =
2474  ReorderBufferTupleBufData(change->data.tp.oldtuple);
2475 
2476  /* restore tuple data itself */
2477  memcpy(change->data.tp.oldtuple->tuple.t_data, data, tuplelen);
2478  data += tuplelen;
2479  }
2480 
2481  if (change->data.tp.newtuple)
2482  {
2483  /* here, data might not be suitably aligned! */
2484  uint32 tuplelen;
2485 
      /* hence t_len is fetched with memcpy rather than through a cast */
2486  memcpy(&tuplelen, data + offsetof(HeapTupleData, t_len),
2487  sizeof(uint32));
2488 
2489  change->data.tp.newtuple =
2491 
2492  /* restore ->tuple */
2493  memcpy(&change->data.tp.newtuple->tuple, data,
2494  sizeof(HeapTupleData));
2495  data += sizeof(HeapTupleData);
2496 
2497  /* reset t_data pointer into the new tuplebuf */
2498  change->data.tp.newtuple->tuple.t_data =
2499  ReorderBufferTupleBufData(change->data.tp.newtuple);
2500 
2501  /* restore tuple data itself */
2502  memcpy(change->data.tp.newtuple->tuple.t_data, data, tuplelen);
2503  data += tuplelen;
2504  }
2505 
2506  break;
      /*
       * Payload layout for this arm: Size prefix_size, prefix bytes
       * (expected to end in '\0'), Size message_size, message bytes.
       */
2508  {
2509  Size prefix_size;
2510 
2511  /* read prefix */
2512  memcpy(&prefix_size, data, sizeof(Size));
2513  data += sizeof(Size);
2514  change->data.msg.prefix = MemoryContextAlloc(rb->context,
2515  prefix_size);
2516  memcpy(change->data.msg.prefix, data, prefix_size);
2517  Assert(change->data.msg.prefix[prefix_size - 1] == '\0');
2518  data += prefix_size;
2519 
2520  /* read the message */
2521  memcpy(&change->data.msg.message_size, data, sizeof(Size));
2522  data += sizeof(Size);
2523  change->data.msg.message = MemoryContextAlloc(rb->context,
2524  change->data.msg.message_size);
2525  memcpy(change->data.msg.message, data,
2526  change->data.msg.message_size);
2527  data += change->data.msg.message_size;
2528 
2529  break;
2530  }
      /*
       * Payload for this arm is a SnapshotData struct immediately followed
       * by its xip and subxip arrays.
       */
2532  {
2533  Snapshot oldsnap;
2534  Snapshot newsnap;
2535  Size size;
2536 
2537  oldsnap = (Snapshot) data;
2538 
2539  size = sizeof(SnapshotData) +
2540  sizeof(TransactionId) * oldsnap->xcnt +
2541  sizeof(TransactionId) * (oldsnap->subxcnt + 0);
2542 
2543  change->data.snapshot = MemoryContextAllocZero(rb->context, size);
2544 
2545  newsnap = change->data.snapshot;
2546 
2547  memcpy(newsnap, data, size);
      /* the array pointers copied from disk are stale; repoint them into the new allocation */
2548  newsnap->xip = (TransactionId *)
2549  (((char *) newsnap) + sizeof(SnapshotData));
2550  newsnap->subxip = newsnap->xip + newsnap->xcnt;
2551  newsnap->copied = true;
2552  break;
2553  }
2554  /* the base struct contains all the data, easy peasy */
2558  break;
2559  }
2560 
      /* append the restored change to the transaction's in-memory list */
2561  dlist_push_tail(&txn->changes, &change->node);
2562  txn->nentries_mem++;
2563 }
2564 
2565 /*
2566  * Remove all on-disk data stored for the passed in transaction.
      *
      * One spill file name is generated for every WAL segment from the one
      * containing txn->first_lsn up to and including the one containing
      * txn->final_lsn, and that file is unlinked.  A file that does not exist
      * (ENOENT) is silently skipped; any other unlink failure raises an ERROR.
2567  */
2568 static void
2570 {
2571  XLogSegNo first;
2572  XLogSegNo cur;
2573  XLogSegNo last;
2574 
2577 
2578  XLByteToSeg(txn->first_lsn, first);
2579  XLByteToSeg(txn->final_lsn, last);
2580 
2581  /* iterate over all possible filenames, and delete them */
2582  for (cur = first; cur <= last; cur++)
2583  {
2584  char path[MAXPGPATH];
2585  XLogRecPtr recptr;
2586 
      /* the file name embeds the LSN at offset 0 of segment "cur" */
2587  XLogSegNoOffsetToRecPtr(cur, 0, recptr);
2588 
2589  sprintf(path, "pg_replslot/%s/xid-%u-lsn-%X-%X.snap",
2591  (uint32) (recptr >> 32), (uint32) recptr);
      /* not every segment in the range necessarily has a spill file */
2592  if (unlink(path) != 0 && errno != ENOENT)
2593  ereport(ERROR,
2595  errmsg("could not remove file \"%s\": %m", path)));
2596  }
2597 }
2598 
2599 /*
2600  * Delete all data spilled to disk after we've restarted/crashed. It will be
2601  * recreated when the respective slots are reused.
2602  */
2603 void
2605 {
2606  DIR *logical_dir;
2607  struct dirent *logical_de;
2608 
2609  DIR *spill_dir;
2610  struct dirent *spill_de;
2611 
2612  logical_dir = AllocateDir("pg_replslot");
2613  while ((logical_de = ReadDir(logical_dir, "pg_replslot")) != NULL)
2614  {
2615  struct stat statbuf;
2616  char path[MAXPGPATH * 2 + 12];
2617 
2618  if (strcmp(logical_de->d_name, ".") == 0 ||
2619  strcmp(logical_de->d_name, "..") == 0)
2620  continue;
2621 
2622  /* if it cannot be a slot, skip the directory */
2623  if (!ReplicationSlotValidateName(logical_de->d_name, DEBUG2))
2624  continue;
2625 
2626  /*
2627  * ok, has to be a surviving logical slot, iterate and delete
2628  * everything starting with xid-*
2629  */
2630  sprintf(path, "pg_replslot/%s", logical_de->d_name);
2631 
2632  /* we're only creating directories here, skip if it's not our's */
2633  if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
2634  continue;
2635 
2636  spill_dir = AllocateDir(path);
2637  while ((spill_de = ReadDir(spill_dir, path)) != NULL)
2638  {
2639  if (strcmp(spill_de->d_name, ".") == 0 ||
2640  strcmp(spill_de->d_name, "..") == 0)
2641  continue;
2642 
2643  /* only look at names that can be ours */
2644  if (strncmp(spill_de->d_name, "xid", 3) == 0)
2645  {
2646  sprintf(path, "pg_replslot/%s/%s", logical_de->d_name,
2647  spill_de->d_name);
2648 
2649  if (unlink(path) != 0)
2650  ereport(PANIC,
2652  errmsg("could not remove file \"%s\": %m",
2653  path)));
2654  }
2655  }
2656  FreeDir(spill_dir);
2657  }
2658  FreeDir(logical_dir);
2659 }
2660 
2661 /* ---------------------------------------
2662  * toast reassembly support
2663  * ---------------------------------------
2664  */
2665 
2666 /*
2667  * Initialize per tuple toast reconstruction support.
      *
      * Creates txn->toast_hash with Oid-sized keys and ReorderBufferToastEnt
      * entries, using rb->context as the hash's memory context.  May only be
      * called while the transaction has no toast hash yet.
2668  */
2669 static void
2671 {
2672  HASHCTL hash_ctl;
2673 
2674  Assert(txn->toast_hash == NULL);
2675 
      /* zero the control struct so fields not set below hold no garbage */
2676  memset(&hash_ctl, 0, sizeof(hash_ctl));
2677  hash_ctl.keysize = sizeof(Oid);
2678  hash_ctl.entrysize = sizeof(ReorderBufferToastEnt);
2679  hash_ctl.hcxt = rb->context;
2680  txn->toast_hash = hash_create("ReorderBufferToastHash", 5, &hash_ctl,
2682 }
2683 
2684 /*
2685  * Per toast-chunk handling for toast reconstruction
2686  *
2687  * Appends a toast chunk so we can reconstruct it when the tuple "owning" the
2688  * toasted Datum comes along.
2689  */
2690 static void
2692  Relation relation, ReorderBufferChange *change)
2693 {
2694  ReorderBufferToastEnt *ent;
2695  ReorderBufferTupleBuf *newtup;
2696  bool found;
2697  int32 chunksize;
2698  bool isnull;
2699  Pointer chunk;
2700  TupleDesc desc = RelationGetDescr(relation);
2701  Oid chunk_id;
2702  int32 chunk_seq;
2703 
2704  if (txn->toast_hash == NULL)
2705  ReorderBufferToastInitHash(rb, txn);
2706 
2707  Assert(IsToastRelation(relation));
2708 
2709  newtup = change->data.tp.newtuple;
2710  chunk_id = DatumGetObjectId(fastgetattr(&newtup->tuple, 1, desc, &isnull));
2711  Assert(!isnull);
2712  chunk_seq = DatumGetInt32(fastgetattr(&newtup->tuple, 2, desc, &isnull));
2713  Assert(!isnull);
2714 
2715  ent = (ReorderBufferToastEnt *)
2716  hash_search(txn->toast_hash,
2717  (void *) &chunk_id,
2718  HASH_ENTER,
2719  &found);
2720 
2721  if (!found)
2722  {
2723  Assert(ent->chunk_id == chunk_id);
2724  ent->num_chunks = 0;
2725  ent->last_chunk_seq = 0;
2726  ent->size = 0;
2727  ent->reconstructed = NULL;
2728  dlist_init(&ent->chunks);
2729 
2730  if (chunk_seq != 0)
2731  elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq 0",
2732  chunk_seq, chunk_id);
2733  }
2734  else if (found && chunk_seq != ent->last_chunk_seq + 1)
2735  elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq %d",
2736  chunk_seq, chunk_id, ent->last_chunk_seq + 1);
2737 
2738  chunk = DatumGetPointer(fastgetattr(&newtup->tuple, 3, desc, &isnull));
2739  Assert(!isnull);
2740 
2741  /* calculate size so we can allocate the right size at once later */
2742  if (!VARATT_IS_EXTENDED(chunk))
2743  chunksize = VARSIZE(chunk) - VARHDRSZ;
2744  else if (VARATT_IS_SHORT(chunk))
2745  /* could happen due to heap_form_tuple doing its thing */
2746  chunksize = VARSIZE_SHORT(chunk) - VARHDRSZ_SHORT;
2747  else
2748  elog(ERROR, "unexpected type of toast chunk");
2749 
2750  ent->size += chunksize;
2751  ent->last_chunk_seq = chunk_seq;
2752  ent->num_chunks++;
2753  dlist_push_tail(&ent->chunks, &change->node);
2754 }
2755 
2756 /*
2757  * Rejigger change->newtuple to point to in-memory toast tuples instead to
2758  * on-disk toast tuples that may not longer exist (think DROP TABLE or VACUUM).
2759  *
2760  * We cannot replace unchanged toast tuples though, so those will still point
2761  * to on-disk toast data.
2762  */
2763 static void
2765  Relation relation, ReorderBufferChange *change)
2766 {
2767  TupleDesc desc;
2768  int natt;
2769  Datum *attrs;
2770  bool *isnull;
2771  bool *free;
2772  HeapTuple tmphtup;
2773  Relation toast_rel;
2774  TupleDesc toast_desc;
2775  MemoryContext oldcontext;
2776  ReorderBufferTupleBuf *newtup;
2777 
2778  /* no toast tuples changed */
2779  if (txn->toast_hash == NULL)
2780  return;
2781 
2782  oldcontext = MemoryContextSwitchTo(rb->context);
2783 
2784  /* we should only have toast tuples in an INSERT or UPDATE */
2785  Assert(change->data.tp.newtuple);
2786 
2787  desc = RelationGetDescr(relation);
2788 
2789  toast_rel = RelationIdGetRelation(relation->rd_rel->reltoastrelid);
2790  toast_desc = RelationGetDescr(toast_rel);
2791 
2792  /* should we allocate from stack instead? */
2793  attrs = palloc0(sizeof(Datum) * desc->natts);
2794  isnull = palloc0(sizeof(bool) * desc->natts);
2795  free = palloc0(sizeof(bool) * desc->natts);
2796 
2797  newtup = change->data.tp.newtuple;
2798 
2799  heap_deform_tuple(&newtup->tuple, desc, attrs, isnull);
2800 
2801  for (natt = 0; natt < desc->natts; natt++)
2802  {
2803  Form_pg_attribute attr = TupleDescAttr(desc, natt);
2804  ReorderBufferToastEnt *ent;
2805  struct varlena *varlena;
2806 
2807  /* va_rawsize is the size of the original datum -- including header */
2808  struct varatt_external toast_pointer;
2809  struct varatt_indirect redirect_pointer;
2810  struct varlena *new_datum = NULL;
2811  struct varlena *reconstructed;
2812  dlist_iter it;
2813  Size data_done = 0;
2814 
2815  /* system columns aren't toasted */
2816  if (attr->attnum < 0)
2817  continue;
2818 
2819  if (attr->attisdropped)
2820  continue;
2821 
2822  /* not a varlena datatype */
2823  if (attr->attlen != -1)
2824  continue;
2825 
2826  /* no data */
2827  if (isnull[natt])
2828  continue;
2829 
2830  /* ok, we know we have a toast datum */
2831  varlena = (struct varlena *) DatumGetPointer(attrs[natt]);
2832 
2833  /* no need to do anything if the tuple isn't external */
2834  if (!VARATT_IS_EXTERNAL(varlena))
2835  continue;
2836 
2837  VARATT_EXTERNAL_GET_POINTER(toast_pointer, varlena);
2838 
2839  /*
2840  * Check whether the toast tuple changed, replace if so.
2841  */
2842  ent = (ReorderBufferToastEnt *)
2843  hash_search(txn->toast_hash,
2844  (void *) &toast_pointer.va_valueid,
2845  HASH_FIND,
2846  NULL);
2847  if (ent == NULL)
2848  continue;
2849 
2850  new_datum =
2851  (struct varlena *) palloc0(INDIRECT_POINTER_SIZE);
2852 
2853  free[natt] = true;
2854 
2855  reconstructed = palloc0(toast_pointer.va_rawsize);
2856 
2857  ent->reconstructed = reconstructed;
2858 
2859  /* stitch toast tuple back together from its parts */
2860  dlist_foreach(it, &ent->chunks)
2861  {
2862  bool isnull;
2863  ReorderBufferChange *cchange;
2864  ReorderBufferTupleBuf *ctup;
2865  Pointer chunk;
2866 
2867  cchange = dlist_container(ReorderBufferChange, node, it.cur);
2868  ctup = cchange->data.tp.newtuple;
2869  chunk = DatumGetPointer(
2870  fastgetattr(&ctup->tuple, 3, toast_desc, &isnull));
2871 
2872  Assert(!isnull);
2873  Assert(!VARATT_IS_EXTERNAL(chunk));
2874  Assert(!VARATT_IS_SHORT(chunk));
2875 
2876  memcpy(VARDATA(reconstructed) + data_done,
2877  VARDATA(chunk),
2878  VARSIZE(chunk) - VARHDRSZ);
2879  data_done += VARSIZE(chunk) - VARHDRSZ;
2880  }
2881  Assert(data_done == toast_pointer.va_extsize);
2882 
2883  /* make sure its marked as compressed or not */
2884  if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
2885  SET_VARSIZE_COMPRESSED(reconstructed, data_done + VARHDRSZ);
2886  else
2887  SET_VARSIZE(reconstructed, data_done + VARHDRSZ);
2888 
2889  memset(&redirect_pointer, 0, sizeof(redirect_pointer));
2890  redirect_pointer.pointer = reconstructed;
2891 
2893  memcpy(VARDATA_EXTERNAL(new_datum), &redirect_pointer,
2894  sizeof(redirect_pointer));
2895 
2896  attrs[natt] = PointerGetDatum(new_datum);
2897  }
2898 
2899  /*
2900  * Build tuple in separate memory & copy tuple back into the tuplebuf
2901  * passed to the output plugin. We can't directly heap_fill_tuple() into
2902  * the tuplebuf because attrs[] will point back into the current content.
2903  */
2904  tmphtup = heap_form_tuple(desc, attrs, isnull);
2905  Assert(newtup->tuple.t_len <= MaxHeapTupleSize);
2906  Assert(ReorderBufferTupleBufData(newtup) == newtup->tuple.t_data);
2907 
2908  memcpy(newtup->tuple.t_data, tmphtup->t_data, tmphtup->t_len);
2909  newtup->tuple.t_len = tmphtup->t_len;
2910 
2911  /*
2912  * free resources we won't further need, more persistent stuff will be
2913  * free'd in ReorderBufferToastReset().
2914  */
2915  RelationClose(toast_rel);
2916  pfree(tmphtup);
2917  for (natt = 0; natt < desc->natts; natt++)
2918  {
2919  if (free[natt])
2920  pfree(DatumGetPointer(attrs[natt]));
2921  }
2922  pfree(attrs);
2923  pfree(free);
2924  pfree(isnull);
2925 
2926  MemoryContextSwitchTo(oldcontext);
2927 }
2928 
2929 /*
2930  * Free all resources allocated for toast reconstruction.
      *
      * For every entry of txn->toast_hash the reassembled value (if one was
      * built) is pfree'd and each queued chunk change is unlinked and handed to
      * ReorderBufferReturnChange().  The hash itself is destroyed afterwards and
      * txn->toast_hash is reset to NULL.  Does nothing if the transaction has no
      * toast hash.
2931  */
2932 static void
2934 {
2935  HASH_SEQ_STATUS hstat;
2936  ReorderBufferToastEnt *ent;
2937 
2938  if (txn->toast_hash == NULL)
2939  return;
2940 
2941  /* sequentially walk over the hash and free everything */
2942  hash_seq_init(&hstat, txn->toast_hash);
2943  while ((ent = (ReorderBufferToastEnt *) hash_seq_search(&hstat)) != NULL)
2944  {
2945  dlist_mutable_iter it;
2946 
2947  if (ent->reconstructed != NULL)
2948  pfree(ent->reconstructed);
2949 
      /* the modify-safe iterator is needed because nodes are deleted while walking */
2950  dlist_foreach_modify(it, &ent->chunks)
2951  {
2952  ReorderBufferChange *change =
2954 
2955  dlist_delete(&change->node);
2956  ReorderBufferReturnChange(rb, change);
2957  }
2958  }
2959 
2960  hash_destroy(txn->toast_hash);
2961  txn->toast_hash = NULL;
2962 }
2963 
2964 
2965 /* ---------------------------------------
2966  * Visibility support for logical decoding
2967  *
2968  *
2969  * Lookup actual cmin/cmax values when using decoding snapshot. We can't
2970  * always rely on stored cmin/cmax values because of two scenarios:
2971  *
2972  * * A tuple got changed multiple times during a single transaction and thus
2973  * has got a combocid. Combocid's are only valid for the duration of a
2974  * single transaction.
2975  * * A tuple with a cmin but no cmax (and thus no combocid) got
2976  * deleted/updated in another transaction than the one which created it
2977  * which we are looking at right now. As only one of cmin, cmax or combocid
2978  * is actually stored in the heap we don't have access to the value we
2979  * need anymore.
2980  *
2981  * To resolve those problems we have a per-transaction hash of (cmin,
2982  * cmax) tuples keyed by (relfilenode, ctid) which contains the actual
2983  * (cmin, cmax) values. That also takes care of combocids by simply
2984  * not caring about them at all. As we have the real cmin/cmax values
2985  * combocids aren't interesting.
2986  *
2987  * As we only care about catalog tuples here the overhead of this
2988  * hashtable should be acceptable.
2989  *
2990  * Heap rewrites complicate this a bit, check rewriteheap.c for
2991  * details.
2992  * -------------------------------------------------------------------------
2993  */
2994 
2995 /* struct for qsort()ing mapping files by lsn somewhat efficiently */
2996 typedef struct RewriteMappingFile
2997 {
3001 
3002 #if NOT_USED
      /*
       * Debugging aid, only compiled when NOT_USED is defined to a nonzero value:
       * walks every entry of tuplecid_data and logs it at DEBUG3.
       */
3003 static void
3004 DisplayMapping(HTAB *tuplecid_data)
3005 {
3006  HASH_SEQ_STATUS hstat;
3008 
3009  hash_seq_init(&hstat, tuplecid_data);
3010  while ((ent = (ReorderBufferTupleCidEnt *) hash_seq_search(&hstat)) != NULL)
3011  {
      /* the "node" triple is printed in the order dbNode/spcNode/relNode */
3012  elog(DEBUG3, "mapping: node: %u/%u/%u tid: %u/%u cmin: %u, cmax: %u",
3013  ent->key.relnode.dbNode,
3014  ent->key.relnode.spcNode,
3015  ent->key.relnode.relNode,
3018  ent->cmin,
3019  ent->cmax
3020  );
3021  }
3022 }
3023 #endif
3024 
3025 /*
3026  * Apply a single mapping file to tuplecid_data.
3027  *
3028  * The mapping file has to have been verified to be a) committed b) for our
3029  * transaction c) applied in LSN order.
3030  */
3031 static void
3032 ApplyLogicalMappingFile(HTAB *tuplecid_data, Oid relid, const char *fname)
3033 {
3034  char path[MAXPGPATH];
3035  int fd;
3036  int readBytes;
3038 
3039  sprintf(path, "pg_logical/mappings/%s", fname);
3040  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0);
3041  if (fd < 0)
3042  ereport(ERROR,
3044  errmsg("could not open file \"%s\": %m", path)));
3045 
3046  while (true)
3047  {
3050  ReorderBufferTupleCidEnt *new_ent;
3051  bool found;
3052 
3053  /* be careful about padding */
3054  memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
3055 
3056  /* read all mappings till the end of the file */
3058  readBytes = read(fd, &map, sizeof(LogicalRewriteMappingData));
3060 
3061  if (readBytes < 0)
3062  ereport(ERROR,
3064  errmsg("could not read file \"%s\": %m",
3065  path)));
3066  else if (readBytes == 0) /* EOF */
3067  break;
3068  else if (readBytes != sizeof(LogicalRewriteMappingData))
3069  ereport(ERROR,
3071  errmsg("could not read from file \"%s\": read %d instead of %d bytes",
3072  path, readBytes,
3073  (int32) sizeof(LogicalRewriteMappingData))));
3074 
3075  key.relnode = map.old_node;
3076  ItemPointerCopy(&map.old_tid,
3077  &key.tid);
3078 
3079 
3080  ent = (ReorderBufferTupleCidEnt *)
3081  hash_search(tuplecid_data,
3082  (void *) &key,
3083  HASH_FIND,
3084  NULL);
3085 
3086  /* no existing mapping, no need to update */
3087  if (!ent)
3088  continue;
3089 
3090  key.relnode = map.new_node;
3091  ItemPointerCopy(&map.new_tid,
3092  &key.tid);
3093 
3094  new_ent = (ReorderBufferTupleCidEnt *)
3095  hash_search(tuplecid_data,
3096  (void *) &key,
3097  HASH_ENTER,
3098  &found);
3099 
3100  if (found)
3101  {
3102  /*
3103  * Make sure the existing mapping makes sense. We sometime update
3104  * old records that did not yet have a cmax (e.g. pg_class' own
3105  * entry while rewriting it) during rewrites, so allow that.
3106  */
3107  Assert(ent->cmin == InvalidCommandId || ent->cmin == new_ent->cmin);
3108  Assert(ent->cmax == InvalidCommandId || ent->cmax == new_ent->cmax);
3109  }
3110  else
3111  {
3112  /* update mapping */
3113  new_ent->cmin = ent->cmin;
3114  new_ent->cmax = ent->cmax;
3115  new_ent->combocid = ent->combocid;
3116  }
3117  }
3118 }
3119 
3120 
3121 /*
3122  * Check whether the TransactionOId 'xid' is in the pre-sorted array 'xip'.
3123  */
3124 static bool
3126 {
3127  return bsearch(&xid, xip, num,
3128  sizeof(TransactionId), xidComparator) != NULL;
3129 }
3130 
3131 /*
3132  * qsort() comparator for sorting RewriteMappingFiles in LSN order.
3133  */
3134 static int
3135 file_sort_by_lsn(const void *a_p, const void *b_p)
3136 {
3137  RewriteMappingFile *a = *(RewriteMappingFile **) a_p;
3138  RewriteMappingFile *b = *(RewriteMappingFile **) b_p;
3139 
3140  if (a->lsn < b->lsn)
3141  return -1;
3142  else if (a->lsn > b->lsn)
3143  return 1;
3144  return 0;
3145 }
3146 
3147 /*
3148  * Apply any existing logical remapping files if there are any targeted at our
3149  * transaction for relid.
      *
      * Scans pg_logical/mappings for files named "map-*", parses the components
      * out of each file name and keeps the files that match our database
      * (InvalidOid for shared relations) and relid, whose creating transaction
      * committed, and whose mapped xid is contained in snapshot->subxip.  The
      * survivors are sorted by LSN and applied one by one via
      * ApplyLogicalMappingFile().  A "map-*" file name that cannot be parsed
      * raises an ERROR.
3150  */
3151 static void
3153 {
3154  DIR *mapping_dir;
3155  struct dirent *mapping_de;
3156  List *files = NIL;
3157  ListCell *file;
3158  RewriteMappingFile **files_a;
3159  size_t off;
3160  Oid dboid = IsSharedRelation(relid) ? InvalidOid : MyDatabaseId;
3161 
3162  mapping_dir = AllocateDir("pg_logical/mappings");
3163  while ((mapping_de = ReadDir(mapping_dir, "pg_logical/mappings")) != NULL)
3164  {
3165  Oid f_dboid;
3166  Oid f_relid;
3167  TransactionId f_mapped_xid;
3168  TransactionId f_create_xid;
3169  XLogRecPtr f_lsn;
3170  uint32 f_hi,
3171  f_lo;
3172  RewriteMappingFile *f;
3173 
3174  if (strcmp(mapping_de->d_name, ".") == 0 ||
3175  strcmp(mapping_de->d_name, "..") == 0)
3176  continue;
3177 
3178  /* Ignore files that aren't ours */
3179  if (strncmp(mapping_de->d_name, "map-", 4) != 0)
3180  continue;
3181 
      /* all six components have to be present in the file name */
3182  if (sscanf(mapping_de->d_name, LOGICAL_REWRITE_FORMAT,
3183  &f_dboid, &f_relid, &f_hi, &f_lo,
3184  &f_mapped_xid, &f_create_xid) != 6)
3185  elog(ERROR, "could not parse filename \"%s\"", mapping_de->d_name);
3186 
      /* the LSN is stored in the name as two 32 bit halves */
3187  f_lsn = ((uint64) f_hi) << 32 | f_lo;
3188 
3189  /* mapping for another database */
3190  if (f_dboid != dboid)
3191  continue;
3192 
3193  /* mapping for another relation */
3194  if (f_relid != relid)
3195  continue;
3196 
3197  /* did the creating transaction abort? */
3198  if (!TransactionIdDidCommit(f_create_xid))
3199  continue;
3200 
3201  /* not for our transaction */
3202  if (!TransactionIdInArray(f_mapped_xid, snapshot->subxip, snapshot->subxcnt))
3203  continue;
3204 
3205  /* ok, relevant, queue for apply */
3206  f = palloc(sizeof(RewriteMappingFile));
3207  f->lsn = f_lsn;
3208  strcpy(f->fname, mapping_de->d_name);
3209  files = lappend(files, f);
3210  }
3211  FreeDir(mapping_dir);
3212 
3213  /* build array we can easily sort */
3214  files_a = palloc(list_length(files) * sizeof(RewriteMappingFile *));
3215  off = 0;
3216  foreach(file, files)
3217  {
3218  files_a[off++] = lfirst(file);
3219  }
3220 
3221  /* sort files so we apply them in LSN order */
3222  qsort(files_a, list_length(files), sizeof(RewriteMappingFile *),
3224 
3225  for (off = 0; off < list_length(files); off++)
3226  {
3227  RewriteMappingFile *f = files_a[off];
3228 
3229  elog(DEBUG1, "applying mapping: \"%s\" in %u", f->fname,
3230  snapshot->subxip[0]);
3231  ApplyLogicalMappingFile(tuplecid_data, relid, f->fname);
      /* each queued file descriptor struct is released once applied */
3232  pfree(f);
3233  }
3234 }
3235 
3236 /*
3237  * Lookup cmin/cmax of a tuple, during logical decoding where we can't rely on
3238  * combocids.
      *
      * The lookup key is built from the relfilenode obtained from "buffer" and
      * from htup->t_self.  If no entry is found, the logical rewrite mappings for
      * htup->t_tableOid are applied once and the lookup is repeated.
      *
      * Returns false if there still is no entry; otherwise returns true after
      * storing the entry's values through "cmin" and "cmax", either of which may
      * be NULL if the caller is not interested in it.
3239  */
3240 bool
3242  Snapshot snapshot,
3243  HeapTuple htup, Buffer buffer,
3244  CommandId *cmin, CommandId *cmax)
3245 {
3248  ForkNumber forkno;
3249  BlockNumber blockno;
3250  bool updated_mapping = false;
3251 
3252  /* be careful about padding */
3253  memset(&key, 0, sizeof(key));
3254 
3255  Assert(!BufferIsLocal(buffer));
3256 
3257  /*
3258  * get relfilenode from the buffer, no convenient way to access it other
3259  * than that.
3260  */
3261  BufferGetTag(buffer, &key.relnode, &forkno, &blockno);
3262 
3263  /* tuples can only be in the main fork */
3264  Assert(forkno == MAIN_FORKNUM);
3265  Assert(blockno == ItemPointerGetBlockNumber(&htup->t_self));
3266 
3267  ItemPointerCopy(&htup->t_self,
3268  &key.tid);
3269 
      /* jumped back to at most once, after the rewrite mappings were applied */
3270 restart:
3271  ent = (ReorderBufferTupleCidEnt *)
3272  hash_search(tuplecid_data,
3273  (void *) &key,
3274  HASH_FIND,
3275  NULL);
3276 
3277  /*
3278  * failed to find a mapping, check whether the table was rewritten and
3279  * apply mapping if so, but only do that once - there can be no new
3280  * mappings while we are in here since we have to hold a lock on the
3281  * relation.
3282  */
3283  if (ent == NULL && !updated_mapping)
3284  {
3285  UpdateLogicalMappings(tuplecid_data, htup->t_tableOid, snapshot);
3286  /* now check but don't update for a mapping again */
3287  updated_mapping = true;
3288  goto restart;
3289  }
3290  else if (ent == NULL)
3291  return false;
3292 
      /* both output parameters are optional */
3293  if (cmin)
3294  *cmin = ent->cmin;
3295  if (cmax)
3296  *cmax = ent->cmax;
3297  return true;
3298 }
static void ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
XLogRecPtr first_lsn
bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *rb, TransactionId xid)
#define NIL
Definition: pg_list.h:69
uint32 CommandId
Definition: c.h:405
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
TimestampTz commit_time
struct ReorderBufferToastEnt ReorderBufferToastEnt
void AbortCurrentTransaction(void)
Definition: xact.c:2992
ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER]
#define SizeofHeapTupleHeader
Definition: htup_details.h:170
bool IsToastRelation(Relation relation)
Definition: catalog.c:136
void hash_destroy(HTAB *hashp)
Definition: dynahash.c:810
void ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, Snapshot snapshot, XLogRecPtr lsn, bool transactional, const char *prefix, Size message_size, const char *message)
#define relpathperm(rnode, forknum)
Definition: relpath.h:67
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Snapshot base_snapshot
ReorderBufferApplyChangeCB apply_change
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:200
HeapTupleData * HeapTuple
Definition: htup.h:70
#define DEBUG1
Definition: elog.h:25
#define VALGRIND_MAKE_MEM_DEFINED(addr, size)
Definition: memdebug.h:26
dlist_node * cur
Definition: ilist.h:180
static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
RepOriginId origin_id
void StartupReorderBuffer(void)
#define VARDATA(PTR)
Definition: postgres.h:303
#define fastgetattr(tup, attnum, tupleDesc, isnull)
Definition: htup_details.h:719
static int32 next
Definition: blutils.c:210
#define HASH_CONTEXT
Definition: hsearch.h:93
#define HASH_ELEM
Definition: hsearch.h:87
static const Size max_cached_tuplebufs
void ReorderBufferAbortOld(ReorderBuffer *rb, TransactionId oldestRunningXid)
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:524
uint32 TransactionId
Definition: c.h:391
void ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
bool copied
Definition: snapshot.h:96
void ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
MemoryContext hcxt
Definition: hsearch.h:78
#define DatumGetInt32(X)
Definition: postgres.h:478
#define RelationGetDescr(relation)
Definition: rel.h:428
#define VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer)
Definition: tuptoaster.h:111
#define DEBUG3
Definition: elog.h:23
#define write(a, b, c)
Definition: win32.h:14
#define VARHDRSZ_SHORT
Definition: postgres.h:269
TransactionId by_txn_last_xid
#define VARSIZE(PTR)
Definition: postgres.h:304
#define VALGRIND_MAKE_MEM_UNDEFINED(addr, size)
Definition: memdebug.h:28
int64 TimestampTz
Definition: timestamp.h:39
#define PointerGetDatum(X)
Definition: postgres.h:562
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:84
ReorderBufferTXN * txn
static bool TransactionIdInArray(TransactionId xid, TransactionId *xip, Size num)
#define VARHDRSZ
Definition: c.h:439
#define dlist_foreach(iter, lhead)
Definition: ilist.h:507
XLogRecPtr current_restart_decoding_lsn
#define DatumGetObjectId(X)
Definition: postgres.h:506
char * pstrdup(const char *in)
Definition: mcxt.c:1076
static ReorderBufferTXN * ReorderBufferGetTXN(ReorderBuffer *rb)
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:317
Oid RelidByRelfilenode(Oid reltablespace, Oid relfilenode)
union ReorderBufferChange::@93 data
void ReorderBufferSetBaseSnapshot(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void ReorderBufferFree(ReorderBuffer *rb)
static void slist_push_head(slist_head *head, slist_node *node)
Definition: ilist.h:574
uint16 RepOriginId
Definition: xlogdefs.h:51
Size entrysize
Definition: hsearch.h:73
struct cursor * cur
Definition: ecpg.c:28
char fname[MAXPGPATH]
int32 va_rawsize
Definition: postgres.h:70
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4473
MemoryContext SlabContextCreate(MemoryContext parent, const char *name, Size blockSize, Size chunkSize)
Definition: slab.c:183
void binaryheap_replace_first(binaryheap *heap, Datum d)
Definition: binaryheap.c:204
void ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, RelFileNode node, ItemPointerData tid, CommandId cmin, CommandId cmax, CommandId combocid)
static void ReorderBufferIterTXNFinish(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
uint32 BlockNumber
Definition: block.h:31
void TeardownHistoricSnapshot(bool is_error)
Definition: snapmgr.c:2011
ReorderBufferCommitCB commit
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:695
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
#define RelationIsLogicallyLogged(relation)
Definition: rel.h:575
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:902
bool TransactionIdDidCommit(TransactionId transactionId)
Definition: transam.c:125
ReplicationSlotPersistentData data
Definition: slot.h:120
static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, int *fd, XLogSegNo *segno)
struct ReorderBufferTupleCidKey ReorderBufferTupleCidKey
struct SnapshotData * Snapshot
Definition: snapshot.h:23
Form_pg_class rd_rel
Definition: rel.h:114
unsigned int Oid
Definition: postgres_ext.h:31
XLogRecPtr base_snapshot_lsn
Definition: dirent.h:9
uint32 regd_count
Definition: snapshot.h:110
#define PANIC
Definition: elog.h:53
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:77
void binaryheap_add_unordered(binaryheap *heap, Datum d)
Definition: binaryheap.c:110
MemoryContext change_context
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define VARDATA_EXTERNAL(PTR)
Definition: postgres.h:311
#define PG_BINARY
Definition: c.h:1027
int natts
Definition: tupdesc.h:73
XLogRecPtr origin_lsn
static void ApplyLogicalMappingFile(HTAB *tuplecid_data, Oid relid, const char *fname)
signed int int32
Definition: c.h:246
static int file_sort_by_lsn(const void *a_p, const void *b_p)
#define XLogSegNoOffsetToRecPtr(segno, offset, dest)
Definition: xlog_internal.h:95
#define FirstCommandId
Definition: c.h:407
void ReorderBufferSetRestartPoint(ReorderBuffer *rb, XLogRecPtr ptr)
HeapTupleHeader t_data
Definition: htup.h:67
#define VARATT_IS_EXTERNAL(PTR)
Definition: postgres.h:314
bool ReplicationSlotValidateName(const char *name, int elevel)
Definition: slot.c:174
static dlist_node * dlist_next_node(dlist_head *head, dlist_node *node)
Definition: ilist.h:421
Definition: dynahash.c:208
#define ReorderBufferTupleBufData(p)
Definition: reorderbuffer.h:36
#define dlist_container(type, membername, ptr)
Definition: ilist.h:477
void pfree(void *pointer)
Definition: mcxt.c:949
static void slist_init(slist_head *head)
Definition: ilist.h:554
char * Pointer
Definition: c.h:235
Definition: dirent.c:25
#define ERROR
Definition: elog.h:43
#define VARATT_IS_SHORT(PTR)
Definition: postgres.h:325
dlist_head changes
Datum binaryheap_first(binaryheap *heap)
Definition: binaryheap.c:159
#define MAXPGPATH
ItemPointerData t_self
Definition: htup.h:65
ReorderBufferTupleCidKey key
Definition: reorderbuffer.c:94
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:165
#define DEBUG2
Definition: elog.h:24
TransactionId GetCurrentTransactionId(void)
Definition: xact.c:418
void ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
struct ReorderBufferChange::@93::@94 tp
uint32 t_len
Definition: htup.h:64
#define MaxHeapTupleSize
Definition: htup_details.h:561
struct varlena * reconstructed
void RollbackAndReleaseCurrentSubTransaction(void)
Definition: xact.c:4284
#define SET_VARTAG_EXTERNAL(PTR, tag)
Definition: postgres.h:332
uint64 XLogSegNo
Definition: xlogdefs.h:34
int OpenTransientFile(FileName fileName, int fileFlags, int fileMode)
Definition: fd.c:2144
int errcode_for_file_access(void)
Definition: elog.c:598
HeapTupleData tuple
Definition: reorderbuffer.h:27
struct SnapshotData SnapshotData
TransactionId GetCurrentTransactionIdIfAny(void)
Definition: xact.c:435
#define InvalidTransactionId
Definition: transam.h:31
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:187
bool ReorderBufferXidHasCatalogChanges(ReorderBuffer *rb, TransactionId xid)
unsigned int uint32
Definition: c.h:258
XLogRecPtr final_lsn
DIR * AllocateDir(const char *dirname)
Definition: fd.c:2335
Oid t_tableOid
Definition: htup.h:66
static void pgstat_report_wait_end(void)
Definition: pgstat.h:1236
void RelationClose(Relation relation)
Definition: relcache.c:2164
MemoryContext CurrentMemoryContext
Definition: mcxt.c:37
static void dlist_delete(dlist_node *node)
Definition: ilist.h:358
ReorderBufferMessageCB message
int unlink(const char *filename)
#define ereport(elevel, rest)
Definition: elog.h:122
int bh_size
Definition: binaryheap.h:32
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:300
TransactionId * xip
Definition: snapshot.h:79
static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap, ReorderBufferTXN *txn, CommandId cid)
#define VARSIZE_SHORT(PTR)
Definition: postgres.h:306
ForkNumber
Definition: relpath.h:24
static slist_node * slist_pop_head_node(slist_head *head)
Definition: ilist.h:596
List * lappend(List *list, void *datum)
Definition: list.c:128
static HTAB * tuplecid_data
Definition: snapmgr.c:170
int CloseTransientFile(int fd)
Definition: fd.c:2305
static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
struct ReorderBufferTupleCidEnt ReorderBufferTupleCidEnt
void ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr lsn)
#define INDIRECT_POINTER_SIZE
Definition: tuptoaster.h:102
static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, char *change)
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:487
#define HASH_BLOBS
Definition: hsearch.h:88
ReorderBufferChange * change
static int ReorderBufferIterCompare(Datum a, Datum b, void *arg)
MemoryContext context
#define slist_container(type, membername, ptr)
Definition: ilist.h:674
void ReorderBufferCommitChild(ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn)
MemoryContext AllocSetContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Definition: aset.c:322
void * palloc0(Size size)
Definition: mcxt.c:877
static bool dlist_has_next(dlist_head *head, dlist_node *node)
Definition: ilist.h:402
#define InvalidCommandId
Definition: c.h:408
uintptr_t Datum
Definition: postgres.h:372
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
Definition: dynahash.c:316
ReorderBufferTXN * by_txn_last_txn
struct ReorderBufferChange::@93::@96 tuplecid
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void cleanup(void)
Definition: bootstrap.c:860
dlist_head toplevel_by_lsn
struct RewriteMappingFile RewriteMappingFile
Oid MyDatabaseId
Definition: globals.c:77
TransactionId xid
static void ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
bool IsSharedRelation(Oid relationId)
Definition: catalog.c:220
Size keysize
Definition: hsearch.h:72
dlist_node * cur
Definition: ilist.h:161
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:741
static void AssertTXNLsnOrder(ReorderBuffer *rb)
void ReorderBufferAddSnapshot(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
#define InvalidOid
Definition: postgres_ext.h:36
static void dlist_init(dlist_head *head)
Definition: ilist.h:278
CommandId curcid
Definition: snapshot.h:98
struct ReorderBufferIterTXNState ReorderBufferIterTXNState
static void ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTXN *txn)
struct ReorderBufferDiskChange ReorderBufferDiskChange
void binaryheap_build(binaryheap *heap)
Definition: binaryheap.c:126
#define free(a)
Definition: header.h:65
struct ReorderBufferIterTXNEntry ReorderBufferIterTXNEntry
#define PG_CATCH()
Definition: elog.h:293
ReplicationSlot * MyReplicationSlot
Definition: slot.c:96
#define XLByteToSeg(xlrp, logSegNo)
ItemPointerData new_tid
Definition: rewriteheap.h:40
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:664
#define lfirst(lc)
Definition: pg_list.h:106
ReorderBufferTXN * ReorderBufferGetOldestTXN(ReorderBuffer *rb)
static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, int fd, ReorderBufferChange *change)
static void ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn)
Definition: regguts.h:298
struct dirent * ReadDir(DIR *dir, const char *dirname)
Definition: fd.c:2401
static ReorderBufferChange * ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
#define VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr)
Definition: tuptoaster.h:121
int32 va_extsize
Definition: postgres.h:71
XLogRecPtr end_lsn
void StartTransactionCommand(void)
Definition: xact.c:2681
WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]
Definition: walsender.c:214
void ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs)
static bool dlist_is_empty(dlist_head *head)
Definition: ilist.h:289
size_t Size
Definition: c.h:350
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: pgstat.h:1212
void binaryheap_free(binaryheap *heap)
Definition: binaryheap.c:69
SharedInvalidationMessage * invalidations
static int list_length(const List *l)
Definition: pg_list.h:89
void BeginInternalSubTransaction(char *name)
Definition: xact.c:4179
#define BufferIsLocal(buffer)
Definition: buf.h:37
#define SLAB_DEFAULT_BLOCK_SIZE
Definition: memutils.h:194
#define ItemPointerGetOffsetNumber(pointer)
Definition: itemptr.h:95
void ReorderBufferReturnTupleBuf(ReorderBuffer *rb, ReorderBufferTupleBuf *tuple)
#define PG_RE_THROW()
Definition: elog.h:314
ReorderBuffer * ReorderBufferAllocate(void)
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1385
void * repalloc(void *pointer, Size size)
Definition: mcxt.c:962
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1375
static void ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz)
struct varlena * pointer
Definition: postgres.h:87
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change)
void SnapBuildSnapDecRefcount(Snapshot snap)
Definition: snapbuild.c:434
#define LOGICAL_REWRITE_FORMAT
Definition: rewriteheap.h:54
dlist_head subtxns
binaryheap * binaryheap_allocate(int capacity, binaryheap_comparator compare, void *arg)
Definition: binaryheap.c:33
#define VARATT_IS_EXTENDED(PTR)
Definition: postgres.h:326
#define DatumGetPointer(X)
Definition: postgres.h:555
void heap_deform_tuple(HeapTuple tuple, TupleDesc tupleDesc, Datum *values, bool *isnull)
Definition: heaptuple.c:936
Size nr_cached_tuplebufs
void LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
Definition: inval.c:554
#define Int32GetDatum(X)
Definition: postgres.h:485
static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
uint32 xcnt
Definition: snapshot.h:80
void * palloc(Size size)
Definition: mcxt.c:848
int errmsg(const char *fmt,...)
Definition: elog.c:797
static void UpdateLogicalMappings(HTAB *tuplecid_data, Oid relid, Snapshot snapshot)
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:706
static void ReorderBufferCheckSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
ReorderBufferTXN * txn
Definition: reorderbuffer.c:82
void ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
static dlist_node * dlist_pop_head_node(dlist_head *head)
Definition: ilist.h:368
void SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
Definition: snapmgr.c:1995
int i
XLogRecPtr restart_decoding_lsn
#define NameStr(name)
Definition: c.h:493
#define SET_VARSIZE_COMPRESSED(PTR, len)
Definition: postgres.h:330
void * arg
Datum binaryheap_remove_first(binaryheap *heap)
Definition: binaryheap.c:174
static const Size max_changes_in_memory
Definition: c.h:433
struct ReorderBufferChange::@93::@95 msg
struct ReorderBufferTXNByIdEnt ReorderBufferTXNByIdEnt
void ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
#define SET_VARSIZE(PTR, len)
Definition: postgres.h:328
char d_name[MAX_PATH]
Definition: dirent.h:14
#define elog
Definition: elog.h:219
#define ItemPointerGetBlockNumber(pointer)
Definition: itemptr.h:76
static void ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define qsort(a, b, c, d)
Definition: port.h:443
#define TransactionIdIsValid(xid)
Definition: transam.h:41
ReorderBufferChange change
ReorderBufferBeginCB begin
static void ReorderBufferToastInitHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
void BufferGetTag(Buffer buffer, RelFileNode *rnode, ForkNumber *forknum, BlockNumber *blknum)
Definition: bufmgr.c:2626
#define PG_TRY()
Definition: elog.h:284
#define XLByteInSeg(xlrp, logSegNo)
static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
void ReorderBufferProcessXid(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
#define RELKIND_SEQUENCE
Definition: pg_class.h:162
#define lstat(path, sb)
Definition: win32.h:262
Definition: pg_list.h:45
int Buffer
Definition: buf.h:23
static ReorderBufferIterTXNState * ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
Relation RelationIdGetRelation(Oid relationId)
Definition: relcache.c:2075
#define PG_END_TRY()
Definition: elog.h:300
#define read(a, b, c)
Definition: win32.h:13
int FreeDir(DIR *dir)
Definition: fd.c:2444
struct HeapTupleData HeapTupleData
#define offsetof(type, field)
Definition: c.h:549
MemoryContext txn_context
dlist_head tuplecids
slist_head cached_tuplebufs
#define ItemPointerCopy(fromPointer, toPointer)
Definition: itemptr.h:139
TransactionId * subxip
Definition: snapshot.h:91
uint32 active_count
Definition: snapshot.h:109
int xidComparator(const void *arg1, const void *arg2)
Definition: xid.c:138
int32 subxcnt
Definition: snapshot.h:92
void ReorderBufferAddNewCommandId(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, CommandId cid)
ReorderBufferTupleBuf * ReorderBufferGetTupleBuf(ReorderBuffer *rb, Size tuple_len)
bool ResolveCminCmaxDuringDecoding(HTAB *tuplecid_data, Snapshot snapshot, HeapTuple htup, Buffer buffer, CommandId *cmin, CommandId *cmax)
ItemPointerData old_tid
Definition: rewriteheap.h:39