PostgreSQL Source Code git master
reorderbuffer.c File Reference
#include "postgres.h"
#include <unistd.h>
#include <sys/stat.h>
#include "access/detoast.h"
#include "access/heapam.h"
#include "access/rewriteheap.h"
#include "access/transam.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "catalog/catalog.h"
#include "common/int.h"
#include "lib/binaryheap.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "replication/logical.h"
#include "replication/reorderbuffer.h"
#include "replication/slot.h"
#include "replication/snapbuild.h"
#include "storage/bufmgr.h"
#include "storage/fd.h"
#include "storage/procarray.h"
#include "storage/sinval.h"
#include "utils/builtins.h"
#include "utils/inval.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/relfilenumbermap.h"
Include dependency graph for reorderbuffer.c:

Go to the source code of this file.

Data Structures

struct  ReorderBufferTXNByIdEnt
 
struct  ReorderBufferTupleCidKey
 
struct  ReorderBufferTupleCidEnt
 
struct  TXNEntryFile
 
struct  ReorderBufferIterTXNEntry
 
struct  ReorderBufferIterTXNState
 
struct  ReorderBufferToastEnt
 
struct  ReorderBufferDiskChange
 
struct  RewriteMappingFile
 

Macros

#define MAX_DISTR_INVAL_MSG_PER_TXN    ((8 * 1024 * 1024) / sizeof(SharedInvalidationMessage))
 
#define IsSpecInsert(action)
 
#define IsSpecConfirmOrAbort(action)
 
#define IsInsertOrUpdate(action)
 
#define CHANGES_THRESHOLD   100
 

Typedefs

typedef struct ReorderBufferTXNByIdEnt ReorderBufferTXNByIdEnt
 
typedef struct ReorderBufferTupleCidKey ReorderBufferTupleCidKey
 
typedef struct ReorderBufferTupleCidEnt ReorderBufferTupleCidEnt
 
typedef struct TXNEntryFile TXNEntryFile
 
typedef struct ReorderBufferIterTXNEntry ReorderBufferIterTXNEntry
 
typedef struct ReorderBufferIterTXNState ReorderBufferIterTXNState
 
typedef struct ReorderBufferToastEnt ReorderBufferToastEnt
 
typedef struct ReorderBufferDiskChange ReorderBufferDiskChange
 
typedef struct RewriteMappingFile RewriteMappingFile
 

Functions

static ReorderBufferTXNReorderBufferAllocTXN (ReorderBuffer *rb)
 
static void ReorderBufferFreeTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static ReorderBufferTXNReorderBufferTXNByXid (ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
 
static void ReorderBufferTransferSnapToParent (ReorderBufferTXN *txn, ReorderBufferTXN *subtxn)
 
static void AssertTXNLsnOrder (ReorderBuffer *rb)
 
static void ReorderBufferIterTXNInit (ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferIterTXNState *volatile *iter_state)
 
static ReorderBufferChangeReorderBufferIterTXNNext (ReorderBuffer *rb, ReorderBufferIterTXNState *state)
 
static void ReorderBufferIterTXNFinish (ReorderBuffer *rb, ReorderBufferIterTXNState *state)
 
static void ReorderBufferExecuteInvalidations (uint32 nmsgs, SharedInvalidationMessage *msgs)
 
static void ReorderBufferCheckMemoryLimit (ReorderBuffer *rb)
 
static void ReorderBufferSerializeTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferSerializeChange (ReorderBuffer *rb, ReorderBufferTXN *txn, int fd, ReorderBufferChange *change)
 
static Size ReorderBufferRestoreChanges (ReorderBuffer *rb, ReorderBufferTXN *txn, TXNEntryFile *file, XLogSegNo *segno)
 
static void ReorderBufferRestoreChange (ReorderBuffer *rb, ReorderBufferTXN *txn, char *data)
 
static void ReorderBufferRestoreCleanup (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferTruncateTXN (ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared)
 
static void ReorderBufferMaybeMarkTXNStreamed (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static bool ReorderBufferCheckAndTruncateAbortedTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferCleanupSerializedTXNs (const char *slotname)
 
static void ReorderBufferSerializedPath (char *path, ReplicationSlot *slot, TransactionId xid, XLogSegNo segno)
 
static int ReorderBufferTXNSizeCompare (const pairingheap_node *a, const pairingheap_node *b, void *arg)
 
static void ReorderBufferFreeSnap (ReorderBuffer *rb, Snapshot snap)
 
static Snapshot ReorderBufferCopySnap (ReorderBuffer *rb, Snapshot orig_snap, ReorderBufferTXN *txn, CommandId cid)
 
static bool ReorderBufferCanStream (ReorderBuffer *rb)
 
static bool ReorderBufferCanStartStreaming (ReorderBuffer *rb)
 
static void ReorderBufferStreamTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferStreamCommit (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferToastInitHash (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferToastReset (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferToastReplace (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
 
static void ReorderBufferToastAppendChunk (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
 
static Size ReorderBufferChangeSize (ReorderBufferChange *change)
 
static void ReorderBufferChangeMemoryUpdate (ReorderBuffer *rb, ReorderBufferChange *change, ReorderBufferTXN *txn, bool addition, Size sz)
 
ReorderBufferReorderBufferAllocate (void)
 
void ReorderBufferFree (ReorderBuffer *rb)
 
ReorderBufferChangeReorderBufferAllocChange (ReorderBuffer *rb)
 
void ReorderBufferFreeChange (ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)
 
HeapTuple ReorderBufferAllocTupleBuf (ReorderBuffer *rb, Size tuple_len)
 
void ReorderBufferFreeTupleBuf (HeapTuple tuple)
 
OidReorderBufferAllocRelids (ReorderBuffer *rb, int nrelids)
 
void ReorderBufferFreeRelids (ReorderBuffer *rb, Oid *relids)
 
static void ReorderBufferProcessPartialChange (ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool toast_insert)
 
void ReorderBufferQueueChange (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change, bool toast_insert)
 
void ReorderBufferQueueMessage (ReorderBuffer *rb, TransactionId xid, Snapshot snap, XLogRecPtr lsn, bool transactional, const char *prefix, Size message_size, const char *message)
 
static void AssertChangeLsnOrder (ReorderBufferTXN *txn)
 
ReorderBufferTXNReorderBufferGetOldestTXN (ReorderBuffer *rb)
 
TransactionId ReorderBufferGetOldestXmin (ReorderBuffer *rb)
 
void ReorderBufferSetRestartPoint (ReorderBuffer *rb, XLogRecPtr ptr)
 
void ReorderBufferAssignChild (ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr lsn)
 
void ReorderBufferCommitChild (ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn)
 
static int ReorderBufferIterCompare (Datum a, Datum b, void *arg)
 
static void ReorderBufferCleanupTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferBuildTupleCidHash (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void SetupCheckXidLive (TransactionId xid)
 
static void ReorderBufferApplyChange (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change, bool streaming)
 
static void ReorderBufferApplyTruncate (ReorderBuffer *rb, ReorderBufferTXN *txn, int nrelations, Relation *relations, ReorderBufferChange *change, bool streaming)
 
static void ReorderBufferApplyMessage (ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool streaming)
 
static void ReorderBufferSaveTXNSnapshot (ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id)
 
static void ReorderBufferResetTXN (ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id, XLogRecPtr last_lsn, ReorderBufferChange *specinsert)
 
static void ReorderBufferProcessTXN (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn, volatile Snapshot snapshot_now, volatile CommandId command_id, bool streaming)
 
static void ReorderBufferReplay (ReorderBufferTXN *txn, ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
 
void ReorderBufferCommit (ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
 
bool ReorderBufferRememberPrepareInfo (ReorderBuffer *rb, TransactionId xid, XLogRecPtr prepare_lsn, XLogRecPtr end_lsn, TimestampTz prepare_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
 
void ReorderBufferSkipPrepare (ReorderBuffer *rb, TransactionId xid)
 
void ReorderBufferPrepare (ReorderBuffer *rb, TransactionId xid, char *gid)
 
void ReorderBufferFinishPrepared (ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, XLogRecPtr two_phase_at, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn, char *gid, bool is_commit)
 
void ReorderBufferAbort (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, TimestampTz abort_time)
 
void ReorderBufferAbortOld (ReorderBuffer *rb, TransactionId oldestRunningXid)
 
void ReorderBufferForget (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferInvalidate (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferImmediateInvalidation (ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
 
void ReorderBufferProcessXid (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferAddSnapshot (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
 
void ReorderBufferSetBaseSnapshot (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
 
void ReorderBufferAddNewCommandId (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, CommandId cid)
 
void ReorderBufferAddNewTupleCids (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, RelFileLocator locator, ItemPointerData tid, CommandId cmin, CommandId cmax, CommandId combocid)
 
static void ReorderBufferQueueInvalidations (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs)
 
static void ReorderBufferAccumulateInvalidations (SharedInvalidationMessage **invals_out, uint32 *ninvals_out, SharedInvalidationMessage *msgs_new, Size nmsgs_new)
 
void ReorderBufferAddInvalidations (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs)
 
void ReorderBufferAddDistributedInvalidations (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs)
 
void ReorderBufferXidSetCatalogChanges (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
TransactionIdReorderBufferGetCatalogChangesXacts (ReorderBuffer *rb)
 
bool ReorderBufferXidHasCatalogChanges (ReorderBuffer *rb, TransactionId xid)
 
bool ReorderBufferXidHasBaseSnapshot (ReorderBuffer *rb, TransactionId xid)
 
static void ReorderBufferSerializeReserve (ReorderBuffer *rb, Size sz)
 
static ReorderBufferTXNReorderBufferLargestTXN (ReorderBuffer *rb)
 
static ReorderBufferTXNReorderBufferLargestStreamableTopTXN (ReorderBuffer *rb)
 
void StartupReorderBuffer (void)
 
static void ApplyLogicalMappingFile (HTAB *tuplecid_data, Oid relid, const char *fname)
 
static bool TransactionIdInArray (TransactionId xid, TransactionId *xip, Size num)
 
static int file_sort_by_lsn (const ListCell *a_p, const ListCell *b_p)
 
static void UpdateLogicalMappings (HTAB *tuplecid_data, Oid relid, Snapshot snapshot)
 
bool ResolveCminCmaxDuringDecoding (HTAB *tuplecid_data, Snapshot snapshot, HeapTuple htup, Buffer buffer, CommandId *cmin, CommandId *cmax)
 
uint32 ReorderBufferGetInvalidations (ReorderBuffer *rb, TransactionId xid, SharedInvalidationMessage **msgs)
 

Variables

int logical_decoding_work_mem
 
static const Size max_changes_in_memory = 4096
 
int debug_logical_replication_streaming = DEBUG_LOGICAL_REP_STREAMING_BUFFERED
 

Macro Definition Documentation

◆ CHANGES_THRESHOLD

#define CHANGES_THRESHOLD   100

◆ IsInsertOrUpdate

#define IsInsertOrUpdate (   action)
Value:

Definition at line 206 of file reorderbuffer.c.

◆ IsSpecConfirmOrAbort

#define IsSpecConfirmOrAbort (   action)
Value:
( \
)
@ REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM
Definition: reorderbuffer.h:61
@ REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT
Definition: reorderbuffer.h:62

Definition at line 201 of file reorderbuffer.c.

◆ IsSpecInsert

#define IsSpecInsert (   action)
Value:

Definition at line 197 of file reorderbuffer.c.

◆ MAX_DISTR_INVAL_MSG_PER_TXN

#define MAX_DISTR_INVAL_MSG_PER_TXN    ((8 * 1024 * 1024) / sizeof(SharedInvalidationMessage))

Definition at line 125 of file reorderbuffer.c.

Typedef Documentation

◆ ReorderBufferDiskChange

◆ ReorderBufferIterTXNEntry

◆ ReorderBufferIterTXNState

◆ ReorderBufferToastEnt

◆ ReorderBufferTupleCidEnt

◆ ReorderBufferTupleCidKey

◆ ReorderBufferTXNByIdEnt

◆ RewriteMappingFile

◆ TXNEntryFile

typedef struct TXNEntryFile TXNEntryFile

Function Documentation

◆ ApplyLogicalMappingFile()

static void ApplyLogicalMappingFile ( HTAB tuplecid_data,
Oid  relid,
const char *  fname 
)
static

Definition at line 5364 of file reorderbuffer.c.

5365{
5366 char path[MAXPGPATH];
5367 int fd;
5368 int readBytes;
5370
5371 sprintf(path, "%s/%s", PG_LOGICAL_MAPPINGS_DIR, fname);
5372 fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
5373 if (fd < 0)
5374 ereport(ERROR,
5376 errmsg("could not open file \"%s\": %m", path)));
5377
5378 while (true)
5379 {
5382 ReorderBufferTupleCidEnt *new_ent;
5383 bool found;
5384
5385 /* be careful about padding */
5386 memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
5387
5388 /* read all mappings till the end of the file */
5389 pgstat_report_wait_start(WAIT_EVENT_REORDER_LOGICAL_MAPPING_READ);
5390 readBytes = read(fd, &map, sizeof(LogicalRewriteMappingData));
5392
5393 if (readBytes < 0)
5394 ereport(ERROR,
5396 errmsg("could not read file \"%s\": %m",
5397 path)));
5398 else if (readBytes == 0) /* EOF */
5399 break;
5400 else if (readBytes != sizeof(LogicalRewriteMappingData))
5401 ereport(ERROR,
5403 errmsg("could not read from file \"%s\": read %d instead of %d bytes",
5404 path, readBytes,
5405 (int32) sizeof(LogicalRewriteMappingData))));
5406
5407 key.rlocator = map.old_locator;
5409 &key.tid);
5410
5411
5412 ent = (ReorderBufferTupleCidEnt *)
5414
5415 /* no existing mapping, no need to update */
5416 if (!ent)
5417 continue;
5418
5419 key.rlocator = map.new_locator;
5421 &key.tid);
5422
5423 new_ent = (ReorderBufferTupleCidEnt *)
5425
5426 if (found)
5427 {
5428 /*
5429 * Make sure the existing mapping makes sense. We sometime update
5430 * old records that did not yet have a cmax (e.g. pg_class' own
5431 * entry while rewriting it) during rewrites, so allow that.
5432 */
5433 Assert(ent->cmin == InvalidCommandId || ent->cmin == new_ent->cmin);
5434 Assert(ent->cmax == InvalidCommandId || ent->cmax == new_ent->cmax);
5435 }
5436 else
5437 {
5438 /* update mapping */
5439 new_ent->cmin = ent->cmin;
5440 new_ent->cmax = ent->cmax;
5441 new_ent->combocid = ent->combocid;
5442 }
5443 }
5444
5445 if (CloseTransientFile(fd) != 0)
5446 ereport(ERROR,
5448 errmsg("could not close file \"%s\": %m", path)));
5449}
#define InvalidCommandId
Definition: c.h:688
#define PG_BINARY
Definition: c.h:1271
int32_t int32
Definition: c.h:548
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:952
int errcode_for_file_access(void)
Definition: elog.c:886
int errmsg(const char *fmt,...)
Definition: elog.c:1080
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:150
int CloseTransientFile(int fd)
Definition: fd.c:2851
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2674
Assert(PointerIsAligned(start, uint64))
@ HASH_FIND
Definition: hsearch.h:113
@ HASH_ENTER
Definition: hsearch.h:114
#define read(a, b, c)
Definition: win32.h:13
static void ItemPointerCopy(const ItemPointerData *fromPointer, ItemPointerData *toPointer)
Definition: itemptr.h:172
#define MAXPGPATH
#define sprintf
Definition: port.h:262
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_LOGICAL_MAPPINGS_DIR
Definition: reorderbuffer.h:23
static HTAB * tuplecid_data
Definition: snapmgr.c:163
ItemPointerData new_tid
Definition: rewriteheap.h:40
RelFileLocator old_locator
Definition: rewriteheap.h:37
ItemPointerData old_tid
Definition: rewriteheap.h:39
RelFileLocator new_locator
Definition: rewriteheap.h:38
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: wait_event.h:69
static void pgstat_report_wait_end(void)
Definition: wait_event.h:85

References Assert(), CloseTransientFile(), ReorderBufferTupleCidEnt::cmax, ReorderBufferTupleCidEnt::cmin, ReorderBufferTupleCidEnt::combocid, ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), HASH_ENTER, HASH_FIND, hash_search(), InvalidCommandId, ItemPointerCopy(), sort-test::key, MAXPGPATH, LogicalRewriteMappingData::new_locator, LogicalRewriteMappingData::new_tid, LogicalRewriteMappingData::old_locator, LogicalRewriteMappingData::old_tid, OpenTransientFile(), PG_BINARY, PG_LOGICAL_MAPPINGS_DIR, pgstat_report_wait_end(), pgstat_report_wait_start(), read, sprintf, and tuplecid_data.

Referenced by UpdateLogicalMappings().

◆ AssertChangeLsnOrder()

static void AssertChangeLsnOrder ( ReorderBufferTXN txn)
static

Definition at line 1013 of file reorderbuffer.c.

1014{
1015#ifdef USE_ASSERT_CHECKING
1016 dlist_iter iter;
1017 XLogRecPtr prev_lsn = txn->first_lsn;
1018
1019 dlist_foreach(iter, &txn->changes)
1020 {
1021 ReorderBufferChange *cur_change;
1022
1023 cur_change = dlist_container(ReorderBufferChange, node, iter.cur);
1024
1026 Assert(XLogRecPtrIsValid(cur_change->lsn));
1027 Assert(txn->first_lsn <= cur_change->lsn);
1028
1029 if (XLogRecPtrIsValid(txn->end_lsn))
1030 Assert(cur_change->lsn <= txn->end_lsn);
1031
1032 Assert(prev_lsn <= cur_change->lsn);
1033
1034 prev_lsn = cur_change->lsn;
1035 }
1036#endif
1037}
#define dlist_foreach(iter, lhead)
Definition: ilist.h:623
#define dlist_container(type, membername, ptr)
Definition: ilist.h:593
XLogRecPtr first_lsn
XLogRecPtr end_lsn
dlist_head changes
dlist_node * cur
Definition: ilist.h:179
#define XLogRecPtrIsValid(r)
Definition: xlogdefs.h:29
uint64 XLogRecPtr
Definition: xlogdefs.h:21

References Assert(), ReorderBufferTXN::changes, dlist_iter::cur, dlist_container, dlist_foreach, ReorderBufferTXN::end_lsn, ReorderBufferTXN::first_lsn, ReorderBufferChange::lsn, and XLogRecPtrIsValid.

Referenced by ReorderBufferIterTXNInit().

◆ AssertTXNLsnOrder()

static void AssertTXNLsnOrder ( ReorderBuffer rb)
static

Definition at line 942 of file reorderbuffer.c.

943{
944#ifdef USE_ASSERT_CHECKING
946 dlist_iter iter;
947 XLogRecPtr prev_first_lsn = InvalidXLogRecPtr;
948 XLogRecPtr prev_base_snap_lsn = InvalidXLogRecPtr;
949
950 /*
951 * Skip the verification if we don't reach the LSN at which we start
952 * decoding the contents of transactions yet because until we reach the
953 * LSN, we could have transactions that don't have the association between
954 * the top-level transaction and subtransaction yet and consequently have
955 * the same LSN. We don't guarantee this association until we try to
956 * decode the actual contents of transaction. The ordering of the records
957 * prior to the start_decoding_at LSN should have been checked before the
958 * restart.
959 */
961 return;
962
964 {
966 iter.cur);
967
968 /* start LSN must be set */
970
971 /* If there is an end LSN, it must be higher than start LSN */
972 if (XLogRecPtrIsValid(cur_txn->end_lsn))
973 Assert(cur_txn->first_lsn <= cur_txn->end_lsn);
974
975 /* Current initial LSN must be strictly higher than previous */
976 if (XLogRecPtrIsValid(prev_first_lsn))
977 Assert(prev_first_lsn < cur_txn->first_lsn);
978
979 /* known-as-subtxn txns must not be listed */
981
982 prev_first_lsn = cur_txn->first_lsn;
983 }
984
986 {
988 base_snapshot_node,
989 iter.cur);
990
991 /* base snapshot (and its LSN) must be set */
992 Assert(cur_txn->base_snapshot != NULL);
994
995 /* current LSN must be strictly higher than previous */
996 if (XLogRecPtrIsValid(prev_base_snap_lsn))
997 Assert(prev_base_snap_lsn < cur_txn->base_snapshot_lsn);
998
999 /* known-as-subtxn txns must not be listed */
1000 Assert(!rbtxn_is_known_subxact(cur_txn));
1001
1002 prev_base_snap_lsn = cur_txn->base_snapshot_lsn;
1003 }
1004#endif
1005}
#define rbtxn_is_known_subxact(txn)
bool SnapBuildXactNeedsSkip(SnapBuild *builder, XLogRecPtr ptr)
Definition: snapbuild.c:304
XLogReaderState * reader
Definition: logical.h:42
struct SnapBuild * snapshot_builder
Definition: logical.h:44
XLogRecPtr base_snapshot_lsn
Snapshot base_snapshot
dlist_head txns_by_base_snapshot_lsn
dlist_head toplevel_by_lsn
void * private_data
XLogRecPtr EndRecPtr
Definition: xlogreader.h:206
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28

References Assert(), ReorderBufferTXN::base_snapshot, ReorderBufferTXN::base_snapshot_lsn, dlist_iter::cur, dlist_container, dlist_foreach, ReorderBufferTXN::end_lsn, XLogReaderState::EndRecPtr, ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, ReorderBuffer::private_data, rbtxn_is_known_subxact, LogicalDecodingContext::reader, SnapBuildXactNeedsSkip(), LogicalDecodingContext::snapshot_builder, ReorderBuffer::toplevel_by_lsn, ReorderBuffer::txns_by_base_snapshot_lsn, and XLogRecPtrIsValid.

Referenced by ReorderBufferAssignChild(), ReorderBufferGetOldestTXN(), ReorderBufferGetOldestXmin(), ReorderBufferSetBaseSnapshot(), and ReorderBufferTXNByXid().

◆ file_sort_by_lsn()

static int file_sort_by_lsn ( const ListCell a_p,
const ListCell b_p 
)
static

Definition at line 5466 of file reorderbuffer.c.

5467{
5470
5471 return pg_cmp_u64(a->lsn, b->lsn);
5472}
static int pg_cmp_u64(uint64 a, uint64 b)
Definition: int.h:731
int b
Definition: isn.c:74
int a
Definition: isn.c:73
#define lfirst(lc)
Definition: pg_list.h:172

References a, b, lfirst, and pg_cmp_u64().

Referenced by UpdateLogicalMappings().

◆ ReorderBufferAbort()

void ReorderBufferAbort ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
TimestampTz  abort_time 
)

Definition at line 3087 of file reorderbuffer.c.

3089{
3090 ReorderBufferTXN *txn;
3091
3092 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
3093 false);
3094
3095 /* unknown, nothing to remove */
3096 if (txn == NULL)
3097 return;
3098
3099 txn->abort_time = abort_time;
3100
3101 /* For streamed transactions notify the remote node about the abort. */
3102 if (rbtxn_is_streamed(txn))
3103 {
3104 rb->stream_abort(rb, txn, lsn);
3105
3106 /*
3107 * We might have decoded changes for this transaction that could load
3108 * the cache as per the current transaction's view (consider DDL's
3109 * happened in this transaction). We don't want the decoding of future
3110 * transactions to use those cache entries so execute only the inval
3111 * messages in this transaction.
3112 */
3113 if (txn->ninvalidations > 0)
3115 txn->invalidations);
3116 }
3117
3118 /* cosmetic... */
3119 txn->final_lsn = lsn;
3120
3121 /* remove potential on-disk data, and deallocate */
3122 ReorderBufferCleanupTXN(rb, txn);
3123}
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
void ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
#define rbtxn_is_streamed(txn)
SharedInvalidationMessage * invalidations
TimestampTz abort_time
XLogRecPtr final_lsn
ReorderBufferStreamAbortCB stream_abort

References ReorderBufferTXN::abort_time, ReorderBufferTXN::final_lsn, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, rbtxn_is_streamed, ReorderBufferCleanupTXN(), ReorderBufferImmediateInvalidation(), ReorderBufferTXNByXid(), and ReorderBuffer::stream_abort.

Referenced by DecodeAbort().

◆ ReorderBufferAbortOld()

void ReorderBufferAbortOld ( ReorderBuffer rb,
TransactionId  oldestRunningXid 
)

Definition at line 3133 of file reorderbuffer.c.

3134{
3136
3137 /*
3138 * Iterate through all (potential) toplevel TXNs and abort all that are
3139 * older than what possibly can be running. Once we've found the first
3140 * that is alive we stop, there might be some that acquired an xid earlier
3141 * but started writing later, but it's unlikely and they will be cleaned
3142 * up in a later call to this function.
3143 */
3145 {
3146 ReorderBufferTXN *txn;
3147
3148 txn = dlist_container(ReorderBufferTXN, node, it.cur);
3149
3150 if (TransactionIdPrecedes(txn->xid, oldestRunningXid))
3151 {
3152 elog(DEBUG2, "aborting old transaction %u", txn->xid);
3153
3154 /* Notify the remote node about the crash/immediate restart. */
3155 if (rbtxn_is_streamed(txn))
3156 rb->stream_abort(rb, txn, InvalidXLogRecPtr);
3157
3158 /* remove potential on-disk data, and deallocate this tx */
3159 ReorderBufferCleanupTXN(rb, txn);
3160 }
3161 else
3162 return;
3163 }
3164}
#define DEBUG2
Definition: elog.h:29
#define elog(elevel,...)
Definition: elog.h:226
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:640
TransactionId xid
dlist_node * cur
Definition: ilist.h:200
static bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.h:263

References dlist_mutable_iter::cur, DEBUG2, dlist_container, dlist_foreach_modify, elog, InvalidXLogRecPtr, rbtxn_is_streamed, ReorderBufferCleanupTXN(), ReorderBuffer::stream_abort, ReorderBuffer::toplevel_by_lsn, TransactionIdPrecedes(), and ReorderBufferTXN::xid.

Referenced by standby_decode().

◆ ReorderBufferAccumulateInvalidations()

static void ReorderBufferAccumulateInvalidations ( SharedInvalidationMessage **  invals_out,
uint32 ninvals_out,
SharedInvalidationMessage msgs_new,
Size  nmsgs_new 
)
static

Definition at line 3505 of file reorderbuffer.c.

3509{
3510 if (*ninvals_out == 0)
3511 {
3512 *ninvals_out = nmsgs_new;
3513 *invals_out = palloc_array(SharedInvalidationMessage, nmsgs_new);
3514 memcpy(*invals_out, msgs_new, sizeof(SharedInvalidationMessage) * nmsgs_new);
3515 }
3516 else
3517 {
3518 /* Enlarge the array of inval messages */
3519 *invals_out = (SharedInvalidationMessage *)
3520 repalloc(*invals_out, sizeof(SharedInvalidationMessage) *
3521 (*ninvals_out + nmsgs_new));
3522 memcpy(*invals_out + *ninvals_out, msgs_new,
3523 nmsgs_new * sizeof(SharedInvalidationMessage));
3524 *ninvals_out += nmsgs_new;
3525 }
3526}
#define palloc_array(type, count)
Definition: fe_memutils.h:76
void * repalloc(void *pointer, Size size)
Definition: mcxt.c:1610

References palloc_array, and repalloc().

Referenced by ReorderBufferAddDistributedInvalidations(), and ReorderBufferAddInvalidations().

◆ ReorderBufferAddDistributedInvalidations()

void ReorderBufferAddDistributedInvalidations ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
Size  nmsgs,
SharedInvalidationMessage msgs 
)

Definition at line 3583 of file reorderbuffer.c.

3586{
3587 ReorderBufferTXN *txn;
3588 MemoryContext oldcontext;
3589
3590 txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3591
3592 oldcontext = MemoryContextSwitchTo(rb->context);
3593
3594 /*
3595 * Collect all the invalidations under the top transaction, if available,
3596 * so that we can execute them all together. See comments
3597 * ReorderBufferAddInvalidations.
3598 */
3599 txn = rbtxn_get_toptxn(txn);
3600
3601 Assert(nmsgs > 0);
3602
3604 {
3605 /*
3606 * Check the transaction has enough space for storing distributed
3607 * invalidation messages.
3608 */
3610 {
3611 /*
3612 * Mark the invalidation message as overflowed and free up the
3613 * messages accumulated so far.
3614 */
3616
3618 {
3620 txn->invalidations_distributed = NULL;
3622 }
3623 }
3624 else
3627 msgs, nmsgs);
3628 }
3629
3630 /* Queue the invalidation messages into the transaction */
3631 ReorderBufferQueueInvalidations(rb, xid, lsn, nmsgs, msgs);
3632
3633 MemoryContextSwitchTo(oldcontext);
3634}
void pfree(void *pointer)
Definition: mcxt.c:1594
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:124
static void ReorderBufferAccumulateInvalidations(SharedInvalidationMessage **invals_out, uint32 *ninvals_out, SharedInvalidationMessage *msgs_new, Size nmsgs_new)
#define MAX_DISTR_INVAL_MSG_PER_TXN
static void ReorderBufferQueueInvalidations(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs)
#define rbtxn_get_toptxn(txn)
#define RBTXN_DISTR_INVAL_OVERFLOWED
#define rbtxn_distr_inval_overflowed(txn)
uint32 ninvalidations_distributed
SharedInvalidationMessage * invalidations_distributed
MemoryContext context

References Assert(), ReorderBuffer::context, ReorderBufferTXN::invalidations_distributed, MAX_DISTR_INVAL_MSG_PER_TXN, MemoryContextSwitchTo(), ReorderBufferTXN::ninvalidations_distributed, pfree(), RBTXN_DISTR_INVAL_OVERFLOWED, rbtxn_distr_inval_overflowed, rbtxn_get_toptxn, ReorderBufferAccumulateInvalidations(), ReorderBufferQueueInvalidations(), ReorderBufferTXNByXid(), and ReorderBufferTXN::txn_flags.

Referenced by SnapBuildDistributeSnapshotAndInval().

◆ ReorderBufferAddInvalidations()

void ReorderBufferAddInvalidations ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
Size  nmsgs,
SharedInvalidationMessage msgs 
)

Definition at line 3542 of file reorderbuffer.c.

3545{
3546 ReorderBufferTXN *txn;
3547 MemoryContext oldcontext;
3548
3549 txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3550
3551 oldcontext = MemoryContextSwitchTo(rb->context);
3552
3553 /*
3554 * Collect all the invalidations under the top transaction, if available,
3555 * so that we can execute them all together. See comments atop this
3556 * function.
3557 */
3558 txn = rbtxn_get_toptxn(txn);
3559
3560 Assert(nmsgs > 0);
3561
3563 &txn->ninvalidations,
3564 msgs, nmsgs);
3565
3566 ReorderBufferQueueInvalidations(rb, xid, lsn, nmsgs, msgs);
3567
3568 MemoryContextSwitchTo(oldcontext);
3569}

References Assert(), ReorderBuffer::context, ReorderBufferTXN::invalidations, MemoryContextSwitchTo(), ReorderBufferTXN::ninvalidations, rbtxn_get_toptxn, ReorderBufferAccumulateInvalidations(), ReorderBufferQueueInvalidations(), and ReorderBufferTXNByXid().

Referenced by xact_decode().

◆ ReorderBufferAddNewCommandId()

void ReorderBufferAddNewCommandId ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
CommandId  cid 
)

Definition at line 3357 of file reorderbuffer.c.

3359{
3361
3362 change->data.command_id = cid;
3364
3365 ReorderBufferQueueChange(rb, xid, lsn, change, false);
3366}
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change, bool toast_insert)
ReorderBufferChange * ReorderBufferAllocChange(ReorderBuffer *rb)
@ REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID
Definition: reorderbuffer.h:58
ReorderBufferChangeType action
Definition: reorderbuffer.h:81
union ReorderBufferChange::@114 data

References ReorderBufferChange::action, ReorderBufferChange::command_id, ReorderBufferChange::data, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, ReorderBufferAllocChange(), and ReorderBufferQueueChange().

Referenced by SnapBuildProcessNewCid().

◆ ReorderBufferAddNewTupleCids()

void ReorderBufferAddNewTupleCids ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
RelFileLocator  locator,
ItemPointerData  tid,
CommandId  cmin,
CommandId  cmax,
CommandId  combocid 
)

Definition at line 3456 of file reorderbuffer.c.

3460{
3462 ReorderBufferTXN *txn;
3463
3464 txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3465
3466 change->data.tuplecid.locator = locator;
3467 change->data.tuplecid.tid = tid;
3468 change->data.tuplecid.cmin = cmin;
3469 change->data.tuplecid.cmax = cmax;
3470 change->data.tuplecid.combocid = combocid;
3471 change->lsn = lsn;
3472 change->txn = txn;
3474
3475 dlist_push_tail(&txn->tuplecids, &change->node);
3476 txn->ntuplecids++;
3477}
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:364
@ REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID
Definition: reorderbuffer.h:59
struct ReorderBufferChange::@114::@118 tuplecid
ItemPointerData tid
struct ReorderBufferTXN * txn
Definition: reorderbuffer.h:84
RelFileLocator locator
dlist_head tuplecids

References ReorderBufferChange::action, ReorderBufferChange::cmax, ReorderBufferChange::cmin, ReorderBufferChange::combocid, ReorderBufferChange::data, dlist_push_tail(), ReorderBufferChange::locator, ReorderBufferChange::lsn, ReorderBufferChange::node, ReorderBufferTXN::ntuplecids, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferAllocChange(), ReorderBufferTXNByXid(), ReorderBufferChange::tid, ReorderBufferChange::tuplecid, ReorderBufferTXN::tuplecids, and ReorderBufferChange::txn.

Referenced by SnapBuildProcessNewCid().

◆ ReorderBufferAddSnapshot()

void ReorderBufferAddSnapshot ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
Snapshot  snap 
)

◆ ReorderBufferAllocate()

ReorderBuffer * ReorderBufferAllocate ( void  )

Definition at line 324 of file reorderbuffer.c.

325{
326 ReorderBuffer *buffer;
327 HASHCTL hash_ctl;
328 MemoryContext new_ctx;
329
330 Assert(MyReplicationSlot != NULL);
331
332 /* allocate memory in own context, to have better accountability */
334 "ReorderBuffer",
336
337 buffer =
338 (ReorderBuffer *) MemoryContextAlloc(new_ctx, sizeof(ReorderBuffer));
339
340 memset(&hash_ctl, 0, sizeof(hash_ctl));
341
342 buffer->context = new_ctx;
343
344 buffer->change_context = SlabContextCreate(new_ctx,
345 "Change",
347 sizeof(ReorderBufferChange));
348
349 buffer->txn_context = SlabContextCreate(new_ctx,
350 "TXN",
352 sizeof(ReorderBufferTXN));
353
354 /*
355 * To minimize memory fragmentation caused by long-running transactions
356 * with changes spanning multiple memory blocks, we use a single
357 * fixed-size memory block for decoded tuple storage. The performance
358 * testing showed that the default memory block size maintains logical
359 * decoding performance without causing fragmentation due to concurrent
360 * transactions. One might think that we can use the max size as
361 * SLAB_LARGE_BLOCK_SIZE but the test also showed it doesn't help resolve
362 * the memory fragmentation.
363 */
364 buffer->tup_context = GenerationContextCreate(new_ctx,
365 "Tuples",
369
370 hash_ctl.keysize = sizeof(TransactionId);
371 hash_ctl.entrysize = sizeof(ReorderBufferTXNByIdEnt);
372 hash_ctl.hcxt = buffer->context;
373
374 buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
376
378 buffer->by_txn_last_txn = NULL;
379
380 buffer->outbuf = NULL;
381 buffer->outbufsize = 0;
382 buffer->size = 0;
383
384 /* txn_heap is ordered by transaction size */
386
387 buffer->spillTxns = 0;
388 buffer->spillCount = 0;
389 buffer->spillBytes = 0;
390 buffer->streamTxns = 0;
391 buffer->streamCount = 0;
392 buffer->streamBytes = 0;
393 buffer->memExceededCount = 0;
394 buffer->totalTxns = 0;
395 buffer->totalBytes = 0;
396
398
399 dlist_init(&buffer->toplevel_by_lsn);
401 dclist_init(&buffer->catchange_txns);
402
403 /*
404 * Ensure there's no stale data from prior uses of this slot, in case some
405 * prior exit avoided calling ReorderBufferFree. Failure to do this can
406 * produce duplicated txns, and it's very cheap if there's nothing there.
407 */
409
410 return buffer;
411}
#define NameStr(name)
Definition: c.h:765
uint32 TransactionId
Definition: c.h:671
HTAB * hash_create(const char *tabname, int64 nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:358
MemoryContext GenerationContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Definition: generation.c:162
#define HASH_CONTEXT
Definition: hsearch.h:102
#define HASH_ELEM
Definition: hsearch.h:95
#define HASH_BLOBS
Definition: hsearch.h:97
static void dlist_init(dlist_head *head)
Definition: ilist.h:314
static void dclist_init(dclist_head *head)
Definition: ilist.h:671
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:1229
MemoryContext CurrentMemoryContext
Definition: mcxt.c:160
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:160
#define SLAB_DEFAULT_BLOCK_SIZE
Definition: memutils.h:189
pairingheap * pairingheap_allocate(pairingheap_comparator compare, void *arg)
Definition: pairingheap.c:42
static int ReorderBufferTXNSizeCompare(const pairingheap_node *a, const pairingheap_node *b, void *arg)
struct ReorderBufferTXNByIdEnt ReorderBufferTXNByIdEnt
static void ReorderBufferCleanupSerializedTXNs(const char *slotname)
MemoryContext SlabContextCreate(MemoryContext parent, const char *name, Size blockSize, Size chunkSize)
Definition: slab.c:322
ReplicationSlot * MyReplicationSlot
Definition: slot.c:148
Size keysize
Definition: hsearch.h:75
Size entrysize
Definition: hsearch.h:76
MemoryContext hcxt
Definition: hsearch.h:86
dclist_head catchange_txns
int64 memExceededCount
MemoryContext change_context
ReorderBufferTXN * by_txn_last_txn
TransactionId by_txn_last_xid
MemoryContext tup_context
pairingheap * txn_heap
MemoryContext txn_context
XLogRecPtr current_restart_decoding_lsn
ReplicationSlotPersistentData data
Definition: slot.h:210
#define InvalidTransactionId
Definition: transam.h:31

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, Assert(), ReorderBuffer::by_txn, ReorderBuffer::by_txn_last_txn, ReorderBuffer::by_txn_last_xid, ReorderBuffer::catchange_txns, ReorderBuffer::change_context, ReorderBuffer::context, ReorderBuffer::current_restart_decoding_lsn, CurrentMemoryContext, ReplicationSlot::data, dclist_init(), dlist_init(), HASHCTL::entrysize, GenerationContextCreate(), HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, InvalidTransactionId, InvalidXLogRecPtr, HASHCTL::keysize, ReorderBuffer::memExceededCount, MemoryContextAlloc(), MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, ReorderBuffer::outbuf, ReorderBuffer::outbufsize, pairingheap_allocate(), ReorderBufferCleanupSerializedTXNs(), ReorderBufferTXNSizeCompare(), ReorderBuffer::size, SLAB_DEFAULT_BLOCK_SIZE, SlabContextCreate(), ReorderBuffer::spillBytes, ReorderBuffer::spillCount, ReorderBuffer::spillTxns, ReorderBuffer::streamBytes, ReorderBuffer::streamCount, ReorderBuffer::streamTxns, ReorderBuffer::toplevel_by_lsn, ReorderBuffer::totalBytes, ReorderBuffer::totalTxns, ReorderBuffer::tup_context, ReorderBuffer::txn_context, ReorderBuffer::txn_heap, and ReorderBuffer::txns_by_base_snapshot_lsn.

Referenced by StartupDecodingContext().

◆ ReorderBufferAllocChange()

◆ ReorderBufferAllocRelids()

Oid * ReorderBufferAllocRelids ( ReorderBuffer rb,
int  nrelids 
)

Definition at line 625 of file reorderbuffer.c.

626{
627 Oid *relids;
628 Size alloc_len;
629
630 alloc_len = sizeof(Oid) * nrelids;
631
632 relids = (Oid *) MemoryContextAlloc(rb->context, alloc_len);
633
634 return relids;
635}
size_t Size
Definition: c.h:624
unsigned int Oid
Definition: postgres_ext.h:32

References ReorderBuffer::context, and MemoryContextAlloc().

Referenced by DecodeTruncate(), and ReorderBufferRestoreChange().

◆ ReorderBufferAllocTupleBuf()

HeapTuple ReorderBufferAllocTupleBuf ( ReorderBuffer rb,
Size  tuple_len 
)

Definition at line 592 of file reorderbuffer.c.

593{
594 HeapTuple tuple;
595 Size alloc_len;
596
597 alloc_len = tuple_len + SizeofHeapTupleHeader;
598
600 HEAPTUPLESIZE + alloc_len);
601 tuple->t_data = (HeapTupleHeader) ((char *) tuple + HEAPTUPLESIZE);
602
603 return tuple;
604}
#define HEAPTUPLESIZE
Definition: htup.h:73
HeapTupleData * HeapTuple
Definition: htup.h:71
HeapTupleHeaderData * HeapTupleHeader
Definition: htup.h:23
#define SizeofHeapTupleHeader
Definition: htup_details.h:185
HeapTupleHeader t_data
Definition: htup.h:68

References HEAPTUPLESIZE, MemoryContextAlloc(), SizeofHeapTupleHeader, HeapTupleData::t_data, and ReorderBuffer::tup_context.

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeUpdate(), and ReorderBufferRestoreChange().

◆ ReorderBufferAllocTXN()

static ReorderBufferTXN * ReorderBufferAllocTXN ( ReorderBuffer rb)
static

Definition at line 435 of file reorderbuffer.c.

436{
437 ReorderBufferTXN *txn;
438
439 txn = (ReorderBufferTXN *)
441
442 memset(txn, 0, sizeof(ReorderBufferTXN));
443
444 dlist_init(&txn->changes);
445 dlist_init(&txn->tuplecids);
446 dlist_init(&txn->subtxns);
447
448 /* InvalidCommandId is not zero, so set it explicitly */
450 txn->output_plugin_private = NULL;
451
452 return txn;
453}
CommandId command_id
void * output_plugin_private
dlist_head subtxns

References ReorderBufferTXN::changes, ReorderBufferTXN::command_id, dlist_init(), InvalidCommandId, MemoryContextAlloc(), ReorderBufferTXN::output_plugin_private, ReorderBufferTXN::subtxns, ReorderBufferTXN::tuplecids, and ReorderBuffer::txn_context.

Referenced by ReorderBufferTXNByXid().

◆ ReorderBufferApplyChange()

static void ReorderBufferApplyChange ( ReorderBuffer rb,
ReorderBufferTXN txn,
Relation  relation,
ReorderBufferChange change,
bool  streaming 
)
inlinestatic

Definition at line 2072 of file reorderbuffer.c.

2075{
2076 if (streaming)
2077 rb->stream_change(rb, txn, relation, change);
2078 else
2079 rb->apply_change(rb, txn, relation, change);
2080}
ReorderBufferStreamChangeCB stream_change
ReorderBufferApplyChangeCB apply_change

References ReorderBuffer::apply_change, and ReorderBuffer::stream_change.

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferApplyMessage()

static void ReorderBufferApplyMessage ( ReorderBuffer rb,
ReorderBufferTXN txn,
ReorderBufferChange change,
bool  streaming 
)
inlinestatic

Definition at line 2100 of file reorderbuffer.c.

2102{
2103 if (streaming)
2104 rb->stream_message(rb, txn, change->lsn, true,
2105 change->data.msg.prefix,
2106 change->data.msg.message_size,
2107 change->data.msg.message);
2108 else
2109 rb->message(rb, txn, change->lsn, true,
2110 change->data.msg.prefix,
2111 change->data.msg.message_size,
2112 change->data.msg.message);
2113}
struct ReorderBufferChange::@114::@117 msg
ReorderBufferStreamMessageCB stream_message
ReorderBufferMessageCB message

References ReorderBufferChange::data, ReorderBufferChange::lsn, ReorderBufferChange::message, ReorderBuffer::message, ReorderBufferChange::message_size, ReorderBufferChange::msg, ReorderBufferChange::prefix, and ReorderBuffer::stream_message.

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferApplyTruncate()

static void ReorderBufferApplyTruncate ( ReorderBuffer rb,
ReorderBufferTXN txn,
int  nrelations,
Relation relations,
ReorderBufferChange change,
bool  streaming 
)
inlinestatic

Definition at line 2086 of file reorderbuffer.c.

2089{
2090 if (streaming)
2091 rb->stream_truncate(rb, txn, nrelations, relations, change);
2092 else
2093 rb->apply_truncate(rb, txn, nrelations, relations, change);
2094}
ReorderBufferStreamTruncateCB stream_truncate
ReorderBufferApplyTruncateCB apply_truncate

References ReorderBuffer::apply_truncate, and ReorderBuffer::stream_truncate.

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferAssignChild()

void ReorderBufferAssignChild ( ReorderBuffer rb,
TransactionId  xid,
TransactionId  subxid,
XLogRecPtr  lsn 
)

Definition at line 1099 of file reorderbuffer.c.

1101{
1102 ReorderBufferTXN *txn;
1103 ReorderBufferTXN *subtxn;
1104 bool new_top;
1105 bool new_sub;
1106
1107 txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true);
1108 subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false);
1109
1110 if (!new_sub)
1111 {
1112 if (rbtxn_is_known_subxact(subtxn))
1113 {
1114 /* already associated, nothing to do */
1115 return;
1116 }
1117 else
1118 {
1119 /*
1120 * We already saw this transaction, but initially added it to the
1121 * list of top-level txns. Now that we know it's not top-level,
1122 * remove it from there.
1123 */
1124 dlist_delete(&subtxn->node);
1125 }
1126 }
1127
1128 subtxn->txn_flags |= RBTXN_IS_SUBXACT;
1129 subtxn->toplevel_xid = xid;
1130 Assert(subtxn->nsubtxns == 0);
1131
1132 /* set the reference to top-level transaction */
1133 subtxn->toptxn = txn;
1134
1135 /* add to subtransaction list */
1136 dlist_push_tail(&txn->subtxns, &subtxn->node);
1137 txn->nsubtxns++;
1138
1139 /* Possibly transfer the subtxn's snapshot to its top-level txn. */
1141
1142 /* Verify LSN-ordering invariant */
1144}
static void dlist_delete(dlist_node *node)
Definition: ilist.h:405
static void AssertTXNLsnOrder(ReorderBuffer *rb)
static void ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn, ReorderBufferTXN *subtxn)
#define RBTXN_IS_SUBXACT
TransactionId toplevel_xid
struct ReorderBufferTXN * toptxn

References Assert(), AssertTXNLsnOrder(), dlist_delete(), dlist_push_tail(), ReorderBufferTXN::node, ReorderBufferTXN::nsubtxns, rbtxn_is_known_subxact, RBTXN_IS_SUBXACT, ReorderBufferTransferSnapToParent(), ReorderBufferTXNByXid(), ReorderBufferTXN::subtxns, ReorderBufferTXN::toplevel_xid, ReorderBufferTXN::toptxn, and ReorderBufferTXN::txn_flags.

Referenced by LogicalDecodingProcessRecord(), and ReorderBufferCommitChild().

◆ ReorderBufferBuildTupleCidHash()

static void ReorderBufferBuildTupleCidHash ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 1836 of file reorderbuffer.c.

1837{
1838 dlist_iter iter;
1839 HASHCTL hash_ctl;
1840
1842 return;
1843
1844 hash_ctl.keysize = sizeof(ReorderBufferTupleCidKey);
1845 hash_ctl.entrysize = sizeof(ReorderBufferTupleCidEnt);
1846 hash_ctl.hcxt = rb->context;
1847
1848 /*
1849 * create the hash with the exact number of to-be-stored tuplecids from
1850 * the start
1851 */
1852 txn->tuplecid_hash =
1853 hash_create("ReorderBufferTupleCid", txn->ntuplecids, &hash_ctl,
1855
1856 dlist_foreach(iter, &txn->tuplecids)
1857 {
1860 bool found;
1861 ReorderBufferChange *change;
1862
1863 change = dlist_container(ReorderBufferChange, node, iter.cur);
1864
1866
1867 /* be careful about padding */
1868 memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
1869
1870 key.rlocator = change->data.tuplecid.locator;
1871
1873 &key.tid);
1874
1875 ent = (ReorderBufferTupleCidEnt *)
1876 hash_search(txn->tuplecid_hash, &key, HASH_ENTER, &found);
1877 if (!found)
1878 {
1879 ent->cmin = change->data.tuplecid.cmin;
1880 ent->cmax = change->data.tuplecid.cmax;
1881 ent->combocid = change->data.tuplecid.combocid;
1882 }
1883 else
1884 {
1885 /*
1886 * Maybe we already saw this tuple before in this transaction, but
1887 * if so it must have the same cmin.
1888 */
1889 Assert(ent->cmin == change->data.tuplecid.cmin);
1890
1891 /*
1892 * cmax may be initially invalid, but once set it can only grow,
1893 * and never become invalid again.
1894 */
1895 Assert((ent->cmax == InvalidCommandId) ||
1896 ((change->data.tuplecid.cmax != InvalidCommandId) &&
1897 (change->data.tuplecid.cmax > ent->cmax)));
1898 ent->cmax = change->data.tuplecid.cmax;
1899 }
1900 }
1901}
static bool dlist_is_empty(const dlist_head *head)
Definition: ilist.h:336
struct ReorderBufferTupleCidEnt ReorderBufferTupleCidEnt
struct ReorderBufferTupleCidKey ReorderBufferTupleCidKey
#define rbtxn_has_catalog_changes(txn)

References ReorderBufferChange::action, Assert(), ReorderBufferTupleCidEnt::cmax, ReorderBufferChange::cmax, ReorderBufferTupleCidEnt::cmin, ReorderBufferChange::cmin, ReorderBufferTupleCidEnt::combocid, ReorderBufferChange::combocid, ReorderBuffer::context, dlist_iter::cur, ReorderBufferChange::data, dlist_container, dlist_foreach, dlist_is_empty(), HASHCTL::entrysize, HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASH_ENTER, hash_search(), HASHCTL::hcxt, InvalidCommandId, ItemPointerCopy(), sort-test::key, HASHCTL::keysize, ReorderBufferChange::locator, ReorderBufferTXN::ntuplecids, rbtxn_has_catalog_changes, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferChange::tid, ReorderBufferChange::tuplecid, ReorderBufferTXN::tuplecid_hash, and ReorderBufferTXN::tuplecids.

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferCanStartStreaming()

static bool ReorderBufferCanStartStreaming ( ReorderBuffer rb)
inlinestatic

Definition at line 4316 of file reorderbuffer.c.

4317{
4319 SnapBuild *builder = ctx->snapshot_builder;
4320
4321 /* We can't start streaming unless a consistent state is reached. */
4323 return false;
4324
4325 /*
4326 * We can't start streaming immediately even if the streaming is enabled
4327 * because we previously decoded this transaction and now just are
4328 * restarting.
4329 */
4330 if (ReorderBufferCanStream(rb) &&
4331 !SnapBuildXactNeedsSkip(builder, ctx->reader->ReadRecPtr))
4332 return true;
4333
4334 return false;
4335}
static bool ReorderBufferCanStream(ReorderBuffer *rb)
SnapBuildState SnapBuildCurrentState(SnapBuild *builder)
Definition: snapbuild.c:277
@ SNAPBUILD_CONSISTENT
Definition: snapbuild.h:50
XLogRecPtr ReadRecPtr
Definition: xlogreader.h:205

References ReorderBuffer::private_data, LogicalDecodingContext::reader, XLogReaderState::ReadRecPtr, ReorderBufferCanStream(), SNAPBUILD_CONSISTENT, SnapBuildCurrentState(), SnapBuildXactNeedsSkip(), and LogicalDecodingContext::snapshot_builder.

Referenced by ReorderBufferCheckMemoryLimit(), and ReorderBufferProcessPartialChange().

◆ ReorderBufferCanStream()

static bool ReorderBufferCanStream ( ReorderBuffer rb)
inlinestatic

◆ ReorderBufferChangeMemoryUpdate()

static void ReorderBufferChangeMemoryUpdate ( ReorderBuffer rb,
ReorderBufferChange change,
ReorderBufferTXN txn,
bool  addition,
Size  sz 
)
static

Definition at line 3385 of file reorderbuffer.c.

3389{
3390 ReorderBufferTXN *toptxn;
3391
3392 Assert(txn || change);
3393
3394 /*
3395 * Ignore tuple CID changes, because those are not evicted when reaching
3396 * memory limit. So we just don't count them, because it might easily
3397 * trigger a pointless attempt to spill.
3398 */
3399 if (change && change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID)
3400 return;
3401
3402 if (sz == 0)
3403 return;
3404
3405 if (txn == NULL)
3406 txn = change->txn;
3407 Assert(txn != NULL);
3408
3409 /*
3410 * Update the total size in top level as well. This is later used to
3411 * compute the decoding stats.
3412 */
3413 toptxn = rbtxn_get_toptxn(txn);
3414
3415 if (addition)
3416 {
3417 Size oldsize = txn->size;
3418
3419 txn->size += sz;
3420 rb->size += sz;
3421
3422 /* Update the total size in the top transaction. */
3423 toptxn->total_size += sz;
3424
3425 /* Update the max-heap */
3426 if (oldsize != 0)
3428 pairingheap_add(rb->txn_heap, &txn->txn_node);
3429 }
3430 else
3431 {
3432 Assert((rb->size >= sz) && (txn->size >= sz));
3433 txn->size -= sz;
3434 rb->size -= sz;
3435
3436 /* Update the total size in the top transaction. */
3437 toptxn->total_size -= sz;
3438
3439 /* Update the max-heap */
3441 if (txn->size != 0)
3442 pairingheap_add(rb->txn_heap, &txn->txn_node);
3443 }
3444
3445 Assert(txn->size <= rb->size);
3446}
void pairingheap_remove(pairingheap *heap, pairingheap_node *node)
Definition: pairingheap.c:184
void pairingheap_add(pairingheap *heap, pairingheap_node *node)
Definition: pairingheap.c:126
pairingheap_node txn_node

References ReorderBufferChange::action, Assert(), pairingheap_add(), pairingheap_remove(), rbtxn_get_toptxn, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferTXN::size, ReorderBuffer::size, ReorderBufferTXN::total_size, ReorderBufferChange::txn, ReorderBuffer::txn_heap, and ReorderBufferTXN::txn_node.

Referenced by ReorderBufferCleanupTXN(), ReorderBufferFreeChange(), ReorderBufferQueueChange(), ReorderBufferRestoreChange(), ReorderBufferSerializeTXN(), ReorderBufferToastReplace(), and ReorderBufferTruncateTXN().

◆ ReorderBufferChangeSize()

static Size ReorderBufferChangeSize ( ReorderBufferChange change)
static

Definition at line 4459 of file reorderbuffer.c.

4460{
4461 Size sz = sizeof(ReorderBufferChange);
4462
4463 switch (change->action)
4464 {
4465 /* fall through these, they're all similar enough */
4470 {
4471 HeapTuple oldtup,
4472 newtup;
4473 Size oldlen = 0;
4474 Size newlen = 0;
4475
4476 oldtup = change->data.tp.oldtuple;
4477 newtup = change->data.tp.newtuple;
4478
4479 if (oldtup)
4480 {
4481 sz += sizeof(HeapTupleData);
4482 oldlen = oldtup->t_len;
4483 sz += oldlen;
4484 }
4485
4486 if (newtup)
4487 {
4488 sz += sizeof(HeapTupleData);
4489 newlen = newtup->t_len;
4490 sz += newlen;
4491 }
4492
4493 break;
4494 }
4496 {
4497 Size prefix_size = strlen(change->data.msg.prefix) + 1;
4498
4499 sz += prefix_size + change->data.msg.message_size +
4500 sizeof(Size) + sizeof(Size);
4501
4502 break;
4503 }
4505 {
4506 sz += sizeof(SharedInvalidationMessage) *
4507 change->data.inval.ninvalidations;
4508 break;
4509 }
4511 {
4512 Snapshot snap;
4513
4514 snap = change->data.snapshot;
4515
4516 sz += sizeof(SnapshotData) +
4517 sizeof(TransactionId) * snap->xcnt +
4518 sizeof(TransactionId) * snap->subxcnt;
4519
4520 break;
4521 }
4523 {
4524 sz += sizeof(Oid) * change->data.truncate.nrelids;
4525
4526 break;
4527 }
4532 /* ReorderBufferChange contains everything important */
4533 break;
4534 }
4535
4536 return sz;
4537}
struct HeapTupleData HeapTupleData
struct ReorderBufferChange ReorderBufferChange
@ REORDER_BUFFER_CHANGE_INVALIDATION
Definition: reorderbuffer.h:56
@ REORDER_BUFFER_CHANGE_MESSAGE
Definition: reorderbuffer.h:55
@ REORDER_BUFFER_CHANGE_TRUNCATE
Definition: reorderbuffer.h:63
@ REORDER_BUFFER_CHANGE_DELETE
Definition: reorderbuffer.h:54
struct SnapshotData SnapshotData
uint32 t_len
Definition: htup.h:64
struct ReorderBufferChange::@114::@116 truncate
struct ReorderBufferChange::@114::@115 tp
struct ReorderBufferChange::@114::@119 inval
int32 subxcnt
Definition: snapshot.h:177
uint32 xcnt
Definition: snapshot.h:165

References ReorderBufferChange::action, ReorderBufferChange::data, ReorderBufferChange::inval, ReorderBufferChange::message_size, ReorderBufferChange::msg, ReorderBufferChange::newtuple, ReorderBufferChange::ninvalidations, ReorderBufferChange::nrelids, ReorderBufferChange::oldtuple, ReorderBufferChange::prefix, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChange::snapshot, SnapshotData::subxcnt, HeapTupleData::t_len, ReorderBufferChange::tp, ReorderBufferChange::truncate, and SnapshotData::xcnt.

Referenced by ReorderBufferCleanupTXN(), ReorderBufferFreeChange(), ReorderBufferQueueChange(), ReorderBufferRestoreChange(), ReorderBufferToastReplace(), and ReorderBufferTruncateTXN().

◆ ReorderBufferCheckAndTruncateAbortedTXN()

static bool ReorderBufferCheckAndTruncateAbortedTXN ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 1774 of file reorderbuffer.c.

1775{
1776 /* Quick return for regression tests */
1778 return false;
1779
1780 /*
1781 * Quick return if the transaction status is already known.
1782 */
1783
1784 if (rbtxn_is_committed(txn))
1785 return false;
1786 if (rbtxn_is_aborted(txn))
1787 {
1788 /* Already-aborted transactions should not have any changes */
1789 Assert(txn->size == 0);
1790
1791 return true;
1792 }
1793
1794 /* Otherwise, check the transaction status using CLOG lookup */
1795
1797 return false;
1798
1799 if (TransactionIdDidCommit(txn->xid))
1800 {
1801 /*
1802 * Remember the transaction is committed so that we can skip CLOG
1803 * check next time, avoiding the pressure on CLOG lookup.
1804 */
1805 Assert(!rbtxn_is_aborted(txn));
1807 return false;
1808 }
1809
1810 /*
1811 * The transaction aborted. We discard both the changes collected so far
1812 * and the toast reconstruction data. The full cleanup will happen as part
1813 * of decoding ABORT record of this transaction.
1814 */
1816 ReorderBufferToastReset(rb, txn);
1817
1818 /* All changes should be discarded */
1819 Assert(txn->size == 0);
1820
1821 /*
1822 * Mark the transaction as aborted so we can ignore future changes of this
1823 * transaction.
1824 */
1827
1828 return true;
1829}
#define unlikely(x)
Definition: c.h:418
bool TransactionIdIsInProgress(TransactionId xid)
Definition: procarray.c:1402
int debug_logical_replication_streaming
static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared)
static void ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn)
#define rbtxn_is_committed(txn)
@ DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE
Definition: reorderbuffer.h:34
#define rbtxn_is_prepared(txn)
#define RBTXN_IS_COMMITTED
#define rbtxn_is_aborted(txn)
#define RBTXN_IS_ABORTED
bool TransactionIdDidCommit(TransactionId transactionId)
Definition: transam.c:126

References Assert(), DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE, debug_logical_replication_streaming, RBTXN_IS_ABORTED, rbtxn_is_aborted, RBTXN_IS_COMMITTED, rbtxn_is_committed, rbtxn_is_prepared, ReorderBufferToastReset(), ReorderBufferTruncateTXN(), ReorderBufferTXN::size, TransactionIdDidCommit(), TransactionIdIsInProgress(), ReorderBufferTXN::txn_flags, unlikely, and ReorderBufferTXN::xid.

Referenced by ReorderBufferCheckMemoryLimit().

◆ ReorderBufferCheckMemoryLimit()

static void ReorderBufferCheckMemoryLimit ( ReorderBuffer rb)
static

Definition at line 3896 of file reorderbuffer.c.

3897{
3898 ReorderBufferTXN *txn;
3899 bool update_stats = true;
3900
3901 if (rb->size >= logical_decoding_work_mem * (Size) 1024)
3902 {
3903 /*
3904 * Update the statistics as the memory usage has reached the limit. We
3905 * report the statistics update later in this function since we can
3906 * update the slot statistics altogether while streaming or
3907 * serializing transactions in most cases.
3908 */
3909 rb->memExceededCount += 1;
3910 }
3912 {
3913 /*
3914 * Bail out if debug_logical_replication_streaming is buffered and we
3915 * haven't exceeded the memory limit.
3916 */
3917 return;
3918 }
3919
3920 /*
3921 * If debug_logical_replication_streaming is immediate, loop until there's
3922 * no change. Otherwise, loop until we reach under the memory limit. One
3923 * might think that just by evicting the largest (sub)transaction we will
3924 * come under the memory limit based on assumption that the selected
3925 * transaction is at least as large as the most recent change (which
3926 * caused us to go over the memory limit). However, that is not true
3927 * because a user can reduce the logical_decoding_work_mem to a smaller
3928 * value before the most recent change.
3929 */
3930 while (rb->size >= logical_decoding_work_mem * (Size) 1024 ||
3932 rb->size > 0))
3933 {
3934 /*
3935 * Pick the largest non-aborted transaction and evict it from memory
3936 * by streaming, if possible. Otherwise, spill to disk.
3937 */
3939 (txn = ReorderBufferLargestStreamableTopTXN(rb)) != NULL)
3940 {
3941 /* we know there has to be one, because the size is not zero */
3942 Assert(txn && rbtxn_is_toptxn(txn));
3943 Assert(txn->total_size > 0);
3944 Assert(rb->size >= txn->total_size);
3945
3946 /* skip the transaction if aborted */
3948 continue;
3949
3950 ReorderBufferStreamTXN(rb, txn);
3951 }
3952 else
3953 {
3954 /*
3955 * Pick the largest transaction (or subtransaction) and evict it
3956 * from memory by serializing it to disk.
3957 */
3958 txn = ReorderBufferLargestTXN(rb);
3959
3960 /* we know there has to be one, because the size is not zero */
3961 Assert(txn);
3962 Assert(txn->size > 0);
3963 Assert(rb->size >= txn->size);
3964
3965 /* skip the transaction if aborted */
3967 continue;
3968
3970 }
3971
3972 /*
3973 * After eviction, the transaction should have no entries in memory,
3974 * and should use 0 bytes for changes.
3975 */
3976 Assert(txn->size == 0);
3977 Assert(txn->nentries_mem == 0);
3978
3979 /*
3980 * We've reported the memExceededCount update while streaming or
3981 * serializing the transaction.
3982 */
3983 update_stats = false;
3984 }
3985
3986 if (update_stats)
3988
3989 /* We must be under the memory limit now. */
3990 Assert(rb->size < logical_decoding_work_mem * (Size) 1024);
3991}
void UpdateDecodingStats(LogicalDecodingContext *ctx)
Definition: logical.c:1952
static ReorderBufferTXN * ReorderBufferLargestTXN(ReorderBuffer *rb)
static bool ReorderBufferCanStartStreaming(ReorderBuffer *rb)
int logical_decoding_work_mem
static ReorderBufferTXN * ReorderBufferLargestStreamableTopTXN(ReorderBuffer *rb)
static bool ReorderBufferCheckAndTruncateAbortedTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
@ DEBUG_LOGICAL_REP_STREAMING_BUFFERED
Definition: reorderbuffer.h:33
#define rbtxn_is_toptxn(txn)

References Assert(), DEBUG_LOGICAL_REP_STREAMING_BUFFERED, DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE, debug_logical_replication_streaming, logical_decoding_work_mem, ReorderBuffer::memExceededCount, ReorderBufferTXN::nentries_mem, ReorderBuffer::private_data, rbtxn_is_toptxn, ReorderBufferCanStartStreaming(), ReorderBufferCheckAndTruncateAbortedTXN(), ReorderBufferLargestStreamableTopTXN(), ReorderBufferLargestTXN(), ReorderBufferSerializeTXN(), ReorderBufferStreamTXN(), ReorderBufferTXN::size, ReorderBuffer::size, ReorderBufferTXN::total_size, and UpdateDecodingStats().

Referenced by ReorderBufferQueueChange().

◆ ReorderBufferCleanupSerializedTXNs()

static void ReorderBufferCleanupSerializedTXNs ( const char *  slotname)
static

Definition at line 4884 of file reorderbuffer.c.

4885{
4886 DIR *spill_dir;
4887 struct dirent *spill_de;
4888 struct stat statbuf;
4889 char path[MAXPGPATH * 2 + sizeof(PG_REPLSLOT_DIR)];
4890
4891 sprintf(path, "%s/%s", PG_REPLSLOT_DIR, slotname);
4892
4893 /* we're only handling directories here, skip if it's not ours */
4894 if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
4895 return;
4896
4897 spill_dir = AllocateDir(path);
4898 while ((spill_de = ReadDirExtended(spill_dir, path, INFO)) != NULL)
4899 {
4900 /* only look at names that can be ours */
4901 if (strncmp(spill_de->d_name, "xid", 3) == 0)
4902 {
4903 snprintf(path, sizeof(path),
4904 "%s/%s/%s", PG_REPLSLOT_DIR, slotname,
4905 spill_de->d_name);
4906
4907 if (unlink(path) != 0)
4908 ereport(ERROR,
4910 errmsg("could not remove file \"%s\" during removal of %s/%s/xid*: %m",
4911 path, PG_REPLSLOT_DIR, slotname)));
4912 }
4913 }
4914 FreeDir(spill_dir);
4915}
#define INFO
Definition: elog.h:34
int FreeDir(DIR *dir)
Definition: fd.c:3005
struct dirent * ReadDirExtended(DIR *dir, const char *dirname, int elevel)
Definition: fd.c:2968
DIR * AllocateDir(const char *dirname)
Definition: fd.c:2887
#define snprintf
Definition: port.h:260
#define PG_REPLSLOT_DIR
Definition: slot.h:21
Definition: dirent.c:26
Definition: dirent.h:10
char d_name[MAX_PATH]
Definition: dirent.h:15
#define lstat(path, sb)
Definition: win32_port.h:275
#define S_ISDIR(m)
Definition: win32_port.h:315

References AllocateDir(), dirent::d_name, ereport, errcode_for_file_access(), errmsg(), ERROR, FreeDir(), INFO, lstat, MAXPGPATH, PG_REPLSLOT_DIR, ReadDirExtended(), S_ISDIR, snprintf, sprintf, and stat::st_mode.

Referenced by ReorderBufferAllocate(), ReorderBufferFree(), and StartupReorderBuffer().

◆ ReorderBufferCleanupTXN()

static void ReorderBufferCleanupTXN ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 1535 of file reorderbuffer.c.

1536{
1537 bool found;
1538 dlist_mutable_iter iter;
1539 Size mem_freed = 0;
1540
1541 /* cleanup subtransactions & their changes */
1542 dlist_foreach_modify(iter, &txn->subtxns)
1543 {
1544 ReorderBufferTXN *subtxn;
1545
1546 subtxn = dlist_container(ReorderBufferTXN, node, iter.cur);
1547
1548 /*
1549 * Subtransactions are always associated to the toplevel TXN, even if
1550 * they originally were happening inside another subtxn, so we won't
1551 * ever recurse more than one level deep here.
1552 */
1554 Assert(subtxn->nsubtxns == 0);
1555
1556 ReorderBufferCleanupTXN(rb, subtxn);
1557 }
1558
1559 /* cleanup changes in the txn */
1560 dlist_foreach_modify(iter, &txn->changes)
1561 {
1562 ReorderBufferChange *change;
1563
1564 change = dlist_container(ReorderBufferChange, node, iter.cur);
1565
1566 /* Check we're not mixing changes from different transactions. */
1567 Assert(change->txn == txn);
1568
1569 /*
1570 * Instead of updating the memory counter for individual changes, we
1571 * sum up the size of memory to free so we can update the memory
1572 * counter all together below. This saves costs of maintaining the
1573 * max-heap.
1574 */
1575 mem_freed += ReorderBufferChangeSize(change);
1576
1577 ReorderBufferFreeChange(rb, change, false);
1578 }
1579
1580 /* Update the memory counter */
1581 ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, mem_freed);
1582
1583 /*
1584 * Cleanup the tuplecids we stored for decoding catalog snapshot access.
1585 * They are always stored in the toplevel transaction.
1586 */
1587 dlist_foreach_modify(iter, &txn->tuplecids)
1588 {
1589 ReorderBufferChange *change;
1590
1591 change = dlist_container(ReorderBufferChange, node, iter.cur);
1592
1593 /* Check we're not mixing changes from different transactions. */
1594 Assert(change->txn == txn);
1596
1597 ReorderBufferFreeChange(rb, change, true);
1598 }
1599
1600 /*
1601 * Cleanup the base snapshot, if set.
1602 */
1603 if (txn->base_snapshot != NULL)
1604 {
1607 }
1608
1609 /*
1610 * Cleanup the snapshot for the last streamed run.
1611 */
1612 if (txn->snapshot_now != NULL)
1613 {
1616 }
1617
1618 /*
1619 * Remove TXN from its containing lists.
1620 *
1621 * Note: if txn is known as subxact, we are deleting the TXN from its
1622 * parent's list of known subxacts; this leaves the parent's nsubxacts
1623 * count too high, but we don't care. Otherwise, we are deleting the TXN
1624 * from the LSN-ordered list of toplevel TXNs. We remove the TXN from the
1625 * list of catalog modifying transactions as well.
1626 */
1627 dlist_delete(&txn->node);
1630
1631 /* now remove reference from buffer */
1632 hash_search(rb->by_txn, &txn->xid, HASH_REMOVE, &found);
1633 Assert(found);
1634
1635 /* remove entries spilled to disk */
1636 if (rbtxn_is_serialized(txn))
1638
1639 /* deallocate */
1640 ReorderBufferFreeTXN(rb, txn);
1641}
@ HASH_REMOVE
Definition: hsearch.h:115
static void dclist_delete_from(dclist_head *head, dlist_node *node)
Definition: ilist.h:763
void ReorderBufferFreeChange(ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)
static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
static Size ReorderBufferChangeSize(ReorderBufferChange *change)
static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
static void ReorderBufferFreeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, ReorderBufferChange *change, ReorderBufferTXN *txn, bool addition, Size sz)
#define rbtxn_is_serialized(txn)
void SnapBuildSnapDecRefcount(Snapshot snap)
Definition: snapbuild.c:328
Snapshot snapshot_now
dlist_node catchange_node
dlist_node base_snapshot_node

References ReorderBufferChange::action, Assert(), ReorderBufferTXN::base_snapshot, ReorderBufferTXN::base_snapshot_node, ReorderBuffer::by_txn, ReorderBufferTXN::catchange_node, ReorderBuffer::catchange_txns, ReorderBufferTXN::changes, dlist_mutable_iter::cur, dclist_delete_from(), dlist_container, dlist_delete(), dlist_foreach_modify, HASH_REMOVE, hash_search(), ReorderBufferTXN::node, ReorderBufferTXN::nsubtxns, rbtxn_has_catalog_changes, rbtxn_is_known_subxact, rbtxn_is_serialized, rbtxn_is_streamed, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferChangeMemoryUpdate(), ReorderBufferChangeSize(), ReorderBufferCleanupTXN(), ReorderBufferFreeChange(), ReorderBufferFreeSnap(), ReorderBufferFreeTXN(), ReorderBufferRestoreCleanup(), SnapBuildSnapDecRefcount(), ReorderBufferTXN::snapshot_now, ReorderBufferTXN::subtxns, ReorderBufferTXN::tuplecids, ReorderBufferChange::txn, and ReorderBufferTXN::xid.

Referenced by ReorderBufferAbort(), ReorderBufferAbortOld(), ReorderBufferCleanupTXN(), ReorderBufferFinishPrepared(), ReorderBufferForget(), ReorderBufferProcessTXN(), ReorderBufferReplay(), and ReorderBufferStreamCommit().

◆ ReorderBufferCommit()

void ReorderBufferCommit ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn,
TimestampTz  commit_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn 
)

Definition at line 2884 of file reorderbuffer.c.

2888{
2889 ReorderBufferTXN *txn;
2890
2891 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2892 false);
2893
2894 /* unknown transaction, nothing to replay */
2895 if (txn == NULL)
2896 return;
2897
2898 ReorderBufferReplay(txn, rb, xid, commit_lsn, end_lsn, commit_time,
2899 origin_id, origin_lsn);
2900}
static void ReorderBufferReplay(ReorderBufferTXN *txn, ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)

References InvalidXLogRecPtr, ReorderBufferReplay(), and ReorderBufferTXNByXid().

Referenced by DecodeCommit().

◆ ReorderBufferCommitChild()

void ReorderBufferCommitChild ( ReorderBuffer rb,
TransactionId  xid,
TransactionId  subxid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn 
)

Definition at line 1219 of file reorderbuffer.c.

1222{
1223 ReorderBufferTXN *subtxn;
1224
1225 subtxn = ReorderBufferTXNByXid(rb, subxid, false, NULL,
1226 InvalidXLogRecPtr, false);
1227
1228 /*
1229 * No need to do anything if that subtxn didn't contain any changes
1230 */
1231 if (!subtxn)
1232 return;
1233
1234 subtxn->final_lsn = commit_lsn;
1235 subtxn->end_lsn = end_lsn;
1236
1237 /*
1238 * Assign this subxact as a child of the toplevel xact (no-op if already
1239 * done.)
1240 */
1242}
void ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr lsn)

References ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, InvalidXLogRecPtr, ReorderBufferAssignChild(), and ReorderBufferTXNByXid().

Referenced by DecodeCommit(), and DecodePrepare().

◆ ReorderBufferCopySnap()

static Snapshot ReorderBufferCopySnap ( ReorderBuffer rb,
Snapshot  orig_snap,
ReorderBufferTXN txn,
CommandId  cid 
)
static

Definition at line 1909 of file reorderbuffer.c.

1911{
1912 Snapshot snap;
1913 dlist_iter iter;
1914 int i = 0;
1915 Size size;
1916
1917 size = sizeof(SnapshotData) +
1918 sizeof(TransactionId) * orig_snap->xcnt +
1919 sizeof(TransactionId) * (txn->nsubtxns + 1);
1920
1921 snap = MemoryContextAllocZero(rb->context, size);
1922 memcpy(snap, orig_snap, sizeof(SnapshotData));
1923
1924 snap->copied = true;
1925 snap->active_count = 1; /* mark as active so nobody frees it */
1926 snap->regd_count = 0;
1927 snap->xip = (TransactionId *) (snap + 1);
1928
1929 memcpy(snap->xip, orig_snap->xip, sizeof(TransactionId) * snap->xcnt);
1930
1931 /*
1932 * snap->subxip contains all txids that belong to our transaction which we
1933 * need to check via cmin/cmax. That's why we store the toplevel
1934 * transaction in there as well.
1935 */
1936 snap->subxip = snap->xip + snap->xcnt;
1937 snap->subxip[i++] = txn->xid;
1938
1939 /*
1940 * txn->nsubtxns isn't decreased when subtransactions abort, so count
1941 * manually. Since it's an upper boundary it is safe to use it for the
1942 * allocation above.
1943 */
1944 snap->subxcnt = 1;
1945
1946 dlist_foreach(iter, &txn->subtxns)
1947 {
1948 ReorderBufferTXN *sub_txn;
1949
1950 sub_txn = dlist_container(ReorderBufferTXN, node, iter.cur);
1951 snap->subxip[i++] = sub_txn->xid;
1952 snap->subxcnt++;
1953 }
1954
1955 /* sort so we can bsearch() later */
1956 qsort(snap->subxip, snap->subxcnt, sizeof(TransactionId), xidComparator);
1957
1958 /* store the specified current CommandId */
1959 snap->curcid = cid;
1960
1961 return snap;
1962}
int i
Definition: isn.c:77
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:1263
#define qsort(a, b, c, d)
Definition: port.h:499
bool copied
Definition: snapshot.h:181
uint32 regd_count
Definition: snapshot.h:201
uint32 active_count
Definition: snapshot.h:200
CommandId curcid
Definition: snapshot.h:183
TransactionId * subxip
Definition: snapshot.h:176
TransactionId * xip
Definition: snapshot.h:164
int xidComparator(const void *arg1, const void *arg2)
Definition: xid.c:152

References SnapshotData::active_count, ReorderBuffer::context, SnapshotData::copied, dlist_iter::cur, SnapshotData::curcid, dlist_container, dlist_foreach, i, MemoryContextAllocZero(), ReorderBufferTXN::nsubtxns, qsort, SnapshotData::regd_count, ReorderBufferTXN::subtxns, SnapshotData::subxcnt, SnapshotData::subxip, SnapshotData::xcnt, ReorderBufferTXN::xid, xidComparator(), and SnapshotData::xip.

Referenced by ReorderBufferProcessTXN(), ReorderBufferSaveTXNSnapshot(), and ReorderBufferStreamTXN().

◆ ReorderBufferExecuteInvalidations()

static void ReorderBufferExecuteInvalidations ( uint32  nmsgs,
SharedInvalidationMessage msgs 
)
static

Definition at line 3641 of file reorderbuffer.c.

3642{
3643 int i;
3644
3645 for (i = 0; i < nmsgs; i++)
3647}
void LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
Definition: inval.c:823

References i, and LocalExecuteInvalidationMessage().

Referenced by ReorderBufferFinishPrepared(), and ReorderBufferProcessTXN().

◆ ReorderBufferFinishPrepared()

void ReorderBufferFinishPrepared ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn,
XLogRecPtr  two_phase_at,
TimestampTz  commit_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn,
char *  gid,
bool  is_commit 
)

Definition at line 3001 of file reorderbuffer.c.

3006{
3007 ReorderBufferTXN *txn;
3008 XLogRecPtr prepare_end_lsn;
3009 TimestampTz prepare_time;
3010
3011 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, commit_lsn, false);
3012
3013 /* unknown transaction, nothing to do */
3014 if (txn == NULL)
3015 return;
3016
3017 /*
3018 * By this time the txn has the prepare record information, remember it to
3019 * be later used for rollback.
3020 */
3021 prepare_end_lsn = txn->end_lsn;
3022 prepare_time = txn->prepare_time;
3023
3024 /* add the gid in the txn */
3025 txn->gid = pstrdup(gid);
3026
3027 /*
3028 * It is possible that this transaction is not decoded at prepare time
3029 * either because by that time we didn't have a consistent snapshot, or
3030 * two_phase was not enabled, or it was decoded earlier but we have
3031 * restarted. We only need to send the prepare if it was not decoded
3032 * earlier. We don't need to decode the xact for aborts if it is not done
3033 * already.
3034 */
3035 if ((txn->final_lsn < two_phase_at) && is_commit)
3036 {
3037 /*
3038 * txn must have been marked as a prepared transaction and skipped but
3039 * not sent a prepare. Also, the prepare info must have been updated
3040 * in txn even if we skip prepare.
3041 */
3045
3046 /*
3047 * By this time the txn has the prepare record information and it is
3048 * important to use that so that downstream gets the accurate
3049 * information. If instead, we have passed commit information here
3050 * then downstream can behave as it has already replayed commit
3051 * prepared after the restart.
3052 */
3053 ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
3054 txn->prepare_time, txn->origin_id, txn->origin_lsn);
3055 }
3056
3057 txn->final_lsn = commit_lsn;
3058 txn->end_lsn = end_lsn;
3059 txn->commit_time = commit_time;
3060 txn->origin_id = origin_id;
3061 txn->origin_lsn = origin_lsn;
3062
3063 if (is_commit)
3064 rb->commit_prepared(rb, txn, commit_lsn);
3065 else
3066 rb->rollback_prepared(rb, txn, prepare_end_lsn, prepare_time);
3067
3068 /* cleanup: make sure there's no cache pollution */
3070 txn->invalidations);
3071 ReorderBufferCleanupTXN(rb, txn);
3072}
int64 TimestampTz
Definition: timestamp.h:39
char * pstrdup(const char *in)
Definition: mcxt.c:1759
static void ReorderBufferExecuteInvalidations(uint32 nmsgs, SharedInvalidationMessage *msgs)
#define RBTXN_PREPARE_STATUS_MASK
#define RBTXN_IS_PREPARED
#define RBTXN_SKIPPED_PREPARE
TimestampTz commit_time
RepOriginId origin_id
XLogRecPtr origin_lsn
TimestampTz prepare_time
ReorderBufferCommitPreparedCB commit_prepared
ReorderBufferRollbackPreparedCB rollback_prepared

References Assert(), ReorderBuffer::commit_prepared, ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, ReorderBufferTXN::gid, ReorderBufferTXN::invalidations, ReorderBufferTXN::ninvalidations, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, ReorderBufferTXN::prepare_time, pstrdup(), RBTXN_IS_PREPARED, RBTXN_PREPARE_STATUS_MASK, RBTXN_SKIPPED_PREPARE, ReorderBufferCleanupTXN(), ReorderBufferExecuteInvalidations(), ReorderBufferReplay(), ReorderBufferTXNByXid(), ReorderBuffer::rollback_prepared, ReorderBufferTXN::txn_flags, and XLogRecPtrIsValid.

Referenced by DecodeAbort(), and DecodeCommit().

◆ ReorderBufferForget()

void ReorderBufferForget ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 3180 of file reorderbuffer.c.

3181{
3182 ReorderBufferTXN *txn;
3183
3184 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
3185 false);
3186
3187 /* unknown, nothing to forget */
3188 if (txn == NULL)
3189 return;
3190
3191 /* this transaction mustn't be streamed */
3193
3194 /* cosmetic... */
3195 txn->final_lsn = lsn;
3196
3197 /*
3198 * Process only cache invalidation messages in this transaction if there
3199 * are any. Even if we're not interested in the transaction's contents, it
3200 * could have manipulated the catalog and we need to update the caches
3201 * according to that.
3202 */
3203 if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
3205 txn->invalidations);
3206 else
3207 Assert(txn->ninvalidations == 0);
3208
3209 /* remove potential on-disk data, and deallocate */
3210 ReorderBufferCleanupTXN(rb, txn);
3211}

References Assert(), ReorderBufferTXN::base_snapshot, ReorderBufferTXN::final_lsn, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, rbtxn_is_streamed, ReorderBufferCleanupTXN(), ReorderBufferImmediateInvalidation(), and ReorderBufferTXNByXid().

Referenced by DecodeCommit().

◆ ReorderBufferFree()

void ReorderBufferFree ( ReorderBuffer rb)

Definition at line 417 of file reorderbuffer.c.

418{
419 MemoryContext context = rb->context;
420
421 /*
422 * We free separately allocated data by entirely scrapping reorderbuffer's
423 * memory context.
424 */
425 MemoryContextDelete(context);
426
427 /* Free disk space used by unconsumed reorder buffers */
429}
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:469

References ReorderBuffer::context, ReplicationSlot::data, MemoryContextDelete(), MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, and ReorderBufferCleanupSerializedTXNs().

Referenced by FreeDecodingContext().

◆ ReorderBufferFreeChange()

void ReorderBufferFreeChange ( ReorderBuffer rb,
ReorderBufferChange change,
bool  upd_mem 
)

Definition at line 522 of file reorderbuffer.c.

524{
525 /* update memory accounting info */
526 if (upd_mem)
527 ReorderBufferChangeMemoryUpdate(rb, change, NULL, false,
529
530 /* free contained data */
531 switch (change->action)
532 {
537 if (change->data.tp.newtuple)
538 {
540 change->data.tp.newtuple = NULL;
541 }
542
543 if (change->data.tp.oldtuple)
544 {
546 change->data.tp.oldtuple = NULL;
547 }
548 break;
550 if (change->data.msg.prefix != NULL)
551 pfree(change->data.msg.prefix);
552 change->data.msg.prefix = NULL;
553 if (change->data.msg.message != NULL)
554 pfree(change->data.msg.message);
555 change->data.msg.message = NULL;
556 break;
558 if (change->data.inval.invalidations)
559 pfree(change->data.inval.invalidations);
560 change->data.inval.invalidations = NULL;
561 break;
563 if (change->data.snapshot)
564 {
566 change->data.snapshot = NULL;
567 }
568 break;
569 /* no data in addition to the struct itself */
571 if (change->data.truncate.relids != NULL)
572 {
574 change->data.truncate.relids = NULL;
575 }
576 break;
581 break;
582 }
583
584 pfree(change);
585}
void ReorderBufferFreeRelids(ReorderBuffer *rb, Oid *relids)
void ReorderBufferFreeTupleBuf(HeapTuple tuple)
SharedInvalidationMessage * invalidations

References ReorderBufferChange::action, ReorderBufferChange::data, ReorderBufferChange::inval, ReorderBufferChange::invalidations, ReorderBufferChange::message, ReorderBufferChange::msg, ReorderBufferChange::newtuple, ReorderBufferChange::oldtuple, pfree(), ReorderBufferChange::prefix, ReorderBufferChange::relids, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChangeMemoryUpdate(), ReorderBufferChangeSize(), ReorderBufferFreeRelids(), ReorderBufferFreeSnap(), ReorderBufferFreeTupleBuf(), ReorderBufferChange::snapshot, ReorderBufferChange::tp, and ReorderBufferChange::truncate.

Referenced by ReorderBufferCleanupTXN(), ReorderBufferIterTXNFinish(), ReorderBufferIterTXNNext(), ReorderBufferProcessTXN(), ReorderBufferQueueChange(), ReorderBufferResetTXN(), ReorderBufferRestoreChanges(), ReorderBufferSerializeTXN(), ReorderBufferToastReset(), and ReorderBufferTruncateTXN().

◆ ReorderBufferFreeRelids()

void ReorderBufferFreeRelids ( ReorderBuffer rb,
Oid relids 
)

Definition at line 641 of file reorderbuffer.c.

642{
643 pfree(relids);
644}

References pfree().

Referenced by ReorderBufferFreeChange().

◆ ReorderBufferFreeSnap()

static void ReorderBufferFreeSnap ( ReorderBuffer rb,
Snapshot  snap 
)
static

Definition at line 1968 of file reorderbuffer.c.

1969{
1970 if (snap->copied)
1971 pfree(snap);
1972 else
1974}

References SnapshotData::copied, pfree(), and SnapBuildSnapDecRefcount().

Referenced by ReorderBufferCleanupTXN(), ReorderBufferFreeChange(), ReorderBufferProcessTXN(), and ReorderBufferStreamTXN().

◆ ReorderBufferFreeTupleBuf()

void ReorderBufferFreeTupleBuf ( HeapTuple  tuple)

Definition at line 610 of file reorderbuffer.c.

611{
612 pfree(tuple);
613}

References pfree().

Referenced by ReorderBufferFreeChange().

◆ ReorderBufferFreeTXN()

static void ReorderBufferFreeTXN ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 459 of file reorderbuffer.c.

460{
461 /* clean the lookup cache if we were cached (quite likely) */
462 if (rb->by_txn_last_xid == txn->xid)
463 {
465 rb->by_txn_last_txn = NULL;
466 }
467
468 /* free data that's contained */
469
470 if (txn->gid != NULL)
471 {
472 pfree(txn->gid);
473 txn->gid = NULL;
474 }
475
476 if (txn->tuplecid_hash != NULL)
477 {
479 txn->tuplecid_hash = NULL;
480 }
481
482 if (txn->invalidations)
483 {
484 pfree(txn->invalidations);
485 txn->invalidations = NULL;
486 }
487
489 {
491 txn->invalidations_distributed = NULL;
492 }
493
494 /* Reset the toast hash */
496
497 /* All changes must be deallocated */
498 Assert(txn->size == 0);
499
500 pfree(txn);
501}
void hash_destroy(HTAB *hashp)
Definition: dynahash.c:865

References Assert(), ReorderBuffer::by_txn_last_txn, ReorderBuffer::by_txn_last_xid, ReorderBufferTXN::gid, hash_destroy(), ReorderBufferTXN::invalidations, ReorderBufferTXN::invalidations_distributed, InvalidTransactionId, pfree(), ReorderBufferToastReset(), ReorderBufferTXN::size, ReorderBufferTXN::tuplecid_hash, and ReorderBufferTXN::xid.

Referenced by ReorderBufferCleanupTXN().

◆ ReorderBufferGetCatalogChangesXacts()

TransactionId * ReorderBufferGetCatalogChangesXacts ( ReorderBuffer rb)

Definition at line 3691 of file reorderbuffer.c.

3692{
3693 dlist_iter iter;
3694 TransactionId *xids = NULL;
3695 size_t xcnt = 0;
3696
3697 /* Quick return if the list is empty */
3698 if (dclist_count(&rb->catchange_txns) == 0)
3699 return NULL;
3700
3701 /* Initialize XID array */
3703 dclist_foreach(iter, &rb->catchange_txns)
3704 {
3706 catchange_node,
3707 iter.cur);
3708
3710
3711 xids[xcnt++] = txn->xid;
3712 }
3713
3714 qsort(xids, xcnt, sizeof(TransactionId), xidComparator);
3715
3716 Assert(xcnt == dclist_count(&rb->catchange_txns));
3717 return xids;
3718}
#define dclist_container(type, membername, ptr)
Definition: ilist.h:947
static uint32 dclist_count(const dclist_head *head)
Definition: ilist.h:932
#define dclist_foreach(iter, lhead)
Definition: ilist.h:970

References Assert(), ReorderBuffer::catchange_txns, dlist_iter::cur, dclist_container, dclist_count(), dclist_foreach, palloc_array, qsort, rbtxn_has_catalog_changes, ReorderBufferTXN::xid, and xidComparator().

Referenced by SnapBuildSerialize().

◆ ReorderBufferGetInvalidations()

uint32 ReorderBufferGetInvalidations ( ReorderBuffer rb,
TransactionId  xid,
SharedInvalidationMessage **  msgs 
)

Definition at line 5631 of file reorderbuffer.c.

5633{
5634 ReorderBufferTXN *txn;
5635
5636 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
5637 false);
5638
5639 if (txn == NULL)
5640 return 0;
5641
5642 *msgs = txn->invalidations;
5643
5644 return txn->ninvalidations;
5645}

References ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, and ReorderBufferTXNByXid().

Referenced by SnapBuildDistributeSnapshotAndInval().

◆ ReorderBufferGetOldestTXN()

ReorderBufferTXN * ReorderBufferGetOldestTXN ( ReorderBuffer rb)

Definition at line 1044 of file reorderbuffer.c.

1045{
1046 ReorderBufferTXN *txn;
1047
1049
1051 return NULL;
1052
1054
1057 return txn;
1058}
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:603

References Assert(), AssertTXNLsnOrder(), dlist_head_element, dlist_is_empty(), ReorderBufferTXN::first_lsn, rbtxn_is_known_subxact, ReorderBuffer::toplevel_by_lsn, and XLogRecPtrIsValid.

Referenced by SnapBuildProcessRunningXacts().

◆ ReorderBufferGetOldestXmin()

TransactionId ReorderBufferGetOldestXmin ( ReorderBuffer rb)

◆ ReorderBufferImmediateInvalidation()

void ReorderBufferImmediateInvalidation ( ReorderBuffer rb,
uint32  ninvalidations,
SharedInvalidationMessage invalidations 
)

Definition at line 3253 of file reorderbuffer.c.

3255{
3256 bool use_subtxn = IsTransactionOrTransactionBlock();
3259 int i;
3260
3261 if (use_subtxn)
3263
3264 /*
3265 * Force invalidations to happen outside of a valid transaction - that way
3266 * entries will just be marked as invalid without accessing the catalog.
3267 * That's advantageous because we don't need to setup the full state
3268 * necessary for catalog access.
3269 */
3270 if (use_subtxn)
3272
3273 for (i = 0; i < ninvalidations; i++)
3274 LocalExecuteInvalidationMessage(&invalidations[i]);
3275
3276 if (use_subtxn)
3277 {
3280 CurrentResourceOwner = cowner;
3281 }
3282}
ResourceOwner CurrentResourceOwner
Definition: resowner.c:173
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:5007
void BeginInternalSubTransaction(const char *name)
Definition: xact.c:4712
void RollbackAndReleaseCurrentSubTransaction(void)
Definition: xact.c:4814
void AbortCurrentTransaction(void)
Definition: xact.c:3469

References AbortCurrentTransaction(), BeginInternalSubTransaction(), CurrentMemoryContext, CurrentResourceOwner, i, IsTransactionOrTransactionBlock(), LocalExecuteInvalidationMessage(), MemoryContextSwitchTo(), and RollbackAndReleaseCurrentSubTransaction().

Referenced by ReorderBufferAbort(), ReorderBufferForget(), ReorderBufferInvalidate(), and xact_decode().

◆ ReorderBufferInvalidate()

void ReorderBufferInvalidate ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 3222 of file reorderbuffer.c.

3223{
3224 ReorderBufferTXN *txn;
3225
3226 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
3227 false);
3228
3229 /* unknown, nothing to do */
3230 if (txn == NULL)
3231 return;
3232
3233 /*
3234 * Process cache invalidation messages if there are any. Even if we're not
3235 * interested in the transaction's contents, it could have manipulated the
3236 * catalog and we need to update the caches according to that.
3237 */
3238 if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
3240 txn->invalidations);
3241 else
3242 Assert(txn->ninvalidations == 0);
3243}

References Assert(), ReorderBufferTXN::base_snapshot, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, ReorderBufferImmediateInvalidation(), and ReorderBufferTXNByXid().

Referenced by DecodePrepare().

◆ ReorderBufferIterCompare()

static int ReorderBufferIterCompare ( Datum  a,
Datum  b,
void *  arg 
)
static

Definition at line 1261 of file reorderbuffer.c.

1262{
1264 XLogRecPtr pos_a = state->entries[DatumGetInt32(a)].lsn;
1265 XLogRecPtr pos_b = state->entries[DatumGetInt32(b)].lsn;
1266
1267 if (pos_a < pos_b)
1268 return 1;
1269 else if (pos_a == pos_b)
1270 return 0;
1271 return -1;
1272}
void * arg
static int32 DatumGetInt32(Datum X)
Definition: postgres.h:212
Definition: regguts.h:323

References a, arg, b, and DatumGetInt32().

Referenced by ReorderBufferIterTXNInit().

◆ ReorderBufferIterTXNFinish()

static void ReorderBufferIterTXNFinish ( ReorderBuffer rb,
ReorderBufferIterTXNState state 
)
static

Definition at line 1504 of file reorderbuffer.c.

1506{
1507 int32 off;
1508
1509 for (off = 0; off < state->nr_txns; off++)
1510 {
1511 if (state->entries[off].file.vfd != -1)
1512 FileClose(state->entries[off].file.vfd);
1513 }
1514
1515 /* free memory we might have "leaked" in the last *Next call */
1516 if (!dlist_is_empty(&state->old_change))
1517 {
1518 ReorderBufferChange *change;
1519
1520 change = dlist_container(ReorderBufferChange, node,
1521 dlist_pop_head_node(&state->old_change));
1522 ReorderBufferFreeChange(rb, change, true);
1523 Assert(dlist_is_empty(&state->old_change));
1524 }
1525
1526 binaryheap_free(state->heap);
1527 pfree(state);
1528}
void binaryheap_free(binaryheap *heap)
Definition: binaryheap.c:75
void FileClose(File file)
Definition: fd.c:1962
static dlist_node * dlist_pop_head_node(dlist_head *head)
Definition: ilist.h:450

References Assert(), binaryheap_free(), dlist_container, dlist_is_empty(), dlist_pop_head_node(), FileClose(), pfree(), and ReorderBufferFreeChange().

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferIterTXNInit()

static void ReorderBufferIterTXNInit ( ReorderBuffer rb,
ReorderBufferTXN txn,
ReorderBufferIterTXNState *volatile *  iter_state 
)
static

Definition at line 1284 of file reorderbuffer.c.

1286{
1287 Size nr_txns = 0;
1289 dlist_iter cur_txn_i;
1290 int32 off;
1291
1292 *iter_state = NULL;
1293
1294 /* Check ordering of changes in the toplevel transaction. */
1296
1297 /*
1298 * Calculate the size of our heap: one element for every transaction that
1299 * contains changes. (Besides the transactions already in the reorder
1300 * buffer, we count the one we were directly passed.)
1301 */
1302 if (txn->nentries > 0)
1303 nr_txns++;
1304
1305 dlist_foreach(cur_txn_i, &txn->subtxns)
1306 {
1307 ReorderBufferTXN *cur_txn;
1308
1309 cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
1310
1311 /* Check ordering of changes in this subtransaction. */
1312 AssertChangeLsnOrder(cur_txn);
1313
1314 if (cur_txn->nentries > 0)
1315 nr_txns++;
1316 }
1317
1318 /* allocate iteration state */
1322 sizeof(ReorderBufferIterTXNEntry) * nr_txns);
1323
1324 state->nr_txns = nr_txns;
1325 dlist_init(&state->old_change);
1326
1327 for (off = 0; off < state->nr_txns; off++)
1328 {
1329 state->entries[off].file.vfd = -1;
1330 state->entries[off].segno = 0;
1331 }
1332
1333 /* allocate heap */
1334 state->heap = binaryheap_allocate(state->nr_txns,
1336 state);
1337
1338 /* Now that the state fields are initialized, it is safe to return it. */
1339 *iter_state = state;
1340
1341 /*
1342 * Now insert items into the binary heap, in an unordered fashion. (We
1343 * will run a heap assembly step at the end; this is more efficient.)
1344 */
1345
1346 off = 0;
1347
1348 /* add toplevel transaction if it contains changes */
1349 if (txn->nentries > 0)
1350 {
1351 ReorderBufferChange *cur_change;
1352
1353 if (rbtxn_is_serialized(txn))
1354 {
1355 /* serialize remaining changes */
1357 ReorderBufferRestoreChanges(rb, txn, &state->entries[off].file,
1358 &state->entries[off].segno);
1359 }
1360
1361 cur_change = dlist_head_element(ReorderBufferChange, node,
1362 &txn->changes);
1363
1364 state->entries[off].lsn = cur_change->lsn;
1365 state->entries[off].change = cur_change;
1366 state->entries[off].txn = txn;
1367
1369 }
1370
1371 /* add subtransactions if they contain changes */
1372 dlist_foreach(cur_txn_i, &txn->subtxns)
1373 {
1374 ReorderBufferTXN *cur_txn;
1375
1376 cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
1377
1378 if (cur_txn->nentries > 0)
1379 {
1380 ReorderBufferChange *cur_change;
1381
1382 if (rbtxn_is_serialized(cur_txn))
1383 {
1384 /* serialize remaining changes */
1385 ReorderBufferSerializeTXN(rb, cur_txn);
1386 ReorderBufferRestoreChanges(rb, cur_txn,
1387 &state->entries[off].file,
1388 &state->entries[off].segno);
1389 }
1390 cur_change = dlist_head_element(ReorderBufferChange, node,
1391 &cur_txn->changes);
1392
1393 state->entries[off].lsn = cur_change->lsn;
1394 state->entries[off].change = cur_change;
1395 state->entries[off].txn = cur_txn;
1396
1398 }
1399 }
1400
1401 /* assemble a valid binary heap */
1402 binaryheap_build(state->heap);
1403}
void binaryheap_build(binaryheap *heap)
Definition: binaryheap.c:138
void binaryheap_add_unordered(binaryheap *heap, bh_node_type d)
Definition: binaryheap.c:116
binaryheap * binaryheap_allocate(int capacity, binaryheap_comparator compare, void *arg)
Definition: binaryheap.c:39
static Datum Int32GetDatum(int32 X)
Definition: postgres.h:222
static int ReorderBufferIterCompare(Datum a, Datum b, void *arg)
static void AssertChangeLsnOrder(ReorderBufferTXN *txn)
struct ReorderBufferIterTXNEntry ReorderBufferIterTXNEntry
static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, TXNEntryFile *file, XLogSegNo *segno)

References AssertChangeLsnOrder(), binaryheap_add_unordered(), binaryheap_allocate(), binaryheap_build(), ReorderBufferTXN::changes, ReorderBuffer::context, dlist_iter::cur, dlist_container, dlist_foreach, dlist_head_element, dlist_init(), Int32GetDatum(), ReorderBufferChange::lsn, MemoryContextAllocZero(), ReorderBufferTXN::nentries, rbtxn_is_serialized, ReorderBufferIterCompare(), ReorderBufferRestoreChanges(), ReorderBufferSerializeTXN(), and ReorderBufferTXN::subtxns.

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferIterTXNNext()

static ReorderBufferChange * ReorderBufferIterTXNNext ( ReorderBuffer rb,
ReorderBufferIterTXNState state 
)
static

Definition at line 1412 of file reorderbuffer.c.

1413{
1414 ReorderBufferChange *change;
1416 int32 off;
1417
1418 /* nothing there anymore */
1419 if (binaryheap_empty(state->heap))
1420 return NULL;
1421
1422 off = DatumGetInt32(binaryheap_first(state->heap));
1423 entry = &state->entries[off];
1424
1425 /* free memory we might have "leaked" in the previous *Next call */
1426 if (!dlist_is_empty(&state->old_change))
1427 {
1428 change = dlist_container(ReorderBufferChange, node,
1429 dlist_pop_head_node(&state->old_change));
1430 ReorderBufferFreeChange(rb, change, true);
1431 Assert(dlist_is_empty(&state->old_change));
1432 }
1433
1434 change = entry->change;
1435
1436 /*
1437 * update heap with information about which transaction has the next
1438 * relevant change in LSN order
1439 */
1440
1441 /* there are in-memory changes */
1442 if (dlist_has_next(&entry->txn->changes, &entry->change->node))
1443 {
1444 dlist_node *next = dlist_next_node(&entry->txn->changes, &change->node);
1445 ReorderBufferChange *next_change =
1447
1448 /* txn stays the same */
1449 state->entries[off].lsn = next_change->lsn;
1450 state->entries[off].change = next_change;
1451
1453 return change;
1454 }
1455
1456 /* try to load changes from disk */
1457 if (entry->txn->nentries != entry->txn->nentries_mem)
1458 {
1459 /*
1460 * Ugly: restoring changes will reuse *Change records, thus delete the
1461 * current one from the per-tx list and only free in the next call.
1462 */
1463 dlist_delete(&change->node);
1464 dlist_push_tail(&state->old_change, &change->node);
1465
1466 /*
1467 * Update the total bytes processed by the txn for which we are
1468 * releasing the current set of changes and restoring the new set of
1469 * changes.
1470 */
1471 rb->totalBytes += entry->txn->size;
1472 if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->file,
1473 &state->entries[off].segno))
1474 {
1475 /* successfully restored changes from disk */
1476 ReorderBufferChange *next_change =
1478 &entry->txn->changes);
1479
1480 elog(DEBUG2, "restored %u/%u changes from disk",
1481 (uint32) entry->txn->nentries_mem,
1482 (uint32) entry->txn->nentries);
1483
1484 Assert(entry->txn->nentries_mem);
1485 /* txn stays the same */
1486 state->entries[off].lsn = next_change->lsn;
1487 state->entries[off].change = next_change;
1489
1490 return change;
1491 }
1492 }
1493
1494 /* ok, no changes there anymore, remove */
1496
1497 return change;
1498}
void binaryheap_replace_first(binaryheap *heap, bh_node_type d)
Definition: binaryheap.c:255
bh_node_type binaryheap_first(binaryheap *heap)
Definition: binaryheap.c:177
bh_node_type binaryheap_remove_first(binaryheap *heap)
Definition: binaryheap.c:192
#define binaryheap_empty(h)
Definition: binaryheap.h:65
static int32 next
Definition: blutils.c:224
uint32_t uint32
Definition: c.h:552
static bool dlist_has_next(const dlist_head *head, const dlist_node *node)
Definition: ilist.h:503
static dlist_node * dlist_next_node(dlist_head *head, dlist_node *node)
Definition: ilist.h:537
ReorderBufferChange * change
ReorderBufferTXN * txn

References Assert(), binaryheap_empty, binaryheap_first(), binaryheap_remove_first(), binaryheap_replace_first(), ReorderBufferIterTXNEntry::change, ReorderBufferTXN::changes, DatumGetInt32(), DEBUG2, dlist_container, dlist_delete(), dlist_has_next(), dlist_head_element, dlist_is_empty(), dlist_next_node(), dlist_pop_head_node(), dlist_push_tail(), elog, ReorderBufferIterTXNEntry::file, Int32GetDatum(), ReorderBufferChange::lsn, ReorderBufferTXN::nentries, ReorderBufferTXN::nentries_mem, next, ReorderBufferChange::node, ReorderBufferFreeChange(), ReorderBufferRestoreChanges(), ReorderBufferTXN::size, ReorderBuffer::totalBytes, and ReorderBufferIterTXNEntry::txn.

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferLargestStreamableTopTXN()

static ReorderBufferTXN * ReorderBufferLargestStreamableTopTXN ( ReorderBuffer rb)
static

Definition at line 3846 of file reorderbuffer.c.

3847{
3848 dlist_iter iter;
3849 Size largest_size = 0;
3850 ReorderBufferTXN *largest = NULL;
3851
3852 /* Find the largest top-level transaction having a base snapshot. */
3854 {
3855 ReorderBufferTXN *txn;
3856
3857 txn = dlist_container(ReorderBufferTXN, base_snapshot_node, iter.cur);
3858
3859 /* must not be a subtxn */
3861 /* base_snapshot must be set */
3862 Assert(txn->base_snapshot != NULL);
3863
3864 /* Don't consider these kinds of transactions for eviction. */
3865 if (rbtxn_has_partial_change(txn) ||
3867 rbtxn_is_aborted(txn))
3868 continue;
3869
3870 /* Find the largest of the eviction candidates. */
3871 if ((largest == NULL || txn->total_size > largest_size) &&
3872 (txn->total_size > 0))
3873 {
3874 largest = txn;
3875 largest_size = txn->total_size;
3876 }
3877 }
3878
3879 return largest;
3880}
#define rbtxn_has_streamable_change(txn)
#define rbtxn_has_partial_change(txn)

References Assert(), ReorderBufferTXN::base_snapshot, dlist_iter::cur, dlist_container, dlist_foreach, rbtxn_has_partial_change, rbtxn_has_streamable_change, rbtxn_is_aborted, rbtxn_is_known_subxact, ReorderBufferTXN::total_size, and ReorderBuffer::txns_by_base_snapshot_lsn.

Referenced by ReorderBufferCheckMemoryLimit().

◆ ReorderBufferLargestTXN()

static ReorderBufferTXN * ReorderBufferLargestTXN ( ReorderBuffer rb)
static

Definition at line 3805 of file reorderbuffer.c.

3806{
3807 ReorderBufferTXN *largest;
3808
3809 /* Get the largest transaction from the max-heap */
3810 largest = pairingheap_container(ReorderBufferTXN, txn_node,
3812
3813 Assert(largest);
3814 Assert(largest->size > 0);
3815 Assert(largest->size <= rb->size);
3816
3817 return largest;
3818}
pairingheap_node * pairingheap_first(pairingheap *heap)
Definition: pairingheap.c:144
#define pairingheap_container(type, membername, ptr)
Definition: pairingheap.h:43

References Assert(), pairingheap_container, pairingheap_first(), ReorderBufferTXN::size, ReorderBuffer::size, and ReorderBuffer::txn_heap.

Referenced by ReorderBufferCheckMemoryLimit().

◆ ReorderBufferMaybeMarkTXNStreamed()

static void ReorderBufferMaybeMarkTXNStreamed ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 2138 of file reorderbuffer.c.

2139{
2140 /*
2141 * The top-level transaction, is marked as streamed always, even if it
2142 * does not contain any changes (that is, when all the changes are in
2143 * subtransactions).
2144 *
2145 * For subtransactions, we only mark them as streamed when there are
2146 * changes in them.
2147 *
2148 * We do it this way because of aborts - we don't want to send aborts for
2149 * XIDs the downstream is not aware of. And of course, it always knows
2150 * about the top-level xact (we send the XID in all messages), but we
2151 * never stream XIDs of empty subxacts.
2152 */
2153 if (rbtxn_is_toptxn(txn) || (txn->nentries_mem != 0))
2155}
#define RBTXN_IS_STREAMED

References ReorderBufferTXN::nentries_mem, RBTXN_IS_STREAMED, rbtxn_is_toptxn, and ReorderBufferTXN::txn_flags.

Referenced by ReorderBufferProcessTXN(), and ReorderBufferTruncateTXN().

◆ ReorderBufferPrepare()

void ReorderBufferPrepare ( ReorderBuffer rb,
TransactionId  xid,
char *  gid 
)

Definition at line 2960 of file reorderbuffer.c.

2962{
2963 ReorderBufferTXN *txn;
2964
2965 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2966 false);
2967
2968 /* unknown transaction, nothing to replay */
2969 if (txn == NULL)
2970 return;
2971
2972 /*
2973 * txn must have been marked as a prepared transaction and must have
2974 * neither been skipped nor sent a prepare. Also, the prepare info must
2975 * have been updated in it by now.
2976 */
2979
2980 txn->gid = pstrdup(gid);
2981
2982 ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
2983 txn->prepare_time, txn->origin_id, txn->origin_lsn);
2984
2985 /*
2986 * Send a prepare if not already done so. This might occur if we have
2987 * detected a concurrent abort while replaying the non-streaming
2988 * transaction.
2989 */
2990 if (!rbtxn_sent_prepare(txn))
2991 {
2992 rb->prepare(rb, txn, txn->final_lsn);
2994 }
2995}
#define RBTXN_SENT_PREPARE
#define rbtxn_sent_prepare(txn)
ReorderBufferPrepareCB prepare

References Assert(), ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, ReorderBufferTXN::gid, InvalidXLogRecPtr, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, ReorderBuffer::prepare, ReorderBufferTXN::prepare_time, pstrdup(), RBTXN_IS_PREPARED, RBTXN_PREPARE_STATUS_MASK, RBTXN_SENT_PREPARE, rbtxn_sent_prepare, ReorderBufferReplay(), ReorderBufferTXNByXid(), ReorderBufferTXN::txn_flags, and XLogRecPtrIsValid.

Referenced by DecodePrepare().

◆ ReorderBufferProcessPartialChange()

static void ReorderBufferProcessPartialChange ( ReorderBuffer rb,
ReorderBufferTXN txn,
ReorderBufferChange change,
bool  toast_insert 
)
static

Definition at line 741 of file reorderbuffer.c.

744{
745 ReorderBufferTXN *toptxn;
746
747 /*
748 * The partial changes need to be processed only while streaming
749 * in-progress transactions.
750 */
751 if (!ReorderBufferCanStream(rb))
752 return;
753
754 /* Get the top transaction. */
755 toptxn = rbtxn_get_toptxn(txn);
756
757 /*
758 * Indicate a partial change for toast inserts. The change will be
759 * considered as complete once we get the insert or update on the main
760 * table and we are sure that the pending toast chunks are not required
761 * anymore.
762 *
763 * If we allow streaming when there are pending toast chunks then such
764 * chunks won't be released till the insert (multi_insert) is complete and
765 * we expect the txn to have streamed all changes after streaming. This
766 * restriction is mainly to ensure the correctness of streamed
767 * transactions and it doesn't seem worth uplifting such a restriction
768 * just to allow this case because anyway we will stream the transaction
769 * once such an insert is complete.
770 */
771 if (toast_insert)
773 else if (rbtxn_has_partial_change(toptxn) &&
774 IsInsertOrUpdate(change->action) &&
776 toptxn->txn_flags &= ~RBTXN_HAS_PARTIAL_CHANGE;
777
778 /*
779 * Indicate a partial change for speculative inserts. The change will be
780 * considered as complete once we get the speculative confirm or abort
781 * token.
782 */
783 if (IsSpecInsert(change->action))
785 else if (rbtxn_has_partial_change(toptxn) &&
787 toptxn->txn_flags &= ~RBTXN_HAS_PARTIAL_CHANGE;
788
789 /*
790 * Stream the transaction if it is serialized before and the changes are
791 * now complete in the top-level transaction.
792 *
793 * The reason for doing the streaming of such a transaction as soon as we
794 * get the complete change for it is that previously it would have reached
795 * the memory threshold and wouldn't get streamed because of incomplete
796 * changes. Delaying such transactions would increase apply lag for them.
797 */
799 !(rbtxn_has_partial_change(toptxn)) &&
800 rbtxn_is_serialized(txn) &&
802 ReorderBufferStreamTXN(rb, toptxn);
803}
#define IsSpecInsert(action)
#define IsInsertOrUpdate(action)
#define IsSpecConfirmOrAbort(action)
#define RBTXN_HAS_PARTIAL_CHANGE

References ReorderBufferChange::action, ReorderBufferChange::clear_toast_afterwards, ReorderBufferChange::data, IsInsertOrUpdate, IsSpecConfirmOrAbort, IsSpecInsert, rbtxn_get_toptxn, RBTXN_HAS_PARTIAL_CHANGE, rbtxn_has_partial_change, rbtxn_has_streamable_change, rbtxn_is_serialized, ReorderBufferCanStartStreaming(), ReorderBufferCanStream(), ReorderBufferStreamTXN(), ReorderBufferChange::tp, and ReorderBufferTXN::txn_flags.

Referenced by ReorderBufferQueueChange().

◆ ReorderBufferProcessTXN()

static void ReorderBufferProcessTXN ( ReorderBuffer rb,
ReorderBufferTXN txn,
XLogRecPtr  commit_lsn,
volatile Snapshot  snapshot_now,
volatile CommandId  command_id,
bool  streaming 
)
static

Definition at line 2211 of file reorderbuffer.c.

2216{
2217 bool using_subtxn;
2220 ReorderBufferIterTXNState *volatile iterstate = NULL;
2221 volatile XLogRecPtr prev_lsn = InvalidXLogRecPtr;
2222 ReorderBufferChange *volatile specinsert = NULL;
2223 volatile bool stream_started = false;
2224 ReorderBufferTXN *volatile curtxn = NULL;
2225
2226 /* build data to be able to lookup the CommandIds of catalog tuples */
2228
2229 /* setup the initial snapshot */
2230 SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
2231
2232 /*
2233 * Decoding needs access to syscaches et al., which in turn use
2234 * heavyweight locks and such. Thus we need to have enough state around to
2235 * keep track of those. The easiest way is to simply use a transaction
2236 * internally. That also allows us to easily enforce that nothing writes
2237 * to the database by checking for xid assignments.
2238 *
2239 * When we're called via the SQL SRF there's already a transaction
2240 * started, so start an explicit subtransaction there.
2241 */
2242 using_subtxn = IsTransactionOrTransactionBlock();
2243
2244 PG_TRY();
2245 {
2246 ReorderBufferChange *change;
2247 int changes_count = 0; /* used to accumulate the number of
2248 * changes */
2249
2250 if (using_subtxn)
2251 BeginInternalSubTransaction(streaming ? "stream" : "replay");
2252 else
2254
2255 /*
2256 * We only need to send begin/begin-prepare for non-streamed
2257 * transactions.
2258 */
2259 if (!streaming)
2260 {
2261 if (rbtxn_is_prepared(txn))
2262 rb->begin_prepare(rb, txn);
2263 else
2264 rb->begin(rb, txn);
2265 }
2266
2267 ReorderBufferIterTXNInit(rb, txn, &iterstate);
2268 while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
2269 {
2270 Relation relation = NULL;
2271 Oid reloid;
2272
2274
2275 /*
2276 * We can't call start stream callback before processing first
2277 * change.
2278 */
2279 if (!XLogRecPtrIsValid(prev_lsn))
2280 {
2281 if (streaming)
2282 {
2283 txn->origin_id = change->origin_id;
2284 rb->stream_start(rb, txn, change->lsn);
2285 stream_started = true;
2286 }
2287 }
2288
2289 /*
2290 * Enforce correct ordering of changes, merged from multiple
2291 * subtransactions. The changes may have the same LSN due to
2292 * MULTI_INSERT xlog records.
2293 */
2294 Assert(!XLogRecPtrIsValid(prev_lsn) || prev_lsn <= change->lsn);
2295
2296 prev_lsn = change->lsn;
2297
2298 /*
2299 * Set the current xid to detect concurrent aborts. This is
2300 * required for the cases when we decode the changes before the
2301 * COMMIT record is processed.
2302 */
2303 if (streaming || rbtxn_is_prepared(change->txn))
2304 {
2305 curtxn = change->txn;
2306 SetupCheckXidLive(curtxn->xid);
2307 }
2308
2309 switch (change->action)
2310 {
2312
2313 /*
2314 * Confirmation for speculative insertion arrived. Simply
2315 * use as a normal record. It'll be cleaned up at the end
2316 * of INSERT processing.
2317 */
2318 if (specinsert == NULL)
2319 elog(ERROR, "invalid ordering of speculative insertion changes");
2320 Assert(specinsert->data.tp.oldtuple == NULL);
2321 change = specinsert;
2323
2324 /* intentionally fall through */
2328 Assert(snapshot_now);
2329
2330 reloid = RelidByRelfilenumber(change->data.tp.rlocator.spcOid,
2331 change->data.tp.rlocator.relNumber);
2332
2333 /*
2334 * Mapped catalog tuple without data, emitted while
2335 * catalog table was in the process of being rewritten. We
2336 * can fail to look up the relfilenumber, because the
2337 * relmapper has no "historic" view, in contrast to the
2338 * normal catalog during decoding. Thus repeated rewrites
2339 * can cause a lookup failure. That's OK because we do not
2340 * decode catalog changes anyway. Normally such tuples
2341 * would be skipped over below, but we can't identify
2342 * whether the table should be logically logged without
2343 * mapping the relfilenumber to the oid.
2344 */
2345 if (reloid == InvalidOid &&
2346 change->data.tp.newtuple == NULL &&
2347 change->data.tp.oldtuple == NULL)
2348 goto change_done;
2349 else if (reloid == InvalidOid)
2350 elog(ERROR, "could not map filenumber \"%s\" to relation OID",
2351 relpathperm(change->data.tp.rlocator,
2352 MAIN_FORKNUM).str);
2353
2354 relation = RelationIdGetRelation(reloid);
2355
2356 if (!RelationIsValid(relation))
2357 elog(ERROR, "could not open relation with OID %u (for filenumber \"%s\")",
2358 reloid,
2359 relpathperm(change->data.tp.rlocator,
2360 MAIN_FORKNUM).str);
2361
2362 if (!RelationIsLogicallyLogged(relation))
2363 goto change_done;
2364
2365 /*
2366 * Ignore temporary heaps created during DDL unless the
2367 * plugin has asked for them.
2368 */
2369 if (relation->rd_rel->relrewrite && !rb->output_rewrites)
2370 goto change_done;
2371
2372 /*
2373 * For now ignore sequence changes entirely. Most of the
2374 * time they don't log changes using records we
2375 * understand, so it doesn't make sense to handle the few
2376 * cases we do.
2377 */
2378 if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
2379 goto change_done;
2380
2381 /* user-triggered change */
2382 if (!IsToastRelation(relation))
2383 {
2384 ReorderBufferToastReplace(rb, txn, relation, change);
2385 ReorderBufferApplyChange(rb, txn, relation, change,
2386 streaming);
2387
2388 /*
2389 * Only clear reassembled toast chunks if we're sure
2390 * they're not required anymore. The creator of the
2391 * tuple tells us.
2392 */
2393 if (change->data.tp.clear_toast_afterwards)
2394 ReorderBufferToastReset(rb, txn);
2395 }
2396 /* we're not interested in toast deletions */
2397 else if (change->action == REORDER_BUFFER_CHANGE_INSERT)
2398 {
2399 /*
2400 * Need to reassemble the full toasted Datum in
2401 * memory, to ensure the chunks don't get reused till
2402 * we're done remove it from the list of this
2403 * transaction's changes. Otherwise it will get
2404 * freed/reused while restoring spooled data from
2405 * disk.
2406 */
2407 Assert(change->data.tp.newtuple != NULL);
2408
2409 dlist_delete(&change->node);
2410 ReorderBufferToastAppendChunk(rb, txn, relation,
2411 change);
2412 }
2413
2414 change_done:
2415
2416 /*
2417 * If speculative insertion was confirmed, the record
2418 * isn't needed anymore.
2419 */
2420 if (specinsert != NULL)
2421 {
2422 ReorderBufferFreeChange(rb, specinsert, true);
2423 specinsert = NULL;
2424 }
2425
2426 if (RelationIsValid(relation))
2427 {
2428 RelationClose(relation);
2429 relation = NULL;
2430 }
2431 break;
2432
2434
2435 /*
2436 * Speculative insertions are dealt with by delaying the
2437 * processing of the insert until the confirmation record
2438 * arrives. For that we simply unlink the record from the
2439 * chain, so it does not get freed/reused while restoring
2440 * spooled data from disk.
2441 *
2442 * This is safe in the face of concurrent catalog changes
2443 * because the relevant relation can't be changed between
2444 * speculative insertion and confirmation due to
2445 * CheckTableNotInUse() and locking.
2446 */
2447
2448 /* clear out a pending (and thus failed) speculation */
2449 if (specinsert != NULL)
2450 {
2451 ReorderBufferFreeChange(rb, specinsert, true);
2452 specinsert = NULL;
2453 }
2454
2455 /* and memorize the pending insertion */
2456 dlist_delete(&change->node);
2457 specinsert = change;
2458 break;
2459
2461
2462 /*
2463 * Abort for speculative insertion arrived. So cleanup the
2464 * specinsert tuple and toast hash.
2465 *
2466 * Note that we get the spec abort change for each toast
2467 * entry but we need to perform the cleanup only the first
2468 * time we get it for the main table.
2469 */
2470 if (specinsert != NULL)
2471 {
2472 /*
2473 * We must clean the toast hash before processing a
2474 * completely new tuple to avoid confusion about the
2475 * previous tuple's toast chunks.
2476 */
2478 ReorderBufferToastReset(rb, txn);
2479
2480 /* We don't need this record anymore. */
2481 ReorderBufferFreeChange(rb, specinsert, true);
2482 specinsert = NULL;
2483 }
2484 break;
2485
2487 {
2488 int i;
2489 int nrelids = change->data.truncate.nrelids;
2490 int nrelations = 0;
2491 Relation *relations;
2492
2493 relations = palloc0(nrelids * sizeof(Relation));
2494 for (i = 0; i < nrelids; i++)
2495 {
2496 Oid relid = change->data.truncate.relids[i];
2497 Relation rel;
2498
2499 rel = RelationIdGetRelation(relid);
2500
2501 if (!RelationIsValid(rel))
2502 elog(ERROR, "could not open relation with OID %u", relid);
2503
2504 if (!RelationIsLogicallyLogged(rel))
2505 continue;
2506
2507 relations[nrelations++] = rel;
2508 }
2509
2510 /* Apply the truncate. */
2511 ReorderBufferApplyTruncate(rb, txn, nrelations,
2512 relations, change,
2513 streaming);
2514
2515 for (i = 0; i < nrelations; i++)
2516 RelationClose(relations[i]);
2517
2518 break;
2519 }
2520
2522 ReorderBufferApplyMessage(rb, txn, change, streaming);
2523 break;
2524
2526 /* Execute the invalidation messages locally */
2528 change->data.inval.invalidations);
2529 break;
2530
2532 /* get rid of the old */
2534
2535 if (snapshot_now->copied)
2536 {
2537 ReorderBufferFreeSnap(rb, snapshot_now);
2538 snapshot_now =
2540 txn, command_id);
2541 }
2542
2543 /*
2544 * Restored from disk, need to be careful not to double
2545 * free. We could introduce refcounting for that, but for
2546 * now this seems infrequent enough not to care.
2547 */
2548 else if (change->data.snapshot->copied)
2549 {
2550 snapshot_now =
2552 txn, command_id);
2553 }
2554 else
2555 {
2556 snapshot_now = change->data.snapshot;
2557 }
2558
2559 /* and continue with the new one */
2560 SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
2561 break;
2562
2565
2566 if (command_id < change->data.command_id)
2567 {
2568 command_id = change->data.command_id;
2569
2570 if (!snapshot_now->copied)
2571 {
2572 /* we don't use the global one anymore */
2573 snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
2574 txn, command_id);
2575 }
2576
2577 snapshot_now->curcid = command_id;
2578
2580 SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
2581 }
2582
2583 break;
2584
2586 elog(ERROR, "tuplecid value in changequeue");
2587 break;
2588 }
2589
2590 /*
2591 * It is possible that the data is not sent to downstream for a
2592 * long time either because the output plugin filtered it or there
2593 * is a DDL that generates a lot of data that is not processed by
2594 * the plugin. So, in such cases, the downstream can timeout. To
2595 * avoid that we try to send a keepalive message if required.
2596 * Trying to send a keepalive message after every change has some
2597 * overhead, but testing showed there is no noticeable overhead if
2598 * we do it after every ~100 changes.
2599 */
2600#define CHANGES_THRESHOLD 100
2601
2602 if (++changes_count >= CHANGES_THRESHOLD)
2603 {
2604 rb->update_progress_txn(rb, txn, prev_lsn);
2605 changes_count = 0;
2606 }
2607 }
2608
2609 /* speculative insertion record must be freed by now */
2610 Assert(!specinsert);
2611
2612 /* clean up the iterator */
2613 ReorderBufferIterTXNFinish(rb, iterstate);
2614 iterstate = NULL;
2615
2616 /*
2617 * Update total transaction count and total bytes processed by the
2618 * transaction and its subtransactions. Ensure to not count the
2619 * streamed transaction multiple times.
2620 *
2621 * Note that the statistics computation has to be done after
2622 * ReorderBufferIterTXNFinish as it releases the serialized change
2623 * which we have already accounted in ReorderBufferIterTXNNext.
2624 */
2625 if (!rbtxn_is_streamed(txn))
2626 rb->totalTxns++;
2627
2628 rb->totalBytes += txn->total_size;
2629
2630 /*
2631 * Done with current changes, send the last message for this set of
2632 * changes depending upon streaming mode.
2633 */
2634 if (streaming)
2635 {
2636 if (stream_started)
2637 {
2638 rb->stream_stop(rb, txn, prev_lsn);
2639 stream_started = false;
2640 }
2641 }
2642 else
2643 {
2644 /*
2645 * Call either PREPARE (for two-phase transactions) or COMMIT (for
2646 * regular ones).
2647 */
2648 if (rbtxn_is_prepared(txn))
2649 {
2651 rb->prepare(rb, txn, commit_lsn);
2653 }
2654 else
2655 rb->commit(rb, txn, commit_lsn);
2656 }
2657
2658 /* this is just a sanity check against bad output plugin behaviour */
2660 elog(ERROR, "output plugin used XID %u",
2662
2663 /*
2664 * Remember the command ID and snapshot for the next set of changes in
2665 * streaming mode.
2666 */
2667 if (streaming)
2668 ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
2669 else if (snapshot_now->copied)
2670 ReorderBufferFreeSnap(rb, snapshot_now);
2671
2672 /* cleanup */
2674
2675 /*
2676 * Aborting the current (sub-)transaction as a whole has the right
2677 * semantics. We want all locks acquired in here to be released, not
2678 * reassigned to the parent and we do not want any database access
2679 * have persistent effects.
2680 */
2682
2683 /* make sure there's no cache pollution */
2685 {
2688 }
2689 else
2690 {
2694 }
2695
2696 if (using_subtxn)
2697 {
2700 CurrentResourceOwner = cowner;
2701 }
2702
2703 /*
2704 * We are here due to one of the four reasons: 1. Decoding an
2705 * in-progress txn. 2. Decoding a prepared txn. 3. Decoding of a
2706 * prepared txn that was (partially) streamed. 4. Decoding a committed
2707 * txn.
2708 *
2709 * For 1, we allow truncation of txn data by removing the changes
2710 * already streamed but still keeping other things like invalidations,
2711 * snapshot, and tuplecids. For 2 and 3, we indicate
2712 * ReorderBufferTruncateTXN to do more elaborate truncation of txn
2713 * data as the entire transaction has been decoded except for commit.
2714 * For 4, as the entire txn has been decoded, we can fully clean up
2715 * the TXN reorder buffer.
2716 */
2717 if (streaming || rbtxn_is_prepared(txn))
2718 {
2719 if (streaming)
2721
2723 /* Reset the CheckXidAlive */
2725 }
2726 else
2727 ReorderBufferCleanupTXN(rb, txn);
2728 }
2729 PG_CATCH();
2730 {
2732 ErrorData *errdata = CopyErrorData();
2733
2734 /* TODO: Encapsulate cleanup from the PG_TRY and PG_CATCH blocks */
2735 if (iterstate)
2736 ReorderBufferIterTXNFinish(rb, iterstate);
2737
2739
2740 /*
2741 * Force cache invalidation to happen outside of a valid transaction
2742 * to prevent catalog access as we just caught an error.
2743 */
2745
2746 /* make sure there's no cache pollution */
2748 {
2751 }
2752 else
2753 {
2757 }
2758
2759 if (using_subtxn)
2760 {
2763 CurrentResourceOwner = cowner;
2764 }
2765
2766 /*
2767 * The error code ERRCODE_TRANSACTION_ROLLBACK indicates a concurrent
2768 * abort of the (sub)transaction we are streaming or preparing. We
2769 * need to do the cleanup and return gracefully on this error, see
2770 * SetupCheckXidLive.
2771 *
2772 * This error code can be thrown by one of the callbacks we call
2773 * during decoding so we need to ensure that we return gracefully only
2774 * when we are sending the data in streaming mode and the streaming is
2775 * not finished yet or when we are sending the data out on a PREPARE
2776 * during a two-phase commit.
2777 */
2778 if (errdata->sqlerrcode == ERRCODE_TRANSACTION_ROLLBACK &&
2779 (stream_started || rbtxn_is_prepared(txn)))
2780 {
2781 /* curtxn must be set for streaming or prepared transactions */
2782 Assert(curtxn);
2783
2784 /* Cleanup the temporary error state. */
2786 FreeErrorData(errdata);
2787 errdata = NULL;
2788
2789 /* Remember the transaction is aborted. */
2790 Assert(!rbtxn_is_committed(curtxn));
2791 curtxn->txn_flags |= RBTXN_IS_ABORTED;
2792
2793 /* Mark the transaction is streamed if appropriate */
2794 if (stream_started)
2796
2797 /* Reset the TXN so that it is allowed to stream remaining data. */
2798 ReorderBufferResetTXN(rb, txn, snapshot_now,
2799 command_id, prev_lsn,
2800 specinsert);
2801 }
2802 else
2803 {
2804 ReorderBufferCleanupTXN(rb, txn);
2806 PG_RE_THROW();
2807 }
2808 }
2809 PG_END_TRY();
2810}
bool IsToastRelation(Relation relation)
Definition: catalog.c:206
void FreeErrorData(ErrorData *edata)
Definition: elog.c:1835
ErrorData * CopyErrorData(void)
Definition: elog.c:1763
void FlushErrorState(void)
Definition: elog.c:1884
#define PG_RE_THROW()
Definition: elog.h:405
#define PG_TRY(...)
Definition: elog.h:372
#define PG_END_TRY(...)
Definition: elog.h:397
#define PG_CATCH(...)
Definition: elog.h:382
void InvalidateSystemCaches(void)
Definition: inval.c:916
void * palloc0(Size size)
Definition: mcxt.c:1395
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:123
const void * data
#define InvalidOid
Definition: postgres_ext.h:37
#define RelationIsLogicallyLogged(relation)
Definition: rel.h:711
#define RelationIsValid(relation)
Definition: rel.h:490
Relation RelationIdGetRelation(Oid relationId)
Definition: relcache.c:2099
void RelationClose(Relation relation)
Definition: relcache.c:2220
Oid RelidByRelfilenumber(Oid reltablespace, RelFileNumber relfilenumber)
@ MAIN_FORKNUM
Definition: relpath.h:58
#define relpathperm(rlocator, forknum)
Definition: relpath.h:146
static void ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
static void ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id, XLogRecPtr last_lsn, ReorderBufferChange *specinsert)
static void ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferIterTXNState *volatile *iter_state)
static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
static void ReorderBufferSaveTXNSnapshot(ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id)
static void ReorderBufferApplyChange(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change, bool streaming)
static void ReorderBufferIterTXNFinish(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
#define CHANGES_THRESHOLD
static void ReorderBufferApplyMessage(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool streaming)
static void SetupCheckXidLive(TransactionId xid)
static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap, ReorderBufferTXN *txn, CommandId cid)
static void ReorderBufferApplyTruncate(ReorderBuffer *rb, ReorderBufferTXN *txn, int nrelations, Relation *relations, ReorderBufferChange *change, bool streaming)
static void ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
static ReorderBufferChange * ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
static void ReorderBufferMaybeMarkTXNStreamed(ReorderBuffer *rb, ReorderBufferTXN *txn)
void TeardownHistoricSnapshot(bool is_error)
Definition: snapmgr.c:1685
void SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
Definition: snapmgr.c:1669
int sqlerrcode
Definition: elog.h:431
RelFileNumber relNumber
Form_pg_class rd_rel
Definition: rel.h:111
RelFileLocator rlocator
Definition: reorderbuffer.h:98
RepOriginId origin_id
Definition: reorderbuffer.h:86
ReorderBufferBeginCB begin_prepare
ReorderBufferUpdateProgressTxnCB update_progress_txn
ReorderBufferStreamStopCB stream_stop
ReorderBufferCommitCB commit
ReorderBufferStreamStartCB stream_start
ReorderBufferBeginCB begin
TransactionId CheckXidAlive
Definition: xact.c:100
void StartTransactionCommand(void)
Definition: xact.c:3077
TransactionId GetCurrentTransactionIdIfAny(void)
Definition: xact.c:472
TransactionId GetCurrentTransactionId(void)
Definition: xact.c:455

References AbortCurrentTransaction(), ReorderBufferChange::action, Assert(), ReorderBuffer::begin, ReorderBuffer::begin_prepare, BeginInternalSubTransaction(), CHANGES_THRESHOLD, CHECK_FOR_INTERRUPTS, CheckXidAlive, ReorderBufferChange::clear_toast_afterwards, ReorderBufferChange::command_id, ReorderBuffer::commit, SnapshotData::copied, CopyErrorData(), SnapshotData::curcid, CurrentMemoryContext, CurrentResourceOwner, ReorderBufferChange::data, data, dlist_delete(), elog, ERROR, FlushErrorState(), FreeErrorData(), GetCurrentTransactionId(), GetCurrentTransactionIdIfAny(), i, ReorderBufferChange::inval, InvalidateSystemCaches(), ReorderBufferChange::invalidations, ReorderBufferTXN::invalidations, ReorderBufferTXN::invalidations_distributed, InvalidCommandId, InvalidOid, InvalidTransactionId, InvalidXLogRecPtr, IsToastRelation(), IsTransactionOrTransactionBlock(), ReorderBufferChange::lsn, MAIN_FORKNUM, MemoryContextSwitchTo(), ReorderBufferChange::newtuple, ReorderBufferChange::ninvalidations, ReorderBufferTXN::ninvalidations, ReorderBufferTXN::ninvalidations_distributed, ReorderBufferChange::node, ReorderBufferChange::nrelids, ReorderBufferChange::oldtuple, ReorderBufferChange::origin_id, ReorderBufferTXN::origin_id, ReorderBuffer::output_rewrites, palloc0(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, ReorderBuffer::prepare, rbtxn_distr_inval_overflowed, RBTXN_IS_ABORTED, rbtxn_is_committed, rbtxn_is_prepared, rbtxn_is_streamed, RBTXN_SENT_PREPARE, rbtxn_sent_prepare, RelationData::rd_rel, RelationClose(), RelationIdGetRelation(), RelationIsLogicallyLogged, RelationIsValid, RelidByRelfilenumber(), ReorderBufferChange::relids, RelFileLocator::relNumber, relpathperm, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferApplyChange(), ReorderBufferApplyMessage(), ReorderBufferApplyTruncate(), ReorderBufferBuildTupleCidHash(), ReorderBufferCleanupTXN(), ReorderBufferCopySnap(), ReorderBufferExecuteInvalidations(), ReorderBufferFreeChange(), ReorderBufferFreeSnap(), ReorderBufferIterTXNFinish(), ReorderBufferIterTXNInit(), ReorderBufferIterTXNNext(), ReorderBufferMaybeMarkTXNStreamed(), ReorderBufferResetTXN(), ReorderBufferSaveTXNSnapshot(), ReorderBufferToastAppendChunk(), ReorderBufferToastReplace(), ReorderBufferToastReset(), ReorderBufferTruncateTXN(), ReorderBufferChange::rlocator, RollbackAndReleaseCurrentSubTransaction(), SetupCheckXidLive(), SetupHistoricSnapshot(), ReorderBufferChange::snapshot, RelFileLocator::spcOid, ErrorData::sqlerrcode, StartTransactionCommand(), ReorderBuffer::stream_start, ReorderBuffer::stream_stop, TeardownHistoricSnapshot(), ReorderBufferTXN::total_size, ReorderBuffer::totalBytes, ReorderBuffer::totalTxns, ReorderBufferChange::tp, ReorderBufferChange::truncate, ReorderBufferTXN::tuplecid_hash, ReorderBufferChange::txn, ReorderBufferTXN::txn_flags, ReorderBuffer::update_progress_txn, ReorderBufferTXN::xid, and XLogRecPtrIsValid.

Referenced by ReorderBufferReplay(), and ReorderBufferStreamTXN().

◆ ReorderBufferProcessXid()

void ReorderBufferProcessXid ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 3295 of file reorderbuffer.c.

3296{
3297 /* many records won't have an xid assigned, centralize check here */
3298 if (xid != InvalidTransactionId)
3299 ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3300}

References InvalidTransactionId, and ReorderBufferTXNByXid().

Referenced by heap2_decode(), heap_decode(), LogicalDecodingProcessRecord(), logicalmsg_decode(), standby_decode(), xact_decode(), and xlog_decode().

◆ ReorderBufferQueueChange()

void ReorderBufferQueueChange ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
ReorderBufferChange change,
bool  toast_insert 
)

Definition at line 810 of file reorderbuffer.c.

812{
813 ReorderBufferTXN *txn;
814
815 txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
816
817 /*
818 * If we have detected that the transaction is aborted while streaming the
819 * previous changes or by checking its CLOG, there is no point in
820 * collecting further changes for it.
821 */
822 if (rbtxn_is_aborted(txn))
823 {
824 /*
825 * We don't need to update memory accounting for this change as we
826 * have not added it to the queue yet.
827 */
828 ReorderBufferFreeChange(rb, change, false);
829 return;
830 }
831
832 /*
833 * The changes that are sent downstream are considered streamable. We
834 * remember such transactions so that only those will later be considered
835 * for streaming.
836 */
837 if (change->action == REORDER_BUFFER_CHANGE_INSERT ||
843 {
844 ReorderBufferTXN *toptxn = rbtxn_get_toptxn(txn);
845
847 }
848
849 change->lsn = lsn;
850 change->txn = txn;
851
853 dlist_push_tail(&txn->changes, &change->node);
854 txn->nentries++;
855 txn->nentries_mem++;
856
857 /* update memory accounting information */
858 ReorderBufferChangeMemoryUpdate(rb, change, NULL, true,
860
861 /* process partial change */
862 ReorderBufferProcessPartialChange(rb, txn, change, toast_insert);
863
864 /* check the memory limits and evict something if needed */
866}
static void ReorderBufferProcessPartialChange(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool toast_insert)
static void ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
#define RBTXN_HAS_STREAMABLE_CHANGE

References ReorderBufferChange::action, Assert(), ReorderBufferTXN::changes, dlist_push_tail(), ReorderBufferChange::lsn, ReorderBufferTXN::nentries, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, rbtxn_get_toptxn, RBTXN_HAS_STREAMABLE_CHANGE, rbtxn_is_aborted, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChangeMemoryUpdate(), ReorderBufferChangeSize(), ReorderBufferCheckMemoryLimit(), ReorderBufferFreeChange(), ReorderBufferProcessPartialChange(), ReorderBufferTXNByXid(), ReorderBufferChange::txn, ReorderBufferTXN::txn_flags, and XLogRecPtrIsValid.

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeSpecConfirm(), DecodeTruncate(), DecodeUpdate(), ReorderBufferAddNewCommandId(), ReorderBufferAddSnapshot(), ReorderBufferQueueInvalidations(), and ReorderBufferQueueMessage().

◆ ReorderBufferQueueInvalidations()

◆ ReorderBufferQueueMessage()

void ReorderBufferQueueMessage ( ReorderBuffer rb,
TransactionId  xid,
Snapshot  snap,
XLogRecPtr  lsn,
bool  transactional,
const char *  prefix,
Size  message_size,
const char *  message 
)

Definition at line 873 of file reorderbuffer.c.

877{
878 if (transactional)
879 {
880 MemoryContext oldcontext;
881 ReorderBufferChange *change;
882
884
885 /*
886 * We don't expect snapshots for transactional changes - we'll use the
887 * snapshot derived later during apply (unless the change gets
888 * skipped).
889 */
890 Assert(!snap);
891
892 oldcontext = MemoryContextSwitchTo(rb->context);
893
894 change = ReorderBufferAllocChange(rb);
896 change->data.msg.prefix = pstrdup(prefix);
897 change->data.msg.message_size = message_size;
898 change->data.msg.message = palloc(message_size);
899 memcpy(change->data.msg.message, message, message_size);
900
901 ReorderBufferQueueChange(rb, xid, lsn, change, false);
902
903 MemoryContextSwitchTo(oldcontext);
904 }
905 else
906 {
907 ReorderBufferTXN *txn = NULL;
908 volatile Snapshot snapshot_now = snap;
909
910 /* Non-transactional changes require a valid snapshot. */
911 Assert(snapshot_now);
912
913 if (xid != InvalidTransactionId)
914 txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
915
916 /* setup snapshot to allow catalog access */
917 SetupHistoricSnapshot(snapshot_now, NULL);
918 PG_TRY();
919 {
920 rb->message(rb, txn, lsn, false, prefix, message_size, message);
921
923 }
924 PG_CATCH();
925 {
927 PG_RE_THROW();
928 }
929 PG_END_TRY();
930 }
931}
void * palloc(Size size)
Definition: mcxt.c:1365

References ReorderBufferChange::action, Assert(), ReorderBuffer::context, ReorderBufferChange::data, InvalidTransactionId, MemoryContextSwitchTo(), ReorderBufferChange::message, ReorderBuffer::message, ReorderBufferChange::message_size, ReorderBufferChange::msg, palloc(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, ReorderBufferChange::prefix, pstrdup(), REORDER_BUFFER_CHANGE_MESSAGE, ReorderBufferAllocChange(), ReorderBufferQueueChange(), ReorderBufferTXNByXid(), SetupHistoricSnapshot(), and TeardownHistoricSnapshot().

Referenced by logicalmsg_decode().

◆ ReorderBufferRememberPrepareInfo()

bool ReorderBufferRememberPrepareInfo ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  prepare_lsn,
XLogRecPtr  end_lsn,
TimestampTz  prepare_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn 
)

Definition at line 2907 of file reorderbuffer.c.

2911{
2912 ReorderBufferTXN *txn;
2913
2914 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false);
2915
2916 /* unknown transaction, nothing to do */
2917 if (txn == NULL)
2918 return false;
2919
2920 /*
2921 * Remember the prepare information to be later used by commit prepared in
2922 * case we skip doing prepare.
2923 */
2924 txn->final_lsn = prepare_lsn;
2925 txn->end_lsn = end_lsn;
2926 txn->prepare_time = prepare_time;
2927 txn->origin_id = origin_id;
2928 txn->origin_lsn = origin_lsn;
2929
2930 /* Mark this transaction as a prepared transaction */
2933
2934 return true;
2935}

References Assert(), ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, InvalidXLogRecPtr, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, ReorderBufferTXN::prepare_time, RBTXN_IS_PREPARED, RBTXN_PREPARE_STATUS_MASK, ReorderBufferTXNByXid(), and ReorderBufferTXN::txn_flags.

Referenced by DecodePrepare().

◆ ReorderBufferReplay()

static void ReorderBufferReplay ( ReorderBufferTXN txn,
ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn,
TimestampTz  commit_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn 
)
static

Definition at line 2823 of file reorderbuffer.c.

2828{
2829 Snapshot snapshot_now;
2830 CommandId command_id = FirstCommandId;
2831
2832 txn->final_lsn = commit_lsn;
2833 txn->end_lsn = end_lsn;
2834 txn->commit_time = commit_time;
2835 txn->origin_id = origin_id;
2836 txn->origin_lsn = origin_lsn;
2837
2838 /*
2839 * If the transaction was (partially) streamed, we need to commit it in a
2840 * 'streamed' way. That is, we first stream the remaining part of the
2841 * transaction, and then invoke stream_commit message.
2842 *
2843 * Called after everything (origin ID, LSN, ...) is stored in the
2844 * transaction to avoid passing that information directly.
2845 */
2846 if (rbtxn_is_streamed(txn))
2847 {
2849 return;
2850 }
2851
2852 /*
2853 * If this transaction has no snapshot, it didn't make any changes to the
2854 * database, so there's nothing to decode. Note that
2855 * ReorderBufferCommitChild will have transferred any snapshots from
2856 * subtransactions if there were any.
2857 */
2858 if (txn->base_snapshot == NULL)
2859 {
2860 Assert(txn->ninvalidations == 0);
2861
2862 /*
2863 * Removing this txn before a commit might result in the computation
2864 * of an incorrect restart_lsn. See SnapBuildProcessRunningXacts.
2865 */
2866 if (!rbtxn_is_prepared(txn))
2867 ReorderBufferCleanupTXN(rb, txn);
2868 return;
2869 }
2870
2871 snapshot_now = txn->base_snapshot;
2872
2873 /* Process and send the changes to output plugin. */
2874 ReorderBufferProcessTXN(rb, txn, commit_lsn, snapshot_now,
2875 command_id, false);
2876}
#define FirstCommandId
Definition: c.h:687
uint32 CommandId
Definition: c.h:685
static void ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn, volatile Snapshot snapshot_now, volatile CommandId command_id, bool streaming)

References Assert(), ReorderBufferTXN::base_snapshot, ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, FirstCommandId, ReorderBufferTXN::ninvalidations, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, rbtxn_is_prepared, rbtxn_is_streamed, ReorderBufferCleanupTXN(), ReorderBufferProcessTXN(), and ReorderBufferStreamCommit().

Referenced by ReorderBufferCommit(), ReorderBufferFinishPrepared(), and ReorderBufferPrepare().

◆ ReorderBufferResetTXN()

static void ReorderBufferResetTXN ( ReorderBuffer rb,
ReorderBufferTXN txn,
Snapshot  snapshot_now,
CommandId  command_id,
XLogRecPtr  last_lsn,
ReorderBufferChange specinsert 
)
static

Definition at line 2165 of file reorderbuffer.c.

2170{
2171 /* Discard the changes that we just streamed */
2173
2174 /* Free all resources allocated for toast reconstruction */
2175 ReorderBufferToastReset(rb, txn);
2176
2177 /* Return the spec insert change if it is not NULL */
2178 if (specinsert != NULL)
2179 {
2180 ReorderBufferFreeChange(rb, specinsert, true);
2181 specinsert = NULL;
2182 }
2183
2184 /*
2185 * For the streaming case, stop the stream and remember the command ID and
2186 * snapshot for the streaming run.
2187 */
2188 if (rbtxn_is_streamed(txn))
2189 {
2190 rb->stream_stop(rb, txn, last_lsn);
2191 ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
2192 }
2193
2194 /* All changes must be deallocated */
2195 Assert(txn->size == 0);
2196}

References Assert(), rbtxn_is_prepared, rbtxn_is_streamed, ReorderBufferFreeChange(), ReorderBufferSaveTXNSnapshot(), ReorderBufferToastReset(), ReorderBufferTruncateTXN(), ReorderBufferTXN::size, and ReorderBuffer::stream_stop.

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferRestoreChange()

static void ReorderBufferRestoreChange ( ReorderBuffer rb,
ReorderBufferTXN txn,
char *  data 
)
static

Definition at line 4687 of file reorderbuffer.c.

4689{
4691 ReorderBufferChange *change;
4692
4693 ondisk = (ReorderBufferDiskChange *) data;
4694
4695 change = ReorderBufferAllocChange(rb);
4696
4697 /* copy static part */
4698 memcpy(change, &ondisk->change, sizeof(ReorderBufferChange));
4699
4700 data += sizeof(ReorderBufferDiskChange);
4701
4702 /* restore individual stuff */
4703 switch (change->action)
4704 {
4705 /* fall through these, they're all similar enough */
4710 if (change->data.tp.oldtuple)
4711 {
4712 uint32 tuplelen = ((HeapTuple) data)->t_len;
4713
4714 change->data.tp.oldtuple =
4716
4717 /* restore ->tuple */
4718 memcpy(change->data.tp.oldtuple, data,
4719 sizeof(HeapTupleData));
4720 data += sizeof(HeapTupleData);
4721
4722 /* reset t_data pointer into the new tuplebuf */
4723 change->data.tp.oldtuple->t_data =
4724 (HeapTupleHeader) ((char *) change->data.tp.oldtuple + HEAPTUPLESIZE);
4725
4726 /* restore tuple data itself */
4727 memcpy(change->data.tp.oldtuple->t_data, data, tuplelen);
4728 data += tuplelen;
4729 }
4730
4731 if (change->data.tp.newtuple)
4732 {
4733 /* here, data might not be suitably aligned! */
4734 uint32 tuplelen;
4735
4736 memcpy(&tuplelen, data + offsetof(HeapTupleData, t_len),
4737 sizeof(uint32));
4738
4739 change->data.tp.newtuple =
4741
4742 /* restore ->tuple */
4743 memcpy(change->data.tp.newtuple, data,
4744 sizeof(HeapTupleData));
4745 data += sizeof(HeapTupleData);
4746
4747 /* reset t_data pointer into the new tuplebuf */
4748 change->data.tp.newtuple->t_data =
4749 (HeapTupleHeader) ((char *) change->data.tp.newtuple + HEAPTUPLESIZE);
4750
4751 /* restore tuple data itself */
4752 memcpy(change->data.tp.newtuple->t_data, data, tuplelen);
4753 data += tuplelen;
4754 }
4755
4756 break;
4758 {
4759 Size prefix_size;
4760
4761 /* read prefix */
4762 memcpy(&prefix_size, data, sizeof(Size));
4763 data += sizeof(Size);
4765 prefix_size);
4766 memcpy(change->data.msg.prefix, data, prefix_size);
4767 Assert(change->data.msg.prefix[prefix_size - 1] == '\0');
4768 data += prefix_size;
4769
4770 /* read the message */
4771 memcpy(&change->data.msg.message_size, data, sizeof(Size));
4772 data += sizeof(Size);
4774 change->data.msg.message_size);
4775 memcpy(change->data.msg.message, data,
4776 change->data.msg.message_size);
4777 data += change->data.msg.message_size;
4778
4779 break;
4780 }
4782 {
4783 Size inval_size = sizeof(SharedInvalidationMessage) *
4784 change->data.inval.ninvalidations;
4785
4786 change->data.inval.invalidations =
4787 MemoryContextAlloc(rb->context, inval_size);
4788
4789 /* read the message */
4790 memcpy(change->data.inval.invalidations, data, inval_size);
4791
4792 break;
4793 }
4795 {
4796 Snapshot oldsnap;
4797 Snapshot newsnap;
4798 Size size;
4799
4800 oldsnap = (Snapshot) data;
4801
4802 size = sizeof(SnapshotData) +
4803 sizeof(TransactionId) * oldsnap->xcnt +
4804 sizeof(TransactionId) * (oldsnap->subxcnt + 0);
4805
4806 change->data.snapshot = MemoryContextAllocZero(rb->context, size);
4807
4808 newsnap = change->data.snapshot;
4809
4810 memcpy(newsnap, data, size);
4811 newsnap->xip = (TransactionId *)
4812 (((char *) newsnap) + sizeof(SnapshotData));
4813 newsnap->subxip = newsnap->xip + newsnap->xcnt;
4814 newsnap->copied = true;
4815 break;
4816 }
4817 /* the base struct contains all the data, easy peasy */
4819 {
4820 Oid *relids;
4821
4822 relids = ReorderBufferAllocRelids(rb, change->data.truncate.nrelids);
4823 memcpy(relids, data, change->data.truncate.nrelids * sizeof(Oid));
4824 change->data.truncate.relids = relids;
4825
4826 break;
4827 }
4832 break;
4833 }
4834
4835 dlist_push_tail(&txn->changes, &change->node);
4836 txn->nentries_mem++;
4837
4838 /*
4839 * Update memory accounting for the restored change. We need to do this
4840 * although we don't check the memory limit when restoring the changes in
4841 * this branch (we only do that when initially queueing the changes after
4842 * decoding), because we will release the changes later, and that will
4843 * update the accounting too (subtracting the size from the counters). And
4844 * we don't want to underflow there.
4845 */
4846 ReorderBufferChangeMemoryUpdate(rb, change, NULL, true,
4847 ReorderBufferChangeSize(change));
4848}
struct ReorderBufferDiskChange ReorderBufferDiskChange
HeapTuple ReorderBufferAllocTupleBuf(ReorderBuffer *rb, Size tuple_len)
Oid * ReorderBufferAllocRelids(ReorderBuffer *rb, int nrelids)
struct SnapshotData * Snapshot
Definition: snapshot.h:117
ReorderBufferChange change

References ReorderBufferChange::action, Assert(), ReorderBufferDiskChange::change, ReorderBufferTXN::changes, ReorderBuffer::context, SnapshotData::copied, ReorderBufferChange::data, data, dlist_push_tail(), HEAPTUPLESIZE, ReorderBufferChange::inval, ReorderBufferChange::invalidations, MemoryContextAlloc(), MemoryContextAllocZero(), ReorderBufferChange::message, ReorderBufferChange::message_size, ReorderBufferChange::msg, ReorderBufferTXN::nentries_mem, ReorderBufferChange::newtuple, ReorderBufferChange::ninvalidations, ReorderBufferChange::node, ReorderBufferChange::nrelids, ReorderBufferChange::oldtuple, ReorderBufferChange::prefix, ReorderBufferChange::relids, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferAllocChange(), ReorderBufferAllocRelids(), ReorderBufferAllocTupleBuf(), ReorderBufferChangeMemoryUpdate(), ReorderBufferChangeSize(), SizeofHeapTupleHeader, ReorderBufferChange::snapshot, SnapshotData::subxcnt, SnapshotData::subxip, HeapTupleData::t_data, ReorderBufferChange::tp, ReorderBufferChange::truncate, SnapshotData::xcnt, and SnapshotData::xip.

Referenced by ReorderBufferRestoreChanges().

◆ ReorderBufferRestoreChanges()

static Size ReorderBufferRestoreChanges ( ReorderBuffer rb,
ReorderBufferTXN txn,
TXNEntryFile file,
XLogSegNo segno 
)
static

Definition at line 4544 of file reorderbuffer.c.

4546{
4547 Size restored = 0;
4548 XLogSegNo last_segno;
4549 dlist_mutable_iter cleanup_iter;
4550 File *fd = &file->vfd;
4551
4554
4555 /* free current entries, so we have memory for more */
4556 dlist_foreach_modify(cleanup_iter, &txn->changes)
4557 {
4559 dlist_container(ReorderBufferChange, node, cleanup_iter.cur);
4560
4561 dlist_delete(&cleanup->node);
4563 }
4564 txn->nentries_mem = 0;
4566
4567 XLByteToSeg(txn->final_lsn, last_segno, wal_segment_size);
4568
4569 while (restored < max_changes_in_memory && *segno <= last_segno)
4570 {
4571 int readBytes;
4573
4575
4576 if (*fd == -1)
4577 {
4578 char path[MAXPGPATH];
4579
4580 /* first time in */
4581 if (*segno == 0)
4582 XLByteToSeg(txn->first_lsn, *segno, wal_segment_size);
4583
4584 Assert(*segno != 0 || dlist_is_empty(&txn->changes));
4585
4586 /*
4587 * No need to care about TLIs here, only used during a single run,
4588 * so each LSN only maps to a specific WAL record.
4589 */
4591 *segno);
4592
4593 *fd = PathNameOpenFile(path, O_RDONLY | PG_BINARY);
4594
4595 /* No harm in resetting the offset even in case of failure */
4596 file->curOffset = 0;
4597
4598 if (*fd < 0 && errno == ENOENT)
4599 {
4600 *fd = -1;
4601 (*segno)++;
4602 continue;
4603 }
4604 else if (*fd < 0)
4605 ereport(ERROR,
4607 errmsg("could not open file \"%s\": %m",
4608 path)));
4609 }
4610
4611 /*
4612 * Read the statically sized part of a change which has information
4613 * about the total size. If we couldn't read a record, we're at the
4614 * end of this file.
4615 */
4617 readBytes = FileRead(file->vfd, rb->outbuf,
4619 file->curOffset, WAIT_EVENT_REORDER_BUFFER_READ);
4620
4621 /* eof */
4622 if (readBytes == 0)
4623 {
4624 FileClose(*fd);
4625 *fd = -1;
4626 (*segno)++;
4627 continue;
4628 }
4629 else if (readBytes < 0)
4630 ereport(ERROR,
4632 errmsg("could not read from reorderbuffer spill file: %m")));
4633 else if (readBytes != sizeof(ReorderBufferDiskChange))
4634 ereport(ERROR,
4636 errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
4637 readBytes,
4638 (uint32) sizeof(ReorderBufferDiskChange))));
4639
4640 file->curOffset += readBytes;
4641
4642 ondisk = (ReorderBufferDiskChange *) rb->outbuf;
4643
4645 sizeof(ReorderBufferDiskChange) + ondisk->size);
4646 ondisk = (ReorderBufferDiskChange *) rb->outbuf;
4647
4648 readBytes = FileRead(file->vfd,
4649 rb->outbuf + sizeof(ReorderBufferDiskChange),
4650 ondisk->size - sizeof(ReorderBufferDiskChange),
4651 file->curOffset,
4652 WAIT_EVENT_REORDER_BUFFER_READ);
4653
4654 if (readBytes < 0)
4655 ereport(ERROR,
4657 errmsg("could not read from reorderbuffer spill file: %m")));
4658 else if (readBytes != ondisk->size - sizeof(ReorderBufferDiskChange))
4659 ereport(ERROR,
4661 errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
4662 readBytes,
4663 (uint32) (ondisk->size - sizeof(ReorderBufferDiskChange)))));
4664
4665 file->curOffset += readBytes;
4666
4667 /*
4668 * ok, read a full change from disk, now restore it into proper
4669 * in-memory format
4670 */
4671 ReorderBufferRestoreChange(rb, txn, rb->outbuf);
4672 restored++;
4673 }
4674
4675 return restored;
4676}
static void cleanup(void)
Definition: bootstrap.c:715
File PathNameOpenFile(const char *fileName, int fileFlags)
Definition: fd.c:1559
static ssize_t FileRead(File file, void *buffer, size_t amount, pgoff_t offset, uint32 wait_event_info)
Definition: fd.h:214
int File
Definition: fd.h:51
static void ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz)
static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, char *data)
static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, TransactionId xid, XLogSegNo segno)
static const Size max_changes_in_memory
int wal_segment_size
Definition: xlog.c:145
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
uint64 XLogSegNo
Definition: xlogdefs.h:52

References Assert(), ReorderBufferTXN::changes, CHECK_FOR_INTERRUPTS, cleanup(), dlist_mutable_iter::cur, TXNEntryFile::curOffset, dlist_container, dlist_delete(), dlist_foreach_modify, dlist_is_empty(), ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), FileClose(), FileRead(), ReorderBufferTXN::final_lsn, ReorderBufferTXN::first_lsn, max_changes_in_memory, MAXPGPATH, MyReplicationSlot, ReorderBufferTXN::nentries_mem, ReorderBuffer::outbuf, PathNameOpenFile(), PG_BINARY, ReorderBufferFreeChange(), ReorderBufferRestoreChange(), ReorderBufferSerializedPath(), ReorderBufferSerializeReserve(), ReorderBufferDiskChange::size, TXNEntryFile::vfd, wal_segment_size, ReorderBufferTXN::xid, XLByteToSeg, and XLogRecPtrIsValid.

Referenced by ReorderBufferIterTXNInit(), and ReorderBufferIterTXNNext().

◆ ReorderBufferRestoreCleanup()

static void ReorderBufferRestoreCleanup ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 4854 of file reorderbuffer.c.

4855{
4856 XLogSegNo first;
4857 XLogSegNo cur;
4858 XLogSegNo last;
4859
4862
4865
4866 /* iterate over all possible filenames, and delete them */
4867 for (cur = first; cur <= last; cur++)
4868 {
4869 char path[MAXPGPATH];
4870
4872 if (unlink(path) != 0 && errno != ENOENT)
4873 ereport(ERROR,
4875 errmsg("could not remove file \"%s\": %m", path)));
4876 }
4877}
struct cursor * cur
Definition: ecpg.c:29

References Assert(), cur, ereport, errcode_for_file_access(), errmsg(), ERROR, ReorderBufferTXN::final_lsn, ReorderBufferTXN::first_lsn, MAXPGPATH, MyReplicationSlot, ReorderBufferSerializedPath(), wal_segment_size, ReorderBufferTXN::xid, XLByteToSeg, and XLogRecPtrIsValid.

Referenced by ReorderBufferCleanupTXN(), and ReorderBufferTruncateTXN().

◆ ReorderBufferSaveTXNSnapshot()

static void ReorderBufferSaveTXNSnapshot ( ReorderBuffer rb,
ReorderBufferTXN txn,
Snapshot  snapshot_now,
CommandId  command_id 
)
inlinestatic

Definition at line 2120 of file reorderbuffer.c.

2122{
2123 txn->command_id = command_id;
2124
2125 /* Avoid copying if it's already copied. */
2126 if (snapshot_now->copied)
2127 txn->snapshot_now = snapshot_now;
2128 else
2129 txn->snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
2130 txn, command_id);
2131}

References ReorderBufferTXN::command_id, SnapshotData::copied, ReorderBufferCopySnap(), and ReorderBufferTXN::snapshot_now.

Referenced by ReorderBufferProcessTXN(), and ReorderBufferResetTXN().

◆ ReorderBufferSerializeChange()

static void ReorderBufferSerializeChange ( ReorderBuffer rb,
ReorderBufferTXN txn,
int  fd,
ReorderBufferChange change 
)
static

Definition at line 4092 of file reorderbuffer.c.

4094{
4096 Size sz = sizeof(ReorderBufferDiskChange);
4097
4099
4100 ondisk = (ReorderBufferDiskChange *) rb->outbuf;
4101 memcpy(&ondisk->change, change, sizeof(ReorderBufferChange));
4102
4103 switch (change->action)
4104 {
4105 /* fall through these, they're all similar enough */
4110 {
4111 char *data;
4112 HeapTuple oldtup,
4113 newtup;
4114 Size oldlen = 0;
4115 Size newlen = 0;
4116
4117 oldtup = change->data.tp.oldtuple;
4118 newtup = change->data.tp.newtuple;
4119
4120 if (oldtup)
4121 {
4122 sz += sizeof(HeapTupleData);
4123 oldlen = oldtup->t_len;
4124 sz += oldlen;
4125 }
4126
4127 if (newtup)
4128 {
4129 sz += sizeof(HeapTupleData);
4130 newlen = newtup->t_len;
4131 sz += newlen;
4132 }
4133
4134 /* make sure we have enough space */
4136
4137 data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
4138 /* might have been reallocated above */
4139 ondisk = (ReorderBufferDiskChange *) rb->outbuf;
4140
4141 if (oldlen)
4142 {
4143 memcpy(data, oldtup, sizeof(HeapTupleData));
4144 data += sizeof(HeapTupleData);
4145
4146 memcpy(data, oldtup->t_data, oldlen);
4147 data += oldlen;
4148 }
4149
4150 if (newlen)
4151 {
4152 memcpy(data, newtup, sizeof(HeapTupleData));
4153 data += sizeof(HeapTupleData);
4154
4155 memcpy(data, newtup->t_data, newlen);
4156 data += newlen;
4157 }
4158 break;
4159 }
4161 {
4162 char *data;
4163 Size prefix_size = strlen(change->data.msg.prefix) + 1;
4164
4165 sz += prefix_size + change->data.msg.message_size +
4166 sizeof(Size) + sizeof(Size);
4168
4169 data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
4170
4171 /* might have been reallocated above */
4172 ondisk = (ReorderBufferDiskChange *) rb->outbuf;
4173
4174 /* write the prefix including the size */
4175 memcpy(data, &prefix_size, sizeof(Size));
4176 data += sizeof(Size);
4177 memcpy(data, change->data.msg.prefix,
4178 prefix_size);
4179 data += prefix_size;
4180
4181 /* write the message including the size */
4182 memcpy(data, &change->data.msg.message_size, sizeof(Size));
4183 data += sizeof(Size);
4184 memcpy(data, change->data.msg.message,
4185 change->data.msg.message_size);
4186 data += change->data.msg.message_size;
4187
4188 break;
4189 }
4191 {
4192 char *data;
4193 Size inval_size = sizeof(SharedInvalidationMessage) *
4194 change->data.inval.ninvalidations;
4195
4196 sz += inval_size;
4197
4199 data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
4200
4201 /* might have been reallocated above */
4202 ondisk = (ReorderBufferDiskChange *) rb->outbuf;
4203 memcpy(data, change->data.inval.invalidations, inval_size);
4204 data += inval_size;
4205
4206 break;
4207 }
4209 {
4210 Snapshot snap;
4211 char *data;
4212
4213 snap = change->data.snapshot;
4214
4215 sz += sizeof(SnapshotData) +
4216 sizeof(TransactionId) * snap->xcnt +
4217 sizeof(TransactionId) * snap->subxcnt;
4218
4219 /* make sure we have enough space */
4221 data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
4222 /* might have been reallocated above */
4223 ondisk = (ReorderBufferDiskChange *) rb->outbuf;
4224
4225 memcpy(data, snap, sizeof(SnapshotData));
4226 data += sizeof(SnapshotData);
4227
4228 if (snap->xcnt)
4229 {
4230 memcpy(data, snap->xip,
4231 sizeof(TransactionId) * snap->xcnt);
4232 data += sizeof(TransactionId) * snap->xcnt;
4233 }
4234
4235 if (snap->subxcnt)
4236 {
4237 memcpy(data, snap->subxip,
4238 sizeof(TransactionId) * snap->subxcnt);
4239 data += sizeof(TransactionId) * snap->subxcnt;
4240 }
4241 break;
4242 }
4244 {
4245 Size size;
4246 char *data;
4247
4248 /* account for the OIDs of truncated relations */
4249 size = sizeof(Oid) * change->data.truncate.nrelids;
4250 sz += size;
4251
4252 /* make sure we have enough space */
4254
4255 data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
4256 /* might have been reallocated above */
4257 ondisk = (ReorderBufferDiskChange *) rb->outbuf;
4258
4259 memcpy(data, change->data.truncate.relids, size);
4260 data += size;
4261
4262 break;
4263 }
4268 /* ReorderBufferChange contains everything important */
4269 break;
4270 }
4271
4272 ondisk->size = sz;
4273
4274 errno = 0;
4275 pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_WRITE);
4276 if (write(fd, rb->outbuf, ondisk->size) != ondisk->size)
4277 {
4278 int save_errno = errno;
4279
4281
4282 /* if write didn't set errno, assume problem is no disk space */
4283 errno = save_errno ? save_errno : ENOSPC;
4284 ereport(ERROR,
4286 errmsg("could not write to data file for XID %u: %m",
4287 txn->xid)));
4288 }
4290
4291 /*
4292 * Keep the transaction's final_lsn up to date with each change we send to
4293 * disk, so that ReorderBufferRestoreCleanup works correctly. (We used to
4294 * only do this on commit and abort records, but that doesn't work if a
4295 * system crash leaves a transaction without its abort record).
4296 *
4297 * Make sure not to move it backwards.
4298 */
4299 if (txn->final_lsn < change->lsn)
4300 txn->final_lsn = change->lsn;
4301
4302 Assert(ondisk->change.action == change->action);
4303}
#define write(a, b, c)
Definition: win32.h:14
if(TABLE==NULL||TABLE_index==NULL)
Definition: isn.c:81

References ReorderBufferChange::action, Assert(), ReorderBufferDiskChange::change, CloseTransientFile(), ReorderBufferChange::data, data, ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), ReorderBufferTXN::final_lsn, if(), ReorderBufferChange::inval, ReorderBufferChange::invalidations, ReorderBufferChange::lsn, ReorderBufferChange::message, ReorderBufferChange::message_size, ReorderBufferChange::msg, ReorderBufferChange::newtuple, ReorderBufferChange::ninvalidations, ReorderBufferChange::nrelids, ReorderBufferChange::oldtuple, ReorderBuffer::outbuf, pgstat_report_wait_end(), pgstat_report_wait_start(), ReorderBufferChange::prefix, ReorderBufferChange::relids, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferSerializeReserve(), ReorderBufferDiskChange::size, ReorderBufferChange::snapshot, SnapshotData::subxcnt, SnapshotData::subxip, HeapTupleData::t_data, HeapTupleData::t_len, ReorderBufferChange::tp, ReorderBufferChange::truncate, write, SnapshotData::xcnt, ReorderBufferTXN::xid, and SnapshotData::xip.

Referenced by ReorderBufferSerializeTXN().

◆ ReorderBufferSerializedPath()

static void ReorderBufferSerializedPath ( char *  path,
ReplicationSlot slot,
TransactionId  xid,
XLogSegNo  segno 
)
static

Definition at line 4923 of file reorderbuffer.c.

4925{
4926 XLogRecPtr recptr;
4927
4928 XLogSegNoOffsetToRecPtr(segno, 0, wal_segment_size, recptr);
4929
4930 snprintf(path, MAXPGPATH, "%s/%s/xid-%u-lsn-%X-%X.spill",
4933 xid, LSN_FORMAT_ARGS(recptr));
4934}
#define XLogSegNoOffsetToRecPtr(segno, offset, wal_segsz_bytes, dest)
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:47

References ReplicationSlot::data, LSN_FORMAT_ARGS, MAXPGPATH, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, PG_REPLSLOT_DIR, snprintf, wal_segment_size, and XLogSegNoOffsetToRecPtr.

Referenced by ReorderBufferRestoreChanges(), ReorderBufferRestoreCleanup(), and ReorderBufferSerializeTXN().

◆ ReorderBufferSerializeReserve()

static void ReorderBufferSerializeReserve ( ReorderBuffer rb,
Size  sz 
)
static

Definition at line 3772 of file reorderbuffer.c.

3773{
3774 if (!rb->outbufsize)
3775 {
3776 rb->outbuf = MemoryContextAlloc(rb->context, sz);
3777 rb->outbufsize = sz;
3778 }
3779 else if (rb->outbufsize < sz)
3780 {
3781 rb->outbuf = repalloc(rb->outbuf, sz);
3782 rb->outbufsize = sz;
3783 }
3784}

References ReorderBuffer::context, MemoryContextAlloc(), ReorderBuffer::outbuf, ReorderBuffer::outbufsize, and repalloc().

Referenced by ReorderBufferRestoreChanges(), and ReorderBufferSerializeChange().

◆ ReorderBufferSerializeTXN()

static void ReorderBufferSerializeTXN ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 3997 of file reorderbuffer.c.

3998{
3999 dlist_iter subtxn_i;
4000 dlist_mutable_iter change_i;
4001 int fd = -1;
4002 XLogSegNo curOpenSegNo = 0;
4003 Size spilled = 0;
4004 Size size = txn->size;
4005
4006 elog(DEBUG2, "spill %u changes in XID %u to disk",
4007 (uint32) txn->nentries_mem, txn->xid);
4008
4009 /* do the same to all child TXs */
4010 dlist_foreach(subtxn_i, &txn->subtxns)
4011 {
4012 ReorderBufferTXN *subtxn;
4013
4014 subtxn = dlist_container(ReorderBufferTXN, node, subtxn_i.cur);
4015 ReorderBufferSerializeTXN(rb, subtxn);
4016 }
4017
4018 /* serialize changestream */
4019 dlist_foreach_modify(change_i, &txn->changes)
4020 {
4021 ReorderBufferChange *change;
4022
4023 change = dlist_container(ReorderBufferChange, node, change_i.cur);
4024
4025 /*
4026 * store in segment in which it belongs by start lsn, don't split over
4027 * multiple segments tho
4028 */
4029 if (fd == -1 ||
4030 !XLByteInSeg(change->lsn, curOpenSegNo, wal_segment_size))
4031 {
4032 char path[MAXPGPATH];
4033
4034 if (fd != -1)
4036
4037 XLByteToSeg(change->lsn, curOpenSegNo, wal_segment_size);
4038
4039 /*
4040 * No need to care about TLIs here, only used during a single run,
4041 * so each LSN only maps to a specific WAL record.
4042 */
4044 curOpenSegNo);
4045
4046 /* open segment, create it if necessary */
4047 fd = OpenTransientFile(path,
4048 O_CREAT | O_WRONLY | O_APPEND | PG_BINARY);
4049
4050 if (fd < 0)
4051 ereport(ERROR,
4053 errmsg("could not open file \"%s\": %m", path)));
4054 }
4055
4056 ReorderBufferSerializeChange(rb, txn, fd, change);
4057 dlist_delete(&change->node);
4058 ReorderBufferFreeChange(rb, change, false);
4059
4060 spilled++;
4061 }
4062
4063 /* Update the memory counter */
4064 ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, size);
4065
4066 /* update the statistics iff we have spilled anything */
4067 if (spilled)
4068 {
4069 rb->spillCount += 1;
4070 rb->spillBytes += size;
4071
4072 /* don't consider already serialized transactions */
4073 rb->spillTxns += (rbtxn_is_serialized(txn) || rbtxn_is_serialized_clear(txn)) ? 0 : 1;
4074
4075 /* update the decoding stats */
4077 }
4078
4079 Assert(spilled == txn->nentries_mem);
4081 txn->nentries_mem = 0;
4083
4084 if (fd != -1)
4086}
static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, int fd, ReorderBufferChange *change)
#define rbtxn_is_serialized_clear(txn)
#define RBTXN_IS_SERIALIZED
#define XLByteInSeg(xlrp, logSegNo, wal_segsz_bytes)

References Assert(), ReorderBufferTXN::changes, CloseTransientFile(), dlist_iter::cur, dlist_mutable_iter::cur, DEBUG2, dlist_container, dlist_delete(), dlist_foreach, dlist_foreach_modify, dlist_is_empty(), elog, ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), ReorderBufferChange::lsn, MAXPGPATH, MyReplicationSlot, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, OpenTransientFile(), PG_BINARY, ReorderBuffer::private_data, RBTXN_IS_SERIALIZED, rbtxn_is_serialized, rbtxn_is_serialized_clear, ReorderBufferChangeMemoryUpdate(), ReorderBufferFreeChange(), ReorderBufferSerializeChange(), ReorderBufferSerializedPath(), ReorderBufferSerializeTXN(), ReorderBufferTXN::size, ReorderBuffer::spillBytes, ReorderBuffer::spillCount, ReorderBuffer::spillTxns, ReorderBufferTXN::subtxns, ReorderBufferTXN::txn_flags, UpdateDecodingStats(), wal_segment_size, ReorderBufferTXN::xid, XLByteInSeg, and XLByteToSeg.

Referenced by ReorderBufferCheckMemoryLimit(), ReorderBufferIterTXNInit(), and ReorderBufferSerializeTXN().

◆ ReorderBufferSetBaseSnapshot()

void ReorderBufferSetBaseSnapshot ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn,
Snapshot  snap 
)

Definition at line 3326 of file reorderbuffer.c.

3328{
3329 ReorderBufferTXN *txn;
3330 bool is_new;
3331
3332 Assert(snap != NULL);
3333
3334 /*
3335 * Fetch the transaction to operate on. If we know it's a subtransaction,
3336 * operate on its top-level transaction instead.
3337 */
3338 txn = ReorderBufferTXNByXid(rb, xid, true, &is_new, lsn, true);
3339 if (rbtxn_is_known_subxact(txn))
3340 txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
3341 NULL, InvalidXLogRecPtr, false);
3342 Assert(txn->base_snapshot == NULL);
3343
3344 txn->base_snapshot = snap;
3345 txn->base_snapshot_lsn = lsn;
3347
3349}

References Assert(), AssertTXNLsnOrder(), ReorderBufferTXN::base_snapshot, ReorderBufferTXN::base_snapshot_lsn, ReorderBufferTXN::base_snapshot_node, dlist_push_tail(), InvalidXLogRecPtr, rbtxn_is_known_subxact, ReorderBufferTXNByXid(), ReorderBufferTXN::toplevel_xid, and ReorderBuffer::txns_by_base_snapshot_lsn.

Referenced by SnapBuildCommitTxn(), and SnapBuildProcessChange().

◆ ReorderBufferSetRestartPoint()

void ReorderBufferSetRestartPoint ( ReorderBuffer rb,
XLogRecPtr  ptr 
)

Definition at line 1087 of file reorderbuffer.c.

1088{
1090}

References ReorderBuffer::current_restart_decoding_lsn.

Referenced by SnapBuildRestore(), and SnapBuildSerialize().

◆ ReorderBufferSkipPrepare()

void ReorderBufferSkipPrepare ( ReorderBuffer rb,
TransactionId  xid 
)

Definition at line 2939 of file reorderbuffer.c.

2940{
2941 ReorderBufferTXN *txn;
2942
2943 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false);
2944
2945 /* unknown transaction, nothing to do */
2946 if (txn == NULL)
2947 return;
2948
2949 /* txn must have been marked as a prepared transaction */
2952}

References Assert(), InvalidXLogRecPtr, RBTXN_IS_PREPARED, RBTXN_PREPARE_STATUS_MASK, RBTXN_SKIPPED_PREPARE, ReorderBufferTXNByXid(), and ReorderBufferTXN::txn_flags.

Referenced by DecodePrepare().

◆ ReorderBufferStreamCommit()

static void ReorderBufferStreamCommit ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 1983 of file reorderbuffer.c.

1984{
1985 /* we should only call this for previously streamed transactions */
1987
1988 ReorderBufferStreamTXN(rb, txn);
1989
1990 if (rbtxn_is_prepared(txn))
1991 {
1992 /*
1993 * Note, we send stream prepare even if a concurrent abort is
1994 * detected. See DecodePrepare for more information.
1995 */
1997 rb->stream_prepare(rb, txn, txn->final_lsn);
1999
2000 /*
2001 * This is a PREPARED transaction, part of a two-phase commit. The
2002 * full cleanup will happen as part of the COMMIT PREPAREDs, so now
2003 * just truncate txn by removing changes and tuplecids.
2004 */
2005 ReorderBufferTruncateTXN(rb, txn, true);
2006 /* Reset the CheckXidAlive */
2008 }
2009 else
2010 {
2011 rb->stream_commit(rb, txn, txn->final_lsn);
2012 ReorderBufferCleanupTXN(rb, txn);
2013 }
2014}
ReorderBufferStreamPrepareCB stream_prepare
ReorderBufferStreamCommitCB stream_commit

References Assert(), CheckXidAlive, ReorderBufferTXN::final_lsn, InvalidTransactionId, rbtxn_is_prepared, rbtxn_is_streamed, RBTXN_SENT_PREPARE, rbtxn_sent_prepare, ReorderBufferCleanupTXN(), ReorderBufferStreamTXN(), ReorderBufferTruncateTXN(), ReorderBuffer::stream_commit, ReorderBuffer::stream_prepare, and ReorderBufferTXN::txn_flags.

Referenced by ReorderBufferReplay().

◆ ReorderBufferStreamTXN()

static void ReorderBufferStreamTXN ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 4342 of file reorderbuffer.c.

4343{
4344 Snapshot snapshot_now;
4345 CommandId command_id;
4346 Size stream_bytes;
4347 bool txn_is_streamed;
4348
4349 /* We can never reach here for a subtransaction. */
4350 Assert(rbtxn_is_toptxn(txn));
4351
4352 /*
4353 * We can't make any assumptions about base snapshot here, similar to what
4354 * ReorderBufferCommit() does. That relies on base_snapshot getting
4355 * transferred from subxact in ReorderBufferCommitChild(), but that was
4356 * not yet called as the transaction is in-progress.
4357 *
4358 * So just walk the subxacts and use the same logic here. But we only need
4359 * to do that once, when the transaction is streamed for the first time.
4360 * After that we need to reuse the snapshot from the previous run.
4361 *
4362 * Unlike DecodeCommit which adds xids of all the subtransactions in
4363 * snapshot's xip array via SnapBuildCommitTxn, we can't do that here but
4364 * we do add them to subxip array instead via ReorderBufferCopySnap. This
4365 * allows the catalog changes made in subtransactions decoded till now to
4366 * be visible.
4367 */
4368 if (txn->snapshot_now == NULL)
4369 {
4370 dlist_iter subxact_i;
4371
4372 /* make sure this transaction is streamed for the first time */
4374
4375 /* at the beginning we should have invalid command ID */
4377
4378 dlist_foreach(subxact_i, &txn->subtxns)
4379 {
4380 ReorderBufferTXN *subtxn;
4381
4382 subtxn = dlist_container(ReorderBufferTXN, node, subxact_i.cur);
4384 }
4385
4386 /*
4387 * If this transaction has no snapshot, it didn't make any changes to
4388 * the database till now, so there's nothing to decode.
4389 */
4390 if (txn->base_snapshot == NULL)
4391 {
4392 Assert(txn->ninvalidations == 0);
4393 return;
4394 }
4395
4396 command_id = FirstCommandId;
4397 snapshot_now = ReorderBufferCopySnap(rb, txn->base_snapshot,
4398 txn, command_id);
4399 }
4400 else
4401 {
4402 /* the transaction must have been already streamed */
4404
4405 /*
4406 * Nah, we already have snapshot from the previous streaming run. We
4407 * assume new subxacts can't move the LSN backwards, and so can't beat
4408 * the LSN condition in the previous branch (so no need to walk
4409 * through subxacts again). In fact, we must not do that as we may be
4410 * using snapshot half-way through the subxact.
4411 */
4412 command_id = txn->command_id;
4413
4414 /*
4415 * We can't use txn->snapshot_now directly because after the last
4416 * streaming run, we might have got some new sub-transactions. So we
4417 * need to add them to the snapshot.
4418 */
4419 snapshot_now = ReorderBufferCopySnap(rb, txn->snapshot_now,
4420 txn, command_id);
4421
4422 /* Free the previously copied snapshot. */
4423 Assert(txn->snapshot_now->copied);
4425 txn->snapshot_now = NULL;
4426 }
4427
4428 /*
4429 * Remember this information to be used later to update stats. We can't
4430 * update the stats here as an error while processing the changes would
4431 * lead to the accumulation of stats even though we haven't streamed all
4432 * the changes.
4433 */
4434 txn_is_streamed = rbtxn_is_streamed(txn);
4435 stream_bytes = txn->total_size;
4436
4437 /* Process and send the changes to output plugin. */
4438 ReorderBufferProcessTXN(rb, txn, InvalidXLogRecPtr, snapshot_now,
4439 command_id, true);
4440
4441 rb->streamCount += 1;
4442 rb->streamBytes += stream_bytes;
4443
4444 /* Don't consider already streamed transaction. */
4445 rb->streamTxns += (txn_is_streamed) ? 0 : 1;
4446
4447 /* update the decoding stats */
4449
4451 Assert(txn->nentries == 0);
4452 Assert(txn->nentries_mem == 0);
4453}

References Assert(), ReorderBufferTXN::base_snapshot, ReorderBufferTXN::changes, ReorderBufferTXN::command_id, SnapshotData::copied, dlist_iter::cur, dlist_container, dlist_foreach, dlist_is_empty(), FirstCommandId, InvalidCommandId, InvalidXLogRecPtr, ReorderBufferTXN::nentries, ReorderBufferTXN::nentries_mem, ReorderBufferTXN::ninvalidations, ReorderBuffer::private_data, rbtxn_is_streamed, rbtxn_is_toptxn, ReorderBufferCopySnap(), ReorderBufferFreeSnap(), ReorderBufferProcessTXN(), ReorderBufferTransferSnapToParent(), ReorderBufferTXN::snapshot_now, ReorderBuffer::streamBytes, ReorderBuffer::streamCount, ReorderBuffer::streamTxns, ReorderBufferTXN::subtxns, ReorderBufferTXN::total_size, and UpdateDecodingStats().

Referenced by ReorderBufferCheckMemoryLimit(), ReorderBufferProcessPartialChange(), and ReorderBufferStreamCommit().

◆ ReorderBufferToastAppendChunk()

static void ReorderBufferToastAppendChunk ( ReorderBuffer rb,
ReorderBufferTXN txn,
Relation  relation,
ReorderBufferChange change 
)
static

Definition at line 4995 of file reorderbuffer.c.

4997{
4999 HeapTuple newtup;
5000 bool found;
5001 int32 chunksize;
5002 bool isnull;
5003 Pointer chunk;
5004 TupleDesc desc = RelationGetDescr(relation);
5005 Oid chunk_id;
5006 int32 chunk_seq;
5007
5008 if (txn->toast_hash == NULL)
5010
5011 Assert(IsToastRelation(relation));
5012
5013 newtup = change->data.tp.newtuple;
5014 chunk_id = DatumGetObjectId(fastgetattr(newtup, 1, desc, &isnull));
5015 Assert(!isnull);
5016 chunk_seq = DatumGetInt32(fastgetattr(newtup, 2, desc, &isnull));
5017 Assert(!isnull);
5018
5019 ent = (ReorderBufferToastEnt *)
5020 hash_search(txn->toast_hash, &chunk_id, HASH_ENTER, &found);
5021
5022 if (!found)
5023 {
5024 Assert(ent->chunk_id == chunk_id);
5025 ent->num_chunks = 0;
5026 ent->last_chunk_seq = 0;
5027 ent->size = 0;
5028 ent->reconstructed = NULL;
5029 dlist_init(&ent->chunks);
5030
5031 if (chunk_seq != 0)
5032 elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq 0",
5033 chunk_seq, chunk_id);
5034 }
5035 else if (found && chunk_seq != ent->last_chunk_seq + 1)
5036 elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq %d",
5037 chunk_seq, chunk_id, ent->last_chunk_seq + 1);
5038
5039 chunk = DatumGetPointer(fastgetattr(newtup, 3, desc, &isnull));
5040 Assert(!isnull);
5041
5042 /* calculate size so we can allocate the right size at once later */
5043 if (!VARATT_IS_EXTENDED(chunk))
5044 chunksize = VARSIZE(chunk) - VARHDRSZ;
5045 else if (VARATT_IS_SHORT(chunk))
5046 /* could happen due to heap_form_tuple doing its thing */
5047 chunksize = VARSIZE_SHORT(chunk) - VARHDRSZ_SHORT;
5048 else
5049 elog(ERROR, "unexpected type of toast chunk");
5050
5051 ent->size += chunksize;
5052 ent->last_chunk_seq = chunk_seq;
5053 ent->num_chunks++;
5054 dlist_push_tail(&ent->chunks, &change->node);
5055}
#define VARHDRSZ
Definition: c.h:711
void * Pointer
Definition: c.h:543
static Datum fastgetattr(HeapTuple tup, int attnum, TupleDesc tupleDesc, bool *isnull)
Definition: htup_details.h:861
static Oid DatumGetObjectId(Datum X)
Definition: postgres.h:252
static Pointer DatumGetPointer(Datum X)
Definition: postgres.h:322
#define RelationGetDescr(relation)
Definition: rel.h:541
static void ReorderBufferToastInitHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
struct varlena * reconstructed
#define VARHDRSZ_SHORT
Definition: varatt.h:278
static bool VARATT_IS_SHORT(const void *PTR)
Definition: varatt.h:403
static bool VARATT_IS_EXTENDED(const void *PTR)
Definition: varatt.h:410
static Size VARSIZE(const void *PTR)
Definition: varatt.h:298
static Size VARSIZE_SHORT(const void *PTR)
Definition: varatt.h:312

References Assert(), ReorderBufferToastEnt::chunk_id, ReorderBufferToastEnt::chunks, ReorderBufferChange::data, DatumGetInt32(), DatumGetObjectId(), DatumGetPointer(), dlist_init(), dlist_push_tail(), elog, ERROR, fastgetattr(), HASH_ENTER, hash_search(), IsToastRelation(), ReorderBufferToastEnt::last_chunk_seq, ReorderBufferChange::newtuple, ReorderBufferChange::node, ReorderBufferToastEnt::num_chunks, ReorderBufferToastEnt::reconstructed, RelationGetDescr, ReorderBufferToastInitHash(), ReorderBufferToastEnt::size, ReorderBufferTXN::toast_hash, ReorderBufferChange::tp, VARATT_IS_EXTENDED(), VARATT_IS_SHORT(), VARHDRSZ, VARHDRSZ_SHORT, VARSIZE(), and VARSIZE_SHORT().

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferToastInitHash()

static void ReorderBufferToastInitHash ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 4975 of file reorderbuffer.c.

4976{
4977 HASHCTL hash_ctl;
4978
4979 Assert(txn->toast_hash == NULL);
4980
4981 hash_ctl.keysize = sizeof(Oid);
4982 hash_ctl.entrysize = sizeof(ReorderBufferToastEnt);
4983 hash_ctl.hcxt = rb->context;
4984 txn->toast_hash = hash_create("ReorderBufferToastHash", 5, &hash_ctl,
4986}
struct ReorderBufferToastEnt ReorderBufferToastEnt

References Assert(), ReorderBuffer::context, HASHCTL::entrysize, HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, HASHCTL::keysize, and ReorderBufferTXN::toast_hash.

Referenced by ReorderBufferToastAppendChunk().

◆ ReorderBufferToastReplace()

static void ReorderBufferToastReplace ( ReorderBuffer rb,
ReorderBufferTXN txn,
Relation  relation,
ReorderBufferChange change 
)
static

Definition at line 5078 of file reorderbuffer.c.

5080{
5081 TupleDesc desc;
5082 int natt;
5083 Datum *attrs;
5084 bool *isnull;
5085 bool *free;
5086 HeapTuple tmphtup;
5087 Relation toast_rel;
5088 TupleDesc toast_desc;
5089 MemoryContext oldcontext;
5090 HeapTuple newtup;
5091 Size old_size;
5092
5093 /* no toast tuples changed */
5094 if (txn->toast_hash == NULL)
5095 return;
5096
5097 /*
5098 * We're going to modify the size of the change. So, to make sure the
5099 * accounting is correct we record the current change size and then after
5100 * re-computing the change we'll subtract the recorded size and then
5101 * re-add the new change size at the end. We don't immediately subtract
5102 * the old size because if there is any error before we add the new size,
5103 * we will release the changes and that will update the accounting info
5104 * (subtracting the size from the counters). And we don't want to
5105 * underflow there.
5106 */
5107 old_size = ReorderBufferChangeSize(change);
5108
5109 oldcontext = MemoryContextSwitchTo(rb->context);
5110
5111 /* we should only have toast tuples in an INSERT or UPDATE */
5112 Assert(change->data.tp.newtuple);
5113
5114 desc = RelationGetDescr(relation);
5115
5116 toast_rel = RelationIdGetRelation(relation->rd_rel->reltoastrelid);
5117 if (!RelationIsValid(toast_rel))
5118 elog(ERROR, "could not open toast relation with OID %u (base relation \"%s\")",
5119 relation->rd_rel->reltoastrelid, RelationGetRelationName(relation));
5120
5121 toast_desc = RelationGetDescr(toast_rel);
5122
5123 /* should we allocate from stack instead? */
5124 attrs = palloc0_array(Datum, desc->natts);
5125 isnull = palloc0_array(bool, desc->natts);
5126 free = palloc0_array(bool, desc->natts);
5127
5128 newtup = change->data.tp.newtuple;
5129
5130 heap_deform_tuple(newtup, desc, attrs, isnull);
5131
5132 for (natt = 0; natt < desc->natts; natt++)
5133 {
5134 CompactAttribute *attr = TupleDescCompactAttr(desc, natt);
5136 struct varlena *varlena;
5137
5138 /* va_rawsize is the size of the original datum -- including header */
5139 struct varatt_external toast_pointer;
5140 struct varatt_indirect redirect_pointer;
5141 struct varlena *new_datum = NULL;
5142 struct varlena *reconstructed;
5143 dlist_iter it;
5144 Size data_done = 0;
5145
5146 if (attr->attisdropped)
5147 continue;
5148
5149 /* not a varlena datatype */
5150 if (attr->attlen != -1)
5151 continue;
5152
5153 /* no data */
5154 if (isnull[natt])
5155 continue;
5156
5157 /* ok, we know we have a toast datum */
5158 varlena = (struct varlena *) DatumGetPointer(attrs[natt]);
5159
5160 /* no need to do anything if the tuple isn't external */
5162 continue;
5163
5164 VARATT_EXTERNAL_GET_POINTER(toast_pointer, varlena);
5165
5166 /*
5167 * Check whether the toast tuple changed, replace if so.
5168 */
5169 ent = (ReorderBufferToastEnt *)
5171 &toast_pointer.va_valueid,
5172 HASH_FIND,
5173 NULL);
5174 if (ent == NULL)
5175 continue;
5176
5177 new_datum =
5179
5180 free[natt] = true;
5181
5182 reconstructed = palloc0(toast_pointer.va_rawsize);
5183
5184 ent->reconstructed = reconstructed;
5185
5186 /* stitch toast tuple back together from its parts */
5187 dlist_foreach(it, &ent->chunks)
5188 {
5189 bool cisnull;
5190 ReorderBufferChange *cchange;
5191 HeapTuple ctup;
5192 Pointer chunk;
5193
5194 cchange = dlist_container(ReorderBufferChange, node, it.cur);
5195 ctup = cchange->data.tp.newtuple;
5196 chunk = DatumGetPointer(fastgetattr(ctup, 3, toast_desc, &cisnull));
5197
5198 Assert(!cisnull);
5199 Assert(!VARATT_IS_EXTERNAL(chunk));
5200 Assert(!VARATT_IS_SHORT(chunk));
5201
5202 memcpy(VARDATA(reconstructed) + data_done,
5203 VARDATA(chunk),
5204 VARSIZE(chunk) - VARHDRSZ);
5205 data_done += VARSIZE(chunk) - VARHDRSZ;
5206 }
5207 Assert(data_done == VARATT_EXTERNAL_GET_EXTSIZE(toast_pointer));
5208
5209 /* make sure its marked as compressed or not */
5210 if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
5211 SET_VARSIZE_COMPRESSED(reconstructed, data_done + VARHDRSZ);
5212 else
5213 SET_VARSIZE(reconstructed, data_done + VARHDRSZ);
5214
5215 memset(&redirect_pointer, 0, sizeof(redirect_pointer));
5216 redirect_pointer.pointer = reconstructed;
5217
5219 memcpy(VARDATA_EXTERNAL(new_datum), &redirect_pointer,
5220 sizeof(redirect_pointer));
5221
5222 attrs[natt] = PointerGetDatum(new_datum);
5223 }
5224
5225 /*
5226 * Build tuple in separate memory & copy tuple back into the tuplebuf
5227 * passed to the output plugin. We can't directly heap_fill_tuple() into
5228 * the tuplebuf because attrs[] will point back into the current content.
5229 */
5230 tmphtup = heap_form_tuple(desc, attrs, isnull);
5231 Assert(newtup->t_len <= MaxHeapTupleSize);
5232 Assert(newtup->t_data == (HeapTupleHeader) ((char *) newtup + HEAPTUPLESIZE));
5233
5234 memcpy(newtup->t_data, tmphtup->t_data, tmphtup->t_len);
5235 newtup->t_len = tmphtup->t_len;
5236
5237 /*
5238 * free resources we won't further need, more persistent stuff will be
5239 * free'd in ReorderBufferToastReset().
5240 */
5241 RelationClose(toast_rel);
5242 pfree(tmphtup);
5243 for (natt = 0; natt < desc->natts; natt++)
5244 {
5245 if (free[natt])
5246 pfree(DatumGetPointer(attrs[natt]));
5247 }
5248 pfree(attrs);
5249 pfree(free);
5250 pfree(isnull);
5251
5252 MemoryContextSwitchTo(oldcontext);
5253
5254 /* subtract the old change size */
5255 ReorderBufferChangeMemoryUpdate(rb, change, NULL, false, old_size);
5256 /* now add the change back, with the correct size */
5257 ReorderBufferChangeMemoryUpdate(rb, change, NULL, true,
5258 ReorderBufferChangeSize(change));
5259}
#define INDIRECT_POINTER_SIZE
Definition: detoast.h:34
#define VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr)
Definition: detoast.h:22
#define palloc0_array(type, count)
Definition: fe_memutils.h:77
#define free(a)
Definition: header.h:65
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
Definition: heaptuple.c:1117
void heap_deform_tuple(HeapTuple tuple, TupleDesc tupleDesc, Datum *values, bool *isnull)
Definition: heaptuple.c:1346
#define MaxHeapTupleSize
Definition: htup_details.h:610
static Datum PointerGetDatum(const void *X)
Definition: postgres.h:332
uint64_t Datum
Definition: postgres.h:70
#define RelationGetRelationName(relation)
Definition: rel.h:549
bool attisdropped
Definition: tupdesc.h:77
int16 attlen
Definition: tupdesc.h:71
Definition: c.h:706
static CompactAttribute * TupleDescCompactAttr(TupleDesc tupdesc, int i)
Definition: tupdesc.h:175
static void SET_VARSIZE_COMPRESSED(void *PTR, Size len)
Definition: varatt.h:446
static Size VARATT_EXTERNAL_GET_EXTSIZE(struct varatt_external toast_pointer)
Definition: varatt.h:507
static bool VARATT_IS_EXTERNAL(const void *PTR)
Definition: varatt.h:354
static char * VARDATA_EXTERNAL(const void *PTR)
Definition: varatt.h:340
static char * VARDATA(const void *PTR)
Definition: varatt.h:305
static void SET_VARTAG_EXTERNAL(void *PTR, vartag_external tag)
Definition: varatt.h:453
@ VARTAG_INDIRECT
Definition: varatt.h:86
static bool VARATT_EXTERNAL_IS_COMPRESSED(struct varatt_external toast_pointer)
Definition: varatt.h:536
static void SET_VARSIZE(void *PTR, Size len)
Definition: varatt.h:432

References Assert(), CompactAttribute::attisdropped, CompactAttribute::attlen, ReorderBufferToastEnt::chunks, ReorderBuffer::context, dlist_iter::cur, ReorderBufferChange::data, DatumGetPointer(), dlist_container, dlist_foreach, elog, ERROR, fastgetattr(), free, HASH_FIND, hash_search(), heap_deform_tuple(), heap_form_tuple(), HEAPTUPLESIZE, INDIRECT_POINTER_SIZE, MaxHeapTupleSize, MemoryContextSwitchTo(), TupleDescData::natts, ReorderBufferChange::newtuple, palloc0(), palloc0_array, pfree(), varatt_indirect::pointer, PointerGetDatum(), RelationData::rd_rel, ReorderBufferToastEnt::reconstructed, RelationClose(), RelationGetDescr, RelationGetRelationName, RelationIdGetRelation(), RelationIsValid, ReorderBufferChangeMemoryUpdate(), ReorderBufferChangeSize(), SET_VARSIZE(), SET_VARSIZE_COMPRESSED(), SET_VARTAG_EXTERNAL(), HeapTupleData::t_data, HeapTupleData::t_len, ReorderBufferTXN::toast_hash, ReorderBufferChange::tp, TupleDescCompactAttr(), varatt_external::va_rawsize, varatt_external::va_valueid, VARATT_EXTERNAL_GET_EXTSIZE(), VARATT_EXTERNAL_GET_POINTER, VARATT_EXTERNAL_IS_COMPRESSED(), VARATT_IS_EXTERNAL(), VARATT_IS_SHORT(), VARDATA(), VARDATA_EXTERNAL(), VARHDRSZ, VARSIZE(), and VARTAG_INDIRECT.

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferToastReset()

static void ReorderBufferToastReset ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 5265 of file reorderbuffer.c.

5266{
5267 HASH_SEQ_STATUS hstat;
5269
5270 if (txn->toast_hash == NULL)
5271 return;
5272
5273 /* sequentially walk over the hash and free everything */
5274 hash_seq_init(&hstat, txn->toast_hash);
5275 while ((ent = (ReorderBufferToastEnt *) hash_seq_search(&hstat)) != NULL)
5276 {
5278
5279 if (ent->reconstructed != NULL)
5280 pfree(ent->reconstructed);
5281
5282 dlist_foreach_modify(it, &ent->chunks)
5283 {
5284 ReorderBufferChange *change =
5286
5287 dlist_delete(&change->node);
5288 ReorderBufferFreeChange(rb, change, true);
5289 }
5290 }
5291
5293 txn->toast_hash = NULL;
5294}
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1415
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1380

References ReorderBufferToastEnt::chunks, dlist_mutable_iter::cur, dlist_container, dlist_delete(), dlist_foreach_modify, hash_destroy(), hash_seq_init(), hash_seq_search(), ReorderBufferChange::node, pfree(), ReorderBufferToastEnt::reconstructed, ReorderBufferFreeChange(), and ReorderBufferTXN::toast_hash.

Referenced by ReorderBufferCheckAndTruncateAbortedTXN(), ReorderBufferFreeTXN(), ReorderBufferProcessTXN(), and ReorderBufferResetTXN().

◆ ReorderBufferTransferSnapToParent()

static void ReorderBufferTransferSnapToParent ( ReorderBufferTXN txn,
ReorderBufferTXN subtxn 
)
static

Definition at line 1165 of file reorderbuffer.c.

1167{
1168 Assert(subtxn->toplevel_xid == txn->xid);
1169
1170 if (subtxn->base_snapshot != NULL)
1171 {
1172 if (txn->base_snapshot == NULL ||
1173 subtxn->base_snapshot_lsn < txn->base_snapshot_lsn)
1174 {
1175 /*
1176 * If the toplevel transaction already has a base snapshot but
1177 * it's newer than the subxact's, purge it.
1178 */
1179 if (txn->base_snapshot != NULL)
1180 {
1183 }
1184
1185 /*
1186 * The snapshot is now the top transaction's; transfer it, and
1187 * adjust the list position of the top transaction in the list by
1188 * moving it to where the subtransaction is.
1189 */
1190 txn->base_snapshot = subtxn->base_snapshot;
1191 txn->base_snapshot_lsn = subtxn->base_snapshot_lsn;
1193 &txn->base_snapshot_node);
1194
1195 /*
1196 * The subtransaction doesn't have a snapshot anymore (so it
1197 * mustn't be in the list.)
1198 */
1199 subtxn->base_snapshot = NULL;
1202 }
1203 else
1204 {
1205 /* Base snap of toplevel is fine, so subxact's is not needed */
1208 subtxn->base_snapshot = NULL;
1210 }
1211 }
1212}
static void dlist_insert_before(dlist_node *before, dlist_node *node)
Definition: ilist.h:393

References Assert(), ReorderBufferTXN::base_snapshot, ReorderBufferTXN::base_snapshot_lsn, ReorderBufferTXN::base_snapshot_node, dlist_delete(), dlist_insert_before(), InvalidXLogRecPtr, SnapBuildSnapDecRefcount(), ReorderBufferTXN::toplevel_xid, and ReorderBufferTXN::xid.

Referenced by ReorderBufferAssignChild(), and ReorderBufferStreamTXN().

◆ ReorderBufferTruncateTXN()

static void ReorderBufferTruncateTXN ( ReorderBuffer rb,
ReorderBufferTXN txn,
bool  txn_prepared 
)
static

Definition at line 1656 of file reorderbuffer.c.

1657{
1658 dlist_mutable_iter iter;
1659 Size mem_freed = 0;
1660
1661 /* cleanup subtransactions & their changes */
1662 dlist_foreach_modify(iter, &txn->subtxns)
1663 {
1664 ReorderBufferTXN *subtxn;
1665
1666 subtxn = dlist_container(ReorderBufferTXN, node, iter.cur);
1667
1668 /*
1669 * Subtransactions are always associated to the toplevel TXN, even if
1670 * they originally were happening inside another subtxn, so we won't
1671 * ever recurse more than one level deep here.
1672 */
1674 Assert(subtxn->nsubtxns == 0);
1675
1677 ReorderBufferTruncateTXN(rb, subtxn, txn_prepared);
1678 }
1679
1680 /* cleanup changes in the txn */
1681 dlist_foreach_modify(iter, &txn->changes)
1682 {
1683 ReorderBufferChange *change;
1684
1685 change = dlist_container(ReorderBufferChange, node, iter.cur);
1686
1687 /* Check we're not mixing changes from different transactions. */
1688 Assert(change->txn == txn);
1689
1690 /* remove the change from its containing list */
1691 dlist_delete(&change->node);
1692
1693 /*
1694 * Instead of updating the memory counter for individual changes, we
1695 * sum up the size of memory to free so we can update the memory
1696 * counter all together below. This saves costs of maintaining the
1697 * max-heap.
1698 */
1699 mem_freed += ReorderBufferChangeSize(change);
1700
1701 ReorderBufferFreeChange(rb, change, false);
1702 }
1703
1704 /* Update the memory counter */
1705 ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, mem_freed);
1706
1707 if (txn_prepared)
1708 {
1709 /*
1710 * If this is a prepared txn, cleanup the tuplecids we stored for
1711 * decoding catalog snapshot access. They are always stored in the
1712 * toplevel transaction.
1713 */
1714 dlist_foreach_modify(iter, &txn->tuplecids)
1715 {
1716 ReorderBufferChange *change;
1717
1718 change = dlist_container(ReorderBufferChange, node, iter.cur);
1719
1720 /* Check we're not mixing changes from different transactions. */
1721 Assert(change->txn == txn);
1723
1724 /* Remove the change from its containing list. */
1725 dlist_delete(&change->node);
1726
1727 ReorderBufferFreeChange(rb, change, true);
1728 }
1729 }
1730
1731 /*
1732 * Destroy the (relfilelocator, ctid) hashtable, so that we don't leak any
1733 * memory. We could also keep the hash table and update it with new ctid
1734 * values, but this seems simpler and good enough for now.
1735 */
1736 if (txn->tuplecid_hash != NULL)
1737 {
1739 txn->tuplecid_hash = NULL;
1740 }
1741
1742 /* If this txn is serialized then clean the disk space. */
1743 if (rbtxn_is_serialized(txn))
1744 {
1746 txn->txn_flags &= ~RBTXN_IS_SERIALIZED;
1747
1748 /*
1749 * We set this flag to indicate if the transaction is ever serialized.
1750 * We need this to accurately update the stats as otherwise the same
1751 * transaction can be counted as serialized multiple times.
1752 */
1754 }
1755
1756 /* also reset the number of entries in the transaction */
1757 txn->nentries_mem = 0;
1758 txn->nentries = 0;
1759}
#define RBTXN_IS_SERIALIZED_CLEAR

References ReorderBufferChange::action, Assert(), ReorderBufferTXN::changes, dlist_mutable_iter::cur, dlist_container, dlist_delete(), dlist_foreach_modify, hash_destroy(), ReorderBufferTXN::nentries, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, ReorderBufferTXN::nsubtxns, rbtxn_is_known_subxact, rbtxn_is_serialized, RBTXN_IS_SERIALIZED_CLEAR, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferChangeMemoryUpdate(), ReorderBufferChangeSize(), ReorderBufferFreeChange(), ReorderBufferMaybeMarkTXNStreamed(), ReorderBufferRestoreCleanup(), ReorderBufferTruncateTXN(), ReorderBufferTXN::subtxns, ReorderBufferTXN::tuplecid_hash, ReorderBufferTXN::tuplecids, ReorderBufferChange::txn, and ReorderBufferTXN::txn_flags.

Referenced by ReorderBufferCheckAndTruncateAbortedTXN(), ReorderBufferProcessTXN(), ReorderBufferResetTXN(), ReorderBufferStreamCommit(), and ReorderBufferTruncateTXN().

◆ ReorderBufferTXNByXid()

static ReorderBufferTXN * ReorderBufferTXNByXid ( ReorderBuffer rb,
TransactionId  xid,
bool  create,
bool *  is_new,
XLogRecPtr  lsn,
bool  create_as_top 
)
static

Definition at line 653 of file reorderbuffer.c.

655{
656 ReorderBufferTXN *txn;
658 bool found;
659
661
662 /*
663 * Check the one-entry lookup cache first
664 */
666 rb->by_txn_last_xid == xid)
667 {
668 txn = rb->by_txn_last_txn;
669
670 if (txn != NULL)
671 {
672 /* found it, and it's valid */
673 if (is_new)
674 *is_new = false;
675 return txn;
676 }
677
678 /*
679 * cached as non-existent, and asked not to create? Then nothing else
680 * to do.
681 */
682 if (!create)
683 return NULL;
684 /* otherwise fall through to create it */
685 }
686
687 /*
688 * If the cache wasn't hit or it yielded a "does-not-exist" and we want to
689 * create an entry.
690 */
691
692 /* search the lookup table */
695 &xid,
696 create ? HASH_ENTER : HASH_FIND,
697 &found);
698 if (found)
699 txn = ent->txn;
700 else if (create)
701 {
702 /* initialize the new entry, if creation was requested */
703 Assert(ent != NULL);
705
706 ent->txn = ReorderBufferAllocTXN(rb);
707 ent->txn->xid = xid;
708 txn = ent->txn;
709 txn->first_lsn = lsn;
711
712 if (create_as_top)
713 {
716 }
717 }
718 else
719 txn = NULL; /* not found and not asked to create */
720
721 /* update cache */
722 rb->by_txn_last_xid = xid;
723 rb->by_txn_last_txn = txn;
724
725 if (is_new)
726 *is_new = !found;
727
728 Assert(!create || txn != NULL);
729 return txn;
730}
static ReorderBufferTXN * ReorderBufferAllocTXN(ReorderBuffer *rb)
ReorderBufferTXN * txn
XLogRecPtr restart_decoding_lsn
#define TransactionIdIsValid(xid)
Definition: transam.h:41

References Assert(), AssertTXNLsnOrder(), ReorderBuffer::by_txn, ReorderBuffer::by_txn_last_txn, ReorderBuffer::by_txn_last_xid, ReorderBuffer::current_restart_decoding_lsn, dlist_push_tail(), ReorderBufferTXN::first_lsn, HASH_ENTER, HASH_FIND, hash_search(), ReorderBufferTXN::node, ReorderBufferAllocTXN(), ReorderBufferTXN::restart_decoding_lsn, ReorderBuffer::toplevel_by_lsn, TransactionIdIsValid, ReorderBufferTXNByIdEnt::txn, ReorderBufferTXN::xid, and XLogRecPtrIsValid.

Referenced by ReorderBufferAbort(), ReorderBufferAddDistributedInvalidations(), ReorderBufferAddInvalidations(), ReorderBufferAddNewTupleCids(), ReorderBufferAssignChild(), ReorderBufferCommit(), ReorderBufferCommitChild(), ReorderBufferFinishPrepared(), ReorderBufferForget(), ReorderBufferGetInvalidations(), ReorderBufferInvalidate(), ReorderBufferPrepare(), ReorderBufferProcessXid(), ReorderBufferQueueChange(), ReorderBufferQueueMessage(), ReorderBufferRememberPrepareInfo(), ReorderBufferSetBaseSnapshot(), ReorderBufferSkipPrepare(), ReorderBufferXidHasBaseSnapshot(), ReorderBufferXidHasCatalogChanges(), and ReorderBufferXidSetCatalogChanges().

◆ ReorderBufferTXNSizeCompare()

static int ReorderBufferTXNSizeCompare ( const pairingheap_node a,
const pairingheap_node b,
void *  arg 
)
static

Definition at line 3789 of file reorderbuffer.c.

3790{
3793
3794 if (ta->size < tb->size)
3795 return -1;
3796 if (ta->size > tb->size)
3797 return 1;
3798 return 0;
3799}
#define pairingheap_const_container(type, membername, ptr)
Definition: pairingheap.h:51

References a, b, pairingheap_const_container, and ReorderBufferTXN::size.

Referenced by ReorderBufferAllocate().

◆ ReorderBufferXidHasBaseSnapshot()

bool ReorderBufferXidHasBaseSnapshot ( ReorderBuffer rb,
TransactionId  xid 
)

Definition at line 3742 of file reorderbuffer.c.

3743{
3744 ReorderBufferTXN *txn;
3745
3746 txn = ReorderBufferTXNByXid(rb, xid, false,
3747 NULL, InvalidXLogRecPtr, false);
3748
3749 /* transaction isn't known yet, ergo no snapshot */
3750 if (txn == NULL)
3751 return false;
3752
3753 /* a known subtxn? operate on top-level txn instead */
3754 if (rbtxn_is_known_subxact(txn))
3755 txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
3756 NULL, InvalidXLogRecPtr, false);
3757
3758 return txn->base_snapshot != NULL;
3759}

References ReorderBufferTXN::base_snapshot, InvalidXLogRecPtr, rbtxn_is_known_subxact, ReorderBufferTXNByXid(), and ReorderBufferTXN::toplevel_xid.

Referenced by SnapBuildCommitTxn(), SnapBuildDistributeSnapshotAndInval(), and SnapBuildProcessChange().

◆ ReorderBufferXidHasCatalogChanges()

bool ReorderBufferXidHasCatalogChanges ( ReorderBuffer rb,
TransactionId  xid 
)

Definition at line 3725 of file reorderbuffer.c.

3726{
3727 ReorderBufferTXN *txn;
3728
3729 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
3730 false);
3731 if (txn == NULL)
3732 return false;
3733
3734 return rbtxn_has_catalog_changes(txn);
3735}

References InvalidXLogRecPtr, rbtxn_has_catalog_changes, and ReorderBufferTXNByXid().

Referenced by SnapBuildXidHasCatalogChanges().

◆ ReorderBufferXidSetCatalogChanges()

void ReorderBufferXidSetCatalogChanges ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 3653 of file reorderbuffer.c.

3655{
3656 ReorderBufferTXN *txn;
3657
3658 txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3659
3660 if (!rbtxn_has_catalog_changes(txn))
3661 {
3664 }
3665
3666 /*
3667 * Mark top-level transaction as having catalog changes too if one of its
3668 * children has so that the ReorderBufferBuildTupleCidHash can
3669 * conveniently check just top-level transaction and decide whether to
3670 * build the hash table or not.
3671 */
3672 if (rbtxn_is_subtxn(txn))
3673 {
3674 ReorderBufferTXN *toptxn = rbtxn_get_toptxn(txn);
3675
3676 if (!rbtxn_has_catalog_changes(toptxn))
3677 {
3680 }
3681 }
3682}
static void dclist_push_tail(dclist_head *head, dlist_node *node)
Definition: ilist.h:709
#define rbtxn_is_subtxn(txn)
#define RBTXN_HAS_CATALOG_CHANGES

References ReorderBufferTXN::catchange_node, ReorderBuffer::catchange_txns, dclist_push_tail(), rbtxn_get_toptxn, RBTXN_HAS_CATALOG_CHANGES, rbtxn_has_catalog_changes, rbtxn_is_subtxn, ReorderBufferTXNByXid(), and ReorderBufferTXN::txn_flags.

Referenced by SnapBuildProcessNewCid(), and xact_decode().

◆ ResolveCminCmaxDuringDecoding()

bool ResolveCminCmaxDuringDecoding ( HTAB tuplecid_data,
Snapshot  snapshot,
HeapTuple  htup,
Buffer  buffer,
CommandId cmin,
CommandId cmax 
)

Definition at line 5557 of file reorderbuffer.c.

5561{
5564 ForkNumber forkno;
5565 BlockNumber blockno;
5566 bool updated_mapping = false;
5567
5568 /*
5569 * Return unresolved if tuplecid_data is not valid. That's because when
5570 * streaming in-progress transactions we may run into tuples with the CID
5571 * before actually decoding them. Think e.g. about INSERT followed by
5572 * TRUNCATE, where the TRUNCATE may not be decoded yet when applying the
5573 * INSERT. So in such cases, we assume the CID is from the future
5574 * command.
5575 */
5576 if (tuplecid_data == NULL)
5577 return false;
5578
5579 /* be careful about padding */
5580 memset(&key, 0, sizeof(key));
5581
5582 Assert(!BufferIsLocal(buffer));
5583
5584 /*
5585 * get relfilelocator from the buffer, no convenient way to access it
5586 * other than that.
5587 */
5588 BufferGetTag(buffer, &key.rlocator, &forkno, &blockno);
5589
5590 /* tuples can only be in the main fork */
5591 Assert(forkno == MAIN_FORKNUM);
5592 Assert(blockno == ItemPointerGetBlockNumber(&htup->t_self));
5593
5594 ItemPointerCopy(&htup->t_self,
5595 &key.tid);
5596
5597restart:
5598 ent = (ReorderBufferTupleCidEnt *)
5600
5601 /*
5602 * failed to find a mapping, check whether the table was rewritten and
5603 * apply mapping if so, but only do that once - there can be no new
5604 * mappings while we are in here since we have to hold a lock on the
5605 * relation.
5606 */
5607 if (ent == NULL && !updated_mapping)
5608 {
5610 /* now check but don't update for a mapping again */
5611 updated_mapping = true;
5612 goto restart;
5613 }
5614 else if (ent == NULL)
5615 return false;
5616
5617 if (cmin)
5618 *cmin = ent->cmin;
5619 if (cmax)
5620 *cmax = ent->cmax;
5621 return true;
5622}
uint32 BlockNumber
Definition: block.h:31
#define BufferIsLocal(buffer)
Definition: buf.h:37
void BufferGetTag(Buffer buffer, RelFileLocator *rlocator, ForkNumber *forknum, BlockNumber *blknum)
Definition: bufmgr.c:4244
static BlockNumber ItemPointerGetBlockNumber(const ItemPointerData *pointer)
Definition: itemptr.h:103
ForkNumber
Definition: relpath.h:56
static void UpdateLogicalMappings(HTAB *tuplecid_data, Oid relid, Snapshot snapshot)
ItemPointerData t_self
Definition: htup.h:65
Oid t_tableOid
Definition: htup.h:66

References Assert(), BufferGetTag(), BufferIsLocal, ReorderBufferTupleCidEnt::cmax, ReorderBufferTupleCidEnt::cmin, HASH_FIND, hash_search(), ItemPointerCopy(), ItemPointerGetBlockNumber(), sort-test::key, MAIN_FORKNUM, HeapTupleData::t_self, HeapTupleData::t_tableOid, tuplecid_data, and UpdateLogicalMappings().

Referenced by HeapTupleSatisfiesHistoricMVCC().

◆ SetupCheckXidLive()

static void SetupCheckXidLive ( TransactionId  xid)
inlinestatic

Definition at line 2049 of file reorderbuffer.c.

2050{
2051 /*
2052 * If the input transaction id is already set as a CheckXidAlive then
2053 * nothing to do.
2054 */
2056 return;
2057
2058 /*
2059 * setup CheckXidAlive if it's not committed yet. We don't check if the
2060 * xid is aborted. That will happen during catalog access.
2061 */
2062 if (!TransactionIdDidCommit(xid))
2063 CheckXidAlive = xid;
2064 else
2066}
#define TransactionIdEquals(id1, id2)
Definition: transam.h:43

References CheckXidAlive, InvalidTransactionId, TransactionIdDidCommit(), and TransactionIdEquals.

Referenced by ReorderBufferProcessTXN().

◆ StartupReorderBuffer()

void StartupReorderBuffer ( void  )

Definition at line 4941 of file reorderbuffer.c.

4942{
4943 DIR *logical_dir;
4944 struct dirent *logical_de;
4945
4946 logical_dir = AllocateDir(PG_REPLSLOT_DIR);
4947 while ((logical_de = ReadDir(logical_dir, PG_REPLSLOT_DIR)) != NULL)
4948 {
4949 if (strcmp(logical_de->d_name, ".") == 0 ||
4950 strcmp(logical_de->d_name, "..") == 0)
4951 continue;
4952
4953 /* if it cannot be a slot, skip the directory */
4954 if (!ReplicationSlotValidateName(logical_de->d_name, true, DEBUG2))
4955 continue;
4956
4957 /*
4958 * ok, has to be a surviving logical slot, iterate and delete
4959 * everything starting with xid-*
4960 */
4962 }
4963 FreeDir(logical_dir);
4964}
struct dirent * ReadDir(DIR *dir, const char *dirname)
Definition: fd.c:2953
bool ReplicationSlotValidateName(const char *name, bool allow_reserved_name, int elevel)
Definition: slot.c:266

References AllocateDir(), dirent::d_name, DEBUG2, FreeDir(), PG_REPLSLOT_DIR, ReadDir(), ReorderBufferCleanupSerializedTXNs(), and ReplicationSlotValidateName().

Referenced by StartupXLOG().

◆ TransactionIdInArray()

static bool TransactionIdInArray ( TransactionId  xid,
TransactionId xip,
Size  num 
)
static

Definition at line 5456 of file reorderbuffer.c.

5457{
5458 return bsearch(&xid, xip, num,
5459 sizeof(TransactionId), xidComparator) != NULL;
5460}

References xidComparator().

Referenced by UpdateLogicalMappings().

◆ UpdateLogicalMappings()

static void UpdateLogicalMappings ( HTAB tuplecid_data,
Oid  relid,
Snapshot  snapshot 
)
static

Definition at line 5479 of file reorderbuffer.c.

5480{
5481 DIR *mapping_dir;
5482 struct dirent *mapping_de;
5483 List *files = NIL;
5484 ListCell *file;
5485 Oid dboid = IsSharedRelation(relid) ? InvalidOid : MyDatabaseId;
5486
5487 mapping_dir = AllocateDir(PG_LOGICAL_MAPPINGS_DIR);
5488 while ((mapping_de = ReadDir(mapping_dir, PG_LOGICAL_MAPPINGS_DIR)) != NULL)
5489 {
5490 Oid f_dboid;
5491 Oid f_relid;
5492 TransactionId f_mapped_xid;
5493 TransactionId f_create_xid;
5494 XLogRecPtr f_lsn;
5495 uint32 f_hi,
5496 f_lo;
5498
5499 if (strcmp(mapping_de->d_name, ".") == 0 ||
5500 strcmp(mapping_de->d_name, "..") == 0)
5501 continue;
5502
5503 /* Ignore files that aren't ours */
5504 if (strncmp(mapping_de->d_name, "map-", 4) != 0)
5505 continue;
5506
5507 if (sscanf(mapping_de->d_name, LOGICAL_REWRITE_FORMAT,
5508 &f_dboid, &f_relid, &f_hi, &f_lo,
5509 &f_mapped_xid, &f_create_xid) != 6)
5510 elog(ERROR, "could not parse filename \"%s\"", mapping_de->d_name);
5511
5512 f_lsn = ((uint64) f_hi) << 32 | f_lo;
5513
5514 /* mapping for another database */
5515 if (f_dboid != dboid)
5516 continue;
5517
5518 /* mapping for another relation */
5519 if (f_relid != relid)
5520 continue;
5521
5522 /* did the creating transaction abort? */
5523 if (!TransactionIdDidCommit(f_create_xid))
5524 continue;
5525
5526 /* not for our transaction */
5527 if (!TransactionIdInArray(f_mapped_xid, snapshot->subxip, snapshot->subxcnt))
5528 continue;
5529
5530 /* ok, relevant, queue for apply */
5532 f->lsn = f_lsn;
5533 strcpy(f->fname, mapping_de->d_name);
5534 files = lappend(files, f);
5535 }
5536 FreeDir(mapping_dir);
5537
5538 /* sort files so we apply them in LSN order */
5540
5541 foreach(file, files)
5542 {
5544
5545 elog(DEBUG1, "applying mapping: \"%s\" in %u", f->fname,
5546 snapshot->subxip[0]);
5548 pfree(f);
5549 }
5550}
uint64_t uint64
Definition: c.h:553
bool IsSharedRelation(Oid relationId)
Definition: catalog.c:304
#define DEBUG1
Definition: elog.h:30
#define palloc_object(type)
Definition: fe_memutils.h:74
Oid MyDatabaseId
Definition: globals.c:94
List * lappend(List *list, void *datum)
Definition: list.c:339
void list_sort(List *list, list_sort_comparator cmp)
Definition: list.c:1674
#define NIL
Definition: pg_list.h:68
static int file_sort_by_lsn(const ListCell *a_p, const ListCell *b_p)
static void ApplyLogicalMappingFile(HTAB *tuplecid_data, Oid relid, const char *fname)
static bool TransactionIdInArray(TransactionId xid, TransactionId *xip, Size num)
#define LOGICAL_REWRITE_FORMAT
Definition: rewriteheap.h:54
Definition: pg_list.h:54
char fname[MAXPGPATH]

References AllocateDir(), ApplyLogicalMappingFile(), dirent::d_name, DEBUG1, elog, ERROR, file_sort_by_lsn(), RewriteMappingFile::fname, FreeDir(), InvalidOid, IsSharedRelation(), lappend(), lfirst, list_sort(), LOGICAL_REWRITE_FORMAT, RewriteMappingFile::lsn, MyDatabaseId, NIL, palloc_object, pfree(), PG_LOGICAL_MAPPINGS_DIR, ReadDir(), SnapshotData::subxcnt, SnapshotData::subxip, TransactionIdDidCommit(), TransactionIdInArray(), and tuplecid_data.

Referenced by ResolveCminCmaxDuringDecoding().

Variable Documentation

◆ debug_logical_replication_streaming

int debug_logical_replication_streaming = DEBUG_LOGICAL_REP_STREAMING_BUFFERED

◆ logical_decoding_work_mem

int logical_decoding_work_mem

Definition at line 225 of file reorderbuffer.c.

Referenced by ReorderBufferCheckMemoryLimit().

◆ max_changes_in_memory

const Size max_changes_in_memory = 4096
static

Definition at line 226 of file reorderbuffer.c.

Referenced by ReorderBufferRestoreChanges().