PostgreSQL Source Code git master
All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
reorderbuffer.c File Reference
#include "postgres.h"
#include <unistd.h>
#include <sys/stat.h>
#include "access/detoast.h"
#include "access/heapam.h"
#include "access/rewriteheap.h"
#include "access/transam.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "catalog/catalog.h"
#include "common/int.h"
#include "lib/binaryheap.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "replication/logical.h"
#include "replication/reorderbuffer.h"
#include "replication/slot.h"
#include "replication/snapbuild.h"
#include "storage/bufmgr.h"
#include "storage/fd.h"
#include "storage/procarray.h"
#include "storage/sinval.h"
#include "utils/builtins.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/relfilenumbermap.h"
Include dependency graph for reorderbuffer.c:

Go to the source code of this file.

Data Structures

struct  ReorderBufferTXNByIdEnt
 
struct  ReorderBufferTupleCidKey
 
struct  ReorderBufferTupleCidEnt
 
struct  TXNEntryFile
 
struct  ReorderBufferIterTXNEntry
 
struct  ReorderBufferIterTXNState
 
struct  ReorderBufferToastEnt
 
struct  ReorderBufferDiskChange
 
struct  RewriteMappingFile
 

Macros

#define IsSpecInsert(action)
 
#define IsSpecConfirmOrAbort(action)
 
#define IsInsertOrUpdate(action)
 
#define CHANGES_THRESHOLD   100
 

Typedefs

typedef struct ReorderBufferTXNByIdEnt ReorderBufferTXNByIdEnt
 
typedef struct ReorderBufferTupleCidKey ReorderBufferTupleCidKey
 
typedef struct ReorderBufferTupleCidEnt ReorderBufferTupleCidEnt
 
typedef struct TXNEntryFile TXNEntryFile
 
typedef struct ReorderBufferIterTXNEntry ReorderBufferIterTXNEntry
 
typedef struct ReorderBufferIterTXNState ReorderBufferIterTXNState
 
typedef struct ReorderBufferToastEnt ReorderBufferToastEnt
 
typedef struct ReorderBufferDiskChange ReorderBufferDiskChange
 
typedef struct RewriteMappingFile RewriteMappingFile
 

Functions

static ReorderBufferTXN * ReorderBufferAllocTXN (ReorderBuffer *rb)
 
static void ReorderBufferFreeTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static ReorderBufferTXN * ReorderBufferTXNByXid (ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
 
static void ReorderBufferTransferSnapToParent (ReorderBufferTXN *txn, ReorderBufferTXN *subtxn)
 
static void AssertTXNLsnOrder (ReorderBuffer *rb)
 
static void ReorderBufferIterTXNInit (ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferIterTXNState *volatile *iter_state)
 
static ReorderBufferChange * ReorderBufferIterTXNNext (ReorderBuffer *rb, ReorderBufferIterTXNState *state)
 
static void ReorderBufferIterTXNFinish (ReorderBuffer *rb, ReorderBufferIterTXNState *state)
 
static void ReorderBufferExecuteInvalidations (uint32 nmsgs, SharedInvalidationMessage *msgs)
 
static void ReorderBufferCheckMemoryLimit (ReorderBuffer *rb)
 
static void ReorderBufferSerializeTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferSerializeChange (ReorderBuffer *rb, ReorderBufferTXN *txn, int fd, ReorderBufferChange *change)
 
static Size ReorderBufferRestoreChanges (ReorderBuffer *rb, ReorderBufferTXN *txn, TXNEntryFile *file, XLogSegNo *segno)
 
static void ReorderBufferRestoreChange (ReorderBuffer *rb, ReorderBufferTXN *txn, char *data)
 
static void ReorderBufferRestoreCleanup (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferTruncateTXN (ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared)
 
static void ReorderBufferMaybeMarkTXNStreamed (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static bool ReorderBufferCheckAndTruncateAbortedTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferCleanupSerializedTXNs (const char *slotname)
 
static void ReorderBufferSerializedPath (char *path, ReplicationSlot *slot, TransactionId xid, XLogSegNo segno)
 
static int ReorderBufferTXNSizeCompare (const pairingheap_node *a, const pairingheap_node *b, void *arg)
 
static void ReorderBufferFreeSnap (ReorderBuffer *rb, Snapshot snap)
 
static Snapshot ReorderBufferCopySnap (ReorderBuffer *rb, Snapshot orig_snap, ReorderBufferTXN *txn, CommandId cid)
 
static bool ReorderBufferCanStream (ReorderBuffer *rb)
 
static bool ReorderBufferCanStartStreaming (ReorderBuffer *rb)
 
static void ReorderBufferStreamTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferStreamCommit (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferToastInitHash (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferToastReset (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferToastReplace (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
 
static void ReorderBufferToastAppendChunk (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
 
static Size ReorderBufferChangeSize (ReorderBufferChange *change)
 
static void ReorderBufferChangeMemoryUpdate (ReorderBuffer *rb, ReorderBufferChange *change, ReorderBufferTXN *txn, bool addition, Size sz)
 
ReorderBuffer * ReorderBufferAllocate (void)
 
void ReorderBufferFree (ReorderBuffer *rb)
 
ReorderBufferChange * ReorderBufferAllocChange (ReorderBuffer *rb)
 
void ReorderBufferFreeChange (ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)
 
HeapTuple ReorderBufferAllocTupleBuf (ReorderBuffer *rb, Size tuple_len)
 
void ReorderBufferFreeTupleBuf (HeapTuple tuple)
 
Oid * ReorderBufferAllocRelids (ReorderBuffer *rb, int nrelids)
 
void ReorderBufferFreeRelids (ReorderBuffer *rb, Oid *relids)
 
static void ReorderBufferProcessPartialChange (ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool toast_insert)
 
void ReorderBufferQueueChange (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change, bool toast_insert)
 
void ReorderBufferQueueMessage (ReorderBuffer *rb, TransactionId xid, Snapshot snap, XLogRecPtr lsn, bool transactional, const char *prefix, Size message_size, const char *message)
 
static void AssertChangeLsnOrder (ReorderBufferTXN *txn)
 
ReorderBufferTXN * ReorderBufferGetOldestTXN (ReorderBuffer *rb)
 
TransactionId ReorderBufferGetOldestXmin (ReorderBuffer *rb)
 
void ReorderBufferSetRestartPoint (ReorderBuffer *rb, XLogRecPtr ptr)
 
void ReorderBufferAssignChild (ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr lsn)
 
void ReorderBufferCommitChild (ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn)
 
static int ReorderBufferIterCompare (Datum a, Datum b, void *arg)
 
static void ReorderBufferCleanupTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferBuildTupleCidHash (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void SetupCheckXidLive (TransactionId xid)
 
static void ReorderBufferApplyChange (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change, bool streaming)
 
static void ReorderBufferApplyTruncate (ReorderBuffer *rb, ReorderBufferTXN *txn, int nrelations, Relation *relations, ReorderBufferChange *change, bool streaming)
 
static void ReorderBufferApplyMessage (ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool streaming)
 
static void ReorderBufferSaveTXNSnapshot (ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id)
 
static void ReorderBufferResetTXN (ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id, XLogRecPtr last_lsn, ReorderBufferChange *specinsert)
 
static void ReorderBufferProcessTXN (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn, volatile Snapshot snapshot_now, volatile CommandId command_id, bool streaming)
 
static void ReorderBufferReplay (ReorderBufferTXN *txn, ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
 
void ReorderBufferCommit (ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
 
bool ReorderBufferRememberPrepareInfo (ReorderBuffer *rb, TransactionId xid, XLogRecPtr prepare_lsn, XLogRecPtr end_lsn, TimestampTz prepare_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
 
void ReorderBufferSkipPrepare (ReorderBuffer *rb, TransactionId xid)
 
void ReorderBufferPrepare (ReorderBuffer *rb, TransactionId xid, char *gid)
 
void ReorderBufferFinishPrepared (ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, XLogRecPtr two_phase_at, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn, char *gid, bool is_commit)
 
void ReorderBufferAbort (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, TimestampTz abort_time)
 
void ReorderBufferAbortOld (ReorderBuffer *rb, TransactionId oldestRunningXid)
 
void ReorderBufferForget (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferInvalidate (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferImmediateInvalidation (ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
 
void ReorderBufferProcessXid (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferAddSnapshot (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
 
void ReorderBufferSetBaseSnapshot (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
 
void ReorderBufferAddNewCommandId (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, CommandId cid)
 
void ReorderBufferAddNewTupleCids (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, RelFileLocator locator, ItemPointerData tid, CommandId cmin, CommandId cmax, CommandId combocid)
 
void ReorderBufferAddInvalidations (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs)
 
void ReorderBufferXidSetCatalogChanges (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
TransactionId * ReorderBufferGetCatalogChangesXacts (ReorderBuffer *rb)
 
bool ReorderBufferXidHasCatalogChanges (ReorderBuffer *rb, TransactionId xid)
 
bool ReorderBufferXidHasBaseSnapshot (ReorderBuffer *rb, TransactionId xid)
 
static void ReorderBufferSerializeReserve (ReorderBuffer *rb, Size sz)
 
static ReorderBufferTXN * ReorderBufferLargestTXN (ReorderBuffer *rb)
 
static ReorderBufferTXN * ReorderBufferLargestStreamableTopTXN (ReorderBuffer *rb)
 
void StartupReorderBuffer (void)
 
static void ApplyLogicalMappingFile (HTAB *tuplecid_data, Oid relid, const char *fname)
 
static bool TransactionIdInArray (TransactionId xid, TransactionId *xip, Size num)
 
static int file_sort_by_lsn (const ListCell *a_p, const ListCell *b_p)
 
static void UpdateLogicalMappings (HTAB *tuplecid_data, Oid relid, Snapshot snapshot)
 
bool ResolveCminCmaxDuringDecoding (HTAB *tuplecid_data, Snapshot snapshot, HeapTuple htup, Buffer buffer, CommandId *cmin, CommandId *cmax)
 
uint32 ReorderBufferGetInvalidations (ReorderBuffer *rb, TransactionId xid, SharedInvalidationMessage **msgs)
 

Variables

int logical_decoding_work_mem
 
static const Size max_changes_in_memory = 4096
 
int debug_logical_replication_streaming = DEBUG_LOGICAL_REP_STREAMING_BUFFERED
 

Macro Definition Documentation

◆ CHANGES_THRESHOLD

#define CHANGES_THRESHOLD   100

◆ IsInsertOrUpdate

#define IsInsertOrUpdate (   action)
Value:

Definition at line 194 of file reorderbuffer.c.

◆ IsSpecConfirmOrAbort

#define IsSpecConfirmOrAbort (   action)
Value:
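( \
(((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM) || \
((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT)) \
)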
@ REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM
Definition: reorderbuffer.h:61
@ REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT
Definition: reorderbuffer.h:62

Definition at line 189 of file reorderbuffer.c.

◆ IsSpecInsert

#define IsSpecInsert (   action)
Value:

Definition at line 185 of file reorderbuffer.c.

Typedef Documentation

◆ ReorderBufferDiskChange

◆ ReorderBufferIterTXNEntry

◆ ReorderBufferIterTXNState

◆ ReorderBufferToastEnt

◆ ReorderBufferTupleCidEnt

◆ ReorderBufferTupleCidKey

◆ ReorderBufferTXNByIdEnt

◆ RewriteMappingFile

◆ TXNEntryFile

typedef struct TXNEntryFile TXNEntryFile

Function Documentation

◆ ApplyLogicalMappingFile()

static void ApplyLogicalMappingFile ( HTAB * tuplecid_data,
Oid  relid,
const char *  fname 
)
static

Definition at line 5204 of file reorderbuffer.c.

5205{
5206 char path[MAXPGPATH];
5207 int fd;
5208 int readBytes;
5210
5211 sprintf(path, "%s/%s", PG_LOGICAL_MAPPINGS_DIR, fname);
5212 fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
5213 if (fd < 0)
5214 ereport(ERROR,
5216 errmsg("could not open file \"%s\": %m", path)));
5217
5218 while (true)
5219 {
5222 ReorderBufferTupleCidEnt *new_ent;
5223 bool found;
5224
5225 /* be careful about padding */
5226 memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
5227
5228 /* read all mappings till the end of the file */
5229 pgstat_report_wait_start(WAIT_EVENT_REORDER_LOGICAL_MAPPING_READ);
5230 readBytes = read(fd, &map, sizeof(LogicalRewriteMappingData));
5232
5233 if (readBytes < 0)
5234 ereport(ERROR,
5236 errmsg("could not read file \"%s\": %m",
5237 path)));
5238 else if (readBytes == 0) /* EOF */
5239 break;
5240 else if (readBytes != sizeof(LogicalRewriteMappingData))
5241 ereport(ERROR,
5243 errmsg("could not read from file \"%s\": read %d instead of %d bytes",
5244 path, readBytes,
5245 (int32) sizeof(LogicalRewriteMappingData))));
5246
5247 key.rlocator = map.old_locator;
5249 &key.tid);
5250
5251
5252 ent = (ReorderBufferTupleCidEnt *)
5254
5255 /* no existing mapping, no need to update */
5256 if (!ent)
5257 continue;
5258
5259 key.rlocator = map.new_locator;
5261 &key.tid);
5262
5263 new_ent = (ReorderBufferTupleCidEnt *)
5265
5266 if (found)
5267 {
5268 /*
5269 * Make sure the existing mapping makes sense. We sometimes update
5270 * old records that did not yet have a cmax (e.g. pg_class' own
5271 * entry while rewriting it) during rewrites, so allow that.
5272 */
5273 Assert(ent->cmin == InvalidCommandId || ent->cmin == new_ent->cmin);
5274 Assert(ent->cmax == InvalidCommandId || ent->cmax == new_ent->cmax);
5275 }
5276 else
5277 {
5278 /* update mapping */
5279 new_ent->cmin = ent->cmin;
5280 new_ent->cmax = ent->cmax;
5281 new_ent->combocid = ent->combocid;
5282 }
5283 }
5284
5285 if (CloseTransientFile(fd) != 0)
5286 ereport(ERROR,
5288 errmsg("could not close file \"%s\": %m", path)));
5289}
#define InvalidCommandId
Definition: c.h:640
#define PG_BINARY
Definition: c.h:1244
int32_t int32
Definition: c.h:498
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:955
int errcode_for_file_access(void)
Definition: elog.c:877
int errmsg(const char *fmt,...)
Definition: elog.c:1071
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:149
int CloseTransientFile(int fd)
Definition: fd.c:2871
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2694
Assert(PointerIsAligned(start, uint64))
@ HASH_FIND
Definition: hsearch.h:113
@ HASH_ENTER
Definition: hsearch.h:114
#define read(a, b, c)
Definition: win32.h:13
static void ItemPointerCopy(const ItemPointerData *fromPointer, ItemPointerData *toPointer)
Definition: itemptr.h:172
#define MAXPGPATH
#define sprintf
Definition: port.h:241
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_LOGICAL_MAPPINGS_DIR
Definition: reorderbuffer.h:23
static HTAB * tuplecid_data
Definition: snapmgr.c:162
ItemPointerData new_tid
Definition: rewriteheap.h:40
RelFileLocator old_locator
Definition: rewriteheap.h:37
ItemPointerData old_tid
Definition: rewriteheap.h:39
RelFileLocator new_locator
Definition: rewriteheap.h:38
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: wait_event.h:85
static void pgstat_report_wait_end(void)
Definition: wait_event.h:101

References Assert(), CloseTransientFile(), ReorderBufferTupleCidEnt::cmax, ReorderBufferTupleCidEnt::cmin, ReorderBufferTupleCidEnt::combocid, ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), HASH_ENTER, HASH_FIND, hash_search(), InvalidCommandId, ItemPointerCopy(), sort-test::key, MAXPGPATH, LogicalRewriteMappingData::new_locator, LogicalRewriteMappingData::new_tid, LogicalRewriteMappingData::old_locator, LogicalRewriteMappingData::old_tid, OpenTransientFile(), PG_BINARY, PG_LOGICAL_MAPPINGS_DIR, pgstat_report_wait_end(), pgstat_report_wait_start(), read, sprintf, and tuplecid_data.

Referenced by UpdateLogicalMappings().

◆ AssertChangeLsnOrder()

static void AssertChangeLsnOrder ( ReorderBufferTXN * txn)
static

Definition at line 994 of file reorderbuffer.c.

995{
996#ifdef USE_ASSERT_CHECKING
997 dlist_iter iter;
998 XLogRecPtr prev_lsn = txn->first_lsn;
999
1000 dlist_foreach(iter, &txn->changes)
1001 {
1002 ReorderBufferChange *cur_change;
1003
1004 cur_change = dlist_container(ReorderBufferChange, node, iter.cur);
1005
1007 Assert(cur_change->lsn != InvalidXLogRecPtr);
1008 Assert(txn->first_lsn <= cur_change->lsn);
1009
1010 if (txn->end_lsn != InvalidXLogRecPtr)
1011 Assert(cur_change->lsn <= txn->end_lsn);
1012
1013 Assert(prev_lsn <= cur_change->lsn);
1014
1015 prev_lsn = cur_change->lsn;
1016 }
1017#endif
1018}
#define dlist_foreach(iter, lhead)
Definition: ilist.h:623
#define dlist_container(type, membername, ptr)
Definition: ilist.h:593
XLogRecPtr first_lsn
XLogRecPtr end_lsn
dlist_head changes
dlist_node * cur
Definition: ilist.h:179
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28

References Assert(), ReorderBufferTXN::changes, dlist_iter::cur, dlist_container, dlist_foreach, ReorderBufferTXN::end_lsn, ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, and ReorderBufferChange::lsn.

Referenced by ReorderBufferIterTXNInit().

◆ AssertTXNLsnOrder()

static void AssertTXNLsnOrder ( ReorderBuffer * rb)
static

Definition at line 923 of file reorderbuffer.c.

924{
925#ifdef USE_ASSERT_CHECKING
927 dlist_iter iter;
928 XLogRecPtr prev_first_lsn = InvalidXLogRecPtr;
929 XLogRecPtr prev_base_snap_lsn = InvalidXLogRecPtr;
930
931 /*
932 * Skip the verification if we haven't yet reached the LSN at which we
933 * start decoding the contents of transactions, because until we reach
934 * that LSN we could have transactions that don't yet have the association
935 * between the top-level transaction and subtransaction and consequently
936 * have the same LSN. We don't guarantee this association until we try to
937 * decode the actual contents of a transaction. The ordering of the records
938 * prior to the start_decoding_at LSN should have been checked before the
939 * restart.
940 */
942 return;
943
945 {
947 iter.cur);
948
949 /* start LSN must be set */
951
952 /* If there is an end LSN, it must be higher than start LSN */
953 if (cur_txn->end_lsn != InvalidXLogRecPtr)
954 Assert(cur_txn->first_lsn <= cur_txn->end_lsn);
955
956 /* Current initial LSN must be strictly higher than previous */
957 if (prev_first_lsn != InvalidXLogRecPtr)
958 Assert(prev_first_lsn < cur_txn->first_lsn);
959
960 /* known-as-subtxn txns must not be listed */
962
963 prev_first_lsn = cur_txn->first_lsn;
964 }
965
967 {
969 base_snapshot_node,
970 iter.cur);
971
972 /* base snapshot (and its LSN) must be set */
973 Assert(cur_txn->base_snapshot != NULL);
975
976 /* current LSN must be strictly higher than previous */
977 if (prev_base_snap_lsn != InvalidXLogRecPtr)
978 Assert(prev_base_snap_lsn < cur_txn->base_snapshot_lsn);
979
980 /* known-as-subtxn txns must not be listed */
982
983 prev_base_snap_lsn = cur_txn->base_snapshot_lsn;
984 }
985#endif
986}
#define rbtxn_is_known_subxact(txn)
bool SnapBuildXactNeedsSkip(SnapBuild *builder, XLogRecPtr ptr)
Definition: snapbuild.c:304
XLogReaderState * reader
Definition: logical.h:42
struct SnapBuild * snapshot_builder
Definition: logical.h:44
XLogRecPtr base_snapshot_lsn
Snapshot base_snapshot
dlist_head txns_by_base_snapshot_lsn
dlist_head toplevel_by_lsn
void * private_data
XLogRecPtr EndRecPtr
Definition: xlogreader.h:207

References Assert(), ReorderBufferTXN::base_snapshot, ReorderBufferTXN::base_snapshot_lsn, dlist_iter::cur, dlist_container, dlist_foreach, ReorderBufferTXN::end_lsn, XLogReaderState::EndRecPtr, ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, ReorderBuffer::private_data, rbtxn_is_known_subxact, LogicalDecodingContext::reader, SnapBuildXactNeedsSkip(), LogicalDecodingContext::snapshot_builder, ReorderBuffer::toplevel_by_lsn, and ReorderBuffer::txns_by_base_snapshot_lsn.

Referenced by ReorderBufferAssignChild(), ReorderBufferGetOldestTXN(), ReorderBufferGetOldestXmin(), ReorderBufferSetBaseSnapshot(), and ReorderBufferTXNByXid().

◆ file_sort_by_lsn()

static int file_sort_by_lsn ( const ListCell * a_p,
const ListCell * b_p 
)
static

Definition at line 5306 of file reorderbuffer.c.

5307{
5310
5311 return pg_cmp_u64(a->lsn, b->lsn);
5312}
static int pg_cmp_u64(uint64 a, uint64 b)
Definition: int.h:664
int b
Definition: isn.c:74
int a
Definition: isn.c:73
#define lfirst(lc)
Definition: pg_list.h:172

References a, b, lfirst, and pg_cmp_u64().

Referenced by UpdateLogicalMappings().

◆ ReorderBufferAbort()

void ReorderBufferAbort ( ReorderBuffer * rb,
TransactionId  xid,
XLogRecPtr  lsn,
TimestampTz  abort_time 
)

Definition at line 3040 of file reorderbuffer.c.

3042{
3043 ReorderBufferTXN *txn;
3044
3045 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
3046 false);
3047
3048 /* unknown, nothing to remove */
3049 if (txn == NULL)
3050 return;
3051
3052 txn->xact_time.abort_time = abort_time;
3053
3054 /* For streamed transactions notify the remote node about the abort. */
3055 if (rbtxn_is_streamed(txn))
3056 {
3057 rb->stream_abort(rb, txn, lsn);
3058
3059 /*
3060 * We might have decoded changes for this transaction that could load
3061 * the cache as per the current transaction's view (consider DDLs that
3062 * happened in this transaction). We don't want the decoding of future
3063 * transactions to use those cache entries so execute invalidations.
3064 */
3065 if (txn->ninvalidations > 0)
3067 txn->invalidations);
3068 }
3069
3070 /* cosmetic... */
3071 txn->final_lsn = lsn;
3072
3073 /* remove potential on-disk data, and deallocate */
3074 ReorderBufferCleanupTXN(rb, txn);
3075}
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
void ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
#define rbtxn_is_streamed(txn)
SharedInvalidationMessage * invalidations
TimestampTz abort_time
XLogRecPtr final_lsn
union ReorderBufferTXN::@116 xact_time
ReorderBufferStreamAbortCB stream_abort

References ReorderBufferTXN::abort_time, ReorderBufferTXN::final_lsn, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, rbtxn_is_streamed, ReorderBufferCleanupTXN(), ReorderBufferImmediateInvalidation(), ReorderBufferTXNByXid(), ReorderBuffer::stream_abort, and ReorderBufferTXN::xact_time.

Referenced by DecodeAbort().

◆ ReorderBufferAbortOld()

void ReorderBufferAbortOld ( ReorderBuffer * rb,
TransactionId  oldestRunningXid 
)

Definition at line 3085 of file reorderbuffer.c.

3086{
3088
3089 /*
3090 * Iterate through all (potential) toplevel TXNs and abort all that are
3091 * older than what possibly can be running. Once we've found the first
3092 * that is alive we stop; there might be some that acquired an xid earlier
3093 * but started writing later, but it's unlikely and they will be cleaned
3094 * up in a later call to this function.
3095 */
3097 {
3098 ReorderBufferTXN *txn;
3099
3100 txn = dlist_container(ReorderBufferTXN, node, it.cur);
3101
3102 if (TransactionIdPrecedes(txn->xid, oldestRunningXid))
3103 {
3104 elog(DEBUG2, "aborting old transaction %u", txn->xid);
3105
3106 /* Notify the remote node about the crash/immediate restart. */
3107 if (rbtxn_is_streamed(txn))
3108 rb->stream_abort(rb, txn, InvalidXLogRecPtr);
3109
3110 /* remove potential on-disk data, and deallocate this tx */
3111 ReorderBufferCleanupTXN(rb, txn);
3112 }
3113 else
3114 return;
3115 }
3116}
#define DEBUG2
Definition: elog.h:29
#define elog(elevel,...)
Definition: elog.h:226
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:640
TransactionId xid
dlist_node * cur
Definition: ilist.h:200
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:280

References dlist_mutable_iter::cur, DEBUG2, dlist_container, dlist_foreach_modify, elog, InvalidXLogRecPtr, rbtxn_is_streamed, ReorderBufferCleanupTXN(), ReorderBuffer::stream_abort, ReorderBuffer::toplevel_by_lsn, TransactionIdPrecedes(), and ReorderBufferTXN::xid.

Referenced by standby_decode().

◆ ReorderBufferAddInvalidations()

void ReorderBufferAddInvalidations ( ReorderBuffer * rb,
TransactionId  xid,
XLogRecPtr  lsn,
Size  nmsgs,
SharedInvalidationMessage * msgs 
)

Definition at line 3438 of file reorderbuffer.c.

3441{
3442 ReorderBufferTXN *txn;
3443 MemoryContext oldcontext;
3444 ReorderBufferChange *change;
3445
3446 txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3447
3448 oldcontext = MemoryContextSwitchTo(rb->context);
3449
3450 /*
3451 * Collect all the invalidations under the top transaction, if available,
3452 * so that we can execute them all together. See comments atop this
3453 * function.
3454 */
3455 txn = rbtxn_get_toptxn(txn);
3456
3457 Assert(nmsgs > 0);
3458
3459 /* Accumulate invalidations. */
3460 if (txn->ninvalidations == 0)
3461 {
3462 txn->ninvalidations = nmsgs;
3464 palloc(sizeof(SharedInvalidationMessage) * nmsgs);
3465 memcpy(txn->invalidations, msgs,
3466 sizeof(SharedInvalidationMessage) * nmsgs);
3467 }
3468 else
3469 {
3472 (txn->ninvalidations + nmsgs));
3473
3474 memcpy(txn->invalidations + txn->ninvalidations, msgs,
3475 nmsgs * sizeof(SharedInvalidationMessage));
3476 txn->ninvalidations += nmsgs;
3477 }
3478
3479 change = ReorderBufferAllocChange(rb);
3481 change->data.inval.ninvalidations = nmsgs;
3483 palloc(sizeof(SharedInvalidationMessage) * nmsgs);
3484 memcpy(change->data.inval.invalidations, msgs,
3485 sizeof(SharedInvalidationMessage) * nmsgs);
3486
3487 ReorderBufferQueueChange(rb, xid, lsn, change, false);
3488
3489 MemoryContextSwitchTo(oldcontext);
3490}
void * repalloc(void *pointer, Size size)
Definition: mcxt.c:2166
void * palloc(Size size)
Definition: mcxt.c:1939
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:124
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change, bool toast_insert)
ReorderBufferChange * ReorderBufferAllocChange(ReorderBuffer *rb)
#define rbtxn_get_toptxn(txn)
@ REORDER_BUFFER_CHANGE_INVALIDATION
Definition: reorderbuffer.h:56
ReorderBufferChangeType action
Definition: reorderbuffer.h:81
union ReorderBufferChange::@110 data
struct ReorderBufferChange::@110::@115 inval
SharedInvalidationMessage * invalidations
MemoryContext context

References ReorderBufferChange::action, Assert(), ReorderBuffer::context, ReorderBufferChange::data, ReorderBufferChange::inval, ReorderBufferChange::invalidations, ReorderBufferTXN::invalidations, MemoryContextSwitchTo(), ReorderBufferChange::ninvalidations, ReorderBufferTXN::ninvalidations, palloc(), rbtxn_get_toptxn, REORDER_BUFFER_CHANGE_INVALIDATION, ReorderBufferAllocChange(), ReorderBufferQueueChange(), ReorderBufferTXNByXid(), and repalloc().

Referenced by SnapBuildDistributeSnapshotAndInval(), and xact_decode().

◆ ReorderBufferAddNewCommandId()

void ReorderBufferAddNewCommandId ( ReorderBuffer * rb,
TransactionId  xid,
XLogRecPtr  lsn,
CommandId  cid 
)

◆ ReorderBufferAddNewTupleCids()

void ReorderBufferAddNewTupleCids ( ReorderBuffer * rb,
TransactionId  xid,
XLogRecPtr  lsn,
RelFileLocator  locator,
ItemPointerData  tid,
CommandId  cmin,
CommandId  cmax,
CommandId  combocid 
)

Definition at line 3401 of file reorderbuffer.c.

3405{
3407 ReorderBufferTXN *txn;
3408
3409 txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3410
3411 change->data.tuplecid.locator = locator;
3412 change->data.tuplecid.tid = tid;
3413 change->data.tuplecid.cmin = cmin;
3414 change->data.tuplecid.cmax = cmax;
3415 change->data.tuplecid.combocid = combocid;
3416 change->lsn = lsn;
3417 change->txn = txn;
3419
3420 dlist_push_tail(&txn->tuplecids, &change->node);
3421 txn->ntuplecids++;
3422}
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:364
@ REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID
Definition: reorderbuffer.h:59
ItemPointerData tid
struct ReorderBufferTXN * txn
Definition: reorderbuffer.h:84
RelFileLocator locator
struct ReorderBufferChange::@110::@114 tuplecid
dlist_head tuplecids

References ReorderBufferChange::action, ReorderBufferChange::cmax, ReorderBufferChange::cmin, ReorderBufferChange::combocid, ReorderBufferChange::data, dlist_push_tail(), ReorderBufferChange::locator, ReorderBufferChange::lsn, ReorderBufferChange::node, ReorderBufferTXN::ntuplecids, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferAllocChange(), ReorderBufferTXNByXid(), ReorderBufferChange::tid, ReorderBufferChange::tuplecid, ReorderBufferTXN::tuplecids, and ReorderBufferChange::txn.

Referenced by SnapBuildProcessNewCid().

◆ ReorderBufferAddSnapshot()

void ReorderBufferAddSnapshot ( ReorderBuffer * rb,
TransactionId  xid,
XLogRecPtr  lsn,
Snapshot  snap 
)

◆ ReorderBufferAllocate()

ReorderBuffer * ReorderBufferAllocate ( void  )

Definition at line 312 of file reorderbuffer.c.

313{
314 ReorderBuffer *buffer;
315 HASHCTL hash_ctl;
316 MemoryContext new_ctx;
317
318 Assert(MyReplicationSlot != NULL);
319
320 /* allocate memory in own context, to have better accountability */
322 "ReorderBuffer",
324
325 buffer =
326 (ReorderBuffer *) MemoryContextAlloc(new_ctx, sizeof(ReorderBuffer));
327
328 memset(&hash_ctl, 0, sizeof(hash_ctl));
329
330 buffer->context = new_ctx;
331
332 buffer->change_context = SlabContextCreate(new_ctx,
333 "Change",
335 sizeof(ReorderBufferChange));
336
337 buffer->txn_context = SlabContextCreate(new_ctx,
338 "TXN",
340 sizeof(ReorderBufferTXN));
341
342 /*
343 * To minimize memory fragmentation caused by long-running transactions
344 * with changes spanning multiple memory blocks, we use a single
345 * fixed-size memory block for decoded tuple storage. The performance
346 * testing showed that the default memory block size maintains logical
347 * decoding performance without causing fragmentation due to concurrent
348 * transactions. One might think that we can use the max size as
349 * SLAB_LARGE_BLOCK_SIZE but the test also showed it doesn't help resolve
350 * the memory fragmentation.
351 */
352 buffer->tup_context = GenerationContextCreate(new_ctx,
353 "Tuples",
357
358 hash_ctl.keysize = sizeof(TransactionId);
359 hash_ctl.entrysize = sizeof(ReorderBufferTXNByIdEnt);
360 hash_ctl.hcxt = buffer->context;
361
362 buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
364
366 buffer->by_txn_last_txn = NULL;
367
368 buffer->outbuf = NULL;
369 buffer->outbufsize = 0;
370 buffer->size = 0;
371
372 /* txn_heap is ordered by transaction size */
374
375 buffer->spillTxns = 0;
376 buffer->spillCount = 0;
377 buffer->spillBytes = 0;
378 buffer->streamTxns = 0;
379 buffer->streamCount = 0;
380 buffer->streamBytes = 0;
381 buffer->totalTxns = 0;
382 buffer->totalBytes = 0;
383
385
386 dlist_init(&buffer->toplevel_by_lsn);
388 dclist_init(&buffer->catchange_txns);
389
390 /*
391 * Ensure there's no stale data from prior uses of this slot, in case some
392 * prior exit avoided calling ReorderBufferFree. Failure to do this can
393 * produce duplicated txns, and it's very cheap if there's nothing there.
394 */
396
397 return buffer;
398}
#define NameStr(name)
Definition: c.h:717
uint32 TransactionId
Definition: c.h:623
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:352
MemoryContext GenerationContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Definition: generation.c:160
#define HASH_CONTEXT
Definition: hsearch.h:102
#define HASH_ELEM
Definition: hsearch.h:95
#define HASH_BLOBS
Definition: hsearch.h:97
static void dlist_init(dlist_head *head)
Definition: ilist.h:314
static void dclist_init(dclist_head *head)
Definition: ilist.h:671
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:1256
MemoryContext CurrentMemoryContext
Definition: mcxt.c:159
#define AllocSetContextCreate
Definition: memutils.h:149
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:180
#define SLAB_DEFAULT_BLOCK_SIZE
Definition: memutils.h:209
pairingheap * pairingheap_allocate(pairingheap_comparator compare, void *arg)
Definition: pairingheap.c:42
static int ReorderBufferTXNSizeCompare(const pairingheap_node *a, const pairingheap_node *b, void *arg)
struct ReorderBufferTXNByIdEnt ReorderBufferTXNByIdEnt
static void ReorderBufferCleanupSerializedTXNs(const char *slotname)
MemoryContext SlabContextCreate(MemoryContext parent, const char *name, Size blockSize, Size chunkSize)
Definition: slab.c:322
ReplicationSlot * MyReplicationSlot
Definition: slot.c:147
Size keysize
Definition: hsearch.h:75
Size entrysize
Definition: hsearch.h:76
MemoryContext hcxt
Definition: hsearch.h:86
dclist_head catchange_txns
MemoryContext change_context
ReorderBufferTXN * by_txn_last_txn
TransactionId by_txn_last_xid
MemoryContext tup_context
pairingheap * txn_heap
MemoryContext txn_context
XLogRecPtr current_restart_decoding_lsn
ReplicationSlotPersistentData data
Definition: slot.h:185
#define InvalidTransactionId
Definition: transam.h:31

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, Assert(), ReorderBuffer::by_txn, ReorderBuffer::by_txn_last_txn, ReorderBuffer::by_txn_last_xid, ReorderBuffer::catchange_txns, ReorderBuffer::change_context, ReorderBuffer::context, ReorderBuffer::current_restart_decoding_lsn, CurrentMemoryContext, ReplicationSlot::data, dclist_init(), dlist_init(), HASHCTL::entrysize, GenerationContextCreate(), HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, InvalidTransactionId, InvalidXLogRecPtr, HASHCTL::keysize, MemoryContextAlloc(), MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, ReorderBuffer::outbuf, ReorderBuffer::outbufsize, pairingheap_allocate(), ReorderBufferCleanupSerializedTXNs(), ReorderBufferTXNSizeCompare(), ReorderBuffer::size, SLAB_DEFAULT_BLOCK_SIZE, SlabContextCreate(), ReorderBuffer::spillBytes, ReorderBuffer::spillCount, ReorderBuffer::spillTxns, ReorderBuffer::streamBytes, ReorderBuffer::streamCount, ReorderBuffer::streamTxns, ReorderBuffer::toplevel_by_lsn, ReorderBuffer::totalBytes, ReorderBuffer::totalTxns, ReorderBuffer::tup_context, ReorderBuffer::txn_context, ReorderBuffer::txn_heap, and ReorderBuffer::txns_by_base_snapshot_lsn.

Referenced by StartupDecodingContext().

◆ ReorderBufferAllocChange()

◆ ReorderBufferAllocRelids()

Oid * ReorderBufferAllocRelids ( ReorderBuffer * rb,
int  nrelids 
)

Definition at line 606 of file reorderbuffer.c.

607{
608 Oid *relids;
609 Size alloc_len;
610
611 alloc_len = sizeof(Oid) * nrelids;
612
613 relids = (Oid *) MemoryContextAlloc(rb->context, alloc_len);
614
615 return relids;
616}
size_t Size
Definition: c.h:576
unsigned int Oid
Definition: postgres_ext.h:30

References ReorderBuffer::context, and MemoryContextAlloc().

Referenced by DecodeTruncate(), and ReorderBufferRestoreChange().

◆ ReorderBufferAllocTupleBuf()

HeapTuple ReorderBufferAllocTupleBuf ( ReorderBuffer rb,
Size  tuple_len 
)

Definition at line 573 of file reorderbuffer.c.

574{
575 HeapTuple tuple;
576 Size alloc_len;
577
578 alloc_len = tuple_len + SizeofHeapTupleHeader;
579
581 HEAPTUPLESIZE + alloc_len);
582 tuple->t_data = (HeapTupleHeader) ((char *) tuple + HEAPTUPLESIZE);
583
584 return tuple;
585}
#define HEAPTUPLESIZE
Definition: htup.h:73
HeapTupleData * HeapTuple
Definition: htup.h:71
HeapTupleHeaderData * HeapTupleHeader
Definition: htup.h:23
#define SizeofHeapTupleHeader
Definition: htup_details.h:185
HeapTupleHeader t_data
Definition: htup.h:68

References HEAPTUPLESIZE, MemoryContextAlloc(), SizeofHeapTupleHeader, HeapTupleData::t_data, and ReorderBuffer::tup_context.

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeUpdate(), and ReorderBufferRestoreChange().

◆ ReorderBufferAllocTXN()

static ReorderBufferTXN * ReorderBufferAllocTXN ( ReorderBuffer rb)
static

Definition at line 422 of file reorderbuffer.c.

423{
424 ReorderBufferTXN *txn;
425
426 txn = (ReorderBufferTXN *)
428
429 memset(txn, 0, sizeof(ReorderBufferTXN));
430
431 dlist_init(&txn->changes);
432 dlist_init(&txn->tuplecids);
433 dlist_init(&txn->subtxns);
434
435 /* InvalidCommandId is not zero, so set it explicitly */
437 txn->output_plugin_private = NULL;
438
439 return txn;
440}
CommandId command_id
void * output_plugin_private
dlist_head subtxns

References ReorderBufferTXN::changes, ReorderBufferTXN::command_id, dlist_init(), InvalidCommandId, MemoryContextAlloc(), ReorderBufferTXN::output_plugin_private, ReorderBufferTXN::subtxns, ReorderBufferTXN::tuplecids, and ReorderBuffer::txn_context.

Referenced by ReorderBufferTXNByXid().

◆ ReorderBufferApplyChange()

static void ReorderBufferApplyChange ( ReorderBuffer rb,
ReorderBufferTXN txn,
Relation  relation,
ReorderBufferChange change,
bool  streaming 
)
inline static

Definition at line 2053 of file reorderbuffer.c.

2056{
2057 if (streaming)
2058 rb->stream_change(rb, txn, relation, change);
2059 else
2060 rb->apply_change(rb, txn, relation, change);
2061}
ReorderBufferStreamChangeCB stream_change
ReorderBufferApplyChangeCB apply_change

References ReorderBuffer::apply_change, and ReorderBuffer::stream_change.

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferApplyMessage()

static void ReorderBufferApplyMessage ( ReorderBuffer rb,
ReorderBufferTXN txn,
ReorderBufferChange change,
bool  streaming 
)
inline static

Definition at line 2081 of file reorderbuffer.c.

2083{
2084 if (streaming)
2085 rb->stream_message(rb, txn, change->lsn, true,
2086 change->data.msg.prefix,
2087 change->data.msg.message_size,
2088 change->data.msg.message);
2089 else
2090 rb->message(rb, txn, change->lsn, true,
2091 change->data.msg.prefix,
2092 change->data.msg.message_size,
2093 change->data.msg.message);
2094}
struct ReorderBufferChange::@110::@113 msg
ReorderBufferStreamMessageCB stream_message
ReorderBufferMessageCB message

References ReorderBufferChange::data, ReorderBufferChange::lsn, ReorderBufferChange::message, ReorderBuffer::message, ReorderBufferChange::message_size, ReorderBufferChange::msg, ReorderBufferChange::prefix, and ReorderBuffer::stream_message.

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferApplyTruncate()

static void ReorderBufferApplyTruncate ( ReorderBuffer rb,
ReorderBufferTXN txn,
int  nrelations,
Relation relations,
ReorderBufferChange change,
bool  streaming 
)
inline static

Definition at line 2067 of file reorderbuffer.c.

2070{
2071 if (streaming)
2072 rb->stream_truncate(rb, txn, nrelations, relations, change);
2073 else
2074 rb->apply_truncate(rb, txn, nrelations, relations, change);
2075}
ReorderBufferStreamTruncateCB stream_truncate
ReorderBufferApplyTruncateCB apply_truncate

References ReorderBuffer::apply_truncate, and ReorderBuffer::stream_truncate.

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferAssignChild()

void ReorderBufferAssignChild ( ReorderBuffer * rb,
TransactionId  xid,
TransactionId  subxid,
XLogRecPtr  lsn 
)

Definition at line 1080 of file reorderbuffer.c.

1082{
1083 ReorderBufferTXN *txn;
1084 ReorderBufferTXN *subtxn;
1085 bool new_top;
1086 bool new_sub;
1087
1088 txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true);
1089 subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false);
1090
1091 if (!new_sub)
1092 {
1093 if (rbtxn_is_known_subxact(subtxn))
1094 {
1095 /* already associated, nothing to do */
1096 return;
1097 }
1098 else
1099 {
1100 /*
1101 * We already saw this transaction, but initially added it to the
1102 * list of top-level txns. Now that we know it's not top-level,
1103 * remove it from there.
1104 */
1105 dlist_delete(&subtxn->node);
1106 }
1107 }
1108
1109 subtxn->txn_flags |= RBTXN_IS_SUBXACT;
1110 subtxn->toplevel_xid = xid;
1111 Assert(subtxn->nsubtxns == 0);
1112
1113 /* set the reference to top-level transaction */
1114 subtxn->toptxn = txn;
1115
1116 /* add to subtransaction list */
1117 dlist_push_tail(&txn->subtxns, &subtxn->node);
1118 txn->nsubtxns++;
1119
1120 /* Possibly transfer the subtxn's snapshot to its top-level txn. */
1122
1123 /* Verify LSN-ordering invariant */
1125}
static void dlist_delete(dlist_node *node)
Definition: ilist.h:405
static void AssertTXNLsnOrder(ReorderBuffer *rb)
static void ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn, ReorderBufferTXN *subtxn)
#define RBTXN_IS_SUBXACT
TransactionId toplevel_xid
struct ReorderBufferTXN * toptxn

References Assert(), AssertTXNLsnOrder(), dlist_delete(), dlist_push_tail(), ReorderBufferTXN::node, ReorderBufferTXN::nsubtxns, rbtxn_is_known_subxact, RBTXN_IS_SUBXACT, ReorderBufferTransferSnapToParent(), ReorderBufferTXNByXid(), ReorderBufferTXN::subtxns, ReorderBufferTXN::toplevel_xid, ReorderBufferTXN::toptxn, and ReorderBufferTXN::txn_flags.

Referenced by LogicalDecodingProcessRecord(), and ReorderBufferCommitChild().

◆ ReorderBufferBuildTupleCidHash()

static void ReorderBufferBuildTupleCidHash ( ReorderBuffer * rb,
ReorderBufferTXN * txn 
)
static

Definition at line 1817 of file reorderbuffer.c.

1818{
1819 dlist_iter iter;
1820 HASHCTL hash_ctl;
1821
1823 return;
1824
1825 hash_ctl.keysize = sizeof(ReorderBufferTupleCidKey);
1826 hash_ctl.entrysize = sizeof(ReorderBufferTupleCidEnt);
1827 hash_ctl.hcxt = rb->context;
1828
1829 /*
1830 * create the hash with the exact number of to-be-stored tuplecids from
1831 * the start
1832 */
1833 txn->tuplecid_hash =
1834 hash_create("ReorderBufferTupleCid", txn->ntuplecids, &hash_ctl,
1836
1837 dlist_foreach(iter, &txn->tuplecids)
1838 {
1841 bool found;
1842 ReorderBufferChange *change;
1843
1844 change = dlist_container(ReorderBufferChange, node, iter.cur);
1845
1847
1848 /* be careful about padding */
1849 memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
1850
1851 key.rlocator = change->data.tuplecid.locator;
1852
1854 &key.tid);
1855
1856 ent = (ReorderBufferTupleCidEnt *)
1857 hash_search(txn->tuplecid_hash, &key, HASH_ENTER, &found);
1858 if (!found)
1859 {
1860 ent->cmin = change->data.tuplecid.cmin;
1861 ent->cmax = change->data.tuplecid.cmax;
1862 ent->combocid = change->data.tuplecid.combocid;
1863 }
1864 else
1865 {
1866 /*
1867 * Maybe we already saw this tuple before in this transaction, but
1868 * if so it must have the same cmin.
1869 */
1870 Assert(ent->cmin == change->data.tuplecid.cmin);
1871
1872 /*
1873 * cmax may be initially invalid, but once set it can only grow,
1874 * and never become invalid again.
1875 */
1876 Assert((ent->cmax == InvalidCommandId) ||
1877 ((change->data.tuplecid.cmax != InvalidCommandId) &&
1878 (change->data.tuplecid.cmax > ent->cmax)));
1879 ent->cmax = change->data.tuplecid.cmax;
1880 }
1881 }
1882}
static bool dlist_is_empty(const dlist_head *head)
Definition: ilist.h:336
struct ReorderBufferTupleCidEnt ReorderBufferTupleCidEnt
struct ReorderBufferTupleCidKey ReorderBufferTupleCidKey
#define rbtxn_has_catalog_changes(txn)

References ReorderBufferChange::action, Assert(), ReorderBufferTupleCidEnt::cmax, ReorderBufferChange::cmax, ReorderBufferTupleCidEnt::cmin, ReorderBufferChange::cmin, ReorderBufferTupleCidEnt::combocid, ReorderBufferChange::combocid, ReorderBuffer::context, dlist_iter::cur, ReorderBufferChange::data, dlist_container, dlist_foreach, dlist_is_empty(), HASHCTL::entrysize, HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASH_ENTER, hash_search(), HASHCTL::hcxt, InvalidCommandId, ItemPointerCopy(), sort-test::key, HASHCTL::keysize, ReorderBufferChange::locator, ReorderBufferTXN::ntuplecids, rbtxn_has_catalog_changes, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferChange::tid, ReorderBufferChange::tuplecid, ReorderBufferTXN::tuplecid_hash, and ReorderBufferTXN::tuplecids.

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferCanStartStreaming()

static bool ReorderBufferCanStartStreaming ( ReorderBuffer * rb)
inline static

Definition at line 4152 of file reorderbuffer.c.

4153{
4155 SnapBuild *builder = ctx->snapshot_builder;
4156
4157 /* We can't start streaming unless a consistent state is reached. */
4159 return false;
4160
4161 /*
4162 * Even if streaming is enabled, we can't start streaming immediately
4163 * when we have previously decoded this transaction and are now just
4164 * restarting.
4165 */
4166 if (ReorderBufferCanStream(rb) &&
4167 !SnapBuildXactNeedsSkip(builder, ctx->reader->ReadRecPtr))
4168 return true;
4169
4170 return false;
4171}
static bool ReorderBufferCanStream(ReorderBuffer *rb)
SnapBuildState SnapBuildCurrentState(SnapBuild *builder)
Definition: snapbuild.c:277
@ SNAPBUILD_CONSISTENT
Definition: snapbuild.h:50
XLogRecPtr ReadRecPtr
Definition: xlogreader.h:206

References ReorderBuffer::private_data, LogicalDecodingContext::reader, XLogReaderState::ReadRecPtr, ReorderBufferCanStream(), SNAPBUILD_CONSISTENT, SnapBuildCurrentState(), SnapBuildXactNeedsSkip(), and LogicalDecodingContext::snapshot_builder.

Referenced by ReorderBufferCheckMemoryLimit(), and ReorderBufferProcessPartialChange().

◆ ReorderBufferCanStream()

static bool ReorderBufferCanStream ( ReorderBuffer * rb)
inline static

◆ ReorderBufferChangeMemoryUpdate()

static void ReorderBufferChangeMemoryUpdate ( ReorderBuffer * rb,
ReorderBufferChange * change,
ReorderBufferTXN * txn,
bool  addition,
Size  sz 
)
static

Definition at line 3330 of file reorderbuffer.c.

3334{
3335 ReorderBufferTXN *toptxn;
3336
3337 Assert(txn || change);
3338
3339 /*
3340 * Ignore tuple CID changes, because those are not evicted when reaching
3341 * memory limit. So we just don't count them, because it might easily
3342 * trigger a pointless attempt to spill.
3343 */
3344 if (change && change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID)
3345 return;
3346
3347 if (sz == 0)
3348 return;
3349
3350 if (txn == NULL)
3351 txn = change->txn;
3352 Assert(txn != NULL);
3353
3354 /*
3355 * Update the total size in top level as well. This is later used to
3356 * compute the decoding stats.
3357 */
3358 toptxn = rbtxn_get_toptxn(txn);
3359
3360 if (addition)
3361 {
3362 Size oldsize = txn->size;
3363
3364 txn->size += sz;
3365 rb->size += sz;
3366
3367 /* Update the total size in the top transaction. */
3368 toptxn->total_size += sz;
3369
3370 /* Update the max-heap */
3371 if (oldsize != 0)
3373 pairingheap_add(rb->txn_heap, &txn->txn_node);
3374 }
3375 else
3376 {
3377 Assert((rb->size >= sz) && (txn->size >= sz));
3378 txn->size -= sz;
3379 rb->size -= sz;
3380
3381 /* Update the total size in the top transaction. */
3382 toptxn->total_size -= sz;
3383
3384 /* Update the max-heap */
3386 if (txn->size != 0)
3387 pairingheap_add(rb->txn_heap, &txn->txn_node);
3388 }
3389
3390 Assert(txn->size <= rb->size);
3391}
void pairingheap_remove(pairingheap *heap, pairingheap_node *node)
Definition: pairingheap.c:170
void pairingheap_add(pairingheap *heap, pairingheap_node *node)
Definition: pairingheap.c:112
pairingheap_node txn_node

References ReorderBufferChange::action, Assert(), pairingheap_add(), pairingheap_remove(), rbtxn_get_toptxn, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferTXN::size, ReorderBuffer::size, ReorderBufferTXN::total_size, ReorderBufferChange::txn, ReorderBuffer::txn_heap, and ReorderBufferTXN::txn_node.

Referenced by ReorderBufferCleanupTXN(), ReorderBufferFreeChange(), ReorderBufferQueueChange(), ReorderBufferRestoreChange(), ReorderBufferSerializeTXN(), ReorderBufferToastReplace(), and ReorderBufferTruncateTXN().

◆ ReorderBufferChangeSize()

static Size ReorderBufferChangeSize ( ReorderBufferChange * change)
static

Definition at line 4295 of file reorderbuffer.c.

4296{
4297 Size sz = sizeof(ReorderBufferChange);
4298
4299 switch (change->action)
4300 {
4301 /* fall through these, they're all similar enough */
4306 {
4307 HeapTuple oldtup,
4308 newtup;
4309 Size oldlen = 0;
4310 Size newlen = 0;
4311
4312 oldtup = change->data.tp.oldtuple;
4313 newtup = change->data.tp.newtuple;
4314
4315 if (oldtup)
4316 {
4317 sz += sizeof(HeapTupleData);
4318 oldlen = oldtup->t_len;
4319 sz += oldlen;
4320 }
4321
4322 if (newtup)
4323 {
4324 sz += sizeof(HeapTupleData);
4325 newlen = newtup->t_len;
4326 sz += newlen;
4327 }
4328
4329 break;
4330 }
4332 {
4333 Size prefix_size = strlen(change->data.msg.prefix) + 1;
4334
4335 sz += prefix_size + change->data.msg.message_size +
4336 sizeof(Size) + sizeof(Size);
4337
4338 break;
4339 }
4341 {
4342 sz += sizeof(SharedInvalidationMessage) *
4343 change->data.inval.ninvalidations;
4344 break;
4345 }
4347 {
4348 Snapshot snap;
4349
4350 snap = change->data.snapshot;
4351
4352 sz += sizeof(SnapshotData) +
4353 sizeof(TransactionId) * snap->xcnt +
4354 sizeof(TransactionId) * snap->subxcnt;
4355
4356 break;
4357 }
4359 {
4360 sz += sizeof(Oid) * change->data.truncate.nrelids;
4361
4362 break;
4363 }
4368 /* ReorderBufferChange contains everything important */
4369 break;
4370 }
4371
4372 return sz;
4373}
struct HeapTupleData HeapTupleData
struct ReorderBufferChange ReorderBufferChange
@ REORDER_BUFFER_CHANGE_MESSAGE
Definition: reorderbuffer.h:55
@ REORDER_BUFFER_CHANGE_TRUNCATE
Definition: reorderbuffer.h:63
@ REORDER_BUFFER_CHANGE_DELETE
Definition: reorderbuffer.h:54
struct SnapshotData SnapshotData
uint32 t_len
Definition: htup.h:64
struct ReorderBufferChange::@110::@112 truncate
struct ReorderBufferChange::@110::@111 tp
int32 subxcnt
Definition: snapshot.h:177
uint32 xcnt
Definition: snapshot.h:165

References ReorderBufferChange::action, ReorderBufferChange::data, ReorderBufferChange::inval, ReorderBufferChange::message_size, ReorderBufferChange::msg, ReorderBufferChange::newtuple, ReorderBufferChange::ninvalidations, ReorderBufferChange::nrelids, ReorderBufferChange::oldtuple, ReorderBufferChange::prefix, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChange::snapshot, SnapshotData::subxcnt, HeapTupleData::t_len, ReorderBufferChange::tp, ReorderBufferChange::truncate, and SnapshotData::xcnt.

Referenced by ReorderBufferCleanupTXN(), ReorderBufferFreeChange(), ReorderBufferQueueChange(), ReorderBufferRestoreChange(), ReorderBufferToastReplace(), and ReorderBufferTruncateTXN().

◆ ReorderBufferCheckAndTruncateAbortedTXN()

static bool ReorderBufferCheckAndTruncateAbortedTXN ( ReorderBuffer * rb,
ReorderBufferTXN * txn 
)
static

Definition at line 1755 of file reorderbuffer.c.

1756{
1757 /* Quick return for regression tests */
1759 return false;
1760
1761 /*
1762 * Quick return if the transaction status is already known.
1763 */
1764
1765 if (rbtxn_is_committed(txn))
1766 return false;
1767 if (rbtxn_is_aborted(txn))
1768 {
1769 /* Already-aborted transactions should not have any changes */
1770 Assert(txn->size == 0);
1771
1772 return true;
1773 }
1774
1775 /* Otherwise, check the transaction status using CLOG lookup */
1776
1778 return false;
1779
1780 if (TransactionIdDidCommit(txn->xid))
1781 {
1782 /*
1783 * Remember the transaction is committed so that we can skip CLOG
1784 * check next time, avoiding the pressure on CLOG lookup.
1785 */
1786 Assert(!rbtxn_is_aborted(txn));
1788 return false;
1789 }
1790
1791 /*
1792 * The transaction aborted. We discard both the changes collected so far
1793 * and the toast reconstruction data. The full cleanup will happen as part
1794 * of decoding ABORT record of this transaction.
1795 */
1797 ReorderBufferToastReset(rb, txn);
1798
1799 /* All changes should be discarded */
1800 Assert(txn->size == 0);
1801
1802 /*
1803 * Mark the transaction as aborted so we can ignore future changes of this
1804 * transaction.
1805 */
1808
1809 return true;
1810}
#define unlikely(x)
Definition: c.h:347
bool TransactionIdIsInProgress(TransactionId xid)
Definition: procarray.c:1402
int debug_logical_replication_streaming
static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared)
static void ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define rbtxn_is_committed(txn)
@ DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE
Definition: reorderbuffer.h:34
#define rbtxn_is_prepared(txn)
#define RBTXN_IS_COMMITTED
#define rbtxn_is_aborted(txn)
#define RBTXN_IS_ABORTED
bool TransactionIdDidCommit(TransactionId transactionId)
Definition: transam.c:126

References Assert(), DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE, debug_logical_replication_streaming, RBTXN_IS_ABORTED, rbtxn_is_aborted, RBTXN_IS_COMMITTED, rbtxn_is_committed, rbtxn_is_prepared, ReorderBufferToastReset(), ReorderBufferTruncateTXN(), ReorderBufferTXN::size, TransactionIdDidCommit(), TransactionIdIsInProgress(), ReorderBufferTXN::txn_flags, unlikely, and ReorderBufferTXN::xid.

Referenced by ReorderBufferCheckMemoryLimit().

◆ ReorderBufferCheckMemoryLimit()

static void ReorderBufferCheckMemoryLimit ( ReorderBuffer * rb)
static

Definition at line 3753 of file reorderbuffer.c.

3754{
3755 ReorderBufferTXN *txn;
3756
3757 /*
3758 * Bail out if debug_logical_replication_streaming is buffered and we
3759 * haven't exceeded the memory limit.
3760 */
3762 rb->size < logical_decoding_work_mem * (Size) 1024)
3763 return;
3764
3765 /*
3766 * If debug_logical_replication_streaming is immediate, loop until there's
3767 * no change. Otherwise, loop until we reach under the memory limit. One
3768 * might think that just by evicting the largest (sub)transaction we will
3769 * come under the memory limit based on assumption that the selected
3770 * transaction is at least as large as the most recent change (which
3771 * caused us to go over the memory limit). However, that is not true
3772 * because a user can reduce the logical_decoding_work_mem to a smaller
3773 * value before the most recent change.
3774 */
3775 while (rb->size >= logical_decoding_work_mem * (Size) 1024 ||
3777 rb->size > 0))
3778 {
3779 /*
3780 * Pick the largest non-aborted transaction and evict it from memory
3781 * by streaming, if possible. Otherwise, spill to disk.
3782 */
3784 (txn = ReorderBufferLargestStreamableTopTXN(rb)) != NULL)
3785 {
3786 /* we know there has to be one, because the size is not zero */
3787 Assert(txn && rbtxn_is_toptxn(txn));
3788 Assert(txn->total_size > 0);
3789 Assert(rb->size >= txn->total_size);
3790
3791 /* skip the transaction if aborted */
3793 continue;
3794
3795 ReorderBufferStreamTXN(rb, txn);
3796 }
3797 else
3798 {
3799 /*
3800 * Pick the largest transaction (or subtransaction) and evict it
3801 * from memory by serializing it to disk.
3802 */
3803 txn = ReorderBufferLargestTXN(rb);
3804
3805 /* we know there has to be one, because the size is not zero */
3806 Assert(txn);
3807 Assert(txn->size > 0);
3808 Assert(rb->size >= txn->size);
3809
3810 /* skip the transaction if aborted */
3812 continue;
3813
3815 }
3816
3817 /*
3818 * After eviction, the transaction should have no entries in memory,
3819 * and should use 0 bytes for changes.
3820 */
3821 Assert(txn->size == 0);
3822 Assert(txn->nentries_mem == 0);
3823 }
3824
3825 /* We must be under the memory limit now. */
3826 Assert(rb->size < logical_decoding_work_mem * (Size) 1024);
3827}
static ReorderBufferTXN * ReorderBufferLargestTXN(ReorderBuffer *rb)
static bool ReorderBufferCanStartStreaming(ReorderBuffer *rb)
int logical_decoding_work_mem
static ReorderBufferTXN * ReorderBufferLargestStreamableTopTXN(ReorderBuffer *rb)
static bool ReorderBufferCheckAndTruncateAbortedTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
@ DEBUG_LOGICAL_REP_STREAMING_BUFFERED
Definition: reorderbuffer.h:33
#define rbtxn_is_toptxn(txn)

References Assert(), DEBUG_LOGICAL_REP_STREAMING_BUFFERED, DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE, debug_logical_replication_streaming, logical_decoding_work_mem, ReorderBufferTXN::nentries_mem, rbtxn_is_toptxn, ReorderBufferCanStartStreaming(), ReorderBufferCheckAndTruncateAbortedTXN(), ReorderBufferLargestStreamableTopTXN(), ReorderBufferLargestTXN(), ReorderBufferSerializeTXN(), ReorderBufferStreamTXN(), ReorderBufferTXN::size, ReorderBuffer::size, and ReorderBufferTXN::total_size.

Referenced by ReorderBufferQueueChange().

◆ ReorderBufferCleanupSerializedTXNs()

static void ReorderBufferCleanupSerializedTXNs ( const char *  slotname)
static

Definition at line 4720 of file reorderbuffer.c.

4721{
4722 DIR *spill_dir;
4723 struct dirent *spill_de;
4724 struct stat statbuf;
4725 char path[MAXPGPATH * 2 + sizeof(PG_REPLSLOT_DIR)];
4726
4727 sprintf(path, "%s/%s", PG_REPLSLOT_DIR, slotname);
4728
4729 /* we're only handling directories here, skip if it's not ours */
4730 if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
4731 return;
4732
4733 spill_dir = AllocateDir(path);
4734 while ((spill_de = ReadDirExtended(spill_dir, path, INFO)) != NULL)
4735 {
4736 /* only look at names that can be ours */
4737 if (strncmp(spill_de->d_name, "xid", 3) == 0)
4738 {
4739 snprintf(path, sizeof(path),
4740 "%s/%s/%s", PG_REPLSLOT_DIR, slotname,
4741 spill_de->d_name);
4742
4743 if (unlink(path) != 0)
4744 ereport(ERROR,
4746 errmsg("could not remove file \"%s\" during removal of %s/%s/xid*: %m",
4747 path, PG_REPLSLOT_DIR, slotname)));
4748 }
4749 }
4750 FreeDir(spill_dir);
4751}
#define INFO
Definition: elog.h:34
int FreeDir(DIR *dir)
Definition: fd.c:3025
struct dirent * ReadDirExtended(DIR *dir, const char *dirname, int elevel)
Definition: fd.c:2988
DIR * AllocateDir(const char *dirname)
Definition: fd.c:2907
#define snprintf
Definition: port.h:239
#define PG_REPLSLOT_DIR
Definition: slot.h:21
Definition: dirent.c:26
Definition: dirent.h:10
char d_name[MAX_PATH]
Definition: dirent.h:15
#define lstat(path, sb)
Definition: win32_port.h:275
#define S_ISDIR(m)
Definition: win32_port.h:315

References AllocateDir(), dirent::d_name, ereport, errcode_for_file_access(), errmsg(), ERROR, FreeDir(), INFO, lstat, MAXPGPATH, PG_REPLSLOT_DIR, ReadDirExtended(), S_ISDIR, snprintf, sprintf, and stat::st_mode.

Referenced by ReorderBufferAllocate(), ReorderBufferFree(), and StartupReorderBuffer().

◆ ReorderBufferCleanupTXN()

static void ReorderBufferCleanupTXN ( ReorderBuffer * rb,
ReorderBufferTXN * txn 
)
static

Definition at line 1516 of file reorderbuffer.c.

1517{
1518 bool found;
1519 dlist_mutable_iter iter;
1520 Size mem_freed = 0;
1521
1522 /* cleanup subtransactions & their changes */
1523 dlist_foreach_modify(iter, &txn->subtxns)
1524 {
1525 ReorderBufferTXN *subtxn;
1526
1527 subtxn = dlist_container(ReorderBufferTXN, node, iter.cur);
1528
1529 /*
1530 * Subtransactions are always associated with the toplevel TXN, even if
1531 * they originally were happening inside another subtxn, so we won't
1532 * ever recurse more than one level deep here.
1533 */
1535 Assert(subtxn->nsubtxns == 0);
1536
1537 ReorderBufferCleanupTXN(rb, subtxn);
1538 }
1539
1540 /* cleanup changes in the txn */
1541 dlist_foreach_modify(iter, &txn->changes)
1542 {
1543 ReorderBufferChange *change;
1544
1545 change = dlist_container(ReorderBufferChange, node, iter.cur);
1546
1547 /* Check we're not mixing changes from different transactions. */
1548 Assert(change->txn == txn);
1549
1550 /*
1551 * Instead of updating the memory counter for individual changes, we
1552 * sum up the size of memory to free so we can update the memory
1553 * counter all together below. This saves costs of maintaining the
1554 * max-heap.
1555 */
1556 mem_freed += ReorderBufferChangeSize(change);
1557
1558 ReorderBufferFreeChange(rb, change, false);
1559 }
1560
1561 /* Update the memory counter */
1562 ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, mem_freed);
1563
1564 /*
1565 * Cleanup the tuplecids we stored for decoding catalog snapshot access.
1566 * They are always stored in the toplevel transaction.
1567 */
1568 dlist_foreach_modify(iter, &txn->tuplecids)
1569 {
1570 ReorderBufferChange *change;
1571
1572 change = dlist_container(ReorderBufferChange, node, iter.cur);
1573
1574 /* Check we're not mixing changes from different transactions. */
1575 Assert(change->txn == txn);
1577
1578 ReorderBufferFreeChange(rb, change, true);
1579 }
1580
1581 /*
1582 * Cleanup the base snapshot, if set.
1583 */
1584 if (txn->base_snapshot != NULL)
1585 {
1588 }
1589
1590 /*
1591 * Cleanup the snapshot for the last streamed run.
1592 */
1593 if (txn->snapshot_now != NULL)
1594 {
1597 }
1598
1599 /*
1600 * Remove TXN from its containing lists.
1601 *
1602 * Note: if txn is known as subxact, we are deleting the TXN from its
1603 * parent's list of known subxacts; this leaves the parent's nsubtxns
1604 * count too high, but we don't care. Otherwise, we are deleting the TXN
1605 * from the LSN-ordered list of toplevel TXNs. We remove the TXN from the
1606 * list of catalog modifying transactions as well.
1607 */
1608 dlist_delete(&txn->node);
1611
1612 /* now remove reference from buffer */
1613 hash_search(rb->by_txn, &txn->xid, HASH_REMOVE, &found);
1614 Assert(found);
1615
1616 /* remove entries spilled to disk */
1617 if (rbtxn_is_serialized(txn))
1619
1620 /* deallocate */
1621 ReorderBufferFreeTXN(rb, txn);
1622}
@ HASH_REMOVE
Definition: hsearch.h:115
static void dclist_delete_from(dclist_head *head, dlist_node *node)
Definition: ilist.h:763
void ReorderBufferFreeChange(ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)
static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
static Size ReorderBufferChangeSize(ReorderBufferChange *change)
static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
static void ReorderBufferFreeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, ReorderBufferChange *change, ReorderBufferTXN *txn, bool addition, Size sz)
#define rbtxn_is_serialized(txn)
void SnapBuildSnapDecRefcount(Snapshot snap)
Definition: snapbuild.c:328
Snapshot snapshot_now
dlist_node catchange_node
dlist_node base_snapshot_node

References ReorderBufferChange::action, Assert(), ReorderBufferTXN::base_snapshot, ReorderBufferTXN::base_snapshot_node, ReorderBuffer::by_txn, ReorderBufferTXN::catchange_node, ReorderBuffer::catchange_txns, ReorderBufferTXN::changes, dlist_mutable_iter::cur, dclist_delete_from(), dlist_container, dlist_delete(), dlist_foreach_modify, HASH_REMOVE, hash_search(), ReorderBufferTXN::node, ReorderBufferTXN::nsubtxns, rbtxn_has_catalog_changes, rbtxn_is_known_subxact, rbtxn_is_serialized, rbtxn_is_streamed, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferChangeMemoryUpdate(), ReorderBufferChangeSize(), ReorderBufferCleanupTXN(), ReorderBufferFreeChange(), ReorderBufferFreeSnap(), ReorderBufferFreeTXN(), ReorderBufferRestoreCleanup(), SnapBuildSnapDecRefcount(), ReorderBufferTXN::snapshot_now, ReorderBufferTXN::subtxns, ReorderBufferTXN::tuplecids, ReorderBufferChange::txn, and ReorderBufferTXN::xid.

Referenced by ReorderBufferAbort(), ReorderBufferAbortOld(), ReorderBufferCleanupTXN(), ReorderBufferFinishPrepared(), ReorderBufferForget(), ReorderBufferProcessTXN(), ReorderBufferReplay(), and ReorderBufferStreamCommit().

◆ ReorderBufferCommit()

void ReorderBufferCommit ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn,
TimestampTz  commit_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn 
)

Definition at line 2837 of file reorderbuffer.c.

2841{
2842 ReorderBufferTXN *txn;
2843
2844 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2845 false);
2846
2847 /* unknown transaction, nothing to replay */
2848 if (txn == NULL)
2849 return;
2850
2851 ReorderBufferReplay(txn, rb, xid, commit_lsn, end_lsn, commit_time,
2852 origin_id, origin_lsn);
2853}
static void ReorderBufferReplay(ReorderBufferTXN *txn, ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)

References InvalidXLogRecPtr, ReorderBufferReplay(), and ReorderBufferTXNByXid().

Referenced by DecodeCommit().

◆ ReorderBufferCommitChild()

void ReorderBufferCommitChild ( ReorderBuffer rb,
TransactionId  xid,
TransactionId  subxid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn 
)

Definition at line 1200 of file reorderbuffer.c.

1203{
1204 ReorderBufferTXN *subtxn;
1205
1206 subtxn = ReorderBufferTXNByXid(rb, subxid, false, NULL,
1207 InvalidXLogRecPtr, false);
1208
1209 /*
1210 * No need to do anything if that subtxn didn't contain any changes
1211 */
1212 if (!subtxn)
1213 return;
1214
1215 subtxn->final_lsn = commit_lsn;
1216 subtxn->end_lsn = end_lsn;
1217
1218 /*
1219 * Assign this subxact as a child of the toplevel xact (no-op if already
1220 * done.)
1221 */
1222 ReorderBufferAssignChild(rb, xid, subxid, InvalidXLogRecPtr);
1223}
void ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr lsn)

References ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, InvalidXLogRecPtr, ReorderBufferAssignChild(), and ReorderBufferTXNByXid().

Referenced by DecodeCommit(), and DecodePrepare().

◆ ReorderBufferCopySnap()

static Snapshot ReorderBufferCopySnap ( ReorderBuffer rb,
Snapshot  orig_snap,
ReorderBufferTXN txn,
CommandId  cid 
)
static

Definition at line 1890 of file reorderbuffer.c.

1892{
1893 Snapshot snap;
1894 dlist_iter iter;
1895 int i = 0;
1896 Size size;
1897
1898 size = sizeof(SnapshotData) +
1899 sizeof(TransactionId) * orig_snap->xcnt +
1900 sizeof(TransactionId) * (txn->nsubtxns + 1);
1901
1902 snap = MemoryContextAllocZero(rb->context, size);
1903 memcpy(snap, orig_snap, sizeof(SnapshotData));
1904
1905 snap->copied = true;
1906 snap->active_count = 1; /* mark as active so nobody frees it */
1907 snap->regd_count = 0;
1908 snap->xip = (TransactionId *) (snap + 1);
1909
1910 memcpy(snap->xip, orig_snap->xip, sizeof(TransactionId) * snap->xcnt);
1911
1912 /*
1913 * snap->subxip contains all txids that belong to our transaction which we
1914 * need to check via cmin/cmax. That's why we store the toplevel
1915 * transaction in there as well.
1916 */
1917 snap->subxip = snap->xip + snap->xcnt;
1918 snap->subxip[i++] = txn->xid;
1919
1920 /*
1921 * txn->nsubtxns isn't decreased when subtransactions abort, so count
1922 * manually. Since it's an upper boundary it is safe to use it for the
1923 * allocation above.
1924 */
1925 snap->subxcnt = 1;
1926
1927 dlist_foreach(iter, &txn->subtxns)
1928 {
1929 ReorderBufferTXN *sub_txn;
1930
1931 sub_txn = dlist_container(ReorderBufferTXN, node, iter.cur);
1932 snap->subxip[i++] = sub_txn->xid;
1933 snap->subxcnt++;
1934 }
1935
1936 /* sort so we can bsearch() later */
1937 qsort(snap->subxip, snap->subxcnt, sizeof(TransactionId), xidComparator);
1938
1939 /* store the specified current CommandId */
1940 snap->curcid = cid;
1941
1942 return snap;
1943}
int i
Definition: isn.c:77
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:1290
#define qsort(a, b, c, d)
Definition: port.h:479
bool copied
Definition: snapshot.h:181
uint32 regd_count
Definition: snapshot.h:201
uint32 active_count
Definition: snapshot.h:200
CommandId curcid
Definition: snapshot.h:183
TransactionId * subxip
Definition: snapshot.h:176
TransactionId * xip
Definition: snapshot.h:164
int xidComparator(const void *arg1, const void *arg2)
Definition: xid.c:152

References SnapshotData::active_count, ReorderBuffer::context, SnapshotData::copied, dlist_iter::cur, SnapshotData::curcid, dlist_container, dlist_foreach, i, MemoryContextAllocZero(), ReorderBufferTXN::nsubtxns, qsort, SnapshotData::regd_count, ReorderBufferTXN::subtxns, SnapshotData::subxcnt, SnapshotData::subxip, SnapshotData::xcnt, ReorderBufferTXN::xid, xidComparator(), and SnapshotData::xip.

Referenced by ReorderBufferProcessTXN(), ReorderBufferSaveTXNSnapshot(), and ReorderBufferStreamTXN().

◆ ReorderBufferExecuteInvalidations()

static void ReorderBufferExecuteInvalidations ( uint32  nmsgs,
SharedInvalidationMessage msgs 
)
static

Definition at line 3497 of file reorderbuffer.c.

3498{
3499 int i;
3500
3501 for (i = 0; i < nmsgs; i++)
3502 LocalExecuteInvalidationMessage(&msgs[i]);
3503}
void LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
Definition: inval.c:823

References i, and LocalExecuteInvalidationMessage().

Referenced by ReorderBufferFinishPrepared(), and ReorderBufferProcessTXN().

◆ ReorderBufferFinishPrepared()

void ReorderBufferFinishPrepared ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn,
XLogRecPtr  two_phase_at,
TimestampTz  commit_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn,
char *  gid,
bool  is_commit 
)

Definition at line 2954 of file reorderbuffer.c.

2959{
2960 ReorderBufferTXN *txn;
2961 XLogRecPtr prepare_end_lsn;
2962 TimestampTz prepare_time;
2963
2964 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, commit_lsn, false);
2965
2966 /* unknown transaction, nothing to do */
2967 if (txn == NULL)
2968 return;
2969
2970 /*
2971 * By this time the txn has the prepare record information, remember it to
2972 * be later used for rollback.
2973 */
2974 prepare_end_lsn = txn->end_lsn;
2975 prepare_time = txn->xact_time.prepare_time;
2976
2977 /* add the gid in the txn */
2978 txn->gid = pstrdup(gid);
2979
2980 /*
2981 * It is possible that this transaction is not decoded at prepare time
2982 * either because by that time we didn't have a consistent snapshot, or
2983 * two_phase was not enabled, or it was decoded earlier but we have
2984 * restarted. We only need to send the prepare if it was not decoded
2985 * earlier. We don't need to decode the xact for aborts if it is not done
2986 * already.
2987 */
2988 if ((txn->final_lsn < two_phase_at) && is_commit)
2989 {
2990 /*
2991 * txn must have been marked as a prepared transaction and skipped but
2992 * not sent a prepare. Also, the prepare info must have been updated
2993 * in txn even if we skip prepare.
2994 */
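2995 Assert((txn->txn_flags & RBTXN_PREPARE_STATUS_MASK) ==
2996 (RBTXN_IS_PREPARED | RBTXN_SKIPPED_PREPARE));
2997 Assert(txn->final_lsn != InvalidXLogRecPtr);
2998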
2999 /*
3000 * By this time the txn has the prepare record information and it is
3001 * important to use that so that downstream gets the accurate
3002 * information. If instead, we have passed commit information here
3003 * then downstream can behave as it has already replayed commit
3004 * prepared after the restart.
3005 */
3006 ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
3007 txn->xact_time.prepare_time, txn->origin_id, txn->origin_lsn);
3008 }
3009
3010 txn->final_lsn = commit_lsn;
3011 txn->end_lsn = end_lsn;
3012 txn->xact_time.commit_time = commit_time;
3013 txn->origin_id = origin_id;
3014 txn->origin_lsn = origin_lsn;
3015
3016 if (is_commit)
3017 rb->commit_prepared(rb, txn, commit_lsn);
3018 else
3019 rb->rollback_prepared(rb, txn, prepare_end_lsn, prepare_time);
3020
3021 /* cleanup: make sure there's no cache pollution */
3022 ReorderBufferExecuteInvalidations(txn->ninvalidations,
3023 txn->invalidations);
3024 ReorderBufferCleanupTXN(rb, txn);
3025}
int64 TimestampTz
Definition: timestamp.h:39
char * pstrdup(const char *in)
Definition: mcxt.c:2321
static void ReorderBufferExecuteInvalidations(uint32 nmsgs, SharedInvalidationMessage *msgs)
#define RBTXN_PREPARE_STATUS_MASK
#define RBTXN_IS_PREPARED
#define RBTXN_SKIPPED_PREPARE
TimestampTz commit_time
RepOriginId origin_id
XLogRecPtr origin_lsn
TimestampTz prepare_time
ReorderBufferCommitPreparedCB commit_prepared
ReorderBufferRollbackPreparedCB rollback_prepared

References Assert(), ReorderBuffer::commit_prepared, ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, ReorderBufferTXN::gid, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, ReorderBufferTXN::prepare_time, pstrdup(), RBTXN_IS_PREPARED, RBTXN_PREPARE_STATUS_MASK, RBTXN_SKIPPED_PREPARE, ReorderBufferCleanupTXN(), ReorderBufferExecuteInvalidations(), ReorderBufferReplay(), ReorderBufferTXNByXid(), ReorderBuffer::rollback_prepared, ReorderBufferTXN::txn_flags, and ReorderBufferTXN::xact_time.

Referenced by DecodeAbort(), and DecodeCommit().

◆ ReorderBufferForget()

void ReorderBufferForget ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 3132 of file reorderbuffer.c.

3133{
3134 ReorderBufferTXN *txn;
3135
3136 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
3137 false);
3138
3139 /* unknown, nothing to forget */
3140 if (txn == NULL)
3141 return;
3142
3143 /* this transaction mustn't be streamed */
3144 Assert(!rbtxn_is_streamed(txn));
3145
3146 /* cosmetic... */
3147 txn->final_lsn = lsn;
3148
3149 /*
3150 * Process cache invalidation messages if there are any. Even if we're not
3151 * interested in the transaction's contents, it could have manipulated the
3152 * catalog and we need to update the caches according to that.
3153 */
3154 if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
3155 ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
3156 txn->invalidations);
3157 else
3158 Assert(txn->ninvalidations == 0);
3159
3160 /* remove potential on-disk data, and deallocate */
3161 ReorderBufferCleanupTXN(rb, txn);
3162}

References Assert(), ReorderBufferTXN::base_snapshot, ReorderBufferTXN::final_lsn, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, rbtxn_is_streamed, ReorderBufferCleanupTXN(), ReorderBufferImmediateInvalidation(), and ReorderBufferTXNByXid().

Referenced by DecodeCommit().

◆ ReorderBufferFree()

void ReorderBufferFree ( ReorderBuffer rb)

Definition at line 404 of file reorderbuffer.c.

405{
406 MemoryContext context = rb->context;
407
408 /*
409 * We free separately allocated data by entirely scrapping reorderbuffer's
410 * memory context.
411 */
412 MemoryContextDelete(context);
413
414 /* Free disk space used by unconsumed reorder buffers */
415 ReorderBufferCleanupSerializedTXNs(NameStr(MyReplicationSlot->data.name));
416}
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:485

References ReorderBuffer::context, ReplicationSlot::data, MemoryContextDelete(), MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, and ReorderBufferCleanupSerializedTXNs().

Referenced by FreeDecodingContext().

◆ ReorderBufferFreeChange()

void ReorderBufferFreeChange ( ReorderBuffer rb,
ReorderBufferChange change,
bool  upd_mem 
)

Definition at line 503 of file reorderbuffer.c.

505{
506 /* update memory accounting info */
507 if (upd_mem)
508 ReorderBufferChangeMemoryUpdate(rb, change, NULL, false,
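509 ReorderBufferChangeSize(change));
510
511 /* free contained data */
512 switch (change->action)
513 {
514 case REORDER_BUFFER_CHANGE_INSERT:
515 case REORDER_BUFFER_CHANGE_UPDATE:
516 case REORDER_BUFFER_CHANGE_DELETE:
517 case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
518 if (change->data.tp.newtuple)
519 {
520 ReorderBufferFreeTupleBuf(change->data.tp.newtuple);
521 change->data.tp.newtuple = NULL;
522 }
523
524 if (change->data.tp.oldtuple)
525 {
526 ReorderBufferFreeTupleBuf(change->data.tp.oldtuple);
527 change->data.tp.oldtuple = NULL;
528 }
529 break;
530 case REORDER_BUFFER_CHANGE_MESSAGE:
531 if (change->data.msg.prefix != NULL)
532 pfree(change->data.msg.prefix);
533 change->data.msg.prefix = NULL;
534 if (change->data.msg.message != NULL)
535 pfree(change->data.msg.message);
536 change->data.msg.message = NULL;
537 break;
538 case REORDER_BUFFER_CHANGE_INVALIDATION:
539 if (change->data.inval.invalidations)
540 pfree(change->data.inval.invalidations);
541 change->data.inval.invalidations = NULL;
542 break;
543 case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
544 if (change->data.snapshot)
545 {
546 ReorderBufferFreeSnap(rb, change->data.snapshot);
547 change->data.snapshot = NULL;
548 }
549 break;
550 /* no data in addition to the struct itself */
551 case REORDER_BUFFER_CHANGE_TRUNCATE:
552 if (change->data.truncate.relids != NULL)
553 {
554 ReorderBufferFreeRelids(rb, change->data.truncate.relids);
555 change->data.truncate.relids = NULL;
556 }
557 break;
558 case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
559 case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
560 case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
561 case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
562 break;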
563 }
564
565 pfree(change);
566}
void pfree(void *pointer)
Definition: mcxt.c:2146
void ReorderBufferFreeRelids(ReorderBuffer *rb, Oid *relids)
void ReorderBufferFreeTupleBuf(HeapTuple tuple)

References ReorderBufferChange::action, ReorderBufferChange::data, ReorderBufferChange::inval, ReorderBufferChange::invalidations, ReorderBufferChange::message, ReorderBufferChange::msg, ReorderBufferChange::newtuple, ReorderBufferChange::oldtuple, pfree(), ReorderBufferChange::prefix, ReorderBufferChange::relids, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChangeMemoryUpdate(), ReorderBufferChangeSize(), ReorderBufferFreeRelids(), ReorderBufferFreeSnap(), ReorderBufferFreeTupleBuf(), ReorderBufferChange::snapshot, ReorderBufferChange::tp, and ReorderBufferChange::truncate.

Referenced by ReorderBufferCleanupTXN(), ReorderBufferIterTXNFinish(), ReorderBufferIterTXNNext(), ReorderBufferProcessTXN(), ReorderBufferQueueChange(), ReorderBufferResetTXN(), ReorderBufferRestoreChanges(), ReorderBufferSerializeTXN(), ReorderBufferToastReset(), and ReorderBufferTruncateTXN().

◆ ReorderBufferFreeRelids()

void ReorderBufferFreeRelids ( ReorderBuffer rb,
Oid relids 
)

Definition at line 622 of file reorderbuffer.c.

623{
624 pfree(relids);
625}

References pfree().

Referenced by ReorderBufferFreeChange().

◆ ReorderBufferFreeSnap()

static void ReorderBufferFreeSnap ( ReorderBuffer rb,
Snapshot  snap 
)
static

Definition at line 1949 of file reorderbuffer.c.

1950{
1951 if (snap->copied)
1952 pfree(snap);
1953 else
1954 SnapBuildSnapDecRefcount(snap);
1955}

References SnapshotData::copied, pfree(), and SnapBuildSnapDecRefcount().

Referenced by ReorderBufferCleanupTXN(), ReorderBufferFreeChange(), ReorderBufferProcessTXN(), and ReorderBufferStreamTXN().

◆ ReorderBufferFreeTupleBuf()

void ReorderBufferFreeTupleBuf ( HeapTuple  tuple)

Definition at line 591 of file reorderbuffer.c.

592{
593 pfree(tuple);
594}

References pfree().

Referenced by ReorderBufferFreeChange().

◆ ReorderBufferFreeTXN()

static void ReorderBufferFreeTXN ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 446 of file reorderbuffer.c.

447{
448 /* clean the lookup cache if we were cached (quite likely) */
449 if (rb->by_txn_last_xid == txn->xid)
450 {
451 rb->by_txn_last_xid = InvalidTransactionId;
452 rb->by_txn_last_txn = NULL;
453 }
454
455 /* free data that's contained */
456
457 if (txn->gid != NULL)
458 {
459 pfree(txn->gid);
460 txn->gid = NULL;
461 }
462
463 if (txn->tuplecid_hash != NULL)
464 {
465 hash_destroy(txn->tuplecid_hash);
466 txn->tuplecid_hash = NULL;
467 }
468
469 if (txn->invalidations)
470 {
471 pfree(txn->invalidations);
472 txn->invalidations = NULL;
473 }
474
475 /* Reset the toast hash */
476 ReorderBufferToastReset(rb, txn);
477
478 /* All changes must be deallocated */
479 Assert(txn->size == 0);
480
481 pfree(txn);
482}
void hash_destroy(HTAB *hashp)
Definition: dynahash.c:865

References Assert(), ReorderBuffer::by_txn_last_txn, ReorderBuffer::by_txn_last_xid, ReorderBufferTXN::gid, hash_destroy(), ReorderBufferTXN::invalidations, InvalidTransactionId, pfree(), ReorderBufferToastReset(), ReorderBufferTXN::size, ReorderBufferTXN::tuplecid_hash, and ReorderBufferTXN::xid.

Referenced by ReorderBufferCleanupTXN().

◆ ReorderBufferGetCatalogChangesXacts()

TransactionId * ReorderBufferGetCatalogChangesXacts ( ReorderBuffer rb)

Definition at line 3547 of file reorderbuffer.c.

3548{
3549 dlist_iter iter;
3550 TransactionId *xids = NULL;
3551 size_t xcnt = 0;
3552
3553 /* Quick return if the list is empty */
3554 if (dclist_count(&rb->catchange_txns) == 0)
3555 return NULL;
3556
3557 /* Initialize XID array */
3558 xids = (TransactionId *) palloc(sizeof(TransactionId) *
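3559 dclist_count(&rb->catchange_txns));
3560 dclist_foreach(iter, &rb->catchange_txns)
3561 {
3562 ReorderBufferTXN *txn = dclist_container(ReorderBufferTXN,
3563 catchange_node,
3564 iter.cur);
3565
3566 Assert(rbtxn_has_catalog_changes(txn));
3567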
3568 xids[xcnt++] = txn->xid;
3569 }
3570
3571 qsort(xids, xcnt, sizeof(TransactionId), xidComparator);
3572
3573 Assert(xcnt == dclist_count(&rb->catchange_txns));
3574 return xids;
3575}
#define dclist_container(type, membername, ptr)
Definition: ilist.h:947
static uint32 dclist_count(const dclist_head *head)
Definition: ilist.h:932
#define dclist_foreach(iter, lhead)
Definition: ilist.h:970

References Assert(), ReorderBuffer::catchange_txns, dlist_iter::cur, dclist_container, dclist_count(), dclist_foreach, palloc(), qsort, rbtxn_has_catalog_changes, ReorderBufferTXN::xid, and xidComparator().

Referenced by SnapBuildSerialize().

◆ ReorderBufferGetInvalidations()

uint32 ReorderBufferGetInvalidations ( ReorderBuffer rb,
TransactionId  xid,
SharedInvalidationMessage **  msgs 
)

Definition at line 5471 of file reorderbuffer.c.

5473{
5474 ReorderBufferTXN *txn;
5475
5476 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
5477 false);
5478
5479 if (txn == NULL)
5480 return 0;
5481
5482 *msgs = txn->invalidations;
5483
5484 return txn->ninvalidations;
5485}

References ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, and ReorderBufferTXNByXid().

Referenced by SnapBuildDistributeSnapshotAndInval().

◆ ReorderBufferGetOldestTXN()

ReorderBufferTXN * ReorderBufferGetOldestTXN ( ReorderBuffer rb)

Definition at line 1025 of file reorderbuffer.c.

1026{
1027 ReorderBufferTXN *txn;
1028
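1029 AssertTXNLsnOrder(rb);
1030
1031 if (dlist_is_empty(&rb->toplevel_by_lsn))
1032 return NULL;
1033
1034 txn = dlist_head_element(ReorderBufferTXN, node, &rb->toplevel_by_lsn);
1035
1036 Assert(!rbtxn_is_known_subxact(txn));
1037 Assert(txn->first_lsn != InvalidXLogRecPtr);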
1038 return txn;
1039}
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:603

References Assert(), AssertTXNLsnOrder(), dlist_head_element, dlist_is_empty(), ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, rbtxn_is_known_subxact, and ReorderBuffer::toplevel_by_lsn.

Referenced by SnapBuildProcessRunningXacts().

◆ ReorderBufferGetOldestXmin()

TransactionId ReorderBufferGetOldestXmin ( ReorderBuffer rb)

◆ ReorderBufferImmediateInvalidation()

void ReorderBufferImmediateInvalidation ( ReorderBuffer rb,
uint32  ninvalidations,
SharedInvalidationMessage invalidations 
)

Definition at line 3204 of file reorderbuffer.c.

3206{
3207 bool use_subtxn = IsTransactionOrTransactionBlock();
3208 int i;
3209
3210 if (use_subtxn)
3211 BeginInternalSubTransaction("replay");
3212
3213 /*
3214 * Force invalidations to happen outside of a valid transaction - that way
3215 * entries will just be marked as invalid without accessing the catalog.
3216 * That's advantageous because we don't need to setup the full state
3217 * necessary for catalog access.
3218 */
3219 if (use_subtxn)
3220 AbortCurrentTransaction();
3221
3222 for (i = 0; i < ninvalidations; i++)
3223 LocalExecuteInvalidationMessage(&invalidations[i]);
3224
3225 if (use_subtxn)
3226 RollbackAndReleaseCurrentSubTransaction();
3227}
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4989
void BeginInternalSubTransaction(const char *name)
Definition: xact.c:4694
void RollbackAndReleaseCurrentSubTransaction(void)
Definition: xact.c:4796
void AbortCurrentTransaction(void)
Definition: xact.c:3451

References AbortCurrentTransaction(), BeginInternalSubTransaction(), i, IsTransactionOrTransactionBlock(), LocalExecuteInvalidationMessage(), and RollbackAndReleaseCurrentSubTransaction().

Referenced by ReorderBufferAbort(), ReorderBufferForget(), ReorderBufferInvalidate(), and xact_decode().

◆ ReorderBufferInvalidate()

void ReorderBufferInvalidate ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 3173 of file reorderbuffer.c.

3174{
3175 ReorderBufferTXN *txn;
3176
3177 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
3178 false);
3179
3180 /* unknown, nothing to do */
3181 if (txn == NULL)
3182 return;
3183
3184 /*
3185 * Process cache invalidation messages if there are any. Even if we're not
3186 * interested in the transaction's contents, it could have manipulated the
3187 * catalog and we need to update the caches according to that.
3188 */
3189 if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
3190 ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
3191 txn->invalidations);
3192 else
3193 Assert(txn->ninvalidations == 0);
3194}

References Assert(), ReorderBufferTXN::base_snapshot, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, ReorderBufferImmediateInvalidation(), and ReorderBufferTXNByXid().

Referenced by DecodePrepare().

◆ ReorderBufferIterCompare()

static int ReorderBufferIterCompare ( Datum  a,
Datum  b,
void *  arg 
)
static

Definition at line 1242 of file reorderbuffer.c.

1243{
1244 ReorderBufferIterTXNState *state = (ReorderBufferIterTXNState *) arg;
1245 XLogRecPtr pos_a = state->entries[DatumGetInt32(a)].lsn;
1246 XLogRecPtr pos_b = state->entries[DatumGetInt32(b)].lsn;
1247
1248 if (pos_a < pos_b)
1249 return 1;
1250 else if (pos_a == pos_b)
1251 return 0;
1252 return -1;
1253}
void * arg
static int32 DatumGetInt32(Datum X)
Definition: postgres.h:207
Definition: regguts.h:323

References a, arg, b, and DatumGetInt32().

Referenced by ReorderBufferIterTXNInit().

◆ ReorderBufferIterTXNFinish()

static void ReorderBufferIterTXNFinish ( ReorderBuffer rb,
ReorderBufferIterTXNState state 
)
static

Definition at line 1485 of file reorderbuffer.c.

1487{
1488 int32 off;
1489
1490 for (off = 0; off < state->nr_txns; off++)
1491 {
1492 if (state->entries[off].file.vfd != -1)
1493 FileClose(state->entries[off].file.vfd);
1494 }
1495
1496 /* free memory we might have "leaked" in the last *Next call */
1497 if (!dlist_is_empty(&state->old_change))
1498 {
1499 ReorderBufferChange *change;
1500
1501 change = dlist_container(ReorderBufferChange, node,
1502 dlist_pop_head_node(&state->old_change));
1503 ReorderBufferFreeChange(rb, change, true);
1504 Assert(dlist_is_empty(&state->old_change));
1505 }
1506
1507 binaryheap_free(state->heap);
1508 pfree(state);
1509}
void binaryheap_free(binaryheap *heap)
Definition: binaryheap.c:75
void FileClose(File file)
Definition: fd.c:1982
static dlist_node * dlist_pop_head_node(dlist_head *head)
Definition: ilist.h:450

References Assert(), binaryheap_free(), dlist_container, dlist_is_empty(), dlist_pop_head_node(), FileClose(), pfree(), and ReorderBufferFreeChange().

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferIterTXNInit()

static void ReorderBufferIterTXNInit ( ReorderBuffer rb,
ReorderBufferTXN txn,
ReorderBufferIterTXNState *volatile *  iter_state 
)
static

Definition at line 1265 of file reorderbuffer.c.

1267{
1268 Size nr_txns = 0;
1269 ReorderBufferIterTXNState *state;
1270 dlist_iter cur_txn_i;
1271 int32 off;
1272
1273 *iter_state = NULL;
1274
1275 /* Check ordering of changes in the toplevel transaction. */
1276 AssertChangeLsnOrder(txn);
1277
1278 /*
1279 * Calculate the size of our heap: one element for every transaction that
1280 * contains changes. (Besides the transactions already in the reorder
1281 * buffer, we count the one we were directly passed.)
1282 */
1283 if (txn->nentries > 0)
1284 nr_txns++;
1285
1286 dlist_foreach(cur_txn_i, &txn->subtxns)
1287 {
1288 ReorderBufferTXN *cur_txn;
1289
1290 cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
1291
1292 /* Check ordering of changes in this subtransaction. */
1293 AssertChangeLsnOrder(cur_txn);
1294
1295 if (cur_txn->nentries > 0)
1296 nr_txns++;
1297 }
1298
1299 /* allocate iteration state */
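1300 state = (ReorderBufferIterTXNState *)
1301 MemoryContextAllocZero(rb->context,
1302 sizeof(ReorderBufferIterTXNState) +
1303 sizeof(ReorderBufferIterTXNEntry) * nr_txns);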
1304
1305 state->nr_txns = nr_txns;
1306 dlist_init(&state->old_change);
1307
1308 for (off = 0; off < state->nr_txns; off++)
1309 {
1310 state->entries[off].file.vfd = -1;
1311 state->entries[off].segno = 0;
1312 }
1313
1314 /* allocate heap */
1315 state->heap = binaryheap_allocate(state->nr_txns,
1316 ReorderBufferIterCompare,
1317 state);
1318
1319 /* Now that the state fields are initialized, it is safe to return it. */
1320 *iter_state = state;
1321
1322 /*
1323 * Now insert items into the binary heap, in an unordered fashion. (We
1324 * will run a heap assembly step at the end; this is more efficient.)
1325 */
1326
1327 off = 0;
1328
1329 /* add toplevel transaction if it contains changes */
1330 if (txn->nentries > 0)
1331 {
1332 ReorderBufferChange *cur_change;
1333
1334 if (rbtxn_is_serialized(txn))
1335 {
1336 /* serialize remaining changes */
1337 ReorderBufferSerializeTXN(rb, txn);
1338 ReorderBufferRestoreChanges(rb, txn, &state->entries[off].file,
1339 &state->entries[off].segno);
1340 }
1341
1342 cur_change = dlist_head_element(ReorderBufferChange, node,
1343 &txn->changes);
1344
1345 state->entries[off].lsn = cur_change->lsn;
1346 state->entries[off].change = cur_change;
1347 state->entries[off].txn = txn;
1348
1349 binaryheap_add_unordered(state->heap, Int32GetDatum(off++));
1350 }
1351
1352 /* add subtransactions if they contain changes */
1353 dlist_foreach(cur_txn_i, &txn->subtxns)
1354 {
1355 ReorderBufferTXN *cur_txn;
1356
1357 cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
1358
1359 if (cur_txn->nentries > 0)
1360 {
1361 ReorderBufferChange *cur_change;
1362
1363 if (rbtxn_is_serialized(cur_txn))
1364 {
1365 /* serialize remaining changes */
1366 ReorderBufferSerializeTXN(rb, cur_txn);
1367 ReorderBufferRestoreChanges(rb, cur_txn,
1368 &state->entries[off].file,
1369 &state->entries[off].segno);
1370 }
1371 cur_change = dlist_head_element(ReorderBufferChange, node,
1372 &cur_txn->changes);
1373
1374 state->entries[off].lsn = cur_change->lsn;
1375 state->entries[off].change = cur_change;
1376 state->entries[off].txn = cur_txn;
1377
1378 binaryheap_add_unordered(state->heap, Int32GetDatum(off++));
1379 }
1380 }
1381
1382 /* assemble a valid binary heap */
1383 binaryheap_build(state->heap);
1384}
void binaryheap_build(binaryheap *heap)
Definition: binaryheap.c:138
void binaryheap_add_unordered(binaryheap *heap, bh_node_type d)
Definition: binaryheap.c:116
binaryheap * binaryheap_allocate(int capacity, binaryheap_comparator compare, void *arg)
Definition: binaryheap.c:39
static Datum Int32GetDatum(int32 X)
Definition: postgres.h:217
static int ReorderBufferIterCompare(Datum a, Datum b, void *arg)
static void AssertChangeLsnOrder(ReorderBufferTXN *txn)
struct ReorderBufferIterTXNEntry ReorderBufferIterTXNEntry
static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, TXNEntryFile *file, XLogSegNo *segno)

References AssertChangeLsnOrder(), binaryheap_add_unordered(), binaryheap_allocate(), binaryheap_build(), ReorderBufferTXN::changes, ReorderBuffer::context, dlist_iter::cur, dlist_container, dlist_foreach, dlist_head_element, dlist_init(), Int32GetDatum(), ReorderBufferChange::lsn, MemoryContextAllocZero(), ReorderBufferTXN::nentries, rbtxn_is_serialized, ReorderBufferIterCompare(), ReorderBufferRestoreChanges(), ReorderBufferSerializeTXN(), and ReorderBufferTXN::subtxns.

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferIterTXNNext()

static ReorderBufferChange * ReorderBufferIterTXNNext ( ReorderBuffer rb,
ReorderBufferIterTXNState state 
)
static

Definition at line 1393 of file reorderbuffer.c.

1394{
1395 ReorderBufferChange *change;
1396 ReorderBufferIterTXNEntry *entry;
1397 int32 off;
1398
1399 /* nothing there anymore */
1400 if (state->heap->bh_size == 0)
1401 return NULL;
1402
1403 off = DatumGetInt32(binaryheap_first(state->heap));
1404 entry = &state->entries[off];
1405
1406 /* free memory we might have "leaked" in the previous *Next call */
1407 if (!dlist_is_empty(&state->old_change))
1408 {
1409 change = dlist_container(ReorderBufferChange, node,
1410 dlist_pop_head_node(&state->old_change));
1411 ReorderBufferFreeChange(rb, change, true);
1412 Assert(dlist_is_empty(&state->old_change));
1413 }
1414
1415 change = entry->change;
1416
1417 /*
1418 * update heap with information about which transaction has the next
1419 * relevant change in LSN order
1420 */
1421
1422 /* there are in-memory changes */
1423 if (dlist_has_next(&entry->txn->changes, &entry->change->node))
1424 {
1425 dlist_node *next = dlist_next_node(&entry->txn->changes, &change->node);
1426 ReorderBufferChange *next_change =
1427 dlist_container(ReorderBufferChange, node, next);
1428
1429 /* txn stays the same */
1430 state->entries[off].lsn = next_change->lsn;
1431 state->entries[off].change = next_change;
1432
1433 binaryheap_replace_first(state->heap, Int32GetDatum(off));
1434 return change;
1435 }
1436
1437 /* try to load changes from disk */
1438 if (entry->txn->nentries != entry->txn->nentries_mem)
1439 {
1440 /*
1441 * Ugly: restoring changes will reuse *Change records, thus delete the
1442 * current one from the per-tx list and only free in the next call.
1443 */
1444 dlist_delete(&change->node);
1445 dlist_push_tail(&state->old_change, &change->node);
1446
1447 /*
1448 * Update the total bytes processed by the txn for which we are
1449 * releasing the current set of changes and restoring the new set of
1450 * changes.
1451 */
1452 rb->totalBytes += entry->txn->size;
1453 if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->file,
1454 &state->entries[off].segno))
1455 {
1456 /* successfully restored changes from disk */
1457 ReorderBufferChange *next_change =
1458 dlist_head_element(ReorderBufferChange, node,
1459 &entry->txn->changes);
1460
1461 elog(DEBUG2, "restored %u/%u changes from disk",
1462 (uint32) entry->txn->nentries_mem,
1463 (uint32) entry->txn->nentries);
1464
1465 Assert(entry->txn->nentries_mem);
1466 /* txn stays the same */
1467 state->entries[off].lsn = next_change->lsn;
1468 state->entries[off].change = next_change;
1469 binaryheap_replace_first(state->heap, Int32GetDatum(off));
1470
1471 return change;
1472 }
1473 }
1474
1475 /* ok, no changes there anymore, remove */
1476 binaryheap_remove_first(state->heap);
1477
1478 return change;
1479}
void binaryheap_replace_first(binaryheap *heap, bh_node_type d)
Definition: binaryheap.c:255
bh_node_type binaryheap_first(binaryheap *heap)
Definition: binaryheap.c:177
bh_node_type binaryheap_remove_first(binaryheap *heap)
Definition: binaryheap.c:192
static int32 next
Definition: blutils.c:224
uint32_t uint32
Definition: c.h:502
static bool dlist_has_next(const dlist_head *head, const dlist_node *node)
Definition: ilist.h:503
static dlist_node * dlist_next_node(dlist_head *head, dlist_node *node)
Definition: ilist.h:537
ReorderBufferChange * change
ReorderBufferTXN * txn

References Assert(), binaryheap_first(), binaryheap_remove_first(), binaryheap_replace_first(), ReorderBufferIterTXNEntry::change, ReorderBufferTXN::changes, DatumGetInt32(), DEBUG2, dlist_container, dlist_delete(), dlist_has_next(), dlist_head_element, dlist_is_empty(), dlist_next_node(), dlist_pop_head_node(), dlist_push_tail(), elog, ReorderBufferIterTXNEntry::file, Int32GetDatum(), ReorderBufferChange::lsn, ReorderBufferTXN::nentries, ReorderBufferTXN::nentries_mem, next, ReorderBufferChange::node, ReorderBufferFreeChange(), ReorderBufferRestoreChanges(), ReorderBufferTXN::size, ReorderBuffer::totalBytes, and ReorderBufferIterTXNEntry::txn.

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferLargestStreamableTopTXN()

static ReorderBufferTXN * ReorderBufferLargestStreamableTopTXN ( ReorderBuffer rb)
static

Definition at line 3703 of file reorderbuffer.c.

3704{
3705 dlist_iter iter;
3706 Size largest_size = 0;
3707 ReorderBufferTXN *largest = NULL;
3708
3709 /* Find the largest top-level transaction having a base snapshot. */
3710 dlist_foreach(iter, &rb->txns_by_base_snapshot_lsn)
3711 {
3712 ReorderBufferTXN *txn;
3713
3714 txn = dlist_container(ReorderBufferTXN, base_snapshot_node, iter.cur);
3715
3716 /* must not be a subtxn */
3717 Assert(!rbtxn_is_known_subxact(txn));
3718 /* base_snapshot must be set */
3719 Assert(txn->base_snapshot != NULL);
3720
3721 /* Don't consider these kinds of transactions for eviction. */
3722 if (rbtxn_has_partial_change(txn) ||
3723 !rbtxn_has_streamable_change(txn) ||
3724 rbtxn_is_aborted(txn))
3725 continue;
3726
3727 /* Find the largest of the eviction candidates. */
3728 if ((largest == NULL || txn->total_size > largest_size) &&
3729 (txn->total_size > 0))
3730 {
3731 largest = txn;
3732 largest_size = txn->total_size;
3733 }
3734 }
3735
3736 return largest;
3737}
#define rbtxn_has_streamable_change(txn)
#define rbtxn_has_partial_change(txn)

References Assert(), ReorderBufferTXN::base_snapshot, dlist_iter::cur, dlist_container, dlist_foreach, rbtxn_has_partial_change, rbtxn_has_streamable_change, rbtxn_is_aborted, rbtxn_is_known_subxact, ReorderBufferTXN::total_size, and ReorderBuffer::txns_by_base_snapshot_lsn.

Referenced by ReorderBufferCheckMemoryLimit().

◆ ReorderBufferLargestTXN()

static ReorderBufferTXN * ReorderBufferLargestTXN ( ReorderBuffer rb)
static

Definition at line 3662 of file reorderbuffer.c.

3663{
3664 ReorderBufferTXN *largest;
3665
3666 /* Get the largest transaction from the max-heap */
3667 largest = pairingheap_container(ReorderBufferTXN, txn_node,
3668 pairingheap_first(rb->txn_heap));
3669
3670 Assert(largest);
3671 Assert(largest->size > 0);
3672 Assert(largest->size <= rb->size);
3673
3674 return largest;
3675}
pairingheap_node * pairingheap_first(pairingheap *heap)
Definition: pairingheap.c:130
#define pairingheap_container(type, membername, ptr)
Definition: pairingheap.h:43

References Assert(), pairingheap_container, pairingheap_first(), ReorderBufferTXN::size, ReorderBuffer::size, and ReorderBuffer::txn_heap.

Referenced by ReorderBufferCheckMemoryLimit().

◆ ReorderBufferMaybeMarkTXNStreamed()

static void ReorderBufferMaybeMarkTXNStreamed ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 2119 of file reorderbuffer.c.

2120{
2121 /*
2122 * The top-level transaction, is marked as streamed always, even if it
2123 * does not contain any changes (that is, when all the changes are in
2124 * subtransactions).
2125 *
2126 * For subtransactions, we only mark them as streamed when there are
2127 * changes in them.
2128 *
2129 * We do it this way because of aborts - we don't want to send aborts for
2130 * XIDs the downstream is not aware of. And of course, it always knows
2131 * about the top-level xact (we send the XID in all messages), but we
2132 * never stream XIDs of empty subxacts.
2133 */
2134 if (rbtxn_is_toptxn(txn) || (txn->nentries_mem != 0))
2135 txn->txn_flags |= RBTXN_IS_STREAMED;
2136}
#define RBTXN_IS_STREAMED

References ReorderBufferTXN::nentries_mem, RBTXN_IS_STREAMED, rbtxn_is_toptxn, and ReorderBufferTXN::txn_flags.

Referenced by ReorderBufferProcessTXN(), and ReorderBufferTruncateTXN().

◆ ReorderBufferPrepare()

void ReorderBufferPrepare ( ReorderBuffer rb,
TransactionId  xid,
char *  gid 
)

Definition at line 2913 of file reorderbuffer.c.

2915{
2916 ReorderBufferTXN *txn;
2917
2918 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2919 false);
2920
2921 /* unknown transaction, nothing to replay */
2922 if (txn == NULL)
2923 return;
2924
2925 /*
2926 * txn must have been marked as a prepared transaction and must have
2927 * neither been skipped nor sent a prepare. Also, the prepare info must
2928 * have been updated in it by now.
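2929 */
2930 Assert((txn->txn_flags & RBTXN_PREPARE_STATUS_MASK) == RBTXN_IS_PREPARED);
2931 Assert(txn->final_lsn != InvalidXLogRecPtr);
2932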
2933 txn->gid = pstrdup(gid);
2934
2935 ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
2936 txn->xact_time.prepare_time, txn->origin_id, txn->origin_lsn);
2937
2938 /*
2939 * Send a prepare if not already done so. This might occur if we have
2940 * detected a concurrent abort while replaying the non-streaming
2941 * transaction.
2942 */
2943 if (!rbtxn_sent_prepare(txn))
2944 {
2945 rb->prepare(rb, txn, txn->final_lsn);
2946 txn->txn_flags |= RBTXN_SENT_PREPARE;
2947 }
2948}
#define RBTXN_SENT_PREPARE
#define rbtxn_sent_prepare(txn)
ReorderBufferPrepareCB prepare

References Assert(), ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, ReorderBufferTXN::gid, InvalidXLogRecPtr, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, ReorderBuffer::prepare, ReorderBufferTXN::prepare_time, pstrdup(), RBTXN_IS_PREPARED, RBTXN_PREPARE_STATUS_MASK, RBTXN_SENT_PREPARE, rbtxn_sent_prepare, ReorderBufferReplay(), ReorderBufferTXNByXid(), ReorderBufferTXN::txn_flags, and ReorderBufferTXN::xact_time.

Referenced by DecodePrepare().

◆ ReorderBufferProcessPartialChange()

static void ReorderBufferProcessPartialChange ( ReorderBuffer rb,
ReorderBufferTXN txn,
ReorderBufferChange change,
bool  toast_insert 
)
static

Definition at line 722 of file reorderbuffer.c.

725{
726 ReorderBufferTXN *toptxn;
727
728 /*
729 * The partial changes need to be processed only while streaming
730 * in-progress transactions.
731 */
732 if (!ReorderBufferCanStream(rb))
733 return;
734
735 /* Get the top transaction. */
736 toptxn = rbtxn_get_toptxn(txn);
737
738 /*
739 * Indicate a partial change for toast inserts. The change will be
740 * considered as complete once we get the insert or update on the main
741 * table and we are sure that the pending toast chunks are not required
742 * anymore.
743 *
744 * If we allow streaming when there are pending toast chunks then such
745 * chunks won't be released till the insert (multi_insert) is complete and
746 * we expect the txn to have streamed all changes after streaming. This
747 * restriction is mainly to ensure the correctness of streamed
748 * transactions and it doesn't seem worth uplifting such a restriction
749 * just to allow this case because anyway we will stream the transaction
750 * once such an insert is complete.
751 */
752 if (toast_insert)
753 toptxn->txn_flags |= RBTXN_HAS_PARTIAL_CHANGE;
754 else if (rbtxn_has_partial_change(toptxn) &&
755 IsInsertOrUpdate(change->action) &&
756 change->data.tp.clear_toast_afterwards)
757 toptxn->txn_flags &= ~RBTXN_HAS_PARTIAL_CHANGE;
758
759 /*
760 * Indicate a partial change for speculative inserts. The change will be
761 * considered as complete once we get the speculative confirm or abort
762 * token.
763 */
764 if (IsSpecInsert(change->action))
765 toptxn->txn_flags |= RBTXN_HAS_PARTIAL_CHANGE;
766 else if (rbtxn_has_partial_change(toptxn) &&
767 IsSpecConfirmOrAbort(change->action))
768 toptxn->txn_flags &= ~RBTXN_HAS_PARTIAL_CHANGE;
769
770 /*
771 * Stream the transaction if it is serialized before and the changes are
772 * now complete in the top-level transaction.
773 *
774 * The reason for doing the streaming of such a transaction as soon as we
775 * get the complete change for it is that previously it would have reached
776 * the memory threshold and wouldn't get streamed because of incomplete
777 * changes. Delaying such transactions would increase apply lag for them.
778 */
779 if (ReorderBufferCanStartStreaming(rb) &&
780 !(rbtxn_has_partial_change(toptxn)) &&
781 rbtxn_is_serialized(txn) &&
782 rbtxn_has_streamable_change(toptxn))
783 ReorderBufferStreamTXN(rb, toptxn);
784}
#define IsSpecInsert(action)
#define IsInsertOrUpdate(action)
#define IsSpecConfirmOrAbort(action)
#define RBTXN_HAS_PARTIAL_CHANGE

References ReorderBufferChange::action, ReorderBufferChange::clear_toast_afterwards, ReorderBufferChange::data, IsInsertOrUpdate, IsSpecConfirmOrAbort, IsSpecInsert, rbtxn_get_toptxn, RBTXN_HAS_PARTIAL_CHANGE, rbtxn_has_partial_change, rbtxn_has_streamable_change, rbtxn_is_serialized, ReorderBufferCanStartStreaming(), ReorderBufferCanStream(), ReorderBufferStreamTXN(), ReorderBufferChange::tp, and ReorderBufferTXN::txn_flags.

Referenced by ReorderBufferQueueChange().

◆ ReorderBufferProcessTXN()

static void ReorderBufferProcessTXN ( ReorderBuffer rb,
ReorderBufferTXN txn,
XLogRecPtr  commit_lsn,
volatile Snapshot  snapshot_now,
volatile CommandId  command_id,
bool  streaming 
)
static

Definition at line 2192 of file reorderbuffer.c.

2197{
2198 bool using_subtxn;
2199 MemoryContext ccxt = CurrentMemoryContext;
2200 ReorderBufferIterTXNState *volatile iterstate = NULL;
2201 volatile XLogRecPtr prev_lsn = InvalidXLogRecPtr;
2202 ReorderBufferChange *volatile specinsert = NULL;
2203 volatile bool stream_started = false;
2204 ReorderBufferTXN *volatile curtxn = NULL;
2205
2206 /* build data to be able to lookup the CommandIds of catalog tuples */
2207 ReorderBufferBuildTupleCidHash(rb, txn);
2208
2209 /* setup the initial snapshot */
2210 SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
2211
2212 /*
2213 * Decoding needs access to syscaches et al., which in turn use
2214 * heavyweight locks and such. Thus we need to have enough state around to
2215 * keep track of those. The easiest way is to simply use a transaction
2216 * internally. That also allows us to easily enforce that nothing writes
2217 * to the database by checking for xid assignments.
2218 *
2219 * When we're called via the SQL SRF there's already a transaction
2220 * started, so start an explicit subtransaction there.
2221 */
2222 using_subtxn = IsTransactionOrTransactionBlock();
2223
2224 PG_TRY();
2225 {
2226 ReorderBufferChange *change;
2227 int changes_count = 0; /* used to accumulate the number of
2228 * changes */
2229
2230 if (using_subtxn)
2231 BeginInternalSubTransaction(streaming ? "stream" : "replay");
2232 else
2233 StartTransactionCommand();
2234
2235 /*
2236 * We only need to send begin/begin-prepare for non-streamed
2237 * transactions.
2238 */
2239 if (!streaming)
2240 {
2241 if (rbtxn_is_prepared(txn))
2242 rb->begin_prepare(rb, txn);
2243 else
2244 rb->begin(rb, txn);
2245 }
2246
2247 ReorderBufferIterTXNInit(rb, txn, &iterstate);
2248 while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
2249 {
2250 Relation relation = NULL;
2251 Oid reloid;
2252
2253 CHECK_FOR_INTERRUPTS();
2254
2255 /*
2256 * We can't call start stream callback before processing first
2257 * change.
2258 */
2259 if (prev_lsn == InvalidXLogRecPtr)
2260 {
2261 if (streaming)
2262 {
2263 txn->origin_id = change->origin_id;
2264 rb->stream_start(rb, txn, change->lsn);
2265 stream_started = true;
2266 }
2267 }
2268
2269 /*
2270 * Enforce correct ordering of changes, merged from multiple
2271 * subtransactions. The changes may have the same LSN due to
2272 * MULTI_INSERT xlog records.
2273 */
2274 Assert(prev_lsn == InvalidXLogRecPtr || prev_lsn <= change->lsn);
2275
2276 prev_lsn = change->lsn;
2277
2278 /*
2279 * Set the current xid to detect concurrent aborts. This is
2280 * required for the cases when we decode the changes before the
2281 * COMMIT record is processed.
2282 */
2283 if (streaming || rbtxn_is_prepared(change->txn))
2284 {
2285 curtxn = change->txn;
2286 SetupCheckXidLive(curtxn->xid);
2287 }
2288
2289 switch (change->action)
2290 {
2291 case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
2292
2293 /*
2294 * Confirmation for speculative insertion arrived. Simply
2295 * use as a normal record. It'll be cleaned up at the end
2296 * of INSERT processing.
2297 */
2298 if (specinsert == NULL)
2299 elog(ERROR, "invalid ordering of speculative insertion changes");
2300 Assert(specinsert->data.tp.oldtuple == NULL);
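2301 change = specinsert;
2302 change->action = REORDER_BUFFER_CHANGE_INSERT;
2303
2304 /* intentionally fall through */
2305 case REORDER_BUFFER_CHANGE_INSERT:
2306 case REORDER_BUFFER_CHANGE_UPDATE:
2307 case REORDER_BUFFER_CHANGE_DELETE:
2308 Assert(snapshot_now);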
2309
2310 reloid = RelidByRelfilenumber(change->data.tp.rlocator.spcOid,
2311 change->data.tp.rlocator.relNumber);
2312
2313 /*
2314 * Mapped catalog tuple without data, emitted while
2315 * catalog table was in the process of being rewritten. We
2316 * can fail to look up the relfilenumber, because the
2317 * relmapper has no "historic" view, in contrast to the
2318 * normal catalog during decoding. Thus repeated rewrites
2319 * can cause a lookup failure. That's OK because we do not
2320 * decode catalog changes anyway. Normally such tuples
2321 * would be skipped over below, but we can't identify
2322 * whether the table should be logically logged without
2323 * mapping the relfilenumber to the oid.
2324 */
2325 if (reloid == InvalidOid &&
2326 change->data.tp.newtuple == NULL &&
2327 change->data.tp.oldtuple == NULL)
2328 goto change_done;
2329 else if (reloid == InvalidOid)
2330 elog(ERROR, "could not map filenumber \"%s\" to relation OID",
2331 relpathperm(change->data.tp.rlocator,
2332 MAIN_FORKNUM).str);
2333
2334 relation = RelationIdGetRelation(reloid);
2335
2336 if (!RelationIsValid(relation))
2337 elog(ERROR, "could not open relation with OID %u (for filenumber \"%s\")",
2338 reloid,
2339 relpathperm(change->data.tp.rlocator,
2340 MAIN_FORKNUM).str);
2341
2342 if (!RelationIsLogicallyLogged(relation))
2343 goto change_done;
2344
2345 /*
2346 * Ignore temporary heaps created during DDL unless the
2347 * plugin has asked for them.
2348 */
2349 if (relation->rd_rel->relrewrite && !rb->output_rewrites)
2350 goto change_done;
2351
2352 /*
2353 * For now ignore sequence changes entirely. Most of the
2354 * time they don't log changes using records we
2355 * understand, so it doesn't make sense to handle the few
2356 * cases we do.
2357 */
2358 if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
2359 goto change_done;
2360
2361 /* user-triggered change */
2362 if (!IsToastRelation(relation))
2363 {
2364 ReorderBufferToastReplace(rb, txn, relation, change);
2365 ReorderBufferApplyChange(rb, txn, relation, change,
2366 streaming);
2367
2368 /*
2369 * Only clear reassembled toast chunks if we're sure
2370 * they're not required anymore. The creator of the
2371 * tuple tells us.
2372 */
2373 if (change->data.tp.clear_toast_afterwards)
2374 ReorderBufferToastReset(rb, txn);
2375 }
2376 /* we're not interested in toast deletions */
2377 else if (change->action == REORDER_BUFFER_CHANGE_INSERT)
2378 {
2379 /*
2380 * Need to reassemble the full toasted Datum in
2381 * memory, to ensure the chunks don't get reused till
2382 * we're done remove it from the list of this
2383 * transaction's changes. Otherwise it will get
2384 * freed/reused while restoring spooled data from
2385 * disk.
2386 */
2387 Assert(change->data.tp.newtuple != NULL);
2388
2389 dlist_delete(&change->node);
2390 ReorderBufferToastAppendChunk(rb, txn, relation,
2391 change);
2392 }
2393
2394 change_done:
2395
2396 /*
2397 * If speculative insertion was confirmed, the record
2398 * isn't needed anymore.
2399 */
2400 if (specinsert != NULL)
2401 {
2402 ReorderBufferFreeChange(rb, specinsert, true);
2403 specinsert = NULL;
2404 }
2405
2406 if (RelationIsValid(relation))
2407 {
2408 RelationClose(relation);
2409 relation = NULL;
2410 }
2411 break;
2412
2413 case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
2414
2415 /*
2416 * Speculative insertions are dealt with by delaying the
2417 * processing of the insert until the confirmation record
2418 * arrives. For that we simply unlink the record from the
2419 * chain, so it does not get freed/reused while restoring
2420 * spooled data from disk.
2421 *
2422 * This is safe in the face of concurrent catalog changes
2423 * because the relevant relation can't be changed between
2424 * speculative insertion and confirmation due to
2425 * CheckTableNotInUse() and locking.
2426 */
2427
2428 /* clear out a pending (and thus failed) speculation */
2429 if (specinsert != NULL)
2430 {
2431 ReorderBufferFreeChange(rb, specinsert, true);
2432 specinsert = NULL;
2433 }
2434
2435 /* and memorize the pending insertion */
2436 dlist_delete(&change->node);
2437 specinsert = change;
2438 break;
2439
2440 case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
2441
2442 /*
2443 * Abort for speculative insertion arrived. So cleanup the
2444 * specinsert tuple and toast hash.
2445 *
2446 * Note that we get the spec abort change for each toast
2447 * entry but we need to perform the cleanup only the first
2448 * time we get it for the main table.
2449 */
2450 if (specinsert != NULL)
2451 {
2452 /*
2453 * We must clean the toast hash before processing a
2454 * completely new tuple to avoid confusion about the
2455 * previous tuple's toast chunks.
2456 */
2457 Assert(change->data.tp.clear_toast_afterwards);
2458 ReorderBufferToastReset(rb, txn);
2459
2460 /* We don't need this record anymore. */
2461 ReorderBufferFreeChange(rb, specinsert, true);
2462 specinsert = NULL;
2463 }
2464 break;
2465
2466 case REORDER_BUFFER_CHANGE_TRUNCATE:
2467 {
2468 int i;
2469 int nrelids = change->data.truncate.nrelids;
2470 int nrelations = 0;
2471 Relation *relations;
2472
2473 relations = palloc0(nrelids * sizeof(Relation));
2474 for (i = 0; i < nrelids; i++)
2475 {
2476 Oid relid = change->data.truncate.relids[i];
2477 Relation rel;
2478
2479 rel = RelationIdGetRelation(relid);
2480
2481 if (!RelationIsValid(rel))
2482 elog(ERROR, "could not open relation with OID %u", relid);
2483
2484 if (!RelationIsLogicallyLogged(rel))
2485 continue;
2486
2487 relations[nrelations++] = rel;
2488 }
2489
2490 /* Apply the truncate. */
2491 ReorderBufferApplyTruncate(rb, txn, nrelations,
2492 relations, change,
2493 streaming);
2494
2495 for (i = 0; i < nrelations; i++)
2496 RelationClose(relations[i]);
2497
2498 break;
2499 }
2500
2501 case REORDER_BUFFER_CHANGE_MESSAGE:
2502 ReorderBufferApplyMessage(rb, txn, change, streaming);
2503 break;
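2504
2505 case REORDER_BUFFER_CHANGE_INVALIDATION:
2506 /* Execute the invalidation messages locally */
2507 ReorderBufferExecuteInvalidations(change->data.inval.ninvalidations,
2508 change->data.inval.invalidations);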
2509 break;
2510
2511 case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
2512 /* get rid of the old */
2513 TeardownHistoricSnapshot(false);
2514
2515 if (snapshot_now->copied)
2516 {
2517 ReorderBufferFreeSnap(rb, snapshot_now);
2518 snapshot_now =
2519 ReorderBufferCopySnap(rb, change->data.snapshot,
2520 txn, command_id);
2521 }
2522
2523 /*
2524 * Restored from disk, need to be careful not to double
2525 * free. We could introduce refcounting for that, but for
2526 * now this seems infrequent enough not to care.
2527 */
2528 else if (change->data.snapshot->copied)
2529 {
2530 snapshot_now =
2531 ReorderBufferCopySnap(rb, change->data.snapshot,
2532 txn, command_id);
2533 }
2534 else
2535 {
2536 snapshot_now = change->data.snapshot;
2537 }
2538
2539 /* and continue with the new one */
2540 SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
2541 break;
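2542
2543 case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
2544 Assert(change->data.command_id != InvalidCommandId);
2545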
2546 if (command_id < change->data.command_id)
2547 {
2548 command_id = change->data.command_id;
2549
2550 if (!snapshot_now->copied)
2551 {
2552 /* we don't use the global one anymore */
2553 snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
2554 txn, command_id);
2555 }
2556
2557 snapshot_now->curcid = command_id;
2558
2559 TeardownHistoricSnapshot(false);
2560 SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
2561 }
2562
2563 break;
2564
2565 case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
2566 elog(ERROR, "tuplecid value in changequeue");
2567 break;
2568 }
2569
2570 /*
2571 * It is possible that the data is not sent to downstream for a
2572 * long time either because the output plugin filtered it or there
2573 * is a DDL that generates a lot of data that is not processed by
2574 * the plugin. So, in such cases, the downstream can timeout. To
2575 * avoid that we try to send a keepalive message if required.
2576 * Trying to send a keepalive message after every change has some
2577 * overhead, but testing showed there is no noticeable overhead if
2578 * we do it after every ~100 changes.
2579 */
2580#define CHANGES_THRESHOLD 100
2581
2582 if (++changes_count >= CHANGES_THRESHOLD)
2583 {
2584 rb->update_progress_txn(rb, txn, change->lsn);
2585 changes_count = 0;
2586 }
2587 }
2588
2589 /* speculative insertion record must be freed by now */
2590 Assert(!specinsert);
2591
2592 /* clean up the iterator */
2593 ReorderBufferIterTXNFinish(rb, iterstate);
2594 iterstate = NULL;
2595
2596 /*
2597 * Update total transaction count and total bytes processed by the
2598 * transaction and its subtransactions. Ensure to not count the
2599 * streamed transaction multiple times.
2600 *
2601 * Note that the statistics computation has to be done after
2602 * ReorderBufferIterTXNFinish as it releases the serialized change
2603 * which we have already accounted in ReorderBufferIterTXNNext.
2604 */
2605 if (!rbtxn_is_streamed(txn))
2606 rb->totalTxns++;
2607
2608 rb->totalBytes += txn->total_size;
2609
2610 /*
2611 * Done with current changes, send the last message for this set of
2612 * changes depending upon streaming mode.
2613 */
2614 if (streaming)
2615 {
2616 if (stream_started)
2617 {
2618 rb->stream_stop(rb, txn, prev_lsn);
2619 stream_started = false;
2620 }
2621 }
2622 else
2623 {
2624 /*
2625 * Call either PREPARE (for two-phase transactions) or COMMIT (for
2626 * regular ones).
2627 */
2628 if (rbtxn_is_prepared(txn))
2629 {
2630 Assert(!rbtxn_sent_prepare(txn));
2631 rb->prepare(rb, txn, commit_lsn);
2632 txn->txn_flags |= RBTXN_SENT_PREPARE;
2633 }
2634 else
2635 rb->commit(rb, txn, commit_lsn);
2636 }
2637
2638 /* this is just a sanity check against bad output plugin behaviour */
2639 if (GetCurrentTransactionIdIfAny() != InvalidTransactionId)
2640 elog(ERROR, "output plugin used XID %u",
2641 GetCurrentTransactionId());
2642
2643 /*
2644 * Remember the command ID and snapshot for the next set of changes in
2645 * streaming mode.
2646 */
2647 if (streaming)
2648 ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
2649 else if (snapshot_now->copied)
2650 ReorderBufferFreeSnap(rb, snapshot_now);
2651
2652 /* cleanup */
2653 TeardownHistoricSnapshot(false);
2654
2655 /*
2656 * Aborting the current (sub-)transaction as a whole has the right
2657 * semantics. We want all locks acquired in here to be released, not
2658 * reassigned to the parent and we do not want any database access
2659 * have persistent effects.
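2660 */
2661 AbortCurrentTransaction();
2662
2663 /* make sure there's no cache pollution */
2664 ReorderBufferExecuteInvalidations(txn->ninvalidations, txn->invalidations);
2665
2666 if (using_subtxn)
2667 RollbackAndReleaseCurrentSubTransaction();
2668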
2669 /*
2670 * We are here due to one of the four reasons: 1. Decoding an
2671 * in-progress txn. 2. Decoding a prepared txn. 3. Decoding of a
2672 * prepared txn that was (partially) streamed. 4. Decoding a committed
2673 * txn.
2674 *
2675 * For 1, we allow truncation of txn data by removing the changes
2676 * already streamed but still keeping other things like invalidations,
2677 * snapshot, and tuplecids. For 2 and 3, we indicate
2678 * ReorderBufferTruncateTXN to do more elaborate truncation of txn
2679 * data as the entire transaction has been decoded except for commit.
2680 * For 4, as the entire txn has been decoded, we can fully clean up
2681 * the TXN reorder buffer.
2682 */
2683 if (streaming || rbtxn_is_prepared(txn))
2684 {
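2685 if (streaming)
2686 ReorderBufferMaybeMarkTXNStreamed(rb, txn);
2687
2688 ReorderBufferTruncateTXN(rb, txn, rbtxn_is_prepared(txn));
2689 /* Reset the CheckXidAlive */
2690 CheckXidAlive = InvalidTransactionId;
2691 }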
2692 else
2693 ReorderBufferCleanupTXN(rb, txn);
2694 }
2695 PG_CATCH();
2696 {
2697 MemoryContext ecxt = MemoryContextSwitchTo(ccxt);
2698 ErrorData *errdata = CopyErrorData();
2699
2700 /* TODO: Encapsulate cleanup from the PG_TRY and PG_CATCH blocks */
2701 if (iterstate)
2702 ReorderBufferIterTXNFinish(rb, iterstate);
2703
2704 TeardownHistoricSnapshot(true);
2705
2706 /*
2707 * Force cache invalidation to happen outside of a valid transaction
2708 * to prevent catalog access as we just caught an error.
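2709 */
2710 AbortCurrentTransaction();
2711
2712 /* make sure there's no cache pollution */
2713 ReorderBufferExecuteInvalidations(txn->ninvalidations,
2714 txn->invalidations);
2715
2716 if (using_subtxn)
2717 RollbackAndReleaseCurrentSubTransaction();
2718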
2719 /*
2720 * The error code ERRCODE_TRANSACTION_ROLLBACK indicates a concurrent
2721 * abort of the (sub)transaction we are streaming or preparing. We
2722 * need to do the cleanup and return gracefully on this error, see
2723 * SetupCheckXidLive.
2724 *
2725 * This error code can be thrown by one of the callbacks we call
2726 * during decoding so we need to ensure that we return gracefully only
2727 * when we are sending the data in streaming mode and the streaming is
2728 * not finished yet or when we are sending the data out on a PREPARE
2729 * during a two-phase commit.
2730 */
2731 if (errdata->sqlerrcode == ERRCODE_TRANSACTION_ROLLBACK &&
2732 (stream_started || rbtxn_is_prepared(txn)))
2733 {
2734 /* curtxn must be set for streaming or prepared transactions */
2735 Assert(curtxn);
2736
2737 /* Cleanup the temporary error state. */
2738 FlushErrorState();
2739 FreeErrorData(errdata);
2740 errdata = NULL;
2741
2742 /* Remember the transaction is aborted. */
2743 Assert(!rbtxn_is_committed(curtxn));
2744 curtxn->txn_flags |= RBTXN_IS_ABORTED;
2745
2746 /* Mark the transaction is streamed if appropriate */
2747 if (stream_started)
2748 ReorderBufferMaybeMarkTXNStreamed(rb, txn);
2749
2750 /* Reset the TXN so that it is allowed to stream remaining data. */
2751 ReorderBufferResetTXN(rb, txn, snapshot_now,
2752 command_id, prev_lsn,
2753 specinsert);
2754 }
2755 else
2756 {
2757 ReorderBufferCleanupTXN(rb, txn);
2758 MemoryContextSwitchTo(ecxt);
2759 PG_RE_THROW();
2760 }
2761 }
2762 PG_END_TRY();
2763}
bool IsToastRelation(Relation relation)
Definition: catalog.c:206
void FreeErrorData(ErrorData *edata)
Definition: elog.c:1840
ErrorData * CopyErrorData(void)
Definition: elog.c:1768
void FlushErrorState(void)
Definition: elog.c:1889
#define PG_RE_THROW()
Definition: elog.h:405
#define PG_TRY(...)
Definition: elog.h:372
#define PG_END_TRY(...)
Definition: elog.h:397
#define PG_CATCH(...)
Definition: elog.h:382
void * palloc0(Size size)
Definition: mcxt.c:1969
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:123
const void * data
#define InvalidOid
Definition: postgres_ext.h:35
#define RelationIsLogicallyLogged(relation)
Definition: rel.h:712
#define RelationIsValid(relation)
Definition: rel.h:489
Relation RelationIdGetRelation(Oid relationId)
Definition: relcache.c:2099
void RelationClose(Relation relation)
Definition: relcache.c:2220
Oid RelidByRelfilenumber(Oid reltablespace, RelFileNumber relfilenumber)
@ MAIN_FORKNUM
Definition: relpath.h:58
#define relpathperm(rlocator, forknum)
Definition: relpath.h:146
static void ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
static void ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id, XLogRecPtr last_lsn, ReorderBufferChange *specinsert)
static void ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferIterTXNState *volatile *iter_state)
static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
static void ReorderBufferSaveTXNSnapshot(ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id)
static void ReorderBufferApplyChange(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change, bool streaming)
static void ReorderBufferIterTXNFinish(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
#define CHANGES_THRESHOLD
static void ReorderBufferApplyMessage(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool streaming)
static void SetupCheckXidLive(TransactionId xid)
static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap, ReorderBufferTXN *txn, CommandId cid)
static void ReorderBufferApplyTruncate(ReorderBuffer *rb, ReorderBufferTXN *txn, int nrelations, Relation *relations, ReorderBufferChange *change, bool streaming)
static void ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
static ReorderBufferChange * ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
static void ReorderBufferMaybeMarkTXNStreamed(ReorderBuffer *rb, ReorderBufferTXN *txn)
void TeardownHistoricSnapshot(bool is_error)
Definition: snapmgr.c:1672
void SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
Definition: snapmgr.c:1656
int sqlerrcode
Definition: elog.h:431
RelFileNumber relNumber
Form_pg_class rd_rel
Definition: rel.h:111
RelFileLocator rlocator
Definition: reorderbuffer.h:98
RepOriginId origin_id
Definition: reorderbuffer.h:86
ReorderBufferBeginCB begin_prepare
ReorderBufferUpdateProgressTxnCB update_progress_txn
ReorderBufferStreamStopCB stream_stop
ReorderBufferCommitCB commit
ReorderBufferStreamStartCB stream_start
ReorderBufferBeginCB begin
TransactionId CheckXidAlive
Definition: xact.c:99
void StartTransactionCommand(void)
Definition: xact.c:3059
TransactionId GetCurrentTransactionIdIfAny(void)
Definition: xact.c:471
TransactionId GetCurrentTransactionId(void)
Definition: xact.c:454

References AbortCurrentTransaction(), ReorderBufferChange::action, Assert(), ReorderBuffer::begin, ReorderBuffer::begin_prepare, BeginInternalSubTransaction(), CHANGES_THRESHOLD, CHECK_FOR_INTERRUPTS, CheckXidAlive, ReorderBufferChange::clear_toast_afterwards, ReorderBufferChange::command_id, ReorderBuffer::commit, SnapshotData::copied, CopyErrorData(), SnapshotData::curcid, CurrentMemoryContext, ReorderBufferChange::data, data, dlist_delete(), elog, ERROR, FlushErrorState(), FreeErrorData(), GetCurrentTransactionId(), GetCurrentTransactionIdIfAny(), i, ReorderBufferChange::inval, ReorderBufferChange::invalidations, ReorderBufferTXN::invalidations, InvalidCommandId, InvalidOid, InvalidTransactionId, InvalidXLogRecPtr, IsToastRelation(), IsTransactionOrTransactionBlock(), ReorderBufferChange::lsn, MAIN_FORKNUM, MemoryContextSwitchTo(), ReorderBufferChange::newtuple, ReorderBufferChange::ninvalidations, ReorderBufferTXN::ninvalidations, ReorderBufferChange::node, ReorderBufferChange::nrelids, ReorderBufferChange::oldtuple, ReorderBufferChange::origin_id, ReorderBufferTXN::origin_id, ReorderBuffer::output_rewrites, palloc0(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, ReorderBuffer::prepare, RBTXN_IS_ABORTED, rbtxn_is_committed, rbtxn_is_prepared, rbtxn_is_streamed, RBTXN_SENT_PREPARE, rbtxn_sent_prepare, RelationData::rd_rel, RelationClose(), RelationIdGetRelation(), RelationIsLogicallyLogged, RelationIsValid, RelidByRelfilenumber(), ReorderBufferChange::relids, RelFileLocator::relNumber, relpathperm, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferApplyChange(), 
ReorderBufferApplyMessage(), ReorderBufferApplyTruncate(), ReorderBufferBuildTupleCidHash(), ReorderBufferCleanupTXN(), ReorderBufferCopySnap(), ReorderBufferExecuteInvalidations(), ReorderBufferFreeChange(), ReorderBufferFreeSnap(), ReorderBufferIterTXNFinish(), ReorderBufferIterTXNInit(), ReorderBufferIterTXNNext(), ReorderBufferMaybeMarkTXNStreamed(), ReorderBufferResetTXN(), ReorderBufferSaveTXNSnapshot(), ReorderBufferToastAppendChunk(), ReorderBufferToastReplace(), ReorderBufferToastReset(), ReorderBufferTruncateTXN(), ReorderBufferChange::rlocator, RollbackAndReleaseCurrentSubTransaction(), SetupCheckXidLive(), SetupHistoricSnapshot(), ReorderBufferChange::snapshot, RelFileLocator::spcOid, ErrorData::sqlerrcode, StartTransactionCommand(), ReorderBuffer::stream_start, ReorderBuffer::stream_stop, TeardownHistoricSnapshot(), ReorderBufferTXN::total_size, ReorderBuffer::totalBytes, ReorderBuffer::totalTxns, ReorderBufferChange::tp, ReorderBufferChange::truncate, ReorderBufferTXN::tuplecid_hash, ReorderBufferChange::txn, ReorderBufferTXN::txn_flags, ReorderBuffer::update_progress_txn, and ReorderBufferTXN::xid.

Referenced by ReorderBufferReplay(), and ReorderBufferStreamTXN().

◆ ReorderBufferProcessXid()

void ReorderBufferProcessXid ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 3240 of file reorderbuffer.c.

3241{
3242 /* many records won't have an xid assigned, centralize check here */
3243 if (xid != InvalidTransactionId)
3244 ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3245}

References InvalidTransactionId, and ReorderBufferTXNByXid().

Referenced by heap2_decode(), heap_decode(), LogicalDecodingProcessRecord(), logicalmsg_decode(), standby_decode(), xact_decode(), and xlog_decode().

◆ ReorderBufferQueueChange()

void ReorderBufferQueueChange ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
ReorderBufferChange change,
bool  toast_insert 
)

Definition at line 791 of file reorderbuffer.c.

793{
794 ReorderBufferTXN *txn;
795
796 txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
797
798 /*
799 * If we have detected that the transaction is aborted while streaming the
800 * previous changes or by checking its CLOG, there is no point in
801 * collecting further changes for it.
802 */
803 if (rbtxn_is_aborted(txn))
804 {
805 /*
806 * We don't need to update memory accounting for this change as we
807 * have not added it to the queue yet.
808 */
809 ReorderBufferFreeChange(rb, change, false);
810 return;
811 }
812
813 /*
814 * The changes that are sent downstream are considered streamable. We
815 * remember such transactions so that only those will later be considered
816 * for streaming.
817 */
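818 if (change->action == REORDER_BUFFER_CHANGE_INSERT ||
819 change->action == REORDER_BUFFER_CHANGE_UPDATE ||
820 change->action == REORDER_BUFFER_CHANGE_DELETE ||
821 change->action == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT ||
822 change->action == REORDER_BUFFER_CHANGE_TRUNCATE ||
823 change->action == REORDER_BUFFER_CHANGE_MESSAGE)
824 {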
825 ReorderBufferTXN *toptxn = rbtxn_get_toptxn(txn);
826
827 toptxn->txn_flags |= RBTXN_HAS_STREAMABLE_CHANGE;
828 }
829
830 change->lsn = lsn;
831 change->txn = txn;
832
833 Assert(InvalidXLogRecPtr != lsn);
834 dlist_push_tail(&txn->changes, &change->node);
835 txn->nentries++;
836 txn->nentries_mem++;
837
838 /* update memory accounting information */
839 ReorderBufferChangeMemoryUpdate(rb, change, NULL, true,
840 ReorderBufferChangeSize(change));
841
842 /* process partial change */
843 ReorderBufferProcessPartialChange(rb, txn, change, toast_insert);
844
845 /* check the memory limits and evict something if needed */
846 ReorderBufferCheckMemoryLimit(rb);
847}
static void ReorderBufferProcessPartialChange(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool toast_insert)
static void ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
#define RBTXN_HAS_STREAMABLE_CHANGE

References ReorderBufferChange::action, Assert(), ReorderBufferTXN::changes, dlist_push_tail(), InvalidXLogRecPtr, ReorderBufferChange::lsn, ReorderBufferTXN::nentries, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, rbtxn_get_toptxn, RBTXN_HAS_STREAMABLE_CHANGE, rbtxn_is_aborted, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChangeMemoryUpdate(), ReorderBufferChangeSize(), ReorderBufferCheckMemoryLimit(), ReorderBufferFreeChange(), ReorderBufferProcessPartialChange(), ReorderBufferTXNByXid(), ReorderBufferChange::txn, and ReorderBufferTXN::txn_flags.

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeSpecConfirm(), DecodeTruncate(), DecodeUpdate(), ReorderBufferAddInvalidations(), ReorderBufferAddNewCommandId(), ReorderBufferAddSnapshot(), and ReorderBufferQueueMessage().

◆ ReorderBufferQueueMessage()

void ReorderBufferQueueMessage ( ReorderBuffer rb,
TransactionId  xid,
Snapshot  snap,
XLogRecPtr  lsn,
bool  transactional,
const char *  prefix,
Size  message_size,
const char *  message 
)

Definition at line 854 of file reorderbuffer.c.

858{
859 if (transactional)
860 {
861 MemoryContext oldcontext;
862 ReorderBufferChange *change;
863
865
866 /*
867 * We don't expect snapshots for transactional changes - we'll use the
868 * snapshot derived later during apply (unless the change gets
869 * skipped).
870 */
871 Assert(!snap);
872
873 oldcontext = MemoryContextSwitchTo(rb->context);
874
875 change = ReorderBufferAllocChange(rb);
877 change->data.msg.prefix = pstrdup(prefix);
878 change->data.msg.message_size = message_size;
879 change->data.msg.message = palloc(message_size);
880 memcpy(change->data.msg.message, message, message_size);
881
882 ReorderBufferQueueChange(rb, xid, lsn, change, false);
883
884 MemoryContextSwitchTo(oldcontext);
885 }
886 else
887 {
888 ReorderBufferTXN *txn = NULL;
889 volatile Snapshot snapshot_now = snap;
890
891 /* Non-transactional changes require a valid snapshot. */
892 Assert(snapshot_now);
893
894 if (xid != InvalidTransactionId)
895 txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
896
897 /* setup snapshot to allow catalog access */
898 SetupHistoricSnapshot(snapshot_now, NULL);
899 PG_TRY();
900 {
901 rb->message(rb, txn, lsn, false, prefix, message_size, message);
902
904 }
905 PG_CATCH();
906 {
908 PG_RE_THROW();
909 }
910 PG_END_TRY();
911 }
912}

References ReorderBufferChange::action, Assert(), ReorderBuffer::context, ReorderBufferChange::data, InvalidTransactionId, MemoryContextSwitchTo(), ReorderBufferChange::message, ReorderBuffer::message, ReorderBufferChange::message_size, ReorderBufferChange::msg, palloc(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, ReorderBufferChange::prefix, pstrdup(), REORDER_BUFFER_CHANGE_MESSAGE, ReorderBufferAllocChange(), ReorderBufferQueueChange(), ReorderBufferTXNByXid(), SetupHistoricSnapshot(), and TeardownHistoricSnapshot().

Referenced by logicalmsg_decode().

◆ ReorderBufferRememberPrepareInfo()

bool ReorderBufferRememberPrepareInfo ( ReorderBuffer *  rb,
TransactionId  xid,
XLogRecPtr  prepare_lsn,
XLogRecPtr  end_lsn,
TimestampTz  prepare_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn 
)

Definition at line 2860 of file reorderbuffer.c.

2864{
2865 ReorderBufferTXN *txn;
2866
2867 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false);
2868
2869 /* unknown transaction, nothing to do */
2870 if (txn == NULL)
2871 return false;
2872
2873 /*
2874 * Remember the prepare information to be later used by commit prepared in
2875 * case we skip doing prepare.
2876 */
2877 txn->final_lsn = prepare_lsn;
2878 txn->end_lsn = end_lsn;
2879 txn->xact_time.prepare_time = prepare_time;
2880 txn->origin_id = origin_id;
2881 txn->origin_lsn = origin_lsn;
2882
2883 /* Mark this transaction as a prepared transaction */
2886
2887 return true;
2888}

References Assert(), ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, InvalidXLogRecPtr, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, ReorderBufferTXN::prepare_time, RBTXN_IS_PREPARED, RBTXN_PREPARE_STATUS_MASK, ReorderBufferTXNByXid(), ReorderBufferTXN::txn_flags, and ReorderBufferTXN::xact_time.

Referenced by DecodePrepare().

◆ ReorderBufferReplay()

static void ReorderBufferReplay ( ReorderBufferTXN *  txn,
ReorderBuffer *  rb,
TransactionId  xid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn,
TimestampTz  commit_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn 
)
static

Definition at line 2776 of file reorderbuffer.c.

2781{
2782 Snapshot snapshot_now;
2783 CommandId command_id = FirstCommandId;
2784
2785 txn->final_lsn = commit_lsn;
2786 txn->end_lsn = end_lsn;
2787 txn->xact_time.commit_time = commit_time;
2788 txn->origin_id = origin_id;
2789 txn->origin_lsn = origin_lsn;
2790
2791 /*
2792 * If the transaction was (partially) streamed, we need to commit it in a
2793 * 'streamed' way. That is, we first stream the remaining part of the
2794 * transaction, and then invoke stream_commit message.
2795 *
2796 * Called after everything (origin ID, LSN, ...) is stored in the
2797 * transaction to avoid passing that information directly.
2798 */
2799 if (rbtxn_is_streamed(txn))
2800 {
2802 return;
2803 }
2804
2805 /*
2806 * If this transaction has no snapshot, it didn't make any changes to the
2807 * database, so there's nothing to decode. Note that
2808 * ReorderBufferCommitChild will have transferred any snapshots from
2809 * subtransactions if there were any.
2810 */
2811 if (txn->base_snapshot == NULL)
2812 {
2813 Assert(txn->ninvalidations == 0);
2814
2815 /*
2816 * Removing this txn before a commit might result in the computation
2817 * of an incorrect restart_lsn. See SnapBuildProcessRunningXacts.
2818 */
2819 if (!rbtxn_is_prepared(txn))
2820 ReorderBufferCleanupTXN(rb, txn);
2821 return;
2822 }
2823
2824 snapshot_now = txn->base_snapshot;
2825
2826 /* Process and send the changes to output plugin. */
2827 ReorderBufferProcessTXN(rb, txn, commit_lsn, snapshot_now,
2828 command_id, false);
2829}
#define FirstCommandId
Definition: c.h:639
uint32 CommandId
Definition: c.h:637
static void ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn, volatile Snapshot snapshot_now, volatile CommandId command_id, bool streaming)

References Assert(), ReorderBufferTXN::base_snapshot, ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, FirstCommandId, ReorderBufferTXN::ninvalidations, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, rbtxn_is_prepared, rbtxn_is_streamed, ReorderBufferCleanupTXN(), ReorderBufferProcessTXN(), ReorderBufferStreamCommit(), and ReorderBufferTXN::xact_time.

Referenced by ReorderBufferCommit(), ReorderBufferFinishPrepared(), and ReorderBufferPrepare().

◆ ReorderBufferResetTXN()

static void ReorderBufferResetTXN ( ReorderBuffer *  rb,
ReorderBufferTXN *  txn,
Snapshot  snapshot_now,
CommandId  command_id,
XLogRecPtr  last_lsn,
ReorderBufferChange *  specinsert 
)
static

Definition at line 2146 of file reorderbuffer.c.

2151{
2152 /* Discard the changes that we just streamed */
2154
2155 /* Free all resources allocated for toast reconstruction */
2156 ReorderBufferToastReset(rb, txn);
2157
2158 /* Return the spec insert change if it is not NULL */
2159 if (specinsert != NULL)
2160 {
2161 ReorderBufferFreeChange(rb, specinsert, true);
2162 specinsert = NULL;
2163 }
2164
2165 /*
2166 * For the streaming case, stop the stream and remember the command ID and
2167 * snapshot for the streaming run.
2168 */
2169 if (rbtxn_is_streamed(txn))
2170 {
2171 rb->stream_stop(rb, txn, last_lsn);
2172 ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
2173 }
2174
2175 /* All changes must be deallocated */
2176 Assert(txn->size == 0);
2177}

References Assert(), rbtxn_is_prepared, rbtxn_is_streamed, ReorderBufferFreeChange(), ReorderBufferSaveTXNSnapshot(), ReorderBufferToastReset(), ReorderBufferTruncateTXN(), ReorderBufferTXN::size, and ReorderBuffer::stream_stop.

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferRestoreChange()

static void ReorderBufferRestoreChange ( ReorderBuffer *  rb,
ReorderBufferTXN *  txn,
char *  data 
)
static

Definition at line 4523 of file reorderbuffer.c.

4525{
4527 ReorderBufferChange *change;
4528
4529 ondisk = (ReorderBufferDiskChange *) data;
4530
4531 change = ReorderBufferAllocChange(rb);
4532
4533 /* copy static part */
4534 memcpy(change, &ondisk->change, sizeof(ReorderBufferChange));
4535
4536 data += sizeof(ReorderBufferDiskChange);
4537
4538 /* restore individual stuff */
4539 switch (change->action)
4540 {
4541 /* fall through these, they're all similar enough */
4546 if (change->data.tp.oldtuple)
4547 {
4548 uint32 tuplelen = ((HeapTuple) data)->t_len;
4549
4550 change->data.tp.oldtuple =
4552
4553 /* restore ->tuple */
4554 memcpy(change->data.tp.oldtuple, data,
4555 sizeof(HeapTupleData));
4556 data += sizeof(HeapTupleData);
4557
4558 /* reset t_data pointer into the new tuplebuf */
4559 change->data.tp.oldtuple->t_data =
4560 (HeapTupleHeader) ((char *) change->data.tp.oldtuple + HEAPTUPLESIZE);
4561
4562 /* restore tuple data itself */
4563 memcpy(change->data.tp.oldtuple->t_data, data, tuplelen);
4564 data += tuplelen;
4565 }
4566
4567 if (change->data.tp.newtuple)
4568 {
4569 /* here, data might not be suitably aligned! */
4570 uint32 tuplelen;
4571
4572 memcpy(&tuplelen, data + offsetof(HeapTupleData, t_len),
4573 sizeof(uint32));
4574
4575 change->data.tp.newtuple =
4577
4578 /* restore ->tuple */
4579 memcpy(change->data.tp.newtuple, data,
4580 sizeof(HeapTupleData));
4581 data += sizeof(HeapTupleData);
4582
4583 /* reset t_data pointer into the new tuplebuf */
4584 change->data.tp.newtuple->t_data =
4585 (HeapTupleHeader) ((char *) change->data.tp.newtuple + HEAPTUPLESIZE);
4586
4587 /* restore tuple data itself */
4588 memcpy(change->data.tp.newtuple->t_data, data, tuplelen);
4589 data += tuplelen;
4590 }
4591
4592 break;
4594 {
4595 Size prefix_size;
4596
4597 /* read prefix */
4598 memcpy(&prefix_size, data, sizeof(Size));
4599 data += sizeof(Size);
4601 prefix_size);
4602 memcpy(change->data.msg.prefix, data, prefix_size);
4603 Assert(change->data.msg.prefix[prefix_size - 1] == '\0');
4604 data += prefix_size;
4605
4606 /* read the message */
4607 memcpy(&change->data.msg.message_size, data, sizeof(Size));
4608 data += sizeof(Size);
4610 change->data.msg.message_size);
4611 memcpy(change->data.msg.message, data,
4612 change->data.msg.message_size);
4613 data += change->data.msg.message_size;
4614
4615 break;
4616 }
4618 {
4619 Size inval_size = sizeof(SharedInvalidationMessage) *
4620 change->data.inval.ninvalidations;
4621
4622 change->data.inval.invalidations =
4623 MemoryContextAlloc(rb->context, inval_size);
4624
4625 /* read the message */
4626 memcpy(change->data.inval.invalidations, data, inval_size);
4627
4628 break;
4629 }
4631 {
4632 Snapshot oldsnap;
4633 Snapshot newsnap;
4634 Size size;
4635
4636 oldsnap = (Snapshot) data;
4637
4638 size = sizeof(SnapshotData) +
4639 sizeof(TransactionId) * oldsnap->xcnt +
4640 sizeof(TransactionId) * (oldsnap->subxcnt + 0);
4641
4642 change->data.snapshot = MemoryContextAllocZero(rb->context, size);
4643
4644 newsnap = change->data.snapshot;
4645
4646 memcpy(newsnap, data, size);
4647 newsnap->xip = (TransactionId *)
4648 (((char *) newsnap) + sizeof(SnapshotData));
4649 newsnap->subxip = newsnap->xip + newsnap->xcnt;
4650 newsnap->copied = true;
4651 break;
4652 }
4653 /* the base struct contains all the data, easy peasy */
4655 {
4656 Oid *relids;
4657
4658 relids = ReorderBufferAllocRelids(rb, change->data.truncate.nrelids);
4659 memcpy(relids, data, change->data.truncate.nrelids * sizeof(Oid));
4660 change->data.truncate.relids = relids;
4661
4662 break;
4663 }
4668 break;
4669 }
4670
4671 dlist_push_tail(&txn->changes, &change->node);
4672 txn->nentries_mem++;
4673
4674 /*
4675 * Update memory accounting for the restored change. We need to do this
4676 * although we don't check the memory limit when restoring the changes in
4677 * this branch (we only do that when initially queueing the changes after
4678 * decoding), because we will release the changes later, and that will
4679 * update the accounting too (subtracting the size from the counters). And
4680 * we don't want to underflow there.
4681 */
4682 ReorderBufferChangeMemoryUpdate(rb, change, NULL, true,
4683 ReorderBufferChangeSize(change));
4684}
struct ReorderBufferDiskChange ReorderBufferDiskChange
HeapTuple ReorderBufferAllocTupleBuf(ReorderBuffer *rb, Size tuple_len)
Oid * ReorderBufferAllocRelids(ReorderBuffer *rb, int nrelids)
struct SnapshotData * Snapshot
Definition: snapshot.h:117
ReorderBufferChange change

References ReorderBufferChange::action, Assert(), ReorderBufferDiskChange::change, ReorderBufferTXN::changes, ReorderBuffer::context, SnapshotData::copied, ReorderBufferChange::data, data, dlist_push_tail(), HEAPTUPLESIZE, ReorderBufferChange::inval, ReorderBufferChange::invalidations, MemoryContextAlloc(), MemoryContextAllocZero(), ReorderBufferChange::message, ReorderBufferChange::message_size, ReorderBufferChange::msg, ReorderBufferTXN::nentries_mem, ReorderBufferChange::newtuple, ReorderBufferChange::ninvalidations, ReorderBufferChange::node, ReorderBufferChange::nrelids, ReorderBufferChange::oldtuple, ReorderBufferChange::prefix, ReorderBufferChange::relids, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferAllocChange(), ReorderBufferAllocRelids(), ReorderBufferAllocTupleBuf(), ReorderBufferChangeMemoryUpdate(), ReorderBufferChangeSize(), SizeofHeapTupleHeader, ReorderBufferChange::snapshot, SnapshotData::subxcnt, SnapshotData::subxip, HeapTupleData::t_data, ReorderBufferChange::tp, ReorderBufferChange::truncate, SnapshotData::xcnt, and SnapshotData::xip.

Referenced by ReorderBufferRestoreChanges().

◆ ReorderBufferRestoreChanges()

static Size ReorderBufferRestoreChanges ( ReorderBuffer *  rb,
ReorderBufferTXN *  txn,
TXNEntryFile *  file,
XLogSegNo *  segno 
)
static

Definition at line 4380 of file reorderbuffer.c.

4382{
4383 Size restored = 0;
4384 XLogSegNo last_segno;
4385 dlist_mutable_iter cleanup_iter;
4386 File *fd = &file->vfd;
4387
4390
4391 /* free current entries, so we have memory for more */
4392 dlist_foreach_modify(cleanup_iter, &txn->changes)
4393 {
4395 dlist_container(ReorderBufferChange, node, cleanup_iter.cur);
4396
4397 dlist_delete(&cleanup->node);
4399 }
4400 txn->nentries_mem = 0;
4402
4403 XLByteToSeg(txn->final_lsn, last_segno, wal_segment_size);
4404
4405 while (restored < max_changes_in_memory && *segno <= last_segno)
4406 {
4407 int readBytes;
4409
4411
4412 if (*fd == -1)
4413 {
4414 char path[MAXPGPATH];
4415
4416 /* first time in */
4417 if (*segno == 0)
4418 XLByteToSeg(txn->first_lsn, *segno, wal_segment_size);
4419
4420 Assert(*segno != 0 || dlist_is_empty(&txn->changes));
4421
4422 /*
4423 * No need to care about TLIs here, only used during a single run,
4424 * so each LSN only maps to a specific WAL record.
4425 */
4427 *segno);
4428
4429 *fd = PathNameOpenFile(path, O_RDONLY | PG_BINARY);
4430
4431 /* No harm in resetting the offset even in case of failure */
4432 file->curOffset = 0;
4433
4434 if (*fd < 0 && errno == ENOENT)
4435 {
4436 *fd = -1;
4437 (*segno)++;
4438 continue;
4439 }
4440 else if (*fd < 0)
4441 ereport(ERROR,
4443 errmsg("could not open file \"%s\": %m",
4444 path)));
4445 }
4446
4447 /*
4448 * Read the statically sized part of a change which has information
4449 * about the total size. If we couldn't read a record, we're at the
4450 * end of this file.
4451 */
4453 readBytes = FileRead(file->vfd, rb->outbuf,
4455 file->curOffset, WAIT_EVENT_REORDER_BUFFER_READ);
4456
4457 /* eof */
4458 if (readBytes == 0)
4459 {
4460 FileClose(*fd);
4461 *fd = -1;
4462 (*segno)++;
4463 continue;
4464 }
4465 else if (readBytes < 0)
4466 ereport(ERROR,
4468 errmsg("could not read from reorderbuffer spill file: %m")));
4469 else if (readBytes != sizeof(ReorderBufferDiskChange))
4470 ereport(ERROR,
4472 errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
4473 readBytes,
4474 (uint32) sizeof(ReorderBufferDiskChange))));
4475
4476 file->curOffset += readBytes;
4477
4478 ondisk = (ReorderBufferDiskChange *) rb->outbuf;
4479
4481 sizeof(ReorderBufferDiskChange) + ondisk->size);
4482 ondisk = (ReorderBufferDiskChange *) rb->outbuf;
4483
4484 readBytes = FileRead(file->vfd,
4485 rb->outbuf + sizeof(ReorderBufferDiskChange),
4486 ondisk->size - sizeof(ReorderBufferDiskChange),
4487 file->curOffset,
4488 WAIT_EVENT_REORDER_BUFFER_READ);
4489
4490 if (readBytes < 0)
4491 ereport(ERROR,
4493 errmsg("could not read from reorderbuffer spill file: %m")));
4494 else if (readBytes != ondisk->size - sizeof(ReorderBufferDiskChange))
4495 ereport(ERROR,
4497 errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
4498 readBytes,
4499 (uint32) (ondisk->size - sizeof(ReorderBufferDiskChange)))));
4500
4501 file->curOffset += readBytes;
4502
4503 /*
4504 * ok, read a full change from disk, now restore it into proper
4505 * in-memory format
4506 */
4507 ReorderBufferRestoreChange(rb, txn, rb->outbuf);
4508 restored++;
4509 }
4510
4511 return restored;
4512}
static void cleanup(void)
Definition: bootstrap.c:713
File PathNameOpenFile(const char *fileName, int fileFlags)
Definition: fd.c:1579
static ssize_t FileRead(File file, void *buffer, size_t amount, off_t offset, uint32 wait_event_info)
Definition: fd.h:199
int File
Definition: fd.h:51
static void ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz)
static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, char *data)
static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, TransactionId xid, XLogSegNo segno)
static const Size max_changes_in_memory
int wal_segment_size
Definition: xlog.c:143
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
uint64 XLogSegNo
Definition: xlogdefs.h:48

References Assert(), ReorderBufferTXN::changes, CHECK_FOR_INTERRUPTS, cleanup(), dlist_mutable_iter::cur, TXNEntryFile::curOffset, dlist_container, dlist_delete(), dlist_foreach_modify, dlist_is_empty(), ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), FileClose(), FileRead(), ReorderBufferTXN::final_lsn, ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, max_changes_in_memory, MAXPGPATH, MyReplicationSlot, ReorderBufferTXN::nentries_mem, ReorderBuffer::outbuf, PathNameOpenFile(), PG_BINARY, ReorderBufferFreeChange(), ReorderBufferRestoreChange(), ReorderBufferSerializedPath(), ReorderBufferSerializeReserve(), ReorderBufferDiskChange::size, TXNEntryFile::vfd, wal_segment_size, ReorderBufferTXN::xid, and XLByteToSeg.

Referenced by ReorderBufferIterTXNInit(), and ReorderBufferIterTXNNext().

◆ ReorderBufferRestoreCleanup()

static void ReorderBufferRestoreCleanup ( ReorderBuffer *  rb,
ReorderBufferTXN *  txn 
)
static

Definition at line 4690 of file reorderbuffer.c.

4691{
4692 XLogSegNo first;
4693 XLogSegNo cur;
4694 XLogSegNo last;
4695
4698
4701
4702 /* iterate over all possible filenames, and delete them */
4703 for (cur = first; cur <= last; cur++)
4704 {
4705 char path[MAXPGPATH];
4706
4708 if (unlink(path) != 0 && errno != ENOENT)
4709 ereport(ERROR,
4711 errmsg("could not remove file \"%s\": %m", path)));
4712 }
4713}
struct cursor * cur
Definition: ecpg.c:29

References Assert(), cur, ereport, errcode_for_file_access(), errmsg(), ERROR, ReorderBufferTXN::final_lsn, ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, MAXPGPATH, MyReplicationSlot, ReorderBufferSerializedPath(), wal_segment_size, ReorderBufferTXN::xid, and XLByteToSeg.

Referenced by ReorderBufferCleanupTXN(), and ReorderBufferTruncateTXN().

◆ ReorderBufferSaveTXNSnapshot()

static void ReorderBufferSaveTXNSnapshot ( ReorderBuffer *  rb,
ReorderBufferTXN *  txn,
Snapshot  snapshot_now,
CommandId  command_id 
)
inline static

Definition at line 2101 of file reorderbuffer.c.

/*
 * Remember on txn the command ID and snapshot passed in by the caller.
 *
 * command_id is stored in txn->command_id. snapshot_now is stored in
 * txn->snapshot_now directly when its 'copied' flag is set; otherwise the
 * result of ReorderBufferCopySnap() is stored instead.
 */
2103{
2104 txn->command_id = command_id;
2105
2106 /* Reuse the snapshot as-is if it is already flagged as copied; else copy it. */
2107 if (snapshot_now->copied)
2108 txn->snapshot_now = snapshot_now;
2109 else
2110 txn->snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
2111 txn, command_id);
2112}

References ReorderBufferTXN::command_id, SnapshotData::copied, ReorderBufferCopySnap(), and ReorderBufferTXN::snapshot_now.

Referenced by ReorderBufferProcessTXN(), and ReorderBufferResetTXN().

◆ ReorderBufferSerializeChange()

static void ReorderBufferSerializeChange ( ReorderBuffer *  rb,
ReorderBufferTXN *  txn,
int  fd,
ReorderBufferChange *  change 
)
static

Definition at line 3928 of file reorderbuffer.c.

3930{
3932 Size sz = sizeof(ReorderBufferDiskChange);
3933
3935
3936 ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3937 memcpy(&ondisk->change, change, sizeof(ReorderBufferChange));
3938
3939 switch (change->action)
3940 {
3941 /* fall through these, they're all similar enough */
3946 {
3947 char *data;
3948 HeapTuple oldtup,
3949 newtup;
3950 Size oldlen = 0;
3951 Size newlen = 0;
3952
3953 oldtup = change->data.tp.oldtuple;
3954 newtup = change->data.tp.newtuple;
3955
3956 if (oldtup)
3957 {
3958 sz += sizeof(HeapTupleData);
3959 oldlen = oldtup->t_len;
3960 sz += oldlen;
3961 }
3962
3963 if (newtup)
3964 {
3965 sz += sizeof(HeapTupleData);
3966 newlen = newtup->t_len;
3967 sz += newlen;
3968 }
3969
3970 /* make sure we have enough space */
3972
3973 data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3974 /* might have been reallocated above */
3975 ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3976
3977 if (oldlen)
3978 {
3979 memcpy(data, oldtup, sizeof(HeapTupleData));
3980 data += sizeof(HeapTupleData);
3981
3982 memcpy(data, oldtup->t_data, oldlen);
3983 data += oldlen;
3984 }
3985
3986 if (newlen)
3987 {
3988 memcpy(data, newtup, sizeof(HeapTupleData));
3989 data += sizeof(HeapTupleData);
3990
3991 memcpy(data, newtup->t_data, newlen);
3992 data += newlen;
3993 }
3994 break;
3995 }
3997 {
3998 char *data;
3999 Size prefix_size = strlen(change->data.msg.prefix) + 1;
4000
4001 sz += prefix_size + change->data.msg.message_size +
4002 sizeof(Size) + sizeof(Size);
4004
4005 data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
4006
4007 /* might have been reallocated above */
4008 ondisk = (ReorderBufferDiskChange *) rb->outbuf;
4009
4010 /* write the prefix including the size */
4011 memcpy(data, &prefix_size, sizeof(Size));
4012 data += sizeof(Size);
4013 memcpy(data, change->data.msg.prefix,
4014 prefix_size);
4015 data += prefix_size;
4016
4017 /* write the message including the size */
4018 memcpy(data, &change->data.msg.message_size, sizeof(Size));
4019 data += sizeof(Size);
4020 memcpy(data, change->data.msg.message,
4021 change->data.msg.message_size);
4022 data += change->data.msg.message_size;
4023
4024 break;
4025 }
4027 {
4028 char *data;
4029 Size inval_size = sizeof(SharedInvalidationMessage) *
4030 change->data.inval.ninvalidations;
4031
4032 sz += inval_size;
4033
4035 data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
4036
4037 /* might have been reallocated above */
4038 ondisk = (ReorderBufferDiskChange *) rb->outbuf;
4039 memcpy(data, change->data.inval.invalidations, inval_size);
4040 data += inval_size;
4041
4042 break;
4043 }
4045 {
4046 Snapshot snap;
4047 char *data;
4048
4049 snap = change->data.snapshot;
4050
4051 sz += sizeof(SnapshotData) +
4052 sizeof(TransactionId) * snap->xcnt +
4053 sizeof(TransactionId) * snap->subxcnt;
4054
4055 /* make sure we have enough space */
4057 data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
4058 /* might have been reallocated above */
4059 ondisk = (ReorderBufferDiskChange *) rb->outbuf;
4060
4061 memcpy(data, snap, sizeof(SnapshotData));
4062 data += sizeof(SnapshotData);
4063
4064 if (snap->xcnt)
4065 {
4066 memcpy(data, snap->xip,
4067 sizeof(TransactionId) * snap->xcnt);
4068 data += sizeof(TransactionId) * snap->xcnt;
4069 }
4070
4071 if (snap->subxcnt)
4072 {
4073 memcpy(data, snap->subxip,
4074 sizeof(TransactionId) * snap->subxcnt);
4075 data += sizeof(TransactionId) * snap->subxcnt;
4076 }
4077 break;
4078 }
4080 {
4081 Size size;
4082 char *data;
4083
4084 /* account for the OIDs of truncated relations */
4085 size = sizeof(Oid) * change->data.truncate.nrelids;
4086 sz += size;
4087
4088 /* make sure we have enough space */
4090
4091 data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
4092 /* might have been reallocated above */
4093 ondisk = (ReorderBufferDiskChange *) rb->outbuf;
4094
4095 memcpy(data, change->data.truncate.relids, size);
4096 data += size;
4097
4098 break;
4099 }
4104 /* ReorderBufferChange contains everything important */
4105 break;
4106 }
4107
4108 ondisk->size = sz;
4109
4110 errno = 0;
4111 pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_WRITE);
4112 if (write(fd, rb->outbuf, ondisk->size) != ondisk->size)
4113 {
4114 int save_errno = errno;
4115
4117
4118 /* if write didn't set errno, assume problem is no disk space */
4119 errno = save_errno ? save_errno : ENOSPC;
4120 ereport(ERROR,
4122 errmsg("could not write to data file for XID %u: %m",
4123 txn->xid)));
4124 }
4126
4127 /*
4128 * Keep the transaction's final_lsn up to date with each change we send to
4129 * disk, so that ReorderBufferRestoreCleanup works correctly. (We used to
4130 * only do this on commit and abort records, but that doesn't work if a
4131 * system crash leaves a transaction without its abort record).
4132 *
4133 * Make sure not to move it backwards.
4134 */
4135 if (txn->final_lsn < change->lsn)
4136 txn->final_lsn = change->lsn;
4137
4138 Assert(ondisk->change.action == change->action);
4139}
#define write(a, b, c)
Definition: win32.h:14
if(TABLE==NULL||TABLE_index==NULL)
Definition: isn.c:81

References ReorderBufferChange::action, Assert(), ReorderBufferDiskChange::change, CloseTransientFile(), ReorderBufferChange::data, data, ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), ReorderBufferTXN::final_lsn, if(), ReorderBufferChange::inval, ReorderBufferChange::invalidations, ReorderBufferChange::lsn, ReorderBufferChange::message, ReorderBufferChange::message_size, ReorderBufferChange::msg, ReorderBufferChange::newtuple, ReorderBufferChange::ninvalidations, ReorderBufferChange::nrelids, ReorderBufferChange::oldtuple, ReorderBuffer::outbuf, pgstat_report_wait_end(), pgstat_report_wait_start(), ReorderBufferChange::prefix, ReorderBufferChange::relids, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferSerializeReserve(), ReorderBufferDiskChange::size, ReorderBufferChange::snapshot, SnapshotData::subxcnt, SnapshotData::subxip, HeapTupleData::t_data, HeapTupleData::t_len, ReorderBufferChange::tp, ReorderBufferChange::truncate, write, SnapshotData::xcnt, ReorderBufferTXN::xid, and SnapshotData::xip.

Referenced by ReorderBufferSerializeTXN().

◆ ReorderBufferSerializedPath()

static void ReorderBufferSerializedPath ( char *  path,
ReplicationSlot *  slot,
TransactionId  xid,
XLogSegNo  segno 
)
static

Definition at line 4759 of file reorderbuffer.c.

4761{
4762 XLogRecPtr recptr;
4763
4764 XLogSegNoOffsetToRecPtr(segno, 0, wal_segment_size, recptr);
4765
4766 snprintf(path, MAXPGPATH, "%s/%s/xid-%u-lsn-%X-%X.spill",
4769 xid, LSN_FORMAT_ARGS(recptr));
4770}
#define XLogSegNoOffsetToRecPtr(segno, offset, wal_segsz_bytes, dest)
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43

References ReplicationSlot::data, LSN_FORMAT_ARGS, MAXPGPATH, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, PG_REPLSLOT_DIR, snprintf, wal_segment_size, and XLogSegNoOffsetToRecPtr.

Referenced by ReorderBufferRestoreChanges(), ReorderBufferRestoreCleanup(), and ReorderBufferSerializeTXN().

◆ ReorderBufferSerializeReserve()

static void ReorderBufferSerializeReserve ( ReorderBuffer *  rb,
Size  sz 
)
static

Definition at line 3629 of file reorderbuffer.c.

/*
 * Make sure rb->outbuf can hold at least sz bytes.
 *
 * If no buffer has been set up yet (rb->outbufsize is zero) one of sz bytes
 * is allocated in rb->context; if the existing buffer is smaller than sz it
 * is enlarged with repalloc(). A buffer that is already large enough is left
 * untouched, so the buffer is never shrunk here. Because rb->outbuf is
 * reassigned on growth, callers should re-read rb->outbuf after calling this.
 */
3630{
3631 if (!rb->outbufsize) /* first use: nothing allocated so far */
3632 {
3633 rb->outbuf = MemoryContextAlloc(rb->context, sz);
3634 rb->outbufsize = sz;
3635 }
3636 else if (rb->outbufsize < sz) /* existing buffer too small: grow it */
3637 {
3638 rb->outbuf = repalloc(rb->outbuf, sz);
3639 rb->outbufsize = sz;
3640 }
3641}

References ReorderBuffer::context, MemoryContextAlloc(), ReorderBuffer::outbuf, ReorderBuffer::outbufsize, and repalloc().

Referenced by ReorderBufferRestoreChanges(), and ReorderBufferSerializeChange().

◆ ReorderBufferSerializeTXN()

static void ReorderBufferSerializeTXN ( ReorderBuffer *  rb,
ReorderBufferTXN *  txn 
)
static

Definition at line 3833 of file reorderbuffer.c.

3834{
3835 dlist_iter subtxn_i;
3836 dlist_mutable_iter change_i;
3837 int fd = -1;
3838 XLogSegNo curOpenSegNo = 0;
3839 Size spilled = 0;
3840 Size size = txn->size;
3841
3842 elog(DEBUG2, "spill %u changes in XID %u to disk",
3843 (uint32) txn->nentries_mem, txn->xid);
3844
3845 /* do the same to all child TXs */
3846 dlist_foreach(subtxn_i, &txn->subtxns)
3847 {
3848 ReorderBufferTXN *subtxn;
3849
3850 subtxn = dlist_container(ReorderBufferTXN, node, subtxn_i.cur);
3851 ReorderBufferSerializeTXN(rb, subtxn);
3852 }
3853
3854 /* serialize changestream */
3855 dlist_foreach_modify(change_i, &txn->changes)
3856 {
3857 ReorderBufferChange *change;
3858
3859 change = dlist_container(ReorderBufferChange, node, change_i.cur);
3860
3861 /*
3862 * Store each change in the spill file of the WAL segment that its start
3863 * LSN belongs to; a single change is never split across segment files.
3864 */
3865 if (fd == -1 ||
3866 !XLByteInSeg(change->lsn, curOpenSegNo, wal_segment_size))
3867 {
3868 char path[MAXPGPATH];
3869
3870 if (fd != -1)
3872
3873 XLByteToSeg(change->lsn, curOpenSegNo, wal_segment_size);
3874
3875 /*
3876 * No need to care about TLIs here, only used during a single run,
3877 * so each LSN only maps to a specific WAL record.
3878 */
3880 curOpenSegNo);
3881
3882 /* open segment, create it if necessary */
3883 fd = OpenTransientFile(path,
3884 O_CREAT | O_WRONLY | O_APPEND | PG_BINARY);
3885
3886 if (fd < 0)
3887 ereport(ERROR,
3889 errmsg("could not open file \"%s\": %m", path)));
3890 }
3891
3892 ReorderBufferSerializeChange(rb, txn, fd, change);
3893 dlist_delete(&change->node);
3894 ReorderBufferFreeChange(rb, change, false);
3895
3896 spilled++;
3897 }
3898
3899 /* Update the memory counter */
3900 ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, size);
3901
3902 /* update the statistics iff we have spilled anything */
3903 if (spilled)
3904 {
3905 rb->spillCount += 1;
3906 rb->spillBytes += size;
3907
3908 /* don't consider already serialized transactions */
3909 rb->spillTxns += (rbtxn_is_serialized(txn) || rbtxn_is_serialized_clear(txn)) ? 0 : 1;
3910
3911 /* update the decoding stats */
3913 }
3914
3915 Assert(spilled == txn->nentries_mem);
3917 txn->nentries_mem = 0;
3919
3920 if (fd != -1)
3922}
void UpdateDecodingStats(LogicalDecodingContext *ctx)
Definition: logical.c:1915
static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, int fd, ReorderBufferChange *change)
#define rbtxn_is_serialized_clear(txn)
#define RBTXN_IS_SERIALIZED
#define XLByteInSeg(xlrp, logSegNo, wal_segsz_bytes)

References Assert(), ReorderBufferTXN::changes, CloseTransientFile(), dlist_iter::cur, dlist_mutable_iter::cur, DEBUG2, dlist_container, dlist_delete(), dlist_foreach, dlist_foreach_modify, dlist_is_empty(), elog, ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), ReorderBufferChange::lsn, MAXPGPATH, MyReplicationSlot, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, OpenTransientFile(), PG_BINARY, ReorderBuffer::private_data, RBTXN_IS_SERIALIZED, rbtxn_is_serialized, rbtxn_is_serialized_clear, ReorderBufferChangeMemoryUpdate(), ReorderBufferFreeChange(), ReorderBufferSerializeChange(), ReorderBufferSerializedPath(), ReorderBufferSerializeTXN(), ReorderBufferTXN::size, ReorderBuffer::spillBytes, ReorderBuffer::spillCount, ReorderBuffer::spillTxns, ReorderBufferTXN::subtxns, ReorderBufferTXN::txn_flags, UpdateDecodingStats(), wal_segment_size, ReorderBufferTXN::xid, XLByteInSeg, and XLByteToSeg.

Referenced by ReorderBufferCheckMemoryLimit(), ReorderBufferIterTXNInit(), and ReorderBufferSerializeTXN().

◆ ReorderBufferSetBaseSnapshot()

void ReorderBufferSetBaseSnapshot ( ReorderBuffer * rb,
TransactionId  xid,
XLogRecPtr  lsn,
Snapshot  snap 
)

Definition at line 3271 of file reorderbuffer.c.

3273{
3274 ReorderBufferTXN *txn;
3275 bool is_new;
3276
3277 Assert(snap != NULL);
3278
3279 /*
3280 * Fetch the transaction to operate on. If we know it's a subtransaction,
3281 * operate on its top-level transaction instead.
3282 */
3283 txn = ReorderBufferTXNByXid(rb, xid, true, &is_new, lsn, true);
3284 if (rbtxn_is_known_subxact(txn))
3285 txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
3286 NULL, InvalidXLogRecPtr, false);
3287 Assert(txn->base_snapshot == NULL);
3288
3289 txn->base_snapshot = snap;
3290 txn->base_snapshot_lsn = lsn;
3292
3294}

References Assert(), AssertTXNLsnOrder(), ReorderBufferTXN::base_snapshot, ReorderBufferTXN::base_snapshot_lsn, ReorderBufferTXN::base_snapshot_node, dlist_push_tail(), InvalidXLogRecPtr, rbtxn_is_known_subxact, ReorderBufferTXNByXid(), ReorderBufferTXN::toplevel_xid, and ReorderBuffer::txns_by_base_snapshot_lsn.

Referenced by SnapBuildCommitTxn(), and SnapBuildProcessChange().

◆ ReorderBufferSetRestartPoint()

void ReorderBufferSetRestartPoint ( ReorderBuffer * rb,
XLogRecPtr  ptr 
)

Definition at line 1068 of file reorderbuffer.c.

1069{
1071}

References ReorderBuffer::current_restart_decoding_lsn.

Referenced by SnapBuildRestore(), and SnapBuildSerialize().

◆ ReorderBufferSkipPrepare()

void ReorderBufferSkipPrepare ( ReorderBuffer * rb,
TransactionId  xid 
)

Definition at line 2892 of file reorderbuffer.c.

2893{
2894 ReorderBufferTXN *txn;
2895
2896 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false);
2897
2898 /* unknown transaction, nothing to do */
2899 if (txn == NULL)
2900 return;
2901
2902 /* txn must have been marked as a prepared transaction */
2905}

References Assert(), InvalidXLogRecPtr, RBTXN_IS_PREPARED, RBTXN_PREPARE_STATUS_MASK, RBTXN_SKIPPED_PREPARE, ReorderBufferTXNByXid(), and ReorderBufferTXN::txn_flags.

Referenced by DecodePrepare().

◆ ReorderBufferStreamCommit()

static void ReorderBufferStreamCommit ( ReorderBuffer * rb,
ReorderBufferTXN * txn 
)
static

Definition at line 1964 of file reorderbuffer.c.

1965{
1966 /* we should only call this for previously streamed transactions */
1968
1969 ReorderBufferStreamTXN(rb, txn);
1970
1971 if (rbtxn_is_prepared(txn))
1972 {
1973 /*
1974 * Note, we send stream prepare even if a concurrent abort is
1975 * detected. See DecodePrepare for more information.
1976 */
1978 rb->stream_prepare(rb, txn, txn->final_lsn);
1980
1981 /*
1982 * This is a PREPARED transaction, part of a two-phase commit. The
1983 * full cleanup will happen as part of the COMMIT PREPAREDs, so now
1984 * just truncate txn by removing changes and tuplecids.
1985 */
1986 ReorderBufferTruncateTXN(rb, txn, true);
1987 /* Reset the CheckXidAlive */
1989 }
1990 else
1991 {
1992 rb->stream_commit(rb, txn, txn->final_lsn);
1993 ReorderBufferCleanupTXN(rb, txn);
1994 }
1995}
ReorderBufferStreamPrepareCB stream_prepare
ReorderBufferStreamCommitCB stream_commit

References Assert(), CheckXidAlive, ReorderBufferTXN::final_lsn, InvalidTransactionId, rbtxn_is_prepared, rbtxn_is_streamed, RBTXN_SENT_PREPARE, rbtxn_sent_prepare, ReorderBufferCleanupTXN(), ReorderBufferStreamTXN(), ReorderBufferTruncateTXN(), ReorderBuffer::stream_commit, ReorderBuffer::stream_prepare, and ReorderBufferTXN::txn_flags.

Referenced by ReorderBufferReplay().

◆ ReorderBufferStreamTXN()

static void ReorderBufferStreamTXN ( ReorderBuffer * rb,
ReorderBufferTXN * txn 
)
static

Definition at line 4178 of file reorderbuffer.c.

4179{
4180 Snapshot snapshot_now;
4181 CommandId command_id;
4182 Size stream_bytes;
4183 bool txn_is_streamed;
4184
4185 /* We can never reach here for a subtransaction. */
4186 Assert(rbtxn_is_toptxn(txn));
4187
4188 /*
4189 * We can't make any assumptions about base snapshot here, similar to what
4190 * ReorderBufferCommit() does. That relies on base_snapshot getting
4191 * transferred from subxact in ReorderBufferCommitChild(), but that was
4192 * not yet called as the transaction is in-progress.
4193 *
4194 * So just walk the subxacts and use the same logic here. But we only need
4195 * to do that once, when the transaction is streamed for the first time.
4196 * After that we need to reuse the snapshot from the previous run.
4197 *
4198 * Unlike DecodeCommit which adds xids of all the subtransactions in
4199 * snapshot's xip array via SnapBuildCommitTxn, we can't do that here but
4200 * we do add them to subxip array instead via ReorderBufferCopySnap. This
4201 * allows the catalog changes made in subtransactions decoded till now to
4202 * be visible.
4203 */
4204 if (txn->snapshot_now == NULL)
4205 {
4206 dlist_iter subxact_i;
4207
4208 /* make sure this transaction is streamed for the first time */
4210
4211 /* at the beginning we should have invalid command ID */
4213
4214 dlist_foreach(subxact_i, &txn->subtxns)
4215 {
4216 ReorderBufferTXN *subtxn;
4217
4218 subtxn = dlist_container(ReorderBufferTXN, node, subxact_i.cur);
4220 }
4221
4222 /*
4223 * If this transaction has no snapshot, it didn't make any changes to
4224 * the database till now, so there's nothing to decode.
4225 */
4226 if (txn->base_snapshot == NULL)
4227 {
4228 Assert(txn->ninvalidations == 0);
4229 return;
4230 }
4231
4232 command_id = FirstCommandId;
4233 snapshot_now = ReorderBufferCopySnap(rb, txn->base_snapshot,
4234 txn, command_id);
4235 }
4236 else
4237 {
4238 /* the transaction must have been already streamed */
4240
4241 /*
4242 * Nah, we already have snapshot from the previous streaming run. We
4243 * assume new subxacts can't move the LSN backwards, and so can't beat
4244 * the LSN condition in the previous branch (so no need to walk
4245 * through subxacts again). In fact, we must not do that as we may be
4246 * using snapshot half-way through the subxact.
4247 */
4248 command_id = txn->command_id;
4249
4250 /*
4251 * We can't use txn->snapshot_now directly because after the last
4252 * streaming run, we might have got some new sub-transactions. So we
4253 * need to add them to the snapshot.
4254 */
4255 snapshot_now = ReorderBufferCopySnap(rb, txn->snapshot_now,
4256 txn, command_id);
4257
4258 /* Free the previously copied snapshot. */
4259 Assert(txn->snapshot_now->copied);
4261 txn->snapshot_now = NULL;
4262 }
4263
4264 /*
4265 * Remember this information to be used later to update stats. We can't
4266 * update the stats here as an error while processing the changes would
4267 * lead to the accumulation of stats even though we haven't streamed all
4268 * the changes.
4269 */
4270 txn_is_streamed = rbtxn_is_streamed(txn);
4271 stream_bytes = txn->total_size;
4272
4273 /* Process and send the changes to output plugin. */
4274 ReorderBufferProcessTXN(rb, txn, InvalidXLogRecPtr, snapshot_now,
4275 command_id, true);
4276
4277 rb->streamCount += 1;
4278 rb->streamBytes += stream_bytes;
4279
4280 /* Don't consider already streamed transaction. */
4281 rb->streamTxns += (txn_is_streamed) ? 0 : 1;
4282
4283 /* update the decoding stats */
4285
4287 Assert(txn->nentries == 0);
4288 Assert(txn->nentries_mem == 0);
4289}

References Assert(), ReorderBufferTXN::base_snapshot, ReorderBufferTXN::changes, ReorderBufferTXN::command_id, SnapshotData::copied, dlist_iter::cur, dlist_container, dlist_foreach, dlist_is_empty(), FirstCommandId, InvalidCommandId, InvalidXLogRecPtr, ReorderBufferTXN::nentries, ReorderBufferTXN::nentries_mem, ReorderBufferTXN::ninvalidations, ReorderBuffer::private_data, rbtxn_is_streamed, rbtxn_is_toptxn, ReorderBufferCopySnap(), ReorderBufferFreeSnap(), ReorderBufferProcessTXN(), ReorderBufferTransferSnapToParent(), ReorderBufferTXN::snapshot_now, ReorderBuffer::streamBytes, ReorderBuffer::streamCount, ReorderBuffer::streamTxns, ReorderBufferTXN::subtxns, ReorderBufferTXN::total_size, and UpdateDecodingStats().

Referenced by ReorderBufferCheckMemoryLimit(), ReorderBufferProcessPartialChange(), and ReorderBufferStreamCommit().

◆ ReorderBufferToastAppendChunk()

static void ReorderBufferToastAppendChunk ( ReorderBuffer * rb,
ReorderBufferTXN * txn,
Relation  relation,
ReorderBufferChange * change 
)
static

Definition at line 4831 of file reorderbuffer.c.

4833{
4835 HeapTuple newtup;
4836 bool found;
4837 int32 chunksize;
4838 bool isnull;
4839 Pointer chunk;
4840 TupleDesc desc = RelationGetDescr(relation);
4841 Oid chunk_id;
4842 int32 chunk_seq;
4843
4844 if (txn->toast_hash == NULL)
4846
4847 Assert(IsToastRelation(relation));
4848
4849 newtup = change->data.tp.newtuple;
4850 chunk_id = DatumGetObjectId(fastgetattr(newtup, 1, desc, &isnull));
4851 Assert(!isnull);
4852 chunk_seq = DatumGetInt32(fastgetattr(newtup, 2, desc, &isnull));
4853 Assert(!isnull);
4854
4855 ent = (ReorderBufferToastEnt *)
4856 hash_search(txn->toast_hash, &chunk_id, HASH_ENTER, &found);
4857
4858 if (!found)
4859 {
4860 Assert(ent->chunk_id == chunk_id);
4861 ent->num_chunks = 0;
4862 ent->last_chunk_seq = 0;
4863 ent->size = 0;
4864 ent->reconstructed = NULL;
4865 dlist_init(&ent->chunks);
4866
4867 if (chunk_seq != 0)
4868 elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq 0",
4869 chunk_seq, chunk_id);
4870 }
4871 else if (found && chunk_seq != ent->last_chunk_seq + 1)
4872 elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq %d",
4873 chunk_seq, chunk_id, ent->last_chunk_seq + 1);
4874
4875 chunk = DatumGetPointer(fastgetattr(newtup, 3, desc, &isnull));
4876 Assert(!isnull);
4877
4878 /* calculate size so we can allocate the right size at once later */
4879 if (!VARATT_IS_EXTENDED(chunk))
4880 chunksize = VARSIZE(chunk) - VARHDRSZ;
4881 else if (VARATT_IS_SHORT(chunk))
4882 /* could happen due to heap_form_tuple doing its thing */
4883 chunksize = VARSIZE_SHORT(chunk) - VARHDRSZ_SHORT;
4884 else
4885 elog(ERROR, "unexpected type of toast chunk");
4886
4887 ent->size += chunksize;
4888 ent->last_chunk_seq = chunk_seq;
4889 ent->num_chunks++;
4890 dlist_push_tail(&ent->chunks, &change->node);
4891}
char * Pointer
Definition: c.h:493
#define VARHDRSZ
Definition: c.h:663
static Datum fastgetattr(HeapTuple tup, int attnum, TupleDesc tupleDesc, bool *isnull)
Definition: htup_details.h:861
static Oid DatumGetObjectId(Datum X)
Definition: postgres.h:247
static Pointer DatumGetPointer(Datum X)
Definition: postgres.h:317
#define RelationGetDescr(relation)
Definition: rel.h:542
static void ReorderBufferToastInitHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
struct varlena * reconstructed
#define VARHDRSZ_SHORT
Definition: varatt.h:255
#define VARSIZE_SHORT(PTR)
Definition: varatt.h:281
#define VARATT_IS_EXTENDED(PTR)
Definition: varatt.h:303
#define VARATT_IS_SHORT(PTR)
Definition: varatt.h:302
#define VARSIZE(PTR)
Definition: varatt.h:279

References Assert(), ReorderBufferToastEnt::chunk_id, ReorderBufferToastEnt::chunks, ReorderBufferChange::data, DatumGetInt32(), DatumGetObjectId(), DatumGetPointer(), dlist_init(), dlist_push_tail(), elog, ERROR, fastgetattr(), HASH_ENTER, hash_search(), IsToastRelation(), ReorderBufferToastEnt::last_chunk_seq, ReorderBufferChange::newtuple, ReorderBufferChange::node, ReorderBufferToastEnt::num_chunks, ReorderBufferToastEnt::reconstructed, RelationGetDescr, ReorderBufferToastInitHash(), ReorderBufferToastEnt::size, ReorderBufferTXN::toast_hash, ReorderBufferChange::tp, VARATT_IS_EXTENDED, VARATT_IS_SHORT, VARHDRSZ, VARHDRSZ_SHORT, VARSIZE, and VARSIZE_SHORT.

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferToastInitHash()

static void ReorderBufferToastInitHash ( ReorderBuffer * rb,
ReorderBufferTXN * txn 
)
static

Definition at line 4811 of file reorderbuffer.c.

4812{
4813 HASHCTL hash_ctl;
4814
4815 Assert(txn->toast_hash == NULL);
4816
4817 hash_ctl.keysize = sizeof(Oid);
4818 hash_ctl.entrysize = sizeof(ReorderBufferToastEnt);
4819 hash_ctl.hcxt = rb->context;
4820 txn->toast_hash = hash_create("ReorderBufferToastHash", 5, &hash_ctl,
4822}
struct ReorderBufferToastEnt ReorderBufferToastEnt

References Assert(), ReorderBuffer::context, HASHCTL::entrysize, HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, HASHCTL::keysize, and ReorderBufferTXN::toast_hash.

Referenced by ReorderBufferToastAppendChunk().

◆ ReorderBufferToastReplace()

static void ReorderBufferToastReplace ( ReorderBuffer * rb,
ReorderBufferTXN * txn,
Relation  relation,
ReorderBufferChange * change 
)
static

Definition at line 4914 of file reorderbuffer.c.

4916{
4917 TupleDesc desc;
4918 int natt;
4919 Datum *attrs;
4920 bool *isnull;
4921 bool *free;
4922 HeapTuple tmphtup;
4923 Relation toast_rel;
4924 TupleDesc toast_desc;
4925 MemoryContext oldcontext;
4926 HeapTuple newtup;
4927 Size old_size;
4928
4929 /* no toast tuples changed */
4930 if (txn->toast_hash == NULL)
4931 return;
4932
4933 /*
4934 * We're going to modify the size of the change. So, to make sure the
4935 * accounting is correct we record the current change size and then after
4936 * re-computing the change we'll subtract the recorded size and then
4937 * re-add the new change size at the end. We don't immediately subtract
4938 * the old size because if there is any error before we add the new size,
4939 * we will release the changes and that will update the accounting info
4940 * (subtracting the size from the counters). And we don't want to
4941 * underflow there.
4942 */
4943 old_size = ReorderBufferChangeSize(change);
4944
4945 oldcontext = MemoryContextSwitchTo(rb->context);
4946
4947 /* we should only have toast tuples in an INSERT or UPDATE */
4948 Assert(change->data.tp.newtuple);
4949
4950 desc = RelationGetDescr(relation);
4951
4952 toast_rel = RelationIdGetRelation(relation->rd_rel->reltoastrelid);
4953 if (!RelationIsValid(toast_rel))
4954 elog(ERROR, "could not open toast relation with OID %u (base relation \"%s\")",
4955 relation->rd_rel->reltoastrelid, RelationGetRelationName(relation));
4956
4957 toast_desc = RelationGetDescr(toast_rel);
4958
4959 /* should we allocate from stack instead? */
4960 attrs = palloc0(sizeof(Datum) * desc->natts);
4961 isnull = palloc0(sizeof(bool) * desc->natts);
4962 free = palloc0(sizeof(bool) * desc->natts);
4963
4964 newtup = change->data.tp.newtuple;
4965
4966 heap_deform_tuple(newtup, desc, attrs, isnull);
4967
4968 for (natt = 0; natt < desc->natts; natt++)
4969 {
4970 Form_pg_attribute attr = TupleDescAttr(desc, natt);
4972 struct varlena *varlena;
4973
4974 /* va_rawsize is the size of the original datum -- including header */
4975 struct varatt_external toast_pointer;
4976 struct varatt_indirect redirect_pointer;
4977 struct varlena *new_datum = NULL;
4978 struct varlena *reconstructed;
4979 dlist_iter it;
4980 Size data_done = 0;
4981
4982 /* system columns aren't toasted */
4983 if (attr->attnum < 0)
4984 continue;
4985
4986 if (attr->attisdropped)
4987 continue;
4988
4989 /* not a varlena datatype */
4990 if (attr->attlen != -1)
4991 continue;
4992
4993 /* no data */
4994 if (isnull[natt])
4995 continue;
4996
4997 /* ok, we know we have a toast datum */
4998 varlena = (struct varlena *) DatumGetPointer(attrs[natt]);
4999
5000 /* no need to do anything if the tuple isn't external */
5002 continue;
5003
5004 VARATT_EXTERNAL_GET_POINTER(toast_pointer, varlena);
5005
5006 /*
5007 * Check whether the toast tuple changed, replace if so.
5008 */
5009 ent = (ReorderBufferToastEnt *)
5011 &toast_pointer.va_valueid,
5012 HASH_FIND,
5013 NULL);
5014 if (ent == NULL)
5015 continue;
5016
5017 new_datum =
5019
5020 free[natt] = true;
5021
5022 reconstructed = palloc0(toast_pointer.va_rawsize);
5023
5024 ent->reconstructed = reconstructed;
5025
5026 /* stitch toast tuple back together from its parts */
5027 dlist_foreach(it, &ent->chunks)
5028 {
5029 bool cisnull;
5030 ReorderBufferChange *cchange;
5031 HeapTuple ctup;
5032 Pointer chunk;
5033
5034 cchange = dlist_container(ReorderBufferChange, node, it.cur);
5035 ctup = cchange->data.tp.newtuple;
5036 chunk = DatumGetPointer(fastgetattr(ctup, 3, toast_desc, &cisnull));
5037
5038 Assert(!cisnull);
5039 Assert(!VARATT_IS_EXTERNAL(chunk));
5040 Assert(!VARATT_IS_SHORT(chunk));
5041
5042 memcpy(VARDATA(reconstructed) + data_done,
5043 VARDATA(chunk),
5044 VARSIZE(chunk) - VARHDRSZ);
5045 data_done += VARSIZE(chunk) - VARHDRSZ;
5046 }
5047 Assert(data_done == VARATT_EXTERNAL_GET_EXTSIZE(toast_pointer));
5048
5049 /* make sure it's marked as compressed or not */
5050 if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
5051 SET_VARSIZE_COMPRESSED(reconstructed, data_done + VARHDRSZ);
5052 else
5053 SET_VARSIZE(reconstructed, data_done + VARHDRSZ);
5054
5055 memset(&redirect_pointer, 0, sizeof(redirect_pointer));
5056 redirect_pointer.pointer = reconstructed;
5057
5059 memcpy(VARDATA_EXTERNAL(new_datum), &redirect_pointer,
5060 sizeof(redirect_pointer));
5061
5062 attrs[natt] = PointerGetDatum(new_datum);
5063 }
5064
5065 /*
5066 * Build tuple in separate memory & copy tuple back into the tuplebuf
5067 * passed to the output plugin. We can't directly heap_fill_tuple() into
5068 * the tuplebuf because attrs[] will point back into the current content.
5069 */
5070 tmphtup = heap_form_tuple(desc, attrs, isnull);
5071 Assert(newtup->t_len <= MaxHeapTupleSize);
5072 Assert(newtup->t_data == (HeapTupleHeader) ((char *) newtup + HEAPTUPLESIZE));
5073
5074 memcpy(newtup->t_data, tmphtup->t_data, tmphtup->t_len);
5075 newtup->t_len = tmphtup->t_len;
5076
5077 /*
5078 * free resources we won't further need, more persistent stuff will be
5079 * free'd in ReorderBufferToastReset().
5080 */
5081 RelationClose(toast_rel);
5082 pfree(tmphtup);
5083 for (natt = 0; natt < desc->natts; natt++)
5084 {
5085 if (free[natt])
5086 pfree(DatumGetPointer(attrs[natt]));
5087 }
5088 pfree(attrs);
5089 pfree(free);
5090 pfree(isnull);
5091
5092 MemoryContextSwitchTo(oldcontext);
5093
5094 /* subtract the old change size */
5095 ReorderBufferChangeMemoryUpdate(rb, change, NULL, false, old_size);
5096 /* now add the change back, with the correct size */
5097 ReorderBufferChangeMemoryUpdate(rb, change, NULL, true,
5098 ReorderBufferChangeSize(change));
5099}
#define INDIRECT_POINTER_SIZE
Definition: detoast.h:34
#define VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr)
Definition: detoast.h:22
#define free(a)
Definition: header.h:65
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
Definition: heaptuple.c:1117
void heap_deform_tuple(HeapTuple tuple, TupleDesc tupleDesc, Datum *values, bool *isnull)
Definition: heaptuple.c:1346
#define MaxHeapTupleSize
Definition: htup_details.h:610
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:202
static Datum PointerGetDatum(const void *X)
Definition: postgres.h:327
uintptr_t Datum
Definition: postgres.h:69
#define RelationGetRelationName(relation)
Definition: rel.h:550
Definition: c.h:658
static FormData_pg_attribute * TupleDescAttr(TupleDesc tupdesc, int i)
Definition: tupdesc.h:160
#define SET_VARSIZE_COMPRESSED(PTR, len)
Definition: varatt.h:307
#define VARDATA(PTR)
Definition: varatt.h:278
#define SET_VARTAG_EXTERNAL(PTR, tag)
Definition: varatt.h:309
#define VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer)
Definition: varatt.h:354
#define VARDATA_EXTERNAL(PTR)
Definition: varatt.h:286
#define SET_VARSIZE(PTR, len)
Definition: varatt.h:305
#define VARATT_EXTERNAL_GET_EXTSIZE(toast_pointer)
Definition: varatt.h:334
#define VARATT_IS_EXTERNAL(PTR)
Definition: varatt.h:289
@ VARTAG_INDIRECT
Definition: varatt.h:86

References Assert(), ReorderBufferToastEnt::chunks, ReorderBuffer::context, dlist_iter::cur, ReorderBufferChange::data, DatumGetPointer(), dlist_container, dlist_foreach, elog, ERROR, fastgetattr(), free, HASH_FIND, hash_search(), heap_deform_tuple(), heap_form_tuple(), HEAPTUPLESIZE, INDIRECT_POINTER_SIZE, MaxHeapTupleSize, MemoryContextSwitchTo(), TupleDescData::natts, ReorderBufferChange::newtuple, palloc0(), pfree(), varatt_indirect::pointer, PointerGetDatum(), RelationData::rd_rel, ReorderBufferToastEnt::reconstructed, RelationClose(), RelationGetDescr, RelationGetRelationName, RelationIdGetRelation(), RelationIsValid, ReorderBufferChangeMemoryUpdate(), ReorderBufferChangeSize(), SET_VARSIZE, SET_VARSIZE_COMPRESSED, SET_VARTAG_EXTERNAL, HeapTupleData::t_data, HeapTupleData::t_len, ReorderBufferTXN::toast_hash, ReorderBufferChange::tp, TupleDescAttr(), varatt_external::va_rawsize, varatt_external::va_valueid, VARATT_EXTERNAL_GET_EXTSIZE, VARATT_EXTERNAL_GET_POINTER, VARATT_EXTERNAL_IS_COMPRESSED, VARATT_IS_EXTERNAL, VARATT_IS_SHORT, VARDATA, VARDATA_EXTERNAL, VARHDRSZ, VARSIZE, and VARTAG_INDIRECT.

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferToastReset()

static void ReorderBufferToastReset ( ReorderBuffer * rb,
ReorderBufferTXN * txn 
)
static

Definition at line 5105 of file reorderbuffer.c.

5106{
5107 HASH_SEQ_STATUS hstat;
5109
5110 if (txn->toast_hash == NULL)
5111 return;
5112
5113 /* sequentially walk over the hash and free everything */
5114 hash_seq_init(&hstat, txn->toast_hash);
5115 while ((ent = (ReorderBufferToastEnt *) hash_seq_search(&hstat)) != NULL)
5116 {
5118
5119 if (ent->reconstructed != NULL)
5120 pfree(ent->reconstructed);
5121
5122 dlist_foreach_modify(it, &ent->chunks)
5123 {
5124 ReorderBufferChange *change =
5126
5127 dlist_delete(&change->node);
5128 ReorderBufferFreeChange(rb, change, true);
5129 }
5130 }
5131
5133 txn->toast_hash = NULL;
5134}
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1420
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1385

References ReorderBufferToastEnt::chunks, dlist_mutable_iter::cur, dlist_container, dlist_delete(), dlist_foreach_modify, hash_destroy(), hash_seq_init(), hash_seq_search(), ReorderBufferChange::node, pfree(), ReorderBufferToastEnt::reconstructed, ReorderBufferFreeChange(), and ReorderBufferTXN::toast_hash.

Referenced by ReorderBufferCheckAndTruncateAbortedTXN(), ReorderBufferFreeTXN(), ReorderBufferProcessTXN(), and ReorderBufferResetTXN().

◆ ReorderBufferTransferSnapToParent()

static void ReorderBufferTransferSnapToParent ( ReorderBufferTXN * txn,
ReorderBufferTXN * subtxn 
)
static

Definition at line 1146 of file reorderbuffer.c.

1148{
1149 Assert(subtxn->toplevel_xid == txn->xid);
1150
1151 if (subtxn->base_snapshot != NULL)
1152 {
1153 if (txn->base_snapshot == NULL ||
1154 subtxn->base_snapshot_lsn < txn->base_snapshot_lsn)
1155 {
1156 /*
1157 * If the toplevel transaction already has a base snapshot but
1158 * it's newer than the subxact's, purge it.
1159 */
1160 if (txn->base_snapshot != NULL)
1161 {
1164 }
1165
1166 /*
1167 * The snapshot is now the top transaction's; transfer it, and
1168 * adjust the list position of the top transaction in the list by
1169 * moving it to where the subtransaction is.
1170 */
1171 txn->base_snapshot = subtxn->base_snapshot;
1172 txn->base_snapshot_lsn = subtxn->base_snapshot_lsn;
1174 &txn->base_snapshot_node);
1175
1176 /*
1177 * The subtransaction doesn't have a snapshot anymore (so it
1178 * mustn't be in the list.)
1179 */
1180 subtxn->base_snapshot = NULL;
1183 }
1184 else
1185 {
1186 /* Base snap of toplevel is fine, so subxact's is not needed */
1189 subtxn->base_snapshot = NULL;
1191 }
1192 }
1193}
static void dlist_insert_before(dlist_node *before, dlist_node *node)
Definition: ilist.h:393

References Assert(), ReorderBufferTXN::base_snapshot, ReorderBufferTXN::base_snapshot_lsn, ReorderBufferTXN::base_snapshot_node, dlist_delete(), dlist_insert_before(), InvalidXLogRecPtr, SnapBuildSnapDecRefcount(), ReorderBufferTXN::toplevel_xid, and ReorderBufferTXN::xid.

Referenced by ReorderBufferAssignChild(), and ReorderBufferStreamTXN().

◆ ReorderBufferTruncateTXN()

static void ReorderBufferTruncateTXN ( ReorderBuffer * rb,
ReorderBufferTXN * txn,
bool  txn_prepared 
)
static

Definition at line 1637 of file reorderbuffer.c.

1638{
1639 dlist_mutable_iter iter;
1640 Size mem_freed = 0;
1641
1642 /* cleanup subtransactions & their changes */
1643 dlist_foreach_modify(iter, &txn->subtxns)
1644 {
1645 ReorderBufferTXN *subtxn;
1646
1647 subtxn = dlist_container(ReorderBufferTXN, node, iter.cur);
1648
1649 /*
1650 * Subtransactions are always associated to the toplevel TXN, even if
1651 * they originally were happening inside another subtxn, so we won't
1652 * ever recurse more than one level deep here.
1653 */
1655 Assert(subtxn->nsubtxns == 0);
1656
1658 ReorderBufferTruncateTXN(rb, subtxn, txn_prepared);
1659 }
1660
1661 /* cleanup changes in the txn */
1662 dlist_foreach_modify(iter, &txn->changes)
1663 {
1664 ReorderBufferChange *change;
1665
1666 change = dlist_container(ReorderBufferChange, node, iter.cur);
1667
1668 /* Check we're not mixing changes from different transactions. */
1669 Assert(change->txn == txn);
1670
1671 /* remove the change from its containing list */
1672 dlist_delete(&change->node);
1673
1674 /*
1675 * Instead of updating the memory counter for individual changes, we
1676 * sum up the size of memory to free so we can update the memory
1677 * counter all together below. This saves costs of maintaining the
1678 * max-heap.
1679 */
1680 mem_freed += ReorderBufferChangeSize(change);
1681
1682 ReorderBufferFreeChange(rb, change, false);
1683 }
1684
1685 /* Update the memory counter */
1686 ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, mem_freed);
1687
1688 if (txn_prepared)
1689 {
1690 /*
1691 * If this is a prepared txn, cleanup the tuplecids we stored for
1692 * decoding catalog snapshot access. They are always stored in the
1693 * toplevel transaction.
1694 */
1695 dlist_foreach_modify(iter, &txn->tuplecids)
1696 {
1697 ReorderBufferChange *change;
1698
1699 change = dlist_container(ReorderBufferChange, node, iter.cur);
1700
1701 /* Check we're not mixing changes from different transactions. */
1702 Assert(change->txn == txn);
1704
1705 /* Remove the change from its containing list. */
1706 dlist_delete(&change->node);
1707
1708 ReorderBufferFreeChange(rb, change, true);
1709 }
1710 }
1711
1712 /*
1713 * Destroy the (relfilelocator, ctid) hashtable, so that we don't leak any
1714 * memory. We could also keep the hash table and update it with new ctid
1715 * values, but this seems simpler and good enough for now.
1716 */
1717 if (txn->tuplecid_hash != NULL)
1718 {
1720 txn->tuplecid_hash = NULL;
1721 }
1722
1723 /* If this txn is serialized then clean the disk space. */
1724 if (rbtxn_is_serialized(txn))
1725 {
1727 txn->txn_flags &= ~RBTXN_IS_SERIALIZED;
1728
1729 /*
1730 * We set this flag to indicate if the transaction is ever serialized.
1731 * We need this to accurately update the stats as otherwise the same
1732 * transaction can be counted as serialized multiple times.
1733 */
1735 }
1736
1737 /* also reset the number of entries in the transaction */
1738 txn->nentries_mem = 0;
1739 txn->nentries = 0;
1740}
#define RBTXN_IS_SERIALIZED_CLEAR

References ReorderBufferChange::action, Assert(), ReorderBufferTXN::changes, dlist_mutable_iter::cur, dlist_container, dlist_delete(), dlist_foreach_modify, hash_destroy(), ReorderBufferTXN::nentries, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, ReorderBufferTXN::nsubtxns, rbtxn_is_known_subxact, rbtxn_is_serialized, RBTXN_IS_SERIALIZED_CLEAR, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferChangeMemoryUpdate(), ReorderBufferChangeSize(), ReorderBufferFreeChange(), ReorderBufferMaybeMarkTXNStreamed(), ReorderBufferRestoreCleanup(), ReorderBufferTruncateTXN(), ReorderBufferTXN::subtxns, ReorderBufferTXN::tuplecid_hash, ReorderBufferTXN::tuplecids, ReorderBufferChange::txn, and ReorderBufferTXN::txn_flags.

Referenced by ReorderBufferCheckAndTruncateAbortedTXN(), ReorderBufferProcessTXN(), ReorderBufferResetTXN(), ReorderBufferStreamCommit(), and ReorderBufferTruncateTXN().

◆ ReorderBufferTXNByXid()

static ReorderBufferTXN * ReorderBufferTXNByXid ( ReorderBuffer * rb,
TransactionId  xid,
bool  create,
bool *  is_new,
XLogRecPtr  lsn,
bool  create_as_top 
)
static

Definition at line 634 of file reorderbuffer.c.

636{
637 ReorderBufferTXN *txn;
639 bool found;
640
642
643 /*
644 * Check the one-entry lookup cache first
645 */
647 rb->by_txn_last_xid == xid)
648 {
649 txn = rb->by_txn_last_txn;
650
651 if (txn != NULL)
652 {
653 /* found it, and it's valid */
654 if (is_new)
655 *is_new = false;
656 return txn;
657 }
658
659 /*
660 * cached as non-existent, and asked not to create? Then nothing else
661 * to do.
662 */
663 if (!create)
664 return NULL;
665 /* otherwise fall through to create it */
666 }
667
668 /*
669 * If the cache wasn't hit or it yielded a "does-not-exist" and we want to
670 * create an entry.
671 */
672
673 /* search the lookup table */
676 &xid,
677 create ? HASH_ENTER : HASH_FIND,
678 &found);
679 if (found)
680 txn = ent->txn;
681 else if (create)
682 {
683 /* initialize the new entry, if creation was requested */
684 Assert(ent != NULL);
686
687 ent->txn = ReorderBufferAllocTXN(rb);
688 ent->txn->xid = xid;
689 txn = ent->txn;
690 txn->first_lsn = lsn;
692
693 if (create_as_top)
694 {
697 }
698 }
699 else
700 txn = NULL; /* not found and not asked to create */
701
702 /* update cache */
703 rb->by_txn_last_xid = xid;
704 rb->by_txn_last_txn = txn;
705
706 if (is_new)
707 *is_new = !found;
708
709 Assert(!create || txn != NULL);
710 return txn;
711}
static ReorderBufferTXN * ReorderBufferAllocTXN(ReorderBuffer *rb)
ReorderBufferTXN * txn
XLogRecPtr restart_decoding_lsn
#define TransactionIdIsValid(xid)
Definition: transam.h:41

References Assert(), AssertTXNLsnOrder(), ReorderBuffer::by_txn, ReorderBuffer::by_txn_last_txn, ReorderBuffer::by_txn_last_xid, ReorderBuffer::current_restart_decoding_lsn, dlist_push_tail(), ReorderBufferTXN::first_lsn, HASH_ENTER, HASH_FIND, hash_search(), InvalidXLogRecPtr, ReorderBufferTXN::node, ReorderBufferAllocTXN(), ReorderBufferTXN::restart_decoding_lsn, ReorderBuffer::toplevel_by_lsn, TransactionIdIsValid, ReorderBufferTXNByIdEnt::txn, and ReorderBufferTXN::xid.

Referenced by ReorderBufferAbort(), ReorderBufferAddInvalidations(), ReorderBufferAddNewTupleCids(), ReorderBufferAssignChild(), ReorderBufferCommit(), ReorderBufferCommitChild(), ReorderBufferFinishPrepared(), ReorderBufferForget(), ReorderBufferGetInvalidations(), ReorderBufferInvalidate(), ReorderBufferPrepare(), ReorderBufferProcessXid(), ReorderBufferQueueChange(), ReorderBufferQueueMessage(), ReorderBufferRememberPrepareInfo(), ReorderBufferSetBaseSnapshot(), ReorderBufferSkipPrepare(), ReorderBufferXidHasBaseSnapshot(), ReorderBufferXidHasCatalogChanges(), and ReorderBufferXidSetCatalogChanges().

◆ ReorderBufferTXNSizeCompare()

static int ReorderBufferTXNSizeCompare ( const pairingheap_node * a,
const pairingheap_node * b,
void *  arg 
)
static

Definition at line 3646 of file reorderbuffer.c.

3647{
3650
3651 if (ta->size < tb->size)
3652 return -1;
3653 if (ta->size > tb->size)
3654 return 1;
3655 return 0;
3656}
#define pairingheap_const_container(type, membername, ptr)
Definition: pairingheap.h:51

References a, b, pairingheap_const_container, and ReorderBufferTXN::size.

Referenced by ReorderBufferAllocate().

◆ ReorderBufferXidHasBaseSnapshot()

bool ReorderBufferXidHasBaseSnapshot ( ReorderBuffer * rb,
TransactionId  xid 
)

Definition at line 3599 of file reorderbuffer.c.

3600{
3601 ReorderBufferTXN *txn;
3602
3603 txn = ReorderBufferTXNByXid(rb, xid, false,
3604 NULL, InvalidXLogRecPtr, false);
3605
3606 /* transaction isn't known yet, ergo no snapshot */
3607 if (txn == NULL)
3608 return false;
3609
3610 /* a known subtxn? operate on top-level txn instead */
3611 if (rbtxn_is_known_subxact(txn))
3612 txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
3613 NULL, InvalidXLogRecPtr, false);
3614
3615 return txn->base_snapshot != NULL;
3616}

References ReorderBufferTXN::base_snapshot, InvalidXLogRecPtr, rbtxn_is_known_subxact, ReorderBufferTXNByXid(), and ReorderBufferTXN::toplevel_xid.

Referenced by SnapBuildCommitTxn(), SnapBuildDistributeSnapshotAndInval(), and SnapBuildProcessChange().

◆ ReorderBufferXidHasCatalogChanges()

bool ReorderBufferXidHasCatalogChanges ( ReorderBuffer *rb,
TransactionId xid
)

Definition at line 3582 of file reorderbuffer.c.

/*
 * Report whether the transaction identified by xid is flagged as having
 * catalog changes (as tested by rbtxn_has_catalog_changes). Returns false
 * when the lookup finds no transaction for xid.
 */
3583{
3584 ReorderBufferTXN *txn;
3585
3586 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
3587 false);
/* no transaction found for xid: nothing is flagged */
3588 if (txn == NULL)
3589 return false;
3590
3591 return rbtxn_has_catalog_changes(txn);
3592}

References InvalidXLogRecPtr, rbtxn_has_catalog_changes, and ReorderBufferTXNByXid().

Referenced by SnapBuildXidHasCatalogChanges().

◆ ReorderBufferXidSetCatalogChanges()

void ReorderBufferXidSetCatalogChanges ( ReorderBuffer *rb,
TransactionId xid,
XLogRecPtr lsn
)

Definition at line 3509 of file reorderbuffer.c.

/*
 * Mark the transaction identified by xid as having catalog changes, and do
 * the same for its top-level transaction when xid is a subtransaction.
 *
 * NOTE(review): the bodies of both inner "if" blocks are missing from this
 * listing (source lines 3518-3519 and 3534-3535 are skipped). Judging from
 * the References list (txn_flags, RBTXN_HAS_CATALOG_CHANGES,
 * dclist_push_tail, catchange_txns, catchange_node) they presumably set the
 * flag and append the transaction to rb's catchange_txns list -- confirm
 * against reorderbuffer.c.
 */
3511{
3512 ReorderBufferTXN *txn;
3513
3514 txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3515
/* only act when the flag is not already set */
3516 if (!rbtxn_has_catalog_changes(txn))
3517 {
3520 }
3521
3522 /*
3523 * Mark top-level transaction as having catalog changes too if one of its
3524 * children has so that the ReorderBufferBuildTupleCidHash can
3525 * conveniently check just top-level transaction and decide whether to
3526 * build the hash table or not.
3527 */
3528 if (rbtxn_is_subtxn(txn))
3529 {
3530 ReorderBufferTXN *toptxn = rbtxn_get_toptxn(txn);
3531
3532 if (!rbtxn_has_catalog_changes(toptxn))
3533 {
3536 }
3537 }
3538}
static void dclist_push_tail(dclist_head *head, dlist_node *node)
Definition: ilist.h:709
#define rbtxn_is_subtxn(txn)
#define RBTXN_HAS_CATALOG_CHANGES

References ReorderBufferTXN::catchange_node, ReorderBuffer::catchange_txns, dclist_push_tail(), rbtxn_get_toptxn, RBTXN_HAS_CATALOG_CHANGES, rbtxn_has_catalog_changes, rbtxn_is_subtxn, ReorderBufferTXNByXid(), and ReorderBufferTXN::txn_flags.

Referenced by SnapBuildProcessNewCid(), and xact_decode().

◆ ResolveCminCmaxDuringDecoding()

bool ResolveCminCmaxDuringDecoding ( HTAB *tuplecid_data,
Snapshot snapshot,
HeapTuple htup,
Buffer buffer,
CommandId *cmin,
CommandId *cmax
)

Definition at line 5397 of file reorderbuffer.c.

/*
 * Look up the cmin/cmax recorded for htup in tuplecid_data. The lookup key
 * is built from the relfilelocator obtained from the buffer's tag and from
 * the tuple's t_self.
 *
 * Returns true and stores the values through cmin and cmax (each only when
 * the pointer is non-NULL) if an entry is found. Returns false when
 * tuplecid_data is NULL, or when no entry is found even after the single
 * retry below.
 *
 * NOTE(review): several source lines are missing from this listing:
 * 5402-5403 (presumably the declarations of key and ent), 5439 (the rest of
 * the "ent =" assignment, presumably a hash_search call with HASH_FIND), and
 * 5449 (presumably the UpdateLogicalMappings call). These guesses come from
 * the References list -- confirm against reorderbuffer.c.
 */
5401{
5404 ForkNumber forkno;
5405 BlockNumber blockno;
5406 bool updated_mapping = false;
5407
5408 /*
5409 * Return unresolved if tuplecid_data is not valid. That's because when
5410 * streaming in-progress transactions we may run into tuples with the CID
5411 * before actually decoding them. Think e.g. about INSERT followed by
5412 * TRUNCATE, where the TRUNCATE may not be decoded yet when applying the
5413 * INSERT. So in such cases, we assume the CID is from the future
5414 * command.
5415 */
5416 if (tuplecid_data == NULL)
5417 return false;
5418
5419 /* be careful about padding */
5420 memset(&key, 0, sizeof(key));
5421
5422 Assert(!BufferIsLocal(buffer));
5423
5424 /*
5425 * get relfilelocator from the buffer, no convenient way to access it
5426 * other than that.
5427 */
5428 BufferGetTag(buffer, &key.rlocator, &forkno, &blockno);
5429
5430 /* tuples can only be in the main fork */
5431 Assert(forkno == MAIN_FORKNUM);
5432 Assert(blockno == ItemPointerGetBlockNumber(&htup->t_self));
5433
5434 ItemPointerCopy(&htup->t_self,
5435 &key.tid);
5436
/* re-entered at most once: updated_mapping guards the goto below */
5437restart:
5438 ent = (ReorderBufferTupleCidEnt *)
5440
5441 /*
5442 * failed to find a mapping, check whether the table was rewritten and
5443 * apply mapping if so, but only do that once - there can be no new
5444 * mappings while we are in here since we have to hold a lock on the
5445 * relation.
5446 */
5447 if (ent == NULL && !updated_mapping)
5448 {
5450 /* now check but don't update for a mapping again */
5451 updated_mapping = true;
5452 goto restart;
5453 }
5454 else if (ent == NULL)
5455 return false;
5456
/* either output pointer may be NULL when the caller does not need it */
5457 if (cmin)
5458 *cmin = ent->cmin;
5459 if (cmax)
5460 *cmax = ent->cmax;
5461 return true;
5462}
uint32 BlockNumber
Definition: block.h:31
#define BufferIsLocal(buffer)
Definition: buf.h:37
void BufferGetTag(Buffer buffer, RelFileLocator *rlocator, ForkNumber *forknum, BlockNumber *blknum)
Definition: bufmgr.c:4252
static BlockNumber ItemPointerGetBlockNumber(const ItemPointerData *pointer)
Definition: itemptr.h:103
ForkNumber
Definition: relpath.h:56
static void UpdateLogicalMappings(HTAB *tuplecid_data, Oid relid, Snapshot snapshot)
ItemPointerData t_self
Definition: htup.h:65
Oid t_tableOid
Definition: htup.h:66

References Assert(), BufferGetTag(), BufferIsLocal, ReorderBufferTupleCidEnt::cmax, ReorderBufferTupleCidEnt::cmin, HASH_FIND, hash_search(), ItemPointerCopy(), ItemPointerGetBlockNumber(), key, MAIN_FORKNUM, HeapTupleData::t_self, HeapTupleData::t_tableOid, tuplecid_data, and UpdateLogicalMappings().

Referenced by HeapTupleSatisfiesHistoricMVCC().

◆ SetupCheckXidLive()

static void SetupCheckXidLive ( TransactionId  xid)
inline static

Definition at line 2030 of file reorderbuffer.c.

/*
 * Set CheckXidAlive to xid when TransactionIdDidCommit(xid) reports that
 * xid has not committed.
 *
 * NOTE(review): source lines 2036 and 2046 are missing from this listing.
 * Presumably 2036 is the "if" comparing CheckXidAlive with xid through
 * TransactionIdEquals, and 2046 is the else-branch statement that resets
 * CheckXidAlive to InvalidTransactionId (both names are in the References
 * list) -- confirm against reorderbuffer.c.
 */
2031{
2032 /*
2033 * If the input transaction id is already set as a CheckXidAlive then
2034 * nothing to do.
2035 */
2037 return;
2038
2039 /*
2040 * setup CheckXidAlive if it's not committed yet. We don't check if the
2041 * xid is aborted. That will happen during catalog access.
2042 */
2043 if (!TransactionIdDidCommit(xid))
2044 CheckXidAlive = xid;
2045 else
2047}
#define TransactionIdEquals(id1, id2)
Definition: transam.h:43

References CheckXidAlive, InvalidTransactionId, TransactionIdDidCommit(), and TransactionIdEquals.

Referenced by ReorderBufferProcessTXN().

◆ StartupReorderBuffer()

void StartupReorderBuffer ( void  )

Definition at line 4777 of file reorderbuffer.c.

/*
 * Walk the entries of PG_REPLSLOT_DIR, skipping "." and ".." as well as any
 * entry whose name ReplicationSlotValidateName rejects, and process the
 * remaining entries as described by the comment inside the loop.
 *
 * NOTE(review): source line 4797 is missing from this listing; presumably it
 * is a call to ReorderBufferCleanupSerializedTXNs for the current entry (the
 * function is named in the References list) -- confirm against
 * reorderbuffer.c.
 */
4778{
4779 DIR *logical_dir;
4780 struct dirent *logical_de;
4781
4782 logical_dir = AllocateDir(PG_REPLSLOT_DIR);
4783 while ((logical_de = ReadDir(logical_dir, PG_REPLSLOT_DIR)) != NULL)
4784 {
4785 if (strcmp(logical_de->d_name, ".") == 0 ||
4786 strcmp(logical_de->d_name, "..") == 0)
4787 continue;
4788
4789 /* if it cannot be a slot, skip the directory */
4790 if (!ReplicationSlotValidateName(logical_de->d_name, DEBUG2))
4791 continue;
4792
4793 /*
4794 * ok, has to be a surviving logical slot, iterate and delete
4795 * everything starting with xid-*
4796 */
4798 }
4799 FreeDir(logical_dir);
4800}
struct dirent * ReadDir(DIR *dir, const char *dirname)
Definition: fd.c:2973
bool ReplicationSlotValidateName(const char *name, int elevel)
Definition: slot.c:267

References AllocateDir(), dirent::d_name, DEBUG2, FreeDir(), PG_REPLSLOT_DIR, ReadDir(), ReorderBufferCleanupSerializedTXNs(), and ReplicationSlotValidateName().

Referenced by StartupXLOG().

◆ TransactionIdInArray()

static bool TransactionIdInArray ( TransactionId xid,
TransactionId *xip,
Size num
)
static

Definition at line 5296 of file reorderbuffer.c.

/*
 * Return true when xid occurs among the num elements of xip.
 *
 * The search uses bsearch with xidComparator, so xip must already be sorted
 * in the order that comparator defines; otherwise the result is unreliable.
 */
5297{
5298 return bsearch(&xid, xip, num,
5299 sizeof(TransactionId), xidComparator) != NULL;
5300}

References xidComparator().

Referenced by UpdateLogicalMappings().

◆ UpdateLogicalMappings()

static void UpdateLogicalMappings ( HTAB *tuplecid_data,
Oid relid,
Snapshot snapshot
)
static

Definition at line 5319 of file reorderbuffer.c.

/*
 * Scan PG_LOGICAL_MAPPINGS_DIR for mapping files that are relevant to relid
 * and to the given snapshot, collect them in a list, then walk that list.
 *
 * A file is kept only when its name starts with "map-", parses with
 * LOGICAL_REWRITE_FORMAT into six fields, names the expected database and
 * relation, was created by a transaction that committed, and maps an xid
 * that is present in snapshot->subxip.
 *
 * NOTE(review): source lines 5337, 5379, 5383 and 5387 are missing from this
 * listing. Presumably they are, in order: the declaration of f, the
 * list_sort call using file_sort_by_lsn, the fetch of f from the current
 * list cell via lfirst, and the ApplyLogicalMappingFile call (all of these
 * names are in the References list) -- confirm against reorderbuffer.c.
 */
5320{
5321 DIR *mapping_dir;
5322 struct dirent *mapping_de;
5323 List *files = NIL;
5324 ListCell *file;
/* shared relations are matched against InvalidOid instead of MyDatabaseId */
5325 Oid dboid = IsSharedRelation(relid) ? InvalidOid : MyDatabaseId;
5326
5327 mapping_dir = AllocateDir(PG_LOGICAL_MAPPINGS_DIR);
5328 while ((mapping_de = ReadDir(mapping_dir, PG_LOGICAL_MAPPINGS_DIR)) != NULL)
5329 {
5330 Oid f_dboid;
5331 Oid f_relid;
5332 TransactionId f_mapped_xid;
5333 TransactionId f_create_xid;
5334 XLogRecPtr f_lsn;
5335 uint32 f_hi,
5336 f_lo;
5338
5339 if (strcmp(mapping_de->d_name, ".") == 0 ||
5340 strcmp(mapping_de->d_name, "..") == 0)
5341 continue;
5342
5343 /* Ignore files that aren't ours */
5344 if (strncmp(mapping_de->d_name, "map-", 4) != 0)
5345 continue;
5346
/* a "map-" file whose name does not yield all six fields is an error */
5347 if (sscanf(mapping_de->d_name, LOGICAL_REWRITE_FORMAT,
5348 &f_dboid, &f_relid, &f_hi, &f_lo,
5349 &f_mapped_xid, &f_create_xid) != 6)
5350 elog(ERROR, "could not parse filename \"%s\"", mapping_de->d_name);
5351
/* rebuild the 64-bit LSN from the two 32-bit halves parsed above */
5352 f_lsn = ((uint64) f_hi) << 32 | f_lo;
5353
5354 /* mapping for another database */
5355 if (f_dboid != dboid)
5356 continue;
5357
5358 /* mapping for another relation */
5359 if (f_relid != relid)
5360 continue;
5361
5362 /* did the creating transaction abort? */
5363 if (!TransactionIdDidCommit(f_create_xid))
5364 continue;
5365
5366 /* not for our transaction */
5367 if (!TransactionIdInArray(f_mapped_xid, snapshot->subxip, snapshot->subxcnt))
5368 continue;
5369
5370 /* ok, relevant, queue for apply */
5371 f = palloc(sizeof(RewriteMappingFile));
5372 f->lsn = f_lsn;
/*
 * NOTE(review): unbounded copy into fname (declared char fname[MAXPGPATH]);
 * presumably d_name always fits -- confirm.
 */
5373 strcpy(f->fname, mapping_de->d_name);
5374 files = lappend(files, f);
5375 }
5376 FreeDir(mapping_dir);
5377
5378 /* sort files so we apply them in LSN order */
5380
5381 foreach(file, files)
5382 {
5384
5385 elog(DEBUG1, "applying mapping: \"%s\" in %u", f->fname,
5386 snapshot->subxip[0]);
/* each queued entry is released once it has been handled */
5388 pfree(f);
5389 }
5390}
uint64_t uint64
Definition: c.h:503
bool IsSharedRelation(Oid relationId)
Definition: catalog.c:304
#define DEBUG1
Definition: elog.h:30
Oid MyDatabaseId
Definition: globals.c:95
List * lappend(List *list, void *datum)
Definition: list.c:339
void list_sort(List *list, list_sort_comparator cmp)
Definition: list.c:1674
#define NIL
Definition: pg_list.h:68
static int file_sort_by_lsn(const ListCell *a_p, const ListCell *b_p)
static void ApplyLogicalMappingFile(HTAB *tuplecid_data, Oid relid, const char *fname)
static bool TransactionIdInArray(TransactionId xid, TransactionId *xip, Size num)
#define LOGICAL_REWRITE_FORMAT
Definition: rewriteheap.h:54
Definition: pg_list.h:54
char fname[MAXPGPATH]

References AllocateDir(), ApplyLogicalMappingFile(), dirent::d_name, DEBUG1, elog, ERROR, file_sort_by_lsn(), RewriteMappingFile::fname, FreeDir(), InvalidOid, IsSharedRelation(), lappend(), lfirst, list_sort(), LOGICAL_REWRITE_FORMAT, RewriteMappingFile::lsn, MyDatabaseId, NIL, palloc(), pfree(), PG_LOGICAL_MAPPINGS_DIR, ReadDir(), SnapshotData::subxcnt, SnapshotData::subxip, TransactionIdDidCommit(), TransactionIdInArray(), and tuplecid_data.

Referenced by ResolveCminCmaxDuringDecoding().

Variable Documentation

◆ debug_logical_replication_streaming

int debug_logical_replication_streaming = DEBUG_LOGICAL_REP_STREAMING_BUFFERED

◆ logical_decoding_work_mem

int logical_decoding_work_mem

Definition at line 213 of file reorderbuffer.c.

Referenced by ReorderBufferCheckMemoryLimit().

◆ max_changes_in_memory

const Size max_changes_in_memory = 4096
static

Definition at line 214 of file reorderbuffer.c.

Referenced by ReorderBufferRestoreChanges().