PostgreSQL Source Code git master
reorderbuffer.c File Reference
#include "postgres.h"
#include <unistd.h>
#include <sys/stat.h>
#include "access/detoast.h"
#include "access/heapam.h"
#include "access/rewriteheap.h"
#include "access/transam.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "catalog/catalog.h"
#include "common/int.h"
#include "lib/binaryheap.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "replication/logical.h"
#include "replication/reorderbuffer.h"
#include "replication/slot.h"
#include "replication/snapbuild.h"
#include "storage/bufmgr.h"
#include "storage/fd.h"
#include "storage/sinval.h"
#include "utils/builtins.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/relfilenumbermap.h"
Include dependency graph for reorderbuffer.c:

Go to the source code of this file.

Data Structures

struct  ReorderBufferTXNByIdEnt
 
struct  ReorderBufferTupleCidKey
 
struct  ReorderBufferTupleCidEnt
 
struct  TXNEntryFile
 
struct  ReorderBufferIterTXNEntry
 
struct  ReorderBufferIterTXNState
 
struct  ReorderBufferToastEnt
 
struct  ReorderBufferDiskChange
 
struct  RewriteMappingFile
 

Macros

#define IsSpecInsert(action)
 
#define IsSpecConfirmOrAbort(action)
 
#define IsInsertOrUpdate(action)
 
#define CHANGES_THRESHOLD   100
 

Typedefs

typedef struct ReorderBufferTXNByIdEnt ReorderBufferTXNByIdEnt
 
typedef struct ReorderBufferTupleCidKey ReorderBufferTupleCidKey
 
typedef struct ReorderBufferTupleCidEnt ReorderBufferTupleCidEnt
 
typedef struct TXNEntryFile TXNEntryFile
 
typedef struct ReorderBufferIterTXNEntry ReorderBufferIterTXNEntry
 
typedef struct ReorderBufferIterTXNState ReorderBufferIterTXNState
 
typedef struct ReorderBufferToastEnt ReorderBufferToastEnt
 
typedef struct ReorderBufferDiskChange ReorderBufferDiskChange
 
typedef struct RewriteMappingFile RewriteMappingFile
 

Functions

static ReorderBufferTXN * ReorderBufferGetTXN (ReorderBuffer *rb)
 
static void ReorderBufferReturnTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static ReorderBufferTXN * ReorderBufferTXNByXid (ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
 
static void ReorderBufferTransferSnapToParent (ReorderBufferTXN *txn, ReorderBufferTXN *subtxn)
 
static void AssertTXNLsnOrder (ReorderBuffer *rb)
 
static void ReorderBufferIterTXNInit (ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferIterTXNState *volatile *iter_state)
 
static ReorderBufferChange * ReorderBufferIterTXNNext (ReorderBuffer *rb, ReorderBufferIterTXNState *state)
 
static void ReorderBufferIterTXNFinish (ReorderBuffer *rb, ReorderBufferIterTXNState *state)
 
static void ReorderBufferExecuteInvalidations (uint32 nmsgs, SharedInvalidationMessage *msgs)
 
static void ReorderBufferCheckMemoryLimit (ReorderBuffer *rb)
 
static void ReorderBufferSerializeTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferSerializeChange (ReorderBuffer *rb, ReorderBufferTXN *txn, int fd, ReorderBufferChange *change)
 
static Size ReorderBufferRestoreChanges (ReorderBuffer *rb, ReorderBufferTXN *txn, TXNEntryFile *file, XLogSegNo *segno)
 
static void ReorderBufferRestoreChange (ReorderBuffer *rb, ReorderBufferTXN *txn, char *data)
 
static void ReorderBufferRestoreCleanup (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferTruncateTXN (ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared)
 
static void ReorderBufferCleanupSerializedTXNs (const char *slotname)
 
static void ReorderBufferSerializedPath (char *path, ReplicationSlot *slot, TransactionId xid, XLogSegNo segno)
 
static int ReorderBufferTXNSizeCompare (const pairingheap_node *a, const pairingheap_node *b, void *arg)
 
static void ReorderBufferFreeSnap (ReorderBuffer *rb, Snapshot snap)
 
static Snapshot ReorderBufferCopySnap (ReorderBuffer *rb, Snapshot orig_snap, ReorderBufferTXN *txn, CommandId cid)
 
static bool ReorderBufferCanStream (ReorderBuffer *rb)
 
static bool ReorderBufferCanStartStreaming (ReorderBuffer *rb)
 
static void ReorderBufferStreamTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferStreamCommit (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferToastInitHash (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferToastReset (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferToastReplace (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
 
static void ReorderBufferToastAppendChunk (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
 
static Size ReorderBufferChangeSize (ReorderBufferChange *change)
 
static void ReorderBufferChangeMemoryUpdate (ReorderBuffer *rb, ReorderBufferChange *change, ReorderBufferTXN *txn, bool addition, Size sz)
 
ReorderBuffer * ReorderBufferAllocate (void)
 
void ReorderBufferFree (ReorderBuffer *rb)
 
ReorderBufferChange * ReorderBufferGetChange (ReorderBuffer *rb)
 
void ReorderBufferReturnChange (ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)
 
HeapTuple ReorderBufferGetTupleBuf (ReorderBuffer *rb, Size tuple_len)
 
void ReorderBufferReturnTupleBuf (HeapTuple tuple)
 
Oid * ReorderBufferGetRelids (ReorderBuffer *rb, int nrelids)
 
void ReorderBufferReturnRelids (ReorderBuffer *rb, Oid *relids)
 
static void ReorderBufferProcessPartialChange (ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool toast_insert)
 
void ReorderBufferQueueChange (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change, bool toast_insert)
 
void ReorderBufferQueueMessage (ReorderBuffer *rb, TransactionId xid, Snapshot snap, XLogRecPtr lsn, bool transactional, const char *prefix, Size message_size, const char *message)
 
static void AssertChangeLsnOrder (ReorderBufferTXN *txn)
 
ReorderBufferTXN * ReorderBufferGetOldestTXN (ReorderBuffer *rb)
 
TransactionId ReorderBufferGetOldestXmin (ReorderBuffer *rb)
 
void ReorderBufferSetRestartPoint (ReorderBuffer *rb, XLogRecPtr ptr)
 
void ReorderBufferAssignChild (ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr lsn)
 
void ReorderBufferCommitChild (ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn)
 
static int ReorderBufferIterCompare (Datum a, Datum b, void *arg)
 
static void ReorderBufferCleanupTXN (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void ReorderBufferBuildTupleCidHash (ReorderBuffer *rb, ReorderBufferTXN *txn)
 
static void SetupCheckXidLive (TransactionId xid)
 
static void ReorderBufferApplyChange (ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change, bool streaming)
 
static void ReorderBufferApplyTruncate (ReorderBuffer *rb, ReorderBufferTXN *txn, int nrelations, Relation *relations, ReorderBufferChange *change, bool streaming)
 
static void ReorderBufferApplyMessage (ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool streaming)
 
static void ReorderBufferSaveTXNSnapshot (ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id)
 
static void ReorderBufferResetTXN (ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id, XLogRecPtr last_lsn, ReorderBufferChange *specinsert)
 
static void ReorderBufferProcessTXN (ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn, volatile Snapshot snapshot_now, volatile CommandId command_id, bool streaming)
 
static void ReorderBufferReplay (ReorderBufferTXN *txn, ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
 
void ReorderBufferCommit (ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
 
bool ReorderBufferRememberPrepareInfo (ReorderBuffer *rb, TransactionId xid, XLogRecPtr prepare_lsn, XLogRecPtr end_lsn, TimestampTz prepare_time, RepOriginId origin_id, XLogRecPtr origin_lsn)
 
void ReorderBufferSkipPrepare (ReorderBuffer *rb, TransactionId xid)
 
void ReorderBufferPrepare (ReorderBuffer *rb, TransactionId xid, char *gid)
 
void ReorderBufferFinishPrepared (ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, XLogRecPtr two_phase_at, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn, char *gid, bool is_commit)
 
void ReorderBufferAbort (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, TimestampTz abort_time)
 
void ReorderBufferAbortOld (ReorderBuffer *rb, TransactionId oldestRunningXid)
 
void ReorderBufferForget (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferInvalidate (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferImmediateInvalidation (ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
 
void ReorderBufferProcessXid (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
void ReorderBufferAddSnapshot (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
 
void ReorderBufferSetBaseSnapshot (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Snapshot snap)
 
void ReorderBufferAddNewCommandId (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, CommandId cid)
 
void ReorderBufferAddNewTupleCids (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, RelFileLocator locator, ItemPointerData tid, CommandId cmin, CommandId cmax, CommandId combocid)
 
void ReorderBufferAddInvalidations (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs)
 
void ReorderBufferXidSetCatalogChanges (ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 
TransactionId * ReorderBufferGetCatalogChangesXacts (ReorderBuffer *rb)
 
bool ReorderBufferXidHasCatalogChanges (ReorderBuffer *rb, TransactionId xid)
 
bool ReorderBufferXidHasBaseSnapshot (ReorderBuffer *rb, TransactionId xid)
 
static void ReorderBufferSerializeReserve (ReorderBuffer *rb, Size sz)
 
static ReorderBufferTXN * ReorderBufferLargestTXN (ReorderBuffer *rb)
 
static ReorderBufferTXN * ReorderBufferLargestStreamableTopTXN (ReorderBuffer *rb)
 
void StartupReorderBuffer (void)
 
static void ApplyLogicalMappingFile (HTAB *tuplecid_data, Oid relid, const char *fname)
 
static bool TransactionIdInArray (TransactionId xid, TransactionId *xip, Size num)
 
static int file_sort_by_lsn (const ListCell *a_p, const ListCell *b_p)
 
static void UpdateLogicalMappings (HTAB *tuplecid_data, Oid relid, Snapshot snapshot)
 
bool ResolveCminCmaxDuringDecoding (HTAB *tuplecid_data, Snapshot snapshot, HeapTuple htup, Buffer buffer, CommandId *cmin, CommandId *cmax)
 

Variables

int logical_decoding_work_mem
 
static const Size max_changes_in_memory = 4096
 
int debug_logical_replication_streaming = DEBUG_LOGICAL_REP_STREAMING_BUFFERED
 

Macro Definition Documentation

◆ CHANGES_THRESHOLD

#define CHANGES_THRESHOLD   100

◆ IsInsertOrUpdate

#define IsInsertOrUpdate (   action)
Value:

Definition at line 193 of file reorderbuffer.c.

◆ IsSpecConfirmOrAbort

#define IsSpecConfirmOrAbort (   action)
Value:
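( \
	(((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM) || \
	((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT)) \
)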
@ REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM
Definition: reorderbuffer.h:61
@ REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT
Definition: reorderbuffer.h:62

Definition at line 188 of file reorderbuffer.c.

◆ IsSpecInsert

#define IsSpecInsert (   action)
Value:

Definition at line 184 of file reorderbuffer.c.

Typedef Documentation

◆ ReorderBufferDiskChange

◆ ReorderBufferIterTXNEntry

◆ ReorderBufferIterTXNState

◆ ReorderBufferToastEnt

◆ ReorderBufferTupleCidEnt

◆ ReorderBufferTupleCidKey

◆ ReorderBufferTXNByIdEnt

◆ RewriteMappingFile

◆ TXNEntryFile

typedef struct TXNEntryFile TXNEntryFile

Function Documentation

◆ ApplyLogicalMappingFile()

static void ApplyLogicalMappingFile ( HTAB *  tuplecid_data,
Oid  relid,
const char *  fname 
)
static

Definition at line 5082 of file reorderbuffer.c.

5083{
5084 char path[MAXPGPATH];
5085 int fd;
5086 int readBytes;
5088
5089 sprintf(path, "%s/%s", PG_LOGICAL_MAPPINGS_DIR, fname);
5090 fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
5091 if (fd < 0)
5092 ereport(ERROR,
5094 errmsg("could not open file \"%s\": %m", path)));
5095
5096 while (true)
5097 {
5100 ReorderBufferTupleCidEnt *new_ent;
5101 bool found;
5102
5103 /* be careful about padding */
5104 memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
5105
5106 /* read all mappings till the end of the file */
5107 pgstat_report_wait_start(WAIT_EVENT_REORDER_LOGICAL_MAPPING_READ);
5108 readBytes = read(fd, &map, sizeof(LogicalRewriteMappingData));
5110
5111 if (readBytes < 0)
5112 ereport(ERROR,
5114 errmsg("could not read file \"%s\": %m",
5115 path)));
5116 else if (readBytes == 0) /* EOF */
5117 break;
5118 else if (readBytes != sizeof(LogicalRewriteMappingData))
5119 ereport(ERROR,
5121 errmsg("could not read from file \"%s\": read %d instead of %d bytes",
5122 path, readBytes,
5123 (int32) sizeof(LogicalRewriteMappingData))));
5124
5125 key.rlocator = map.old_locator;
5127 &key.tid);
5128
5129
5130 ent = (ReorderBufferTupleCidEnt *)
5132
5133 /* no existing mapping, no need to update */
5134 if (!ent)
5135 continue;
5136
5137 key.rlocator = map.new_locator;
5139 &key.tid);
5140
5141 new_ent = (ReorderBufferTupleCidEnt *)
5143
5144 if (found)
5145 {
5146 /*
5147 * Make sure the existing mapping makes sense. We sometime update
5148 * old records that did not yet have a cmax (e.g. pg_class' own
5149 * entry while rewriting it) during rewrites, so allow that.
5150 */
5151 Assert(ent->cmin == InvalidCommandId || ent->cmin == new_ent->cmin);
5152 Assert(ent->cmax == InvalidCommandId || ent->cmax == new_ent->cmax);
5153 }
5154 else
5155 {
5156 /* update mapping */
5157 new_ent->cmin = ent->cmin;
5158 new_ent->cmax = ent->cmax;
5159 new_ent->combocid = ent->combocid;
5160 }
5161 }
5162
5163 if (CloseTransientFile(fd) != 0)
5164 ereport(ERROR,
5166 errmsg("could not close file \"%s\": %m", path)));
5167}
#define InvalidCommandId
Definition: c.h:623
#define Assert(condition)
Definition: c.h:812
#define PG_BINARY
Definition: c.h:1227
int32_t int32
Definition: c.h:481
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:955
int errcode_for_file_access(void)
Definition: elog.c:876
int errmsg(const char *fmt,...)
Definition: elog.c:1070
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:149
int CloseTransientFile(int fd)
Definition: fd.c:2831
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2655
@ HASH_FIND
Definition: hsearch.h:113
@ HASH_ENTER
Definition: hsearch.h:114
#define read(a, b, c)
Definition: win32.h:13
static void ItemPointerCopy(const ItemPointerData *fromPointer, ItemPointerData *toPointer)
Definition: itemptr.h:172
#define MAXPGPATH
#define sprintf
Definition: port.h:240
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_LOGICAL_MAPPINGS_DIR
Definition: reorderbuffer.h:23
static HTAB * tuplecid_data
Definition: snapmgr.c:103
ItemPointerData new_tid
Definition: rewriteheap.h:40
RelFileLocator old_locator
Definition: rewriteheap.h:37
ItemPointerData old_tid
Definition: rewriteheap.h:39
RelFileLocator new_locator
Definition: rewriteheap.h:38
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: wait_event.h:85
static void pgstat_report_wait_end(void)
Definition: wait_event.h:101

References Assert, CloseTransientFile(), ReorderBufferTupleCidEnt::cmax, ReorderBufferTupleCidEnt::cmin, ReorderBufferTupleCidEnt::combocid, ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), HASH_ENTER, HASH_FIND, hash_search(), InvalidCommandId, ItemPointerCopy(), sort-test::key, MAXPGPATH, LogicalRewriteMappingData::new_locator, LogicalRewriteMappingData::new_tid, LogicalRewriteMappingData::old_locator, LogicalRewriteMappingData::old_tid, OpenTransientFile(), PG_BINARY, PG_LOGICAL_MAPPINGS_DIR, pgstat_report_wait_end(), pgstat_report_wait_start(), read, sprintf, and tuplecid_data.

Referenced by UpdateLogicalMappings().

◆ AssertChangeLsnOrder()

static void AssertChangeLsnOrder ( ReorderBufferTXN *  txn)
static

Definition at line 991 of file reorderbuffer.c.

992{
993#ifdef USE_ASSERT_CHECKING
994 dlist_iter iter;
995 XLogRecPtr prev_lsn = txn->first_lsn;
996
997 dlist_foreach(iter, &txn->changes)
998 {
999 ReorderBufferChange *cur_change;
1000
1001 cur_change = dlist_container(ReorderBufferChange, node, iter.cur);
1002
1004 Assert(cur_change->lsn != InvalidXLogRecPtr);
1005 Assert(txn->first_lsn <= cur_change->lsn);
1006
1007 if (txn->end_lsn != InvalidXLogRecPtr)
1008 Assert(cur_change->lsn <= txn->end_lsn);
1009
1010 Assert(prev_lsn <= cur_change->lsn);
1011
1012 prev_lsn = cur_change->lsn;
1013 }
1014#endif
1015}
#define dlist_foreach(iter, lhead)
Definition: ilist.h:623
#define dlist_container(type, membername, ptr)
Definition: ilist.h:593
XLogRecPtr first_lsn
XLogRecPtr end_lsn
dlist_head changes
dlist_node * cur
Definition: ilist.h:179
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28

References Assert, ReorderBufferTXN::changes, dlist_iter::cur, dlist_container, dlist_foreach, ReorderBufferTXN::end_lsn, ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, and ReorderBufferChange::lsn.

Referenced by ReorderBufferIterTXNInit().

◆ AssertTXNLsnOrder()

static void AssertTXNLsnOrder ( ReorderBuffer *  rb)
static

Definition at line 920 of file reorderbuffer.c.

921{
922#ifdef USE_ASSERT_CHECKING
924 dlist_iter iter;
925 XLogRecPtr prev_first_lsn = InvalidXLogRecPtr;
926 XLogRecPtr prev_base_snap_lsn = InvalidXLogRecPtr;
927
928 /*
929 * Skip the verification if we don't reach the LSN at which we start
930 * decoding the contents of transactions yet because until we reach the
931 * LSN, we could have transactions that don't have the association between
932 * the top-level transaction and subtransaction yet and consequently have
933 * the same LSN. We don't guarantee this association until we try to
934 * decode the actual contents of transaction. The ordering of the records
935 * prior to the start_decoding_at LSN should have been checked before the
936 * restart.
937 */
939 return;
940
942 {
944 iter.cur);
945
946 /* start LSN must be set */
948
949 /* If there is an end LSN, it must be higher than start LSN */
950 if (cur_txn->end_lsn != InvalidXLogRecPtr)
951 Assert(cur_txn->first_lsn <= cur_txn->end_lsn);
952
953 /* Current initial LSN must be strictly higher than previous */
954 if (prev_first_lsn != InvalidXLogRecPtr)
955 Assert(prev_first_lsn < cur_txn->first_lsn);
956
957 /* known-as-subtxn txns must not be listed */
959
960 prev_first_lsn = cur_txn->first_lsn;
961 }
962
964 {
966 base_snapshot_node,
967 iter.cur);
968
969 /* base snapshot (and its LSN) must be set */
970 Assert(cur_txn->base_snapshot != NULL);
972
973 /* current LSN must be strictly higher than previous */
974 if (prev_base_snap_lsn != InvalidXLogRecPtr)
975 Assert(prev_base_snap_lsn < cur_txn->base_snapshot_lsn);
976
977 /* known-as-subtxn txns must not be listed */
979
980 prev_base_snap_lsn = cur_txn->base_snapshot_lsn;
981 }
982#endif
983}
#define rbtxn_is_known_subxact(txn)
bool SnapBuildXactNeedsSkip(SnapBuild *builder, XLogRecPtr ptr)
Definition: snapbuild.c:304
XLogReaderState * reader
Definition: logical.h:42
struct SnapBuild * snapshot_builder
Definition: logical.h:44
XLogRecPtr base_snapshot_lsn
Snapshot base_snapshot
dlist_head txns_by_base_snapshot_lsn
dlist_head toplevel_by_lsn
void * private_data
XLogRecPtr EndRecPtr
Definition: xlogreader.h:207

References Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::base_snapshot_lsn, dlist_iter::cur, dlist_container, dlist_foreach, ReorderBufferTXN::end_lsn, XLogReaderState::EndRecPtr, ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, ReorderBuffer::private_data, rbtxn_is_known_subxact, LogicalDecodingContext::reader, SnapBuildXactNeedsSkip(), LogicalDecodingContext::snapshot_builder, ReorderBuffer::toplevel_by_lsn, and ReorderBuffer::txns_by_base_snapshot_lsn.

Referenced by ReorderBufferAssignChild(), ReorderBufferGetOldestTXN(), ReorderBufferGetOldestXmin(), ReorderBufferSetBaseSnapshot(), and ReorderBufferTXNByXid().

◆ file_sort_by_lsn()

static int file_sort_by_lsn ( const ListCell *  a_p,
const ListCell *  b_p 
)
static

Definition at line 5184 of file reorderbuffer.c.

5185{
5188
5189 return pg_cmp_u64(a->lsn, b->lsn);
5190}
static int pg_cmp_u64(uint64 a, uint64 b)
Definition: int.h:664
int b
Definition: isn.c:69
int a
Definition: isn.c:68
#define lfirst(lc)
Definition: pg_list.h:172

References a, b, lfirst, and pg_cmp_u64().

Referenced by UpdateLogicalMappings().

◆ ReorderBufferAbort()

void ReorderBufferAbort ( ReorderBuffer *  rb,
TransactionId  xid,
XLogRecPtr  lsn,
TimestampTz  abort_time 
)

Definition at line 2931 of file reorderbuffer.c.

2933{
2934 ReorderBufferTXN *txn;
2935
2936 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2937 false);
2938
2939 /* unknown, nothing to remove */
2940 if (txn == NULL)
2941 return;
2942
2943 txn->xact_time.abort_time = abort_time;
2944
2945 /* For streamed transactions notify the remote node about the abort. */
2946 if (rbtxn_is_streamed(txn))
2947 {
2948 rb->stream_abort(rb, txn, lsn);
2949
2950 /*
2951 * We might have decoded changes for this transaction that could load
2952 * the cache as per the current transaction's view (consider DDL's
2953 * happened in this transaction). We don't want the decoding of future
2954 * transactions to use those cache entries so execute invalidations.
2955 */
2956 if (txn->ninvalidations > 0)
2958 txn->invalidations);
2959 }
2960
2961 /* cosmetic... */
2962 txn->final_lsn = lsn;
2963
2964 /* remove potential on-disk data, and deallocate */
2965 ReorderBufferCleanupTXN(rb, txn);
2966}
static void ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
void ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations, SharedInvalidationMessage *invalidations)
static ReorderBufferTXN * ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, bool *is_new, XLogRecPtr lsn, bool create_as_top)
#define rbtxn_is_streamed(txn)
SharedInvalidationMessage * invalidations
TimestampTz abort_time
XLogRecPtr final_lsn
union ReorderBufferTXN::@115 xact_time
ReorderBufferStreamAbortCB stream_abort

References ReorderBufferTXN::abort_time, ReorderBufferTXN::final_lsn, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, rbtxn_is_streamed, ReorderBufferCleanupTXN(), ReorderBufferImmediateInvalidation(), ReorderBufferTXNByXid(), ReorderBuffer::stream_abort, and ReorderBufferTXN::xact_time.

Referenced by DecodeAbort().

◆ ReorderBufferAbortOld()

void ReorderBufferAbortOld ( ReorderBuffer *  rb,
TransactionId  oldestRunningXid 
)

Definition at line 2976 of file reorderbuffer.c.

2977{
2979
2980 /*
2981 * Iterate through all (potential) toplevel TXNs and abort all that are
2982 * older than what possibly can be running. Once we've found the first
2983 * that is alive we stop, there might be some that acquired an xid earlier
2984 * but started writing later, but it's unlikely and they will be cleaned
2985 * up in a later call to this function.
2986 */
2988 {
2989 ReorderBufferTXN *txn;
2990
2991 txn = dlist_container(ReorderBufferTXN, node, it.cur);
2992
2993 if (TransactionIdPrecedes(txn->xid, oldestRunningXid))
2994 {
2995 elog(DEBUG2, "aborting old transaction %u", txn->xid);
2996
2997 /* Notify the remote node about the crash/immediate restart. */
2998 if (rbtxn_is_streamed(txn))
2999 rb->stream_abort(rb, txn, InvalidXLogRecPtr);
3000
3001 /* remove potential on-disk data, and deallocate this tx */
3002 ReorderBufferCleanupTXN(rb, txn);
3003 }
3004 else
3005 return;
3006 }
3007}
#define DEBUG2
Definition: elog.h:29
#define elog(elevel,...)
Definition: elog.h:225
#define dlist_foreach_modify(iter, lhead)
Definition: ilist.h:640
TransactionId xid
dlist_node * cur
Definition: ilist.h:200
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:280

References dlist_mutable_iter::cur, DEBUG2, dlist_container, dlist_foreach_modify, elog, InvalidXLogRecPtr, rbtxn_is_streamed, ReorderBufferCleanupTXN(), ReorderBuffer::stream_abort, ReorderBuffer::toplevel_by_lsn, TransactionIdPrecedes(), and ReorderBufferTXN::xid.

Referenced by standby_decode().

◆ ReorderBufferAddInvalidations()

void ReorderBufferAddInvalidations ( ReorderBuffer *  rb,
TransactionId  xid,
XLogRecPtr  lsn,
Size  nmsgs,
SharedInvalidationMessage *  msgs 
)

Definition at line 3329 of file reorderbuffer.c.

3332{
3333 ReorderBufferTXN *txn;
3334 MemoryContext oldcontext;
3335 ReorderBufferChange *change;
3336
3337 txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3338
3339 oldcontext = MemoryContextSwitchTo(rb->context);
3340
3341 /*
3342 * Collect all the invalidations under the top transaction, if available,
3343 * so that we can execute them all together. See comments atop this
3344 * function.
3345 */
3346 txn = rbtxn_get_toptxn(txn);
3347
3348 Assert(nmsgs > 0);
3349
3350 /* Accumulate invalidations. */
3351 if (txn->ninvalidations == 0)
3352 {
3353 txn->ninvalidations = nmsgs;
3355 palloc(sizeof(SharedInvalidationMessage) * nmsgs);
3356 memcpy(txn->invalidations, msgs,
3357 sizeof(SharedInvalidationMessage) * nmsgs);
3358 }
3359 else
3360 {
3363 (txn->ninvalidations + nmsgs));
3364
3365 memcpy(txn->invalidations + txn->ninvalidations, msgs,
3366 nmsgs * sizeof(SharedInvalidationMessage));
3367 txn->ninvalidations += nmsgs;
3368 }
3369
3370 change = ReorderBufferGetChange(rb);
3372 change->data.inval.ninvalidations = nmsgs;
3374 palloc(sizeof(SharedInvalidationMessage) * nmsgs);
3375 memcpy(change->data.inval.invalidations, msgs,
3376 sizeof(SharedInvalidationMessage) * nmsgs);
3377
3378 ReorderBufferQueueChange(rb, xid, lsn, change, false);
3379
3380 MemoryContextSwitchTo(oldcontext);
3381}
void * repalloc(void *pointer, Size size)
Definition: mcxt.c:1541
void * palloc(Size size)
Definition: mcxt.c:1317
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:124
ReorderBufferChange * ReorderBufferGetChange(ReorderBuffer *rb)
void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, ReorderBufferChange *change, bool toast_insert)
#define rbtxn_get_toptxn(txn)
@ REORDER_BUFFER_CHANGE_INVALIDATION
Definition: reorderbuffer.h:56
struct ReorderBufferChange::@109::@114 inval
ReorderBufferChangeType action
Definition: reorderbuffer.h:81
SharedInvalidationMessage * invalidations
union ReorderBufferChange::@109 data
MemoryContext context

References ReorderBufferChange::action, Assert, ReorderBuffer::context, ReorderBufferChange::data, ReorderBufferChange::inval, ReorderBufferChange::invalidations, ReorderBufferTXN::invalidations, MemoryContextSwitchTo(), ReorderBufferChange::ninvalidations, ReorderBufferTXN::ninvalidations, palloc(), rbtxn_get_toptxn, REORDER_BUFFER_CHANGE_INVALIDATION, ReorderBufferGetChange(), ReorderBufferQueueChange(), ReorderBufferTXNByXid(), and repalloc().

Referenced by xact_decode().

◆ ReorderBufferAddNewCommandId()

void ReorderBufferAddNewCommandId ( ReorderBuffer *  rb,
TransactionId  xid,
XLogRecPtr  lsn,
CommandId  cid 
)

◆ ReorderBufferAddNewTupleCids()

void ReorderBufferAddNewTupleCids ( ReorderBuffer *  rb,
TransactionId  xid,
XLogRecPtr  lsn,
RelFileLocator  locator,
ItemPointerData  tid,
CommandId  cmin,
CommandId  cmax,
CommandId  combocid 
)

Definition at line 3292 of file reorderbuffer.c.

3296{
3298 ReorderBufferTXN *txn;
3299
3300 txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3301
3302 change->data.tuplecid.locator = locator;
3303 change->data.tuplecid.tid = tid;
3304 change->data.tuplecid.cmin = cmin;
3305 change->data.tuplecid.cmax = cmax;
3306 change->data.tuplecid.combocid = combocid;
3307 change->lsn = lsn;
3308 change->txn = txn;
3310
3311 dlist_push_tail(&txn->tuplecids, &change->node);
3312 txn->ntuplecids++;
3313}
static void dlist_push_tail(dlist_head *head, dlist_node *node)
Definition: ilist.h:364
@ REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID
Definition: reorderbuffer.h:59
ItemPointerData tid
struct ReorderBufferTXN * txn
Definition: reorderbuffer.h:84
RelFileLocator locator
struct ReorderBufferChange::@109::@113 tuplecid
dlist_head tuplecids

References ReorderBufferChange::action, ReorderBufferChange::cmax, ReorderBufferChange::cmin, ReorderBufferChange::combocid, ReorderBufferChange::data, dlist_push_tail(), ReorderBufferChange::locator, ReorderBufferChange::lsn, ReorderBufferChange::node, ReorderBufferTXN::ntuplecids, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferGetChange(), ReorderBufferTXNByXid(), ReorderBufferChange::tid, ReorderBufferChange::tuplecid, ReorderBufferTXN::tuplecids, and ReorderBufferChange::txn.

Referenced by SnapBuildProcessNewCid().

◆ ReorderBufferAddSnapshot()

void ReorderBufferAddSnapshot ( ReorderBuffer *  rb,
TransactionId  xid,
XLogRecPtr  lsn,
Snapshot  snap 
)

◆ ReorderBufferAllocate()

ReorderBuffer * ReorderBufferAllocate ( void  )

Definition at line 309 of file reorderbuffer.c.

310{
311 ReorderBuffer *buffer;
312 HASHCTL hash_ctl;
313 MemoryContext new_ctx;
314
315 Assert(MyReplicationSlot != NULL);
316
317 /* allocate memory in own context, to have better accountability */
319 "ReorderBuffer",
321
322 buffer =
323 (ReorderBuffer *) MemoryContextAlloc(new_ctx, sizeof(ReorderBuffer));
324
325 memset(&hash_ctl, 0, sizeof(hash_ctl));
326
327 buffer->context = new_ctx;
328
329 buffer->change_context = SlabContextCreate(new_ctx,
330 "Change",
332 sizeof(ReorderBufferChange));
333
334 buffer->txn_context = SlabContextCreate(new_ctx,
335 "TXN",
337 sizeof(ReorderBufferTXN));
338
339 /*
340 * To minimize memory fragmentation caused by long-running transactions
341 * with changes spanning multiple memory blocks, we use a single
342 * fixed-size memory block for decoded tuple storage. The performance
343 * testing showed that the default memory block size maintains logical
344 * decoding performance without causing fragmentation due to concurrent
345 * transactions. One might think that we can use the max size as
346 * SLAB_LARGE_BLOCK_SIZE but the test also showed it doesn't help resolve
347 * the memory fragmentation.
348 */
349 buffer->tup_context = GenerationContextCreate(new_ctx,
350 "Tuples",
354
355 hash_ctl.keysize = sizeof(TransactionId);
356 hash_ctl.entrysize = sizeof(ReorderBufferTXNByIdEnt);
357 hash_ctl.hcxt = buffer->context;
358
359 buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
361
363 buffer->by_txn_last_txn = NULL;
364
365 buffer->outbuf = NULL;
366 buffer->outbufsize = 0;
367 buffer->size = 0;
368
369 /* txn_heap is ordered by transaction size */
371
372 buffer->spillTxns = 0;
373 buffer->spillCount = 0;
374 buffer->spillBytes = 0;
375 buffer->streamTxns = 0;
376 buffer->streamCount = 0;
377 buffer->streamBytes = 0;
378 buffer->totalTxns = 0;
379 buffer->totalBytes = 0;
380
382
383 dlist_init(&buffer->toplevel_by_lsn);
385 dclist_init(&buffer->catchange_txns);
386
387 /*
388 * Ensure there's no stale data from prior uses of this slot, in case some
389 * prior exit avoided calling ReorderBufferFree. Failure to do this can
390 * produce duplicated txns, and it's very cheap if there's nothing there.
391 */
393
394 return buffer;
395}
#define NameStr(name)
Definition: c.h:700
uint32 TransactionId
Definition: c.h:606
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:352
MemoryContext GenerationContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Definition: generation.c:160
#define HASH_CONTEXT
Definition: hsearch.h:102
#define HASH_ELEM
Definition: hsearch.h:95
#define HASH_BLOBS
Definition: hsearch.h:97
static void dlist_init(dlist_head *head)
Definition: ilist.h:314
static void dclist_init(dclist_head *head)
Definition: ilist.h:671
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:1181
MemoryContext CurrentMemoryContext
Definition: mcxt.c:143
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:160
#define SLAB_DEFAULT_BLOCK_SIZE
Definition: memutils.h:189
pairingheap * pairingheap_allocate(pairingheap_comparator compare, void *arg)
Definition: pairingheap.c:42
static int ReorderBufferTXNSizeCompare(const pairingheap_node *a, const pairingheap_node *b, void *arg)
struct ReorderBufferTXNByIdEnt ReorderBufferTXNByIdEnt
static void ReorderBufferCleanupSerializedTXNs(const char *slotname)
MemoryContext SlabContextCreate(MemoryContext parent, const char *name, Size blockSize, Size chunkSize)
Definition: slab.c:322
ReplicationSlot * MyReplicationSlot
Definition: slot.c:138
Size keysize
Definition: hsearch.h:75
Size entrysize
Definition: hsearch.h:76
MemoryContext hcxt
Definition: hsearch.h:86
dclist_head catchange_txns
MemoryContext change_context
ReorderBufferTXN * by_txn_last_txn
TransactionId by_txn_last_xid
MemoryContext tup_context
pairingheap * txn_heap
MemoryContext txn_context
XLogRecPtr current_restart_decoding_lsn
ReplicationSlotPersistentData data
Definition: slot.h:181
#define InvalidTransactionId
Definition: transam.h:31

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, Assert, ReorderBuffer::by_txn, ReorderBuffer::by_txn_last_txn, ReorderBuffer::by_txn_last_xid, ReorderBuffer::catchange_txns, ReorderBuffer::change_context, ReorderBuffer::context, ReorderBuffer::current_restart_decoding_lsn, CurrentMemoryContext, ReplicationSlot::data, dclist_init(), dlist_init(), HASHCTL::entrysize, GenerationContextCreate(), HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, InvalidTransactionId, InvalidXLogRecPtr, HASHCTL::keysize, MemoryContextAlloc(), MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, ReorderBuffer::outbuf, ReorderBuffer::outbufsize, pairingheap_allocate(), ReorderBufferCleanupSerializedTXNs(), ReorderBufferTXNSizeCompare(), ReorderBuffer::size, SLAB_DEFAULT_BLOCK_SIZE, SlabContextCreate(), ReorderBuffer::spillBytes, ReorderBuffer::spillCount, ReorderBuffer::spillTxns, ReorderBuffer::streamBytes, ReorderBuffer::streamCount, ReorderBuffer::streamTxns, ReorderBuffer::toplevel_by_lsn, ReorderBuffer::totalBytes, ReorderBuffer::totalTxns, ReorderBuffer::tup_context, ReorderBuffer::txn_context, ReorderBuffer::txn_heap, and ReorderBuffer::txns_by_base_snapshot_lsn.

Referenced by StartupDecodingContext().

◆ ReorderBufferApplyChange()

static void ReorderBufferApplyChange ( ReorderBuffer rb,
ReorderBufferTXN txn,
Relation  relation,
ReorderBufferChange change,
bool  streaming 
)
inline static

Definition at line 1994 of file reorderbuffer.c.

1997{
1998 if (streaming)
1999 rb->stream_change(rb, txn, relation, change);
2000 else
2001 rb->apply_change(rb, txn, relation, change);
2002}
ReorderBufferStreamChangeCB stream_change
ReorderBufferApplyChangeCB apply_change

References ReorderBuffer::apply_change, and ReorderBuffer::stream_change.

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferApplyMessage()

static void ReorderBufferApplyMessage ( ReorderBuffer rb,
ReorderBufferTXN txn,
ReorderBufferChange change,
bool  streaming 
)
inline static

Definition at line 2022 of file reorderbuffer.c.

2024{
2025 if (streaming)
2026 rb->stream_message(rb, txn, change->lsn, true,
2027 change->data.msg.prefix,
2028 change->data.msg.message_size,
2029 change->data.msg.message);
2030 else
2031 rb->message(rb, txn, change->lsn, true,
2032 change->data.msg.prefix,
2033 change->data.msg.message_size,
2034 change->data.msg.message);
2035}
struct ReorderBufferChange::@109::@112 msg
ReorderBufferStreamMessageCB stream_message
ReorderBufferMessageCB message

References ReorderBufferChange::data, ReorderBufferChange::lsn, ReorderBufferChange::message, ReorderBuffer::message, ReorderBufferChange::message_size, ReorderBufferChange::msg, ReorderBufferChange::prefix, and ReorderBuffer::stream_message.

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferApplyTruncate()

static void ReorderBufferApplyTruncate ( ReorderBuffer rb,
ReorderBufferTXN txn,
int  nrelations,
Relation relations,
ReorderBufferChange change,
bool  streaming 
)
inline static

Definition at line 2008 of file reorderbuffer.c.

2011{
2012 if (streaming)
2013 rb->stream_truncate(rb, txn, nrelations, relations, change);
2014 else
2015 rb->apply_truncate(rb, txn, nrelations, relations, change);
2016}
ReorderBufferStreamTruncateCB stream_truncate
ReorderBufferApplyTruncateCB apply_truncate

References ReorderBuffer::apply_truncate, and ReorderBuffer::stream_truncate.

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferAssignChild()

void ReorderBufferAssignChild ( ReorderBuffer rb,
TransactionId  xid,
TransactionId  subxid,
XLogRecPtr  lsn 
)

Definition at line 1077 of file reorderbuffer.c.

1079{
1080 ReorderBufferTXN *txn;
1081 ReorderBufferTXN *subtxn;
1082 bool new_top;
1083 bool new_sub;
1084
1085 txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true);
1086 subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false);
1087
1088 if (!new_sub)
1089 {
1090 if (rbtxn_is_known_subxact(subtxn))
1091 {
1092 /* already associated, nothing to do */
1093 return;
1094 }
1095 else
1096 {
1097 /*
1098 * We already saw this transaction, but initially added it to the
1099 * list of top-level txns. Now that we know it's not top-level,
1100 * remove it from there.
1101 */
1102 dlist_delete(&subtxn->node);
1103 }
1104 }
1105
1106 subtxn->txn_flags |= RBTXN_IS_SUBXACT;
1107 subtxn->toplevel_xid = xid;
1108 Assert(subtxn->nsubtxns == 0);
1109
1110 /* set the reference to top-level transaction */
1111 subtxn->toptxn = txn;
1112
1113 /* add to subtransaction list */
1114 dlist_push_tail(&txn->subtxns, &subtxn->node);
1115 txn->nsubtxns++;
1116
1117 /* Possibly transfer the subtxn's snapshot to its top-level txn. */
1119
1120 /* Verify LSN-ordering invariant */
1122}
static void dlist_delete(dlist_node *node)
Definition: ilist.h:405
static void AssertTXNLsnOrder(ReorderBuffer *rb)
static void ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn, ReorderBufferTXN *subtxn)
#define RBTXN_IS_SUBXACT
TransactionId toplevel_xid
struct ReorderBufferTXN * toptxn
dlist_head subtxns

References Assert, AssertTXNLsnOrder(), dlist_delete(), dlist_push_tail(), ReorderBufferTXN::node, ReorderBufferTXN::nsubtxns, rbtxn_is_known_subxact, RBTXN_IS_SUBXACT, ReorderBufferTransferSnapToParent(), ReorderBufferTXNByXid(), ReorderBufferTXN::subtxns, ReorderBufferTXN::toplevel_xid, ReorderBufferTXN::toptxn, and ReorderBufferTXN::txn_flags.

Referenced by LogicalDecodingProcessRecord(), and ReorderBufferCommitChild().

◆ ReorderBufferBuildTupleCidHash()

static void ReorderBufferBuildTupleCidHash ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 1760 of file reorderbuffer.c.

1761{
1762 dlist_iter iter;
1763 HASHCTL hash_ctl;
1764
1766 return;
1767
1768 hash_ctl.keysize = sizeof(ReorderBufferTupleCidKey);
1769 hash_ctl.entrysize = sizeof(ReorderBufferTupleCidEnt);
1770 hash_ctl.hcxt = rb->context;
1771
1772 /*
1773 * create the hash with the exact number of to-be-stored tuplecids from
1774 * the start
1775 */
1776 txn->tuplecid_hash =
1777 hash_create("ReorderBufferTupleCid", txn->ntuplecids, &hash_ctl,
1779
1780 dlist_foreach(iter, &txn->tuplecids)
1781 {
1784 bool found;
1785 ReorderBufferChange *change;
1786
1787 change = dlist_container(ReorderBufferChange, node, iter.cur);
1788
1790
1791 /* be careful about padding */
1792 memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
1793
1794 key.rlocator = change->data.tuplecid.locator;
1795
1797 &key.tid);
1798
1799 ent = (ReorderBufferTupleCidEnt *)
1800 hash_search(txn->tuplecid_hash, &key, HASH_ENTER, &found);
1801 if (!found)
1802 {
1803 ent->cmin = change->data.tuplecid.cmin;
1804 ent->cmax = change->data.tuplecid.cmax;
1805 ent->combocid = change->data.tuplecid.combocid;
1806 }
1807 else
1808 {
1809 /*
1810 * Maybe we already saw this tuple before in this transaction, but
1811 * if so it must have the same cmin.
1812 */
1813 Assert(ent->cmin == change->data.tuplecid.cmin);
1814
1815 /*
1816 * cmax may be initially invalid, but once set it can only grow,
1817 * and never become invalid again.
1818 */
1819 Assert((ent->cmax == InvalidCommandId) ||
1820 ((change->data.tuplecid.cmax != InvalidCommandId) &&
1821 (change->data.tuplecid.cmax > ent->cmax)));
1822 ent->cmax = change->data.tuplecid.cmax;
1823 }
1824 }
1825}
static bool dlist_is_empty(const dlist_head *head)
Definition: ilist.h:336
struct ReorderBufferTupleCidEnt ReorderBufferTupleCidEnt
struct ReorderBufferTupleCidKey ReorderBufferTupleCidKey
#define rbtxn_has_catalog_changes(txn)

References ReorderBufferChange::action, Assert, ReorderBufferTupleCidEnt::cmax, ReorderBufferChange::cmax, ReorderBufferTupleCidEnt::cmin, ReorderBufferChange::cmin, ReorderBufferTupleCidEnt::combocid, ReorderBufferChange::combocid, ReorderBuffer::context, dlist_iter::cur, ReorderBufferChange::data, dlist_container, dlist_foreach, dlist_is_empty(), HASHCTL::entrysize, HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASH_ENTER, hash_search(), HASHCTL::hcxt, InvalidCommandId, ItemPointerCopy(), sort-test::key, HASHCTL::keysize, ReorderBufferChange::locator, ReorderBufferTXN::ntuplecids, rbtxn_has_catalog_changes, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferChange::tid, ReorderBufferChange::tuplecid, ReorderBufferTXN::tuplecid_hash, and ReorderBufferTXN::tuplecids.

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferCanStartStreaming()

static bool ReorderBufferCanStartStreaming ( ReorderBuffer rb)
inline static

Definition at line 4029 of file reorderbuffer.c.

4030{
4032 SnapBuild *builder = ctx->snapshot_builder;
4033
4034 /* We can't start streaming unless a consistent state is reached. */
4036 return false;
4037
4038 /*
4039 * We can't start streaming immediately even if the streaming is enabled
4040 * because we previously decoded this transaction and now just are
4041 * restarting.
4042 */
4043 if (ReorderBufferCanStream(rb) &&
4044 !SnapBuildXactNeedsSkip(builder, ctx->reader->ReadRecPtr))
4045 return true;
4046
4047 return false;
4048}
static bool ReorderBufferCanStream(ReorderBuffer *rb)
SnapBuildState SnapBuildCurrentState(SnapBuild *builder)
Definition: snapbuild.c:277
@ SNAPBUILD_CONSISTENT
Definition: snapbuild.h:50
XLogRecPtr ReadRecPtr
Definition: xlogreader.h:206

References ReorderBuffer::private_data, LogicalDecodingContext::reader, XLogReaderState::ReadRecPtr, ReorderBufferCanStream(), SNAPBUILD_CONSISTENT, SnapBuildCurrentState(), SnapBuildXactNeedsSkip(), and LogicalDecodingContext::snapshot_builder.

Referenced by ReorderBufferCheckMemoryLimit(), and ReorderBufferProcessPartialChange().

◆ ReorderBufferCanStream()

static bool ReorderBufferCanStream ( ReorderBuffer rb)
inline static

◆ ReorderBufferChangeMemoryUpdate()

static void ReorderBufferChangeMemoryUpdate ( ReorderBuffer rb,
ReorderBufferChange change,
ReorderBufferTXN txn,
bool  addition,
Size  sz 
)
static

Definition at line 3221 of file reorderbuffer.c.

3225{
3226 ReorderBufferTXN *toptxn;
3227
3228 Assert(txn || change);
3229
3230 /*
3231 * Ignore tuple CID changes, because those are not evicted when reaching
3232 * memory limit. So we just don't count them, because it might easily
3233 * trigger a pointless attempt to spill.
3234 */
3235 if (change && change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID)
3236 return;
3237
3238 if (sz == 0)
3239 return;
3240
3241 if (txn == NULL)
3242 txn = change->txn;
3243 Assert(txn != NULL);
3244
3245 /*
3246 * Update the total size in top level as well. This is later used to
3247 * compute the decoding stats.
3248 */
3249 toptxn = rbtxn_get_toptxn(txn);
3250
3251 if (addition)
3252 {
3253 Size oldsize = txn->size;
3254
3255 txn->size += sz;
3256 rb->size += sz;
3257
3258 /* Update the total size in the top transaction. */
3259 toptxn->total_size += sz;
3260
3261 /* Update the max-heap */
3262 if (oldsize != 0)
3264 pairingheap_add(rb->txn_heap, &txn->txn_node);
3265 }
3266 else
3267 {
3268 Assert((rb->size >= sz) && (txn->size >= sz));
3269 txn->size -= sz;
3270 rb->size -= sz;
3271
3272 /* Update the total size in the top transaction. */
3273 toptxn->total_size -= sz;
3274
3275 /* Update the max-heap */
3277 if (txn->size != 0)
3278 pairingheap_add(rb->txn_heap, &txn->txn_node);
3279 }
3280
3281 Assert(txn->size <= rb->size);
3282}
size_t Size
Definition: c.h:559
void pairingheap_remove(pairingheap *heap, pairingheap_node *node)
Definition: pairingheap.c:170
void pairingheap_add(pairingheap *heap, pairingheap_node *node)
Definition: pairingheap.c:112
pairingheap_node txn_node

References ReorderBufferChange::action, Assert, pairingheap_add(), pairingheap_remove(), rbtxn_get_toptxn, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferTXN::size, ReorderBuffer::size, ReorderBufferTXN::total_size, ReorderBufferChange::txn, ReorderBuffer::txn_heap, and ReorderBufferTXN::txn_node.

Referenced by ReorderBufferCleanupTXN(), ReorderBufferQueueChange(), ReorderBufferRestoreChange(), ReorderBufferReturnChange(), ReorderBufferSerializeTXN(), ReorderBufferToastReplace(), and ReorderBufferTruncateTXN().

◆ ReorderBufferChangeSize()

static Size ReorderBufferChangeSize ( ReorderBufferChange change)
static

Definition at line 4172 of file reorderbuffer.c.

4173{
4174 Size sz = sizeof(ReorderBufferChange);
4175
4176 switch (change->action)
4177 {
4178 /* fall through these, they're all similar enough */
4183 {
4184 HeapTuple oldtup,
4185 newtup;
4186 Size oldlen = 0;
4187 Size newlen = 0;
4188
4189 oldtup = change->data.tp.oldtuple;
4190 newtup = change->data.tp.newtuple;
4191
4192 if (oldtup)
4193 {
4194 sz += sizeof(HeapTupleData);
4195 oldlen = oldtup->t_len;
4196 sz += oldlen;
4197 }
4198
4199 if (newtup)
4200 {
4201 sz += sizeof(HeapTupleData);
4202 newlen = newtup->t_len;
4203 sz += newlen;
4204 }
4205
4206 break;
4207 }
4209 {
4210 Size prefix_size = strlen(change->data.msg.prefix) + 1;
4211
4212 sz += prefix_size + change->data.msg.message_size +
4213 sizeof(Size) + sizeof(Size);
4214
4215 break;
4216 }
4218 {
4219 sz += sizeof(SharedInvalidationMessage) *
4220 change->data.inval.ninvalidations;
4221 break;
4222 }
4224 {
4225 Snapshot snap;
4226
4227 snap = change->data.snapshot;
4228
4229 sz += sizeof(SnapshotData) +
4230 sizeof(TransactionId) * snap->xcnt +
4231 sizeof(TransactionId) * snap->subxcnt;
4232
4233 break;
4234 }
4236 {
4237 sz += sizeof(Oid) * change->data.truncate.nrelids;
4238
4239 break;
4240 }
4245 /* ReorderBufferChange contains everything important */
4246 break;
4247 }
4248
4249 return sz;
4250}
struct HeapTupleData HeapTupleData
unsigned int Oid
Definition: postgres_ext.h:31
struct ReorderBufferChange ReorderBufferChange
@ REORDER_BUFFER_CHANGE_MESSAGE
Definition: reorderbuffer.h:55
@ REORDER_BUFFER_CHANGE_TRUNCATE
Definition: reorderbuffer.h:63
@ REORDER_BUFFER_CHANGE_DELETE
Definition: reorderbuffer.h:54
struct SnapshotData SnapshotData
uint32 t_len
Definition: htup.h:64
struct ReorderBufferChange::@109::@111 truncate
struct ReorderBufferChange::@109::@110 tp
int32 subxcnt
Definition: snapshot.h:177
uint32 xcnt
Definition: snapshot.h:165

References ReorderBufferChange::action, ReorderBufferChange::data, ReorderBufferChange::inval, ReorderBufferChange::message_size, ReorderBufferChange::msg, ReorderBufferChange::newtuple, ReorderBufferChange::ninvalidations, ReorderBufferChange::nrelids, ReorderBufferChange::oldtuple, ReorderBufferChange::prefix, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChange::snapshot, SnapshotData::subxcnt, HeapTupleData::t_len, ReorderBufferChange::tp, ReorderBufferChange::truncate, and SnapshotData::xcnt.

Referenced by ReorderBufferCleanupTXN(), ReorderBufferQueueChange(), ReorderBufferRestoreChange(), ReorderBufferReturnChange(), ReorderBufferToastReplace(), and ReorderBufferTruncateTXN().

◆ ReorderBufferCheckMemoryLimit()

static void ReorderBufferCheckMemoryLimit ( ReorderBuffer rb)
static

Definition at line 3637 of file reorderbuffer.c.

3638{
3639 ReorderBufferTXN *txn;
3640
3641 /*
3642 * Bail out if debug_logical_replication_streaming is buffered and we
3643 * haven't exceeded the memory limit.
3644 */
3646 rb->size < logical_decoding_work_mem * 1024L)
3647 return;
3648
3649 /*
3650 * If debug_logical_replication_streaming is immediate, loop until there's
3651 * no change. Otherwise, loop until we reach under the memory limit. One
3652 * might think that just by evicting the largest (sub)transaction we will
3653 * come under the memory limit based on assumption that the selected
3654 * transaction is at least as large as the most recent change (which
3655 * caused us to go over the memory limit). However, that is not true
3656 * because a user can reduce the logical_decoding_work_mem to a smaller
3657 * value before the most recent change.
3658 */
3659 while (rb->size >= logical_decoding_work_mem * 1024L ||
3661 rb->size > 0))
3662 {
3663 /*
3664 * Pick the largest transaction and evict it from memory by streaming,
3665 * if possible. Otherwise, spill to disk.
3666 */
3668 (txn = ReorderBufferLargestStreamableTopTXN(rb)) != NULL)
3669 {
3670 /* we know there has to be one, because the size is not zero */
3671 Assert(txn && rbtxn_is_toptxn(txn));
3672 Assert(txn->total_size > 0);
3673 Assert(rb->size >= txn->total_size);
3674
3675 ReorderBufferStreamTXN(rb, txn);
3676 }
3677 else
3678 {
3679 /*
3680 * Pick the largest transaction (or subtransaction) and evict it
3681 * from memory by serializing it to disk.
3682 */
3683 txn = ReorderBufferLargestTXN(rb);
3684
3685 /* we know there has to be one, because the size is not zero */
3686 Assert(txn);
3687 Assert(txn->size > 0);
3688 Assert(rb->size >= txn->size);
3689
3691 }
3692
3693 /*
3694 * After eviction, the transaction should have no entries in memory,
3695 * and should use 0 bytes for changes.
3696 */
3697 Assert(txn->size == 0);
3698 Assert(txn->nentries_mem == 0);
3699 }
3700
3701 /* We must be under the memory limit now. */
3702 Assert(rb->size < logical_decoding_work_mem * 1024L);
3703
3704}
static ReorderBufferTXN * ReorderBufferLargestTXN(ReorderBuffer *rb)
static bool ReorderBufferCanStartStreaming(ReorderBuffer *rb)
int logical_decoding_work_mem
int debug_logical_replication_streaming
static ReorderBufferTXN * ReorderBufferLargestStreamableTopTXN(ReorderBuffer *rb)
static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
@ DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE
Definition: reorderbuffer.h:34
@ DEBUG_LOGICAL_REP_STREAMING_BUFFERED
Definition: reorderbuffer.h:33
#define rbtxn_is_toptxn(txn)

References Assert, DEBUG_LOGICAL_REP_STREAMING_BUFFERED, DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE, debug_logical_replication_streaming, logical_decoding_work_mem, ReorderBufferTXN::nentries_mem, rbtxn_is_toptxn, ReorderBufferCanStartStreaming(), ReorderBufferLargestStreamableTopTXN(), ReorderBufferLargestTXN(), ReorderBufferSerializeTXN(), ReorderBufferStreamTXN(), ReorderBufferTXN::size, ReorderBuffer::size, and ReorderBufferTXN::total_size.

Referenced by ReorderBufferQueueChange().

◆ ReorderBufferCleanupSerializedTXNs()

static void ReorderBufferCleanupSerializedTXNs ( const char *  slotname)
static

Definition at line 4598 of file reorderbuffer.c.

4599{
4600 DIR *spill_dir;
4601 struct dirent *spill_de;
4602 struct stat statbuf;
4603 char path[MAXPGPATH * 2 + sizeof(PG_REPLSLOT_DIR)];
4604
4605 sprintf(path, "%s/%s", PG_REPLSLOT_DIR, slotname);
4606
4607 /* we're only handling directories here, skip if it's not ours */
4608 if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
4609 return;
4610
4611 spill_dir = AllocateDir(path);
4612 while ((spill_de = ReadDirExtended(spill_dir, path, INFO)) != NULL)
4613 {
4614 /* only look at names that can be ours */
4615 if (strncmp(spill_de->d_name, "xid", 3) == 0)
4616 {
4617 snprintf(path, sizeof(path),
4618 "%s/%s/%s", PG_REPLSLOT_DIR, slotname,
4619 spill_de->d_name);
4620
4621 if (unlink(path) != 0)
4622 ereport(ERROR,
4624 errmsg("could not remove file \"%s\" during removal of %s/%s/xid*: %m",
4625 path, PG_REPLSLOT_DIR, slotname)));
4626 }
4627 }
4628 FreeDir(spill_dir);
4629}
#define INFO
Definition: elog.h:34
int FreeDir(DIR *dir)
Definition: fd.c:2983
struct dirent * ReadDirExtended(DIR *dir, const char *dirname, int elevel)
Definition: fd.c:2946
DIR * AllocateDir(const char *dirname)
Definition: fd.c:2865
#define snprintf
Definition: port.h:238
#define PG_REPLSLOT_DIR
Definition: slot.h:21
Definition: dirent.c:26
Definition: dirent.h:10
char d_name[MAX_PATH]
Definition: dirent.h:15
#define lstat(path, sb)
Definition: win32_port.h:283
#define S_ISDIR(m)
Definition: win32_port.h:323

References AllocateDir(), dirent::d_name, ereport, errcode_for_file_access(), errmsg(), ERROR, FreeDir(), INFO, lstat, MAXPGPATH, PG_REPLSLOT_DIR, ReadDirExtended(), S_ISDIR, snprintf, sprintf, and stat::st_mode.

Referenced by ReorderBufferAllocate(), ReorderBufferFree(), and StartupReorderBuffer().

◆ ReorderBufferCleanupTXN()

static void ReorderBufferCleanupTXN ( ReorderBuffer rb,
ReorderBufferTXN txn 
)
static

Definition at line 1513 of file reorderbuffer.c.

1514{
1515 bool found;
1516 dlist_mutable_iter iter;
1517 Size mem_freed = 0;
1518
1519 /* cleanup subtransactions & their changes */
1520 dlist_foreach_modify(iter, &txn->subtxns)
1521 {
1522 ReorderBufferTXN *subtxn;
1523
1524 subtxn = dlist_container(ReorderBufferTXN, node, iter.cur);
1525
1526 /*
1527 * Subtransactions are always associated to the toplevel TXN, even if
1528 * they originally were happening inside another subtxn, so we won't
1529 * ever recurse more than one level deep here.
1530 */
1532 Assert(subtxn->nsubtxns == 0);
1533
1534 ReorderBufferCleanupTXN(rb, subtxn);
1535 }
1536
1537 /* cleanup changes in the txn */
1538 dlist_foreach_modify(iter, &txn->changes)
1539 {
1540 ReorderBufferChange *change;
1541
1542 change = dlist_container(ReorderBufferChange, node, iter.cur);
1543
1544 /* Check we're not mixing changes from different transactions. */
1545 Assert(change->txn == txn);
1546
1547 /*
1548 * Instead of updating the memory counter for individual changes, we
1549 * sum up the size of memory to free so we can update the memory
1550 * counter all together below. This saves costs of maintaining the
1551 * max-heap.
1552 */
1553 mem_freed += ReorderBufferChangeSize(change);
1554
1555 ReorderBufferReturnChange(rb, change, false);
1556 }
1557
1558 /* Update the memory counter */
1559 ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, mem_freed);
1560
1561 /*
1562 * Cleanup the tuplecids we stored for decoding catalog snapshot access.
1563 * They are always stored in the toplevel transaction.
1564 */
1565 dlist_foreach_modify(iter, &txn->tuplecids)
1566 {
1567 ReorderBufferChange *change;
1568
1569 change = dlist_container(ReorderBufferChange, node, iter.cur);
1570
1571 /* Check we're not mixing changes from different transactions. */
1572 Assert(change->txn == txn);
1574
1575 ReorderBufferReturnChange(rb, change, true);
1576 }
1577
1578 /*
1579 * Cleanup the base snapshot, if set.
1580 */
1581 if (txn->base_snapshot != NULL)
1582 {
1585 }
1586
1587 /*
1588 * Cleanup the snapshot for the last streamed run.
1589 */
1590 if (txn->snapshot_now != NULL)
1591 {
1594 }
1595
1596 /*
1597 * Remove TXN from its containing lists.
1598 *
1599 * Note: if txn is known as subxact, we are deleting the TXN from its
1600 * parent's list of known subxacts; this leaves the parent's nsubxacts
1601 * count too high, but we don't care. Otherwise, we are deleting the TXN
1602 * from the LSN-ordered list of toplevel TXNs. We remove the TXN from the
1603 * list of catalog modifying transactions as well.
1604 */
1605 dlist_delete(&txn->node);
1608
1609 /* now remove reference from buffer */
1610 hash_search(rb->by_txn, &txn->xid, HASH_REMOVE, &found);
1611 Assert(found);
1612
1613 /* remove entries spilled to disk */
1614 if (rbtxn_is_serialized(txn))
1616
1617 /* deallocate */
1618 ReorderBufferReturnTXN(rb, txn);
1619}
@ HASH_REMOVE
Definition: hsearch.h:115
static void dclist_delete_from(dclist_head *head, dlist_node *node)
Definition: ilist.h:763
void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change, bool upd_mem)
static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
static Size ReorderBufferChangeSize(ReorderBufferChange *change)
static void ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, ReorderBufferChange *change, ReorderBufferTXN *txn, bool addition, Size sz)
#define rbtxn_is_serialized(txn)
void SnapBuildSnapDecRefcount(Snapshot snap)
Definition: snapbuild.c:328
Snapshot snapshot_now
dlist_node catchange_node
dlist_node base_snapshot_node

References ReorderBufferChange::action, Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::base_snapshot_node, ReorderBuffer::by_txn, ReorderBufferTXN::catchange_node, ReorderBuffer::catchange_txns, ReorderBufferTXN::changes, dlist_mutable_iter::cur, dclist_delete_from(), dlist_container, dlist_delete(), dlist_foreach_modify, HASH_REMOVE, hash_search(), ReorderBufferTXN::node, ReorderBufferTXN::nsubtxns, rbtxn_has_catalog_changes, rbtxn_is_known_subxact, rbtxn_is_serialized, rbtxn_is_streamed, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, ReorderBufferChangeMemoryUpdate(), ReorderBufferChangeSize(), ReorderBufferCleanupTXN(), ReorderBufferFreeSnap(), ReorderBufferRestoreCleanup(), ReorderBufferReturnChange(), ReorderBufferReturnTXN(), SnapBuildSnapDecRefcount(), ReorderBufferTXN::snapshot_now, ReorderBufferTXN::subtxns, ReorderBufferTXN::tuplecids, ReorderBufferChange::txn, and ReorderBufferTXN::xid.

Referenced by ReorderBufferAbort(), ReorderBufferAbortOld(), ReorderBufferCleanupTXN(), ReorderBufferFinishPrepared(), ReorderBufferForget(), ReorderBufferProcessTXN(), ReorderBufferReplay(), and ReorderBufferStreamCommit().

◆ ReorderBufferCommit()

void ReorderBufferCommit ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn,
TimestampTz  commit_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn 
)

Definition at line 2740 of file reorderbuffer.c.

2744{
2745 ReorderBufferTXN *txn;
2746
2747 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2748 false);
2749
2750 /* unknown transaction, nothing to replay */
2751 if (txn == NULL)
2752 return;
2753
2754 ReorderBufferReplay(txn, rb, xid, commit_lsn, end_lsn, commit_time,
2755 origin_id, origin_lsn);
2756}
static void ReorderBufferReplay(ReorderBufferTXN *txn, ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn)

References InvalidXLogRecPtr, ReorderBufferReplay(), and ReorderBufferTXNByXid().

Referenced by DecodeCommit().

◆ ReorderBufferCommitChild()

void ReorderBufferCommitChild ( ReorderBuffer rb,
TransactionId  xid,
TransactionId  subxid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn 
)

Definition at line 1197 of file reorderbuffer.c.

1200{
1201 ReorderBufferTXN *subtxn;
1202
1203 subtxn = ReorderBufferTXNByXid(rb, subxid, false, NULL,
1204 InvalidXLogRecPtr, false);
1205
1206 /*
1207 * No need to do anything if that subtxn didn't contain any changes
1208 */
1209 if (!subtxn)
1210 return;
1211
1212 subtxn->final_lsn = commit_lsn;
1213 subtxn->end_lsn = end_lsn;
1214
1215 /*
1216 * Assign this subxact as a child of the toplevel xact (no-op if already
1217 * done.)
1218 */
1220}
void ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid, TransactionId subxid, XLogRecPtr lsn)

References ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, InvalidXLogRecPtr, ReorderBufferAssignChild(), and ReorderBufferTXNByXid().

Referenced by DecodeCommit(), and DecodePrepare().

◆ ReorderBufferCopySnap()

static Snapshot ReorderBufferCopySnap ( ReorderBuffer rb,
Snapshot  orig_snap,
ReorderBufferTXN txn,
CommandId  cid 
)
static

Definition at line 1833 of file reorderbuffer.c.

1835{
1836 Snapshot snap;
1837 dlist_iter iter;
1838 int i = 0;
1839 Size size;
1840
1841 size = sizeof(SnapshotData) +
1842 sizeof(TransactionId) * orig_snap->xcnt +
1843 sizeof(TransactionId) * (txn->nsubtxns + 1);
1844
1845 snap = MemoryContextAllocZero(rb->context, size);
1846 memcpy(snap, orig_snap, sizeof(SnapshotData));
1847
1848 snap->copied = true;
1849 snap->active_count = 1; /* mark as active so nobody frees it */
1850 snap->regd_count = 0;
1851 snap->xip = (TransactionId *) (snap + 1);
1852
1853 memcpy(snap->xip, orig_snap->xip, sizeof(TransactionId) * snap->xcnt);
1854
1855 /*
1856 * snap->subxip contains all txids that belong to our transaction which we
1857 * need to check via cmin/cmax. That's why we store the toplevel
1858 * transaction in there as well.
1859 */
1860 snap->subxip = snap->xip + snap->xcnt;
1861 snap->subxip[i++] = txn->xid;
1862
1863 /*
1864 * txn->nsubtxns isn't decreased when subtransactions abort, so count
1865 * manually. Since it's an upper boundary it is safe to use it for the
1866 * allocation above.
1867 */
1868 snap->subxcnt = 1;
1869
1870 dlist_foreach(iter, &txn->subtxns)
1871 {
1872 ReorderBufferTXN *sub_txn;
1873
1874 sub_txn = dlist_container(ReorderBufferTXN, node, iter.cur);
1875 snap->subxip[i++] = sub_txn->xid;
1876 snap->subxcnt++;
1877 }
1878
1879 /* sort so we can bsearch() later */
1880 qsort(snap->subxip, snap->subxcnt, sizeof(TransactionId), xidComparator);
1881
1882 /* store the specified current CommandId */
1883 snap->curcid = cid;
1884
1885 return snap;
1886}
int i
Definition: isn.c:72
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:1215
#define qsort(a, b, c, d)
Definition: port.h:474
static pg_noinline void Size size
Definition: slab.c:607
bool copied
Definition: snapshot.h:181
uint32 regd_count
Definition: snapshot.h:201
uint32 active_count
Definition: snapshot.h:200
CommandId curcid
Definition: snapshot.h:183
TransactionId * subxip
Definition: snapshot.h:176
TransactionId * xip
Definition: snapshot.h:164
int xidComparator(const void *arg1, const void *arg2)
Definition: xid.c:152

References SnapshotData::active_count, ReorderBuffer::context, SnapshotData::copied, dlist_iter::cur, SnapshotData::curcid, dlist_container, dlist_foreach, i, MemoryContextAllocZero(), ReorderBufferTXN::nsubtxns, qsort, SnapshotData::regd_count, size, ReorderBufferTXN::subtxns, SnapshotData::subxcnt, SnapshotData::subxip, SnapshotData::xcnt, ReorderBufferTXN::xid, xidComparator(), and SnapshotData::xip.

Referenced by ReorderBufferProcessTXN(), ReorderBufferSaveTXNSnapshot(), and ReorderBufferStreamTXN().

◆ ReorderBufferExecuteInvalidations()

static void ReorderBufferExecuteInvalidations ( uint32  nmsgs,
SharedInvalidationMessage msgs 
)
static

Definition at line 3388 of file reorderbuffer.c.

3389{
3390 int i;
3391
3392 for (i = 0; i < nmsgs; i++)
3394}
void LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
Definition: inval.c:762

References i, and LocalExecuteInvalidationMessage().

Referenced by ReorderBufferFinishPrepared(), and ReorderBufferProcessTXN().

◆ ReorderBufferFinishPrepared()

void ReorderBufferFinishPrepared ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn,
XLogRecPtr  two_phase_at,
TimestampTz  commit_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn,
char *  gid,
bool  is_commit 
)

Definition at line 2846 of file reorderbuffer.c.

2851{
2852 ReorderBufferTXN *txn;
2853 XLogRecPtr prepare_end_lsn;
2854 TimestampTz prepare_time;
2855
2856 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, commit_lsn, false);
2857
2858 /* unknown transaction, nothing to do */
2859 if (txn == NULL)
2860 return;
2861
2862 /*
2863 * By this time the txn has the prepare record information, remember it to
2864 * be later used for rollback.
2865 */
2866 prepare_end_lsn = txn->end_lsn;
2867 prepare_time = txn->xact_time.prepare_time;
2868
2869 /* add the gid in the txn */
2870 txn->gid = pstrdup(gid);
2871
2872 /*
2873 * It is possible that this transaction is not decoded at prepare time
2874 * either because by that time we didn't have a consistent snapshot, or
2875 * two_phase was not enabled, or it was decoded earlier but we have
2876 * restarted. We only need to send the prepare if it was not decoded
2877 * earlier. We don't need to decode the xact for aborts if it is not done
2878 * already.
2879 */
2880 if ((txn->final_lsn < two_phase_at) && is_commit)
2881 {
2882 txn->txn_flags |= RBTXN_PREPARE;
2883
2884 /*
2885 * The prepare info must have been updated in txn even if we skip
2886 * prepare.
2887 */
2888 Assert(txn->final_lsn != InvalidXLogRecPtr);
2889
2890 /*
2891 * By this time the txn has the prepare record information and it is
2892 * important to use that so that downstream gets the accurate
2893 * information. If instead, we have passed commit information here
2894 * then downstream can behave as it has already replayed commit
2895 * prepared after the restart.
2896 */
2897 ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
2898 txn->xact_time.prepare_time, txn->origin_id, txn->origin_lsn);
2899 }
2900
2901 txn->final_lsn = commit_lsn;
2902 txn->end_lsn = end_lsn;
2903 txn->xact_time.commit_time = commit_time;
2904 txn->origin_id = origin_id;
2905 txn->origin_lsn = origin_lsn;
2906
2907 if (is_commit)
2908 rb->commit_prepared(rb, txn, commit_lsn);
2909 else
2910 rb->rollback_prepared(rb, txn, prepare_end_lsn, prepare_time);
2911
2912 /* cleanup: make sure there's no cache pollution */
2913 ReorderBufferExecuteInvalidations(txn->ninvalidations,
2914 txn->invalidations);
2915 ReorderBufferCleanupTXN(rb, txn);
2916}
int64 TimestampTz
Definition: timestamp.h:39
char * pstrdup(const char *in)
Definition: mcxt.c:1696
static void ReorderBufferExecuteInvalidations(uint32 nmsgs, SharedInvalidationMessage *msgs)
#define RBTXN_PREPARE
TimestampTz commit_time
RepOriginId origin_id
XLogRecPtr origin_lsn
TimestampTz prepare_time
ReorderBufferCommitPreparedCB commit_prepared
ReorderBufferRollbackPreparedCB rollback_prepared

References Assert, ReorderBuffer::commit_prepared, ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, ReorderBufferTXN::gid, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, ReorderBufferTXN::prepare_time, pstrdup(), RBTXN_PREPARE, ReorderBufferCleanupTXN(), ReorderBufferExecuteInvalidations(), ReorderBufferReplay(), ReorderBufferTXNByXid(), ReorderBuffer::rollback_prepared, ReorderBufferTXN::txn_flags, and ReorderBufferTXN::xact_time.

Referenced by DecodeAbort(), and DecodeCommit().

◆ ReorderBufferForget()

void ReorderBufferForget ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 3023 of file reorderbuffer.c.

3024{
3025 ReorderBufferTXN *txn;
3026
3027 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
3028 false);
3029
3030 /* unknown, nothing to forget */
3031 if (txn == NULL)
3032 return;
3033
3034 /* this transaction mustn't be streamed */
3035 Assert(!rbtxn_is_streamed(txn));
3036
3037 /* cosmetic... */
3038 txn->final_lsn = lsn;
3039
3040 /*
3041 * Process cache invalidation messages if there are any. Even if we're not
3042 * interested in the transaction's contents, it could have manipulated the
3043 * catalog and we need to update the caches according to that.
3044 */
3045 if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
3046 ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
3047 txn->invalidations);
3048 else
3049 Assert(txn->ninvalidations == 0);
3050
3051 /* remove potential on-disk data, and deallocate */
3052 ReorderBufferCleanupTXN(rb, txn);
3053}

References Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::final_lsn, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, rbtxn_is_streamed, ReorderBufferCleanupTXN(), ReorderBufferImmediateInvalidation(), and ReorderBufferTXNByXid().

Referenced by DecodeCommit().

◆ ReorderBufferFree()

void ReorderBufferFree ( ReorderBuffer rb)

Definition at line 401 of file reorderbuffer.c.

402{
403 MemoryContext context = rb->context;
404
405 /*
406 * We free separately allocated data by entirely scrapping reorderbuffer's
407 * memory context.
408 */
409 MemoryContextDelete(context);
410
411 /* Free disk space used by unconsumed reorder buffers */
412 ReorderBufferCleanupSerializedTXNs(NameStr(MyReplicationSlot->data.name));
413}
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:454

References ReorderBuffer::context, ReplicationSlot::data, MemoryContextDelete(), MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, and ReorderBufferCleanupSerializedTXNs().

Referenced by FreeDecodingContext().

◆ ReorderBufferFreeSnap()

static void ReorderBufferFreeSnap ( ReorderBuffer rb,
Snapshot  snap 
)
static

Definition at line 1892 of file reorderbuffer.c.

1893{
1894 if (snap->copied)
1895 pfree(snap);
1896 else
1897 SnapBuildSnapDecRefcount(snap);
1898}
void pfree(void *pointer)
Definition: mcxt.c:1521

References SnapshotData::copied, pfree(), and SnapBuildSnapDecRefcount().

Referenced by ReorderBufferCleanupTXN(), ReorderBufferProcessTXN(), ReorderBufferReturnChange(), and ReorderBufferStreamTXN().

◆ ReorderBufferGetCatalogChangesXacts()

TransactionId * ReorderBufferGetCatalogChangesXacts ( ReorderBuffer rb)

Definition at line 3438 of file reorderbuffer.c.

3439{
3440 dlist_iter iter;
3441 TransactionId *xids = NULL;
3442 size_t xcnt = 0;
3443
3444 /* Quick return if the list is empty */
3445 if (dclist_count(&rb->catchange_txns) == 0)
3446 return NULL;
3447
3448 /* Initialize XID array */
3449 xids = (TransactionId *) palloc(sizeof(TransactionId) *
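3450 dclist_count(&rb->catchange_txns));
3451 dclist_foreach(iter, &rb->catchange_txns)
3452 {
3453 ReorderBufferTXN *txn = dclist_container(ReorderBufferTXN,
3454 catchange_node,
3455 iter.cur);
3456
3457 Assert(rbtxn_has_catalog_changes(txn));
3458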
3459 xids[xcnt++] = txn->xid;
3460 }
3461
3462 qsort(xids, xcnt, sizeof(TransactionId), xidComparator);
3463
3464 Assert(xcnt == dclist_count(&rb->catchange_txns));
3465 return xids;
3466}
#define dclist_container(type, membername, ptr)
Definition: ilist.h:947
static uint32 dclist_count(const dclist_head *head)
Definition: ilist.h:932
#define dclist_foreach(iter, lhead)
Definition: ilist.h:970

References Assert, ReorderBuffer::catchange_txns, dlist_iter::cur, dclist_container, dclist_count(), dclist_foreach, palloc(), qsort, rbtxn_has_catalog_changes, ReorderBufferTXN::xid, and xidComparator().

Referenced by SnapBuildSerialize().

◆ ReorderBufferGetChange()

◆ ReorderBufferGetOldestTXN()

ReorderBufferTXN * ReorderBufferGetOldestTXN ( ReorderBuffer rb)

Definition at line 1022 of file reorderbuffer.c.

1023{
1024 ReorderBufferTXN *txn;
1025
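1026 AssertTXNLsnOrder(rb);
1027
1028 if (dlist_is_empty(&rb->toplevel_by_lsn))
1029 return NULL;
1030
1031 txn = dlist_head_element(ReorderBufferTXN, node, &rb->toplevel_by_lsn);
1032
1033 Assert(!rbtxn_is_known_subxact(txn));
1034 Assert(txn->first_lsn != InvalidXLogRecPtr);
1035 return txn;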
1036}
#define dlist_head_element(type, membername, lhead)
Definition: ilist.h:603

References Assert, AssertTXNLsnOrder(), dlist_head_element, dlist_is_empty(), ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, rbtxn_is_known_subxact, and ReorderBuffer::toplevel_by_lsn.

Referenced by SnapBuildProcessRunningXacts().

◆ ReorderBufferGetOldestXmin()

TransactionId ReorderBufferGetOldestXmin ( ReorderBuffer rb)

◆ ReorderBufferGetRelids()

Oid * ReorderBufferGetRelids ( ReorderBuffer rb,
int  nrelids 
)

Definition at line 603 of file reorderbuffer.c.

604{
605 Oid *relids;
606 Size alloc_len;
607
608 alloc_len = sizeof(Oid) * nrelids;
609
610 relids = (Oid *) MemoryContextAlloc(rb->context, alloc_len);
611
612 return relids;
613}

References ReorderBuffer::context, and MemoryContextAlloc().

Referenced by DecodeTruncate(), and ReorderBufferRestoreChange().

◆ ReorderBufferGetTupleBuf()

HeapTuple ReorderBufferGetTupleBuf ( ReorderBuffer rb,
Size  tuple_len 
)

Definition at line 570 of file reorderbuffer.c.

571{
572 HeapTuple tuple;
573 Size alloc_len;
574
575 alloc_len = tuple_len + SizeofHeapTupleHeader;
576
577 tuple = (HeapTuple) MemoryContextAlloc(rb->tup_context,
578 HEAPTUPLESIZE + alloc_len);
579 tuple->t_data = (HeapTupleHeader) ((char *) tuple + HEAPTUPLESIZE);
580
581 return tuple;
582}
#define HEAPTUPLESIZE
Definition: htup.h:73
HeapTupleData * HeapTuple
Definition: htup.h:71
HeapTupleHeaderData * HeapTupleHeader
Definition: htup.h:23
#define SizeofHeapTupleHeader
Definition: htup_details.h:185
HeapTupleHeader t_data
Definition: htup.h:68

References HEAPTUPLESIZE, MemoryContextAlloc(), SizeofHeapTupleHeader, HeapTupleData::t_data, and ReorderBuffer::tup_context.

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeUpdate(), and ReorderBufferRestoreChange().

◆ ReorderBufferGetTXN()

static ReorderBufferTXN * ReorderBufferGetTXN ( ReorderBuffer rb)
static

Definition at line 419 of file reorderbuffer.c.

420{
421 ReorderBufferTXN *txn;
422
423 txn = (ReorderBufferTXN *)
424 MemoryContextAlloc(rb->txn_context, sizeof(ReorderBufferTXN));
425
426 memset(txn, 0, sizeof(ReorderBufferTXN));
427
428 dlist_init(&txn->changes);
429 dlist_init(&txn->tuplecids);
430 dlist_init(&txn->subtxns);
431
432 /* InvalidCommandId is not zero, so set it explicitly */
433 txn->command_id = InvalidCommandId;
434 txn->output_plugin_private = NULL;
435
436 return txn;
437}
CommandId command_id
void * output_plugin_private

References ReorderBufferTXN::changes, ReorderBufferTXN::command_id, dlist_init(), InvalidCommandId, MemoryContextAlloc(), ReorderBufferTXN::output_plugin_private, ReorderBufferTXN::subtxns, ReorderBufferTXN::tuplecids, and ReorderBuffer::txn_context.

Referenced by ReorderBufferTXNByXid().

◆ ReorderBufferImmediateInvalidation()

void ReorderBufferImmediateInvalidation ( ReorderBuffer rb,
uint32  ninvalidations,
SharedInvalidationMessage invalidations 
)

Definition at line 3095 of file reorderbuffer.c.

3097{
3098 bool use_subtxn = IsTransactionOrTransactionBlock();
3099 int i;
3100
3101 if (use_subtxn)
3102 BeginInternalSubTransaction("replay");
3103
3104 /*
3105 * Force invalidations to happen outside of a valid transaction - that way
3106 * entries will just be marked as invalid without accessing the catalog.
3107 * That's advantageous because we don't need to setup the full state
3108 * necessary for catalog access.
3109 */
3110 if (use_subtxn)
3111 AbortCurrentTransaction();
3112
3113 for (i = 0; i < ninvalidations; i++)
3114 LocalExecuteInvalidationMessage(&invalidations[i]);
3115
3116 if (use_subtxn)
3117 RollbackAndReleaseCurrentSubTransaction();
3118}
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4981
void BeginInternalSubTransaction(const char *name)
Definition: xact.c:4686
void RollbackAndReleaseCurrentSubTransaction(void)
Definition: xact.c:4788
void AbortCurrentTransaction(void)
Definition: xact.c:3443

References AbortCurrentTransaction(), BeginInternalSubTransaction(), i, IsTransactionOrTransactionBlock(), LocalExecuteInvalidationMessage(), and RollbackAndReleaseCurrentSubTransaction().

Referenced by ReorderBufferAbort(), ReorderBufferForget(), ReorderBufferInvalidate(), and xact_decode().

◆ ReorderBufferInvalidate()

void ReorderBufferInvalidate ( ReorderBuffer rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 3064 of file reorderbuffer.c.

3065{
3066 ReorderBufferTXN *txn;
3067
3068 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
3069 false);
3070
3071 /* unknown, nothing to do */
3072 if (txn == NULL)
3073 return;
3074
3075 /*
3076 * Process cache invalidation messages if there are any. Even if we're not
3077 * interested in the transaction's contents, it could have manipulated the
3078 * catalog and we need to update the caches according to that.
3079 */
3080 if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
3081 ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
3082 txn->invalidations);
3083 else
3084 Assert(txn->ninvalidations == 0);
3085}

References Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::invalidations, InvalidXLogRecPtr, ReorderBufferTXN::ninvalidations, ReorderBufferImmediateInvalidation(), and ReorderBufferTXNByXid().

Referenced by DecodePrepare().

◆ ReorderBufferIterCompare()

static int ReorderBufferIterCompare ( Datum  a,
Datum  b,
void *  arg 
)
static

Definition at line 1239 of file reorderbuffer.c.

1240{
1241 ReorderBufferIterTXNState *state = (ReorderBufferIterTXNState *) arg;
1242 XLogRecPtr pos_a = state->entries[DatumGetInt32(a)].lsn;
1243 XLogRecPtr pos_b = state->entries[DatumGetInt32(b)].lsn;
1244
1245 if (pos_a < pos_b)
1246 return 1;
1247 else if (pos_a == pos_b)
1248 return 0;
1249 return -1;
1250}
void * arg
static int32 DatumGetInt32(Datum X)
Definition: postgres.h:202
Definition: regguts.h:323

References a, arg, b, and DatumGetInt32().

Referenced by ReorderBufferIterTXNInit().

◆ ReorderBufferIterTXNFinish()

static void ReorderBufferIterTXNFinish ( ReorderBuffer rb,
ReorderBufferIterTXNState state 
)
static

Definition at line 1482 of file reorderbuffer.c.

1484{
1485 int32 off;
1486
1487 for (off = 0; off < state->nr_txns; off++)
1488 {
1489 if (state->entries[off].file.vfd != -1)
1490 FileClose(state->entries[off].file.vfd);
1491 }
1492
1493 /* free memory we might have "leaked" in the last *Next call */
1494 if (!dlist_is_empty(&state->old_change))
1495 {
1496 ReorderBufferChange *change;
1497
1498 change = dlist_container(ReorderBufferChange, node,
1499 dlist_pop_head_node(&state->old_change));
1500 ReorderBufferReturnChange(rb, change, true);
1501 Assert(dlist_is_empty(&state->old_change));
1502 }
1503
1504 binaryheap_free(state->heap);
1505 pfree(state);
1506}
void binaryheap_free(binaryheap *heap)
Definition: binaryheap.c:75
void FileClose(File file)
Definition: fd.c:1977
static dlist_node * dlist_pop_head_node(dlist_head *head)
Definition: ilist.h:450

References Assert, binaryheap_free(), dlist_container, dlist_is_empty(), dlist_pop_head_node(), FileClose(), pfree(), and ReorderBufferReturnChange().

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferIterTXNInit()

static void ReorderBufferIterTXNInit ( ReorderBuffer rb,
ReorderBufferTXN txn,
ReorderBufferIterTXNState *volatile *  iter_state 
)
static

Definition at line 1262 of file reorderbuffer.c.

1264{
1265 Size nr_txns = 0;
1266 ReorderBufferIterTXNState *state;
1267 dlist_iter cur_txn_i;
1268 int32 off;
1269
1270 *iter_state = NULL;
1271
1272 /* Check ordering of changes in the toplevel transaction. */
1273 AssertChangeLsnOrder(txn);
1274
1275 /*
1276 * Calculate the size of our heap: one element for every transaction that
1277 * contains changes. (Besides the transactions already in the reorder
1278 * buffer, we count the one we were directly passed.)
1279 */
1280 if (txn->nentries > 0)
1281 nr_txns++;
1282
1283 dlist_foreach(cur_txn_i, &txn->subtxns)
1284 {
1285 ReorderBufferTXN *cur_txn;
1286
1287 cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
1288
1289 /* Check ordering of changes in this subtransaction. */
1290 AssertChangeLsnOrder(cur_txn);
1291
1292 if (cur_txn->nentries > 0)
1293 nr_txns++;
1294 }
1295
1296 /* allocate iteration state */
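1297 state = (ReorderBufferIterTXNState *)
1298 MemoryContextAllocZero(rb->context,
1299 sizeof(ReorderBufferIterTXNState) +
1300 sizeof(ReorderBufferIterTXNEntry) * nr_txns);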
1301
1302 state->nr_txns = nr_txns;
1303 dlist_init(&state->old_change);
1304
1305 for (off = 0; off < state->nr_txns; off++)
1306 {
1307 state->entries[off].file.vfd = -1;
1308 state->entries[off].segno = 0;
1309 }
1310
1311 /* allocate heap */
1312 state->heap = binaryheap_allocate(state->nr_txns,
1313 ReorderBufferIterCompare,
1314 state);
1315
1316 /* Now that the state fields are initialized, it is safe to return it. */
1317 *iter_state = state;
1318
1319 /*
1320 * Now insert items into the binary heap, in an unordered fashion. (We
1321 * will run a heap assembly step at the end; this is more efficient.)
1322 */
1323
1324 off = 0;
1325
1326 /* add toplevel transaction if it contains changes */
1327 if (txn->nentries > 0)
1328 {
1329 ReorderBufferChange *cur_change;
1330
1331 if (rbtxn_is_serialized(txn))
1332 {
1333 /* serialize remaining changes */
1334 ReorderBufferSerializeTXN(rb, txn);
1335 ReorderBufferRestoreChanges(rb, txn, &state->entries[off].file,
1336 &state->entries[off].segno);
1337 }
1338
1339 cur_change = dlist_head_element(ReorderBufferChange, node,
1340 &txn->changes);
1341
1342 state->entries[off].lsn = cur_change->lsn;
1343 state->entries[off].change = cur_change;
1344 state->entries[off].txn = txn;
1345
1346 binaryheap_add_unordered(state->heap, Int32GetDatum(off++));
1347 }
1348
1349 /* add subtransactions if they contain changes */
1350 dlist_foreach(cur_txn_i, &txn->subtxns)
1351 {
1352 ReorderBufferTXN *cur_txn;
1353
1354 cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
1355
1356 if (cur_txn->nentries > 0)
1357 {
1358 ReorderBufferChange *cur_change;
1359
1360 if (rbtxn_is_serialized(cur_txn))
1361 {
1362 /* serialize remaining changes */
1363 ReorderBufferSerializeTXN(rb, cur_txn);
1364 ReorderBufferRestoreChanges(rb, cur_txn,
1365 &state->entries[off].file,
1366 &state->entries[off].segno);
1367 }
1368 cur_change = dlist_head_element(ReorderBufferChange, node,
1369 &cur_txn->changes);
1370
1371 state->entries[off].lsn = cur_change->lsn;
1372 state->entries[off].change = cur_change;
1373 state->entries[off].txn = cur_txn;
1374
1375 binaryheap_add_unordered(state->heap, Int32GetDatum(off++));
1376 }
1377 }
1378
1379 /* assemble a valid binary heap */
1380 binaryheap_build(state->heap);
1381}
void binaryheap_build(binaryheap *heap)
Definition: binaryheap.c:138
void binaryheap_add_unordered(binaryheap *heap, bh_node_type d)
Definition: binaryheap.c:116
binaryheap * binaryheap_allocate(int capacity, binaryheap_comparator compare, void *arg)
Definition: binaryheap.c:39
static Datum Int32GetDatum(int32 X)
Definition: postgres.h:212
static int ReorderBufferIterCompare(Datum a, Datum b, void *arg)
static void AssertChangeLsnOrder(ReorderBufferTXN *txn)
struct ReorderBufferIterTXNEntry ReorderBufferIterTXNEntry
static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, TXNEntryFile *file, XLogSegNo *segno)

References AssertChangeLsnOrder(), binaryheap_add_unordered(), binaryheap_allocate(), binaryheap_build(), ReorderBufferTXN::changes, ReorderBuffer::context, dlist_iter::cur, dlist_container, dlist_foreach, dlist_head_element, dlist_init(), Int32GetDatum(), ReorderBufferChange::lsn, MemoryContextAllocZero(), ReorderBufferTXN::nentries, rbtxn_is_serialized, ReorderBufferIterCompare(), ReorderBufferRestoreChanges(), ReorderBufferSerializeTXN(), and ReorderBufferTXN::subtxns.

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferIterTXNNext()

static ReorderBufferChange * ReorderBufferIterTXNNext ( ReorderBuffer rb,
ReorderBufferIterTXNState state 
)
static

Definition at line 1390 of file reorderbuffer.c.

1391{
1392 ReorderBufferChange *change;
1393 ReorderBufferIterTXNEntry *entry;
1394 int32 off;
1395
1396 /* nothing there anymore */
1397 if (state->heap->bh_size == 0)
1398 return NULL;
1399
1400 off = DatumGetInt32(binaryheap_first(state->heap));
1401 entry = &state->entries[off];
1402
1403 /* free memory we might have "leaked" in the previous *Next call */
1404 if (!dlist_is_empty(&state->old_change))
1405 {
1406 change = dlist_container(ReorderBufferChange, node,
1407 dlist_pop_head_node(&state->old_change));
1408 ReorderBufferReturnChange(rb, change, true);
1409 Assert(dlist_is_empty(&state->old_change));
1410 }
1411
1412 change = entry->change;
1413
1414 /*
1415 * update heap with information about which transaction has the next
1416 * relevant change in LSN order
1417 */
1418
1419 /* there are in-memory changes */
1420 if (dlist_has_next(&entry->txn->changes, &entry->change->node))
1421 {
1422 dlist_node *next = dlist_next_node(&entry->txn->changes, &change->node);
1423 ReorderBufferChange *next_change =
1424 dlist_container(ReorderBufferChange, node, next);
1425
1426 /* txn stays the same */
1427 state->entries[off].lsn = next_change->lsn;
1428 state->entries[off].change = next_change;
1429
1430 binaryheap_replace_first(state->heap, Int32GetDatum(off));
1431 return change;
1432 }
1433
1434 /* try to load changes from disk */
1435 if (entry->txn->nentries != entry->txn->nentries_mem)
1436 {
1437 /*
1438 * Ugly: restoring changes will reuse *Change records, thus delete the
1439 * current one from the per-tx list and only free in the next call.
1440 */
1441 dlist_delete(&change->node);
1442 dlist_push_tail(&state->old_change, &change->node);
1443
1444 /*
1445 * Update the total bytes processed by the txn for which we are
1446 * releasing the current set of changes and restoring the new set of
1447 * changes.
1448 */
1449 rb->totalBytes += entry->txn->size;
1450 if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->file,
1451 &state->entries[off].segno))
1452 {
1453 /* successfully restored changes from disk */
1454 ReorderBufferChange *next_change =
1455 dlist_head_element(ReorderBufferChange, node,
1456 &entry->txn->changes);
1457
1458 elog(DEBUG2, "restored %u/%u changes from disk",
1459 (uint32) entry->txn->nentries_mem,
1460 (uint32) entry->txn->nentries);
1461
1462 Assert(entry->txn->nentries_mem);
1463 /* txn stays the same */
1464 state->entries[off].lsn = next_change->lsn;
1465 state->entries[off].change = next_change;
1466 binaryheap_replace_first(state->heap, Int32GetDatum(off));
1467
1468 return change;
1469 }
1470 }
1471
1472 /* ok, no changes there anymore, remove */
1473 binaryheap_remove_first(state->heap);
1474
1475 return change;
1476}
void binaryheap_replace_first(binaryheap *heap, bh_node_type d)
Definition: binaryheap.c:255
bh_node_type binaryheap_first(binaryheap *heap)
Definition: binaryheap.c:177
bh_node_type binaryheap_remove_first(binaryheap *heap)
Definition: binaryheap.c:192
static int32 next
Definition: blutils.c:219
uint32_t uint32
Definition: c.h:485
static bool dlist_has_next(const dlist_head *head, const dlist_node *node)
Definition: ilist.h:503
static dlist_node * dlist_next_node(dlist_head *head, dlist_node *node)
Definition: ilist.h:537
ReorderBufferChange * change
ReorderBufferTXN * txn

References Assert, binaryheap_first(), binaryheap_remove_first(), binaryheap_replace_first(), ReorderBufferIterTXNEntry::change, ReorderBufferTXN::changes, DatumGetInt32(), DEBUG2, dlist_container, dlist_delete(), dlist_has_next(), dlist_head_element, dlist_is_empty(), dlist_next_node(), dlist_pop_head_node(), dlist_push_tail(), elog, ReorderBufferIterTXNEntry::file, Int32GetDatum(), ReorderBufferChange::lsn, ReorderBufferTXN::nentries, ReorderBufferTXN::nentries_mem, next, ReorderBufferChange::node, ReorderBufferRestoreChanges(), ReorderBufferReturnChange(), ReorderBufferTXN::size, ReorderBuffer::totalBytes, and ReorderBufferIterTXNEntry::txn.

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferLargestStreamableTopTXN()

static ReorderBufferTXN * ReorderBufferLargestStreamableTopTXN ( ReorderBuffer rb)
static

Definition at line 3593 of file reorderbuffer.c.

3594{
3595 dlist_iter iter;
3596 Size largest_size = 0;
3597 ReorderBufferTXN *largest = NULL;
3598
3599 /* Find the largest top-level transaction having a base snapshot. */
3600 dlist_foreach(iter, &rb->txns_by_base_snapshot_lsn)
3601 {
3602 ReorderBufferTXN *txn;
3603
3604 txn = dlist_container(ReorderBufferTXN, base_snapshot_node, iter.cur);
3605
3606 /* must not be a subtxn */
3607 Assert(!rbtxn_is_known_subxact(txn));
3608 /* base_snapshot must be set */
3609 Assert(txn->base_snapshot != NULL);
3610
3611 if ((largest == NULL || txn->total_size > largest_size) &&
3612 (txn->total_size > 0) && !(rbtxn_has_partial_change(txn)) &&
3613 rbtxn_has_streamable_change(txn))
3614 {
3615 largest = txn;
3616 largest_size = txn->total_size;
3617 }
3618 }
3619
3620 return largest;
3621}
#define rbtxn_has_streamable_change(txn)
#define rbtxn_has_partial_change(txn)

References Assert, ReorderBufferTXN::base_snapshot, dlist_iter::cur, dlist_container, dlist_foreach, rbtxn_has_partial_change, rbtxn_has_streamable_change, rbtxn_is_known_subxact, ReorderBufferTXN::total_size, and ReorderBuffer::txns_by_base_snapshot_lsn.

Referenced by ReorderBufferCheckMemoryLimit().

◆ ReorderBufferLargestTXN()

static ReorderBufferTXN * ReorderBufferLargestTXN ( ReorderBuffer rb)
static

Definition at line 3553 of file reorderbuffer.c.

3554{
3555 ReorderBufferTXN *largest;
3556
3557 /* Get the largest transaction from the max-heap */
3558 largest = pairingheap_container(ReorderBufferTXN, txn_node,
3559 pairingheap_first(rb->txn_heap));
3560
3561 Assert(largest);
3562 Assert(largest->size > 0);
3563 Assert(largest->size <= rb->size);
3564
3565 return largest;
3566}
pairingheap_node * pairingheap_first(pairingheap *heap)
Definition: pairingheap.c:130
#define pairingheap_container(type, membername, ptr)
Definition: pairingheap.h:43

References Assert, pairingheap_container, pairingheap_first(), ReorderBufferTXN::size, ReorderBuffer::size, and ReorderBuffer::txn_heap.

Referenced by ReorderBufferCheckMemoryLimit().

◆ ReorderBufferPrepare()

void ReorderBufferPrepare ( ReorderBuffer rb,
TransactionId  xid,
char *  gid 
)

Definition at line 2809 of file reorderbuffer.c.

2811{
2812 ReorderBufferTXN *txn;
2813
2814 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2815 false);
2816
2817 /* unknown transaction, nothing to replay */
2818 if (txn == NULL)
2819 return;
2820
2821 txn->txn_flags |= RBTXN_PREPARE;
2822 txn->gid = pstrdup(gid);
2823
2824 /* The prepare info must have been updated in txn by now. */
2825 Assert(txn->final_lsn != InvalidXLogRecPtr);
2826
2827 ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
2828 txn->xact_time.prepare_time, txn->origin_id, txn->origin_lsn);
2829
2830 /*
2831 * We send the prepare for the concurrently aborted xacts so that later
2832 * when rollback prepared is decoded and sent, the downstream should be
2833 * able to rollback such a xact. See comments atop DecodePrepare.
2834 *
2835 * Note, for the concurrent_abort + streaming case a stream_prepare was
2836 * already sent within the ReorderBufferReplay call above.
2837 */
2838 if (txn->concurrent_abort && !rbtxn_is_streamed(txn))
2839 rb->prepare(rb, txn, txn->final_lsn);
2840}
ReorderBufferPrepareCB prepare

References Assert, ReorderBufferTXN::concurrent_abort, ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, ReorderBufferTXN::gid, InvalidXLogRecPtr, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, ReorderBuffer::prepare, ReorderBufferTXN::prepare_time, pstrdup(), rbtxn_is_streamed, RBTXN_PREPARE, ReorderBufferReplay(), ReorderBufferTXNByXid(), ReorderBufferTXN::txn_flags, and ReorderBufferTXN::xact_time.

Referenced by DecodePrepare().

◆ ReorderBufferProcessPartialChange()

static void ReorderBufferProcessPartialChange ( ReorderBuffer rb,
ReorderBufferTXN txn,
ReorderBufferChange change,
bool  toast_insert 
)
static

Definition at line 719 of file reorderbuffer.c.

722{
723 ReorderBufferTXN *toptxn;
724
725 /*
726 * The partial changes need to be processed only while streaming
727 * in-progress transactions.
728 */
729 if (!ReorderBufferCanStream(rb))
730 return;
731
732 /* Get the top transaction. */
733 toptxn = rbtxn_get_toptxn(txn);
734
735 /*
736 * Indicate a partial change for toast inserts. The change will be
737 * considered as complete once we get the insert or update on the main
738 * table and we are sure that the pending toast chunks are not required
739 * anymore.
740 *
741 * If we allow streaming when there are pending toast chunks then such
742 * chunks won't be released till the insert (multi_insert) is complete and
743 * we expect the txn to have streamed all changes after streaming. This
744 * restriction is mainly to ensure the correctness of streamed
745 * transactions and it doesn't seem worth uplifting such a restriction
746 * just to allow this case because anyway we will stream the transaction
747 * once such an insert is complete.
748 */
749 if (toast_insert)
750 toptxn->txn_flags |= RBTXN_HAS_PARTIAL_CHANGE;
751 else if (rbtxn_has_partial_change(toptxn) &&
752 IsInsertOrUpdate(change->action) &&
753 change->data.tp.clear_toast_afterwards)
754 toptxn->txn_flags &= ~RBTXN_HAS_PARTIAL_CHANGE;
755
756 /*
757 * Indicate a partial change for speculative inserts. The change will be
758 * considered as complete once we get the speculative confirm or abort
759 * token.
760 */
761 if (IsSpecInsert(change->action))
762 toptxn->txn_flags |= RBTXN_HAS_PARTIAL_CHANGE;
763 else if (rbtxn_has_partial_change(toptxn) &&
764 IsSpecConfirmOrAbort(change->action))
765 toptxn->txn_flags &= ~RBTXN_HAS_PARTIAL_CHANGE;
766
767 /*
768 * Stream the transaction if it is serialized before and the changes are
769 * now complete in the top-level transaction.
770 *
771 * The reason for doing the streaming of such a transaction as soon as we
772 * get the complete change for it is that previously it would have reached
773 * the memory threshold and wouldn't get streamed because of incomplete
774 * changes. Delaying such transactions would increase apply lag for them.
775 */
776 if (ReorderBufferCanStartStreaming(rb) &&
777 !(rbtxn_has_partial_change(toptxn)) &&
778 rbtxn_is_serialized(txn) &&
779 rbtxn_has_streamable_change(toptxn))
780 ReorderBufferStreamTXN(rb, toptxn);
781}
#define IsSpecInsert(action)
#define IsInsertOrUpdate(action)
#define IsSpecConfirmOrAbort(action)
#define RBTXN_HAS_PARTIAL_CHANGE

References ReorderBufferChange::action, ReorderBufferChange::clear_toast_afterwards, ReorderBufferChange::data, IsInsertOrUpdate, IsSpecConfirmOrAbort, IsSpecInsert, rbtxn_get_toptxn, RBTXN_HAS_PARTIAL_CHANGE, rbtxn_has_partial_change, rbtxn_has_streamable_change, rbtxn_is_serialized, ReorderBufferCanStartStreaming(), ReorderBufferCanStream(), ReorderBufferStreamTXN(), ReorderBufferChange::tp, and ReorderBufferTXN::txn_flags.

Referenced by ReorderBufferQueueChange().

◆ ReorderBufferProcessTXN()

static void ReorderBufferProcessTXN ( ReorderBuffer *  rb,
ReorderBufferTXN *  txn,
XLogRecPtr  commit_lsn,
volatile Snapshot  snapshot_now,
volatile CommandId  command_id,
bool  streaming 
)
static

Definition at line 2109 of file reorderbuffer.c.

/*
 * ReorderBufferProcessTXN -- replay the changes of txn to the output plugin.
 *
 * Iterates over all changes of txn in LSN order (changes merged from its
 * subtransactions, see the ordering Assert below) and hands them to the
 * output plugin callbacks.  This runs inside an internal transaction or
 * subtransaction so that syscache/catalog access works during decoding.
 *
 *   rb           - reorder buffer whose callbacks (begin, begin_prepare,
 *                  stream_start, stream_stop, prepare, commit,
 *                  update_progress_txn, ...) are invoked
 *   txn          - transaction to replay
 *   commit_lsn   - passed to the prepare/commit callback (non-streaming only)
 *   snapshot_now - initial historic snapshot; replaced when snapshot and
 *                  command-id changes are encountered in the change stream
 *   command_id   - initial command id for the historic snapshot
 *   streaming    - true when sending one stream chunk: begin/commit are
 *                  replaced by stream_start/stream_stop, and the final
 *                  snapshot and command id are saved for the next run
 *
 * On success the txn is truncated (streaming or prepared) or fully cleaned
 * up (committed).  If an error with ERRCODE_TRANSACTION_ROLLBACK is caught
 * while a stream was started or the txn is prepared, it is treated as a
 * concurrent abort: curtxn->concurrent_abort is set, the txn is reset via
 * ReorderBufferResetTXN and the function returns normally.  Any other error
 * cleans up the txn and is re-thrown.
 *
 * The locals declared "volatile" below are modified inside PG_TRY and read
 * in PG_CATCH; presumably they are volatile because PG_TRY/PG_CATCH is built
 * on sigsetjmp/siglongjmp -- confirm against elog.h.
 *
 * NOTE(review): many source lines are missing from this listing (see the
 * gaps in the embedded line numbers).  They include all `case` labels of
 * the switch below and calls whose names appear only in the References
 * list, such as ReorderBufferBuildTupleCidHash, StartTransactionCommand,
 * CHECK_FOR_INTERRUPTS, TeardownHistoricSnapshot, AbortCurrentTransaction,
 * RollbackAndReleaseCurrentSubTransaction, ReorderBufferTruncateTXN and
 * FlushErrorState.  Read the control flow here with that in mind.
 */
2114{
2115 bool using_subtxn;
2117 ReorderBufferIterTXNState *volatile iterstate = NULL;
2118 volatile XLogRecPtr prev_lsn = InvalidXLogRecPtr;
2119 ReorderBufferChange *volatile specinsert = NULL;
2120 volatile bool stream_started = false;
2121 ReorderBufferTXN *volatile curtxn = NULL;
2122
2123 /* build data to be able to lookup the CommandIds of catalog tuples */
/* NOTE(review): line 2124 is missing; presumably ReorderBufferBuildTupleCidHash(rb, txn) per References. */
2125
2126 /* setup the initial snapshot */
2127 SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
2128
2129 /*
2130 * Decoding needs access to syscaches et al., which in turn use
2131 * heavyweight locks and such. Thus we need to have enough state around to
2132 * keep track of those. The easiest way is to simply use a transaction
2133 * internally. That also allows us to easily enforce that nothing writes
2134 * to the database by checking for xid assignments.
2135 *
2136 * When we're called via the SQL SRF there's already a transaction
2137 * started, so start an explicit subtransaction there.
2138 */
2139 using_subtxn = IsTransactionOrTransactionBlock();
2140
2141 PG_TRY();
2142 {
2143 ReorderBufferChange *change;
2144 int changes_count = 0; /* used to accumulate the number of
2145 * changes */
2146
2147 if (using_subtxn)
2148 BeginInternalSubTransaction(streaming ? "stream" : "replay");
2149 else
/* NOTE(review): line 2150 (the else-branch statement) is missing; presumably StartTransactionCommand() per References. */
2151
2152 /*
2153 * We only need to send begin/begin-prepare for non-streamed
2154 * transactions.
2155 */
2156 if (!streaming)
2157 {
2158 if (rbtxn_prepared(txn))
2159 rb->begin_prepare(rb, txn);
2160 else
2161 rb->begin(rb, txn);
2162 }
2163
2164 ReorderBufferIterTXNInit(rb, txn, &iterstate);
2165 while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
2166 {
2167 Relation relation = NULL;
2168 Oid reloid;
2169
2171
2172 /*
2173 * We can't call start stream callback before processing first
2174 * change.
2175 */
2176 if (prev_lsn == InvalidXLogRecPtr)
2177 {
2178 if (streaming)
2179 {
2180 txn->origin_id = change->origin_id;
2181 rb->stream_start(rb, txn, change->lsn);
2182 stream_started = true;
2183 }
2184 }
2185
2186 /*
2187 * Enforce correct ordering of changes, merged from multiple
2188 * subtransactions. The changes may have the same LSN due to
2189 * MULTI_INSERT xlog records.
2190 */
2191 Assert(prev_lsn == InvalidXLogRecPtr || prev_lsn <= change->lsn);
2192
2193 prev_lsn = change->lsn;
2194
2195 /*
2196 * Set the current xid to detect concurrent aborts. This is
2197 * required for the cases when we decode the changes before the
2198 * COMMIT record is processed.
2199 */
2200 if (streaming || rbtxn_prepared(change->txn))
2201 {
2202 curtxn = change->txn;
2203 SetupCheckXidLive(curtxn->xid);
2204 }
2205
2206 switch (change->action)
2207 {
/*
 * NOTE(review): the `case` labels of this switch are missing from the
 * listing (e.g. lines 2208, 2222-2224, 2330, 2357, 2383).  Judging by their
 * comments and calls, the arms below handle, in order: speculative-insert
 * confirmation (falling through into insert/update/delete), speculative
 * insert, speculative abort, truncate, message, invalidation, snapshot
 * change, command-id change, and tuplecid (which is an error here).
 */
2209
2210 /*
2211 * Confirmation for speculative insertion arrived. Simply
2212 * use as a normal record. It'll be cleaned up at the end
2213 * of INSERT processing.
2214 */
2215 if (specinsert == NULL)
2216 elog(ERROR, "invalid ordering of speculative insertion changes");
2217 Assert(specinsert->data.tp.oldtuple == NULL);
2218 change = specinsert;
2220
2221 /* intentionally fall through */
2225 Assert(snapshot_now);
2226
2227 reloid = RelidByRelfilenumber(change->data.tp.rlocator.spcOid,
2228 change->data.tp.rlocator.relNumber);
2229
2230 /*
2231 * Mapped catalog tuple without data, emitted while
2232 * catalog table was in the process of being rewritten. We
2233 * can fail to look up the relfilenumber, because the
2234 * relmapper has no "historic" view, in contrast to the
2235 * normal catalog during decoding. Thus repeated rewrites
2236 * can cause a lookup failure. That's OK because we do not
2237 * decode catalog changes anyway. Normally such tuples
2238 * would be skipped over below, but we can't identify
2239 * whether the table should be logically logged without
2240 * mapping the relfilenumber to the oid.
2241 */
2242 if (reloid == InvalidOid &&
2243 change->data.tp.newtuple == NULL &&
2244 change->data.tp.oldtuple == NULL)
2245 goto change_done;
2246 else if (reloid == InvalidOid)
2247 elog(ERROR, "could not map filenumber \"%s\" to relation OID",
2248 relpathperm(change->data.tp.rlocator,
2249 MAIN_FORKNUM));
2250
2251 relation = RelationIdGetRelation(reloid);
2252
2253 if (!RelationIsValid(relation))
2254 elog(ERROR, "could not open relation with OID %u (for filenumber \"%s\")",
2255 reloid,
2256 relpathperm(change->data.tp.rlocator,
2257 MAIN_FORKNUM));
2258
2259 if (!RelationIsLogicallyLogged(relation))
2260 goto change_done;
2261
2262 /*
2263 * Ignore temporary heaps created during DDL unless the
2264 * plugin has asked for them.
2265 */
2266 if (relation->rd_rel->relrewrite && !rb->output_rewrites)
2267 goto change_done;
2268
2269 /*
2270 * For now ignore sequence changes entirely. Most of the
2271 * time they don't log changes using records we
2272 * understand, so it doesn't make sense to handle the few
2273 * cases we do.
2274 */
2275 if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
2276 goto change_done;
2277
2278 /* user-triggered change */
2279 if (!IsToastRelation(relation))
2280 {
2281 ReorderBufferToastReplace(rb, txn, relation, change);
2282 ReorderBufferApplyChange(rb, txn, relation, change,
2283 streaming);
2284
2285 /*
2286 * Only clear reassembled toast chunks if we're sure
2287 * they're not required anymore. The creator of the
2288 * tuple tells us.
2289 */
2290 if (change->data.tp.clear_toast_afterwards)
2291 ReorderBufferToastReset(rb, txn);
2292 }
2293 /* we're not interested in toast deletions */
2294 else if (change->action == REORDER_BUFFER_CHANGE_INSERT)
2295 {
2296 /*
2297 * Need to reassemble the full toasted Datum in
2298 * memory, to ensure the chunks don't get reused till
2299 * we're done remove it from the list of this
2300 * transaction's changes. Otherwise it will get
2301 * freed/reused while restoring spooled data from
2302 * disk.
2303 */
2304 Assert(change->data.tp.newtuple != NULL);
2305
2306 dlist_delete(&change->node);
2307 ReorderBufferToastAppendChunk(rb, txn, relation,
2308 change);
2309 }
2310
2311 change_done:
2312
2313 /*
2314 * If speculative insertion was confirmed, the record
2315 * isn't needed anymore.
2316 */
2317 if (specinsert != NULL)
2318 {
2319 ReorderBufferReturnChange(rb, specinsert, true);
2320 specinsert = NULL;
2321 }
2322
/* Every path that opened a relation reaches here, so it is closed exactly once. */
2323 if (RelationIsValid(relation))
2324 {
2325 RelationClose(relation);
2326 relation = NULL;
2327 }
2328 break;
2329
2331
2332 /*
2333 * Speculative insertions are dealt with by delaying the
2334 * processing of the insert until the confirmation record
2335 * arrives. For that we simply unlink the record from the
2336 * chain, so it does not get freed/reused while restoring
2337 * spooled data from disk.
2338 *
2339 * This is safe in the face of concurrent catalog changes
2340 * because the relevant relation can't be changed between
2341 * speculative insertion and confirmation due to
2342 * CheckTableNotInUse() and locking.
2343 */
2344
2345 /* clear out a pending (and thus failed) speculation */
2346 if (specinsert != NULL)
2347 {
2348 ReorderBufferReturnChange(rb, specinsert, true);
2349 specinsert = NULL;
2350 }
2351
2352 /* and memorize the pending insertion */
2353 dlist_delete(&change->node);
2354 specinsert = change;
2355 break;
2356
2358
2359 /*
2360 * Abort for speculative insertion arrived. So cleanup the
2361 * specinsert tuple and toast hash.
2362 *
2363 * Note that we get the spec abort change for each toast
2364 * entry but we need to perform the cleanup only the first
2365 * time we get it for the main table.
2366 */
2367 if (specinsert != NULL)
2368 {
2369 /*
2370 * We must clean the toast hash before processing a
2371 * completely new tuple to avoid confusion about the
2372 * previous tuple's toast chunks.
2373 */
/* NOTE(review): line 2374 is missing; presumably the condition guarding the reset below (References list clear_toast_afterwards). */
2375 ReorderBufferToastReset(rb, txn);
2376
2377 /* We don't need this record anymore. */
2378 ReorderBufferReturnChange(rb, specinsert, true);
2379 specinsert = NULL;
2380 }
2381 break;
2382
2384 {
2385 int i;
2386 int nrelids = change->data.truncate.nrelids;
2387 int nrelations = 0;
2388 Relation *relations;
2389
2390 relations = palloc0(nrelids * sizeof(Relation));
2391 for (i = 0; i < nrelids; i++)
2392 {
2393 Oid relid = change->data.truncate.relids[i];
2394 Relation rel;
2395
2396 rel = RelationIdGetRelation(relid);
2397
2398 if (!RelationIsValid(rel))
2399 elog(ERROR, "could not open relation with OID %u", relid);
2400
/*
 * NOTE(review): unlike the insert/update/delete arm, a relation skipped
 * here is not passed to RelationClose(), and the palloc0'd "relations"
 * array is never freed in this arm.  Presumably both are released only
 * when the internal (sub)transaction/memory context goes away -- confirm
 * whether this is intentional.
 */
2401 if (!RelationIsLogicallyLogged(rel))
2402 continue;
2403
2404 relations[nrelations++] = rel;
2405 }
2406
2407 /* Apply the truncate. */
2408 ReorderBufferApplyTruncate(rb, txn, nrelations,
2409 relations, change,
2410 streaming);
2411
2412 for (i = 0; i < nrelations; i++)
2413 RelationClose(relations[i]);
2414
2415 break;
2416 }
2417
2419 ReorderBufferApplyMessage(rb, txn, change, streaming);
2420 break;
2421
2423 /* Execute the invalidation messages locally */
2425 change->data.inval.invalidations);
2426 break;
2427
2429 /* get rid of the old */
2431
2432 if (snapshot_now->copied)
2433 {
2434 ReorderBufferFreeSnap(rb, snapshot_now);
2435 snapshot_now =
2437 txn, command_id);
2438 }
2439
2440 /*
2441 * Restored from disk, need to be careful not to double
2442 * free. We could introduce refcounting for that, but for
2443 * now this seems infrequent enough not to care.
2444 */
2445 else if (change->data.snapshot->copied)
2446 {
2447 snapshot_now =
2449 txn, command_id);
2450 }
2451 else
2452 {
2453 snapshot_now = change->data.snapshot;
2454 }
2455
2456 /* and continue with the new one */
2457 SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
2458 break;
2459
2462
/*
 * Only a newer command id matters.  A private copy of the snapshot is made
 * first (if not already copied) so that curcid can be changed on it.
 */
2463 if (command_id < change->data.command_id)
2464 {
2465 command_id = change->data.command_id;
2466
2467 if (!snapshot_now->copied)
2468 {
2469 /* we don't use the global one anymore */
2470 snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
2471 txn, command_id);
2472 }
2473
2474 snapshot_now->curcid = command_id;
2475
2477 SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
2478 }
2479
2480 break;
2481
2483 elog(ERROR, "tuplecid value in changequeue");
2484 break;
2485 }
2486
2487 /*
2488 * It is possible that the data is not sent to downstream for a
2489 * long time either because the output plugin filtered it or there
2490 * is a DDL that generates a lot of data that is not processed by
2491 * the plugin. So, in such cases, the downstream can timeout. To
2492 * avoid that we try to send a keepalive message if required.
2493 * Trying to send a keepalive message after every change has some
2494 * overhead, but testing showed there is no noticeable overhead if
2495 * we do it after every ~100 changes.
2496 */
2497#define CHANGES_THRESHOLD 100
2498
2499 if (++changes_count >= CHANGES_THRESHOLD)
2500 {
2501 rb->update_progress_txn(rb, txn, change->lsn);
2502 changes_count = 0;
2503 }
2504 }
2505
2506 /* speculative insertion record must be freed by now */
2507 Assert(!specinsert);
2508
2509 /* clean up the iterator */
2510 ReorderBufferIterTXNFinish(rb, iterstate);
2511 iterstate = NULL;
2512
2513 /*
2514 * Update total transaction count and total bytes processed by the
2515 * transaction and its subtransactions. Ensure to not count the
2516 * streamed transaction multiple times.
2517 *
2518 * Note that the statistics computation has to be done after
2519 * ReorderBufferIterTXNFinish as it releases the serialized change
2520 * which we have already accounted in ReorderBufferIterTXNNext.
2521 */
2522 if (!rbtxn_is_streamed(txn))
2523 rb->totalTxns++;
2524
2525 rb->totalBytes += txn->total_size;
2526
2527 /*
2528 * Done with current changes, send the last message for this set of
2529 * changes depending upon streaming mode.
2530 */
2531 if (streaming)
2532 {
2533 if (stream_started)
2534 {
2535 rb->stream_stop(rb, txn, prev_lsn);
2536 stream_started = false;
2537 }
2538 }
2539 else
2540 {
2541 /*
2542 * Call either PREPARE (for two-phase transactions) or COMMIT (for
2543 * regular ones).
2544 */
2545 if (rbtxn_prepared(txn))
2546 rb->prepare(rb, txn, commit_lsn);
2547 else
2548 rb->commit(rb, txn, commit_lsn);
2549 }
2550
2551 /* this is just a sanity check against bad output plugin behaviour */
/* NOTE(review): lines 2552 and 2554 are missing; presumably the GetCurrentTransactionIdIfAny() test and the GetCurrentTransactionId() argument (see References). */
2553 elog(ERROR, "output plugin used XID %u",
2555
2556 /*
2557 * Remember the command ID and snapshot for the next set of changes in
2558 * streaming mode.
2559 */
2560 if (streaming)
2561 ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
2562 else if (snapshot_now->copied)
2563 ReorderBufferFreeSnap(rb, snapshot_now);
2564
2565 /* cleanup */
/*
 * NOTE(review): lines 2566, 2574, 2577 and 2580 are missing.  Per the
 * comments and the References list they presumably tear down the historic
 * snapshot, abort the current (sub)transaction, execute txn's invalidations,
 * and roll back/release the internal subtransaction -- confirm.
 */
2567
2568 /*
2569 * Aborting the current (sub-)transaction as a whole has the right
2570 * semantics. We want all locks acquired in here to be released, not
2571 * reassigned to the parent and we do not want any database access
2572 * have persistent effects.
2573 */
2575
2576 /* make sure there's no cache pollution */
2578
2579 if (using_subtxn)
2581
2582 /*
2583 * We are here due to one of the four reasons: 1. Decoding an
2584 * in-progress txn. 2. Decoding a prepared txn. 3. Decoding of a
2585 * prepared txn that was (partially) streamed. 4. Decoding a committed
2586 * txn.
2587 *
2588 * For 1, we allow truncation of txn data by removing the changes
2589 * already streamed but still keeping other things like invalidations,
2590 * snapshot, and tuplecids. For 2 and 3, we indicate
2591 * ReorderBufferTruncateTXN to do more elaborate truncation of txn
2592 * data as the entire transaction has been decoded except for commit.
2593 * For 4, as the entire txn has been decoded, we can fully clean up
2594 * the TXN reorder buffer.
2595 */
/* NOTE(review): lines 2598 and 2600 are missing; presumably the ReorderBufferTruncateTXN() call and the CheckXidAlive reset. */
2596 if (streaming || rbtxn_prepared(txn))
2597 {
2599 /* Reset the CheckXidAlive */
2601 }
2602 else
2603 ReorderBufferCleanupTXN(rb, txn);
2604 }
2605 PG_CATCH();
2606 {
/*
 * NOTE(review): line 2607 is missing.  The References list (MemoryContextSwitchTo,
 * CurrentMemoryContext) suggests a memory-context switch before CopyErrorData().
 * Lines 2614, 2620, 2623, 2627, 2648 and 2661 below are also missing.
 */
2608 ErrorData *errdata = CopyErrorData();
2609
2610 /* TODO: Encapsulate cleanup from the PG_TRY and PG_CATCH blocks */
2611 if (iterstate)
2612 ReorderBufferIterTXNFinish(rb, iterstate);
2613
2615
2616 /*
2617 * Force cache invalidation to happen outside of a valid transaction
2618 * to prevent catalog access as we just caught an error.
2619 */
2621
2622 /* make sure there's no cache pollution */
2624 txn->invalidations);
2625
2626 if (using_subtxn)
2628
2629 /*
2630 * The error code ERRCODE_TRANSACTION_ROLLBACK indicates a concurrent
2631 * abort of the (sub)transaction we are streaming or preparing. We
2632 * need to do the cleanup and return gracefully on this error, see
2633 * SetupCheckXidLive.
2634 *
2635 * This error code can be thrown by one of the callbacks we call
2636 * during decoding so we need to ensure that we return gracefully only
2637 * when we are sending the data in streaming mode and the streaming is
2638 * not finished yet or when we are sending the data out on a PREPARE
2639 * during a two-phase commit.
2640 */
2641 if (errdata->sqlerrcode == ERRCODE_TRANSACTION_ROLLBACK &&
2642 (stream_started || rbtxn_prepared(txn)))
2643 {
2644 /* curtxn must be set for streaming or prepared transactions */
2645 Assert(curtxn);
2646
2647 /* Cleanup the temporary error state. */
2649 FreeErrorData(errdata);
2650 errdata = NULL;
2651 curtxn->concurrent_abort = true;
2652
2653 /* Reset the TXN so that it is allowed to stream remaining data. */
2654 ReorderBufferResetTXN(rb, txn, snapshot_now,
2655 command_id, prev_lsn,
2656 specinsert);
2657 }
2658 else
2659 {
2660 ReorderBufferCleanupTXN(rb, txn);
2662 PG_RE_THROW();
2663 }
2664 }
2665 PG_END_TRY();
2666}
bool IsToastRelation(Relation relation)
Definition: catalog.c:175
void FreeErrorData(ErrorData *edata)
Definition: elog.c:1818
ErrorData * CopyErrorData(void)
Definition: elog.c:1746
void FlushErrorState(void)
Definition: elog.c:1867
#define PG_RE_THROW()
Definition: elog.h:412
#define PG_TRY(...)
Definition: elog.h:371
#define PG_END_TRY(...)
Definition: elog.h:396
#define PG_CATCH(...)
Definition: elog.h:381
void * palloc0(Size size)
Definition: mcxt.c:1347
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:122
const void * data
#define InvalidOid
Definition: postgres_ext.h:36
#define RelationIsLogicallyLogged(relation)
Definition: rel.h:701
#define RelationIsValid(relation)
Definition: rel.h:478
Relation RelationIdGetRelation(Oid relationId)
Definition: relcache.c:2049
void RelationClose(Relation relation)
Definition: relcache.c:2171
Oid RelidByRelfilenumber(Oid reltablespace, RelFileNumber relfilenumber)
@ MAIN_FORKNUM
Definition: relpath.h:58
#define relpathperm(rlocator, forknum)
Definition: relpath.h:98
static void ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
static void ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id, XLogRecPtr last_lsn, ReorderBufferChange *specinsert)
static void ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferIterTXNState *volatile *iter_state)
static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
static void ReorderBufferSaveTXNSnapshot(ReorderBuffer *rb, ReorderBufferTXN *txn, Snapshot snapshot_now, CommandId command_id)
static void ReorderBufferApplyChange(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change, bool streaming)
static void ReorderBufferIterTXNFinish(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared)
#define CHANGES_THRESHOLD
static void ReorderBufferApplyMessage(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool streaming)
static void SetupCheckXidLive(TransactionId xid)
static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap, ReorderBufferTXN *txn, CommandId cid)
static void ReorderBufferApplyTruncate(ReorderBuffer *rb, ReorderBufferTXN *txn, int nrelations, Relation *relations, ReorderBufferChange *change, bool streaming)
static void ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
static ReorderBufferChange * ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
#define rbtxn_prepared(txn)
void TeardownHistoricSnapshot(bool is_error)
Definition: snapmgr.c:1613
void SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
Definition: snapmgr.c:1597
int sqlerrcode
Definition: elog.h:439
RelFileNumber relNumber
Form_pg_class rd_rel
Definition: rel.h:111
RelFileLocator rlocator
Definition: reorderbuffer.h:98
RepOriginId origin_id
Definition: reorderbuffer.h:86
ReorderBufferBeginCB begin_prepare
ReorderBufferUpdateProgressTxnCB update_progress_txn
ReorderBufferStreamStopCB stream_stop
ReorderBufferCommitCB commit
ReorderBufferStreamStartCB stream_start
ReorderBufferBeginCB begin
TransactionId CheckXidAlive
Definition: xact.c:98
void StartTransactionCommand(void)
Definition: xact.c:3051
TransactionId GetCurrentTransactionIdIfAny(void)
Definition: xact.c:470
TransactionId GetCurrentTransactionId(void)
Definition: xact.c:453

References AbortCurrentTransaction(), ReorderBufferChange::action, Assert, ReorderBuffer::begin, ReorderBuffer::begin_prepare, BeginInternalSubTransaction(), CHANGES_THRESHOLD, CHECK_FOR_INTERRUPTS, CheckXidAlive, ReorderBufferChange::clear_toast_afterwards, ReorderBufferChange::command_id, ReorderBuffer::commit, ReorderBufferTXN::concurrent_abort, SnapshotData::copied, CopyErrorData(), SnapshotData::curcid, CurrentMemoryContext, ReorderBufferChange::data, data, dlist_delete(), elog, ERROR, FlushErrorState(), FreeErrorData(), GetCurrentTransactionId(), GetCurrentTransactionIdIfAny(), i, ReorderBufferChange::inval, ReorderBufferChange::invalidations, ReorderBufferTXN::invalidations, InvalidCommandId, InvalidOid, InvalidTransactionId, InvalidXLogRecPtr, IsToastRelation(), IsTransactionOrTransactionBlock(), ReorderBufferChange::lsn, MAIN_FORKNUM, MemoryContextSwitchTo(), ReorderBufferChange::newtuple, ReorderBufferChange::ninvalidations, ReorderBufferTXN::ninvalidations, ReorderBufferChange::node, ReorderBufferChange::nrelids, ReorderBufferChange::oldtuple, ReorderBufferChange::origin_id, ReorderBufferTXN::origin_id, ReorderBuffer::output_rewrites, palloc0(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, ReorderBuffer::prepare, rbtxn_is_streamed, rbtxn_prepared, RelationData::rd_rel, RelationClose(), RelationIdGetRelation(), RelationIsLogicallyLogged, RelationIsValid, RelidByRelfilenumber(), ReorderBufferChange::relids, RelFileLocator::relNumber, relpathperm, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferApplyChange(), ReorderBufferApplyMessage(), 
ReorderBufferApplyTruncate(), ReorderBufferBuildTupleCidHash(), ReorderBufferCleanupTXN(), ReorderBufferCopySnap(), ReorderBufferExecuteInvalidations(), ReorderBufferFreeSnap(), ReorderBufferIterTXNFinish(), ReorderBufferIterTXNInit(), ReorderBufferIterTXNNext(), ReorderBufferResetTXN(), ReorderBufferReturnChange(), ReorderBufferSaveTXNSnapshot(), ReorderBufferToastAppendChunk(), ReorderBufferToastReplace(), ReorderBufferToastReset(), ReorderBufferTruncateTXN(), ReorderBufferChange::rlocator, RollbackAndReleaseCurrentSubTransaction(), SetupCheckXidLive(), SetupHistoricSnapshot(), ReorderBufferChange::snapshot, RelFileLocator::spcOid, ErrorData::sqlerrcode, StartTransactionCommand(), ReorderBuffer::stream_start, ReorderBuffer::stream_stop, TeardownHistoricSnapshot(), ReorderBufferTXN::total_size, ReorderBuffer::totalBytes, ReorderBuffer::totalTxns, ReorderBufferChange::tp, ReorderBufferChange::truncate, ReorderBufferTXN::tuplecid_hash, ReorderBufferChange::txn, ReorderBuffer::update_progress_txn, and ReorderBufferTXN::xid.

Referenced by ReorderBufferReplay(), and ReorderBufferStreamTXN().

◆ ReorderBufferProcessXid()

void ReorderBufferProcessXid ( ReorderBuffer *  rb,
TransactionId  xid,
XLogRecPtr  lsn 
)

Definition at line 3131 of file reorderbuffer.c.

3132{
3133 /* many records won't have an xid assigned, centralize check here */
3134 if (xid != InvalidTransactionId)
3135 ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3136}

References InvalidTransactionId, and ReorderBufferTXNByXid().

Referenced by heap2_decode(), heap_decode(), LogicalDecodingProcessRecord(), logicalmsg_decode(), standby_decode(), xact_decode(), and xlog_decode().

◆ ReorderBufferQueueChange()

void ReorderBufferQueueChange ( ReorderBuffer *  rb,
TransactionId  xid,
XLogRecPtr  lsn,
ReorderBufferChange *  change,
bool  toast_insert 
)

Definition at line 788 of file reorderbuffer.c.

/*
 * ReorderBufferQueueChange -- append a decoded change to the transaction
 * identified by xid.
 *
 * The transaction entry is looked up via ReorderBufferTXNByXid.  If that
 * transaction was already found to be concurrently aborted while streaming,
 * the change is handed back with ReorderBufferReturnChange and not queued.
 * Otherwise the change is:
 *   - stamped with lsn and txn;
 *   - pushed onto the tail of txn->changes and counted in
 *     nentries/nentries_mem;
 *   - added to memory accounting;
 *   - passed to ReorderBufferProcessPartialChange together with
 *     toast_insert.
 * Finally the memory limits are checked.
 *
 * NOTE(review): source lines 816-820, 824, 830, 837 and 843 are missing from
 * this listing.  The References list below names:
 *   - REORDER_BUFFER_CHANGE_UPDATE, _DELETE, _INTERNAL_SPEC_INSERT,
 *     _TRUNCATE and _MESSAGE;
 *   - RBTXN_HAS_STREAMABLE_CHANGE and txn_flags;
 *   - Assert and InvalidXLogRecPtr;
 *   - ReorderBufferChangeSize and ReorderBufferCheckMemoryLimit.
 * These presumably fill those gaps (the remaining action tests, flagging
 * toptxn as streamable, an lsn sanity Assert, the size argument, and the
 * memory-limit check).  Confirm against reorderbuffer.c.
 */
790{
791 ReorderBufferTXN *txn;
792
793 txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
794
795 /*
796 * While streaming the previous changes we have detected that the
797 * transaction is aborted. So there is no point in collecting further
798 * changes for it.
799 */
800 if (txn->concurrent_abort)
801 {
802 /*
803 * We don't need to update memory accounting for this change as we
804 * have not added it to the queue yet.
805 */
806 ReorderBufferReturnChange(rb, change, false);
807 return;
808 }
809
810 /*
811 * The changes that are sent downstream are considered streamable. We
812 * remember such transactions so that only those will later be considered
813 * for streaming.
814 */
815 if (change->action == REORDER_BUFFER_CHANGE_INSERT ||
821 {
/* The streamable flag is kept on the top-level transaction, not on a subtransaction. */
822 ReorderBufferTXN *toptxn = rbtxn_get_toptxn(txn);
823
825 }
826
827 change->lsn = lsn;
828 change->txn = txn;
829
831 dlist_push_tail(&txn->changes, &change->node);
832 txn->nentries++;
833 txn->nentries_mem++;
834
835 /* update memory accounting information */
836 ReorderBufferChangeMemoryUpdate(rb, change, NULL, true,
838
839 /* process partial change */
840 ReorderBufferProcessPartialChange(rb, txn, change, toast_insert);
841
842 /* check the memory limits and evict something if needed */
844}
static void ReorderBufferProcessPartialChange(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferChange *change, bool toast_insert)
static void ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
#define RBTXN_HAS_STREAMABLE_CHANGE

References ReorderBufferChange::action, Assert, ReorderBufferTXN::changes, ReorderBufferTXN::concurrent_abort, dlist_push_tail(), InvalidXLogRecPtr, ReorderBufferChange::lsn, ReorderBufferTXN::nentries, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, rbtxn_get_toptxn, RBTXN_HAS_STREAMABLE_CHANGE, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChangeMemoryUpdate(), ReorderBufferChangeSize(), ReorderBufferCheckMemoryLimit(), ReorderBufferProcessPartialChange(), ReorderBufferReturnChange(), ReorderBufferTXNByXid(), ReorderBufferChange::txn, and ReorderBufferTXN::txn_flags.

Referenced by DecodeDelete(), DecodeInsert(), DecodeMultiInsert(), DecodeSpecConfirm(), DecodeTruncate(), DecodeUpdate(), ReorderBufferAddInvalidations(), ReorderBufferAddNewCommandId(), ReorderBufferAddSnapshot(), and ReorderBufferQueueMessage().

◆ ReorderBufferQueueMessage()

void ReorderBufferQueueMessage ( ReorderBuffer *  rb,
TransactionId  xid,
Snapshot  snap,
XLogRecPtr  lsn,
bool  transactional,
const char *  prefix,
Size  message_size,
const char *  message 
)

Definition at line 851 of file reorderbuffer.c.

/*
 * ReorderBufferQueueMessage -- handle a logical decoding message.
 *
 * Transactional messages (snap must be NULL):
 *   - are deep-copied into rb->context (prefix via pstrdup, payload via
 *     palloc + memcpy);
 *   - are queued on transaction xid with ReorderBufferQueueChange;
 *   - are presumably delivered later, when ReorderBufferProcessTXN replays
 *     the transaction (its message arm calls ReorderBufferApplyMessage).
 *
 * Non-transactional messages:
 *   - require a snapshot, which is installed as the historic snapshot;
 *   - are delivered immediately through the output plugin's message
 *     callback;
 *   - pass txn as NULL when xid is InvalidTransactionId.
 *
 * NOTE(review): source lines 861, 873, 900 and 904 are missing from this
 * listing.  The References list names REORDER_BUFFER_CHANGE_MESSAGE and
 * TeardownHistoricSnapshot, which presumably:
 *   - set change->action (line 873);
 *   - tear down the historic snapshot on the success path (900) and the
 *     error path (904).
 * Confirm against reorderbuffer.c.
 */
855{
856 if (transactional)
857 {
858 MemoryContext oldcontext;
859 ReorderBufferChange *change;
860
862
863 /*
864 * We don't expect snapshots for transactional changes - we'll use the
865 * snapshot derived later during apply (unless the change gets
866 * skipped).
867 */
868 Assert(!snap);
869
/* Allocate the change and its copies in the reorder buffer's context. */
870 oldcontext = MemoryContextSwitchTo(rb->context);
871
872 change = ReorderBufferGetChange(rb);
874 change->data.msg.prefix = pstrdup(prefix);
875 change->data.msg.message_size = message_size;
876 change->data.msg.message = palloc(message_size);
877 memcpy(change->data.msg.message, message, message_size);
878
879 ReorderBufferQueueChange(rb, xid, lsn, change, false);
880
881 MemoryContextSwitchTo(oldcontext);
882 }
883 else
884 {
885 ReorderBufferTXN *txn = NULL;
/* volatile presumably because it is used inside the PG_TRY/PG_CATCH (setjmp/longjmp) region. */
886 volatile Snapshot snapshot_now = snap;
887
888 /* Non-transactional changes require a valid snapshot. */
889 Assert(snapshot_now);
890
891 if (xid != InvalidTransactionId)
892 txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
893
894 /* setup snapshot to allow catalog access */
895 SetupHistoricSnapshot(snapshot_now, NULL);
896 PG_TRY();
897 {
898 rb->message(rb, txn, lsn, false, prefix, message_size, message);
899
901 }
902 PG_CATCH();
903 {
905 PG_RE_THROW();
906 }
907 PG_END_TRY();
908 }
909}

References ReorderBufferChange::action, Assert, ReorderBuffer::context, ReorderBufferChange::data, InvalidTransactionId, MemoryContextSwitchTo(), ReorderBufferChange::message, ReorderBuffer::message, ReorderBufferChange::message_size, ReorderBufferChange::msg, palloc(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, ReorderBufferChange::prefix, pstrdup(), REORDER_BUFFER_CHANGE_MESSAGE, ReorderBufferGetChange(), ReorderBufferQueueChange(), ReorderBufferTXNByXid(), SetupHistoricSnapshot(), and TeardownHistoricSnapshot().

Referenced by logicalmsg_decode().

◆ ReorderBufferRememberPrepareInfo()

bool ReorderBufferRememberPrepareInfo ( ReorderBuffer *  rb,
TransactionId  xid,
XLogRecPtr  prepare_lsn,
XLogRecPtr  end_lsn,
TimestampTz  prepare_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn 
)

Definition at line 2762 of file reorderbuffer.c.

2766{
2767 ReorderBufferTXN *txn;
2768
2769 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false);
2770
2771 /* unknown transaction, nothing to do */
2772 if (txn == NULL)
2773 return false;
2774
2775 /*
2776 * Remember the prepare information to be later used by commit prepared in
2777 * case we skip doing prepare.
2778 */
2779 txn->final_lsn = prepare_lsn;
2780 txn->end_lsn = end_lsn;
2781 txn->xact_time.prepare_time = prepare_time;
2782 txn->origin_id = origin_id;
2783 txn->origin_lsn = origin_lsn;
2784
2785 return true;
2786}

References ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, InvalidXLogRecPtr, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, ReorderBufferTXN::prepare_time, ReorderBufferTXNByXid(), and ReorderBufferTXN::xact_time.

Referenced by DecodePrepare().

◆ ReorderBufferReplay()

static void ReorderBufferReplay ( ReorderBufferTXN *  txn,
ReorderBuffer *  rb,
TransactionId  xid,
XLogRecPtr  commit_lsn,
XLogRecPtr  end_lsn,
TimestampTz  commit_time,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn 
)
static

Definition at line 2679 of file reorderbuffer.c.

/*
 * ReorderBufferReplay -- record commit/prepare-time details on txn and
 * decode it.
 *
 * Stores commit_lsn, end_lsn, commit_time, origin_id and origin_lsn in txn,
 * then takes one of three paths:
 *   - if txn was already (partially) streamed, it is finished in "streamed"
 *     mode and the function returns;
 *   - if txn has no base snapshot there is nothing to decode: the txn is
 *     cleaned up unless it is prepared, and the function returns;
 *   - otherwise all of its changes are replayed in non-streaming mode via
 *     ReorderBufferProcessTXN, starting from the base snapshot and
 *     FirstCommandId.
 *
 * NOTE(review): the xid parameter is not referenced by any line visible in
 * this listing.
 */
2684{
2685 Snapshot snapshot_now;
2686 CommandId command_id = FirstCommandId;
2687
2688 txn->final_lsn = commit_lsn;
2689 txn->end_lsn = end_lsn;
2690 txn->xact_time.commit_time = commit_time;
2691 txn->origin_id = origin_id;
2692 txn->origin_lsn = origin_lsn;
2693
2694 /*
2695 * If the transaction was (partially) streamed, we need to commit it in a
2696 * 'streamed' way. That is, we first stream the remaining part of the
2697 * transaction, and then invoke stream_commit message.
2698 *
2699 * Called after everything (origin ID, LSN, ...) is stored in the
2700 * transaction to avoid passing that information directly.
2701 */
2702 if (rbtxn_is_streamed(txn))
2703 {
/*
 * NOTE(review): source line 2704 is missing from this listing.  Per the
 * comment above and the References list it is presumably the
 * ReorderBufferStreamCommit(rb, txn) call -- confirm.
 */
2705 return;
2706 }
2707
2708 /*
2709 * If this transaction has no snapshot, it didn't make any changes to the
2710 * database, so there's nothing to decode. Note that
2711 * ReorderBufferCommitChild will have transferred any snapshots from
2712 * subtransactions if there were any.
2713 */
2714 if (txn->base_snapshot == NULL)
2715 {
2716 Assert(txn->ninvalidations == 0);
2717
2718 /*
2719 * Removing this txn before a commit might result in the computation
2720 * of an incorrect restart_lsn. See SnapBuildProcessRunningXacts.
2721 */
2722 if (!rbtxn_prepared(txn))
2723 ReorderBufferCleanupTXN(rb, txn);
2724 return;
2725 }
2726
2727 snapshot_now = txn->base_snapshot;
2728
2729 /* Process and send the changes to output plugin. */
2730 ReorderBufferProcessTXN(rb, txn, commit_lsn, snapshot_now,
2731 command_id, false);
2732}
#define FirstCommandId
Definition: c.h:622
uint32 CommandId
Definition: c.h:620
static void ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn)
static void ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn, volatile Snapshot snapshot_now, volatile CommandId command_id, bool streaming)

References Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::commit_time, ReorderBufferTXN::end_lsn, ReorderBufferTXN::final_lsn, FirstCommandId, ReorderBufferTXN::ninvalidations, ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, rbtxn_is_streamed, rbtxn_prepared, ReorderBufferCleanupTXN(), ReorderBufferProcessTXN(), ReorderBufferStreamCommit(), and ReorderBufferTXN::xact_time.

Referenced by ReorderBufferCommit(), ReorderBufferFinishPrepared(), and ReorderBufferPrepare().

◆ ReorderBufferResetTXN()

static void ReorderBufferResetTXN ( ReorderBuffer * rb,
ReorderBufferTXN * txn,
Snapshot snapshot_now,
CommandId command_id,
XLogRecPtr last_lsn,
ReorderBufferChange * specinsert 
)
static

Definition at line 2063 of file reorderbuffer.c.

2068{
2069 /* Discard the changes that we just streamed */
2071
2072 /* Free all resources allocated for toast reconstruction */
2073 ReorderBufferToastReset(rb, txn);
2074
2075 /* Return the spec insert change if it is not NULL */
2076 if (specinsert != NULL)
2077 {
2078 ReorderBufferReturnChange(rb, specinsert, true);
2079 specinsert = NULL;
2080 }
2081
2082 /*
2083 * For the streaming case, stop the stream and remember the command ID and
2084 * snapshot for the streaming run.
2085 */
2086 if (rbtxn_is_streamed(txn))
2087 {
2088 rb->stream_stop(rb, txn, last_lsn);
2089 ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
2090 }
2091
2092 /* All changes must be deallocated */
2093 Assert(txn->size == 0);
2094}

References Assert, rbtxn_is_streamed, rbtxn_prepared, ReorderBufferReturnChange(), ReorderBufferSaveTXNSnapshot(), ReorderBufferToastReset(), ReorderBufferTruncateTXN(), ReorderBufferTXN::size, and ReorderBuffer::stream_stop.

Referenced by ReorderBufferProcessTXN().

◆ ReorderBufferRestoreChange()

static void ReorderBufferRestoreChange ( ReorderBuffer * rb,
ReorderBufferTXN * txn,
char *  data 
)
static

Definition at line 4400 of file reorderbuffer.c.

4402{
4404 ReorderBufferChange *change;
4405
4406 ondisk = (ReorderBufferDiskChange *) data;
4407
4408 change = ReorderBufferGetChange(rb);
4409
4410 /* copy static part */
4411 memcpy(change, &ondisk->change, sizeof(ReorderBufferChange));
4412
4413 data += sizeof(ReorderBufferDiskChange);
4414
4415 /* restore individual stuff */
4416 switch (change->action)
4417 {
4418 /* fall through these, they're all similar enough */
4423 if (change->data.tp.oldtuple)
4424 {
4425 uint32 tuplelen = ((HeapTuple) data)->t_len;
4426
4427 change->data.tp.oldtuple =
4429
4430 /* restore ->tuple */
4431 memcpy(change->data.tp.oldtuple, data,
4432 sizeof(HeapTupleData));
4433 data += sizeof(HeapTupleData);
4434
4435 /* reset t_data pointer into the new tuplebuf */
4436 change->data.tp.oldtuple->t_data =
4437 (HeapTupleHeader) ((char *) change->data.tp.oldtuple + HEAPTUPLESIZE);
4438
4439 /* restore tuple data itself */
4440 memcpy(change->data.tp.oldtuple->t_data, data, tuplelen);
4441 data += tuplelen;
4442 }
4443
4444 if (change->data.tp.newtuple)
4445 {
4446 /* here, data might not be suitably aligned! */
4447 uint32 tuplelen;
4448
4449 memcpy(&tuplelen, data + offsetof(HeapTupleData, t_len),
4450 sizeof(uint32));
4451
4452 change->data.tp.newtuple =
4454
4455 /* restore ->tuple */
4456 memcpy(change->data.tp.newtuple, data,
4457 sizeof(HeapTupleData));
4458 data += sizeof(HeapTupleData);
4459
4460 /* reset t_data pointer into the new tuplebuf */
4461 change->data.tp.newtuple->t_data =
4462 (HeapTupleHeader) ((char *) change->data.tp.newtuple + HEAPTUPLESIZE);
4463
4464 /* restore tuple data itself */
4465 memcpy(change->data.tp.newtuple->t_data, data, tuplelen);
4466 data += tuplelen;
4467 }
4468
4469 break;
4471 {
4472 Size prefix_size;
4473
4474 /* read prefix */
4475 memcpy(&prefix_size, data, sizeof(Size));
4476 data += sizeof(Size);
4478 prefix_size);
4479 memcpy(change->data.msg.prefix, data, prefix_size);
4480 Assert(change->data.msg.prefix[prefix_size - 1] == '\0');
4481 data += prefix_size;
4482
4483 /* read the message */
4484 memcpy(&change->data.msg.message_size, data, sizeof(Size));
4485 data += sizeof(Size);
4487 change->data.msg.message_size);
4488 memcpy(change->data.msg.message, data,
4489 change->data.msg.message_size);
4490 data += change->data.msg.message_size;
4491
4492 break;
4493 }
4495 {
4496 Size inval_size = sizeof(SharedInvalidationMessage) *
4497 change->data.inval.ninvalidations;
4498
4499 change->data.inval.invalidations =
4500 MemoryContextAlloc(rb->context, inval_size);
4501
4502 /* read the message */
4503 memcpy(change->data.inval.invalidations, data, inval_size);
4504
4505 break;
4506 }
4508 {
4509 Snapshot oldsnap;
4510 Snapshot newsnap;
4511 Size size;
4512
4513 oldsnap = (Snapshot) data;
4514
4515 size = sizeof(SnapshotData) +
4516 sizeof(TransactionId) * oldsnap->xcnt +
4517 sizeof(TransactionId) * (oldsnap->subxcnt + 0);
4518
4520
4521 newsnap = change->data.snapshot;
4522
4523 memcpy(newsnap, data, size);
4524 newsnap->xip = (TransactionId *)
4525 (((char *) newsnap) + sizeof(SnapshotData));
4526 newsnap->subxip = newsnap->xip + newsnap->xcnt;
4527 newsnap->copied = true;
4528 break;
4529 }
4530 /* the base struct contains all the data, easy peasy */
4532 {
4533 Oid *relids;
4534
4535 relids = ReorderBufferGetRelids(rb,
4536 change->data.truncate.nrelids);
4537 memcpy(relids, data, change->data.truncate.nrelids * sizeof(Oid));
4538 change->data.truncate.relids = relids;
4539
4540 break;
4541 }
4546 break;
4547 }
4548
4549 dlist_push_tail(&txn->changes, &change->node);
4550 txn->nentries_mem++;
4551
4552 /*
4553 * Update memory accounting for the restored change. We need to do this
4554 * although we don't check the memory limit when restoring the changes in
4555 * this branch (we only do that when initially queueing the changes after
4556 * decoding), because we will release the changes later, and that will
4557 * update the accounting too (subtracting the size from the counters). And
4558 * we don't want to underflow there.
4559 */
4560 ReorderBufferChangeMemoryUpdate(rb, change, NULL, true,
4561 ReorderBufferChangeSize(change));
4562}
struct ReorderBufferDiskChange ReorderBufferDiskChange
Oid * ReorderBufferGetRelids(ReorderBuffer *rb, int nrelids)
HeapTuple ReorderBufferGetTupleBuf(ReorderBuffer *rb, Size tuple_len)
struct SnapshotData * Snapshot
Definition: snapshot.h:117
ReorderBufferChange change

References ReorderBufferChange::action, Assert, ReorderBufferDiskChange::change, ReorderBufferTXN::changes, ReorderBuffer::context, SnapshotData::copied, ReorderBufferChange::data, data, dlist_push_tail(), HEAPTUPLESIZE, ReorderBufferChange::inval, ReorderBufferChange::invalidations, MemoryContextAlloc(), MemoryContextAllocZero(), ReorderBufferChange::message, ReorderBufferChange::message_size, ReorderBufferChange::msg, ReorderBufferTXN::nentries_mem, ReorderBufferChange::newtuple, ReorderBufferChange::ninvalidations, ReorderBufferChange::node, ReorderBufferChange::nrelids, ReorderBufferChange::oldtuple, ReorderBufferChange::prefix, ReorderBufferChange::relids, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChangeMemoryUpdate(), ReorderBufferChangeSize(), ReorderBufferGetChange(), ReorderBufferGetRelids(), ReorderBufferGetTupleBuf(), size, SizeofHeapTupleHeader, ReorderBufferChange::snapshot, SnapshotData::subxcnt, SnapshotData::subxip, HeapTupleData::t_data, ReorderBufferChange::tp, ReorderBufferChange::truncate, SnapshotData::xcnt, and SnapshotData::xip.

Referenced by ReorderBufferRestoreChanges().

◆ ReorderBufferRestoreChanges()

static Size ReorderBufferRestoreChanges ( ReorderBuffer * rb,
ReorderBufferTXN * txn,
TXNEntryFile * file,
XLogSegNo * segno 
)
static

Definition at line 4257 of file reorderbuffer.c.

4259{
4260 Size restored = 0;
4261 XLogSegNo last_segno;
4262 dlist_mutable_iter cleanup_iter;
4263 File *fd = &file->vfd;
4264
4267
4268 /* free current entries, so we have memory for more */
4269 dlist_foreach_modify(cleanup_iter, &txn->changes)
4270 {
4272 dlist_container(ReorderBufferChange, node, cleanup_iter.cur);
4273
4274 dlist_delete(&cleanup->node);
4276 }
4277 txn->nentries_mem = 0;
4279
4280 XLByteToSeg(txn->final_lsn, last_segno, wal_segment_size);
4281
4282 while (restored < max_changes_in_memory && *segno <= last_segno)
4283 {
4284 int readBytes;
4286
4288
4289 if (*fd == -1)
4290 {
4291 char path[MAXPGPATH];
4292
4293 /* first time in */
4294 if (*segno == 0)
4295 XLByteToSeg(txn->first_lsn, *segno, wal_segment_size);
4296
4297 Assert(*segno != 0 || dlist_is_empty(&txn->changes));
4298
4299 /*
4300 * No need to care about TLIs here, only used during a single run,
4301 * so each LSN only maps to a specific WAL record.
4302 */
4304 *segno);
4305
4306 *fd = PathNameOpenFile(path, O_RDONLY | PG_BINARY);
4307
4308 /* No harm in resetting the offset even in case of failure */
4309 file->curOffset = 0;
4310
4311 if (*fd < 0 && errno == ENOENT)
4312 {
4313 *fd = -1;
4314 (*segno)++;
4315 continue;
4316 }
4317 else if (*fd < 0)
4318 ereport(ERROR,
4320 errmsg("could not open file \"%s\": %m",
4321 path)));
4322 }
4323
4324 /*
4325 * Read the statically sized part of a change which has information
4326 * about the total size. If we couldn't read a record, we're at the
4327 * end of this file.
4328 */
4330 readBytes = FileRead(file->vfd, rb->outbuf,
4332 file->curOffset, WAIT_EVENT_REORDER_BUFFER_READ);
4333
4334 /* eof */
4335 if (readBytes == 0)
4336 {
4337 FileClose(*fd);
4338 *fd = -1;
4339 (*segno)++;
4340 continue;
4341 }
4342 else if (readBytes < 0)
4343 ereport(ERROR,
4345 errmsg("could not read from reorderbuffer spill file: %m")));
4346 else if (readBytes != sizeof(ReorderBufferDiskChange))
4347 ereport(ERROR,
4349 errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
4350 readBytes,
4351 (uint32) sizeof(ReorderBufferDiskChange))));
4352
4353 file->curOffset += readBytes;
4354
4355 ondisk = (ReorderBufferDiskChange *) rb->outbuf;
4356
4358 sizeof(ReorderBufferDiskChange) + ondisk->size);
4359 ondisk = (ReorderBufferDiskChange *) rb->outbuf;
4360
4361 readBytes = FileRead(file->vfd,
4362 rb->outbuf + sizeof(ReorderBufferDiskChange),
4363 ondisk->size - sizeof(ReorderBufferDiskChange),
4364 file->curOffset,
4365 WAIT_EVENT_REORDER_BUFFER_READ);
4366
4367 if (readBytes < 0)
4368 ereport(ERROR,
4370 errmsg("could not read from reorderbuffer spill file: %m")));
4371 else if (readBytes != ondisk->size - sizeof(ReorderBufferDiskChange))
4372 ereport(ERROR,
4374 errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
4375 readBytes,
4376 (uint32) (ondisk->size - sizeof(ReorderBufferDiskChange)))));
4377
4378 file->curOffset += readBytes;
4379
4380 /*
4381 * ok, read a full change from disk, now restore it into proper
4382 * in-memory format
4383 */
4384 ReorderBufferRestoreChange(rb, txn, rb->outbuf);
4385 restored++;
4386 }
4387
4388 return restored;
4389}
static void cleanup(void)
Definition: bootstrap.c:713
File PathNameOpenFile(const char *fileName, int fileFlags)
Definition: fd.c:1574
static ssize_t FileRead(File file, void *buffer, size_t amount, off_t offset, uint32 wait_event_info)
Definition: fd.h:196
int File
Definition: fd.h:51
static void ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz)
static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, char *data)
static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, TransactionId xid, XLogSegNo segno)
static const Size max_changes_in_memory
int wal_segment_size
Definition: xlog.c:143
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
uint64 XLogSegNo
Definition: xlogdefs.h:48

References Assert, ReorderBufferTXN::changes, CHECK_FOR_INTERRUPTS, cleanup(), dlist_mutable_iter::cur, TXNEntryFile::curOffset, dlist_container, dlist_delete(), dlist_foreach_modify, dlist_is_empty(), ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), FileClose(), FileRead(), ReorderBufferTXN::final_lsn, ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, max_changes_in_memory, MAXPGPATH, MyReplicationSlot, ReorderBufferTXN::nentries_mem, ReorderBuffer::outbuf, PathNameOpenFile(), PG_BINARY, ReorderBufferRestoreChange(), ReorderBufferReturnChange(), ReorderBufferSerializedPath(), ReorderBufferSerializeReserve(), ReorderBufferDiskChange::size, TXNEntryFile::vfd, wal_segment_size, ReorderBufferTXN::xid, and XLByteToSeg.

Referenced by ReorderBufferIterTXNInit(), and ReorderBufferIterTXNNext().

◆ ReorderBufferRestoreCleanup()

static void ReorderBufferRestoreCleanup ( ReorderBuffer * rb,
ReorderBufferTXN * txn 
)
static

Definition at line 4568 of file reorderbuffer.c.

4569{
4570 XLogSegNo first;
4571 XLogSegNo cur;
4572 XLogSegNo last;
4573
4576
4579
4580 /* iterate over all possible filenames, and delete them */
4581 for (cur = first; cur <= last; cur++)
4582 {
4583 char path[MAXPGPATH];
4584
4586 if (unlink(path) != 0 && errno != ENOENT)
4587 ereport(ERROR,
4589 errmsg("could not remove file \"%s\": %m", path)));
4590 }
4591}
struct cursor * cur
Definition: ecpg.c:29

References Assert, cur, ereport, errcode_for_file_access(), errmsg(), ERROR, ReorderBufferTXN::final_lsn, ReorderBufferTXN::first_lsn, InvalidXLogRecPtr, MAXPGPATH, MyReplicationSlot, ReorderBufferSerializedPath(), wal_segment_size, ReorderBufferTXN::xid, and XLByteToSeg.

Referenced by ReorderBufferCleanupTXN(), and ReorderBufferTruncateTXN().

◆ ReorderBufferReturnChange()

void ReorderBufferReturnChange ( ReorderBuffer * rb,
ReorderBufferChange * change,
bool  upd_mem 
)

Definition at line 500 of file reorderbuffer.c.

502{
503 /* update memory accounting info */
504 if (upd_mem)
505 ReorderBufferChangeMemoryUpdate(rb, change, NULL, false,
507
508 /* free contained data */
509 switch (change->action)
510 {
515 if (change->data.tp.newtuple)
516 {
518 change->data.tp.newtuple = NULL;
519 }
520
521 if (change->data.tp.oldtuple)
522 {
524 change->data.tp.oldtuple = NULL;
525 }
526 break;
528 if (change->data.msg.prefix != NULL)
529 pfree(change->data.msg.prefix);
530 change->data.msg.prefix = NULL;
531 if (change->data.msg.message != NULL)
532 pfree(change->data.msg.message);
533 change->data.msg.message = NULL;
534 break;
536 if (change->data.inval.invalidations)
537 pfree(change->data.inval.invalidations);
538 change->data.inval.invalidations = NULL;
539 break;
541 if (change->data.snapshot)
542 {
544 change->data.snapshot = NULL;
545 }
546 break;
547 /* no data in addition to the struct itself */
549 if (change->data.truncate.relids != NULL)
550 {
552 change->data.truncate.relids = NULL;
553 }
554 break;
559 break;
560 }
561
562 pfree(change);
563}
void ReorderBufferReturnTupleBuf(HeapTuple tuple)
void ReorderBufferReturnRelids(ReorderBuffer *rb, Oid *relids)

References ReorderBufferChange::action, ReorderBufferChange::data, ReorderBufferChange::inval, ReorderBufferChange::invalidations, ReorderBufferChange::message, ReorderBufferChange::msg, ReorderBufferChange::newtuple, ReorderBufferChange::oldtuple, pfree(), ReorderBufferChange::prefix, ReorderBufferChange::relids, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChangeMemoryUpdate(), ReorderBufferChangeSize(), ReorderBufferFreeSnap(), ReorderBufferReturnRelids(), ReorderBufferReturnTupleBuf(), ReorderBufferChange::snapshot, ReorderBufferChange::tp, and ReorderBufferChange::truncate.

Referenced by ReorderBufferCleanupTXN(), ReorderBufferIterTXNFinish(), ReorderBufferIterTXNNext(), ReorderBufferProcessTXN(), ReorderBufferQueueChange(), ReorderBufferResetTXN(), ReorderBufferRestoreChanges(), ReorderBufferSerializeTXN(), ReorderBufferToastReset(), and ReorderBufferTruncateTXN().

◆ ReorderBufferReturnRelids()

void ReorderBufferReturnRelids ( ReorderBuffer * rb,
Oid * relids 
)

Definition at line 619 of file reorderbuffer.c.

620{
621 pfree(relids);
622}

References pfree().

Referenced by ReorderBufferReturnChange().

◆ ReorderBufferReturnTupleBuf()

void ReorderBufferReturnTupleBuf ( HeapTuple  tuple)

Definition at line 588 of file reorderbuffer.c.

589{
590 pfree(tuple);
591}

References pfree().

Referenced by ReorderBufferReturnChange().

◆ ReorderBufferReturnTXN()

static void ReorderBufferReturnTXN ( ReorderBuffer * rb,
ReorderBufferTXN * txn 
)
static

Definition at line 443 of file reorderbuffer.c.

444{
445 /* clean the lookup cache if we were cached (quite likely) */
446 if (rb->by_txn_last_xid == txn->xid)
447 {
449 rb->by_txn_last_txn = NULL;
450 }
451
452 /* free data that's contained */
453
454 if (txn->gid != NULL)
455 {
456 pfree(txn->gid);
457 txn->gid = NULL;
458 }
459
460 if (txn->tuplecid_hash != NULL)
461 {
463 txn->tuplecid_hash = NULL;
464 }
465
466 if (txn->invalidations)
467 {
468 pfree(txn->invalidations);
469 txn->invalidations = NULL;
470 }
471
472 /* Reset the toast hash */
474
475 /* All changes must be deallocated */
476 Assert(txn->size == 0);
477
478 pfree(txn);
479}
void hash_destroy(HTAB *hashp)
Definition: dynahash.c:865

References Assert, ReorderBuffer::by_txn_last_txn, ReorderBuffer::by_txn_last_xid, ReorderBufferTXN::gid, hash_destroy(), ReorderBufferTXN::invalidations, InvalidTransactionId, pfree(), ReorderBufferToastReset(), ReorderBufferTXN::size, ReorderBufferTXN::tuplecid_hash, and ReorderBufferTXN::xid.

Referenced by ReorderBufferCleanupTXN().

◆ ReorderBufferSaveTXNSnapshot()

static void ReorderBufferSaveTXNSnapshot ( ReorderBuffer * rb,
ReorderBufferTXN * txn,
Snapshot snapshot_now,
CommandId command_id 
)
inline static

Definition at line 2042 of file reorderbuffer.c.

2044{
2045 txn->command_id = command_id;
2046
2047 /* Avoid copying if it's already copied. */
2048 if (snapshot_now->copied)
2049 txn->snapshot_now = snapshot_now;
2050 else
2051 txn->snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
2052 txn, command_id);
2053}

References ReorderBufferTXN::command_id, SnapshotData::copied, ReorderBufferCopySnap(), and ReorderBufferTXN::snapshot_now.

Referenced by ReorderBufferProcessTXN(), and ReorderBufferResetTXN().

◆ ReorderBufferSerializeChange()

static void ReorderBufferSerializeChange ( ReorderBuffer * rb,
ReorderBufferTXN * txn,
int fd,
ReorderBufferChange * change 
)
static

Definition at line 3805 of file reorderbuffer.c.

3807{
3809 Size sz = sizeof(ReorderBufferDiskChange);
3810
3812
3813 ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3814 memcpy(&ondisk->change, change, sizeof(ReorderBufferChange));
3815
3816 switch (change->action)
3817 {
3818 /* fall through these, they're all similar enough */
3823 {
3824 char *data;
3825 HeapTuple oldtup,
3826 newtup;
3827 Size oldlen = 0;
3828 Size newlen = 0;
3829
3830 oldtup = change->data.tp.oldtuple;
3831 newtup = change->data.tp.newtuple;
3832
3833 if (oldtup)
3834 {
3835 sz += sizeof(HeapTupleData);
3836 oldlen = oldtup->t_len;
3837 sz += oldlen;
3838 }
3839
3840 if (newtup)
3841 {
3842 sz += sizeof(HeapTupleData);
3843 newlen = newtup->t_len;
3844 sz += newlen;
3845 }
3846
3847 /* make sure we have enough space */
3849
3850 data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3851 /* might have been reallocated above */
3852 ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3853
3854 if (oldlen)
3855 {
3856 memcpy(data, oldtup, sizeof(HeapTupleData));
3857 data += sizeof(HeapTupleData);
3858
3859 memcpy(data, oldtup->t_data, oldlen);
3860 data += oldlen;
3861 }
3862
3863 if (newlen)
3864 {
3865 memcpy(data, newtup, sizeof(HeapTupleData));
3866 data += sizeof(HeapTupleData);
3867
3868 memcpy(data, newtup->t_data, newlen);
3869 data += newlen;
3870 }
3871 break;
3872 }
3874 {
3875 char *data;
3876 Size prefix_size = strlen(change->data.msg.prefix) + 1;
3877
3878 sz += prefix_size + change->data.msg.message_size +
3879 sizeof(Size) + sizeof(Size);
3881
3882 data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3883
3884 /* might have been reallocated above */
3885 ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3886
3887 /* write the prefix including the size */
3888 memcpy(data, &prefix_size, sizeof(Size));
3889 data += sizeof(Size);
3890 memcpy(data, change->data.msg.prefix,
3891 prefix_size);
3892 data += prefix_size;
3893
3894 /* write the message including the size */
3895 memcpy(data, &change->data.msg.message_size, sizeof(Size));
3896 data += sizeof(Size);
3897 memcpy(data, change->data.msg.message,
3898 change->data.msg.message_size);
3899 data += change->data.msg.message_size;
3900
3901 break;
3902 }
3904 {
3905 char *data;
3906 Size inval_size = sizeof(SharedInvalidationMessage) *
3907 change->data.inval.ninvalidations;
3908
3909 sz += inval_size;
3910
3912 data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3913
3914 /* might have been reallocated above */
3915 ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3916 memcpy(data, change->data.inval.invalidations, inval_size);
3917 data += inval_size;
3918
3919 break;
3920 }
3922 {
3923 Snapshot snap;
3924 char *data;
3925
3926 snap = change->data.snapshot;
3927
3928 sz += sizeof(SnapshotData) +
3929 sizeof(TransactionId) * snap->xcnt +
3930 sizeof(TransactionId) * snap->subxcnt;
3931
3932 /* make sure we have enough space */
3934 data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3935 /* might have been reallocated above */
3936 ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3937
3938 memcpy(data, snap, sizeof(SnapshotData));
3939 data += sizeof(SnapshotData);
3940
3941 if (snap->xcnt)
3942 {
3943 memcpy(data, snap->xip,
3944 sizeof(TransactionId) * snap->xcnt);
3945 data += sizeof(TransactionId) * snap->xcnt;
3946 }
3947
3948 if (snap->subxcnt)
3949 {
3950 memcpy(data, snap->subxip,
3951 sizeof(TransactionId) * snap->subxcnt);
3952 data += sizeof(TransactionId) * snap->subxcnt;
3953 }
3954 break;
3955 }
3957 {
3958 Size size;
3959 char *data;
3960
3961 /* account for the OIDs of truncated relations */
3962 size = sizeof(Oid) * change->data.truncate.nrelids;
3963 sz += size;
3964
3965 /* make sure we have enough space */
3967
3968 data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3969 /* might have been reallocated above */
3970 ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3971
3972 memcpy(data, change->data.truncate.relids, size);
3973 data += size;
3974
3975 break;
3976 }
3981 /* ReorderBufferChange contains everything important */
3982 break;
3983 }
3984
3985 ondisk->size = sz;
3986
3987 errno = 0;
3988 pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_WRITE);
3989 if (write(fd, rb->outbuf, ondisk->size) != ondisk->size)
3990 {
3991 int save_errno = errno;
3992
3994
3995 /* if write didn't set errno, assume problem is no disk space */
3996 errno = save_errno ? save_errno : ENOSPC;
3997 ereport(ERROR,
3999 errmsg("could not write to data file for XID %u: %m",
4000 txn->xid)));
4001 }
4003
4004 /*
4005 * Keep the transaction's final_lsn up to date with each change we send to
4006 * disk, so that ReorderBufferRestoreCleanup works correctly. (We used to
4007 * only do this on commit and abort records, but that doesn't work if a
4008 * system crash leaves a transaction without its abort record).
4009 *
4010 * Make sure not to move it backwards.
4011 */
4012 if (txn->final_lsn < change->lsn)
4013 txn->final_lsn = change->lsn;
4014
4015 Assert(ondisk->change.action == change->action);
4016}
#define write(a, b, c)
Definition: win32.h:14
if(TABLE==NULL||TABLE_index==NULL)
Definition: isn.c:76

References ReorderBufferChange::action, Assert, ReorderBufferDiskChange::change, CloseTransientFile(), ReorderBufferChange::data, data, ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), ReorderBufferTXN::final_lsn, if(), ReorderBufferChange::inval, ReorderBufferChange::invalidations, ReorderBufferChange::lsn, ReorderBufferChange::message, ReorderBufferChange::message_size, ReorderBufferChange::msg, ReorderBufferChange::newtuple, ReorderBufferChange::ninvalidations, ReorderBufferChange::nrelids, ReorderBufferChange::oldtuple, ReorderBuffer::outbuf, pgstat_report_wait_end(), pgstat_report_wait_start(), ReorderBufferChange::prefix, ReorderBufferChange::relids, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID, REORDER_BUFFER_CHANGE_INVALIDATION, REORDER_BUFFER_CHANGE_MESSAGE, REORDER_BUFFER_CHANGE_TRUNCATE, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferSerializeReserve(), ReorderBufferDiskChange::size, size, ReorderBufferChange::snapshot, SnapshotData::subxcnt, SnapshotData::subxip, HeapTupleData::t_data, HeapTupleData::t_len, ReorderBufferChange::tp, ReorderBufferChange::truncate, write, SnapshotData::xcnt, ReorderBufferTXN::xid, and SnapshotData::xip.

Referenced by ReorderBufferSerializeTXN().

◆ ReorderBufferSerializedPath()

static void ReorderBufferSerializedPath ( char * path,
ReplicationSlot * slot,
TransactionId xid,
XLogSegNo segno 
)
static

Definition at line 4637 of file reorderbuffer.c.

4639{
4640 XLogRecPtr recptr;
4641
4642 XLogSegNoOffsetToRecPtr(segno, 0, wal_segment_size, recptr);
4643
4644 snprintf(path, MAXPGPATH, "%s/%s/xid-%u-lsn-%X-%X.spill",
4647 xid, LSN_FORMAT_ARGS(recptr));
4648}
#define XLogSegNoOffsetToRecPtr(segno, offset, wal_segsz_bytes, dest)
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43

References ReplicationSlot::data, LSN_FORMAT_ARGS, MAXPGPATH, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, PG_REPLSLOT_DIR, snprintf, wal_segment_size, and XLogSegNoOffsetToRecPtr.

Referenced by ReorderBufferRestoreChanges(), ReorderBufferRestoreCleanup(), and ReorderBufferSerializeTXN().

◆ ReorderBufferSerializeReserve()

static void ReorderBufferSerializeReserve ( ReorderBuffer * rb,
Size sz 
)
static

Definition at line 3520 of file reorderbuffer.c.

3521{
3522 if (!rb->outbufsize)
3523 {
3524 rb->outbuf = MemoryContextAlloc(rb->context, sz);
3525 rb->outbufsize = sz;
3526 }
3527 else if (rb->outbufsize < sz)
3528 {
3529 rb->outbuf = repalloc(rb->outbuf, sz);
3530 rb->outbufsize = sz;
3531 }
3532}

References ReorderBuffer::context, MemoryContextAlloc(), ReorderBuffer::outbuf, ReorderBuffer::outbufsize, and repalloc().

Referenced by ReorderBufferRestoreChanges(), and ReorderBufferSerializeChange().

◆ ReorderBufferSerializeTXN()

static void ReorderBufferSerializeTXN ( ReorderBuffer * rb,
ReorderBufferTXN * txn 
)
static

Definition at line 3710 of file reorderbuffer.c.

3711{
3712 dlist_iter subtxn_i;
3713 dlist_mutable_iter change_i;
3714 int fd = -1;
3715 XLogSegNo curOpenSegNo = 0;
3716 Size spilled = 0;
3717 Size size = txn->size;
3718
3719 elog(DEBUG2, "spill %u changes in XID %u to disk",
3720 (uint32) txn->nentries_mem, txn->xid);
3721
3722 /* do the same to all child TXs */
3723 dlist_foreach(subtxn_i, &txn->subtxns)
3724 {
3725 ReorderBufferTXN *subtxn;
3726
3727 subtxn = dlist_container(ReorderBufferTXN, node, subtxn_i.cur);
3728 ReorderBufferSerializeTXN(rb, subtxn);
3729 }
3730
3731 /* serialize changestream */
3732 dlist_foreach_modify(change_i, &txn->changes)
3733 {
3734 ReorderBufferChange *change;
3735
3736 change = dlist_container(ReorderBufferChange, node, change_i.cur);
3737
3738 /*
3739 * store in segment in which it belongs by start lsn, don't split over
3740 * multiple segments tho
3741 */
3742 if (fd == -1 ||
3743 !XLByteInSeg(change->lsn, curOpenSegNo, wal_segment_size))
3744 {
3745 char path[MAXPGPATH];
3746
3747 if (fd != -1)
3749
3750 XLByteToSeg(change->lsn, curOpenSegNo, wal_segment_size);
3751
3752 /*
3753 * No need to care about TLIs here, only used during a single run,
3754 * so each LSN only maps to a specific WAL record.
3755 */
3757 curOpenSegNo);
3758
3759 /* open segment, create it if necessary */
3760 fd = OpenTransientFile(path,
3761 O_CREAT | O_WRONLY | O_APPEND | PG_BINARY);
3762
3763 if (fd < 0)
3764 ereport(ERROR,
3766 errmsg("could not open file \"%s\": %m", path)));
3767 }
3768
3769 ReorderBufferSerializeChange(rb, txn, fd, change);
3770 dlist_delete(&change->node);
3771 ReorderBufferReturnChange(rb, change, false);
3772
3773 spilled++;
3774 }
3775
3776 /* Update the memory counter */
3777 ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, size);
3778
3779 /* update the statistics iff we have spilled anything */
3780 if (spilled)
3781 {
3782 rb->spillCount += 1;
3783 rb->spillBytes += size;
3784
3785 /* don't consider already serialized transactions */
3786 rb->spillTxns += (rbtxn_is_serialized(txn) || rbtxn_is_serialized_clear(txn)) ? 0 : 1;
3787
3788 /* update the decoding stats */
3790 }
3791
3792 Assert(spilled == txn->nentries_mem);
3794 txn->nentries_mem = 0;
3796
3797 if (fd != -1)
3799}
void UpdateDecodingStats(LogicalDecodingContext *ctx)
Definition: logical.c:1934
static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, int fd, ReorderBufferChange *change)
#define rbtxn_is_serialized_clear(txn)
#define RBTXN_IS_SERIALIZED
#define XLByteInSeg(xlrp, logSegNo, wal_segsz_bytes)

References Assert, ReorderBufferTXN::changes, CloseTransientFile(), dlist_iter::cur, dlist_mutable_iter::cur, DEBUG2, dlist_container, dlist_delete(), dlist_foreach, dlist_foreach_modify, dlist_is_empty(), elog, ereport, errcode_for_file_access(), errmsg(), ERROR, fd(), ReorderBufferChange::lsn, MAXPGPATH, MyReplicationSlot, ReorderBufferTXN::nentries_mem, ReorderBufferChange::node, OpenTransientFile(), PG_BINARY, ReorderBuffer::private_data, RBTXN_IS_SERIALIZED, rbtxn_is_serialized, rbtxn_is_serialized_clear, ReorderBufferChangeMemoryUpdate(), ReorderBufferReturnChange(), ReorderBufferSerializeChange(), ReorderBufferSerializedPath(), ReorderBufferSerializeTXN(), size, ReorderBufferTXN::size, ReorderBuffer::spillBytes, ReorderBuffer::spillCount, ReorderBuffer::spillTxns, ReorderBufferTXN::subtxns, ReorderBufferTXN::txn_flags, UpdateDecodingStats(), wal_segment_size, ReorderBufferTXN::xid, XLByteInSeg, and XLByteToSeg.

Referenced by ReorderBufferCheckMemoryLimit(), ReorderBufferIterTXNInit(), and ReorderBufferSerializeTXN().

◆ ReorderBufferSetBaseSnapshot()

void ReorderBufferSetBaseSnapshot ( ReorderBuffer * rb,
TransactionId xid,
XLogRecPtr lsn,
Snapshot snap 
)

Definition at line 3162 of file reorderbuffer.c.

3164{
3165 ReorderBufferTXN *txn;
3166 bool is_new;
3167
3168 Assert(snap != NULL);
3169
3170 /*
3171 * Fetch the transaction to operate on. If we know it's a subtransaction,
3172 * operate on its top-level transaction instead.
3173 */
3174 txn = ReorderBufferTXNByXid(rb, xid, true, &is_new, lsn, true);
3175 if (rbtxn_is_known_subxact(txn))
3176 txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
3177 NULL, InvalidXLogRecPtr, false);
3178 Assert(txn->base_snapshot == NULL);
3179
3180 txn->base_snapshot = snap;
3181 txn->base_snapshot_lsn = lsn;
3183
3185}

References Assert, AssertTXNLsnOrder(), ReorderBufferTXN::base_snapshot, ReorderBufferTXN::base_snapshot_lsn, ReorderBufferTXN::base_snapshot_node, dlist_push_tail(), InvalidXLogRecPtr, rbtxn_is_known_subxact, ReorderBufferTXNByXid(), ReorderBufferTXN::toplevel_xid, and ReorderBuffer::txns_by_base_snapshot_lsn.

Referenced by SnapBuildCommitTxn(), and SnapBuildProcessChange().

◆ ReorderBufferSetRestartPoint()

void ReorderBufferSetRestartPoint ( ReorderBuffer * rb,
XLogRecPtr ptr 
)

Definition at line 1065 of file reorderbuffer.c.

1066{
1068}

References ReorderBuffer::current_restart_decoding_lsn.

Referenced by SnapBuildRestore(), and SnapBuildSerialize().

◆ ReorderBufferSkipPrepare()

void ReorderBufferSkipPrepare ( ReorderBuffer * rb,
TransactionId xid 
)

Definition at line 2790 of file reorderbuffer.c.

2791{
2792 ReorderBufferTXN *txn;
2793
2794 txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false);
2795
2796 /* unknown transaction, nothing to do */
2797 if (txn == NULL)
2798 return;
2799
2801}
#define RBTXN_SKIPPED_PREPARE

References InvalidXLogRecPtr, RBTXN_SKIPPED_PREPARE, ReorderBufferTXNByXid(), and ReorderBufferTXN::txn_flags.

Referenced by DecodePrepare().

◆ ReorderBufferStreamCommit()

static void ReorderBufferStreamCommit ( ReorderBuffer * rb,
ReorderBufferTXN * txn 
)
static

Definition at line 1907 of file reorderbuffer.c.

1908{
1909 /* we should only call this for previously streamed transactions */
1910 Assert(rbtxn_is_streamed(txn));
1911
1912 ReorderBufferStreamTXN(rb, txn);
1913
1914 if (rbtxn_prepared(txn))
1915 {
1916 /*
1917 * Note, we send stream prepare even if a concurrent abort is
1918 * detected. See DecodePrepare for more information.
1919 */
1920 rb->stream_prepare(rb, txn, txn->final_lsn);
1921
1922 /*
1923 * This is a PREPARED transaction, part of a two-phase commit. The
1924 * full cleanup will happen as part of the COMMIT PREPAREDs, so now
1925 * just truncate txn by removing changes and tuplecids.
1926 */
1927 ReorderBufferTruncateTXN(rb, txn, true);
1928 /* Reset the CheckXidAlive */
1929 CheckXidAlive = InvalidTransactionId;
1930 }
1931 else
1932 {
1933 rb->stream_commit(rb, txn, txn->final_lsn);
1934 ReorderBufferCleanupTXN(rb, txn);
1935 }
1936}
ReorderBufferStreamPrepareCB stream_prepare
ReorderBufferStreamCommitCB stream_commit

References Assert, CheckXidAlive, ReorderBufferTXN::final_lsn, InvalidTransactionId, rbtxn_is_streamed, rbtxn_prepared, ReorderBufferCleanupTXN(), ReorderBufferStreamTXN(), ReorderBufferTruncateTXN(), ReorderBuffer::stream_commit, and ReorderBuffer::stream_prepare.

Referenced by ReorderBufferReplay().

◆ ReorderBufferStreamTXN()

static void ReorderBufferStreamTXN ( ReorderBuffer * rb,
ReorderBufferTXN * txn 
)
static

Definition at line 4055 of file reorderbuffer.c.

4056{
4057 Snapshot snapshot_now;
4058 CommandId command_id;
4059 Size stream_bytes;
4060 bool txn_is_streamed;
4061
4062 /* We can never reach here for a subtransaction. */
4063 Assert(rbtxn_is_toptxn(txn));
4064
4065 /*
4066 * We can't make any assumptions about base snapshot here, similar to what
4067 * ReorderBufferCommit() does. That relies on base_snapshot getting
4068 * transferred from subxact in ReorderBufferCommitChild(), but that was
4069 * not yet called as the transaction is in-progress.
4070 *
4071 * So just walk the subxacts and use the same logic here. But we only need
4072 * to do that once, when the transaction is streamed for the first time.
4073 * After that we need to reuse the snapshot from the previous run.
4074 *
4075 * Unlike DecodeCommit which adds xids of all the subtransactions in
4076 * snapshot's xip array via SnapBuildCommitTxn, we can't do that here but
4077 * we do add them to subxip array instead via ReorderBufferCopySnap. This
4078 * allows the catalog changes made in subtransactions decoded till now to
4079 * be visible.
4080 */
4081 if (txn->snapshot_now == NULL)
4082 {
4083 dlist_iter subxact_i;
4084
4085 /* make sure this transaction is streamed for the first time */
4086 Assert(!rbtxn_is_streamed(txn));
4087
4088 /* at the beginning we should have invalid command ID */
4089 Assert(txn->command_id == InvalidCommandId);
4090
4091 dlist_foreach(subxact_i, &txn->subtxns)
4092 {
4093 ReorderBufferTXN *subtxn;
4094
4095 subtxn = dlist_container(ReorderBufferTXN, node, subxact_i.cur);
4096 ReorderBufferTransferSnapToParent(txn, subtxn);
4097 }
4098
4099 /*
4100 * If this transaction has no snapshot, it didn't make any changes to
4101 * the database till now, so there's nothing to decode.
4102 */
4103 if (txn->base_snapshot == NULL)
4104 {
4105 Assert(txn->ninvalidations == 0);
4106 return;
4107 }
4108
4109 command_id = FirstCommandId;
4110 snapshot_now = ReorderBufferCopySnap(rb, txn->base_snapshot,
4111 txn, command_id);
4112 }
4113 else
4114 {
4115 /* the transaction must have been already streamed */
4116 Assert(rbtxn_is_streamed(txn));
4117
4118 /*
4119 * Nah, we already have snapshot from the previous streaming run. We
4120 * assume new subxacts can't move the LSN backwards, and so can't beat
4121 * the LSN condition in the previous branch (so no need to walk
4122 * through subxacts again). In fact, we must not do that as we may be
4123 * using snapshot half-way through the subxact.
4124 */
4125 command_id = txn->command_id;
4126
4127 /*
4128 * We can't use txn->snapshot_now directly because after the last
4129 * streaming run, we might have got some new sub-transactions. So we
4130 * need to add them to the snapshot.
4131 */
4132 snapshot_now = ReorderBufferCopySnap(rb, txn->snapshot_now,
4133 txn, command_id);
4134
4135 /* Free the previously copied snapshot. */
4136 Assert(txn->snapshot_now->copied);
4137 ReorderBufferFreeSnap(rb, txn->snapshot_now);
4138 txn->snapshot_now = NULL;
4139 }
4140
4141 /*
4142 * Remember this information to be used later to update stats. We can't
4143 * update the stats here as an error while processing the changes would
4144 * lead to the accumulation of stats even though we haven't streamed all
4145 * the changes.
4146 */
4147 txn_is_streamed = rbtxn_is_streamed(txn);
4148 stream_bytes = txn->total_size;
4149
4150 /* Process and send the changes to output plugin. */
4151 ReorderBufferProcessTXN(rb, txn, InvalidXLogRecPtr, snapshot_now,
4152 command_id, true);
4153
4154 rb->streamCount += 1;
4155 rb->streamBytes += stream_bytes;
4156
4157 /* Don't consider already streamed transaction. */
4158 rb->streamTxns += (txn_is_streamed) ? 0 : 1;
4159
4160 /* update the decoding stats */
4161 UpdateDecodingStats((LogicalDecodingContext *) rb->private_data);
4162
4163 Assert(dlist_is_empty(&txn->changes));
4164 Assert(txn->nentries == 0);
4165 Assert(txn->nentries_mem == 0);
4166}

References Assert, ReorderBufferTXN::base_snapshot, ReorderBufferTXN::changes, ReorderBufferTXN::command_id, SnapshotData::copied, dlist_iter::cur, dlist_container, dlist_foreach, dlist_is_empty(), FirstCommandId, InvalidCommandId, InvalidXLogRecPtr, ReorderBufferTXN::nentries, ReorderBufferTXN::nentries_mem, ReorderBufferTXN::ninvalidations, ReorderBuffer::private_data, rbtxn_is_streamed, rbtxn_is_toptxn, ReorderBufferCopySnap(), ReorderBufferFreeSnap(), ReorderBufferProcessTXN(), ReorderBufferTransferSnapToParent(), ReorderBufferTXN::snapshot_now,