PostgreSQL Source Code git master
reorderbuffer.c File Reference
#include "postgres.h"
#include <unistd.h>
#include <sys/stat.h>
#include "access/detoast.h"
#include "access/heapam.h"
#include "access/rewriteheap.h"
#include "access/transam.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "catalog/catalog.h"
#include "common/int.h"
#include "lib/binaryheap.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "replication/logical.h"
#include "replication/reorderbuffer.h"
#include "replication/slot.h"
#include "replication/snapbuild.h"
#include "storage/bufmgr.h"
#include "storage/fd.h"
#include "storage/procarray.h"
#include "storage/sinval.h"
#include "utils/builtins.h"
#include "utils/inval.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/relfilenumbermap.h"
Include dependency graph for reorderbuffer.c:

Go to the source code of this file.

Data Structures

struct  ReorderBufferTXNByIdEnt
 
struct  ReorderBufferTupleCidKey
 
struct  ReorderBufferTupleCidEnt
 
struct  TXNEntryFile
 
struct  ReorderBufferIterTXNEntry
 
struct  ReorderBufferIterTXNState
 
struct  ReorderBufferToastEnt
 
struct  ReorderBufferDiskChange
 
struct  RewriteMappingFile
 

Macros

#define MAX_DISTR_INVAL_MSG_PER_TXN    ((8 * 1024 * 1024) / sizeof(SharedInvalidationMessage))
 
#define IsSpecInsert(action)
 
#define IsSpecConfirmOrAbort(action)
 
#define IsInsertOrUpdate(action)
 
#define CHANGES_THRESHOLD   100
 

Typedefs

typedef struct ReorderBufferTXNByIdEnt ReorderBufferTXNByIdEnt
 
typedef struct ReorderBufferTupleCidKey ReorderBufferTupleCidKey
 
typedef struct ReorderBufferTupleCidEnt ReorderBufferTupleCidEnt
 
typedef struct TXNEntryFile TXNEntryFile
 
typedef struct ReorderBufferIterTXNEntry ReorderBufferIterTXNEntry
 
typedef struct ReorderBufferIterTXNState ReorderBufferIterTXNState
 
typedef struct ReorderBufferToastEnt ReorderBufferToastEnt
 
typedef struct ReorderBufferDiskChange ReorderBufferDiskChange
 
typedef struct RewriteMappingFile RewriteMappingFile
 

Functions

static ReorderBufferTXNReorderBufferAllocTXN (ReorderBuffer *rb)
 
static void ReorderBufferFreeTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static ReorderBufferTXNReorderBufferTXNByXid (ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
 
static void ReorderBufferTransferSnapToParent (ReorderBufferTXN *txn, ReorderBufferTXN *subtxn)
 
static void AssertTXNLsnOrder (ReorderBuffer *rb)
 
static void ReorderBufferIterTXNInit (ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferIterTXNState *volatile *iter_state)
 
static ReorderBufferChangeReorderBufferIterTXNNext (ReorderBuffer *rb, ReorderBufferIterTXNState *state)
 
static void ReorderBufferIterTXNFinish (ReorderBuffer *rb, ReorderBufferIterTXNState *state)
 
static void ReorderBufferExecuteInvalidations (uint32 nmsgs, SharedInvalidationMessage *msgs)
 
static void ReorderBufferCheckMemoryLimit (ReorderBuffer *rb)
 
static void ReorderBufferSerializeTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferSerializeChange (ReorderBuffer *rb, ReorderBufferTXN *txn, int fd, ReorderBufferChange *change)
 
static Size ReorderBufferRestoreChanges (ReorderBuffer *rb, ReorderBufferTXN *txn, TXNEntryFile *file, XLogSegNo *segno)
 
static void ReorderBufferRestoreChange (ReorderBuffer *rb, ReorderBufferTXN *txn, char *data)
 
static void ReorderBufferRestoreCleanup (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferTruncateTXN (ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared)
 
static void ReorderBufferMaybeMarkTXNStreamed (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static bool ReorderBufferCheckAndTruncateAbortedTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferCleanupSerializedTXNs (const char *slotname)
 
static void ReorderBufferSerializedPath (char *path, ReplicationSlot *slot, TransactionId xid, XLogSegNo segno)
 
static int ReorderBufferTXNSizeCompare (const pairingheap_node *a, const pairingheap_node *b, void *arg)
 
static void ReorderBufferFreeSnap (ReorderBuffer *rb, Snapshot snap)
 
static Snapshot ReorderBufferCopySnap (ReorderBuffer *rb, Snapshot orig_snap, ReorderBufferTXN *txn, CommandId cid)
 
static bool ReorderBufferCanStream (ReorderBuffer *rb)
 
static bool ReorderBufferCanStartStreaming (ReorderBuffer *rb)
 
static void ReorderBufferStreamTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferStreamCommit (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferToastInitHash (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferToastReset (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferToastReplace (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
 
static void ReorderBufferToastAppendChunk (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
 
static Size ReorderBufferChangeSize (ReorderBufferChange *change)
 
static void ReorderBufferChangeMemoryUpdate (ReorderBuffer *rb, ReorderBufferChange *change, ReorderBufferTXN *txn, bool addition, Size sz)
 
ReorderBufferReorderBufferAllocate (void)
 
void ReorderBufferFree (ReorderBuffer *rb)
 
ReorderBufferChangeReorderBufferAllocChange (ReorderBuffer *rb)
 
void ReorderBufferFreeChange (ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)
 
HeapTuple ReorderBufferAllocTupleBuf (ReorderBuffer *rb, Size tuple_len)
 
void ReorderBufferFreeTupleBuf (HeapTuple tuple)
 
OidReorderBufferAllocRelids (ReorderBuffer *rb, int nrelids)
 
void ReorderBufferFreeRelids (ReorderBuffer *rb, Oid *relids)
 
static void ReorderBufferProcessPartialChange (ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool toast_insert)
 
void ReorderBufferQueueChange (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change, bool toast_insert)
 
void ReorderBufferQueueMessage (ReorderBuffer *rb, TransactionId xid, Snapshot snap, XLogRecPtr lsn, bool transactional, const char *prefix, Size message_size, const char *message)
 
static void AssertChangeLsnOrder (ReorderBufferTXN *txn)
 
ReorderBufferTXNReorderBufferGetOldestTXN (ReorderBuffer *rb)
 
TransactionId ReorderBufferGetOldestXmin (ReorderBuffer *rb)
 
void ReorderBufferSetRestartPoint (ReorderBuffer *rb, XLogRecPtr ptr)
 
void ReorderBufferAssignChild (ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr lsn)
 
void ReorderBufferCommitChild (ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn)
 
static int ReorderBufferIterCompare (Datum a, Datum b, void *arg)
 
static void ReorderBufferCleanupTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferBuildTupleCidHash (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void SetupCheckXidLive (TransactionId xid)
 
static void ReorderBufferApplyChange (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change, bool streaming)
 
static void ReorderBufferApplyTruncate (ReorderBuffer *rb, ReorderBufferTXN *txn, int nrelations, Relation *relations, ReorderBufferChange *change, bool streaming)
 
static void ReorderBufferApplyMessage (ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool streaming)
 
static void ReorderBufferSaveTXNSnapshot (ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id)
 
static void ReorderBufferResetTXN (ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id, XLogRecPtr last_lsn, ReorderBufferChange *specinsert)
 
static void ReorderBufferProcessTXN (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn, volatile Snapshot snapshot_now, volatile CommandId command_id, bool streaming)
 
static void ReorderBufferReplay (ReorderBufferTXN *txn, ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
 
void ReorderBufferCommit (ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
 
bool ReorderBufferRememberPrepareInfo (ReorderBuffer *rb, TransactionId xid, XLogRecPtr prepare_lsn, XLogRecPtr end_lsn, TimestampTz prepare_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
 
void ReorderBufferSkipPrepare (ReorderBuffer *rb, TransactionId xid)
 
void ReorderBufferPrepare (ReorderBuffer *rb, TransactionId xid, char *gid)
 
void ReorderBufferFinishPrepared (ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, XLogRecPtr two_phase_at, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn, char *gid, bool is_commit)
 
void ReorderBufferAbort (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, TimestampTz abort_time)
 
void ReorderBufferAbortOld (ReorderBuffer *rb, TransactionId oldestRunningXid)
 
void ReorderBufferForget (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferInvalidate (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferImmediateInvalidation (ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
 
void ReorderBufferProcessXid (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferAddSnapshot (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
 
void ReorderBufferSetBaseSnapshot (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
 
void ReorderBufferAddNewCommandId (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, CommandId cid)
 
void ReorderBufferAddNewTupleCids (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, RelFileLocator locator, ItemPointerData tid, CommandId cmin, CommandId cmax, CommandId combocid)
 
static void ReorderBufferQueueInvalidations (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs)
 
static void ReorderBufferAccumulateInvalidations (SharedInvalidationMessage **invals_out, uint32 *ninvals_out, SharedInvalidationMessage *msgs_new, Size nmsgs_new)
 
void ReorderBufferAddInvalidations (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs)
 
void ReorderBufferAddDistributedInvalidations (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs)
 
void ReorderBufferXidSetCatalogChanges (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
TransactionIdReorderBufferGetCatalogChangesXacts (ReorderBuffer *rb)
 
bool ReorderBufferXidHasCatalogChanges (ReorderBuffer *rb, TransactionId xid)
 
bool ReorderBufferXidHasBaseSnapshot (ReorderBuffer *rb, TransactionId xid)
 
static void ReorderBufferSerializeReserve (ReorderBuffer *rb, Size sz)
 
static ReorderBufferTXNReorderBufferLargestTXN (ReorderBuffer *rb)
 
static ReorderBufferTXNReorderBufferLargestStreamableTopTXN (ReorderBuffer *rb)
 
void StartupReorderBuffer (void)
 
static void ApplyLogicalMappingFile (HTAB *tuplecid_data, Oid relid, const char *fname)
 
static bool TransactionIdInArray (TransactionId xid, TransactionId *xip, Size num)
 
static int file_sort_by_lsn (const ListCell *a_p, const ListCell *b_p)
 
static void UpdateLogicalMappings (HTAB *tuplecid_data, Oid relid, Snapshot snapshot)
 
bool ResolveCminCmaxDuringDecoding (HTAB *tuplecid_data, Snapshot snapshot, HeapTuple htup, Buffer buffer, CommandId *cmin, CommandId *cmax)
 
uint32 ReorderBufferGetInvalidations (ReorderBuffer *rb, TransactionId xid, SharedInvalidationMessage **msgs)
 

Variables

int logical_decoding_work_mem
 
static const Size max_changes_in_memory = 4096
 
int debug_logical_replication_streaming = DEBUG_LOGICAL_REP_STREAMING_BUFFERED
 

Macro Definition Documentation

◆ CHANGES_THRESHOLD

#define CHANGES_THRESHOLD   100

◆ IsInsertOrUpdate

#define IsInsertOrUpdate (   action)
Value:

Definition at line 206 of file reorderbuffer.c.

◆ IsSpecConfirmOrAbort

#define IsSpecConfirmOrAbort (   action)
Value:
( \
)
@ REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM
Definition: reorderbuffer.h:61
@ REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT
Definition: reorderbuffer.h:62

Definition at line 201 of file reorderbuffer.c.

◆ IsSpecInsert

#define IsSpecInsert (   action)
Value:

Definition at line 197 of file reorderbuffer.c.

◆ MAX_DISTR_INVAL_MSG_PER_TXN

#define MAX_DISTR_INVAL_MSG_PER_TXN    ((8 * 1024 * 1024) / sizeof(SharedInvalidationMessage))

Definition at line 125 of file reorderbuffer.c.

Typedef Documentation

◆ ReorderBufferDiskChange

◆ ReorderBufferIterTXNEntry

◆ ReorderBufferIterTXNState

◆ ReorderBufferToastEnt

◆ ReorderBufferTupleCidEnt

◆ ReorderBufferTupleCidKey

◆ ReorderBufferTXNByIdEnt

◆ RewriteMappingFile

◆ TXNEntryFile

typedef struct TXNEntryFile TXNEntryFile

Function Documentation

◆ ApplyLogicalMappingFile()

static void ApplyLogicalMappingFile ( HTAB tuplecid_data,
Oid  relid,
const char *  fname 
)
static

Definition at line 5367 of file reorderbuffer.c.

5368{
5369 char path[MAXPGPATH];
5370 int fd;
5371 int readBytes;
5373
5374 sprintf(path, "%s/%s", PG_LOGICAL_MAPPINGS_DIR, fname);
5375 fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
5376 if (fd < 0)
5377 ereport(ERROR,
5379 errmsg("could not open file \"%s\": %m", path)));
5380
5381 while (true)
5382 {
5385 ReorderBufferTupleCidEnt *new_ent;
5386 bool found;
5387
5388 /* be careful about padding */
5389 memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
5390
5391 /* read all mappings till the end of the file */
5392 pgstat_report_wait_start(WAIT_EVENT_REORDER_LOGICAL_MAPPING_READ);
5393 readBytes = read(fd, &map, sizeof(LogicalRewriteMappingData));
5395
5396 if (readBytes < 0)
5397 ereport(ERROR,
5399 errmsg("could not read file \"%s\": %m",
5400 path)));
5401 else if (readBytes == 0) /* EOF */
5402 break;
5403 else if (readBytes != sizeof(LogicalRewriteMappingData))
5404 ereport(ERROR,
5406 errmsg("could not read from file \"%s\": read %d instead of %d bytes",
5407 path, readBytes,
5408 (int32) sizeof(LogicalRewriteMappingData))));
5409
5410 key.rlocator = map.old_locator;
5412 &key.tid);
5413
5414
5415 ent = (ReorderBufferTupleCidEnt *)
5417
5418 /* no existing mapping, no need to update */
5419 if (!ent)
5420 continue;
5421
5422 key.rlocator = map.new_locator;
5424 &key.tid);
5425
5426 new_ent = (ReorderBufferTupleCidEnt *)
5428
5429 if (found)
5430 {
5431 /*
5432 * Make sure the existing mapping makes sense. We sometime update
5433 * old records that did not yet have a cmax (e.g. pg_class' own
5434 * entry while rewriting it) during rewrites, so allow that.
5435 */
5436 Assert(ent->cmin == InvalidCommandId || ent->cmin == new_ent->cmin);
5437 Assert(ent->cmax == InvalidCommandId || ent->cmax == new_ent->cmax);
5438 }
5439 else
5440 {
5441 /* update mapping */
5442 new_ent->cmin = ent->cmin;
5443 new_ent->cmax = ent->cmax;
5444 new_ent->combocid = ent->combocid;
5445 }
5446 }
5447
5448 if (CloseTransientFile(fd) != 0)
5449 ereport(ERROR,
5451 errmsg("could not close file \"%s\": %m", path)));
5452}
#define InvalidCommandId
Definition: c.h:677
#define PG_BINARY
Definition: c.h:1261
int32_t int32
Definition: c.h:537
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:952
int errcode_for_file_access(void)
Definition: elog.c:886
int errmsg(const char *fmt,...)
Definition: elog.c:1080
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:150
int CloseTransientFile(int fd)
Definition: fd.c:2868
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2691
Assert(PointerIsAligned(start, uint64))
@ HASH_FIND
Definition: hsearch.h:113
@ HASH_ENTER
Definition: hsearch.h:114
#define read(a, b, c)
Definition: win32.h:13
static void ItemPointerCopy(const ItemPointerData *fromPointer, ItemPointerData *toPointer)
Definition: itemptr.h:172
#define MAXPGPATH
#define sprintf
Definition: port.h:262
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_LOGICAL_MAPPINGS_DIR
Definition: reorderbuffer.h:23
static HTAB * tuplecid_data
Definition: snapmgr.c:163
ItemPointerData new_tid
Definition: rewriteheap.h:40
RelFileLocator old_locator
Definition: rewriteheap.h:37
ItemPointerData old_tid
Definition: rewriteheap.h:39
RelFileLocator new_locator
Definition: rewriteheap.h:38
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: wait_event.h:69
static void pgstat_report_wait_end(void)
Definition: wait_event.h:85

References Assert(), CloseTransientFile(), ReorderBufferTupleCidEnt::cmax, ReorderBufferTupleCidEnt::cmin, ReorderBufferTupleCidEnt::combocid, ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), HASH_ENTER, HASH_FIND, hash_search(), InvalidCommandId, ItemPointerCopy(), sort-test::key, MAXPGPATH, LogicalRewriteMappingData::new_locator, LogicalRewriteMappingData::new_tid, LogicalRewriteMappingData::old_locator, LogicalRewriteMappingData::old_tid, OpenTransientFile(), PG_BINARY, PG_LOGICAL_MAPPINGS_DIR, pgstat_report_wait_end(), pgstat_report_wait_start(), read, sprintf, and tuplecid_data.

Referenced by UpdateLogicalMappings().

◆ AssertChangeLsnOrder()

static void AssertChangeLsnOrder ( ReorderBufferTXN txn)
static

Definition at line 1013 of file reorderbuffer.c.

1014{
1015#ifdef USE_ASSERT_CHECKING
1016 dlist_iter iter;
1017 XLogRecPtr prev_lsn = txn->first_lsn;
1018
1019 dlist_foreach(iter, &txn->changes)
1020 {
1021 ReorderBufferChange *cur_change;
1022
1023 cur_change = dlist_container(ReorderBufferChange, node, iter.cur);
1024
1026 Assert(XLogRecPtrIsValid(cur_change->lsn));
1027 Assert(txn->first_lsn <= cur_change->lsn);
1028
1029 if (XLogRecPtrIsValid(txn->end_lsn))
1030 Assert(cur_change->lsn <= txn->end_lsn);
1031
1032 Assert(prev_lsn <= cur_change->lsn);
1033
1034 prev_lsn = cur_change->lsn;
1035 }
1036#endif
1037}
#define dlist_foreach(iter, lhead)
Definition: ilist.h:623
#define dlist_container(type, membername, ptr)
Definition: ilist.h:593
XLogRecPtr first_lsn
XLogRecPtr end_lsn
dlist_head changes
dlist_node * cur
Definition: ilist.h:179
#define XLogRecPtrIsValid(r)
Definition: xlogdefs.h:29
uint64 XLogRecPtr
Definition: xlogdefs.h:21

References Assert(), ReorderBufferTXN::changes, dlist_iter::cur, dlist_container, dlist_foreach, ReorderBufferTXN::end_lsn, ReorderBufferTXN::first_lsn, ReorderBufferChange::lsn, and XLogRecPtrIsValid.

Referenced by ReorderBufferIterTXNInit().

◆ AssertTXNLsnOrder()

static void AssertTXNLsnOrder ( ReorderBuffer rb)
static

Definition at line 942 of file reorderbuffer.c.

943{
944#ifdef USE_ASSERT_CHECKING
946 dlist_iter iter;
947 XLogRecPtr prev_first_lsn = InvalidXLogRecPtr;
948 XLogRecPtr prev_base_snap_lsn = InvalidXLogRecPtr;
949
950 /*
951 * Skip the verification if we don't reach the LSN at which we start
952 * decoding the contents of transactions yet because until we reach the
953 * LSN, we could have transactions that don't have the association between
954 * the top-level transaction and subtransaction yet and consequently have
955 * the same LSN. We don't guarantee this association until we try to
956 * decode the actual contents of transaction. The ordering of the records
957 * prior to the start_decoding_at LSN should have been checked before the
958 * restart.
959 */
961 return;
962
964 {
966 iter.cur);
967
968 /* start LSN must be set */
970
971 /* If there is an end LSN, it must be higher than start LSN */
972 if (XLogRecPtrIsValid(cur_txn->end_lsn))
973 Assert(cur_txn->first_lsn <= cur_txn->end_lsn);
974
975 /* Current initial LSN must be strictly higher than previous */
976 if (XLogRecPtrIsValid(prev_first_lsn))
977 Assert(prev_first_lsn < cur_txn->first_lsn);
978
979 /* known-as-subtxn txns must not be listed */
981
982 prev_first_lsn = cur_txn->first_lsn;
983 }
984
986 {
988 base_snapshot_node,
989 iter.cur);
990
991 /* base snapshot (and its LSN) must be set */
992 Assert(cur_txn->base_snapshot != NULL);
994
995 /* current LSN must be strictly higher than previous */
996 if (XLogRecPtrIsValid(prev_base_snap_lsn))
997 Assert(prev_base_snap_lsn < cur_txn->base_snapshot_lsn);
998
999 /* known-as-subtxn txns must not be listed */
1000 Assert(!rbtxn_is_known_subxact(cur_txn));
1001
1002 prev_base_snap_lsn = cur_txn->base_snapshot_lsn;
1003 }
1004#endif
1005}
#define rbtxn_is_known_subxact(txn)
bool SnapBuildXactNeedsSkip(SnapBuild *builder, XLogRecPtr ptr)
Definition: snapbuild.c:304
XLogReaderState * reader
Definition: logical.h:42
struct SnapBuild * snapshot_builder
Definition: logical.h:44
XLogRecPtr base_snapshot_lsn
Snapshot base_snapshot
dlist_head txns_by_base_snapshot_lsn
dlist_head toplevel_by_lsn
void * private_data
XLogRecPtr EndRecPtr
Definition: xlogreader.h:206
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28

References Assert(), ReorderBufferTXN::base_snapshot, ReorderBufferTXN::base_snapshot_lsn, dlist_iter::cur, dlist_container, dlist_foreach, ReorderBufferTXN::end_lsn, XLogReaderState::EndRecPtr, ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, ReorderBuffer::private_data, rbtxn_is_known_subxact, LogicalDecodingContext::reader, SnapBuildXactNeedsSkip(), LogicalDecodingContext::snapshot_builder, ReorderBuffer::toplevel_by_lsn, ReorderBuffer::txns_by_base_snapshot_lsn, and XLogRecPtrIsValid.

Referenced by ReorderBufferAssignChild(), ReorderBufferGetOldestTXN(), ReorderBufferGetOldestXmin(), ReorderBufferSetBaseSnapshot(), and ReorderBufferTXNByXid().

◆ file_sort_by_lsn()

static int file_sort_by_lsn ( const ListCell a_p,
const ListCell b_p 
)
static

Definition at line 5469 of file reorderbuffer.c.

5470{
5473
5474 return pg_cmp_u64(a->lsn, b->lsn);
5475}
static int pg_cmp_u64(uint64 a, uint64 b)
Definition: int.h:731
int b
Definition: isn.c:74
int a
Definition: isn.c:73
#define lfirst(lc)
Definition: pg_list.h:172

References a, b, lfirst, and pg_cmp_u64().

Referenced by UpdateLogicalMappings().

◆ ReorderBufferAbort()

void ReorderBufferAbort ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
TimestampTz  abort_time 
)

Definition at line 3087 of file reorderbuffer.c.

3089{
3090 ReorderBufferTXN *txn;
3091
3092 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
3093 false);
3094
3095 /* unknown, nothing to remove */
3096 if (txn == NULL)
3097 return;
3098
3099 txn->abort_time = abort_time;
3100
3101 /* For streamed transactions notify the remote node about the abort. */
3102 if (rbtxn_is_streamed(txn))
3103 {
3104 rb->stream_abort(rb, txn, lsn);
3105
3106 /*
3107 * We might have decoded changes for this transaction that could load
3108 * the cache as per the current transaction's view (consider DDL's
3109 * happened in this transaction). We don't want the decoding of future
3110 * transactions to use those cache entries so execute only the inval
3111 * messages in this transaction.
3112 */
3113 if (txn->ninvalidations > 0)
3115 txn->invalidations);
3116 }
3117
3118 /* cosmetic... */
3119 txn->final_lsn = lsn;
3120
3121 /* remove potential on-disk data, and deallocate */
3122 ReorderBufferCleanupTXN(rb, txn);
3123}
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
void ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
#define rbtxn_is_streamed(txn)
SharedInvalidationMessage * invalidations
TimestampTz abort_time
XLogRecPtr final_lsn
ReorderBufferStreamAbortCB stream_abort

References ReorderBufferTXN::abort_time, ReorderBufferTXN::final_lsn, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, rbtxn_is_streamed, ReorderBufferCleanupTXN(), ReorderBufferImmediateInvalidation(), ReorderBufferTXNByXid(), and ReorderBuffer::stream_abort.

Referenced by DecodeAbort().

◆ ReorderBufferAbortOld()

void ReorderBufferAbortOld ( ReorderBuffer rb,
TransactionId  oldestRunningXid 
)

Definition at line 3133 of file reorderbuffer.c.

3134{
3136
3137 /*
3138 * Iterate through all (potential) toplevel TXNs and abort all that are
3139 * older than what possibly can be running. Once we've found the first
3140 * that is alive we stop, there might be some that acquired an xid earlier
3141 * but started writing later, but it's unlikely and they will be cleaned
3142 * up in a later call to this function.
3143 */
3145 {
3146 ReorderBufferTXN *txn;
3147
3148 txn = dlist_container(ReorderBufferTXN, node, it.cur);
3149
3150 if (TransactionIdPrecedes(txn->xid, oldestRunningXid))
3151 {
3152 elog(DEBUG2, "aborting old transaction %u", txn->xid);
3153
3154 /* Notify the remote node about the crash/immediate restart. */
3155 if (rbtxn_is_streamed(txn))
3156 rb->stream_abort(rb, txn, InvalidXLogRecPtr);
3157
3158 /* remove potential on-disk data, and deallocate this tx */
3159 ReorderBufferCleanupTXN(rb, txn);
3160 }
3161 else
3162 return;
3163 }
3164}
#define DEBUG2
Definition: elog.h:29
#define elog(elevel,...)
Definition: elog.h:226
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:640
TransactionId xid
dlist_node * cur
Definition: ilist.h:200
static bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.h:263

References dlist_mutable_iter::cur, DEBUG2, dlist_container, dlist_foreach_modify, elog, InvalidXLogRecPtr, rbtxn_is_streamed, ReorderBufferCleanupTXN(), ReorderBuffer::stream_abort, ReorderBuffer::toplevel_by_lsn, TransactionIdPrecedes(), and ReorderBufferTXN::xid.

Referenced by standby_decode().

◆ ReorderBufferAccumulateInvalidations()

static void ReorderBufferAccumulateInvalidations ( SharedInvalidationMessage **  invals_out,
uint32 ninvals_out,
SharedInvalidationMessage msgs_new,
Size  nmsgs_new 
)
static

Definition at line 3506 of file reorderbuffer.c.

3510{
3511 if (*ninvals_out == 0)
3512 {
3513 *ninvals_out = nmsgs_new;
3514 *invals_out = (SharedInvalidationMessage *)
3515 palloc(sizeof(SharedInvalidationMessage) * nmsgs_new);
3516 memcpy(*invals_out, msgs_new, sizeof(SharedInvalidationMessage) * nmsgs_new);
3517 }
3518 else
3519 {
3520 /* Enlarge the array of inval messages */
3521 *invals_out = (SharedInvalidationMessage *)
3522 repalloc(*invals_out, sizeof(SharedInvalidationMessage) *
3523 (*ninvals_out + nmsgs_new));
3524 memcpy(*invals_out + *ninvals_out, msgs_new,
3525 nmsgs_new * sizeof(SharedInvalidationMessage));
3526 *ninvals_out += nmsgs_new;
3527 }
3528}
void * repalloc(void *pointer, Size size)
Definition: mcxt.c:1610
void * palloc(Size size)
Definition: mcxt.c:1365

References palloc(), and repalloc().

Referenced by ReorderBufferAddDistributedInvalidations(), and ReorderBufferAddInvalidations().

◆ ReorderBufferAddDistributedInvalidations()

void ReorderBufferAddDistributedInvalidations ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
Size  nmsgs,
SharedInvalidationMessage msgs 
)

Definition at line 3585 of file reorderbuffer.c.

3588{
3589 ReorderBufferTXN *txn;
3590 MemoryContext oldcontext;
3591
3592 txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3593
3594 oldcontext = MemoryContextSwitchTo(rb->context);
3595
3596 /*
3597 * Collect all the invalidations under the top transaction, if available,
3598 * so that we can execute them all together. See comments
3599 * ReorderBufferAddInvalidations.
3600 */
3601 txn = rbtxn_get_toptxn(txn);
3602
3603 Assert(nmsgs > 0);
3604
3606 {
3607 /*
3608 * Check the transaction has enough space for storing distributed
3609 * invalidation messages.
3610 */
3612 {
3613 /*
3614 * Mark the invalidation message as overflowed and free up the
3615 * messages accumulated so far.
3616 */
3618
3620 {
3622 txn->invalidations_distributed = NULL;
3624 }
3625 }
3626 else
3629 msgs, nmsgs);
3630 }
3631
3632 /* Queue the invalidation messages into the transaction */
3633 ReorderBufferQueueInvalidations(rb, xid, lsn, nmsgs, msgs);
3634
3635 MemoryContextSwitchTo(oldcontext);
3636}
void pfree(void *pointer)
Definition: mcxt.c:1594
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:124
static void ReorderBufferAccumulateInvalidations(SharedInvalidationMessage **invals_out, uint32 *ninvals_out, SharedInvalidationMessage *msgs_new, Size nmsgs_new)
#define MAX_DISTR_INVAL_MSG_PER_TXN
static void ReorderBufferQueueInvalidations(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs)
#define rbtxn_get_toptxn(txn)
#define RBTXN_DISTR_INVAL_OVERFLOWED
#define rbtxn_distr_inval_overflowed(txn)
uint32 ninvalidations_distributed
SharedInvalidationMessage * invalidations_distributed
MemoryContext context

References Assert(), ReorderBuffer::context, ReorderBufferTXN::invalidations_distributed, MAX_DISTR_INVAL_MSG_PER_TXN, MemoryContextSwitchTo(), ReorderBufferTXN::ninvalidations_distributed, pfree(), RBTXN_DISTR_INVAL_OVERFLOWED, rbtxn_distr_inval_overflowed, rbtxn_get_toptxn, ReorderBufferAccumulateInvalidations(), ReorderBufferQueueInvalidations(), ReorderBufferTXNByXid(), and ReorderBufferTXN::txn_flags.

Referenced by SnapBuildDistributeSnapshotAndInval().

◆ ReorderBufferAddInvalidations()

void ReorderBufferAddInvalidations ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
Size  nmsgs,
SharedInvalidationMessage msgs 
)

Definition at line 3544 of file reorderbuffer.c.

3547{
3548 ReorderBufferTXN *txn;
3549 MemoryContext oldcontext;
3550
3551 txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3552
3553 oldcontext = MemoryContextSwitchTo(rb->context);
3554
3555 /*
3556 * Collect all the invalidations under the top transaction, if available,
3557 * so that we can execute them all together. See comments atop this
3558 * function.
3559 */
3560 txn = rbtxn_get_toptxn(txn);
3561
3562 Assert(nmsgs > 0);
3563
3565 &txn->ninvalidations,
3566 msgs, nmsgs);
3567
3568 ReorderBufferQueueInvalidations(rb, xid, lsn, nmsgs, msgs);
3569
3570 MemoryContextSwitchTo(oldcontext);
3571}

References Assert(), ReorderBuffer::context, ReorderBufferTXN::invalidations, MemoryContextSwitchTo(), ReorderBufferTXN::ninvalidations, rbtxn_get_toptxn, ReorderBufferAccumulateInvalidations(), ReorderBufferQueueInvalidations(), and ReorderBufferTXNByXid().

Referenced by xact_decode().

◆ ReorderBufferAddNewCommandId()

void ReorderBufferAddNewCommandId ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
CommandId  cid 
)

Definition at line 3357 of file reorderbuffer.c.

3359{
3361
3362 change->data.command_id = cid;
3364
3365 ReorderBufferQueueChange(rb, xid, lsn, change, false);
3366}
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change, bool toast_insert)
ReorderBufferChange * ReorderBufferAllocChange(ReorderBuffer *rb)
@ REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID
Definition: reorderbuffer.h:58
ReorderBufferChangeType action
Definition: reorderbuffer.h:81
union ReorderBufferChange::@114 data

References ReorderBufferChange::action, ReorderBufferChange::command_id, ReorderBufferChange::data, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, ReorderBufferAllocChange(), and ReorderBufferQueueChange().

Referenced by SnapBuildProcessNewCid().

◆ ReorderBufferAddNewTupleCids()

void ReorderBufferAddNewTupleCids ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
RelFileLocator  locator,
ItemPointerData  tid,
CommandId  cmin,
CommandId  cmax,
CommandId  combocid 
)

Definition at line 3456 of file reorderbuffer.c.

3460{
3462 ReorderBufferTXN *txn;
3463
3464 txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3465
3466 change->data.tuplecid.locator = locator;
3467 change->data.tuplecid.tid = tid;
3468 change->data.tuplecid.cmin = cmin;
3469 change->data.tuplecid.cmax = cmax;
3470 change->data.tuplecid.combocid = combocid;
3471 change->lsn = lsn;
3472 change->txn = txn;
3474
3475 dlist_push_tail(&txn->tuplecids, &change->node);
3476 txn->ntuplecids++;
3477}
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:364
@ REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID
Definition: reorderbuffer.h:59
struct ReorderBufferChange::@114::@118 tuplecid
ItemPointerData tid
struct ReorderBufferTXN * txn
Definition: reorderbuffer.h:84
RelFileLocator locator
dlist_head tuplecids

References ReorderBufferChange::action, ReorderBufferChange::cmax, ReorderBufferChange::cmin, ReorderBufferChange::combocid, ReorderBufferChange::data, dlist_push_tail(), ReorderBufferChange::locator, ReorderBufferChange::lsn, ReorderBufferChange::node, ReorderBufferTXN::ntuplecids, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferAllocChange(), ReorderBufferTXNByXid(), ReorderBufferChange::tid, ReorderBufferChange::tuplecid, ReorderBufferTXN::tuplecids, and ReorderBufferChange::txn.

Referenced by SnapBuildProcessNewCid().

◆ ReorderBufferAddSnapshot()

void ReorderBufferAddSnapshot ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
Snapshot  snap 
)

◆ ReorderBufferAllocate()

ReorderBuffer * ReorderBufferAllocate ( void  )

Definition at line 324 of file reorderbuffer.c.

325{
326 ReorderBuffer *buffer;
327 HASHCTL hash_ctl;
328 MemoryContext new_ctx;
329
330 Assert(MyReplicationSlot != NULL);
331
332 /* allocate memory in own context, to have better accountability */
334 "ReorderBuffer",
336
337 buffer =
338 (ReorderBuffer *) MemoryContextAlloc(new_ctx, sizeof(ReorderBuffer));
339
340 memset(&hash_ctl, 0, sizeof(hash_ctl));
341
342 buffer->context = new_ctx;
343
344 buffer->change_context = SlabContextCreate(new_ctx,
345 "Change",
347 sizeof(ReorderBufferChange));
348
349 buffer->txn_context = SlabContextCreate(new_ctx,
350 "TXN",
352 sizeof(ReorderBufferTXN));
353
354 /*
355 * To minimize memory fragmentation caused by long-running transactions
356 * with changes spanning multiple memory blocks, we use a single
357 * fixed-size memory block for decoded tuple storage. The performance
358 * testing showed that the default memory block size maintains logical
359 * decoding performance without causing fragmentation due to concurrent
360 * transactions. One might think that we can use the max size as
361 * SLAB_LARGE_BLOCK_SIZE but the test also showed it doesn't help resolve
362 * the memory fragmentation.
363 */
364 buffer->tup_context = GenerationContextCreate(new_ctx,
365 "Tuples",
369
370 hash_ctl.keysize = sizeof(TransactionId);
371 hash_ctl.entrysize = sizeof(ReorderBufferTXNByIdEnt);
372 hash_ctl.hcxt = buffer->context;
373
374 buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
376
378 buffer->by_txn_last_txn = NULL;
379
380 buffer->outbuf = NULL;
381 buffer->outbufsize = 0;
382 buffer->size = 0;
383
384 /* txn_heap is ordered by transaction size */
386
387 buffer->spillTxns = 0;
388 buffer->spillCount = 0;
389 buffer->spillBytes = 0;
390 buffer->streamTxns = 0;
391 buffer->streamCount = 0;
392 buffer->streamBytes = 0;
393 buffer->memExceededCount = 0;
394 buffer->totalTxns = 0;
395 buffer->totalBytes = 0;
396
398
399 dlist_init(&buffer->toplevel_by_lsn);
401 dclist_init(&buffer->catchange_txns);
402
403 /*
404 * Ensure there's no stale data from prior uses of this slot, in case some
405 * prior exit avoided calling ReorderBufferFree. Failure to do this can
406 * produce duplicated txns, and it's very cheap if there's nothing there.
407 */
409
410 return buffer;
411}
#define NameStr(name)
Definition: c.h:754
uint32 TransactionId
Definition: c.h:660
HTAB * hash_create(const char *tabname, int64 nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:358
MemoryContext GenerationContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Definition: generation.c:162
#define HASH_CONTEXT
Definition: hsearch.h:102
#define HASH_ELEM
Definition: hsearch.h:95
#define HASH_BLOBS
Definition: hsearch.h:97
static void dlist_init(dlist_head *head)
Definition: ilist.h:314
static void dclist_init(dclist_head *head)
Definition: ilist.h:671
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:1229
MemoryContext CurrentMemoryContext
Definition: mcxt.c:160
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:160
#define SLAB_DEFAULT_BLOCK_SIZE
Definition: memutils.h:189
pairingheap * pairingheap_allocate(pairingheap_comparator compare, void *arg)
Definition: pairingheap.c:42
static int ReorderBufferTXNSizeCompare(const pairingheap_node *a, const pairingheap_node *b, void *arg)
struct ReorderBufferTXNByIdEnt ReorderBufferTXNByIdEnt
static void ReorderBufferCleanupSerializedTXNs(const char *slotname)
MemoryContext SlabContextCreate(MemoryContext parent, const char *name, Size blockSize, Size chunkSize)
Definition: slab.c:322
ReplicationSlot * MyReplicationSlot
Definition: slot.c:148
Size keysize
Definition: hsearch.h:75
Size entrysize
Definition: hsearch.h:76
MemoryContext hcxt
Definition: hsearch.h:86
dclist_head catchange_txns
int64 memExceededCount
MemoryContext change_context
ReorderBufferTXN * by_txn_last_txn
TransactionId by_txn_last_xid
MemoryContext tup_context
pairingheap * txn_heap
MemoryContext txn_context
XLogRecPtr current_restart_decoding_lsn
ReplicationSlotPersistentData data
Definition: slot.h:210
#define InvalidTransactionId
Definition: transam.h:31

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, Assert(), ReorderBuffer::by_txn, ReorderBuffer::by_txn_last_txn, ReorderBuffer::by_txn_last_xid, ReorderBuffer::catchange_txns, ReorderBuffer::change_context, ReorderBuffer::context, ReorderBuffer::current_restart_decoding_lsn, CurrentMemoryContext, ReplicationSlot::data, dclist_init(), dlist_init(), HASHCTL::entrysize, GenerationContextCreate(), HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, InvalidTransactionId, InvalidXLogRecPtr, HASHCTL::keysize, ReorderBuffer::memExceededCount, MemoryContextAlloc(), MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, ReorderBuffer::outbuf, ReorderBuffer::outbufsize, pairingheap_allocate(), ReorderBufferCleanupSerializedTXNs(), ReorderBufferTXNSizeCompare(), ReorderBuffer::size, SLAB_DEFAULT_BLOCK_SIZE, SlabContextCreate(), ReorderBuffer::spillBytes, ReorderBuffer::spillCount, ReorderBuffer::spillTxns, ReorderBuffer::streamBytes, ReorderBuffer::streamCount, ReorderBuffer::streamTxns, ReorderBuffer::toplevel_by_lsn, ReorderBuffer::totalBytes, ReorderBuffer::totalTxns, ReorderBuffer::tup_context, ReorderBuffer::txn_context, ReorderBuffer::txn_heap, and ReorderBuffer::txns_by_base_snapshot_lsn.

Referenced by StartupDecodingContext().

◆ ReorderBufferAllocChange()

◆ ReorderBufferAllocRelids()

Oid * ReorderBufferAllocRelids ( ReorderBuffer rb,
int  nrelids 
)

Definition at line 625 of file reorderbuffer.c.

626{
627 Oid *relids;
628 Size alloc_len;
629
630 alloc_len = sizeof(Oid) * nrelids;
631
632 relids = (Oid *) MemoryContextAlloc(rb->context, alloc_len);
633
634 return relids;
635}
size_t Size
Definition: c.h:613
unsigned int Oid
Definition: postgres_ext.h:32

References ReorderBuffer::context, and MemoryContextAlloc().

Referenced by DecodeTruncate(), and ReorderBufferRestoreChange().

◆ ReorderBufferAllocTupleBuf()

HeapTuple ReorderBufferAllocTupleBuf ( ReorderBuffer rb,
Size  tuple_len 
)

Definition at line 592 of file reorderbuffer.c.

593{
594 HeapTuple tuple;
595 Size alloc_len;
596
597 alloc_len = tuple_len + SizeofHeapTupleHeader;
598
600 HEAPTUPLESIZE + alloc_len);
601 tuple->t_data = (HeapTupleHeader) ((char *) tuple + HEAPTUPLESIZE);
602
603 return tuple;
604}
#define HEAPTUPLESIZE
Definition: htup.h:73
HeapTupleData * HeapTuple
Definition: htup.h:71
HeapTupleHeaderData * HeapTupleHeader
Definition: htup.h:23
#define SizeofHeapTupleHeader
Definition: htup_details.h:185
HeapTupleHeader t_data
Definition: htup.h:68

References HEAPTUPLESIZE, MemoryContextAlloc(), SizeofHeapTupleHeader, HeapTupleData::t_data, and ReorderBuffer::tup_context.

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeUpdate(), and ReorderBufferRestoreChange().

◆ ReorderBufferAllocTXN()

static ReorderBufferTXN * ReorderBufferAllocTXN ( ReorderBuffer rb)
static

Definition at line 435 of file reorderbuffer.c.

436{
437 ReorderBufferTXN *txn;
438
439 txn = (ReorderBufferTXN *)
441
442 memset(txn, 0, sizeof(ReorderBufferTXN));
443
444 dlist_init(&txn->changes);
445 dlist_init(&txn->tuplecids);
446 dlist_init(&txn->subtxns);
447
448 /* InvalidCommandId is not zero, so set it explicitly */
450 txn->output_plugin_private = NULL;
451
452 return txn;
453}
CommandId command_id
void * output_plugin_private
dlist_head subtxns

References ReorderBufferTXN::changes, ReorderBufferTXN::command_id, dlist_init(), InvalidCommandId, MemoryContextAlloc(), ReorderBufferTXN::output_plugin_private, ReorderBufferTXN::subtxns, ReorderBufferTXN::tuplecids, and ReorderBuffer::txn_context.

Referenced by ReorderBufferTXNByXid().

◆ ReorderBufferApplyChange()

static void ReorderBufferApplyChange ( ReorderBuffer rb,
ReorderBufferTXN txn,
Relation  relation,
ReorderBufferChange change,
bool  streaming 
)
inlinestatic

Definition at line 2072 of file reorderbuffer.c.

2075{
2076 if (streaming)
2077 rb->stream_change(rb, txn, relation, change);
2078 else
2079 rb->apply_change(rb, txn, relation, change);
2080}
ReorderBufferStreamChangeCB stream_change
ReorderBufferApplyChangeCB apply_change

References ReorderBuffer::apply_change, and ReorderBuffer::stream_change.

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferApplyMessage()

static void ReorderBufferApplyMessage ( ReorderBuffer rb,
ReorderBufferTXN txn,
ReorderBufferChange change,
bool  streaming 
)
inlinestatic

Definition at line 2100 of file reorderbuffer.c.

2102{
2103 if (streaming)
2104 rb->stream_message(rb, txn, change->lsn, true,
2105 change->data.msg.prefix,
2106 change->data.msg.message_size,
2107 change->data.msg.message);
2108 else
2109 rb->message(rb, txn, change->lsn, true,
2110 change->data.msg.prefix,
2111 change->data.msg.message_size,
2112 change->data.msg.message);
2113}
struct ReorderBufferChange::@114::@117 msg
ReorderBufferStreamMessageCB stream_message
ReorderBufferMessageCB message

References ReorderBufferChange::data, ReorderBufferChange::lsn, ReorderBufferChange::message, ReorderBuffer::message, ReorderBufferChange::message_size, ReorderBufferChange::msg, ReorderBufferChange::prefix, and ReorderBuffer::stream_message.

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferApplyTruncate()

static void ReorderBufferApplyTruncate ( ReorderBuffer rb,
ReorderBufferTXN txn,
int  nrelations,
Relation relations,
ReorderBufferChange change,
bool  streaming 
)
inlinestatic

Definition at line 2086 of file reorderbuffer.c.

2089{
2090 if (streaming)
2091 rb->stream_truncate(rb, txn, nrelations, relations, change);
2092 else
2093 rb->apply_truncate(rb, txn, nrelations, relations, change);
2094}
ReorderBufferStreamTruncateCB stream_truncate
ReorderBufferApplyTruncateCB apply_truncate

References ReorderBuffer::apply_truncate, and ReorderBuffer::stream_truncate.

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferAssignChild()

void ReorderBufferAssignChild ( ReorderBuffer rb,
TransactionId  xid,
TransactionId  subxid,
XLogRecPtr  lsn 
)

Definition at line 1099 of file reorderbuffer.c.

1101{
1102 ReorderBufferTXN *txn;
1103 ReorderBufferTXN *subtxn;
1104 bool new_top;
1105 bool new_sub;
1106
1107 txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true);
1108 subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false);
1109
1110 if (!new_sub)
1111 {
1112 if (rbtxn_is_known_subxact(subtxn))
1113 {
1114 /* already associated, nothing to do */
1115 return;
1116 }
1117 else
1118 {
1119 /*
1120 * We already saw this transaction, but initially added it to the
1121 * list of top-level txns. Now that we know it's not top-level,
1122 * remove it from there.
1123 */
1124 dlist_delete(&subtxn->node);
1125 }
1126 }
1127
1128 subtxn->txn_flags |= RBTXN_IS_SUBXACT;
1129 subtxn->toplevel_xid = xid;
1130 Assert(subtxn->nsubtxns == 0);
1131
1132 /* set the reference to top-level transaction */
1133 subtxn->toptxn = txn;
1134
1135 /* add to subtransaction list */
1136 dlist_push_tail(&txn->subtxns, &subtxn->node);
1137 txn->nsubtxns++;
1138
1139 /* Possibly transfer the subtxn's snapshot to its top-level txn. */
1141
1142 /* Verify LSN-ordering invariant */
1144}
static void dlist_delete(dlist_node *node)
Definition: ilist.h:405
static void AssertTXNLsnOrder(ReorderBuffer *rb)
static void ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn, ReorderBufferTXN *subtxn)
#define RBTXN_IS_SUBXACT
TransactionId toplevel_xid
struct ReorderBufferTXN * toptxn

References Assert(), AssertTXNLsnOrder(), dlist_delete(), dlist_push_tail(), ReorderBufferTXN::node, ReorderBufferTXN::nsubtxns, rbtxn_is_known_subxact, RBTXN_IS_SUBXACT, ReorderBufferTransferSnapToParent(), ReorderBufferTXNByXid(), ReorderBufferTXN::subtxns, ReorderBufferTXN::toplevel_xid, ReorderBufferTXN::toptxn, and ReorderBufferTXN::txn_flags.

Referenced by LogicalDecodingProcessRecord(), and ReorderBufferCommitChild().

◆ ReorderBufferBuildTupleCidHash()

static void ReorderBufferBuildTupleCidHash ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 1836 of file reorderbuffer.c.

1837{
1838 dlist_iter iter;
1839 HASHCTL hash_ctl;
1840
1842 return;
1843
1844 hash_ctl.keysize = sizeof(ReorderBufferTupleCidKey);
1845 hash_ctl.entrysize = sizeof(ReorderBufferTupleCidEnt);
1846 hash_ctl.hcxt = rb->context;
1847
1848 /*
1849 * create the hash with the exact number of to-be-stored tuplecids from
1850 * the start
1851 */
1852 txn->tuplecid_hash =
1853 hash_create("ReorderBufferTupleCid", txn->ntuplecids, &hash_ctl,
1855
1856 dlist_foreach(iter, &txn->tuplecids)
1857 {
1860 bool found;
1861 ReorderBufferChange *change;
1862
1863 change = dlist_container(ReorderBufferChange, node, iter.cur);
1864
1866
1867 /* be careful about padding */
1868 memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
1869
1870 key.rlocator = change->data.tuplecid.locator;
1871
1873 &key.tid);
1874
1875 ent = (ReorderBufferTupleCidEnt *)
1876 hash_search(txn->tuplecid_hash, &key, HASH_ENTER, &found);
1877 if (!found)
1878 {
1879 ent->cmin = change->data.tuplecid.cmin;
1880 ent->cmax = change->data.tuplecid.cmax;
1881 ent->combocid = change->data.tuplecid.combocid;
1882 }
1883 else
1884 {
1885 /*
1886 * Maybe we already saw this tuple before in this transaction, but
1887 * if so it must have the same cmin.
1888 */
1889 Assert(ent->cmin == change->data.tuplecid.cmin);
1890
1891 /*
1892 * cmax may be initially invalid, but once set it can only grow,
1893 * and never become invalid again.
1894 */
1895 Assert((ent->cmax == InvalidCommandId) ||
1896 ((change->data.tuplecid.cmax != InvalidCommandId) &&
1897 (change->data.tuplecid.cmax > ent->cmax)));
1898 ent->cmax = change->data.tuplecid.cmax;
1899 }
1900 }
1901}
static bool dlist_is_empty(const dlist_head *head)
Definition: ilist.h:336
struct ReorderBufferTupleCidEnt ReorderBufferTupleCidEnt
struct ReorderBufferTupleCidKey ReorderBufferTupleCidKey
#define rbtxn_has_catalog_changes(txn)

References ReorderBufferChange::action, Assert(), ReorderBufferTupleCidEnt::cmax, ReorderBufferChange::cmax, ReorderBufferTupleCidEnt::cmin, ReorderBufferChange::cmin, ReorderBufferTupleCidEnt::combocid, ReorderBufferChange::combocid, ReorderBuffer::context, dlist_iter::cur, ReorderBufferChange::data, dlist_container, dlist_foreach, dlist_is_empty(), HASHCTL::entrysize, HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASH_ENTER, hash_search(), HASHCTL::hcxt, InvalidCommandId, ItemPointerCopy(), sort-test::key, HASHCTL::keysize, ReorderBufferChange::locator, ReorderBufferTXN::ntuplecids, rbtxn_has_catalog_changes, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferChange::tid, ReorderBufferChange::tuplecid, ReorderBufferTXN::tuplecid_hash, and ReorderBufferTXN::tuplecids.

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferCanStartStreaming()

static bool ReorderBufferCanStartStreaming ( ReorderBuffer rb)
inlinestatic

Definition at line 4319 of file reorderbuffer.c.

4320{
4322 SnapBuild *builder = ctx->snapshot_builder;
4323
4324 /* We can't start streaming unless a consistent state is reached. */
4326 return false;
4327
4328 /*
4329 * We can't start streaming immediately even if the streaming is enabled
4330 * because we previously decoded this transaction and now just are
4331 * restarting.
4332 */
4333 if (ReorderBufferCanStream(rb) &&
4334 !SnapBuildXactNeedsSkip(builder, ctx->reader->ReadRecPtr))
4335 return true;
4336
4337 return false;
4338}
static bool ReorderBufferCanStream(ReorderBuffer *rb)
SnapBuildState SnapBuildCurrentState(SnapBuild *builder)
Definition: snapbuild.c:277
@ SNAPBUILD_CONSISTENT
Definition: snapbuild.h:50
XLogRecPtr ReadRecPtr
Definition: xlogreader.h:205

References ReorderBuffer::private_data, LogicalDecodingContext::reader, XLogReaderState::ReadRecPtr, ReorderBufferCanStream(), SNAPBUILD_CONSISTENT, SnapBuildCurrentState(), SnapBuildXactNeedsSkip(), and LogicalDecodingContext::snapshot_builder.

Referenced by ReorderBufferCheckMemoryLimit(), and ReorderBufferProcessPartialChange().

◆ ReorderBufferCanStream()

static bool ReorderBufferCanStream ( ReorderBuffer rb)
inlinestatic

◆ ReorderBufferChangeMemoryUpdate()

static void ReorderBufferChangeMemoryUpdate ( ReorderBuffer rb,
ReorderBufferChange change,
ReorderBufferTXN txn,
bool  addition,
Size  sz 
)
static

Definition at line 3385 of file reorderbuffer.c.

3389{
3390 ReorderBufferTXN *toptxn;
3391
3392 Assert(txn || change);
3393
3394 /*
3395 * Ignore tuple CID changes, because those are not evicted when reaching
3396 * memory limit. So we just don't count them, because it might easily
3397 * trigger a pointless attempt to spill.
3398 */
3399 if (change && change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID)
3400 return;
3401
3402 if (sz == 0)
3403 return;
3404
3405 if (txn == NULL)
3406 txn = change->txn;
3407 Assert(txn != NULL);
3408
3409 /*
3410 * Update the total size in top level as well. This is later used to
3411 * compute the decoding stats.
3412 */
3413 toptxn = rbtxn_get_toptxn(txn);
3414
3415 if (addition)
3416 {
3417 Size oldsize = txn->size;
3418
3419 txn->size += sz;
3420 rb->size += sz;
3421
3422 /* Update the total size in the top transaction. */
3423 toptxn->total_size += sz;
3424
3425 /* Update the max-heap */
3426 if (oldsize != 0)
3428 pairingheap_add(rb->txn_heap, &txn->txn_node);
3429 }
3430 else
3431 {
3432 Assert((rb->size >= sz) && (txn->size >= sz));
3433 txn->size -= sz;
3434 rb->size -= sz;
3435
3436 /* Update the total size in the top transaction. */
3437 toptxn->total_size -= sz;
3438
3439 /* Update the max-heap */
3441 if (txn->size != 0)
3442 pairingheap_add(rb->txn_heap, &txn->txn_node);
3443 }
3444
3445 Assert(txn->size <= rb->size);
3446}
void pairingheap_remove(pairingheap *heap, pairingheap_node *node)
Definition: pairingheap.c:184
void pairingheap_add(pairingheap *heap, pairingheap_node *node)
Definition: pairingheap.c:126
pairingheap_node txn_node

References ReorderBufferChange::action, Assert(), pairingheap_add(), pairingheap_remove(), rbtxn_get_toptxn, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferTXN::size, ReorderBuffer::size, ReorderBufferTXN::total_size, ReorderBufferChange::txn, ReorderBuffer::txn_heap, and ReorderBufferTXN::txn_node.

Referenced by ReorderBufferCleanupTXN(), ReorderBufferFreeChange(), ReorderBufferQueueChange(), ReorderBufferRestoreChange(), ReorderBufferSerializeTXN(), ReorderBufferToastReplace(), and ReorderBufferTruncateTXN().

◆ ReorderBufferChangeSize()

static Size ReorderBufferChangeSize ( ReorderBufferChange change)
static

Definition at line 4462 of file reorderbuffer.c.

4463{
4464 Size sz = sizeof(ReorderBufferChange);
4465
4466 switch (change->action)
4467 {
4468 /* fall through these, they're all similar enough */
4473 {
4474 HeapTuple oldtup,
4475 newtup;
4476 Size oldlen = 0;
4477 Size newlen = 0;
4478
4479 oldtup = change->data.tp.oldtuple;
4480 newtup = change->data.tp.newtuple;
4481
4482 if (oldtup)
4483 {
4484 sz += sizeof(HeapTupleData);
4485 oldlen = oldtup->t_len;
4486 sz += oldlen;
4487 }
4488
4489 if (newtup)
4490 {
4491 sz += sizeof(HeapTupleData);
4492 newlen = newtup->t_len;
4493 sz += newlen;
4494 }
4495
4496 break;
4497 }
4499 {
4500 Size prefix_size = strlen(change->data.msg.prefix) + 1;
4501
4502 sz += prefix_size + change->data.msg.message_size +
4503 sizeof(Size) + sizeof(Size);
4504
4505 break;
4506 }
4508 {
4509 sz += sizeof(SharedInvalidationMessage) *
4510 change->data.inval.ninvalidations;
4511 break;
4512 }
4514 {
4515 Snapshot snap;
4516
4517 snap = change->data.snapshot;
4518
4519 sz += sizeof(SnapshotData) +
4520 sizeof(TransactionId) * snap->xcnt +
4521 sizeof(TransactionId) * snap->subxcnt;
4522
4523 break;
4524 }
4526 {
4527 sz += sizeof(Oid) * change->data.truncate.nrelids;
4528
4529 break;
4530 }
4535 /* ReorderBufferChange contains everything important */
4536 break;
4537 }
4538
4539 return sz;
4540}
struct HeapTupleData HeapTupleData
struct ReorderBufferChange ReorderBufferChange
@ REORDER_BUFFER_CHANGE_INVALIDATION
Definition: reorderbuffer.h:56
@ REORDER_BUFFER_CHANGE_MESSAGE
Definition: reorderbuffer.h:55
@ REORDER_BUFFER_CHANGE_TRUNCATE
Definition: reorderbuffer.h:63
@ REORDER_BUFFER_CHANGE_DELETE
Definition: reorderbuffer.h:54
struct SnapshotData SnapshotData
uint32 t_len
Definition: htup.h:64
struct ReorderBufferChange::@114::@116 truncate
struct ReorderBufferChange::@114::@115 tp
struct ReorderBufferChange::@114::@119 inval
int32 subxcnt
Definition: snapshot.h:177
uint32 xcnt
Definition: snapshot.h:165

References ReorderBufferChange::action, ReorderBufferChange::data, ReorderBufferChange::inval, ReorderBufferChange::message_size, ReorderBufferChange::msg, ReorderBufferChange::newtuple, ReorderBufferChange::ninvalidations, ReorderBufferChange::nrelids, ReorderBufferChange::oldtuple, ReorderBufferChange::prefix, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChange::snapshot, SnapshotData::subxcnt, HeapTupleData::t_len, ReorderBufferChange::tp, ReorderBufferChange::truncate, and SnapshotData::xcnt.

Referenced by ReorderBufferCleanupTXN(), ReorderBufferFreeChange(), ReorderBufferQueueChange(), ReorderBufferRestoreChange(), ReorderBufferToastReplace(), and ReorderBufferTruncateTXN().

◆ ReorderBufferCheckAndTruncateAbortedTXN()

static bool ReorderBufferCheckAndTruncateAbortedTXN ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 1774 of file reorderbuffer.c.

1775{
1776 /* Quick return for regression tests */
1778 return false;
1779
1780 /*
1781 * Quick return if the transaction status is already known.
1782 */
1783
1784 if (rbtxn_is_committed(txn))
1785 return false;
1786 if (rbtxn_is_aborted(txn))
1787 {
1788 /* Already-aborted transactions should not have any changes */
1789 Assert(txn->size == 0);
1790
1791 return true;
1792 }
1793
1794 /* Otherwise, check the transaction status using CLOG lookup */
1795
1797 return false;
1798
1799 if (TransactionIdDidCommit(txn->xid))
1800 {
1801 /*
1802 * Remember the transaction is committed so that we can skip CLOG
1803 * check next time, avoiding the pressure on CLOG lookup.
1804 */
1805 Assert(!rbtxn_is_aborted(txn));
1807 return false;
1808 }
1809
1810 /*
1811 * The transaction aborted. We discard both the changes collected so far
1812 * and the toast reconstruction data. The full cleanup will happen as part
1813 * of decoding ABORT record of this transaction.
1814 */
1816 ReorderBufferToastReset(rb, txn);
1817
1818 /* All changes should be discarded */
1819 Assert(txn->size == 0);
1820
1821 /*
1822 * Mark the transaction as aborted so we can ignore future changes of this
1823 * transaction.
1824 */
1827
1828 return true;
1829}
#define unlikely(x)
Definition: c.h:407
bool TransactionIdIsInProgress(TransactionId xid)
Definition: procarray.c:1402
int debug_logical_replication_streaming
static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared)
static void ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define rbtxn_is_committed(txn)
@ DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE
Definition: reorderbuffer.h:34
#define rbtxn_is_prepared(txn)
#define RBTXN_IS_COMMITTED
#define rbtxn_is_aborted(txn)
#define RBTXN_IS_ABORTED
bool TransactionIdDidCommit(TransactionId transactionId)
Definition: transam.c:126

References Assert(), DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE, debug_logical_replication_streaming, RBTXN_IS_ABORTED, rbtxn_is_aborted, RBTXN_IS_COMMITTED, rbtxn_is_committed, rbtxn_is_prepared, ReorderBufferToastReset(), ReorderBufferTruncateTXN(), ReorderBufferTXN::size, TransactionIdDidCommit(), TransactionIdIsInProgress(), ReorderBufferTXN::txn_flags, unlikely, and ReorderBufferTXN::xid.

Referenced by ReorderBufferCheckMemoryLimit().

◆ ReorderBufferCheckMemoryLimit()

static void ReorderBufferCheckMemoryLimit ( ReorderBuffer rb)
static

Definition at line 3899 of file reorderbuffer.c.

3900{
3901 ReorderBufferTXN *txn;
3902 bool update_stats = true;
3903
3904 if (rb->size >= logical_decoding_work_mem * (Size) 1024)
3905 {
3906 /*
3907 * Update the statistics as the memory usage has reached the limit. We
3908 * report the statistics update later in this function since we can
3909 * update the slot statistics altogether while streaming or
3910 * serializing transactions in most cases.
3911 */
3912 rb->memExceededCount += 1;
3913 }
3915 {
3916 /*
3917 * Bail out if debug_logical_replication_streaming is buffered and we
3918 * haven't exceeded the memory limit.
3919 */
3920 return;
3921 }
3922
3923 /*
3924 * If debug_logical_replication_streaming is immediate, loop until there's
3925 * no change. Otherwise, loop until we reach under the memory limit. One
3926 * might think that just by evicting the largest (sub)transaction we will
3927 * come under the memory limit based on assumption that the selected
3928 * transaction is at least as large as the most recent change (which
3929 * caused us to go over the memory limit). However, that is not true
3930 * because a user can reduce the logical_decoding_work_mem to a smaller
3931 * value before the most recent change.
3932 */
3933 while (rb->size >= logical_decoding_work_mem * (Size) 1024 ||
3935 rb->size > 0))
3936 {
3937 /*
3938 * Pick the largest non-aborted transaction and evict it from memory
3939 * by streaming, if possible. Otherwise, spill to disk.
3940 */
3942 (txn = ReorderBufferLargestStreamableTopTXN(rb)) != NULL)
3943 {
3944 /* we know there has to be one, because the size is not zero */
3945 Assert(txn && rbtxn_is_toptxn(txn));
3946 Assert(txn->total_size > 0);
3947 Assert(rb->size >= txn->total_size);
3948
3949 /* skip the transaction if aborted */
3951 continue;
3952
3953 ReorderBufferStreamTXN(rb, txn);
3954 }
3955 else
3956 {
3957 /*
3958 * Pick the largest transaction (or subtransaction) and evict it
3959 * from memory by serializing it to disk.
3960 */
3961 txn = ReorderBufferLargestTXN(rb);
3962
3963 /* we know there has to be one, because the size is not zero */
3964 Assert(txn);
3965 Assert(txn->size > 0);
3966 Assert(rb->size >= txn->size);
3967
3968 /* skip the transaction if aborted */
3970 continue;
3971
3973 }
3974
3975 /*
3976 * After eviction, the transaction should have no entries in memory,
3977 * and should use 0 bytes for changes.
3978 */
3979 Assert(txn->size == 0);
3980 Assert(txn->nentries_mem == 0);
3981
3982 /*
3983 * We've reported the memExceededCount update while streaming or
3984 * serializing the transaction.
3985 */
3986 update_stats = false;
3987 }
3988
3989 if (update_stats)
3991
3992 /* We must be under the memory limit now. */
3993 Assert(rb->size < logical_decoding_work_mem * (Size) 1024);
3994}
void UpdateDecodingStats(LogicalDecodingContext *ctx)
Definition: logical.c:1952
static ReorderBufferTXN * ReorderBufferLargestTXN(ReorderBuffer *rb)
static bool ReorderBufferCanStartStreaming(ReorderBuffer *rb)
int logical_decoding_work_mem
static ReorderBufferTXN * ReorderBufferLargestStreamableTopTXN(ReorderBuffer *rb)
static bool ReorderBufferCheckAndTruncateAbortedTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
@ DEBUG_LOGICAL_REP_STREAMING_BUFFERED
Definition: reorderbuffer.h:33
#define rbtxn_is_toptxn(txn)

References Assert(), DEBUG_LOGICAL_REP_STREAMING_BUFFERED, DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE, debug_logical_replication_streaming, logical_decoding_work_mem, ReorderBuffer::memExceededCount, ReorderBufferTXN::nentries_mem, ReorderBuffer::private_data, rbtxn_is_toptxn, ReorderBufferCanStartStreaming(), ReorderBufferCheckAndTruncateAbortedTXN(), ReorderBufferLargestStreamableTopTXN(), ReorderBufferLargestTXN(), ReorderBufferSerializeTXN(), ReorderBufferStreamTXN(), ReorderBufferTXN::size, ReorderBuffer::size, ReorderBufferTXN::total_size, and UpdateDecodingStats().

Referenced by ReorderBufferQueueChange().

◆ ReorderBufferCleanupSerializedTXNs()

static void ReorderBufferCleanupSerializedTXNs ( const char *  slotname)
static

Definition at line 4887 of file reorderbuffer.c.

4888{
4889 DIR *spill_dir;
4890 struct dirent *spill_de;
4891 struct stat statbuf;
4892 char path[MAXPGPATH * 2 + sizeof(PG_REPLSLOT_DIR)];
4893
4894 sprintf(path, "%s/%s", PG_REPLSLOT_DIR, slotname);
4895
4896 /* we're only handling directories here, skip if it's not ours */
4897 if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
4898 return;
4899
4900 spill_dir = AllocateDir(path);
4901 while ((spill_de = ReadDirExtended(spill_dir, path, INFO)) != NULL)
4902 {
4903 /* only look at names that can be ours */
4904 if (strncmp(spill_de->d_name, "xid", 3) == 0)
4905 {
4906 snprintf(path, sizeof(path),
4907 "%s/%s/%s", PG_REPLSLOT_DIR, slotname,
4908 spill_de->d_name);
4909
4910 if (unlink(path) != 0)
4911 ereport(ERROR,
4913 errmsg("could not remove file \"%s\" during removal of %s/%s/xid*: %m",
4914 path, PG_REPLSLOT_DIR, slotname)));
4915 }
4916 }
4917 FreeDir(spill_dir);
4918}
#define INFO
Definition: elog.h:34
int FreeDir(DIR *dir)
Definition: fd.c:3022
struct dirent * ReadDirExtended(DIR *dir, const char *dirname, int elevel)
Definition: fd.c:2985
DIR * AllocateDir(const char *dirname)
Definition: fd.c:2904
#define snprintf
Definition: port.h:260
#define PG_REPLSLOT_DIR
Definition: slot.h:21
Definition: dirent.c:26
Definition: dirent.h:10
char d_name[MAX_PATH]
Definition: dirent.h:15
#define lstat(path, sb)
Definition: win32_port.h:275
#define S_ISDIR(m)
Definition: win32_port.h:315

References AllocateDir(), dirent::d_name, ereport, errcode_for_file_access(), errmsg(), ERROR, FreeDir(), INFO, lstat, MAXPGPATH, PG_REPLSLOT_DIR, ReadDirExtended(), S_ISDIR, snprintf, sprintf, and stat::st_mode.

Referenced by ReorderBufferAllocate(), ReorderBufferFree(), and StartupReorderBuffer().

◆ ReorderBufferCleanupTXN()

static void ReorderBufferCleanupTXN ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 1535 of file reorderbuffer.c.

1536{
1537 bool found;
1538 dlist_mutable_iter iter;
1539 Size mem_freed = 0;
1540
1541 /* cleanup subtransactions & their changes */
1542 dlist_foreach_modify(iter, &txn->subtxns)
1543 {
1544 ReorderBufferTXN *subtxn;
1545
1546 subtxn = dlist_container(ReorderBufferTXN, node, iter.cur);
1547
1548 /*
1549 * Subtransactions are always associated to the toplevel TXN, even if
1550 * they originally were happening inside another subtxn, so we won't
1551 * ever recurse more than one level deep here.
1552 */
1554 Assert(subtxn->nsubtxns == 0);
1555
1556 ReorderBufferCleanupTXN(rb, subtxn);
1557 }
1558
1559 /* cleanup changes in the txn */
1560 dlist_foreach_modify(iter, &txn->changes)
1561 {
1562 ReorderBufferChange *change;
1563
1564 change = dlist_container(ReorderBufferChange, node, iter.cur);
1565
1566 /* Check we're not mixing changes from different transactions. */
1567 Assert(change->txn == txn);
1568
1569 /*
1570 * Instead of updating the memory counter for individual changes, we
1571 * sum up the size of memory to free so we can update the memory
1572 * counter all together below. This saves costs of maintaining the
1573 * max-heap.
1574 */
1575 mem_freed += ReorderBufferChangeSize(change);
1576
1577 ReorderBufferFreeChange(rb, change, false);
1578 }
1579
1580 /* Update the memory counter */
1581 ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, mem_freed);
1582
1583 /*
1584 * Cleanup the tuplecids we stored for decoding catalog snapshot access.
1585 * They are always stored in the toplevel transaction.
1586 */
1587 dlist_foreach_modify(iter, &txn->tuplecids)
1588 {
1589 ReorderBufferChange *change;
1590
1591 change = dlist_container(ReorderBufferChange, node, iter.cur);
1592
1593 /* Check we're not mixing changes from different transactions. */
1594 Assert(change->txn == txn);
1596
1597 ReorderBufferFreeChange(rb, change, true);
1598 }
1599
1600 /*
1601 * Cleanup the base snapshot, if set.
1602 */
1603 if (txn->base_snapshot != NULL)
1604 {
1607 }
1608
1609 /*
1610 * Cleanup the snapshot for the last streamed run.
1611 */
1612 if (txn->snapshot_now != NULL)
1613 {
1616 }
1617
1618 /*
1619 * Remove TXN from its containing lists.
1620 *
1621 * Note: if txn is known as subxact, we are deleting the TXN from its
1622 * parent's list of known subxacts; this leaves the parent's nsubxacts
1623 * count too high, but we don't care. Otherwise, we are deleting the TXN
1624 * from the LSN-ordered list of toplevel TXNs. We remove the TXN from the
1625 * list of catalog modifying transactions as well.
1626 */
1627 dlist_delete(&txn->node);
1630
1631 /* now remove reference from buffer */
1632 hash_search(rb->by_txn, &txn->xid, HASH_REMOVE, &found);
1633 Assert(found);
1634
1635 /* remove entries spilled to disk */
1636 if (rbtxn_is_serialized(txn))
1638
1639 /* deallocate */
1640 ReorderBufferFreeTXN(rb, txn);
1641}
@ HASH_REMOVE
Definition: hsearch.h:115
static void dclist_delete_from(dclist_head *head, dlist_node *node)
Definition: ilist.h:763
void ReorderBufferFreeChange(ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)
static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
static Size ReorderBufferChangeSize(ReorderBufferChange *change)
static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
static void ReorderBufferFreeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, ReorderBufferChange *change, ReorderBufferTXN *txn, bool addition, Size sz)
#define rbtxn_is_serialized(txn)
void SnapBuildSnapDecRefcount(Snapshot snap)
Definition: snapbuild.c:328
Snapshot snapshot_now
dlist_node catchange_node
dlist_node base_snapshot_node

References ReorderBufferChange::action, Assert(), ReorderBufferTXN::base_snapshot, ReorderBufferTXN::base_snapshot_node, ReorderBuffer::by_txn, ReorderBufferTXN::catchange_node, ReorderBuffer::catchange_txns, ReorderBufferTXN::changes, dlist_mutable_iter::cur, dclist_delete_from(), dlist_container, dlist_delete(), dlist_foreach_modify, HASH_REMOVE, hash_search(), ReorderBufferTXN::node, ReorderBufferTXN::nsubtxns, rbtxn_has_catalog_changes, rbtxn_is_known_subxact, rbtxn_is_serialized, rbtxn_is_streamed, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferChangeMemoryUpdate(), ReorderBufferChangeSize(), ReorderBufferCleanupTXN(), ReorderBufferFreeChange(), ReorderBufferFreeSnap(), ReorderBufferFreeTXN(), ReorderBufferRestoreCleanup(), SnapBuildSnapDecRefcount(), ReorderBufferTXN::snapshot_now, ReorderBufferTXN::subtxns, ReorderBufferTXN::tuplecids, ReorderBufferChange::txn, and ReorderBufferTXN::xid.

Referenced by ReorderBufferAbort(), ReorderBufferAbortOld(), ReorderBufferCleanupTXN(), ReorderBufferFinishPrepared(), ReorderBufferForget(), ReorderBufferProcessTXN(), ReorderBufferReplay(), and ReorderBufferStreamCommit().

◆ ReorderBufferCommit()

void ReorderBufferCommit ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn,
TimestampTz  commit_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn 
)

Definition at line 2884 of file reorderbuffer.c.

2888{
2889 ReorderBufferTXN *txn;
2890
2891 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2892 false);
2893
2894 /* unknown transaction, nothing to replay */
2895 if (txn == NULL)
2896 return;
2897
2898 ReorderBufferReplay(txn, rb, xid, commit_lsn, end_lsn, commit_time,
2899 origin_id, origin_lsn);
2900}
static void ReorderBufferReplay(ReorderBufferTXN *txn, ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)

References InvalidXLogRecPtr, ReorderBufferReplay(), and ReorderBufferTXNByXid().

Referenced by DecodeCommit().

◆ ReorderBufferCommitChild()

void ReorderBufferCommitChild ( ReorderBuffer rb,
TransactionId  xid,
TransactionId  subxid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn 
)

Definition at line 1219 of file reorderbuffer.c.

1222{
1223 ReorderBufferTXN *subtxn;
1224
1225 subtxn = ReorderBufferTXNByXid(rb, subxid, false, NULL,
1226 InvalidXLogRecPtr, false);
1227
1228 /*
1229 * No need to do anything if that subtxn didn't contain any changes
1230 */
1231 if (!subtxn)
1232 return;
1233
1234 subtxn->final_lsn = commit_lsn;
1235 subtxn->end_lsn = end_lsn;
1236
1237 /*
1238 * Assign this subxact as a child of the toplevel xact (no-op if already
1239 * done.)
1240 */
1242}
void ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr lsn)

References ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, InvalidXLogRecPtr, ReorderBufferAssignChild(), and ReorderBufferTXNByXid().

Referenced by DecodeCommit(), and DecodePrepare().

◆ ReorderBufferCopySnap()

static Snapshot ReorderBufferCopySnap ( ReorderBuffer rb,
Snapshot  orig_snap,
ReorderBufferTXN txn,
CommandId  cid 
)
static

Definition at line 1909 of file reorderbuffer.c.

1911{
1912 Snapshot snap;
1913 dlist_iter iter;
1914 int i = 0;
1915 Size size;
1916
1917 size = sizeof(SnapshotData) +
1918 sizeof(TransactionId) * orig_snap->xcnt +
1919 sizeof(TransactionId) * (txn->nsubtxns + 1);
1920
1921 snap = MemoryContextAllocZero(rb->context, size);
1922 memcpy(snap, orig_snap, sizeof(SnapshotData));
1923
1924 snap->copied = true;
1925 snap->active_count = 1; /* mark as active so nobody frees it */
1926 snap->regd_count = 0;
1927 snap->xip = (TransactionId *) (snap + 1);
1928
1929 memcpy(snap->xip, orig_snap->xip, sizeof(TransactionId) * snap->xcnt);
1930
1931 /*
1932 * snap->subxip contains all txids that belong to our transaction which we
1933 * need to check via cmin/cmax. That's why we store the toplevel
1934 * transaction in there as well.
1935 */
1936 snap->subxip = snap->xip + snap->xcnt;
1937 snap->subxip[i++] = txn->xid;
1938
1939 /*
1940 * txn->nsubtxns isn't decreased when subtransactions abort, so count
1941 * manually. Since it's an upper boundary it is safe to use it for the
1942 * allocation above.
1943 */
1944 snap->subxcnt = 1;
1945
1946 dlist_foreach(iter, &txn->subtxns)
1947 {
1948 ReorderBufferTXN *sub_txn;
1949
1950 sub_txn = dlist_container(ReorderBufferTXN, node, iter.cur);
1951 snap->subxip[i++] = sub_txn->xid;
1952 snap->subxcnt++;
1953 }
1954
1955 /* sort so we can bsearch() later */
1956 qsort(snap->subxip, snap->subxcnt, sizeof(TransactionId), xidComparator);
1957
1958 /* store the specified current CommandId */
1959 snap->curcid = cid;
1960
1961 return snap;
1962}
int i
Definition: isn.c:77
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:1263
#define qsort(a, b, c, d)
Definition: port.h:500
bool copied
Definition: snapshot.h:181
uint32 regd_count
Definition: snapshot.h:201
uint32 active_count
Definition: snapshot.h:200
CommandId curcid
Definition: snapshot.h:183
TransactionId * subxip
Definition: snapshot.h:176
TransactionId * xip
Definition: snapshot.h:164
int xidComparator(const void *arg1, const void *arg2)
Definition: xid.c:152

References SnapshotData::active_count, ReorderBuffer::context, SnapshotData::copied, dlist_iter::cur, SnapshotData::curcid, dlist_container, dlist_foreach, i, MemoryContextAllocZero(), ReorderBufferTXN::nsubtxns, qsort, SnapshotData::regd_count, ReorderBufferTXN::subtxns, SnapshotData::subxcnt, SnapshotData::subxip, SnapshotData::xcnt, ReorderBufferTXN::xid, xidComparator(), and SnapshotData::xip.

Referenced by ReorderBufferProcessTXN(), ReorderBufferSaveTXNSnapshot(), and ReorderBufferStreamTXN().

◆ ReorderBufferExecuteInvalidations()

static void ReorderBufferExecuteInvalidations ( uint32  nmsgs,
SharedInvalidationMessage msgs 
)
static

Definition at line 3643 of file reorderbuffer.c.

3644{
3645 int i;
3646
3647 for (i = 0; i < nmsgs; i++)
3649}
void LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
Definition: inval.c:823

References i, and LocalExecuteInvalidationMessage().

Referenced by ReorderBufferFinishPrepared(), and ReorderBufferProcessTXN().

◆ ReorderBufferFinishPrepared()

void ReorderBufferFinishPrepared ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn,
XLogRecPtr  two_phase_at,
TimestampTz  commit_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn,
char *  gid,
bool  is_commit 
)

Definition at line 3001 of file reorderbuffer.c.

3006{
3007 ReorderBufferTXN *txn;
3008 XLogRecPtr prepare_end_lsn;
3009 TimestampTz prepare_time;
3010
3011 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, commit_lsn, false);
3012
3013 /* unknown transaction, nothing to do */
3014 if (txn == NULL)
3015 return;
3016
3017 /*
3018 * By this time the txn has the prepare record information, remember it to
3019 * be later used for rollback.
3020 */
3021 prepare_end_lsn = txn->end_lsn;
3022 prepare_time = txn->prepare_time;
3023
3024 /* add the gid in the txn */
3025 txn->gid = pstrdup(gid);
3026
3027 /*
3028 * It is possible that this transaction is not decoded at prepare time
3029 * either because by that time we didn't have a consistent snapshot, or
3030 * two_phase was not enabled, or it was decoded earlier but we have
3031 * restarted. We only need to send the prepare if it was not decoded
3032 * earlier. We don't need to decode the xact for aborts if it is not done
3033 * already.
3034 */
3035 if ((txn->final_lsn < two_phase_at) && is_commit)
3036 {
3037 /*
3038 * txn must have been marked as a prepared transaction and skipped but
3039 * not sent a prepare. Also, the prepare info must have been updated
3040 * in txn even if we skip prepare.
3041 */
3045
3046 /*
3047 * By this time the txn has the prepare record information and it is
3048 * important to use that so that downstream gets the accurate
3049 * information. If instead, we have passed commit information here
3050 * then downstream can behave as it has already replayed commit
3051 * prepared after the restart.
3052 */
3053 ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
3054 txn->prepare_time, txn->origin_id, txn->origin_lsn);
3055 }
3056
3057 txn->final_lsn = commit_lsn;
3058 txn->end_lsn = end_lsn;
3059 txn->commit_time = commit_time;
3060 txn->origin_id = origin_id;
3061 txn->origin_lsn = origin_lsn;
3062
3063 if (is_commit)
3064 rb->commit_prepared(rb, txn, commit_lsn);
3065 else
3066 rb->rollback_prepared(rb, txn, prepare_end_lsn, prepare_time);
3067
3068 /* cleanup: make sure there's no cache pollution */
3070 txn->invalidations);
3071 ReorderBufferCleanupTXN(rb, txn);
3072}
int64 TimestampTz
Definition: timestamp.h:39
char * pstrdup(const char *in)
Definition: mcxt.c:1759
static void ReorderBufferExecuteInvalidations(uint32 nmsgs, SharedInvalidationMessage *msgs)
#define RBTXN_PREPARE_STATUS_MASK
#define RBTXN_IS_PREPARED
#define RBTXN_SKIPPED_PREPARE
TimestampTz commit_time
RepOriginId origin_id
XLogRecPtr origin_lsn
TimestampTz prepare_time
ReorderBufferCommitPreparedCB commit_prepared
ReorderBufferRollbackPreparedCB rollback_prepared

References Assert(), ReorderBuffer::commit_prepared, ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, ReorderBufferTXN::gid, ReorderBufferTXN::invalidations, ReorderBufferTXN::ninvalidations, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, ReorderBufferTXN::prepare_time, pstrdup(), RBTXN_IS_PREPARED, RBTXN_PREPARE_STATUS_MASK, RBTXN_SKIPPED_PREPARE, ReorderBufferCleanupTXN(), ReorderBufferExecuteInvalidations(), ReorderBufferReplay(), ReorderBufferTXNByXid(), ReorderBuffer::rollback_prepared, ReorderBufferTXN::txn_flags, and XLogRecPtrIsValid.

Referenced by DecodeAbort(), and DecodeCommit().

◆ ReorderBufferForget()

void ReorderBufferForget ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 3180 of file reorderbuffer.c.

3181{
3182 ReorderBufferTXN *txn;
3183
3184 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
3185 false);
3186
3187 /* unknown, nothing to forget */
3188 if (txn == NULL)
3189 return;
3190
3191 /* this transaction mustn't be streamed */
3193
3194 /* cosmetic... */
3195 txn->final_lsn = lsn;
3196
3197 /*
3198 * Process only cache invalidation messages in this transaction if there
3199 * are any. Even if we're not interested in the transaction's contents, it
3200 * could have manipulated the catalog and we need to update the caches
3201 * according to that.
3202 */
3203 if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
3205 txn->invalidations);
3206 else
3207 Assert(txn->ninvalidations == 0);
3208
3209 /* remove potential on-disk data, and deallocate */
3210 ReorderBufferCleanupTXN(rb, txn);
3211}

References Assert(), ReorderBufferTXN::base_snapshot, ReorderBufferTXN::final_lsn, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, rbtxn_is_streamed, ReorderBufferCleanupTXN(), ReorderBufferImmediateInvalidation(), and ReorderBufferTXNByXid().

Referenced by DecodeCommit().

◆ ReorderBufferFree()

void ReorderBufferFree ( ReorderBuffer rb)

Definition at line 417 of file reorderbuffer.c.

418{
419 MemoryContext context = rb->context;
420
421 /*
422 * We free separately allocated data by entirely scrapping reorderbuffer's
423 * memory context.
424 */
425 MemoryContextDelete(context);
426
427 /* Free disk space used by unconsumed reorder buffers */
429}
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:469

References ReorderBuffer::context, ReplicationSlot::data, MemoryContextDelete(), MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, and ReorderBufferCleanupSerializedTXNs().

Referenced by FreeDecodingContext().

◆ ReorderBufferFreeChange()

void ReorderBufferFreeChange ( ReorderBuffer rb,
ReorderBufferChange change,
bool  upd_mem 
)

Definition at line 522 of file reorderbuffer.c.

524{
525 /* update memory accounting info */
526 if (upd_mem)
527 ReorderBufferChangeMemoryUpdate(rb, change, NULL, false,
529
530 /* free contained data */
531 switch (change->action)
532 {
537 if (change->data.tp.newtuple)
538 {
540 change->data.tp.newtuple = NULL;
541 }
542
543 if (change->data.tp.oldtuple)
544 {
546 change->data.tp.oldtuple = NULL;
547 }
548 break;
550 if (change->data.msg.prefix != NULL)
551 pfree(change->data.msg.prefix);
552 change->data.msg.prefix = NULL;
553 if (change->data.msg.message != NULL)
554 pfree(change->data.msg.message);
555 change->data.msg.message = NULL;
556 break;
558 if (change->data.inval.invalidations)
559 pfree(change->data.inval.invalidations);
560 change->data.inval.invalidations = NULL;
561 break;
563 if (change->data.snapshot)
564 {
566 change->data.snapshot = NULL;
567 }
568 break;
569 /* no data in addition to the struct itself */
571 if (change->data.truncate.relids != NULL)
572 {
574 change->data.truncate.relids = NULL;
575 }
576 break;
581 break;
582 }
583
584 pfree(change);
585}
void ReorderBufferFreeRelids(ReorderBuffer *rb, Oid *relids)
void ReorderBufferFreeTupleBuf(HeapTuple tuple)
SharedInvalidationMessage * invalidations

References ReorderBufferChange::action, ReorderBufferChange::data, ReorderBufferChange::inval, ReorderBufferChange::invalidations, ReorderBufferChange::message, ReorderBufferChange::msg, ReorderBufferChange::newtuple, ReorderBufferChange::oldtuple, pfree(), ReorderBufferChange::prefix, ReorderBufferChange::relids, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChangeMemoryUpdate(), ReorderBufferChangeSize(), ReorderBufferFreeRelids(), ReorderBufferFreeSnap(), ReorderBufferFreeTupleBuf(), ReorderBufferChange::snapshot, ReorderBufferChange::tp, and ReorderBufferChange::truncate.

Referenced by ReorderBufferCleanupTXN(), ReorderBufferIterTXNFinish(), ReorderBufferIterTXNNext(), ReorderBufferProcessTXN(), ReorderBufferQueueChange(), ReorderBufferResetTXN(), ReorderBufferRestoreChanges(), ReorderBufferSerializeTXN(), ReorderBufferToastReset(), and ReorderBufferTruncateTXN().

◆ ReorderBufferFreeRelids()

void ReorderBufferFreeRelids ( ReorderBuffer rb,
Oid relids 
)

Definition at line 641 of file reorderbuffer.c.

642{
643 pfree(relids);
644}

References pfree().

Referenced by ReorderBufferFreeChange().

◆ ReorderBufferFreeSnap()

static void ReorderBufferFreeSnap ( ReorderBuffer rb,
Snapshot  snap 
)
static

Definition at line 1968 of file reorderbuffer.c.

1969{
1970 if (snap->copied)
1971 pfree(snap);
1972 else
1974}

References SnapshotData::copied, pfree(), and SnapBuildSnapDecRefcount().

Referenced by ReorderBufferCleanupTXN(), ReorderBufferFreeChange(), ReorderBufferProcessTXN(), and ReorderBufferStreamTXN().

◆ ReorderBufferFreeTupleBuf()

void ReorderBufferFreeTupleBuf ( HeapTuple  tuple)

Definition at line 610 of file reorderbuffer.c.

611{
612 pfree(tuple);
613}

References pfree().

Referenced by ReorderBufferFreeChange().

◆ ReorderBufferFreeTXN()

static void ReorderBufferFreeTXN ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 459 of file reorderbuffer.c.

460{
461 /* clean the lookup cache if we were cached (quite likely) */
462 if (rb->by_txn_last_xid == txn->xid)
463 {
465 rb->by_txn_last_txn = NULL;
466 }
467
468 /* free data that's contained */
469
470 if (txn->gid != NULL)
471 {
472 pfree(txn->gid);
473 txn->gid = NULL;
474 }
475
476 if (txn->tuplecid_hash != NULL)
477 {
479 txn->tuplecid_hash = NULL;
480 }
481
482 if (txn->invalidations)
483 {
484 pfree(txn->invalidations);
485 txn->invalidations = NULL;
486 }
487
489 {
491 txn->invalidations_distributed = NULL;
492 }
493
494 /* Reset the toast hash */
496
497 /* All changes must be deallocated */
498 Assert(txn->size == 0);
499
500 pfree(txn);
501}
void hash_destroy(HTAB *hashp)
Definition: dynahash.c:865

References Assert(), ReorderBuffer::by_txn_last_txn, ReorderBuffer::by_txn_last_xid, ReorderBufferTXN::gid, hash_destroy(), ReorderBufferTXN::invalidations, ReorderBufferTXN::invalidations_distributed, InvalidTransactionId, pfree(), ReorderBufferToastReset(), ReorderBufferTXN::size, ReorderBufferTXN::tuplecid_hash, and ReorderBufferTXN::xid.

Referenced by ReorderBufferCleanupTXN().

◆ ReorderBufferGetCatalogChangesXacts()

TransactionId * ReorderBufferGetCatalogChangesXacts ( ReorderBuffer rb)

Definition at line 3693 of file reorderbuffer.c.

3694{
3695 dlist_iter iter;
3696 TransactionId *xids = NULL;
3697 size_t xcnt = 0;
3698
3699 /* Quick return if the list is empty */
3700 if (dclist_count(&rb->catchange_txns) == 0)
3701 return NULL;
3702
3703 /* Initialize XID array */
3704 xids = (TransactionId *) palloc(sizeof(TransactionId) *
3706 dclist_foreach(iter, &rb->catchange_txns)
3707 {
3709 catchange_node,
3710 iter.cur);
3711
3713
3714 xids[xcnt++] = txn->xid;
3715 }
3716
3717 qsort(xids, xcnt, sizeof(TransactionId), xidComparator);
3718
3719 Assert(xcnt == dclist_count(&rb->catchange_txns));
3720 return xids;
3721}
#define dclist_container(type, membername, ptr)
Definition: ilist.h:947
static uint32 dclist_count(const dclist_head *head)
Definition: ilist.h:932
#define dclist_foreach(iter, lhead)
Definition: ilist.h:970

References Assert(), ReorderBuffer::catchange_txns, dlist_iter::cur, dclist_container, dclist_count(), dclist_foreach, palloc(), qsort, rbtxn_has_catalog_changes, ReorderBufferTXN::xid, and xidComparator().

Referenced by SnapBuildSerialize().

◆ ReorderBufferGetInvalidations()

uint32 ReorderBufferGetInvalidations ( ReorderBuffer rb,
TransactionId  xid,
SharedInvalidationMessage **  msgs 
)

Definition at line 5634 of file reorderbuffer.c.

5636{
5637 ReorderBufferTXN *txn;
5638
5639 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
5640 false);
5641
5642 if (txn == NULL)
5643 return 0;
5644
5645 *msgs = txn->invalidations;
5646
5647 return txn->ninvalidations;
5648}

References ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, and ReorderBufferTXNByXid().

Referenced by SnapBuildDistributeSnapshotAndInval().

◆ ReorderBufferGetOldestTXN()

ReorderBufferTXN * ReorderBufferGetOldestTXN ( ReorderBuffer rb)

Definition at line 1044 of file reorderbuffer.c.

1045{
1046 ReorderBufferTXN *txn;
1047
1049
1051 return NULL;
1052
1054
1057 return txn;
1058}
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:603

References Assert(), AssertTXNLsnOrder(), dlist_head_element, dlist_is_empty(), ReorderBufferTXN::first_lsn, rbtxn_is_known_subxact, ReorderBuffer::toplevel_by_lsn, and XLogRecPtrIsValid.

Referenced by SnapBuildProcessRunningXacts().

◆ ReorderBufferGetOldestXmin()

TransactionId ReorderBufferGetOldestXmin ( ReorderBuffer rb)

◆ ReorderBufferImmediateInvalidation()

void ReorderBufferImmediateInvalidation ( ReorderBuffer rb,
uint32  ninvalidations,
SharedInvalidationMessage invalidations 
)

Definition at line 3253 of file reorderbuffer.c.

3255{
3256 bool use_subtxn = IsTransactionOrTransactionBlock();
3259 int i;
3260
3261 if (use_subtxn)
3263
3264 /*
3265 * Force invalidations to happen outside of a valid transaction - that way
3266 * entries will just be marked as invalid without accessing the catalog.
3267 * That's advantageous because we don't need to setup the full state
3268 * necessary for catalog access.
3269 */
3270 if (use_subtxn)
3272
3273 for (i = 0; i < ninvalidations; i++)
3274 LocalExecuteInvalidationMessage(&invalidations[i]);
3275
3276 if (use_subtxn)
3277 {
3280 CurrentResourceOwner = cowner;
3281 }
3282}
ResourceOwner CurrentResourceOwner
Definition: resowner.c:173
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:5007
void BeginInternalSubTransaction(const char *name)
Definition: xact.c:4712
void RollbackAndReleaseCurrentSubTransaction(void)
Definition: xact.c:4814
void AbortCurrentTransaction(void)
Definition: xact.c:3469

References AbortCurrentTransaction(), BeginInternalSubTransaction(), CurrentMemoryContext, CurrentResourceOwner, i, IsTransactionOrTransactionBlock(), LocalExecuteInvalidationMessage(), MemoryContextSwitchTo(), and RollbackAndReleaseCurrentSubTransaction().

Referenced by ReorderBufferAbort(), ReorderBufferForget(), ReorderBufferInvalidate(), and xact_decode().

◆ ReorderBufferInvalidate()

void ReorderBufferInvalidate ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 3222 of file reorderbuffer.c.

3223{
3224 ReorderBufferTXN *txn;
3225
3226 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
3227 false);
3228
3229 /* unknown, nothing to do */
3230 if (txn == NULL)
3231 return;
3232
3233 /*
3234 * Process cache invalidation messages if there are any. Even if we're not
3235 * interested in the transaction's contents, it could have manipulated the
3236 * catalog and we need to update the caches according to that.
3237 */
3238 if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
3240 txn->invalidations);
3241 else
3242 Assert(txn->ninvalidations == 0);
3243}

References Assert(), ReorderBufferTXN::base_snapshot, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, ReorderBufferImmediateInvalidation(), and ReorderBufferTXNByXid().

Referenced by DecodePrepare().

◆ ReorderBufferIterCompare()

static int ReorderBufferIterCompare ( Datum  a,
Datum  b,
void *  arg 
)
static

Definition at line 1261 of file reorderbuffer.c.

1262{
1264 XLogRecPtr pos_a = state->entries[DatumGetInt32(a)].lsn;
1265 XLogRecPtr pos_b = state->entries[DatumGetInt32(b)].lsn;
1266
1267 if (pos_a < pos_b)
1268 return 1;
1269 else if (pos_a == pos_b)
1270 return 0;
1271 return -1;
1272}
void * arg
static int32 DatumGetInt32(Datum X)
Definition: postgres.h:212
Definition: regguts.h:323

References a, arg, b, and DatumGetInt32().

Referenced by ReorderBufferIterTXNInit().

◆ ReorderBufferIterTXNFinish()

static void ReorderBufferIterTXNFinish ( ReorderBuffer rb,
ReorderBufferIterTXNState state 
)
static

Definition at line 1504 of file reorderbuffer.c.

1506{
1507 int32 off;
1508
1509 for (off = 0; off < state->nr_txns; off++)
1510 {
1511 if (state->entries[off].file.vfd != -1)
1512 FileClose(state->entries[off].file.vfd);
1513 }
1514
1515 /* free memory we might have "leaked" in the last *Next call */
1516 if (!dlist_is_empty(&state->old_change))
1517 {
1518 ReorderBufferChange *change;
1519
1520 change = dlist_container(ReorderBufferChange, node,
1521 dlist_pop_head_node(&state->old_change));
1522 ReorderBufferFreeChange(rb, change, true);
1523 Assert(dlist_is_empty(&state->old_change));
1524 }
1525
1526 binaryheap_free(state->heap);
1527 pfree(state);
1528}
void binaryheap_free(binaryheap *heap)
Definition: binaryheap.c:75
void FileClose(File file)
Definition: fd.c:1979
static dlist_node * dlist_pop_head_node(dlist_head *head)
Definition: ilist.h:450

References Assert(), binaryheap_free(), dlist_container, dlist_is_empty(), dlist_pop_head_node(), FileClose(), pfree(), and ReorderBufferFreeChange().

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferIterTXNInit()

static void ReorderBufferIterTXNInit ( ReorderBuffer rb,
ReorderBufferTXN txn,
ReorderBufferIterTXNState *volatile *  iter_state 
)
static

Definition at line 1284 of file reorderbuffer.c.

1286{
1287 Size nr_txns = 0;
1289 dlist_iter cur_txn_i;
1290 int32 off;
1291
1292 *iter_state = NULL;
1293
1294 /* Check ordering of changes in the toplevel transaction. */
1296
1297 /*
1298 * Calculate the size of our heap: one element for every transaction that
1299 * contains changes. (Besides the transactions already in the reorder
1300 * buffer, we count the one we were directly passed.)
1301 */
1302 if (txn->nentries > 0)
1303 nr_txns++;
1304
1305 dlist_foreach(cur_txn_i, &txn->subtxns)
1306 {
1307 ReorderBufferTXN *cur_txn;
1308
1309 cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
1310
1311 /* Check ordering of changes in this subtransaction. */
1312 AssertChangeLsnOrder(cur_txn);
1313
1314 if (cur_txn->nentries > 0)
1315 nr_txns++;
1316 }
1317
1318 /* allocate iteration state */
1322 sizeof(ReorderBufferIterTXNEntry) * nr_txns);
1323
1324 state->nr_txns = nr_txns;
1325 dlist_init(&state->old_change);
1326
1327 for (off = 0; off < state->nr_txns; off++)
1328 {
1329 state->entries[off].file.vfd = -1;
1330 state->entries[off].segno = 0;
1331 }
1332
1333 /* allocate heap */
1334 state->heap = binaryheap_allocate(state->nr_txns,
1336 state);
1337
1338 /* Now that the state fields are initialized, it is safe to return it. */
1339 *iter_state = state;
1340
1341 /*
1342 * Now insert items into the binary heap, in an unordered fashion. (We
1343 * will run a heap assembly step at the end; this is more efficient.)
1344 */
1345
1346 off = 0;
1347
1348 /* add toplevel transaction if it contains changes */
1349 if (txn->nentries > 0)
1350 {
1351 ReorderBufferChange *cur_change;
1352
1353 if (rbtxn_is_serialized(txn))
1354 {
1355 /* serialize remaining changes */
1357 ReorderBufferRestoreChanges(rb, txn, &state->entries[off].file,
1358 &state->entries[off].segno);
1359 }
1360
1361 cur_change = dlist_head_element(ReorderBufferChange, node,
1362 &txn->changes);
1363
1364 state->entries[off].lsn = cur_change->lsn;
1365 state->entries[off].change = cur_change;
1366 state->entries[off].txn = txn;
1367
1369 }
1370
1371 /* add subtransactions if they contain changes */
1372 dlist_foreach(cur_txn_i, &txn->subtxns)
1373 {
1374 ReorderBufferTXN *cur_txn;
1375
1376 cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
1377
1378 if (cur_txn->nentries > 0)
1379 {
1380 ReorderBufferChange *cur_change;
1381
1382 if (rbtxn_is_serialized(cur_txn))
1383 {
1384 /* serialize remaining changes */
1385 ReorderBufferSerializeTXN(rb, cur_txn);
1386 ReorderBufferRestoreChanges(rb, cur_txn,
1387 &state->entries[off].file,
1388 &state->entries[off].segno);
1389 }
1390 cur_change = dlist_head_element(ReorderBufferChange, node,
1391 &cur_txn->changes);
1392
1393 state->entries[off].lsn = cur_change->lsn;
1394 state->entries[off].change = cur_change;
1395 state->entries[off].txn = cur_txn;
1396
1398 }
1399 }
1400
1401 /* assemble a valid binary heap */
1402 binaryheap_build(state->heap);
1403}
void binaryheap_build(binaryheap *heap)
Definition: binaryheap.c:138
void binaryheap_add_unordered(binaryheap *heap, bh_node_type d)
Definition: binaryheap.c:116
binaryheap * binaryheap_allocate(int capacity, binaryheap_comparator compare, void *arg)
Definition: binaryheap.c:39
static Datum Int32GetDatum(int32 X)
Definition: postgres.h:222
static int ReorderBufferIterCompare(Datum a, Datum b, void *arg)
static void AssertChangeLsnOrder(ReorderBufferTXN *txn)
struct ReorderBufferIterTXNEntry ReorderBufferIterTXNEntry
static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, TXNEntryFile *file, XLogSegNo *segno)

References AssertChangeLsnOrder(), binaryheap_add_unordered(), binaryheap_allocate(), binaryheap_build(), ReorderBufferTXN::changes, ReorderBuffer::context, dlist_iter::cur, dlist_container, dlist_foreach, dlist_head_element, dlist_init(), Int32GetDatum(), ReorderBufferChange::lsn, MemoryContextAllocZero(), ReorderBufferTXN::nentries, rbtxn_is_serialized, ReorderBufferIterCompare(), ReorderBufferRestoreChanges(), ReorderBufferSerializeTXN(), and ReorderBufferTXN::subtxns.

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferIterTXNNext()

static ReorderBufferChange * ReorderBufferIterTXNNext ( ReorderBuffer rb,
ReorderBufferIterTXNState state 
)
static

Definition at line 1412 of file reorderbuffer.c.

1413{
1414 ReorderBufferChange *change;
1416 int32 off;
1417
1418 /* nothing there anymore */
1419 if (binaryheap_empty(state->heap))
1420 return NULL;
1421
1422 off = DatumGetInt32(binaryheap_first(state->heap));
1423 entry = &state->entries[off];
1424
1425 /* free memory we might have "leaked" in the previous *Next call */
1426 if (!dlist_is_empty(&state->old_change))
1427 {
1428 change = dlist_container(ReorderBufferChange, node,
1429 dlist_pop_head_node(&state->old_change));
1430 ReorderBufferFreeChange(rb, change, true);
1431 Assert(dlist_is_empty(&state->old_change));
1432 }
1433
1434 change = entry->change;
1435
1436 /*
1437 * update heap with information about which transaction has the next
1438 * relevant change in LSN order
1439 */
1440
1441 /* there are in-memory changes */
1442 if (dlist_has_next(&entry->txn->changes, &entry->change->node))
1443 {
1444 dlist_node *next = dlist_next_node(&entry->txn->changes, &change->node);
1445 ReorderBufferChange *next_change =
1447
1448 /* txn stays the same */
1449 state->entries[off].lsn = next_change->lsn;
1450 state->entries[off].change = next_change;
1451
1453 return change;
1454 }
1455
1456 /* try to load changes from disk */
1457 if (entry->txn->nentries != entry->txn->nentries_mem)
1458 {
1459 /*
1460 * Ugly: restoring changes will reuse *Change records, thus delete the
1461 * current one from the per-tx list and only free in the next call.
1462 */
1463 dlist_delete(&change->node);
1464 dlist_push_tail(&state->old_change, &change->node);
1465
1466 /*
1467 * Update the total bytes processed by the txn for which we are
1468 * releasing the current set of changes and restoring the new set of
1469 * changes.
1470 */
1471 rb->totalBytes += entry->txn->size;
1472 if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->file,
1473 &state->entries[off].segno))
1474 {
1475 /* successfully restored changes from disk */
1476 ReorderBufferChange *next_change =
1478 &entry->txn->changes);
1479
1480 elog(DEBUG2, "restored %u/%u changes from disk",
1481 (uint32) entry->txn->nentries_mem,
1482 (uint32) entry->txn->nentries);
1483
1484 Assert(entry->txn->nentries_mem);
1485 /* txn stays the same */
1486 state->entries[off].lsn = next_change->lsn;
1487 state->entries[off].change = next_change;
1489
1490 return change;
1491 }
1492 }
1493
1494 /* ok, no changes there anymore, remove */
1496
1497 return change;
1498}
void binaryheap_replace_first(binaryheap *heap, bh_node_type d)
Definition: binaryheap.c:255
bh_node_type binaryheap_first(binaryheap *heap)
Definition: binaryheap.c:177
bh_node_type binaryheap_remove_first(binaryheap *heap)
Definition: binaryheap.c:192
#define binaryheap_empty(h)
Definition: binaryheap.h:65
static int32 next
Definition: blutils.c:224
uint32_t uint32
Definition: c.h:541
static bool dlist_has_next(const dlist_head *head, const dlist_node *node)
Definition: ilist.h:503
static dlist_node * dlist_next_node(dlist_head *head, dlist_node *node)
Definition: ilist.h:537
ReorderBufferChange * change
ReorderBufferTXN * txn

References Assert(), binaryheap_empty, binaryheap_first(), binaryheap_remove_first(), binaryheap_replace_first(), ReorderBufferIterTXNEntry::change, ReorderBufferTXN::changes, DatumGetInt32(), DEBUG2, dlist_container, dlist_delete(), dlist_has_next(), dlist_head_element, dlist_is_empty(), dlist_next_node(), dlist_pop_head_node(), dlist_push_tail(), elog, ReorderBufferIterTXNEntry::file, Int32GetDatum(), ReorderBufferChange::lsn, ReorderBufferTXN::nentries, ReorderBufferTXN::nentries_mem, next, ReorderBufferChange::node, ReorderBufferFreeChange(), ReorderBufferRestoreChanges(), ReorderBufferTXN::size, ReorderBuffer::totalBytes, and ReorderBufferIterTXNEntry::txn.

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferLargestStreamableTopTXN()

static ReorderBufferTXN * ReorderBufferLargestStreamableTopTXN ( ReorderBuffer rb)
static

Definition at line 3849 of file reorderbuffer.c.

3850{
3851 dlist_iter iter;
3852 Size largest_size = 0;
3853 ReorderBufferTXN *largest = NULL;
3854
3855 /* Find the largest top-level transaction having a base snapshot. */
3857 {
3858 ReorderBufferTXN *txn;
3859
3860 txn = dlist_container(ReorderBufferTXN, base_snapshot_node, iter.cur);
3861
3862 /* must not be a subtxn */
3864 /* base_snapshot must be set */
3865 Assert(txn->base_snapshot != NULL);
3866
3867 /* Don't consider these kinds of transactions for eviction. */
3868 if (rbtxn_has_partial_change(txn) ||
3870 rbtxn_is_aborted(txn))
3871 continue;
3872
3873 /* Find the largest of the eviction candidates. */
3874 if ((largest == NULL || txn->total_size > largest_size) &&
3875 (txn->total_size > 0))
3876 {
3877 largest = txn;
3878 largest_size = txn->total_size;
3879 }
3880 }
3881
3882 return largest;
3883}
#define rbtxn_has_streamable_change(txn)
#define rbtxn_has_partial_change(txn)

References Assert(), ReorderBufferTXN::base_snapshot, dlist_iter::cur, dlist_container, dlist_foreach, rbtxn_has_partial_change, rbtxn_has_streamable_change, rbtxn_is_aborted, rbtxn_is_known_subxact, ReorderBufferTXN::total_size, and ReorderBuffer::txns_by_base_snapshot_lsn.

Referenced by ReorderBufferCheckMemoryLimit().

◆ ReorderBufferLargestTXN()

static ReorderBufferTXN * ReorderBufferLargestTXN ( ReorderBuffer rb)
static

Definition at line 3808 of file reorderbuffer.c.

3809{
3810 ReorderBufferTXN *largest;
3811
3812 /* Get the largest transaction from the max-heap */
3813 largest = pairingheap_container(ReorderBufferTXN, txn_node,
3815
3816 Assert(largest);
3817 Assert(largest->size > 0);
3818 Assert(largest->size <= rb->size);
3819
3820 return largest;
3821}
pairingheap_node * pairingheap_first(pairingheap *heap)
Definition: pairingheap.c:144
#define pairingheap_container(type, membername, ptr)
Definition: pairingheap.h:43

References Assert(), pairingheap_container, pairingheap_first(), ReorderBufferTXN::size, ReorderBuffer::size, and ReorderBuffer::txn_heap.

Referenced by ReorderBufferCheckMemoryLimit().

◆ ReorderBufferMaybeMarkTXNStreamed()

static void ReorderBufferMaybeMarkTXNStreamed ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 2138 of file reorderbuffer.c.

2139{
2140 /*
2141 * The top-level transaction, is marked as streamed always, even if it
2142 * does not contain any changes (that is, when all the changes are in
2143 * subtransactions).
2144 *
2145 * For subtransactions, we only mark them as streamed when there are
2146 * changes in them.
2147 *
2148 * We do it this way because of aborts - we don't want to send aborts for
2149 * XIDs the downstream is not aware of. And of course, it always knows
2150 * about the top-level xact (we send the XID in all messages), but we
2151 * never stream XIDs of empty subxacts.
2152 */
2153 if (rbtxn_is_toptxn(txn) || (txn->nentries_mem != 0))
2155}
#define RBTXN_IS_STREAMED

References ReorderBufferTXN::nentries_mem, RBTXN_IS_STREAMED, rbtxn_is_toptxn, and ReorderBufferTXN::txn_flags.

Referenced by ReorderBufferProcessTXN(), and ReorderBufferTruncateTXN().

◆ ReorderBufferPrepare()

void ReorderBufferPrepare ( ReorderBuffer rb,
TransactionId  xid,
char *  gid 
)

Definition at line 2960 of file reorderbuffer.c.

2962{
2963 ReorderBufferTXN *txn;
2964
2965 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2966 false);
2967
2968 /* unknown transaction, nothing to replay */
2969 if (txn == NULL)
2970 return;
2971
2972 /*
2973 * txn must have been marked as a prepared transaction and must have
2974 * neither been skipped nor sent a prepare. Also, the prepare info must
2975 * have been updated in it by now.
2976 */
2979
2980 txn->gid = pstrdup(gid);
2981
2982 ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
2983 txn->prepare_time, txn->origin_id, txn->origin_lsn);
2984
2985 /*
2986 * Send a prepare if not already done so. This might occur if we have
2987 * detected a concurrent abort while replaying the non-streaming
2988 * transaction.
2989 */
2990 if (!rbtxn_sent_prepare(txn))
2991 {
2992 rb->prepare(rb, txn, txn->final_lsn);
2994 }
2995}
#define RBTXN_SENT_PREPARE
#define rbtxn_sent_prepare(txn)
ReorderBufferPrepareCB prepare

References Assert(), ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, ReorderBufferTXN::gid, InvalidXLogRecPtr, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, ReorderBuffer::prepare, ReorderBufferTXN::prepare_time, pstrdup(), RBTXN_IS_PREPARED, RBTXN_PREPARE_STATUS_MASK, RBTXN_SENT_PREPARE, rbtxn_sent_prepare, ReorderBufferReplay(), ReorderBufferTXNByXid(), ReorderBufferTXN::txn_flags, and XLogRecPtrIsValid.

Referenced by DecodePrepare().

◆ ReorderBufferProcessPartialChange()

static void ReorderBufferProcessPartialChange ( ReorderBuffer rb,
ReorderBufferTXN txn,
ReorderBufferChange change,
bool  toast_insert 
)
static

Definition at line 741 of file reorderbuffer.c.

744{
745 ReorderBufferTXN *toptxn;
746
747 /*
748 * The partial changes need to be processed only while streaming
749 * in-progress transactions.
750 */
751 if (!ReorderBufferCanStream(rb))
752 return;
753
754 /* Get the top transaction. */
755 toptxn = rbtxn_get_toptxn(txn);
756
757 /*
758 * Indicate a partial change for toast inserts. The change will be
759 * considered as complete once we get the insert or update on the main
760 * table and we are sure that the pending toast chunks are not required
761 * anymore.
762 *
763 * If we allow streaming when there are pending toast chunks then such
764 * chunks won't be released till the insert (multi_insert) is complete and
765 * we expect the txn to have streamed all changes after streaming. This
766 * restriction is mainly to ensure the correctness of streamed
767 * transactions and it doesn't seem worth uplifting such a restriction
768 * just to allow this case because anyway we will stream the transaction
769 * once such an insert is complete.
770 */
771 if (toast_insert)
773 else if (rbtxn_has_partial_change(toptxn) &&
774 IsInsertOrUpdate(change->action) &&
776 toptxn->txn_flags &= ~RBTXN_HAS_PARTIAL_CHANGE;
777
778 /*
779 * Indicate a partial change for speculative inserts. The change will be
780 * considered as complete once we get the speculative confirm or abort
781 * token.
782 */
783 if (IsSpecInsert(change->action))
785 else if (rbtxn_has_partial_change(toptxn) &&
787 toptxn->txn_flags &= ~RBTXN_HAS_PARTIAL_CHANGE;
788
789 /*
790 * Stream the transaction if it is serialized before and the changes are
791 * now complete in the top-level transaction.
792 *
793 * The reason for doing the streaming of such a transaction as soon as we
794 * get the complete change for it is that previously it would have reached
795 * the memory threshold and wouldn't get streamed because of incomplete
796 * changes. Delaying such transactions would increase apply lag for them.
797 */
799 !(rbtxn_has_partial_change(toptxn)) &&
800 rbtxn_is_serialized(txn) &&
802 ReorderBufferStreamTXN(rb, toptxn);
803}
#define IsSpecInsert(action)
#define IsInsertOrUpdate(action)
#define IsSpecConfirmOrAbort(action)
#define RBTXN_HAS_PARTIAL_CHANGE

References ReorderBufferChange::action, ReorderBufferChange::clear_toast_afterwards, ReorderBufferChange::data, IsInsertOrUpdate, IsSpecConfirmOrAbort, IsSpecInsert, rbtxn_get_toptxn, RBTXN_HAS_PARTIAL_CHANGE, rbtxn_has_partial_change, rbtxn_has_streamable_change, rbtxn_is_serialized, ReorderBufferCanStartStreaming(), ReorderBufferCanStream(), ReorderBufferStreamTXN(), ReorderBufferChange::tp, and ReorderBufferTXN::txn_flags.

Referenced by ReorderBufferQueueChange().

◆ ReorderBufferProcessTXN()

static void ReorderBufferProcessTXN ( ReorderBuffer rb,
ReorderBufferTXN txn,
XLogRecPtr  commit_lsn,
volatile Snapshot  snapshot_now,
volatile CommandId  command_id,
bool  streaming 
)
static

Definition at line 2211 of file reorderbuffer.c.

2216{
2217 bool using_subtxn;
2220 ReorderBufferIterTXNState *volatile iterstate = NULL;
2221 volatile XLogRecPtr prev_lsn = InvalidXLogRecPtr;
2222 ReorderBufferChange *volatile specinsert = NULL;
2223 volatile bool stream_started = false;
2224 ReorderBufferTXN *volatile curtxn = NULL;
2225
2226 /* build data to be able to lookup the CommandIds of catalog tuples */
2228
2229 /* setup the initial snapshot */
2230 SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
2231
2232 /*
2233 * Decoding needs access to syscaches et al., which in turn use
2234 * heavyweight locks and such. Thus we need to have enough state around to
2235 * keep track of those. The easiest way is to simply use a transaction
2236 * internally. That also allows us to easily enforce that nothing writes
2237 * to the database by checking for xid assignments.
2238 *
2239 * When we're called via the SQL SRF there's already a transaction
2240 * started, so start an explicit subtransaction there.
2241 */
2242 using_subtxn = IsTransactionOrTransactionBlock();
2243
2244 PG_TRY();
2245 {
2246 ReorderBufferChange *change;
2247 int changes_count = 0; /* used to accumulate the number of
2248 * changes */
2249
2250 if (using_subtxn)
2251 BeginInternalSubTransaction(streaming ? "stream" : "replay");
2252 else
2254
2255 /*
2256 * We only need to send begin/begin-prepare for non-streamed
2257 * transactions.
2258 */
2259 if (!streaming)
2260 {
2261 if (rbtxn_is_prepared(txn))
2262 rb->begin_prepare(rb, txn);
2263 else
2264 rb->begin(rb, txn);
2265 }
2266
2267 ReorderBufferIterTXNInit(rb, txn, &iterstate);
2268 while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
2269 {
2270 Relation relation = NULL;
2271 Oid reloid;
2272
2274
2275 /*
2276 * We can't call start stream callback before processing first
2277 * change.
2278 */
2279 if (!XLogRecPtrIsValid(prev_lsn))
2280 {
2281 if (streaming)
2282 {
2283 txn->origin_id = change->origin_id;
2284 rb->stream_start(rb, txn, change->lsn);
2285 stream_started = true;
2286 }
2287 }
2288
2289 /*
2290 * Enforce correct ordering of changes, merged from multiple
2291 * subtransactions. The changes may have the same LSN due to
2292 * MULTI_INSERT xlog records.
2293 */
2294 Assert(!XLogRecPtrIsValid(prev_lsn) || prev_lsn <= change->lsn);
2295
2296 prev_lsn = change->lsn;
2297
2298 /*
2299 * Set the current xid to detect concurrent aborts. This is
2300 * required for the cases when we decode the changes before the
2301 * COMMIT record is processed.
2302 */
2303 if (streaming || rbtxn_is_prepared(change->txn))
2304 {
2305 curtxn = change->txn;
2306 SetupCheckXidLive(curtxn->xid);
2307 }
2308
2309 switch (change->action)
2310 {
2312
2313 /*
2314 * Confirmation for speculative insertion arrived. Simply
2315 * use as a normal record. It'll be cleaned up at the end
2316 * of INSERT processing.
2317 */
2318 if (specinsert == NULL)
2319 elog(ERROR, "invalid ordering of speculative insertion changes");
2320 Assert(specinsert->data.tp.oldtuple == NULL);
2321 change = specinsert;
2323
2324 /* intentionally fall through */
2328 Assert(snapshot_now);
2329
2330 reloid = RelidByRelfilenumber(change->data.tp.rlocator.spcOid,
2331 change->data.tp.rlocator.relNumber);
2332
2333 /*
2334 * Mapped catalog tuple without data, emitted while
2335 * catalog table was in the process of being rewritten. We
2336 * can fail to look up the relfilenumber, because the
2337 * relmapper has no "historic" view, in contrast to the
2338 * normal catalog during decoding. Thus repeated rewrites
2339 * can cause a lookup failure. That's OK because we do not
2340 * decode catalog changes anyway. Normally such tuples
2341 * would be skipped over below, but we can't identify
2342 * whether the table should be logically logged without
2343 * mapping the relfilenumber to the oid.
2344 */
2345 if (reloid == InvalidOid &&
2346 change->data.tp.newtuple == NULL &&
2347 change->data.tp.oldtuple == NULL)
2348 goto change_done;
2349 else if (reloid == InvalidOid)
2350 elog(ERROR, "could not map filenumber \"%s\" to relation OID",
2351 relpathperm(change->data.tp.rlocator,
2352 MAIN_FORKNUM).str);
2353
2354 relation = RelationIdGetRelation(reloid);
2355
2356 if (!RelationIsValid(relation))
2357 elog(ERROR, "could not open relation with OID %u (for filenumber \"%s\")",
2358 reloid,
2359 relpathperm(change->data.tp.rlocator,
2360 MAIN_FORKNUM).str);
2361
2362 if (!RelationIsLogicallyLogged(relation))
2363 goto change_done;
2364
2365 /*
2366 * Ignore temporary heaps created during DDL unless the
2367 * plugin has asked for them.
2368 */
2369 if (relation->rd_rel->relrewrite && !rb->output_rewrites)
2370 goto change_done;
2371
2372 /*
2373 * For now ignore sequence changes entirely. Most of the
2374 * time they don't log changes using records we
2375 * understand, so it doesn't make sense to handle the few
2376 * cases we do.
2377 */
2378 if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
2379 goto change_done;
2380
2381 /* user-triggered change */
2382 if (!IsToastRelation(relation))
2383 {
2384 ReorderBufferToastReplace(rb, txn, relation, change);
2385 ReorderBufferApplyChange(rb, txn, relation, change,
2386 streaming);
2387
2388 /*
2389 * Only clear reassembled toast chunks if we're sure
2390 * they're not required anymore. The creator of the
2391 * tuple tells us.
2392 */
2393 if (change->data.tp.clear_toast_afterwards)
2394 ReorderBufferToastReset(rb, txn);
2395 }
2396 /* we're not interested in toast deletions */
2397 else if (change->action == REORDER_BUFFER_CHANGE_INSERT)
2398 {
2399 /*
2400 * Need to reassemble the full toasted Datum in
2401 * memory, to ensure the chunks don't get reused till
2402 * we're done remove it from the list of this
2403 * transaction's changes. Otherwise it will get
2404 * freed/reused while restoring spooled data from
2405 * disk.
2406 */
2407 Assert(change->data.tp.newtuple != NULL);
2408
2409 dlist_delete(&change->node);
2410 ReorderBufferToastAppendChunk(rb, txn, relation,
2411 change);
2412 }
2413
2414 change_done:
2415
2416 /*
2417 * If speculative insertion was confirmed, the record
2418 * isn't needed anymore.
2419 */
2420 if (specinsert != NULL)
2421 {
2422 ReorderBufferFreeChange(rb, specinsert, true);
2423 specinsert = NULL;
2424 }
2425
2426 if (RelationIsValid(relation))
2427 {
2428 RelationClose(relation);
2429 relation = NULL;
2430 }
2431 break;
2432
2434
2435 /*
2436 * Speculative insertions are dealt with by delaying the
2437 * processing of the insert until the confirmation record
2438 * arrives. For that we simply unlink the record from the
2439 * chain, so it does not get freed/reused while restoring
2440 * spooled data from disk.
2441 *
2442 * This is safe in the face of concurrent catalog changes
2443 * because the relevant relation can't be changed between
2444 * speculative insertion and confirmation due to
2445 * CheckTableNotInUse() and locking.
2446 */
2447
2448 /* clear out a pending (and thus failed) speculation */
2449 if (specinsert != NULL)
2450 {
2451 ReorderBufferFreeChange(rb, specinsert, true);
2452 specinsert = NULL;
2453 }
2454
2455 /* and memorize the pending insertion */
2456 dlist_delete(&change->node);
2457 specinsert = change;
2458 break;
2459
2461
2462 /*
2463 * Abort for speculative insertion arrived. So cleanup the
2464 * specinsert tuple and toast hash.
2465 *
2466 * Note that we get the spec abort change for each toast
2467 * entry but we need to perform the cleanup only the first
2468 * time we get it for the main table.
2469 */
2470 if (specinsert != NULL)
2471 {
2472 /*
2473 * We must clean the toast hash before processing a
2474 * completely new tuple to avoid confusion about the
2475 * previous tuple's toast chunks.
2476 */
2478 ReorderBufferToastReset(rb, txn);
2479
2480 /* We don't need this record anymore. */
2481 ReorderBufferFreeChange(rb, specinsert, true);
2482 specinsert = NULL;
2483 }
2484 break;
2485
2487 {
2488 int i;
2489 int nrelids = change->data.truncate.nrelids;
2490 int nrelations = 0;
2491 Relation *relations;
2492
2493 relations = palloc0(nrelids * sizeof(Relation));
2494 for (i = 0; i < nrelids; i++)
2495 {
2496 Oid relid = change->data.truncate.relids[i];
2497 Relation rel;
2498
2499 rel = RelationIdGetRelation(relid);
2500
2501 if (!RelationIsValid(rel))
2502 elog(ERROR, "could not open relation with OID %u", relid);
2503
2504 if (!RelationIsLogicallyLogged(rel))
2505 continue;
2506
2507 relations[nrelations++] = rel;
2508 }
2509
2510 /* Apply the truncate. */
2511 ReorderBufferApplyTruncate(rb, txn, nrelations,
2512 relations, change,
2513 streaming);
2514
2515 for (i = 0; i < nrelations; i++)
2516 RelationClose(relations[i]);
2517
2518 break;
2519 }
2520
2522 ReorderBufferApplyMessage(rb, txn, change, streaming);
2523 break;
2524
2526 /* Execute the invalidation messages locally */
2528 change->data.inval.invalidations);
2529 break;
2530
2532 /* get rid of the old */
2534
2535 if (snapshot_now->copied)
2536 {
2537 ReorderBufferFreeSnap(rb, snapshot_now);
2538 snapshot_now =
2540 txn, command_id);
2541 }
2542
2543 /*
2544 * Restored from disk, need to be careful not to double
2545 * free. We could introduce refcounting for that, but for
2546 * now this seems infrequent enough not to care.
2547 */
2548 else if (change->data.snapshot->copied)
2549 {
2550 snapshot_now =
2552 txn, command_id);
2553 }
2554 else
2555 {
2556 snapshot_now = change->data.snapshot;
2557 }
2558
2559 /* and continue with the new one */
2560 SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
2561 break;
2562
2565
2566 if (command_id < change->data.command_id)
2567 {
2568 command_id = change->data.command_id;
2569
2570 if (!snapshot_now->copied)
2571 {
2572 /* we don't use the global one anymore */
2573 snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
2574 txn, command_id);
2575 }
2576
2577 snapshot_now->curcid = command_id;
2578
2580 SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
2581 }
2582
2583 break;
2584
2586 elog(ERROR, "tuplecid value in changequeue");
2587 break;
2588 }
2589
2590 /*
2591 * It is possible that the data is not sent to downstream for a
2592 * long time either because the output plugin filtered it or there
2593 * is a DDL that generates a lot of data that is not processed by
2594 * the plugin. So, in such cases, the downstream can timeout. To
2595 * avoid that we try to send a keepalive message if required.
2596 * Trying to send a keepalive message after every change has some
2597 * overhead, but testing showed there is no noticeable overhead if
2598 * we do it after every ~100 changes.
2599 */
2600#define CHANGES_THRESHOLD 100
2601
2602 if (++changes_count >= CHANGES_THRESHOLD)
2603 {
2604 rb->update_progress_txn(rb, txn, prev_lsn);
2605 changes_count = 0;
2606 }
2607 }
2608
2609 /* speculative insertion record must be freed by now */
2610 Assert(!specinsert);
2611
2612 /* clean up the iterator */
2613 ReorderBufferIterTXNFinish(rb, iterstate);
2614 iterstate = NULL;
2615
2616 /*
2617 * Update total transaction count and total bytes processed by the
2618 * transaction and its subtransactions. Ensure to not count the
2619 * streamed transaction multiple times.
2620 *
2621 * Note that the statistics computation has to be done after
2622 * ReorderBufferIterTXNFinish as it releases the serialized change
2623 * which we have already accounted in ReorderBufferIterTXNNext.
2624 */
2625 if (!rbtxn_is_streamed(txn))
2626 rb->totalTxns++;
2627
2628 rb->totalBytes += txn->total_size;
2629
2630 /*
2631 * Done with current changes, send the last message for this set of
2632 * changes depending upon streaming mode.
2633 */
2634 if (streaming)
2635 {
2636 if (stream_started)
2637 {
2638 rb->stream_stop(rb, txn, prev_lsn);
2639 stream_started = false;
2640 }
2641 }
2642 else
2643 {
2644 /*
2645 * Call either PREPARE (for two-phase transactions) or COMMIT (for
2646 * regular ones).
2647 */
2648 if (rbtxn_is_prepared(txn))
2649 {
2651 rb->prepare(rb, txn, commit_lsn);
2653 }
2654 else
2655 rb->commit(rb, txn, commit_lsn);
2656 }
2657
2658 /* this is just a sanity check against bad output plugin behaviour */
2660 elog(ERROR, "output plugin used XID %u",
2662
2663 /*
2664 * Remember the command ID and snapshot for the next set of changes in
2665 * streaming mode.
2666 */
2667 if (streaming)
2668 ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
2669 else if (snapshot_now->copied)
2670 ReorderBufferFreeSnap(rb, snapshot_now);
2671
2672 /* cleanup */
2674
2675 /*
2676 * Aborting the current (sub-)transaction as a whole has the right
2677 * semantics. We want all locks acquired in here to be released, not
2678 * reassigned to the parent and we do not want any database access
2679 * have persistent effects.
2680 */
2682
2683 /* make sure there's no cache pollution */
2685 {
2688 }
2689 else
2690 {
2694 }
2695
2696 if (using_subtxn)
2697 {
2700 CurrentResourceOwner = cowner;
2701 }
2702
2703 /*
2704 * We are here due to one of the four reasons: 1. Decoding an
2705 * in-progress txn. 2. Decoding a prepared txn. 3. Decoding of a
2706 * prepared txn that was (partially) streamed. 4. Decoding a committed
2707 * txn.
2708 *
2709 * For 1, we allow truncation of txn data by removing the changes
2710 * already streamed but still keeping other things like invalidations,
2711 * snapshot, and tuplecids. For 2 and 3, we indicate
2712 * ReorderBufferTruncateTXN to do more elaborate truncation of txn
2713 * data as the entire transaction has been decoded except for commit.
2714 * For 4, as the entire txn has been decoded, we can fully clean up
2715 * the TXN reorder buffer.
2716 */
2717 if (streaming || rbtxn_is_prepared(txn))
2718 {
2719 if (streaming)
2721
2723 /* Reset the CheckXidAlive */
2725 }
2726 else
2727 ReorderBufferCleanupTXN(rb, txn);
2728 }
2729 PG_CATCH();
2730 {
2732 ErrorData *errdata = CopyErrorData();
2733
2734 /* TODO: Encapsulate cleanup from the PG_TRY and PG_CATCH blocks */
2735 if (iterstate)
2736 ReorderBufferIterTXNFinish(rb, iterstate);
2737
2739
2740 /*
2741 * Force cache invalidation to happen outside of a valid transaction
2742 * to prevent catalog access as we just caught an error.
2743 */
2745
2746 /* make sure there's no cache pollution */
2748 {
2751 }
2752 else
2753 {
2757 }
2758
2759 if (using_subtxn)
2760 {
2763 CurrentResourceOwner = cowner;
2764 }
2765
2766 /*
2767 * The error code ERRCODE_TRANSACTION_ROLLBACK indicates a concurrent
2768 * abort of the (sub)transaction we are streaming or preparing. We
2769 * need to do the cleanup and return gracefully on this error, see
2770 * SetupCheckXidLive.
2771 *
2772 * This error code can be thrown by one of the callbacks we call
2773 * during decoding so we need to ensure that we return gracefully only
2774 * when we are sending the data in streaming mode and the streaming is
2775 * not finished yet or when we are sending the data out on a PREPARE
2776 * during a two-phase commit.
2777 */
2778 if (errdata->sqlerrcode == ERRCODE_TRANSACTION_ROLLBACK &&
2779 (stream_started || rbtxn_is_prepared(txn)))
2780 {
2781 /* curtxn must be set for streaming or prepared transactions */
2782 Assert(curtxn);
2783
2784 /* Cleanup the temporary error state. */
2786 FreeErrorData(errdata);
2787 errdata = NULL;
2788
2789 /* Remember the transaction is aborted. */
2790 Assert(!rbtxn_is_committed(curtxn));
2791 curtxn->txn_flags |= RBTXN_IS_ABORTED;
2792
2793 /* Mark the transaction is streamed if appropriate */
2794 if (stream_started)
2796
2797 /* Reset the TXN so that it is allowed to stream remaining data. */
2798 ReorderBufferResetTXN(rb, txn, snapshot_now,
2799 command_id, prev_lsn,
2800 specinsert);
2801 }
2802 else
2803 {
2804 ReorderBufferCleanupTXN(rb, txn);
2806 PG_RE_THROW();
2807 }
2808 }
2809 PG_END_TRY();
2810}
bool IsToastRelation(Relation relation)
Definition: catalog.c:206
void FreeErrorData(ErrorData *edata)
Definition: elog.c:1835
ErrorData * CopyErrorData(void)
Definition: elog.c:1763
void FlushErrorState(void)
Definition: elog.c:1884
#define PG_RE_THROW()
Definition: elog.h:405
#define PG_TRY(...)
Definition: elog.h:372
#define PG_END_TRY(...)
Definition: elog.h:397
#define PG_CATCH(...)
Definition: elog.h:382
void InvalidateSystemCaches(void)
Definition: inval.c:916
void * palloc0(Size size)
Definition: mcxt.c:1395
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:123
const void * data
#define InvalidOid
Definition: postgres_ext.h:37
#define RelationIsLogicallyLogged(relation)
Definition: rel.h:711
#define RelationIsValid(relation)
Definition: rel.h:490
Relation RelationIdGetRelation(Oid relationId)
Definition: relcache.c:2099
void RelationClose(Relation relation)
Definition: relcache.c:2220
Oid RelidByRelfilenumber(Oid reltablespace, RelFileNumber relfilenumber)
@ MAIN_FORKNUM
Definition: relpath.h:58
#define relpathperm(rlocator, forknum)
Definition: relpath.h:146
static void ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
static void ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id, XLogRecPtr last_lsn, ReorderBufferChange *specinsert)
static void ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferIterTXNState *volatile *iter_state)
static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
static void ReorderBufferSaveTXNSnapshot(ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id)
static void ReorderBufferApplyChange(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change, bool streaming)
static void ReorderBufferIterTXNFinish(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
#define CHANGES_THRESHOLD
static void ReorderBufferApplyMessage(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool streaming)
static void SetupCheckXidLive(TransactionId xid)
static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap, ReorderBufferTXN *txn, CommandId cid)
static void ReorderBufferApplyTruncate(ReorderBuffer *rb, ReorderBufferTXN *txn, int nrelations, Relation *relations, ReorderBufferChange *change, bool streaming)
static void ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
static ReorderBufferChange * ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
static void ReorderBufferMaybeMarkTXNStreamed(ReorderBuffer *rb, ReorderBufferTXN *txn)
void TeardownHistoricSnapshot(bool is_error)
Definition: snapmgr.c:1685
void SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
Definition: snapmgr.c:1669
int sqlerrcode
Definition: elog.h:431
RelFileNumber relNumber
Form_pg_class rd_rel
Definition: rel.h:111
RelFileLocator rlocator
Definition: reorderbuffer.h:98
RepOriginId origin_id
Definition: reorderbuffer.h:86
ReorderBufferBeginCB begin_prepare
ReorderBufferUpdateProgressTxnCB update_progress_txn
ReorderBufferStreamStopCB stream_stop
ReorderBufferCommitCB commit
ReorderBufferStreamStartCB stream_start
ReorderBufferBeginCB begin
TransactionId CheckXidAlive
Definition: xact.c:100
void StartTransactionCommand(void)
Definition: xact.c:3077
TransactionId GetCurrentTransactionIdIfAny(void)
Definition: xact.c:472
TransactionId GetCurrentTransactionId(void)
Definition: xact.c:455

References AbortCurrentTransaction(), ReorderBufferChange::action, Assert(), ReorderBuffer::begin, ReorderBuffer::begin_prepare, BeginInternalSubTransaction(), CHANGES_THRESHOLD, CHECK_FOR_INTERRUPTS, CheckXidAlive, ReorderBufferChange::clear_toast_afterwards, ReorderBufferChange::command_id, ReorderBuffer::commit, SnapshotData::copied, CopyErrorData(), SnapshotData::curcid, CurrentMemoryContext, CurrentResourceOwner, ReorderBufferChange::data, data, dlist_delete(), elog, ERROR, FlushErrorState(), FreeErrorData(), GetCurrentTransactionId(), GetCurrentTransactionIdIfAny(), i, ReorderBufferChange::inval, InvalidateSystemCaches(), ReorderBufferChange::invalidations, ReorderBufferTXN::invalidations, ReorderBufferTXN::invalidations_distributed, InvalidCommandId, InvalidOid, InvalidTransactionId, InvalidXLogRecPtr, IsToastRelation(), IsTransactionOrTransactionBlock(), ReorderBufferChange::lsn, MAIN_FORKNUM, MemoryContextSwitchTo(), ReorderBufferChange::newtuple, ReorderBufferChange::ninvalidations, ReorderBufferTXN::ninvalidations, ReorderBufferTXN::ninvalidations_distributed, ReorderBufferChange::node, ReorderBufferChange::nrelids, ReorderBufferChange::oldtuple, ReorderBufferChange::origin_id, ReorderBufferTXN::origin_id, ReorderBuffer::output_rewrites, palloc0(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, ReorderBuffer::prepare, rbtxn_distr_inval_overflowed, RBTXN_IS_ABORTED, rbtxn_is_committed, rbtxn_is_prepared, rbtxn_is_streamed, RBTXN_SENT_PREPARE, rbtxn_sent_prepare, RelationData::rd_rel, RelationClose(), RelationIdGetRelation(), RelationIsLogicallyLogged, RelationIsValid, RelidByRelfilenumber(), ReorderBufferChange::relids, RelFileLocator::relNumber, relpathperm, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferApplyChange(), ReorderBufferApplyMessage(), ReorderBufferApplyTruncate(), ReorderBufferBuildTupleCidHash(), ReorderBufferCleanupTXN(), ReorderBufferCopySnap(), ReorderBufferExecuteInvalidations(), ReorderBufferFreeChange(), ReorderBufferFreeSnap(), ReorderBufferIterTXNFinish(), ReorderBufferIterTXNInit(), ReorderBufferIterTXNNext(), ReorderBufferMaybeMarkTXNStreamed(), ReorderBufferResetTXN(), ReorderBufferSaveTXNSnapshot(), ReorderBufferToastAppendChunk(), ReorderBufferToastReplace(), ReorderBufferToastReset(), ReorderBufferTruncateTXN(), ReorderBufferChange::rlocator, RollbackAndReleaseCurrentSubTransaction(), SetupCheckXidLive(), SetupHistoricSnapshot(), ReorderBufferChange::snapshot, RelFileLocator::spcOid, ErrorData::sqlerrcode, StartTransactionCommand(), ReorderBuffer::stream_start, ReorderBuffer::stream_stop, TeardownHistoricSnapshot(), ReorderBufferTXN::total_size, ReorderBuffer::totalBytes, ReorderBuffer::totalTxns, ReorderBufferChange::tp, ReorderBufferChange::truncate, ReorderBufferTXN::tuplecid_hash, ReorderBufferChange::txn, ReorderBufferTXN::txn_flags, ReorderBuffer::update_progress_txn, ReorderBufferTXN::xid, and XLogRecPtrIsValid.

Referenced by ReorderBufferReplay(), and ReorderBufferStreamTXN().

◆ ReorderBufferProcessXid()

void ReorderBufferProcessXid ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 3295 of file reorderbuffer.c.

3296{
3297 /* many records won't have an xid assigned, centralize check here */
3298 if (xid != InvalidTransactionId)
3299 ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3300}

References InvalidTransactionId, and ReorderBufferTXNByXid().

Referenced by heap2_decode(), heap_decode(), LogicalDecodingProcessRecord(), logicalmsg_decode(), standby_decode(), xact_decode(), and xlog_decode().

◆ ReorderBufferQueueChange()

void ReorderBufferQueueChange ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
ReorderBufferChange change,
bool  toast_insert 
)

Definition at line 810 of file reorderbuffer.c.

812{
813 ReorderBufferTXN *txn;
814
815 txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
816
817 /*
818 * If we have detected that the transaction is aborted while streaming the
819 * previous changes or by checking its CLOG, there is no point in
820 * collecting further changes for it.
821 */
822 if (rbtxn_is_aborted(txn))
823 {
824 /*
825 * We don't need to update memory accounting for this change as we
826 * have not added it to the queue yet.
827 */
828 ReorderBufferFreeChange(rb, change, false);
829 return;
830 }
831
832 /*
833 * The changes that are sent downstream are considered streamable. We
834 * remember such transactions so that only those will later be considered
835 * for streaming.
836 */
837 if (change->action == REORDER_BUFFER_CHANGE_INSERT ||
843 {
844 ReorderBufferTXN *toptxn = rbtxn_get_toptxn(txn);
845
847 }
848
849 change->lsn = lsn;
850 change->txn = txn;
851
853 dlist_push_tail(&txn->changes, &change->node);
854 txn->nentries++;
855 txn->nentries_mem++;
856
857 /* update memory accounting information */
858 ReorderBufferChangeMemoryUpdate(rb, change, NULL, true,
860
861 /* process partial change */
862 ReorderBufferProcessPartialChange(rb, txn, change, toast_insert);
863
864 /* check the memory limits and evict something if needed */
866}
static void ReorderBufferProcessPartialChange(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool toast_insert)
static void ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
#define RBTXN_HAS_STREAMABLE_CHANGE

References ReorderBufferChange::action, Assert(), ReorderBufferTXN::changes, dlist_push_tail(), ReorderBufferChange::lsn, ReorderBufferTXN::nentries, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, rbtxn_get_toptxn, RBTXN_HAS_STREAMABLE_CHANGE, rbtxn_is_aborted, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChangeMemoryUpdate(), ReorderBufferChangeSize(), ReorderBufferCheckMemoryLimit(), ReorderBufferFreeChange(), ReorderBufferProcessPartialChange(), ReorderBufferTXNByXid(), ReorderBufferChange::txn, ReorderBufferTXN::txn_flags, and XLogRecPtrIsValid.

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeSpecConfirm(), DecodeTruncate(), DecodeUpdate(), ReorderBufferAddNewCommandId(), ReorderBufferAddSnapshot(), ReorderBufferQueueInvalidations(), and ReorderBufferQueueMessage().

◆ ReorderBufferQueueInvalidations()

◆ ReorderBufferQueueMessage()

void ReorderBufferQueueMessage ( ReorderBuffer rb,
TransactionId  xid,
Snapshot  snap,
XLogRecPtr  lsn,
bool  transactional,
const char *  prefix,
Size  message_size,
const char *  message 
)

Definition at line 873 of file reorderbuffer.c.

877{
878 if (transactional)
879 {
880 MemoryContext oldcontext;
881 ReorderBufferChange *change;
882
884
885 /*
886 * We don't expect snapshots for transactional changes - we'll use the
887 * snapshot derived later during apply (unless the change gets
888 * skipped).
889 */
890 Assert(!snap);
891
892 oldcontext = MemoryContextSwitchTo(rb->context);
893
894 change = ReorderBufferAllocChange(rb);
896 change->data.msg.prefix = pstrdup(prefix);
897 change->data.msg.message_size = message_size;
898 change->data.msg.message = palloc(message_size);
899 memcpy(change->data.msg.message, message, message_size);
900
901 ReorderBufferQueueChange(rb, xid, lsn, change, false);
902
903 MemoryContextSwitchTo(oldcontext);
904 }
905 else
906 {
907 ReorderBufferTXN *txn = NULL;
908 volatile Snapshot snapshot_now = snap;
909
910 /* Non-transactional changes require a valid snapshot. */
911 Assert(snapshot_now);
912
913 if (xid != InvalidTransactionId)
914 txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
915
916 /* setup snapshot to allow catalog access */
917 SetupHistoricSnapshot(snapshot_now, NULL);
918 PG_TRY();
919 {
920 rb->message(rb, txn, lsn, false, prefix, message_size, message);
921
923 }
924 PG_CATCH();
925 {
927 PG_RE_THROW();
928 }
929 PG_END_TRY();
930 }
931}

References ReorderBufferChange::action, Assert(), ReorderBuffer::context, ReorderBufferChange::data, InvalidTransactionId, MemoryContextSwitchTo(), ReorderBufferChange::message, ReorderBuffer::message, ReorderBufferChange::message_size, ReorderBufferChange::msg, palloc(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, ReorderBufferChange::prefix, pstrdup(), REORDER_BUFFER_CHANGE_MESSAGE, ReorderBufferAllocChange(), ReorderBufferQueueChange(), ReorderBufferTXNByXid(), SetupHistoricSnapshot(), and TeardownHistoricSnapshot().

Referenced by logicalmsg_decode().

◆ ReorderBufferRememberPrepareInfo()

bool ReorderBufferRememberPrepareInfo ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  prepare_lsn,
XLogRecPtr  end_lsn,
TimestampTz  prepare_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn 
)

Definition at line 2907 of file reorderbuffer.c.

2911{
2912 ReorderBufferTXN *txn;
2913
2914 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false);
2915
2916 /* unknown transaction, nothing to do */
2917 if (txn == NULL)
2918 return false;
2919
2920 /*
2921 * Remember the prepare information to be later used by commit prepared in
2922 * case we skip doing prepare.
2923 */
2924 txn->final_lsn = prepare_lsn;
2925 txn->end_lsn = end_lsn;
2926 txn->prepare_time = prepare_time;
2927 txn->origin_id = origin_id;
2928 txn->origin_lsn = origin_lsn;
2929
2930 /* Mark this transaction as a prepared transaction */
2933
2934 return true;
2935}

References Assert(), ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, InvalidXLogRecPtr, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, ReorderBufferTXN::prepare_time, RBTXN_IS_PREPARED, RBTXN_PREPARE_STATUS_MASK, ReorderBufferTXNByXid(), and ReorderBufferTXN::txn_flags.

Referenced by DecodePrepare().

◆ ReorderBufferReplay()

static void ReorderBufferReplay ( ReorderBufferTXN txn,
ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn,
TimestampTz  commit_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn 
)
static

Definition at line 2823 of file reorderbuffer.c.

2828{
2829 Snapshot snapshot_now;
2830 CommandId command_id = FirstCommandId;
2831
2832 txn->final_lsn = commit_lsn;
2833 txn->end_lsn = end_lsn;
2834 txn->commit_time = commit_time;
2835 txn->origin_id = origin_id;
2836 txn->origin_lsn = origin_lsn;
2837
2838 /*
2839 * If the transaction was (partially) streamed, we need to commit it in a
2840 * 'streamed' way. That is, we first stream the remaining part of the
2841 * transaction, and then invoke stream_commit message.
2842 *
2843 * Called after everything (origin ID, LSN, ...) is stored in the
2844 * transaction to avoid passing that information directly.
2845 */
2846 if (rbtxn_is_streamed(txn))
2847 {
2849 return;
2850 }
2851
2852 /*
2853 * If this transaction has no snapshot, it didn't make any changes to the
2854 * database, so there's nothing to decode. Note that
2855 * ReorderBufferCommitChild will have transferred any snapshots from
2856 * subtransactions if there were any.
2857 */
2858 if (txn->base_snapshot == NULL)
2859 {
2860 Assert(txn->ninvalidations == 0);
2861
2862 /*
2863 * Removing this txn before a commit might result in the computation
2864 * of an incorrect restart_lsn. See SnapBuildProcessRunningXacts.
2865 */
2866 if (!rbtxn_is_prepared(txn))
2867 ReorderBufferCleanupTXN(rb, txn);
2868 return;
2869 }
2870
2871 snapshot_now = txn->base_snapshot;
2872
2873 /* Process and send the changes to output plugin. */
2874 ReorderBufferProcessTXN(rb, txn, commit_lsn, snapshot_now,
2875 command_id, false);
2876}
#define FirstCommandId
Definition: c.h:676
uint32 CommandId
Definition: c.h:674
static void ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn, volatile Snapshot snapshot_now, volatile CommandId command_id, bool streaming)

References Assert(), ReorderBufferTXN::base_snapshot, ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, FirstCommandId, ReorderBufferTXN::ninvalidations, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, rbtxn_is_prepared, rbtxn_is_streamed, ReorderBufferCleanupTXN(), ReorderBufferProcessTXN(), and ReorderBufferStreamCommit().

Referenced by ReorderBufferCommit(), ReorderBufferFinishPrepared(), and ReorderBufferPrepare().

◆ ReorderBufferResetTXN()

static void ReorderBufferResetTXN ( ReorderBuffer rb,
ReorderBufferTXN txn,
Snapshot  snapshot_now,
CommandId  command_id,
XLogRecPtr  last_lsn,
ReorderBufferChange specinsert 
)
static

Definition at line 2165 of file reorderbuffer.c.

2170{
2171 /* Discard the changes that we just streamed */
2173
2174 /* Free all resources allocated for toast reconstruction */
2175 ReorderBufferToastReset(rb, txn);
2176
2177 /* Return the spec insert change if it is not NULL */
2178 if (specinsert != NULL)
2179 {
2180 ReorderBufferFreeChange(rb, specinsert, true);
2181 specinsert = NULL;
2182 }
2183
2184 /*
2185 * For the streaming case, stop the stream and remember the command ID and
2186 * snapshot for the streaming run.
2187 */
2188 if (rbtxn_is_streamed(txn))
2189 {
2190 rb->stream_stop(rb, txn, last_lsn);
2191 ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
2192 }
2193
2194 /* All changes must be deallocated */
2195 Assert(txn->size == 0);
2196}

References Assert(), rbtxn_is_prepared, rbtxn_is_streamed, ReorderBufferFreeChange(), ReorderBufferSaveTXNSnapshot(), ReorderBufferToastReset(), ReorderBufferTruncateTXN(), ReorderBufferTXN::size, and ReorderBuffer::stream_stop.

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferRestoreChange()

static void ReorderBufferRestoreChange ( ReorderBuffer rb,
ReorderBufferTXN txn,
char *  data 
)
static

Definition at line 4690 of file reorderbuffer.c.

4692{
4694 ReorderBufferChange *change;
4695
4696 ondisk = (ReorderBufferDiskChange *) data;
4697
4698 change = ReorderBufferAllocChange(rb);
4699
4700 /* copy static part */
4701 memcpy(change, &ondisk->change, sizeof(ReorderBufferChange));
4702
4703 data += sizeof(ReorderBufferDiskChange);
4704
4705 /* restore individual stuff */
4706 switch (change->action)
4707 {
4708 /* fall through these, they're all similar enough */
4713 if (change->data.tp.oldtuple)
4714 {
4715 uint32 tuplelen = ((HeapTuple) data)->t_len;
4716
4717 change->data.tp.oldtuple =
4719
4720 /* restore ->tuple */
4721 memcpy(change->data.tp.oldtuple, data,
4722 sizeof(HeapTupleData));
4723 data += sizeof(HeapTupleData);
4724
4725 /* reset t_data pointer into the new tuplebuf */
4726 change->data.tp.oldtuple->t_data =
4727 (HeapTupleHeader) ((char *) change->data.tp.oldtuple + HEAPTUPLESIZE);
4728
4729 /* restore tuple data itself */
4730 memcpy(change->data.tp.oldtuple->t_data, data, tuplelen);
4731 data += tuplelen;
4732 }
4733
4734 if (change->data.tp.newtuple)
4735 {
4736 /* here, data might not be suitably aligned! */
4737 uint32 tuplelen;
4738
4739 memcpy(&tuplelen, data + offsetof(HeapTupleData, t_len),
4740 sizeof(uint32));
4741
4742 change->data.tp.newtuple =
4744
4745 /* restore ->tuple */
4746 memcpy(change->data.tp.newtuple, data,
4747 sizeof(HeapTupleData));
4748 data += sizeof(HeapTupleData);
4749
4750 /* reset t_data pointer into the new tuplebuf */
4751 change->data.tp.newtuple->t_data =
4752 (HeapTupleHeader) ((char *) change->data.tp.newtuple + HEAPTUPLESIZE);
4753
4754 /* restore tuple data itself */
4755 memcpy(change->data.tp.newtuple->t_data, data, tuplelen);
4756 data += tuplelen;
4757 }
4758
4759 break;
4761 {
4762 Size prefix_size;
4763
4764 /* read prefix */
4765 memcpy(&prefix_size, data, sizeof(Size));
4766 data += sizeof(Size);
4768 prefix_size);
4769 memcpy(change->data.msg.prefix, data, prefix_size);
4770 Assert(change->data.msg.prefix[prefix_size - 1] == '\0');
4771 data += prefix_size;
4772
4773 /* read the message */
4774 memcpy(&change->data.msg.message_size, data, sizeof(Size));
4775 data += sizeof(Size);
4777 change->data.msg.message_size);
4778 memcpy(change->data.msg.message, data,
4779 change->data.msg.message_size);
4780 data += change->data.msg.message_size;
4781
4782 break;
4783 }
4785 {
4786 Size inval_size = sizeof(SharedInvalidationMessage) *
4787 change->data.inval.ninvalidations;
4788
4789 change->data.inval.invalidations =
4790 MemoryContextAlloc(rb->context, inval_size);
4791
4792 /* read the message */
4793 memcpy(change->data.inval.invalidations, data, inval_size);
4794
4795 break;
4796 }
4798 {
4799 Snapshot oldsnap;
4800 Snapshot newsnap;
4801 Size size;
4802
4803 oldsnap = (Snapshot) data;
4804
4805 size = sizeof(SnapshotData) +
4806 sizeof(TransactionId) * oldsnap->xcnt +
4807 sizeof(TransactionId) * (oldsnap->subxcnt + 0);
4808
4809 change->data.snapshot = MemoryContextAllocZero(rb->context, size);
4810
4811 newsnap = change->data.snapshot;
4812
4813 memcpy(newsnap, data, size);
4814 newsnap->xip = (TransactionId *)
4815 (((char *) newsnap) + sizeof(SnapshotData));
4816 newsnap->subxip = newsnap->xip + newsnap->xcnt;
4817 newsnap->copied = true;
4818 break;
4819 }
4820 /* the base struct contains all the data, easy peasy */
4822 {
4823 Oid *relids;
4824
4825 relids = ReorderBufferAllocRelids(rb, change->data.truncate.nrelids);
4826 memcpy(relids, data, change->data.truncate.nrelids * sizeof(Oid));
4827 change->data.truncate.relids = relids;
4828
4829 break;
4830 }
4835 break;
4836 }
4837
4838 dlist_push_tail(&txn->changes, &change->node);
4839 txn->nentries_mem++;
4840
4841 /*
4842 * Update memory accounting for the restored change. We need to do this
4843 * although we don't check the memory limit when restoring the changes in
4844 * this branch (we only do that when initially queueing the changes after
4845 * decoding), because we will release the changes later, and that will
4846 * update the accounting too (subtracting the size from the counters). And
4847 * we don't want to underflow there.
4848 */
4849 ReorderBufferChangeMemoryUpdate(rb, change, NULL, true,
4850 ReorderBufferChangeSize(change));
4851}
struct ReorderBufferDiskChange ReorderBufferDiskChange
HeapTuple ReorderBufferAllocTupleBuf(ReorderBuffer *rb, Size tuple_len)
Oid * ReorderBufferAllocRelids(ReorderBuffer *rb, int nrelids)
struct SnapshotData * Snapshot
Definition: snapshot.h:117
ReorderBufferChange change

References ReorderBufferChange::action, Assert(), ReorderBufferDiskChange::change, ReorderBufferTXN::changes, ReorderBuffer::context, SnapshotData::copied, ReorderBufferChange::data, data, dlist_push_tail(), HEAPTUPLESIZE, ReorderBufferChange::inval, ReorderBufferChange::invalidations, MemoryContextAlloc(), MemoryContextAllocZero(), ReorderBufferChange::message, ReorderBufferChange::message_size, ReorderBufferChange::msg, ReorderBufferTXN::nentries_mem, ReorderBufferChange::newtuple, ReorderBufferChange::ninvalidations, ReorderBufferChange::node, ReorderBufferChange::nrelids, ReorderBufferChange::oldtuple, ReorderBufferChange::prefix, ReorderBufferChange::relids, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferAllocChange(), ReorderBufferAllocRelids(), ReorderBufferAllocTupleBuf(), ReorderBufferChangeMemoryUpdate(), ReorderBufferChangeSize(), SizeofHeapTupleHeader, ReorderBufferChange::snapshot, SnapshotData::subxcnt, SnapshotData::subxip, HeapTupleData::t_data, ReorderBufferChange::tp, ReorderBufferChange::truncate, SnapshotData::xcnt, and SnapshotData::xip.

Referenced by ReorderBufferRestoreChanges().

◆ ReorderBufferRestoreChanges()

static Size ReorderBufferRestoreChanges ( ReorderBuffer rb,
ReorderBufferTXN txn,
TXNEntryFile file,
XLogSegNo segno 
)
static

Definition at line 4547 of file reorderbuffer.c.

4549{
4550 Size restored = 0;
4551 XLogSegNo last_segno;
4552 dlist_mutable_iter cleanup_iter;
4553 File *fd = &file->vfd;
4554
4557
4558 /* free current entries, so we have memory for more */
4559 dlist_foreach_modify(cleanup_iter, &txn->changes)
4560 {
4562 dlist_container(ReorderBufferChange, node, cleanup_iter.cur);
4563
4564 dlist_delete(&cleanup->node);
4566 }
4567 txn->nentries_mem = 0;
4569
4570 XLByteToSeg(txn->final_lsn, last_segno, wal_segment_size);
4571
4572 while (restored < max_changes_in_memory && *segno <= last_segno)
4573 {
4574 int readBytes;
4576
4578
4579 if (*fd == -1)
4580 {
4581 char path[MAXPGPATH];
4582
4583 /* first time in */
4584 if (*segno == 0)
4585 XLByteToSeg(txn->first_lsn, *segno, wal_segment_size);
4586
4587 Assert(*segno != 0 || dlist_is_empty(&txn->changes));
4588
4589 /*
4590 * No need to care about TLIs here, only used during a single run,
4591 * so each LSN only maps to a specific WAL record.
4592 */
4594 *segno);
4595
4596 *fd = PathNameOpenFile(path, O_RDONLY | PG_BINARY);
4597
4598 /* No harm in resetting the offset even in case of failure */
4599 file->curOffset = 0;
4600
4601 if (*fd < 0 && errno == ENOENT)
4602 {
4603 *fd = -1;
4604 (*segno)++;
4605 continue;
4606 }
4607 else if (*fd < 0)
4608 ereport(ERROR,
4610 errmsg("could not open file \"%s\": %m",
4611 path)));
4612 }
4613
4614 /*
4615 * Read the statically sized part of a change which has information
4616 * about the total size. If we couldn't read a record, we're at the
4617 * end of this file.
4618 */
4620 readBytes = FileRead(file->vfd, rb->outbuf,
4622 file->curOffset, WAIT_EVENT_REORDER_BUFFER_READ);
4623
4624 /* eof */
4625 if (readBytes == 0)
4626 {
4627 FileClose(*fd);
4628 *fd = -1;
4629 (*segno)++;
4630 continue;
4631 }
4632 else if (readBytes < 0)
4633 ereport(ERROR,
4635 errmsg("could not read from reorderbuffer spill file: %m")));
4636 else if (readBytes != sizeof(ReorderBufferDiskChange))
4637 ereport(ERROR,
4639 errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
4640 readBytes,
4641 (uint32) sizeof(ReorderBufferDiskChange))));
4642
4643 file->curOffset += readBytes;
4644
4645 ondisk = (ReorderBufferDiskChange *) rb->outbuf;
4646
4648 sizeof(ReorderBufferDiskChange) + ondisk->size);
4649 ondisk = (ReorderBufferDiskChange *) rb->outbuf;
4650
4651 readBytes = FileRead(file->vfd,
4652 rb->outbuf + sizeof(ReorderBufferDiskChange),
4653 ondisk->size - sizeof(ReorderBufferDiskChange),
4654 file->curOffset,
4655 WAIT_EVENT_REORDER_BUFFER_READ);
4656
4657 if (readBytes < 0)
4658 ereport(ERROR,
4660 errmsg("could not read from reorderbuffer spill file: %m")));
4661 else if (readBytes != ondisk->size - sizeof(ReorderBufferDiskChange))
4662 ereport(ERROR,
4664 errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
4665 readBytes,
4666 (uint32) (ondisk->size - sizeof(ReorderBufferDiskChange)))));
4667
4668 file->curOffset += readBytes;
4669
4670 /*
4671 * ok, read a full change from disk, now restore it into proper
4672 * in-memory format
4673 */
4674 ReorderBufferRestoreChange(rb, txn, rb->outbuf);
4675 restored++;
4676 }
4677
4678 return restored;
4679}
static void cleanup(void)
Definition: bootstrap.c:715
File PathNameOpenFile(const char *fileName, int fileFlags)
Definition: fd.c:1576
static ssize_t FileRead(File file, void *buffer, size_t amount, pgoff_t offset, uint32 wait_event_info)
Definition: fd.h:198
int File
Definition: fd.h:51
static void ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz)
static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, char *data)
static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, TransactionId xid, XLogSegNo segno)
static const Size max_changes_in_memory
int wal_segment_size
Definition: xlog.c:145
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
uint64 XLogSegNo
Definition: xlogdefs.h:52

References Assert(), ReorderBufferTXN::changes, CHECK_FOR_INTERRUPTS, cleanup(), dlist_mutable_iter::cur, TXNEntryFile::curOffset, dlist_container, dlist_delete(), dlist_foreach_modify, dlist_is_empty(), ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), FileClose(), FileRead(), ReorderBufferTXN::final_lsn, ReorderBufferTXN::first_lsn, max_changes_in_memory, MAXPGPATH, MyReplicationSlot, ReorderBufferTXN::nentries_mem, ReorderBuffer::outbuf, PathNameOpenFile(), PG_BINARY, ReorderBufferFreeChange(), ReorderBufferRestoreChange(), ReorderBufferSerializedPath(), ReorderBufferSerializeReserve(), ReorderBufferDiskChange::size, TXNEntryFile::vfd, wal_segment_size, ReorderBufferTXN::xid, XLByteToSeg, and XLogRecPtrIsValid.

Referenced by ReorderBufferIterTXNInit(), and ReorderBufferIterTXNNext().

◆ ReorderBufferRestoreCleanup()

static void ReorderBufferRestoreCleanup ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 4857 of file reorderbuffer.c.

4858{
4859 XLogSegNo first;
4860 XLogSegNo cur;
4861 XLogSegNo last;
4862
4865
4868
4869 /* iterate over all possible filenames, and delete them */
4870 for (cur = first; cur <= last; cur++)
4871 {
4872 char path[MAXPGPATH];
4873
4875 if (unlink(path) != 0 && errno != ENOENT)
4876 ereport(ERROR,
4878 errmsg("could not remove file \"%s\": %m", path)));
4879 }
4880}
struct cursor * cur
Definition: ecpg.c:29

References Assert(), cur, ereport, errcode_for_file_access(), errmsg(), ERROR, ReorderBufferTXN::final_lsn, ReorderBufferTXN::first_lsn, MAXPGPATH, MyReplicationSlot, ReorderBufferSerializedPath(), wal_segment_size, ReorderBufferTXN::xid, XLByteToSeg, and XLogRecPtrIsValid.

Referenced by ReorderBufferCleanupTXN(), and ReorderBufferTruncateTXN().

◆ ReorderBufferSaveTXNSnapshot()

static void ReorderBufferSaveTXNSnapshot ( ReorderBuffer rb,
ReorderBufferTXN txn,
Snapshot  snapshot_now,
CommandId  command_id 
)
inlinestatic

Definition at line 2120 of file reorderbuffer.c.

2122{
2123 txn->command_id = command_id;
2124
2125 /* Avoid copying if it's already copied. */
2126 if (snapshot_now->copied)
2127 txn->snapshot_now = snapshot_now;
2128 else
2129 txn->snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
2130 txn, command_id);
2131}

References ReorderBufferTXN::command_id, SnapshotData::copied, ReorderBufferCopySnap(), and ReorderBufferTXN::snapshot_now.

Referenced by ReorderBufferProcessTXN(), and ReorderBufferResetTXN().

◆ ReorderBufferSerializeChange()

static void ReorderBufferSerializeChange ( ReorderBuffer rb,
ReorderBufferTXN txn,
int  fd,
ReorderBufferChange change 
)
static

Definition at line 4095 of file reorderbuffer.c.

4097{
4099 Size sz = sizeof(ReorderBufferDiskChange);
4100
4102
4103 ondisk = (ReorderBufferDiskChange *) rb->outbuf;
4104 memcpy(&ondisk->change, change, sizeof(ReorderBufferChange));
4105
4106 switch (change->action)
4107 {
4108 /* fall through these, they're all similar enough */
4113 {
4114 char *data;
4115 HeapTuple oldtup,
4116 newtup;
4117 Size oldlen = 0;
4118 Size newlen = 0;
4119
4120 oldtup = change->data.tp.oldtuple;
4121 newtup = change->data.tp.newtuple;
4122
4123 if (oldtup)
4124 {
4125 sz += sizeof(HeapTupleData);
4126 oldlen = oldtup->t_len;
4127 sz += oldlen;
4128 }
4129
4130 if (newtup)
4131 {
4132 sz += sizeof(HeapTupleData);
4133 newlen = newtup->t_len;
4134 sz += newlen;
4135 }
4136
4137 /* make sure we have enough space */
4139
4140 data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
4141 /* might have been reallocated above */
4142 ondisk = (ReorderBufferDiskChange *) rb->outbuf;
4143
4144 if (oldlen)
4145 {
4146 memcpy(data, oldtup, sizeof(HeapTupleData));
4147 data += sizeof(HeapTupleData);
4148
4149 memcpy(data, oldtup->t_data, oldlen);
4150 data += oldlen;
4151 }
4152
4153 if (newlen)
4154 {
4155 memcpy(data, newtup, sizeof(HeapTupleData));
4156 data += sizeof(HeapTupleData);
4157
4158 memcpy(data, newtup->t_data, newlen);
4159 data += newlen;
4160 }
4161 break;
4162 }
4164 {
4165 char *data;
4166 Size prefix_size = strlen(change->data.msg.prefix) + 1;
4167
4168 sz += prefix_size + change->data.msg.message_size +
4169 sizeof(Size) + sizeof(Size);
4171
4172 data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
4173
4174 /* might have been reallocated above */
4175 ondisk = (ReorderBufferDiskChange *) rb->outbuf;
4176
4177 /* write the prefix including the size */
4178 memcpy(data, &prefix_size, sizeof(Size));
4179 data += sizeof(Size);
4180 memcpy(data, change->data.msg.prefix,
4181 prefix_size);
4182 data += prefix_size;
4183
4184 /* write the message including the size */
4185 memcpy(data, &change->data.msg.message_size, sizeof(Size));
4186 data += sizeof(Size);
4187 memcpy(data, change->data.msg.message,
4188 change->data.msg.message_size);
4189 data += change->data.msg.message_size;
4190
4191 break;
4192 }
4194 {
4195 char *data;
4196 Size inval_size = sizeof(SharedInvalidationMessage) *
4197 change->data.inval.ninvalidations;
4198
4199 sz += inval_size;
4200
4202 data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
4203
4204 /* might have been reallocated above */
4205 ondisk = (ReorderBufferDiskChange *) rb->outbuf;
4206 memcpy(data, change->data.inval.invalidations, inval_size);
4207 data += inval_size;
4208
4209 break;
4210 }
4212 {
4213 Snapshot snap;
4214 char *data;
4215
4216 snap = change->data.snapshot;
4217
4218 sz += sizeof(SnapshotData) +
4219 sizeof(TransactionId) * snap->xcnt +
4220 sizeof(TransactionId) * snap->subxcnt;
4221
4222 /* make sure we have enough space */
4224 data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
4225 /* might have been reallocated above */
4226 ondisk = (ReorderBufferDiskChange *) rb->outbuf;
4227
4228 memcpy(data, snap, sizeof(SnapshotData));
4229 data += sizeof(SnapshotData);
4230
4231 if (snap->xcnt)
4232 {
4233 memcpy(data, snap->xip,
4234 sizeof(TransactionId) * snap->xcnt);
4235 data += sizeof(TransactionId) * snap->xcnt;
4236 }
4237
4238 if (snap->subxcnt)
4239 {
4240 memcpy(data, snap->subxip,
4241 sizeof(TransactionId) * snap->subxcnt);
4242 data += sizeof(TransactionId) * snap->subxcnt;
4243 }
4244 break;
4245 }
4247 {
4248 Size size;
4249 char *data;
4250
4251 /* account for the OIDs of truncated relations */
4252 size = sizeof(Oid) * change->data.truncate.nrelids;
4253 sz += size;
4254
4255 /* make sure we have enough space */
4257
4258 data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
4259 /* might have been reallocated above */
4260 ondisk = (ReorderBufferDiskChange *) rb->outbuf;
4261
4262 memcpy(data, change->data.truncate.relids, size);
4263 data += size;
4264
4265 break;
4266 }
4271 /* ReorderBufferChange contains everything important */
4272 break;
4273 }
4274
4275 ondisk->size = sz;
4276
4277 errno = 0;
4278 pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_WRITE);
4279 if (write(fd, rb->outbuf, ondisk->size) != ondisk->size)
4280 {
4281 int save_errno = errno;
4282
4284
4285 /* if write didn't set errno, assume problem is no disk space */
4286 errno = save_errno ? save_errno : ENOSPC;
4287 ereport(ERROR,
4289 errmsg("could not write to data file for XID %u: %m",
4290 txn->xid)));
4291 }
4293
4294 /*
4295 * Keep the transaction's final_lsn up to date with each change we send to
4296 * disk, so that ReorderBufferRestoreCleanup works correctly. (We used to
4297 * only do this on commit and abort records, but that doesn't work if a
4298 * system crash leaves a transaction without its abort record).
4299 *
4300 * Make sure not to move it backwards.
4301 */
4302 if (txn->final_lsn < change->lsn)
4303 txn->final_lsn = change->lsn;
4304
4305 Assert(ondisk->change.action == change->action);
4306}
#define write(a, b, c)
Definition: win32.h:14
if(TABLE==NULL||TABLE_index==NULL)
Definition: isn.c:81

References ReorderBufferChange::action, Assert(), ReorderBufferDiskChange::change, CloseTransientFile(), ReorderBufferChange::data, data, ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), ReorderBufferTXN::final_lsn, if(), ReorderBufferChange::inval, ReorderBufferChange::invalidations, ReorderBufferChange::lsn, ReorderBufferChange::message, ReorderBufferChange::message_size, ReorderBufferChange::msg, ReorderBufferChange::newtuple, ReorderBufferChange::ninvalidations, ReorderBufferChange::nrelids, ReorderBufferChange::oldtuple, ReorderBuffer::outbuf, pgstat_report_wait_end(), pgstat_report_wait_start(), ReorderBufferChange::prefix, ReorderBufferChange::relids, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferSerializeReserve(), ReorderBufferDiskChange::size, ReorderBufferChange::snapshot, SnapshotData::subxcnt, SnapshotData::subxip, HeapTupleData::t_data, HeapTupleData::t_len, ReorderBufferChange::tp, ReorderBufferChange::truncate, write, SnapshotData::xcnt, ReorderBufferTXN::xid, and SnapshotData::xip.

Referenced by ReorderBufferSerializeTXN().

◆ ReorderBufferSerializedPath()

static void ReorderBufferSerializedPath ( char *  path,
ReplicationSlot slot,
TransactionId  xid,
XLogSegNo  segno 
)
static

Definition at line 4926 of file reorderbuffer.c.

4928{
4929 XLogRecPtr recptr;
4930
4931 XLogSegNoOffsetToRecPtr(segno, 0, wal_segment_size, recptr);
4932
4933 snprintf(path, MAXPGPATH, "%s/%s/xid-%u-lsn-%X-%X.spill",
4936 xid, LSN_FORMAT_ARGS(recptr));
4937}
#define XLogSegNoOffsetToRecPtr(segno, offset, wal_segsz_bytes, dest)
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:47

References ReplicationSlot::data, LSN_FORMAT_ARGS, MAXPGPATH, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, PG_REPLSLOT_DIR, snprintf, wal_segment_size, and XLogSegNoOffsetToRecPtr.

Referenced by ReorderBufferRestoreChanges(), ReorderBufferRestoreCleanup(), and ReorderBufferSerializeTXN().

◆ ReorderBufferSerializeReserve()

static void ReorderBufferSerializeReserve ( ReorderBuffer rb,
Size  sz 
)
static

Definition at line 3775 of file reorderbuffer.c.

3776{
3777 if (!rb->outbufsize)
3778 {
3779 rb->outbuf = MemoryContextAlloc(rb->context, sz);
3780 rb->outbufsize = sz;
3781 }
3782 else if (rb->outbufsize < sz)
3783 {
3784 rb->outbuf = repalloc(rb->outbuf, sz);
3785 rb->outbufsize = sz;
3786 }
3787}

References ReorderBuffer::context, MemoryContextAlloc(), ReorderBuffer::outbuf, ReorderBuffer::outbufsize, and repalloc().

Referenced by ReorderBufferRestoreChanges(), and ReorderBufferSerializeChange().

◆ ReorderBufferSerializeTXN()

static void ReorderBufferSerializeTXN ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 4000 of file reorderbuffer.c.

4001{
4002 dlist_iter subtxn_i;
4003 dlist_mutable_iter change_i;
4004 int fd = -1;
4005 XLogSegNo curOpenSegNo = 0;
4006 Size spilled = 0;
4007 Size size = txn->size;
4008
4009 elog(DEBUG2, "spill %u changes in XID %u to disk",
4010 (uint32) txn->nentries_mem, txn->xid);
4011
4012 /* do the same to all child TXs */
4013 dlist_foreach(subtxn_i, &txn->subtxns)
4014 {
4015 ReorderBufferTXN *subtxn;
4016
4017 subtxn = dlist_container(ReorderBufferTXN, node, subtxn_i.cur);
4018 ReorderBufferSerializeTXN(rb, subtxn);
4019 }
4020
4021 /* serialize changestream */
4022 dlist_foreach_modify(change_i, &txn->changes)
4023 {
4024 ReorderBufferChange *change;
4025
4026 change = dlist_container(ReorderBufferChange, node, change_i.cur);
4027
4028 /*
4029 * store in segment in which it belongs by start lsn, don't split over
4030 * multiple segments tho
4031 */
4032 if (fd == -1 ||
4033 !XLByteInSeg(change->lsn, curOpenSegNo, wal_segment_size))
4034 {
4035 char path[MAXPGPATH];
4036
4037 if (fd != -1)
4039
4040 XLByteToSeg(change->lsn, curOpenSegNo, wal_segment_size);
4041
4042 /*
4043 * No need to care about TLIs here, only used during a single run,
4044 * so each LSN only maps to a specific WAL record.
4045 */
4047 curOpenSegNo);
4048
4049 /* open segment, create it if necessary */
4050 fd = OpenTransientFile(path,
4051 O_CREAT | O_WRONLY | O_APPEND | PG_BINARY);
4052
4053 if (fd < 0)
4054 ereport(ERROR,
4056 errmsg("could not open file \"%s\": %m", path)));
4057 }
4058
4059 ReorderBufferSerializeChange(rb, txn, fd, change);
4060 dlist_delete(&change->node);
4061 ReorderBufferFreeChange(rb, change, false);
4062
4063 spilled++;
4064 }
4065
4066 /* Update the memory counter */
4067 ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, size);
4068
4069 /* update the statistics iff we have spilled anything */
4070 if (spilled)
4071 {
4072 rb->spillCount += 1;
4073 rb->spillBytes += size;
4074
4075 /* don't consider already serialized transactions */
4076 rb->spillTxns += (rbtxn_is_serialized(txn) || rbtxn_is_serialized_clear(txn)) ? 0 : 1;
4077
4078 /* update the decoding stats */
4080 }
4081
4082 Assert(spilled == txn->nentries_mem);
4084 txn->nentries_mem = 0;
4086
4087 if (fd != -1)
4089}
static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, int fd, ReorderBufferChange *change)
#define rbtxn_is_serialized_clear(txn)
#define RBTXN_IS_SERIALIZED
#define XLByteInSeg(xlrp, logSegNo, wal_segsz_bytes)

References Assert(), ReorderBufferTXN::changes, CloseTransientFile(), dlist_iter::cur, dlist_mutable_iter::cur, DEBUG2, dlist_container, dlist_delete(), dlist_foreach, dlist_foreach_modify, dlist_is_empty(), elog, ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), ReorderBufferChange::lsn, MAXPGPATH, MyReplicationSlot, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, OpenTransientFile(), PG_BINARY, ReorderBuffer::private_data, RBTXN_IS_SERIALIZED, rbtxn_is_serialized, rbtxn_is_serialized_clear, ReorderBufferChangeMemoryUpdate(), ReorderBufferFreeChange(), ReorderBufferSerializeChange(), ReorderBufferSerializedPath(), ReorderBufferSerializeTXN(), ReorderBufferTXN::size, ReorderBuffer::spillBytes, ReorderBuffer::spillCount, ReorderBuffer::spillTxns, ReorderBufferTXN::subtxns, ReorderBufferTXN::txn_flags, UpdateDecodingStats(), wal_segment_size, ReorderBufferTXN::xid, XLByteInSeg, and XLByteToSeg.

Referenced by ReorderBufferCheckMemoryLimit(), ReorderBufferIterTXNInit(), and ReorderBufferSerializeTXN().

◆ ReorderBufferSetBaseSnapshot()

void ReorderBufferSetBaseSnapshot ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
Snapshot  snap 
)

Definition at line 3326 of file reorderbuffer.c.

3328{
3329 ReorderBufferTXN *txn;
3330 bool is_new;
3331
3332 Assert(snap != NULL);
3333
3334 /*
3335 * Fetch the transaction to operate on. If we know it's a subtransaction,
3336 * operate on its top-level transaction instead.
3337 */
3338 txn = ReorderBufferTXNByXid(rb, xid, true, &is_new, lsn, true);
3339 if (rbtxn_is_known_subxact(txn))
3340 txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
3341 NULL, InvalidXLogRecPtr, false);
3342 Assert(txn->base_snapshot == NULL);
3343
3344 txn->base_snapshot = snap;
3345 txn->base_snapshot_lsn = lsn;
3347
3349}

References Assert(), AssertTXNLsnOrder(), ReorderBufferTXN::base_snapshot, ReorderBufferTXN::base_snapshot_lsn, ReorderBufferTXN::base_snapshot_node, dlist_push_tail(), InvalidXLogRecPtr, rbtxn_is_known_subxact, ReorderBufferTXNByXid(), ReorderBufferTXN::toplevel_xid, and ReorderBuffer::txns_by_base_snapshot_lsn.

Referenced by SnapBuildCommitTxn(), and SnapBuildProcessChange().

◆ ReorderBufferSetRestartPoint()

void ReorderBufferSetRestartPoint ( ReorderBuffer rb,
XLogRecPtr  ptr 
)

Definition at line 1087 of file reorderbuffer.c.

1088{
1090}

References ReorderBuffer::current_restart_decoding_lsn.

Referenced by SnapBuildRestore(), and SnapBuildSerialize().

◆ ReorderBufferSkipPrepare()

void ReorderBufferSkipPrepare ( ReorderBuffer rb,
TransactionId  xid 
)

Definition at line 2939 of file reorderbuffer.c.

2940{
2941 ReorderBufferTXN *txn;
2942
2943 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false);
2944
2945 /* unknown transaction, nothing to do */
2946 if (txn == NULL)
2947 return;
2948
2949 /* txn must have been marked as a prepared transaction */
2952}

References Assert(), InvalidXLogRecPtr, RBTXN_IS_PREPARED, RBTXN_PREPARE_STATUS_MASK, RBTXN_SKIPPED_PREPARE, ReorderBufferTXNByXid(), and ReorderBufferTXN::txn_flags.

Referenced by DecodePrepare().

◆ ReorderBufferStreamCommit()

static void ReorderBufferStreamCommit ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 1983 of file reorderbuffer.c.

1984{
1985 /* we should only call this for previously streamed transactions */
1987
1988 ReorderBufferStreamTXN(rb, txn);
1989
1990 if (rbtxn_is_prepared(txn))
1991 {
1992 /*
1993 * Note, we send stream prepare even if a concurrent abort is
1994 * detected. See DecodePrepare for more information.
1995 */
1997 rb->stream_prepare(rb, txn, txn->final_lsn);
1999
2000 /*
2001 * This is a PREPARED transaction, part of a two-phase commit. The
2002 * full cleanup will happen as part of the COMMIT PREPAREDs, so now
2003 * just truncate txn by removing changes and tuplecids.
2004 */
2005 ReorderBufferTruncateTXN(rb, txn, true);
2006 /* Reset the CheckXidAlive */
2008 }
2009 else
2010 {
2011 rb->stream_commit(rb, txn, txn->final_lsn);
2012 ReorderBufferCleanupTXN(rb, txn);
2013 }
2014}
ReorderBufferStreamPrepareCB stream_prepare
ReorderBufferStreamCommitCB stream_commit

References Assert(), CheckXidAlive, ReorderBufferTXN::final_lsn, InvalidTransactionId, rbtxn_is_prepared, rbtxn_is_streamed, RBTXN_SENT_PREPARE, rbtxn_sent_prepare, ReorderBufferCleanupTXN(), ReorderBufferStreamTXN(), ReorderBufferTruncateTXN(), ReorderBuffer::stream_commit, ReorderBuffer::stream_prepare, and ReorderBufferTXN::txn_flags.

Referenced by ReorderBufferReplay().

◆ ReorderBufferStreamTXN()

static void ReorderBufferStreamTXN ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 4345 of file reorderbuffer.c.

4346{
4347 Snapshot snapshot_now;
4348 CommandId command_id;
4349 Size stream_bytes;
4350 bool txn_is_streamed;
4351
4352 /* We can never reach here for a subtransaction. */
4353 Assert(rbtxn_is_toptxn(txn));
4354
4355 /*
4356 * We can't make any assumptions about base snapshot here, similar to what
4357 * ReorderBufferCommit() does. That relies on base_snapshot getting
4358 * transferred from subxact in ReorderBufferCommitChild(), but that was
4359 * not yet called as the transaction is in-progress.
4360 *
4361 * So just walk the subxacts and use the same logic here. But we only need
4362 * to do that once, when the transaction is streamed for the first time.
4363 * After that we need to reuse the snapshot from the previous run.
4364 *
4365 * Unlike DecodeCommit which adds xids of all the subtransactions in
4366 * snapshot's xip array via SnapBuildCommitTxn, we can't do that here but
4367 * we do add them to subxip array instead via ReorderBufferCopySnap. This
4368 * allows the catalog changes made in subtransactions decoded till now to
4369 * be visible.
4370 */
4371 if (txn->snapshot_now == NULL)
4372 {
4373 dlist_iter subxact_i;
4374
4375 /* make sure this transaction is streamed for the first time */
4377
4378 /* at the beginning we should have invalid command ID */
4380
4381 dlist_foreach(subxact_i, &txn->subtxns)
4382 {
4383 ReorderBufferTXN *subtxn;
4384
4385 subtxn = dlist_container(ReorderBufferTXN, node, subxact_i.cur);
4387 }
4388
4389 /*
4390 * If this transaction has no snapshot, it didn't make any changes to
4391 * the database till now, so there's nothing to decode.
4392 */
4393 if (txn->base_snapshot == NULL)
4394 {
4395 Assert(txn->ninvalidations == 0);
4396 return;
4397 }
4398
4399 command_id = FirstCommandId;
4400 snapshot_now = ReorderBufferCopySnap(rb, txn->base_snapshot,
4401 txn, command_id);
4402 }
4403 else
4404 {
4405 /* the transaction must have been already streamed */
4407
4408 /*
4409 * Nah, we already have snapshot from the previous streaming run. We
4410 * assume new subxacts can't move the LSN backwards, and so can't beat
4411 * the LSN condition in the previous branch (so no need to walk
4412 * through subxacts again). In fact, we must not do that as we may be
4413 * using snapshot half-way through the subxact.
4414 */
4415 command_id = txn->command_id;
4416
4417 /*
4418 * We can't use txn->snapshot_now directly because after the last
4419 * streaming run, we might have got some new sub-transactions. So we
4420 * need to add them to the snapshot.
4421 */
4422 snapshot_now = ReorderBufferCopySnap(rb, txn->snapshot_now,
4423 txn, command_id);
4424
4425 /* Free the previously copied snapshot. */
4426 Assert(txn->snapshot_now->copied);
4428 txn->snapshot_now = NULL;
4429 }
4430
4431 /*
4432 * Remember this information to be used later to update stats. We can't
4433 * update the stats here as an error while processing the changes would
4434 * lead to the accumulation of stats even though we haven't streamed all
4435 * the changes.
4436 */
4437 txn_is_streamed = rbtxn_is_streamed(txn);
4438 stream_bytes = txn->total_size;
4439
4440 /* Process and send the changes to output plugin. */
4441 ReorderBufferProcessTXN(rb, txn, InvalidXLogRecPtr, snapshot_now,
4442 command_id, true);
4443
4444 rb->streamCount += 1;
4445 rb->streamBytes += stream_bytes;
4446
4447 /* Don't consider already streamed transaction. */
4448 rb->streamTxns += (txn_is_streamed) ? 0 : 1;
4449
4450 /* update the decoding stats */
4452
4454 Assert(txn->nentries == 0);
4455 Assert(txn->nentries_mem == 0);
4456}

References Assert(), ReorderBufferTXN::base_snapshot, ReorderBufferTXN::changes, ReorderBufferTXN::command_id, SnapshotData::copied, dlist_iter::cur, dlist_container, dlist_foreach, dlist_is_empty(), FirstCommandId, InvalidCommandId, InvalidXLogRecPtr, ReorderBufferTXN::nentries, ReorderBufferTXN::nentries_mem, ReorderBufferTXN::ninvalidations, ReorderBuffer::private_data, rbtxn_is_streamed, rbtxn_is_toptxn, ReorderBufferCopySnap(), ReorderBufferFreeSnap(), ReorderBufferProcessTXN(), ReorderBufferTransferSnapToParent(), ReorderBufferTXN::snapshot_now, ReorderBuffer::streamBytes, ReorderBuffer::streamCount, ReorderBuffer::streamTxns, ReorderBufferTXN::subtxns, ReorderBufferTXN::total_size, and UpdateDecodingStats().

Referenced by ReorderBufferCheckMemoryLimit(), ReorderBufferProcessPartialChange(), and ReorderBufferStreamCommit().

◆ ReorderBufferToastAppendChunk()

static void ReorderBufferToastAppendChunk ( ReorderBuffer rb,
ReorderBufferTXN txn,
Relation  relation,
ReorderBufferChange change 
)
static

Definition at line 4998 of file reorderbuffer.c.

5000{
5002 HeapTuple newtup;
5003 bool found;
5004 int32 chunksize;
5005 bool isnull;
5006 Pointer chunk;
5007 TupleDesc desc = RelationGetDescr(relation);
5008 Oid chunk_id;
5009 int32 chunk_seq;
5010
5011 if (txn->toast_hash == NULL)
5013
5014 Assert(IsToastRelation(relation));
5015
5016 newtup = change->data.tp.newtuple;
5017 chunk_id = DatumGetObjectId(fastgetattr(newtup, 1, desc, &isnull));
5018 Assert(!isnull);
5019 chunk_seq = DatumGetInt32(fastgetattr(newtup, 2, desc, &isnull));
5020 Assert(!isnull);
5021
5022 ent = (ReorderBufferToastEnt *)
5023 hash_search(txn->toast_hash, &chunk_id, HASH_ENTER, &found);
5024
5025 if (!found)
5026 {
5027 Assert(ent->chunk_id == chunk_id);
5028 ent->num_chunks = 0;
5029 ent->last_chunk_seq = 0;
5030 ent->size = 0;
5031 ent->reconstructed = NULL;
5032 dlist_init(&ent->chunks);
5033
5034 if (chunk_seq != 0)
5035 elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq 0",
5036 chunk_seq, chunk_id);
5037 }
5038 else if (found && chunk_seq != ent->last_chunk_seq + 1)
5039 elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq %d",
5040 chunk_seq, chunk_id, ent->last_chunk_seq + 1);
5041
5042 chunk = DatumGetPointer(fastgetattr(newtup, 3, desc, &isnull));
5043 Assert(!isnull);
5044
5045 /* calculate size so we can allocate the right size at once later */
5046 if (!VARATT_IS_EXTENDED(chunk))
5047 chunksize = VARSIZE(chunk) - VARHDRSZ;
5048 else if (VARATT_IS_SHORT(chunk))
5049 /* could happen due to heap_form_tuple doing its thing */
5050 chunksize = VARSIZE_SHORT(chunk) - VARHDRSZ_SHORT;
5051 else
5052 elog(ERROR, "unexpected type of toast chunk");
5053
5054 ent->size += chunksize;
5055 ent->last_chunk_seq = chunk_seq;
5056 ent->num_chunks++;
5057 dlist_push_tail(&ent->chunks, &change->node);
5058}
#define VARHDRSZ
Definition: c.h:700
void * Pointer
Definition: c.h:532
static Datum fastgetattr(HeapTuple tup, int attnum, TupleDesc tupleDesc, bool *isnull)
Definition: htup_details.h:861
static Oid DatumGetObjectId(Datum X)
Definition: postgres.h:252
static Pointer DatumGetPointer(Datum X)
Definition: postgres.h:322
#define RelationGetDescr(relation)
Definition: rel.h:541
static void ReorderBufferToastInitHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
struct varlena * reconstructed
#define VARHDRSZ_SHORT
Definition: varatt.h:278
static bool VARATT_IS_SHORT(const void *PTR)
Definition: varatt.h:403
static bool VARATT_IS_EXTENDED(const void *PTR)
Definition: varatt.h:410
static Size VARSIZE(const void *PTR)
Definition: varatt.h:298
static Size VARSIZE_SHORT(const void *PTR)
Definition: varatt.h:312

References Assert(), ReorderBufferToastEnt::chunk_id, ReorderBufferToastEnt::chunks, ReorderBufferChange::data, DatumGetInt32(), DatumGetObjectId(), DatumGetPointer(), dlist_init(), dlist_push_tail(), elog, ERROR, fastgetattr(), HASH_ENTER, hash_search(), IsToastRelation(), ReorderBufferToastEnt::last_chunk_seq, ReorderBufferChange::newtuple, ReorderBufferChange::node, ReorderBufferToastEnt::num_chunks, ReorderBufferToastEnt::reconstructed, RelationGetDescr, ReorderBufferToastInitHash(), ReorderBufferToastEnt::size, ReorderBufferTXN::toast_hash, ReorderBufferChange::tp, VARATT_IS_EXTENDED(), VARATT_IS_SHORT(), VARHDRSZ, VARHDRSZ_SHORT, VARSIZE(), and VARSIZE_SHORT().

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferToastInitHash()

static void ReorderBufferToastInitHash ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 4978 of file reorderbuffer.c.

4979{
4980 HASHCTL hash_ctl;
4981
4982 Assert(txn->toast_hash == NULL);
4983
4984 hash_ctl.keysize = sizeof(Oid);
4985 hash_ctl.entrysize = sizeof(ReorderBufferToastEnt);
4986 hash_ctl.hcxt = rb->context;
4987 txn->toast_hash = hash_create("ReorderBufferToastHash", 5, &hash_ctl,
4989}
struct ReorderBufferToastEnt ReorderBufferToastEnt

References Assert(), ReorderBuffer::context, HASHCTL::entrysize, HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, HASHCTL::keysize, and ReorderBufferTXN::toast_hash.

Referenced by ReorderBufferToastAppendChunk().

◆ ReorderBufferToastReplace()

static void ReorderBufferToastReplace ( ReorderBuffer rb,
ReorderBufferTXN txn,
Relation  relation,
ReorderBufferChange change 
)
static

Definition at line 5081 of file reorderbuffer.c.

5083{
5084 TupleDesc desc;
5085 int natt;
5086 Datum *attrs;
5087 bool *isnull;
5088 bool *free;
5089 HeapTuple tmphtup;
5090 Relation toast_rel;
5091 TupleDesc toast_desc;
5092 MemoryContext oldcontext;
5093 HeapTuple newtup;
5094 Size old_size;
5095
5096 /* no toast tuples changed */
5097 if (txn->toast_hash == NULL)
5098 return;
5099
5100 /*
5101 * We're going to modify the size of the change. So, to make sure the
5102 * accounting is correct we record the current change size and then after
5103 * re-computing the change we'll subtract the recorded size and then
5104 * re-add the new change size at the end. We don't immediately subtract
5105 * the old size because if there is any error before we add the new size,
5106 * we will release the changes and that will update the accounting info
5107 * (subtracting the size from the counters). And we don't want to
5108 * underflow there.
5109 */
5110 old_size = ReorderBufferChangeSize(change);
5111
5112 oldcontext = MemoryContextSwitchTo(rb->context);
5113
5114 /* we should only have toast tuples in an INSERT or UPDATE */
5115 Assert(change->data.tp.newtuple);
5116
5117 desc = RelationGetDescr(relation);
5118
5119 toast_rel = RelationIdGetRelation(relation->rd_rel->reltoastrelid);
5120 if (!RelationIsValid(toast_rel))
5121 elog(ERROR, "could not open toast relation with OID %u (base relation \"%s\")",
5122 relation->rd_rel->reltoastrelid, RelationGetRelationName(relation));
5123
5124 toast_desc = RelationGetDescr(toast_rel);
5125
5126 /* should we allocate from stack instead? */
5127 attrs = palloc0(sizeof(Datum) * desc->natts);
5128 isnull = palloc0(sizeof(bool) * desc->natts);
5129 free = palloc0(sizeof(bool) * desc->natts);
5130
5131 newtup = change->data.tp.newtuple;
5132
5133 heap_deform_tuple(newtup, desc, attrs, isnull);
5134
5135 for (natt = 0; natt < desc->natts; natt++)
5136 {
5137 CompactAttribute *attr = TupleDescCompactAttr(desc, natt);
5139 struct varlena *varlena;
5140
5141 /* va_rawsize is the size of the original datum -- including header */
5142 struct varatt_external toast_pointer;
5143 struct varatt_indirect redirect_pointer;
5144 struct varlena *new_datum = NULL;
5145 struct varlena *reconstructed;
5146 dlist_iter it;
5147 Size data_done = 0;
5148
5149 if (attr->attisdropped)
5150 continue;
5151
5152 /* not a varlena datatype */
5153 if (attr->attlen != -1)
5154 continue;
5155
5156 /* no data */
5157 if (isnull[natt])
5158 continue;
5159
5160 /* ok, we know we have a toast datum */
5161 varlena = (struct varlena *) DatumGetPointer(attrs[natt]);
5162
5163 /* no need to do anything if the tuple isn't external */
5165 continue;
5166
5167 VARATT_EXTERNAL_GET_POINTER(toast_pointer, varlena);
5168
5169 /*
5170 * Check whether the toast tuple changed, replace if so.
5171 */
5172 ent = (ReorderBufferToastEnt *)
5174 &toast_pointer.va_valueid,
5175 HASH_FIND,
5176 NULL);
5177 if (ent == NULL)
5178 continue;
5179
5180 new_datum =
5182
5183 free[natt] = true;
5184
5185 reconstructed = palloc0(toast_pointer.va_rawsize);
5186
5187 ent->reconstructed = reconstructed;
5188
5189 /* stitch toast tuple back together from its parts */
5190 dlist_foreach(it, &ent->chunks)
5191 {
5192 bool cisnull;
5193 ReorderBufferChange *cchange;
5194 HeapTuple ctup;
5195 Pointer chunk;
5196
5197 cchange = dlist_container(ReorderBufferChange, node, it.cur);
5198 ctup = cchange->data.tp.newtuple;
5199 chunk = DatumGetPointer(fastgetattr(ctup, 3, toast_desc, &cisnull));
5200
5201 Assert(!cisnull);
5202 Assert(!VARATT_IS_EXTERNAL(chunk));
5203 Assert(!VARATT_IS_SHORT(chunk));
5204
5205 memcpy(VARDATA(reconstructed) + data_done,
5206 VARDATA(chunk),
5207 VARSIZE(chunk) - VARHDRSZ);
5208 data_done += VARSIZE(chunk) - VARHDRSZ;
5209 }
5210 Assert(data_done == VARATT_EXTERNAL_GET_EXTSIZE(toast_pointer));
5211
5212 /* make sure its marked as compressed or not */
5213 if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
5214 SET_VARSIZE_COMPRESSED(reconstructed, data_done + VARHDRSZ);
5215 else
5216 SET_VARSIZE(reconstructed, data_done + VARHDRSZ);
5217
5218 memset(&redirect_pointer, 0, sizeof(redirect_pointer));
5219 redirect_pointer.pointer = reconstructed;
5220
5222 memcpy(VARDATA_EXTERNAL(new_datum), &redirect_pointer,
5223 sizeof(redirect_pointer));
5224
5225 attrs[natt] = PointerGetDatum(new_datum);
5226 }
5227
5228 /*
5229 * Build tuple in separate memory & copy tuple back into the tuplebuf
5230 * passed to the output plugin. We can't directly heap_fill_tuple() into
5231 * the tuplebuf because attrs[] will point back into the current content.
5232 */
5233 tmphtup = heap_form_tuple(desc, attrs, isnull);
5234 Assert(newtup->t_len <= MaxHeapTupleSize);
5235 Assert(newtup->t_data == (HeapTupleHeader) ((char *) newtup + HEAPTUPLESIZE));
5236
5237 memcpy(newtup->t_data, tmphtup->t_data, tmphtup->t_len);
5238 newtup->t_len = tmphtup->t_len;
5239
5240 /*
5241 * free resources we won't further need, more persistent stuff will be
5242 * free'd in ReorderBufferToastReset().
5243 */
5244 RelationClose(toast_rel);
5245 pfree(tmphtup);
5246 for (natt = 0; natt < desc->natts; natt++)
5247 {
5248 if (free[natt])
5249 pfree(DatumGetPointer(attrs[natt]));
5250 }
5251 pfree(attrs);
5252 pfree(free);
5253 pfree(isnull);
5254
5255 MemoryContextSwitchTo(oldcontext);
5256
5257 /* subtract the old change size */
5258 ReorderBufferChangeMemoryUpdate(rb, change, NULL, false, old_size);
5259 /* now add the change back, with the correct size */
5260 ReorderBufferChangeMemoryUpdate(rb, change, NULL, true,
5261 ReorderBufferChangeSize(change));
5262}
#define INDIRECT_POINTER_SIZE
Definition: detoast.h:34
#define VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr)
Definition: detoast.h:22
#define free(a)
Definition: header.h:65
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
Definition: heaptuple.c:1117
void heap_deform_tuple(HeapTuple tuple, TupleDesc tupleDesc, Datum *values, bool *isnull)
Definition: heaptuple.c:1346
#define MaxHeapTupleSize
Definition: htup_details.h:610
static Datum PointerGetDatum(const void *X)
Definition: postgres.h:332
uint64_t Datum
Definition: postgres.h:70
#define RelationGetRelationName(relation)
Definition: rel.h:549
bool attisdropped
Definition: tupdesc.h:77
int16 attlen
Definition: tupdesc.h:71
Definition: c.h:695
static CompactAttribute * TupleDescCompactAttr(TupleDesc tupdesc, int i)
Definition: tupdesc.h:175
static void SET_VARSIZE_COMPRESSED(void *PTR, Size len)
Definition: varatt.h:446
static Size VARATT_EXTERNAL_GET_EXTSIZE(struct varatt_external toast_pointer)
Definition: varatt.h:507
static bool VARATT_IS_EXTERNAL(const void *PTR)
Definition: varatt.h:354
static char * VARDATA_EXTERNAL(const void *PTR)
Definition: varatt.h:340
static char * VARDATA(const void *PTR)
Definition: varatt.h:305
static void SET_VARTAG_EXTERNAL(void *PTR, vartag_external tag)
Definition: varatt.h:453
@ VARTAG_INDIRECT
Definition: varatt.h:86
static bool VARATT_EXTERNAL_IS_COMPRESSED(struct varatt_external toast_pointer)
Definition: varatt.h:536
static void SET_VARSIZE(void *PTR, Size len)
Definition: varatt.h:432

References Assert(), CompactAttribute::attisdropped, CompactAttribute::attlen, ReorderBufferToastEnt::chunks, ReorderBuffer::context, dlist_iter::cur, ReorderBufferChange::data, DatumGetPointer(), dlist_container, dlist_foreach, elog, ERROR, fastgetattr(), free, HASH_FIND, hash_search(), heap_deform_tuple(), heap_form_tuple(), HEAPTUPLESIZE, INDIRECT_POINTER_SIZE, MaxHeapTupleSize, MemoryContextSwitchTo(), TupleDescData::natts, ReorderBufferChange::newtuple, palloc0(), pfree(), varatt_indirect::pointer, PointerGetDatum(), RelationData::rd_rel, ReorderBufferToastEnt::reconstructed, RelationClose(), RelationGetDescr, RelationGetRelationName, RelationIdGetRelation(), RelationIsValid, ReorderBufferChangeMemoryUpdate(), ReorderBufferChangeSize(), SET_VARSIZE(), SET_VARSIZE_COMPRESSED(), SET_VARTAG_EXTERNAL(), HeapTupleData::t_data, HeapTupleData::t_len, ReorderBufferTXN::toast_hash, ReorderBufferChange::tp, TupleDescCompactAttr(), varatt_external::va_rawsize, varatt_external::va_valueid, VARATT_EXTERNAL_GET_EXTSIZE(), VARATT_EXTERNAL_GET_POINTER, VARATT_EXTERNAL_IS_COMPRESSED(), VARATT_IS_EXTERNAL(), VARATT_IS_SHORT(), VARDATA(), VARDATA_EXTERNAL(), VARHDRSZ, VARSIZE(), and VARTAG_INDIRECT.

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferToastReset()

static void ReorderBufferToastReset ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 5268 of file reorderbuffer.c.

5269{
5270 HASH_SEQ_STATUS hstat;
5272
5273 if (txn->toast_hash == NULL)
5274 return;
5275
5276 /* sequentially walk over the hash and free everything */
5277 hash_seq_init(&hstat, txn->toast_hash);
5278 while ((ent = (ReorderBufferToastEnt *) hash_seq_search(&hstat)) != NULL)
5279 {
5281
5282 if (ent->reconstructed != NULL)
5283 pfree(ent->reconstructed);
5284
5285 dlist_foreach_modify(it, &ent->chunks)
5286 {
5287 ReorderBufferChange *change =
5289
5290 dlist_delete(&change->node);
5291 ReorderBufferFreeChange(rb, change, true);
5292 }
5293 }
5294
5296 txn->toast_hash = NULL;
5297}
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1415
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1380

References ReorderBufferToastEnt::chunks, dlist_mutable_iter::cur, dlist_container, dlist_delete(), dlist_foreach_modify, hash_destroy(), hash_seq_init(), hash_seq_search(), ReorderBufferChange::node, pfree(), ReorderBufferToastEnt::reconstructed, ReorderBufferFreeChange(), and ReorderBufferTXN::toast_hash.

Referenced by ReorderBufferCheckAndTruncateAbortedTXN(), ReorderBufferFreeTXN(), ReorderBufferProcessTXN(), and ReorderBufferResetTXN().

◆ ReorderBufferTransferSnapToParent()

static void ReorderBufferTransferSnapToParent ( ReorderBufferTXN txn,
ReorderBufferTXN subtxn 
)
static

Definition at line 1165 of file reorderbuffer.c.

1167{
1168 Assert(subtxn->toplevel_xid == txn->xid);
1169
1170 if (subtxn->base_snapshot != NULL)
1171 {
1172 if (txn->base_snapshot == NULL ||
1173 subtxn->base_snapshot_lsn < txn->base_snapshot_lsn)
1174 {
1175 /*
1176 * If the toplevel transaction already has a base snapshot but
1177 * it's newer than the subxact's, purge it.
1178 */
1179 if (txn->base_snapshot != NULL)
1180 {
1183 }
1184
1185 /*
1186 * The snapshot is now the top transaction's; transfer it, and
1187 * adjust the list position of the top transaction in the list by
1188 * moving it to where the subtransaction is.
1189 */
1190 txn->base_snapshot = subtxn->base_snapshot;
1191 txn->base_snapshot_lsn = subtxn->base_snapshot_lsn;
1193 &txn->base_snapshot_node);
1194
1195 /*
1196 * The subtransaction doesn't have a snapshot anymore (so it
1197 * mustn't be in the list.)
1198 */
1199 subtxn->base_snapshot = NULL;
1202 }
1203 else
1204 {
1205 /* Base snap of toplevel is fine, so subxact's is not needed */
1208 subtxn->base_snapshot = NULL;
1210 }
1211 }
1212}
static void dlist_insert_before(dlist_node *before, dlist_node *node)
Definition: ilist.h:393

References Assert(), ReorderBufferTXN::base_snapshot, ReorderBufferTXN::base_snapshot_lsn, ReorderBufferTXN::base_snapshot_node, dlist_delete(), dlist_insert_before(), InvalidXLogRecPtr, SnapBuildSnapDecRefcount(), ReorderBufferTXN::toplevel_xid, and ReorderBufferTXN::xid.

Referenced by ReorderBufferAssignChild(), and ReorderBufferStreamTXN().

◆ ReorderBufferTruncateTXN()

static void ReorderBufferTruncateTXN ( ReorderBuffer rb,
ReorderBufferTXN txn,
bool  txn_prepared 
)
static

Definition at line 1656 of file reorderbuffer.c.

1657{
1658 dlist_mutable_iter iter;
1659 Size mem_freed = 0;
1660
1661 /* cleanup subtransactions & their changes */
1662 dlist_foreach_modify(iter, &txn->subtxns)
1663 {
1664 ReorderBufferTXN *subtxn;
1665
1666 subtxn = dlist_container(ReorderBufferTXN, node, iter.cur);
1667
1668 /*
1669 * Subtransactions are always associated to the toplevel TXN, even if
1670 * they originally were happening inside another subtxn, so we won't
1671 * ever recurse more than one level deep here.
1672 */
1674 Assert(subtxn->nsubtxns == 0);
1675
1677 ReorderBufferTruncateTXN(rb, subtxn, txn_prepared);
1678 }
1679
1680 /* cleanup changes in the txn */
1681 dlist_foreach_modify(iter, &txn->changes)
1682 {
1683 ReorderBufferChange *change;
1684
1685 change = dlist_container(ReorderBufferChange, node, iter.cur);
1686
1687 /* Check we're not mixing changes from different transactions. */
1688 Assert(change->txn == txn);
1689
1690 /* remove the change from its containing list */
1691 dlist_delete(&change->node);
1692
1693 /*
1694 * Instead of updating the memory counter for individual changes, we
1695 * sum up the size of memory to free so we can update the memory
1696 * counter all together below. This saves costs of maintaining the
1697 * max-heap.
1698 */
1699 mem_freed += ReorderBufferChangeSize(change);
1700
1701 ReorderBufferFreeChange(rb, change, false);
1702 }
1703
1704 /* Update the memory counter */
1705 ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, mem_freed);
1706
1707 if (txn_prepared)
1708 {
1709 /*
1710 * If this is a prepared txn, cleanup the tuplecids we stored for
1711 * decoding catalog snapshot access. They are always stored in the
1712 * toplevel transaction.
1713 */
1714 dlist_foreach_modify(iter, &txn->tuplecids)
1715 {
1716 ReorderBufferChange *change;
1717
1718 change = dlist_container(ReorderBufferChange, node, iter.cur);
1719
1720 /* Check we're not mixing changes from different transactions. */
1721 Assert(change->txn == txn);
1723
1724 /* Remove the change from its containing list. */
1725 dlist_delete(&change->node);
1726
1727 ReorderBufferFreeChange(rb, change, true);
1728 }
1729 }
1730
1731 /*
1732 * Destroy the (relfilelocator, ctid) hashtable, so that we don't leak any
1733 * memory. We could also keep the hash table and update it with new ctid
1734 * values, but this seems simpler and good enough for now.
1735 */
1736 if (txn->tuplecid_hash != NULL)
1737 {
1739 txn->tuplecid_hash = NULL;
1740 }
1741
1742 /* If this txn is serialized then clean the disk space. */
1743 if (rbtxn_is_serialized(txn))
1744 {
1746 txn->txn_flags &= ~RBTXN_IS_SERIALIZED;
1747
1748 /*
1749 * We set this flag to indicate if the transaction is ever serialized.
1750 * We need this to accurately update the stats as otherwise the same
1751 * transaction can be counted as serialized multiple times.
1752 */
1754 }
1755
1756 /* also reset the number of entries in the transaction */
1757 txn->nentries_mem = 0;
1758 txn->nentries = 0;
1759}
#define RBTXN_IS_SERIALIZED_CLEAR

References ReorderBufferChange::action, Assert(), ReorderBufferTXN::changes, dlist_mutable_iter::cur, dlist_container, dlist_delete(), dlist_foreach_modify, hash_destroy(), ReorderBufferTXN::nentries, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, ReorderBufferTXN::nsubtxns, rbtxn_is_known_subxact, rbtxn_is_serialized, RBTXN_IS_SERIALIZED_CLEAR, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferChangeMemoryUpdate(), ReorderBufferChangeSize(), ReorderBufferFreeChange(), ReorderBufferMaybeMarkTXNStreamed(), ReorderBufferRestoreCleanup(), ReorderBufferTruncateTXN(), ReorderBufferTXN::subtxns, ReorderBufferTXN::tuplecid_hash, ReorderBufferTXN::tuplecids, ReorderBufferChange::txn, and ReorderBufferTXN::txn_flags.

Referenced by ReorderBufferCheckAndTruncateAbortedTXN(), ReorderBufferProcessTXN(), ReorderBufferResetTXN(), ReorderBufferStreamCommit(), and ReorderBufferTruncateTXN().

◆ ReorderBufferTXNByXid()

static ReorderBufferTXN * ReorderBufferTXNByXid ( ReorderBuffer rb,
TransactionId  xid,
bool  create,
bool *  is_new,
XLogRecPtr  lsn,
bool  create_as_top 
)
static

Definition at line 653 of file reorderbuffer.c.

655{
656 ReorderBufferTXN *txn;
658 bool found;
659
661
662 /*
663 * Check the one-entry lookup cache first
664 */
666 rb->by_txn_last_xid == xid)
667 {
668 txn = rb->by_txn_last_txn;
669
670 if (txn != NULL)
671 {
672 /* found it, and it's valid */
673 if (is_new)
674 *is_new = false;
675 return txn;
676 }
677
678 /*
679 * cached as non-existent, and asked not to create? Then nothing else
680 * to do.
681 */
682 if (!create)
683 return NULL;
684 /* otherwise fall through to create it */
685 }
686
687 /*
688 * If the cache wasn't hit or it yielded a "does-not-exist" and we want to
689 * create an entry.
690 */
691
692 /* search the lookup table */
695 &xid,
696 create ? HASH_ENTER : HASH_FIND,
697 &found);
698 if (found)
699 txn = ent->txn;
700 else if (create)
701 {
702 /* initialize the new entry, if creation was requested */
703 Assert(ent != NULL);
705
706 ent->txn = ReorderBufferAllocTXN(rb);
707 ent->txn->xid = xid;
708 txn = ent->txn;
709 txn->first_lsn = lsn;
711
712 if (create_as_top)
713 {
716 }
717 }
718 else
719 txn = NULL; /* not found and not asked to create */
720
721 /* update cache */
722 rb->by_txn_last_xid = xid;
723 rb->by_txn_last_txn = txn;
724
725 if (is_new)
726 *is_new = !found;
727
728 Assert(!create || txn != NULL);
729 return txn;
730}
static ReorderBufferTXN * ReorderBufferAllocTXN(ReorderBuffer *rb)
ReorderBufferTXN * txn
XLogRecPtr restart_decoding_lsn
#define TransactionIdIsValid(xid)
Definition: transam.h:41

References Assert(), AssertTXNLsnOrder(), ReorderBuffer::by_txn, ReorderBuffer::by_txn_last_txn, ReorderBuffer::by_txn_last_xid, ReorderBuffer::current_restart_decoding_lsn, dlist_push_tail(), ReorderBufferTXN::first_lsn, HASH_ENTER, HASH_FIND, hash_search(), ReorderBufferTXN::node, ReorderBufferAllocTXN(), ReorderBufferTXN::restart_decoding_lsn, ReorderBuffer::toplevel_by_lsn, TransactionIdIsValid, ReorderBufferTXNByIdEnt::txn, ReorderBufferTXN::xid, and XLogRecPtrIsValid.

Referenced by ReorderBufferAbort(), ReorderBufferAddDistributedInvalidations(), ReorderBufferAddInvalidations(), ReorderBufferAddNewTupleCids(), ReorderBufferAssignChild(), ReorderBufferCommit(), ReorderBufferCommitChild(), ReorderBufferFinishPrepared(), ReorderBufferForget(), ReorderBufferGetInvalidations(), ReorderBufferInvalidate(), ReorderBufferPrepare(), ReorderBufferProcessXid(), ReorderBufferQueueChange(), ReorderBufferQueueMessage(), ReorderBufferRememberPrepareInfo(), ReorderBufferSetBaseSnapshot(), ReorderBufferSkipPrepare(), ReorderBufferXidHasBaseSnapshot(), ReorderBufferXidHasCatalogChanges(), and ReorderBufferXidSetCatalogChanges().

◆ ReorderBufferTXNSizeCompare()

static int ReorderBufferTXNSizeCompare ( const pairingheap_node a,
const pairingheap_node b,
void *  arg 
)
static

Definition at line 3792 of file reorderbuffer.c.

3793{
3796
3797 if (ta->size < tb->size)
3798 return -1;
3799 if (ta->size > tb->size)
3800 return 1;
3801 return 0;
3802}
#define pairingheap_const_container(type, membername, ptr)
Definition: pairingheap.h:51

References a, b, pairingheap_const_container, and ReorderBufferTXN::size.

Referenced by ReorderBufferAllocate().

◆ ReorderBufferXidHasBaseSnapshot()

bool ReorderBufferXidHasBaseSnapshot ( ReorderBuffer rb,
TransactionId  xid 
)

Definition at line 3745 of file reorderbuffer.c.

3746{
3747 ReorderBufferTXN *txn;
3748
3749 txn = ReorderBufferTXNByXid(rb, xid, false,
3750 NULL, InvalidXLogRecPtr, false);
3751
3752 /* transaction isn't known yet, ergo no snapshot */
3753 if (txn == NULL)
3754 return false;
3755
3756 /* a known subtxn? operate on top-level txn instead */
3757 if (rbtxn_is_known_subxact(txn))
3758 txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
3759 NULL, InvalidXLogRecPtr, false);
3760
3761 return txn->base_snapshot != NULL;
3762}

References ReorderBufferTXN::base_snapshot, InvalidXLogRecPtr, rbtxn_is_known_subxact, ReorderBufferTXNByXid(), and ReorderBufferTXN::toplevel_xid.

Referenced by SnapBuildCommitTxn(), SnapBuildDistributeSnapshotAndInval(), and SnapBuildProcessChange().

◆ ReorderBufferXidHasCatalogChanges()

bool ReorderBufferXidHasCatalogChanges ( ReorderBuffer rb,
TransactionId  xid 
)

Definition at line 3728 of file reorderbuffer.c.

3729{
3730 ReorderBufferTXN *txn;
3731
3732 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
3733 false);
3734 if (txn == NULL)
3735 return false;
3736
3737 return rbtxn_has_catalog_changes(txn);
3738}

References InvalidXLogRecPtr, rbtxn_has_catalog_changes, and ReorderBufferTXNByXid().

Referenced by SnapBuildXidHasCatalogChanges().

◆ ReorderBufferXidSetCatalogChanges()

void ReorderBufferXidSetCatalogChanges ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 3655 of file reorderbuffer.c.

3657{
3658 ReorderBufferTXN *txn;
3659
3660 txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3661
3662 if (!rbtxn_has_catalog_changes(txn))
3663 {
3666 }
3667
3668 /*
3669 * Mark top-level transaction as having catalog changes too if one of its
3670 * children has so that the ReorderBufferBuildTupleCidHash can
3671 * conveniently check just top-level transaction and decide whether to
3672 * build the hash table or not.
3673 */
3674 if (rbtxn_is_subtxn(txn))
3675 {
3676 ReorderBufferTXN *toptxn = rbtxn_get_toptxn(txn);
3677
3678 if (!rbtxn_has_catalog_changes(toptxn))
3679 {
3682 }
3683 }
3684}
static void dclist_push_tail(dclist_head *head, dlist_node *node)
Definition: ilist.h:709
#define rbtxn_is_subtxn(txn)
#define RBTXN_HAS_CATALOG_CHANGES

References ReorderBufferTXN::catchange_node, ReorderBuffer::catchange_txns, dclist_push_tail(), rbtxn_get_toptxn, RBTXN_HAS_CATALOG_CHANGES, rbtxn_has_catalog_changes, rbtxn_is_subtxn, ReorderBufferTXNByXid(), and ReorderBufferTXN::txn_flags.

Referenced by SnapBuildProcessNewCid(), and xact_decode().

◆ ResolveCminCmaxDuringDecoding()

bool ResolveCminCmaxDuringDecoding ( HTAB tuplecid_data,
Snapshot  snapshot,
HeapTuple  htup,
Buffer  buffer,
CommandId cmin,
CommandId cmax 
)

Definition at line 5560 of file reorderbuffer.c.

5564{
5567 ForkNumber forkno;
5568 BlockNumber blockno;
5569 bool updated_mapping = false;
5570
5571 /*
5572 * Return unresolved if tuplecid_data is not valid. That's because when
5573 * streaming in-progress transactions we may run into tuples with the CID
5574 * before actually decoding them. Think e.g. about INSERT followed by
5575 * TRUNCATE, where the TRUNCATE may not be decoded yet when applying the
5576 * INSERT. So in such cases, we assume the CID is from the future
5577 * command.
5578 */
5579 if (tuplecid_data == NULL)
5580 return false;
5581
5582 /* be careful about padding */
5583 memset(&key, 0, sizeof(key));
5584
5585 Assert(!BufferIsLocal(buffer));
5586
5587 /*
5588 * get relfilelocator from the buffer, no convenient way to access it
5589 * other than that.
5590 */
5591 BufferGetTag(buffer, &key.rlocator, &forkno, &blockno);
5592
5593 /* tuples can only be in the main fork */
5594 Assert(forkno == MAIN_FORKNUM);
5595 Assert(blockno == ItemPointerGetBlockNumber(&htup->t_self));
5596
5597 ItemPointerCopy(&htup->t_self,
5598 &key.tid);
5599
5600restart:
5601 ent = (ReorderBufferTupleCidEnt *)
5603
5604 /*
5605 * failed to find a mapping, check whether the table was rewritten and
5606 * apply mapping if so, but only do that once - there can be no new
5607 * mappings while we are in here since we have to hold a lock on the
5608 * relation.
5609 */
5610 if (ent == NULL && !updated_mapping)
5611 {
5613 /* now check but don't update for a mapping again */
5614 updated_mapping = true;
5615 goto restart;
5616 }
5617 else if (ent == NULL)
5618 return false;
5619
5620 if (cmin)
5621 *cmin = ent->cmin;
5622 if (cmax)
5623 *cmax = ent->cmax;
5624 return true;
5625}
uint32 BlockNumber
Definition: block.h:31
#define BufferIsLocal(buffer)
Definition: buf.h:37
void BufferGetTag(Buffer buffer, RelFileLocator *rlocator, ForkNumber *forknum, BlockNumber *blknum)
Definition: bufmgr.c:4244
static BlockNumber ItemPointerGetBlockNumber(const ItemPointerData *pointer)
Definition: itemptr.h:103
ForkNumber
Definition: relpath.h:56
static void UpdateLogicalMappings(HTAB *tuplecid_data, Oid relid, Snapshot snapshot)
ItemPointerData t_self
Definition: htup.h:65
Oid t_tableOid
Definition: htup.h:66

References Assert(), BufferGetTag(), BufferIsLocal, ReorderBufferTupleCidEnt::cmax, ReorderBufferTupleCidEnt::cmin, HASH_FIND, hash_search(), ItemPointerCopy(), ItemPointerGetBlockNumber(), sort-test::key, MAIN_FORKNUM, HeapTupleData::t_self, HeapTupleData::t_tableOid, tuplecid_data, and UpdateLogicalMappings().

Referenced by HeapTupleSatisfiesHistoricMVCC().

◆ SetupCheckXidLive()

static void SetupCheckXidLive ( TransactionId  xid)
inlinestatic

Definition at line 2049 of file reorderbuffer.c.

2050{
2051 /*
2052 * If the input transaction id is already set as a CheckXidAlive then
2053 * nothing to do.
2054 */
2056 return;
2057
2058 /*
2059 * setup CheckXidAlive if it's not committed yet. We don't check if the
2060 * xid is aborted. That will happen during catalog access.
2061 */
2062 if (!TransactionIdDidCommit(xid))
2063 CheckXidAlive = xid;
2064 else
2066}
#define TransactionIdEquals(id1, id2)
Definition: transam.h:43

References CheckXidAlive, InvalidTransactionId, TransactionIdDidCommit(), and TransactionIdEquals.

Referenced by ReorderBufferProcessTXN().

◆ StartupReorderBuffer()

void StartupReorderBuffer ( void  )

Definition at line 4944 of file reorderbuffer.c.

4945{
4946 DIR *logical_dir;
4947 struct dirent *logical_de;
4948
4949 logical_dir = AllocateDir(PG_REPLSLOT_DIR);
4950 while ((logical_de = ReadDir(logical_dir, PG_REPLSLOT_DIR)) != NULL)
4951 {
4952 if (strcmp(logical_de->d_name, ".") == 0 ||
4953 strcmp(logical_de->d_name, "..") == 0)
4954 continue;
4955
4956 /* if it cannot be a slot, skip the directory */
4957 if (!ReplicationSlotValidateName(logical_de->d_name, true, DEBUG2))
4958 continue;
4959
4960 /*
4961 * ok, has to be a surviving logical slot, iterate and delete
4962 * everything starting with xid-*
4963 */
4965 }
4966 FreeDir(logical_dir);
4967}
struct dirent * ReadDir(DIR *dir, const char *dirname)
Definition: fd.c:2970
bool ReplicationSlotValidateName(const char *name, bool allow_reserved_name, int elevel)
Definition: slot.c:266

References AllocateDir(), dirent::d_name, DEBUG2, FreeDir(), PG_REPLSLOT_DIR, ReadDir(), ReorderBufferCleanupSerializedTXNs(), and ReplicationSlotValidateName().

Referenced by StartupXLOG().

◆ TransactionIdInArray()

static bool TransactionIdInArray ( TransactionId  xid,
TransactionId xip,
Size  num 
)
static

Definition at line 5459 of file reorderbuffer.c.

5460{
5461 return bsearch(&xid, xip, num,
5462 sizeof(TransactionId), xidComparator) != NULL;
5463}

References xidComparator().

Referenced by UpdateLogicalMappings().

◆ UpdateLogicalMappings()

static void UpdateLogicalMappings ( HTAB tuplecid_data,
Oid  relid,
Snapshot  snapshot 
)
static

Definition at line 5482 of file reorderbuffer.c.

5483{
5484 DIR *mapping_dir;
5485 struct dirent *mapping_de;
5486 List *files = NIL;
5487 ListCell *file;
5488 Oid dboid = IsSharedRelation(relid) ? InvalidOid : MyDatabaseId;
5489
5490 mapping_dir = AllocateDir(PG_LOGICAL_MAPPINGS_DIR);
5491 while ((mapping_de = ReadDir(mapping_dir, PG_LOGICAL_MAPPINGS_DIR)) != NULL)
5492 {
5493 Oid f_dboid;
5494 Oid f_relid;
5495 TransactionId f_mapped_xid;
5496 TransactionId f_create_xid;
5497 XLogRecPtr f_lsn;
5498 uint32 f_hi,
5499 f_lo;
5501
5502 if (strcmp(mapping_de->d_name, ".") == 0 ||
5503 strcmp(mapping_de->d_name, "..") == 0)
5504 continue;
5505
5506 /* Ignore files that aren't ours */
5507 if (strncmp(mapping_de->d_name, "map-", 4) != 0)
5508 continue;
5509
5510 if (sscanf(mapping_de->d_name, LOGICAL_REWRITE_FORMAT,
5511 &f_dboid, &f_relid, &f_hi, &f_lo,
5512 &f_mapped_xid, &f_create_xid) != 6)
5513 elog(ERROR, "could not parse filename \"%s\"", mapping_de->d_name);
5514
5515 f_lsn = ((uint64) f_hi) << 32 | f_lo;
5516
5517 /* mapping for another database */
5518 if (f_dboid != dboid)
5519 continue;
5520
5521 /* mapping for another relation */
5522 if (f_relid != relid)
5523 continue;
5524
5525 /* did the creating transaction abort? */
5526 if (!TransactionIdDidCommit(f_create_xid))
5527 continue;
5528
5529 /* not for our transaction */
5530 if (!TransactionIdInArray(f_mapped_xid, snapshot->subxip, snapshot->subxcnt))
5531 continue;
5532
5533 /* ok, relevant, queue for apply */
5534 f = palloc(sizeof(RewriteMappingFile));
5535 f->lsn = f_lsn;
5536 strcpy(f->fname, mapping_de->d_name);
5537 files = lappend(files, f);
5538 }
5539 FreeDir(mapping_dir);
5540
5541 /* sort files so we apply them in LSN order */
5543
5544 foreach(file, files)
5545 {
5547
5548 elog(DEBUG1, "applying mapping: \"%s\" in %u", f->fname,
5549 snapshot->subxip[0]);
5551 pfree(f);
5552 }
5553}
uint64_t uint64
Definition: c.h:542
bool IsSharedRelation(Oid relationId)
Definition: catalog.c:304
#define DEBUG1
Definition: elog.h:30
Oid MyDatabaseId
Definition: globals.c:94
List * lappend(List *list, void *datum)
Definition: list.c:339
void list_sort(List *list, list_sort_comparator cmp)
Definition: list.c:1674
#define NIL
Definition: pg_list.h:68
static int file_sort_by_lsn(const ListCell *a_p, const ListCell *b_p)
static void ApplyLogicalMappingFile(HTAB *tuplecid_data, Oid relid, const char *fname)
static bool TransactionIdInArray(TransactionId xid, TransactionId *xip, Size num)
#define LOGICAL_REWRITE_FORMAT
Definition: rewriteheap.h:54
Definition: pg_list.h:54
char fname[MAXPGPATH]

References AllocateDir(), ApplyLogicalMappingFile(), dirent::d_name, DEBUG1, elog, ERROR, file_sort_by_lsn(), RewriteMappingFile::fname, FreeDir(), InvalidOid, IsSharedRelation(), lappend(), lfirst, list_sort(), LOGICAL_REWRITE_FORMAT, RewriteMappingFile::lsn, MyDatabaseId, NIL, palloc(), pfree(), PG_LOGICAL_MAPPINGS_DIR, ReadDir(), SnapshotData::subxcnt, SnapshotData::subxip, TransactionIdDidCommit(), TransactionIdInArray(), and tuplecid_data.

Referenced by ResolveCminCmaxDuringDecoding().

Variable Documentation

◆ debug_logical_replication_streaming

int debug_logical_replication_streaming = DEBUG_LOGICAL_REP_STREAMING_BUFFERED

◆ logical_decoding_work_mem

int logical_decoding_work_mem

Definition at line 225 of file reorderbuffer.c.

Referenced by ReorderBufferCheckMemoryLimit().

◆ max_changes_in_memory

const Size max_changes_in_memory = 4096
static

Definition at line 226 of file reorderbuffer.c.

Referenced by ReorderBufferRestoreChanges().