PostgreSQL Source Code git master
All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
reorderbuffer.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * reorderbuffer.c
4 * PostgreSQL logical replay/reorder buffer management
5 *
6 *
7 * Copyright (c) 2012-2025, PostgreSQL Global Development Group
8 *
9 *
10 * IDENTIFICATION
11 * src/backend/replication/logical/reorderbuffer.c
12 *
13 * NOTES
14 * This module gets handed individual pieces of transactions in the order
15 * they are written to the WAL and is responsible for reassembling them into
16 * toplevel-transaction-sized pieces. When a transaction is completely
17 * reassembled - signaled by reading the transaction commit record - it
18 * will then call the output plugin (cf. ReorderBufferCommit()) with the
19 * individual changes. The output plugins rely on snapshots built by
20 * snapbuild.c which hands them to us.
21 *
22 * Transactions and subtransactions/savepoints in postgres are not
23 * immediately linked to each other from outside the performing
24 * backend. Only at commit/abort (or via special xact_assignment records) are
25 * they linked together, which means that we have to splice together a
26 * toplevel transaction from its subtransactions. To do that efficiently we
27 * build a binary heap indexed by the smallest current lsn of the individual
28 * subtransactions' changestreams. As the individual streams are inherently
29 * ordered by LSN - since that is where we build them from - the transaction
30 * can easily be reassembled by always using the subtransaction with the
31 * smallest current LSN from the heap.
32 *
33 * In order to cope with large transactions - which can be several times as
34 * big as the available memory - this module supports spooling the contents
35 * of large transactions to disk. When the transaction is replayed the
36 * contents of individual (sub-)transactions will be read from disk in
37 * chunks.
38 *
39 * This module also has to deal with reassembling toast records from the
40 * individual chunks stored in WAL. When a new (or initial) version of a
41 * tuple is stored in WAL it will always be preceded by the toast chunks
42 * emitted for the columns stored out of line. Within a single toplevel
43 * transaction there will be no other data carrying records between a row's
44 * toast chunks and the row data itself. See ReorderBufferToast* for
45 * details.
46 *
47 * ReorderBuffer uses two special memory context types - SlabContext for
48 * allocations of fixed-length structures (changes and transactions), and
49 * GenerationContext for the variable-length transaction data (allocated
50 * and freed in groups with similar lifespans).
51 *
52 * To limit the amount of memory used by decoded changes, we track memory
53 * used at the reorder buffer level (i.e. total amount of memory), and for
54 * each transaction. When the total amount of used memory exceeds the
55 * limit, the transaction consuming the most memory is then serialized to
56 * disk.
57 *
58 * Only decoded changes are evicted from memory (spilled to disk), not the
59 * transaction records. The number of toplevel transactions is limited,
60 * but a transaction with many subtransactions may still consume significant
61 * amounts of memory. However, the transaction records are fairly small and
62 * are not included in the memory limit.
63 *
64 * The current eviction algorithm is very simple - the transaction is
65 * picked merely by size, while it might be useful to also consider age
66 * (LSN) of the changes for example. With the new Generational memory
67 * allocator, evicting the oldest changes would make it more likely the
68 * memory gets actually freed.
69 *
70 * We use a max-heap with transaction size as the key to efficiently find
71 * the largest transaction. We update the max-heap whenever the memory
72 * counter is updated; however transactions with size 0 are not stored in
73 * the heap, because they have no changes to evict.
74 *
75 * We still rely on max_changes_in_memory when loading serialized changes
76 * back into memory. At that point we can't use the memory limit directly
77 * as we load the subxacts independently. One option to deal with this
78 * would be to count the subxacts, and allow each to allocate 1/N of the
79 * memory limit. That however does not seem very appealing, because with
80 * many subtransactions it may easily cause thrashing (short cycles of
81 * deserializing and applying very few changes). We probably should give
82 * a bit more memory to the oldest subtransactions, because it's likely
83 * they are the source for the next sequence of changes.
84 *
85 * -------------------------------------------------------------------------
86 */
87#include "postgres.h"
88
89#include <unistd.h>
90#include <sys/stat.h>
91
92#include "access/detoast.h"
93#include "access/heapam.h"
94#include "access/rewriteheap.h"
95#include "access/transam.h"
96#include "access/xact.h"
98#include "catalog/catalog.h"
99#include "common/int.h"
100#include "lib/binaryheap.h"
101#include "miscadmin.h"
102#include "pgstat.h"
103#include "replication/logical.h"
105#include "replication/slot.h"
106#include "replication/snapbuild.h" /* just for SnapBuildSnapDecRefcount */
107#include "storage/bufmgr.h"
108#include "storage/fd.h"
109#include "storage/procarray.h"
110#include "storage/sinval.h"
111#include "utils/builtins.h"
112#include "utils/memutils.h"
113#include "utils/rel.h"
115
116/* entry for a hash table we use to map from xid to our transaction state */
/*
 * NOTE(review): this region is a lossy extraction.  Judging by the gaps in
 * the embedded source line numbers (e.g. 117, 119-121, 124, 126-128, 144,
 * 149-154, 158-162, 175, 180-181, 183), the "typedef struct" headers, most
 * member declarations and the closing "} Name;" lines are missing; restore
 * them from upstream before compiling.
 */
118{
122
123/* data structures for (relfilelocator, ctid) => (cmin, cmax) mapping */
125{
129
131{
135 CommandId combocid; /* just for debugging */
137
138/* Virtual file descriptor with file offset tracking */
139typedef struct TXNEntryFile
140{
141 File vfd; /* -1 when the file is closed */
142 off_t curOffset; /* offset for next write or read. Reset to 0
143 * when vfd is opened. */
145
146/* k-way in-order change iteration support structures */
148{
155
157{
163
164/* toast datastructures */
166{
167 Oid chunk_id; /* toast_table.chunk_id */
168 int32 last_chunk_seq; /* toast_table.chunk_seq of the last chunk we
169 * have seen */
170 Size num_chunks; /* number of chunks we've already seen */
171 Size size; /* combined size of chunks seen */
172 dlist_head chunks; /* linked list of chunks */
173 struct varlena *reconstructed; /* reconstructed varlena now pointed to in
174 * main tup */
176
177/* Disk serialization support datastructures */
179{
182 /* data follows */
184
/*
 * Predicates over ReorderBufferChange actions.
 *
 * These are macros, so the argument is substituted textually:
 * IsSpecConfirmOrAbort evaluates it twice and IsInsertOrUpdate three times.
 * Only pass side-effect-free expressions such as change->action.
 */

/* True for a speculative insert; it stays partial until a confirm/abort. */
#define IsSpecInsert(action) \
( \
	((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT) \
)

/* True for the confirm or abort token that completes a speculative insert. */
#define IsSpecConfirmOrAbort(action) \
( \
	(((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM) || \
	 ((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT)) \
)

/* True for insert, update and speculative-insert changes. */
#define IsInsertOrUpdate(action) \
( \
	(((action) == REORDER_BUFFER_CHANGE_INSERT) || \
	 ((action) == REORDER_BUFFER_CHANGE_UPDATE) || \
	 ((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT)) \
)
200
201/*
202 * Maximum number of changes kept in memory, per transaction. After that,
203 * changes are spooled to disk.
204 *
205 * The current value should be sufficient to decode the entire transaction
206 * without hitting disk in OLTP workloads, while starting to spool to disk in
207 * other workloads reasonably fast.
208 *
209 * At some point in the future it probably makes sense to have a more elaborate
210 * resource management here, but it's not entirely clear what that would look
211 * like.
 *
 * NOTE(review): per the file header, spilling is now driven by the memory
 * limit, and this constant is only used when loading serialized changes
 * back into memory.  The "XXX for restore only" marker says the same.
 * Source lines 213 and 217 are missing from this extraction.
212 */
214static const Size max_changes_in_memory = 4096; /* XXX for restore only */
215
216/* GUC variable */
218
219/* ---------------------------------------
220 * primary reorderbuffer support routines
221 * ---------------------------------------
 * NOTE(review): many prototypes in this section lost their first line(s) in
 * this extraction (e.g. source lines 223-225, 228, 241, 243-246, 253-255,
 * 257, 259, 261-262, 264-265, 272, 282-283, 289-291, 293, 301-302); only
 * the continuation lines survive.  Restore them from upstream.
222 */
226 TransactionId xid, bool create, bool *is_new,
227 XLogRecPtr lsn, bool create_as_top);
229 ReorderBufferTXN *subtxn);
230
231static void AssertTXNLsnOrder(ReorderBuffer *rb);
232
233/* ---------------------------------------
234 * support functions for lsn-order iterating over the ->changes of a
235 * transaction and its subtransactions
236 *
237 * used for iteration over the k-way heap merge of a transaction and its
238 * subtransactions
239 * ---------------------------------------
240 */
242 ReorderBufferIterTXNState *volatile *iter_state);
247
248/*
249 * ---------------------------------------
250 * Disk serialization support functions
251 * ---------------------------------------
252 */
256 int fd, ReorderBufferChange *change);
258 TXNEntryFile *file, XLogSegNo *segno);
260 char *data);
263 bool txn_prepared);
266static void ReorderBufferCleanupSerializedTXNs(const char *slotname);
267static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot,
268 TransactionId xid, XLogSegNo segno);
269static int ReorderBufferTXNSizeCompare(const pairingheap_node *a, const pairingheap_node *b, void *arg);
270
271static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap);
273 ReorderBufferTXN *txn, CommandId cid);
274
275/*
276 * ---------------------------------------
277 * Streaming support functions
278 * ---------------------------------------
279 */
280static inline bool ReorderBufferCanStream(ReorderBuffer *rb);
281static inline bool ReorderBufferCanStartStreaming(ReorderBuffer *rb);
284
285/* ---------------------------------------
286 * toast reassembly support
287 * ---------------------------------------
288 */
292 Relation relation, ReorderBufferChange *change);
294 Relation relation, ReorderBufferChange *change);
295
296/*
297 * ---------------------------------------
298 * memory accounting
299 * ---------------------------------------
300 */
303 ReorderBufferChange *change,
304 ReorderBufferTXN *txn,
305 bool addition, Size sz);
306
307/*
308 * Allocate a new ReorderBuffer and clean out any old serialized state from
309 * prior ReorderBuffer instances for the same slot.
 *
 * The buffer struct and all of its sub-contexts live under one dedicated
 * memory context (buffer->context), so ReorderBufferFree only has to
 * delete that context.
 *
 * NOTE(review): the signature (source lines 311-312) and several other
 * lines (321, 323, 334, 339, 354-356, 363, 365, 373, 384, 387, 395) are
 * missing from this extraction; restore them from upstream.
310 */
313{
314 ReorderBuffer *buffer;
315 HASHCTL hash_ctl;
316 MemoryContext new_ctx;
317
318 Assert(MyReplicationSlot != NULL);
319
320 /* allocate memory in own context, to have better accountability */
322 "ReorderBuffer",
324
325 buffer =
326 (ReorderBuffer *) MemoryContextAlloc(new_ctx, sizeof(ReorderBuffer));
327
/* start from a zeroed HASHCTL; only keysize, entrysize and hcxt are set below */
328 memset(&hash_ctl, 0, sizeof(hash_ctl));
329
330 buffer->context = new_ctx;
331
/* fixed-size change and transaction structs come from Slab contexts */
332 buffer->change_context = SlabContextCreate(new_ctx,
333 "Change",
335 sizeof(ReorderBufferChange));
336
337 buffer->txn_context = SlabContextCreate(new_ctx,
338 "TXN",
340 sizeof(ReorderBufferTXN));
341
342 /*
343 * To minimize memory fragmentation caused by long-running transactions
344 * with changes spanning multiple memory blocks, we use a single
345 * fixed-size memory block for decoded tuple storage. The performance
346 * testing showed that the default memory block size maintains logical
347 * decoding performance without causing fragmentation due to concurrent
348 * transactions. One might think that we can use the max size as
349 * SLAB_LARGE_BLOCK_SIZE but the test also showed it doesn't help resolve
350 * the memory fragmentation.
351 */
352 buffer->tup_context = GenerationContextCreate(new_ctx,
353 "Tuples",
357
/* xid -> ReorderBufferTXNByIdEnt lookup table, allocated in buffer->context */
358 hash_ctl.keysize = sizeof(TransactionId);
359 hash_ctl.entrysize = sizeof(ReorderBufferTXNByIdEnt);
360 hash_ctl.hcxt = buffer->context;
361
362 buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
364
366 buffer->by_txn_last_txn = NULL;
367
368 buffer->outbuf = NULL;
369 buffer->outbufsize = 0;
370 buffer->size = 0;
371
372 /* txn_heap is ordered by transaction size */
374
/* spill / stream / total statistics counters start at zero */
375 buffer->spillTxns = 0;
376 buffer->spillCount = 0;
377 buffer->spillBytes = 0;
378 buffer->streamTxns = 0;
379 buffer->streamCount = 0;
380 buffer->streamBytes = 0;
381 buffer->totalTxns = 0;
382 buffer->totalBytes = 0;
383
385
386 dlist_init(&buffer->toplevel_by_lsn);
388 dclist_init(&buffer->catchange_txns);
389
390 /*
391 * Ensure there's no stale data from prior uses of this slot, in case some
392 * prior exit avoided calling ReorderBufferFree. Failure to do this can
393 * produce duplicated txns, and it's very cheap if there's nothing there.
394 */
396
397 return buffer;
398}
399
400/*
401 * Free a ReorderBuffer
 *
 * NOTE(review): the signature (source line 404) and the disk-cleanup call
 * (source line 415) are missing from this extraction; restore them from
 * upstream.
402 */
403void
405{
/*
 * rb itself was allocated inside rb->context (see ReorderBufferAllocate),
 * so read the context into a local and never touch rb after the delete.
 */
406 MemoryContext context = rb->context;
407
408 /*
409 * We free separately allocated data by entirely scrapping reorderbuffer's
410 * memory context.
411 */
412 MemoryContextDelete(context);
413
414 /* Free disk space used by unconsumed reorder buffers */
416}
417
418/*
419 * Allocate a new ReorderBufferTXN.
 *
 * The returned struct is zero-filled, with its changes/tuplecids/subtxns
 * lists initialized to empty.
 *
 * NOTE(review): the signature (source line 422), the allocation call
 * (427) and the InvalidCommandId assignment (436) are missing from this
 * extraction; restore them from upstream.
420 */
421static ReorderBufferTXN *
423{
424 ReorderBufferTXN *txn;
425
426 txn = (ReorderBufferTXN *)
428
/* zero everything; fields whose initial value is not all-zero are set below */
429 memset(txn, 0, sizeof(ReorderBufferTXN));
430
431 dlist_init(&txn->changes);
432 dlist_init(&txn->tuplecids);
433 dlist_init(&txn->subtxns);
434
435 /* InvalidCommandId is not zero, so set it explicitly */
437 txn->output_plugin_private = NULL;
438
439 return txn;
440}
441
442/*
443 * Free a ReorderBufferTXN.
 *
 * The function does the following, in order:
 * - drops the one-entry xid lookup cache if it refers to this txn;
 * - releases the gid, tuplecid hash and invalidation array owned by it;
 * - resets its toast state;
 * - frees the struct itself.
 * Every change must already have been freed, so txn->size is zero.
 *
 * NOTE(review): the signature (source line 446) and lines 451, 465 and 476
 * are missing from this extraction; restore them from upstream.
444 */
445static void
447{
448 /* clean the lookup cache if we were cached (quite likely) */
449 if (rb->by_txn_last_xid == txn->xid)
450 {
452 rb->by_txn_last_txn = NULL;
453 }
454
455 /* free data that's contained */
456
457 if (txn->gid != NULL)
458 {
459 pfree(txn->gid);
460 txn->gid = NULL;
461 }
462
463 if (txn->tuplecid_hash != NULL)
464 {
466 txn->tuplecid_hash = NULL;
467 }
468
469 if (txn->invalidations)
470 {
471 pfree(txn->invalidations);
472 txn->invalidations = NULL;
473 }
474
475 /* Reset the toast hash */
477
478 /* All changes must be deallocated */
479 Assert(txn->size == 0);
480
481 pfree(txn);
482}
483
484/*
485 * Allocate a ReorderBufferChange.
 *
 * The returned change is zero-filled.
 *
 * NOTE(review): the signature (source lines 487-488) and the allocation
 * call (493) are missing from this extraction; restore them from upstream.
486 */
489{
490 ReorderBufferChange *change;
491
492 change = (ReorderBufferChange *)
494
495 memset(change, 0, sizeof(ReorderBufferChange));
496 return change;
497}
498
499/*
500 * Free a ReorderBufferChange and update memory accounting, if requested.
 *
 * Pass upd_mem = false for a change that was never counted, e.g. one that
 * ReorderBufferQueueChange discards before adding it to the queue.  Any
 * per-action payload (tuples, message prefix/body, invalidations, snapshot,
 * truncate relids) is released before the change itself.
 *
 * NOTE(review): the signature (source line 503), the tail of the
 * accounting call (509), every case label (514-517, 530, 538, 543, 551,
 * 558-561) and the payload-free calls (520, 526, 546, 554) are missing
 * from this extraction; restore them from upstream.
501 */
502void
504 bool upd_mem)
505{
506 /* update memory accounting info */
507 if (upd_mem)
508 ReorderBufferChangeMemoryUpdate(rb, change, NULL, false,
510
511 /* free contained data */
512 switch (change->action)
513 {
518 if (change->data.tp.newtuple)
519 {
521 change->data.tp.newtuple = NULL;
522 }
523
524 if (change->data.tp.oldtuple)
525 {
527 change->data.tp.oldtuple = NULL;
528 }
529 break;
531 if (change->data.msg.prefix != NULL)
532 pfree(change->data.msg.prefix);
533 change->data.msg.prefix = NULL;
534 if (change->data.msg.message != NULL)
535 pfree(change->data.msg.message);
536 change->data.msg.message = NULL;
537 break;
539 if (change->data.inval.invalidations)
540 pfree(change->data.inval.invalidations);
541 change->data.inval.invalidations = NULL;
542 break;
544 if (change->data.snapshot)
545 {
547 change->data.snapshot = NULL;
548 }
549 break;
550 /* no data in addition to the struct itself */
/*
 * NOTE(review): the comment above does not describe the truncate branch
 * that follows (it owns a relids array).  It seems to belong to the
 * payload-free cases that end with the bare "break" at source line 562.
 */
552 if (change->data.truncate.relids != NULL)
553 {
555 change->data.truncate.relids = NULL;
556 }
557 break;
562 break;
563 }
564
565 pfree(change);
566}
567
568/*
569 * Allocate a HeapTuple fitting a tuple of size tuple_len (excluding header
570 * overhead).
 *
 * NOTE(review): the signature (source lines 572-573) and the allocation
 * call (580) are missing from this extraction; restore them from upstream.
571 */
574{
575 HeapTuple tuple;
576 Size alloc_len;
577
578 alloc_len = tuple_len + SizeofHeapTupleHeader;
579
581 HEAPTUPLESIZE + alloc_len);
/* header and data live in the same allocation, right after the HeapTupleData */
582 tuple->t_data = (HeapTupleHeader) ((char *) tuple + HEAPTUPLESIZE);
583
584 return tuple;
585}
586
587/*
588 * Free a HeapTuple returned by ReorderBufferAllocTupleBuf().
 *
 * A single pfree suffices because the tuple header and data share one
 * allocation with the HeapTupleData.
 *
 * NOTE(review): the signature (source line 591) is missing from this
 * extraction; restore it from upstream.
589 */
590void
592{
593 pfree(tuple);
594}
595
596/*
597 * Allocate an array for relids of truncated relations.
598 *
599 * We use the global memory context (for the whole reorder buffer), because
600 * none of the existing ones seems like a good match (some are SLAB, so we
601 * can't use those, and tup_context is meant for tuple data, not relids). We
602 * could add yet another context, but it seems like an overkill - TRUNCATE is
603 * not particularly common operation, so it does not seem worth it.
 *
 * NOTE(review): the signature (source line 606) is missing from this
 * extraction; restore it from upstream.
604 */
605Oid *
607{
608 Oid *relids;
609 Size alloc_len;
610
611 alloc_len = sizeof(Oid) * nrelids;
612
613 relids = (Oid *) MemoryContextAlloc(rb->context, alloc_len);
614
615 return relids;
616}
617
618/*
619 * Free an array of relids.
 *
 * NOTE(review): the signature (source line 622) is missing from this
 * extraction; restore it from upstream.
620 */
621void
623{
624 pfree(relids);
625}
626
627/*
628 * Return the ReorderBufferTXN from the given buffer, specified by Xid.
629 * If create is true, and a transaction doesn't already exist, create it
630 * (with the given LSN, and as top transaction if that's specified);
631 * when this happens, is_new is set to true.
 *
 * With create = false, NULL is returned for an unknown xid.  With
 * create = true, the result is never NULL.
 *
 * NOTE(review): the signature (source line 634) and lines 638, 641, 646,
 * 674-675, 685, 691 and 695-696 are missing from this extraction; restore
 * them from upstream.
632 */
633static ReorderBufferTXN *
635 bool *is_new, XLogRecPtr lsn, bool create_as_top)
636{
637 ReorderBufferTXN *txn;
639 bool found;
640
642
643 /*
644 * Check the one-entry lookup cache first
645 */
647 rb->by_txn_last_xid == xid)
648 {
649 txn = rb->by_txn_last_txn;
650
651 if (txn != NULL)
652 {
653 /* found it, and it's valid */
654 if (is_new)
655 *is_new = false;
656 return txn;
657 }
658
659 /*
660 * cached as non-existent, and asked not to create? Then nothing else
661 * to do.
662 */
663 if (!create)
664 return NULL;
665 /* otherwise fall through to create it */
666 }
667
668 /*
669 * If the cache wasn't hit or it yielded a "does-not-exist" and we want to
670 * create an entry.
671 */
672
673 /* search the lookup table */
676 &xid,
677 create ? HASH_ENTER : HASH_FIND,
678 &found);
679 if (found)
680 txn = ent->txn;
681 else if (create)
682 {
683 /* initialize the new entry, if creation was requested */
684 Assert(ent != NULL);
686
687 ent->txn = ReorderBufferAllocTXN(rb);
688 ent->txn->xid = xid;
689 txn = ent->txn;
690 txn->first_lsn = lsn;
692
693 if (create_as_top)
694 {
697 }
698 }
699 else
700 txn = NULL; /* not found and not asked to create */
701
702 /* update cache */
/*
 * A NULL result is cached too, so a repeated lookup of an unknown xid with
 * create = false returns early from the cache check above.
 */
703 rb->by_txn_last_xid = xid;
704 rb->by_txn_last_txn = txn;
705
706 if (is_new)
707 *is_new = !found;
708
709 Assert(!create || txn != NULL);
710 return txn;
711}
712
713/*
714 * Record the partial change for the streaming of in-progress transactions. We
715 * can stream only complete changes so if we have a partial change like toast
716 * table insert or speculative insert then we mark such a 'txn' so that it
717 * can't be streamed. We also ensure that if the changes in such a 'txn' can
718 * be streamed and are above logical_decoding_work_mem threshold then we stream
719 * them as soon as we have a complete change.
 *
 * The partial-change flag is kept on the top-level transaction, even when
 * 'txn' is a subtransaction.
 *
 * NOTE(review): the signature (source line 722) and lines 753, 756, 765,
 * 767, 779 and 782 are missing from this extraction; restore them from
 * upstream.
720 */
721static void
723 ReorderBufferChange *change,
724 bool toast_insert)
725{
726 ReorderBufferTXN *toptxn;
727
728 /*
729 * The partial changes need to be processed only while streaming
730 * in-progress transactions.
731 */
732 if (!ReorderBufferCanStream(rb))
733 return;
734
735 /* Get the top transaction. */
736 toptxn = rbtxn_get_toptxn(txn);
737
738 /*
739 * Indicate a partial change for toast inserts. The change will be
740 * considered as complete once we get the insert or update on the main
741 * table and we are sure that the pending toast chunks are not required
742 * anymore.
743 *
744 * If we allow streaming when there are pending toast chunks then such
745 * chunks won't be released till the insert (multi_insert) is complete and
746 * we expect the txn to have streamed all changes after streaming. This
747 * restriction is mainly to ensure the correctness of streamed
748 * transactions and it doesn't seem worth uplifting such a restriction
749 * just to allow this case because anyway we will stream the transaction
750 * once such an insert is complete.
751 */
752 if (toast_insert)
754 else if (rbtxn_has_partial_change(toptxn) &&
755 IsInsertOrUpdate(change->action) &&
757 toptxn->txn_flags &= ~RBTXN_HAS_PARTIAL_CHANGE;
758
759 /*
760 * Indicate a partial change for speculative inserts. The change will be
761 * considered as complete once we get the speculative confirm or abort
762 * token.
763 */
764 if (IsSpecInsert(change->action))
766 else if (rbtxn_has_partial_change(toptxn) &&
768 toptxn->txn_flags &= ~RBTXN_HAS_PARTIAL_CHANGE;
769
770 /*
771 * Stream the transaction if it is serialized before and the changes are
772 * now complete in the top-level transaction.
773 *
774 * The reason for doing the streaming of such a transaction as soon as we
775 * get the complete change for it is that previously it would have reached
776 * the memory threshold and wouldn't get streamed because of incomplete
777 * changes. Delaying such transactions would increase apply lag for them.
778 */
/*
 * NOTE(review): the "serialized" test below is applied to txn, which may be
 * a subtransaction, while the other tests use toptxn.  Confirm this is
 * intended.
 */
780 !(rbtxn_has_partial_change(toptxn)) &&
781 rbtxn_is_serialized(txn) &&
783 ReorderBufferStreamTXN(rb, toptxn);
784}
785
786/*
787 * Queue a change into a transaction so it can be replayed upon commit or will be
788 * streamed when we reach logical_decoding_work_mem threshold.
 *
 * Ownership of 'change' passes to the reorder buffer.  If the transaction
 * is already known to be aborted, the change is freed immediately.
 *
 * NOTE(review): the signature (source line 791) and lines 819-823, 827,
 * 833, 840 and 846 are missing from this extraction; restore them from
 * upstream.
789 */
790void
792 ReorderBufferChange *change, bool toast_insert)
793{
794 ReorderBufferTXN *txn;
795
/* create = true, so txn is never NULL (asserted in ReorderBufferTXNByXid) */
796 txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
797
798 /*
799 * If we have detected that the transaction is aborted while streaming the
800 * previous changes or by checking its CLOG, there is no point in
801 * collecting further changes for it.
802 */
803 if (rbtxn_is_aborted(txn))
804 {
805 /*
806 * We don't need to update memory accounting for this change as we
807 * have not added it to the queue yet.
808 */
809 ReorderBufferFreeChange(rb, change, false);
810 return;
811 }
812
813 /*
814 * The changes that are sent downstream are considered streamable. We
815 * remember such transactions so that only those will later be considered
816 * for streaming.
817 */
818 if (change->action == REORDER_BUFFER_CHANGE_INSERT ||
824 {
825 ReorderBufferTXN *toptxn = rbtxn_get_toptxn(txn);
826
828 }
829
830 change->lsn = lsn;
831 change->txn = txn;
832
834 dlist_push_tail(&txn->changes, &change->node);
835 txn->nentries++;
836 txn->nentries_mem++;
837
838 /* update memory accounting information */
839 ReorderBufferChangeMemoryUpdate(rb, change, NULL, true,
841
842 /* process partial change */
843 ReorderBufferProcessPartialChange(rb, txn, change, toast_insert);
844
845 /* check the memory limits and evict something if needed */
847}
848
849/*
850 * A transactional message is queued to be processed upon commit and a
851 * non-transactional message gets processed immediately.
 *
 * Transactional: prefix and message are copied into rb->context, and
 * ReorderBufferFreeChange later pfrees those copies.  The caller's buffers
 * are not retained.
 *
 * Non-transactional: the output plugin's message callback is invoked
 * directly, under 'snap' installed as the historic snapshot.
 *
 * NOTE(review): the signature (source line 854) and lines 864, 876, 903
 * and 907 are missing from this extraction; restore them from upstream.
852 */
853void
855 Snapshot snap, XLogRecPtr lsn,
856 bool transactional, const char *prefix,
857 Size message_size, const char *message)
858{
859 if (transactional)
860 {
861 MemoryContext oldcontext;
862 ReorderBufferChange *change;
863
865
866 /*
867 * We don't expect snapshots for transactional changes - we'll use the
868 * snapshot derived later during apply (unless the change gets
869 * skipped).
870 */
871 Assert(!snap);
872
873 oldcontext = MemoryContextSwitchTo(rb->context);
874
875 change = ReorderBufferAllocChange(rb);
877 change->data.msg.prefix = pstrdup(prefix);
878 change->data.msg.message_size = message_size;
879 change->data.msg.message = palloc(message_size);
880 memcpy(change->data.msg.message, message, message_size);
881
882 ReorderBufferQueueChange(rb, xid, lsn, change, false);
883
884 MemoryContextSwitchTo(oldcontext);
885 }
886 else
887 {
888 ReorderBufferTXN *txn = NULL;
/* volatile: the variable is live across the PG_TRY/PG_CATCH below */
889 volatile Snapshot snapshot_now = snap;
890
891 /* Non-transactional changes require a valid snapshot. */
892 Assert(snapshot_now);
893
894 if (xid != InvalidTransactionId)
895 txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
896
897 /* setup snapshot to allow catalog access */
898 SetupHistoricSnapshot(snapshot_now, NULL);
899 PG_TRY();
900 {
901 rb->message(rb, txn, lsn, false, prefix, message_size, message);
902
904 }
905 PG_CATCH();
906 {
908 PG_RE_THROW();
909 }
910 PG_END_TRY();
911 }
912}
913
914/*
915 * AssertTXNLsnOrder
916 * Verify LSN ordering of transaction lists in the reorderbuffer
917 *
918 * Other LSN-related invariants are checked too.
919 *
920 * No-op if assertions are not in use.
 *
 * Both walks require strictly increasing LSNs: first_lsn on the first
 * list, base_snapshot_lsn on the second.
 *
 * NOTE(review): the signature (source line 923) and lines 926, 941, 944,
 * 946, 950, 961, 966, 968, 974 and 981 are missing from this extraction;
 * restore them from upstream.
921 */
922static void
924{
925#ifdef USE_ASSERT_CHECKING
927 dlist_iter iter;
928 XLogRecPtr prev_first_lsn = InvalidXLogRecPtr;
929 XLogRecPtr prev_base_snap_lsn = InvalidXLogRecPtr;
930
931 /*
932 * Skip the verification if we don't reach the LSN at which we start
933 * decoding the contents of transactions yet because until we reach the
934 * LSN, we could have transactions that don't have the association between
935 * the top-level transaction and subtransaction yet and consequently have
936 * the same LSN. We don't guarantee this association until we try to
937 * decode the actual contents of transaction. The ordering of the records
938 * prior to the start_decoding_at LSN should have been checked before the
939 * restart.
940 */
942 return;
943
945 {
947 iter.cur);
948
949 /* start LSN must be set */
951
952 /* If there is an end LSN, it must not be lower than the start LSN */
953 if (cur_txn->end_lsn != InvalidXLogRecPtr)
954 Assert(cur_txn->first_lsn <= cur_txn->end_lsn);
955
956 /* Current initial LSN must be strictly higher than previous */
957 if (prev_first_lsn != InvalidXLogRecPtr)
958 Assert(prev_first_lsn < cur_txn->first_lsn);
959
960 /* known-as-subtxn txns must not be listed */
962
963 prev_first_lsn = cur_txn->first_lsn;
964 }
965
967 {
969 base_snapshot_node,
970 iter.cur);
971
972 /* base snapshot (and its LSN) must be set */
973 Assert(cur_txn->base_snapshot != NULL);
975
976 /* current LSN must be strictly higher than previous */
977 if (prev_base_snap_lsn != InvalidXLogRecPtr)
978 Assert(prev_base_snap_lsn < cur_txn->base_snapshot_lsn);
979
980 /* known-as-subtxn txns must not be listed */
982
983 prev_base_snap_lsn = cur_txn->base_snapshot_lsn;
984 }
985#endif
986}
987
988/*
989 * AssertChangeLsnOrder
990 *
991 * Check ordering of changes in the (sub)transaction.
 *
 * Every change must have a valid LSN that is no lower than txn->first_lsn,
 * no higher than txn->end_lsn (when set), and no lower than the LSN of the
 * preceding change.  Equal LSNs on consecutive changes are allowed.
 * No-op unless USE_ASSERT_CHECKING is defined.
 *
 * NOTE(review): the signature (source line 994) and line 1006 are missing
 * from this extraction; restore them from upstream.
992 */
993static void
995{
996#ifdef USE_ASSERT_CHECKING
997 dlist_iter iter;
998 XLogRecPtr prev_lsn = txn->first_lsn;
999
1000 dlist_foreach(iter, &txn->changes)
1001 {
1002 ReorderBufferChange *cur_change;
1003
1004 cur_change = dlist_container(ReorderBufferChange, node, iter.cur);
1005
1007 Assert(cur_change->lsn != InvalidXLogRecPtr);
1008 Assert(txn->first_lsn <= cur_change->lsn);
1009
1010 if (txn->end_lsn != InvalidXLogRecPtr)
1011 Assert(cur_change->lsn <= txn->end_lsn);
1012
1013 Assert(prev_lsn <= cur_change->lsn);
1014
1015 prev_lsn = cur_change->lsn;
1016 }
1017#endif
1018}
1019
1020/*
1021 * ReorderBufferGetOldestTXN
1022 * Return oldest transaction in reorderbuffer
 *
 * Returns NULL when there is none.
 *
 * NOTE(review): the signature (source lines 1024-1025) and lines 1029,
 * 1031, 1034 and 1036-1037 are missing from this extraction; restore them
 * from upstream.
1023 */
1026{
1027 ReorderBufferTXN *txn;
1028
1030
1032 return NULL;
1033
1035
1038 return txn;
1039}
1040
1041/*
1042 * ReorderBufferGetOldestXmin
1043 * Return oldest Xmin in reorderbuffer
1044 *
1045 * Returns oldest possibly running Xid from the point of view of snapshots
1046 * used in the transactions kept by reorderbuffer, or InvalidTransactionId if
1047 * there are none.
1048 *
1049 * Since snapshots are assigned monotonically, this equals the Xmin of the
1050 * base snapshot with minimal base_snapshot_lsn.
 *
 * NOTE(review): the signature (source lines 1052-1053) and lines 1057,
 * 1059 and 1063 are missing from this extraction; restore them from
 * upstream.
1051 */
1054{
1055 ReorderBufferTXN *txn;
1056
1058
1060 return InvalidTransactionId;
1061
1062 txn = dlist_head_element(ReorderBufferTXN, base_snapshot_node,
1064 return txn->base_snapshot->xmin;
1065}
1066
/*
 * NOTE(review): only the return type and braces of this function survive.
 * Its signature (source line 1068) and body (source line 1070) are missing
 * from this extraction; restore them from upstream.
 */
1067void
1069{
1071}
1072
1073/*
1074 * ReorderBufferAssignChild
1075 *
1076 * Make note that we know that subxid is a subtransaction of xid, seen as of
1077 * the given lsn.
 *
 * Both transactions are created if they are not yet known.  The subxact is
 * looked up with create_as_top = false, so a newly created one is never
 * placed on the top-level list.
 *
 * NOTE(review): the signature (source line 1080) and lines 1121 and 1124
 * are missing from this extraction; restore them from upstream.
1078 */
1079void
1081 TransactionId subxid, XLogRecPtr lsn)
1082{
1083 ReorderBufferTXN *txn;
1084 ReorderBufferTXN *subtxn;
1085 bool new_top;
1086 bool new_sub;
1087
1088 txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true);
1089 subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false);
1090
1091 if (!new_sub)
1092 {
1093 if (rbtxn_is_known_subxact(subtxn))
1094 {
1095 /* already associated, nothing to do */
1096 return;
1097 }
1098 else
1099 {
1100 /*
1101 * We already saw this transaction, but initially added it to the
1102 * list of top-level txns. Now that we know it's not top-level,
1103 * remove it from there.
1104 */
1105 dlist_delete(&subtxn->node);
1106 }
1107 }
1108
1109 subtxn->txn_flags |= RBTXN_IS_SUBXACT;
1110 subtxn->toplevel_xid = xid;
1111 Assert(subtxn->nsubtxns == 0);
1112
1113 /* set the reference to top-level transaction */
1114 subtxn->toptxn = txn;
1115
1116 /* add to subtransaction list */
/* subtxn->node is reused here; it was unlinked above if it was on a list */
1117 dlist_push_tail(&txn->subtxns, &subtxn->node);
1118 txn->nsubtxns++;
1119
1120 /* Possibly transfer the subtxn's snapshot to its top-level txn. */
1122
1123 /* Verify LSN-ordering invariant */
1125}
1126
1127/*
1128 * ReorderBufferTransferSnapToParent
1129 * Transfer base snapshot from subtxn to top-level txn, if needed
1130 *
1131 * This is done if the top-level txn doesn't have a base snapshot, or if the
1132 * subtxn's base snapshot has an earlier LSN than the top-level txn's base
1133 * snapshot's LSN. This can happen if there are no changes in the toplevel
1134 * txn but there are some in the subtxn, or the first change in subtxn has
1135 * earlier LSN than first change in the top-level txn and we learned about
1136 * their kinship only now.
1137 *
1138 * The subtransaction's snapshot is cleared regardless of the transfer
1139 * happening, since it's not needed anymore in either case.
1140 *
1141 * We do this as soon as we become aware of their kinship, to avoid queueing
1142 * extra snapshots to txns known-as-subtxns -- only top-level txns will
1143 * receive further snapshots.
 *
 * NOTE(review): the signature (source line 1146) and lines 1162-1163, 1173,
 * 1181-1182, 1187-1188 and 1190 are missing from this extraction; restore
 * them from upstream.
1144 */
1145static void
1147 ReorderBufferTXN *subtxn)
1148{
1149 Assert(subtxn->toplevel_xid == txn->xid);
1150
1151 if (subtxn->base_snapshot != NULL)
1152 {
1153 if (txn->base_snapshot == NULL ||
1154 subtxn->base_snapshot_lsn < txn->base_snapshot_lsn)
1155 {
1156 /*
1157 * If the toplevel transaction already has a base snapshot but
1158 * it's newer than the subxact's, purge it.
1159 */
1160 if (txn->base_snapshot != NULL)
1161 {
1164 }
1165
1166 /*
1167 * The snapshot is now the top transaction's; transfer it, and
1168 * adjust the list position of the top transaction in the list by
1169 * moving it to where the subtransaction is.
1170 */
1171 txn->base_snapshot = subtxn->base_snapshot;
1172 txn->base_snapshot_lsn = subtxn->base_snapshot_lsn;
1174 &txn->base_snapshot_node);
1175
1176 /*
1177 * The subtransaction doesn't have a snapshot anymore (so it
1178 * mustn't be in the list.)
1179 */
1180 subtxn->base_snapshot = NULL;
1183 }
1184 else
1185 {
1186 /* Base snap of toplevel is fine, so subxact's is not needed */
1189 subtxn->base_snapshot = NULL;
1191 }
1192 }
1193}
1194
1195/*
1196 * Associate a subtransaction with its toplevel transaction at commit
1197 * time. There may be no further changes added after this.
 *
 * The lookup uses create = false, so no entry is created for a subxact the
 * reorder buffer never saw.  Otherwise the subxact's final/end LSNs are
 * recorded before it is linked to its parent.
 *
 * NOTE(review): the signature (source line 1200) and the assignment call
 * (1222) are missing from this extraction; restore them from upstream.
1198 */
1199void
1201 TransactionId subxid, XLogRecPtr commit_lsn,
1202 XLogRecPtr end_lsn)
1203{
1204 ReorderBufferTXN *subtxn;
1205
1206 subtxn = ReorderBufferTXNByXid(rb, subxid, false, NULL,
1207 InvalidXLogRecPtr, false);
1208
1209 /*
1210 * No need to do anything if that subtxn didn't contain any changes
1211 */
1212 if (!subtxn)
1213 return;
1214
1215 subtxn->final_lsn = commit_lsn;
1216 subtxn->end_lsn = end_lsn;
1217
1218 /*
1219 * Assign this subxact as a child of the toplevel xact (no-op if already
1220 * done.)
1221 */
1223}
1224
1225
1226/*
1227 * Support for efficiently iterating over a transaction's and its
1228 * subtransactions' changes.
1229 *
1230 * We do this with a k-way merge between transactions/subtransactions. For that
1231 * we model the current heads of the different transactions as a binary heap
1232 * so we easily know which (sub-)transaction has the change with the smallest
1233 * lsn next.
1234 *
1235 * We assume the changes in individual transactions are already sorted by LSN.
1236 */
1237
1238/*
1239 * Binary heap comparison function.
 *
 * a and b are Int32 Datums indexing state->entries; entries are compared
 * by their current LSN.  The result is deliberately inverted (a smaller
 * LSN compares as "greater").  lib/binaryheap keeps the greatest element at
 * the top, so the top becomes the entry with the smallest LSN, as the k-way
 * merge requires.
 *
 * NOTE(review): the signature (source line 1242) and the line deriving
 * 'state' from the callback argument (1244) are missing from this
 * extraction; restore them from upstream.
1240 */
1241static int
1243{
1245 XLogRecPtr pos_a = state->entries[DatumGetInt32(a)].lsn;
1246 XLogRecPtr pos_b = state->entries[DatumGetInt32(b)].lsn;
1247
1248 if (pos_a < pos_b)
1249 return 1;
1250 else if (pos_a == pos_b)
1251 return 0;
1252 return -1;
1253}
1254
1255/*
1256 * Allocate & initialize an iterator which iterates in lsn order over a
1257 * transaction and all its subtransactions.
1258 *
1259 * Note: The iterator state is returned through iter_state parameter rather
1260 * than the function's return value. This is because the state gets cleaned up
1261 * in a PG_CATCH block in the caller, so we want to make sure the caller gets
1262 * back the state even if this function throws an exception.
 *
 * The heap gets one slot for each (sub)transaction with nentries > 0. Each
 * slot records that transaction's current change and its LSN, plus the
 * file/segno state handed to ReorderBufferRestoreChanges() when the
 * transaction is serialized.
 *
 * NOTE(review): several lines are absent from this rendering: the line with
 * the function name, the declaration and allocation of 'state', one
 * binaryheap_allocate() argument, the top-level ordering check, and the
 * statements that add entries to the heap.
1263 */
1264static void
1266 ReorderBufferIterTXNState *volatile *iter_state)
1267{
1268 Size nr_txns = 0;
1270 dlist_iter cur_txn_i;
1271 int32 off;
1272
1273 *iter_state = NULL;
1274
1275 /* Check ordering of changes in the toplevel transaction. */
1277
1278 /*
1279 * Calculate the size of our heap: one element for every transaction that
1280 * contains changes. (Besides the subtransactions on txn->subtxns, we
1281 * count the toplevel transaction we were directly passed.)
1282 */
1283 if (txn->nentries > 0)
1284 nr_txns++;
1285
1286 dlist_foreach(cur_txn_i, &txn->subtxns)
1287 {
1288 ReorderBufferTXN *cur_txn;
1289
1290 cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
1291
1292 /* Check ordering of changes in this subtransaction. */
1293 AssertChangeLsnOrder(cur_txn);
1294
1295 if (cur_txn->nentries > 0)
1296 nr_txns++;
1297 }
1298
1299 /* allocate iteration state */
1303 sizeof(ReorderBufferIterTXNEntry) * nr_txns);
1304
1305 state->nr_txns = nr_txns;
1306 dlist_init(&state->old_change);
1307
 /*
  * Mark every slot's file as not open (vfd -1) before anything below can
  * fail. ReorderBufferIterTXNFinish() only closes files whose vfd is not -1.
  */
1308 for (off = 0; off < state->nr_txns; off++)
1309 {
1310 state->entries[off].file.vfd = -1;
1311 state->entries[off].segno = 0;
1312 }
1313
1314 /* allocate heap */
1315 state->heap = binaryheap_allocate(state->nr_txns,
1317 state);
1318
1319 /* Now that the state fields are initialized, it is safe to return it. */
1320 *iter_state = state;
1321
1322 /*
1323 * Now insert items into the binary heap, in an unordered fashion. (We
1324 * will run a heap assembly step at the end; this is more efficient.)
1325 */
1326
 /*
  * NOTE(review): 'off' is not advanced on any visible line. Presumably the
  * heap-insertion statement missing at the end of each branch below
  * post-increments it -- confirm against the full source.
  */
1327 off = 0;
1328
1329 /* add toplevel transaction if it contains changes */
1330 if (txn->nentries > 0)
1331 {
1332 ReorderBufferChange *cur_change;
1333
1334 if (rbtxn_is_serialized(txn))
1335 {
1336 /* serialize remaining changes */
 /* NOTE(review): the serialize call made in the subtransaction branch below is not visible here. */
1338 ReorderBufferRestoreChanges(rb, txn, &state->entries[off].file,
1339 &state->entries[off].segno);
1340 }
1341
1342 cur_change = dlist_head_element(ReorderBufferChange, node,
1343 &txn->changes);
1344
1345 state->entries[off].lsn = cur_change->lsn;
1346 state->entries[off].change = cur_change;
1347 state->entries[off].txn = txn;
1348
1350 }
1351
1352 /* add subtransactions if they contain changes */
1353 dlist_foreach(cur_txn_i, &txn->subtxns)
1354 {
1355 ReorderBufferTXN *cur_txn;
1356
1357 cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
1358
1359 if (cur_txn->nentries > 0)
1360 {
1361 ReorderBufferChange *cur_change;
1362
1363 if (rbtxn_is_serialized(cur_txn))
1364 {
1365 /* serialize remaining changes */
1366 ReorderBufferSerializeTXN(rb, cur_txn);
1367 ReorderBufferRestoreChanges(rb, cur_txn,
1368 &state->entries[off].file,
1369 &state->entries[off].segno);
1370 }
1371 cur_change = dlist_head_element(ReorderBufferChange, node,
1372 &cur_txn->changes);
1373
1374 state->entries[off].lsn = cur_change->lsn;
1375 state->entries[off].change = cur_change;
1376 state->entries[off].txn = cur_txn;
1377
1379 }
1380 }
1381
1382 /* assemble a valid binary heap */
1383 binaryheap_build(state->heap);
1384}
1385
1386/*
1387 * Return the next change when iterating over a transaction and its
1388 * subtransactions.
1389 *
1390 * Returns NULL when no further changes exist.
 *
 * The returned change stays valid at least until the next call. When the
 * current entry's in-memory changes are exhausted and more are restored
 * from disk, the returned change is unlinked and parked on
 * state->old_change. It is freed on the next call, or by
 * ReorderBufferIterTXNFinish().
 *
 * NOTE(review): the signature line, the declaration of 'entry', and the
 * statements that update the binary heap are not present in this rendering.
1391 */
1392static ReorderBufferChange *
1394{
1395 ReorderBufferChange *change;
1397 int32 off;
1398
1399 /* nothing there anymore */
1400 if (state->heap->bh_size == 0)
1401 return NULL;
1402
1403 off = DatumGetInt32(binaryheap_first(state->heap));
1404 entry = &state->entries[off];
 /* 'entry' aliases state->entries[off]; both spellings are used below. */
1405
1406 /* free memory we might have "leaked" in the previous *Next call */
1407 if (!dlist_is_empty(&state->old_change))
1408 {
1409 change = dlist_container(ReorderBufferChange, node,
1410 dlist_pop_head_node(&state->old_change));
1411 ReorderBufferFreeChange(rb, change, true);
1412 Assert(dlist_is_empty(&state->old_change));
1413 }
1414
1415 change = entry->change;
1416
1417 /*
1418 * update heap with information about which transaction has the next
1419 * relevant change in LSN order
1420 */
1421
1422 /* there are in-memory changes */
1423 if (dlist_has_next(&entry->txn->changes, &entry->change->node))
1424 {
1425 dlist_node *next = dlist_next_node(&entry->txn->changes, &change->node);
1426 ReorderBufferChange *next_change =
1428
1429 /* txn stays the same */
1430 state->entries[off].lsn = next_change->lsn;
1431 state->entries[off].change = next_change;
1432
1434 return change;
1435 }
1436
1437 /* try to load changes from disk */
1438 if (entry->txn->nentries != entry->txn->nentries_mem)
1439 {
1440 /*
1441 * Ugly: restoring changes will reuse *Change records, thus delete the
1442 * current one from the per-tx list and only free in the next call.
1443 */
1444 dlist_delete(&change->node);
1445 dlist_push_tail(&state->old_change, &change->node);
1446
1447 /*
1448 * Update the total bytes processed by the txn for which we are
1449 * releasing the current set of changes and restoring the new set of
1450 * changes.
1451 */
1452 rb->totalBytes += entry->txn->size;
1453 if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->file,
1454 &state->entries[off].segno))
1455 {
1456 /* successfully restored changes from disk */
1457 ReorderBufferChange *next_change =
1459 &entry->txn->changes);
1460
1461 elog(DEBUG2, "restored %u/%u changes from disk",
1462 (uint32) entry->txn->nentries_mem,
1463 (uint32) entry->txn->nentries);
1464
1465 Assert(entry->txn->nentries_mem);
1466 /* txn stays the same */
1467 state->entries[off].lsn = next_change->lsn;
1468 state->entries[off].change = next_change;
1470
1471 return change;
1472 }
1473 }
1474
1475 /* ok, no changes there anymore, remove */
1477
1478 return change;
1479}
1480
1481/*
1482 * Deallocate the iterator
 *
 * Closes any entry file that is still open (vfd != -1) and frees a change
 * parked on old_change by the last ReorderBufferIterTXNNext() call. It then
 * frees the heap and the state itself.
 *
 * NOTE(review): the signature lines are not present in this rendering.
1483 */
1484static void
1487{
1488 int32 off;
1489
1490 for (off = 0; off < state->nr_txns; off++)
1491 {
1492 if (state->entries[off].file.vfd != -1)
1493 FileClose(state->entries[off].file.vfd);
1494 }
1495
1496 /* free memory we might have "leaked" in the last *Next call */
1497 if (!dlist_is_empty(&state->old_change))
1498 {
1499 ReorderBufferChange *change;
1500
1501 change = dlist_container(ReorderBufferChange, node,
1502 dlist_pop_head_node(&state->old_change));
1503 ReorderBufferFreeChange(rb, change, true);
1504 Assert(dlist_is_empty(&state->old_change));
1505 }
1506
1507 binaryheap_free(state->heap);
1508 pfree(state);
1509}
1510
1511/*
1512 * Cleanup the contents of a transaction, usually after the transaction
1513 * committed or aborted.
 *
 * The function does the following, in order:
 * - recursively cleans up the subtransactions;
 * - frees the changes and tuplecids;
 * - releases the snapshots;
 * - removes the TXN from its lists and from rb->by_txn;
 * - frees the TXN itself.
 * 'txn' must not be used after this returns.
 *
 * NOTE(review): several lines are not present in this rendering: the
 * signature, some Asserts, and the bodies of the snapshot-cleanup branches.
1514 */
1515static void
1517{
1518 bool found;
1519 dlist_mutable_iter iter;
1520 Size mem_freed = 0;
1521
1522 /* cleanup subtransactions & their changes */
 /*
  * The _modify iterator is required: each recursive call unlinks the
  * subtransaction from this list via dlist_delete(&txn->node) below.
  */
1523 dlist_foreach_modify(iter, &txn->subtxns)
1524 {
1525 ReorderBufferTXN *subtxn;
1526
1527 subtxn = dlist_container(ReorderBufferTXN, node, iter.cur);
1528
1529 /*
1530 * Subtransactions are always associated to the toplevel TXN, even if
1531 * they originally were happening inside another subtxn, so we won't
1532 * ever recurse more than one level deep here.
1533 */
1535 Assert(subtxn->nsubtxns == 0);
1536
1537 ReorderBufferCleanupTXN(rb, subtxn);
1538 }
1539
1540 /* cleanup changes in the txn */
1541 dlist_foreach_modify(iter, &txn->changes)
1542 {
1543 ReorderBufferChange *change;
1544
1545 change = dlist_container(ReorderBufferChange, node, iter.cur);
1546
1547 /* Check we're not mixing changes from different transactions. */
1548 Assert(change->txn == txn);
1549
1550 /*
1551 * Instead of updating the memory counter for individual changes, we
1552 * sum up the size of memory to free so we can update the memory
1553 * counter all together below. This saves costs of maintaining the
1554 * max-heap.
1555 */
1556 mem_freed += ReorderBufferChangeSize(change);
1557
 /*
  * Unlike ReorderBufferTruncateTXN(), the change is not unlinked first.
  * This list is never used again because the TXN itself is freed at the
  * end. NOTE(review): 'false' presumably skips the per-change memory
  * accounting that the batched update after the loop replaces.
  */
1558 ReorderBufferFreeChange(rb, change, false);
1559 }
1560
1561 /* Update the memory counter */
1562 ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, mem_freed);
1563
1564 /*
1565 * Cleanup the tuplecids we stored for decoding catalog snapshot access.
1566 * They are always stored in the toplevel transaction.
1567 */
1568 dlist_foreach_modify(iter, &txn->tuplecids)
1569 {
1570 ReorderBufferChange *change;
1571
1572 change = dlist_container(ReorderBufferChange, node, iter.cur);
1573
1574 /* Check we're not mixing changes from different transactions. */
1575 Assert(change->txn == txn);
1577
1578 ReorderBufferFreeChange(rb, change, true);
1579 }
1580
1581 /*
1582 * Cleanup the base snapshot, if set.
1583 */
1584 if (txn->base_snapshot != NULL)
1585 {
1588 }
1589
1590 /*
1591 * Cleanup the snapshot for the last streamed run.
1592 */
1593 if (txn->snapshot_now != NULL)
1594 {
1597 }
1598
1599 /*
1600 * Remove TXN from its containing lists.
1601 *
1602 * Note: if txn is known as subxact, we are deleting the TXN from its
1603 * parent's list of known subxacts; this leaves the parent's nsubxacts
1604 * count too high, but we don't care. Otherwise, we are deleting the TXN
1605 * from the LSN-ordered list of toplevel TXNs. We remove the TXN from the
1606 * list of catalog modifying transactions as well.
1607 */
1608 dlist_delete(&txn->node);
1611
1612 /* now remove reference from buffer */
1613 hash_search(rb->by_txn, &txn->xid, HASH_REMOVE, &found);
1614 Assert(found);
1615
1616 /* remove entries spilled to disk */
1617 if (rbtxn_is_serialized(txn))
1619
1620 /* deallocate */
1621 ReorderBufferFreeTXN(rb, txn);
1622}
1623
1624/*
1625 * Discard changes from a transaction (and subtransactions), either after
1626 * streaming, decoding them at PREPARE, or detecting the transaction abort.
1627 * Keep the remaining info - transactions, tuplecids, invalidations and
1628 * snapshots.
1629 *
1630 * We additionally remove tuplecids after decoding the transaction at prepare
1631 * time as we only need to perform invalidation at rollback or commit prepared.
1632 *
1633 * 'txn_prepared' indicates that we have decoded the transaction at prepare
1634 * time.
 *
 * Unlike ReorderBufferCleanupTXN(), the TXN objects (and the subtxns list)
 * survive this call. Every freed change is therefore unlinked from its list
 * first, and nentries/nentries_mem are reset to zero at the end.
 *
 * NOTE(review): several lines are not present in this rendering: the
 * signature, some Asserts, and a few statements in the subtransaction loop
 * and the cleanup branches.
1635 */
1636static void
1638{
1639 dlist_mutable_iter iter;
1640 Size mem_freed = 0;
1641
1642 /* cleanup subtransactions & their changes */
1643 dlist_foreach_modify(iter, &txn->subtxns)
1644 {
1645 ReorderBufferTXN *subtxn;
1646
1647 subtxn = dlist_container(ReorderBufferTXN, node, iter.cur);
1648
1649 /*
1650 * Subtransactions are always associated to the toplevel TXN, even if
1651 * they originally were happening inside another subtxn, so we won't
1652 * ever recurse more than one level deep here.
1653 */
1655 Assert(subtxn->nsubtxns == 0);
1656
1658 ReorderBufferTruncateTXN(rb, subtxn, txn_prepared);
1659 }
1660
1661 /* cleanup changes in the txn */
1662 dlist_foreach_modify(iter, &txn->changes)
1663 {
1664 ReorderBufferChange *change;
1665
1666 change = dlist_container(ReorderBufferChange, node, iter.cur);
1667
1668 /* Check we're not mixing changes from different transactions. */
1669 Assert(change->txn == txn);
1670
1671 /* remove the change from its containing list */
1672 dlist_delete(&change->node);
1673
1674 /*
1675 * Instead of updating the memory counter for individual changes, we
1676 * sum up the size of memory to free so we can update the memory
1677 * counter all together below. This saves costs of maintaining the
1678 * max-heap.
1679 */
1680 mem_freed += ReorderBufferChangeSize(change);
1681
1682 ReorderBufferFreeChange(rb, change, false);
1683 }
1684
1685 /* Update the memory counter */
1686 ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, mem_freed);
1687
1688 if (txn_prepared)
1689 {
1690 /*
1691 * If this is a prepared txn, cleanup the tuplecids we stored for
1692 * decoding catalog snapshot access. They are always stored in the
1693 * toplevel transaction.
1694 */
1695 dlist_foreach_modify(iter, &txn->tuplecids)
1696 {
1697 ReorderBufferChange *change;
1698
1699 change = dlist_container(ReorderBufferChange, node, iter.cur);
1700
1701 /* Check we're not mixing changes from different transactions. */
1702 Assert(change->txn == txn);
1704
1705 /* Remove the change from its containing list. */
1706 dlist_delete(&change->node);
1707
1708 ReorderBufferFreeChange(rb, change, true);
1709 }
1710 }
1711
1712 /*
1713 * Destroy the (relfilelocator, ctid) hashtable, so that we don't leak any
1714 * memory. We could also keep the hash table and update it with new ctid
1715 * values, but this seems simpler and good enough for now.
1716 */
1717 if (txn->tuplecid_hash != NULL)
1718 {
1720 txn->tuplecid_hash = NULL;
1721 }
1722
1723 /* If this txn is serialized then clean the disk space. */
1724 if (rbtxn_is_serialized(txn))
1725 {
1727 txn->txn_flags &= ~RBTXN_IS_SERIALIZED;
1728
1729 /*
1730 * We set this flag to indicate if the transaction is ever serialized.
1731 * We need this to accurately update the stats as otherwise the same
1732 * transaction can be counted as serialized multiple times.
1733 */
1735 }
1736
1737 /* also reset the number of entries in the transaction */
1738 txn->nentries_mem = 0;
1739 txn->nentries = 0;
1740}
1741
1742/*
1743 * Check the transaction status by CLOG lookup and discard all changes if
1744 * the transaction is aborted. The transaction status is cached in
1745 * txn->txn_flags so we can skip future changes and avoid CLOG lookups on the
1746 * next call.
1747 *
1748 * Return true if the transaction is aborted, otherwise return false.
1749 *
1750 * When the 'debug_logical_replication_streaming' is set to "immediate", we
1751 * don't check the transaction status, meaning the caller will always process
1752 * this transaction.
 *
 * NOTE(review): several lines are not present in this rendering:
 * - the signature;
 * - the conditions guarding the regression-test return and the first
 *   "return false" after the CLOG comment;
 * - the statements that set the cached status flags;
 * - the truncation call in the abort path.
1753 */
1754static bool
1756{
1757 /* Quick return for regression tests */
1759 return false;
1760
1761 /*
1762 * Quick return if the transaction status is already known.
1763 */
1764
1765 if (rbtxn_is_committed(txn))
1766 return false;
1767 if (rbtxn_is_aborted(txn))
1768 {
1769 /* Already-aborted transactions should not have any changes */
1770 Assert(txn->size == 0);
1771
1772 return true;
1773 }
1774
1775 /* Otherwise, check the transaction status using CLOG lookup */
1776
 /*
  * NOTE(review): the condition for this early "not aborted" return is not
  * visible here. Presumably it detects a transaction that is still
  * running, which must not be treated as aborted -- confirm.
  */
1778 return false;
1779
1780 if (TransactionIdDidCommit(txn->xid))
1781 {
1782 /*
1783 * Remember the transaction is committed so that we can skip CLOG
1784 * check next time, avoiding the pressure on CLOG lookup.
1785 */
1786 Assert(!rbtxn_is_aborted(txn));
1788 return false;
1789 }
1790
1791 /*
1792 * The transaction aborted. We discard both the changes collected so far
1793 * and the toast reconstruction data. The full cleanup will happen as part
1794 * of decoding ABORT record of this transaction.
1795 */
1797 ReorderBufferToastReset(rb, txn);
1798
1799 /* All changes should be discarded */
1800 Assert(txn->size == 0);
1801
1802 /*
1803 * Mark the transaction as aborted so we can ignore future changes of this
1804 * transaction.
1805 */
1808
1809 return true;
1810}
1811
1812/*
1813 * Build a hash with a (relfilelocator, ctid) -> (cmin, cmax) mapping for use by
1814 * HeapTupleSatisfiesHistoricMVCC.
 *
 * Each entry also records the combocid. The hash is sized for
 * txn->ntuplecids entries, allocated in rb->context and stored in
 * txn->tuplecid_hash. The function returns early without building it when
 * the guard at the top says so.
 *
 * NOTE(review): several lines are not present in this rendering:
 * - the signature and the guard condition;
 * - the hash_create() flags;
 * - the 'key'/'ent' declarations;
 * - one Assert and the start of the ctid copy.
1815 */
1816static void
1818{
1819 dlist_iter iter;
1820 HASHCTL hash_ctl;
1821
1823 return;
1824
1825 hash_ctl.keysize = sizeof(ReorderBufferTupleCidKey);
1826 hash_ctl.entrysize = sizeof(ReorderBufferTupleCidEnt);
1827 hash_ctl.hcxt = rb->context;
1828
1829 /*
1830 * create the hash with the exact number of to-be-stored tuplecids from
1831 * the start
1832 */
1833 txn->tuplecid_hash =
1834 hash_create("ReorderBufferTupleCid", txn->ntuplecids, &hash_ctl,
1836
1837 dlist_foreach(iter, &txn->tuplecids)
1838 {
1841 bool found;
1842 ReorderBufferChange *change;
1843
1844 change = dlist_container(ReorderBufferChange, node, iter.cur);
1845
1847
1848 /* be careful about padding */
 /*
  * Zeroing the whole key makes padding bytes deterministic. NOTE(review):
  * presumably the key is hashed and compared as raw bytes -- confirm the
  * hash_create() flags.
  */
1849 memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
1850
1851 key.rlocator = change->data.tuplecid.locator;
1852
1854 &key.tid);
1855
1856 ent = (ReorderBufferTupleCidEnt *)
1857 hash_search(txn->tuplecid_hash, &key, HASH_ENTER, &found);
1858 if (!found)
1859 {
1860 ent->cmin = change->data.tuplecid.cmin;
1861 ent->cmax = change->data.tuplecid.cmax;
1862 ent->combocid = change->data.tuplecid.combocid;
1863 }
1864 else
1865 {
1866 /*
1867 * Maybe we already saw this tuple before in this transaction, but
1868 * if so it must have the same cmin.
1869 */
1870 Assert(ent->cmin == change->data.tuplecid.cmin);
1871
1872 /*
1873 * cmax may be initially invalid, but once set it can only grow,
1874 * and never become invalid again.
1875 */
1876 Assert((ent->cmax == InvalidCommandId) ||
1877 ((change->data.tuplecid.cmax != InvalidCommandId) &&
1878 (change->data.tuplecid.cmax > ent->cmax)));
1879 ent->cmax = change->data.tuplecid.cmax;
1880 }
1881 }
1882}
1883
1884/*
1885 * Copy a provided snapshot so we can modify it privately. This is needed so
1886 * that catalog modifying transactions can look into intermediate catalog
1887 * states.
 *
 * The copy is a single zeroed allocation in rb->context with this layout:
 * - the SnapshotData header;
 * - the xip array (orig_snap->xcnt entries);
 * - subxip, holding txn->xid and the xids of every subtransaction on
 *   txn->subtxns, sorted with xidComparator.
 * In the copy, copied is true, active_count is 1, regd_count is 0 and
 * curcid is 'cid'. Release it with ReorderBufferFreeSnap().
 *
 * NOTE(review): the line with the function name and first parameters is
 * not present in this rendering.
1888 */
1889static Snapshot
1891 ReorderBufferTXN *txn, CommandId cid)
1892{
1893 Snapshot snap;
1894 dlist_iter iter;
1895 int i = 0;
1896 Size size;
1897
1898 size = sizeof(SnapshotData) +
1899 sizeof(TransactionId) * orig_snap->xcnt +
1900 sizeof(TransactionId) * (txn->nsubtxns + 1);
1901
1902 snap = MemoryContextAllocZero(rb->context, size);
1903 memcpy(snap, orig_snap, sizeof(SnapshotData));
1904
1905 snap->copied = true;
1906 snap->active_count = 1; /* mark as active so nobody frees it */
1907 snap->regd_count = 0;
1908 snap->xip = (TransactionId *) (snap + 1);
1909
 /* snap->xcnt equals orig_snap->xcnt here, copied by the memcpy above. */
1910 memcpy(snap->xip, orig_snap->xip, sizeof(TransactionId) * snap->xcnt);
1911
1912 /*
1913 * snap->subxip contains all txids that belong to our transaction which we
1914 * need to check via cmin/cmax. That's why we store the toplevel
1915 * transaction in there as well.
1916 */
1917 snap->subxip = snap->xip + snap->xcnt;
1918 snap->subxip[i++] = txn->xid;
1919
1920 /*
1921 * txn->nsubtxns isn't decreased when subtransactions abort, so count
1922 * manually. Since it's an upper boundary it is safe to use it for the
1923 * allocation above.
1924 */
1925 snap->subxcnt = 1;
1926
1927 dlist_foreach(iter, &txn->subtxns)
1928 {
1929 ReorderBufferTXN *sub_txn;
1930
1931 sub_txn = dlist_container(ReorderBufferTXN, node, iter.cur);
1932 snap->subxip[i++] = sub_txn->xid;
1933 snap->subxcnt++;
1934 }
1935
1936 /* sort so we can bsearch() later */
1937 qsort(snap->subxip, snap->subxcnt, sizeof(TransactionId), xidComparator);
1938
1939 /* store the specified current CommandId */
1940 snap->curcid = cid;
1941
1942 return snap;
1943}
1944
1945/*
1946 * Free a previously ReorderBufferCopySnap'ed snapshot
 *
 * Snapshots whose 'copied' flag is set, such as those produced by
 * ReorderBufferCopySnap(), are pfree'd directly. Other snapshots take the
 * else branch.
 *
 * NOTE(review): the signature line and the statement in the else branch
 * are not present in this rendering.
1947 */
1948static void
1950{
1951 if (snap->copied)
1952 pfree(snap);
1953 else
1955}
1956
1957/*
1958 * If the transaction was (partially) streamed, we need to prepare or commit
1959 * it in a 'streamed' way. That is, we first stream the remaining part of the
1960 * transaction, and then invoke stream_prepare or stream_commit message as per
1961 * the case.
 *
 * For a prepared transaction the TXN is only truncated and stays in the
 * buffer. Otherwise ReorderBufferCleanupTXN() frees it, so 'txn' must not be
 * used after this returns.
 *
 * NOTE(review): the signature, the check following the first comment, and
 * the statements around the stream_prepare callback are not present in this
 * rendering.
1962 */
1963static void
1965{
1966 /* we should only call this for previously streamed transactions */
1968
1969 ReorderBufferStreamTXN(rb, txn);
1970
1971 if (rbtxn_is_prepared(txn))
1972 {
1973 /*
1974 * Note, we send stream prepare even if a concurrent abort is
1975 * detected. See DecodePrepare for more information.
1976 */
1978 rb->stream_prepare(rb, txn, txn->final_lsn);
1980
1981 /*
1982 * This is a PREPARED transaction, part of a two-phase commit. The
1983 * full cleanup will happen as part of the COMMIT PREPARED, so now
1984 * just truncate txn by removing changes and tuplecids.
1985 */
1986 ReorderBufferTruncateTXN(rb, txn, true);
1987 /* Reset the CheckXidAlive */
1989 }
1990 else
1991 {
1992 rb->stream_commit(rb, txn, txn->final_lsn);
1993 ReorderBufferCleanupTXN(rb, txn);
1994 }
1995}
1996
1997/*
1998 * Set xid to detect concurrent aborts.
1999 *
2000 * While streaming an in-progress transaction or decoding a prepared
2001 * transaction there is a possibility that the (sub)transaction might get
2002 * aborted concurrently. In such case if the (sub)transaction has catalog
2003 * update then we might decode the tuple using wrong catalog version. For
2004 * example, suppose there is one catalog tuple with (xmin: 500, xmax: 0). Now,
2005 * the transaction 501 updates the catalog tuple and after that we will have
2006 * two tuples (xmin: 500, xmax: 501) and (xmin: 501, xmax: 0). Now, if 501 is
2007 * aborted and some other transaction say 502 updates the same catalog tuple
2008 * then the first tuple will be changed to (xmin: 500, xmax: 502). So, the
2009 * problem is that when we try to decode the tuple inserted/updated in 501
2010 * after the catalog update, we will see the catalog tuple with (xmin: 500,
2011 * xmax: 502) as visible because it will consider that the tuple is deleted by
2012 * xid 502 which is not visible to our snapshot. And when we will try to
2013 * decode with that catalog tuple, it can lead to a wrong result or a crash.
2014 * So, it is necessary to detect concurrent aborts to allow streaming of
2015 * in-progress transactions or decoding of prepared transactions.
2016 *
2017 * For detecting the concurrent abort we set CheckXidAlive to the current
2018 * (sub)transaction's xid to which this change belongs. And, during
2019 * catalog scan we can check the status of the xid and if it is aborted we will
2020 * report a specific error so that we can stop streaming current transaction
2021 * and discard the already streamed changes on such an error. We might have
2022 * already streamed some of the changes for the aborted (sub)transaction, but
2023 * that is fine because when we decode the abort we will stream abort message
2024 * to truncate the changes in the subscriber. Similarly, for prepared
2025 * transactions, we stop decoding if concurrent abort is detected and then
2026 * rollback the changes when rollback prepared is encountered. See
2027 * DecodePrepare.
 *
 * NOTE(review): the signature, the early-return condition, and the
 * statement in the else branch are not present in this rendering.
2028 */
2029static inline void
2031{
2032 /*
2033 * If the input transaction id is already set as a CheckXidAlive then
2034 * nothing to do.
2035 */
2037 return;
2038
2039 /*
2040 * setup CheckXidAlive if it's not committed yet. We don't check if the
2041 * xid is aborted. That will happen during catalog access.
2042 */
2043 if (!TransactionIdDidCommit(xid))
2044 CheckXidAlive = xid;
2045 else
2047}
2048
2049/*
2050 * Helper function for ReorderBufferProcessTXN for applying change.
 *
 * Calls the output plugin's stream_change callback when 'streaming' is
 * true, and apply_change otherwise, passing the same arguments to either.
 *
 * NOTE(review): the line with the function name and first parameters is
 * not present in this rendering.
2051 */
2052static inline void
2054 Relation relation, ReorderBufferChange *change,
2055 bool streaming)
2056{
2057 if (streaming)
2058 rb->stream_change(rb, txn, relation, change);
2059 else
2060 rb->apply_change(rb, txn, relation, change);
2061}
2062
2063/*
2064 * Helper function for ReorderBufferProcessTXN for applying the truncate.
 *
 * Calls stream_truncate when 'streaming' is true, and apply_truncate
 * otherwise. 'relations' holds 'nrelations' entries and is passed through
 * unchanged.
 *
 * NOTE(review): the line with the function name and first parameters is
 * not present in this rendering.
2065 */
2066static inline void
2068 int nrelations, Relation *relations,
2069 ReorderBufferChange *change, bool streaming)
2070{
2071 if (streaming)
2072 rb->stream_truncate(rb, txn, nrelations, relations, change);
2073 else
2074 rb->apply_truncate(rb, txn, nrelations, relations, change);
2075}
2076
2077/*
2078 * Helper function for ReorderBufferProcessTXN for applying the message.
 *
 * Calls stream_message when 'streaming' is true, and message otherwise. The
 * arguments are the change's LSN and its message prefix, size and payload
 * taken from change->data.msg.
 *
 * NOTE(review): the literal 'true' is passed unconditionally. Presumably it
 * is the callbacks' "transactional" flag, since messages reaching this path
 * belong to a decoded transaction -- confirm against the callback
 * definitions. The line with the function name and first parameters is not
 * present in this rendering.
2079 */
2080static inline void
2082 ReorderBufferChange *change, bool streaming)
2083{
2084 if (streaming)
2085 rb->stream_message(rb, txn, change->lsn, true,
2086 change->data.msg.prefix,
2087 change->data.msg.message_size,
2088 change->data.msg.message);
2089 else
2090 rb->message(rb, txn, change->lsn, true,
2091 change->data.msg.prefix,
2092 change->data.msg.message_size,
2093 change->data.msg.message);
2094}
2095
2096/*
2097 * Function to store the command id and snapshot at the end of the current
2098 * stream so that we can reuse the same while sending the next stream.
 *
 * A snapshot that is already a private copy (copied == true) is stored
 * as-is, so txn->snapshot_now then points at the caller's snapshot object.
 * Any other snapshot is first copied with ReorderBufferCopySnap() using
 * 'command_id'.
 *
 * NOTE(review): the line with the function name and first parameters is
 * not present in this rendering.
2099 */
2100static inline void
2102 Snapshot snapshot_now, CommandId command_id)
2103{
2104 txn->command_id = command_id;
2105
2106 /* Avoid copying if it's already copied. */
2107 if (snapshot_now->copied)
2108 txn->snapshot_now = snapshot_now;
2109 else
2110 txn->snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
2111 txn, command_id);
2112}
2113
2114/*
2115 * Mark the given transaction as streamed if it's a top-level transaction
2116 * or has changes.
 *
 * Here "has changes" means txn->nentries_mem != 0.
 *
 * NOTE(review): the signature line and the statement that sets the
 * streamed flag are not present in this rendering.
2117 */
2118static void
2120{
2121 /*
2122 * The top-level transaction is always marked as streamed, even if it
2123 * does not contain any changes (that is, when all the changes are in
2124 * subtransactions).
2125 *
2126 * For subtransactions, we only mark them as streamed when there are
2127 * changes in them.
2128 *
2129 * We do it this way because of aborts - we don't want to send aborts for
2130 * XIDs the downstream is not aware of. And of course, it always knows
2131 * about the top-level xact (we send the XID in all messages), but we
2132 * never stream XIDs of empty subxacts.
2133 */
2134 if (rbtxn_is_toptxn(txn) || (txn->nentries_mem != 0))
2136}
2137
2138/*
2139 * Helper function for ReorderBufferProcessTXN to handle the concurrent
2140 * abort of the streaming transaction. This resets the TXN such that it
2141 * can be used to stream the remaining data of transaction being processed.
2142 * This can happen when the subtransaction is aborted and we still want to
2143 * continue processing the main or other subtransactions data.
 *
 * 'snapshot_now' and 'command_id' are saved into the TXN (and the stream is
 * stopped at 'last_lsn') only when the TXN is marked as streamed.
 *
 * NOTE(review): the signature line and the truncation call under the first
 * comment are not present in this rendering.
2144 */
2145static void
2147 Snapshot snapshot_now,
2148 CommandId command_id,
2149 XLogRecPtr last_lsn,
2150 ReorderBufferChange *specinsert)
2151{
2152 /* Discard the changes that we just streamed */
2154
2155 /* Free all resources allocated for toast reconstruction */
2156 ReorderBufferToastReset(rb, txn);
2157
2158 /* Free the pending speculative-insert change, if any */
2159 if (specinsert != NULL)
2160 {
2161 ReorderBufferFreeChange(rb, specinsert, true);
 /*
  * NOTE(review): this clears only the by-value parameter. The caller's
  * own pointer still refers to the freed change, and the caller must
  * reset it.
  */
2162 specinsert = NULL;
2163 }
2164
2165 /*
2166 * For the streaming case, stop the stream and remember the command ID and
2167 * snapshot for the streaming run.
2168 */
2169 if (rbtxn_is_streamed(txn))
2170 {
2171 rb->stream_stop(rb, txn, last_lsn);
2172 ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
2173 }
2174
2175 /* All changes must be deallocated */
2176 Assert(txn->size == 0);
2177}
2178
2179/*
2180 * Helper function for ReorderBufferReplay and ReorderBufferStreamTXN.
2181 *
2182 * Send data of a transaction (and its subtransactions) to the
2183 * output plugin. We iterate over the top and subtransactions (using a k-way
2184 * merge) and replay the changes in lsn order.
2185 *
2186 * If streaming is true then data will be sent using stream API.
2187 *
2188 * Note: "volatile" markers on some parameters are to avoid trouble with
2189 * PG_TRY inside the function.
2190 */
2191static void
2193 XLogRecPtr commit_lsn,
2194 volatile Snapshot snapshot_now,
2195 volatile CommandId command_id,
2196 bool streaming)
2197{
2198 bool using_subtxn;
2200 ReorderBufferIterTXNState *volatile iterstate = NULL;
2201 volatile XLogRecPtr prev_lsn = InvalidXLogRecPtr;
2202 ReorderBufferChange *volatile specinsert = NULL;
2203 volatile bool stream_started = false;
2204 ReorderBufferTXN *volatile curtxn = NULL;
2205
2206 /* build data to be able to lookup the CommandIds of catalog tuples */
2208
2209 /* setup the initial snapshot */
2210 SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
2211
2212 /*
2213 * Decoding needs access to syscaches et al., which in turn use
2214 * heavyweight locks and such. Thus we need to have enough state around to
2215 * keep track of those. The easiest way is to simply use a transaction
2216 * internally. That also allows us to easily enforce that nothing writes
2217 * to the database by checking for xid assignments.
2218 *
2219 * When we're called via the SQL SRF there's already a transaction
2220 * started, so start an explicit subtransaction there.
2221 */
2222 using_subtxn = IsTransactionOrTransactionBlock();
2223
2224 PG_TRY();
2225 {
2226 ReorderBufferChange *change;
2227 int changes_count = 0; /* used to accumulate the number of
2228 * changes */
2229
2230 if (using_subtxn)
2231 BeginInternalSubTransaction(streaming ? "stream" : "replay");
2232 else
2234
2235 /*
2236 * We only need to send begin/begin-prepare for non-streamed
2237 * transactions.
2238 */
2239 if (!streaming)
2240 {
2241 if (rbtxn_is_prepared(txn))
2242 rb->begin_prepare(rb, txn);
2243 else
2244 rb->begin(rb, txn);
2245 }
2246
2247 ReorderBufferIterTXNInit(rb, txn, &iterstate);
2248 while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
2249 {
2250 Relation relation = NULL;
2251 Oid reloid;
2252
2254
2255 /*
2256 * We can't call start stream callback before processing first
2257 * change.
2258 */
2259 if (prev_lsn == InvalidXLogRecPtr)
2260 {
2261 if (streaming)
2262 {
2263 txn->origin_id = change->origin_id;
2264 rb->stream_start(rb, txn, change->lsn);
2265 stream_started = true;
2266 }
2267 }
2268
2269 /*
2270 * Enforce correct ordering of changes, merged from multiple
2271 * subtransactions. The changes may have the same LSN due to
2272 * MULTI_INSERT xlog records.
2273 */
2274 Assert(prev_lsn == InvalidXLogRecPtr || prev_lsn <= change->lsn);
2275
2276 prev_lsn = change->lsn;
2277
2278 /*
2279 * Set the current xid to detect concurrent aborts. This is
2280 * required for the cases when we decode the changes before the
2281 * COMMIT record is processed.
2282 */
2283 if (streaming || rbtxn_is_prepared(change->txn))
2284 {
2285 curtxn = change->txn;
2286 SetupCheckXidLive(curtxn->xid);
2287 }
2288
2289 switch (change->action)
2290 {
2292
2293 /*
2294 * Confirmation for speculative insertion arrived. Simply
2295 * use as a normal record. It'll be cleaned up at the end
2296 * of INSERT processing.
2297 */
2298 if (specinsert == NULL)
2299 elog(ERROR, "invalid ordering of speculative insertion changes");
2300 Assert(specinsert->data.tp.oldtuple == NULL);
2301 change = specinsert;
2303
2304 /* intentionally fall through */
2308 Assert(snapshot_now);
2309
2310 reloid = RelidByRelfilenumber(change->data.tp.rlocator.spcOid,
2311 change->data.tp.rlocator.relNumber);
2312
2313 /*
2314 * Mapped catalog tuple without data, emitted while
2315 * catalog table was in the process of being rewritten. We
2316 * can fail to look up the relfilenumber, because the
2317 * relmapper has no "historic" view, in contrast to the
2318 * normal catalog during decoding. Thus repeated rewrites
2319 * can cause a lookup failure. That's OK because we do not
2320 * decode catalog changes anyway. Normally such tuples
2321 * would be skipped over below, but we can't identify
2322 * whether the table should be logically logged without
2323 * mapping the relfilenumber to the oid.
2324 */
2325 if (reloid == InvalidOid &&
2326 change->data.tp.newtuple == NULL &&
2327 change->data.tp.oldtuple == NULL)
2328 goto change_done;
2329 else if (reloid == InvalidOid)
2330 elog(ERROR, "could not map filenumber \"%s\" to relation OID",
2331 relpathperm(change->data.tp.rlocator,
2332 MAIN_FORKNUM).str);
2333
2334 relation = RelationIdGetRelation(reloid);
2335
2336 if (!RelationIsValid(relation))
2337 elog(ERROR, "could not open relation with OID %u (for filenumber \"%s\")",
2338 reloid,
2339 relpathperm(change->data.tp.rlocator,
2340 MAIN_FORKNUM).str);
2341
2342 if (!RelationIsLogicallyLogged(relation))
2343 goto change_done;
2344
2345 /*
2346 * Ignore temporary heaps created during DDL unless the
2347 * plugin has asked for them.
2348 */
2349 if (relation->rd_rel->relrewrite && !rb->output_rewrites)
2350 goto change_done;
2351
2352 /*
2353 * For now ignore sequence changes entirely. Most of the
2354 * time they don't log changes using records we
2355 * understand, so it doesn't make sense to handle the few
2356 * cases we do.
2357 */
2358 if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
2359 goto change_done;
2360
2361 /* user-triggered change */
2362 if (!IsToastRelation(relation))
2363 {
2364 ReorderBufferToastReplace(rb, txn, relation, change);
2365 ReorderBufferApplyChange(rb, txn, relation, change,
2366 streaming);
2367
2368 /*
2369 * Only clear reassembled toast chunks if we're sure
2370 * they're not required anymore. The creator of the
2371 * tuple tells us.
2372 */
2373 if (change->data.tp.clear_toast_afterwards)
2374 ReorderBufferToastReset(rb, txn);
2375 }
2376 /* we're not interested in toast deletions */
2377 else if (change->action == REORDER_BUFFER_CHANGE_INSERT)
2378 {
2379 /*
2380 * Need to reassemble the full toasted Datum in
2381 * memory, to ensure the chunks don't get reused till
2382 * we're done remove it from the list of this
2383 * transaction's changes. Otherwise it will get
2384 * freed/reused while restoring spooled data from
2385 * disk.
2386 */
2387 Assert(change->data.tp.newtuple != NULL);
2388
2389 dlist_delete(&change->node);
2390 ReorderBufferToastAppendChunk(rb, txn, relation,
2391 change);
2392 }
2393
2394 change_done:
2395
2396 /*
2397 * If speculative insertion was confirmed, the record
2398 * isn't needed anymore.
2399 */
2400 if (specinsert != NULL)
2401 {
2402 ReorderBufferFreeChange(rb, specinsert, true);
2403 specinsert = NULL;
2404 }
2405
2406 if (RelationIsValid(relation))
2407 {
2408 RelationClose(relation);
2409 relation = NULL;
2410 }
2411 break;
2412
2414
2415 /*
2416 * Speculative insertions are dealt with by delaying the
2417 * processing of the insert until the confirmation record
2418 * arrives. For that we simply unlink the record from the
2419 * chain, so it does not get freed/reused while restoring
2420 * spooled data from disk.
2421 *
2422 * This is safe in the face of concurrent catalog changes
2423 * because the relevant relation can't be changed between
2424 * speculative insertion and confirmation due to
2425 * CheckTableNotInUse() and locking.
2426 */
2427
2428 /* clear out a pending (and thus failed) speculation */
2429 if (specinsert != NULL)
2430 {
2431 ReorderBufferFreeChange(rb, specinsert, true);
2432 specinsert = NULL;
2433 }
2434
2435 /* and memorize the pending insertion */
2436 dlist_delete(&change->node);
2437 specinsert = change;
2438 break;
2439
2441
2442 /*
2443 * Abort for speculative insertion arrived. So cleanup the
2444 * specinsert tuple and toast hash.
2445 *
2446 * Note that we get the spec abort change for each toast
2447 * entry but we need to perform the cleanup only the first
2448 * time we get it for the main table.
2449 */
2450 if (specinsert != NULL)
2451 {
2452 /*
2453 * We must clean the toast hash before processing a
2454 * completely new tuple to avoid confusion about the
2455 * previous tuple's toast chunks.
2456 */
2458 ReorderBufferToastReset(rb, txn);
2459
2460 /* We don't need this record anymore. */
2461 ReorderBufferFreeChange(rb, specinsert, true);
2462 specinsert = NULL;
2463 }
2464 break;
2465
2467 {
2468 int i;
2469 int nrelids = change->data.truncate.nrelids;
2470 int nrelations = 0;
2471 Relation *relations;
2472
2473 relations = palloc0(nrelids * sizeof(Relation));
2474 for (i = 0; i < nrelids; i++)
2475 {
2476 Oid relid = change->data.truncate.relids[i];
2477 Relation rel;
2478
2479 rel = RelationIdGetRelation(relid);
2480
2481 if (!RelationIsValid(rel))
2482 elog(ERROR, "could not open relation with OID %u", relid);
2483
2484 if (!RelationIsLogicallyLogged(rel))
2485 continue;
2486
2487 relations[nrelations++] = rel;
2488 }
2489
2490 /* Apply the truncate. */
2491 ReorderBufferApplyTruncate(rb, txn, nrelations,
2492 relations, change,
2493 streaming);
2494
2495 for (i = 0; i < nrelations; i++)
2496 RelationClose(relations[i]);
2497
2498 break;
2499 }
2500
2502 ReorderBufferApplyMessage(rb, txn, change, streaming);
2503 break;
2504
2506 /* Execute the invalidation messages locally */
2508 change->data.inval.invalidations);
2509 break;
2510
2512 /* get rid of the old */
2514
2515 if (snapshot_now->copied)
2516 {
2517 ReorderBufferFreeSnap(rb, snapshot_now);
2518 snapshot_now =
2520 txn, command_id);
2521 }
2522
2523 /*
2524 * Restored from disk, need to be careful not to double
2525 * free. We could introduce refcounting for that, but for
2526 * now this seems infrequent enough not to care.
2527 */
2528 else if (change->data.snapshot->copied)
2529 {
2530 snapshot_now =
2532 txn, command_id);
2533 }
2534 else
2535 {
2536 snapshot_now = change->data.snapshot;
2537 }
2538
2539 /* and continue with the new one */
2540 SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
2541 break;
2542
2545
2546 if (command_id < change->data.command_id)
2547 {
2548 command_id = change->data.command_id;
2549
2550 if (!snapshot_now->copied)
2551 {
2552 /* we don't use the global one anymore */
2553 snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
2554 txn, command_id);
2555 }
2556
2557 snapshot_now->curcid = command_id;
2558
2560 SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
2561 }
2562
2563 break;
2564
2566 elog(ERROR, "tuplecid value in changequeue");
2567 break;
2568 }
2569
2570 /*
2571 * It is possible that the data is not sent to downstream for a
2572 * long time either because the output plugin filtered it or there
2573 * is a DDL that generates a lot of data that is not processed by
2574 * the plugin. So, in such cases, the downstream can timeout. To
2575 * avoid that we try to send a keepalive message if required.
2576 * Trying to send a keepalive message after every change has some
2577 * overhead, but testing showed there is no noticeable overhead if
2578 * we do it after every ~100 changes.
2579 */
2580#define CHANGES_THRESHOLD 100
2581
2582 if (++changes_count >= CHANGES_THRESHOLD)
2583 {
2584 rb->update_progress_txn(rb, txn, change->lsn);
2585 changes_count = 0;
2586 }
2587 }
2588
2589 /* speculative insertion record must be freed by now */
2590 Assert(!specinsert);
2591
2592 /* clean up the iterator */
2593 ReorderBufferIterTXNFinish(rb, iterstate);
2594 iterstate = NULL;
2595
2596 /*
2597 * Update total transaction count and total bytes processed by the
2598 * transaction and its subtransactions. Ensure to not count the
2599 * streamed transaction multiple times.
2600 *
2601 * Note that the statistics computation has to be done after
2602 * ReorderBufferIterTXNFinish as it releases the serialized change
2603 * which we have already accounted in ReorderBufferIterTXNNext.
2604 */
2605 if (!rbtxn_is_streamed(txn))
2606 rb->totalTxns++;
2607
2608 rb->totalBytes += txn->total_size;
2609
2610 /*
2611 * Done with current changes, send the last message for this set of
2612 * changes depending upon streaming mode.
2613 */
2614 if (streaming)
2615 {
2616 if (stream_started)
2617 {
2618 rb->stream_stop(rb, txn, prev_lsn);
2619 stream_started = false;
2620 }
2621 }
2622 else
2623 {
2624 /*
2625 * Call either PREPARE (for two-phase transactions) or COMMIT (for
2626 * regular ones).
2627 */
2628 if (rbtxn_is_prepared(txn))
2629 {
2631 rb->prepare(rb, txn, commit_lsn);
2633 }
2634 else
2635 rb->commit(rb, txn, commit_lsn);
2636 }
2637
2638 /* this is just a sanity check against bad output plugin behaviour */
2640 elog(ERROR, "output plugin used XID %u",
2642
2643 /*
2644 * Remember the command ID and snapshot for the next set of changes in
2645 * streaming mode.
2646 */
2647 if (streaming)
2648 ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
2649 else if (snapshot_now->copied)
2650 ReorderBufferFreeSnap(rb, snapshot_now);
2651
2652 /* cleanup */
2654
2655 /*
2656 * Aborting the current (sub-)transaction as a whole has the right
2657 * semantics. We want all locks acquired in here to be released, not
2658 * reassigned to the parent and we do not want any database access
2659 * have persistent effects.
2660 */
2662
2663 /* make sure there's no cache pollution */
2665
2666 if (using_subtxn)
2668
2669 /*
2670 * We are here due to one of the four reasons: 1. Decoding an
2671 * in-progress txn. 2. Decoding a prepared txn. 3. Decoding of a
2672 * prepared txn that was (partially) streamed. 4. Decoding a committed
2673 * txn.
2674 *
2675 * For 1, we allow truncation of txn data by removing the changes
2676 * already streamed but still keeping other things like invalidations,
2677 * snapshot, and tuplecids. For 2 and 3, we indicate
2678 * ReorderBufferTruncateTXN to do more elaborate truncation of txn
2679 * data as the entire transaction has been decoded except for commit.
2680 * For 4, as the entire txn has been decoded, we can fully clean up
2681 * the TXN reorder buffer.
2682 */
2683 if (streaming || rbtxn_is_prepared(txn))
2684 {
2685 if (streaming)
2687
2689 /* Reset the CheckXidAlive */
2691 }
2692 else
2693 ReorderBufferCleanupTXN(rb, txn);
2694 }
2695 PG_CATCH();
2696 {
2698 ErrorData *errdata = CopyErrorData();
2699
2700 /* TODO: Encapsulate cleanup from the PG_TRY and PG_CATCH blocks */
2701 if (iterstate)
2702 ReorderBufferIterTXNFinish(rb, iterstate);
2703
2705
2706 /*
2707 * Force cache invalidation to happen outside of a valid transaction
2708 * to prevent catalog access as we just caught an error.
2709 */
2711
2712 /* make sure there's no cache pollution */
2714 txn->invalidations);
2715
2716 if (using_subtxn)
2718
2719 /*
2720 * The error code ERRCODE_TRANSACTION_ROLLBACK indicates a concurrent
2721 * abort of the (sub)transaction we are streaming or preparing. We
2722 * need to do the cleanup and return gracefully on this error, see
2723 * SetupCheckXidLive.
2724 *
2725 * This error code can be thrown by one of the callbacks we call
2726 * during decoding so we need to ensure that we return gracefully only
2727 * when we are sending the data in streaming mode and the streaming is
2728 * not finished yet or when we are sending the data out on a PREPARE
2729 * during a two-phase commit.
2730 */
2731 if (errdata->sqlerrcode == ERRCODE_TRANSACTION_ROLLBACK &&
2732 (stream_started || rbtxn_is_prepared(txn)))
2733 {
2734 /* curtxn must be set for streaming or prepared transactions */
2735 Assert(curtxn);
2736
2737 /* Cleanup the temporary error state. */
2739 FreeErrorData(errdata);
2740 errdata = NULL;
2741
2742 /* Remember the transaction is aborted. */
2743 Assert(!rbtxn_is_committed(curtxn));
2744 curtxn->txn_flags |= RBTXN_IS_ABORTED;
2745
2746 /* Mark the transaction is streamed if appropriate */
2747 if (stream_started)
2749
2750 /* Reset the TXN so that it is allowed to stream remaining data. */
2751 ReorderBufferResetTXN(rb, txn, snapshot_now,
2752 command_id, prev_lsn,
2753 specinsert);
2754 }
2755 else
2756 {
2757 ReorderBufferCleanupTXN(rb, txn);
2759 PG_RE_THROW();
2760 }
2761 }
2762 PG_END_TRY();
2763}
2764
2765/*
2766 * Perform the replay of a transaction and its non-aborted subtransactions.
2767 *
2768 * Subtransactions must already have been processed by
2769 * ReorderBufferCommitChild(), even if they were previously assigned to the
2770 * toplevel transaction with ReorderBufferAssignChild.
2771 *
2772 * This interface is called once a prepare or toplevel commit is read for both
2773 * streamed as well as non-streamed transactions.
2774 */
2775static void
2778 XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
2779 TimestampTz commit_time,
2780 RepOriginId origin_id, XLogRecPtr origin_lsn)
2781{
2782 Snapshot snapshot_now;
2783 CommandId command_id = FirstCommandId;
2784
2785 txn->final_lsn = commit_lsn;
2786 txn->end_lsn = end_lsn;
2787 txn->xact_time.commit_time = commit_time;
2788 txn->origin_id = origin_id;
2789 txn->origin_lsn = origin_lsn;
2790
2791 /*
2792 * If the transaction was (partially) streamed, we need to commit it in a
2793 * 'streamed' way. That is, we first stream the remaining part of the
2794 * transaction, and then invoke stream_commit message.
2795 *
2796 * Called after everything (origin ID, LSN, ...) is stored in the
2797 * transaction to avoid passing that information directly.
2798 */
2799 if (rbtxn_is_streamed(txn))
2800 {
2802 return;
2803 }
2804
2805 /*
2806 * If this transaction has no snapshot, it didn't make any changes to the
2807 * database, so there's nothing to decode. Note that
2808 * ReorderBufferCommitChild will have transferred any snapshots from
2809 * subtransactions if there were any.
2810 */
2811 if (txn->base_snapshot == NULL)
2812 {
2813 Assert(txn->ninvalidations == 0);
2814
2815 /*
2816 * Removing this txn before a commit might result in the computation
2817 * of an incorrect restart_lsn. See SnapBuildProcessRunningXacts.
2818 */
2819 if (!rbtxn_is_prepared(txn))
2820 ReorderBufferCleanupTXN(rb, txn);
2821 return;
2822 }
2823
2824 snapshot_now = txn->base_snapshot;
2825
2826 /* Process and send the changes to output plugin. */
2827 ReorderBufferProcessTXN(rb, txn, commit_lsn, snapshot_now,
2828 command_id, false);
2829}
2830
2831/*
2832 * Commit a transaction.
2833 *
2834 * See comments for ReorderBufferReplay().
2835 */
2836void
2838 XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
2839 TimestampTz commit_time,
2840 RepOriginId origin_id, XLogRecPtr origin_lsn)
2841{
2842 ReorderBufferTXN *txn;
2843
2844 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2845 false);
2846
2847 /* unknown transaction, nothing to replay */
2848 if (txn == NULL)
2849 return;
2850
2851 ReorderBufferReplay(txn, rb, xid, commit_lsn, end_lsn, commit_time,
2852 origin_id, origin_lsn);
2853}
2854
2855/*
2856 * Record the prepare information for a transaction. Also, mark the transaction
2857 * as a prepared transaction.
2858 */
2859bool
2861 XLogRecPtr prepare_lsn, XLogRecPtr end_lsn,
2862 TimestampTz prepare_time,
2863 RepOriginId origin_id, XLogRecPtr origin_lsn)
2864{
2865 ReorderBufferTXN *txn;
2866
2867 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false);
2868
2869 /* unknown transaction, nothing to do */
2870 if (txn == NULL)
2871 return false;
2872
2873 /*
2874 * Remember the prepare information to be later used by commit prepared in
2875 * case we skip doing prepare.
2876 */
2877 txn->final_lsn = prepare_lsn;
2878 txn->end_lsn = end_lsn;
2879 txn->xact_time.prepare_time = prepare_time;
2880 txn->origin_id = origin_id;
2881 txn->origin_lsn = origin_lsn;
2882
2883 /* Mark this transaction as a prepared transaction */
2886
2887 return true;
2888}
2889
2890/* Remember that we have skipped prepare */
2891void
2893{
2894 ReorderBufferTXN *txn;
2895
2896 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false);
2897
2898 /* unknown transaction, nothing to do */
2899 if (txn == NULL)
2900 return;
2901
2902 /* txn must have been marked as a prepared transaction */
2905}
2906
2907/*
2908 * Prepare a two-phase transaction.
2909 *
2910 * See comments for ReorderBufferReplay().
2911 */
2912void
2914 char *gid)
2915{
2916 ReorderBufferTXN *txn;
2917
2918 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2919 false);
2920
2921 /* unknown transaction, nothing to replay */
2922 if (txn == NULL)
2923 return;
2924
2925 /*
2926 * txn must have been marked as a prepared transaction and must have
2927 * neither been skipped nor sent a prepare. Also, the prepare info must
2928 * have been updated in it by now.
2929 */
2932
2933 txn->gid = pstrdup(gid);
2934
2935 ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
2936 txn->xact_time.prepare_time, txn->origin_id, txn->origin_lsn);
2937
2938 /*
2939 * Send a prepare if not already done so. This might occur if we have
2940 * detected a concurrent abort while replaying the non-streaming
2941 * transaction.
2942 */
2943 if (!rbtxn_sent_prepare(txn))
2944 {
2945 rb->prepare(rb, txn, txn->final_lsn);
2947 }
2948}
2949
2950/*
2951 * This is used to handle COMMIT/ROLLBACK PREPARED.
2952 */
2953void
2955 XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
2956 XLogRecPtr two_phase_at,
2957 TimestampTz commit_time, RepOriginId origin_id,
2958 XLogRecPtr origin_lsn, char *gid, bool is_commit)
2959{
2960 ReorderBufferTXN *txn;
2961 XLogRecPtr prepare_end_lsn;
2962 TimestampTz prepare_time;
2963
2964 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, commit_lsn, false);
2965
2966 /* unknown transaction, nothing to do */
2967 if (txn == NULL)
2968 return;
2969
2970 /*
2971 * By this time the txn has the prepare record information, remember it to
2972 * be later used for rollback.
2973 */
2974 prepare_end_lsn = txn->end_lsn;
2975 prepare_time = txn->xact_time.prepare_time;
2976
2977 /* add the gid in the txn */
2978 txn->gid = pstrdup(gid);
2979
2980 /*
2981 * It is possible that this transaction is not decoded at prepare time
2982 * either because by that time we didn't have a consistent snapshot, or
2983 * two_phase was not enabled, or it was decoded earlier but we have
2984 * restarted. We only need to send the prepare if it was not decoded
2985 * earlier. We don't need to decode the xact for aborts if it is not done
2986 * already.
2987 */
2988 if ((txn->final_lsn < two_phase_at) && is_commit)
2989 {
2990 /*
2991 * txn must have been marked as a prepared transaction and skipped but
2992 * not sent a prepare. Also, the prepare info must have been updated
2993 * in txn even if we skip prepare.
2994 */
2998
2999 /*
3000 * By this time the txn has the prepare record information and it is
3001 * important to use that so that downstream gets the accurate
3002 * information. If instead, we have passed commit information here
3003 * then downstream can behave as it has already replayed commit
3004 * prepared after the restart.
3005 */
3006 ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
3007 txn->xact_time.prepare_time, txn->origin_id, txn->origin_lsn);
3008 }
3009
3010 txn->final_lsn = commit_lsn;
3011 txn->end_lsn = end_lsn;
3012 txn->xact_time.commit_time = commit_time;
3013 txn->origin_id = origin_id;
3014 txn->origin_lsn = origin_lsn;
3015
3016 if (is_commit)
3017 rb->commit_prepared(rb, txn, commit_lsn);
3018 else
3019 rb->rollback_prepared(rb, txn, prepare_end_lsn, prepare_time);
3020
3021 /* cleanup: make sure there's no cache pollution */
3023 txn->invalidations);
3024 ReorderBufferCleanupTXN(rb, txn);
3025}
3026
3027/*
3028 * Abort a transaction that possibly has previous changes. Must be called
3029 * first for subtransactions and then for the toplevel xid.
3030 *
3031 * NB: Transactions handled here have to have actively aborted (i.e. have
3032 * produced an abort record). Implicitly aborted transactions are handled via
3033 * ReorderBufferAbortOld(); transactions we're just not interested in, but
3034 * which have committed are handled in ReorderBufferForget().
3035 *
3036 * This function purges this transaction and its contents from memory and
3037 * disk.
3038 */
3039void
3041 TimestampTz abort_time)
3042{
3043 ReorderBufferTXN *txn;
3044
3045 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
3046 false);
3047
3048 /* unknown, nothing to remove */
3049 if (txn == NULL)
3050 return;
3051
3052 txn->xact_time.abort_time = abort_time;
3053
3054 /* For streamed transactions notify the remote node about the abort. */
3055 if (rbtxn_is_streamed(txn))
3056 {
3057 rb->stream_abort(rb, txn, lsn);
3058
3059 /*
3060 * We might have decoded changes for this transaction that could load
3061 * the cache as per the current transaction's view (consider DDL's
3062 * happened in this transaction). We don't want the decoding of future
3063 * transactions to use those cache entries so execute invalidations.
3064 */
3065 if (txn->ninvalidations > 0)
3067 txn->invalidations);
3068 }
3069
3070 /* cosmetic... */
3071 txn->final_lsn = lsn;
3072
3073 /* remove potential on-disk data, and deallocate */
3074 ReorderBufferCleanupTXN(rb, txn);
3075}
3076
3077/*
3078 * Abort all transactions that aren't actually running anymore because the
3079 * server restarted.
3080 *
3081 * NB: These really have to be transactions that have aborted due to a server
3082 * crash/immediate restart, as we don't deal with invalidations here.
3083 */
3084void
3086{
3088
3089 /*
3090 * Iterate through all (potential) toplevel TXNs and abort all that are
3091 * older than what possibly can be running. Once we've found the first
3092 * that is alive we stop, there might be some that acquired an xid earlier
3093 * but started writing later, but it's unlikely and they will be cleaned
3094 * up in a later call to this function.
3095 */
3097 {
3098 ReorderBufferTXN *txn;
3099
3100 txn = dlist_container(ReorderBufferTXN, node, it.cur);
3101
3102 if (TransactionIdPrecedes(txn->xid, oldestRunningXid))
3103 {
3104 elog(DEBUG2, "aborting old transaction %u", txn->xid);
3105
3106 /* Notify the remote node about the crash/immediate restart. */
3107 if (rbtxn_is_streamed(txn))
3108 rb->stream_abort(rb, txn, InvalidXLogRecPtr);
3109
3110 /* remove potential on-disk data, and deallocate this tx */
3111 ReorderBufferCleanupTXN(rb, txn);
3112 }
3113 else
3114 return;
3115 }
3116}
3117
3118/*
3119 * Forget the contents of a transaction if we aren't interested in its
3120 * contents. Must be called first for subtransactions and then for the
3121 * toplevel xid.
3122 *
3123 * This is significantly different from ReorderBufferAbort() because
3124 * transactions that have committed need to be treated differently from aborted
3125 * ones since they may have modified the catalog.
3126 *
3127 * Note that this is only allowed to be called in the moment a transaction
3128 * commit has just been read, not earlier; otherwise later records referring
3129 * to this xid might re-create the transaction incompletely.
3130 */
3131void
3133{
3134 ReorderBufferTXN *txn;
3135
3136 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
3137 false);
3138
3139 /* unknown, nothing to forget */
3140 if (txn == NULL)
3141 return;
3142
3143 /* this transaction mustn't be streamed */
3145
3146 /* cosmetic... */
3147 txn->final_lsn = lsn;
3148
3149 /*
3150 * Process cache invalidation messages if there are any. Even if we're not
3151 * interested in the transaction's contents, it could have manipulated the
3152 * catalog and we need to update the caches according to that.
3153 */
3154 if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
3156 txn->invalidations);
3157 else
3158 Assert(txn->ninvalidations == 0);
3159
3160 /* remove potential on-disk data, and deallocate */
3161 ReorderBufferCleanupTXN(rb, txn);
3162}
3163
3164/*
3165 * Invalidate cache for those transactions that need to be skipped just in case
3166 * catalogs were manipulated as part of the transaction.
3167 *
3168 * Note that this is a special-purpose function for prepared transactions where
3169 * we don't want to clean up the TXN even when we decide to skip it. See
3170 * DecodePrepare.
3171 */
3172void
3174{
3175 ReorderBufferTXN *txn;
3176
3177 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
3178 false);
3179
3180 /* unknown, nothing to do */
3181 if (txn == NULL)
3182 return;
3183
3184 /*
3185 * Process cache invalidation messages if there are any. Even if we're not
3186 * interested in the transaction's contents, it could have manipulated the
3187 * catalog and we need to update the caches according to that.
3188 */
3189 if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
3191 txn->invalidations);
3192 else
3193 Assert(txn->ninvalidations == 0);
3194}
3195
3196
3197/*
3198 * Execute invalidations happening outside the context of a decoded
3199 * transaction. That currently happens either for xid-less commits
3200 * (cf. RecordTransactionCommit()) or for invalidations in uninteresting
3201 * transactions (via ReorderBufferForget()).
3202 */
3203void
3205 SharedInvalidationMessage *invalidations)
3206{
3207 bool use_subtxn = IsTransactionOrTransactionBlock();
3208 int i;
3209
3210 if (use_subtxn)
3212
3213 /*
3214 * Force invalidations to happen outside of a valid transaction - that way
3215 * entries will just be marked as invalid without accessing the catalog.
3216 * That's advantageous because we don't need to setup the full state
3217 * necessary for catalog access.
3218 */
3219 if (use_subtxn)
3221
3222 for (i = 0; i < ninvalidations; i++)
3223 LocalExecuteInvalidationMessage(&invalidations[i]);
3224
3225 if (use_subtxn)
3227}
3228
3229/*
3230 * Tell reorderbuffer about an xid seen in the WAL stream. Has to be called at
3231 * least once for every xid in XLogRecord->xl_xid (other places in records
3232 * may, but do not have to be passed through here).
3233 *
3234 * Reorderbuffer keeps some data structures about transactions in LSN order,
3235 * for efficiency. To do that it has to know about when transactions are seen
3236 * first in the WAL. As many types of records are not actually interesting for
3237 * logical decoding, they do not necessarily pass through here.
3238 */
3239void
3241{
3242 /* many records won't have an xid assigned, centralize check here */
3243 if (xid != InvalidTransactionId)
3244 ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3245}
3246
3247/*
3248 * Add a new snapshot to this transaction that may only be used after lsn 'lsn'
3249 * because the previous snapshot doesn't describe the catalog correctly for
3250 * following rows.
3251 */
3252void
3254 XLogRecPtr lsn, Snapshot snap)
3255{
3257
3258 change->data.snapshot = snap;
3260
3261 ReorderBufferQueueChange(rb, xid, lsn, change, false);
3262}
3263
3264/*
3265 * Set up the transaction's base snapshot.
3266 *
3267 * If we know that xid is a subtransaction, set the base snapshot on the
3268 * top-level transaction instead.
3269 */
3270void
3272 XLogRecPtr lsn, Snapshot snap)
3273{
3274 ReorderBufferTXN *txn;
3275 bool is_new;
3276
3277 Assert(snap != NULL);
3278
3279 /*
3280 * Fetch the transaction to operate on. If we know it's a subtransaction,
3281 * operate on its top-level transaction instead.
3282 */
3283 txn = ReorderBufferTXNByXid(rb, xid, true, &is_new, lsn, true);
3284 if (rbtxn_is_known_subxact(txn))
3285 txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
3286 NULL, InvalidXLogRecPtr, false);
3287 Assert(txn->base_snapshot == NULL);
3288
3289 txn->base_snapshot = snap;
3290 txn->base_snapshot_lsn = lsn;
3292
3294}
3295
3296/*
3297 * Access the catalog with this CommandId at this point in the changestream.
3298 *
3299 * May only be called for command ids > 1
3300 */
3301void
3303 XLogRecPtr lsn, CommandId cid)
3304{
3306
3307 change->data.command_id = cid;
3309
3310 ReorderBufferQueueChange(rb, xid, lsn, change, false);
3311}
3312
3313/*
3314 * Update memory counters to account for the new or removed change.
3315 *
3316 * We update two counters - in the reorder buffer, and in the transaction
3317 * containing the change. The reorder buffer counter allows us to quickly
3318 * decide if we reached the memory limit, the transaction counter allows
3319 * us to quickly pick the largest transaction for eviction.
3320 *
3321 * At least one of txn or change must be non-NULL. We update the memory
3322 * counter of txn if it's non-NULL, otherwise change->txn.
3323 *
3324 * When streaming is enabled, we need to update the toplevel transaction
3325 * counters instead - we don't really care about subtransactions as we
3326 * can't stream them individually anyway, and we only pick toplevel
3327 * transactions for eviction. So only toplevel transactions matter.
3328 */
3329static void
3331 ReorderBufferChange *change,
3332 ReorderBufferTXN *txn,
3333 bool addition, Size sz)
3334{
3335 ReorderBufferTXN *toptxn;
3336
3337 Assert(txn || change);
3338
3339 /*
3340 * Ignore tuple CID changes, because those are not evicted when reaching
3341 * memory limit. So we just don't count them, because it might easily
3342 * trigger a pointless attempt to spill.
3343 */
3344 if (change && change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID)
3345 return;
3346
3347 if (sz == 0)
3348 return;
3349
3350 if (txn == NULL)
3351 txn = change->txn;
3352 Assert(txn != NULL);
3353
3354 /*
3355 * Update the total size in top level as well. This is later used to
3356 * compute the decoding stats.
3357 */
3358 toptxn = rbtxn_get_toptxn(txn);
3359
3360 if (addition)
3361 {
3362 Size oldsize = txn->size;
3363
3364 txn->size += sz;
3365 rb->size += sz;
3366
3367 /* Update the total size in the top transaction. */
3368 toptxn->total_size += sz;
3369
3370 /* Update the max-heap */
3371 if (oldsize != 0)
3373 pairingheap_add(rb->txn_heap, &txn->txn_node);
3374 }
3375 else
3376 {
3377 Assert((rb->size >= sz) && (txn->size >= sz));
3378 txn->size -= sz;
3379 rb->size -= sz;
3380
3381 /* Update the total size in the top transaction. */
3382 toptxn->total_size -= sz;
3383
3384 /* Update the max-heap */
3386 if (txn->size != 0)
3387 pairingheap_add(rb->txn_heap, &txn->txn_node);
3388 }
3389
3390 Assert(txn->size <= rb->size);
3391}
3392
3393/*
3394 * Add new (relfilelocator, tid) -> (cmin, cmax) mappings.
3395 *
3396 * We do not include this change type in memory accounting, because we
3397 * keep CIDs in a separate list and do not evict them when reaching
3398 * the memory limit.
3399 */
3400void
3402 XLogRecPtr lsn, RelFileLocator locator,
3403 ItemPointerData tid, CommandId cmin,
3404 CommandId cmax, CommandId combocid)
3405{
3407 ReorderBufferTXN *txn;
3408
3409 txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3410
3411 change->data.tuplecid.locator = locator;
3412 change->data.tuplecid.tid = tid;
3413 change->data.tuplecid.cmin = cmin;
3414 change->data.tuplecid.cmax = cmax;
3415 change->data.tuplecid.combocid = combocid;
3416 change->lsn = lsn;
3417 change->txn = txn;
3419
3420 dlist_push_tail(&txn->tuplecids, &change->node);
3421 txn->ntuplecids++;
3422}
3423
3424/*
3425 * Accumulate the invalidations for executing them later.
3426 *
3427 * This needs to be called for each XLOG_XACT_INVALIDATIONS message and
3428 * accumulates all the invalidation messages in the toplevel transaction, if
3429 * available, otherwise in the current transaction, as well as in the form of
3430 * change in reorder buffer. We require to record it in form of the change
3431 * so that we can execute only the required invalidations instead of executing
3432 * all the invalidations on each CommandId increment. We also need to
3433 * accumulate these in the txn buffer because in some cases where we skip
3434 * processing the transaction (see ReorderBufferForget), we need to execute
3435 * all the invalidations together.
3436 */
3437void
3439 XLogRecPtr lsn, Size nmsgs,
3441{
3442 ReorderBufferTXN *txn;
3443 MemoryContext oldcontext;
3444 ReorderBufferChange *change;
3445
3446 txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3447
3448 oldcontext = MemoryContextSwitchTo(rb->context);
3449
3450 /*
3451 * Collect all the invalidations under the top transaction, if available,
3452 * so that we can execute them all together. See comments atop this
3453 * function.
3454 */
3455 txn = rbtxn_get_toptxn(txn);
3456
3457 Assert(nmsgs > 0);
3458
3459 /* Accumulate invalidations. */
3460 if (txn->ninvalidations == 0)
3461 {
3462 txn->ninvalidations = nmsgs;
3464 palloc(sizeof(SharedInvalidationMessage) * nmsgs);
3465 memcpy(txn->invalidations, msgs,
3466 sizeof(SharedInvalidationMessage) * nmsgs);
3467 }
3468 else
3469 {
3472 (txn->ninvalidations + nmsgs));
3473
3474 memcpy(txn->invalidations + txn->ninvalidations, msgs,
3475 nmsgs * sizeof(SharedInvalidationMessage));
3476 txn->ninvalidations += nmsgs;
3477 }
3478
3479 change = ReorderBufferAllocChange(rb);
3481 change->data.inval.ninvalidations = nmsgs;
3483 palloc(sizeof(SharedInvalidationMessage) * nmsgs);
3484 memcpy(change->data.inval.invalidations, msgs,
3485 sizeof(SharedInvalidationMessage) * nmsgs);
3486
3487 ReorderBufferQueueChange(rb, xid, lsn, change, false);
3488
3489 MemoryContextSwitchTo(oldcontext);
3490}
3491
3492/*
3493 * Apply all invalidations we know. Possibly we only need parts at this point
3494 * in the changestream but we don't know which those are.
3495 */
3496static void
3498{
3499 int i;
3500
3501 for (i = 0; i < nmsgs; i++)
3503}
3504
3505/*
3506 * Mark a transaction as containing catalog changes
3507 */
3508void
3510 XLogRecPtr lsn)
3511{
3512 ReorderBufferTXN *txn;
3513
3514 txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3515
3516 if (!rbtxn_has_catalog_changes(txn))
3517 {
3520 }
3521
3522 /*
3523 * Mark top-level transaction as having catalog changes too if one of its
3524 * children has so that the ReorderBufferBuildTupleCidHash can
3525 * conveniently check just top-level transaction and decide whether to
3526 * build the hash table or not.
3527 */
3528 if (rbtxn_is_subtxn(txn))
3529 {
3530 ReorderBufferTXN *toptxn = rbtxn_get_toptxn(txn);
3531
3532 if (!rbtxn_has_catalog_changes(toptxn))
3533 {
3536 }
3537 }
3538}
3539
3540/*
3541 * Return palloc'ed array of the transactions that have changed catalogs.
3542 * The returned array is sorted in xidComparator order.
3543 *
3544 * The caller must free the returned array when done with it.
3545 */
3548{
3549 dlist_iter iter;
3550 TransactionId *xids = NULL;
3551 size_t xcnt = 0;
3552
3553 /* Quick return if the list is empty */
3554 if (dclist_count(&rb->catchange_txns) == 0)
3555 return NULL;
3556
3557 /* Initialize XID array */
3558 xids = (TransactionId *) palloc(sizeof(TransactionId) *
3560 dclist_foreach(iter, &rb->catchange_txns)
3561 {
3563 catchange_node,
3564 iter.cur);
3565
3567
3568 xids[xcnt++] = txn->xid;
3569 }
3570
3571 qsort(xids, xcnt, sizeof(TransactionId), xidComparator);
3572
3573 Assert(xcnt == dclist_count(&rb->catchange_txns));
3574 return xids;
3575}
3576
3577/*
3578 * Query whether a transaction is already *known* to contain catalog
3579 * changes. This can be wrong until directly before the commit!
3580 */
3581bool
3583{
3584 ReorderBufferTXN *txn;
3585
3586 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
3587 false);
3588 if (txn == NULL)
3589 return false;
3590
3591 return rbtxn_has_catalog_changes(txn);
3592}
3593
3594/*
3595 * ReorderBufferXidHasBaseSnapshot
3596 * Have we already set the base snapshot for the given txn/subtxn?
3597 */
3598bool
3600{
3601 ReorderBufferTXN *txn;
3602
3603 txn = ReorderBufferTXNByXid(rb, xid, false,
3604 NULL, InvalidXLogRecPtr, false);
3605
3606 /* transaction isn't known yet, ergo no snapshot */
3607 if (txn == NULL)
3608 return false;
3609
3610 /* a known subtxn? operate on top-level txn instead */
3611 if (rbtxn_is_known_subxact(txn))
3612 txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
3613 NULL, InvalidXLogRecPtr, false);
3614
3615 return txn->base_snapshot != NULL;
3616}
3617
3618
3619/*
3620 * ---------------------------------------
3621 * Disk serialization support
3622 * ---------------------------------------
3623 */
3624
3625/*
3626 * Ensure the IO buffer is >= sz.
3627 */
3628static void
3630{
3631 if (!rb->outbufsize)
3632 {
3633 rb->outbuf = MemoryContextAlloc(rb->context, sz);
3634 rb->outbufsize = sz;
3635 }
3636 else if (rb->outbufsize < sz)
3637 {
3638 rb->outbuf = repalloc(rb->outbuf, sz);
3639 rb->outbufsize = sz;
3640 }
3641}
3642
3643
3644/* Compare two transactions by size */
3645static int
3647{
3650
3651 if (ta->size < tb->size)
3652 return -1;
3653 if (ta->size > tb->size)
3654 return 1;
3655 return 0;
3656}
3657
3658/*
3659 * Find the largest transaction (toplevel or subxact) to evict (spill to disk).
3660 */
3661static ReorderBufferTXN *
3663{
3664 ReorderBufferTXN *largest;
3665
3666 /* Get the largest transaction from the max-heap */
3667 largest = pairingheap_container(ReorderBufferTXN, txn_node,
3669
3670 Assert(largest);
3671 Assert(largest->size > 0);
3672 Assert(largest->size <= rb->size);
3673
3674 return largest;
3675}
3676
3677/*
3678 * Find the largest streamable (and non-aborted) toplevel transaction to evict
3679 * (by streaming).
3680 *
3681 * This can be seen as an optimized version of ReorderBufferLargestTXN, which
3682 * should give us the same transaction (because we don't update memory account
3683 * for subtransaction with streaming, so it's always 0). But we can simply
3684 * iterate over the limited number of toplevel transactions that have a base
3685 * snapshot. There is no use of selecting a transaction that doesn't have base
3686 * snapshot because we don't decode such transactions. Also, we do not select
3687 * the transaction which doesn't have any streamable change.
3688 *
3689 * Note that, we skip transactions that contain incomplete changes. There
3690 * is a scope of optimization here such that we can select the largest
3691 * transaction which has incomplete changes. But that will make the code and
3692 * design quite complex and that might not be worth the benefit. If we plan to
3693 * stream the transactions that contain incomplete changes then we need to
3694 * find a way to partially stream/truncate the transaction changes in-memory
3695 * and build a mechanism to partially truncate the spilled files.
3696 * Additionally, whenever we partially stream the transaction we need to
3697 * maintain the last streamed lsn and next time we need to restore from that
3698 * segment and the offset in WAL. As we stream the changes from the top
3699 * transaction and restore them subtransaction wise, we need to even remember
3700 * the subxact from where we streamed the last change.
3701 */
3702static ReorderBufferTXN *
3704{
3705 dlist_iter iter;
3706 Size largest_size = 0;
3707 ReorderBufferTXN *largest = NULL;
3708
3709 /* Find the largest top-level transaction having a base snapshot. */
3711 {
3712 ReorderBufferTXN *txn;
3713
3714 txn = dlist_container(ReorderBufferTXN, base_snapshot_node, iter.cur);
3715
3716 /* must not be a subtxn */
3718 /* base_snapshot must be set */
3719 Assert(txn->base_snapshot != NULL);
3720
3721 /* Don't consider these kinds of transactions for eviction. */
3722 if (rbtxn_has_partial_change(txn) ||
3724 rbtxn_is_aborted(txn))
3725 continue;
3726
3727 /* Find the largest of the eviction candidates. */
3728 if ((largest == NULL || txn->total_size > largest_size) &&
3729 (txn->total_size > 0))
3730 {
3731 largest = txn;
3732 largest_size = txn->total_size;
3733 }
3734 }
3735
3736 return largest;
3737}
3738
3739/*
3740 * Check whether the logical_decoding_work_mem limit was reached, and if yes
3741 * pick the largest (sub)transaction at-a-time to evict and spill its changes to
3742 * disk or send to the output plugin until we reach under the memory limit.
3743 *
3744 * If debug_logical_replication_streaming is set to "immediate", stream or
3745 * serialize the changes immediately.
3746 *
3747 * XXX At this point we select the transactions until we reach under the memory
3748 * limit, but we might also adapt a more elaborate eviction strategy - for example
3749 * evicting enough transactions to free certain fraction (e.g. 50%) of the memory
3750 * limit.
3751 */
3752static void
3754{
3755 ReorderBufferTXN *txn;
3756
3757 /*
3758 * Bail out if debug_logical_replication_streaming is buffered and we
3759 * haven't exceeded the memory limit.
3760 */
3762 rb->size < logical_decoding_work_mem * (Size) 1024)
3763 return;
3764
3765 /*
3766 * If debug_logical_replication_streaming is immediate, loop until there's
3767 * no change. Otherwise, loop until we reach under the memory limit. One
3768 * might think that just by evicting the largest (sub)transaction we will
3769 * come under the memory limit based on assumption that the selected
3770 * transaction is at least as large as the most recent change (which
3771 * caused us to go over the memory limit). However, that is not true
3772 * because a user can reduce the logical_decoding_work_mem to a smaller
3773 * value before the most recent change.
3774 */
3775 while (rb->size >= logical_decoding_work_mem * (Size) 1024 ||
3777 rb->size > 0))
3778 {
3779 /*
3780 * Pick the largest non-aborted transaction and evict it from memory
3781 * by streaming, if possible. Otherwise, spill to disk.
3782 */
3784 (txn = ReorderBufferLargestStreamableTopTXN(rb)) != NULL)
3785 {
3786 /* we know there has to be one, because the size is not zero */
3787 Assert(txn && rbtxn_is_toptxn(txn));
3788 Assert(txn->total_size > 0);
3789 Assert(rb->size >= txn->total_size);
3790
3791 /* skip the transaction if aborted */
3793 continue;
3794
3795 ReorderBufferStreamTXN(rb, txn);
3796 }
3797 else
3798 {
3799 /*
3800 * Pick the largest transaction (or subtransaction) and evict it
3801 * from memory by serializing it to disk.
3802 */
3803 txn = ReorderBufferLargestTXN(rb);
3804
3805 /* we know there has to be one, because the size is not zero */
3806 Assert(txn);
3807 Assert(txn->size > 0);
3808 Assert(rb->size >= txn->size);
3809
3810 /* skip the transaction if aborted */
3812 continue;
3813
3815 }
3816
3817 /*
3818 * After eviction, the transaction should have no entries in memory,
3819 * and should use 0 bytes for changes.
3820 */
3821 Assert(txn->size == 0);
3822 Assert(txn->nentries_mem == 0);
3823 }
3824
3825 /* We must be under the memory limit now. */
3826 Assert(rb->size < logical_decoding_work_mem * (Size) 1024);
3827}
3828
3829/*
3830 * Spill data of a large transaction (and its subtransactions) to disk.
3831 */
3832static void
3834{
3835 dlist_iter subtxn_i;
3836 dlist_mutable_iter change_i;
3837 int fd = -1;
3838 XLogSegNo curOpenSegNo = 0;
3839 Size spilled = 0;
3840 Size size = txn->size;
3841
3842 elog(DEBUG2, "spill %u changes in XID %u to disk",
3843 (uint32) txn->nentries_mem, txn->xid);
3844
3845 /* do the same to all child TXs */
3846 dlist_foreach(subtxn_i, &txn->subtxns)
3847 {
3848 ReorderBufferTXN *subtxn;
3849
3850 subtxn = dlist_container(ReorderBufferTXN, node, subtxn_i.cur);
3851 ReorderBufferSerializeTXN(rb, subtxn);
3852 }
3853
3854 /* serialize changestream */
3855 dlist_foreach_modify(change_i, &txn->changes)
3856 {
3857 ReorderBufferChange *change;
3858
3859 change = dlist_container(ReorderBufferChange, node, change_i.cur);
3860
3861 /*
3862 * store in segment in which it belongs by start lsn, don't split over
3863 * multiple segments tho
3864 */
3865 if (fd == -1 ||
3866 !XLByteInSeg(change->lsn, curOpenSegNo, wal_segment_size))
3867 {
3868 char path[MAXPGPATH];
3869
3870 if (fd != -1)
3872
3873 XLByteToSeg(change->lsn, curOpenSegNo, wal_segment_size);
3874
3875 /*
3876 * No need to care about TLIs here, only used during a single run,
3877 * so each LSN only maps to a specific WAL record.
3878 */
3880 curOpenSegNo);
3881
3882 /* open segment, create it if necessary */
3883 fd = OpenTransientFile(path,
3884 O_CREAT | O_WRONLY | O_APPEND | PG_BINARY);
3885
3886 if (fd < 0)
3887 ereport(ERROR,
3889 errmsg("could not open file \"%s\": %m", path)));
3890 }
3891
3892 ReorderBufferSerializeChange(rb, txn, fd, change);
3893 dlist_delete(&change->node);
3894 ReorderBufferFreeChange(rb, change, false);
3895
3896 spilled++;
3897 }
3898
3899 /* Update the memory counter */
3900 ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, size);
3901
3902 /* update the statistics iff we have spilled anything */
3903 if (spilled)
3904 {
3905 rb->spillCount += 1;
3906 rb->spillBytes += size;
3907
3908 /* don't consider already serialized transactions */
3909 rb->spillTxns += (rbtxn_is_serialized(txn) || rbtxn_is_serialized_clear(txn)) ? 0 : 1;
3910
3911 /* update the decoding stats */
3913 }
3914
3915 Assert(spilled == txn->nentries_mem);
3917 txn->nentries_mem = 0;
3919
3920 if (fd != -1)
3922}
3923
3924/*
3925 * Serialize individual change to disk.
3926 */
3927static void
3929 int fd, ReorderBufferChange *change)
3930{
3932 Size sz = sizeof(ReorderBufferDiskChange);
3933
3935
3936 ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3937 memcpy(&ondisk->change, change, sizeof(ReorderBufferChange));
3938
3939 switch (change->action)
3940 {
3941 /* fall through these, they're all similar enough */
3946 {
3947 char *data;
3948 HeapTuple oldtup,
3949 newtup;
3950 Size oldlen = 0;
3951 Size newlen = 0;
3952
3953 oldtup = change->data.tp.oldtuple;
3954 newtup = change->data.tp.newtuple;
3955
3956 if (oldtup)
3957 {
3958 sz += sizeof(HeapTupleData);
3959 oldlen = oldtup->t_len;
3960 sz += oldlen;
3961 }
3962
3963 if (newtup)
3964 {
3965 sz += sizeof(HeapTupleData);
3966 newlen = newtup->t_len;
3967 sz += newlen;
3968 }
3969
3970 /* make sure we have enough space */
3972
3973 data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3974 /* might have been reallocated above */
3975 ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3976
3977 if (oldlen)
3978 {
3979 memcpy(data, oldtup, sizeof(HeapTupleData));
3980 data += sizeof(HeapTupleData);
3981
3982 memcpy(data, oldtup->t_data, oldlen);
3983 data += oldlen;
3984 }
3985
3986 if (newlen)
3987 {
3988 memcpy(data, newtup, sizeof(HeapTupleData));
3989 data += sizeof(HeapTupleData);
3990
3991 memcpy(data, newtup->t_data, newlen);
3992 data += newlen;
3993 }
3994 break;
3995 }
3997 {
3998 char *data;
3999 Size prefix_size = strlen(change->data.msg.prefix) + 1;
4000
4001 sz += prefix_size + change->data.msg.message_size +
4002 sizeof(Size) + sizeof(Size);
4004
4005 data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
4006
4007 /* might have been reallocated above */
4008 ondisk = (ReorderBufferDiskChange *) rb->outbuf;
4009
4010 /* write the prefix including the size */
4011 memcpy(data, &prefix_size, sizeof(Size));
4012 data += sizeof(Size);
4013 memcpy(data, change->data.msg.prefix,
4014 prefix_size);
4015 data += prefix_size;
4016
4017 /* write the message including the size */
4018 memcpy(data, &change->data.msg.message_size, sizeof(Size));
4019 data += sizeof(Size);
4020 memcpy(data, change->data.msg.message,
4021 change->data.msg.message_size);
4022 data += change->data.msg.message_size;
4023
4024 break;
4025 }
4027 {
4028 char *data;
4029 Size inval_size = sizeof(SharedInvalidationMessage) *
4030 change->data.inval.ninvalidations;
4031
4032 sz += inval_size;
4033
4035 data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
4036
4037 /* might have been reallocated above */
4038 ondisk = (ReorderBufferDiskChange *) rb->outbuf;
4039 memcpy(data, change->data.inval.invalidations, inval_size);
4040 data += inval_size;
4041
4042 break;
4043 }
4045 {
4046 Snapshot snap;
4047 char *data;
4048
4049 snap = change->data.snapshot;
4050
4051 sz += sizeof(SnapshotData) +
4052 sizeof(TransactionId) * snap->xcnt +
4053 sizeof(TransactionId) * snap->subxcnt;
4054
4055 /* make sure we have enough space */
4057 data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
4058 /* might have been reallocated above */
4059 ondisk = (ReorderBufferDiskChange *) rb->outbuf;
4060
4061 memcpy(data, snap, sizeof(SnapshotData));
4062 data += sizeof(SnapshotData);
4063
4064 if (snap->xcnt)
4065 {
4066 memcpy(data, snap->xip,
4067 sizeof(TransactionId) * snap->xcnt);
4068 data += sizeof(TransactionId) * snap->xcnt;
4069 }
4070
4071 if (snap->subxcnt)
4072 {
4073 memcpy(data, snap->subxip,
4074 sizeof(TransactionId) * snap->subxcnt);
4075 data += sizeof(TransactionId) * snap->subxcnt;
4076 }
4077 break;
4078 }
4080 {
4081 Size size;
4082 char *data;
4083
4084 /* account for the OIDs of truncated relations */
4085 size = sizeof(Oid) * change->data.truncate.nrelids;
4086 sz += size;
4087
4088 /* make sure we have enough space */
4090
4091 data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
4092 /* might have been reallocated above */
4093 ondisk = (ReorderBufferDiskChange *) rb->outbuf;
4094
4095 memcpy(data, change->data.truncate.relids, size);
4096 data += size;
4097
4098 break;
4099 }
4104 /* ReorderBufferChange contains everything important */
4105 break;
4106 }
4107
4108 ondisk->size = sz;
4109
4110 errno = 0;
4111 pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_WRITE);
4112 if (write(fd, rb->outbuf, ondisk->size) != ondisk->size)
4113 {
4114 int save_errno = errno;
4115
4117
4118 /* if write didn't set errno, assume problem is no disk space */
4119 errno = save_errno ? save_errno : ENOSPC;
4120 ereport(ERROR,
4122 errmsg("could not write to data file for XID %u: %m",
4123 txn->xid)));
4124 }
4126
4127 /*
4128 * Keep the transaction's final_lsn up to date with each change we send to
4129 * disk, so that ReorderBufferRestoreCleanup works correctly. (We used to
4130 * only do this on commit and abort records, but that doesn't work if a
4131 * system crash leaves a transaction without its abort record).
4132 *
4133 * Make sure not to move it backwards.
4134 */
4135 if (txn->final_lsn < change->lsn)
4136 txn->final_lsn = change->lsn;
4137
4138 Assert(ondisk->change.action == change->action);
4139}
4140
4141/* Returns true, if the output plugin supports streaming, false, otherwise. */
4142static inline bool
4144{
4146
4147 return ctx->streaming;
4148}
4149
4150/* Returns true, if the streaming can be started now, false, otherwise. */
4151static inline bool
4153{
4155 SnapBuild *builder = ctx->snapshot_builder;
4156
4157 /* We can't start streaming unless a consistent state is reached. */
4159 return false;
4160
4161 /*
4162 * We can't start streaming immediately even if the streaming is enabled
4163 * because we previously decoded this transaction and now just are
4164 * restarting.
4165 */
4166 if (ReorderBufferCanStream(rb) &&
4167 !SnapBuildXactNeedsSkip(builder, ctx->reader->ReadRecPtr))
4168 return true;
4169
4170 return false;
4171}
4172
4173/*
4174 * Send data of a large transaction (and its subtransactions) to the
4175 * output plugin, but using the stream API.
4176 */
4177static void
4179{
4180 Snapshot snapshot_now;
4181 CommandId command_id;
4182 Size stream_bytes;
4183 bool txn_is_streamed;
4184
4185 /* We can never reach here for a subtransaction. */
4186 Assert(rbtxn_is_toptxn(txn));
4187
4188 /*
4189 * We can't make any assumptions about base snapshot here, similar to what
4190 * ReorderBufferCommit() does. That relies on base_snapshot getting
4191 * transferred from subxact in ReorderBufferCommitChild(), but that was
4192 * not yet called as the transaction is in-progress.
4193 *
4194 * So just walk the subxacts and use the same logic here. But we only need
4195 * to do that once, when the transaction is streamed for the first time.
4196 * After that we need to reuse the snapshot from the previous run.
4197 *
4198 * Unlike DecodeCommit which adds xids of all the subtransactions in
4199 * snapshot's xip array via SnapBuildCommitTxn, we can't do that here but
4200 * we do add them to subxip array instead via ReorderBufferCopySnap. This
4201 * allows the catalog changes made in subtransactions decoded till now to
4202 * be visible.
4203 */
4204 if (txn->snapshot_now == NULL)
4205 {
4206 dlist_iter subxact_i;
4207
4208 /* make sure this transaction is streamed for the first time */
4210
4211 /* at the beginning we should have invalid command ID */
4213
4214 dlist_foreach(subxact_i, &txn->subtxns)
4215 {
4216 ReorderBufferTXN *subtxn;
4217
4218 subtxn = dlist_container(ReorderBufferTXN, node, subxact_i.cur);
4220 }
4221
4222 /*
4223 * If this transaction has no snapshot, it didn't make any changes to
4224 * the database till now, so there's nothing to decode.
4225 */
4226 if (txn->base_snapshot == NULL)
4227 {
4228 Assert(txn->ninvalidations == 0);
4229 return;
4230 }
4231
4232 command_id = FirstCommandId;
4233 snapshot_now = ReorderBufferCopySnap(rb, txn->base_snapshot,
4234 txn, command_id);
4235 }
4236 else
4237 {
4238 /* the transaction must have been already streamed */
4240
4241 /*
4242 * Nah, we already have snapshot from the previous streaming run. We
4243 * assume new subxacts can't move the LSN backwards, and so can't beat
4244 * the LSN condition in the previous branch (so no need to walk
4245 * through subxacts again). In fact, we must not do that as we may be
4246 * using snapshot half-way through the subxact.
4247 */
4248 command_id = txn->command_id;
4249
4250 /*
4251 * We can't use txn->snapshot_now directly because after the last
4252 * streaming run, we might have got some new sub-transactions. So we
4253 * need to add them to the snapshot.
4254 */
4255 snapshot_now = ReorderBufferCopySnap(rb, txn->snapshot_now,
4256 txn, command_id);
4257
4258 /* Free the previously copied snapshot. */
4259 Assert(txn->snapshot_now->copied);
4261 txn->snapshot_now = NULL;
4262 }
4263
4264 /*
4265 * Remember this information to be used later to update stats. We can't
4266 * update the stats here as an error while processing the changes would
4267 * lead to the accumulation of stats even though we haven't streamed all
4268 * the changes.
4269 */
4270 txn_is_streamed = rbtxn_is_streamed(txn);
4271 stream_bytes = txn->total_size;
4272
4273 /* Process and send the changes to output plugin. */
4274 ReorderBufferProcessTXN(rb, txn, InvalidXLogRecPtr, snapshot_now,
4275 command_id, true);
4276
4277 rb->streamCount += 1;
4278 rb->streamBytes += stream_bytes;
4279
4280 /* Don't consider already streamed transaction. */
4281 rb->streamTxns += (txn_is_streamed) ? 0 : 1;
4282
4283 /* update the decoding stats */
4285
4287 Assert(txn->nentries == 0);
4288 Assert(txn->nentries_mem == 0);
4289}
4290
4291/*
4292 * Size of a change in memory.
4293 */
4294static Size
4296{
4297 Size sz = sizeof(ReorderBufferChange);
4298
4299 switch (change->action)
4300 {
4301 /* fall through these, they're all similar enough */
4306 {
4307 HeapTuple oldtup,
4308 newtup;
4309 Size oldlen = 0;
4310 Size newlen = 0;
4311
4312 oldtup = change->data.tp.oldtuple;
4313 newtup = change->data.tp.newtuple;
4314
4315 if (oldtup)
4316 {
4317 sz += sizeof(HeapTupleData);
4318 oldlen = oldtup->t_len;
4319 sz += oldlen;
4320 }
4321
4322 if (newtup)
4323 {
4324 sz += sizeof(HeapTupleData);
4325 newlen = newtup->t_len;
4326 sz += newlen;
4327 }
4328
4329 break;
4330 }
4332 {
4333 Size prefix_size = strlen(change->data.msg.prefix) + 1;
4334
4335 sz += prefix_size + change->data.msg.message_size +
4336 sizeof(Size) + sizeof(Size);
4337
4338 break;
4339 }
4341 {
4342 sz += sizeof(SharedInvalidationMessage) *
4343 change->data.inval.ninvalidations;
4344 break;
4345 }
4347 {
4348 Snapshot snap;
4349
4350 snap = change->data.snapshot;
4351
4352 sz += sizeof(SnapshotData) +
4353 sizeof(TransactionId) * snap->xcnt +
4354 sizeof(TransactionId) * snap->subxcnt;
4355
4356 break;
4357 }
4359 {
4360 sz += sizeof(Oid) * change->data.truncate.nrelids;
4361
4362 break;
4363 }
4368 /* ReorderBufferChange contains everything important */
4369 break;
4370 }
4371
4372 return sz;
4373}
4374
4375
4376/*
4377 * Restore a number of changes spilled to disk back into memory.
4378 */
4379static Size
4381 TXNEntryFile *file, XLogSegNo *segno)
4382{
4383 Size restored = 0;
4384 XLogSegNo last_segno;
4385 dlist_mutable_iter cleanup_iter;
4386 File *fd = &file->vfd;
4387
4390
4391 /* free current entries, so we have memory for more */
4392 dlist_foreach_modify(cleanup_iter, &txn->changes)
4393 {
4395 dlist_container(ReorderBufferChange, node, cleanup_iter.cur);
4396
4397 dlist_delete(&cleanup->node);
4399 }
4400 txn->nentries_mem = 0;
4402
4403 XLByteToSeg(txn->final_lsn, last_segno, wal_segment_size);
4404
4405 while (restored < max_changes_in_memory && *segno <= last_segno)
4406 {
4407 int readBytes;
4409
4411
4412 if (*fd == -1)
4413 {
4414 char path[MAXPGPATH];
4415
4416 /* first time in */
4417 if (*segno == 0)
4418 XLByteToSeg(txn->first_lsn, *segno, wal_segment_size);
4419
4420 Assert(*segno != 0 || dlist_is_empty(&txn->changes));
4421
4422 /*
4423 * No need to care about TLIs here, only used during a single run,
4424 * so each LSN only maps to a specific WAL record.
4425 */
4427 *segno);
4428
4429 *fd = PathNameOpenFile(path, O_RDONLY | PG_BINARY);
4430
4431 /* No harm in resetting the offset even in case of failure */
4432 file->curOffset = 0;
4433
4434 if (*fd < 0 && errno == ENOENT)
4435 {
4436 *fd = -1;
4437 (*segno)++;
4438 continue;
4439 }
4440 else if (*fd < 0)
4441 ereport(ERROR,
4443 errmsg("could not open file \"%s\": %m",
4444 path)));
4445 }
4446
4447 /*
4448 * Read the statically sized part of a change which has information
4449 * about the total size. If we couldn't read a record, we're at the
4450 * end of this file.
4451 */
4453 readBytes = FileRead(file->vfd, rb->outbuf,
4455 file->curOffset, WAIT_EVENT_REORDER_BUFFER_READ);
4456
4457 /* eof */
4458 if (readBytes == 0)
4459 {
4460 FileClose(*fd);
4461 *fd = -1;
4462 (*segno)++;
4463 continue;
4464 }
4465 else if (readBytes < 0)
4466 ereport(ERROR,
4468 errmsg("could not read from reorderbuffer spill file: %m")));
4469 else if (readBytes != sizeof(ReorderBufferDiskChange))
4470 ereport(ERROR,
4472 errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
4473 readBytes,
4474 (uint32) sizeof(ReorderBufferDiskChange))));
4475
4476 file->curOffset += readBytes;
4477
4478 ondisk = (ReorderBufferDiskChange *) rb->outbuf;
4479
4481 sizeof(ReorderBufferDiskChange) + ondisk->size);
4482 ondisk = (ReorderBufferDiskChange *) rb->outbuf;
4483
4484 readBytes = FileRead(file->vfd,
4485 rb->outbuf + sizeof(ReorderBufferDiskChange),
4486 ondisk->size - sizeof(ReorderBufferDiskChange),
4487 file->curOffset,
4488 WAIT_EVENT_REORDER_BUFFER_READ);
4489
4490 if (readBytes < 0)
4491 ereport(ERROR,
4493 errmsg("could not read from reorderbuffer spill file: %m")));
4494 else if (readBytes != ondisk->size - sizeof(ReorderBufferDiskChange))
4495 ereport(ERROR,
4497 errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
4498 readBytes,
4499 (uint32) (ondisk->size - sizeof(ReorderBufferDiskChange)))));
4500
4501 file->curOffset += readBytes;
4502
4503 /*
4504 * ok, read a full change from disk, now restore it into proper
4505 * in-memory format
4506 */
4507 ReorderBufferRestoreChange(rb, txn, rb->outbuf);
4508 restored++;
4509 }
4510
4511 return restored;
4512}
4513
4514/*
4515 * Convert change from its on-disk format to in-memory format and queue it onto
4516 * the TXN's ->changes list.
4517 *
4518 * Note: although "data" is declared char*, at entry it points to a
4519 * maxalign'd buffer, making it safe in most of this function to assume
4520 * that the pointed-to data is suitably aligned for direct access.
4521 */
4522static void
4524 char *data)
4525{
4527 ReorderBufferChange *change;
4528
4529 ondisk = (ReorderBufferDiskChange *) data;
4530
4531 change = ReorderBufferAllocChange(rb);
4532
4533 /* copy static part */
4534 memcpy(change, &ondisk->change, sizeof(ReorderBufferChange));
4535
4536 data += sizeof(ReorderBufferDiskChange);
4537
4538 /* restore individual stuff */
4539 switch (change->action)
4540 {
4541 /* fall through these, they're all similar enough */
4546 if (change->data.tp.oldtuple)
4547 {
4548 uint32 tuplelen = ((HeapTuple) data)->t_len;
4549
4550 change->data.tp.oldtuple =
4552
4553 /* restore ->tuple */
4554 memcpy(change->data.tp.oldtuple, data,
4555 sizeof(HeapTupleData));
4556 data += sizeof(HeapTupleData);
4557
4558 /* reset t_data pointer into the new tuplebuf */
4559 change->data.tp.oldtuple->t_data =
4560 (HeapTupleHeader) ((char *) change->data.tp.oldtuple + HEAPTUPLESIZE);
4561
4562 /* restore tuple data itself */
4563 memcpy(change->data.tp.oldtuple->t_data, data, tuplelen);
4564 data += tuplelen;
4565 }
4566
4567 if (change->data.tp.newtuple)
4568 {
4569 /* here, data might not be suitably aligned! */
4570 uint32 tuplelen;
4571
4572 memcpy(&tuplelen, data + offsetof(HeapTupleData, t_len),
4573 sizeof(uint32));
4574
4575 change->data.tp.newtuple =
4577
4578 /* restore ->tuple */
4579 memcpy(change->data.tp.newtuple, data,
4580 sizeof(HeapTupleData));
4581 data += sizeof(HeapTupleData);
4582
4583 /* reset t_data pointer into the new tuplebuf */
4584 change->data.tp.newtuple->t_data =
4585 (HeapTupleHeader) ((char *) change->data.tp.newtuple + HEAPTUPLESIZE);
4586
4587 /* restore tuple data itself */
4588 memcpy(change->data.tp.newtuple->t_data, data, tuplelen);
4589 data += tuplelen;
4590 }
4591
4592 break;
4594 {
4595 Size prefix_size;
4596
4597 /* read prefix */
4598 memcpy(&prefix_size, data, sizeof(Size));
4599 data += sizeof(Size);
4601 prefix_size);
4602 memcpy(change->data.msg.prefix, data, prefix_size);
4603 Assert(change->data.msg.prefix[prefix_size - 1] == '\0');
4604 data += prefix_size;
4605
4606 /* read the message */
4607 memcpy(&change->data.msg.message_size, data, sizeof(Size));
4608 data += sizeof(Size);
4610 change->data.msg.message_size);
4611 memcpy(change->data.msg.message, data,
4612 change->data.msg.message_size);
4613 data += change->data.msg.message_size;
4614
4615 break;
4616 }
4618 {
4619 Size inval_size = sizeof(SharedInvalidationMessage) *
4620 change->data.inval.ninvalidations;
4621
4622 change->data.inval.invalidations =
4623 MemoryContextAlloc(rb->context, inval_size);
4624
4625 /* read the message */
4626 memcpy(change->data.inval.invalidations, data, inval_size);
4627
4628 break;
4629 }
4631 {
4632 Snapshot oldsnap;
4633 Snapshot newsnap;
4634 Size size;
4635
4636 oldsnap = (Snapshot) data;
4637
4638 size = sizeof(SnapshotData) +
4639 sizeof(TransactionId) * oldsnap->xcnt +
4640 sizeof(TransactionId) * (oldsnap->subxcnt + 0);
4641
4642 change->data.snapshot = MemoryContextAllocZero(rb->context, size);
4643
4644 newsnap = change->data.snapshot;
4645
4646 memcpy(newsnap, data, size);
4647 newsnap->xip = (TransactionId *)
4648 (((char *) newsnap) + sizeof(SnapshotData));
4649 newsnap->subxip = newsnap->xip + newsnap->xcnt;
4650 newsnap->copied = true;
4651 break;
4652 }
4653 /* the base struct contains all the data, easy peasy */
4655 {
4656 Oid *relids;
4657
4658 relids = ReorderBufferAllocRelids(rb, change->data.truncate.nrelids);
4659 memcpy(relids, data, change->data.truncate.nrelids * sizeof(Oid));
4660 change->data.truncate.relids = relids;
4661
4662 break;
4663 }
4668 break;
4669 }
4670
4671 dlist_push_tail(&txn->changes, &change->node);
4672 txn->nentries_mem++;
4673
4674 /*
4675 * Update memory accounting for the restored change. We need to do this
4676 * although we don't check the memory limit when restoring the changes in
4677 * this branch (we only do that when initially queueing the changes after
4678 * decoding), because we will release the changes later, and that will
4679 * update the accounting too (subtracting the size from the counters). And
4680 * we don't want to underflow there.
4681 */
4682 ReorderBufferChangeMemoryUpdate(rb, change, NULL, true,
4683 ReorderBufferChangeSize(change));
4684}
4685
4686/*
4687 * Remove all on-disk stored for the passed in transaction.
4688 */
4689static void
4691{
4692 XLogSegNo first;
4693 XLogSegNo cur;
4694 XLogSegNo last;
4695
4698
4701
4702 /* iterate over all possible filenames, and delete them */
4703 for (cur = first; cur <= last; cur++)
4704 {
4705 char path[MAXPGPATH];
4706
4708 if (unlink(path) != 0 && errno != ENOENT)
4709 ereport(ERROR,
4711 errmsg("could not remove file \"%s\": %m", path)));
4712 }
4713}
4714
4715/*
4716 * Remove any leftover serialized reorder buffers from a slot directory after a
4717 * prior crash or decoding session exit.
4718 */
4719static void
4721{
4722 DIR *spill_dir;
4723 struct dirent *spill_de;
4724 struct stat statbuf;
4725 char path[MAXPGPATH * 2 + sizeof(PG_REPLSLOT_DIR)];
4726
4727 sprintf(path, "%s/%s", PG_REPLSLOT_DIR, slotname);
4728
4729 /* we're only handling directories here, skip if it's not ours */
4730 if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
4731 return;
4732
4733 spill_dir = AllocateDir(path);
4734 while ((spill_de = ReadDirExtended(spill_dir, path, INFO)) != NULL)
4735 {
4736 /* only look at names that can be ours */
4737 if (strncmp(spill_de->d_name, "xid", 3) == 0)
4738 {
4739 snprintf(path, sizeof(path),
4740 "%s/%s/%s", PG_REPLSLOT_DIR, slotname,
4741 spill_de->d_name);
4742
4743 if (unlink(path) != 0)
4744 ereport(ERROR,
4746 errmsg("could not remove file \"%s\" during removal of %s/%s/xid*: %m",
4747 path, PG_REPLSLOT_DIR, slotname)));
4748 }
4749 }
4750 FreeDir(spill_dir);
4751}
4752
4753/*
4754 * Given a replication slot, transaction ID and segment number, fill in the
4755 * corresponding spill file into 'path', which is a caller-owned buffer of size
4756 * at least MAXPGPATH.
4757 */
4758static void
4760 XLogSegNo segno)
4761{
4762 XLogRecPtr recptr;
4763
4764 XLogSegNoOffsetToRecPtr(segno, 0, wal_segment_size, recptr);
4765
4766 snprintf(path, MAXPGPATH, "%s/%s/xid-%u-lsn-%X-%X.spill",
4769 xid, LSN_FORMAT_ARGS(recptr));
4770}
4771
4772/*
4773 * Delete all data spilled to disk after we've restarted/crashed. It will be
4774 * recreated when the respective slots are reused.
4775 */
4776void
4778{
4779 DIR *logical_dir;
4780 struct dirent *logical_de;
4781
4782 logical_dir = AllocateDir(PG_REPLSLOT_DIR);
4783 while ((logical_de = ReadDir(logical_dir, PG_REPLSLOT_DIR)) != NULL)
4784 {
4785 if (strcmp(logical_de->d_name, ".") == 0 ||
4786 strcmp(logical_de->d_name, "..") == 0)
4787 continue;
4788
4789 /* if it cannot be a slot, skip the directory */
4790 if (!ReplicationSlotValidateName(logical_de->d_name, DEBUG2))
4791 continue;
4792
4793 /*
4794 * ok, has to be a surviving logical slot, iterate and delete
4795 * everything starting with xid-*
4796 */
4798 }
4799 FreeDir(logical_dir);
4800}
4801
4802/* ---------------------------------------
4803 * toast reassembly support
4804 * ---------------------------------------
4805 */
4806
4807/*
4808 * Initialize per tuple toast reconstruction support.
4809 */
4810static void
4812{
4813 HASHCTL hash_ctl;
4814
4815 Assert(txn->toast_hash == NULL);
4816
4817 hash_ctl.keysize = sizeof(Oid);
4818 hash_ctl.entrysize = sizeof(ReorderBufferToastEnt);
4819 hash_ctl.hcxt = rb->context;
4820 txn->toast_hash = hash_create("ReorderBufferToastHash", 5, &hash_ctl,
4822}
4823
4824/*
4825 * Per toast-chunk handling for toast reconstruction
4826 *
4827 * Appends a toast chunk so we can reconstruct it when the tuple "owning" the
4828 * toasted Datum comes along.
4829 */
4830static void
4832 Relation relation, ReorderBufferChange *change)
4833{
4835 HeapTuple newtup;
4836 bool found;
4837 int32 chunksize;
4838 bool isnull;
4839 Pointer chunk;
4840 TupleDesc desc = RelationGetDescr(relation);
4841 Oid chunk_id;
4842 int32 chunk_seq;
4843
4844 if (txn->toast_hash == NULL)
4846
4847 Assert(IsToastRelation(relation));
4848
4849 newtup = change->data.tp.newtuple;
4850 chunk_id = DatumGetObjectId(fastgetattr(newtup, 1, desc, &isnull));
4851 Assert(!isnull);
4852 chunk_seq = DatumGetInt32(fastgetattr(newtup, 2, desc, &isnull));
4853 Assert(!isnull);
4854
4855 ent = (ReorderBufferToastEnt *)
4856 hash_search(txn->toast_hash, &chunk_id, HASH_ENTER, &found);
4857
4858 if (!found)
4859 {
4860 Assert(ent->chunk_id == chunk_id);
4861 ent->num_chunks = 0;
4862 ent->last_chunk_seq = 0;
4863 ent->size = 0;
4864 ent->reconstructed = NULL;
4865 dlist_init(&ent->chunks);
4866
4867 if (chunk_seq != 0)
4868 elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq 0",
4869 chunk_seq, chunk_id);
4870 }
4871 else if (found && chunk_seq != ent->last_chunk_seq + 1)
4872 elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq %d",
4873 chunk_seq, chunk_id, ent->last_chunk_seq + 1);
4874
4875 chunk = DatumGetPointer(fastgetattr(newtup, 3, desc, &isnull));
4876 Assert(!isnull);
4877
4878 /* calculate size so we can allocate the right size at once later */
4879 if (!VARATT_IS_EXTENDED(chunk))
4880 chunksize = VARSIZE(chunk) - VARHDRSZ;
4881 else if (VARATT_IS_SHORT(chunk))
4882 /* could happen due to heap_form_tuple doing its thing */
4883 chunksize = VARSIZE_SHORT(chunk) - VARHDRSZ_SHORT;
4884 else
4885 elog(ERROR, "unexpected type of toast chunk");
4886
4887 ent->size += chunksize;
4888 ent->last_chunk_seq = chunk_seq;
4889 ent->num_chunks++;
4890 dlist_push_tail(&ent->chunks, &change->node);
4891}
4892
4893/*
4894 * Rejigger change->newtuple to point to in-memory toast tuples instead of
4895 * on-disk toast tuples that may no longer exist (think DROP TABLE or VACUUM).
4896 *
4897 * We cannot replace unchanged toast tuples though, so those will still point
4898 * to on-disk toast data.
4899 *
4900 * While updating the existing change with detoasted tuple data, we need to
4901 * update the memory accounting info, because the change size will differ.
4902 * Otherwise the accounting may get out of sync, triggering serialization
4903 * at unexpected times.
4904 *
4905 * We simply subtract size of the change before rejiggering the tuple, and
4906 * then add the new size. This makes it look like the change was removed
4907 * and then added back, except it only tweaks the accounting info.
4908 *
4909 * In particular it can't trigger serialization, which would be pointless
4910 * anyway as it happens during commit processing right before handing
4911 * the change to the output plugin.
4912 */
4913static void
4915 Relation relation, ReorderBufferChange *change)
4916{
4917 TupleDesc desc;
4918 int natt;
4919 Datum *attrs;
4920 bool *isnull;
4921 bool *free;
4922 HeapTuple tmphtup;
4923 Relation toast_rel;
4924 TupleDesc toast_desc;
4925 MemoryContext oldcontext;
4926 HeapTuple newtup;
4927 Size old_size;
4928
4929 /* no toast tuples changed */
4930 if (txn->toast_hash == NULL)
4931 return;
4932
4933 /*
4934 * We're going to modify the size of the change. So, to make sure the
4935 * accounting is correct we record the current change size and then after
4936 * re-computing the change we'll subtract the recorded size and then
4937 * re-add the new change size at the end. We don't immediately subtract
4938 * the old size because if there is any error before we add the new size,
4939 * we will release the changes and that will update the accounting info
4940 * (subtracting the size from the counters). And we don't want to
4941 * underflow there.
4942 */
4943 old_size = ReorderBufferChangeSize(change);
4944
4945 oldcontext = MemoryContextSwitchTo(rb->context);
4946
4947 /* we should only have toast tuples in an INSERT or UPDATE */
4948 Assert(change->data.tp.newtuple);
4949
4950 desc = RelationGetDescr(relation);
4951
4952 toast_rel = RelationIdGetRelation(relation->rd_rel->reltoastrelid);
4953 if (!RelationIsValid(toast_rel))
4954 elog(ERROR, "could not open toast relation with OID %u (base relation \"%s\")",
4955 relation->rd_rel->reltoastrelid, RelationGetRelationName(relation));
4956
4957 toast_desc = RelationGetDescr(toast_rel);
4958
4959 /* should we allocate from stack instead? */
4960 attrs = palloc0(sizeof(Datum) * desc->natts);
4961 isnull = palloc0(sizeof(bool) * desc->natts);
4962 free = palloc0(sizeof(bool) * desc->natts);
4963
4964 newtup = change->data.tp.newtuple;
4965
4966 heap_deform_tuple(newtup, desc, attrs, isnull);
4967
4968 for (natt = 0; natt < desc->natts; natt++)
4969 {
4970 Form_pg_attribute attr = TupleDescAttr(desc, natt);
4972 struct varlena *varlena;
4973
4974 /* va_rawsize is the size of the original datum -- including header */
4975 struct varatt_external toast_pointer;
4976 struct varatt_indirect redirect_pointer;
4977 struct varlena *new_datum = NULL;
4978 struct varlena *reconstructed;
4979 dlist_iter it;
4980 Size data_done = 0;
4981
4982 /* system columns aren't toasted */
4983 if (attr->attnum < 0)
4984 continue;
4985
4986 if (attr->attisdropped)
4987 continue;
4988
4989 /* not a varlena datatype */
4990 if (attr->attlen != -1)
4991 continue;
4992
4993 /* no data */
4994 if (isnull[natt])
4995 continue;
4996
4997 /* ok, we know we have a toast datum */
4998 varlena = (struct varlena *) DatumGetPointer(attrs[natt]);
4999
5000 /* no need to do anything if the tuple isn't external */
5002 continue;
5003
5004 VARATT_EXTERNAL_GET_POINTER(toast_pointer, varlena);
5005
5006 /*
5007 * Check whether the toast tuple changed, replace if so.
5008 */
5009 ent = (ReorderBufferToastEnt *)
5011 &toast_pointer.va_valueid,
5012 HASH_FIND,
5013 NULL);
5014 if (ent == NULL)
5015 continue;
5016
5017 new_datum =
5019
5020 free[natt] = true;
5021
5022 reconstructed = palloc0(toast_pointer.va_rawsize);
5023
5024 ent->reconstructed = reconstructed;
5025
5026 /* stitch toast tuple back together from its parts */
5027 dlist_foreach(it, &ent->chunks)
5028 {
5029 bool cisnull;
5030 ReorderBufferChange *cchange;
5031 HeapTuple ctup;
5032 Pointer chunk;
5033
5034 cchange = dlist_container(ReorderBufferChange, node, it.cur);
5035 ctup = cchange->data.tp.newtuple;
5036 chunk = DatumGetPointer(fastgetattr(ctup, 3, toast_desc, &cisnull));
5037
5038 Assert(!cisnull);
5039 Assert(!VARATT_IS_EXTERNAL(chunk));
5040 Assert(!VARATT_IS_SHORT(chunk));
5041
5042 memcpy(VARDATA(reconstructed) + data_done,
5043 VARDATA(chunk),
5044 VARSIZE(chunk) - VARHDRSZ);
5045 data_done += VARSIZE(chunk) - VARHDRSZ;
5046 }
5047 Assert(data_done == VARATT_EXTERNAL_GET_EXTSIZE(toast_pointer));
5048
5049 /* make sure its marked as compressed or not */
5050 if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
5051 SET_VARSIZE_COMPRESSED(reconstructed, data_done + VARHDRSZ);
5052 else
5053 SET_VARSIZE(reconstructed, data_done + VARHDRSZ);
5054
5055 memset(&redirect_pointer, 0, sizeof(redirect_pointer));
5056 redirect_pointer.pointer = reconstructed;
5057
5059 memcpy(VARDATA_EXTERNAL(new_datum), &redirect_pointer,
5060 sizeof(redirect_pointer));
5061
5062 attrs[natt] = PointerGetDatum(new_datum);
5063 }
5064
5065 /*
5066 * Build tuple in separate memory & copy tuple back into the tuplebuf
5067 * passed to the output plugin. We can't directly heap_fill_tuple() into
5068 * the tuplebuf because attrs[] will point back into the current content.
5069 */
5070 tmphtup = heap_form_tuple(desc, attrs, isnull);
5071 Assert(newtup->t_len <= MaxHeapTupleSize);
5072 Assert(newtup->t_data == (HeapTupleHeader) ((char *) newtup + HEAPTUPLESIZE));
5073
5074 memcpy(newtup->t_data, tmphtup->t_data, tmphtup->t_len);
5075 newtup->t_len = tmphtup->t_len;
5076
5077 /*
5078 * free resources we won't further need, more persistent stuff will be
5079 * free'd in ReorderBufferToastReset().
5080 */
5081 RelationClose(toast_rel);
5082 pfree(tmphtup);
5083 for (natt = 0; natt < desc->natts; natt++)
5084 {
5085 if (free[natt])
5086 pfree(DatumGetPointer(attrs[natt]));
5087 }
5088 pfree(attrs);
5089 pfree(free);
5090 pfree(isnull);
5091
5092 MemoryContextSwitchTo(oldcontext);
5093
5094 /* subtract the old change size */
5095 ReorderBufferChangeMemoryUpdate(rb, change, NULL, false, old_size);
5096 /* now add the change back, with the correct size */
5097 ReorderBufferChangeMemoryUpdate(rb, change, NULL, true,
5098 ReorderBufferChangeSize(change));
5099}
5100
5101/*
5102 * Free all resources allocated for toast reconstruction.
5103 */
5104static void
5106{
5107 HASH_SEQ_STATUS hstat;
5109
5110 if (txn->toast_hash == NULL)
5111 return;
5112
5113 /* sequentially walk over the hash and free everything */
5114 hash_seq_init(&hstat, txn->toast_hash);
5115 while ((ent = (ReorderBufferToastEnt *) hash_seq_search(&hstat)) != NULL)
5116 {
5118
5119 if (ent->reconstructed != NULL)
5120 pfree(ent->reconstructed);
5121
5122 dlist_foreach_modify(it, &ent->chunks)
5123 {
5124 ReorderBufferChange *change =
5126
5127 dlist_delete(&change->node);
5128 ReorderBufferFreeChange(rb, change, true);
5129 }
5130 }
5131
5133 txn->toast_hash = NULL;
5134}
5135
5136
5137/* ---------------------------------------
5138 * Visibility support for logical decoding
5139 *
5140 *
5141 * Lookup actual cmin/cmax values when using decoding snapshot. We can't
5142 * always rely on stored cmin/cmax values because of two scenarios:
5143 *
5144 * * A tuple got changed multiple times during a single transaction and thus
5145 * has got a combo CID. Combo CIDs are only valid for the duration of a
5146 * single transaction.
5147 * * A tuple with a cmin but no cmax (and thus no combo CID) got
5148 * deleted/updated in another transaction than the one which created it
5149 * which we are looking at right now. As only one of cmin, cmax or combo CID
5150 * is actually stored in the heap we don't have access to the value we
5151 * need anymore.
5152 *
5153 * To resolve those problems we have a per-transaction hash of (cmin,
5154 * cmax) tuples keyed by (relfilelocator, ctid) which contains the actual
5155 * (cmin, cmax) values. That also takes care of combo CIDs by simply
5156 * not caring about them at all. As we have the real cmin/cmax values
5157 * combo CIDs aren't interesting.
5158 *
5159 * As we only care about catalog tuples here the overhead of this
5160 * hashtable should be acceptable.
5161 *
5162 * Heap rewrites complicate this a bit, check rewriteheap.c for
5163 * details.
5164 * -------------------------------------------------------------------------
5165 */
5166
5167/* struct for sorting mapping files by LSN efficiently */
5168typedef struct RewriteMappingFile
5169{
5173
5174#ifdef NOT_USED
5175static void
5176DisplayMapping(HTAB *tuplecid_data)
5177{
5178 HASH_SEQ_STATUS hstat;
5180
5182 while ((ent = (ReorderBufferTupleCidEnt *) hash_seq_search(&hstat)) != NULL)
5183 {
5184 elog(DEBUG3, "mapping: node: %u/%u/%u tid: %u/%u cmin: %u, cmax: %u",
5185 ent->key.rlocator.dbOid,
5186 ent->key.rlocator.spcOid,
5187 ent->key.rlocator.relNumber,
5190 ent->cmin,
5191 ent->cmax
5192 );
5193 }
5194}
5195#endif
5196
5197/*
5198 * Apply a single mapping file to tuplecid_data.
5199 *
5200 * The mapping file has to have been verified to be a) committed b) for our
5201 * transaction c) applied in LSN order.
5202 */
5203static void
5205{
5206 char path[MAXPGPATH];
5207 int fd;
5208 int readBytes;
5210
5211 sprintf(path, "%s/%s", PG_LOGICAL_MAPPINGS_DIR, fname);
5212 fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
5213 if (fd < 0)
5214 ereport(ERROR,
5216 errmsg("could not open file \"%s\": %m", path)));
5217
5218 while (true)
5219 {
5222 ReorderBufferTupleCidEnt *new_ent;
5223 bool found;
5224
5225 /* be careful about padding */
5226 memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
5227
5228 /* read all mappings till the end of the file */
5229 pgstat_report_wait_start(WAIT_EVENT_REORDER_LOGICAL_MAPPING_READ);
5230 readBytes = read(fd, &map, sizeof(LogicalRewriteMappingData));
5232
5233 if (readBytes < 0)
5234 ereport(ERROR,
5236 errmsg("could not read file \"%s\": %m",
5237 path)));
5238 else if (readBytes == 0) /* EOF */
5239 break;
5240 else if (readBytes != sizeof(LogicalRewriteMappingData))
5241 ereport(ERROR,
5243 errmsg("could not read from file \"%s\": read %d instead of %d bytes",
5244 path, readBytes,
5245 (int32) sizeof(LogicalRewriteMappingData))));
5246
5247 key.rlocator = map.old_locator;
5249 &key.tid);
5250
5251
5252 ent = (ReorderBufferTupleCidEnt *)
5254
5255 /* no existing mapping, no need to update */
5256 if (!ent)
5257 continue;
5258
5259 key.rlocator = map.new_locator;
5261 &key.tid);
5262
5263 new_ent = (ReorderBufferTupleCidEnt *)
5265
5266 if (found)
5267 {
5268 /*
5269 * Make sure the existing mapping makes sense. We sometime update
5270 * old records that did not yet have a cmax (e.g. pg_class' own
5271 * entry while rewriting it) during rewrites, so allow that.
5272 */
5273 Assert(ent->cmin == InvalidCommandId || ent->cmin == new_ent->cmin);
5274 Assert(ent->cmax == InvalidCommandId || ent->cmax == new_ent->cmax);
5275 }
5276 else
5277 {
5278 /* update mapping */
5279 new_ent->cmin = ent->cmin;
5280 new_ent->cmax = ent->cmax;
5281 new_ent->combocid = ent->combocid;
5282 }
5283 }
5284
5285 if (CloseTransientFile(fd) != 0)
5286 ereport(ERROR,
5288 errmsg("could not close file \"%s\": %m", path)));
5289}
5290
5291
5292/*
5293 * Check whether the TransactionId 'xid' is in the pre-sorted array 'xip'.
5294 */
5295static bool
5297{
5298 return bsearch(&xid, xip, num,
5299 sizeof(TransactionId), xidComparator) != NULL;
5300}
5301
5302/*
5303 * list_sort() comparator for sorting RewriteMappingFiles in LSN order.
5304 */
5305static int
5306file_sort_by_lsn(const ListCell *a_p, const ListCell *b_p)
5307{
5310
5311 return pg_cmp_u64(a->lsn, b->lsn);
5312}
5313
5314/*
5315 * Apply any existing logical remapping files if there are any targeted at our
5316 * transaction for relid.
5317 */
5318static void
5320{
5321 DIR *mapping_dir;
5322 struct dirent *mapping_de;
5323 List *files = NIL;
5324 ListCell *file;
5325 Oid dboid = IsSharedRelation(relid) ? InvalidOid : MyDatabaseId;
5326
5327 mapping_dir = AllocateDir(PG_LOGICAL_MAPPINGS_DIR);
5328 while ((mapping_de = ReadDir(mapping_dir, PG_LOGICAL_MAPPINGS_DIR)) != NULL)
5329 {
5330 Oid f_dboid;
5331 Oid f_relid;
5332 TransactionId f_mapped_xid;
5333 TransactionId f_create_xid;
5334 XLogRecPtr f_lsn;
5335 uint32 f_hi,
5336 f_lo;
5338
5339 if (strcmp(mapping_de->d_name, ".") == 0 ||
5340 strcmp(mapping_de->d_name, "..") == 0)
5341 continue;
5342
5343 /* Ignore files that aren't ours */
5344 if (strncmp(mapping_de->d_name, "map-", 4) != 0)
5345 continue;
5346
5347 if (sscanf(mapping_de->d_name, LOGICAL_REWRITE_FORMAT,
5348 &f_dboid, &f_relid, &f_hi, &f_lo,
5349 &f_mapped_xid, &f_create_xid) != 6)
5350 elog(ERROR, "could not parse filename \"%s\"", mapping_de->d_name);
5351
5352 f_lsn = ((uint64) f_hi) << 32 | f_lo;
5353
5354 /* mapping for another database */
5355 if (f_dboid != dboid)
5356 continue;
5357
5358 /* mapping for another relation */
5359 if (f_relid != relid)
5360 continue;
5361
5362 /* did the creating transaction abort? */
5363 if (!TransactionIdDidCommit(f_create_xid))
5364 continue;
5365
5366 /* not for our transaction */
5367 if (!TransactionIdInArray(f_mapped_xid, snapshot->subxip, snapshot->subxcnt))
5368 continue;
5369
5370 /* ok, relevant, queue for apply */
5371 f = palloc(sizeof(RewriteMappingFile));
5372 f->lsn = f_lsn;
5373 strcpy(f->fname, mapping_de->d_name);
5374 files = lappend(files, f);
5375 }
5376 FreeDir(mapping_dir);
5377
5378 /* sort files so we apply them in LSN order */
5380
5381 foreach(file, files)
5382 {
5384
5385 elog(DEBUG1, "applying mapping: \"%s\" in %u", f->fname,
5386 snapshot->subxip[0]);
5388 pfree(f);
5389 }
5390}
5391
5392/*
5393 * Lookup cmin/cmax of a tuple, during logical decoding where we can't rely on
5394 * combo CIDs.
5395 */
5396bool
5398 Snapshot snapshot,
5399 HeapTuple htup, Buffer buffer,
5400 CommandId *cmin, CommandId *cmax)
5401{
5404 ForkNumber forkno;
5405 BlockNumber blockno;
5406 bool updated_mapping = false;
5407
5408 /*
5409 * Return unresolved if tuplecid_data is not valid. That's because when
5410 * streaming in-progress transactions we may run into tuples with the CID
5411 * before actually decoding them. Think e.g. about INSERT followed by
5412 * TRUNCATE, where the TRUNCATE may not be decoded yet when applying the
5413 * INSERT. So in such cases, we assume the CID is from the future
5414 * command.
5415 */
5416 if (tuplecid_data == NULL)
5417 return false;
5418
5419 /* be careful about padding */
5420 memset(&key, 0, sizeof(key));
5421
5422 Assert(!BufferIsLocal(buffer));
5423
5424 /*
5425 * get relfilelocator from the buffer, no convenient way to access it
5426 * other than that.
5427 */
5428 BufferGetTag(buffer, &key.rlocator, &forkno, &blockno);
5429
5430 /* tuples can only be in the main fork */
5431 Assert(forkno == MAIN_FORKNUM);
5432 Assert(blockno == ItemPointerGetBlockNumber(&htup->t_self));
5433
5434 ItemPointerCopy(&htup->t_self,
5435 &key.tid);
5436
5437restart:
5438 ent = (ReorderBufferTupleCidEnt *)
5440
5441 /*
5442 * failed to find a mapping, check whether the table was rewritten and
5443 * apply mapping if so, but only do that once - there can be no new
5444 * mappings while we are in here since we have to hold a lock on the
5445 * relation.
5446 */
5447 if (ent == NULL && !updated_mapping)
5448 {
5450 /* now check but don't update for a mapping again */
5451 updated_mapping = true;
5452 goto restart;
5453 }
5454 else if (ent == NULL)
5455 return false;
5456
5457 if (cmin)
5458 *cmin = ent->cmin;
5459 if (cmax)
5460 *cmax = ent->cmax;
5461 return true;
5462}
5463
5464/*
5465 * Count invalidation messages of specified transaction.
5466 *
5467 * Returns number of messages, and msgs is set to the pointer of the linked
5468 * list for the messages.
5469 */
5470uint32
5473{
5474 ReorderBufferTXN *txn;
5475
5476 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
5477 false);
5478
5479 if (txn == NULL)
5480 return 0;
5481
5482 *msgs = txn->invalidations;
5483
5484 return txn->ninvalidations;
5485}
void binaryheap_build(binaryheap *heap)
Definition: binaryheap.c:138
void binaryheap_replace_first(binaryheap *heap, bh_node_type d)
Definition: binaryheap.c:255
bh_node_type binaryheap_first(binaryheap *heap)
Definition: binaryheap.c:177
bh_node_type binaryheap_remove_first(binaryheap *heap)
Definition: binaryheap.c:192
void binaryheap_free(binaryheap *heap)
Definition: binaryheap.c:75
void binaryheap_add_unordered(binaryheap *heap, bh_node_type d)
Definition: binaryheap.c:116
binaryheap * binaryheap_allocate(int capacity, binaryheap_comparator compare, void *arg)
Definition: binaryheap.c:39
uint32 BlockNumber
Definition: block.h:31
static int32 next
Definition: blutils.c:224
static void cleanup(void)
Definition: bootstrap.c:713
int Buffer
Definition: buf.h:23
#define BufferIsLocal(buffer)
Definition: buf.h:37
void BufferGetTag(Buffer buffer, RelFileLocator *rlocator, ForkNumber *forknum, BlockNumber *blknum)
Definition: bufmgr.c:4182
#define NameStr(name)
Definition: c.h:717
#define InvalidCommandId
Definition: c.h:640
char * Pointer
Definition: c.h:493
#define VARHDRSZ
Definition: c.h:663
#define PG_BINARY
Definition: c.h:1244
#define FLEXIBLE_ARRAY_MEMBER
Definition: c.h:434
#define FirstCommandId
Definition: c.h:639
int32_t int32
Definition: c.h:498
uint64_t uint64
Definition: c.h:503
#define unlikely(x)
Definition: c.h:347
uint32_t uint32
Definition: c.h:502
uint32 CommandId
Definition: c.h:637
uint32 TransactionId
Definition: c.h:623
size_t Size
Definition: c.h:576
bool IsToastRelation(Relation relation)
Definition: catalog.c:175
bool IsSharedRelation(Oid relationId)
Definition: catalog.c:273
int64 TimestampTz
Definition: timestamp.h:39
#define INDIRECT_POINTER_SIZE
Definition: detoast.h:34
#define VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr)
Definition: detoast.h:22
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:955
void hash_destroy(HTAB *hashp)
Definition: dynahash.c:865
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1420
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:352
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1385
struct cursor * cur
Definition: ecpg.c:29
void FreeErrorData(ErrorData *edata)
Definition: elog.c:1840
int errcode_for_file_access(void)
Definition: elog.c:877
ErrorData * CopyErrorData(void)
Definition: elog.c:1768
void FlushErrorState(void)
Definition: elog.c:1889
int errmsg(const char *fmt,...)
Definition: elog.c:1071
#define PG_RE_THROW()
Definition: elog.h:405
#define DEBUG3
Definition: elog.h:28
#define PG_TRY(...)
Definition: elog.h:372
#define DEBUG2
Definition: elog.h:29
#define PG_END_TRY(...)
Definition: elog.h:397
#define DEBUG1
Definition: elog.h:30
#define ERROR
Definition: elog.h:39
#define PG_CATCH(...)
Definition: elog.h:382
#define elog(elevel,...)
Definition: elog.h:226
#define INFO
Definition: elog.h:34
#define ereport(elevel,...)
Definition: elog.h:149
int FreeDir(DIR *dir)
Definition: fd.c:3025
int CloseTransientFile(int fd)
Definition: fd.c:2871
struct dirent * ReadDirExtended(DIR *dir, const char *dirname, int elevel)
Definition: fd.c:2988
void FileClose(File file)
Definition: fd.c:1982
File PathNameOpenFile(const char *fileName, int fileFlags)
Definition: fd.c:1579
DIR * AllocateDir(const char *dirname)
Definition: fd.c:2907
struct dirent * ReadDir(DIR *dir, const char *dirname)
Definition: fd.c:2973
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2694
static ssize_t FileRead(File file, void *buffer, size_t amount, off_t offset, uint32 wait_event_info)
Definition: fd.h:199
int File
Definition: fd.h:51
MemoryContext GenerationContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Definition: generation.c:160
Oid MyDatabaseId
Definition: globals.c:95
Assert(PointerIsAligned(start, uint64))
#define free(a)
Definition: header.h:65
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
Definition: heaptuple.c:1117
void heap_deform_tuple(HeapTuple tuple, TupleDesc tupleDesc, Datum *values, bool *isnull)
Definition: heaptuple.c:1346
@ HASH_FIND
Definition: hsearch.h:113
@ HASH_REMOVE
Definition: hsearch.h:115
@ HASH_ENTER
Definition: hsearch.h:114
#define HASH_CONTEXT
Definition: hsearch.h:102
#define HASH_ELEM
Definition: hsearch.h:95
#define HASH_BLOBS
Definition: hsearch.h:97
#define HEAPTUPLESIZE
Definition: htup.h:73
HeapTupleData * HeapTuple
Definition: htup.h:71
struct HeapTupleData HeapTupleData
HeapTupleHeaderData * HeapTupleHeader
Definition: htup.h:23
#define SizeofHeapTupleHeader
Definition: htup_details.h:185
#define MaxHeapTupleSize
Definition: htup_details.h:610
static Datum fastgetattr(HeapTuple tup, int attnum, TupleDesc tupleDesc, bool *isnull)
Definition: htup_details.h:861
static dlist_node * dlist_pop_head_node(dlist_head *head)
Definition: ilist.h:450
#define dlist_foreach(iter, lhead)
Definition: ilist.h:623
static void dlist_init(dlist_head *head)
Definition: ilist.h:314
#define dclist_container(type, membername, ptr)
Definition: ilist.h:947
static bool dlist_has_next(const dlist_head *head, const dlist_node *node)
Definition: ilist.h:503
static void dclist_push_tail(dclist_head *head, dlist_node *node)
Definition: ilist.h:709
static void dlist_insert_before(dlist_node *before, dlist_node *node)
Definition: ilist.h:393
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:603
static dlist_node * dlist_next_node(dlist_head *head, dlist_node *node)
Definition: ilist.h:537
static void dlist_delete(dlist_node *node)
Definition: ilist.h:405
static uint32 dclist_count(const dclist_head *head)
Definition: ilist.h:932
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:640
static bool dlist_is_empty(const dlist_head *head)
Definition: ilist.h:336
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:364
static void dclist_delete_from(dclist_head *head, dlist_node *node)
Definition: ilist.h:763
static void dclist_init(dclist_head *head)
Definition: ilist.h:671
#define dlist_container(type, membername, ptr)
Definition: ilist.h:593
#define dclist_foreach(iter, lhead)
Definition: ilist.h:970
static int pg_cmp_u64(uint64 a, uint64 b)
Definition: int.h:664
#define write(a, b, c)
Definition: win32.h:14
#define read(a, b, c)
Definition: win32.h:13
void LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
Definition: inval.c:822
int b
Definition: isn.c:74
int a
Definition: isn.c:73
int i
Definition: isn.c:77
if(TABLE==NULL||TABLE_index==NULL)
Definition: isn.c:81
static OffsetNumber ItemPointerGetOffsetNumber(const ItemPointerData *pointer)
Definition: itemptr.h:124
static BlockNumber ItemPointerGetBlockNumber(const ItemPointerData *pointer)
Definition: itemptr.h:103
static void ItemPointerCopy(const ItemPointerData *fromPointer, ItemPointerData *toPointer)
Definition: itemptr.h:172
List * lappend(List *list, void *datum)
Definition: list.c:339
void list_sort(List *list, list_sort_comparator cmp)
Definition: list.c:1674
void UpdateDecodingStats(LogicalDecodingContext *ctx)
Definition: logical.c:1915
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:1256
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:1290
char * pstrdup(const char *in)
Definition: mcxt.c:2322
void * repalloc(void *pointer, Size size)
Definition: mcxt.c:2167
void pfree(void *pointer)
Definition: mcxt.c:2147
void * palloc0(Size size)
Definition: mcxt.c:1970
void * palloc(Size size)
Definition: mcxt.c:1940
MemoryContext CurrentMemoryContext
Definition: mcxt.c:159
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:485
#define AllocSetContextCreate
Definition: memutils.h:149
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:180
#define SLAB_DEFAULT_BLOCK_SIZE
Definition: memutils.h:209
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:123
void pairingheap_remove(pairingheap *heap, pairingheap_node *node)
Definition: pairingheap.c:170
void pairingheap_add(pairingheap *heap, pairingheap_node *node)
Definition: pairingheap.c:112
pairingheap * pairingheap_allocate(pairingheap_comparator compare, void *arg)
Definition: pairingheap.c:42
pairingheap_node * pairingheap_first(pairingheap *heap)
Definition: pairingheap.c:130
#define pairingheap_container(type, membername, ptr)
Definition: pairingheap.h:43
#define pairingheap_const_container(type, membername, ptr)
Definition: pairingheap.h:51
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:124
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:202
void * arg
#define MAXPGPATH
const void * data
#define lfirst(lc)
Definition: pg_list.h:172
#define NIL
Definition: pg_list.h:68
#define sprintf
Definition: port.h:241
#define snprintf
Definition: port.h:239
#define qsort(a, b, c, d)
Definition: port.h:479
static Datum PointerGetDatum(const void *X)
Definition: postgres.h:327
uintptr_t Datum
Definition: postgres.h:69
static Oid DatumGetObjectId(Datum X)
Definition: postgres.h:247
static Pointer DatumGetPointer(Datum X)
Definition: postgres.h:317
static Datum Int32GetDatum(int32 X)
Definition: postgres.h:217
static int32 DatumGetInt32(Datum X)
Definition: postgres.h:207
#define InvalidOid
Definition: postgres_ext.h:35
unsigned int Oid
Definition: postgres_ext.h:30
static int fd(const char *x, int i)
Definition: preproc-init.c:105
bool TransactionIdIsInProgress(TransactionId xid)
Definition: procarray.c:1402
#define RelationIsLogicallyLogged(relation)
Definition: rel.h:712
#define RelationGetDescr(relation)
Definition: rel.h:542
#define RelationGetRelationName(relation)
Definition: rel.h:550
#define RelationIsValid(relation)
Definition: rel.h:489
Relation RelationIdGetRelation(Oid relationId)
Definition: relcache.c:2082
void RelationClose(Relation relation)
Definition: relcache.c:2204
Oid RelidByRelfilenumber(Oid reltablespace, RelFileNumber relfilenumber)
ForkNumber
Definition: relpath.h:56
@ MAIN_FORKNUM
Definition: relpath.h:58
#define relpathperm(rlocator, forknum)
Definition: relpath.h:146
static int file_sort_by_lsn(const ListCell *a_p, const ListCell *b_p)
void ReorderBufferFreeRelids(ReorderBuffer *rb, Oid *relids)
void ReorderBufferFreeChange(ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)
static void ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
void ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
static void ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn)
void ReorderBufferAddNewCommandId(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, CommandId cid)
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
static ReorderBufferTXN * ReorderBufferLargestTXN(ReorderBuffer *rb)
void ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, RelFileLocator locator, ItemPointerData tid, CommandId cmin, CommandId cmax, CommandId combocid)
static void ApplyLogicalMappingFile(HTAB *tuplecid_data, Oid relid, const char *fname)
void ReorderBufferSetBaseSnapshot(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferToastInitHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
void ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, TimestampTz abort_time)
static bool ReorderBufferCanStartStreaming(ReorderBuffer *rb)
static void ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id, XLogRecPtr last_lsn, ReorderBufferChange *specinsert)
struct ReorderBufferDiskChange ReorderBufferDiskChange
bool ReorderBufferXidHasCatalogChanges(ReorderBuffer *rb, TransactionId xid)
void ReorderBufferInvalidate(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
TransactionId ReorderBufferGetOldestXmin(ReorderBuffer *rb)
static int ReorderBufferIterCompare(Datum a, Datum b, void *arg)
static void ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferIterTXNState *volatile *iter_state)
bool ResolveCminCmaxDuringDecoding(HTAB *tuplecid_data, Snapshot snapshot, HeapTuple htup, Buffer buffer, CommandId *cmin, CommandId *cmax)
static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
void ReorderBufferFreeTupleBuf(HeapTuple tuple)
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change, bool toast_insert)
static void ReorderBufferReplay(ReorderBufferTXN *txn, ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
void ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid, char *gid)
uint32 ReorderBufferGetInvalidations(ReorderBuffer *rb, TransactionId xid, SharedInvalidationMessage **msgs)
void ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
void ReorderBufferCommitChild(ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn)
TransactionId * ReorderBufferGetCatalogChangesXacts(ReorderBuffer *rb)
static void ReorderBufferSaveTXNSnapshot(ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id)
#define IsSpecInsert(action)
static Size ReorderBufferChangeSize(ReorderBufferChange *change)
ReorderBuffer * ReorderBufferAllocate(void)
int logical_decoding_work_mem
static void AssertChangeLsnOrder(ReorderBufferTXN *txn)
static bool ReorderBufferCanStream(ReorderBuffer *rb)
static int ReorderBufferTXNSizeCompare(const pairingheap_node *a, const pairingheap_node *b, void *arg)
static void ReorderBufferApplyChange(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change, bool streaming)
void ReorderBufferSkipPrepare(ReorderBuffer *rb, TransactionId xid)
static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, int fd, ReorderBufferChange *change)
struct ReorderBufferIterTXNState ReorderBufferIterTXNState
void ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs)
struct ReorderBufferTXNByIdEnt ReorderBufferTXNByIdEnt
int debug_logical_replication_streaming
struct ReorderBufferIterTXNEntry ReorderBufferIterTXNEntry
#define IsInsertOrUpdate(action)
static void ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz)
void ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, Snapshot snap, XLogRecPtr lsn, bool transactional, const char *prefix, Size message_size, const char *message)
bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *rb, TransactionId xid)
static void ReorderBufferExecuteInvalidations(uint32 nmsgs, SharedInvalidationMessage *msgs)
static void ReorderBufferIterTXNFinish(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
struct RewriteMappingFile RewriteMappingFile
void ReorderBufferAddSnapshot(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared)
#define CHANGES_THRESHOLD
static ReorderBufferTXN * ReorderBufferLargestStreamableTopTXN(ReorderBuffer *rb)
static bool ReorderBufferCheckAndTruncateAbortedTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, char *data)
HeapTuple ReorderBufferAllocTupleBuf(ReorderBuffer *rb, Size tuple_len)
void ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, XLogRecPtr two_phase_at, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn, char *gid, bool is_commit)
static void AssertTXNLsnOrder(ReorderBuffer *rb)
static void ReorderBufferApplyMessage(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool streaming)
static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
static void ReorderBufferCleanupSerializedTXNs(const char *slotname)
ReorderBufferChange * ReorderBufferAllocChange(ReorderBuffer *rb)
void ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
void ReorderBufferSetRestartPoint(ReorderBuffer *rb, XLogRecPtr ptr)
static void SetupCheckXidLive(TransactionId xid)
static bool TransactionIdInArray(TransactionId xid, TransactionId *xip, Size num)
static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap, ReorderBufferTXN *txn, CommandId cid)
static void ReorderBufferApplyTruncate(ReorderBuffer *rb, ReorderBufferTXN *txn, int nrelations, Relation *relations, ReorderBufferChange *change, bool streaming)
static void ReorderBufferProcessPartialChange(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool toast_insert)
static void ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void UpdateLogicalMappings(HTAB *tuplecid_data, Oid relid, Snapshot snapshot)
static ReorderBufferTXN * ReorderBufferAllocTXN(ReorderBuffer *rb)
bool ReorderBufferRememberPrepareInfo(ReorderBuffer *rb, TransactionId xid, XLogRecPtr prepare_lsn, XLogRecPtr end_lsn, TimestampTz prepare_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
static void ReorderBufferFreeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
void ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
static void ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn, ReorderBufferTXN *subtxn)
struct TXNEntryFile TXNEntryFile
static void ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
static ReorderBufferChange * ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
Oid * ReorderBufferAllocRelids(ReorderBuffer *rb, int nrelids)
static void ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, ReorderBufferChange *change, ReorderBufferTXN *txn, bool addition, Size sz)
struct ReorderBufferToastEnt ReorderBufferToastEnt
static void ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
void ReorderBufferProcessXid(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, TXNEntryFile *file, XLogSegNo *segno)
void ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr lsn)
void ReorderBufferFree(ReorderBuffer *rb)
static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, TransactionId xid, XLogSegNo segno)
#define IsSpecConfirmOrAbort(action)
struct ReorderBufferTupleCidEnt ReorderBufferTupleCidEnt
struct ReorderBufferTupleCidKey ReorderBufferTupleCidKey
static const Size max_changes_in_memory
void StartupReorderBuffer(void)
void ReorderBufferAbortOld(ReorderBuffer *rb, TransactionId oldestRunningXid)
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
static void ReorderBufferMaybeMarkTXNStreamed(ReorderBuffer *rb, ReorderBufferTXN *txn)
ReorderBufferTXN * ReorderBufferGetOldestTXN(ReorderBuffer *rb)
static void ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn, volatile Snapshot snapshot_now, volatile CommandId command_id, bool streaming)
#define rbtxn_is_committed(txn)
#define rbtxn_has_streamable_change(txn)
#define rbtxn_has_catalog_changes(txn)
@ DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE
Definition: reorderbuffer.h:34
@ DEBUG_LOGICAL_REP_STREAMING_BUFFERED
Definition: reorderbuffer.h:33
#define RBTXN_PREPARE_STATUS_MASK
#define rbtxn_is_serialized_clear(txn)
#define RBTXN_IS_STREAMED
#define rbtxn_is_prepared(txn)
#define RBTXN_HAS_PARTIAL_CHANGE
#define rbtxn_is_streamed(txn)
struct ReorderBufferChange ReorderBufferChange
#define RBTXN_SENT_PREPARE
#define rbtxn_is_toptxn(txn)
#define rbtxn_get_toptxn(txn)
#define rbtxn_is_known_subxact(txn)
#define rbtxn_is_subtxn(txn)
#define RBTXN_HAS_CATALOG_CHANGES
#define RBTXN_IS_COMMITTED
#define PG_LOGICAL_MAPPINGS_DIR
Definition: reorderbuffer.h:23
#define RBTXN_IS_SERIALIZED_CLEAR
#define rbtxn_sent_prepare(txn)
#define RBTXN_IS_PREPARED
#define RBTXN_SKIPPED_PREPARE
#define RBTXN_HAS_STREAMABLE_CHANGE
@ REORDER_BUFFER_CHANGE_INVALIDATION
Definition: reorderbuffer.h:56
@ REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM
Definition: reorderbuffer.h:61
@ REORDER_BUFFER_CHANGE_INSERT
Definition: reorderbuffer.h:52
@ REORDER_BUFFER_CHANGE_MESSAGE
Definition: reorderbuffer.h:55
@ REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT
Definition: reorderbuffer.h:62
@ REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID
Definition: reorderbuffer.h:58
@ REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID
Definition: reorderbuffer.h:59
@ REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT
Definition: reorderbuffer.h:60
@ REORDER_BUFFER_CHANGE_TRUNCATE
Definition: reorderbuffer.h:63
@ REORDER_BUFFER_CHANGE_DELETE
Definition: reorderbuffer.h:54
@ REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT
Definition: reorderbuffer.h:57
@ REORDER_BUFFER_CHANGE_UPDATE
Definition: reorderbuffer.h:53
#define rbtxn_is_aborted(txn)
#define RBTXN_IS_SERIALIZED
#define rbtxn_is_serialized(txn)
#define RBTXN_IS_ABORTED
#define RBTXN_IS_SUBXACT
#define rbtxn_has_partial_change(txn)
#define LOGICAL_REWRITE_FORMAT
Definition: rewriteheap.h:54
MemoryContext SlabContextCreate(MemoryContext parent, const char *name, Size blockSize, Size chunkSize)
Definition: slab.c:322
ReplicationSlot * MyReplicationSlot
Definition: slot.c:147
bool ReplicationSlotValidateName(const char *name, int elevel)
Definition: slot.c:267
#define PG_REPLSLOT_DIR
Definition: slot.h:21
void SnapBuildSnapDecRefcount(Snapshot snap)
Definition: snapbuild.c:328
bool SnapBuildXactNeedsSkip(SnapBuild *builder, XLogRecPtr ptr)
Definition: snapbuild.c:304
SnapBuildState SnapBuildCurrentState(SnapBuild *builder)
Definition: snapbuild.c:277
@ SNAPBUILD_CONSISTENT
Definition: snapbuild.h:50
void TeardownHistoricSnapshot(bool is_error)
Definition: snapmgr.c:1672
void SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
Definition: snapmgr.c:1656
static HTAB * tuplecid_data
Definition: snapmgr.c:162
struct SnapshotData * Snapshot
Definition: snapshot.h:117
struct SnapshotData SnapshotData
Definition: dirent.c:26
int sqlerrcode
Definition: elog.h:431
Size keysize
Definition: hsearch.h:75
Size entrysize
Definition: hsearch.h:76
MemoryContext hcxt
Definition: hsearch.h:86
Definition: dynahash.c:220
ItemPointerData t_self
Definition: htup.h:65
uint32 t_len
Definition: htup.h:64
HeapTupleHeader t_data
Definition: htup.h:68
Oid t_tableOid
Definition: htup.h:66
Definition: pg_list.h:54
XLogReaderState * reader
Definition: logical.h:42
struct SnapBuild * snapshot_builder
Definition: logical.h:44
ItemPointerData new_tid
Definition: rewriteheap.h:40
RelFileLocator old_locator
Definition: rewriteheap.h:37
ItemPointerData old_tid
Definition: rewriteheap.h:39
RelFileLocator new_locator
Definition: rewriteheap.h:38
RelFileNumber relNumber
Form_pg_class rd_rel
Definition: rel.h:111
struct ReorderBufferChange::@110::@112 truncate
ReorderBufferChangeType action
Definition: reorderbuffer.h:81
RelFileLocator rlocator
Definition: reorderbuffer.h:98
union ReorderBufferChange::@110 data
ItemPointerData tid
struct ReorderBufferTXN * txn
Definition: reorderbuffer.h:84
RelFileLocator locator
RepOriginId origin_id
Definition: reorderbuffer.h:86
struct ReorderBufferChange::@110::@111 tp
struct ReorderBufferChange::@110::@114 tuplecid
struct ReorderBufferChange::@110::@113 msg
struct ReorderBufferChange::@110::@115 inval
SharedInvalidationMessage * invalidations
ReorderBufferChange change
ReorderBufferChange * change
ReorderBufferTXN * txn
ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER]
ReorderBufferTXN * txn
CommandId command_id
XLogRecPtr restart_decoding_lsn
pairingheap_node txn_node
TimestampTz commit_time
XLogRecPtr base_snapshot_lsn
Snapshot snapshot_now
TransactionId toplevel_xid
dlist_node catchange_node
Snapshot base_snapshot
SharedInvalidationMessage * invalidations
RepOriginId origin_id
struct ReorderBufferTXN * toptxn
dlist_head tuplecids
XLogRecPtr first_lsn
TimestampTz abort_time
XLogRecPtr final_lsn
void * output_plugin_private
XLogRecPtr end_lsn
XLogRecPtr origin_lsn
TimestampTz prepare_time
TransactionId xid
dlist_node base_snapshot_node
dlist_head changes
dlist_head subtxns
union ReorderBufferTXN::@116 xact_time
struct varlena * reconstructed
ReorderBufferTupleCidKey key
ReorderBufferStreamMessageCB stream_message
ReorderBufferStreamChangeCB stream_change
ReorderBufferBeginCB begin_prepare
ReorderBufferStreamTruncateCB stream_truncate
ReorderBufferCommitPreparedCB commit_prepared
ReorderBufferUpdateProgressTxnCB update_progress_txn
ReorderBufferMessageCB message
dlist_head txns_by_base_snapshot_lsn
MemoryContext context
dclist_head catchange_txns
ReorderBufferRollbackPreparedCB rollback_prepared
ReorderBufferPrepareCB prepare
ReorderBufferStreamStopCB stream_stop
ReorderBufferApplyChangeCB apply_change
MemoryContext change_context
ReorderBufferTXN * by_txn_last_txn
TransactionId by_txn_last_xid
ReorderBufferStreamPrepareCB stream_prepare
ReorderBufferStreamAbortCB stream_abort
MemoryContext tup_context
ReorderBufferCommitCB commit
ReorderBufferStreamStartCB stream_start
ReorderBufferStreamCommitCB stream_commit
ReorderBufferApplyTruncateCB apply_truncate
dlist_head toplevel_by_lsn
pairingheap * txn_heap
ReorderBufferBeginCB begin
MemoryContext txn_context
XLogRecPtr current_restart_decoding_lsn
void * private_data
ReplicationSlotPersistentData data
Definition: slot.h:185
char fname[MAXPGPATH]
TransactionId xmin
Definition: snapshot.h:153
int32 subxcnt
Definition: snapshot.h:177
bool copied
Definition: snapshot.h:181
uint32 regd_count
Definition: snapshot.h:201
uint32 active_count
Definition: snapshot.h:200
CommandId curcid
Definition: snapshot.h:183
uint32 xcnt
Definition: snapshot.h:165
TransactionId * subxip
Definition: snapshot.h:176
TransactionId * xip
Definition: snapshot.h:164
XLogRecPtr EndRecPtr
Definition: xlogreader.h:207
XLogRecPtr ReadRecPtr
Definition: xlogreader.h:206
Definition: dirent.h:10
char d_name[MAX_PATH]
Definition: dirent.h:15
dlist_node * cur
Definition: ilist.h:179
dlist_node * cur
Definition: ilist.h:200
unsigned short st_mode
Definition: win32_port.h:258
Definition: regguts.h:323
int32 va_rawsize
Definition: varatt.h:34
Oid va_valueid
Definition: varatt.h:37
struct varlena * pointer
Definition: varatt.h:59
Definition: c.h:658
bool TransactionIdDidCommit(TransactionId transactionId)
Definition: transam.c:126
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:280
#define InvalidTransactionId
Definition: transam.h:31
#define TransactionIdEquals(id1, id2)
Definition: transam.h:43
#define TransactionIdIsValid(xid)
Definition: transam.h:41
static FormData_pg_attribute * TupleDescAttr(TupleDesc tupdesc, int i)
Definition: tupdesc.h:160
#define VARHDRSZ_SHORT
Definition: varatt.h:255
#define VARSIZE_SHORT(PTR)
Definition: varatt.h:281
#define VARATT_IS_EXTENDED(PTR)
Definition: varatt.h:303
#define VARATT_IS_SHORT(PTR)
Definition: varatt.h:302
#define SET_VARSIZE_COMPRESSED(PTR, len)
Definition: varatt.h:307
#define VARDATA(PTR)
Definition: varatt.h:278
#define SET_VARTAG_EXTERNAL(PTR, tag)
Definition: varatt.h:309
#define VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer)
Definition: varatt.h:354
#define VARDATA_EXTERNAL(PTR)
Definition: varatt.h:286
#define SET_VARSIZE(PTR, len)
Definition: varatt.h:305
#define VARSIZE(PTR)
Definition: varatt.h:279
#define VARATT_EXTERNAL_GET_EXTSIZE(toast_pointer)
Definition: varatt.h:334
#define VARATT_IS_EXTERNAL(PTR)
Definition: varatt.h:289
@ VARTAG_INDIRECT
Definition: varatt.h:86
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: wait_event.h:85
static void pgstat_report_wait_end(void)
Definition: wait_event.h:101
#define lstat(path, sb)
Definition: win32_port.h:275
#define S_ISDIR(m)
Definition: win32_port.h:315
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4989
void BeginInternalSubTransaction(const char *name)
Definition: xact.c:4694
TransactionId CheckXidAlive
Definition: xact.c:99
void RollbackAndReleaseCurrentSubTransaction(void)
Definition: xact.c:4796
void StartTransactionCommand(void)
Definition: xact.c:3059
TransactionId GetCurrentTransactionIdIfAny(void)
Definition: xact.c:471
TransactionId GetCurrentTransactionId(void)
Definition: xact.c:454
void AbortCurrentTransaction(void)
Definition: xact.c:3451
int xidComparator(const void *arg1, const void *arg2)
Definition: xid.c:152
int wal_segment_size
Definition: xlog.c:143
#define XLogSegNoOffsetToRecPtr(segno, offset, wal_segsz_bytes, dest)
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
#define XLByteInSeg(xlrp, logSegNo, wal_segsz_bytes)
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
uint16 RepOriginId
Definition: xlogdefs.h:65
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
uint64 XLogSegNo
Definition: xlogdefs.h:48